# Setting up our Schema

Spark can automatically create a schema for CSV files, but ours don't have headings. Let's set this up here:

In [1]:
from pyspark.sql.types import StructType, StructField, FloatType, LongType, StringType
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from joblib import Parallel, delayed  
import multiprocessing

sc = SparkContext('local')
spark = SparkSession(sc)

feats = []
f = open('features.txt')
for line_num, line in enumerate(f):
    if line_num == 0:
        # Timestamp
        feats.append(StructField(line.strip(), LongType(), True))
    elif line_num == 1:
        # Geohash
        feats.append(StructField(line.strip(), StringType(), True))
    else:
        # Other features
        feats.append(StructField(line.strip(), FloatType(), True))
    
schema = StructType(feats)

print(schema)


StructType(List(StructField(Timestamp,LongType,true),StructField(Geohash,StringType,true),StructField(geopotential_height_lltw,FloatType,true),StructField(water_equiv_of_accum_snow_depth_surface,FloatType,true),StructField(drag_coefficient_surface,FloatType,true),StructField(sensible_heat_net_flux_surface,FloatType,true),StructField(categorical_ice_pellets_yes1_no0_surface,FloatType,true),StructField(visibility_surface,FloatType,true),StructField(number_of_soil_layers_in_root_zone_surface,FloatType,true),StructField(categorical_freezing_rain_yes1_no0_surface,FloatType,true),StructField(pressure_reduced_to_msl_msl,FloatType,true),StructField(upward_short_wave_rad_flux_surface,FloatType,true),StructField(relative_humidity_zerodegc_isotherm,FloatType,true),StructField(categorical_snow_yes1_no0_surface,FloatType,true),StructField(u-component_of_wind_tropopause,FloatType,true),StructField(surface_wind_gust_surface,FloatType,true),StructField(total_cloud_cover_entire_atmosphere,FloatType,tru

# Creating a Dataframe

Let's load our CSV into a 'dataframe' - Spark's abstraction for working with tabular data (built on top of RDDs)

In [2]:
#df = spark.read.format('csv').option('sep', '\t').schema(schema).load('/Volumes/evo/Datasets/NAM_2015_S/*')
# df = spark.read.format('csv').option('sep', '\t').schema(schema).load('../nam/namin2015.tdv')
df = spark.read.format('csv').option('sep', '\t').schema(schema).load('../nam/nam_2015.tdv')
persist(df, "MEMORY_ONLY")
df.take(1)
df.count()

600000

# Playtime

In [14]:
really_hot = df.filter(df.temperature_surface > 320).count()
print(really_hot)

hot_and_humid = df.filter(df.temperature_surface > 313).filter(df.relative_humidity_zerodegc_isotherm > .8).count()
print(hot_and_humid)


351
3569
3569


In [None]:
df.filter(df.snow_cover_surface > .85).take(5)

In [33]:
from pyspark.sql.functions import col
df.groupBy('Geohash').count()
df.groupBy('Geohash').count().filter("`count` >= 1").sort(col("count").desc()).show()

+------------+-----+
|     Geohash|count|
+------------+-----+
|drm8nzh8sbkp|    2|
|9j4bjucdn0rb|    2|
|c4pjfr50egeb|    2|
|9kvevupns4hp|    2|
|97kpzgs9jj7z|    2|
|drzmy4hwne2p|    2|
|dxexgskw49zz|    2|
|cb3xpdnqxw00|    2|
|dxwx0srpd6jb|    2|
|9uyjt32t77pb|    2|
|f2459g70j9bp|    2|
|bc5j6rbzxn80|    2|
|f0yehqwpzmbp|    2|
|f9bs83qbucup|    2|
|dhqb0j0ktzpb|    2|
|c2t8s7stt5eb|    2|
|9vpntrm8m1sp|    2|
|ccbc3rmkuy7z|    2|
|c9cc4pvb7kxb|    2|
|dphgbrmcd4h0|    2|
+------------+-----+
only showing top 20 rows



# SQL

In [22]:
# Creating an SQL 'table'
# df = spark.read.format('csv').option('sep', '\t').schema(schema).load('../NAM_2015_S/*')
df = spark.read.format('csv').option('sep', '\t').schema(schema).load('../nam/nammin.tdv')
# df = spark.read.format('csv').option('sep', '\t').schema(schema).load('../../p2-source/nam_mini.tdv')

df.createOrReplaceTempView("TEMP_DF")

# Let's get all the snow cover values:
snow = spark.sql("SELECT snow_cover_surface FROM TEMP_DF").collect()
# .collect() gives us a list of rows. Let's grab the first 10:
# for i in range(10):
#     print(snow[i])

# What's the maximum value?
snowmax = spark.sql("SELECT MAX(snow_cover_surface) as maxval FROM TEMP_DF").collect()
print('Max val observed:', snowmax)
# snow0 = spark.sql("SELECT count(*), Geohash from TEMP_DF group by Geohash order by count(*) DESC limit(2)\
#                    INTERSECT\
#                    SELECT count(*), Geohash from TEMP_DF where snow_depth_surface > 0 group by Geohash order by count(*) DESC").collect()
snow0 = spark.sql("SELECT count(*) as Count, Geohash from TEMP_DF group by Geohash order by count(*) DESC").collect()
snow1 = spark.sql("SELECT count(*) as Count, Geohash from TEMP_DF where snow_depth_surface > 0 group by Geohash order by count(*) DESC").collect()
snow = []
print(len(snow0))
print(len(snow))
for i in range(len(snow0)):
    print(snow0[i])
print()
for i in range(len(snow1)):
    if snow1[i] in snow0:
        snow.append(snow1[i])
print(snow)
    


Max val observed: [Row(maxval=100.0)]
6
0
Row(Count=3, Geohash='dw3m3wvnj5kp')
Row(Count=2, Geohash='9wgs3ue6s7kp')
Row(Count=2, Geohash='9wgs3ue6n7kp')
Row(Count=1, Geohash='952pcj4cd6zz')
Row(Count=1, Geohash='8yj6nw3d2500')
Row(Count=1, Geohash='d5bv8zxj3d5z')

[Row(Count=2, Geohash='9wgs3ue6n7kp')]


In [4]:
# When and where was the hottest temperature observed in the dataset?
df.createOrReplaceTempView("HOTEST_TEMP")
# temp_hostest = spark.sql("SELECT MAX(temperature_surface) as hotest FROM HOTEST_TEMP").collect()
temp_hostest = spark.sql(
    "select Geohash, Timestamp, temperature_surface from HOTEST_TEMP\
        where temperature_surface in (\
            select max(temperature_surface)\
                from HOTEST_TEMP)").collect()

print('Hostest temperature', temp_hostest)


count 1
Hostest temperature [Row(Geohash='d58j4p3rjbrb', Timestamp=1430848800000, temperature_surface=322.99365234375)]


In [56]:
# Where are you most likely to be struck by lightning? Use a precision of 4 Geohash characters and provide the top 3 locations.
import pyspark.sql.functions as func
df1 = df.withColumn('new_geohash', df.Geohash.substr(1, 4))
# newdf.groupBy('new_geohash').count().filter("`count` >= 1").show()
df2 = df1.groupBy('new_geohash').agg(func.sum('lightning_surface').alias("sum_lighting"))
df2.orderBy(df2.sum_lighting.desc()).limit(3).show()


+-----------+------------+
|new_geohash|sum_lighting|
+-----------+------------+
|       dwtb|         2.0|
|       dqvd|         1.0|
|       9p2w|         1.0|
+-----------+------------+



In [6]:
df = spark.read.format('csv').option('sep', '\t').schema(schema).load('../nam/namin2015.tdv')
df.createOrReplaceTempView("TEMP_FEATURE")
# print(df.columns)

i = 0
for col in df.columns:
    if (i > 1):
        df.describe([col]).show()
    i = i + 1


['Timestamp', 'Geohash', 'geopotential_height_lltw', 'water_equiv_of_accum_snow_depth_surface', 'drag_coefficient_surface', 'sensible_heat_net_flux_surface', 'categorical_ice_pellets_yes1_no0_surface', 'visibility_surface', 'number_of_soil_layers_in_root_zone_surface', 'categorical_freezing_rain_yes1_no0_surface', 'pressure_reduced_to_msl_msl', 'upward_short_wave_rad_flux_surface', 'relative_humidity_zerodegc_isotherm', 'categorical_snow_yes1_no0_surface', 'u-component_of_wind_tropopause', 'surface_wind_gust_surface', 'total_cloud_cover_entire_atmosphere', 'upward_long_wave_rad_flux_surface', 'land_cover_land1_sea0_surface', 'vegitation_type_as_in_sib_surface', 'v-component_of_wind_pblri', 'albedo_surface', 'lightning_surface', 'ice_cover_ice1_no_ice0_surface', 'convective_inhibition_surface', 'pressure_surface', 'transpiration_stress-onset_soil_moisture_surface', 'soil_porosity_surface', 'vegetation_surface', 'categorical_rain_yes1_no0_surface', 'downward_long_wave_rad_flux_surface', 

+-------+-------------------------+
|summary|v-component_of_wind_pblri|
+-------+-------------------------+
|  count|                     6000|
|   mean|     -0.08666503445307414|
| stddev|         5.56083949538845|
|    min|               -28.080017|
|    max|                38.067932|
+-------+-------------------------+

+-------+------------------+
|summary|    albedo_surface|
+-------+------------------+
|  count|              6000|
|   mean|         15.847125|
| stddev|15.344792415020972|
|    min|               6.0|
|    max|             78.75|
+-------+------------------+

+-------+--------------------+
|summary|   lightning_surface|
+-------+--------------------+
|  count|                6000|
|   mean|0.033166666666666664|
| stddev| 0.17908652718614054|
|    min|                 0.0|
|    max|                 1.0|
+-------+--------------------+

+-------+------------------------------+
|summary|ice_cover_ice1_no_ice0_surface|
+-------+------------------------------+
|  count| 

+-------+---------------------------------------------+
|summary|convective_available_potential_energy_surface|
+-------+---------------------------------------------+
|  count|                                         6000|
|   mean|                                       449.15|
| stddev|                            917.5711054857119|
|    min|                                          0.0|
|    max|                                       6520.0|
+-------+---------------------------------------------+

+-------+----------------------------+
|summary|latent_heat_net_flux_surface|
+-------+----------------------------+
|  count|                        6000|
|   mean|          1735.9422437497774|
| stddev|           40791.46466266772|
|    min|                  -198.51514|
|    max|                   1000000.1|
+-------+----------------------------+

+-------+-------------------------+
|summary|surface_roughness_surface|
+-------+-------------------------+
|  count|                     6000|

In [None]:
from pyspark.sql.functions import avg

df.select(avg(df.wilting_point_surface)).show()

# Sampling

We can even create a sample dataset with Spark! Let's create a 10% sample (without replacement)

In [None]:
samp = df.sample(False, .1)

# Write it out to a file
samp.write.format('csv').save('sampled_output')

In [3]:
# df.columns
# df3 = spark.read.format('csv').option('sep', '\t').schema(schema).load('../nam/nam_2015.tdv')
df3 = spark.read.format('csv').option('sep', '\t').schema(schema).load('../NAM_2015_S/*')
df3.persist()
print("df3 done")

df3 done


In [None]:
# def computeCorrelation(i, j):
#     print(df3.columns[i], "-", df3.columns[j], "=", df3.corr(df3.columns[i], df3.columns[j]))
# df3.corr("geopotential_height_lltw", "drag_coefficient_surface")
string = ''
for i in range(2, len(df3.columns)):
    for j in range(2, len(df3.columns)):
        string = string + str(df3.corr(df3.columns[i], df3.columns[j])) + '\t'
    string = string + '\n'
print(string)

In [9]:
collect(df3)
# print(df3.columns[2], "-", df3.columns[5], "=", df3.corr(df3.columns[2], df3.columns[5]))

# what are your inputs, and what operation do you want to 
# perform on each input. For example...



# Parallel(n_jobs=num_cores)(delayed(computeCorrelation)(i, j)\
#                                      for i in range(2, columnLen) for j in range(2, columnLen))
# Parallel(n_jobs=num_cores)(delayed(computeCorrelation(2, 2)))
print("computating done")

computating done


# Smart

# THIS



In [5]:
# from multiprocessing import Pool

In [7]:
# import threading

In [3]:
# def computeCorrelation(i, j):
#     print(df3.columns[i], "-", df3.columns[j], "=", df3.corr(df3.columns[i], df3.columns[j]))
    
# columnLen = len(df3.columns)
# for i in range(2, columnLen):
#     for j in range(2, columnLen):
#         t = threading.Thread(target=computeCorrelation, args=(i, j))
#         t.start()
# print("done")

NameError: name 'threading' is not defined

In [10]:

# def computeCorrelation(x):
#     print(df3.columns[x[0]], "-", df3.columns[x[1]], "=", df3.corr(df3.columns[x[0]], df3.columns[x[1]]))
    
# columnLen = len(df3.columns)    
# pairs = list()
# pool = Pool(processes=8)
# for i in range(2, columnLen):
#     for j in range(2, columnLen):
#         pairs.append((i, j))
# pool.map(computeCorrelation, pairs)
# pool.close()
# pool.join()
# print("done")

AttributeError: 'float' object has no attribute 'corr'

sensible_heat_net_flux_surface - downward_long_wave_rad_flux_surface = -0.3995034765400761
upward_long_wave_rad_flux_surface - latent_heat_net_flux_surface = 0.8026181299216397
geopotential_height_lltw - geopotential_height_lltw = 0.6710553035349901
water_equiv_of_accum_snow_depth_surface - latent_heat_net_flux_surface = None
total_cloud_cover_entire_atmosphere - geopotential_height_lltw = planetary_boundary_layer_height_surface - latent_heat_net_flux_surface = -0.05699369538131042
geopotential_height_zerodegc_isotherm - latent_heat_net_flux_surface = org.apache.spark.sql.DataFrameStatFunctions@5c03fd88
categorical_freezing_rain_yes1_no0_surface - geopotential_height_lltw = temperature_tropopause - downward_long_wave_rad_flux_surface = org.apache.spark.sql.DataFrameStatFunctions@5c492a47
downward_short_wave_rad_flux_surface - latent_heat_net_flux_surface = u-component_of_wind_pblri - total_cloud_cover_entire_atmosphere = -0.03193341480227388
