# Computer Architecture

## Forecasting Weather Data

**Imports**

In [1]:
from pyspark.sql import Window
from pyspark.sql.functions import col, asc, desc, to_timestamp, unix_timestamp, from_unixtime
from pyspark.sql.functions import lit, lag, col
from pyspark.sql.types import StructType, StructField, LongType
import pyspark.sql.functions as func
from pyspark.sql import SparkSession, SQLContext
from pyspark import SparkConf
from pyspark.ml.regression import LinearRegression, DecisionTreeRegressor, RandomForestRegressor
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator
import pandas as pd
import s3fs

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
6,application_1619471388794_0007,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

**Generating Features**

In [2]:
class LagGather:
    def __init__(self):
        self.nLags = 0
        self.FeatureNames = []

    def setLagLength(self, nLags):
        self.nLags = nLags
        return self

    def setInputCol(self, colname):
        self.columnName = colname
        return self

    def transform(self, df):
        df = df.withColumn("Series", lit('Univariate'))
        mywindow = Window.orderBy("Series")
        for i in range(self.nLags):
            strLag = self.columnName + '_LagBy_' + str(i + 1)
            df = df.withColumn(strLag, lag(df[self.columnName], i + 1).over(mywindow))
            self.FeatureNames.append(strLag)
        df = df.drop("Series")
        return df

    def getFeatureNames(self):
        return self.FeatureNames

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

**Models Training**

In [3]:
def Forecast(df, forecast_hours, nLags, timeSeriesColumn, sparksession):
    LeadWindow = Window.rowsBetween(0, forecast_hours)
    df = df.withColumn("label", func.last(df[timeSeriesColumn]).over(LeadWindow))
    
    features = [timeSeriesColumn]
    
    # Auto-regression feature
    LagTransformer = LagGather().setLagLength(nLags).setInputCol(timeSeriesColumn)
    df = LagTransformer.transform(df)
    featuresGenerated = LagTransformer.getFeatureNames()
    features.extend(featuresGenerated)
    
    
    df = df.dropna()
    vA = VectorAssembler().setInputCols(features).setOutputCol("features")
    df_m = vA.transform(df)
    
    
    # Splitting data into train, test
    splitratio = 0.7
    df_train, df_test = TimeSeriesSplit(df_m, splitratio, sparksession)
    
    
    rfr = RandomForestRegressor(featuresCol="features", labelCol="label", maxDepth=5, subsamplingRate=0.8, )
    model = rfr.fit(df_train)
    predictions_rfr_test = model.transform(df_test)
    predictions_rfr_train = model.transform(df_train)

    # RMSE is used as evaluation metric
    evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="label", metricName="rmse")
    RMSE_rfr_test = evaluator.evaluate(predictions_rfr_test)
    RMSE_rfr_train = evaluator.evaluate(predictions_rfr_train)
    return (df_test, df_train, predictions_rfr_test, predictions_rfr_train, RMSE_rfr_test, RMSE_rfr_train)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

**Splitting The Data**

In [4]:
def TimeSeriesSplit(df_m, splitRatio, sparksession):
    newSchema = StructType(df_m.schema.fields + [StructField("Row Number", LongType(), False)])
    new_rdd = df_m.rdd.zipWithIndex().map(lambda x: list(x[0]) + [x[1]])
    df_m2 = sparksession.createDataFrame(new_rdd, newSchema)
    total_rows = df_m2.count()
    splitFraction = int(total_rows * splitRatio)
    df_train = df_m2.where(df_m2["Row Number"] >= 0).where(df_m2["Row Number"] <= splitFraction)
    df_test = df_m2.where(df_m2["Row Number"] > splitFraction)

    return df_train, df_test

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [5]:
def Difference(df, inputCol, outputCol):
    lag1Window = Window.rowsBetween(-1, 0)
    df = df.withColumn(outputCol, df[inputCol] - func.first(df[inputCol]).over(lag1Window))
    return df

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

**Convert Differenced Predictions Into Raw Predicions**

In [6]:
def Predict(i, df1, df2, timeSeriesCol, predictionCol, joinCol):
    dZCol = 'DeltaZ' + str(i)
    f_strCol = 'forecast_' + str(i) + 'hour'
    df = df1.join(df2, [joinCol], how="inner").orderBy(asc("dt_iso"))
    df = df.withColumnRenamed(predictionCol, dZCol)
    df = df.withColumn(f_strCol, col(dZCol) + col(timeSeriesCol))
    return df

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

**Making Predictions And Saving To Disk**

In [7]:
def SavePredictions(df, timeSeriesCol, forecast_hours, feature_nLags, filename, sparksession):
    
    diff_timeSeriesCol = "Diff_" + timeSeriesCol
    df = Difference(df, timeSeriesCol, diff_timeSeriesCol)

    RMSE_test = {}
    RMSE_train = {}
    corr_predict_train = {}
    corr_predict_test = {}
    strCol_prev = "forecast_0hour"

    index = 0
    for i in range(12, forecast_hours + 1, 12):

        
        df_test, df_train, predictions_test, predictions_train, RMSE_ts, RMSE_tr = Forecast(df.select("dt_iso", timeSeriesCol, diff_timeSeriesCol), i, feature_nLags, diff_timeSeriesCol, sparksession)

        RMSE_test.update({'forecast_' + str(i) + 'hour': RMSE_ts})
        RMSE_train.update({'forecast_' + str(i) + 'hour': RMSE_tr})

        
        if (i == 12):
            
            corr_predict_train = Predict(i, df_train.select("Row Number", "dt_iso", timeSeriesCol), predictions_train.select("Row Number", "prediction"), timeSeriesCol, "prediction", "Row Number")
            corr_predict_test = Predict(i, df_test.select("Row Number", "dt_iso", timeSeriesCol), predictions_test.select("Row Number", "prediction"), timeSeriesCol, "prediction", "Row Number")
        else:
            
            strCol_prev = "forecast_" + str(i - 12) + "hour"
            
            corr_predict_train = Predict(i, corr_predict_train, predictions_train.select("Row Number", "prediction"), strCol_prev, "prediction", "Row Number")
            corr_predict_test = Predict(i, corr_predict_test, predictions_test.select("Row Number", "prediction"), strCol_prev, "prediction", "Row Number")

            LeadWindow = Window.rowsBetween(0, index)
            a_strCol = "actual_" + str(i) + "hour"
            corr_predict_test = corr_predict_test.withColumn(a_strCol, func.last(corr_predict_test[timeSeriesCol]).over(LeadWindow))
            corr_predict_train = corr_predict_train.withColumn(a_strCol, func.last(corr_predict_test[timeSeriesCol]).over(LeadWindow))
            
            
        index += 1
    corr_predict_test.write.mode("overwrite").format("csv").option("header", "true").save(filename + "test.csv")
    corr_predict_train.write.mode("overwrite").format("csv").option("header", "true").save(filename + "train.csv")


    
    return RMSE_train, RMSE_test, corr_predict_train, corr_predict_test

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

**Cleaning Data**

In [8]:
def get_data(fileLocation):
    df = pd.read_csv(fileLocation)
    del df ["dt"]
    del df ["timezone"]
    del df ["city_name"]
    del df ["lat"]
    del df ["lon"]
    del df ["sea_level"]
    del df ["grnd_level"]
    del df ["weather_icon"]
    return df

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

**Starting The Spark Application**

In [9]:




conf = SparkConf()
spark = SparkSession.builder.appName("TimeSeries").master("local").config(conf=conf).getOrCreate()

data = []

for city in ["sf", "seattle"]:
    
    print("\n***************************************************")
    print(city)
    print("***************************************************")

    df = get_data("s3://spark-weather-project/data/" + city + ".csv")

    spark_df = spark.createDataFrame(df)


    timeSeriesCol = "temp"
    forecast_hours = 72
    num_lags = 3


    RMSE_train, RMSE_test, train, test = SavePredictions(spark_df, timeSeriesCol, forecast_hours, num_lags, "s3://spark-weather-project/data/" + city + "_", spark)
    
    data.append([RMSE_train, RMSE_test, train, test])
    
    print(RMSE_train)
    print(RMSE_test)




FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…


***************************************************
sf
***************************************************
{'forecast_12hour': 1.347097026810685, 'forecast_24hour': 1.0767214208591689, 'forecast_36hour': 1.3680354611696, 'forecast_48hour': 1.1389964586382983, 'forecast_60hour': 1.3765635853880558, 'forecast_72hour': 1.1634881704289837}
{'forecast_12hour': 1.2684462018868932, 'forecast_24hour': 0.9691871290424079, 'forecast_36hour': 1.2864138145664687, 'forecast_48hour': 1.0278541305351683, 'forecast_60hour': 1.2928025946466204, 'forecast_72hour': 1.043000292304716}

***************************************************
seattle
***************************************************
{'forecast_12hour': 1.598093178660386, 'forecast_24hour': 1.5341791876506672, 'forecast_36hour': 1.6465735861809274, 'forecast_48hour': 1.6046341972766143, 'forecast_60hour': 1.6801126599995369, 'forecast_72hour': 1.650786169238132}
{'forecast_12hour': 1.2478753022106495, 'forecast_24hour': 1.1707074761560423, 'f