In [39]:
northern_provinces = [
    'bac-can', 'bac-giang', 'bac-ninh', 'cao-bang', 
    'dien-bien', 'ha-giang', 'ha-noi', 'hai-duong', 
    'hai-phong', 'hanoi', 'hoa-binh', 'hong-gai', 
    'lang-son', 'lao-cai', 'nam-dinh', 'ninh-binh', 
    'phu-ly', 'son-la', 'son-tay', 'thai-binh', 
    'thai-nguyen', 'tuyen-quang', 'uong-bi', 'viet-tri', 
    'vinh-yen', 'cam-pha'
]

central_provinces = [
    'da-lat', 'dong-hoi', 'ha-tinh', 'hoi-an', 
    'hue', 'kon-tum', 'nha-trang', 'phan-rang', 
    'phan-thiet', 'play-cu', 'quang-ngai', 'qui-nhon', 
    'tam-ky', 'thanh-hoa', 'tuy-hoa', 'vinh', 
    'buon-me-thuot', 'cam-ranh'
]

southern_provinces = [
    'bac-lieu', 'ben-tre', 'bien-hoa', 'ca-mau', 
    'can-tho', 'chau-doc', 'dong-xoai', 'ho-chi-minh-city', 
    'long-xuyen', 'my-tho', 'rach-gia', 'soc-trang', 
    'tan-an', 'tay-ninh', 'tra-vinh', 'vinh-long', 
    'vung-tau'
]


In [23]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, regexp_replace, split, to_date, concat_ws,date_format, to_timestamp,concat
from pyspark.sql.functions import regexp_replace, split, col, trim,when,lag

In [35]:
spark = SparkSession.builder \
    .appName("Read HDFS Weather Data") \
    .master("local") \
    .config("spark.hadoop.fs.defaultFS", "hdfs://namenode:9000") \
    .getOrCreate()

In [11]:
from pyspark.sql.functions import lit
from pyspark.sql.utils import AnalysisException

def merge_file(list_locations):
    df_all = None  # Khởi tạo biến lưu trữ DataFrame tổng hợp
    
    for location in list_locations:
        try:
            # Đọc file từ HDFS
            df = spark.read.option("multiLine", True) \
                .option("header", True) \
                .option("inferSchema", False) \
                .option("encoding", "utf-8") \
                .csv(f"hdfs://namenode:9000/tmp/weather_data/history/{location}.csv")
            
            # Thêm cột location với giá trị tương ứng
            df = df.withColumn("location", lit(location))
            
            # Gộp dữ liệu
            df_all = df if df_all is None else df_all.unionByName(df)
        
        except AnalysisException as e:
            print(f"File không tồn tại hoặc không thể đọc: {location}. Lỗi: {e}")
        except Exception as e:
            print(f"Đã xảy ra lỗi với file {location}: {e}")
    
    return df_all


In [14]:
df_north= merge_file(northern_provinces)
df_north.show()
print(df_north.count())

File không tồn tại hoặc không thể đọc: cao-bang. Lỗi: [PATH_NOT_FOUND] Path does not exist: hdfs://namenode:9000/tmp/weather_data/history/cao-bang.csv.
+----------+-----+--------------------+-----+-------+-----+--------+-------+-------+--------+
|      Date| Time|             Weather| Temp|   Rain|Cloud|Pressure|   Wind|   Gust|location|
+----------+-----+--------------------+-----+-------+-----+--------+-------+-------+--------+
|2010-01-01|00:00|Patchy rain possible|14 °c|0.1\nmm| 100%| 1016 mb| 4 km/h| 7 km/h| bac-can|
|2010-01-01|03:00|Patchy rain possible|14 °c|0.4\nmm| 100%| 1015 mb| 6 km/h| 9 km/h| bac-can|
|2010-01-01|06:00|Patchy rain possible|13 °c|0.2\nmm| 100%| 1016 mb| 6 km/h| 9 km/h| bac-can|
|2010-01-01|09:00|Patchy rain possible|13 °c|0.3\nmm| 100%| 1017 mb| 6 km/h| 9 km/h| bac-can|
|2010-01-01|12:00|Patchy rain possible|14 °c|0.3\nmm| 100%| 1016 mb| 2 km/h| 9 km/h| bac-can|
|2010-01-01|15:00|Patchy rain possible|14 °c|0.4\nmm| 100%| 1015 mb| 6 km/h| 9 km/h| bac-can|
|2

In [29]:
def data_process(df):
    df_clean=df.withColumn("Date",to_date(col("Date"),"yyyy-MM-dd")) \
               .withColumn("Time",date_format(col("Time"),"HH:mm")) \
               .withColumn("Temp(°c)",regexp_replace("Temp","°c","").cast("double")) \
               .withColumn("Rain(mm)", regexp_replace("Rain", "mm", "").cast("double")) \
               .withColumn("Cloud", (regexp_replace("Cloud", "%", "").cast("double") )) \
               .withColumn("Pressure(mb)", regexp_replace("Pressure", "mb", "").cast("double")) \
               .withColumn("Wind(km/h)", regexp_replace("Wind", "km/h", "").cast("double")) \
               .withColumn("Gust(km/h)", regexp_replace("Gust", "km/h", "").cast("double")) 
    df_clean = df_clean.withColumn("Timestamp", 
                                  to_timestamp(concat(col("Date").cast("string"), 
                                                     lit(" "), 
                                                     col("Time")), 
                                               "yyyy-MM-dd HH:mm"))
    df_clean = df_clean.drop("Time", "Date", "Temp", "Rain", "Cloud", "Pressure", "Wind", "Gust")
    return df_clean
    
    

In [30]:
df_north_clean=data_process(df_north)

In [31]:
df_north_clean.show()

+--------------------+--------+--------+--------+------------+----------+----------+-------------------+
|             Weather|location|Temp(°c)|Rain(mm)|Pressure(mb)|Wind(km/h)|Gust(km/h)|          Timestamp|
+--------------------+--------+--------+--------+------------+----------+----------+-------------------+
|Patchy rain possible| bac-can|    14.0|     0.1|      1016.0|       4.0|       7.0|2010-01-01 00:00:00|
|Patchy rain possible| bac-can|    14.0|     0.4|      1015.0|       6.0|       9.0|2010-01-01 03:00:00|
|Patchy rain possible| bac-can|    13.0|     0.2|      1016.0|       6.0|       9.0|2010-01-01 06:00:00|
|Patchy rain possible| bac-can|    13.0|     0.3|      1017.0|       6.0|       9.0|2010-01-01 09:00:00|
|Patchy rain possible| bac-can|    14.0|     0.3|      1016.0|       2.0|       9.0|2010-01-01 12:00:00|
|Patchy rain possible| bac-can|    14.0|     0.4|      1015.0|       6.0|       9.0|2010-01-01 15:00:00|
|Patchy rain possible| bac-can|    14.0|     0.2|      

In [40]:
df_cen= merge_file(central_provinces)
df_cen.show()
print(df_cen.count())

+----------+-----+-------------+-----+-------+-----+--------+-------+-------+--------+
|      Date| Time|      Weather| Temp|   Rain|Cloud|Pressure|   Wind|   Gust|location|
+----------+-----+-------------+-----+-------+-----+--------+-------+-------+--------+
|2010-01-01|00:00|Partly cloudy|15 °c|0.0\nmm|  57%| 1014 mb| 8 km/h|17 km/h|  da-lat|
|2010-01-01|03:00|Partly cloudy|14 °c|0.0\nmm|  50%| 1012 mb| 8 km/h|13 km/h|  da-lat|
|2010-01-01|06:00|Partly cloudy|13 °c|0.0\nmm|  42%| 1012 mb|10 km/h|19 km/h|  da-lat|
|2010-01-01|09:00|        Sunny|22 °c|0.0\nmm|   8%| 1013 mb|12 km/h|28 km/h|  da-lat|
|2010-01-01|12:00|        Sunny|23 °c|0.0\nmm|  13%| 1012 mb|13 km/h|33 km/h|  da-lat|
|2010-01-01|15:00|        Sunny|24 °c|0.0\nmm|   9%| 1010 mb|10 km/h|28 km/h|  da-lat|
|2010-01-01|18:00|        Clear|19 °c|0.0\nmm|  18%| 1012 mb| 7 km/h|18 km/h|  da-lat|
|2010-01-01|21:00|        Clear|15 °c|0.0\nmm|  24%| 1013 mb| 7 km/h|15 km/h|  da-lat|
|2010-01-02|00:00|Partly cloudy|13 °c|0.0\n

In [44]:
df_cen_process=data_process(df_cen)
df_cen_process.count()
df_cen_process.show()

+-------------+--------+--------+--------+------------+----------+----------+-------------------+
|      Weather|location|Temp(°c)|Rain(mm)|Pressure(mb)|Wind(km/h)|Gust(km/h)|          Timestamp|
+-------------+--------+--------+--------+------------+----------+----------+-------------------+
|Partly cloudy|  da-lat|    15.0|     0.0|      1014.0|       8.0|      17.0|2010-01-01 00:00:00|
|Partly cloudy|  da-lat|    14.0|     0.0|      1012.0|       8.0|      13.0|2010-01-01 03:00:00|
|Partly cloudy|  da-lat|    13.0|     0.0|      1012.0|      10.0|      19.0|2010-01-01 06:00:00|
|        Sunny|  da-lat|    22.0|     0.0|      1013.0|      12.0|      28.0|2010-01-01 09:00:00|
|        Sunny|  da-lat|    23.0|     0.0|      1012.0|      13.0|      33.0|2010-01-01 12:00:00|
|        Sunny|  da-lat|    24.0|     0.0|      1010.0|      10.0|      28.0|2010-01-01 15:00:00|
|        Clear|  da-lat|    19.0|     0.0|      1012.0|       7.0|      18.0|2010-01-01 18:00:00|
|        Clear|  da-

In [None]:
df_cen_process.write.format("csv").mode("overwrite").save("hdfs://namenode:9000/tmp/weather_data/central")


In [45]:
df = spark.read.option("multiLine", True) \
                .option("header", True) \
                .option("inferSchema", False) \
                .option("encoding", "utf-8") \
                .csv(f"hdfs://namenode:9000/tmp/weather_data/central/part-00012-e74cb50b-2b42-4a82-ad0d-08005814fdf9-c000.csv")
df.show(100)

+--------------------+------+----+---+------+----+----+------------------------+
|               Clear|tam-ky|21.0|0.0|1014.0| 6.0|14.0|2010-01-01T00:00:00.000Z|
+--------------------+------+----+---+------+----+----+------------------------+
|               Clear|tam-ky|21.0|0.0|1013.0| 6.0|12.0|    2010-01-01T03:00:...|
|               Clear|tam-ky|22.0|0.0|1014.0| 5.0| 8.0|    2010-01-01T06:00:...|
|               Sunny|tam-ky|26.0|0.0|1015.0| 2.0| 7.0|    2010-01-01T09:00:...|
|               Sunny|tam-ky|27.0|0.0|1013.0| 9.0|15.0|    2010-01-01T12:00:...|
|               Sunny|tam-ky|26.0|0.0|1011.0|11.0|20.0|    2010-01-01T15:00:...|
|               Clear|tam-ky|23.0|0.0|1013.0| 6.0|18.0|    2010-01-01T18:00:...|
|               Clear|tam-ky|21.0|0.0|1014.0| 5.0|14.0|    2010-01-01T21:00:...|
|               Clear|tam-ky|21.0|0.0|1014.0| 5.0|11.0|    2010-01-02T00:00:...|
|               Clear|tam-ky|21.0|0.0|1014.0| 5.0|10.0|    2010-01-02T03:00:...|
|               Clear|tam-ky