# Setting up our Schema

Spark can infer a schema for CSV files automatically, but our files don't have a header row, so we define the schema ourselves from the column names listed in `features.txt`:

In [1]:
from pyspark.sql.types import StructType, StructField, FloatType, LongType, StringType

# Build the schema by hand: one nullable FloatType column per name listed in
# features.txt (one column name per line).
# NOTE(review): FloatType is single precision; the sample rows shown further down
# display date-like columns as e.g. UTC_DATE=20180100.0, so values this large
# appear to be rounded -- consider a wider type for those columns.
with open('features.txt') as feature_file:
    # The context manager closes the file once the names are read.
    # Blank lines are skipped so a trailing newline cannot yield an unnamed column.
    feats = [
        StructField(name.strip(), FloatType(), True)
        for name in feature_file
        if name.strip()
    ]
schema = StructType(feats)

print(schema)


StructType(List(StructField(WBANNO,FloatType,true),StructField(UTC_DATE,FloatType,true),StructField(UTC_TIME,FloatType,true),StructField(LST_DATE,FloatType,true),StructField(LST_TIME,FloatType,true),StructField(CRX_VN,FloatType,true),StructField(LONGITUDE,FloatType,true),StructField(LATITUDE,FloatType,true),StructField(AIR_TEMPERATURE,FloatType,true),StructField(PRECIPITATION,FloatType,true),StructField(SOLAR_RADIATION,FloatType,true),StructField(SR_FLAG,FloatType,true),StructField(SURFACE_TEMPERATURE,FloatType,true),StructField(ST_TYPE,FloatType,true),StructField(ST_FLAG,FloatType,true),StructField(RELATIVE_HUMIDITY,FloatType,true),StructField(RH_FLAG,FloatType,true),StructField(SOIL_MOISTURE_5,FloatType,true),StructField(SOIL_TEMPERATURE_5,FloatType,true),StructField(WETNESS,FloatType,true),StructField(WET_FLAG,FloatType,true),StructField(WIND_1_5,FloatType,true),StructField(WIND_FLAG,FloatType,true)))


# Creating a Dataframe

Let's load our space-separated file into a DataFrame — Spark's abstraction for working with tabular data (built on top of RDDs).

In [9]:

# A regular delimited file (CSV or similar) could be read directly with the DataFrame reader:
#df = spark.read.format('csv').option('sep', ' ').schema(schema).load('/Volumes/hdd/Datasets/some/where/*')


# Here the file is loaded as an RDD of raw text lines instead; the lines are parsed further down.
station_file = 'hdfs://orion12:9001/ncdc/2018/CRNS0101-05-2018-WY_Sundance_8_NNW.txt'
txt_rdd = sc.textFile(station_file)

def conv(string):
    """Convert one text field to a float, substituting 0.0 when it cannot be converted.

    Args:
        string: Raw field value; anything ``float()`` accepts.

    Returns:
        float: The parsed value, or 0.0 if conversion fails. Note that a failed
        conversion is therefore indistinguishable from a genuine 0.0 reading.
    """
    try:
        return float(string)
    except (TypeError, ValueError, OverflowError):
        # Only conversion failures are swallowed; a bare ``except`` would also
        # hide KeyboardInterrupt/SystemExit and unrelated errors.
        return 0.0

# Split every text line on whitespace and convert each field with conv(),
# then attach the schema to turn the parsed RDD into a DataFrame.
parsed_rdd = txt_rdd.map(lambda line: list(map(conv, line.split())))
df = parsed_rdd.toDF(schema)

# Peek at the first three rows.
df.take(3)


[Row(WBANNO=94088.0, UTC_DATE=20180100.0, UTC_TIME=5.0, LST_DATE=20171232.0, LST_TIME=1705.0, CRX_VN=2.0, LONGITUDE=-104.44000244140625, LATITUDE=44.52000045776367, AIR_TEMPERATURE=-20.0, PRECIPITATION=0.0, SOLAR_RADIATION=0.0, SR_FLAG=0.0, SURFACE_TEMPERATURE=-19.299999237060547, ST_TYPE=0.0, ST_FLAG=0.0, RELATIVE_HUMIDITY=83.0, RH_FLAG=0.0, SOIL_MOISTURE_5=-99.0, SOIL_TEMPERATURE_5=-0.800000011920929, WETNESS=1094.0, WET_FLAG=0.0, WIND_1_5=1.0199999809265137, WIND_FLAG=0.0),
 Row(WBANNO=94088.0, UTC_DATE=20180100.0, UTC_TIME=10.0, LST_DATE=20171232.0, LST_TIME=1710.0, CRX_VN=2.0, LONGITUDE=-104.44000244140625, LATITUDE=44.52000045776367, AIR_TEMPERATURE=-20.100000381469727, PRECIPITATION=0.0, SOLAR_RADIATION=0.0, SR_FLAG=0.0, SURFACE_TEMPERATURE=-19.299999237060547, ST_TYPE=0.0, ST_FLAG=0.0, RELATIVE_HUMIDITY=83.0, RH_FLAG=0.0, SOIL_MOISTURE_5=-99.0, SOIL_TEMPERATURE_5=-0.800000011920929, WETNESS=1094.0, WET_FLAG=0.0, WIND_1_5=0.6600000262260437, WIND_FLAG=0.0),
 Row(WBANNO=94088.0, 

# Playtime

In [11]:
# Count the rows whose SURFACE_TEMPERATURE is above 20.
hot_rows = df.filter(df['SURFACE_TEMPERATURE'] > 20)
really_hot = hot_rows.count()
print(really_hot)

# Count the rows whose WETNESS exceeds 1; as the cell's last expression the result is displayed.
wet_rows = df.filter(df['WETNESS'] > 1)
wet_rows.count()

13647


105059

# SQL

In [12]:
# Creating an SQL 'table'
df.createOrReplaceTempView("TEMP_DF")

hum = spark.sql("SELECT RELATIVE_HUMIDITY FROM TEMP_DF").collect()
# .collect() gives us a list of rows (the whole result is brought back to the driver).
# Slicing prints up to the first 10 and does not fail when fewer rows exist:
for row in hum[:10]:
    print(row)

# What's the maximum value?
hummax = spark.sql("SELECT MAX(RELATIVE_HUMIDITY) as maxval FROM TEMP_DF").collect()

print('Max val observed:', hummax)

Row(RELATIVE_HUMIDITY=83.0)
Row(RELATIVE_HUMIDITY=83.0)
Row(RELATIVE_HUMIDITY=83.0)
Row(RELATIVE_HUMIDITY=83.0)
Row(RELATIVE_HUMIDITY=83.0)
Row(RELATIVE_HUMIDITY=83.0)
Row(RELATIVE_HUMIDITY=82.0)
Row(RELATIVE_HUMIDITY=82.0)
Row(RELATIVE_HUMIDITY=82.0)
Row(RELATIVE_HUMIDITY=82.0)
Max val observed: [Row(maxval=99.0)]


In [1]:
from pyspark.sql.functions import avg

# Average of the WETNESS column, shown as a one-row table.
# Needs `df` from the "Creating a Dataframe" cell: on a fresh kernel this cell
# fails with NameError, as in the recorded output.
mean_wetness = avg(df['WETNESS'])
df.select(mean_wetness).show()


NameError: name 'df' is not defined

# Sampling

We can even create a sample dataset with Spark! Let's create a 10% sample (without replacement) and write it out as CSV:

In [9]:
# Draw a roughly 10% sample without replacement. The fixed seed makes a re-run
# of the notebook request the same sample instead of a different one each time.
samp = df.sample(withReplacement=False, fraction=0.1, seed=42)

# Write it out
# NOTE(review): the input file was read via port 9001 but this writes via port 50000 -- confirm the URI.
# NOTE(review): save() is called without an explicit mode; with Spark's default mode a
# second run fails if the output path already exists -- confirm and clean up or set a mode.
samp.write.format('csv').save('hdfs://orion12:50000/sampled_output')