In [12]:
import os
import pandas as pd
import numpy as np
import pyarrow.parquet as pq
import s3fs
import time
from zoneinfo import ZoneInfo
from datetime import timedelta, datetime


# Set up environments of LakeFS
lakefs_endpoint = os.getenv("LAKEFS_ENDPOINT", "http://localhost:8001")
ACCESS_KEY = 'access_key'
SECRET_KEY = 'secret_key'

# Setting S3FileSystem for access LakeFS
fs = s3fs.S3FileSystem(
    key=ACCESS_KEY,
    secret=SECRET_KEY,
    client_kwargs={'endpoint_url': lakefs_endpoint}
)

def load_data():
    lakefs_path = "s3://air-quality/main/airquality.parquet/year=2025"
    data_list = fs.glob(f"{lakefs_path}/*/*/*/*")
    df_all = pd.concat([pd.read_parquet(f"s3://{path}", filesystem=fs) for path in data_list], ignore_index=True)
    df_all['lat'] = pd.to_numeric(df_all['lat'], errors='coerce')
    df_all['long'] = pd.to_numeric(df_all['long'], errors='coerce')
    df_all['year'] = df_all['year'].astype(int)
    df_all['month'] = df_all['month'].astype(int)
    df_all['PM25.aqi'] = df_all['PM25.aqi'].mask(df_all['PM25.aqi'] < 0, pd.NA)
    df_all['PM25.aqi'] = df_all.groupby('stationID')['PM25.aqi'].transform(lambda x: x.fillna(method='ffill'))
    columns_to_convert = ['stationID', 'nameTH', 'nameEN', 'areaTH', 'areaEN', 'stationType']
    for col in columns_to_convert:
        df_all[col] = df_all[col].astype(pd.StringDtype())
    df_all.drop_duplicates(inplace=True)
    return df_all

In [13]:
df = load_data()

  df_all['PM25.aqi'] = df_all.groupby('stationID')['PM25.aqi'].transform(lambda x: x.fillna(method='ffill'))


In [14]:
df.info()

<class 'pandas.core.frame.DataFrame'>
Index: 30073 entries, 0 to 32673
Data columns (total 15 columns):
 #   Column         Non-Null Count  Dtype         
---  ------         --------------  -----         
 0   timestamp      30073 non-null  datetime64[ns]
 1   stationID      30073 non-null  string        
 2   nameTH         30073 non-null  string        
 3   nameEN         30073 non-null  string        
 4   areaTH         30073 non-null  string        
 5   areaEN         30073 non-null  string        
 6   stationType    30073 non-null  string        
 7   lat            30073 non-null  float64       
 8   long           30073 non-null  float64       
 9   PM25.color_id  30073 non-null  int64         
 10  PM25.aqi       30073 non-null  float64       
 11  year           30073 non-null  int64         
 12  month          30073 non-null  int64         
 13  day            30073 non-null  int32         
 14  hour           30073 non-null  int32         
dtypes: datetime64[ns](1), fl

In [15]:
df.to_csv('data.csv', encoding='utf-8', index=False)

In [16]:
df.to_parquet('data.parquet', partition_cols=['year','month','day','hour'], engine='pyarrow')