# Weather Prediction

In [26]:
import findspark
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType
from pyspark.sql.types import FloatType
from pyspark.ml.feature import VectorAssembler
import pandas as pd
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql import functions as F
from pyspark.ml.regression import LinearRegression

In [27]:
spark = SparkSession.builder.appName('weather').getOrCreate()

In [28]:
df = spark.read.csv('weather-data.csv')

In [29]:
#change column names to resemble content
df = df.withColumnRenamed('_c0', "timestamp")\
        .withColumnRenamed('_c1', "temp")\
        .withColumnRenamed('_c2', "feels_like")\
        .withColumnRenamed('_c3', "pressure")\
        .withColumnRenamed('_c4', "humidity")\
        .withColumnRenamed('_c5', "dew_point")\
        .withColumnRenamed('_c6', "clouds")\
        .withColumnRenamed('_c7', "visibility")\
        .withColumnRenamed('_c8', "wind_speed")\
        .withColumnRenamed('_c9', "wind_deg")\
        .withColumnRenamed('_c10', "weather")\
        .withColumnRenamed('_c11', "pop")

In [30]:
df.show()

+----------+------+----------+--------+--------+---------+------+----------+----------+--------+--------------------+---+
| timestamp|  temp|feels_like|pressure|humidity|dew_point|clouds|visibility|wind_speed|wind_deg|             weather|pop|
+----------+------+----------+--------+--------+---------+------+----------+----------+--------+--------------------+---+
|1603123200|277.05|    271.63|    1012|      55|   269.33|    29|     10000|      4.12|     317|[{'id': 802, 'mai...|  0|
|1603126800|276.81|    271.75|    1012|      63|   270.74|    32|     10000|      3.87|     326|[{'id': 802, 'mai...|  0|
|1603130400|276.37|    271.61|    1014|      70|   271.63|    42|     10000|      3.62|     329|[{'id': 802, 'mai...|  0|
|1603134000|275.99|    271.33|    1015|      74|   271.98|    94|     10000|      3.56|     320|[{'id': 804, 'mai...|  0|
|1603137600|275.53|    270.85|    1015|      78|   272.22|    86|     10000|      3.64|     317|[{'id': 804, 'mai...|  0|
|1603141200|275.12|    2

In [31]:
# import pyspark.sql.functions as f
# df_split = df.select(f.split(df.weather,":")).rdd.flatMap(
#               lambda x: x).toDF(schema=["col1","col2","col3"])

In [32]:
#change column values to correct format - cast to float or integer
df = df.withColumn("temp", df["temp"].cast(FloatType()))
df = df.withColumn("feels_like", df["feels_like"].cast(FloatType()))
df = df.withColumn("pressure", df["pressure"].cast(IntegerType()))
df = df.withColumn("humidity", df["humidity"].cast(IntegerType()))
df = df.withColumn("dew_point", df["dew_point"].cast(FloatType()))
df = df.withColumn("clouds", df["clouds"].cast(IntegerType()))
df = df.withColumn("visibility", df["visibility"].cast(IntegerType()))
df = df.withColumn("wind_speed", df["wind_speed"].cast(FloatType()))
df = df.withColumn("wind_deg", df["wind_deg"].cast(IntegerType()))
df = df.withColumn("weather", df["weather"].cast(IntegerType()))
df = df.withColumn("pop", df["pop"].cast(FloatType()))

In [33]:
df.printSchema()

root
 |-- timestamp: string (nullable = true)
 |-- temp: float (nullable = true)
 |-- feels_like: float (nullable = true)
 |-- pressure: integer (nullable = true)
 |-- humidity: integer (nullable = true)
 |-- dew_point: float (nullable = true)
 |-- clouds: integer (nullable = true)
 |-- visibility: integer (nullable = true)
 |-- wind_speed: float (nullable = true)
 |-- wind_deg: integer (nullable = true)
 |-- weather: integer (nullable = true)
 |-- pop: float (nullable = true)



# Linear Regression

### How does the temperature feel given the features pressure, humidity, dew_point, clouds, visibility and wind_speed?

In [34]:
# select feature columns
feature_columns = df.columns[3:9] 
feature_columns

['pressure', 'humidity', 'dew_point', 'clouds', 'visibility', 'wind_speed']

In [35]:
# necessary to assemble all feats in one vector
assembler = VectorAssembler(inputCols=feature_columns,outputCol="features")

In [36]:
data = assembler.transform(df)
# train/test split
train, test = data.randomSplit([0.7, 0.3])
# train the model
lr = LinearRegression(featuresCol="features", labelCol="feels_like")
model = lr.fit(train)
# evaluate the model
evaluation = model.evaluate(test)

# predicting on test set
predictions = model.transform(test)
predictions.select(predictions.columns[2]).show()

+----------+
|feels_like|
+----------+
|    271.16|
|    271.16|
|    271.16|
|    271.16|
|    271.63|
|    271.63|
|    271.63|
|    271.63|
|    271.63|
|     271.5|
|     271.5|
|    271.75|
|    271.75|
|    271.75|
|    271.75|
|    271.49|
|    271.49|
|    271.49|
|    271.49|
|    271.49|
+----------+
only showing top 20 rows



In [37]:
evaluation.meanAbsoluteError

0.3196993656625901

In [38]:
evaluation.rootMeanSquaredError

0.4282179049833272

In [39]:
evaluation.r2

0.9848431164922864

In [40]:
df.show()

+----------+------+----------+--------+--------+---------+------+----------+----------+--------+-------+---+
| timestamp|  temp|feels_like|pressure|humidity|dew_point|clouds|visibility|wind_speed|wind_deg|weather|pop|
+----------+------+----------+--------+--------+---------+------+----------+----------+--------+-------+---+
|1603123200|277.05|    271.63|    1012|      55|   269.33|    29|     10000|      4.12|     317|   null|0.0|
|1603126800|276.81|    271.75|    1012|      63|   270.74|    32|     10000|      3.87|     326|   null|0.0|
|1603130400|276.37|    271.61|    1014|      70|   271.63|    42|     10000|      3.62|     329|   null|0.0|
|1603134000|275.99|    271.33|    1015|      74|   271.98|    94|     10000|      3.56|     320|   null|0.0|
|1603137600|275.53|    270.85|    1015|      78|   272.22|    86|     10000|      3.64|     317|   null|0.0|
|1603141200|275.12|    270.45|    1016|      81|   269.45|    58|     10000|      3.64|     317|   null|0.0|
|1603144800|274.73|

# Multilayer Perceptron Classifier

### Is the probability of rain high, medium or low?

In [41]:
# bin values in 3 label classes: 2-high, 1-medium, 0-low
df = df.withColumn(
    'label',
    F.when((F.col("pop") >= 0.7), 2)\
    .when((F.col("pop") < 0.7) & (F.col('pop') > 0.3), 1)\
    .when((F.col("pop") <= 0.3), 0)
)

In [42]:
df.show()

+----------+------+----------+--------+--------+---------+------+----------+----------+--------+-------+---+-----+
| timestamp|  temp|feels_like|pressure|humidity|dew_point|clouds|visibility|wind_speed|wind_deg|weather|pop|label|
+----------+------+----------+--------+--------+---------+------+----------+----------+--------+-------+---+-----+
|1603123200|277.05|    271.63|    1012|      55|   269.33|    29|     10000|      4.12|     317|   null|0.0|    0|
|1603126800|276.81|    271.75|    1012|      63|   270.74|    32|     10000|      3.87|     326|   null|0.0|    0|
|1603130400|276.37|    271.61|    1014|      70|   271.63|    42|     10000|      3.62|     329|   null|0.0|    0|
|1603134000|275.99|    271.33|    1015|      74|   271.98|    94|     10000|      3.56|     320|   null|0.0|    0|
|1603137600|275.53|    270.85|    1015|      78|   272.22|    86|     10000|      3.64|     317|   null|0.0|    0|
|1603141200|275.12|    270.45|    1016|      81|   269.45|    58|     10000|    

In [43]:
#select features and assemble
feature_columns = df.columns[1:10] 
assembler = VectorAssembler(inputCols=feature_columns,outputCol="features")

In [63]:
feature_columns

['temp',
 'feels_like',
 'pressure',
 'humidity',
 'dew_point',
 'clouds',
 'visibility',
 'wind_speed',
 'wind_deg']

In [44]:
data2 = assembler.transform(df)

In [53]:
from pyspark.sql.functions import rand 


In [54]:
data2 = data2.orderBy(rand())

In [77]:
seed = 500
iterations = 100

# train/test split
splits = data2.randomSplit([0.8, 0.2], seed)
train = splits[0]
test = splits[1]

# layers for neural network (input, hidden1, hidden2, output)
layers = [9, 8, 5, 3]

# train the model
mlp = MultilayerPerceptronClassifier(labelCol="label", featuresCol="features", maxIter=iterations, layers=layers, blockSize=128, seed=seed)
model = mlp.fit(train)

# compute test set accuracy
result = model.transform(test)
predictionAndLabels = result.select("prediction", "label")
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
print("Test accuracy = " + str(evaluator.evaluate(predictionAndLabels)))

Test accuracy = 0.5838621940163191
