In [None]:
# Mount Google Drive so the files under /content/drive become reachable.
from google.colab import drive
drive.mount('/content/drive')

# Install the notebook's dependencies through the IPython shell escape.
!pip install pyspark pandas matplotlib seaborn # numpy and scikit-learn are not a focus of this EDA, but may be included

# --- Cell 2: Initialize the SparkSession ---
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_timestamp, hour, dayofweek, avg, count, when, lit
from pyspark.sql.types import StructType, StructField, StringType, FloatType, IntegerType
import os
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

# Configure the session step by step, then create it (or reuse a running one).
session_builder = SparkSession.builder
session_builder = session_builder.appName("MedanTrafficEDA_Colab")
session_builder = session_builder.master("local[*]")
spark = session_builder.getOrCreate()

print("SparkSession berhasil diinisialisasi.")

# Base directory of the input data on Google Drive; adjust it to wherever the
# data folder lives in your own Drive. In a Hadoop environment this would be
# an HDFS path instead (for example "hdfs://namenode:9000/data/").
google_drive_base_path = "/content/drive/MyDrive/data/"

# --- Cell 3: Read the raw data (Bronze layer) ---
print("--- Membaca Data Mentah (Bronze Layer) ---")

# Explicit schema for the traffic CSV. Every column is nullable and the
# timestamp is kept as a plain string at this stage.
traffic_schema = (
    StructType()
    .add("timestamp", StringType(), True)
    .add("latitude", FloatType(), True)
    .add("longitude", FloatType(), True)
    .add("speed_kmh", FloatType(), True)
    .add("taxi_id", StringType(), True)
    .add("index_right", StringType(), True)
    .add("road_name", StringType(), True)
)

# Explicit schema for the weather CSV, again all-nullable with the date/time
# column left as a string.
weather_schema = (
    StructType()
    .add("date_time", StringType(), True)
    .add("location", StringType(), True)
    .add("rainfall_mm", FloatType(), True)
    .add("temperature_c", FloatType(), True)
    .add("humidity_percent", FloatType(), True)
    .add("visibility_km", FloatType(), True)
)

# Locations of the two input CSV files on Google Drive.
traffic_input_path = os.path.join(google_drive_base_path, "simulasi_trafik_medan.csv")
weather_input_path = os.path.join(google_drive_base_path, "cuaca_medan_2013-07-01.csv")

print(f"Membaca data lalu lintas dari: {traffic_input_path}")
print(f"Membaca data cuaca dari: {weather_input_path}")

try:
    # Both files have a header row and are read with the explicit schemas.
    df_traffic_raw = (
        spark.read.schema(traffic_schema).option("header", True).csv(traffic_input_path)
    )
    df_weather_raw = (
        spark.read.schema(weather_schema).option("header", True).csv(weather_input_path)
    )
    print("Data mentah (Bronze Layer) berhasil dibaca.")

    # Print the schema and a five-row preview of each raw DataFrame.
    for raw_df, schema_heading, preview_heading in (
        (df_traffic_raw, "\nSchema df_traffic_raw:", "\nData df_traffic_raw (5 baris teratas):"),
        (df_weather_raw, "\nSchema df_weather_raw:", "\nData df_weather_raw (5 baris teratas):"),
    ):
        print(schema_heading)
        raw_df.printSchema()
        print(preview_heading)
        raw_df.show(5, truncate=False)
except Exception as read_error:
    print(f"Kesalahan membaca data mentah: {read_error}")
    # Reading the base data failed: shut Spark down ...
    spark.stop()
    # ... and re-raise so the following cells do not run without their input.
    raise

# --- Cell 4: Initial analysis of the raw data (early EDA) ---
print("\n--- Melakukan Analisis Awal pada Data Mentah ---")

print("\nStatistik Deskriptif df_traffic_raw:")
df_traffic_raw.describe().show()
print("\nStatistik Deskriptif df_weather_raw:")
df_weather_raw.describe().show()

# Count each DataFrame once and reuse the totals for the duplicate figures,
# instead of triggering the same count job twice.
traffic_row_count = df_traffic_raw.count()
weather_row_count = df_weather_raw.count()
print(f"\nJumlah baris df_traffic_raw: {traffic_row_count}")
print(f"Jumlah baris df_weather_raw: {weather_row_count}")
print(f"Jumlah duplikasi df_traffic_raw: {traffic_row_count - df_traffic_raw.dropDuplicates().count()}")
print(f"Jumlah duplikasi df_weather_raw: {weather_row_count - df_weather_raw.dropDuplicates().count()}")


def _print_null_counts(df):
    """Print the number of NULL values in every column of *df*.

    All columns are counted in one aggregation, so a single Spark job is
    launched per DataFrame rather than one filter-and-count job per column.
    """
    # when() without otherwise() yields NULL for non-matching rows, and
    # count() skips NULLs, so each aggregate counts exactly the NULL cells.
    null_counts = df.select(
        [count(when(col(name).isNull(), lit(1))).alias(name) for name in df.columns]
    ).first()
    for name in df.columns:
        print(f"  {name}: {null_counts[name]}")


print("\nJumlah nilai NULL per kolom di df_traffic_raw:")
_print_null_counts(df_traffic_raw)

print("\nJumlah nilai NULL per kolom di df_weather_raw:")
_print_null_counts(df_weather_raw)

# Cardinality of road_name: rows per road, most frequent first.
print("\nAnalisis Kardinalitas Road Name:")
road_name_counts = df_traffic_raw.groupBy("road_name").count().orderBy(col("count").desc())
print("Top 10 Road Names (dari data mentah):")
road_name_counts.show(10, truncate=False)
print(f"Total unique road names (dari data mentah): {road_name_counts.count()}")

# --- Cell 5: Process the data into the Silver layer (cleaning & normalization) ---
# This cell produces df_silver. In a Hadoop environment df_silver would
# typically be stored in the HDFS Silver layer and could be read back by
# other notebooks when needed.
print("\n--- Memproses Data ke Silver Layer (Pembersihan & Normalisasi) ---")

# Traffic: drop exact duplicates, parse the timestamp, label missing road
# names as "Unknown", and keep only rows that have a position and a speed.
# NOTE(review): the traffic pattern contains a comma after the date while the
# weather pattern does not - confirm it matches the traffic CSV.
traffic_has_position_and_speed = (
    col("latitude").isNotNull()
    & col("longitude").isNotNull()
    & col("speed_kmh").isNotNull()
)
df_traffic_clean = (
    df_traffic_raw.dropDuplicates()
    .withColumn("timestamp", to_timestamp(col("timestamp"), "yyyy-MM-dd, HH:mm:ss"))
    .withColumn(
        "road_name",
        when(col("road_name").isNull(), "Unknown").otherwise(col("road_name")),
    )
    .filter(traffic_has_position_and_speed)
)

# Weather: drop exact duplicates, parse the date/time, then replace NULLs in
# every measurement column with 0.0.
df_weather_clean = df_weather_raw.dropDuplicates().withColumn(
    "date_time", to_timestamp(col("date_time"), "yyyy-MM-dd HH:mm:ss")
)
for weather_measure in ("rainfall_mm", "temperature_c", "humidity_percent", "visibility_km"):
    df_weather_clean = df_weather_clean.withColumn(
        weather_measure,
        when(col(weather_measure).isNull(), 0.0).otherwise(col(weather_measure)),
    )

# Snap every traffic timestamp to the nearest whole hour (epoch seconds / 3600,
# rounded, * 3600) and turn the result back into a timestamp column.
# Python's built-in round() cannot be applied to a Spark Column (Column does
# not define __round__, so the call raises TypeError). Spark's own round
# function is therefore imported under an alias that leaves the builtin intact.
from pyspark.sql.functions import round as spark_round

df_traffic_clean_hourly = df_traffic_clean.withColumn(
    "hourly_timestamp",
    spark_round(col("timestamp").cast("long") / 3600) * 3600,
).withColumn(
    "hourly_timestamp",
    col("hourly_timestamp").cast("timestamp"),
)

# Left-join the weather measurements onto the traffic rows wherever the
# hour-rounded traffic timestamp equals the weather date_time. Keep every
# traffic column plus the four weather measurements, then drop the helper
# join key.
weather_measure_columns = [
    col(name)
    for name in ("rainfall_mm", "temperature_c", "humidity_percent", "visibility_km")
]
hour_matches_weather = (
    df_traffic_clean_hourly["hourly_timestamp"] == df_weather_clean["date_time"]
)
traffic_with_weather = df_traffic_clean_hourly.join(
    df_weather_clean, on=hour_matches_weather, how="left"
)
df_silver = traffic_with_weather.select(
    df_traffic_clean_hourly["*"], *weather_measure_columns
).drop("hourly_timestamp")

print("Data Silver Layer berhasil dibuat.")
print("\nSchema df_silver:")
df_silver.printSchema()
print("\nData df_silver (5 baris teratas):")
df_silver.show(5, truncate=False)

# Persist the Silver layer to Google Drive as Parquet (a stand-in for storing
# it on HDFS), replacing any previous output at the same path.
silver_output_path = os.path.join(google_drive_base_path, "output", "silver", "trafik_cuaca.parquet")
silver_output_dir = os.path.dirname(silver_output_path)
os.makedirs(silver_output_dir, exist_ok=True)
df_silver.write.parquet(silver_output_path, mode="overwrite")
print(f"Data Silver Layer berhasil disimpan ke: {silver_output_path}")


# --- Cell 6: Visualize the Silver layer (using pandas) ---
print("\n--- Melakukan Visualisasi Data Silver Layer ---")

# Bring the whole Silver DataFrame into pandas for plotting.
# If the data is too large for that, take a sample instead, e.g.
#   df_silver.sample(False, 0.01, seed=42).toPandas()
# or cap the row count with
#   df_silver.limit(10000).toPandas()
sample_df_silver_pd = df_silver.toPandas()

silver_sample_shape = sample_df_silver_pd.shape
print("Ukuran sampel data Silver Layer untuk visualisasi:", silver_sample_shape)
print("Data Silver Layer (sampel) untuk visualisasi (5 baris teratas):")
print(sample_df_silver_pd.head(5))


# Visualization 1: average speed for each hour of the day.
# Grouping by the hour Series (which is named 'timestamp') makes
# reset_index() yield the columns 'timestamp' (the hour) and 'speed_kmh'.
hour_of_day = sample_df_silver_pd['timestamp'].dt.hour
hourly_speed = (
    sample_df_silver_pd
    .groupby(hour_of_day)['speed_kmh']
    .mean()
    .reset_index()
)

plt.figure(figsize=(12, 7))
sns.lineplot(x='timestamp', y='speed_kmh', data=hourly_speed, marker='o')
plt.title('Average Speed by Hour of Day (Silver Layer)', fontsize=16)
plt.xlabel('Hour of Day', fontsize=12)
plt.ylabel('Average Speed (km/h)', fontsize=12)
plt.xticks(fontsize=10)
plt.yticks(fontsize=10)
plt.grid(True, linestyle='--', alpha=0.7)
plt.tight_layout()
plt.show()

# Visualization 2: correlation between speed and the weather factors.
# Only the numeric columns relevant to the comparison are included.
numeric_cols = [
    'speed_kmh',
    'rainfall_mm',
    'temperature_c',
    'humidity_percent',
    'visibility_km',
]
correlation_matrix = sample_df_silver_pd.loc[:, numeric_cols].corr()

plt.figure(figsize=(9, 7))
sns.heatmap(
    correlation_matrix,
    annot=True,
    cmap='coolwarm',
    fmt=".2f",
    linewidths=.5,
)
plt.title('Correlation Matrix of Speed and Weather Factors', fontsize=16)
plt.xticks(fontsize=10)
plt.yticks(fontsize=10)
plt.tight_layout()
plt.show()

# Visualization 3: distribution of the recorded speeds (histogram with a KDE
# curve overlaid).
plt.figure(figsize=(10, 6))
sns.histplot(
    data=sample_df_silver_pd,
    x='speed_kmh',
    bins=50,
    kde=True,
    color='skyblue',
)
plt.title('Distribution of Speed (km/h) in Silver Layer', fontsize=16)
plt.xlabel('Speed (km/h)', fontsize=12)
plt.ylabel('Frequency', fontsize=12)
plt.xticks(fontsize=10)
plt.yticks(fontsize=10)
plt.grid(axis='y', alpha=0.75)
plt.tight_layout()
plt.show()

# --- Final cell: stop the SparkSession ---
# Shut the session down now that the analysis is finished.
spark.stop()
print("Sesi Spark dihentikan.")