In [None]:
pip install datasets



In [None]:
from datasets import load_dataset

# File the raw 'train' split is exported to, so Spark can read it in the next step.
WEATHER_CSV_PATH = 'weather_data.csv'

# Download the dataset, keep its 'train' split and persist it as CSV (no index column).
dataset = load_dataset('syntaxnoob/weather-prediction-prototype-aws')
train_dataset = dataset['train']
df = train_dataset.to_pandas()
df.to_csv(WEATHER_CSV_PATH, index=False)

In [None]:
pip install pyspark



In [None]:
from pyspark.sql import SparkSession

# Start (or reuse) a Spark session for the rest of the notebook.
spark = (
    SparkSession.builder
    .appName("Weather Data Loading")
    .getOrCreate()
)

# Read the exported CSV; column names come from the header row and column
# types are inferred from the data.
df = spark.read.csv('weather_data.csv', header=True, inferSchema=True)
df.show()

+-------------------+------+------------------+------------------+-------------------+---------------+------------------+------------------+-------------------+-------------------+-------------------+------+----------+----------+------------------+------------------+------------------+------------------+---------------------+---------------------+-------------------+-----------------------+-----------------------+---------------+-------------------+-------------------+------------------+------------------+------------------+------------------+--------------------+--------------------+
|        timestamp_1|code_1|    air_pressure_1| air_temperature_1|relative_humidity_1|precipitation_1|      wind_speed_1|  wind_direction_1|        timestamp_2|    tar_timestamp_1|    tar_timestamp_2|code_2|tar_code_1|tar_code_2|    air_pressure_2|tar_air_pressure_1|tar_air_pressure_2| air_temperature_2|tar_air_temperature_1|tar_air_temperature_2|relative_humidity_2|tar_relative_humidity_1|tar_relative_hu

Checking for missing values (null count per column):

In [None]:
from pyspark.sql.functions import col, count, when

# One aggregate per column: when() yields null unless the cell is null, and
# count() ignores nulls, so each aggregate counts the null cells of its column.
null_counts = [count(when(col(c).isNull(), c)).alias(c) for c in df.columns]
df.select(null_counts).show()

+-----------+------+--------------+-----------------+-------------------+---------------+------------+----------------+-----------+---------------+---------------+------+----------+----------+--------------+------------------+------------------+-----------------+---------------------+---------------------+-------------------+-----------------------+-----------------------+---------------+-------------------+-------------------+------------+----------------+----------------+----------------+--------------------+--------------------+
|timestamp_1|code_1|air_pressure_1|air_temperature_1|relative_humidity_1|precipitation_1|wind_speed_1|wind_direction_1|timestamp_2|tar_timestamp_1|tar_timestamp_2|code_2|tar_code_1|tar_code_2|air_pressure_2|tar_air_pressure_1|tar_air_pressure_2|air_temperature_2|tar_air_temperature_1|tar_air_temperature_2|relative_humidity_2|tar_relative_humidity_1|tar_relative_humidity_2|precipitation_2|tar_precipitation_1|tar_precipitation_2|wind_speed_2|tar_wind_speed_1|t

In [None]:
# Inspect the column types Spark inferred when reading the CSV.
df.printSchema()

root
 |-- timestamp_1: timestamp (nullable = true)
 |-- code_1: integer (nullable = true)
 |-- air_pressure_1: double (nullable = true)
 |-- air_temperature_1: double (nullable = true)
 |-- relative_humidity_1: double (nullable = true)
 |-- precipitation_1: double (nullable = true)
 |-- wind_speed_1: double (nullable = true)
 |-- wind_direction_1: double (nullable = true)
 |-- timestamp_2: timestamp (nullable = true)
 |-- tar_timestamp_1: timestamp (nullable = true)
 |-- tar_timestamp_2: timestamp (nullable = true)
 |-- code_2: double (nullable = true)
 |-- tar_code_1: double (nullable = true)
 |-- tar_code_2: double (nullable = true)
 |-- air_pressure_2: double (nullable = true)
 |-- tar_air_pressure_1: double (nullable = true)
 |-- tar_air_pressure_2: double (nullable = true)
 |-- air_temperature_2: double (nullable = true)
 |-- tar_air_temperature_1: double (nullable = true)
 |-- tar_air_temperature_2: double (nullable = true)
 |-- relative_humidity_2: double (nullable = true)
 |-- 

Random forest

In [None]:
# Candidate numeric inputs for clustering: every column that is not a target
# ('tar_' prefix) and whose name mentions neither 'timestamp' nor 'code'.
features = [
    name for name in df.columns
    if not name.startswith('tar_')
    and not any(token in name for token in ('timestamp', 'code'))
]

In [None]:
from pyspark.sql.functions import hour, dayofweek, month

# Derive calendar features from 'timestamp_1': hour of day, day of week and month.
df = df.withColumn('hour_1', hour('timestamp_1'))
df = df.withColumn('day_of_week_1', dayofweek('timestamp_1'))
df = df.withColumn('month_1', month('timestamp_1'))

# Register the new columns as model features. Names already present are skipped
# so that re-running this cell does not append duplicates to `features`.
features.extend([c for c in ('hour_1', 'day_of_week_1', 'month_1') if c not in features])

In [None]:
# Derive calendar features from 'timestamp_2': hour of day, day of week and month.
df = df.withColumn('hour_2', hour('timestamp_2'))
df = df.withColumn('day_of_week_2', dayofweek('timestamp_2'))
df = df.withColumn('month_2', month('timestamp_2'))

# Register the new columns as model features. Names already present are skipped
# so that re-running this cell does not append duplicates to `features`.
features.extend([c for c in ('hour_2', 'day_of_week_2', 'month_2') if c not in features])

In [None]:
# Projection onto the selected feature columns (displays the schema).
df.select(features)

DataFrame[air_pressure_1: double, air_temperature_1: double, relative_humidity_1: double, precipitation_1: double, wind_speed_1: double, wind_direction_1: double, air_pressure_2: double, air_temperature_2: double, relative_humidity_2: double, precipitation_2: double, wind_speed_2: double, wind_direction_2: double, hour_1: int, day_of_week_1: int, month_1: int, hour_2: int, day_of_week_2: int, month_2: int]

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator


# Measurement names shared by both observation slots (_1 and _2).
_measurements = ['air_pressure', 'air_temperature', 'relative_humidity',
                 'precipitation', 'wind_speed', 'wind_direction']

# List of target variables: every measurement for slot 1, then for slot 2.
targets = [f'{m}_{slot}' for slot in (1, 2) for m in _measurements]

# Defining the features to be used: 'code_1', then every target except
# 'air_pressure_1', then the calendar columns.
# NOTE(review): this list overlaps with `targets`, so a target appears among its
# own inputs unless the training code removes it.
feature_columns = (
    ['code_1']
    + targets[1:]
    + ['hour_2', 'day_of_week_2', 'month_2', 'hour_1', 'day_of_week_1', 'month_1']
)

def train_and_evaluate_model(df, feature_columns, target, num_trees=50, max_depth=10,
                             train_fraction=0.8, seed=42):
    """Fit a random-forest regressor for one target column and report its test RMSE.

    The target column is removed from the inputs before the feature vector is
    assembled. Otherwise a target that also appears in ``feature_columns`` would
    be predicted from itself (label leakage), making the RMSE meaningless.

    Args:
        df: Spark DataFrame holding ``target`` and the feature columns.
        feature_columns: candidate input column names.
        target: name of the numeric label column.
        num_trees: number of trees in the forest.
        max_depth: maximum depth of each tree.
        train_fraction: share of rows used for training; the rest is the test set.
        seed: seed for the train/test split, so every target sees the same split.

    Returns:
        (fitted PipelineModel, RMSE on the held-out split)

    Raises:
        ValueError: if no input columns remain once the target is removed.
    """
    input_columns = [c for c in feature_columns if c != target]
    if not input_columns:
        raise ValueError(f"No feature columns left for target {target!r}")

    # Drop a previously assembled vector (no-op when absent) so the pipeline's
    # VectorAssembler can create its own 'features' output column.
    source = df.drop("features")

    assembler = VectorAssembler(inputCols=input_columns, outputCol="features")
    rf = RandomForestRegressor(featuresCol="features", labelCol=target,
                               numTrees=num_trees, maxDepth=max_depth)
    pipeline = Pipeline(stages=[assembler, rf])

    train_data, test_data = source.randomSplit([train_fraction, 1.0 - train_fraction], seed=seed)
    model = pipeline.fit(train_data)
    predictions = model.transform(test_data)

    evaluator = RegressionEvaluator(labelCol=target, predictionCol="prediction", metricName="rmse")
    rmse = evaluator.evaluate(predictions)
    print(f"Root Mean Squared Error (RMSE) for {target} on test data = {rmse}")
    predictions.select(target, "prediction").show(5)
    return model, rmse


# Train one model per target. Keep the fitted pipelines and their test RMSEs
# so the targets can be compared after the loop.
models = {}
rmse_by_target = {}
for target in targets:
    model, rmse = train_and_evaluate_model(df, feature_columns, target)
    models[target] = model
    rmse_by_target[target] = rmse



Root Mean Squared Error (RMSE) for air_pressure_1 on test data = 0.01777192079233687
+------------------+-------------------+
|    air_pressure_1|         prediction|
+------------------+-------------------+
|0.4756533700137549|0.46430077051568447|
|0.7921137093076565| 0.7833656339295042|
|0.4767537826685005| 0.4664177671755607|
|0.7974323704722592| 0.7852340656125913|
|0.4767537826685005| 0.4914772614592113|
+------------------+-------------------+
only showing top 5 rows

Root Mean Squared Error (RMSE) for air_temperature_1 on test data = 0.008902094453156347
+------------------+-------------------+
| air_temperature_1|         prediction|
+------------------+-------------------+
|0.3558696988322065|0.35740081458738404|
|0.4394591272280271| 0.4358974670041313|
| 0.357918459332104| 0.3575136488084082|
|0.4357713583282114| 0.4283559566373961|
|0.3614013521819299| 0.3581713865841553|
+------------------+-------------------+
only showing top 5 rows

Root Mean Squared Error (RMSE) for rel

K-means clustering

In [None]:
# Assemble the clustering inputs into one vector column. Any existing 'features'
# column is dropped first (a no-op when absent), so re-running this cell does not
# fail because the output column already exists.
# NOTE(review): the hour/day/month columns are on a much larger scale than the
# other inputs (see the cluster centers below), so they likely dominate the
# Euclidean distances; consider a StandardScaler stage before K-means.
assembler = VectorAssembler(inputCols=features, outputCol="features")
df = assembler.transform(df.drop("features"))

In [None]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

k = 5
# Fraction of rows used to fit K-means (1.0 = all rows). A seeded random sample
# replaces df.limit(100), which fits on whichever rows happen to come first and
# so is not representative of the data the model is evaluated on.
KMEANS_SAMPLE_FRACTION = 1.0

kmeans = KMeans(featuresCol="features", k=k, seed=1)
if KMEANS_SAMPLE_FRACTION >= 1.0:
    kmeans_train_df = df
else:
    kmeans_train_df = df.sample(withReplacement=False, fraction=KMEANS_SAMPLE_FRACTION, seed=1)
model = kmeans.fit(kmeans_train_df)

# Assign every row to a cluster and score the clustering with the silhouette.
predictions = model.transform(df)

evaluator = ClusteringEvaluator()

silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))

centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)

Silhouette with squared euclidean distance = 0.21839406744530332
Cluster Centers: 
[ 0.59789852  0.41453254  0.64809885  0.          0.13125549  0.47111111
  0.70535687  0.43447381  0.62368789  0.          0.15309482  0.60872685
 11.5         6.         11.         11.5         6.         11.        ]
[ 0.63613022  0.39810831  0.82660757  0.          0.17028826  0.32834491
  0.63639386  0.39800587  0.8248046   0.          0.1659899   0.32421296
  1.5         6.         11.          1.5         6.         11.        ]
[ 0.68988232  0.40954722  0.72957887  0.          0.08708906  0.56975309
  0.61834021  0.39576362  0.81937522  0.          0.10486782  0.4012963
  9.          6.         11.          9.          6.         11.        ]
[ 0.64693844  0.39504759  0.61712732  0.          0.13129539  0.52863636
  0.64691759  0.3968356   0.60907958  0.          0.12818262  0.60515152
 14.36363636  6.         11.         14.36363636  6.         11.        ]
[ 0.6180269   0.39000717  0.83219113  

In [None]:

# Labels for the cluster-center coordinates, in the exact order the K-means
# VectorAssembler used (as shown by `df[features]` above): the 12 measurements,
# then the _1 calendar columns, then the _2 calendar columns. Each name must
# appear once and in that order, or values are printed under the wrong name.
features = [
    'air_pressure_1', 'air_temperature_1', 'relative_humidity_1', 'precipitation_1',
    'wind_speed_1', 'wind_direction_1', 'air_pressure_2', 'air_temperature_2',
    'relative_humidity_2', 'precipitation_2', 'wind_speed_2', 'wind_direction_2',
    'hour_1', 'day_of_week_1', 'month_1', 'hour_2', 'day_of_week_2', 'month_2',
]
assert not centers or len(features) == len(centers[0]), \
    "feature labels do not match the cluster-center dimension"

for i, center in enumerate(centers):
    print(f"Cluster {i+1} Centers:")
    for feature, value in zip(features, center):
        print(f"{feature}: {value:.2f}")
    print("\n")

Cluster 1 Centers:
air_pressure_1: 0.60
air_temperature_1: 0.41
relative_humidity_1: 0.65
precipitation_1: 0.00
wind_speed_1: 0.13
wind_direction_1: 0.47
air_pressure_2: 0.71
air_temperature_2: 0.43
relative_humidity_2: 0.62
precipitation_2: 0.00
wind_speed_2: 0.15
wind_direction_2: 0.61
hour_2: 11.50
day_of_week_2: 6.00
month_2: 11.00
hour_1: 11.50
day_of_week_1: 6.00
month_1: 11.00


Cluster 2 Centers:
air_pressure_1: 0.64
air_temperature_1: 0.40
relative_humidity_1: 0.83
precipitation_1: 0.00
wind_speed_1: 0.17
wind_direction_1: 0.33
air_pressure_2: 0.64
air_temperature_2: 0.40
relative_humidity_2: 0.82
precipitation_2: 0.00
wind_speed_2: 0.17
wind_direction_2: 0.32
hour_2: 1.50
day_of_week_2: 6.00
month_2: 11.00
hour_1: 1.50
day_of_week_1: 6.00
month_1: 11.00


Cluster 3 Centers:
air_pressure_1: 0.69
air_temperature_1: 0.41
relative_humidity_1: 0.73
precipitation_1: 0.00
wind_speed_1: 0.09
wind_direction_1: 0.57
air_pressure_2: 0.62
air_temperature_2: 0.40
relative_humidity_2: 0.82

Naive Bayes classifier

In [None]:
# Separate name for the Naive Bayes section. Spark DataFrames are immutable:
# each withColumn below returns a new DataFrame that is rebound to df1, so `df`
# itself is left unchanged.
df1=df

In [None]:
# Cut-offs for turning the (apparently 0-1 scaled) measurements into binary flags.
threshold_temperature = 0.5
threshold_airpressure = 0.5
threshold_relativehumidity = 0.5
threshold_preciptation = 0.5  # (sic) name kept as-is in case other cells use it
threshold_windspeed = 0.5

# (flag column, source column, cut-off): flag = int(source > cut-off).
for flag_name, source_name, cutoff in [
    ("high_humidity_2", "relative_humidity_2", threshold_relativehumidity),
    ("high_precipitation_1", "precipitation_1", threshold_preciptation),
    ("high_precipitation_2", "precipitation_2", threshold_preciptation),
    ("high_windspeed_1", "wind_speed_1", threshold_windspeed),
    ("high_windspeed_2", "wind_speed_2", threshold_windspeed),
]:
    df1 = df1.withColumn(flag_name, (col(source_name) > cutoff).cast("int"))

In [None]:
# Flag: 1 when air_temperature_1 is above threshold_temperature, 0 when it is not.
df1 = df1.withColumn("high_temperature_1", (col("air_temperature_1") > threshold_temperature).cast("int"))

In [None]:
# Flag: 1 when air_temperature_2 is above threshold_temperature, 0 when it is not.
df1 = df1.withColumn("high_temperature_2", (col("air_temperature_2") > threshold_temperature).cast("int"))

In [None]:
# Flag: 1 when air_pressure_1 is above threshold_airpressure, 0 when it is not.
df1 = df1.withColumn("high_pressure_1", (col("air_pressure_1") > threshold_airpressure).cast("int"))

In [None]:
# Flag: 1 when air_pressure_2 is above threshold_airpressure, 0 when it is not.
df1 = df1.withColumn("high_pressure_2", (col("air_pressure_2") > threshold_airpressure).cast("int"))

In [None]:
# Flag: 1 when relative_humidity_1 is above threshold_relativehumidity, 0 when it is not.
df1 = df1.withColumn("high_humidity_1", (col("relative_humidity_1") > threshold_relativehumidity).cast("int"))

In [None]:
# Inspect the columns available after adding the binary flags.
df1.columns

['timestamp_1',
 'code_1',
 'air_pressure_1',
 'air_temperature_1',
 'relative_humidity_1',
 'precipitation_1',
 'wind_speed_1',
 'wind_direction_1',
 'timestamp_2',
 'tar_timestamp_1',
 'tar_timestamp_2',
 'code_2',
 'tar_code_1',
 'tar_code_2',
 'air_pressure_2',
 'tar_air_pressure_1',
 'tar_air_pressure_2',
 'air_temperature_2',
 'tar_air_temperature_1',
 'tar_air_temperature_2',
 'relative_humidity_2',
 'tar_relative_humidity_1',
 'tar_relative_humidity_2',
 'precipitation_2',
 'tar_precipitation_1',
 'tar_precipitation_2',
 'wind_speed_2',
 'tar_wind_speed_1',
 'tar_wind_speed_2',
 'wind_direction_2',
 'tar_wind_direction_1',
 'tar_wind_direction_2',
 'hour_1',
 'day_of_week_1',
 'month_1',
 'hour_2',
 'day_of_week_2',
 'month_2',
 'features',
 'high_humidity_2',
 'high_precipitation_1',
 'high_precipitation_2',
 'high_windspeed_1',
 'high_windspeed_2',
 'high_temperature_1',
 'high_temperature_2',
 'high_pressure_1',
 'high_pressure_2',
 'high_humidity_1']

In [None]:
# Keep only the engineered flag columns: drop targets, timestamps, codes, raw
# measurements, calendar columns and the assembled 'features' vector.
# This must read df1 - the flags were added there; filtering `df` excludes
# every column and always yields an empty list.
features = [
    name for name in df1.columns
    if not name.startswith(('tar_', 'air_', 'wind_', 'precipitation_', 'relative_',
                            'day_', 'month_', 'hour_'))
    and 'timestamp' not in name
    and 'code' not in name
    and 'features' not in name
]


In [None]:

# Every engineered binary flag column (their names contain 'high').
features = [name for name in df1.columns if 'high' in name]

df_high_features = df1.select(*features)
df_high_features.show()

+---------------+--------------------+--------------------+----------------+----------------+------------------+------------------+---------------+---------------+---------------+
|high_humidity_2|high_precipitation_1|high_precipitation_2|high_windspeed_1|high_windspeed_2|high_temperature_1|high_temperature_2|high_pressure_1|high_pressure_2|high_humidity_1|
+---------------+--------------------+--------------------+----------------+----------------+------------------+------------------+---------------+---------------+---------------+
|              1|                   0|                   0|               0|               0|                 0|                 0|              1|              0|              1|
|              1|                   0|                   0|               0|               0|                 0|                 0|              0|              1|              1|
|              1|                   0|                   0|               0|               0|       

In [None]:
from pyspark.sql.functions import col

# Inputs for the Bernoulli Naive Bayes model: 0/1 flags for temperature,
# humidity and wind speed at both observation slots.
binary_features = [
    'high_temperature_1', 'high_humidity_1', 'high_windspeed_1',
    'high_temperature_2', 'high_humidity_2', 'high_windspeed_2'
]

assembler = VectorAssembler(inputCols=binary_features, outputCol="features")
# Drop outputs of a previous run (no-op when absent) so the cell can be re-run.
df_high_features = assembler.transform(df_high_features.drop("features", "label"))

# Label: the high_precipitation_1 flag. It is referenced by name on
# df_high_features itself rather than as df1[...], which would tie the
# expression to a different DataFrame and rely on Spark resolving it through
# shared lineage.
df_high_features = df_high_features.withColumn("label", col("high_precipitation_1").cast("int"))

In [None]:

# Seeded 70/30 train/test split, so the Naive Bayes results are reproducible.
train_data, test_data = df_high_features.randomSplit([0.7, 0.3], seed=1234)

In [None]:
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [None]:

# Bernoulli Naive Bayes expects 0/1 feature values, which the flag vector provides.
# featuresCol/labelCol are spelled out; they are the estimator's defaults.
nb = NaiveBayes(featuresCol="features", labelCol="label", modelType="bernoulli")
model = nb.fit(train_data)

In [None]:

predictions = model.transform(test_data)

evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = " + str(accuracy))

# Accuracy alone is misleading when one class dominates: a 0.5 cut-off on
# precipitation may flag almost no rows. For context, report the accuracy of
# always predicting the most frequent label, plus the weighted F1.
label_counts = [row["count"] for row in predictions.groupBy("label").count().collect()]
total_rows = sum(label_counts)
if total_rows:
    print("Majority-class baseline accuracy = " + str(max(label_counts) / total_rows))
f1 = evaluator.evaluate(predictions, {evaluator.metricName: "f1"})
print("Test set weighted F1 = " + str(f1))

Test set accuracy = 0.9999882119954734


Decision tree classifier

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql import Row

In [None]:
# Decision-tree inputs: the raw measurement columns, selected by name prefix.
feature_columns = [
    name for name in df.columns
    if name.startswith(("air", "relative_humidity", "precipitation", "wind_speed", "wind_direction"))
]

In [None]:
from pyspark.sql.functions import col

# An earlier cell may have left an assembled 'features' column on df; remove it
# so VectorAssembler can create a fresh one.
df = df.drop('features') if 'features' in df.columns else df

assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
data = assembler.transform(df)

In [None]:
# Seeded 70/30 split so the decision-tree results, and the cross-validation
# comparison that reuses this split, are reproducible.
(training_data, test_data) = data.randomSplit([0.7, 0.3], seed=1234)

In [None]:
# Baseline tree with default hyper-parameters, predicting tar_code_1 from the
# assembled measurement vector.
dt = DecisionTreeClassifier(featuresCol="features", labelCol="tar_code_1")
model = dt.fit(training_data)

In [None]:
# Accuracy on the held-out split, reported as an error rate (1 - accuracy).
evaluator = MulticlassClassificationEvaluator(
    labelCol="tar_code_1",
    predictionCol="prediction",
    metricName="accuracy",
)
predictions = model.transform(test_data)
accuracy = evaluator.evaluate(predictions)
test_error = 1.0 - accuracy
print("Test Error = %g" % test_error)

Test Error = 0.42336


In [None]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [None]:
# 3 x 3 grid over tree depth and the number of bins used to split continuous features.
param_grid = (
    ParamGridBuilder()
    .addGrid(dt.maxDepth, [5, 10, 15])
    .addGrid(dt.maxBins, [20, 40, 60])
    .build()
)

In [None]:
# 3-fold cross-validation over the grid, scored with the accuracy evaluator
# defined above. The seed fixes the fold assignment so the selected
# hyper-parameters are reproducible.
crossval = CrossValidator(estimator=dt,
                          estimatorParamMaps=param_grid,
                          evaluator=evaluator,
                          numFolds=3,
                          seed=1234)

In [None]:
# Run the grid search on the training split; cv_model.bestModel holds the
# best-scoring tree.
cv_model = crossval.fit(training_data)

In [None]:
# Score the cross-validated model on the held-out split, as an error rate.
predictions = cv_model.transform(test_data)
accuracy = evaluator.evaluate(predictions)
test_error = 1.0 - accuracy
print("Test Error = %g" % test_error)

Test Error = 0.424559


In [None]:
# Hyper-parameters of the best tree selected by cross-validation.
best_model = cv_model.bestModel
best_depth = best_model.getMaxDepth()
best_bins = best_model.getMaxBins()
print("Best Max Depth:", best_depth)
print("Best Max Bins:", best_bins)

Best Max Depth: 5
Best Max Bins: 40
