<span style="font-weight:bold; font-size: 3rem; color:#333;">- Part 01: Feature Backfill</span>


## 🗒️ This notebook is divided into the following sections:
1. Fetch historical data
2. Connect to the Hopsworks feature store
3. Create feature groups and insert them into the feature store



### <span style='color:#ff5f27'> 📝 Imports </span>

In [1]:
import datetime
import requests
import pandas as pd
import hopsworks
import warnings
warnings.filterwarnings("ignore")

---

## <span style='color:#ff5f27'> 🌍 Representing the Target City </span>

In [2]:
country="sweden"
city="stockholm"
street="hornsgatan-108"
today = datetime.date.today()

df = pd.read_csv("../../data/stockholm-air-quality.csv",  parse_dates=['date'], skipinitialspace=True)
df

Unnamed: 0,date,pm25,pm10,no2
0,2024-02-01,22.0,33.0,10.0
1,2024-02-02,22.0,19.0,6.0
2,2024-02-03,12.0,25.0,8.0
3,2024-02-04,17.0,34.0,11.0
4,2024-02-05,20.0,42.0,14.0
...,...,...,...,...
2286,2017-10-24,,,5.0
2287,2017-10-25,,,10.0
2288,2017-10-26,,,14.0
2289,2017-10-27,,,9.0


In [3]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2291 entries, 0 to 2290
Data columns (total 4 columns):
 #   Column  Non-Null Count  Dtype         
---  ------  --------------  -----         
 0   date    2291 non-null   datetime64[ns]
 1   pm25    2254 non-null   float64       
 2   pm10    2254 non-null   float64       
 3   no2     2265 non-null   float64       
dtypes: datetime64[ns](1), float64(3)
memory usage: 71.7 KB


In [4]:
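# Import only the helpers this notebook uses from the local functions module
from functions import get_city_coordinates, get_historical_weather

# Take an explicit copy so the column assignments below modify df2 itself,
# not a view of df (this avoids pandas' SettingWithCopyWarning)
df2 = df[['date', 'pm25']].copy()
df2['city']=city
df2['street']=street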

latitude, longitude = get_city_coordinates(city)

In [5]:
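df2['pm25'] = df2['pm25'].astype('float32')
# shift(1) copies the value from the previous row, so this column holds
# yesterday's reading only where rows are in ascending date order without gaps
df2["pm25_yesterday"] = df2["pm25"].shift(1)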
df2

Unnamed: 0,date,pm25,city,street,pm25_yesterday
0,2024-02-01,22.0,stockholm,hornsgatan-108,
1,2024-02-02,22.0,stockholm,hornsgatan-108,22.0
2,2024-02-03,12.0,stockholm,hornsgatan-108,22.0
3,2024-02-04,17.0,stockholm,hornsgatan-108,12.0
4,2024-02-05,20.0,stockholm,hornsgatan-108,17.0
...,...,...,...,...,...
2286,2017-10-24,,stockholm,hornsgatan-108,
2287,2017-10-25,,stockholm,hornsgatan-108,
2288,2017-10-26,,stockholm,hornsgatan-108,
2289,2017-10-27,,stockholm,hornsgatan-108,
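
The lag feature above depends on row order, because `shift(1)` looks at the previous row rather than the previous calendar day. As a minimal sketch on toy data (the frame `toy` and its values are illustrative, not taken from the CSV), sorting by date before shifting keeps the lag aligned with the calendar:

```python
import pandas as pd

# Toy data, deliberately out of chronological order (illustrative values only)
toy = pd.DataFrame({
    "date": pd.to_datetime(["2024-02-03", "2024-02-01", "2024-02-02"]),
    "pm25": [12.0, 22.0, 21.0],
})

# Sort by date first so that shift(1) pulls the reading from the preceding day
toy_sorted = toy.sort_values("date").reset_index(drop=True)
toy_sorted["pm25_yesterday"] = toy_sorted["pm25"].shift(1)

print(toy_sorted)
```

Even after sorting, a missing day in the series would make the previous row older than one day, so a stricter version could join each row to the row dated exactly one day earlier.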


In [6]:
df2.dropna(inplace=True)
df2

Unnamed: 0,date,pm25,city,street,pm25_yesterday
1,2024-02-02,22.0,stockholm,hornsgatan-108,22.0
2,2024-02-03,12.0,stockholm,hornsgatan-108,22.0
3,2024-02-04,17.0,stockholm,hornsgatan-108,12.0
4,2024-02-05,20.0,stockholm,hornsgatan-108,17.0
5,2024-02-06,41.0,stockholm,hornsgatan-108,20.0
...,...,...,...,...,...
2249,2017-12-26,16.0,stockholm,hornsgatan-108,14.0
2250,2017-12-27,10.0,stockholm,hornsgatan-108,16.0
2251,2017-12-28,55.0,stockholm,hornsgatan-108,10.0
2252,2017-12-29,42.0,stockholm,hornsgatan-108,55.0


In [7]:
earliest_aq_date = df2['date'].min()

In [8]:
earliest_aq_date = earliest_aq_date.strftime('%Y-%m-%d')
earliest_aq_date

'2017-10-04'

In [9]:
df_air_quality=df2
df_air_quality.info()

<class 'pandas.core.frame.DataFrame'>
Index: 2253 entries, 1 to 2253
Data columns (total 5 columns):
 #   Column          Non-Null Count  Dtype         
---  ------          --------------  -----         
 0   date            2253 non-null   datetime64[ns]
 1   pm25            2253 non-null   float32       
 2   city            2253 non-null   object        
 3   street          2253 non-null   object        
 4   pm25_yesterday  2253 non-null   float32       
dtypes: datetime64[ns](1), float32(2), object(2)
memory usage: 88.0+ KB


### <span style="color:#ff5f27;">🛠 Feature Engineering</span>

In [10]:
# Print the shape (number of rows and columns) of the df_air_quality DataFrame
df_air_quality.shape

(2253, 5)

In [11]:
# Retrieve and display the column names of the df_air_quality DataFrame
df_air_quality.columns

Index(['date', 'pm25', 'city', 'street', 'pm25_yesterday'], dtype='object')

---

## <span style='color:#ff5f27'> 🌦 Loading Weather Data from [Open Meteo](https://open-meteo.com/en/docs) </span>

## Instructions for weather
https://open-meteo.com/en/docs/historical-weather-api#hourly=&daily=temperature_2m_mean,precipitation_sum,wind_speed_10m_max,wind_direction_10m_dominant

In [12]:
weather_df = get_historical_weather(city, earliest_aq_date, str(today))

Coordinates 59.29701232910156°N 18.163265228271484°E
Elevation 18.0 m asl
Timezone None None
Timezone difference to GMT+0 0 s
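
The body of `get_historical_weather` is not shown in this notebook. As a rough sketch of the kind of request it might issue, the snippet below only builds a query URL for the daily variables named in the instructions link above. The endpoint `archive-api.open-meteo.com/v1/archive`, the parameter names, and the helper `build_archive_url` are assumptions for illustration. No network call is made.

```python
from urllib.parse import urlencode

# Assumed endpoint of the Open-Meteo historical weather API
ARCHIVE_URL = "https://archive-api.open-meteo.com/v1/archive"

# Daily variables listed in the instructions link
DAILY_VARIABLES = [
    "temperature_2m_mean",
    "precipitation_sum",
    "wind_speed_10m_max",
    "wind_direction_10m_dominant",
]

def build_archive_url(latitude, longitude, start_date, end_date):
    """Hypothetical helper: assemble a query URL without sending it."""
    params = {
        "latitude": latitude,
        "longitude": longitude,
        "start_date": start_date,
        "end_date": end_date,
        "daily": ",".join(DAILY_VARIABLES),
    }
    # safe="," keeps the comma-separated variable list readable in the URL
    return f"{ARCHIVE_URL}?{urlencode(params, safe=',')}"

# Illustrative coordinates and dates
url = build_archive_url(59.3, 18.16, "2017-10-04", "2024-02-18")
print(url)
```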


In [13]:
weather_df.info()

<class 'pandas.core.frame.DataFrame'>
Index: 2327 entries, 0 to 2326
Data columns (total 6 columns):
 #   Column                       Non-Null Count  Dtype         
---  ------                       --------------  -----         
 0   date                         2327 non-null   datetime64[ns]
 1   temperature_2m_mean          2327 non-null   float32       
 2   precipitation_sum            2327 non-null   float32       
 3   wind_speed_10m_max           2327 non-null   float32       
 4   wind_direction_10m_dominant  2327 non-null   float32       
 5   city                         2327 non-null   object        
dtypes: datetime64[ns](1), float32(4), object(1)
memory usage: 90.9+ KB


In [14]:
import great_expectations as ge
aq_expectation_suite = ge.core.ExpectationSuite(
    expectation_suite_name="aq_expectation_suite"
)

aq_expectation_suite.add_expectation(
    ge.core.ExpectationConfiguration(
        expectation_type="expect_column_min_to_be_between",
        kwargs={
            "column":"pm25",
            "min_value":-0.1,
            "max_value":999.9,
            "strict_min":True
        }
    )
)

{"kwargs": {"column": "pm25", "min_value": -0.1, "max_value": 999.9, "strict_min": true}, "expectation_type": "expect_column_min_to_be_between", "meta": {}}

In [15]:
import great_expectations as ge
weather_expectation_suite = ge.core.ExpectationSuite(
    expectation_suite_name="weather_expectation_suite"
)

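# Register an expectation that the column's minimum lies above -0.1 (strictly)
# and at most 1000.0, which rules out negative values in that column
def expect_greater_than_zero(col):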
    weather_expectation_suite.add_expectation(
        ge.core.ExpectationConfiguration(
            expectation_type="expect_column_min_to_be_between",
            kwargs={
                "column":col,
                "min_value":-0.1,
                "max_value":1000.0,
                "strict_min":True
            }
        )
    )
expect_greater_than_zero("precipitation_sum")
expect_greater_than_zero("wind_speed_10m_max")
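
To make the check behind `expect_column_min_to_be_between` concrete, here is a plain-pandas sketch of the same idea. The function `column_min_is_between` is a hypothetical stand-in written for illustration and is not part of Great Expectations. It looks only at the column minimum, so it flags negative readings but says nothing about unusually high values.

```python
import pandas as pd

def column_min_is_between(df, column, min_value, max_value, strict_min=False):
    """Hypothetical stand-in: is the column's minimum inside the given bounds?"""
    observed = df[column].min()
    if strict_min:
        lower_ok = observed > min_value
    else:
        lower_ok = observed >= min_value
    return bool(lower_ok and observed <= max_value)

# Illustrative frames: one with valid readings, one with a negative reading
valid = pd.DataFrame({"pm25": [4.0, 22.0, 55.0]})
invalid = pd.DataFrame({"pm25": [-3.0, 10.0]})

ok = column_min_is_between(valid, "pm25", -0.1, 999.9, strict_min=True)
bad = column_min_is_between(invalid, "pm25", -0.1, 999.9, strict_min=True)
print(ok, bad)
```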

---


### <span style="color:#ff5f27;"> 🔮 Connecting to Hopsworks Feature Store </span>

In [16]:
import hopsworks

project = hopsworks.login()

fs = project.get_feature_store() 

Connected. Call `.close()` to terminate connection gracefully.

Logged in to project, explore it here https://snurran.hops.works/p/5240
Connected. Call `.close()` to terminate connection gracefully.


## <span style="color:#ff5f27;">🪄 Creating Feature Groups</span>

### <span style='color:#ff5f27'> 🌫 Air Quality Data </span>

In [17]:
# Get or create feature group
air_quality_fg = fs.get_or_create_feature_group(
    name='air_quality',
    description='Air Quality characteristics of each day',
    version=1,
    primary_key=['city','street','date'],
    event_time="date",
    expectation_suite=aq_expectation_suite
)    
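
The feature group declares `city`, `street` and `date` as its primary key, so each combination should identify one row. A small sketch, assuming you want to check this locally before inserting, is shown below. The helper `find_duplicate_keys` and the toy frame are illustrative and not part of the Hopsworks API.

```python
import pandas as pd

KEY_COLUMNS = ["city", "street", "date"]

def find_duplicate_keys(df, key_columns):
    """Hypothetical helper: return every row whose key combination repeats."""
    return df[df.duplicated(subset=key_columns, keep=False)]

# Toy frame with one repeated (city, street, date) combination
toy = pd.DataFrame({
    "city": ["stockholm"] * 3,
    "street": ["hornsgatan-108"] * 3,
    "date": pd.to_datetime(["2024-02-01", "2024-02-02", "2024-02-02"]),
    "pm25": [22.0, 22.0, 12.0],
})

dupes = find_duplicate_keys(toy, KEY_COLUMNS)
print(dupes)
```

An empty result suggests the key columns are unique in the DataFrame about to be inserted.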

In [18]:
# Insert data
air_quality_fg.insert(df_air_quality)

Feature Group created successfully, explore it at 
https://snurran.hops.works/p/5240/fs/5188/fg/5170
2024-02-18 09:14:15,767 INFO: 	1 expectation(s) included in expectation_suite.
Validation succeeded.
Validation Report saved successfully, explore a summary at https://snurran.hops.works/p/5240/fs/5188/fg/5170


Uploading Dataframe: 0.00% |          | Rows 0/2253 | Elapsed Time: 00:00 | Remaining Time: ?

Launching job: air_quality_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://snurran.hops.works/p/5240/jobs/named/air_quality_1_offline_fg_materialization/executions


(<hsfs.core.job.Job at 0x7effa6781540>,
 {
   "evaluation_parameters": {},
   "meta": {
     "great_expectations_version": "0.15.12",
     "expectation_suite_name": "aq_expectation_suite",
     "run_id": {
       "run_time": "2024-02-18T09:14:15.767744+00:00",
       "run_name": null
     },
     "batch_kwargs": {
       "ge_batch_id": "169be08e-ce3e-11ee-b4f7-92152048118b"
     },
     "batch_markers": {},
     "batch_parameters": {},
     "validation_time": "20240218T091415.767686Z",
     "expectation_suite_meta": {
       "great_expectations_version": "0.15.12"
     }
   },
   "results": [
     {
       "result": {
         "observed_value": 4.0,
         "element_count": 2253,
         "missing_count": null,
         "missing_percent": null
       },
       "exception_info": {
         "raised_exception": false,
         "exception_message": null,
         "exception_traceback": null
       },
       "meta": {
         "ingestionResult": "INGESTED",
         "validationTime": "2024

In [19]:
air_quality_fg.update_feature_description("date", "Date of measurement of air quality")
air_quality_fg.update_feature_description("city", "Place where the air quality was measured (sometimes a country in aqicn.org)")
air_quality_fg.update_feature_description("street", "Street in the city where the air quality was measured")
air_quality_fg.update_feature_description("pm25", "Particles less than 2.5 micrometers in diameter (fine particles) pose health risk")

<hsfs.feature_group.FeatureGroup at 0x7effa69cdc00>

### <span style='color:#ff5f27'> 🌦 Weather Data </span>

In [20]:
# Get or create feature group 
weather_fg = fs.get_or_create_feature_group(
    name='weather',
    description='Weather characteristics of each day',
    version=1,
    primary_key=['city','date'],
    event_time="date",
    expectation_suite=weather_expectation_suite
) 

In [21]:
# Insert data
weather_fg.insert(weather_df)

Feature Group created successfully, explore it at 
https://snurran.hops.works/p/5240/fs/5188/fg/5171
2024-02-18 09:14:22,716 INFO: 	2 expectation(s) included in expectation_suite.
Validation succeeded.
Validation Report saved successfully, explore a summary at https://snurran.hops.works/p/5240/fs/5188/fg/5171


Uploading Dataframe: 0.00% |          | Rows 0/2327 | Elapsed Time: 00:00 | Remaining Time: ?

Launching job: weather_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://snurran.hops.works/p/5240/jobs/named/weather_1_offline_fg_materialization/executions


(<hsfs.core.job.Job at 0x7effa5ab5c00>,
 {
   "evaluation_parameters": {},
   "meta": {
     "great_expectations_version": "0.15.12",
     "expectation_suite_name": "weather_expectation_suite",
     "run_id": {
       "run_time": "2024-02-18T09:14:22.716559+00:00",
       "run_name": null
     },
     "batch_kwargs": {
       "ge_batch_id": "1ac0299a-ce3e-11ee-b4f7-92152048118b"
     },
     "batch_markers": {},
     "batch_parameters": {},
     "validation_time": "20240218T091422.716510Z",
     "expectation_suite_meta": {
       "great_expectations_version": "0.15.12"
     }
   },
   "results": [
     {
       "result": {
         "observed_value": 0.0,
         "element_count": 2327,
         "missing_count": null,
         "missing_percent": null
       },
       "exception_info": {
         "raised_exception": false,
         "exception_message": null,
         "exception_traceback": null
       },
       "meta": {
         "ingestionResult": "INGESTED",
         "validationTime": 

In [22]:
weather_fg.update_feature_description("date", "Date of measurement of weather")
weather_fg.update_feature_description("city", "City where weather is measured/forecast for")
weather_fg.update_feature_description("temperature_2m_mean", "Temperature in Celsius")
weather_fg.update_feature_description("precipitation_sum", "Precipitation (rain/snow) in mm")
weather_fg.update_feature_description("wind_speed_10m_max", "Wind speed at 10m above ground")
weather_fg.update_feature_description("wind_direction_10m_dominant", "Dominant wind direction over the day")

<hsfs.feature_group.FeatureGroup at 0x7effa59f05b0>

## <span style="color:#ff5f27;">⏭️ **Next:** Part 02: Feature Pipeline 
 </span> 

In the following notebook you will parse data and insert it into Feature Groups.

## <span style="color:#ff5f27;">⏭️ **Exercises:** 
 </span> 
    
* Add rolling-window features of 3 days and 5 days for `pm25`, for example: `df.set_index("date").rolling(3).mean().head()`
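
As a starting point for the exercise, here is a minimal sketch on a toy frame that reuses the first six `pm25` readings shown earlier. The column names `pm25_rolling_3d` and `pm25_rolling_5d` are suggestions, not names used elsewhere in this notebook.

```python
import pandas as pd

# Six consecutive days of readings (first rows of the air quality table)
toy = pd.DataFrame({
    "date": pd.date_range("2024-02-01", periods=6, freq="D"),
    "pm25": [22.0, 22.0, 12.0, 17.0, 20.0, 41.0],
})

# Rolling means assume ascending date order, so sort before indexing by date
toy = toy.sort_values("date").set_index("date")
toy["pm25_rolling_3d"] = toy["pm25"].rolling(3).mean()
toy["pm25_rolling_5d"] = toy["pm25"].rolling(5).mean()

print(toy)
```

The first rows of each rolling column are NaN until the window is full, so they would need to be dropped or filled before insertion into a feature group.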
