In [14]:
# delete files from previous runs
!rm -f jfk_weather*

# download the file containing the data in CSV format
!wget http://max-training-data.s3-api.us-geo.objectstorage.softlayer.net/noaa-weather/jfk_weather.tar.gz

# extract the data
!tar xvfz jfk_weather.tar.gz
    
# create a dataframe out of it by using the first row as field names and trying to infer a schema based on contents
df = spark.read.option("header", "true").option("inferSchema","true").csv('jfk_weather.csv')

# register a corresponding query table
df.createOrReplaceTempView('df')
df.show

--2020-03-02 18:33:50--  http://max-training-data.s3-api.us-geo.objectstorage.softlayer.net/noaa-weather/jfk_weather.tar.gz
Resolving max-training-data.s3-api.us-geo.objectstorage.softlayer.net (max-training-data.s3-api.us-geo.objectstorage.softlayer.net)... 67.228.254.196
Connecting to max-training-data.s3-api.us-geo.objectstorage.softlayer.net (max-training-data.s3-api.us-geo.objectstorage.softlayer.net)|67.228.254.196|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 2575759 (2.5M) [application/x-tar]
Saving to: 'jfk_weather.tar.gz'


2020-03-02 18:33:51 (4.07 MB/s) - 'jfk_weather.tar.gz' saved [2575759/2575759]

./._jfk_weather.csv
jfk_weather.csv


<bound method DataFrame.show of DataFrame[STATION: string, STATION_NAME: string, ELEVATION: double, LATITUDE: double, LONGITUDE: double, DATE: string, REPORTTPYE: string, HOURLYSKYCONDITIONS: string, HOURLYVISIBILITY: string, HOURLYPRSENTWEATHERTYPE: string, HOURLYDRYBULBTEMPF: string, HOURLYDRYBULBTEMPC: string, HOURLYWETBULBTEMPF: string, HOURLYWETBULBTEMPC: string, HOURLYDewPointTempF: string, HOURLYDewPointTempC: string, HOURLYRelativeHumidity: string, HOURLYWindSpeed: string, HOURLYWindDirection: string, HOURLYWindGustSpeed: int, HOURLYStationPressure: string, HOURLYPressureTendency: int, HOURLYPressureChange: string, HOURLYSeaLevelPressure: string, HOURLYPrecip: string, HOURLYAltimeterSetting: string, DAILYMaximumDryBulbTemp: int, DAILYMinimumDryBulbTemp: int, DAILYAverageDryBulbTemp: int, DAILYDeptFromNormalAverageTemp: double, DAILYAverageRelativeHumidity: int, DAILYAverageDewPointTemp: int, DAILYAverageWetBulbTemp: int, DAILYHeatingDegreeDays: int, DAILYCoolingDegreeDays: int,

In [24]:

import random
random.seed(42)

from pyspark.sql.functions import translate, col

df_cleaned = df \
    .withColumn("HOURLYWindSpeed", df.HOURLYWindSpeed.cast('double')) \
    .withColumn("HOURLYWindDirection", df.HOURLYWindDirection.cast('double')) \
    .withColumn("HOURLYStationPressure", translate(col("HOURLYStationPressure"), "s,", "")) \
    .withColumn("HOURLYPrecip", translate(col("HOURLYPrecip"), "s,", "")) \
    .withColumn("HOURLYRelativeHumidity", translate(col("HOURLYRelativeHumidity"), "*", "")) \
    .withColumn("HOURLYDRYBULBTEMPC", translate(col("HOURLYDRYBULBTEMPC"), "*", "")) \

df_cleaned =   df_cleaned \
                    .withColumn("HOURLYStationPressure", df_cleaned.HOURLYStationPressure.cast('double')) \
                    .withColumn("HOURLYPrecip", df_cleaned.HOURLYPrecip.cast('double')) \
                    .withColumn("HOURLYRelativeHumidity", df_cleaned.HOURLYRelativeHumidity.cast('double')) \
                    .withColumn("HOURLYDRYBULBTEMPC", df_cleaned.HOURLYDRYBULBTEMPC.cast('double')) \

df_filtered = df_cleaned.filter("""
    HOURLYWindSpeed <> 0
    and HOURLYWindDirection <> 0
    and HOURLYStationPressure <> 0
    and HOURLYPressureTendency <> 0
    and HOURLYPressureTendency <> 0
    and HOURLYPrecip <> 0
    and HOURLYRelativeHumidity <> 0
    and HOURLYDRYBULBTEMPC <> 0
""")

In [26]:
from pyspark.ml.feature import VectorAssembler
vectorAssembler = VectorAssembler(inputCols=["HOURLYWindSpeed","HOURLYWindDirection","HOURLYStationPressure"],
                                  outputCol="features")
df_pipeline = vectorAssembler.transform(df_filtered)
from pyspark.ml.stat import Correlation
Correlation.corr(df_pipeline,"features").head()[0].toArray()

array([[ 1.        ,  0.06306013, -0.4204518 ],
       [ 0.06306013,  1.        , -0.19199348],
       [-0.4204518 , -0.19199348,  1.        ]])

In [4]:
splits = df_filtered.randomSplit([0.8, 0.2])
df_train = splits[0]
df_test = splits[1]

In [5]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import Normalizer
from pyspark.ml import Pipeline

vectorAssembler = VectorAssembler(inputCols=[
                                    "HOURLYWindDirection",
                                    "ELEVATION",
                                    "HOURLYStationPressure"],
                                  outputCol="features")

normalizer = Normalizer(inputCol="features", outputCol="features_norm", p=1.0)

In [29]:
def regression_metrics(prediction):
    from pyspark.ml.evaluation import RegressionEvaluator
    evaluator = RegressionEvaluator(
    labelCol="HOURLYWindSpeed", predictionCol="prediction", metricName="rmse")
    rmse = evaluator.evaluate(prediction)
    print("RMSE on test data = %g" % rmse)

In [27]:
from pyspark.ml.regression import LinearRegression


lr = LinearRegression(labelCol="HOURLYWindSpeed", featuresCol='features_norm', maxIter=100, regParam=0.0, elasticNetParam=0.0)
pipeline = Pipeline(stages=[vectorAssembler, normalizer,lr])
model = pipeline.fit(df_train)
prediction = model.transform(df_test)
regression_metrics(prediction)

RMSE on test data = 3.47898


In [8]:
from pyspark.ml.regression import GBTRegressor
gbt = GBTRegressor(labelCol="HOURLYWindSpeed", maxIter=100)
pipeline = Pipeline(stages=[vectorAssembler, normalizer,gbt])
model = pipeline.fit(df_train)
prediction = model.transform(df_test)
regression_metrics(prediction)

RMSE on test data = 7.01428


In [9]:
from pyspark.ml.feature import Bucketizer, OneHotEncoder
bucketizer = Bucketizer(splits=[ 0, 180, float('Inf') ],inputCol="HOURLYWindDirection", outputCol="HOURLYWindDirectionBucketized")
encoder = OneHotEncoder(inputCol="HOURLYWindDirectionBucketized", outputCol="HOURLYWindDirectionOHE")

In [10]:
def classification_metrics(prediction):
    from pyspark.ml.evaluation import MulticlassClassificationEvaluator
    mcEval = MulticlassClassificationEvaluator().setMetricName("accuracy") .setPredictionCol("prediction").setLabelCol("HOURLYWindDirectionBucketized")
    accuracy = mcEval.evaluate(prediction)
    print("Accuracy on test data = %g" % accuracy)

In [11]:
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(labelCol="HOURLYWindDirectionBucketized", maxIter=10)
#,"ELEVATION","HOURLYStationPressure","HOURLYPressureTendency","HOURLYPrecip"

vectorAssembler = VectorAssembler(inputCols=["HOURLYWindSpeed","HOURLYDRYBULBTEMPC"],
                                  outputCol="features")

pipeline = Pipeline(stages=[bucketizer,vectorAssembler,normalizer,lr])
model = pipeline.fit(df_train)
prediction = model.transform(df_test)
classification_metrics(prediction)

Accuracy on test data = 0.613208


In [30]:
from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(labelCol="HOURLYWindDirectionBucketized", numTrees=30)

vectorAssembler = VectorAssembler(inputCols=["HOURLYWindSpeed","HOURLYDRYBULBTEMPC","ELEVATION","HOURLYStationPressure","HOURLYPressureTendency","HOURLYPrecip"],
                                  outputCol="features")

pipeline = Pipeline(stages=[bucketizer,vectorAssembler,normalizer,rf])
model = pipeline.fit(df_train)
prediction = model.transform(df_test)
classification_metrics(prediction)

Accuracy on test data = 0.672956


In [13]:
from pyspark.ml.classification import GBTClassifier
gbt = GBTClassifier(labelCol="HOURLYWindDirectionBucketized", maxIter=100)

vectorAssembler = VectorAssembler(inputCols=["HOURLYWindSpeed","HOURLYDRYBULBTEMPC","ELEVATION","HOURLYStationPressure","HOURLYPressureTendency","HOURLYPrecip"],
                                  outputCol="features")

pipeline = Pipeline(stages=[bucketizer,vectorAssembler,normalizer,gbt])
model = pipeline.fit(df_train)
prediction = model.transform(df_test)
classification_metrics(prediction)

Accuracy on test data = 0.688679
