In [1]:
pip install pandas pyarrow fsspec s3fs

[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.3.1[0m[39;49m -> [0m[32;49m25.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.


In [2]:
import requests
import pandas as pd
from datetime import datetime
import pytz

# API endpoint and parameters
WEATHER_ENDPOINT = "https://api.openweathermap.org/data/2.5/weather"
API_KEY = "a8830679af88ae345ed7fb6aac741e34"  # Replace with your actual API key


provinces = {
    "Pathum Thani":{
        "lat": 14.0134,
        "lon": 100.5304
    },
    "Bangkok":{
            "lat": 13.7367,
            "lon": 100.5232
    },
    "Chiang Mai":{
        "lat": 18.7883,
        "lon": 98.9853
    },
    "Phuket":{
        "lat": 7.9519,
        "lon": 98.3381
    }
}
# Function to fetch and process weather data
def get_weather_data(province='Pathum Thani'):
    
    params = {
        "lat": provinces[province]['lat'],
        "lon": provinces[province]['lon'],
        "appid": API_KEY,
        "units": "metric"
    }
    try:
        # Make API request
        response = requests.get(WEATHER_ENDPOINT, params=params)
        response.raise_for_status()  # Raise an exception for bad status codes
        data = response.json()
        
        # Convert timestamp to datetime
        # created_at = datetime.fromtimestamp(data['dt'])

        dt = datetime.now()
        thai_tz = pytz.timezone('Asia/Bangkok')
        created_at = dt.replace(tzinfo=thai_tz)


        timestamp = datetime.now()
        
        # Create dictionary with required fields
        weather_dict = {
            'timestamp': timestamp,
            'year': timestamp.year,
            'month': timestamp.month,
            'day': timestamp.day,
            'hour': timestamp.hour,
            'minute': timestamp.minute,
            'created_at': created_at,
            'requested_province':province,
            'location': data['name'],
            'weather_main': data['weather'][0]['main'],
            'weather_description': data['weather'][0]['description'],
            'main.temp': data['main']['temp']
        }
        
        # Create DataFrame
        # df = pd.DataFrame([weather_dict])
        
        # return df
        return weather_dict
    
    except requests.exceptions.RequestException as e:
        print(f"Error fetching data: {e}")
        return None
    except KeyError as e:
        print(f"Error processing data: Missing key {e}")
        return None

In [3]:
df=pd.DataFrame([get_weather_data(p) for p in list(provinces.keys())])
df.info()
df.head()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 4 entries, 0 to 3
Data columns (total 12 columns):
 #   Column               Non-Null Count  Dtype                       
---  ------               --------------  -----                       
 0   timestamp            4 non-null      datetime64[ns]              
 1   year                 4 non-null      int64                       
 2   month                4 non-null      int64                       
 3   day                  4 non-null      int64                       
 4   hour                 4 non-null      int64                       
 5   minute               4 non-null      int64                       
 6   created_at           4 non-null      datetime64[ns, Asia/Bangkok]
 7   requested_province   4 non-null      object                      
 8   location             4 non-null      object                      
 9   weather_main         4 non-null      object                      
 10  weather_description  4 non-null      objec

Unnamed: 0,timestamp,year,month,day,hour,minute,created_at,requested_province,location,weather_main,weather_description,main.temp
0,2025-05-02 07:31:34.207809,2025,5,2,7,31,2025-05-02 07:49:34.201424+07:00,Pathum Thani,Pathum Thani,Clouds,few clouds,32.36
1,2025-05-02 07:31:34.683904,2025,5,2,7,31,2025-05-02 07:49:34.683889+07:00,Bangkok,Bangkok,Rain,light rain,34.44
2,2025-05-02 07:31:34.782698,2025,5,2,7,31,2025-05-02 07:49:34.782686+07:00,Chiang Mai,Chiang Mai,Clouds,scattered clouds,34.22
3,2025-05-02 07:31:34.892172,2025,5,2,7,31,2025-05-02 07:49:34.892161+07:00,Phuket,Kathu,Clouds,few clouds,30.92


In [4]:

dt = datetime.now()
thai_tz = pytz.timezone('Asia/Bangkok')
dt = dt.replace(tzinfo=thai_tz)
print(dt) 

2025-05-02 07:31:35.485335+06:42


In [5]:
import pandas as pd

# lakeFS credentials from your docker-compose.yml
ACCESS_KEY = "access_key"
SECRET_KEY = "secret_key"

# lakeFS endpoint (running locally)
lakefs_endpoint = "http://lakefs-dev:8000/"

# lakeFS repository, branch, and file path
repo = "weather"
branch = "main"
path = "weather.parquet"

# Construct the full lakeFS S3-compatible path
lakefs_s3_path = f"s3a://{repo}/{branch}/{path}"

# Configure storage_options for lakeFS (S3-compatible)
storage_options = {
    "key": ACCESS_KEY,
    "secret": SECRET_KEY,
    "client_kwargs": {
        "endpoint_url": lakefs_endpoint
    }
}

In [6]:
df.to_parquet(
    lakefs_s3_path,
    storage_options=storage_options,
    partition_cols=['year','month','day','hour'],
    
)

# test read parquet files

In [7]:
path_all_partition = 's3a://weather3/main/weather.parquet'

df2=pd.read_parquet(    
    path=path_all_partition,
    storage_options=storage_options
)
df2.info()
df2.head()

FileNotFoundError: weather3/main/weather.parquet

In [None]:
path_single_partition = 's3a://weather3/main/weather.parquet/year=2025/month=4/day=10/hour=7/70cf855c5c5e4659ae01aba885c731c3-0.parquet'

df2=pd.read_parquet(    
    path=path_single_partition,
    storage_options=storage_options,
)
df2.info()
df2.head()

In [8]:
print(dt)

2025-05-02 07:31:35.485335+06:42


# Test Duck and Dask

In [9]:
# pip install duckdb
import duckdb
import pandas as pd

# Connect to an in-memory DuckDB instance.
con = duckdb.connect(database=':memory:')

storage_options = {
    "key": ACCESS_KEY,
    "secret": SECRET_KEY,
    "client_kwargs": {
        "endpoint_url": lakefs_endpoint
    }
}

# Use DuckDB's read_parquet() function to read the dataset.
# Assume the dataset is stored in the directory "output_parquet_dataset" with hive partitions:
# output_parquet_dataset/year=2025/month=02/day=17/...
query = """
INSTALL httpfs;
LOAD httpfs;

SET s3_endpoint='lakefs-dev:8000';
SET s3_access_key_id='access_key'; 
SET s3_secret_access_key='secret_key'; 
SET s3_url_style='path';
SET s3_use_ssl=false;

SELECT *
FROM read_parquet('s3a://weather3/main/weather.parquet')
"""

df_duck = con.execute(query).df()

print("DuckDB Parquet Query Result:")
df_duck.info()
df_duck.head(20)

ModuleNotFoundError: No module named 'duckdb'

In [10]:
# pip install dask
import dask.dataframe as dd
df2 = dd.read_parquet(
    path=path_all_partition,
    storage_options=storage_options,
    dtype_backend='pyarrow'
)  

ModuleNotFoundError: No module named 'dask'