<span style="font-width:bold; font-size: 3rem; color:#333;">- Part 02: Daily Feature Pipeline for Air Quality (aqicn.org) and weather (openmeteo)</span>

## 🗒️ This notebook is divided into the following sections:
1. Download and Parse Data
2. Feature Group Insertion


__This notebook should be scheduled to run daily__

In the book, we use a GitHub Action stored here:
[.github/workflows/air-quality-daily.yml](https://github.com/featurestorebook/mlfs-book/blob/main/.github/workflows/air-quality-daily.yml)

However, you are free to use any Python Orchestration tool to schedule this program to run daily.

### <span style='color:#ff5f27'> 📝 Imports

In [31]:
import datetime
import time
import requests

import pandas as pd
import hopsworks
from functions import util
import json
import os
import warnings
warnings.filterwarnings("ignore")
import numpy as np
from scipy.stats import skew

## <span style='color:#ff5f27'> 🌍 Get the Sensor URL, Country, City, Street names from Hopsworks </span>

__Update the values in the cell below.__

__These should be the same values as in notebook 1 - the feature backfill notebook__


In [32]:
# If you haven't set the env variable 'HOPSWORKS_API_KEY', then uncomment the next line and enter your API key
# os.environ["HOPSWORKS_API_KEY"] = ""

project = hopsworks.login()
fs = project.get_feature_store() 
secrets = util.secrets_api(project.name)

# This line will fail if you have not registered the AQI_API_KEY as a secret in Hopsworks
AQI_API_KEY = secrets.get_secret("AQI_API_KEY").value
location_str = secrets.get_secret("SENSOR_LOCATION_JSON").value
location = json.loads(location_str)

country=location['country']
city=location['city']
street=location['street']
aqicn_url=location['aqicn_url']
latitude=location['latitude']
longitude=location['longitude']

today = datetime.date.today()

location_str

Connection closed.
Connected. Call `.close()` to terminate connection gracefully.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1159320
Connected. Call `.close()` to terminate connection gracefully.
Connected. Call `.close()` to terminate connection gracefully.


'{"country": "indonesia", "city": "semarang", "street": "lapangan_simpang_lima", "aqicn_url": "https://api.waqi.info/feed/@13651", "latitude": -6.99, "longitude": 110.42}'

### <span style="color:#ff5f27;"> 🔮 Get references to the Feature Groups </span>

In [33]:
# Retrieve feature groups
air_quality_fg = fs.get_feature_group(
    name='air_quality_improved',
    version=1,
)
weather_fg = fs.get_feature_group(
    name='weather_improved',
    version=1,
)

---

## <span style='color:#ff5f27'> 🌫 Retrieve Today's Air Quality data (PM2.5) from the AQI API</span>


In [34]:
import requests
import pandas as pd

aq_today_df = util.get_pm25(aqicn_url, country, city, street, today, AQI_API_KEY)
aq_today_df

Unnamed: 0,pm25,country,city,street,date,url
0,75.0,indonesia,semarang,lapangan_simpang_lima,2024-11-20,https://api.waqi.info/feed/@13651


In [35]:
aq_today_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1 entries, 0 to 0
Data columns (total 6 columns):
 #   Column   Non-Null Count  Dtype         
---  ------   --------------  -----         
 0   pm25     1 non-null      float32       
 1   country  1 non-null      object        
 2   city     1 non-null      object        
 3   street   1 non-null      object        
 4   date     1 non-null      datetime64[ns]
 5   url      1 non-null      object        
dtypes: datetime64[ns](1), float32(1), object(4)
memory usage: 172.0+ bytes


## <span style='color:#ff5f27'> 🌦 Get Weather Forecast data</span>

In [36]:
hourly_df = util.get_hourly_weather_forecast(city, latitude, longitude)
hourly_df = hourly_df.set_index('date')

# We will only make 1 daily prediction, so we will replace the hourly forecasts with a single daily forecast
# We only want the daily weather data, so only get weather at 12:00
daily_df = hourly_df.between_time('11:59', '12:01')
daily_df = daily_df.reset_index()
daily_df['date'] = pd.to_datetime(daily_df['date']).dt.date
daily_df['date'] = pd.to_datetime(daily_df['date'])
daily_df['city'] = city
daily_df

Coordinates -7.0°N 110.5°E
Elevation 8.0 m asl
Timezone None None
Timezone difference to GMT+0 0 s


Unnamed: 0,date,temperature_2m_mean,precipitation_sum,wind_speed_10m_max,wind_direction_10m_dominant,city
0,2024-11-20,26.15,2.6,11.119281,150.945496,semarang
1,2024-11-21,24.85,2.5,11.304229,127.234917,semarang
2,2024-11-22,25.85,0.1,7.993298,97.765083,semarang
3,2024-11-23,24.75,0.9,8.049845,153.435013,semarang
4,2024-11-24,25.25,1.7,8.647496,177.614105,semarang
5,2024-11-25,26.700001,0.3,6.36905,137.290634,semarang
6,2024-11-26,26.0,0.7,3.877318,158.198532,semarang
7,2024-11-27,26.049999,0.3,7.386582,136.97493,semarang
8,2024-11-28,26.799999,0.1,3.893995,146.309906,semarang
9,2024-11-29,25.4,0.9,2.545584,98.13002,semarang


In [37]:
daily_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 10 entries, 0 to 9
Data columns (total 6 columns):
 #   Column                       Non-Null Count  Dtype         
---  ------                       --------------  -----         
 0   date                         10 non-null     datetime64[ns]
 1   temperature_2m_mean          10 non-null     float32       
 2   precipitation_sum            10 non-null     float32       
 3   wind_speed_10m_max           10 non-null     float32       
 4   wind_direction_10m_dominant  10 non-null     float32       
 5   city                         10 non-null     object        
dtypes: datetime64[ns](1), float32(4), object(1)
memory usage: 448.0+ bytes


## <span style="color:#ff5f27;">⬆️ Features engineering</span>

In [38]:
#combine df for features eng
aq_df = air_quality_fg.read(read_options={"use_hive": True})

# First standardize aq_df dates
aq_df['date'] = pd.to_datetime(aq_df['date']).dt.tz_localize(None)

# Standardize aq_today_df dates
aq_today_df['date'] = pd.to_datetime(aq_today_df['date']).dt.tz_localize(None)

missing_cols = list(set(aq_df.columns) - set(aq_today_df.columns))
for col in missing_cols:
    aq_today_df[col] = None

# Ensure same column order
aq_today_df = aq_today_df[aq_df.columns]

# Concatenate
combined_aq_df = pd.concat([aq_df, aq_today_df], axis=0, ignore_index=True)

# Verify the combined result
combined_aq_df

Finished: Reading data from Hopsworks, using Hive (2.43s) 


Unnamed: 0,date,pm25,country,city,street,url,pm25_rolling_mean_3d,pm25_rolling_std_3d,pm25_rolling_max_3d,pm25_rolling_mean_7d,...,pm25_ema_7d,pm25_trend_3d,pm25_volatility_3d,pm25_trend_7d,pm25_volatility_7d,pm25_trend_14d,pm25_volatility_14d,pm25_zscore,pm25_percentile,pm25_skew_7d
0,2023-05-31,105.0,indonesia,semarang,lapangan_simpang_lima,https://api.waqi.info/feed/@13651,108.666667,8.144528,118.0,111.571429,...,110.501333,-1.333333,0.074950,-5.928571,0.084374,16.678571,0.092975,0.720566,0.743028,-0.308756
1,2022-05-27,69.0,indonesia,semarang,lapangan_simpang_lima,https://api.waqi.info/feed/@13651,67.000000,7.211103,73.0,66.428571,...,67.243382,-2.333333,0.107628,-3.928571,0.153237,1.285714,0.209159,-0.943490,0.188247,-0.541525
2,2024-09-23,84.0,indonesia,semarang,lapangan_simpang_lima,https://api.waqi.info/feed/@13651,95.000000,11.000000,106.0,87.428571,...,90.384138,4.166667,0.115789,-1.642857,0.159975,-0.214286,0.139076,-0.250133,0.442231,-0.294547
3,2024-05-15,106.0,indonesia,semarang,lapangan_simpang_lima,https://api.waqi.info/feed/@13651,113.333333,10.214369,125.0,100.428571,...,106.031802,12.833333,0.090127,-8.357143,0.149123,14.571429,0.140902,0.766790,0.755976,0.218035
4,2022-04-14,98.0,indonesia,semarang,lapangan_simpang_lima,https://api.waqi.info/feed/@13651,104.666667,14.224392,121.0,92.000000,...,96.146481,10.166667,0.135902,2.785714,0.168858,-0.428571,0.146093,0.397000,0.662849,0.738984
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1000,2023-07-15,109.0,indonesia,semarang,lapangan_simpang_lima,https://api.waqi.info/feed/@13651,109.000000,11.000000,120.0,105.000000,...,104.872744,4.000000,0.100917,5.000000,0.139538,2.321429,0.138175,0.905461,0.793825,-1.130198
1001,2024-08-11,97.0,indonesia,semarang,lapangan_simpang_lima,https://api.waqi.info/feed/@13651,98.666667,1.527525,100.0,105.285714,...,101.264014,-7.666667,0.015482,6.142857,0.089219,-7.428571,0.186273,0.350776,0.646912,0.777546
1002,2024-05-11,86.0,indonesia,semarang,lapangan_simpang_lima,https://api.waqi.info/feed/@13651,88.666667,10.263203,100.0,106.000000,...,98.878288,-14.666667,0.115750,2.571429,0.169899,15.928571,0.165072,-0.157686,0.470618,-0.353569
1003,2023-04-13,61.0,indonesia,semarang,lapangan_simpang_lima,https://api.waqi.info/feed/@13651,72.000000,10.148892,81.0,80.142857,...,78.644753,-7.000000,0.140957,-5.571429,0.140511,-6.642857,0.226337,-1.313280,0.086155,-0.415397


In [39]:
#combine df for features eng
wt_df = weather_fg.read(read_options={"use_hive": True})

# First standardize aq_df dates
wt_df['date'] = pd.to_datetime(aq_df['date']).dt.tz_localize(None)

# Standardize aq_today_df dates
daily_df['date'] = pd.to_datetime(daily_df['date']).dt.tz_localize(None)

missing_cols = list(set(wt_df.columns) - set(daily_df.columns))
for col in missing_cols:
    daily_df[col] = None

# Ensure same column order
daily_df = daily_df[wt_df.columns]

# Concatenate
combined_wt_df = pd.concat([wt_df, daily_df], axis=0, ignore_index=True)

# Verify the combined result
combined_wt_df

Finished: Reading data from Hopsworks, using Hive (0.86s) 


Unnamed: 0,date,temperature_2m_mean,precipitation_sum,wind_speed_10m_max,wind_direction_10m_dominant,city,temperature_2m_mean_rolling_mean_3d,precipitation_sum_rolling_mean_3d,wind_speed_10m_max_rolling_mean_3d,temp_wind_interaction,temp_precip_interaction,wind_direction_sin,wind_direction_cos,temp_wind_precip,high_temp_low_wind,wind_efficiency
0,2023-05-31,,,,,,,,,,,,,,,
1,2022-05-27,,,,,,,,,,,,,,,
2,2024-09-23,,,,,,,,,,,,,,,
3,2024-05-15,,,,,,,,,,,,,,,
4,2022-04-14,,,,,,,,,,,,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1009,2024-11-25,26.700001,0.3,6.369050,137.290634,semarang,,,,,,,,,,
1010,2024-11-26,26.000000,0.7,3.877318,158.198532,semarang,,,,,,,,,,
1011,2024-11-27,26.049999,0.3,7.386582,136.974930,semarang,,,,,,,,,,
1012,2024-11-28,26.799999,0.1,3.893995,146.309906,semarang,,,,,,,,,,


In [40]:
def engineer_air_quality_features(df):
    # Make a copy to avoid modifying the original
    df = df.copy()
    
    # First handle the timezone issue before any operations
    print("Converting dates to timezone-naive format...")
    df['date'] = pd.to_datetime(df['date'])
    
    # Check for mixed timezone values
    tz_info = df['date'].apply(lambda x: x.tzinfo)
    if tz_info.nunique() > 1:
        print(f"Found mixed timezone values at positions: {tz_info[tz_info.notna()].index.tolist()}")
        # Force convert to timezone-naive
        df['date'] = df['date'].apply(lambda x: x.replace(tzinfo=None) if hasattr(x, 'tzinfo') and x.tzinfo is not None else x)
    
    # 1. Rolling statistics (3, 7 days)
    for window in [3, 7]:
        df[f'pm25_rolling_mean_{window}d'] = df['pm25'].rolling(window=window).mean()
        df[f'pm25_rolling_std_{window}d'] = df['pm25'].rolling(window=window).std()
        df[f'pm25_rolling_max_{window}d'] = df['pm25'].rolling(window=window).max()
        
    # 2. Lag features (1-3 days)
    for lag in range(1, 4):
        df[f'pm25_lag_{lag}d'] = df['pm25'].shift(lag)
    
    # 3. Rate of change
    df['pm25_diff_1d'] = df['pm25'].diff()
    df['pm25_pct_change'] = df['pm25'].pct_change()
    
    return df

# Feature engineering
daily_aq_eng = engineer_air_quality_features(combined_aq_df)
daily_aq_eng = daily_aq_eng.fillna(method='bfill').fillna(method='ffill')

# Get the latest row (which should be from aq_today_df)
latest_aq_df = daily_aq_eng.sort_values('date', ascending=True).iloc[[-1]]

latest_aq_df

Converting dates to timezone-naive format...


Unnamed: 0,date,pm25,country,city,street,url,pm25_rolling_mean_3d,pm25_rolling_std_3d,pm25_rolling_max_3d,pm25_rolling_mean_7d,...,pm25_ema_7d,pm25_trend_3d,pm25_volatility_3d,pm25_trend_7d,pm25_volatility_7d,pm25_trend_14d,pm25_volatility_14d,pm25_zscore,pm25_percentile,pm25_skew_7d
1004,2024-11-20,75.0,indonesia,semarang,lapangan_simpang_lima,https://api.waqi.info/feed/@13651,74.0,12.529964,86.0,87.142857,...,78.644753,-7.0,0.140957,-5.571429,0.140511,-6.642857,0.226337,-1.31328,0.086155,-0.415397


In [41]:
def engineer_weather_features(df):
    # 1. Rolling means for weather metrics
    weather_cols = ['temperature_2m_mean', 'precipitation_sum', 'wind_speed_10m_max']
    for col in weather_cols:
        df[f'{col}_rolling_mean_3d'] = df[col].rolling(window=3).mean()
    
    # 2. Interaction features
    df['temp_wind_interaction'] = df['temperature_2m_mean'] * df['wind_speed_10m_max']
    df['temp_precip_interaction'] = df['temperature_2m_mean'] * df['precipitation_sum']
    
    # 3. Cyclical encoding for wind direction
    df['wind_direction_sin'] = np.sin(df['wind_direction_10m_dominant'] * (2 * np.pi / 360))
    df['wind_direction_cos'] = np.cos(df['wind_direction_10m_dominant'] * (2 * np.pi / 360))
    
    return df


daily_wt_eng = engineer_weather_features(combined_wt_df)
daily_wt_eng = daily_wt_eng.fillna(method='bfill').fillna(method='ffill')

today_date = pd.to_datetime('today')

# Filter the rows from today till the latest date
filtered_wt_df = daily_wt_eng[daily_wt_eng['date'] >= today_date]

filtered_wt_df

Unnamed: 0,date,temperature_2m_mean,precipitation_sum,wind_speed_10m_max,wind_direction_10m_dominant,city,temperature_2m_mean_rolling_mean_3d,precipitation_sum_rolling_mean_3d,wind_speed_10m_max_rolling_mean_3d,temp_wind_interaction,temp_precip_interaction,wind_direction_sin,wind_direction_cos,temp_wind_precip,high_temp_low_wind,wind_efficiency
1005,2024-11-21,24.85,2.5,11.304229,127.234917,semarang,25.616667,1.733333,10.138936,280.910095,62.125,0.796161,-0.605084,,,
1006,2024-11-22,25.85,0.1,7.993298,97.765083,semarang,25.616667,1.733333,10.138936,206.62674,2.585,0.99083,-0.135112,,,
1007,2024-11-23,24.75,0.9,8.049845,153.435013,semarang,25.15,1.166667,9.11579,199.233658,22.275,0.447213,-0.894428,,,
1008,2024-11-24,25.25,1.7,8.647496,177.614105,semarang,25.283333,0.9,8.230213,218.349274,42.925003,0.04163,-0.999133,,,
1009,2024-11-25,26.700001,0.3,6.36905,137.290634,semarang,25.566667,0.966667,7.688797,170.053635,8.01,0.67828,-0.734804,,,
1010,2024-11-26,26.0,0.7,3.877318,158.198532,semarang,25.983334,0.9,6.297955,100.81028,18.199999,0.371392,-0.928476,,,
1011,2024-11-27,26.049999,0.3,7.386582,136.97493,semarang,26.25,0.433333,5.87765,192.420456,7.815,0.682318,-0.731055,,,
1012,2024-11-28,26.799999,0.1,3.893995,146.309906,semarang,26.283333,0.366667,5.052632,104.35907,2.68,0.554701,-0.83205,,,
1013,2024-11-29,25.4,0.9,2.545584,98.13002,semarang,26.083333,0.433333,4.60872,64.657837,22.859999,0.98995,-0.14142,,,


In [42]:
def add_temporal_features(df):
    # Convert date to datetime if not already
    df['date'] = pd.to_datetime(df['date'])
    
    # Time-based features
    df['month'] = df['date'].dt.month
    df['day_of_week'] = df['date'].dt.dayofweek
    df['is_weekend'] = df['date'].dt.dayofweek.isin([5,6]).astype(int)
    
    # Seasonal features using sine and cosine transforms
    df['month_sin'] = np.sin(2 * np.pi * df['month']/12)
    df['month_cos'] = np.cos(2 * np.pi * df['month']/12)
    
    return df

def add_advanced_rolling_features(df):
    # Exponential moving averages (gives more weight to recent values)
    df['pm25_ema_3d'] = df['pm25'].ewm(span=3).mean()
    df['pm25_ema_7d'] = df['pm25'].ewm(span=7).mean()
    
    # Rolling statistics with different windows
    windows = [3, 7, 14]
    for window in windows:
        # Trend indicators
        df[f'pm25_trend_{window}d'] = df['pm25'].rolling(window=window).mean() - \
                                     df['pm25'].rolling(window=window*2).mean()
        
        # Volatility
        df[f'pm25_volatility_{window}d'] = df['pm25'].rolling(window=window).std() / \
                                          df['pm25'].rolling(window=window).mean()
    
    return df

def add_weather_interactions(df):
    # More complex weather interactions
    df['temp_wind_precip'] = df['temperature_2m_mean'] * df['wind_speed_10m_max'] * df['precipitation_sum']
    
    # Threshold-based features
    df['high_temp_low_wind'] = ((df['temperature_2m_mean'] > df['temperature_2m_mean'].median()) & 
                               (df['wind_speed_10m_max'] < df['wind_speed_10m_max'].median())).astype(int)
    
    # Wind efficiency (how well wind might disperse pollution)
    df['wind_efficiency'] = df['wind_speed_10m_max'] / (df['temperature_2m_mean'] + 1)  # +1 to avoid division by zero
    
    return df

def add_statistical_features(df):
    # Z-score for identifying unusual days
    df['pm25_zscore'] = (df['pm25'] - df['pm25'].mean()) / df['pm25'].std()
    
    # Percentile ranks
    df['pm25_percentile'] = df['pm25'].rank(pct=True)
    
    # Rolling skewness (indicates asymmetry in distribution)
    df['pm25_skew_7d'] = df['pm25'].rolling(window=7).apply(lambda x: skew(x))
    
    return df

# After loading your dataframes but before creating feature groups
def engineer_all_features(air_quality_df, weather_df):
    # Engineer air quality features
    air_quality_df = add_temporal_features(air_quality_df)
    air_quality_df = add_advanced_rolling_features(air_quality_df)
    air_quality_df = add_statistical_features(air_quality_df)
    
    # Engineer weather features
    weather_df = add_weather_interactions(weather_df)
    
    # Handle missing values
    air_quality_df = air_quality_df.fillna(method='bfill').fillna(method='ffill')
    weather_df = weather_df.fillna(method='bfill').fillna(method='ffill')
    
    return air_quality_df, weather_df

# Apply feature engineering
daily_aq_eng, daily_wt_eng = engineer_all_features(daily_aq_eng, daily_wt_eng)

In [47]:
today_str = today.strftime('%Y-%m-%d')
today_aq_df = daily_aq_eng[daily_aq_eng['date'].dt.strftime('%Y-%m-%d') == today_str].iloc[[0]]


today_aq_df

Unnamed: 0,date,pm25,country,city,street,url,pm25_rolling_mean_3d,pm25_rolling_std_3d,pm25_rolling_max_3d,pm25_rolling_mean_7d,...,pm25_ema_7d,pm25_trend_3d,pm25_volatility_3d,pm25_trend_7d,pm25_volatility_7d,pm25_trend_14d,pm25_volatility_14d,pm25_zscore,pm25_percentile,pm25_skew_7d
1004,2024-11-20,75.0,indonesia,semarang,lapangan_simpang_lima,https://api.waqi.info/feed/@13651,74.0,12.529964,86.0,87.142857,...,84.542559,-18.833333,0.169324,-8.785714,0.308091,-0.928571,0.234294,-0.665669,0.286567,0.235102


In [44]:
today_wt_df = daily_wt_eng[daily_wt_eng['date'].dt.strftime('%Y-%m-%d') >= today_str]

today_wt_df

Unnamed: 0,date,temperature_2m_mean,precipitation_sum,wind_speed_10m_max,wind_direction_10m_dominant,city,temperature_2m_mean_rolling_mean_3d,precipitation_sum_rolling_mean_3d,wind_speed_10m_max_rolling_mean_3d,temp_wind_interaction,temp_precip_interaction,wind_direction_sin,wind_direction_cos,temp_wind_precip,high_temp_low_wind,wind_efficiency
1004,2024-11-20,26.15,2.6,11.119281,150.945496,semarang,25.616667,1.733333,10.138936,290.769196,67.989998,0.485641,-0.874158,755.999878,0,0.40955
1005,2024-11-21,24.85,2.5,11.304229,127.234917,semarang,25.616667,1.733333,10.138936,280.910095,62.125,0.796161,-0.605084,702.275269,0,0.437301
1006,2024-11-22,25.85,0.1,7.993298,97.765083,semarang,25.616667,1.733333,10.138936,206.62674,2.585,0.99083,-0.135112,20.662674,0,0.297702
1007,2024-11-23,24.75,0.9,8.049845,153.435013,semarang,25.15,1.166667,9.11579,199.233658,22.275,0.447213,-0.894428,179.310287,0,0.312615
1008,2024-11-24,25.25,1.7,8.647496,177.614105,semarang,25.283333,0.9,8.230213,218.349274,42.925003,0.04163,-0.999133,371.193787,0,0.329428
1009,2024-11-25,26.700001,0.3,6.36905,137.290634,semarang,25.566667,0.966667,7.688797,170.053635,8.01,0.67828,-0.734804,51.016094,1,0.22993
1010,2024-11-26,26.0,0.7,3.877318,158.198532,semarang,25.983334,0.9,6.297955,100.81028,18.199999,0.371392,-0.928476,70.567192,0,0.143604
1011,2024-11-27,26.049999,0.3,7.386582,136.97493,semarang,26.25,0.433333,5.87765,192.420456,7.815,0.682318,-0.731055,57.726139,0,0.273071
1012,2024-11-28,26.799999,0.1,3.893995,146.309906,semarang,26.283333,0.366667,5.052632,104.35907,2.68,0.554701,-0.83205,10.435907,1,0.140072
1013,2024-11-29,25.4,0.9,2.545584,98.13002,semarang,26.083333,0.433333,4.60872,64.657837,22.859999,0.98995,-0.14142,58.192051,0,0.096424


## <span style="color:#ff5f27;">⬆️ Uploading new data to the Feature Store</span>

In [48]:
for feature in air_quality_fg.features:
    if feature.name in today_aq_df.columns:
        if feature.type == 'float':
            today_aq_df[feature.name] = today_aq_df[feature.name].astype('float32')
        elif feature.type == 'double':
            today_aq_df[feature.name] = today_aq_df[feature.name].astype('float64')
        elif feature.type == 'int':
            today_aq_df[feature.name] = today_aq_df[feature.name].astype('int32')
        elif feature.type == 'bigint':
            today_aq_df[feature.name] = today_aq_df[feature.name].astype('int64')
        elif feature.type == 'string':
            today_aq_df[feature.name] = today_aq_df[feature.name].astype('str')

air_quality_fg.insert(today_aq_df)

2024-11-20 18:01:21,368 INFO: 	1 expectation(s) included in expectation_suite.
Validation succeeded.
Validation Report saved successfully, explore a summary at https://c.app.hopsworks.ai:443/p/1159320/fs/1150023/fg/1359195


Uploading Dataframe: 0.00% |          | Rows 0/1 | Elapsed Time: 00:00 | Remaining Time: ?

Launching job: air_quality_improved_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai/p/1159320/jobs/named/air_quality_improved_1_offline_fg_materialization/executions


(<hsfs.core.job.Job at 0x173639a7850>,
 {
   "success": true,
   "results": [
     {
       "success": true,
       "expectation_config": {
         "expectation_type": "expect_column_min_to_be_between",
         "kwargs": {
           "column": "pm25",
           "min_value": -0.1,
           "max_value": 500.0,
           "strict_min": true
         },
         "meta": {
           "expectationId": 683023
         }
       },
       "result": {
         "observed_value": 75.0,
         "element_count": 1,
         "missing_count": null,
         "missing_percent": null
       },
       "meta": {
         "ingestionResult": "INGESTED",
         "validationTime": "2024-11-20T05:01:21.000367Z"
       },
       "exception_info": {
         "raised_exception": false,
         "exception_message": null,
         "exception_traceback": null
       }
     }
   ],
   "evaluation_parameters": {},
   "statistics": {
     "evaluated_expectations": 1,
     "successful_expectations": 1,
     "unsu

In [46]:
# Create a mapping of Feature Store types to pandas dtypes
type_mapping = {
    'float': 'float32',
    'double': 'float64',
    'int': 'int32',
    'bigint': 'int64',
    'string': 'str'
}

# Convert types based on Feature Group schema
for feature in weather_fg.features:
    if feature.name in today_wt_df.columns:
        try:
            target_type = type_mapping.get(feature.type)
            if target_type:
                today_wt_df[feature.name] = today_wt_df[feature.name].astype(target_type)
        except (ValueError, TypeError) as e:
            print(f"Error converting {feature.name} to {feature.type}: {str(e)}")
            # Handle or raise the error as needed

# Add error handling for the insert operation
try:
    weather_fg.insert(today_wt_df)
except Exception as e:
    print(f"Error inserting data into Feature Group: {str(e)}")

2024-11-20 17:44:20,005 INFO: 	2 expectation(s) included in expectation_suite.
Validation succeeded.
Validation Report saved successfully, explore a summary at https://c.app.hopsworks.ai:443/p/1159320/fs/1150023/fg/1359196


Uploading Dataframe: 0.00% |          | Rows 0/10 | Elapsed Time: 00:00 | Remaining Time: ?

Launching job: weather_improved_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai/p/1159320/jobs/named/weather_improved_1_offline_fg_materialization/executions


## <span style="color:#ff5f27;">⏭️ **Next:** Part 03: Training Pipeline
 </span> 

In the following notebook you will read from a feature group and create training dataset within the feature store
