# Predicting Air Quality using AWS Forecast and OpenAQ dataset

Outdoor particulate pollution was responsible for an estimated 4.2 million deaths worldwide in 2015, with a majority concentrated in east and south Asia. Millions more fell ill from breathing dirty air.
This fine pollution mainly comes from burning things: Coal in power plants, gasoline in cars, chemicals in industrial processes, or woody materials and whatever else ignites during wildfires. The particles are too small for the eye to see — each about 35 times smaller than a grain of fine beach sand — but in high concentrations they cast a haze in the sky. And, when breathed in, they wreak havoc on human health(source- nytimes)
The OpenAQ dataset is very rich and gives us a wealth of information on air quality across the globe.
In this section we are trying to create an AWS Forecast to predict the air quality in a city. This would help us in many ways -


- Parents can use the application to plan any outside activities with the kids. It might not be a good idea to go to a park if the air quality predictot/index is very bad that week.
- The forecast can also be used by policymakers to come up with better ways to combat the air quality issues pro actively
- The Air Quality Index information can also be used to regulate any traffic or wildfire activities in the area.

In [3]:
from pyathena import connect
import pandas as pd
import matplotlib.pyplot as plt

In [4]:
athena_data_bucket = "openaq-athena-bucket"
conn = connect(s3_staging_dir="s3://" + athena_data_bucket,
               region_name="us-east-1")

### Create Athena database and import OpenAQ data into Athena, so that we can query it through sagemaker.

In [11]:
database = """
CREATE DATABASE IF NOT EXISTS db_openaq;
"""
cursor = conn.cursor()
cursor.execute(database)
print(cursor.fetchall())

[]


In [13]:
table = """
CREATE EXTERNAL TABLE IF NOT EXISTS db_openaq.tbl_openaq2(
  `date` struct<utc:string,local:string> COMMENT 'from deserializer', 
  `parameter` string COMMENT 'from deserializer', 
  `location` string COMMENT 'from deserializer', 
  `value` float COMMENT 'from deserializer', 
  `unit` string COMMENT 'from deserializer', 
  `city` string COMMENT 'from deserializer', 
  `attribution` array<struct<name:string,url:string>> COMMENT 'from deserializer', 
  `averagingperiod` struct<unit:string,value:float> COMMENT 'from deserializer', 
  `coordinates` struct<latitude:float,longitude:float> COMMENT 'from deserializer', 
  `country` string COMMENT 'from deserializer', 
  `sourcename` string COMMENT 'from deserializer', 
  `sourcetype` string COMMENT 'from deserializer', 
  `mobile` string COMMENT 'from deserializer')
ROW FORMAT SERDE 
  'org.openx.data.jsonserde.JsonSerDe' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.mapred.TextInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  's3://openaq-fetches/realtime-gzipped'
TBLPROPERTIES (
  'transient_lastDdlTime'='1518373755')
"""
cursor = conn.cursor()
cursor.execute(table)
print(cursor.fetchall())

[]


In [None]:
table = """
CREATE EXTERNAL TABLE IF NOT EXISTS db_openaq.tbl_openaq2(
  `date` struct<utc:string,local:string> COMMENT 'from deserializer', 
  `parameter` string COMMENT 'from deserializer', 
  `location` string COMMENT 'from deserializer', 
  `value` float COMMENT 'from deserializer', 
  `unit` string COMMENT 'from deserializer', 
  `city` string COMMENT 'from deserializer', 
  `attribution` array<struct<name:string,url:string>> COMMENT 'from deserializer', 
  `averagingperiod` struct<unit:string,value:float> COMMENT 'from deserializer', 
  `coordinates` struct<latitude:float,longitude:float> COMMENT 'from deserializer', 
  `country` string COMMENT 'from deserializer', 
  `sourcename` string COMMENT 'from deserializer', 
  `sourcetype` string COMMENT 'from deserializer', 
  `mobile` string COMMENT 'from deserializer')
ROW FORMAT SERDE 
  'org.openx.data.jsonserde.JsonSerDe' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.mapred.TextInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  's3://openaq-fetches/realtime-gzipped'
TBLPROPERTIES (
  'transient_lastDdlTime'='1518373755')
"""
cursor = conn.cursor()
cursor.execute(table)
print(cursor.fetchall())

In [30]:
df_pm25 = pd.read_sql("SELECT country,avg(value)  FROM db_openaq.tbl_openaq2 WHERE parameter = 'pm25' GROUP BY country;", conn)
df_pm25.columns = ['country','pm25']
df_pm25

Unnamed: 0,country,pm25
0,MT,20.957464
1,BR,14.967491
2,TJ,68.403656
3,HK,20.195745
4,BA,19.843891
5,HR,14.167453
6,BK,35.487500
7,IQ,-12.859020
8,XK,-0.948590
9,NG,16.333216


In [125]:
df_pm25 = df_pm25.sort_values(by=['pm25'])
#df_pm25 = df_pm25['value'][df_pm25['value'] < 0]  = 0

AttributeError: 'int' object has no attribute 'sort_values'

In [None]:
### Lets plot the chart 

In [121]:
import plotly.express as px
fig = px.bar(df_pm25, x='country', y='pm25')
fig.show()

In [85]:
df_pm25_delhi = pd.read_sql("SELECT * FROM db_openaq.tbl_openaq2 WHERE parameter = 'pm25' AND city = 'Delhi';", conn)
#df_pm25_delhi.columns = ['country','pm25']
df_pm25_delhi

Unnamed: 0,date,parameter,location,value,unit,city,attribution,averagingperiod,coordinates,country,sourcename,sourcetype,mobile
0,"{utc=2015-07-17T01:10:00.000Z, local=2015-07-1...",pm25,Mandir Marg,39.00,µg/m³,Delhi,,,"{latitude=28.6341, longitude=77.2005}",IN,Mandir Marg,,
1,"{utc=2015-07-17T00:30:00.000Z, local=2015-07-1...",pm25,Punjabi Bagh,92.00,µg/m³,Delhi,,,"{latitude=28.6683, longitude=77.1167}",IN,Punjabi Bagh,,
2,"{utc=2015-07-17T01:30:00.000Z, local=2015-07-1...",pm25,Mandir Marg,57.00,µg/m³,Delhi,,,"{latitude=28.6341, longitude=77.2005}",IN,Mandir Marg,,
3,"{utc=2015-07-17T02:30:00.000Z, local=2015-07-1...",pm25,Mandir Marg,60.00,µg/m³,Delhi,,,"{latitude=28.6341, longitude=77.2005}",IN,Mandir Marg,,
4,"{utc=2015-07-17T02:30:00.000Z, local=2015-07-1...",pm25,Anand Vihar,98.00,µg/m³,Delhi,,,"{latitude=28.6508, longitude=77.3152}",IN,Anand Vihar,,
5,"{utc=2015-07-17T02:10:00.000Z, local=2015-07-1...",pm25,Anand Vihar,88.00,µg/m³,Delhi,,,"{latitude=28.6508, longitude=77.3152}",IN,Anand Vihar,,
6,"{utc=2015-07-17T00:30:00.000Z, local=2015-07-1...",pm25,Anand Vihar,132.00,µg/m³,Delhi,,,"{latitude=28.6508, longitude=77.3152}",IN,Anand Vihar,,
7,"{utc=2015-07-17T04:10:00.000Z, local=2015-07-1...",pm25,Anand Vihar,121.00,µg/m³,Delhi,,,"{latitude=28.6508, longitude=77.3152}",IN,Anand Vihar,,
8,"{utc=2015-07-17T00:10:00.000Z, local=2015-07-1...",pm25,Punjabi Bagh,75.00,µg/m³,Delhi,,,"{latitude=28.6683, longitude=77.1167}",IN,Punjabi Bagh,,
9,"{utc=2015-07-17T00:30:00.000Z, local=2015-07-1...",pm25,Mandir Marg,39.00,µg/m³,Delhi,,,"{latitude=28.6341, longitude=77.2005}",IN,Mandir Marg,,


In [86]:
df_pm25_delhi['datetime']= df_pm25_delhi['date'].str[5:15]

In [87]:
df_pm25_delhi['time1'] = pd.to_datetime(df_pm25_delhi['datetime'], format='%Y-%m-%d')

In [88]:
df_delhi = df_pm25_delhi.groupby(['city', 'parameter','time1']).mean().reset_index(drop=False)

In [89]:
df_delhi['value'].mean()

93.39339321529081

In [90]:
df_delhi['value'][df_delhi.value < 0] = df_delhi['value'].mean()



A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/indexing.html#indexing-view-versus-copy



In [91]:
import plotly.graph_objects as go

fig = go.Figure()
fig.add_trace(go.Scatter(x=df_delhi.time1, y=df_delhi.value, name="microgram/m3",
                         line_color='deepskyblue'))

fig.update_layout(title_text='Delhi - PPM - Time Series with Rangeslider',
                  xaxis_rangeslider_visible=True)
fig.show()

### As you can see the PPM concentration in Delhi during winters in almost 1500 which is the most toxic level manageable by humans

In [116]:
df_pred = df_delhi[['city','time1','value']]

In [117]:
df_pred.columns = ['item','timestamp','demand']

## Now we would use Amazon Forecast to predict the Air Quality in Delhi in the next 2 weeks

## AWS Forecast to predict the forecast of the ppm in Delhi

In [118]:
import boto3
from time import sleep
import subprocess
import pandas as pd
import json
import time

In [None]:
forecast = boto3.client(service_name='forecast') 
forecastquery = boto3.client(service_name='forecastquery')

In [None]:
DATASET_FREQUENCY = "M" 
TIMESTAMP_FORMAT = "yyyy-MM-dd"
key="airquality/airquality_delhi.csv"
bucket_name = "openaq-forecast"

In [None]:
project = 'air_quality_forecast'
datasetName= project+'_ds'
datasetGroupName= project +'_dsg'
s3DataPath = "s3://"+bucket_name+"/"+key

In [None]:
# Now save things 
%store project

In [None]:
create_dataset_group_response = forecast.create_dataset_group(DatasetGroupName=datasetGroupName,
                                                              Domain="CUSTOM",
                                                             )
datasetGroupArn = create_dataset_group_response['DatasetGroupArn']

In [None]:
forecast.describe_dataset_group(DatasetGroupArn=datasetGroupArn)

In [None]:
# Specify the schema of your dataset here. Make sure the order of columns matches the raw data files.
schema ={
   "Attributes":[
      {
         "AttributeName":"city",
         "AttributeType":"string"
      },
      {
         "AttributeName":"timestamp",
         "AttributeType":"timestamp"
      },
      {
         "AttributeName":"pm25",
         "AttributeType":"float"
      }

   ]
}

In [None]:
response=forecast.create_dataset(
                    Domain="CUSTOM",
                    DatasetType='TARGET_TIME_SERIES',
                    DatasetName=datasetName,
                    DataFrequency=DATASET_FREQUENCY, 
                    Schema = schema
)

In [None]:
datasetArn = response['DatasetArn']
forecast.describe_dataset(DatasetArn=datasetArn)

In [None]:
forecast.update_dataset_group(DatasetGroupArn=datasetGroupArn, DatasetArns=[datasetArn])

In [None]:
role_arn = "arn:aws:iam::611181293678:role/AmazonForecast-ExecutionRole-abhi"

In [None]:
datasetImportJobName = 'SP_DSIMPORT_JOB_TARGET'
ds_import_job_response=forecast.create_dataset_import_job(DatasetImportJobName=datasetImportJobName,
                                                          DatasetArn=datasetArn,
                                                          DataSource= {
                                                              "S3Config" : {
                                                                 "Path":s3DataPath,
                                                                 "RoleArn": role_arn
                                                              } 
                                                          },
                                                          TimestampFormat=TIMESTAMP_FORMAT
                                                         )

In [None]:
ds_import_job_arn=ds_import_job_response['DatasetImportJobArn']
print(ds_import_job_arn)

In [None]:
while True:
    dataImportStatus = forecast.describe_dataset_import_job(DatasetImportJobArn=ds_import_job_arn)['Status']
    print(dataImportStatus)
    if dataImportStatus != 'ACTIVE' and dataImportStatus != 'CREATE_FAILED':
        sleep(30)
    else:
        break

In [None]:
forecast.describe_dataset_import_job(DatasetImportJobArn=ds_import_job_arn)

In [None]:
%store datasetGroupArn
%store datasetArn
%store role_arn
%store key
%store bucket_name

# Forecast Predictor 

In [None]:
import boto3
from time import sleep
import subprocess
import pandas as pd
import json
import time

In [None]:
%store -r

In [None]:
forecast = boto3.client(service_name='forecast') 
forecastquery = boto3.client(service_name='forecastquery')

In [None]:
predictorName= project+'_prophet'

In [None]:
forecastHorizon = 14

In [None]:
algorithm = 'Prophet'
algorithm_arn = 'arn:aws:forecast:::algorithm/'
algorithm_arn_prophet = algorithm_arn + algorithm

In [None]:
create_predictor_response=forecast.create_predictor(PredictorName=predictorName, 
                                                  AlgorithmArn=algorithm_arn_prophet,
                                                  ForecastHorizon=forecastHorizon,
                                                  PerformAutoML= False,
                                                  PerformHPO=False,
                                                  EvaluationParameters= {"NumberOfBacktestWindows": 1, 
                                                                         "BackTestWindowOffset": 12}, 
                                                  InputDataConfig= {"DatasetGroupArn": datasetGroupArn},
                                                  FeaturizationConfig= {"ForecastFrequency": "M", 
                                                                        "Featurizations": 
                                                                        [
                                                                          {"AttributeName": "target_value", 
                                                                           "FeaturizationPipeline": 
                                                                            [
                                                                              {"FeaturizationMethodName": "filling", 
                                                                               "FeaturizationMethodParameters": 
                                                                                {"frontfill": "none", 
                                                                                 "middlefill": "zero", 
                                                                                 "backfill": "zero"}
                                                                              }
                                                                            ]
                                                                          }
                                                                        ]
                                                                       }
                                                 )

In [None]:
predictor_arn_prophet = create_predictor_response['PredictorArn']

In [None]:
while True:
    predictorStatus = forecast.describe_predictor(PredictorArn=predictor_arn_prophet)['Status']
    print(predictorStatus)
    if predictorStatus != 'ACTIVE' and predictorStatus != 'CREATE_FAILED':
        sleep(30)
    else:
        break

In [None]:
forecast.get_accuracy_metrics(PredictorArn=predictor_arn_prophet)

In [None]:
forecastName= project+'_prophet_forecast'

In [None]:
create_forecast_response=forecast.create_forecast(ForecastName=forecastName,
                                                  PredictorArn=predictor_arn_prophet)
forecast_arn_prophet = create_forecast_response['ForecastArn']

In [None]:
while True:
    forecastStatus = forecast.describe_forecast(ForecastArn=forecast_arn_prophet)['Status']
    print(forecastStatus)
    if forecastStatus != 'ACTIVE' and forecastStatus != 'CREATE_FAILED':
        sleep(30)
    else:
        break

In [None]:
print(forecast_arn_prophet)
print()
forecastResponse = forecastquery.query_forecast(
    ForecastArn=forecast_arn_prophet,
    Filters={"item_id":"CYCL000036443"}
)
print(forecastResponse)