# <span style="font-weight:bold; font-size: 3rem; color:#1EB182;"><img src="../../images/icon102.png" width="38px"></img> **Hopsworks Feature Store** </span><span style="font-weight:bold; font-size: 3rem; color:#333;">- Part 02: Feature Pipeline</span>

[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/logicalclocks/hopsworks-tutorials/blob/master/advanced_tutorials/air_quality/2_feature_pipeline.ipynb)


## 🗒️ This notebook is divided into the following sections:
1. Parse Data
2. Feature Group Insertion

### <span style='color:#ff5f27'> 📝 Imports</span>

In [2]:
import datetime
import time
import requests
import pandas as pd
import json

from functions import *

import warnings
warnings.filterwarnings("ignore")

In [3]:
with open('target_cities.json') as json_file:
    target_cities = json.load(json_file)

In [4]:
today = datetime.date.today()
# End of the weather forecast window: 7 days ahead
forecast_day = today + datetime.timedelta(days=7)
# Start of the air quality window: 10 days back
hindcast_day = today - datetime.timedelta(days=10)

---

## <span style='color:#ff5f27'> 🌫 Filling gaps in Air Quality data (PM2.5)</span>

### On the first run, we determine the 'last update date' from our backfill data
#### On later runs, we use the `feature view` method from the Hopsworks Feature Store
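
As a rough illustration of the first-run case described above, the last update date could be read from backfill data with a pandas group-by. This is a minimal sketch, not the tutorial's own code: `df_backfill` and its values are invented for illustration, and only the column names `city_name`, `date`, and `pm2_5` follow the DataFrames used in this notebook.

```python
import pandas as pd

# Hypothetical backfill sample; the values are invented for illustration only.
df_backfill = pd.DataFrame({
    "city_name": ["Amsterdam", "Amsterdam", "Berlin", "Berlin"],
    "date": ["2023-05-20", "2023-05-22", "2023-05-19", "2023-05-21"],
    "pm2_5": [7.7, 10.1, 9.6, 7.6],
})
df_backfill["date"] = pd.to_datetime(df_backfill["date"])

# Latest date already present for each city.
last_update_per_city = df_backfill.groupby("city_name")["date"].max()

# A conservative overall starting point: the earliest of the per-city latest dates,
# so that no city is left with a gap.
last_update_date = last_update_per_city.min().date()
print(last_update_date)
```

Taking the minimum over cities is one possible design choice; it re-fetches a few days for cities that are already up to date in exchange for never skipping data.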

### <span style='color:#ff5f27'>  🧙🏼‍♂️ Parsing PM2.5 data</span>

In [5]:
start_of_cell = time.time()

df_aq_raw = pd.DataFrame()

for continent in target_cities:
    for city_name, coords in target_cities[continent].items():
        df_ = get_aqi_data_from_open_meteo(city_name=city_name,
                                           coordinates=coords,
                                           start_date=str(hindcast_day),
                                           end_date=str(today))
        df_aq_raw = pd.concat([df_aq_raw, df_]).reset_index(drop=True)
    
end_of_cell = time.time()
print("-" * 64)
print(f"Parsed new PM2.5 data for ALL locations up to {str(today)}.")
print(f"Took {round(end_of_cell - start_of_cell, 2)} sec.\n")

Processed PM2_5 for Amsterdam since 2023-05-23 till 2023-06-02.
Took 0.2 sec.

Processed PM2_5 for Athina since 2023-05-23 till 2023-06-02.
Took 0.16 sec.

Processed PM2_5 for Berlin since 2023-05-23 till 2023-06-02.
Took 0.16 sec.

Processed PM2_5 for Gdansk since 2023-05-23 till 2023-06-02.
Took 0.15 sec.

Processed PM2_5 for Kraków since 2023-05-23 till 2023-06-02.
Took 0.19 sec.

Processed PM2_5 for London since 2023-05-23 till 2023-06-02.
Took 0.19 sec.

Processed PM2_5 for Madrid since 2023-05-23 till 2023-06-02.
Took 0.17 sec.

Processed PM2_5 for Marseille since 2023-05-23 till 2023-06-02.
Took 0.17 sec.

Processed PM2_5 for Milano since 2023-05-23 till 2023-06-02.
Took 0.15 sec.

Processed PM2_5 for München since 2023-05-23 till 2023-06-02.
Took 0.17 sec.

Processed PM2_5 for Napoli since 2023-05-23 till 2023-06-02.
Took 0.18 sec.

Processed PM2_5 for Paris since 2023-05-23 till 2023-06-02.
Took 0.18 sec.

Processed PM2_5 for Sevilla since 2023-05-23 till 2023-06-02.
Took 0.17

In [6]:
df_aq_raw

| | city_name | date | pm2_5 |
|---|---|---|---|
| 0 | Amsterdam | 2023-05-23 | 7.7 |
| 1 | Amsterdam | 2023-05-24 | 10.1 |
| 2 | Amsterdam | 2023-05-25 | 9.6 |
| 3 | Amsterdam | 2023-05-26 | 7.6 |
| 4 | Amsterdam | 2023-05-27 | 11.6 |
| ... | ... | ... | ... |
| 490 | Tulalip-Totem Beach Rd | 2023-05-29 | 12.2 |
| 491 | Tulalip-Totem Beach Rd | 2023-05-30 | 9.5 |
| 492 | Tulalip-Totem Beach Rd | 2023-05-31 | 6.0 |
| 493 | Tulalip-Totem Beach Rd | 2023-06-01 | 5.5 |


In [7]:
# Work on a copy so that later column changes do not modify df_aq_raw
df_aq_update = df_aq_raw.copy()
df_aq_update

| | city_name | date | pm2_5 |
|---|---|---|---|
| 0 | Amsterdam | 2023-05-23 | 7.7 |
| 1 | Amsterdam | 2023-05-24 | 10.1 |
| 2 | Amsterdam | 2023-05-25 | 9.6 |
| 3 | Amsterdam | 2023-05-26 | 7.6 |
| 4 | Amsterdam | 2023-05-27 | 11.6 |
| ... | ... | ... | ... |
| 490 | Tulalip-Totem Beach Rd | 2023-05-29 | 12.2 |
| 491 | Tulalip-Totem Beach Rd | 2023-05-30 | 9.5 |
| 492 | Tulalip-Totem Beach Rd | 2023-05-31 | 6.0 |
| 493 | Tulalip-Totem Beach Rd | 2023-06-01 | 5.5 |


### <span style="color:#ff5f27;">🛠 Feature Engineering PM2.5</span>

In [8]:
df_aq_update['date'] = pd.to_datetime(df_aq_update['date'])

In [9]:
# df_aq_update = feature_engineer_aq(df_aq_update)
df_aq_update = df_aq_update.dropna()

In [10]:
df_aq_update.isna().sum().sum()

0

In [11]:
df_aq_update.shape

(495, 3)

In [12]:
df_aq_update.columns

Index(['city_name', 'date', 'pm2_5'], dtype='object')

---

### <span style='color:#ff5f27'>  🧙🏼‍♂️ Parsing Weather data</span>

In [13]:
start_of_cell = time.time()

df_weather_update = pd.DataFrame()


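# Note: "US" is also a key of target_cities, so the loop over all continents
# further down fetches the US cities a second time.
for city_name, coords in target_cities["US"].items():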
    df_ = get_weather_data_from_open_meteo(city_name=city_name,
                                           coordinates=coords,
                                           start_date=str(today),
                                           end_date=str(forecast_day),
                                           forecast=True)
    df_weather_update = pd.concat([df_weather_update, df_]).reset_index(drop=True)


for continent in target_cities:
    for city_name, coords in target_cities[continent].items():
        df_ = get_weather_data_from_open_meteo(city_name=city_name,
                                               coordinates=coords,
                                               start_date=str(today),
                                               end_date=str(forecast_day),
                                               forecast=True)
        df_weather_update = pd.concat([df_weather_update, df_]).reset_index(drop=True)
    
end_of_cell = time.time()
print("-" * 64)
print(f"Parsed new weather data for ALL cities up to {str(today)}.")
print(f"Took {round(end_of_cell - start_of_cell, 2)} sec.\n")

Parsed weather for Albuquerque since 2023-06-02 till 2023-06-09.
Took 0.18 sec.

Parsed weather for Atlanta since 2023-06-02 till 2023-06-09.
Took 0.21 sec.

Parsed weather for Chicago since 2023-06-02 till 2023-06-09.
Took 0.18 sec.

Parsed weather for Columbus since 2023-06-02 till 2023-06-09.
Took 0.16 sec.

Parsed weather for Dallas since 2023-06-02 till 2023-06-09.
Took 0.46 sec.

Parsed weather for Denver since 2023-06-02 till 2023-06-09.
Took 0.16 sec.

Parsed weather for Houston since 2023-06-02 till 2023-06-09.
Took 0.18 sec.

Parsed weather for Los Angeles since 2023-06-02 till 2023-06-09.
Took 0.16 sec.

Parsed weather for New York since 2023-06-02 till 2023-06-09.
Took 0.16 sec.

Parsed weather for Phoenix-Mesa since 2023-06-02 till 2023-06-09.
Took 0.17 sec.

Parsed weather for Salt Lake City since 2023-06-02 till 2023-06-09.
Took 0.17 sec.

Parsed weather for San Francisco since 2023-06-02 till 2023-06-09.
Took 0.13 sec.

Parsed weather for Tampa since 2023-06-02 till 202

In [14]:
df_weather_update

| | city_name | date | temperature_max | temperature_min | precipitation_sum | rain_sum | snowfall_sum | precipitation_hours | wind_speed_max | wind_gusts_max | wind_direction_dominant |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Albuquerque | 2023-06-02 | 27.9 | 12.6 | 0.0 | 0.00 | 0.00 | 0.0 | 35.9 | 38.2 | 222 |
| 1 | Albuquerque | 2023-06-03 | 29.1 | 11.1 | 11.9 | 11.66 | 0.17 | 1.0 | 41.0 | 61.2 | 280 |
| 2 | Albuquerque | 2023-06-04 | 28.8 | 15.6 | 0.0 | 0.00 | 0.00 | 0.0 | 21.3 | 34.6 | 131 |
| 3 | Albuquerque | 2023-06-05 | 29.4 | 16.0 | 0.4 | 0.20 | 0.00 | 2.0 | 35.0 | 42.1 | 110 |
| 4 | Albuquerque | 2023-06-06 | 29.9 | 16.5 | 0.2 | 0.10 | 0.00 | 2.0 | 40.7 | 44.6 | 119 |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 459 | Tulalip-Totem Beach Rd | 2023-06-05 | 15.7 | 6.3 | 0.0 | 0.00 | 0.00 | 0.0 | 27.5 | 44.6 | 310 |
| 460 | Tulalip-Totem Beach Rd | 2023-06-06 | 17.6 | 8.9 | 0.0 | 0.00 | 0.00 | 0.0 | 17.7 | 33.5 | 323 |
| 461 | Tulalip-Totem Beach Rd | 2023-06-07 | 18.1 | 8.9 | 0.0 | 0.00 | 0.00 | 0.0 | 17.8 | 27.4 | 315 |
| 462 | Tulalip-Totem Beach Rd | 2023-06-08 | 19.7 | 9.5 | 0.0 | 0.00 | 0.00 | 0.0 | 13.3 | 33.8 | 335 |


In [15]:
# Make sure both date columns are datetimes, then derive a Unix-time column
df_aq_update["date"] = pd.to_datetime(df_aq_update["date"])
df_weather_update["date"] = pd.to_datetime(df_weather_update["date"])

df_aq_update["unix_time"] = df_aq_update["date"].apply(convert_date_to_unix)
df_weather_update["unix_time"] = df_weather_update["date"].apply(convert_date_to_unix)

In [16]:
df_aq_update.date = df_aq_update.date.astype(str)
df_weather_update.date = df_weather_update.date.astype(str)
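
The `convert_date_to_unix` helper used above comes from the `functions` module and is not shown in this notebook. The sketch below shows one plausible shape for such a helper, assuming it returns milliseconds since the Unix epoch in UTC; the real implementation may differ. The name `to_unix_ms` is hypothetical.

```python
import datetime


def to_unix_ms(date) -> int:
    """Hypothetical stand-in for a date-to-Unix-time helper (milliseconds, UTC)."""
    if isinstance(date, str):
        # Accept plain "YYYY-MM-DD" strings
        date = datetime.datetime.strptime(date, "%Y-%m-%d")
    elif isinstance(date, datetime.date) and not isinstance(date, datetime.datetime):
        # Promote a plain date to a datetime at midnight
        date = datetime.datetime.combine(date, datetime.time())
    # Treat naive values as UTC so the result does not depend on the local timezone
    if date.tzinfo is None:
        date = date.replace(tzinfo=datetime.timezone.utc)
    return int(date.timestamp() * 1000)


print(to_unix_ms("2023-06-02"))
```

Because `pandas.Timestamp` is a subclass of `datetime.datetime`, a function of this shape could also be passed to `.apply` on a datetime column, as is done with `convert_date_to_unix` above.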

---

## <span style="color:#ff5f27;">⬆️ Uploading new data to the Feature Store</span>

### <span style="color:#ff5f27;"> 🔮 Connecting to Hopsworks Feature Store </span>

In [17]:
import hopsworks


project = hopsworks.login()
fs = project.get_feature_store() 

air_quality_fg = fs.get_or_create_feature_group(
    name = 'air_quality',
    version = 1
)
weather_fg = fs.get_or_create_feature_group(
    name = 'weather',
    version = 1
)

Connected. Call `.close()` to terminate connection gracefully.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/52403
Connected. Call `.close()` to terminate connection gracefully.


In [18]:
air_quality_fg.insert(df_aq_update, write_options={"wait_for_job": False})

2023-06-02 09:44:42,069 INFO: 	1 expectation(s) included in expectation_suite.
Validation failed.
Validation Report saved successfully, explore a summary at https://c.app.hopsworks.ai:443/p/52403/fs/51299/fg/54909


Uploading Dataframe: 0.00% |          | Rows 0/495 | Elapsed Time: 00:00 | Remaining Time: ?

Launching job: air_quality_1_offline_fg_backfill
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai/p/52403/jobs/named/air_quality_1_offline_fg_backfill/executions


(<hsfs.core.job.Job at 0x7f8f31b338b0>,
 {
   "success": false,
   "evaluation_parameters": {},
   "results": [
     {
       "success": false,
       "result": {},
       "exception_info": {
         "user_message": "exception_info field exceeded max available space in SQL table, download validation report file to access the complete info.",
         "raised_exception": true,
         "exception_message": "TypeError: Column values, min_value, and max_value must either be None or of the same type."
       },
       "expectation_config": {
         "kwargs": {
           "column": "pm2_5",
           "min_value": "0.5",
           "max_value": "1000.0"
         },
         "expectation_type": "expect_column_values_to_be_between",
         "meta": {
           "expectationId": 47115
         }
       },
       "meta": {}
     }
   ],
   "statistics": {
     "evaluated_expectations": 1,
     "successful_expectations": 0,
     "unsuccessful_expectations": 1,
     "success_percent": 0.0
   

In [19]:
weather_fg.insert(df_weather_update, write_options={"wait_for_job": False})

Uploading Dataframe: 0.00% |          | Rows 0/464 | Elapsed Time: 00:00 | Remaining Time: ?

Launching job: weather_1_offline_fg_backfill
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai/p/52403/jobs/named/weather_1_offline_fg_backfill/executions


(<hsfs.core.job.Job at 0x7f8f3014cb50>, None)
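
The validation report returned by the `air_quality_fg.insert` call above fails with a `TypeError`, not because values fall outside the range. The expectation's `min_value` and `max_value` are stored as the strings `"0.5"` and `"1000.0"`, while the `pm2_5` column appears to hold floats. The sketch below reproduces that kind of type mismatch in plain Python and shows that casting the bounds to `float` makes the comparison well defined. It does not use the Great Expectations or Hopsworks APIs; `values_between` is a hypothetical helper written only for illustration.

```python
def values_between(values, min_value, max_value):
    """Hypothetical range check: True if every value lies in [min_value, max_value]."""
    return all(min_value <= v <= max_value for v in values)


# First pm2_5 values shown in the air quality output earlier in this notebook
pm2_5 = [7.7, 10.1, 9.6, 7.6, 11.6]

# Bounds given as strings, as in the report's expectation_config kwargs
try:
    values_between(pm2_5, "0.5", "1000.0")
    mismatch_raised = False
except TypeError:
    # float and str cannot be ordered against each other
    mismatch_raised = True

# Casting the bounds to float gives a well-defined comparison
in_range = values_between(pm2_5, float("0.5"), float("1000.0"))
print(mismatch_raised, in_range)
```

If the same cause applies here, checking whether the expectation was created with numeric rather than string bounds would be a reasonable first step.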