In [19]:
import csv
from confluent_kafka import Producer, KafkaError
import os
import time

# Kafka configuration
kafka_configuration = {
    'bootstrap.servers': 'localhost:9092',  # 確保 Kafka 服務運行於此地址
    'client.id': 'weather_producer',
    'queue.buffering.max.messages': 1000000,  # 增加緩衝的最大消息數量
    'queue.buffering.max.kbytes': 1024000,  # 增加緩衝區的大小（1 GB）
    'linger.ms': 100  # 讓 Producer 等待發送，批量處理
}

producer = Producer(kafka_configuration)

# CSV 檔案路徑
csv_file_path = '/Users/chive/documents/MacAir001STUDY/004Fall2024/CSC7740_BigData/CSC7740final/dataset/WeatherEvents_Jan2016-Dec2022.csv'

# Kafka topic
kafka_topic = 'weather'

# 檢查 Kafka 是否能連線
def check_kafka_connection():
    try:
        producer.list_topics(timeout=10)
        print("Kafka connection successful.")
    except KafkaError as e:
        if e.code() == KafkaError._TRANSPORT:
            print(f"ERROR! Kafka connection failed: {e}. Possible causes:")
            print("1. Kafka broker is not running on the specified address.")
            print("2. Incorrect broker address or port (check 'bootstrap.servers').")
            print("3. Firewall or network issues blocking the connection.")
        else:
            print(f"ERROR! Kafka connection failed: {e}")

# 訊息傳遞報告
def delivery_report(err, msg):
    if err is not None:
        print(f'ERROR ! Message delivery has failed: {err}')
    else:
        print(f'Topic {msg.topic()} has received your message. Delivery Successful | Offset {msg.offset()}')

# 檢查 CSV 檔案是否存在
if not os.path.exists(csv_file_path):
    print(f"ERROR! File not found: {csv_file_path}")
else:
    # 嘗試連線 Kafka
    check_kafka_connection()

    # 開啟 CSV 檔案並傳送訊息
    with open(csv_file_path, 'r') as csv_file:
        csv_reader = csv.reader(csv_file, delimiter=",")
        header_row = next(csv_reader)  # 跳過表頭

        for row in csv_reader:
            message_value = ','.join(row)
            try:
                producer.produce(kafka_topic, value=message_value.encode('utf-8'), callback=delivery_report)
            except BufferError as e:
                print(f"Local buffer is full, waiting for free space: {e}")
                producer.poll(1)  # 等待 Kafka 處理完消息釋放空間
            
            # 主動調用 poll() 來處理已發送的消息，避免緩衝區過滿
            producer.poll(0)
        
        # 等待所有訊息傳送完成
        print("Flushing messages...")
        producer.flush()  # 等待所有訊息完成傳送
        print("All messages have been delivered or failed.")

Kafka connection successful.


IOPub data rate exceeded.
The Jupyter server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--ServerApp.iopub_data_rate_limit`.

Current values:
ServerApp.iopub_data_rate_limit=1000000.0 (bytes/sec)
ServerApp.rate_limit_window=3.0 (secs)



In [20]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, to_timestamp
from pyspark.sql.types import StructType, StructField, StringType, FloatType

# Start a Spark session
spark = SparkSession.builder.appName("WeatherDataProcessing") \
    .config("spark.driver.memory", "5g") \
    .getOrCreate()

# Define Kafka configuration
kafka_bootstrap_servers = 'localhost:9092'
kafka_topic = 'weather'

# Define the schema for the incoming weather data
weather_schema = StructType([
    StructField('StartTime(UTC)', StringType(), True),
    StructField('EndTime(UTC)', StringType(), True),
    StructField('Type', StringType(), True),
    StructField('Severity', StringType(), True),
    StructField('Precipitation(in)', FloatType(), True),
    StructField('AirportCode', StringType(), True),
    StructField('LocationLat', StringType(), True),
    StructField('LocationLng', StringType(), True),
    StructField('City', StringType(), True),
    StructField('State', StringType(), True)
])

# Read the Kafka stream
weather_data_df = spark.readStream \
    .format('kafka') \
    .option('kafka.bootstrap.servers', kafka_bootstrap_servers) \
    .option('subscribe', kafka_topic) \
    .load()

# Kafka value is in binary format, convert it to string
weather_data_df = weather_data_df.selectExpr("CAST(value AS STRING)")

# Parse the CSV data received from Kafka using the predefined schema
weather_data_df = weather_data_df.select(from_json(col("value"), weather_schema).alias("data")).select("data.*")

# Convert StartTime and EndTime to proper timestamp formats
weather_data_df = weather_data_df.withColumn("StartTime(UTC)", to_timestamp(col("StartTime(UTC)"), "yyyy-MM-dd HH:mm:ss")) \
    .withColumn("EndTime(UTC)", to_timestamp(col("EndTime(UTC)"), "yyyy-MM-dd HH:mm:ss"))

# Define relevant columns based on the new dataset
DATETIME_START_COL = 'StartTime(UTC)'
DATETIME_END_COL = 'EndTime(UTC)'
EVENT_TYPE_COL = 'Type'
SEVERITY_COL = 'Severity'
PRECIPITATION_COL = 'Precipitation(in)'
AIRPORTCODE_COL= 'AirportCode'
LATITUDE_COL = 'LocationLat'
LONGITUDE_COL = 'LocationLng'
CITY_COL = 'City'
STATE_COL = 'State'

# Select the relevant columns to keep for further processing
weather_data_selected_df = weather_data_df.select(DATETIME_START_COL, DATETIME_END_COL, EVENT_TYPE_COL, SEVERITY_COL, 
                                                  PRECIPITATION_COL, AIRPORTCODE_COL, LATITUDE_COL, LONGITUDE_COL, 
                                                  CITY_COL, STATE_COL)

# Show the first few rows of the full dataset with selected columns
weather_data_selected_df.show()

# Start streaming and show the output
query = weather_data_selected_df.writeStream \
    .outputMode('append') \
    .format('console') \
    .start()

query.awaitTermination()


                                                                                

+-------------------+-------------------+----+--------+-----------------+-----------+-----------+-----------+--------+-----+
|     StartTime(UTC)|       EndTime(UTC)|Type|Severity|Precipitation(in)|AirportCode|LocationLat|LocationLng|    City|State|
+-------------------+-------------------+----+--------+-----------------+-----------+-----------+-----------+--------+-----+
|2016-01-06 23:14:00|2016-01-07 00:34:00|Snow|   Light|              0.0|       K04V|    38.0972|  -106.1689|Saguache|   CO|
|2016-01-07 04:14:00|2016-01-07 04:54:00|Snow|   Light|              0.0|       K04V|    38.0972|  -106.1689|Saguache|   CO|
|2016-01-07 05:54:00|2016-01-07 15:34:00|Snow|   Light|             0.03|       K04V|    38.0972|  -106.1689|Saguache|   CO|
|2016-01-08 05:34:00|2016-01-08 05:54:00|Snow|   Light|              0.0|       K04V|    38.0972|  -106.1689|Saguache|   CO|
|2016-01-08 13:54:00|2016-01-08 15:54:00|Snow|   Light|              0.0|       K04V|    38.0972|  -106.1689|Saguache|   CO|


In [21]:
# Count the number of rows and columns in the filtered DataFrame
num_rows = weather_data_selected_df.count()
num_columns = len(weather_data_selected_df.columns)

print("Number of rows:", num_rows)
print("Number of columns:", num_columns)


[Stage 70:>                                                         (0 + 8) / 9]

Number of rows: 8627181
Number of columns: 10


                                                                                

In [22]:
# Print the schema of the entire dataset
weather_data_selected_df.printSchema()

root
 |-- StartTime(UTC): timestamp (nullable = true)
 |-- EndTime(UTC): timestamp (nullable = true)
 |-- Type: string (nullable = true)
 |-- Severity: string (nullable = true)
 |-- Precipitation(in): double (nullable = true)
 |-- AirportCode: string (nullable = true)
 |-- LocationLat: double (nullable = true)
 |-- LocationLng: double (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)



In [23]:
# Count missing values for each column in the full dataset
for c in weather_data_selected_df.columns:
    print(f'Missing values of column `{c}` count: {weather_data_df.where(col(c).isNull()).count()}')

                                                                                

Missing values of column `StartTime(UTC)` count: 0


                                                                                

Missing values of column `EndTime(UTC)` count: 0


                                                                                

Missing values of column `Type` count: 0


                                                                                

Missing values of column `Severity` count: 0


                                                                                

Missing values of column `Precipitation(in)` count: 0


                                                                                

Missing values of column `AirportCode` count: 0


                                                                                

Missing values of column `LocationLat` count: 0


                                                                                

Missing values of column `LocationLng` count: 0


                                                                                

Missing values of column `City` count: 16912


[Stage 100:>                                                        (0 + 8) / 9]

Missing values of column `State` count: 0


                                                                                

In [24]:
# Drop rows with any null values in the full dataset
not_null_weather_data_df = weather_data_selected_df.dropna()

# Show the result after dropping rows with null values
not_null_weather_data_df.show()

+-------------------+-------------------+----+--------+-----------------+-----------+-----------+-----------+--------+-----+
|     StartTime(UTC)|       EndTime(UTC)|Type|Severity|Precipitation(in)|AirportCode|LocationLat|LocationLng|    City|State|
+-------------------+-------------------+----+--------+-----------------+-----------+-----------+-----------+--------+-----+
|2016-01-06 23:14:00|2016-01-07 00:34:00|Snow|   Light|              0.0|       K04V|    38.0972|  -106.1689|Saguache|   CO|
|2016-01-07 04:14:00|2016-01-07 04:54:00|Snow|   Light|              0.0|       K04V|    38.0972|  -106.1689|Saguache|   CO|
|2016-01-07 05:54:00|2016-01-07 15:34:00|Snow|   Light|             0.03|       K04V|    38.0972|  -106.1689|Saguache|   CO|
|2016-01-08 05:34:00|2016-01-08 05:54:00|Snow|   Light|              0.0|       K04V|    38.0972|  -106.1689|Saguache|   CO|
|2016-01-08 13:54:00|2016-01-08 15:54:00|Snow|   Light|              0.0|       K04V|    38.0972|  -106.1689|Saguache|   CO|


In [25]:
# Specific weather columns: Group by 'Type' column and count occurrences in the filtered dataset
not_null_weather_data_df.groupBy(EVENT_TYPE_COL).count().show(truncate=False)



+-------------+-------+
|Type         |count  |
+-------------+-------+
|Cold         |231232 |
|Fog          |2009035|
|Storm        |61096  |
|Precipitation|157036 |
|Hail         |2921   |
|Snow         |1156334|
|Rain         |4992615|
+-------------+-------+



                                                                                

In [26]:
from typing import Dict, Iterable
from pyspark.sql.functions import col

# Weather condition dictionary to map specific data points to broader categories
def get_weather_conditions_aggregation_dict(weather_conditions: Iterable[str]) -> Dict[str, str]:
    
    weather_conditions_dict = dict()
  
    for weather_condition in weather_conditions:
        weather_condition_lowered = weather_condition.lower()

        if any(key in weather_condition_lowered for key in ['squall', 'thunderstorm']):
            weather_conditions_dict[weather_condition] = 'thunderstorm'
        elif any(key in weather_condition_lowered for key in ['drizzle', 'rain']):
            weather_conditions_dict[weather_condition] = 'rainy'
        elif any(key in weather_condition_lowered for key in ['sleet', 'snow']):
            weather_conditions_dict[weather_condition] = 'snowy'
        elif 'cloud' in weather_condition_lowered:
            weather_conditions_dict[weather_condition] = 'cloudy'
        elif any(key in weather_condition_lowered for key in ['fog', 'mist', 'haze']):
            weather_conditions_dict[weather_condition] = 'foggy'
        elif any(key in weather_condition_lowered for key in ['clear', 'sun']):
            weather_conditions_dict[weather_condition] = 'sunny'
        elif 'cold' in weather_condition_lowered:
            weather_conditions_dict[weather_condition] = 'cold'
        elif 'hail' in weather_condition_lowered:
            weather_conditions_dict[weather_condition] = 'hail'
    
    return weather_conditions_dict

# Select distinct weather conditions (event types) from the dataset
weather_conditions_all = not_null_weather_data_df.select(EVENT_TYPE_COL).distinct().rdd.flatMap(lambda x: x).collect()

# Create a dictionary that maps the weather event types to broader categories
weather_conditions_dict = get_weather_conditions_aggregation_dict(weather_conditions_all)

# Broadcast the dictionary for efficient lookup in a distributed environment
broadcast_dict = spark.sparkContext.broadcast(weather_conditions_dict)


                                                                                

In [27]:
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

# UDF to map weather event types to broad categories
def map_weather_condition(weather_condition):
    return broadcast_dict.value.get(weather_condition, "Unknown")

# Register the UDF to map specific weather conditions
map_weather_condition_udf = udf(map_weather_condition, StringType())

# Apply the UDF to the event type column to create a new column with broad weather conditions
weather_measurements_aggregate_df = not_null_weather_data_df.withColumn(
    "Broad_Weather_Condition",
    map_weather_condition_udf(col(EVENT_TYPE_COL))
)

# Optionally, drop the original specific weather event type column
weather_measurements_aggregate_general_df = weather_measurements_aggregate_df.drop(EVENT_TYPE_COL)


In [28]:
# Not null and consolidated dataframe with broad weather conditions
SLOW_OPERATIONS: bool = True
if SLOW_OPERATIONS: 
    weather_measurements_aggregate_general_df.show()

+-------------------+-------------------+--------+-----------------+-----------+-----------+-----------+--------+-----+-----------------------+
|     StartTime(UTC)|       EndTime(UTC)|Severity|Precipitation(in)|AirportCode|LocationLat|LocationLng|    City|State|Broad_Weather_Condition|
+-------------------+-------------------+--------+-----------------+-----------+-----------+-----------+--------+-----+-----------------------+
|2016-01-06 23:14:00|2016-01-07 00:34:00|   Light|              0.0|       K04V|    38.0972|  -106.1689|Saguache|   CO|                  snowy|
|2016-01-07 04:14:00|2016-01-07 04:54:00|   Light|              0.0|       K04V|    38.0972|  -106.1689|Saguache|   CO|                  snowy|
|2016-01-07 05:54:00|2016-01-07 15:34:00|   Light|             0.03|       K04V|    38.0972|  -106.1689|Saguache|   CO|                  snowy|
|2016-01-08 05:34:00|2016-01-08 05:54:00|   Light|              0.0|       K04V|    38.0972|  -106.1689|Saguache|   CO|                 

In [29]:
# Define the broad weather condition column
BROAD_WEATHER_COL = 'Broad_Weather_Condition'

# Group by the broad weather condition column and count the occurrences
weather_measurements_aggregate_general_df.groupBy(BROAD_WEATHER_COL).count().show(truncate=False)



+-----------------------+-------+
|Broad_Weather_Condition|count  |
+-----------------------+-------+
|rainy                  |4992615|
|snowy                  |1156334|
|Unknown                |218132 |
|cold                   |231232 |
|hail                   |2921   |
|foggy                  |2009035|
+-----------------------+-------+



                                                                                

In [30]:
# Define the broad weather condition column name
BROAD_WEATHER_COL = 'Broad_Weather_Condition'

# Filter out the rows where Broad_Weather_Condition is 'Unknown'
weather_aggregate_filtered_df = weather_measurements_aggregate_general_df.filter(col(BROAD_WEATHER_COL) != "Unknown")

In [31]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, col

# Define the window specification, ordering by start time in descending order within each broad weather condition
windowSpec = Window.partitionBy(BROAD_WEATHER_COL).orderBy(col(DATETIME_START_COL).desc())

# Add row number to each row within the window
weather_ranked_df = weather_aggregate_filtered_df.withColumn("row_number", row_number().over(windowSpec))

# Filter the ranked dataframe to get the latest 10,000 datasets for each Broad_Weather_Condition
weather_aggregate_10k_df = weather_ranked_df.filter(col("row_number") <= 10000)

# Drop the row_number column
weather_aggregate_final_df = weather_aggregate_10k_df.drop("row_number")

# Show the final dataframe, grouped by Broad_Weather_Condition and showing the count for each category
weather_aggregate_final_df.groupBy(BROAD_WEATHER_COL).count().show(truncate=False)



+-----------------------+-----+
|Broad_Weather_Condition|count|
+-----------------------+-----+
|rainy                  |10000|
|snowy                  |10000|
|cold                   |10000|
|hail                   |2921 |
|foggy                  |10000|
+-----------------------+-----+



                                                                                

In [32]:
#### Statistical distribution of the dataset

In [33]:
# Display summary statistics of the numerical columns in the final dataframe
weather_aggregate_final_df.describe().show()



+-------+--------+-------------------+-----------+-----------------+------------------+----------+-----+-----------------------+
|summary|Severity|  Precipitation(in)|AirportCode|      LocationLat|       LocationLng|      City|State|Broad_Weather_Condition|
+-------+--------+-------------------+-----------+-----------------+------------------+----------+-----+-----------------------+
|  count|   42921|              42921|      42921|            42921|             42921|     42921|42921|                  42921|
|   mean|    NULL|0.02936278278698031|       NULL|39.98051975955862|-96.75347946459746|      NULL| NULL|                   NULL|
| stddev|    NULL|0.12242399472858015|       NULL| 5.36094191369729|15.394954701008139|      NULL| NULL|                   NULL|
|    min|   Heavy|                0.0|       K01M|          24.5571|          -124.555| Abbeville|   AL|                   cold|
|    max|  Severe|               4.87|       KZZV|          48.9402|          -67.7928|Zionsville

                                                                                

In [34]:
# Path where the CSV file will be saved
final_dataset_path = "dataset/final_dataset"

# Save the DataFrame as a single CSV file
(weather_aggregate_final_df
    .coalesce(1)  # Combine all partitions into one
    .write
    .mode("overwrite")  # Overwrite if the file already exists
    .option("header", "true")  # Include header
    .csv(final_dataset_path))

                                                                                