# Setting up our Schema

Spark can automatically infer a schema for CSV files, but ours have no header row, so there are no column names to pick up. Let's define the schema explicitly here:

In [None]:
from pyspark.sql.types import StructType, StructField, FloatType, LongType, StringType

# Build the schema from 'features.txt', which lists one column name per line,
# in the same order as the columns appear in the data files.
feats = []
# Use a context manager so the file handle is closed once the names are read
# (the bare open() call previously left it open for the life of the kernel).
with open('features.txt') as f:
    for line_num, line in enumerate(f):
        if line_num == 0:
            # Timestamp
            feats.append(StructField(line.strip(), LongType(), True))
        elif line_num == 1:
            # Geohash
            feats.append(StructField(line.strip(), StringType(), True))
        else:
            # Other features
            feats.append(StructField(line.strip(), FloatType(), True))

# Every field is declared nullable (third StructField argument is True).
schema = StructType(feats)

print(schema)


# Creating a Dataframe

Let's load our tab-separated data file into a 'dataframe' - Spark's abstraction for working with tabular data (built on top of RDDs).

In [None]:
# Alternative input (full dataset); swap it in for the mini file below if needed:
#df = spark.read.format('csv').option('sep', '\t').schema(schema).load('/Volumes/evo/Datasets/NAM_2015_S/*')

# Read the tab-separated mini dataset using the explicit schema built above.
df = (
    spark.read
    .format('csv')
    .option('sep', '\t')
    .schema(schema)
    .load('nam_mini.tdv')
)
# Pull a single row back to the driver as a quick sanity check.
df.take(1)

# Playtime

In [None]:
# Count the rows whose surface temperature exceeds 320.
really_hot = df.where(df['temperature_surface'] > 320).count()
print(really_hot)

# Count the rows that pass both the temperature (> 313) and the
# relative-humidity (> 0.8) thresholds.
hot_and_humid = (
    df.where(df['temperature_surface'] > 313)
    .where(df['relative_humidity_zerodegc_isotherm'] > 0.8)
    .count()
)
print(hot_and_humid)

In [None]:
# Show up to five rows whose snow cover value is above 0.85.
snowy_rows = df.where(df['snow_cover_surface'] > 0.85)
snowy_rows.take(5)

# SQL

In [None]:
# Register the DataFrame as a temporary view so it can be queried with SQL.
df.createOrReplaceTempView("TEMP_DF")

# Let's get all the snow cover values:
snow = spark.sql("SELECT snow_cover_surface FROM TEMP_DF").collect()
# .collect() gives us a list of rows. Let's print the first 10.
# Slicing (instead of indexing 0..9) avoids an IndexError when the view
# holds fewer than 10 rows.
for row in snow[:10]:
    print(row)


# What's the maximum value?
snowmax = spark.sql("SELECT MAX(snow_cover_surface) as maxval FROM TEMP_DF").collect()

# snowmax is the collected result list, so it is printed as a list of rows.
print('Max val observed:', snowmax)


In [None]:
from pyspark.sql.functions import avg

# Compute the average of the wilting_point_surface column and display it.
mean_wilting = avg(df['wilting_point_surface'])
df.select(mean_wilting).show()

# Sampling

We can even create a sample dataset with Spark! Let's create a roughly 10% sample (without replacement) - the fraction is a sampling probability, so the resulting size is approximate.

In [None]:
# Draw a sample without replacement using a sampling fraction of 0.1.
samp = df.sample(withReplacement=False, fraction=0.1)

# Write the sample out in CSV format under the 'sampled_output' path.
csv_writer = samp.write.format('csv')
csv_writer.save('sampled_output')