In [47]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, hour, month
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder, StandardScaler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

In [48]:
# Khởi tạo Spark Session
spark = SparkSession.builder \
    .appName("WeatherPredictionGBT") \
    .config("spark.ui.port", "4040") \
    .getOrCreate()

# In URL của Web UI để theo dõi
print("Spark Web UI running at:", spark.sparkContext.uiWebUrl)

Spark Web UI running at: http://DESKTOP-MGIJLCC:4040


## Đọc dữ liệu

In [49]:

# Khởi tạo Spark Session
spark = SparkSession.builder.appName("WeatherPrediction").getOrCreate()

# Đọc dữ liệu từ file CSV
file_path = "C:/bigdata/weatherHistory.csv"
df = spark.read.csv(file_path, header=True, inferSchema=True)

# Hiển thị schema và một số dòng mẫu
df.printSchema()
df.show(5)

# Kiểm tra số lượng bản ghi
print(f"Total rows: {df.count()}")

root
 |-- Formatted Date: timestamp (nullable = true)
 |-- Summary: string (nullable = true)
 |-- Precip Type: string (nullable = true)
 |-- Temperature (C): double (nullable = true)
 |-- Apparent Temperature (C): double (nullable = true)
 |-- Humidity: double (nullable = true)
 |-- Wind Speed (km/h): double (nullable = true)
 |-- Wind Bearing (degrees): double (nullable = true)
 |-- Visibility (km): double (nullable = true)
 |-- Loud Cover: double (nullable = true)
 |-- Pressure (millibars): double (nullable = true)
 |-- Daily Summary: string (nullable = true)

+-------------------+-------------+-----------+-----------------+------------------------+--------+------------------+----------------------+------------------+----------+--------------------+--------------------+
|     Formatted Date|      Summary|Precip Type|  Temperature (C)|Apparent Temperature (C)|Humidity| Wind Speed (km/h)|Wind Bearing (degrees)|   Visibility (km)|Loud Cover|Pressure (millibars)|       Daily Summary|
+--

In [50]:
# Thêm đặc trưng thời gian
df = df.withColumn("Hour", hour(col("Formatted Date")))
df = df.withColumn("Month", month(col("Formatted Date")))

In [51]:
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder, StandardScaler

# Mã hóa cột Summary
indexer = StringIndexer(inputCol="Summary", outputCol="SummaryIndex")
df = indexer.fit(df).transform(df)
encoder = OneHotEncoder(inputCols=["SummaryIndex"], outputCols=["SummaryVec"])
df = encoder.fit(df).transform(df)

In [52]:
# Chọn đặc trưng
feature_cols = ["Humidity", "Wind Speed (km/h)", "Wind Bearing (degrees)", 
                "Visibility (km)", "Pressure (millibars)", "SummaryVec", "Hour", "Month"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
final_df = assembler.transform(df).select(col("features"), col("Temperature (C)").alias("label"))

## Chuẩn bị dữ liệu học máy

In [53]:
train_data, test_data = final_df.randomSplit([0.8, 0.2], seed=42)
print(f"Training data rows: {train_data.count()}")
print(f"Test data rows: {test_data.count()}")

Training data rows: 77349
Test data rows: 19104


In [54]:
from pyspark.ml.regression import LinearRegression

# Khởi tạo mô hình Linear Regression
lr = LinearRegression(featuresCol="features", labelCol="label", maxIter=10, regParam=0.3, elasticNetParam=0.8)

# Huấn luyện mô hình
lr_model = lr.fit(train_data)

# In hệ số và intercept của mô hình
print("Coefficients: ", lr_model.coefficients)
print("Intercept: ", lr_model.intercept)

Coefficients:  [-26.702665789905367,-0.12341673000413964,0.0008594462699007523,0.3479277762958408,0.0,1.2441817679683755,0.2775126324074202,0.0,0.0,-1.017236717999512,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.5454064319080595,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.004261462065913704,0.4350390034971129]
Intercept:  25.83523019866581


In [55]:
# Dự đoán trên tập kiểm tra
predictions = lr_model.transform(test_data)
predictions.select("features", "label", "prediction").show(5)

+--------------------+------------------+------------------+
|            features|             label|        prediction|
+--------------------+------------------+------------------+
|(33,[0,1,2,3,4,5,...| 39.58888888888889|28.551960211383097|
|(33,[0,1,2,3,4,5,...| 38.71666666666667|28.304873601684704|
|(33,[0,1,2,3,4,5,...|38.983333333333334| 27.92832382158092|
|(33,[0,1,2,3,4,5,...|38.705555555555556|  27.7919252503512|
|(33,[0,1,2,3,4,5,...| 28.81666666666667|28.079256507928385|
+--------------------+------------------+------------------+
only showing top 5 rows



In [56]:
from pyspark.ml.evaluation import RegressionEvaluator

# Tính RMSE
evaluator_rmse = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator_rmse.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE): {rmse}")

# Tính R²
evaluator_r2 = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="r2")
r2 = evaluator_r2.evaluate(predictions)
print(f"R-squared (R2): {r2}")

Root Mean Squared Error (RMSE): 6.797060013251237
R-squared (R2): 0.49382941068349395


In [57]:
from pyspark.ml.regression import RandomForestRegressor
rf = RandomForestRegressor(featuresCol="features", labelCol="label", numTrees=20)
rf_model = rf.fit(train_data)
predictions_rf = rf_model.transform(test_data)
rmse_rf = evaluator_rmse.evaluate(predictions_rf)
print(f"Random Forest RMSE: {rmse_rf}")

Random Forest RMSE: 4.589382240299312


In [58]:
from pyspark.ml.regression import GBTRegressor

gbt = GBTRegressor(featuresCol="features", labelCol="label", maxIter=50)
gbt_model = gbt.fit(train_data)
predictions_gbt = gbt_model.transform(test_data)

rmse_gbt = evaluator_rmse.evaluate(predictions_gbt)
r2_gbt = evaluator_r2.evaluate(predictions_gbt)
print(f"GBT RMSE: {rmse_gbt}")
print(f"GBT R2: {r2_gbt}")

GBT RMSE: 3.3165151342090855
GBT R2: 0.8794912268299956
