In [1]:
import pandas as pd
import numpy as np
import janitor
import unidecode
import pickle
import os
import operator
import functools
import itertools
from typing import Callable

In [2]:
# Load the raw workbooks, preferring the local pickle cache when it is present.
# os.path.isfile is used instead of scanning os.listdir('./data/') so that a
# missing ./data/ directory falls through to the loader rather than raising
# FileNotFoundError.
if not os.path.isfile('./data/raw_databases.pkl'):
    from quickstart.loader import XlsxDriveLoader

    Loads = XlsxDriveLoader()  # Drive Folder is hardcoded in module, since this is not prone to change
    databases = Loads.content  # Process takes approximately ~3 minutes to run for the first time.
                               # Then the file will be stored in the data/
else:
    # NOTE: pickle.load executes whatever the file encodes; only load a cache
    # that was produced locally by this project.
    with open('./data/raw_databases.pkl', 'rb') as file:
        databases = pickle.load(file)
    print("Loaded data from .pkl file")

Loaded data from .pkl file


In [3]:
# `databases` must hold exactly four keys, in this order; the fourth one is not used.
db_name_contaminantes, db_name_meteoorologicas, db_name_anotaciones, _ = list(databases)


def get_col_names(df: pd.DataFrame) -> list:
    """Return the column labels of ``df`` as a sorted list."""
    return sorted(df.columns)


def shared_structure(db_name: str, source: "dict | None" = None) -> list:
    """Return the names of the sheets that contain the same fields.

    Sheets are grouped by their sorted column labels, wherever they sit in the
    workbook, and exactly one group of two or more sheets is expected.

    Parameters
    ----------
    db_name : str
        Key of the workbook to inspect.
    source : dict, optional
        Mapping ``{workbook name: {sheet name: DataFrame}}``. Defaults to the
        module-level ``databases``.

    Returns
    -------
    list
        Sheet names of the shared structure, in workbook order.

    Raises
    ------
    ValueError
        If the workbook has no shared structure, or more than one.
    """
    workbook = (databases if source is None else source)[db_name]

    # Group with a dict rather than itertools.groupby: groupby only merges
    # *consecutive* equal keys, so it would miss matching sheets that are
    # separated by a sheet with a different layout.
    sheets_by_columns = {}
    for sheet_name, frame in workbook.items():
        sheets_by_columns.setdefault(tuple(get_col_names(frame)), []).append(sheet_name)

    shared = [names for names in sheets_by_columns.values() if len(names) > 1]

    # Expecting there is only one shared structure across the DataFrames
    if len(shared) != 1:
        raise ValueError(
            f"Expected exactly one group of sheets sharing the same fields in "
            f"{db_name!r}, found {len(shared)}"
        )
    return shared[0]

In [4]:
# For each workbook, stack every sheet that belongs to its shared structure.
raw_contaminantes, raw_meteorologicas = (
    pd.concat([databases[db_name][sheet] for sheet in shared_structure(db_name)])
    for db_name in (db_name_contaminantes, db_name_meteoorologicas)
)

In [5]:
# Lookup for the categorical flags: flag -> True only when its 'Hora' note reads "valida"

identifier = {
    **(
        databases[db_name_anotaciones]['LEEME'][['Flag', 'Hora']]
        .set_index('Flag')
        .dropna()
        .squeeze()
        # accents, surrounding blanks and case are normalised before comparing
        .apply(lambda note: unidecode.unidecode(note).strip().lower() == 'valida')
        .to_dict()
    ),
    'x': False,  # Record does not appear on DataFrame
}


def f_identifier(dict_):
    """Look ``dict_`` up in ``identifier``; keys without an entry map to True."""
    return identifier.get(dict_, True)

In [6]:
# Dictionary for measurement units

measurement_units = (
    # Positional slice of sheet 'Hoja1': rows 23-40, first and third columns.
    databases[db_name_anotaciones]['Hoja1'].
    iloc[23:41, [0, 2]]
    .dropna()
    # Discard the row labelled 33.
    .drop(33)
    # One column per value of 'Notas a considerar:', filled from 'Unnamed: 2'.
    .pivot(columns='Notas a considerar:', values='Unnamed: 2')
    .pipe(janitor.clean_names, remove_special=True)
    # Presumably collapses the sparse pivot to a single unit per column -- TODO confirm.
    .mode()
    .squeeze()
    .to_dict()
)

# Persist the units lookup as a pickle under data/.
with open('data/measurement_units.pkl', 'wb') as units_file:
    pickle.dump(measurement_units, units_file)

In [7]:
# Columns selected from each raw table; 'SO2' and 'SO2 b' are the San Pedro identifiers
fields = [
    'parametro',
    'Fecha',
    'SO2',
    'SO2 b',
]


def munge(df: pd.DataFrame, fields: list) -> pd.DataFrame:
    """Reshape a raw long table into one row per date and one column per factor.

    Parameters
    ----------
    df : pd.DataFrame
        Raw table containing at least the columns listed in ``fields``.
    fields : list
        Columns to keep. Must include 'parametro' and 'Fecha'; every other
        entry is pivoted as a value column.

    Returns
    -------
    pd.DataFrame
        Frame indexed by 'date' whose columns combine each value field with
        each factor, with names normalised by ``janitor.clean_names``.
    """
    # Derive the pivoted columns from `fields` so the argument is honoured
    # (they used to be hard-coded to 'SO2' and 'SO2 b').
    value_fields = [name for name in fields if name not in ('parametro', 'Fecha')]

    return (
        df
        .loc[:, fields]
        .pipe(janitor.rename_columns, new_column_names={'parametro': 'factor',
                                                        'Fecha': 'date'})
        .pipe(janitor.process_text, column_name='factor', string_function='strip')
        .pivot(index='date', columns='factor', values=value_fields)
        .convert_dtypes(convert_integer=False)
        # Numeric columns: anything that is not strictly positive becomes pd.NA.
        # NOTE(review): `> 0` also blanks exact zeros, not only negatives --
        # confirm that zeros should be treated as missing.
        # Other columns: each entry is mapped through f_identifier.
        .apply(lambda field: field.where(field > 0, pd.NA) if pd.api.types.is_numeric_dtype(field)
                                    else field.map(f_identifier))
        .pipe(janitor.clean_names, strip_underscores='r')
    )


In [8]:
# Run both raw tables through the same cleaning function.
contaminantes = munge(df=raw_contaminantes, fields=fields)
meteorologicas = munge(df=raw_meteorologicas, fields=fields)

In [9]:
# Timestamps present in contaminantes but missing from meteorologicas
set(contaminantes.index) - set(meteorologicas.index)

{Timestamp('2019-05-15 20:00:00')}

In [10]:
# Outer join on the index keeps timestamps that appear in either table.
df_ = contaminantes.join(meteorologicas, how='outer')

In [11]:
# Summary statistics, one row per column of df_
df_.describe(include="all").transpose()

Unnamed: 0_level_0,Unnamed: 1_level_0,count,unique,top,freq,mean,std,min,25%,50%,75%,max
Unnamed: 0_level_1,factor,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1
so2,co,29973.0,,,,1.679246,1.025919,0.01,0.8,1.5,2.39,5.79
so2,no,31723.0,,,,9.118173,8.916451,0.5,3.2,8.2,11.0,161.0
so2,no2,32607.0,,,,11.03015,7.8801,0.1,5.7,8.8,14.3,82.3
so2,nox,32626.0,,,,19.729188,13.361774,0.5,12.3,16.9,23.3,198.2
so2,o3,37936.0,,,,25.389788,19.092307,1.0,10.0,22.0,35.0,153.0
so2,pm10,36946.0,,,,60.021545,35.906347,2.0,37.0,52.0,75.0,714.0
so2,pm2_5,20458.0,,,,18.928187,13.211943,2.0,9.0,16.0,25.0,156.0
so2,so2,29466.0,,,,4.205708,2.24159,0.5,2.9,3.6,4.8,67.4
so2_b,co,39394.0,2.0,True,30073.0,,,,,,,
so2_b,no,39394.0,2.0,True,31704.0,,,,,,,


In [12]:
df = (
    df_
    # Row by row, keep each `so2` reading only where its `so2_b` flag is truthy.
    # NOTE(review): this runs a Python function per row; a column-wise
    # DataFrame.where would be faster if it proves equivalent on this data.
    .apply(lambda row: row.so2.where(row.so2_b.astype('bool')), axis=1) # check validity
    .replace({pd.NA: np.nan}) # .astype() does not operate when having different dtypes for
                              # missing values
    .astype(float)
    # De-duplicate on the timestamp index. DataFrame.drop_duplicates() compares
    # values only and ignores the index, so it would delete distinct timestamps
    # that happen to carry identical readings (e.g. rows that are entirely NaN).
    .loc[lambda frame: ~frame.index.duplicated(keep='first')]
)

# Creating the missing timestamps, statistical imputation and outlier analysis
# will follow after data exploration and visualization. Since we are working
# with a time-series database, these procedures have to be done carefully and
# with precise methodologies.

In [13]:
# Display the cleaned frame (the rendered output below is truncated).
df

factor,co,no,no2,nox,o3,pm10,pm2_5,so2,prs,rainf,rh,sr,tout,wdr,wsr
date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1
2017-01-01 00:00:00,1.10,17.0,20.1,37.0,6.0,163.0,,5.0,712.5,,54.0,,20.68,253.0,4.0
2017-01-01 01:00:00,1.05,19.5,25.1,44.5,5.0,117.0,,3.9,712.4,,49.0,,21.59,244.0,5.4
2017-01-01 02:00:00,0.91,16.7,22.5,39.0,5.0,46.0,,3.5,712.3,,51.0,,20.94,260.0,4.4
2017-01-01 03:00:00,0.75,12.5,16.8,29.2,8.0,36.0,,3.4,712.1,,51.0,,20.99,257.0,3.7
2017-01-01 04:00:00,0.59,6.5,6.7,13.1,12.0,25.0,,3.2,711.9,,51.0,,20.89,84.0,2.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2021-06-30 19:00:00,0.67,4.4,11.3,15.7,17.0,36.0,14.0,3.5,714.1,0.05,80.0,0.029,22.78,69.0,27.1
2021-06-30 20:00:00,0.66,3.9,10.7,14.6,16.0,41.0,7.0,3.6,714.2,,75.0,0.007,23.57,74.0,97.5
2021-06-30 21:00:00,0.64,3.4,11.7,15.1,15.0,16.0,7.0,3.4,714.3,,78.0,,22.89,79.0,117.0
2021-06-30 22:00:00,0.61,3.4,9.7,13.1,16.0,31.0,6.0,3.1,714.5,,79.0,,22.72,89.0,117.7


Dropping outliers is not as straightforward as it may be in other types of analysis, since we are trying to understand when these spikes in pollution occur. Likewise, imputing missing values takes more than replacing `pd.NA`s with the mean/median of every column, because we are dealing with a time series. Data visualization will come next, to understand the behaviour of the data and to apply the most adequate procedures for outliers and missing values.

In [14]:
# Persist the cleaned frame as a pickle under data/.
with open('data/dataframe.pkl', 'wb') as frame_file:
    pickle.dump(df, frame_file)