In [1]:
from glob import glob
from multiprocessing import Pool
from pathlib import Path
from time import time

import numpy as np
import pandas as pd
from IPython.display import clear_output

### Loading Aug and Dec 2019 raw filenames

In [2]:
# Collect the paths of every raw realtime dump for August and December 2019.
# Each day is looked up in its own folder named YEAR-MONTH-DAY; both months
# have 31 days, so days 01..31 cover them completely.
year = '2019'
df_files = []
for month in ['08', '12']:
    for day in range(1, 32):
        day_pattern = f'../raw_data/realtime-gzipped/{year}-{month}-{day:02d}/*'
        # glob() returns matches in arbitrary order; sort them so the file
        # list (and the row order of everything built from it) is the same
        # on every run and on every machine.
        df_files.extend(sorted(glob(day_pattern)))
len(df_files)

8844

### Reading the JSON-lines files and filtering for Delhi and pm25

Loading every file first and filtering afterwards would need too much RAM for data of this size, so the filtering step is done inside the loading function itself.

In [3]:
def read_csv(file):
    """Load one raw dump and keep only the Delhi PM2.5 measurements.

    Despite its name, this reads newline-delimited JSON (one record per
    line), not CSV. The filter is applied here, per file, so that only the
    matching rows of each file are kept.

    Parameters
    ----------
    file : str
        Path to a single raw JSON-lines file.

    Returns
    -------
    pandas.DataFrame
        Rows of the file whose ``city`` is ``'Delhi'`` and whose
        ``parameter`` is ``'pm25'``. A file without any records yields an
        empty frame.
    """
    tmp_df = pd.read_json(file, lines=True)
    if tmp_df.empty:
        # A file with no records may parse to a frame without any columns,
        # in which case the column lookups below would fail. There is
        # nothing to filter, so hand the empty frame back as-is.
        return tmp_df
    return tmp_df[(tmp_df['city'] == 'Delhi') & (tmp_df['parameter'] == 'pm25')]

# Read and filter all files in parallel across a pool of worker processes,
# and report how long the whole pass took.
init = time()
# The context manager terminates the pool on exit, including when map()
# raises, so worker processes are not left behind after a failed read.
with Pool() as workers:
    df_list = workers.map(read_csv, df_files)
    print('Finished in',round((time()-init)/60,2),'minutes')

Finished in 0.81 minutes


### Combine as a single DataFrame

In [4]:
# Stack the per-file frames row-wise into one table. The index is not reset,
# so every row keeps the index label it had in its own file.
data_df = pd.concat(objs=df_list, axis=0)
print(data_df.shape)
data_df.head(n=2)

(157052, 13)


Unnamed: 0,date,parameter,value,unit,averagingPeriod,location,city,country,coordinates,attribution,sourceName,sourceType,mobile
140,"{'utc': '2019-08-01T17:45:00.000Z', 'local': '...",pm25,37.0,µg/m³,"{'unit': 'hours', 'value': 0.25}","ITO, Delhi - CPCB",Delhi,IN,"{'latitude': 28.6316945, 'longitude': 77.2494387}","[{'name': 'Central Pollution Control Board', '...",caaqm,government,False
170,"{'utc': '2019-08-01T17:45:00.000Z', 'local': '...",pm25,59.39,µg/m³,"{'unit': 'hours', 'value': 0.25}","NSIT Dwarka, Delhi - CPCB",Delhi,IN,"{'latitude': 28.60909, 'longitude': 77.0325413}","[{'name': 'Central Pollution Control Board', '...",caaqm,government,False


### Extracting time and coordinates into their own columns

In [5]:
# Flatten the nested fields into plain columns: each 'date' and 'coordinates'
# entry is indexed by key to pull out the UTC timestamp and the position.
utc_strings = data_df['date'].map(lambda stamp: stamp['utc'])
coords = data_df['coordinates']

data_df['time'] = pd.to_datetime(utc_strings)
data_df['latitude'] = coords.map(lambda point: point['latitude'])
data_df['longitude'] = coords.map(lambda point: point['longitude'])
data_df['pm25'] = data_df['value']  # copy of 'value' under the pollutant's name
data_df.head(n=2)

Unnamed: 0,date,parameter,value,unit,averagingPeriod,location,city,country,coordinates,attribution,sourceName,sourceType,mobile,time,latitude,longitude,pm25
140,"{'utc': '2019-08-01T17:45:00.000Z', 'local': '...",pm25,37.0,µg/m³,"{'unit': 'hours', 'value': 0.25}","ITO, Delhi - CPCB",Delhi,IN,"{'latitude': 28.6316945, 'longitude': 77.2494387}","[{'name': 'Central Pollution Control Board', '...",caaqm,government,False,2019-08-01 17:45:00+00:00,28.631694,77.249439,37.0
170,"{'utc': '2019-08-01T17:45:00.000Z', 'local': '...",pm25,59.39,µg/m³,"{'unit': 'hours', 'value': 0.25}","NSIT Dwarka, Delhi - CPCB",Delhi,IN,"{'latitude': 28.60909, 'longitude': 77.0325413}","[{'name': 'Central Pollution Control Board', '...",caaqm,government,False,2019-08-01 17:45:00+00:00,28.60909,77.032541,59.39


### Dropping unnecessary information

In [6]:
# Keep only the timestamp, location, position and PM2.5 columns.
keep_columns = ['time', 'location', 'latitude', 'longitude', 'pm25']
data_df_final = data_df.loc[:, keep_columns]
print(data_df_final.shape)
data_df_final.head(n=2)

(157052, 5)


Unnamed: 0,time,location,latitude,longitude,pm25
140,2019-08-01 17:45:00+00:00,"ITO, Delhi - CPCB",28.631694,77.249439,37.0
170,2019-08-01 17:45:00+00:00,"NSIT Dwarka, Delhi - CPCB",28.60909,77.032541,59.39


### Saving processed data

In [7]:
# Write the trimmed frame to disk as a pickle.
output_file = '../processed_data/aq_v0.1.0.pickle'
# to_pickle() does not create missing folders; make sure the target folder
# exists so the save cannot fail after the expensive load has already run.
Path(output_file).parent.mkdir(parents=True, exist_ok=True)
data_df_final.to_pickle(output_file)