In [63]:
import asyncio
import aiohttp
import pandas as pd
from datetime import datetime
import pytz
import nest_asyncio

nest_asyncio.apply()

# Load districts from CSV
districts_df = pd.read_csv("districts.csv")  # ให้ใส่ไฟล์ไว้ใน working directory
districts = districts_df.set_index("district_id")[["district_en", "province_en", "lat", "lon"]].T.to_dict()

# API Info
WEATHER_ENDPOINT = "https://api.openweathermap.org/data/2.5/weather"
POLLUTION_ENDPOINT = "http://api.openweathermap.org/data/2.5/air_pollution"
API_KEY = "f937ef58aa2555b6d76a1119fd917eed"  # หรือโหลดจาก .env

# Async fetch
async def fetch_data(session, url, params):
    try:
        async with session.get(url, params=params) as response:
            response.raise_for_status()
            return await response.json()
    except aiohttp.ClientError as e:
        print(f"Request failed: {e}")
        return None

# One district
async def get_district_data(session, district, info):
    await asyncio.sleep(2)  # ✅ Delay เพื่อเลี่ยง API rate limit
    params = {"lat": info["lat"], "lon": info["lon"], "appid": API_KEY, "units": "metric"}

    weather_data, pollution_data = await asyncio.gather(
        fetch_data(session, WEATHER_ENDPOINT, params),
        fetch_data(session, POLLUTION_ENDPOINT, params)
    )

    if weather_data is None or pollution_data is None:
        return None

    timestamp = datetime.now()
    thai_tz = pytz.timezone('Asia/Bangkok')
    created_at = timestamp.replace(tzinfo=thai_tz)

    return {
        'timestamp': timestamp,
        'year': timestamp.year,
        'month': timestamp.month,
        'day': timestamp.day,
        'hour': timestamp.hour,
        'minute': timestamp.minute,
        #'district_id': info['district_id'],
        'district': info['district_en'],
        'province': info['province_en'],
        'location': weather_data.get('name', district),
        'weather_main': weather_data['weather'][0]['main'],
        'weather_description': weather_data['weather'][0]['description'],
        'main.temp': weather_data['main']['temp'],
        'main.temp_min': weather_data['main']['temp_min'],
        'main.temp_max': weather_data['main']['temp_max'],
        'main.feels_like': weather_data['main']['feels_like'],
        'main.pressure': weather_data['main']['pressure'],
        'main.humidity': weather_data['main']['humidity'],
        'visibility': weather_data.get('visibility', None),
        'wind.speed': weather_data['wind']['speed'],
        'wind.deg': weather_data['wind']['deg'],
        'components_co': pollution_data['list'][0]['components']['co'],
        'components_no': pollution_data['list'][0]['components']['no'],
        'components_no2': pollution_data['list'][0]['components']['no2'],
        'components_o3': pollution_data['list'][0]['components']['o3'],
        'components_so2': pollution_data['list'][0]['components']['so2'],
        'components_pm2_5': pollution_data['list'][0]['components']['pm2_5'],
        'components_pm10': pollution_data['list'][0]['components']['pm10'],
        'components_nh3': pollution_data['list'][0]['components']['nh3']
    }

# Main loop
async def get_all_districts_data():
    async with aiohttp.ClientSession() as session:
        tasks = []
        for district, info in districts.items():
            tasks.append(get_district_data(session, district, info))
        return [r for r in await asyncio.gather(*tasks) if r is not None]

# Run and convert to DataFrame
all_data = await get_all_districts_data()
df = pd.DataFrame(all_data)
print(df)


Request failed: 429, message='Too Many Requests', url='https://api.openweathermap.org/data/2.5/weather?lat=13.7263&lon=100.5279&appid=69211a02be6d9cb00f1d596960cc5336&units=metric'
Request failed: 429, message='Too Many Requests', url='https://api.openweathermap.org/data/2.5/weather?lat=13.8271&lon=100.6743&appid=69211a02be6d9cb00f1d596960cc5336&units=metric'
Request failed: 429, message='Too Many Requests', url='https://api.openweathermap.org/data/2.5/weather?lat=13.6982&lon=100.5048&appid=69211a02be6d9cb00f1d596960cc5336&units=metric'
Request failed: 429, message='Too Many Requests', url='https://api.openweathermap.org/data/2.5/weather?lat=13.9047&lon=100.6572&appid=69211a02be6d9cb00f1d596960cc5336&units=metric'
Request failed: 429, message='Too Many Requests', url='https://api.openweathermap.org/data/2.5/weather?lat=13.6592&lon=100.3991&appid=69211a02be6d9cb00f1d596960cc5336&units=metric'
Request failed: 429, message='Too Many Requests', url='https://api.openweathermap.org/data/2.5/

In [57]:
df.info()
df.head()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 923 entries, 0 to 922
Data columns (total 28 columns):
 #   Column               Non-Null Count  Dtype         
---  ------               --------------  -----         
 0   timestamp            923 non-null    datetime64[ns]
 1   year                 923 non-null    int64         
 2   month                923 non-null    int64         
 3   day                  923 non-null    int64         
 4   hour                 923 non-null    int64         
 5   minute               923 non-null    int64         
 6   district             923 non-null    object        
 7   province             923 non-null    object        
 8   location             923 non-null    object        
 9   weather_main         923 non-null    object        
 10  weather_description  923 non-null    object        
 11  main.temp            923 non-null    float64       
 12  main.temp_min        923 non-null    float64       
 13  main.temp_max        923 non-null  

Unnamed: 0,timestamp,year,month,day,hour,minute,district,province,location,weather_main,...,wind.speed,wind.deg,components_co,components_no,components_no2,components_o3,components_so2,components_pm2_5,components_pm10,components_nh3
0,2025-05-11 19:51:20.634164,2025,5,11,19,51,Nuea Khlong,Krabi,Nuea Khlong,Clouds,...,1.54,100,104.21,0.0,0.33,44.7,0.11,1.64,4.63,0.0
1,2025-05-11 19:51:20.475642,2025,5,11,19,51,Lam Thap,Krabi,Lam Thap,Clouds,...,0.97,276,105.36,0.0,0.31,45.01,0.09,1.39,3.75,0.0
2,2025-05-11 19:51:20.461571,2025,5,11,19,51,Plai Phraya,Krabi,Plai Phraya,Clouds,...,0.81,260,109.32,0.0,0.38,47.76,0.12,1.47,4.23,0.02
3,2025-05-11 19:51:20.632206,2025,5,11,19,51,Khao Phanom,Krabi,Khao Phanom,Clouds,...,1.54,100,107.02,0.0,0.32,47.44,0.11,1.62,4.77,0.0
4,2025-05-11 19:51:20.475165,2025,5,11,19,51,Ao Luek,Krabi,Ao Luek,Clouds,...,0.97,304,105.4,0.0,0.35,46.37,0.12,1.73,5.15,0.0


In [58]:
dt = datetime.now()
thai_tz = pytz.timezone('Asia/Bangkok')
dt = dt.replace(tzinfo=thai_tz)
print(dt) 

2025-05-11 19:51:35.409378+06:42


In [59]:
import pandas as pd

# lakeFS credentials from your docker-compose.yml
ACCESS_KEY = "access_key"
SECRET_KEY = "secret_key"

# lakeFS endpoint (running locally)
lakefs_endpoint = "http://lakefs-dev:8000/"

# lakeFS repository, branch, and file path
repo = "weather"
branch = "main"
path = "weather.parquet"

# Construct the full lakeFS S3-compatible path
lakefs_s3_path = f"s3a://{repo}/{branch}/{path}"

# Configure storage_options for lakeFS (S3-compatible)
storage_options = {
    "key": ACCESS_KEY,
    "secret": SECRET_KEY,
    "client_kwargs": {
        "endpoint_url": lakefs_endpoint
    }
}

In [60]:
df.to_parquet(
    lakefs_s3_path,
    storage_options=storage_options,
    partition_cols=['year','month','day','hour'],
    
)

In [61]:
from datetime import datetime
import pytz

dt = datetime.now()
thai_tz = pytz.timezone('Asia/Bangkok')
dt = dt.replace(tzinfo=thai_tz)

In [62]:
print(dt)

2025-05-11 19:51:38.543886+06:42
