In [24]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, regexp_replace, split, to_date, concat_ws,date_format
from pyspark.sql.functions import regexp_replace, split, col, trim,when,lag
from pyspark.sql.window import Window
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.sql.functions import col

In [5]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Read HDFS Weather Data") \
    .master("local") \
    .config("spark.hadoop.fs.defaultFS", "hdfs://namenode:9000") \
    .getOrCreate()


In [11]:
df_hanoi = spark.read.option("multiLine", True)\
               .option("header", True)\
               .option("inferSchema", False)\
               .option("encoding", "utf-8")\
               .csv("hdfs://namenode:9000/tmp/weather_data/history/ha-noi.csv")
df_hanoi.show()


+----------+-----+--------------------+-----+-------+-----+--------+------+-------+
|      Date| Time|             Weather| Temp|   Rain|Cloud|Pressure|  Wind|   Gust|
+----------+-----+--------------------+-----+-------+-----+--------+------+-------+
|2010-01-01|00:00|Patchy rain possible|18 °c|0.2\nmm| 100%| 1015 mb|3 km/h| 5 km/h|
|2010-01-01|03:00|Patchy rain possible|17 °c|0.3\nmm| 100%| 1014 mb|5 km/h| 7 km/h|
|2010-01-01|06:00|Patchy rain possible|17 °c|0.2\nmm| 100%| 1015 mb|6 km/h| 8 km/h|
|2010-01-01|09:00|            Overcast|17 °c|0.0\nmm| 100%| 1017 mb|6 km/h|12 km/h|
|2010-01-01|12:00|            Overcast|17 °c|0.0\nmm|  99%| 1015 mb|6 km/h|12 km/h|
|2010-01-01|15:00|Patchy rain possible|18 °c|0.1\nmm|  99%| 1014 mb|6 km/h| 8 km/h|
|2010-01-01|18:00|            Overcast|17 °c|0.0\nmm|  99%| 1015 mb|3 km/h| 4 km/h|
|2010-01-01|21:00|            Overcast|17 °c|0.0\nmm| 100%| 1015 mb|4 km/h|10 km/h|
|2010-01-02|00:00|Patchy rain possible|17 °c|0.3\nmm| 100%| 1016 mb|6 km/h|1

In [17]:
df_hanoi.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Time: string (nullable = true)
 |-- Weather: string (nullable = true)
 |-- Temp: string (nullable = true)
 |-- Rain: string (nullable = true)
 |-- Cloud: string (nullable = true)
 |-- Pressure: string (nullable = true)
 |-- Wind: string (nullable = true)
 |-- Gust: string (nullable = true)



In [None]:
def data_process(df):
    df_clean=df.withColumn("Date",to_date(col("Date"),"yyyy-MM-dd")) \
               .withColumn("Time",date_format(col("Time"),,"HH:mm")) \
               .withColumn("Temp(°c)",regexp_replace("Temp","°c","").cast("double")) \
               .withColumn("Rain(mm)", regexp_replace("Rain", "mm", "").cast("double")) \
               .withColumn("Cloud", (regexp_replace("Cloud", "%", "").cast("double") )) \
               .withColumn("Pressure(mb)", regexp_replace("Pressure", "mb", "").cast("double")) \
               .withColumn("Wind(km/h)", regexp_replace("Wind", "km/h", "").cast("double")) \
               .withColumn("Gust(km/h)", regexp_replace("Gust", "km/h", "").cast("double")) 
    df_clean=df_clean.drop("Time","Temp","Rain","Cloud","Pressure","Wind","Gust")
    
    weather_type1 = ['Sunny', 'Clear', 'Partly cloudy']
    weather_type2 = ['Overcast', 'Cloudy', 'Patchy rain possible', 'Light drizzle', 'Light rain shower', 'Patchy light rain with thunder']
    weather_type3 = ['Heavy rain at times', 'Moderate or heavy rain shower', 'Moderate rain at times', 'Moderate rain']
    df_mapped = df_clean.withColumn(
        "Weather",
         when(col("Weather").isin(weather_type1), 1)
        .when(col("Weather").isin(weather_type2), 2)
        .when(col("Weather").isin(weather_type3), 3)
        .otherwise(0)
    )
    window_spec = Window.orderBy("Date","time")
    lag_steps=4
    df_lag=df_mapped
    for lag_step in range(1, lag_steps + 1):
        df_lag = df_lag \
            .withColumn(f"Temp_t-{lag_step}", lag("Temp(°c)", lag_step).over(window_spec)) \
            .withColumn(f"Rain_t-{lag_step}", lag("Rain(mm)", lag_step).over(window_spec)) \
            .withColumn(f"Cloud_t-{lag_step}", lag("Cloud", lag_step).over(window_spec)) \
            .withColumn(f"Pressure_t-{lag_step}", lag("Pressure(mb)", lag_step).over(window_spec)) \
            .withColumn(f"Wind_t-{lag_step}", lag("Wind(km/h)", lag_step).over(window_spec)) \
            .withColumn(f"Gust_t-{lag_step}", lag("Gust(km/h)", lag_step).over(window_spec))
    df_final = df.drop("Date","Time")
    X= df_final.drop("Weather")
    Y=df_final.select("Weather")
    

In [20]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, regexp_replace, trim, lag, when, lit
from pyspark.sql.types import DoubleType
from pyspark.sql import Window
import pyspark.sql.functions as F

def data_preprocess(df):
    
    
    # Remove units and convert to numeric values
    df = df.withColumn("Temp", trim(regexp_replace(col("Temp"), "°c", "")))
    df = df.withColumn("Rain", trim(regexp_replace(col("Rain"), "\nmm", "")))
    df = df.withColumn("Cloud", trim(regexp_replace(col("Cloud"), "%", "")))
    df = df.withColumn("Pressure", trim(regexp_replace(col("Pressure"), "mb", "")))
    df = df.withColumn("Wind", trim(regexp_replace(col("Wind"), "km/h", "")))
    df = df.withColumn("Gust", trim(regexp_replace(col("Gust"), "km/h", "")))
    
    # Cast to proper data types
    df = df.withColumn("Date", col("Date").cast("timestamp"))
    df = df.withColumn("Temp", col("Temp").cast(DoubleType()))
    df = df.withColumn("Rain", col("Rain").cast(DoubleType()))
    df = df.withColumn("Cloud", col("Cloud").cast(DoubleType()))
    df = df.withColumn("Pressure", col("Pressure").cast(DoubleType()))
    df = df.withColumn("Wind", col("Wind").cast(DoubleType()))
    df = df.withColumn("Gust", col("Gust").cast(DoubleType()))
    
    # Rename columns
    df = df.withColumnRenamed("Temp", "Temp(°c)")
    df = df.withColumnRenamed("Rain", "Rain(nmm)")
    df = df.withColumnRenamed("Cloud", "Cloud(%)")
    df = df.withColumnRenamed("Pressure", "Pressure(mb)")
    df = df.withColumnRenamed("Wind", "Wind(km/h)")
    df = df.withColumnRenamed("Gust", "Gust(km/h)")
    
    # Group weather types
    weather_type1 = ["Sunny", "Clear", "Partly cloudy"]
    weather_type2 = ["Overcast", "Cloudy", "Patchy rain possible", "Light drizzle", "Light rain shower", "Patchy light rain with thunder"]
    weather_type3 = ["Heavy rain at times", "Moderate or heavy rain shower", "Moderate rain at times", "Moderate rain"]
    
    # Apply numeric encoding for Weather column
    df = df.withColumn("Weather", 
        when(col("Weather").isin(weather_type1), 1)
        .when(col("Weather").isin(weather_type2), 2)
        .when(col("Weather").isin(weather_type3), 3)
        .otherwise(0)
    )
    
    # Create window for lag features
    window_spec = Window.orderBy("Date", "Time")
    
    # Add lag features
    lag_steps = 3
    for lag_val in range(1, lag_steps + 1):
        df = df.withColumn(f"Temp_t-{lag_val}", lag(col("Temp(°c)"), lag_val).over(window_spec))
        df = df.withColumn(f"Rain_t-{lag_val}", lag(col("Rain(nmm)"), lag_val).over(window_spec))
        df = df.withColumn(f"Cloud_t-{lag_val}", lag(col("Cloud(%)"), lag_val).over(window_spec))
        df = df.withColumn(f"Pressure_t-{lag_val}", lag(col("Pressure(mb)"), lag_val).over(window_spec))
        df = df.withColumn(f"Wind_t-{lag_val}", lag(col("Wind(km/h)"), lag_val).over(window_spec))
        df = df.withColumn(f"Gust_t-{lag_val}", lag(col("Gust(km/h)"), lag_val).over(window_spec))
    
    # Drop Date and Time columns
    df = df.drop("Date", "Time")
    
    # Create X (features) and Y (target)
    X = df.drop("Weather")
    
    # Shift the Y for prediction (equivalent to Y[lag_steps + 1:])
    window_spec_shift = Window.orderBy(F.monotonically_increasing_id())
    Y = df.select("Weather").withColumn("id", F.monotonically_increasing_id())
    Y = Y.withColumn("Weather_shifted", lag(col("Weather"), -1).over(window_spec_shift))
    Y = Y.filter(col("Weather_shifted").isNotNull()).drop("Weather").withColumnRenamed("Weather_shifted", "Weather")
    
    # Filter X to remove NaN rows (equivalent to X[lag_steps:-1])
    X = X.filter(col(f"Temp_t-{lag_steps}").isNotNull())
    X = X.filter(F.monotonically_increasing_id() < F.lit(X.count() - 1))
    
    # Join X and Y
    X = X.withColumn("id", F.monotonically_increasing_id())
    result = X.join(Y, "id").drop("id")
    
    return result



In [21]:
df_clean=data_preprocess(df_hanoi)
df_clean.show(30)

+--------+---------+--------+------------+----------+----------+--------+--------+---------+------------+--------+--------+--------+--------+---------+------------+--------+--------+--------+--------+---------+------------+--------+--------+-------+
|Temp(°c)|Rain(nmm)|Cloud(%)|Pressure(mb)|Wind(km/h)|Gust(km/h)|Temp_t-1|Rain_t-1|Cloud_t-1|Pressure_t-1|Wind_t-1|Gust_t-1|Temp_t-2|Rain_t-2|Cloud_t-2|Pressure_t-2|Wind_t-2|Gust_t-2|Temp_t-3|Rain_t-3|Cloud_t-3|Pressure_t-3|Wind_t-3|Gust_t-3|Weather|
+--------+---------+--------+------------+----------+----------+--------+--------+---------+------------+--------+--------+--------+--------+---------+------------+--------+--------+--------+--------+---------+------------+--------+--------+-------+
|    17.0|      0.0|   100.0|      1017.0|       6.0|      12.0|    17.0|     0.2|    100.0|      1015.0|     6.0|     8.0|    17.0|     0.3|    100.0|      1014.0|     5.0|     7.0|    18.0|     0.2|    100.0|      1015.0|     3.0|     5.0|      2|


In [26]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.sql.functions import col
def build_model(location,df):
    # Khởi tạo Spark session
    
    
    # Gọi hàm tiền xử lý trực tiếp từ Spark thay vì pandas
    
    
    # Lấy tất cả các tên cột trừ cột Weather (có thể chỉnh sửa nếu cần)
    feature_cols = [col_name for col_name in df.columns if col_name != "Weather"]
    
    # Chuyển dữ liệu về kiểu vector + standardize
    assembler = VectorAssembler(inputCols=feature_cols, outputCol="features_assembled")
    scaler = StandardScaler(inputCol="features_assembled", outputCol="features", withStd=True, withMean=True)
    
    # RandomForest
    rf = RandomForestClassifier(labelCol="Weather", featuresCol="features")
    
    # Pipeline
    pipeline = Pipeline(stages=[assembler, scaler, rf])
    
    # Grid Search
    paramGrid = (ParamGridBuilder()
        .addGrid(rf.numTrees, [50, 100])
        .addGrid(rf.maxDepth, [5, 10, 20])
        .addGrid(rf.minInstancesPerNode, [2, 5])
        .build())
    
    # Evaluation & CV
    evaluator = MulticlassClassificationEvaluator(
        labelCol="Weather", 
        predictionCol="prediction", 
        metricName="accuracy"
    )
    
    cv = CrossValidator(
        estimator=pipeline,
        estimatorParamMaps=paramGrid,
        evaluator=evaluator,
        numFolds=5
    )
    
    # Chia train/test
    train_data, test_data = df.randomSplit([0.7, 0.3], seed=42)
    
    # Fit model
    print(f"Training model for {location}...")
    cv_model = cv.fit(train_data)
    
    # Dự đoán và đánh giá
    print(f"Evaluating model for {location}...")
    predictions = cv_model.transform(test_data)
    accuracy = evaluator.evaluate(predictions)
    
    # Chi tiết metric
    predictionAndLabels = predictions.select("prediction", "Weather").rdd.map(lambda x: (float(x[0]), float(x[1])))
    metrics = MulticlassMetrics(predictionAndLabels)
    
    print(f"Accuracy = {accuracy:.4f}")
    print(f"Weighted Precision = {metrics.weightedPrecision:.4f}")
    print(f"Weighted Recall = {metrics.weightedRecall:.4f}")
    print(f"Weighted F1 Score = {metrics.weightedFMeasure():.4f}")
    model_path = "hdfs://namenode:9000/tmp/model/random_model/"
    # Lưu model (có thể mở comment nếu cần)
    cv_model.bestModel.write().overwrite().save(f"{model_path}{location}_model")
    
    return accuracy

In [27]:
accu = build_model("ha-noi",df_clean)
print(accu)

Training model for ha-noi...
Evaluating model for ha-noi...




Accuracy = 0.9977
Weighted Precision = 0.9978
Weighted Recall = 0.9977
Weighted F1 Score = 0.9977
0.9977465635093518
