In [17]:
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
import pandas as pd

In [36]:
# Clean one yearly air-quality export: parse the Timestamp strings, keep only the
# Timestamp and station columns, coerce readings to numbers and drop incomplete rows.
def clean_dataframe(df, station="Mannerheimintie"):
    """Return a cleaned copy of a raw air-quality frame.

    Parameters
    ----------
    df : pandas.DataFrame
        Raw frame as read from an ``air-quality-index-YYYY.csv`` file. Its
        ``Timestamp`` column holds strings of the form ``"DD/MM/YYYY HH:MM"``.
        The frame is not modified.
    station : str, default "Mannerheimintie"
        Name of the column whose readings are kept.

    Returns
    -------
    pandas.DataFrame
        Columns ``Timestamp`` (datetime64) and ``station`` (numeric), keeping the
        original row index. Non-numeric readings are coerced to NaN, and any row
        with a missing timestamp or reading is dropped. The timestamp stays a
        regular column; it is not set as the index.
    """
    # Raw string: "\d" in a plain literal is an invalid escape sequence
    # (DeprecationWarning, and SyntaxWarning since Python 3.12).
    timestamp_pat = r"(?P<day>\d{2})/(?P<month>\d{2})/(?P<year>\d{4}) +(?P<hour>\d{2}):(?P<minute>\d{2})"
    # The named groups become day/month/year/hour/minute columns, which
    # pd.to_datetime assembles into one datetime per row.
    parts = df["Timestamp"].str.extract(timestamp_pat, expand=True)

    # Build a new frame rather than assigning into ``df``, so the caller's data
    # is left untouched; ``columns`` pins the output column order.
    cleaned = pd.DataFrame(
        {
            "Timestamp": pd.to_datetime(parts),
            station: pd.to_numeric(df[station], errors="coerce"),
        },
        columns=["Timestamp", station],
    )
    return cleaned.dropna()

In [38]:
# Yearly air-quality exports: each file is cleaned independently, then all years
# are stacked into one frame with a fresh 0..n-1 index (ignore_index replaces the
# former reset_index() + drop("index") pair).
AIR_QUALITY_YEARS = range(2014, 2020)

df = pd.concat(
    [
        clean_dataframe(pd.read_csv("data/air-quality-index-{}.csv".format(year), sep=";"))
        for year in AIR_QUALITY_YEARS
    ],
    ignore_index=True,
)
df.head()

Unnamed: 0,Timestamp,Mannerheimintie
0,2014-01-01 01:00:00,78.0
1,2014-01-01 02:00:00,68.0
2,2014-01-01 03:00:00,60.0
3,2014-01-01 04:00:00,48.0
4,2014-01-01 05:00:00,51.0


In [28]:
# Clean one yearly weather export: build a Timestamp from the Finnish date/time
# columns, coerce the measurements to numbers, translate their names to English
# and drop incomplete rows.
def clean_weather_data(df):
    """Return a cleaned copy of a raw weather frame.

    Parameters
    ----------
    df : pandas.DataFrame
        Raw frame as read from a ``weather-YYYY.csv`` file. Date parts are in
        ``Vuosi`` (year), ``Kk`` (month) and ``Pv`` (day); the clock time is an
        ``"HH:MM"`` string in ``Klo``. The frame also has an ``Aikavyöhyke``
        (time zone) column and the Finnish-named measurement columns that are
        the keys of ``renames`` below. The frame is not modified.

    Returns
    -------
    pandas.DataFrame
        The remaining source columns (the measurements, renamed to English),
        followed by a datetime64 ``Timestamp`` column. Non-numeric measurements
        are coerced to NaN, and any row containing a NaN is dropped.
    """
    # Finnish source column -> English output column.
    renames = {
        "Ilmanpaine (msl) (hPa)": "Air pressure (msl) (hPa)",
        "Suhteellinen kosteus (%)": "Relative humidity (%)",
        "Sateen intensiteetti (mm/h)": "Rain intensity (mm/h)",
        "Ilman lämpötila (degC)": "Air temperature (degC)",
        "Tuulen suunta (deg)": "Wind direction (deg)",
        "Tuulen nopeus (m/s)": "Wind speed (m/s)",
    }
    measurement_cols = list(renames)

    # "HH:MM" -> separate hour / minute parts for pd.to_datetime's column assembly.
    hour_minute = df["Klo"].str.split(":", n=1, expand=True)
    timestamps = pd.to_datetime(pd.DataFrame({
        "year": df["Vuosi"],
        "month": df["Kk"],
        "day": df["Pv"],
        "hour": hour_minute[0],
        "minute": hour_minute[1],
    }))

    # NOTE(review): the time-zone column is discarded, so Timestamp is naive.
    # Confirm it is in the same zone as the air-quality timestamps before merging.
    cleaned = (
        df.drop(columns=["Vuosi", "Kk", "Pv", "Klo", "Aikavyöhyke"])
        .assign(Timestamp=timestamps)
    )
    # Column-wise conversion: one vectorised to_numeric call per column instead
    # of a Python-level call per row (apply with axis=1).
    cleaned[measurement_cols] = cleaned[measurement_cols].apply(pd.to_numeric, errors="coerce")

    return cleaned.rename(columns=renames).dropna()

In [29]:
# Yearly weather exports: each file is cleaned independently, then all years are
# stacked into one frame with a fresh 0..n-1 index (ignore_index replaces the
# former reset_index() + drop("index") pair).
WEATHER_YEARS = range(2014, 2020)

weather = pd.concat(
    [
        clean_weather_data(pd.read_csv("data/weather-{}.csv".format(year), sep=","))
        for year in WEATHER_YEARS
    ],
    ignore_index=True,
)
weather.head()

Unnamed: 0,Air pressure (msl) (hPa),Relative humidity (%),Rain intensity (mm/h),Air temperature (degC),Wind direction (deg),Wind speed (m/s),Timestamp
0,1016.8,90.0,0.0,5.3,246.0,4.9,2014-01-01 00:00:00
1,1016.8,89.0,0.0,5.1,253.0,4.9,2014-01-01 01:00:00
2,1016.8,87.0,0.0,5.1,255.0,5.6,2014-01-01 02:00:00
3,1016.9,85.0,0.0,5.2,246.0,5.3,2014-01-01 03:00:00
4,1016.9,86.0,0.0,5.2,254.0,5.7,2014-01-01 04:00:00


In [39]:
# The as-of merge keys on Timestamp, so it must be a real datetime column.
assert pd.api.types.is_datetime64_any_dtype(df["Timestamp"]), "Timestamp not parsed as datetime"
df.dtypes

Timestamp          datetime64[ns]
Mannerheimintie           float64
dtype: object

In [31]:
# Merge key must be datetime; every measurement column must have converted to a number.
assert pd.api.types.is_datetime64_any_dtype(weather["Timestamp"]), "Timestamp not parsed as datetime"
assert all(pd.api.types.is_numeric_dtype(t) for t in weather.drop(columns="Timestamp").dtypes)
weather.dtypes

Air pressure (msl) (hPa)           float64
Relative humidity (%)              float64
Rain intensity (mm/h)              float64
Air temperature (degC)             float64
Wind direction (deg)               float64
Wind speed (m/s)                   float64
Timestamp                   datetime64[ns]
dtype: object

In [40]:
# Attach to every air-quality row the most recent weather observation at or before
# its timestamp (direction="backward", the pandas default, spelled out here).
# merge_asof raises unless both inputs are sorted on the key; the yearly concat is
# presumably chronological already, but sorting makes the requirement explicit.
# NOTE(review): no `tolerance` is set, so a gap in the weather data silently pairs
# readings with stale observations. Consider tolerance=pd.Timedelta("1h").
# NOTE(review): the weather time-zone column is dropped during cleaning. Confirm
# both sources use the same zone before trusting this alignment.
data = pd.merge_asof(
    df.sort_values("Timestamp", kind="stable"),
    weather.sort_values("Timestamp", kind="stable"),
    on="Timestamp",
    direction="backward",
)
data.head()

Unnamed: 0,Timestamp,Mannerheimintie,Air pressure (msl) (hPa),Relative humidity (%),Rain intensity (mm/h),Air temperature (degC),Wind direction (deg),Wind speed (m/s)
0,2014-01-01 01:00:00,78.0,1016.8,89.0,0.0,5.1,253.0,4.9
1,2014-01-01 02:00:00,68.0,1016.8,87.0,0.0,5.1,255.0,5.6
2,2014-01-01 03:00:00,60.0,1016.9,85.0,0.0,5.2,246.0,5.3
3,2014-01-01 04:00:00,48.0,1016.9,86.0,0.0,5.2,254.0,5.7
4,2014-01-01 05:00:00,51.0,1017.1,89.0,0.0,5.1,249.0,4.3


In [41]:
# merge_asof is a left join on the air-quality rows: no rows may be gained or lost.
assert len(data) == len(df), "as-of merge changed the number of air-quality rows"
data.dtypes

Timestamp                   datetime64[ns]
Mannerheimintie                    float64
Air pressure (msl) (hPa)           float64
Relative humidity (%)              float64
Rain intensity (mm/h)              float64
Air temperature (degC)             float64
Wind direction (deg)               float64
Wind speed (m/s)                   float64
dtype: object

In [46]:
# Persist the merged dataset as Parquet.
# compression="gzip" selects the codec used *inside* the Parquet file; the ".gzip"
# suffix is only part of the file name. Read it back with pd.read_parquet, not gunzip.
# Writing requires a Parquet engine (pyarrow or fastparquet) to be installed.
data.to_parquet("data/air-quality-idx.parquet.gzip", compression="gzip")