In [None]:
import pyspark


In [19]:
import pandas as pd
from datetime import datetime, timedelta
import random

# Seed the generator so the synthetic readings are identical on every run.
random.seed(42)

cities = ['Hanoi', 'Ho Chi Minh City', 'Da Nang', 'Can Tho', 'Hai Phong']

# Anchor the 10-day window to a fixed date rather than datetime.now().
# With a "now"-relative window the Date column (and anything derived from it
# later, such as a day-of-year feature) changes with the day the notebook is
# executed, so a Restart & Run All could never reproduce the saved outputs.
start_date = datetime(2025, 10, 14)
dates = [(start_date + timedelta(days=i)).strftime('%Y-%m-%d') for i in range(10)]
data = []

for city in cities:
    # One baseline per city; each daily reading varies by up to 5 around it.
    base_temp = random.randint(25, 32)
    for date in dates:
        temperature = base_temp + random.uniform(-5, 5)
        humidity = random.randint(60, 90)
        data.append({
            'City': city,
            'Date': date,
            'Temperature': round(temperature, 1),
            'Humidity': humidity
        })

df = pd.DataFrame(data)
filename = 'weather_data.csv'

# Persist without the pandas index so the file has exactly the four data columns.
df.to_csv(filename, index=False)

print(f"✓ CSV file '{filename}' created successfully!")
print("\nDataset Preview:")
print(df.head(10))
print(f"\nTotal entries: {len(df)}")


✓ CSV file 'weather_data.csv' created successfully!

Dataset Preview:
    City        Date  Temperature  Humidity
0  Hanoi  2025-10-14         21.3        68
1  Hanoi  2025-10-15         23.4        64
2  Hanoi  2025-10-16         28.4        81
3  Hanoi  2025-10-17         28.4        77
4  Hanoi  2025-10-18         21.9        73
5  Hanoi  2025-10-19         21.3        62
6  Hanoi  2025-10-20         23.2        76
7  Hanoi  2025-10-21         27.0        77
8  Hanoi  2025-10-22         23.0        80
9  Hanoi  2025-10-23         28.0        73

Total entries: 50


In [None]:
# Read the generated CSV back from disk; this rebinds `df` to the reloaded frame.
df = pd.read_csv(filepath_or_buffer="weather_data.csv")

In [None]:
import os
from pyspark.sql import SparkSession

# Both the worker and the driver interpreter are set to the `python` on PATH.
for env_var in ('PYSPARK_PYTHON', 'PYSPARK_DRIVER_PYTHON'):
    os.environ[env_var] = 'python'

# Stop a session that is still active (e.g. from an earlier run of this cell)
# before building a new one.
active_session = SparkSession.getActiveSession()
if active_session:
    active_session.stop()

# Session settings, applied in this order.
spark_conf = {
    "spark.driver.memory": "2g",
    "spark.sql.shuffle.partitions": "4",
    "spark.ui.enabled": "false",
    "spark.driver.bindAddress": "127.0.0.1",
}
builder = SparkSession.builder.appName("WeatherDataProcessing").master("local[*]")
for conf_key, conf_value in spark_conf.items():
    builder = builder.config(conf_key, conf_value)
spark = builder.getOrCreate()

# Only surface errors in the cell output.
spark.sparkContext.setLogLevel("ERROR")

print("✓ SparkSession created successfully!")
print(f"Spark version: {spark.version}")


✓ SparkSession created successfully!
Spark version: 4.0.1


In [36]:
# Load the CSV into a Spark DataFrame: the first row supplies the column
# names and the column types are inferred from the data.
df_pyspark = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("weather_data.csv")
)
df_pyspark.show()

+----------------+----------+-----------+--------+
|            City|      Date|Temperature|Humidity|
+----------------+----------+-----------+--------+
|           Hanoi|2025-10-14|       21.3|      68|
|           Hanoi|2025-10-15|       23.4|      64|
|           Hanoi|2025-10-16|       28.4|      81|
|           Hanoi|2025-10-17|       28.4|      77|
|           Hanoi|2025-10-18|       21.9|      73|
|           Hanoi|2025-10-19|       21.3|      62|
|           Hanoi|2025-10-20|       23.2|      76|
|           Hanoi|2025-10-21|       27.0|      77|
|           Hanoi|2025-10-22|       23.0|      80|
|           Hanoi|2025-10-23|       28.0|      73|
|Ho Chi Minh City|2025-10-14|       27.5|      68|
|Ho Chi Minh City|2025-10-15|       31.1|      60|
|Ho Chi Minh City|2025-10-16|       30.6|      65|
|Ho Chi Minh City|2025-10-17|       30.0|      70|
|Ho Chi Minh City|2025-10-18|       25.8|      66|
|Ho Chi Minh City|2025-10-19|       32.6|      70|
|Ho Chi Minh City|2025-10-20|  

In [23]:
# Print the column names, inferred types and nullability of the loaded frame.
df_pyspark.printSchema()

root
 |-- City: string (nullable = true)
 |-- Date: date (nullable = true)
 |-- Temperature: double (nullable = true)
 |-- Humidity: integer (nullable = true)



In [25]:
# Project three columns and display the first 10 rows.
preview_columns = ["City", "Date", "Temperature"]
df_pyspark.select(*preview_columns).show(n=10)

+-----+----------+-----------+
| City|      Date|Temperature|
+-----+----------+-----------+
|Hanoi|2025-10-14|       21.3|
|Hanoi|2025-10-15|       23.4|
|Hanoi|2025-10-16|       28.4|
|Hanoi|2025-10-17|       28.4|
|Hanoi|2025-10-18|       21.9|
|Hanoi|2025-10-19|       21.3|
|Hanoi|2025-10-20|       23.2|
|Hanoi|2025-10-21|       27.0|
|Hanoi|2025-10-22|       23.0|
|Hanoi|2025-10-23|       28.0|
+-----+----------+-----------+
only showing top 10 rows


In [26]:
# Summary statistics (count, mean, stddev, min, max). In the recorded output
# the table covers City, Temperature and Humidity; the Date column is not listed.
df_pyspark.describe().show()

+-------+----------------+------------------+-----------------+
|summary|            City|       Temperature|         Humidity|
+-------+----------------+------------------+-----------------+
|  count|              50|                50|               50|
|   mean|            NULL|29.305999999999997|            74.16|
| stddev|            NULL| 3.997928545267064|8.574666055257412|
|    min|         Can Tho|              21.3|               60|
|    max|Ho Chi Minh City|              36.9|               90|
+-------+----------------+------------------+-----------------+



In [37]:
# List of (column name, type name) pairs for the Spark DataFrame.
df_pyspark.dtypes

[('City', 'string'),
 ('Date', 'date'),
 ('Temperature', 'double'),
 ('Humidity', 'int')]

In [38]:
# Add a derived column using the Celsius-to-Fahrenheit formula: F = C * 9/5 + 32.
celsius = df_pyspark["Temperature"]
df_pyspark = df_pyspark.withColumn("Temperature_F", celsius * 9 / 5 + 32)


In [39]:
# Display the frame again, now including the Temperature_F column.
df_pyspark.show()

+----------------+----------+-----------+--------+-----------------+
|            City|      Date|Temperature|Humidity|    Temperature_F|
+----------------+----------+-----------+--------+-----------------+
|           Hanoi|2025-10-14|       21.3|      68|            70.34|
|           Hanoi|2025-10-15|       23.4|      64|            74.12|
|           Hanoi|2025-10-16|       28.4|      81|            83.12|
|           Hanoi|2025-10-17|       28.4|      77|            83.12|
|           Hanoi|2025-10-18|       21.9|      73|            71.42|
|           Hanoi|2025-10-19|       21.3|      62|            70.34|
|           Hanoi|2025-10-20|       23.2|      76|73.75999999999999|
|           Hanoi|2025-10-21|       27.0|      77|             80.6|
|           Hanoi|2025-10-22|       23.0|      80|             73.4|
|           Hanoi|2025-10-23|       28.0|      73|             82.4|
|Ho Chi Minh City|2025-10-14|       27.5|      68|             81.5|
|Ho Chi Minh City|2025-10-15|     

In [41]:
from pyspark.sql.functions import col, avg, count, when, rand
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
import random

# Simulate missing readings: null out Temperature wherever a uniform draw in
# [0, 1) exceeds 0.8 (roughly 20% of rows). The draw is seeded so that the same
# rows are masked on every run; an unseeded rand() picks a different seed each
# time, which makes the imputed mean and every downstream number unrepeatable.
df_missing = df_pyspark.withColumn(
    "Temperature",
    when(rand(seed=42) > 0.8, None).otherwise(col("Temperature"))
)

df_missing.show(5)

# Count nulls for all columns in a single aggregation instead of one Spark job
# per column: `when` without `otherwise` yields null for non-matching rows and
# `count` skips nulls, so each aggregate is that column's null count.
null_counts = df_missing.select(
    [count(when(col(c).isNull(), 1)).alias(c) for c in df_missing.columns]
).first()

print("\nMissing values count:")
for column in df_missing.columns:
    missing_count = null_counts[column]
    print(f"{column}: {missing_count}")

+-----+----------+-----------+--------+-------------+
| City|      Date|Temperature|Humidity|Temperature_F|
+-----+----------+-----------+--------+-------------+
|Hanoi|2025-10-14|       21.3|      68|        70.34|
|Hanoi|2025-10-15|       23.4|      64|        74.12|
|Hanoi|2025-10-16|       28.4|      81|        83.12|
|Hanoi|2025-10-17|       NULL|      77|        83.12|
|Hanoi|2025-10-18|       NULL|      73|        71.42|
+-----+----------+-----------+--------+-------------+
only showing top 5 rows

Missing values count:
City: 0
Date: 0
Temperature: 11
Date: 0
Temperature: 11
Humidity: 0
Temperature_F: 0
Humidity: 0
Temperature_F: 0


In [42]:
# Average of the Temperature column, used below as the fill value for nulls.
mean_temp = df_missing.agg(avg("Temperature")).first()[0]

# Bucket temperatures: below 28 -> Cool, below 32 -> Moderate, otherwise Hot.
temp_category = (
    when(col("Temperature") < 28, "Cool")
    .when(col("Temperature") < 32, "Moderate")
    .otherwise("Hot")
)

# Fill the gaps, attach the category, then drop the Fahrenheit column.
df_clean = (
    df_missing
    .fillna(value=mean_temp, subset=["Temperature"])
    .withColumn("Temp_Category", temp_category)
    .drop("Temperature_F")
)

df_clean.show(5)

+-----+----------+-----------------+--------+-------------+
| City|      Date|      Temperature|Humidity|Temp_Category|
+-----+----------+-----------------+--------+-------------+
|Hanoi|2025-10-14|             21.3|      68|         Cool|
|Hanoi|2025-10-15|             23.4|      64|         Cool|
|Hanoi|2025-10-16|             28.4|      81|     Moderate|
|Hanoi|2025-10-17|29.21794871794872|      77|     Moderate|
|Hanoi|2025-10-18|29.21794871794872|      73|     Moderate|
+-----+----------+-----------------+--------+-------------+
only showing top 5 rows


In [None]:
# Rows with humidity above 80, hottest first.
high_humidity = df_clean.where(col("Humidity") > 80)
sorted_data = high_humidity.sort(col("Temperature").desc())

print("Top High humidity locations:")
sorted_data.show(5)

# Per-city averages and record counts.
city_aggregates = [
    avg("Temperature").alias("Avg_Temp"),
    avg("Humidity").alias("Avg_Humidity"),
    count("*").alias("Number_of_Records"),
]
city_stats = df_clean.groupBy("City").agg(*city_aggregates)

city_stats.show()

Top High humidity locations:
+---------+----------+-----------+--------+-------------+
|     City|      Date|Temperature|Humidity|Temp_Category|
+---------+----------+-----------+--------+-------------+
|  Da Nang|2025-10-22|       35.6|      87|          Hot|
|  Can Tho|2025-10-17|       34.0|      81|          Hot|
|  Da Nang|2025-10-20|       33.6|      84|          Hot|
|  Da Nang|2025-10-17|       33.2|      87|          Hot|
|Hai Phong|2025-10-19|       30.7|      82|     Moderate|
+---------+----------+-----------+--------+-------------+
only showing top 5 rows
+---------+----------+-----------+--------+-------------+
|     City|      Date|Temperature|Humidity|Temp_Category|
+---------+----------+-----------+--------+-------------+
|  Da Nang|2025-10-22|       35.6|      87|          Hot|
|  Can Tho|2025-10-17|       34.0|      81|          Hot|
|  Da Nang|2025-10-20|       33.6|      84|          Hot|
|  Da Nang|2025-10-17|       33.2|      87|          Hot|
|Hai Phong|2025-10-

In [44]:
from pyspark.sql.functions import to_date, dayofyear
# Derive a numeric day-of-year feature from the Date column.
ml_data = (
    df_clean
    .withColumn("Date", to_date("Date"))
    .withColumn("DayOfYear", dayofyear("Date"))
)

# Combine the two predictors into the single "features" vector column and
# keep only that vector plus the Temperature label.
assembler = VectorAssembler(
    inputCols=["Humidity", "DayOfYear"],
    outputCol="features",
)
final_data = assembler.transform(ml_data).select("features", "Temperature")

# 80/20 train/test split with a fixed seed.
train_data, test_data = final_data.randomSplit([0.8, 0.2], seed=42)

lr = LinearRegression(featuresCol="features", labelCol="Temperature")
model = lr.fit(train_data)
predictions = model.transform(test_data)

print("Coefficients:", model.coefficients)
print("Intercept:", model.intercept)
print("\nSample Predictions:")
predictions.select("Temperature", "prediction").show(5)

# R-squared measured on the held-out test split.
test_summary = model.evaluate(test_data)
r2 = test_summary.r2
print(f"\nR-squared: {r2:.4f}")

Coefficients: [0.11412238863858215,0.21254647584458647]
Intercept: -40.79317501466185

Sample Predictions:
+-----------------+------------------+
|      Temperature|        prediction|
+-----------------+------------------+
|             29.0|27.920891075860318|
|             26.4|   29.424414708205|
|29.21794871794872| 28.90089766930982|
|             27.5|27.967985980158055|
|             30.0| 28.83387018496898|
+-----------------+------------------+
only showing top 5 rows
+-----------------+------------------+
|      Temperature|        prediction|
+-----------------+------------------+
|             29.0|27.920891075860318|
|             26.4|   29.424414708205|
|29.21794871794872| 28.90089766930982|
|             27.5|27.967985980158055|
|             30.0| 28.83387018496898|
+-----------------+------------------+
only showing top 5 rows

R-squared: -0.3663

R-squared: -0.3663
