<a href="https://colab.research.google.com/github/yadgire7/AIR-QUALITY-INDEX/blob/master/AQI.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Approach:
1. The API has city as key. It can also be accesses using idx. SO, I created a list of idx.

2. There is no key like 'date' or 'day' using which I could fetch data for a particular day. So, I fetched data for all the cities for a particular day and then filtered out the data for a particular city.

3. Fetching data for all the cities by idx(around 959) using a for loop would take a lot of time. So, I used pyspark to parallelize the process.

4. Later during feature extraction, I realized that the the dat is inconsistent. Data for some attributes like location, air quality index and the parameters used to calculate air quality index are missing. So, I used the logic to join the data after fethching all the features on location.

5. Creating a dataframe from the joined spark dataframe was taking a lot of time. So, I decided to complete the task using 100 idx in order to complete the task in time and at least be able to submit the assignment.


In [41]:
import sys
import os
import requests
import json
import numpy as np
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, isnull, sum, mean
import pandas as pd
from matplotlib import pyplot as plt
import hopsworks
import xgboost

In [2]:
# pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m5.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317130 sha256=ab5fe30fe63712414e643e3ace845288332c18e96255cf4efc27f55d723e79cf
  Stored in directory: /root/.cache/pip/wheels/7b/1b/4b/3363a1d04368e7ff0d408e57ff57966fcdf00583774e761327
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0


In [5]:
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
sc = SparkContext.getOrCreate()
spark = SparkSession.builder.config("spark.sql.shuffle.partitions", "400").getOrCreate()
sc.setLogLevel('WARN')



In [6]:
def fetch_raw_data(id):
    '''
    params:
        id_list: list of ids for different cities
        api: "https://api.waqi.info/feed/"
        token = b5ffb81511c8f973b9db70d54e76893cc87d2b95
    '''
    api = "https://api.waqi.info/feed/"
    token = "token=b5ffb81511c8f973b9db70d54e76893cc87d2b95"
    url = api + "@" + str(id) + "/?" + token
    instance = requests.get(url)
    if instance.status_code == 200:
       return instance.text


In [7]:
arr = [i for i in range(100)]
id_list = sc.parallelize(arr)


In [8]:
raw_data = id_list.map(fetch_raw_data).map(lambda x: json.loads(x))


In [9]:
raw_data.first()

{'status': 'ok',
 'data': {'aqi': 57,
  'idx': 0,
  'attributions': [{'url': 'http://www.airqualityontario.com/',
    'name': 'Air Quality Ontario - the Ontario Ministry of the Environment and Climate Change',
    'logo': 'Ontario-Ministry-of-the-Environment-and-Climate-Change.png'},
   {'url': 'https://waqi.info/', 'name': 'World Air Quality Index Project'}],
  'city': {'geo': [44.382361, -79.702306],
   'name': 'Barrie, Ontario, Canada',
   'url': '',
   'location': ''},
  'dominentpol': 'pm25',
  'iaqi': {'h': {'v': 1},
   'no2': {'v': 1.3},
   'o3': {'v': 32},
   'p': {'v': 1001.5},
   'pm25': {'v': 57},
   't': {'v': 14.2}},
  'time': {'s': '2023-06-12 01:00:00',
   'tz': '-05:00',
   'v': 1686531600,
   'iso': '2023-06-12T01:00:00-05:00'},
  'forecast': {'daily': {'o3': [{'avg': 17,
      'day': '2023-06-10',
      'max': 22,
      'min': 13},
     {'avg': 16, 'day': '2023-06-11', 'max': 27, 'min': 8},
     {'avg': 10, 'day': '2023-06-12', 'max': 18, 'min': 5},
     {'avg': 8, 'd

In [10]:
'''
Cleaning the raw_data to get only the 'data' dictionary for further feature extraction.
'''
cleaned_data = raw_data.map(lambda x: x['data']).filter(lambda x: x is not None and len(x.keys()) > 2)


In [10]:
cleaned_data.first()

{'aqi': 53,
 'idx': 0,
 'attributions': [{'url': 'http://www.airqualityontario.com/',
   'name': 'Air Quality Ontario - the Ontario Ministry of the Environment and Climate Change',
   'logo': 'Ontario-Ministry-of-the-Environment-and-Climate-Change.png'},
  {'url': 'https://waqi.info/', 'name': 'World Air Quality Index Project'}],
 'city': {'geo': [44.382361, -79.702306],
  'name': 'Barrie, Ontario, Canada',
  'url': '',
  'location': ''},
 'dominentpol': 'pm25',
 'iaqi': {'h': {'v': 1},
  'no2': {'v': 3.6},
  'o3': {'v': 3.2},
  'p': {'v': 1003.5},
  'pm25': {'v': 53},
  't': {'v': 13.8}},
 'time': {'s': '2023-06-11 23:00:00',
  'tz': '-05:00',
  'v': 1686524400,
  'iso': '2023-06-11T23:00:00-05:00'},
 'forecast': {'daily': {'o3': [{'avg': 11,
     'day': '2023-06-09',
     'max': 16,
     'min': 8},
    {'avg': 17, 'day': '2023-06-10', 'max': 22, 'min': 13},
    {'avg': 16, 'day': '2023-06-11', 'max': 27, 'min': 8},
    {'avg': 10, 'day': '2023-06-12', 'max': 18, 'min': 5},
    {'avg'

### function definitions to extract features from the (cleaned) raw_data

In [11]:
def get_attr_value(x, a, k):
    if a in x.keys():
        if k in x[a].keys():
            return int(x[a][k]['v'])
    return 0


In [12]:
def get_location(raw_data_rdd):
    location = raw_data_rdd.map(lambda x: (
        x['idx'], x['city']['geo'])).filter(lambda x: x[1] is not None)\
        .map(lambda x: (x[0], x[1][0], x[1][1]))
    return location

In [13]:
def get_aqi(rdd):
    aqi = rdd.map(lambda x: (x['idx'], x['aqi']))
    return aqi

In [14]:
def create_df(rdd, name):
    df = spark.createDataFrame(rdd, ['idx', name])
    return df

In [15]:
def create_pandas_df(df_list):
    new_df_list = []
    for df in df_list:
        df = df.toPandas()
        new_df_list.append(df)
    return new_df_list


In [16]:
def filter_forecast(arr_of_dict):
    temp = [(d['day'], d['avg']) for d in arr_of_dict]
    for pair in temp:
        if pair[0] == "2023-06-10":
            return pair

In [17]:
def filter_rdd(rdd, ids):
    rdd = rdd.filter(lambda x: x[0] in ids)
    return rdd

In [18]:
location = get_location(cleaned_data)

In [19]:
aqi = get_aqi(cleaned_data)


In [20]:
main_ft = cleaned_data.map(lambda x: (x['idx'], get_attr_value(x,'iaqi', 'p'),get_attr_value(x,'iaqi', 'h'),\
                                      get_attr_value(x,'iaqi', 't'),get_attr_value(x,'iaqi', 'pm25'),\
                                      get_attr_value(x,'iaqi', 'pm10'), get_attr_value(x,'iaqi', 'o3'),\
                                      get_attr_value(x,'iaqi', 'no2'), get_attr_value(x,'iaqi', 'so2')))

In [21]:
main_ft_df = spark.createDataFrame(main_ft,['idx', 'p','h','t','pm25','pm10','o3','no2','so2'])

In [22]:
aqi_df = spark.createDataFrame(aqi, ['idx', 'aqi'])

In [23]:
location_df = spark.createDataFrame(location, ['idx', 'latitude', 'longitude'])

In [24]:
joined_df1= location_df.join(main_ft_df, on='idx', how= 'inner')

In [25]:
joined_df2 = joined_df1.join(aqi_df, on='idx',how='inner')

In [26]:
main_df = joined_df2.toPandas()

In [27]:
main_df.head()

Unnamed: 0,idx,latitude,longitude,p,h,t,pm25,pm10,o3,no2,so2,aqi
0,29,46.533194,-84.309917,1014,68,-2,5,0,22,1,0,22.0
1,19,46.323194,-79.449278,962,98,15,59,0,2,2,0,59.0
2,22,45.382528,-75.714194,1031,81,-17,13,0,7,1,0,7.0
3,7,45.224278,-78.932944,1008,97,15,57,0,9,0,0,57.0
4,34,43.662972,-79.388111,1006,89,15,46,0,36,7,0,46.0


In [28]:
main_df.isna().sum()

idx          0
latitude     0
longitude    0
p            0
h            0
t            0
pm25         0
pm10         0
o3           0
no2          0
so2          0
aqi          4
dtype: int64

In [29]:
# imputing the nan in aqi with the mean of the aqi
main_df['aqi'] = main_df['aqi'].fillna(main_df['aqi'].mean())

In [30]:
main_df.isna().sum()

idx          0
latitude     0
longitude    0
p            0
h            0
t            0
pm25         0
pm10         0
o3           0
no2          0
so2          0
aqi          0
dtype: int64

In [31]:
# !pip install -U hopsworks --quiet

  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m120.7/120.7 kB[0m [31m5.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m50.9/50.9 kB[0m [31m6.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m135.6/135.6 kB[0m [31m17.0 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m45.3/45.3 kB[0m [31m6.0 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m68.2/68.2 kB[0m [31m9.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m43.7/43.7 kB[0m [31m5.4 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

Creating feature group

In [34]:
project = hopsworks.login()
fs = project.get_feature_store()

Connected. Call `.close()` to terminate connection gracefully.
Connection closed.
Copy your Api Key (first register/login): https://c.app.hopsworks.ai/account/api/generated

Paste it here: ··········
Connected. Call `.close()` to terminate connection gracefully.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/56479
Connected. Call `.close()` to terminate connection gracefully.


In [35]:
aqi_fg = fs.get_or_create_feature_group(
    name="air_quality_index",
    version="1",
    description="Air Quality Index",
    primary_key=['idx']
)

In [36]:
aqi_fg.insert(main_df)

Feature Group created successfully, explore it at 
https://c.app.hopsworks.ai:443/p/56479/fs/55375/fg/60782


Uploading Dataframe: 0.00% |          | Rows 0/94 | Elapsed Time: 00:00 | Remaining Time: ?

Launching offline feature group backfill job...
Backfill Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai/p/56479/jobs/named/air_quality_index_1_offline_fg_backfill/executions


(<hsfs.core.job.Job at 0x7fd06866f7f0>, None)

In [37]:
feature_descriptions = [
    {"name": "idx", "description": "City ID"},
    {"name": "latitude", "description": "Latitude of location"},
    {"name": "longitude", "description": "Longitude of location"},
    {"name": "p", "description": "pressure"},
    {"name": "h", "description": "humidity"},
    {"name": "pm25", "description": "pm2.5"},
    {"name": "pm10", "description": "pm1.0"},
    {"name": "o3", "description": "ozone"},
    {"name": "no2", "description": "nitrogen dioxide"},
    {"name": "so2", "description": "sulphur dioxide"},
    {"name": "aqi", "description": "air quality index"}
    
]
# idx	latitude	longitude	p	h	t	pm25	pm10	o3	no2	so2	aqi
for desc in feature_descriptions: 
    aqi_fg.update_feature_description(desc["name"], desc["description"])

In [38]:
ds_query = aqi_fg.select(["idx",	"latitude",	"longitude",	"p",	"h",	"t",    	"pm25",	"pm10",	"o3",	"no2",	"so2",	"aqi"])
ds_query.show(5)



Unnamed: 0,idx,latitude,longitude,p,h,t,pm25,pm10,o3,no2,so2,aqi
0,26,44.301917,-78.346222,1007,93,15,72,0,34,1,0,72.0
1,1,44.150528,-77.3955,1012,84,16,87,0,24,3,0,87.0
2,21,43.945944,-78.894917,1005,82,16,53,0,43,1,0,53.0
3,53,37.48293,-122.20348,1014,0,10,9,0,20,8,0,9.0
4,49,37.76595,-122.39902,1014,87,58,9,0,20,4,0,9.0


In [39]:
try:
    feature_view = fs.get_feature_view('aqi_view', version=1)
except:
    feature_view = fs.create_feature_view(
        name='aqi',
        query=ds_query,
        version=1,
        labels=["aqi"]
    )

Feature view created successfully, explore it at 
https://c.app.hopsworks.ai:443/p/56479/fs/55375/fv/aqi/version/1


In [40]:
aiq_version, aiq_job = feature_view.create_train_validation_test_split(
    description = 'training set',
    data_format = 'csv',
    validation_size = 0.1,
    test_size = 0.1,
    write_options = {'wait_for_job': True},
    coalesce = True,
)

Training dataset job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai/p/56479/jobs/named/aqi_1_1_create_fv_td_12062023070514/executions




In [42]:
X_train, X_val, X_test, y_train, y_val, y_test = feature_view.get_train_validation_test_split(1)

In [43]:
# train the model
mod = xgboost.XGBRegressor()
mod = mod.fit(X_train, y_train)

In [45]:
pred = mod.predict(X_val)