In [10]:
#### TEST
import asyncio
import os
import time
from datetime import datetime
from datetime import timedelta
from math import ceil

import aiohttp
import nest_asyncio
import pandas as pd
import pytz
import requests
from prefect import flow, task # Prefect flow and task decorators


# Patch asyncio through nest_asyncio before any flow is awaited.
# NOTE(review): presumably needed because the notebook kernel already runs an
# event loop and Prefect/asyncio would otherwise refuse to nest - confirm.
nest_asyncio.apply()


@task
async def fetch_pollution_data(coord_df, batch_size=50):
    """Fetch the current air-pollution reading for every row of ``coord_df``.

    Rows are requested concurrently in batches of ``batch_size``. After each
    batch except the last, the coroutine sleeps 65 seconds so the next batch
    starts after the API rate-limit window has passed.

    Args:
        coord_df: DataFrame with ``lat``, ``lon``, ``province`` and ``amphoe``
            columns, one row per location to query.
        batch_size: Number of rows requested concurrently per batch (>= 1).

    Returns:
        list[dict]: One dict per input row, in row order. A successful row
        holds the timestamp fields, location and pollutant values. A failed
        row holds ``province``, ``amphoe``, ``lat``, ``lon`` and an ``error``
        message (``"HTTP <status>"`` or the exception text).

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")

    # NOTE(review): the literal fallback is the key that used to be hard-coded
    # here. It is kept so existing runs keep working, but it should be rotated
    # and removed once OPENWEATHER_API_KEY is provided through the environment.
    API_KEY = os.environ.get("OPENWEATHER_API_KEY", "68c16da5e675ad8635df84629765b118")
    # WEATHER_ENDPOINT = "https://api.openweathermap.org/data/2.5/weather"
    # HTTPS so the API key in the query string is not sent in clear text.
    POLLUTION_ENDPOINT = "https://api.openweathermap.org/data/2.5/air_pollution"
    # Built once instead of once per request.
    thai_tz = pytz.timezone('Asia/Bangkok')

    async def fetch_row(session, row):
        """Query one location; always returns a dict (reading or error record)."""
        try:
            # Read inside the try so a missing column becomes an error record
            # instead of aborting the whole batch.
            lat = row['lat']
            lon = row['lon']
            province = row['province']
            amphoe = row['amphoe']
            params = {
                "lat": lat,
                "lon": lon,
                "appid": API_KEY,
                "units": "metric"
            }
            async with session.get(POLLUTION_ENDPOINT, params=params) as response:
                if response.status != 200:
                    return {
                        'province': province,
                        'amphoe': amphoe,
                        'lat': lat,
                        'lon': lon,
                        'error': f"HTTP {response.status}"
                    }
                # Success path: this must run only after the status check
                # has passed, not inside the failure branch.
                data = await response.json()

            # Take an aware UTC instant first. Calling astimezone() on a naive
            # datetime would interpret it as system-local time and shift the
            # Bangkok value on any machine that is not running in UTC.
            now_utc = datetime.now(pytz.utc)
            dt = now_utc.replace(tzinfo=None)  # naive UTC, same shape as before
            localtime = now_utc.astimezone(thai_tz)

            reading = data['list'][0]
            components = reading['components']
            return {
                'timestamp': dt,
                'year': dt.year,
                'month': dt.month,
                'day': dt.day,
                'hour': dt.hour,
                'minute': dt.minute,
                'localtime': localtime,
                'province': province,
                'amphoe': amphoe,
                'lat': data['coord']['lat'],
                'lon': data['coord']['lon'],
                # 'location': data['name'],
                'aqi': reading['main']['aqi'],
                'co': components['co'],
                'no': components['no'],
                'no2': components['no2'],
                'o3': components['o3'],
                'so2': components['so2'],
                'pm25': components['pm2_5'],
                'pm10': components['pm10'],
                'nh3': components['nh3']
            }
        except Exception as e:
            # One handler covers network, JSON and missing-key failures; the
            # row is reported as an error record so the batch keeps going.
            return {
                'province': row.get('province', 'unknown'),
                'amphoe': row.get('amphoe', 'unknown'),
                'lat': row.get('lat', None),
                'lon': row.get('lon', None),
                # Some exceptions (e.g. timeouts) have an empty message.
                'error': str(e) or type(e).__name__
            }

    pollution_results = []
    total_batches = ceil(len(coord_df) / batch_size)
    async with aiohttp.ClientSession() as session:
        for i in range(total_batches):
            batch = coord_df.iloc[i*batch_size:(i+1)*batch_size]
            tasks = [fetch_row(session, row) for _, row in batch.iterrows()]
            batch_results = await asyncio.gather(*tasks)
            pollution_results.extend(batch_results)

            print(f"✅ เสร็จ batch {i+1}/{total_batches}")
            if i < total_batches - 1:
                # Non-blocking wait so the next batch clears the rate limit.
                await asyncio.sleep(65)

    return pollution_results
            

        # Make API request
        # response = requests.get(POLLUTION_ENDPOINT, params=params)
        # response.raise_for_status()  # Raise an exception for bad status codes
        # data = response.json()
        # return data

# get_pollution_data()

# import time

# @flow(name="pollution-flow")
# async def pollution_flow():
#     start_time = time.perf_counter()  # ⏱ เริ่มจับเวลา

#     coord_df = pd.read_csv('./save/amphoe_coord.csv')
#     results = await fetch_pollution_data(coord_df)
#     pollution_data = pd.DataFrame(results)

#     end_time = time.perf_counter()  # ⏱ จับเวลาอีกครั้ง
#     elapsed = end_time - start_time
#     print(f"\n✅ ดึงข้อมูลเสร็จทั้งหมด ใช้เวลา {elapsed:.2f} วินาที")
#     return pollution_data

In [11]:
import time

@flow(name="pollution-flow")
async def pollution_flow():
    """Fetch pollution data for every coordinate in the amphoe CSV and time the run.

    Returns:
        pandas.DataFrame built from the list returned by ``fetch_pollution_data``.
    """
    started = time.perf_counter()  # start the stopwatch

    coordinates = pd.read_csv('./save/amphoe_coord.csv')
    pollution_data = pd.DataFrame(await fetch_pollution_data(coordinates))

    elapsed = time.perf_counter() - started  # stop the stopwatch
    print(f"\n✅ ดึงข้อมูลเสร็จทั้งหมด ใช้เวลา {elapsed:.2f} วินาที")
    return pollution_data


# @flow(name="pollution-flow")
# async def pollution_flow():
#     start_time = time.perf_counter()
#     coord_df = pd.read_csv('./save/amphoe_coord.csv')
#     pollution_results = await fetch_pollution_data(coord_df)
    
#     # 👉 filter error ออกก่อนเก็บ
#     pollution_data = pd.DataFrame(pollution_results)
#     clean_data = pollution_data[pollution_data['error'].isna()].drop(columns=["error"], errors="ignore")

#     end_time = time.perf_counter()
#     print(f"\n✅ ดึงข้อมูลเสร็จทั้งหมด ใช้เวลา {end_time - start_time:.2f} วินาที")
#     return clean_data

In [17]:
# Display the collected pollution DataFrame.
# NOTE(review): this needs a notebook-level `pollution_data`; `pollution_flow()`
# only returns its local frame, so the awaited result has to be assigned to
# this name before the cell can run.
pollution_data

NameError: name 'pollution_data' is not defined

In [12]:
# test-run pollution flow
import nest_asyncio
nest_asyncio.apply()

await pollution_flow()


✅ เสร็จ batch 1/19
✅ เสร็จ batch 2/19
✅ เสร็จ batch 3/19
✅ เสร็จ batch 4/19
✅ เสร็จ batch 5/19
✅ เสร็จ batch 6/19
✅ เสร็จ batch 7/19
✅ เสร็จ batch 8/19
✅ เสร็จ batch 9/19
✅ เสร็จ batch 10/19
✅ เสร็จ batch 11/19
✅ เสร็จ batch 12/19
✅ เสร็จ batch 13/19
✅ เสร็จ batch 14/19
✅ เสร็จ batch 15/19
✅ เสร็จ batch 16/19
✅ เสร็จ batch 17/19
✅ เสร็จ batch 18/19
✅ เสร็จ batch 19/19



✅ ดึงข้อมูลเสร็จทั้งหมด ใช้เวลา 1173.60 วินาที


Unnamed: 0,province,amphoe,lat,lon,error
0,กระบี่,เหนือคลอง,8.074722,99.003611,HTTP 429
1,กระบี่,ลำทับ,8.071667,99.291667,HTTP 429
2,กระบี่,ปลายพระยา,8.533333,98.862778,HTTP 429
3,กระบี่,เขาพนม,8.264722,99.049167,HTTP 429
4,กระบี่,อ่าวลึก,8.377500,98.721667,HTTP 429
...,...,...,...,...,...
920,แม่ฮ่องสอน,ปาย,19.358611,98.440000,HTTP 429
921,แม่ฮ่องสอน,แม่สะเรียง,18.159444,97.933611,HTTP 429
922,แม่ฮ่องสอน,แม่ลาน้อย,18.384444,97.936944,HTTP 429
923,แม่ฮ่องสอน,สบเมย,17.961944,97.933333,HTTP 429


In [None]:
 # lakeFS credentials from your docker-compose.yml
# Prefer credentials supplied through the environment; the literals are the
# previous hard-coded values, kept as defaults so the cell behaves the same
# when the variables are not set.
ACCESS_KEY = os.environ.get("LAKEFS_ACCESS_KEY", "access_key")
SECRET_KEY = os.environ.get("LAKEFS_SECRET_KEY", "secret_key")

# lakeFS endpoint (running locally)
lakefs_endpoint = "http://lakefs-dev:8000/"

# lakeFS repository, branch, and file path
repo = "pollution-data"
branch = "main"
path = "pollution.parquet"

# Construct the full lakeFS S3-compatible path
lakefs_s3_path = f"s3a://{repo}/{branch}/{path}"

# Configure storage_options for lakeFS (S3-compatible)
storage_options = {
    "key": ACCESS_KEY,
    "secret": SECRET_KEY,
    "client_kwargs": {
        "endpoint_url": lakefs_endpoint,
    },
}

# NOTE(review): `pollution_df` is not defined in any visible cell (the flow
# result is called `pollution_data`) - confirm which frame should be written.
# The frame must contain the year/month/day/hour columns used for partitioning.
pollution_df.to_parquet(
    lakefs_s3_path,
    storage_options=storage_options,
    partition_cols=['year', 'month', 'day', 'hour'],
)

In [19]:
# Keep only the successful records, i.e. those without an 'error' key.
# None entries are skipped explicitly: `'error' not in None` raises TypeError,
# and the fetch step can yield None for a row.
clean_results = [r for r in results if r is not None and 'error' not in r]
pollution_data = pd.DataFrame(clean_results)


NameError: name 'results' is not defined

In [None]:
# Collect the failed records (those carrying an 'error' key) and export them.
# None entries are skipped so the membership test cannot raise TypeError.
errors = [r for r in results if r is not None and 'error' in r]
pd.DataFrame(errors).to_csv("pollution_errors.csv", index=False)


In [None]:
# results = await fetch_pollution_data(coord_df)

# Split the records into clean and error lists in a single pass.
# None entries are skipped: `'error' in None` would raise TypeError.
clean_results = []
error_results = []
for r in results:
    if r is None:
        continue
    if 'error' in r:
        error_results.append(r)
    else:
        clean_results.append(r)

# Build the DataFrames
clean_df = pd.DataFrame(clean_results)
error_df = pd.DataFrame(error_results)

# Save the clean file (local copy, ready to be uploaded to lakeFS)
clean_df.to_parquet("pollution_data.parquet", index=False)

# Keep the errors for later inspection (optional)
if not error_df.empty:
    error_df.to_csv("pollution_errors.csv", index=False)


In [None]:
from prefect import flow, task

# Placeholder standing in for the fetch task defined earlier in this notebook.
# NOTE(review): `(...)` is not a valid Python parameter list, so this cell is
# a sketch and does not parse as written; if it were made valid it would
# shadow the earlier definition of the same name.
@task
async def fetch_pollution_data(...):
    ...

def clean_data(results):
    """Build a DataFrame of successful records from raw fetch results.

    Args:
        results: Records as accepted by ``pd.DataFrame``; typically a list of
            dicts where failed records carry an ``error`` key. ``None``
            entries in a list/tuple are ignored.

    Returns:
        pandas.DataFrame containing only rows whose ``error`` is missing, with
        the ``error`` column removed. If no record has an ``error`` key (or
        the input is empty) every row is returned unchanged.
    """
    if isinstance(results, (list, tuple)):
        # A None entry would break DataFrame construction from dict records.
        results = [r for r in results if r is not None]
    df = pd.DataFrame(results)
    if 'error' not in df.columns:
        # All records succeeded (or there are none): nothing to filter, and
        # indexing df['error'] would raise KeyError.
        return df
    df = df[df['error'].isna()]  # drop the failed rows
    return df.drop(columns=['error'])

def save_to_lakefs(df, path, storage_options=None):
    """Write ``df`` as Parquet to ``path`` without the index.

    Args:
        df: DataFrame to persist.
        path: Destination passed straight to ``DataFrame.to_parquet``.
        storage_options: Optional filesystem options (credentials, endpoint
            URL) forwarded to ``to_parquet``. When omitted the call is exactly
            ``df.to_parquet(path, index=False)``.
    """
    # Only forward the option when given so the default call is unchanged.
    extra = {} if storage_options is None else {"storage_options": storage_options}
    df.to_parquet(path, index=False, **extra)

@flow
async def pollution_pipeline():
    """Fetch raw results, drop the failed records and persist the clean frame.

    The ``...`` argument to ``fetch_pollution_data`` is a placeholder carried
    over from the sketch; the real coordinate input still has to be supplied.
    """
    fetched = await fetch_pollution_data(...)
    save_to_lakefs(clean_data(fetched), "s3://lakefs/bucket/pollution.parquet")
