**Madrid pollution data extraction and cleaning**

Goal:
The goal of this project is to extract public data about pollution in Madrid, clean it, and fill in sensible values where data is missing, using the Python programming language. The source is this Madrid council webpage. We will download the data for 2018, which is real-time hourly data. In this case we will keep only the NO2 pollutant.

Steps:
This Jupyter Notebook takes the following steps:

Downloading the data (2018 real time data url), which comes in a zip folder with a file for each month in 3 different formats (.txt, .csv and .xml). In this case we will use the .csv extension files.

Extracting just the .csv files from the zip folder.

Adding records for missing days.

Transforming the monthly dataframe, as the default format is awkward to work with: there is one row per day, with one column for each hour of the day and another column for each hour indicating whether that value is validated. In the transformed dataframe there is one row per hour, with a single column for the hour and a single column indicating whether that record is validated.

Filling non-validated hourly values. We assign the average of the previous validated value and the next validated value; if only one of those two exists, we assign that one.

Adding columns for year-month-day and year-month-day-hour.

Finally we append all the monthly files together to have a yearly file.

Some tests to check if the transformation has been satisfactory.

Some data exploration.

Some charts to see visualized data.

In [1]:
import urllib.request
from zipfile import ZipFile
import pandas as pd
import numpy as np
import calendar
import os
import matplotlib.pyplot as plt
%matplotlib inline

In [2]:
def download_from_url(url, file_name):
    '''Fetch the resource at ``url`` and save it to the path ``file_name``.

    A relative ``file_name`` is resolved against the current working
    directory. Nothing is returned.
    '''
    retrieve = urllib.request.urlretrieve
    retrieve(url, filename=file_name)

In [4]:
def extract_from_zip_folder(file_type, zip_folder_path, new_folder_path):
    '''Extract from a zip archive only the members with a given extension.

    file_type is the extension without the leading dot (for example 'csv'),
    zip_folder_path is the archive to read and new_folder_path is the folder
    the matching members are extracted into. The name of each extracted
    member is printed.
    '''
    wanted_suffix = f'.{file_type}'
    with ZipFile(zip_folder_path, 'r') as archive:
        # Lazily keep just the archive members whose name ends with the suffix.
        matching_members = (
            member for member in archive.namelist()
            if member.endswith(wanted_suffix)
        )
        for member in matching_members:
            print(f'extracting {member}')
            archive.extract(member, new_folder_path)

In [5]:

def add_missing_days(dataframe, partition_column):
    '''Add a placeholder row for every day a sample spot is missing.

    The dataframe is expected to hold a single month of data (the year and
    month are read from the first row's 'ANO' and 'MES' values) with one row
    per sample spot and day ('DIA').

    For each distinct value of partition_column, every day of that month that
    has no row gets a new row. The new row is a copy of the spot's first row
    with 'DIA' set to the missing day and every value equal to 'V' replaced
    by 'N' (non validated), so that the copied measurements are known to be
    placeholders and can be corrected later.

    Returns the input dataframe itself when nothing is missing, otherwise a
    new dataframe with the placeholder rows appended and a fresh index. The
    input dataframe is never modified.
    '''
    # An empty dataframe has no year/month to read and nothing to complete.
    if dataframe.empty:
        return dataframe

    # Read by position so that a non-default index does not break the lookup.
    year = int(dataframe['ANO'].iloc[0])
    month = int(dataframe['MES'].iloc[0])

    # monthrange returns (weekday of the first day, number of days in month).
    _, number_days_month = calendar.monthrange(year, month)

    columns = list(dataframe.columns)
    rows_to_add = []

    # unique() keeps first-appearance order, which makes the order of the
    # added rows deterministic (iterating a set of strings is not).
    for sample_spot in dataframe[partition_column].unique():
        sample_spot_df = dataframe[dataframe[partition_column] == sample_spot]
        if sample_spot_df.empty:
            # Happens for NaN keys, which never compare equal to themselves.
            continue

        days_present = set(sample_spot_df['DIA'])
        missing_days = [
            day for day in range(1, number_days_month + 1)
            if day not in days_present
        ]
        if not missing_days:
            continue

        # Template: the spot's first row, with validated flags turned off.
        # Values are read column by column to keep each column's own type.
        template = {}
        for column in columns:
            value = sample_spot_df[column].iloc[0]
            is_validated_flag = isinstance(value, str) and value == 'V'
            template[column] = 'N' if is_validated_flag else value

        for day in missing_days:
            print(f'Day {day}-{month}-{year} missing')

            # The day is set by column name, not by column position.
            new_row = dict(template)
            new_row['DIA'] = day
            rows_to_add.append(new_row)

            print(f'Day {day}-{month}-{year} row added to original dataframe')

    if not rows_to_add:
        return dataframe

    # A single concat replaces the row-by-row DataFrame.append calls, which
    # are quadratic and no longer exist in pandas 2.0+.
    placeholder_rows = pd.DataFrame(rows_to_add, columns=columns)
    return pd.concat([dataframe, placeholder_rows], ignore_index=True)

In [6]:
def get_stacked_dataframe(dataframe, cols_to_drop, cols_remain):
    '''Collapse the columns that are spread horizontally into a single column.

    The steps are:
    1. Drop cols_to_drop, the columns that must not take part in the stack.
    2. Move cols_remain to the index so they are repeated for every stacked
       value instead of being stacked themselves.
    3. Stack the remaining columns and turn the index back into columns.
    4. Discard any row whose stacked label (the second to last column) is
       'index', then renumber the rows.
    '''
    trimmed = dataframe.drop(columns=cols_to_drop)
    indexed = trimmed.set_index(cols_remain)
    stacked = indexed.stack().reset_index()

    # After reset_index the second to last column holds the name of the
    # original column each stacked value came from.
    label_column = stacked.columns[-2]
    keep_mask = stacked[label_column] != 'index'

    return stacked[keep_mask].reset_index(drop=True)

In [7]:
def add_last_col_to_df(df1, df2):
    '''Copy the last column of df2 into df1 under the name 'new_col'.

    df1 is modified in place and also returned. Both dataframes are expected
    to have the same number of rows.
    '''
    last_position = df2.shape[1] - 1
    df1['new_col'] = df2.iloc[:, last_position]
    return df1

In [8]:
def get_next_validated_value(dataframe, index, column, get_next=True):
    '''Return the index label of the nearest validated row around ``index``.

    With get_next=True the search walks forward (index + 1, index + 2, ...);
    with get_next=False it walks backward (index - 1, index - 2, ...). The row
    at ``index`` itself is never inspected. A row counts as validated when its
    value in ``column`` is 'V'.

    The lookup is done by label, so walking past the labels present in the
    dataframe raises KeyError; callers use that to detect that there is no
    validated row in the requested direction.
    '''
    # +1 walks towards later rows, -1 towards earlier ones.
    step = 1 if get_next else -1
    candidate = index

    while True:
        candidate += step
        if dataframe.loc[candidate, column] == 'V':
            return candidate

In [9]:
def assign_non_validated_values(dataframe, partition_column):
    '''Replace the level of non validated records with a value derived from
    the nearest validated records of the same partition.

    The dataframe is first sorted by PUNTO_MUESTREO, ANO, MES, DIA and HORA
    and re-indexed; the input dataframe itself is not modified. It is then
    partitioned by partition_column (PUNTO_MUESTREO in this notebook) and,
    inside each partition, every record whose VALIDADO is 'N' gets a new
    NIVEL_NO2:

    * the average of the nearest previous and nearest next validated ('V')
      values when both exist;
    * the nearest next validated value when there is no previous one;
    * the nearest previous validated value when there is no next one.

    Reassigned records get VALIDADO = 'R' (Reassigned). Only original 'V'
    records are used as neighbours, never already reassigned ones.

    If a partition has no validated record at all there is nothing to derive
    a value from, so its records are left untouched and keep VALIDADO = 'N'.

    Returns the sorted dataframe with the reassigned values.
    '''
    # First we sort the data so that neighbouring rows are neighbouring hours.
    dataframe = dataframe.sort_values(
        by=['PUNTO_MUESTREO', 'ANO', 'MES', 'DIA', 'HORA'])
    dataframe = dataframe.reset_index(drop=True)

    # After reset_index, row positions and index labels are the same numbers.
    levels = dataframe['NIVEL_NO2'].to_numpy()
    is_validated = (dataframe['VALIDADO'] == 'V').to_numpy()
    is_pending = (dataframe['VALIDADO'] == 'N').to_numpy()

    reassigned_rows = []
    reassigned_levels = []

    for sample_spot in dataframe[partition_column].unique():
        in_spot = (dataframe[partition_column] == sample_spot).to_numpy()
        spot_rows = np.flatnonzero(in_spot)

        # Sorted positions of the validated rows of this partition.
        validated_rows = spot_rows[is_validated[spot_rows]]
        if validated_rows.size == 0:
            # No reference value available: leave these records flagged 'N'.
            continue

        for row in spot_rows[is_pending[spot_rows]]:
            # Position, inside validated_rows, of the first validated row
            # located after this one; the element before it (if any) is the
            # last validated row located before this one.
            following = np.searchsorted(validated_rows, row)
            has_next = following < validated_rows.size
            has_previous = following > 0

            if has_next and has_previous:
                new_level = (
                    levels[validated_rows[following]] +
                    levels[validated_rows[following - 1]]
                ) / 2
            elif has_next:
                new_level = levels[validated_rows[following]]
            else:
                new_level = levels[validated_rows[following - 1]]

            reassigned_rows.append(row)
            reassigned_levels.append(new_level)

    # All new values are written at the end, so every lookup above read the
    # original measurements.
    if reassigned_rows:
        # Averages are floats; widen an integer column explicitly instead of
        # relying on implicit upcasting when assigning.
        if pd.api.types.is_integer_dtype(dataframe['NIVEL_NO2']):
            dataframe['NIVEL_NO2'] = dataframe['NIVEL_NO2'].astype(float)
        dataframe.loc[reassigned_rows, 'NIVEL_NO2'] = reassigned_levels
        dataframe.loc[reassigned_rows, 'VALIDADO'] = 'R'

    return dataframe

In [10]:
def int_date_to_string(integer):
    '''Return the number as text, left-padded with a '0' when it has a
    single character (1 -> '01'); longer numbers are returned unchanged
    (12 -> '12').
    '''
    text = str(integer)
    return text if len(text) != 1 else '0' + text

In [11]:
def get_clean_df(dataframe):
    '''Turn a raw monthly dataframe into a tidy one with a row per hour.

    Steps:
    1. Keep only the rows with MAGNITUD == 8 (the NO2 records) and drop the
       MAGNITUD column.
    2. Add placeholder rows for the days missing in each sample spot.
    3. Stack the columns starting with 'H' (hourly values) and the columns
       starting with 'V' (validation flags) separately, and join the flags
       to the values.
    4. Rename the resulting columns to HORA, NIVEL_NO2 and VALIDADO, and
       convert HORA to an integer taken from the last two characters of the
       original column name.
    5. Assign values to the non validated records.
    6. Add the integer columns ANO_MES_DIA (yyyymmdd) and ANO_MES_DIA_HORA
       (yyyymmddhh) and reorder the columns.

    Progress messages are printed along the way. Raises ValueError if the
    stacked values and the stacked flags do not have the same number of
    rows, since joining them would pair values with the wrong flags.
    '''
    print('Keeping just NO2 data.')
    dataframe = dataframe[dataframe['MAGNITUD'] == 8].drop(columns=['MAGNITUD'])
    dataframe = dataframe.reset_index(drop=True)

    print('Adding missing days rows.')
    dataframe = add_missing_days(dataframe, 'PUNTO_MUESTREO')

    # Columns that identify a record and must be repeated for every hour.
    cols_dimensiones = ['PROVINCIA', 'MUNICIPIO', 'ESTACION',
                        'PUNTO_MUESTREO', 'ANO', 'MES', 'DIA']

    validation_cols = [col for col in dataframe.columns if col[0] == 'V']
    hourly_cols = [col for col in dataframe.columns if col[0] == 'H']

    print('Stacking dataframes.')
    # Hourly values: the validation columns are left out of the stack.
    df_h = get_stacked_dataframe(
        dataframe, cols_remain=cols_dimensiones, cols_to_drop=validation_cols)

    # Validation flags: the hourly value columns are left out of the stack.
    df_v = get_stacked_dataframe(
        dataframe, cols_remain=cols_dimensiones, cols_to_drop=hourly_cols)

    # The join below pairs rows by position, so both frames must line up.
    if len(df_h) != len(df_v):
        raise ValueError(
            'Stacked values and validation flags have different lengths: '
            f'{len(df_h)} != {len(df_v)}.'
        )

    print('Joining dataframes.')
    final_df = add_last_col_to_df(df_h, df_v)

    print('Renaming columns.')
    # reset_index names the unnamed stacked level after its position, which
    # is the number of dimension columns placed before it.
    stacked_level_name = f'level_{len(cols_dimensiones)}'
    final_df = final_df.rename(columns={stacked_level_name: 'HORA',
                                        0: 'NIVEL_NO2',
                                        'new_col': 'VALIDADO'})

    print('Formatting HORA column.')
    # The stacked label is the original column name; its last two characters
    # are the hour.
    final_df['HORA'] = final_df['HORA'].apply(lambda x: int(x[-2:]))

    print('Assigning values to non validated records.')
    final_df = assign_non_validated_values(final_df, 'PUNTO_MUESTREO')

    print('Adding date and time columns.')
    # The yyyymmdd text is shared by both columns, so it is built only once.
    year_month_day = (
        final_df['ANO'].astype(str) +
        final_df['MES'].apply(int_date_to_string) +
        final_df['DIA'].apply(int_date_to_string)
    )
    final_df['ANO_MES_DIA'] = year_month_day.astype(int)
    final_df['ANO_MES_DIA_HORA'] = (
        year_month_day + final_df['HORA'].apply(int_date_to_string)
    ).astype(int)

    print('Reordering columns.')
    final_df = final_df[[
        'PROVINCIA', 'MUNICIPIO', 'ESTACION', 'PUNTO_MUESTREO', 'ANO', 'MES', 'DIA',
        'HORA', 'ANO_MES_DIA', 'ANO_MES_DIA_HORA', 'NIVEL_NO2', 'VALIDADO'
    ]]

    print(f'Monthly dataframe shape: {final_df.shape}.')
    print()

    return final_df

In [12]:
def get_complete_dataframe(url, file_name, desired_data_path, file_type):
    '''Run the whole process and return a single dataframe.

    Downloads the zip file at url to file_name, extracts the members with
    the file_type extension into desired_data_path, reads each extracted
    file as a ';' separated table, cleans it with get_clean_df and
    concatenates all the cleaned dataframes.

    Files found in desired_data_path that do not end with the requested
    extension are ignored. Returns an empty dataframe when there is no file
    to process.
    '''
    # We download the file with the data.
    download_from_url(url, file_name)

    # Extraction of just the wanted files (.csv).
    extract_from_zip_folder(file_type, file_name, desired_data_path)

    wanted_suffix = f'.{file_type}'
    monthly_frames = []

    # Sorted, so the files are processed in the same order on every run
    # (os.listdir returns them in arbitrary order).
    for data_file_name in sorted(os.listdir(desired_data_path)):
        # Skip anything that is not one of the extracted data files.
        if not data_file_name.endswith(wanted_suffix):
            continue

        print()
        print(f'Working with {data_file_name}')

        # os.path.join builds the right path on every operating system, so
        # no per-platform fallback is needed and read errors are not hidden.
        monthly_data = pd.read_csv(
            os.path.join(desired_data_path, data_file_name), sep=';')

        # We pass the monthly dataframe to the cleaning function.
        monthly_frames.append(get_clean_df(monthly_data))

    # One concat at the end instead of growing a dataframe file by file.
    if monthly_frames:
        data = pd.concat(monthly_frames, ignore_index=True)
    else:
        data = pd.DataFrame()

    print(f'Final dataframe shape: {data.shape}')

    return data

In [None]:
# Final execution: runs the whole download, extraction and cleaning process.
if __name__ == '__main__':

    # Address of the zip file to download.
    url = 'https://datos.madrid.es/egob/catalogo/201200-10306314-calidad-aire-horario.zip'
    # Local path the downloaded zip file is saved to.
    file_name = 'pollution_data.zip'
    # Extension (without the dot) of the files to extract from the zip.
    file_type = 'csv'
    # Folder the extracted files are written to and then read from.
    desired_data_path = f'pollution_data_{file_type}'

    # Resulting dataframe with all the cleaned monthly data appended together.
    final_df = get_complete_dataframe(url, file_name, desired_data_path, file_type)