In [93]:
import os
import pandas as pd
import numpy as np
import pyarrow.parquet as pq
import s3fs
import time
from zoneinfo import ZoneInfo
from datetime import timedelta, datetime


# Set up environments of LakeFS
lakefs_endpoint = os.getenv("LAKEFS_ENDPOINT", "http://lakefs-dev:8000")
ACCESS_KEY = 'access_key'
SECRET_KEY = 'secret_key'

# Setting S3FileSystem for access LakeFS
fs = s3fs.S3FileSystem(
    key=ACCESS_KEY,
    secret=SECRET_KEY,
    client_kwargs={'endpoint_url': lakefs_endpoint}
)

def load_data():
    lakefs_path = "s3://dsi321-record-air-quality/main/airquality.parquet/year=2025"
    data_list = fs.glob(f"{lakefs_path}/*/*/*/*")
    df_all = pd.concat([pd.read_parquet(f"s3://{path}", filesystem=fs) for path in data_list], ignore_index=True)
    df_all['lat'] = pd.to_numeric(df_all['lat'], errors='coerce')
    df_all['long'] = pd.to_numeric(df_all['long'], errors='coerce')
    df_all['year'] = df_all['year'].astype("int64")
    df_all['month'] = df_all['month'].astype("int64")
    df_all['day'] = df_all['month'].astype("int64")
    df_all['hour'] = df_all['month'].astype("int64")
    df_all.drop_duplicates(inplace=True)
    df_all['PM25.aqi'] = df_all['PM25.aqi'].mask(df_all['PM25.aqi'] < 0, pd.NA)
    # Fill value "Previous Record" Group By stationID
    df_all['PM25.aqi'] = df_all.groupby('stationID')['PM25.aqi'].transform(lambda x: x.ffill())
    return df_all

In [94]:
df = load_data

In [95]:
print(df)

<function load_data at 0xffff281e3c40>


In [64]:
# เปลี่ยน data types กันด้วยนะ

In [47]:
df.to_csv('data.csv', encoding='utf-8', index=False)

In [49]:
from glob import glob
existing_parts = set()
paths = glob("data2.parquet/year=*/month=*/day=*/hour=*")

for path in paths:
    parts = path.split(os.sep)
    part_dict = {p.split("=")[0]: int(p.split("=")[1]) for p in parts if "=" in p}
    key = (part_dict["year"], part_dict["month"], part_dict["day"], part_dict["hour"])
    existing_parts.add(key)

# ✅ 3. กรองเฉพาะพาร์ทิชันใหม่
df["partition_key"] = list(zip(df["year"], df["month"], df["day"], df["hour"]))
df_new = df[~df["partition_key"].isin(existing_parts)].drop(columns=["partition_key"])

# ✅ 4. เขียนเฉพาะพาร์ทิชันใหม่เข้า data2.parquet
if not df_new.empty:
    import pyarrow as pa
    import pyarrow.dataset as ds
    table = pa.Table.from_pandas(df_new)
    ds.write_dataset(
        table,
        base_dir="data2.parquet",
        format="parquet",
        partitioning=["year", "month", "day", "hour"],
        existing_data_behavior="overwrite_or_ignore"
    )
    print(f"✅ Added {len(df_new)} new records to data2.parquet")
else:
    print("✨ No new partitions to add.")

✨ No new partitions to add.


In [50]:
print("ข้อมูลทั้งหมดใน df:", df[["year", "month", "day", "hour"]].drop_duplicates())

ข้อมูลทั้งหมดใน df:        year  month  day  hour
0      2025      5   15    16
374    2025      5   15    17
561    2025      5   15    18
748    2025      5   15    19
935    2025      5   15    20
...     ...    ...  ...   ...
13058  2025      5   18     5
13244  2025      5   18     6
13430  2025      5   18     7
13616  2025      5   18     8
13802  2025      5   18     9

[67 rows x 4 columns]
