# <span style="font-width:bold; font-size: 3rem; color:#1EB182;"><img src="../../images/icon102.png" width="38px"></img> **Hopsworks Feature Store** </span><span style="font-width:bold; font-size: 3rem; color:#333;">- Part 02: Feature Pipeline</span>

[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/logicalclocks/hopsworks-tutorials/blob/master/advanced_tutorials/air_quality/2_feature_pipeline.ipynb)


## üóíÔ∏è This notebook is divided into the following sections:
1. Parse Data
2. Feature Group Insertion

### <span style='color:#ff5f27'> üìù Imports

In [1]:
import datetime
import time
import requests
import pandas as pd
import json

from functions import *

import warnings
warnings.filterwarnings("ignore")

In [2]:
with open('target_cities.json') as json_file:
    target_cities = json.load(json_file)

In [3]:
today = datetime.date.today()
forecast_day = today + datetime.timedelta(days=7)
hindcast_day = today - datetime.timedelta(days=10)

---

## <span style='color:#ff5f27'> üå´ Filling gaps in Air Quality data (PM2.5)</span>

### First time we will determine the 'last update date' using our backfill data
#### Next time we will use `feature view` method from Hopsworks Feature Store

### <span style='color:#ff5f27'>  üßôüèº‚Äç‚ôÇÔ∏è Parsing PM2.5 data

In [4]:
start_of_cell = time.time()

df_aq_raw = pd.DataFrame()

for continent in target_cities:
    for city_name, coords in target_cities[continent].items():
        df_ = get_aqi_data_from_open_meteo(city_name=city_name,
                                           coordinates=coords,
                                           start_date=str(hindcast_day),
                                           end_date=str(today))
        df_aq_raw = pd.concat([df_aq_raw, df_]).reset_index(drop=True)
    
end_of_cell = time.time()
print("-" * 64)
print(f"Parsed new PM2.5 data for ALL locations up to {str(today)}.")
print(f"Took {round(end_of_cell - start_of_cell, 2)} sec.\n")

Processed PM2_5 for Amsterdam since 2023-06-02 till 2023-06-12.
Took 0.21 sec.

Processed PM2_5 for Athina since 2023-06-02 till 2023-06-12.
Took 0.16 sec.

Processed PM2_5 for Berlin since 2023-06-02 till 2023-06-12.
Took 0.16 sec.

Processed PM2_5 for Gdansk since 2023-06-02 till 2023-06-12.
Took 0.16 sec.

Processed PM2_5 for Krak√≥w since 2023-06-02 till 2023-06-12.
Took 0.16 sec.

Processed PM2_5 for London since 2023-06-02 till 2023-06-12.
Took 0.17 sec.

Processed PM2_5 for Madrid since 2023-06-02 till 2023-06-12.
Took 0.16 sec.

Processed PM2_5 for Marseille since 2023-06-02 till 2023-06-12.
Took 0.16 sec.

Processed PM2_5 for Milano since 2023-06-02 till 2023-06-12.
Took 0.17 sec.

Processed PM2_5 for M√ºnchen since 2023-06-02 till 2023-06-12.
Took 0.17 sec.

Processed PM2_5 for Napoli since 2023-06-02 till 2023-06-12.
Took 0.17 sec.

Processed PM2_5 for Paris since 2023-06-02 till 2023-06-12.
Took 0.16 sec.

Processed PM2_5 for Sevilla since 2023-06-02 till 2023-06-12.
Took 0

In [5]:
df_aq_raw

Unnamed: 0,city_name,date,pm2_5
0,Amsterdam,2023-06-02,6.5
1,Amsterdam,2023-06-03,5.7
2,Amsterdam,2023-06-04,6.4
3,Amsterdam,2023-06-05,6.7
4,Amsterdam,2023-06-06,7.0
...,...,...,...
490,Tulalip-Totem Beach Rd,2023-06-08,20.4
491,Tulalip-Totem Beach Rd,2023-06-09,11.0
492,Tulalip-Totem Beach Rd,2023-06-10,12.7
493,Tulalip-Totem Beach Rd,2023-06-11,12.7


In [6]:
df_aq_update = df_aq_raw
df_aq_update

Unnamed: 0,city_name,date,pm2_5
0,Amsterdam,2023-06-02,6.5
1,Amsterdam,2023-06-03,5.7
2,Amsterdam,2023-06-04,6.4
3,Amsterdam,2023-06-05,6.7
4,Amsterdam,2023-06-06,7.0
...,...,...,...
490,Tulalip-Totem Beach Rd,2023-06-08,20.4
491,Tulalip-Totem Beach Rd,2023-06-09,11.0
492,Tulalip-Totem Beach Rd,2023-06-10,12.7
493,Tulalip-Totem Beach Rd,2023-06-11,12.7


### <span style="color:#ff5f27;">üõ† Feature Engineering PM2.5</span>

In [7]:
df_aq_update['date'] = pd.to_datetime(df_aq_update['date'])

In [8]:
# df_aq_update = feature_engineer_aq(df_aq_update)
df_aq_update = df_aq_update.dropna()

In [9]:
df_aq_update.isna().sum().sum()

0

In [10]:
df_aq_update.shape

(495, 3)

In [11]:
df_aq_update.columns

Index(['city_name', 'date', 'pm2_5'], dtype='object')

---

### <span style='color:#ff5f27'>  üßôüèº‚Äç‚ôÇÔ∏è Parsing Weather data

In [12]:
start_of_cell = time.time()

df_weather_update = pd.DataFrame()


for city_name, coords in target_cities["US"].items():
    df_ = get_weather_data_from_open_meteo(city_name=city_name,
                                           coordinates=coords,
                                           start_date=str(today),
                                           end_date=str(forecast_day),
                                           forecast=True)
    df_weather_update = pd.concat([df_weather_update, df_]).reset_index(drop=True)


for continent in target_cities:
    for city_name, coords in target_cities[continent].items():
        df_ = get_weather_data_from_open_meteo(city_name=city_name,
                                               coordinates=coords,
                                               start_date=str(today),
                                               end_date=str(forecast_day),
                                               forecast=True)
        df_weather_update = pd.concat([df_weather_update, df_]).reset_index(drop=True)
    
end_of_cell = time.time()
print("-" * 64)
print(f"Parsed new weather data for ALL cities up to {str(today)}.")
print(f"Took {round(end_of_cell - start_of_cell, 2)} sec.\n")

Parsed weather for Albuquerque since 2023-06-12 till 2023-06-19.
Took 0.17 sec.

Parsed weather for Atlanta since 2023-06-12 till 2023-06-19.
Took 0.16 sec.

Parsed weather for Chicago since 2023-06-12 till 2023-06-19.
Took 0.21 sec.

Parsed weather for Columbus since 2023-06-12 till 2023-06-19.
Took 0.15 sec.

Parsed weather for Dallas since 2023-06-12 till 2023-06-19.
Took 0.16 sec.

Parsed weather for Denver since 2023-06-12 till 2023-06-19.
Took 0.16 sec.

Parsed weather for Houston since 2023-06-12 till 2023-06-19.
Took 0.16 sec.

Parsed weather for Los Angeles since 2023-06-12 till 2023-06-19.
Took 0.15 sec.

Parsed weather for New York since 2023-06-12 till 2023-06-19.
Took 0.15 sec.

Parsed weather for Phoenix-Mesa since 2023-06-12 till 2023-06-19.
Took 0.16 sec.

Parsed weather for Salt Lake City since 2023-06-12 till 2023-06-19.
Took 0.15 sec.

Parsed weather for San Francisco since 2023-06-12 till 2023-06-19.
Took 0.16 sec.

Parsed weather for Tampa since 2023-06-12 till 202

In [13]:
df_weather_update

Unnamed: 0,city_name,date,temperature_max,temperature_min,precipitation_sum,rain_sum,snowfall_sum,precipitation_hours,wind_speed_max,wind_gusts_max,wind_direction_dominant
0,Albuquerque,2023-06-12,31.1,14.1,0.0,0.00,0.0,0.0,29.3,45.0,200
1,Albuquerque,2023-06-13,29.8,12.3,0.0,0.00,0.0,0.0,26.1,43.9,219
2,Albuquerque,2023-06-14,31.6,15.6,0.0,0.00,0.0,0.0,22.5,40.7,251
3,Albuquerque,2023-06-15,32.5,16.9,0.0,0.00,0.0,0.0,45.1,60.1,232
4,Albuquerque,2023-06-16,32.3,15.4,0.0,0.00,0.0,0.0,49.2,55.8,245
...,...,...,...,...,...,...,...,...,...,...,...
459,Tulalip-Totem Beach Rd,2023-06-15,20.3,8.3,0.0,0.00,0.0,0.0,10.9,20.2,198
460,Tulalip-Totem Beach Rd,2023-06-16,18.9,11.2,0.0,0.00,0.0,0.0,20.6,35.3,178
461,Tulalip-Totem Beach Rd,2023-06-17,18.1,10.1,11.2,11.65,0.0,19.0,22.4,31.3,317
462,Tulalip-Totem Beach Rd,2023-06-18,13.6,8.5,1.3,0.20,0.0,4.0,17.3,25.6,278


In [14]:
df_aq_update.date = pd.to_datetime(df_aq_update.date)
df_weather_update.date = pd.to_datetime(df_weather_update.date)

df_aq_update["unix_time"] = df_aq_update["date"].apply(convert_date_to_unix)
df_weather_update["unix_time"] = df_weather_update["date"].apply(convert_date_to_unix)

In [15]:
df_aq_update.date = df_aq_update.date.astype(str)
df_weather_update.date = df_weather_update.date.astype(str)

---

## <span style="color:#ff5f27;">‚¨ÜÔ∏è Uploading new data to the Feature Store</span>

### <span style="color:#ff5f27;"> üîÆ Connecting to Hopsworks Feature Store </span>

In [16]:
import hopsworks


project = hopsworks.login()
fs = project.get_feature_store() 

air_quality_fg = fs.get_feature_group(
    name = 'air_quality',
    version = 1
)
weather_fg = fs.get_feature_group(
    name = 'weather',
    version = 1
)

Connected. Call `.close()` to terminate connection gracefully.

Logged in to project, explore it here https://35.231.25.90:443/p/119
Connected. Call `.close()` to terminate connection gracefully.


In [17]:
air_quality_fg.insert(df_aq_update, write_options={"wait_for_job": False})

2023-06-12 10:48:41,458 INFO: 	1 expectation(s) included in expectation_suite.
Validation succeeded.
Validation Report saved successfully, explore a summary at https://35.231.25.90:443/p/119/fs/67/fg/15


Uploading Dataframe: 0.00% |          | Rows 0/495 | Elapsed Time: 00:00 | Remaining Time: ?

Launching job: air_quality_1_offline_fg_backfill
Job started successfully, you can follow the progress at 
https://35.231.25.90/p/119/jobs/named/air_quality_1_offline_fg_backfill/executions


(<hsfs.core.job.Job at 0x7f375772c520>,
 {
   "statistics": {
     "evaluated_expectations": 1,
     "successful_expectations": 1,
     "unsuccessful_expectations": 0,
     "success_percent": 100.0
   },
   "meta": {
     "great_expectations_version": "0.14.12",
     "expectation_suite_name": "pmi_data",
     "run_id": {
       "run_name": null,
       "run_time": "2023-06-12T08:48:41.456551+00:00"
     },
     "batch_kwargs": {
       "ge_batch_id": "ee669ff6-08fd-11ee-812d-512b762ac0f0"
     },
     "batch_markers": {},
     "batch_parameters": {},
     "validation_time": "20230612T084841.456043Z",
     "expectation_suite_meta": {
       "great_expectations_version": "0.14.12"
     }
   },
   "success": true,
   "results": [
     {
       "meta": {
         "ingestionResult": "INGESTED",
         "validationTime": "2023-06-12T08:48:41.000456Z"
       },
       "exception_info": {
         "raised_exception": false,
         "exception_message": null,
         "exception_traceback": n

In [18]:
weather_fg.insert(df_weather_update, write_options={"wait_for_job": False})

Uploading Dataframe: 0.00% |          | Rows 0/464 | Elapsed Time: 00:00 | Remaining Time: ?

Launching job: weather_1_offline_fg_backfill
Job started successfully, you can follow the progress at 
https://35.231.25.90/p/119/jobs/named/weather_1_offline_fg_backfill/executions


(<hsfs.core.job.Job at 0x7f37804d32e0>, None)