In [1]:
# json is used to read json files
import json
# os is used to iterate through files
import os
# statistics is used for the harmonic mean
import statistics
# pandas is used to create dataframes
import pandas as pd
# json_normalize is used to extract nested features
from pandas import json_normalize

In [2]:
# parses the devices file, which holds one JSON document per line
devices = []
with open('input_data/devices.json') as devicesFile:
    for rawLine in devicesFile:
        devices.append(json.loads(rawLine))

In [3]:
# builds a pandas dataframe from the list of device records
# (one row per record, one column per key)
devicesData = pd.DataFrame(data=devices)

In [4]:
# previews the first five device rows
devicesData.head(n=5)

Unnamed: 0,id,name,lat,lon
0,1001,zigbuu-17,36.928026,-2.687309
1,1002,zigbuu-22,40.723609,-5.204659
2,1003,zigbuu-52,35.7721,-78.63861
3,1004,zigbuu-124,42.210419,1.240819


In [5]:
# reads the stations csv file into a pandas dataframe
stationsData = pd.read_csv(filepath_or_buffer=r'input_data/stations.csv')

In [6]:
# previews the first five station rows
stationsData.head(n=5)

Unnamed: 0,station_id,lat,lon,source,reports,country,measurement_reliability
0,022320-99999,64.433,15.633,isd,hourly,SE,1007.281089
1,023140-99999,62.1,13.25,isd,hourly,SE,822.825
2,023160-99999,61.7,13.183,isd,hourly,SE,813.3911
3,024090-99999,59.05,12.7,isd,hourly,SE,749.935
4,026270-99999,55.717,13.217,isd,hourly,SE,736.411589


In [7]:
# starts the daily summary as an empty dataframe that only defines the required schema
dailySummary = pd.DataFrame(
    columns=[
        'device_id',
        'device_name',
        'lat',
        'lon',
        'date',
        'avg_temp',
        'meassurement_reliability_score',
        'pm10',
        'pm25',
    ]
)

In [8]:
# specifies location of the weather data json files (one file per day)
rootdir = 'scripts/input_data/weather'
# air data json file containing data for the last 3 days generated by the script
airDataPath = 'scripts/air_data/data.json'
# columns of the daily summary, in output order
# (the spelling of 'meassurement_reliability_score' is kept so the output schema does not change)
summaryColumns = ['device_id', 'device_name', 'lat', 'lon', 'date', 'avg_temp',
                  'meassurement_reliability_score', 'pm10', 'pm25']


def read_json_lines(path):
    """Parse a file holding one JSON document per line into a list.

    Blank lines are skipped so that a trailing newline does not break parsing.
    """
    with open(path, 'r') as fh:
        return [json.loads(line) for line in fh if line.strip()]


def load_air_data(path):
    """Load the air quality file into a flat dataframe.

    Returns a dataframe with the columns lat, lon, data_pm10, data_pm25 and
    data_datetime, where the last three characters of data_datetime have been
    removed so that only the date part is left.
    """
    # normalises the data to capture nested features
    air = json_normalize(read_json_lines(path), 'data', ['lat', 'lon'], record_prefix='data_')
    # copy() so the column assignment below does not write into a slice of `air`
    air = air[['lat', 'lon', 'data_pm10', 'data_pm25', 'data_datetime']].copy()
    air['data_datetime'] = air['data_datetime'].str[:-3]
    return air


def station_reliability(stationIds, stations):
    """Return the combined measurement reliability of the given stations.

    stationIds -- iterable of station ids to look up
    stations   -- dataframe with 'station_id' and 'measurement_reliability' columns

    A single known station returns its own reliability, several known stations
    return the harmonic mean of their reliabilities and NaN is returned when
    none of the ids is present in `stations`.
    """
    scores = []
    for stationId in stationIds:
        found = stations.loc[stations['station_id'] == stationId, 'measurement_reliability']
        if not found.empty:
            # the first row wins if a station is listed more than once
            scores.append(float(found.iloc[0]))
    if not scores:
        return float('nan')
    if len(scores) == 1:
        return scores[0]
    return statistics.harmonic_mean(scores)


def summarise_day(weatherData, devices, stations, airData):
    """Build the summary records of every device for one weather file.

    weatherData -- normalised weather records (station_id, lon, lat, data_temp, data_datetime)
    devices     -- dataframe with id, name, lat and lon columns
    stations    -- dataframe with station_id and measurement_reliability columns
    airData     -- dataframe as returned by load_air_data

    Returns a list of dicts keyed by the summary column names. Devices that
    have no weather record at their longitude are left out.
    """
    records = []
    # iterating the device rows directly keeps id/name/lat/lon of one device together,
    # even when two devices share the same longitude
    for device in devices[['id', 'name', 'lat', 'lon']].itertuples(index=False):
        # weather records are linked to a device by an exact longitude match
        atDevice = weatherData.loc[weatherData['lon'] == device.lon]
        if atDevice.empty:
            continue

        # the date is taken from the first matching record with the ":00" part removed
        date = atDevice['data_datetime'].iloc[0].replace(":00", "")

        # every distinct station reporting at this longitude contributes to the score
        rely = station_reliability(atDevice['station_id'].dropna().unique(), stations)

        # the air data longitude is rounded to 2 dp so the device longitude is rounded too;
        # when there is no air data for this date the selection is empty and the means are NaN
        sameDay = airData.loc[(airData['lon'] == round(device.lon, 2)) & (airData['data_datetime'] == date)]

        records.append({
            'device_id': device.id,
            'device_name': device.name,
            'lat': device.lat,
            'lon': device.lon,
            'date': date,
            'avg_temp': atDevice['data_temp'].mean(),
            'meassurement_reliability_score': rely,
            'pm10': sameDay['data_pm10'].mean(),
            'pm25': sameDay['data_pm25'].mean(),
        })
    return records


# collects every weather file below rootdir; sorted so the row order is reproducible
weatherFiles = sorted(
    os.path.join(subdir, file)
    for subdir, dirs, files in os.walk(rootdir)
    for file in files
)

# the air data is the same for every day and device, so it is read once
# (and only when there is weather data to combine it with)
airData = load_air_data(airDataPath) if weatherFiles else None

# for each json file (day) the summary records of all devices are collected
records = []
for weatherFile in weatherFiles:
    # normalises the data to capture nested features
    weatherData = json_normalize(read_json_lines(weatherFile), 'data', ['lat', 'lon', 'station_id'], record_prefix='data_')
    # sets the dataframes features to what is required
    weatherData = weatherData[['station_id', 'lon', 'lat', 'data_temp', 'data_datetime']]
    records.extend(summarise_day(weatherData, devicesData, stationsData, airData))

# the dataframe is built in one go, which also makes re-running this cell idempotent
dailySummary = pd.DataFrame(records, columns=summaryColumns)

The cell above combines the device, station, weather and air-quality data into a single dataframe called "dailySummary", with one row per device for each day of weather data.

In [9]:
# previews the last fifteen rows of the combined summary
dailySummary.tail(n=15)

Unnamed: 0,device_id,device_name,lat,lon,date,avg_temp,meassurement_reliability_score,pm10,pm25
105,1002,zigbuu-22,40.723609,-5.204659,2020-08-06,26.733333,225.317904,,
106,1003,zigbuu-52,35.7721,-78.63861,2020-08-06,25.45,2827.643544,,
107,1004,zigbuu-124,42.210419,1.240819,2020-08-06,23.875,36.5874,,
108,1001,zigbuu-17,36.928026,-2.687309,2020-08-07,28.966667,135.015588,12.26893,3.448922
109,1002,zigbuu-22,40.723609,-5.204659,2020-08-07,27.4,225.317904,45.29295,11.044711
110,1003,zigbuu-52,35.7721,-78.63861,2020-08-07,25.625,2827.643544,1.916219,0.968291
111,1004,zigbuu-124,42.210419,1.240819,2020-08-07,26.5375,36.5874,7.32329,2.491452
112,1001,zigbuu-17,36.928026,-2.687309,2020-08-08,28.425,135.015588,19.712083,4.227155
113,1002,zigbuu-22,40.723609,-5.204659,2020-08-08,26.358333,225.317904,49.775121,15.768629
114,1003,zigbuu-52,35.7721,-78.63861,2020-08-08,26.125,2827.643544,2.513984,1.651116


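The tail of the dataframe shows the pm10/pm25 values for the most recent days, which come from the last three days of air data retrieved by the script file. Rows for earlier dates have no matching air data, so their pm10/pm25 values are left empty.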

In [10]:
# writes the combined summary to a json file for further inspection
dailySummary.to_json(path_or_buf=r'dailySummary.json')