<a href="https://www.kaggle.com/code/pmtphamtuan/khaiphadl?scriptVersionId=254463285" target="_blank"><img align="left" alt="Kaggle" title="Open in Kaggle" src="https://kaggle.com/static/images/open-in-kaggle.svg"></a>

In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.express as px
from plotly.offline import iplot, init_notebook_mode
init_notebook_mode(connected=True)

from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression, DecisionTreeRegressor, RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
import warnings
warnings.filterwarnings("ignore")


In [2]:
paths = ['/kaggle/input/meteonet/NW_Ground_Stations/NW_Ground_Stations/NW_Ground_Stations_2016.csv',
         '/kaggle/input/meteonet/NW_Ground_Stations/NW_Ground_Stations/NW_Ground_Stations_2017.csv',
         '/kaggle/input/meteonet/NW_Ground_Stations/NW_Ground_Stations/NW_Ground_Stations_2018.csv']

In [3]:
num_cols = ['height_sta','dd', 'ff', 'precip','hu', 'td', 't', 'psl']
dtype = dict([(k,'float32') for k in num_cols])

In [4]:
df_list = [pd.read_csv(path, header=0, dtype=dtype) for path in paths]
weather_data = pd.concat(df_list, ignore_index=True)

print("Number of colums:", len(weather_data))
weather_data.head()


Number of colums: 65826837


Unnamed: 0,number_sta,lat,lon,height_sta,date,dd,ff,precip,hu,td,t,psl
0,14066001,49.33,-0.43,2.0,20160101 00:00,210.0,4.4,0.0,91.0,278.450012,279.850006,
1,14126001,49.15,0.04,125.0,20160101 00:00,,,0.0,99.0,278.350006,278.450012,
2,14137001,49.18,-0.46,67.0,20160101 00:00,220.0,0.6,0.0,92.0,276.450012,277.649994,102360.0
3,14216001,48.93,-0.15,155.0,20160101 00:00,220.0,1.9,0.0,95.0,278.25,278.950012,
4,14296001,48.8,-1.03,339.0,20160101 00:00,,,0.0,,,278.350006,


In [5]:
weather_data = weather_data.loc[weather_data['lat'] > 48.4]
weather_data = weather_data.loc[weather_data['lon'] > -1.6]
weather_data['date'] = pd.to_datetime(weather_data['date'])

In [6]:
values_to_fill = {col: weather_data[col].mean() for col in num_cols}
weather_data = weather_data.fillna(value=values_to_fill)

In [7]:
df_for_eda = weather_data.set_index('date')
df_for_eda['t'] = df_for_eda['t'] - 273.5

In [8]:
#Nhiệt độ và độ ẩm theo thời gian
fig_temp_humidity = px.line(
    df_for_eda['2016-01-01':'2018-12-30'][['t','hu']].resample('D').mean(),
    title = 'Temperature (°C) - Humidity ratio' 
)
fig_temp_humidity.update_xaxes(rangeslider_visible=True)
fig_temp_humidity.show()

In [9]:
# Lượng mưa hàng tuần
fig_precip = px.line(
    df_for_eda['2016-01-01':'2018-12-30']['precip'].resample('7D').sum(),
    title = 'Overall weekly precipitation'
)
fig_precip.update_xaxes(rangeslider_visible=True)
fig_precip.show()

In [10]:
if 'loc' not in locals() or not weather_data['number_sta'].isin([loc]).any():
    loc = weather_data['number_sta'].sample(1).values[0]
print(f"Sử dụng dữ liệu từ trạm ID: {loc}")

Sử dụng dữ liệu từ trạm ID: 50615001


In [11]:
def daily_forecast_data_prep(df_input, station_loc):
    df = df_input.loc[df_input['number_sta'] == station_loc].copy()
    df.set_index('date', inplace=True)
    df['hours'] = df.index.hour
    df['days'] = df.index.dayofyear
    df['years'] = df.index.year
    df['3'] = df['hours'] % 3
    df = df.loc[df['3'] == 0.0]
    
    date_range = pd.date_range(start='2016-01-01', end='2018-12-31', freq='D').strftime("%Y-%m-%d")
    df_concat_list = []
    for d in date_range:
        daily_data = df[d:d]
        if not daily_data.empty:
            df_concat_list.append(daily_data.drop_duplicates(subset='hours').reset_index(drop=True))

    if not df_concat_list:
        return pd.DataFrame()
        
    df = pd.concat(df_concat_list)
    
    final_cols = ["height_sta", "dd", "ff", "precip", "hu", "td", "t", "hours", "days", "years"]
    df = df[final_cols]
    return df

In [12]:
df_model = daily_forecast_data_prep(weather_data, loc)
print(f"Dataset for model have {len(df_model)} row.")

Dataset for model have 8750 row.


**> Pyspark******

In [13]:
spark = SparkSession.builder.appName("WeatherForecastingOriginalLogic").getOrCreate()
spark_df = spark.createDataFrame(df_model)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/08/06 00:25:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [14]:
feature_cols = ["height_sta", "dd", "ff", "precip", "hu", "td", "hours", "days", "years"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
df_transformed = assembler.transform(spark_df)


In [15]:
train_set, test_set = df_transformed.randomSplit([0.8, 0.2], seed=1234)

In [16]:
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml import Pipeline


feature_cols = ["height_sta", "dd", "ff", "precip", "hu", "td", "hours", "days", "years"]

assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")


pipeline = Pipeline(stages=[assembler])


df_transformed = pipeline.fit(spark_df).transform(spark_df)


df_transformed.select("features").show(truncate=False)



[Stage 0:>                                                          (0 + 1) / 1]

+-------------------------------------------------------------------------------------------------------------------+
|features                                                                                                           |
+-------------------------------------------------------------------------------------------------------------------+
|[61.0,189.46420288085938,3.8164443969726562,0.0,80.9918212890625,280.88214111328125,0.0,1.0,2016.0]                |
|[61.0,189.46420288085938,3.8164443969726562,0.0,80.9918212890625,280.88214111328125,3.0,1.0,2016.0]                |
|[61.0,189.46420288085938,3.8164443969726562,0.0,80.9918212890625,280.88214111328125,15.0,1.0,2016.0]               |
|[61.0,189.46420288085938,3.8164443969726562,0.0,80.9918212890625,280.88214111328125,18.0,1.0,2016.0]               |
|[61.0,189.46420288085938,3.8164443969726562,0.0,80.9918212890625,280.88214111328125,21.0,1.0,2016.0]               |
|[61.0,189.46420288085938,3.8164443969726562,0.0,80.9918

                                                                                

In [17]:
lr = LinearRegression(featuresCol="features", labelCol="t")
lr_model = lr.fit(train_set)

dt = DecisionTreeRegressor(featuresCol="features", labelCol="t")
dt_model = dt.fit(train_set)

rf = RandomForestRegressor(featuresCol="features", labelCol="t")
rf_model = rf.fit(train_set)

25/08/06 00:25:54 WARN Instrumentation: [b7530d15] regParam is zero, which might cause numerical instability and overfitting.
25/08/06 00:25:55 WARN Instrumentation: [b7530d15] Cholesky solver failed due to singular covariance matrix. Retrying with Quasi-Newton solver.
                                                                                

In [18]:
lr_predictions = lr_model.transform(test_set)
dt_predictions = dt_model.transform(test_set)
rf_predictions = rf_model.transform(test_set)

evaluator_rmse = RegressionEvaluator(labelCol="t", predictionCol="prediction", metricName="rmse")
evaluator_mae = RegressionEvaluator(labelCol="t", predictionCol="prediction", metricName="mae")
evaluator_r2 = RegressionEvaluator(labelCol="t", predictionCol="prediction", metricName="r2")




In [19]:
print("Results")
models = {
    "Linear Regression": lr_predictions,
    "Decision Tree": dt_predictions,
    "Random Forest": rf_predictions
}
for name, preds in models.items():
    rmse = evaluator_rmse.evaluate(preds)
    mae = evaluator_mae.evaluate(preds)
    r2 = evaluator_r2.evaluate(preds)
    print(f"\n{name}:")
    print(f"  RMSE: {rmse:.4f}")
    print(f"  MAE: {mae:.4f}")
    print(f"  R2: {r2:.4f}")


Results

Linear Regression:
  RMSE: 5.4352
  MAE: 4.3455
  R2: 0.0951

Decision Tree:
  RMSE: 3.2294
  MAE: 2.5582
  R2: 0.6805

Random Forest:
  RMSE: 4.5889
  MAE: 3.5572
  R2: 0.3550


In [20]:
#Lưu mô hình cho streamlit

model_output_path = "Linear_Regression_model" 
lr_model.write().overwrite().save(model_output_path)
print(f"Mô hình đã lưu tại: '{model_output_path}'")
spark.stop()

                                                                                

Mô hình đã lưu tại: 'Linear_Regression_model'
