# Data preprocessing

The city of Seoul publishes [estimated hourly population datasets](https://data.seoul.go.kr/dataVisual/seoul/seoulLivingPopulation.do).
A zip file for this dataset is uploaded every month, and each zip file
contains around 30 CSV files, one per day. In this notebook,
we are going to

1. unzip this monthly dataset
2. merge by month 
3. change column names into English
4. convert the monthly dataframe into parquet format
5. upload the parquets to S3



In [11]:
import os
import shutil
import re
import glob
import datetime
import zipfile
import urllib.request
import ssl

import numpy as np
import pandas as pd
from bs4 import BeautifulSoup
import boto3

# NOTE(review): this swaps the default HTTPS context factory for one that does
# not verify certificates, so it affects every HTTPS request urllib makes in
# this kernel. Presumably it is here because of the data portal's certificate
# setup - TODO confirm, and scope it to the single request if possible.
ssl._create_default_https_context = ssl._create_unverified_context

In [74]:
class XpopSeoul:
    """Download, merge and publish Seoul's estimated hourly population data.

    Every public step returns ``self`` so the steps can be chained:
    ``list_dataset`` -> ``set_month`` -> ``download_csvs`` -> ``make_parquet``
    -> ``s3upload_parquet`` -> ``clean_up``.
    """

    # Ordered (old, new) substring replacements applied to every CSV header by
    # rename_columns(). They are applied one after another in this order.
    _COLUMN_REPLACEMENTS = (
        ('남자', 'xpop_m'),              # male
        ('여자', 'xpop_f'),              # female
        ('세부터', 'to'),                # "<age> to ..."
        ('세생활인구수', ''),            # "... years old, population"
        ('세이상생활인구수', 'over'),    # "<age> or older, population"
        ('기준일ID', 'date'),            # reference date
        ('\"', ''),                      # stray quotes left in some headers
        ('?', ''),                       # stray '?' left in some headers
        ('시간대구분', 'hour'),          # hour slot
        ('행정동코드', 'adm_id'),        # administrative district code
        ('집계구코드', 'census_id'),     # census tract code
        ('총생활인구수', 'xpop_total'),  # total population
    )

    def __init__(self, work_dir='.', s3bucket='xpop-seoul'):
        """Create an empty pipeline.

        Args:
            work_dir: directory that receives the zip, the extracted CSV files
                and the parquet file.
            s3bucket: stored on the instance as ``self.s3bucket``. The S3
                methods take their own ``s3bucket`` argument and do not read
                this attribute.
        """
        self.dataset_list = None   # DataFrame(date, seq_no), set by list_dataset()
        self.inf_seq = None        # value of the page's "infSeq" input, set by list_dataset()
        self.prefix = 'local'
        self.work_dir = work_dir
        self.s3bucket = s3bucket
        self.month = ''
        self.dataset_name = ''
        self.xpop_zip = ''
        self.extract_dir = ''
        self.parquet_path = ''

    def list_dataset(self, mode='monthly'):
        """Scrape the portal's file-list page and remember the downloadable files.

        Fills ``self.dataset_list`` (columns ``date`` and ``seq_no``) and
        ``self.inf_seq``.

        Args:
            mode: with ``'monthly'`` only entries whose date string is exactly
                six characters long are kept; any other value keeps all rows.

        Returns:
            self

        Raises:
            RuntimeError: if the expected table or the ``infSeq`` input is not
                found in the page.
        """
        # file list page
        url = "https://data.seoul.go.kr/dataList/fileView.do?infId=OA-14979&srvType=F"
        with urllib.request.urlopen(url) as response:
            page = response.read()
        # No parser is named here, so bs4 picks whichever one is installed.
        soup = BeautifulSoup(page)

        tables = soup.find_all("table", class_="dataset01")
        if not tables:
            raise RuntimeError("table 'dataset01' not found at {}".format(url))

        seq_numbers = list()
        for tr in tables[0].find_all("tr"):
            tds = tr.find_all("td")
            if len(tds) < 6:
                # rows without enough <td> cells (e.g. header rows) carry no file
                continue
            date_match = re.search(r'\d+', tds[1].text)
            link = tds[5].find('a')
            href = link.get('href') if link is not None else None
            seq_match = re.search(r'\d+', href) if href else None
            if date_match is None or seq_match is None:
                # row without a dated file name or without a download link
                continue
            seq_numbers.append({'date': date_match.group(0),
                                'seq_no': seq_match.group(0)})

        # Explicit columns keep the frame usable even when no row was parsed.
        df_seq = pd.DataFrame(seq_numbers, columns=['date', 'seq_no'])
        if mode == 'monthly':
            df_seq = df_seq[df_seq.date.str.len() == 6]
        self.dataset_list = df_seq.reset_index(drop=True)

        inf_seq_input = soup.find("input", id="infSeq")
        if inf_seq_input is None:
            raise RuntimeError("input 'infSeq' not found at {}".format(url))
        self.inf_seq = inf_seq_input.get('value')

        print("number of available datasets: {}".format(len(df_seq)))
        return self

    def find_unprocessed(self, s3bucket='xpop-seoul', profile='default'):
        """Return the sorted list of months listed on the portal but absent from S3.

        Args:
            s3bucket: bucket whose ``monthly/*.parquet`` keys are compared.
            profile: AWS profile name used to create the boto3 session.
        """
        available = self._list_available_dataset()
        existing = self._list_parquet_in_s3(s3bucket, profile)
        marker = self.prefix + "-"
        unprocessed = sorted(name.replace(marker, "") for name in available - existing)
        print("number of unprocessed datasets: {}".format(len(unprocessed)))
        return unprocessed

    def set_month(self, month):
        """Select the month to process and derive all local paths from it.

        Args:
            month: month identifier as it appears in ``dataset_list['date']``
                (e.g. ``'202001'``).

        Returns:
            self
        """
        self.month = month
        self.dataset_name = "{}-{}".format(self.prefix, month)
        self.xpop_zip = os.path.join(self.work_dir, "{}.zip".format(self.dataset_name))
        self.extract_dir = os.path.join(self.work_dir, self.dataset_name)
        self.parquet_path = os.path.join(self.work_dir, "{}.parquet".format(self.dataset_name))
        return self

    def download_csvs(self):
        """Download the selected month's zip and extract it into ``extract_dir``.

        An existing ``extract_dir`` is wiped first so that files from an
        earlier run cannot leak into the merged output.

        Returns:
            self

        Raises:
            RuntimeError: if ``list_dataset()`` has not been called.
            IndexError: if the selected month is not in ``dataset_list``.
        """
        # download
        seq_no = self._get_seq_no(self.month)
        url = "http://115.84.165.224/bigfile/iot/inf/nio_download.do" + \
            "?&infId=OA-14979&seq={}&infSeq={}".format(seq_no, self.inf_seq)
        if self.work_dir:
            os.makedirs(self.work_dir, exist_ok=True)
        urllib.request.urlretrieve(url, self.xpop_zip)

        # extract into a fresh directory
        if os.path.isdir(self.extract_dir):
            shutil.rmtree(self.extract_dir)
        elif os.path.exists(self.extract_dir):
            os.remove(self.extract_dir)
        os.makedirs(self.extract_dir)
        with zipfile.ZipFile(self.xpop_zip, "r") as zip_ref:
            zip_ref.extractall(self.extract_dir)

        # sometimes zip files are zipped
        for zip_file in sorted(glob.glob(self.extract_dir + "/*.zip")):
            with zipfile.ZipFile(zip_file, "r") as zip_ref:
                zip_ref.extractall(self.extract_dir)

        print("csv files downloaded at {}".format(self.extract_dir))
        return self

    def make_parquet(self):
        """Merge every CSV in ``extract_dir`` into one parquet file.

        Missing values (``*`` in the CSVs) become 0, ``date`` is parsed as
        ``%Y%m%d``, columns whose name contains ``xpop`` or ``hour`` are cast
        to int and all remaining non-date columns to str.

        Returns:
            self

        Raises:
            FileNotFoundError: if ``extract_dir`` contains no CSV file.
        """
        # sorted so the row order of the output does not depend on glob order
        csv_files = sorted(glob.glob(self.extract_dir + "/*.csv"))
        if not csv_files:
            raise FileNotFoundError(
                "no csv files found in '{}'; run download_csvs() first".format(self.extract_dir))

        dfs = [self.rename_columns(self._read_csv(csv_path)) for csv_path in csv_files]
        pop = pd.concat(dfs)

        pop = pop.fillna(0)
        pop['date'] = pd.to_datetime(pop['date'], format='%Y%m%d')

        # change dtype
        for col in pop.columns:
            if 'xpop' in col or 'hour' in col:
                pop[col] = pop[col].astype(int)
            elif 'date' in col:
                pass
            else:
                pop[col] = pop[col].astype(str)

        pop.to_parquet(self.parquet_path)
        print("parquet create at {}".format(self.parquet_path))
        return self

    def _read_csv(self, csv_path):
        """Read one daily CSV, trying euc_kr first and pandas' default encoding second."""
        try:
            df = pd.read_csv(csv_path, na_values='*', encoding="euc_kr")
        except UnicodeDecodeError:
            df = pd.read_csv(csv_path, na_values='*')
        if isinstance(df.index, pd.MultiIndex):
            # The leading fields were parsed as index levels. Move them back
            # into columns and keep as many columns as the header names.
            realigned = df.reset_index().iloc[:, 0:len(df.columns)]
            realigned.columns = df.columns
            df = realigned
        return df

    def rename_columns(self, df):
        """Translate the Korean column headers of ``df`` to English, in place.

        Args:
            df: DataFrame whose column labels are strings.

        Returns:
            The same DataFrame object with its ``columns`` replaced.
        """
        new_names = list()
        for col in df.columns:
            for old, new in self._COLUMN_REPLACEMENTS:
                col = col.replace(old, new)
            new_names.append(col)
        df.columns = new_names
        return df

    def s3upload_parquet(self, s3bucket='xpop-seoul', profile='default'):
        """Upload the parquet file to ``monthly/<dataset_name>.parquet`` in ``s3bucket``.

        Args:
            s3bucket: destination bucket name.
            profile: AWS profile name used to create the boto3 session.

        Returns:
            self
        """
        session = boto3.session.Session(profile_name=profile)
        s3 = session.resource('s3')
        s3.meta.client.upload_file(self.parquet_path, s3bucket, "monthly/{}.parquet".format(self.dataset_name))
        print("parquet uploaded to {}".format(s3bucket))
        return self

    def clean_up(self):
        """Delete the zip, the extracted directory and the parquet file.

        Artifacts that do not exist are skipped, so this is safe to call after
        a partial run or more than once.

        Returns:
            self
        """
        for path in (self.xpop_zip, self.parquet_path):
            if path and os.path.isfile(path):
                os.remove(path)
        if self.extract_dir and os.path.isdir(self.extract_dir):
            shutil.rmtree(self.extract_dir)
        return self

    def _require_dataset_list(self):
        """Return ``self.dataset_list`` or raise if ``list_dataset()`` was never run."""
        if self.dataset_list is None:
            raise RuntimeError("dataset list is empty; call list_dataset() first")
        return self.dataset_list

    def _get_seq_no(self, month):
        """Return the download sequence number recorded for ``month``.

        Raises:
            IndexError: if ``month`` is not in ``dataset_list``.
        """
        dataset_list = self._require_dataset_list()
        matches = dataset_list[dataset_list.date == month]['seq_no']
        if len(matches) == 0:
            raise IndexError("no dataset listed for month '{}'".format(month))
        return matches.values[0]

    def _list_parquet_in_s3(self, s3bucket, profile):
        """Return the set of dataset names that have a parquet under ``monthly/``."""
        session = boto3.session.Session(profile_name=profile)
        s3 = session.resource('s3')
        bucket = s3.Bucket(s3bucket)
        parquets = set()
        # Only list keys under the prefix instead of walking the whole bucket.
        for s3_file in bucket.objects.filter(Prefix="monthly/"):
            if s3_file.key.endswith(".parquet"):
                name = s3_file.key.replace(".parquet", "").replace("monthly/", "")
                parquets.add(name)
        return parquets

    def _list_available_dataset(self):
        """Return the set of ``<prefix>-<date>`` names for every listed dataset."""
        dataset_list = self._require_dataset_list()
        return set(self.prefix + "-" + dataset_list['date'])
    


## Download/Merge/Upload dataset

In [7]:
# Run the full pipeline for one month: list, download, merge, upload, clean up.
(
    XpopSeoul(work_dir="../data/interim")
    .list_dataset()
    .set_month('202001')
    .download_csvs()
    .make_parquet()
    .s3upload_parquet(s3bucket='xpop-seoul')
    .clean_up()
)

number of available datasets: 38
csv files downloaded at ../data/interim/local-202001
parquet create at ../data/interim/local-202001.parquet
parquet uploaded to xpop-seoul


<__main__.XpopSeoul at 0x7fd4c4e42710>

## Update datasets not uploaded to S3

In [78]:
# Scrape the portal's file list once and keep the object for later steps.
updater = XpopSeoul(work_dir="../data/interim").list_dataset()

# list datasets not in S3 bucket
update_list = updater.find_unprocessed(s3bucket='xpop-seoul')
print(update_list)

number of available datasets: 38
number of unprocessed datasets: 3
['201910', '201911', '201912']


In [77]:
# Rebuild and upload the parquet for 201909 only.
# NOTE: there is no download_csvs() step here, so make_parquet() reads whatever
# CSV files are already extracted under the work dir for this month.
(
    updater
    .set_month('201909')
    .make_parquet()
    .s3upload_parquet(s3bucket='xpop-seoul')
)

parquet create at ../data/interim/local-201909.parquet
parquet uploaded to xpop-seoul


<__main__.XpopSeoul at 0x7fd535f80710>

In [79]:
# Process every month in update_list, one after another.
for month in update_list:
    (
        updater
        .set_month(month)
        .download_csvs()
        .make_parquet()
        .s3upload_parquet(s3bucket='xpop-seoul')
        .clean_up()
    )

csv files downloaded at ../data/interim/local-201910
parquet create at ../data/interim/local-201910.parquet
parquet uploaded to xpop-seoul
csv files downloaded at ../data/interim/local-201911
parquet create at ../data/interim/local-201911.parquet
parquet uploaded to xpop-seoul
csv files downloaded at ../data/interim/local-201912
parquet create at ../data/interim/local-201912.parquet
parquet uploaded to xpop-seoul
