In [0]:
import numpy as np
import pandas as pd
from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder.appName("ParquetExample").getOrCreate()

# Read a CSV file into a DataFrame
df = spark.read.parquet("/FileStore/tables/extracted_parquet_parquet.gzip", header=True, inferSchema=True)

# Show the first 10 rows of the DataFrame
df.show(10)

+----+---+-----+---------+-----+----+------+------------------+-----+------+-----------+-----------+-----------+------------+------------+------------+
|YEAR|DOY| T10M|CLOUD_AMT|QV10M|  PW|    PS|GLOBAL_ILLUMINANCE|WS10M|EVLAND|   latitude|  longitude|       PCA1|        PCA2|        PCA3|        City|
+----+---+-----+---------+-----+----+------+------------------+-----+------+-----------+-----------+-----------+------------+------------+------------+
|2001|  1| 2.43|    96.34| 4.21| 1.8| 99.59|           3881.11|10.13|  5.13| 52.3730796|  4.8924534| 3880.99388|-96.60940259| 19.05901157|   Amsterdam|
|2001|  1|11.73|    80.28| 7.26|1.68| 98.85|          19408.04| 5.55| 17.01| 37.9839412| 23.7283052|19407.93612|-99.31509208| 13.98826986|      Athens|
|2001|  1|24.98|    47.09|11.47|2.83|100.95|          45094.98| 2.73| 26.39| 13.7524938|100.4935089|45094.89906|-94.73409344| 13.79627312|     Bangkok|
|2001|  1| 8.66|    61.77| 5.86|1.77| 97.56|           19163.0| 3.54|  3.95| 41.3828939|

In [0]:
from pyspark.sql.types import IntegerType, DoubleType, StringType

df = df.withColumn("YEAR",df["YEAR"].cast(IntegerType()))
df = df.withColumn("DOY",df["DOY"].cast(IntegerType()))
df = df.withColumn("T10M",df["T10M"].cast(DoubleType()))
df = df.withColumn("CLOUD_AMT",df["CLOUD_AMT"].cast(DoubleType()))
df = df.withColumn("QV10M",df["QV10M"].cast(DoubleType()))
df = df.withColumn("PW",df["PW"].cast(DoubleType()))
df = df.withColumn("PS",df["PS"].cast(DoubleType()))
df = df.withColumn("GLOBAL_ILLUMINANCE",df["GLOBAL_ILLUMINANCE"].cast(DoubleType()))
df = df.withColumn("WS10M",df["WS10M"].cast(DoubleType()))
df = df.withColumn("EVLAND",df["EVLAND"].cast(DoubleType()))
df = df.withColumn("latitude",df["latitude"].cast(DoubleType()))
df = df.withColumn("longitude",df["longitude"].cast(DoubleType()))
df = df.withColumn("PCA1",df["PCA1"].cast(DoubleType()))
df = df.withColumn("PCA2",df["PCA2"].cast(DoubleType()))
df = df.withColumn("PCA3",df["PCA3"].cast(DoubleType()))
df = df.withColumn("City",df["City"].cast(StringType()))

In [0]:
cities_row = df.select(['City']).distinct().collect()
cities = [row['City'] for row in cities_row]
print(cities)

['Cairo', 'Casablanca', 'Lima', 'Madrid', 'Prague', 'Singapore', 'Jakarta', 'Beijing', 'Rabat', 'Stockholm', 'Los Angeles', 'Moscow', 'Oslo', 'Dublin', 'Berlin', 'Kuala Lumpur', 'Lagos', 'London', 'Mumbai', 'Sydney', 'Tokyo', 'Vienna', 'Mexico City', 'Cape Town', 'Paris', 'Athens', 'Zurich', 'Istanbul', 'Toronto', 'Barcelona', 'Warsaw', 'Hong Kong', 'Amsterdam', 'Brussels', 'Buenos Aires', 'Manila', 'Johannesburg', 'Taipei', 'Helsinki', 'Hanoi', 'Bogota', 'Rome', 'Rio de Janeiro', 'Nairobi', 'Seoul', 'Bangkok', 'Dubai', 'Budapest', 'Santiago', 'New York']


In [0]:
from pyspark.sql.functions import col

mergedDic = {}
for city in cities:
    df_city = df.filter(col('City') == city)
    mergedDic[city] = df_city

In [0]:
#test
print(mergedDic.keys())
print(mergedDic['New York'].show(3))

dict_keys(['Cairo', 'Casablanca', 'Lima', 'Madrid', 'Prague', 'Singapore', 'Jakarta', 'Beijing', 'Rabat', 'Stockholm', 'Los Angeles', 'Moscow', 'Oslo', 'Dublin', 'Berlin', 'Kuala Lumpur', 'Lagos', 'London', 'Mumbai', 'Sydney', 'Tokyo', 'Vienna', 'Mexico City', 'Cape Town', 'Paris', 'Athens', 'Zurich', 'Istanbul', 'Toronto', 'Barcelona', 'Warsaw', 'Hong Kong', 'Amsterdam', 'Brussels', 'Buenos Aires', 'Manila', 'Johannesburg', 'Taipei', 'Helsinki', 'Hanoi', 'Bogota', 'Rome', 'Rio de Janeiro', 'Nairobi', 'Seoul', 'Bangkok', 'Dubai', 'Budapest', 'Santiago', 'New York'])
+----+---+-----+---------+-----+----+------+------------------+-----+------+----------+-----------+-----------+------------+-----------+--------+
|YEAR|DOY| T10M|CLOUD_AMT|QV10M|  PW|    PS|GLOBAL_ILLUMINANCE|WS10M|EVLAND|  latitude|  longitude|       PCA1|        PCA2|       PCA3|    City|
+----+---+-----+---------+-----+----+------+------------------+-----+------+----------+-----------+-----------+------------+-----------

In [0]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import col, mean, lit

    
def baseline(df_city):
    # Split data into train and test sets
    (train_data, test_data) = df_city.randomSplit([0.7, 0.3], seed=123)
    
    # Calculate the mean of the target variable in the training set
    mean_T10M = train_data.select(mean('T10M')).collect()[0][0]
    
    # Create a DataFrame with the mean as the prediction for all test examples
    predictions = test_data.withColumn('prediction', lit(mean_T10M))
    
    # Evaluate the model performance
    evaluator = RegressionEvaluator(labelCol="T10M", predictionCol="prediction", metricName="rmse")
    rmse = evaluator.evaluate(predictions)
    
    return rmse


def linear_regression(df_city):
    # Split data into train and test sets
    (train_data, test_data) = df_city.randomSplit([0.7, 0.3], seed=123)
    
    # Define the features and label
    feature_cols = ['PCA1', 'PCA2', 'PCA3']
    assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
    train_data = assembler.transform(train_data).select(col("T10M").alias("label"), col("features"))
    test_data = assembler.transform(test_data).select(col("T10M").alias("label"), col("features"))
    
    # Train the linear regression model
    lr = LinearRegression(featuresCol = 'features', labelCol='label')
    lr_model = lr.fit(train_data)
    
    # Make predictions on test data
    predictions = lr_model.transform(test_data)
    
    # Evaluate the model performance
    evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
    rmse = evaluator.evaluate(predictions)
    
    return rmse

def multiple_regression(df_city):
    # Split data into train and test sets
    (train_data, test_data) = df_city.randomSplit([0.7, 0.3], seed=123)
    
    # Define the features and label
    feature_cols = ['PCA1', 'PCA2', 'PCA3', 'DOY', 'CLOUD_AMT', 'QV10M', 'PW', 'PS', 'GLOBAL_ILLUMINANCE', 'WS10M', 'EVLAND']
    assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
    train_data = assembler.transform(train_data).select(col("T10M").alias("label"), col("features"))
    test_data = assembler.transform(test_data).select(col("T10M").alias("label"), col("features"))
    
    # Train the multiple regression model
    mr = LinearRegression(featuresCol = 'features', labelCol='label')
    mr_model = mr.fit(train_data)
    
    # Make predictions on test data
    predictions = mr_model.transform(test_data)
    
    # Evaluate the model performance
    evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
    rmse = evaluator.evaluate(predictions)
    
    # Get the feature importance
    feature_importance = mr_model.coefficients
    return rmse


In [0]:
#test
newyorkdf = mergedDic["New York"]
print(baseline(newyorkdf))
print(linear_regression(newyorkdf))
print(multiple_regression(newyorkdf))

8.724222155744421
6.702703622269673
2.2275482436510194


In [0]:
print(cities)

['Cairo', 'Casablanca', 'Lima', 'Madrid', 'Prague', 'Singapore', 'Jakarta', 'Beijing', 'Rabat', 'Stockholm', 'Los Angeles', 'Moscow', 'Oslo', 'Dublin', 'Berlin', 'Kuala Lumpur', 'Lagos', 'London', 'Mumbai', 'Sydney', 'Tokyo', 'Vienna', 'Mexico City', 'Cape Town', 'Paris', 'Athens', 'Zurich', 'Istanbul', 'Toronto', 'Barcelona', 'Warsaw', 'Hong Kong', 'Amsterdam', 'Brussels', 'Buenos Aires', 'Manila', 'Johannesburg', 'Taipei', 'Helsinki', 'Hanoi', 'Bogota', 'Rome', 'Rio de Janeiro', 'Nairobi', 'Seoul', 'Bangkok', 'Dubai', 'Budapest', 'Santiago', 'New York']


In [0]:
df_results = pd.DataFrame(columns=['City', 'LinReg_RMSE', 'MultReg_RMSE','BaseLine_RMSE'])
for city in cities:
    df = mergedDic[city]
    linear = linear_regression(df)
    multi = multiple_regression(df)
    base = baseline(df)
    df_temp = pd.DataFrame({'City': city, 'LinReg_RMSE': linear, 'MultReg_RMSE': multi,'BaseLine_RMSE':base}, index=[0])
    df_results = pd.concat([df_results, df_temp], ignore_index=True)
    

In [0]:
print(df_results)

              City LinReg_RMSE MultReg_RMSE BaseLine_RMSE
0            Cairo    4.311845     2.428255      6.517738
1       Casablanca    3.307361     2.086417      4.411729
2             Lima    0.997675     0.674446      1.283152
3           Madrid    4.731552     2.659818       7.99981
4           Prague    5.294961     2.231649      8.596102
5        Singapore    0.887906     0.521467      0.922122
6          Jakarta    0.686824     0.533368       0.71788
7          Beijing    6.661422     3.580931     11.639139
8            Rabat    3.082921     1.852666      4.108692
9        Stockholm    5.408081     1.747858      8.142192
10     Los Angeles    3.792334     3.170543      5.156891
11          Moscow    6.027384     3.350893      11.46875
12            Oslo    4.547945     1.992295      8.563352
13          Dublin    2.907315     0.684622      4.042204
14          Berlin    5.396394     1.941835      8.293438
15    Kuala Lumpur    0.754657      0.47046      0.813488
16           L

In [0]:
print(df_results['LinReg_RMSE'].mean())
print(df_results['MultReg_RMSE'].mean())
print(df_results['BaseLine_RMSE'].mean())

3.6074616590271216
1.6685593303407638
5.526351039235882
