In [None]:
import os.path as path
import sys
import numpy as np
from pyspark import SparkContext, SparkConf
# Project helpers. The star imports below presumably supply the names used
# unqualified later in this notebook (create_sc, getStations, getStateData,
# decadeMeasurementDelta, leaflet, remove0sAndAverage); which module provides
# which name is not visible from here.
from lib import *
from pyspark.sql import *
# from utils import *
from lib.utils import *
import findspark
# NOTE(review): findspark.init() is normally called before the first pyspark
# import; here pyspark has already been imported above -- confirm this order
# is intended.
findspark.init()

%pylab inline


In [None]:
# Start Spark (passing the helper modules through pyFiles) and wrap the
# context in a SQLContext for the queries further down.
sc = create_sc(pyFiles=['lib/numpy_pack.py','lib/spark_PCA.py','lib/computeStatistics.py'])
sqlContext = SQLContext(sc)

# Directory that holds (or will hold) the parquet files
data_dir = '../DataHW3'

# Station metadata is fetched only when it is not already on disk.
stations_path = f"{data_dir}/stations.parquet"
if not path.exists(stations_path):
    getStations()

states = ['ND', 'SD', 'MN', 'IA', 'NE', 'TX', 'OK', 'KS']

for s in states:
    parquet = f"{s}.parquet"
    tarname = f"{s}.tgz"

    # Nothing to do when this state's parquet file is already present.
    if path.exists(f"{data_dir}/{parquet}"):
        continue

    # pull the weather data for a particular state from the MAS-DSE S3 bucket
    getStateData(s, data_dir, tarname, parquet)

# Change in average daily snowfall

In [None]:
# Measurement code, wrapped in single quotes as the helper expects
featureStr = "'SNOW'"

# The helper returns an indexable collection of dataframes (presumably one per
# state -- confirm in lib); stack them into a single dataframe.
test_data = decadeMeasurementDelta(featureStr, states, data_dir, sqlContext)
master_dataframe = test_data[0]
for idx in range(1, len(test_data)):
    master_dataframe = master_dataframe.union(test_data[idx])

print("\n\nAmount of snow fall change, 70s to 00s")

# Build the map; the trailing expression makes the notebook display it.
plotter = leaflet(sqlContext, featureStr)
plotter.add(master_dataframe)
plotter.plot_all()
plotter.m


Legend:

In [None]:
# Show the colour legend of the most recently built plotter (the SNOW map)
plotter.color_legend()

# Change in average daily snow depth

In [None]:
# Measurement code, wrapped in single quotes as the helper expects
featureStr = "'SNWD'"

# The helper returns an indexable collection of dataframes (presumably one per
# state -- confirm in lib); stack them into a single dataframe.
test_data = decadeMeasurementDelta(featureStr, states, data_dir, sqlContext)
master_dataframe = test_data[0]
for idx in range(1, len(test_data)):
    master_dataframe = master_dataframe.union(test_data[idx])

print("\n\nChange in snow depth change, 70s to 00s")

# Build the map; the trailing expression makes the notebook display it.
plotter = leaflet(sqlContext, featureStr)
plotter.add(master_dataframe)
plotter.plot_all()
plotter.m

In [None]:
# Show the colour legend of the most recently built plotter (the SNWD map)
plotter.color_legend()

# Change in average max daily temperature

In [None]:
# Measurement code, wrapped in single quotes as the helper expects
featureStr = "'TMAX'"

# The helper returns an indexable collection of dataframes (presumably one per
# state -- confirm in lib); stack them into a single dataframe.
test_data = decadeMeasurementDelta(featureStr, states, data_dir, sqlContext)
master_dataframe = test_data[0]
for idx in range(1, len(test_data)):
    master_dataframe = master_dataframe.union(test_data[idx])

print("\n\nChange in average max daily temperature, 70s to 00s")

# Build the map; the trailing expression makes the notebook display it.
plotter = leaflet(sqlContext, featureStr)
plotter.add(master_dataframe)
plotter.plot_all()
plotter.m

In [None]:
# Show the colour legend of the most recently built plotter (the TMAX map)
plotter.color_legend()

# Change in average min daily temperature

In [None]:
# Measurement code, wrapped in single quotes as the helper expects
featureStr = "'TMIN'"

# The helper returns an indexable collection of dataframes (presumably one per
# state -- confirm in lib); stack them into a single dataframe.
test_data = decadeMeasurementDelta(featureStr, states, data_dir, sqlContext)
master_dataframe = test_data[0]
for idx in range(1, len(test_data)):
    master_dataframe = master_dataframe.union(test_data[idx])

print("\n\nChange in average min daily temperature, 70s to 00s")

# Build the map; the trailing expression makes the notebook display it.
plotter = leaflet(sqlContext, featureStr)
plotter.add(master_dataframe)
plotter.plot_all()
plotter.m

Legend:

In [None]:
# Show the colour legend of the most recently built plotter (the TMIN map)
plotter.color_legend()

# Verification of 'TX' values

Verify that the SNOW and SNWD metrics actually increased on average between the two time periods (the 1970s and the 2000s).

In [None]:
# Measurement to verify in the cells below. The inner single quotes are part
# of the value because it is interpolated verbatim into the SQL WHERE clause.
featureStr = "\'SNWD\'"

In [None]:
# Load the TX parquet file and register it with Spark SQL as table_TX.
data_dir = '../DataHW3'
s = 'TX'
parquet = f'{s}.parquet'
parquet_path = f'{data_dir}/{parquet}'
df = sqlContext.read.parquet(parquet_path)
sqlContext.registerDataFrameAsTable(df, f'table_{s}')

# 1970s: rows of the selected measurement with 1970 <= Year < 1980
Query = f"""
SELECT Station, Measurement, Values, longitude, latitude, Year
FROM table_{s}
WHERE Measurement=={featureStr} and (Year >= 1970 and Year < 1980)
"""
query70s = sqlContext.sql(Query)
# Each row goes through remove0sAndAverage on its 'Values' field
# (presumably dropping zeros and averaging, going by the name -- confirm in lib).
rdd70s = query70s.rdd.map(lambda row: remove0sAndAverage(row, 'Values'))
dfs70 = sqlContext.createDataFrame(rdd70s)
sqlContext.registerDataFrameAsTable(dfs70, f'table_{s}_70s')


In [None]:
# Convert the 1970s Spark dataframe into a pandas dataframe
df70 = dfs70.toPandas()

In [None]:
# 2000s: rows of the selected measurement with 2000 <= Year < 2010
Query = f"""
SELECT Station, Measurement, Values, longitude, latitude, Year
FROM table_{s}
WHERE Measurement=={featureStr} and (Year >= 2000 and Year < 2010)
"""
query00s = sqlContext.sql(Query)
# Each row goes through remove0sAndAverage on its 'Values' field
# (presumably dropping zeros and averaging, going by the name -- confirm in lib).
rdd00s = query00s.rdd.map(lambda row: remove0sAndAverage(row, 'Values'))
dfs00 = sqlContext.createDataFrame(rdd00s)
sqlContext.registerDataFrameAsTable(dfs00, f'table_{s}_00s')


In [None]:
# Convert the 2000s Spark dataframe into a pandas dataframe
df00 = dfs00.toPandas()

In [None]:
# Report the mean of the per-row 'Values' column for each decade.
# The label is derived from featureStr (minus its SQL quotes) so that it names
# the measurement that was actually queried; the previous hard-coded "SNOW"
# was wrong whenever featureStr was set to another measurement such as 'SNWD'.
feature_label = featureStr.strip("'")
print("Mean TX %s values in the 70s: %s" % (feature_label, df70.Values.mean()))
print("Mean TX %s values in the 00s: %s" % (feature_label, df00.Values.mean()))