# Data Preprocessing

In [1]:
import os
import sys
import argparse
import numpy as np
import pandas as pd
from collections import defaultdict, OrderedDict
from functools import reduce
import matplotlib.pyplot as plt
import multiprocessing as mp

%matplotlib inline

## Functions and classes

### Build grid

In [2]:
class Grids(object):
    """Map grid coordinates to region ids, per city and across all cities.

    For each city, ``city_<city>/grid_attr.csv`` under ``data_path`` is read
    (no header; columns grid_x, grid_y, region_id) and reduced to one
    bounding box per region.  Region ids are stored as
    ``"<city>_<region_id>"``.
    """

    def __init__(self, data_path, city_list):
        super(Grids, self).__init__()
        # city -> DataFrame with columns region_id, x_min, x_max, y_min, y_max
        self.grid_dict = {}

        box_columns = ['region_id', 'x_min', 'x_max', 'y_min', 'y_max']
        for city in city_list:
            csv_path = os.path.join(data_path, "city_%s" % city, "grid_attr.csv")
            cells = pd.read_csv(csv_path,
                                header=None,
                                names=["grid_x", "grid_y", "region_id"])
            cells['region_id'] = city + '_' + cells['region_id'].astype('str')
            boxes = (
                cells.groupby('region_id')
                .agg({'grid_x': ['min', 'max'], 'grid_y': ['min', 'max']})
                .reset_index()
            )
            # Flatten the two-level (column, aggregate) header.
            boxes.columns = box_columns
            self.grid_dict[city] = boxes

        # Bounding boxes of every city stacked into a single table.
        self.df_all = pd.concat(list(self.grid_dict.values()))

    def get_region(self, x, y, city=None):
        """Return the id of the first region whose box contains (x, y).

        When ``city`` is given only that city's regions are searched,
        otherwise the regions of all cities.  Returns ``None`` when no
        bounding box contains the point.
        """
        x = float(x)
        y = float(y)
        table = self.grid_dict[city] if city else self.df_all
        # `@x` / `@y` refer to the local variables above.
        query = "(x_min <= @x) and (@x <= x_max) and (y_min <= @y) and (@y <= y_max)"
        hits = table.query(query)
        if len(hits):
            return hits.iloc[0, 0]
        return None
        


### Density

In [3]:
def parallelize_dataframe(df, func, n_cores=1):
    """Apply ``func`` to row-wise chunks of ``df`` and concatenate the results.

    Args:
        df: DataFrame to process.
        func: callable taking a DataFrame chunk and returning a DataFrame.
            It is handed to ``multiprocessing.Pool.map`` when more than one
            chunk is used.
        n_cores: requested number of worker processes.  Values below 1 are
            treated as 1 (callers compute ``cpu_count() - 1``, which is 0 on
            a single-core machine).

    Returns:
        The chunk results concatenated in their original order.
    """
    n_rows = len(df)
    # Never create more chunks than rows: an empty chunk would be passed to
    # ``func``, which may not cope with it.
    n_chunks = max(1, min(int(n_cores), n_rows))

    if n_chunks == 1:
        # Nothing to parallelise; skip the process pool entirely.
        return pd.concat([func(df)])

    # Contiguous, near-equal slices (the first ``extra`` chunks get one more
    # row).  Sliced with ``iloc`` instead of ``np.array_split`` so the pieces
    # are guaranteed to stay DataFrames.
    base, extra = divmod(n_rows, n_chunks)
    chunks = []
    start = 0
    for k in range(n_chunks):
        stop = start + base + (1 if k < extra else 0)
        chunks.append(df.iloc[start:stop])
        start = stop

    # The context manager releases the workers even if ``func`` raises.
    with mp.Pool(n_chunks) as pool:
        parts = pool.map(func, chunks)
    return pd.concat(parts)

In [4]:
def make_region(df):
    """Tag rows of ``df`` with the region whose bounding box contains them.

    ``df`` must have ``city``, ``grid_x`` and ``grid_y`` columns; the city is
    taken from the first row, so all rows are expected to belong to one city.
    Bounding boxes come from the module-level ``grids_table``.

    Returns:
        A copy of the matching rows with an added ``region`` column, grouped
        region by region.  A row that falls in no box is dropped; a row that
        falls in several boxes appears once per box.
    """
    if df.empty:
        # An empty chunk has no first row to read the city from.
        return df.assign(region=pd.Series(dtype='object'))

    city = df['city'].iloc[0]
    df_city = grids_table.grid_dict[city]
    # `@name` refers to the loop-local variables assigned below.
    query = "(@x_min <= grid_x) and (grid_x <= @x_max) and (@y_min <= grid_y) and (grid_y <= @y_max)"
    df_list = []
    # Walk the box table once instead of re-querying it four times per region.
    for box in df_city.itertuples(index=False):
        x_min, x_max = box.x_min, box.x_max
        y_min, y_max = box.y_min, box.y_max
        df_region = df.query(query).copy()
        df_region['region'] = box.region_id
        df_list.append(df_region)

    return pd.concat(df_list)

In [5]:
def density_process(data_path, city_list, output_path):
    """Aggregate each city's grid-level density to one value per date and region.

    For every city, ``city_<city>/density.csv`` (no header; columns date,
    hour, grid_x, grid_y, density) is reduced to the median over hours per
    (date, grid cell), mapped to regions with ``make_region`` and summed per
    (date, region).  The per-city tables (one row per date, one column per
    region) are outer-merged on ``date`` and written to
    ``<output_path>/density.csv``.
    """
    # Leave one core free, but never request zero workers (single-core hosts).
    n_cores = max(1, mp.cpu_count() - 1)

    res = []
    for city in city_list:
        filename = os.path.join(data_path, "city_%s" % city, "density.csv")
        density = pd.read_csv(filename,
                              sep=',',
                              header=None,
                              names=["date", "hour", "grid_x", "grid_y", "density"])
        # Collapse the hour dimension: median density per date and grid cell.
        density = density.groupby(['date', "grid_x", "grid_y"])['density'].median().reset_index()
        density['city'] = city  # make_region reads the city from this column

        density = parallelize_dataframe(density, make_region, n_cores=n_cores)

        density = density.groupby(['date', "region"])['density'].sum().reset_index()
        # Long -> wide; each (date, region) pair is unique after the groupby,
        # so 'first' simply picks that single value.
        density = density.pivot_table(values='density',
                                      index='date',
                                      columns='region',
                                      aggfunc='first').reset_index()
        res.append(density)

    df = reduce(lambda left, right: pd.merge(left, right, on='date', how='outer'), res)
    df.to_csv(os.path.join(output_path, 'density.csv'), index=None)


In [6]:
def density_int(output_path, date_range):
    """Fill missing dates in ``density.csv`` by interpolation.

    Reads ``<output_path>/density.csv`` (``date`` column formatted as
    ``YYYYMMDD``), reindexes it onto ``date_range`` and fills the gaps with a
    quadratic interpolation followed by a linear pass for whatever is still
    missing.  The result is written to ``<output_path>/density_int.csv`` with
    dates formatted as ``YYYYMMDD`` again.

    Args:
        output_path: directory holding ``density.csv``; also receives the output.
        date_range: ``DatetimeIndex`` of the dates the output must contain.
    """
    filename = os.path.join(output_path, "density.csv")

    # Read from output_path rather than a hard-coded location.
    df_dens = pd.read_csv(filename)
    df_dens['date'] = pd.to_datetime(df_dens['date'], format='%Y%m%d')
    df_dens = df_dens.set_index('date')

    # Dates absent from the file become all-NaN rows.
    df_dens_int = df_dens.reindex(date_range, fill_value=None)
    df_dens_int = df_dens_int.interpolate(method='quadratic',
                                          limit_direction='both')
    # Second pass for values the quadratic pass left as NaN.
    df_dens_int = df_dens_int.interpolate(method='linear',
                                          limit_direction='both')
    # Name the index explicitly so the column is 'date' regardless of whether
    # `date_range` carries a name.
    df_dens_int = df_dens_int.rename_axis('date').reset_index()
    df_dens_int['date'] = df_dens_int['date'].dt.strftime("%Y%m%d")
    df_dens_int.to_csv(os.path.join(output_path, 'density_int.csv'), index=None)

### Transfer

In [7]:
def transfer_coord2ID(data_path, city_list, output_path):
    """Convert the coordinates in each city's transfer file to region ids.

    Reads ``city_<city>/transfer.csv`` (no header; columns hour, start_x,
    start_y, end_x, end_y, index), replaces the start and end coordinates by
    the region containing them (via ``make_region``) and writes
    ``<output_path>/transfer_<city>.csv`` without header, with columns hour,
    s_region, e_region, index, sorted by hour and numeric region id.
    """
    # Leave one core free, but never request zero workers (single-core hosts).
    n_cores = max(1, mp.cpu_count() - 1)
    for city in city_list:
        filename = os.path.join(data_path, "city_%s" % city, "transfer.csv")
        df = pd.read_csv(filename,
                         header=None,
                         names=['hour', 'start_x', 'start_y', 'end_x', 'end_y', 'index'])

        df['city'] = city  # make_region reads the city from this column
        # make_region works on grid_x / grid_y, so each endpoint is renamed
        # into those columns in turn, resolved, and dropped again.
        for endpoint, region_col in (('start', 's_region'), ('end', 'e_region')):
            df = df.rename(columns={f'{endpoint}_x': 'grid_x', f'{endpoint}_y': 'grid_y'})
            df = parallelize_dataframe(df, make_region, n_cores=n_cores)
            df = df.rename(columns={'region': region_col})
            df = df.drop(columns=['grid_x', 'grid_y'])

        # Sort by the numeric part after the last '_' so that region 2 comes
        # before region 10 (a plain string sort would not do that).
        df['s_for_sort'] = df['s_region'].str.rsplit('_', n=1).str[-1].astype('int')
        df['e_for_sort'] = df['e_region'].str.rsplit('_', n=1).str[-1].astype('int')
        df = df.sort_values(['hour', 's_for_sort', 'e_for_sort'])
        df = df[['hour', 's_region', 'e_region', 'index']]
        df.to_csv(os.path.join(output_path, f'transfer_{city}.csv'),
                  index=None,
                  header=None)

In [8]:
def transfer_index(city_list, output_path):
    """Compute each region's share of its city's incoming transfers.

    Inputs, both read from ``output_path``: ``density.csv`` (a ``date``
    column plus one column per region) and ``transfer_<city>.csv`` (no
    header; columns hour, s_region, e_region, index).

    A region's "population" is the median of its density over the first six
    rows of ``density.csv``.  Regions of a city that never appear as a
    destination get an index estimated from their population and the city's
    median index/population ratio.  Indices are then normalised to sum to 1
    within each city and written to ``<output_path>/transfer.csv`` with
    columns region, index.
    """
    df_dens = pd.read_csv(os.path.join(output_path, "density.csv"))
    # Only the region columns are needed; the date values play no role.
    df_dens = df_dens.set_index('date')
    population_dict = df_dens.iloc[:6].median(axis=0).to_dict()

    # The transfer files are treated as holding a single day of data.
    dfs = []
    for city in city_list:
        # Region ids are "<city>_<n>": match on that prefix so a city whose
        # name is contained in another city's name does not pick up the
        # other city's regions.
        prefix = f"{city}_"
        popu_city = {k: v for k, v in population_dict.items() if k.startswith(prefix)}
        df_popu_city = pd.DataFrame.from_dict(popu_city, orient='index', columns=['popu'])

        filename = os.path.join(output_path, f"transfer_{city}.csv")
        df = pd.read_csv(filename,
                         header=None,
                         names=['hour', 's_region', 'e_region', 'index'])
        # combine points into regions: total per (start, end, hour)
        df = df.groupby(['s_region', 'e_region', 'hour'])['index'].sum().reset_index()
        # typical hourly transfer per (start, end)
        df = df.groupby(['s_region', 'e_region'])['index'].median().reset_index()
        # sum up all transfers arriving in each destination region
        df = df.groupby('e_region')['index'].sum().reset_index()
        # add population; the outer join keeps regions without any transfer
        df = df.merge(df_popu_city, left_on='e_region', right_index=True, how='outer')
        # transfer-index to population ratio
        df['ratio'] = df['index'] / df['popu']
        ratio_median = df['ratio'].median()
        # fill missing indices from population and the city's median ratio
        df['index'] = df['index'].fillna(ratio_median * df['popu'])
        # normalise so the city's regions sum to 1
        df['index'] = df['index'] / df['index'].sum()
        df = df[['e_region', 'index']].rename(columns={'e_region': 'region'})

        dfs.append(df)

    df_tr = pd.concat(dfs)
    df_tr.to_csv(os.path.join(output_path, 'transfer.csv'), index=None)


### Migration

In [9]:
def combine_city_migration(data_path, city_list, output_path):
    """Merge all cities' migration files into one city-to-city table.

    Each ``city_<city>/migration.csv`` (no header; columns date, s_city,
    e_city, index) is stacked, and the median index per
    (date, s_city, e_city) is written to
    ``<output_path>/migration_city.csv``.
    """
    column_names = ['date', 's_city', 'e_city', 'index']
    frames = [
        pd.read_csv(os.path.join(data_path, "city_%s" % city_name, "migration.csv"),
                    sep=',',
                    header=None,
                    names=column_names)
        for city_name in city_list
    ]

    stacked = pd.concat(frames)
    # The same route can be listed in more than one city's file; keep the median.
    per_route = stacked.groupby(['date', 's_city', 'e_city'])['index'].median()
    per_route.reset_index().to_csv(os.path.join(output_path, 'migration_city.csv'),
                                   index=None)



In [10]:
def process_city_migration(data_path, city_name, mig_in=True):
    """Return one city's total daily migration index.

    Reads ``city_<city_name>/migration.csv`` (no header; columns date,
    s_city, e_city, value) and sums the value per date over the rows where
    the city is the destination (``mig_in`` true) or the origin (``mig_in``
    false).

    Returns:
        DataFrame with columns ``date`` and ``<city_name>``.
    """
    # read the original city's migration file; the value column is named
    # after the city
    csv_path = os.path.join(data_path, "city_%s" % city_name, "migration.csv")
    records = pd.read_csv(csv_path,
                          sep=',',
                          header=None,
                          names=['date', 's_city', 'e_city', city_name])

    # incoming moves end in the city, outgoing moves start in it
    endpoint = 'e_city' if mig_in else 's_city'
    selected = records.loc[records[endpoint] == city_name, ["date", city_name]]

    # total per date
    return selected.groupby('date')[city_name].sum().reset_index()

In [11]:
def migration(data_path, city_list, output_path):
    """Distribute each city's daily migration over its regions.

    Each region's share from ``<output_path>/transfer.csv`` (columns region,
    index) is multiplied by the city's daily incoming and outgoing migration
    totals from ``process_city_migration``.  Two tables with one row per date
    and one column per region are written to ``<output_path>``:
    ``migration_in.csv`` and ``migration_out.csv``.

    The dates are those of the incoming series.  Outgoing values are matched
    to them by date; a date with no outgoing record yields NaN.
    """
    # combine city migration and region transfer
    name_tran = os.path.join(output_path, "transfer.csv")
    df_tran = pd.read_csv(name_tran)

    df_in_list = []
    df_out_list = []
    for city_name in city_list:
        # Region ids are "<city>_<n>"; a literal prefix match avoids both
        # regex interpretation of the name and substring hits in other cities.
        in_city = df_tran['region'].str.startswith(f"{city_name}_", na=False)
        df_in = df_tran[in_city].copy()
        df_out = df_tran[in_city].copy()

        migration_in = process_city_migration(data_path, city_name, mig_in=True)
        migration_out = process_city_migration(data_path, city_name, mig_in=False)

        # Pair incoming and outgoing totals by date, not by row position.
        in_by_date = migration_in.set_index('date')[city_name]
        out_by_date = migration_out.set_index('date')[city_name].reindex(in_by_date.index)

        # one new column per date
        for date, index_mig_in, index_mig_out in zip(in_by_date.index,
                                                     in_by_date.to_numpy(),
                                                     out_by_date.to_numpy()):
            df_in[date] = df_in['index'] * index_mig_in
            df_out[date] = df_out['index'] * index_mig_out

        df_in_list.append(df_in)
        df_out_list.append(df_out)

    df_in = pd.concat(df_in_list, axis=0).drop(columns=['index'])
    df_out = pd.concat(df_out_list, axis=0).drop(columns=['index'])
    # regions become columns, dates become rows
    df_in = df_in.set_index('region').transpose()
    df_out = df_out.set_index('region').transpose()
    df_in.index.name = 'date'
    df_out.index.name = 'date'

    df_in.to_csv(os.path.join(output_path, 'migration_in.csv'))
    df_out.to_csv(os.path.join(output_path, 'migration_out.csv'))



### Infection

In [12]:
def infection_process(data_path, city_list, region_nums, output_path):
    """Reshape the per-city infection files into region and city tables.

    Reads ``city_<city>/infection.csv`` (no header; columns city, region,
    date, infect) for every city and writes to ``output_path``:

    * ``infection.csv`` - a ``date`` column (taken from the first region of
      the first city) followed by one ``<city>_<region>`` column per region;
      columns are placed side by side by row position.
    * ``infection_city.csv`` - per-date infection totals, one column per city.
    * ``region_names.txt`` - the region column names, space separated.

    Args:
        region_nums: number of regions of each city, parallel to ``city_list``.
    """
    city_frames = []
    region_frames = []
    region_labels = []
    for city_pos, city in enumerate(city_list):
        csv_path = os.path.join(data_path, "city_%s" % city, "infection.csv")
        infections = pd.read_csv(csv_path,
                                 sep=',',
                                 header=None,
                                 names=["city", "region", "date", "infect"])

        # city-level total per date
        daily_total = infections.groupby(['date'])['infect'].sum().reset_index()
        city_frames.append(daily_total.rename(columns={'infect': city}))

        # Regions are visited in string order of their id (0, 1, 10, 11, 2, ...).
        region_order = sorted(range(region_nums[city_pos]), key=str)
        for region_pos, region_id in enumerate(region_order):
            rows = infections[infections['region'] == region_id].reset_index(drop=True)
            # Only the very first region contributes the date column.
            is_first = city_pos == 0 and region_pos == 0
            kept = ['date', 'infect'] if is_first else ['infect']
            label = "%s_%d" % (city, region_id)

            region_frames.append(rows[kept].rename(columns={'infect': label}))
            region_labels.append(label)

    df_region = pd.concat(region_frames, axis=1)
    df_city = reduce(lambda left, right: pd.merge(left, right, on='date', how='outer'),
                     city_frames)

    df_region.to_csv(os.path.join(output_path, "infection.csv"), index=False)
    df_city.to_csv(os.path.join(output_path, "infection_city.csv"), index=False)

    with open(os.path.join(output_path, "region_names.txt"), 'w') as f:
        f.write(' '.join(region_labels) + '\n')


### Weather

In [13]:
def weather_pred(data_path, city_list, region_nums, output_path):
    """Write a per-region daily temperature table to ``temperature.csv``.

    For every city, ``city_<city>/weather.csv`` (no header; columns date,
    hour, T, H, W_d, W_v, W_f, cond) is reduced to the mean of ``T`` per date
    over the rows with ``hour >= 5``.  Every region of the city receives that
    same series as its ``<city>_<region>`` column.

    The output has a fixed ``date`` column of 60 days starting 2120-05-01.
    The daily means are placed by position (first date in the file on the
    first output row), not matched by date value; missing rows are NaN and
    surplus rows are dropped.

    Despite the name, no forecast is produced: only observed means are written.

    Args:
        region_nums: number of regions of each city, parallel to ``city_list``.
    """
    df = pd.DataFrame()
    df['date'] = pd.date_range('2120-05-01', periods=60).strftime('%Y%m%d')
    names = ['date', 'hour', 'T', 'H', 'W_d', 'W_v', 'W_f', 'cond']

    region_columns = {}
    for i, city in enumerate(city_list):
        df_weather = pd.read_csv(os.path.join(data_path, f'city_{city}/weather.csv'),
                                 header=None,
                                 names=names)
        df_weather = df_weather[df_weather['hour'] >= 5]

        sr_weath = df_weather.groupby('date')['T'].mean().reset_index(drop=True)
        # Fit the series to the 60 output rows by position.
        sr_weath = sr_weath.reindex(df.index)

        # Regions in string order of their id (0, 1, 10, 11, 2, ...).
        order = sorted(range(region_nums[i]), key=str)
        for idx in order:
            region_columns[f'{city}_{idx}'] = sr_weath

    # Attach all region columns at once instead of inserting them one by one.
    df = pd.concat([df, pd.DataFrame(region_columns, index=df.index)], axis=1)
    df.to_csv(os.path.join(output_path, 'temperature.csv'), index=None)

## Running

In [14]:
# Input / output locations and the cities to process.
data_path = './dataset/train_data_all'
output_path = './dataset/data_processed'
city_list = ["A", "B", "C", "D", "E", "F", "G", "H", "I", "J", "K"]

# Number of distinct region ids in each city's grid_attr.csv, in city_list order.
region_nums = []
for city in city_list:
    df = pd.read_csv(
        os.path.join(data_path, f'city_{city}', 'grid_attr.csv'),
        header=None,
        names=['x', 'y', 'region'],
    )
    region_nums.append(len(set(df['region'])))

# The covered dates, and a lookup from YYYYMMDD (as int) to the day's position.
date_range = pd.date_range('2120-05-01', '2120-06-29')
date_idx = {
    int(day.strftime('%Y%m%d')): position
    for position, day in enumerate(date_range)
}

# Shared coordinate -> region lookup; make_region reads it as a module global.
grids_table = Grids(data_path, city_list)

In [None]:
# Run the preprocessing steps. The order matters: later steps read files
# that earlier steps write into output_path.
# Writes transfer_<city>.csv for every city.
transfer_coord2ID(data_path, city_list, output_path)
# Writes density.csv.
density_process(data_path, city_list, output_path)
# Reads density.csv and transfer_<city>.csv; writes transfer.csv.
transfer_index(city_list, output_path)
# Reads transfer.csv; writes migration_in.csv and migration_out.csv.
migration(data_path, city_list, output_path)
# Writes infection.csv, infection_city.csv and region_names.txt.
infection_process(data_path, city_list, region_nums, output_path)
# Writes migration_city.csv.
combine_city_migration(data_path, city_list, output_path)

# Writes temperature.csv.
weather_pred(data_path, city_list, region_nums, output_path)
# Interpolates the density.csv written by density_process; writes density_int.csv.
density_int(output_path, date_range)