In [1]:
import matplotlib.pyplot as plt
import numpy as np

from pyspark.sql.types import StructType, StructField, FloatType, LongType, StringType
from datetime import datetime
from pyspark.sql import functions as F

In [2]:
# HDFS endpoint used as the prefix of every input path. Despite the name,
# this holds the full scheme://host:port URI, not just a port number.
hdfs_port = "hdfs://orion11:26990"
# Glob (relative to the HDFS root) selecting the input files to load.
# Alternative globs are kept commented out for quick switching:
# data_path = "/nam_s/nam_201501_s*"
data_path = "/nam_s/*"
# data_path = "/sample/nam_tiny*"

In [3]:
# Build the Spark schema from ../features.txt, which lists one column name
# per line. Line 0 is the timestamp column (long), line 1 is the geohash
# column (string), and every remaining line is a float-valued feature.
# All fields are declared nullable.
feats = []
# `with` guarantees the file handle is closed, even if building a field fails;
# the original left the handle open for the lifetime of the kernel.
with open('../features.txt') as f:
    for line_num, line in enumerate(f):
        if line_num == 0:
            # Timestamp
            feats.append(StructField(line.strip(), LongType(), True))
        elif line_num == 1:
            # Geohash
            feats.append(StructField(line.strip(), StringType(), True))
        else:
            # Other features
            feats.append(StructField(line.strip(), FloatType(), True))

schema = StructType(feats)

In [4]:
%%time

# Load every tab-separated file matching data_path from HDFS, applying the
# explicit schema instead of letting Spark infer column names and types.
df = (
    spark.read
    .format('csv')
    .option('sep', '\t')
    .schema(schema)
    .load(hdfs_port + data_path)
)
# Quick sanity checks, left disabled:
# df.take(1)
# print(df.head())

CPU times: user 3.14 ms, sys: 0 ns, total: 3.14 ms
Wall time: 4.97 s


In [5]:
# Summary statistics for visibility_surface, left disabled:
# df.describe([
#     'visibility_surface',
# ]).show()

# Three-character geohash prefixes used to restrict the dataset to the
# region of interest (presumably the San Francisco Bay Area, going by the
# name; confirm against a geohash map).
bay_area_hashes = {
    "9q8",
    "9q9",
    "9qb",
    "9qc",
}

In [6]:
%%time

# Project the columns needed for the fog analysis and add `front_hash`, the
# first three characters of the geohash, then keep only the rows whose prefix
# is one of the cells listed in bay_area_hashes.
dg = (
    df.select(
        "Geohash",
        "visibility_surface",
        df.Geohash.substr(1, 3).alias("front_hash"),
        "geopotential_height_cloud_base",
        "geopotential_height_surface",
    )
    .filter(F.col("front_hash").isin(*bay_area_hashes))
)

CPU times: user 3.86 ms, sys: 2.45 ms, total: 6.31 ms
Wall time: 371 ms


In [9]:
%%time

# Rows with visibility_surface strictly below this value (and a cloud base at
# or below the surface height) are counted as foggy.
# NOTE(review): the meaning/units of 24221 are not documented here; it looks
# like a value just under the dataset's maximum reported visibility. Confirm.
FOG_VISIBILITY_THRESHOLD = 24221

# Expose the filtered frame to Spark SQL under the name used by the query.
dg.createOrReplaceTempView("nam_small")

# For each 5-character geohash cell compute:
#   vis_surf_avg - mean surface visibility
#   foggy_days   - number of ROWS matching the fog condition (a row count,
#                  not a count of distinct calendar days)
#   counts       - total number of rows in the cell
#   foggy_ratio  - foggy_days / counts
# Results are ordered from the least to the most foggy cell.
fog_query = f'''
    SELECT
        *,
        foggy_days/counts AS foggy_ratio
    FROM (
        SELECT substr(Geohash,1,5) AS geoloc,
            AVG(visibility_surface) AS vis_surf_avg,
            SUM(CASE WHEN geopotential_height_cloud_base <= geopotential_height_surface AND visibility_surface < {FOG_VISIBILITY_THRESHOLD} THEN 1 ELSE 0 END) AS foggy_days,
            SUM(1) as counts
        FROM nam_small
        GROUP BY substr(Geohash,1,5)) AS t2
    ORDER BY foggy_ratio ASC, vis_surf_avg DESC, foggy_days ASC, counts ASC
    '''

# collect() pulls the aggregated rows (one per geohash cell) to the driver.
visibilities = spark.sql(fog_query).collect()

CPU times: user 21.5 ms, sys: 3.33 ms, total: 24.8 ms
Wall time: 2min 47s


In [10]:
# Display the collected Row objects: one per 5-character geohash cell,
# ordered by ascending foggy_ratio.
visibilities

[Row(geoloc='9q84e', vis_surf_avg=23238.728945567847, foggy_days=7, counts=418, foggy_ratio=0.01674641148325359),
 Row(geoloc='9q861', vis_surf_avg=23400.1193611053, foggy_days=7, counts=373, foggy_ratio=0.01876675603217158),
 Row(geoloc='9q822', vis_surf_avg=23421.318650966736, foggy_days=8, counts=410, foggy_ratio=0.01951219512195122),
 Row(geoloc='9q83d', vis_surf_avg=23465.39553915268, foggy_days=9, counts=422, foggy_ratio=0.02132701421800948),
 Row(geoloc='9q81g', vis_surf_avg=23384.61376036149, foggy_days=9, counts=395, foggy_ratio=0.02278481012658228),
 Row(geoloc='9q86k', vis_surf_avg=23163.6085847761, foggy_days=9, counts=387, foggy_ratio=0.023255813953488372),
 Row(geoloc='9q83v', vis_surf_avg=23400.391989103788, foggy_days=9, counts=382, foggy_ratio=0.02356020942408377),
 Row(geoloc='9q81r', vis_surf_avg=23228.625049409635, foggy_days=10, counts=410, foggy_ratio=0.024390243902439025),
 Row(geoloc='9q80d', vis_surf_avg=23592.60039320668, foggy_days=10, counts=409, foggy_ratio