In [1]:
import os
import sqlite3
import pandas as pd
from pathlib import Path

from collections import defaultdict
from tqdm.notebook import tqdm
from typing import Any, Union

# import dask
# import dask.dataframe as dd

data_path = Path() / 'data' 

# SQLite3 Database

In [2]:
conn = sqlite3.connect(data_path / "airpollution.db")

# Airquality Data

In [3]:
def load_data(path:Path) -> pd.DataFrame:
    """Load data function

    Args:
        path (Path): path of data with file name
        enc (str, optional): encoding. Defaults to 'utf-8'.
    Returns:
        pd.DataFrame 
    """    
    if path.name.split('.')[-1] == 'xlsx':
        df = pd.read_excel(path)
    else:
        try:
            df = pd.read_csv(path, encoding='cp949')
        except UnicodeDecodeError:
            df = pd.read_csv(path, encoding='utf-8')

    return df

def filter_seoul(df):
    return df.loc[df['지역'].str.contains('서울'), :]

In [None]:
datafiles = sorted([x for x in (data_path / 'airquality').glob("*") if x.is_dir()])
# concatnate all files
parts = []
for p_year in tqdm(datafiles, total=len(datafiles)):
    new_path = data_path / 'airquality' / f'air-seoul-{p_year.name}.csv'
    for p in p_year.glob('*'):
        
        df = load_data(p)
        df = filter_seoul(df)
        parts.append(df)
    
    df_all = pd.concat(parts).reset_index(drop=True)
    if p_year.name == '2018':
        # fillna for air-seoul-2018.csv
        # '망' column contains null value due to policy changed
        # create dictionary for measure point
        m_dict = dict(df_all.loc[~df_all['망'].isna(), ['측정소코드', '망']].drop_duplicates().values)
        df_all.loc[df_all['망'].isna(), '망'] = df_all.loc[df_all['망'].isna(), '망'].fillna(df_all['측정소코드'].map(m_dict)).values
    
    df_all.to_csv(new_path, encoding='utf-8', index=False)
    parts = []

In [33]:
# change column name and insert into database
column_m_dict = {
    '지역': 'district', 
    '측정소코드': 'measurecode', 
    '측정소명': 'measurename', 
    '측정일시': 'date', 
    '주소': 'address',
    '망': 'measurepoint'
}

check_miss_match = {}
for p in sorted((data_path / 'airquality').glob("*.csv")):
    df = pd.read_csv(p, encoding='utf-8').rename(columns=column_m_dict)
    c = df.loc[:, ['district', 'measurepoint', 'measurecode', 'measurename', 'address']].drop_duplicates()
    check_miss_match[int(p.name.rstrip('\.csv').split('-')[-1])] = c
    print(f"{p.name}, num-unique data: {len(c)}, measurecode: {len(c['measurecode'].unique())}, district: {len(c['district'].unique())}, address: {len(c['district'].unique())}")
    # saved changed columns
    df.to_csv(p, encoding='utf-8', index=False)

air-seoul-2018.csv, num-unique data: 80, measurecode: 40,         district: 48, address: 48
air-seoul-2019.csv, num-unique data: 40, measurecode: 40,         district: 25, address: 25
air-seoul-2020.csv, num-unique data: 40, measurecode: 40,         district: 25, address: 25
air-seoul-2021.csv, num-unique data: 40, measurecode: 40,         district: 25, address: 25


In [35]:
# fix the district name and address by 2021 version of measurecode
code2dist = dict(check_miss_match[2021].loc[:, ['measurecode', 'district']].values)
code2add = dict(check_miss_match[2021].loc[:, ['measurecode', 'address']].values)
df = pd.read_csv(data_path / 'airquality' / 'air-seoul-2018.csv', encoding='utf-8').rename(columns=column_m_dict)
df['district'] = df['measurecode'].map(code2dist)
df['address'] = df['measurecode'].map(code2add)
df = df.set_index(['measurecode', 'district', 'measurename', 'address', 'measurepoint']).sort_values(['measurecode', 'date']).reset_index()

# save 
# df.to_csv(data_path / 'airquality' / 'air-seoul-2018.csv', encoding='utf-8', index=False)

In [21]:
def drop_tables(conn):
    cur = conn.cursor()
    conn.execute("DROP TABLE IF EXISTS airquality;")
    conn.execute("DROP TABLE IF EXISTS airmeasure;")
    cur.close()

In [22]:
drop_tables(conn)

In [23]:
cur = conn.cursor()
cur.execute(
    """
    CREATE TABLE IF NOT EXISTS airmeasure (
        sid INTEGER PRIMARY KEY,
        measurecode INTEGER NOT NULL UNIQUE,
        district TEXT, 
        measurename TEXT, 
        address TEXT, 
        measurepoint TEXT
    );
    """
)
cur.execute(
    """
    CREATE TABLE IF NOT EXISTS airquality (
        airid INTEGER PRIMARY KEY,
        measurecode INTEGER, 
        date TEXT, 
        SO2 REAL, 
        CO REAL, 
        O3 REAL,
        NO2 REAL, 
        PM10 REAL, 
        PM25 REAL, 
        FOREIGN KEY (measurecode)
            REFERENCES airmeasure (measurecode)
            ON DELETE CASCADE 
            ON UPDATE NO ACTION
    );
    """
)

airmeasure_columns = ['measurecode', 'district', 'measurename', 'address', 'measurepoint']
airquality_columns = ['measurecode', 'date', 'SO2', 'CO', 'O3', 'NO2', 'PM10', 'PM25']
df_airmeasure = None

sql_airmeasure = """
INSERT INTO airmeasure (sid, measurecode, district, measurename, address, measurepoint)
VALUES (?, ?, ?, ?, ?, ?);
"""
sql_airquality = """
INSERT INTO airquality (airid, measurecode, date, SO2, CO, O3, NO2, PM10, PM25) 
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?);
"""
idx = 1
for p in tqdm(sorted((data_path / 'airquality').glob("*.csv")), total=4):
    df = pd.read_csv(p, encoding='utf-8')
    df = df.set_index(['measurecode', 'district', 'measurename', 'address', 'measurepoint']).sort_values(['measurecode', 'date']).reset_index()

    if df_airmeasure is None:
        df_airmeasure = df.loc[:, airmeasure_columns].drop_duplicates().reset_index(drop=True)
        for i, x in df_airmeasure.iterrows():
            cur.execute(sql_airmeasure, [i+1] + [x[c] for c in airmeasure_columns])
    else:
        df_temp = df.loc[:, airmeasure_columns].drop_duplicates().reset_index(drop=True)
        if (df_temp != df_airmeasure).sum().sum():
            raise ValueError("not equal table")
    for m in df_airmeasure['measurecode'].values:
        df_airquality = df.loc[df['measurecode'] == m, airquality_columns]
        df_airquality['date'] = pd.to_datetime(df_airquality['date']-1, format='%Y%m%d%H').dt.strftime('%Y-%m-%d %H:%M:%S')
        
        for i, x in df_airquality.iterrows():
            idx += 1
            cur.execute(sql_airquality, [idx] + [x[c] for c in airquality_columns])

  0%|          | 0/4 [00:00<?, ?it/s]

In [19]:
df_airquality['date']

0       2018-01-01 00:00:00
1       2018-01-01 01:00:00
2       2018-01-01 02:00:00
3       2018-01-01 03:00:00
4       2018-01-01 04:00:00
               ...         
8755    2018-12-31 19:00:00
8756    2018-12-31 20:00:00
8757    2018-12-31 21:00:00
8758    2018-12-31 22:00:00
8759    2018-12-31 23:00:00
Name: date, Length: 8760, dtype: object

In [15]:
pd.to_datetime(df_airquality['date']-1, format='%Y%m%d%H').dt.strftime('%Y-%m-%d %H').str.split(' ', expand=True)

Unnamed: 0,0,1
0,2018-01-01,00
1,2018-01-01,01
2,2018-01-01,02
3,2018-01-01,03
4,2018-01-01,04
...,...,...
8755,2018-12-31,19
8756,2018-12-31,20
8757,2018-12-31,21
8758,2018-12-31,22


In [8]:
# use sql to query data
cur = conn.cursor()
sql = """SELECT measurecode, date, SO2, CO, O3, NO2, PM10, PM25 FROM airquality
WHERE date >= 2021-01-01 00;
"""
x = cur.execute(sql).fetchall()

OperationalError: near "00": syntax error

In [7]:
x

[]

# Traffic Data

In [None]:
datafiles = sorted([x for x in (data_path / 'traffic').glob("*") if x.is_dir()])
p = next(datafiles[-1].glob('*'))

df = pd.read_excel(p)

In [None]:
df['지점명'].unique()

KeyError: '지점명'

In [None]:
df

Unnamed: 0,지점별 일자별 교통량 범례,Unnamed: 1,Unnamed: 2,Unnamed: 3
0,구분,설명,표현 예시,예시 설명
1,일자,교통량 조사 일자,20181201,43435
2,요일,교통량 조사 요일,토,토요일
3,지점명,교통량 조사 도로명(조사지점명),성산로(금화터널),조사지점의 도로명(지점명)
4,지점번호,"조사지점을 5개 영역(A,B,C,D,F)으로 구분하고 일련번호를 부여함\n- [A(...",A-01,도심 1번 지점
5,방향,유입 : 외곽에서 서울시청으로 들어오는 방향\n유출 : 시울시청에서 외곽으로 나가는 방향,유입/유출,
6,구분,조사지점에서 가까운 교차로명으로 방향표시,봉원고가차도→독립문역,봉원고가차도에서 독립문역 방향의 교통량
7,시간대,1시간 단위를 표시,0시,0시~1시
8,교통량,1시간 교통량,809,809대/시
