```
From: https://github.com/ksatola
Version: 1.0.0
```

# Execute end-to-end ETL

## Table of Contents

- [Pollution ETL](#pol)
- [Weather ETL](#wea)
- [Analytical View Combined](#com)

In [1]:
# Load the IPython autoreload extension
%load_ext autoreload

In [2]:
# Mode 2: reload all modules before executing the code typed in a cell
%autoreload 2

In [3]:
import sys
# Put ../src first on the module search path (presumably where the local
# `prepare` module imported further down lives - verify)
sys.path.insert(0, '../src')

In [7]:
import pandas as pd
import numpy as np
import time
import os
import random
import re
import fnmatch

from pathlib import Path
import zipfile
import csv

import requests
import urllib.request
from bs4 import BeautifulSoup

In [5]:
# Notebook-wide pandas display settings: no limit on displayed columns/rows,
# and a display width of 1000 characters.
for option_name, option_value in (
    ('display.max_columns', None),
    ('display.max_rows', None),
    ('display.width', 1000),
):
    pd.set_option(option_name, option_value)

In [6]:
from prepare import (
    
    extract_archived_data,
    get_files_for_name_pattern,
    
    get_gios_pollution_data_files,
    get_pollutant_measures_for_locations,
    build_gios_analytical_view,
    
    get_imgw_yearly_weather_data_files,
    parse_imgw_metadata,
    build_imgw_analytical_view,
)

---
<a id='pol'></a>

## Pollution ETL

In [None]:
# Web scraping - download GIOS data files

%%time

download_base_url = 'http://powietrze.gios.gov.pl/pjp/archives'
path_to_save = "/Users/ksatola/Documents/git/air-polution/data/gios/etl"

get_gios_pollution_data_files(download_base_url, path_to_save)

In [None]:
# Extract files to folders

%%time

source_dir = '/Users/ksatola/Documents/git/air-polution/data/gios/etl'
target_dir = '/Users/ksatola/Documents/git/air-polution/data/gios/etl/extracted/'
file_search_pattern = '*.zip'

extract_archived_data(source_dir, target_dir, file_search_pattern)

In [None]:
# Data transformation

# Emission measurement stations codes in the Krakow area, kept in two groups
# and concatenated into `ems_codes` at the end (active stations first).

# Active stations
active_ems_codes = [
    'MpKrakOsPias',  # from 2016-01-01, pm25, pm10, http://powietrze.gios.gov.pl/pjp/current/station_details/info/10139
    'MpKrakWadow',   # from 2017-01-01, pm25, pm10, http://powietrze.gios.gov.pl/pjp/current/station_details/info/10447
    'MpKrakSwoszo',  # from 2019-01-01, pm10, http://powietrze.gios.gov.pl/pjp/current/station_details/info/11303
    'MpKrakZloRog',  # from 2016-01-01, pm10, http://powietrze.gios.gov.pl/pjp/current/station_details/info/10123
    'MpKrakAlKras',  # from 2003-01-01, pm25, pm10, CO, NO2, NOx, benzen, http://powietrze.gios.gov.pl/pjp/current/station_details/info/400
    'MpKrakBujaka',  # from 2010-01-01, pm25, pm10, CO, NO2, NOx, benzen, SO2, O3 http://powietrze.gios.gov.pl/pjp/current/station_details/info/401
    'MpKrakBulwar',  # from 2003-01-01, pm25, pm10, CO, NO2, NOx, benzen, SO2, http://powietrze.gios.gov.pl/pjp/current/station_details/info/402
    'MpKrakDietla',  # from 2016-01-01, pm10, NO2, NOx, http://powietrze.gios.gov.pl/pjp/current/station_details/info/10121
]

# Old codes and historical stations
historical_ems_codes = [
    'MpKrakowWIOSAKra6117',  # MpKrakAlKras
    'MpKrakowWIOSBuja6119',  # MpKrakBujaka
    'MpKrakowWIOSBulw6118',  # MpKrakBulwar
    'MpKrakowWIOSPrad6115',  # closed on 2010-02-28
    'MpKrakowWSSEKapi6108',  # closed on 2009-12-31
    'MpKrakowWSSEPrad6102',  # closed on 2004-12-31
    'MpKrakowWSSERPod6113',  # closed on 2004-12-31
    'MpKrakTelime',          # closed on 2018-06-01
]

ems_codes = active_ems_codes + historical_ems_codes

In [None]:
source_dir = '/Users/ksatola/Documents/git/air-polution/data/gios/etl/extracted/'

# Years to process, as strings: '2000' through '2019' inclusive
years = [str(year) for year in range(2000, 2020)]

In [None]:
# Build 1g analytical view

%%time

df_1g = build_gios_analytical_view(years=years, sampling_freq='1g', root_folder=source_dir, ems_codes=ems_codes)

In [None]:
# Create the output directory (including any missing parents) if it does not exist yet
save_dir = '/Users/ksatola/Documents/git/air-polution/data/final'
Path(save_dir).mkdir(parents=True, exist_ok=True)

In [None]:
# Save the 1g view as a UTF-8 CSV; index=True writes the index as the first column
gios_1g_all_file = '/Users/ksatola/Documents/git/air-polution/data/final/gios_1g_all.csv'
df_1g.to_csv(gios_1g_all_file, encoding="utf-8", index=True)

In [None]:
# Test read: load the CSV back, using the "Datetime" column as the index
df_1g_read = pd.read_csv(gios_1g_all_file, encoding='utf-8', sep=",", index_col="Datetime")
df_1g_read.head()

In [None]:
# Round-trip check: the frame read back must have the same (rows, columns) shape
assert df_1g.shape == df_1g_read.shape

In [None]:
# Build 24g analytical view

%%time

df_24g = build_gios_analytical_view(years=years, sampling_freq='24g', root_folder=source_dir, ems_codes=ems_codes)

In [None]:
# Create the output directory (including any missing parents) if it does not exist yet
save_dir = '/Users/ksatola/Documents/git/air-polution/data/final'
Path(save_dir).mkdir(parents=True, exist_ok=True)

In [None]:
# Save the 24g view as a UTF-8 CSV; index=True writes the index as the first column
gios_24g_all_file = '/Users/ksatola/Documents/git/air-polution/data/final/gios_24g_all.csv'
df_24g.to_csv(gios_24g_all_file, encoding="utf-8", index=True)

In [None]:
# Test read: load the CSV back, using the "Datetime" column as the index
df_24g_read = pd.read_csv(gios_24g_all_file, encoding='utf-8', sep=",", index_col="Datetime")
df_24g_read.head()

In [None]:
# Round-trip check: the frame read back must have the same (rows, columns) shape
assert df_24g.shape == df_24g_read.shape

---
<a id='wea'></a>

## Weather ETL

In [None]:
# Years to process, as strings: '2001' through '2019' inclusive.
# NOTE(review): this rebinds the `years` name that the Pollution ETL cells
# also use (there it starts at '2000'); re-running those cells after this
# one will pick up this list instead.
years = [str(year) for year in range(2001, 2020)]

download_base_url = 'https://dane.imgw.pl/data/dane_pomiarowo_obserwacyjne/dane_meteorologiczne/terminowe/synop'
path_to_save = "/Users/ksatola/Documents/git/air-polution/data/imgw/etl"

In [None]:
# Web scraping - download IMGW data files

%%time

get_imgw_yearly_weather_data_files(years, download_base_url, path_to_save)

In [None]:
# Unpack and flatten files

%%time

source_dir = '/Users/ksatola/Documents/git/air-polution/data/imgw/etl/'
target_dir = '/Users/ksatola/Documents/git/air-polution/data/imgw/etl/extracted/'
file_search_pattern = '*.zip'

extract_archived_data(source_dir, target_dir, file_search_pattern)

In [None]:
# Prepare column names

%%timeit

file_input = '/Users/ksatola/Documents/git/air-polution/data/imgw/etl/metadata/s_t_format_corrected_input.txt'
file_output = '/Users/ksatola/Documents/git/air-polution/data/imgw/etl/metadata/s_t_format_corrected_output.csv'

parse_imgw_metadata(file_input, file_output, input_encoding="cp1250", output_encoding="utf-8")

In [None]:
# Build 1g analytical view - inputs

columns = '/Users/ksatola/Documents/git/air-polution/data/imgw/etl/metadata/s_t_format_corrected_output.csv'
source_dir = '/Users/ksatola/Documents/git/air-polution/data/imgw/etl/extracted/'
file_search_pattern = '*.csv'

# Synoptic stations in the Krakow area: station code -> station name
sms_stations = {
    "250190410": "KRAKÓW HISTORYCZNE",
    "350190566": "KRAKÓW-BALICE",
    "250199987": "KRAKÓW-BIELANY-KLASZTOR",
    "250209979": "KRAKÓW-ŁĘG",
    "250190390": "KRAKÓW-OBSERWATORIUM",
    "250199984": "KRAKÓW-SWOSZOWICE",
    "250190470": "KRAKÓW-WOLA JUSTOWSKA",
}

# Synoptic stations codes only, in declaration order
sms_codes = list(sms_stations)

In [None]:
%%time

# Build the IMGW analytical view from the inputs defined in the preceding cell
df = build_imgw_analytical_view(source_dir, columns, file_search_pattern, sms_codes)

In [None]:
# Create the output directory (including any missing parents) if it does not exist yet
save_dir = '/Users/ksatola/Documents/git/air-polution/data/final'
Path(save_dir).mkdir(parents=True, exist_ok=True)

In [None]:
# Save the IMGW view as a UTF-8 CSV; index=True writes the index as the first column
imgw_all_file = '/Users/ksatola/Documents/git/air-polution/data/final/imgw_all.csv'
df.to_csv(imgw_all_file, encoding="utf-8", index=True)

In [None]:
# Test read: load the CSV back, using the "Datetime" column as the index

# low_memory=False is passed because, without it, pandas emitted the
# following DtypeWarning for this file:
# https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.errors.DtypeWarning.html
# /Users/ksatola/anaconda3/lib/python3.7/site-packages/IPython/core/interactiveshell.py:3058:
# DtypeWarning: Columns (6,12,13,14,34,52,53,54,56,58,59,61,62,64,101,102) have mixed types.
# Specify dtype option on import or set low_memory=False.
# interactivity=interactivity, compiler=compiler, result=result)

df_read = pd.read_csv(imgw_all_file, encoding='utf-8', sep=",", index_col="Datetime", low_memory=False)
df_read.head()

In [None]:
# Round-trip check on the number of columns only.
# NOTE(review): the GIOS checks compare the full shape; here row counts are
# not compared - confirm whether that is intentional.
assert df.shape[1] == df_read.shape[1]

---
<a id='com'></a>

## Analytical View Combined

This will be completed in the next iterations.