### Task

In [7]:
# from dotenv import load_dotenv
# import os

# load_dotenv()
# API_KEY = os.getenv("OPENWEATHER_API_KEY")
# API_KEY

None


In [7]:
#### TEST
import requests
import pandas as pd
from datetime import datetime
import time
import pytz
from datetime import timedelta

import nest_asyncio
import asyncio
import aiohttp
from prefect import flow, task # Prefect flow and task decorators
from math import ceil

from dotenv import load_dotenv
import os

# Patch asyncio first so the coroutines below can be awaited from a cell.
nest_asyncio.apply()

# Load variables from a .env file (if any) into the process environment,
# then read the credentials and endpoint the rest of the notebook relies on.
load_dotenv()
API_KEY = os.environ.get("OPENWEATHER_API_KEY")
ACCESS_KEY = os.environ.get("LAKEFS_ACCESS_KEY")
SECRET_KEY = os.environ.get("LAKEFS_SECRET_KEY")
lakefs_endpoint = os.environ.get("LAKEFS_ENDPOINT", "http://lakefs-dev:8000")

@task
async def fetch_pollution_data(df_sample, dt, localtime, batch_size=300, batch_pause=65):
    """Fetch air-pollution readings from OpenWeather for every row of ``df_sample``.

    Rows are requested concurrently in batches of ``batch_size``; between two
    batches the coroutine sleeps ``batch_pause`` seconds to stay under the API
    rate limit.

    Args:
        df_sample: DataFrame with ``lat``, ``lon``, ``province_en``,
            ``district_en`` and ``district_id`` columns.
        dt: datetime stored as ``timestamp``; its year/month/day/hour/minute
            are also stored as separate fields.
        localtime: value stored unchanged in each record's ``localtime`` field.
        batch_size: number of rows requested concurrently per batch.
        batch_pause: seconds to wait between batches (no wait after the last).

    Returns:
        A list with one entry per input row, in row order: a dict of readings,
        or ``None`` when the request or the parsing of its response failed.
    """
    # HTTPS so the API key in the query string is not sent in clear text.
    POLLUTION_ENDPOINT = "https://api.openweathermap.org/data/2.5/air_pollution"

    async def fetch_row(session, row):
        """Request one coordinate and flatten the response into a record."""
        lat = row['lat']
        lon = row['lon']
        province = row['province_en']
        district = row['district_en']
        district_id = row['district_id']
        try:
            params = {
                "lat": lat,
                "lon": lon,
                "appid": API_KEY,
                "units": "metric"
            }

            async with session.get(POLLUTION_ENDPOINT, params=params) as response:
                # Turn 4xx/5xx answers into an aiohttp error instead of letting
                # the error body fail later with a misleading "Missing key".
                response.raise_for_status()
                data = await response.json()

                components = data['list'][0]['components']
                pollution_dict = {
                    'timestamp': dt,
                    'year': dt.year,
                    'month': dt.month,
                    'day': dt.day,
                    'hour': dt.hour,
                    'minute': dt.minute,
                    'localtime': localtime,
                    'province': province,
                    'district': district,
                    'district_id': district_id,
                    'lat': data['coord']['lat'],
                    'lon': data['coord']['lon'],
                    'main.aqi': data['list'][0]['main']['aqi'],
                    'components_co': components['co'],
                    'components_no': components['no'],
                    'components_no2': components['no2'],
                    'components_o3': components['o3'],
                    'components_so2': components['so2'],
                    'components_pm2_5': components['pm2_5'],
                    'components_pm10': components['pm10'],
                    'components_nh3': components['nh3']
                }
                return pollution_dict

        # The session is aiohttp, so network/HTTP failures are aiohttp.ClientError;
        # the previous requests.exceptions handler could never match.
        except aiohttp.ClientError as e:
            print(f"Error fetching data: {e}")
            return None
        except KeyError as e:
            print(f"Error processing data: Missing key {e}")
            return None
        except Exception as e:
            # Best effort: one failing district must not abort the whole batch.
            print(f"Error: {e} at {province} - {district}")
            return None

    pollution_results = []
    total_batches = ceil(len(df_sample) / batch_size)
    async with aiohttp.ClientSession() as session:
        for i in range(total_batches):
            batch = df_sample.iloc[i*batch_size:(i+1)*batch_size]
            tasks = [fetch_row(session, row) for _, row in batch.iterrows()]
            # gather() keeps the results in the same order as the rows.
            batch_results = await asyncio.gather(*tasks)
            pollution_results.extend(batch_results)

            print(f"✅ เสร็จ batch {i+1}/{total_batches}")
            if i < total_batches - 1:
                # Wait out the rate-limit window before the next batch.
                await asyncio.sleep(batch_pause)

    return pollution_results



@task
async def fetch_weather_data(df_sample, dt, localtime, batch_size=300, batch_pause=65):
    """Fetch current weather from OpenWeather for every row of ``df_sample``.

    Rows are requested concurrently in batches of ``batch_size``; between two
    batches the coroutine sleeps ``batch_pause`` seconds to stay under the API
    rate limit.

    Args:
        df_sample: DataFrame with ``lat``, ``lon``, ``province_en``,
            ``district_en`` and ``district_id`` columns.
        dt: datetime stored as ``timestamp``; its year/month/day/hour/minute
            are also stored as separate fields.
        localtime: value stored unchanged in each record's ``localtime`` field.
        batch_size: number of rows requested concurrently per batch.
        batch_pause: seconds to wait between batches (no wait after the last).

    Returns:
        A list with one entry per input row, in row order: a dict of weather
        fields, or ``None`` when the request or the parsing of its response
        failed.
    """
    WEATHER_ENDPOINT = "https://api.openweathermap.org/data/2.5/weather"

    async def fetch_row(session, row):
        """Request one coordinate and flatten the response into a record."""
        lat = row['lat']
        lon = row['lon']
        province = row['province_en']
        district = row['district_en']
        district_id = row['district_id']
        try:
            params = {
                "lat": lat,
                "lon": lon,
                "appid": API_KEY,
                "units": "metric"
            }
            async with session.get(WEATHER_ENDPOINT, params=params) as response:
                # Turn 4xx/5xx answers into an aiohttp error instead of letting
                # the error body fail later with a misleading "Missing key".
                response.raise_for_status()
                data = await response.json()

                weather_dict = {
                    'timestamp': dt,
                    'year': dt.year,
                    'month': dt.month,
                    'day': dt.day,
                    'hour': dt.hour,
                    'minute': dt.minute,
                    'localtime': localtime,
                    'province': province,
                    'district': district,
                    'district_id': district_id,
                    'lat': data['coord']['lat'],
                    'lon': data['coord']['lon'],
                    'weather_main': data["weather"][0]["main"],
                    'weather_description': data["weather"][0]["description"],
                    'main.temp': data["main"]["temp"],
                    'main.temp_min': data["main"]["temp_min"],
                    'main.temp_max': data["main"]["temp_max"],
                    'main.feels_like': data["main"]["feels_like"],
                    'main.pressure': data["main"]["pressure"],
                    'main.humidity': data["main"]["humidity"],
                    # visibility is read with .get(), so a missing value becomes None.
                    'visibility': data.get("visibility"),
                    'wind.speed': data["wind"]["speed"],
                    'wind.deg': data["wind"]["deg"]
                }
                return weather_dict

        # The session is aiohttp, so network/HTTP failures are aiohttp.ClientError;
        # the previous requests.exceptions handler could never match.
        except aiohttp.ClientError as e:
            print(f"Error fetching data: {e}")
            return None
        except KeyError as e:
            print(f"Error processing data: Missing key {e}")
            return None
        except Exception as e:
            # Best effort: one failing district must not abort the whole batch.
            print(f"Error: {e} at {province} - {district}")
            return None

    weather_results = []
    total_batches = ceil(len(df_sample) / batch_size)
    async with aiohttp.ClientSession() as session:
        for i in range(total_batches):
            batch = df_sample.iloc[i*batch_size:(i+1)*batch_size]
            tasks = [fetch_row(session, row) for _, row in batch.iterrows()]
            # gather() keeps the results in the same order as the rows.
            batch_results = await asyncio.gather(*tasks)
            weather_results.extend(batch_results)

            print(f"✅ เสร็จ batch {i+1}/{total_batches}")
            if i < total_batches - 1:
                # Wait out the rate-limit window before the next batch.
                await asyncio.sleep(batch_pause)

    return weather_results



def clean_data(df):
    """Build a DataFrame from fetched records and normalise its text columns.

    Args:
        df: the list returned by a fetch task (dicts, with ``None`` for rows
            whose request failed), or anything else ``pd.DataFrame`` accepts.

    Returns:
        A DataFrame with the failed (``None``) rows removed and the
        ``province`` / ``district`` columns, when present, cast to the pandas
        ``string`` dtype. It is empty when every record failed.
    """
    if isinstance(df, (list, tuple)):
        # The fetchers report a failed row as None; drop those entries so they
        # cannot break DataFrame construction or the column casts below.
        df = [record for record in df if record is not None]
    df = pd.DataFrame(df)
    for column in ("province", "district"):
        # An all-failed run yields a frame without these columns.
        if column in df.columns:
            df[column] = df[column].astype("string")
    return df


def save_to_lakefs_pollution(df):
    """Write ``df`` as a partitioned parquet dataset to the pollution repository.

    The target is ``pollution.parquet`` on the ``main`` branch of the
    ``pollution-data`` repository, reached through the S3-compatible endpoint
    in ``lakefs_endpoint`` with the module-level access and secret keys.
    ``df`` must contain the ``year``, ``month``, ``day`` and ``hour`` columns
    used for partitioning.
    """
    repository, ref, object_name = "pollution-data", "main", "pollution.parquet"
    target_uri = f"s3a://{repository}/{ref}/{object_name}"

    # Credentials and endpoint handed to the filesystem layer by pandas.
    s3_options = dict(
        key=ACCESS_KEY,
        secret=SECRET_KEY,
        client_kwargs={"endpoint_url": lakefs_endpoint},
    )

    df.to_parquet(
        target_uri,
        storage_options=s3_options,
        partition_cols=["year", "month", "day", "hour"],
    )


def save_to_lakefs_weather(df):
    """Write ``df`` as a partitioned parquet dataset to the weather repository.

    The target is ``weather.parquet`` on the ``main`` branch of the
    ``weather-data`` repository, reached through the S3-compatible endpoint
    in ``lakefs_endpoint`` with the module-level access and secret keys.
    ``df`` must contain the ``year``, ``month``, ``day`` and ``hour`` columns
    used for partitioning.
    """
    repository, ref, object_name = "weather-data", "main", "weather.parquet"
    target_uri = f"s3a://{repository}/{ref}/{object_name}"

    # Credentials and endpoint handed to the filesystem layer by pandas.
    s3_options = dict(
        key=ACCESS_KEY,
        secret=SECRET_KEY,
        client_kwargs={"endpoint_url": lakefs_endpoint},
    )

    df.to_parquet(
        target_uri,
        storage_options=s3_options,
        partition_cols=["year", "month", "day", "hour"],
    )



### FLOW

In [8]:
import time
import os
import pandas as pd

@flow(name="fetch_data_flow", log_prints=True)
async def fetch_data_flow():
    """Fetch pollution and weather data for a sample of districts and store both.

    Reads ``save/district_coord.csv`` from the parent of the current working
    directory, samples up to 10 rows, runs the pollution task and then the
    weather task with one shared timestamp, cleans both result sets and
    writes them through the two ``save_to_lakefs_*`` helpers.
    """
    SAMPLE_SIZE = 10

    start_time = time.perf_counter()  # start timing

    # Take the current time as an aware UTC value and convert *that* to
    # Bangkok time. Converting a naive utcnow() with astimezone() would treat
    # it as system-local time and give a wrong localtime on non-UTC hosts.
    utc_now = datetime.now(pytz.utc)
    dt = utc_now.replace(tzinfo=None)  # stored timestamp stays naive UTC
    thai_tz = pytz.timezone('Asia/Bangkok')
    localtime = utc_now.astimezone(thai_tz)

    BASE_DIR = os.path.abspath(os.path.join(os.getcwd(), ".."))
    coord_path = os.path.join(BASE_DIR, "save", "district_coord.csv")
    coord_df = pd.read_csv(coord_path)
    # sample() raises when asked for more rows than exist, so cap the size.
    df_sample = coord_df.sample(min(SAMPLE_SIZE, len(coord_df)))

    pollution_results = await fetch_pollution_data(df_sample, dt, localtime)
    weather_results = await fetch_weather_data(df_sample, dt, localtime)

    pollution_data = clean_data(pollution_results)
    weather_data = clean_data(weather_results)

    end_time = time.perf_counter()  # stop timing (fetch + clean only)
    print(f"\n✅ ดึงข้อมูลเสร็จทั้งหมด ใช้เวลา {end_time - start_time:.2f} วินาที")
    save_to_lakefs_pollution(pollution_data)
    save_to_lakefs_weather(weather_data)


In [9]:
# #Check path + Sample
# BASE_DIR = os.path.abspath(os.path.join(os.getcwd(), ".."))  #/home/jovyan/work
# coord_path = os.path.join(BASE_DIR, "save", "district_coord.csv")
# coord_df = pd.read_csv(coord_path)
# df_sample = coord_df.sample(10)
# df_sample

Unnamed: 0,province_id,province,provinceEN,amphoe_id,amphoe,amphoeEN,lat,lon
77,46,กาฬสินธุ์,Kalasin,4614,ห้วยผึ้ง,Huai Phueng,16.6521,103.8912
262,30,นครราชสีมา,Nakhon Ratchasima,3004,คง,Khong,15.422308,102.41901
626,90,สงขลา,Songkhla,9005,เทพา,Thepha,6.79726,100.91025
183,86,ชุมพร,Chumphon,8606,พะโต๊ะ,Phato,9.791667,98.776667
456,66,พิจิตร,Phichit,6605,บางมูลนาก,Bang Mun Nak,16.0214,100.4172
57,10,กรุงเทพมหานคร,Bangkok,1012,ยานนาวา,Yan Nawa,13.6943,100.5394
518,21,ระยอง,Rayong,2108,นิคมพัฒนา,Nikhom Phatthana,14.90917,101.02673
209,63,ตาก,Tak,6303,สามเงา,Sam Ngao,17.24331,99.02256
329,55,น่าน,Nan,5505,ปัว,Pua,19.175,100.916
59,71,กาญจนบุรี,Kanchanaburi,7101,เมืองกาญจนบุรี,Mueang Kanchanaburi,14.06987,99.32769


### TEST

In [9]:
# test-run main flow
import nest_asyncio
nest_asyncio.apply()

await main_flow()


In [None]:
def clean_data(results):
    """Return the successful records as a DataFrame, without the ``error`` column.

    NOTE(review): running this cell rebinds ``clean_data`` and replaces the
    earlier definition that ``fetch_data_flow`` calls — confirm that is intended.

    Args:
        results: list of record dicts; failed records carry an ``error`` key.

    Returns:
        A DataFrame of the rows whose ``error`` value is missing, with the
        ``error`` column removed. When no record has an ``error`` key, all
        rows are returned unchanged.
    """
    # The frame must be built before it is filtered; with this line commented
    # out the function failed on an unassigned local.
    pollution_result = pd.DataFrame(results)
    if 'error' in pollution_result.columns:
        pollution_result = pollution_result[pollution_result['error'].isna()]  # drop failed rows
    return pollution_result.drop(columns=['error'], errors='ignore')

# Split the records: those without an 'error' key are clean, the rest failed.
# NOTE(review): `results` is not defined in this cell — presumably the list
# returned by a fetch task; confirm before running.
clean_results = list(filter(lambda r: 'error' not in r, results))
error_results = list(filter(lambda r: 'error' in r, results))

# One DataFrame per group.
clean_df, error_df = pd.DataFrame(clean_results), pd.DataFrame(error_results)

# Write the clean rows to a local parquet file (staged for a later upload).
clean_df.to_parquet("pollution_data.parquet", index=False)

# Optionally keep the failed rows for later inspection.
if not error_df.empty:
    error_df.to_csv("pollution_errors.csv", index=False)



In [None]:
# Design sketch, not runnable as written: the bare `...` entry inside the dict
# literal is a placeholder for fields still to be filled in.
# NOTE(review): once completed, this definition would shadow the
# DataFrame-based fetch_weather_data defined earlier in the notebook.
@task
async def fetch_weather_data(lat, lon, timestamp):
    # Use the same timestamp that was passed in.
    ...
    return {
        "timestamp": timestamp,
        "temperature": ...,
        "humidity": ...,
        ...
    }

# Design sketch, not runnable as written: the bare `...` entry inside the dict
# literal is a placeholder for fields still to be filled in.
# NOTE(review): once completed, this definition would shadow the
# DataFrame-based fetch_pollution_data defined earlier in the notebook.
@task
async def fetch_pollution_data(lat, lon, timestamp):
    ...
    return {
        "timestamp": timestamp,
        "pm25": ...,
        "pm10": ...,
        ...
    }


In [None]:
@flow
async def combined_flow():
    """Fetch weather and pollution for one coordinate and merge them into one row.

    Both tasks receive the same minute-truncated UTC timestamp. On duplicate
    keys the pollution values overwrite the weather values.

    NOTE(review): ``lat`` and ``lon`` are not defined in this function —
    presumably expected as notebook globals; confirm before running.

    Returns:
        A single-row DataFrame holding the merged fields.
    """
    # Current UTC time cut to the minute (or use local time instead if the
    # data should be consistent with Thailand).
    timestamp = datetime.utcnow().replace(second=0, microsecond=0)

    weather_data = await fetch_weather_data(lat, lon, timestamp)
    pollution_data = await fetch_pollution_data(lat, lon, timestamp)

    # Merge the results of the two APIs into one record.
    return pd.DataFrame([{**weather_data, **pollution_data}])
