# Init

In [6]:
import findspark
findspark.init("/Users/4most/Inclass/DSDE/final/spark-3.5.5-bin-hadoop3")

In [7]:
spark_url = 'local'

In [8]:
#library

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, substring, to_date, weekofyear, explode, regexp_replace, split, expr, min, round, date_sub, avg
from sklearn.preprocessing import MultiLabelBinarizer
from pyspark.sql import functions as F
import math
import requests
from datetime import datetime, timedelta
import pandas as pd


In [9]:
spark = SparkSession.builder \
    .master(spark_url) \
    .appName('Spark SQL') \
    .getOrCreate()

# Feature Engineering

In [10]:
path = '../bangkok_traffy.csv'
df = spark.read.csv(path, header=True, inferSchema=True)

                                                                                

In [11]:
#deleted column
del_col = ['photo','photo_after']

df = df.drop(*del_col)

df.printSchema()

root
 |-- ticket_id: string (nullable = true)
 |-- type: string (nullable = true)
 |-- organization: string (nullable = true)
 |-- comment: string (nullable = true)
 |-- coords: string (nullable = true)
 |-- address: string (nullable = true)
 |-- subdistrict: string (nullable = true)
 |-- district: string (nullable = true)
 |-- province: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- state: string (nullable = true)
 |-- star: string (nullable = true)
 |-- count_reopen: string (nullable = true)
 |-- last_activity: string (nullable = true)



In [12]:
df.count()

                                                                                

1718441

In [13]:
# at least 5 non-null values
df = df.na.drop(thresh=5)

In [14]:
# deselect the unknown type
df = df.filter(df["type"] != "{}")

In [15]:
df = df.withColumn("date", to_date(substring('timestamp',1,10), "yyyy-MM-dd"))
df = df.withColumn("year", substring("date", 1, 4))
df = df.withColumn("week", weekofyear("date"))

In [16]:
# df.show()

In [17]:
# filter more
df = df.filter(col("date") > "2022-09-01")
df = df.filter(col('subdistrict').isNotNull() & col('district').isNotNull() & col('province').isNotNull())

In [18]:
df.count()

                                                                                

384906

In [19]:
# filter the wrong case
train_col = ['type', 'coords', 'subdistrict', 'district', 'province', 'timestamp']

for c in train_col:
  df = df.filter(~(col(c).startswith("https:")))

In [20]:
# df.count()

In [21]:
df_cleaned = df.withColumn("type", regexp_replace(col("type"), "[{}]", ""))

In [22]:
allowed_types = ['ป้ายจราจร', 'เสียงรบกวน', 'สะพาน', 'PM2.5', 'กีดขวาง', 'ห้องน้ำ', 'ทางเท้า', 'ต้นไม้', 'การเดินทาง', 'คนจรจัด', 'ป้าย', 'ความสะอาด', 'คลอง', 'แสงสว่าง', 'น้ำท่วม', 'ท่อระบายน้ำ', 'ถนน', 'จราจร', 'ความปลอดภัย', 'สายไฟ', 'สัตว์จรจัด']

In [23]:
# สร้าง array literal สำหรับ allowed_types
allowed_types_expr = "array({})".format(",".join([f"'{t}'" for t in allowed_types]))

# สร้าง DataFrame ใหม่โดยแยก field 'type' เป็น array
df_with_array = df_cleaned.withColumn("type_array", split("type", ","))

# ใช้ Spark SQL expression เพื่อตรวจว่า type_array ⊆ allowed_types
filtered_df = df_with_array.filter(
    expr(f"array_except(type_array, {allowed_types_expr}) = array()")
)

In [24]:
# fix lat long already
filtered_df = filtered_df.withColumn("lat", split(col("coords"), ",")[1]) \
       .withColumn("long", split(col("coords"), ",")[0])

In [25]:
filtered_df.count()

                                                                                

359557

In [26]:
# for field in filtered_df.schema.fields:
#     if str(field.dataType).startswith("ArrayType"):
#         filtered_df = filtered_df.withColumn(field.name, col(field.name).cast("string"))

In [27]:
# filtered_df.coalesce(1).write.csv('used_data', header=True, mode='overwrite')

# Train Data

In [28]:
trained_df = filtered_df.select('year', 'week', 'date', 'lat', 'long', 'subdistrict', 'district', 'province', 'type_array')

In [43]:
# trained_df.show()

Multi-Hot Label

In [30]:
# from pyspark.ml.feature import CountVectorizer

# cv = CountVectorizer(inputCol="type_array", outputCol="type_vector", binary=True)
# model = cv.fit(trained_df)
# df_vectorized = model.transform(trained_df)

In [31]:
# df_vectorized.show()

In [32]:
# trained_df.orderBy(desc('date')).show()

In [33]:
trained_df.count()

                                                                                

359557

add holiday data

In [36]:
holiday_path = '../external_csv/bangkok_holidays.csv'
holiday_df = spark.read.csv(holiday_path, header=True, inferSchema=True)

In [37]:
# holiday_df.show()

+------------+--------------------+
|holiday_date|        holiday_name|
+------------+--------------------+
|  2021-01-01|      New Year's Day|
|  2021-02-12|    Chinese New Year|
|  2021-02-26|         Makha Bucha|
|  2021-04-06|          Chakri Day|
|  2021-04-12|    Songkran Holiday|
|  2021-04-13|            Songkran|
|  2021-04-14|    Songkran Holiday|
|  2021-04-15|    Songkran Holiday|
|  2021-05-03|Labour Day (in lieu)|
|  2021-05-04|H.M. King's Coron...|
|  2021-05-10|Royal Ploughing C...|
|  2021-05-26|   Visakha Bucha Day|
|  2021-06-03|H.M. Queen's Birt...|
|  2021-07-25|       Buddhist Lent|
|  2021-07-26|Asahna Bucha Day ...|
|  2021-07-28|H.M. King's Birthday|
|  2021-08-12|        Mother's Day|
|  2021-08-12|H.M. Queen Mother...|
|  2021-09-24|  Prince Mahidol Day|
|  2021-10-13|The Passing of Ki...|
+------------+--------------------+
only showing top 20 rows



In [38]:
trained_df = trained_df.withColumn("date", F.to_date("date"))
holiday_df = holiday_df.withColumn("holiday_date", F.to_date("holiday_date"))

# Extract week start (Monday) for main and holiday dates
trained_df = trained_df.withColumn("week_start", F.date_sub("date", F.dayofweek("date") - 2))
holiday_df = holiday_df.withColumn("holiday_week_start", F.date_sub("holiday_date", F.dayofweek("holiday_date") - 2))

# Join on the same week_start
df_joined = trained_df.join(holiday_df, trained_df.week_start == holiday_df.holiday_week_start, how="left")

# If holiday_date is not null, it means that week has a holiday
df_res = df_joined.withColumn("holiday_this_week", F.when(F.col("holiday_date").isNotNull(), True).otherwise(False))

df_res = df_res.drop("holiday_name", "holiday_week_start", "holiday_date", "week_start")

In [39]:
# df_res.filter(df_res['holiday_this_week'] == 'true').show()



+----+----+----------+--------+---------+--------------+-----------+-------------+--------------------+-----------------+
|year|week|      date|     lat|     long|   subdistrict|   district|     province|          type_array|holiday_this_week|
+----+----+----------+--------+---------+--------------+-----------+-------------+--------------------+-----------------+
|2022|  37|2022-09-18|13.77422|100.53592|      สามเสนใน|      พญาไท|กรุงเทพมหานคร|       [ท่อระบายน้ำ]|             true|
|2022|  37|2022-09-18|13.71498|100.58540|       พระโขนง|    คลองเตย|กรุงเทพมหานคร|        [เสียงรบกวน]|             true|
|2022|  37|2022-09-18|13.78256|100.68249|      สะพานสูง|   สะพานสูง|กรุงเทพมหานคร|           [ทางเท้า]|             true|
|2022|  37|2022-09-18|13.74363|100.49609|วังบูรพาภิรมย์|     พระนคร|กรุงเทพมหานคร|           [กีดขวาง]|             true|
|2022|  37|2022-09-18|13.72409|100.55125|       ลุมพินี|    ปทุมวัน|กรุงเทพมหานคร|       [ท่อระบายน้ำ]|             true|
|2022|  37|2022-09-18|13

                                                                                

add rain data

In [44]:

df_res = df_res.withColumn("date", to_date(col("date")))
# df_res = df_res.filter(col("date") > "2022-09-01")

df_res = df_res.drop("rounded_lat", "rounded_lon")

df_res = df_res.withColumn("rounded_lat", round(col("lat"), 2)) \
                             .withColumn("rounded_lon", round(col("long"), 2))

# fix lat long already
rain_df = spark.read.csv("../external_csv/rainfall_data.csv", header=True, inferSchema=True)
rain_df = rain_df.withColumn("date", to_date(col("date"))) \
                 .withColumn("rounded_lat", round(col("long"), 2)) \
                 .withColumn("rounded_lon", round(col("lat"), 2)) \
                 .select("date", "rounded_lat", "rounded_lon", "Precipitation (mm)")

df_res = df_res.withColumn("date_start", date_sub(col("date"), 7)) \
                             .withColumn("date_end", col("date"))

rain_df_renamed = rain_df.withColumnRenamed("date", "rain_date")
df_merged = df_res.join(
    rain_df_renamed,
    (df_res.rounded_lat == rain_df_renamed.rounded_lat) &
    (df_res.rounded_lon == rain_df_renamed.rounded_lon) &
    (rain_df_renamed.rain_date >= df_res.date_start) &
    (rain_df_renamed.rain_date <= df_res.date_end),
    how='left'
)


group_by_columns = [df_res[c] for c in df_res.columns if c not in ["date_start", "date_end"]]
df_merged = df_merged.select(
    *group_by_columns,
    col("Precipitation (mm)")
).groupBy(
    *[df_res[c] for c in df_res.columns if c not in ["date_start", "date_end"]]
).agg(
    avg(col("Precipitation (mm)")).alias("avg_precipitation_7days")
)

df_merged = df_merged.fillna({"avg_precipitation_7days": 0})

df_merged = df_merged.withColumn("lat", col("lat")) \
                     .withColumn("long", col("long"))


                                                                                

In [45]:
# df_merged.show()

add pm2.5 data

In [47]:
# === Step 1: Load PM2.5 data ===
pm25_df = spark.read.csv("../external_csv/pm25_data.csv", header=True, inferSchema=True)

# Format columns
pm25_df = pm25_df.withColumn("date", to_date(col("date"))) \
                 .withColumn("rounded_lat", round(col("rounded_lat"), 2)) \
                 .withColumn("rounded_lon", round(col("rounded_lon"), 2)) \
                 .select("date", "rounded_lat", "rounded_lon", "pm2_5")

# === Step 3: Join with PM2.5 ===
df_final = df_merged.join(
    pm25_df,
    on=[
        df_merged.date == pm25_df.date,
        df_merged.rounded_lat == pm25_df.rounded_lat,
        df_merged.rounded_lon == pm25_df.rounded_lon
    ],
    how="inner"
).select(
    df_merged["*"],  # all columns from df_merged
    pm25_df["pm2_5"] # only the PM2.5 value from the right table
)

df_final = df_final.fillna({"pm2_5": 0.0})


                                                                                

In [48]:
# df_final.show()

[Stage 54:>                                                         (0 + 1) / 1]

+----+----+----------+--------+---------+-----------+-----------+-------------+--------------------+-----------------+-----------+-----------+-----------------------+-----------------+
|year|week|      date|     lat|     long|subdistrict|   district|     province|          type_array|holiday_this_week|rounded_lat|rounded_lon|avg_precipitation_7days|            pm2_5|
+----+----+----------+--------+---------+-----------+-----------+-------------+--------------------+-----------------+-----------+-----------+-----------------------+-----------------+
|2022|  35|2022-09-02|13.59606|100.40091|      แสมดำ|บางขุนเทียน|กรุงเทพมหานคร|         [ความสะอาด]|            false|       13.6|      100.4|                   6.32|66.65833333333333|
|2022|  35|2022-09-02|13.60218|100.42565|      แสมดำ|บางขุนเทียน|กรุงเทพมหานคร|           [กีดขวาง]|            false|       13.6|     100.43|                   6.32|66.65833333333333|
|2022|  35|2022-09-02|13.60674|100.43503|    ท่าข้าม|บางขุนเทียน|กรุงเทพมหา

                                                                                

In [49]:
df_final = df_final.withColumn("avg_precipitation_7days", round(col("avg_precipitation_7days"), 2))

In [50]:
df_final = df_final.withColumn("pm2_5", round(col("pm2_5"), 2))

In [51]:
df_used = df_final.select('year', 'week', 'date', 'lat', 'long', 'subdistrict', 'district', 'province', 'type_array', 'holiday_this_week', 'avg_precipitation_7days', 'pm2_5')

In [52]:
df_used.count()

                                                                                

259248

In [53]:
for field in df_used.schema.fields:
    if str(field.dataType).startswith("ArrayType"):
        df_used = df_used.withColumn(field.name, col(field.name).cast("string"))

In [56]:
df_used.coalesce(1).write.csv('used_output', header=True, mode='overwrite')

                                                                                

In [None]:
# import requests
# import pandas as pd
# from datetime import datetime

# # Assuming df_merged is a Spark DataFrame
# unique_coords_df = df_merged.select("rounded_lat", "rounded_lon").dropDuplicates()
# unique_coords = unique_coords_df.toPandas()  # Convert to Pandas for API loop

# print(f"Unique coordinates: {len(unique_coords)}")

# start = "2022-09-02"
# end = "2025-01-16"
# all_results = []

# for _, row in unique_coords.iterrows():
#     lat = row['rounded_lat']
#     lon = row['rounded_lon']

#     print(f"📡 Fetching PM2.5 for ({lat}, {lon})...")

#     try:
#         url = "https://air-quality-api.open-meteo.com/v1/air-quality"
#         params = {
#             "latitude": lat,
#             "longitude": lon,
#             "hourly": "pm2_5",
#             "start_date": start,
#             "end_date": end,
#             "timezone": "Asia/Bangkok"
#         }

#         response = requests.get(url, params=params)
#         data = response.json()

#         hourly_data = data.get("hourly", {})
#         timestamps = hourly_data.get("time", [])
#         values = hourly_data.get("pm2_5", [])

#         if not timestamps or not values:
#             print(f"❌ No data for ({lat}, {lon})")
#             continue

#         pm25_df = pd.DataFrame({
#             "datetime": pd.to_datetime(timestamps),
#             "pm2_5": values
#         })

#         # 🗓️ Extract date and average per day
#         pm25_df['date'] = pm25_df['datetime'].dt.date
#         pm25_df = pm25_df.groupby('date', as_index=False).mean(numeric_only=True)
#         pm25_df['rounded_lat'] = lat
#         pm25_df['rounded_lon'] = lon

#         all_results.append(pm25_df)

#         print(f"✅ Success: {len(pm25_df)} daily values for ({lat}, {lon})")

#     except Exception as e:
#         print(f"❌ Error fetching for ({lat}, {lon}): {e}")

# # 💾 Save all results
# if all_results:
#     result_df = pd.concat(all_results, ignore_index=True)
#     result_df.to_csv("pm25_data.csv", index=False)
#     print("✅ All results saved to pm25_data.csv")
# else:
#     print("❌ No valid data to save.")
