In [54]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, max, min, year, month, stddev, variance, corr
from pyspark.sql.window import Window
from pyspark.sql import functions as F

In [55]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [56]:
# Khởi tạo SparkSession
spark = SparkSession.builder.appName("ClimateAnalysis").getOrCreate()

# Đọc dữ liệu từ file input.txt
df = spark.read.csv("/content/drive/MyDrive/dataset/VNweather_data.txt", header=True, inferSchema=True)

In [65]:
# Chuyển đổi kiểu dữ liệu của các cột sang số
numeric_cols = ["max", "min", "wind", "rain", "humidi", "cloud", "pressure"]
for col_name in numeric_cols:
    df = df.withColumn(col_name, col(col_name).cast("double"))

In [66]:
# Thêm cột temp (nhiệt độ trung bình), maxtemp, mintemp
df = df.withColumn("temp", (col("max") + col("min")) / 2)
df = df.withColumnRenamed("max", "maxtemp")
df = df.withColumnRenamed("min", "mintemp")
df = df.withColumnRenamed("humidi", "humidity")

In [68]:
# 1. Tính trung bình cực trị của các yếu tố khí hậu theo từng tỉnh
analysis_cols = ["maxtemp", "mintemp", "wind", "rain", "humidity", "cloud", "pressure", "temp"]

for col_name in analysis_cols:
    # Calculate max and min first
    extreme_df = df.groupBy("province").agg(
        max(col_name).alias(f"max_{col_name}"),
        min(col_name).alias(f"min_{col_name}")
    )
    # Then calculate the average of max and min in a separate step
    extreme_avg_by_province = extreme_df.groupBy("province").agg(
        avg(col(f"max_{col_name}")).alias(f"avg_max_{col_name}"),
        avg(col(f"min_{col_name}")).alias(f"avg_min_{col_name}")
    )
    print(f"Trung bình cực trị của {col_name} theo tỉnh:")
    extreme_avg_by_province.show()

Trung bình cực trị của maxtemp theo tỉnh:
+----------------+---------------+---------------+
|        province|avg_max_maxtemp|avg_min_maxtemp|
+----------------+---------------+---------------+
|      Long Xuyen|           40.0|           23.0|
|          My Tho|           41.0|           21.0|
|Ho Chi Minh City|           41.0|           22.0|
|         Ben Tre|           41.0|           21.0|
|       Nha Trang|           36.0|           19.0|
|        Hong Gai|           35.0|            8.0|
|       Hai Phong|           39.0|            7.0|
|         Cam Pha|           34.0|            8.0|
|       Hai Duong|           40.0|            8.0|
|          Ca Mau|           39.0|           22.0|
|        Cam Ranh|           35.0|           22.0|
|         Play Cu|           40.0|           16.0|
|        Nam Dinh|           43.0|            6.0|
|        Chau Doc|           40.0|           22.0|
|         Can Tho|           40.0|           22.0|
|          Da Lat|           36.0|      

In [70]:
# 2. Tính trung bình cực trị của các yếu tố khí hậu của một tỉnh theo mùa
df = df.withColumn("month", month(col("date")))

def get_season(month):
    if month in [3, 4, 5]:
        return "Spring"
    elif month in [6, 7, 8]:
        return "Summer"
    elif month in [9, 10, 11]:
        return "Autumn"
    else:
        return "Winter"

get_season_udf = F.udf(get_season)
df = df.withColumn("season", get_season_udf(col("month")))

for col_name in analysis_cols:
    # Calculate max and min first
    extreme_df = df.groupBy("province", "season").agg(
        max(col_name).alias(f"max_{col_name}"),
        min(col_name).alias(f"min_{col_name}")
    )
    # Then calculate the average of max and min in a separate step
    extreme_avg_by_season = extreme_df.groupBy("province", "season").agg( # Group by both province and season
        avg(col(f"max_{col_name}")).alias(f"avg_max_{col_name}"),
        avg(col(f"min_{col_name}")).alias(f"avg_min_{col_name}")
    )
    print(f"Trung bình cực trị của {col_name} theo tỉnh và mùa:")
    extreme_avg_by_season.show()

Trung bình cực trị của maxtemp theo tỉnh và mùa:
+-------------+------+---------------+---------------+
|     province|season|avg_max_maxtemp|avg_min_maxtemp|
+-------------+------+---------------+---------------+
|          Hue|Summer|           38.0|           24.0|
|       Tan An|Spring|           41.0|           24.0|
|Buon Me Thuot|Autumn|           35.0|           20.0|
|     Nam Dinh|Spring|           43.0|           10.0|
|     Hong Gai|Spring|           34.0|           11.0|
|     Rach Gia|Summer|           35.0|           25.0|
|   Long Xuyen|Autumn|           36.0|           25.0|
|      Play Cu|Summer|           34.0|           20.0|
|          Hue|Winter|           34.0|           15.0|
|     Bac Lieu|Spring|           39.0|           25.0|
|      Cam Pha|Spring|           32.0|           11.0|
|     Bac Lieu|Winter|           35.0|           22.0|
|     Chau Doc|Summer|           37.0|           24.0|
|        Hanoi|Winter|           42.0|            7.0|
|     Bien Hoa|W

In [73]:
# 3. Tính trung bình cực trị của các yếu tố khí hậu của một tỉnh theo năm
df = df.withColumn("year", year(col("date")))
for col_name in analysis_cols:
    # Calculate max and min first
    extreme_df = df.groupBy("province", "year").agg(
        max(col_name).alias(f"max_{col_name}"),
        min(col_name).alias(f"min_{col_name}")
    )
    # Then calculate the average of max and min in a separate step
    extreme_avg_by_year = extreme_df.groupBy("province", "year").agg( # Group by both province and year
        avg(col(f"max_{col_name}")).alias(f"avg_max_{col_name}"),
        avg(col(f"min_{col_name}")).alias(f"avg_min_{col_name}")
    )
    print(f"Trung bình cực trị của {col_name} theo tỉnh và năm:")
    extreme_avg_by_year.show()

Trung bình cực trị của maxtemp theo tỉnh và năm:
+----------+----+---------------+---------------+
|  province|year|avg_max_maxtemp|avg_min_maxtemp|
+----------+----+---------------+---------------+
|    Ha Noi|2013|           44.0|           11.0|
|  Bien Hoa|2021|           38.0|           25.0|
|    Ha Noi|2011|           40.0|           11.0|
|  Hoa Binh|2012|           46.0|           12.0|
|  Hoa Binh|2020|           41.0|           12.0|
|   Cam Pha|2020|           33.0|           14.0|
|  Chau Doc|2011|           38.0|           25.0|
|   Play Cu|2014|           38.0|           20.0|
|  Hoa Binh|2011|           38.0|            9.0|
|   Ben Tre|2013|           38.0|           26.0|
|   Cam Pha|2011|           33.0|           11.0|
|     Hanoi|2011|           40.0|           11.0|
| Nha Trang|2016|           34.0|           20.0|
| Nha Trang|2020|           33.0|           22.0|
|  Cam Ranh|2020|           32.0|           22.0|
| Hai Duong|2015|           40.0|           12.0|
|

In [72]:
# 4. Thông tin về tỉnh nào có yếu tố khí hậu cao, thấp nhất
for col_name in analysis_cols:
    max_value = df.agg(max(col_name)).first()[0]
    min_value = df.agg(min(col_name)).first()[0]

    max_province = df.filter(col(col_name) == max_value).select("province").first()[0]
    min_province = df.filter(col(col_name) == min_value).select("province").first()[0]

    print(f"Yếu tố {col_name}:")
    print(f"  Cao nhất: {max_value} ở tỉnh {max_province}")
    print(f"  Thấp nhất: {min_value} ở tỉnh {min_province}")

Yếu tố maxtemp:
  Cao nhất: 46.0 ở tỉnh Hoa Binh
  Thấp nhất: 4.0 ở tỉnh Hoa Binh
Yếu tố mintemp:
  Cao nhất: 32.0 ở tỉnh Ha Noi
  Thấp nhất: 2.0 ở tỉnh Ha Noi
Yếu tố wind:
  Cao nhất: 54.0 ở tỉnh Tam Ky
  Thấp nhất: 1.0 ở tỉnh Da Lat
Yếu tố rain:
  Cao nhất: 596.4 ở tỉnh Cam Ranh
  Thấp nhất: 0.0 ở tỉnh Bac Lieu
Yếu tố humidity:
  Cao nhất: 100.0 ở tỉnh Da Lat
  Thấp nhất: 23.0 ở tỉnh Hoa Binh
Yếu tố cloud:
  Cao nhất: 100.0 ở tỉnh Buon Me Thuot
  Thấp nhất: 0.0 ở tỉnh Bac Lieu
Yếu tố pressure:
  Cao nhất: 1038.0 ở tỉnh Thai Nguyen
  Thấp nhất: 988.0 ở tỉnh Tam Ky
Yếu tố temp:
  Cao nhất: 37.0 ở tỉnh Ha Noi
  Thấp nhất: 3.5 ở tỉnh Hoa Binh


In [74]:
# 5. Xu hướng thay đổi lượng mưa và nhiệt độ trung bình của từng tỉnh
for province_name in df.select("province").distinct().rdd.flatMap(lambda x: x).collect():
    province_df = df.filter(col("province") == province_name)

    # Lượng mưa
    rain_trend = province_df.orderBy("year").select(
        "year", "rain",
        (col("rain") - F.lag("rain", 1).over(Window.orderBy("year"))).alias("rain_diff")
    ).agg(avg("rain_diff").alias("avg_rain_diff")).first().avg_rain_diff

    rain_trend_label = "Increasing" if rain_trend > 0 else "Decreasing" if rain_trend < 0 else "Stable"

    # Nhiệt độ trung bình
    avg_temp_trend = province_df.orderBy("year").select(
        "year", "temp",
        (col("temp") - F.lag("temp", 1).over(Window.orderBy("year"))).alias("temp_diff")
    ).agg(avg("temp_diff").alias("avg_temp_diff")).first().avg_temp_diff

    temp_trend_label = "Increasing" if avg_temp_trend > 0 else "Decreasing" if avg_temp_trend < 0 else "Stable"

    print(f"Tỉnh {province_name}:")
    print(f"  Xu hướng thay đổi lượng mưa: {rain_trend_label}")
    print(f"  Xu hướng thay đổi nhiệt độ trung bình: {temp_trend_label}")

Tỉnh Long Xuyen:
  Xu hướng thay đổi lượng mưa: Decreasing
  Xu hướng thay đổi nhiệt độ trung bình: Increasing
Tỉnh My Tho:
  Xu hướng thay đổi lượng mưa: Decreasing
  Xu hướng thay đổi nhiệt độ trung bình: Increasing
Tỉnh Ho Chi Minh City:
  Xu hướng thay đổi lượng mưa: Decreasing
  Xu hướng thay đổi nhiệt độ trung bình: Increasing
Tỉnh Ben Tre:
  Xu hướng thay đổi lượng mưa: Decreasing
  Xu hướng thay đổi nhiệt độ trung bình: Increasing
Tỉnh Nha Trang:
  Xu hướng thay đổi lượng mưa: Decreasing
  Xu hướng thay đổi nhiệt độ trung bình: Increasing
Tỉnh Hong Gai:
  Xu hướng thay đổi lượng mưa: Increasing
  Xu hướng thay đổi nhiệt độ trung bình: Increasing
Tỉnh Hai Phong:
  Xu hướng thay đổi lượng mưa: Increasing
  Xu hướng thay đổi nhiệt độ trung bình: Increasing
Tỉnh Cam Pha:
  Xu hướng thay đổi lượng mưa: Increasing
  Xu hướng thay đổi nhiệt độ trung bình: Increasing
Tỉnh Hai Duong:
  Xu hướng thay đổi lượng mưa: Increasing
  Xu hướng thay đổi nhiệt độ trung bình: Increasing
Tỉnh Ca Ma

In [75]:
# 6. Thông tin về tỉnh có lượng mưa hoặc nhiệt độ cực trị trong 5 năm, 10 năm
for years in [5, 10]:
    max_year = df.agg(max("year")).first()[0]
    min_year = max_year - years + 1

    period_df = df.filter(col("year").between(min_year, max_year))

    max_rain = period_df.agg(max("rain")).first()[0]
    min_rain = period_df.agg(min("rain")).first()[0]
    max_rain_province = period_df.filter(col("rain") == max_rain).select("province").first()[0]
    min_rain_province = period_df.filter(col("rain") == min_rain).select("province").first()[0]

    max_temp = period_df.agg(max("temp")).first()[0]
    min_temp = period_df.agg(min("temp")).first()[0]
    max_temp_province = period_df.filter(col("temp") == max_temp).select("province").first()[0]
    min_temp_province = period_df.filter(col("temp") == min_temp).select("province").first()[0]

    print(f"Trong {years} năm gần đây ({min_year}-{max_year}):")
    print(f"  Lượng mưa cao nhất: {max_rain} ở tỉnh {max_rain_province}")
    print(f"  Lượng mưa thấp nhất: {min_rain} ở tỉnh {min_rain_province}")
    print(f"  Nhiệt độ trung bình cao nhất: {max_temp} ở tỉnh {max_temp_province}")
    print(f"  Nhiệt độ trung bình thấp nhất: {min_temp} ở tỉnh {min_temp_province}")


Trong 5 năm gần đây (2017-2021):
  Lượng mưa cao nhất: 596.4 ở tỉnh Cam Ranh
  Lượng mưa thấp nhất: 0.0 ở tỉnh Bac Lieu
  Nhiệt độ trung bình cao nhất: 36.5 ở tỉnh Ha Noi
  Nhiệt độ trung bình thấp nhất: 4.5 ở tỉnh Uong Bi
Trong 10 năm gần đây (2012-2021):
  Lượng mưa cao nhất: 596.4 ở tỉnh Cam Ranh
  Lượng mưa thấp nhất: 0.0 ở tỉnh Bac Lieu
  Nhiệt độ trung bình cao nhất: 37.0 ở tỉnh Ha Noi
  Nhiệt độ trung bình thấp nhất: 3.5 ở tỉnh Hoa Binh


In [80]:
# 7. Hệ số tương quan giữa các yếu tố khí hậu của Hà Nội trong năm 2019
province_name = "Ha Noi"  # Specify the province
year_val = 2019  # Specify the year

# Filter data for the specific province and year
year_df = df.filter((col("province") == province_name) & (year(col("date")) == year_val))

print(f"Hệ số tương quan giữa các yếu tố khí hậu của tỉnh {province_name} trong năm {year_val}:")

correlation_matrix = {}
for i in range(len(analysis_cols)):
  for j in range(i + 1, len(analysis_cols)):
    col1 = analysis_cols[i]
    col2 = analysis_cols[j]
    correlation = year_df.select(corr(col1, col2)).first()[0]
    # Check if correlation is None before adding to the matrix
    if correlation is not None:
        correlation_matrix[(col1, col2)] = correlation
    else:
        # Handle the case where correlation is None (e.g., print a message or assign a default value)
        print(f"Correlation between {col1} and {col2} is None. Check your data or calculation.")
        correlation_matrix[(col1, col2)] = 0.0  # For example, assign 0.0

# In bảng hệ số tương quan
header = f"{'':<15}" + "".join([f"{col:<15}" for col in analysis_cols])
print(header)
for col1 in analysis_cols:
    row = f"{col1:<15}"
    for col2 in analysis_cols:
        if (col1, col2) in correlation_matrix:
            row += f"{correlation_matrix[(col1, col2)]:<15.2f}"
        elif (col2, col1) in correlation_matrix:
            row += f"{correlation_matrix[(col2, col1)]:<15.2f}"
        else:
            row += f"{1.0:<15.2f}"  # Đường chéo chính
    print(row)
print("\n")

Hệ số tương quan giữa các yếu tố khí hậu của tỉnh Ha Noi trong năm 2019:
               maxtemp        mintemp        wind           rain           humidity       cloud          pressure       temp           
maxtemp        1.00           0.90           -0.05          0.05           -0.10          -0.66          -0.82          0.98           
mintemp        0.90           1.00           0.02           0.24           0.20           -0.40          -0.89          0.97           
wind           -0.05          0.02           1.00           0.08           0.17           0.21           0.04           -0.02          
rain           0.05           0.24           0.08           1.00           0.41           0.16           -0.33          0.14           
humidity       -0.10          0.20           0.17           0.41           1.00           0.60           -0.19          0.03           
cloud          -0.66          -0.40          0.21           0.16           0.60           1.00           0.35  

In [81]:
spark.stop()