# 0. prepairation

## 0.1. import packages

In [1]:
import sys
import os

In [2]:
import time
import datetime
from dateutil.relativedelta import relativedelta

In [3]:
import numpy as np
import pandas as pd

import glob
import pickle
import yaml

In [4]:
import re
import itertools
from tqdm import tqdm

In [5]:
from logging import getLogger, FileHandler, Formatter # 途中で誤ってloggingを汚さないように個別にimport
from logging import DEBUG, INFO, WARNING, ERROR, CRITICAL 

## 0.2. append system path

In [6]:
path_sys_base="/home/jovyan/work"

## 0.3. define classes

In [7]:
class DataConverter(): 
    def __init__(self):
        pass

    def get_sample_dtype_def(self): 
        dtype_def ={
            "int64": ["Marital_Status", "Occupation"], 
            "float64": ["Purchase"], 
            "datetime64": ["hogehoge"]
        }
    
    def convert_dtype_with_dtype_def(self, df_in, dtype_def): 
        for clnm in df_in.columns: 
            df_in[clnm] = df_in[clnm].astype("str")
        
        for dtype, clnms in dtype_def.items(): 
            if dtype == "datetime64":
                for clnm in clnms: 
                    df_in[clnm] = pd.to_datetime(df_in[clnm])
            else: 
                for clnm in clnms: 
                    df_in[clnm] = df_in[clnm].astype(dtype)

    def invert_dict_key_value(self, in_dict, value_type="list"): 
        if value_type=="str":
            ## key に対して Value　が string であるようなdictionary に対して、 key と Value を反転させる
            grp_val_unique = pd.Series([v for k, v in in_dict.items()]).unique()
            
            rev_dict = {}
            for grp_val in grp_val_unique: 
                rev_dict[grp_val] = [k for k, v in in_dict.items() if v==grp_val]
        elif value_type=="list":
            ## key に対して Value　が list であるようなdictionary に対して、 key と Value を反転させる
            grp_val_unique = []
            for k, v in in_dict.items(): 
                grp_val_unique.extend(pd.Series(v).unique().tolist())
            grp_val_unique = pd.Series(grp_val_unique).unique()
            
            rev_dict = {}
            for grp_val in grp_val_unique: 
                rev_dict[grp_val] = [k for k, v in in_dict.items() if grp_val in v]
        else: 
            print("value_type must be \"str\" or \"list\"!")
        return rev_dict


In [8]:
class GroupingValue():
    def __init__(self, grouping_def_): 
        self._grouping_def_ = grouping_def_

    def get_sample_grouping_def_(self): 
        ret_dict_num = {0.2:"GRP01", 0.95:"GRP02", 1.0:"GRP03"}
        ret_dict_cat = {
            "A":"GRP01", 
            "B":"GRP01", 
            "C":"GRP01", 
            "D":"GRP02", 
            "E":"GRP02", 
            "F":"GRP03"
        }
        return ret_dict_cat, ret_dict_num

    def grouping_num(self, in_num, is_debug=False):
        
        thresholds = [k for k, v in self._grouping_def_.items()]
        
        lt_thrshld_list = [k for k, v in self._grouping_def_.items() if in_num <= k]
        
        if len(lt_thrshld_list) > 0: 
            matched_threshold = min(lt_thrshld_list)
            ret_val = self._grouping_def_[matched_threshold]
        else: 
            matched_threshold = None
            ret_val = "other"
        
        if is_debug:
            return ret_val, matched_threshold, in_num
        else: 
            return ret_val

    def grouping_cat(self, in_cat, is_debug=False):

        if in_cat in [s for s in self._grouping_def_.keys()]: 
            ret_val = self._grouping_def_[in_cat]
        else: 
            ret_val = "other"
        
        if is_debug:
            return ret_val, in_cat
        else: 
            return ret_val


In [9]:
class BasicStatistics(): 
    def __init__(self):
        pass


    def get_unique_and_null_count(self, df_in): 
    
        unq_ ={}
        for i1, clnm in enumerate(df_in.columns): 
            i1
            unq_[(i1, clnm)] = pd.DataFrame(
                [
                    {
                        "null_count": df_in[clnm].isna().sum(), 
                        "null_ratio": df_in[clnm].isna().sum()/len(df_in), 
                        "unique": len(df_in.groupby(clnm).count())
                    }
                ]
            )
        
        return pd.concat(unq_)


    def get_unique_value_count(self, df_in, is_object_only=True): 
        clnms_orig = pd.Series(df_in.columns)
        cond_raw = df_in.dtypes == "object"
        cond_idx = pd.Series({i1: is_obj for i1, (clnm, is_obj)  in enumerate(cond_raw.items())})
        if is_object_only: 
            clnms_iter = clnms_orig[cond_idx]
        else: 
            clnms_iter = clnms_orig

        df_value_COUNT_ ={}
        for i1, clnm in clnms_iter.items():
            df_tmp = df_in.reset_index().groupby([clnm]).count().loc[:, ["index"]].reset_index()
            df_tmp.columns =["value", "COUNT"]
            
            df_value_COUNT_[(i1, clnm)] = df_tmp.sort_values(["COUNT"], ascending=[False]).reset_index(drop=True).rename(index = lambda s:s+1)

        df_value_COUNT = pd.concat(df_value_COUNT_).reset_index().rename(columns={"level_0":"column_id", "level_1":"column", "level_2":"seq"})

        return df_value_COUNT.sort_values(["column_id", "COUNT"], ascending=[True, False]).reset_index(drop=True)


    def get_and_shape_unique_value_count(self, df_in, max_seq=50): 
        df_unique = self.get_unique_value_count(df_in)
    
        unique_values_top50_ = {}
        for grpby_val, df_unique_sub in df_unique.groupby(["column_id", "column"]): 
            unique_values_top50_[grpby_val] = pd.DataFrame(
                df_unique_sub.reset_index(drop=True).loc[:(max_seq-1), :].apply(lambda s:"{}: {:,d}".format(s["value"], s["COUNT"]), axis=1)
            ).T
            
        return pd.concat(unique_values_top50_)


    def get_stats_numerical_column(self, df_in, quantile_points=[s/10 for s in range(0, 11, 1)]): 
    
        clnms_orig = pd.Series(df_in.columns)
        cond_raw = df_in.dtypes != "object"
        cond_idx = pd.Series({i1: is_obj for i1, (clnm, is_obj)  in enumerate(cond_raw.items())})
        
        df_desc_ ={}
        for i1, clnm in clnms_orig[cond_idx].items(): 
            tmp_dict = {"{}%".format(10*i1):qt for i1, qt in enumerate(np.quantile(df_in.loc[:, clnm], q=quantile_points))}
            tmp_dict.update(
                {
                    "mean": np.mean(df_in.loc[:, clnm]), 
                    "std": np.std(df_in.loc[:, clnm])
                }
            )
            
            df_desc_[(i1, clnm)] = pd.DataFrame([tmp_dict])
        
        return pd.concat(df_desc_)

## 0.4. define functions

## 0.5. set notebooks parameters

In [10]:
pd.set_option('display.max_rows', 500)
pd.set_option('display.max_columns', 500)

# 1. constants

In [11]:
process_id = "01_read_data_v001"
exec_st_datetime_string = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")

# 2. instantiate classes

In [12]:
flnm_log = "../log/{}_{}.log".format(process_id, exec_st_datetime_string)

In [13]:
cstmlggr = getLogger()
cstmlggr.setLevel(INFO)

handler=FileHandler(flnm_log)
cstmlggr.addHandler(handler)

## ログファイルフォーマットの定義
handler.setFormatter(Formatter("%(asctime)s, %(levelname)s, %(filename)s, %(name)s, %(funcName)s, %(lineno)s, %(message)s"))

In [14]:
cstmlggr.info("start")

# 3. read files and convert data type

In [15]:
path_in_data = "{}/data_management/00_orig/walmart".format(path_sys_base)
flnm_in_data = "walmart.csv"

In [16]:
df_walmart = pd.read_table(
    "{}/{}".format(path_in_data, flnm_in_data), 
    delimiter=",", 
    header=0, 
    dtype="object", 
    quotechar="\"", 
    quoting=1, ## quoting = QUOTE_MINIMAL (0), QUOTE_ALL (1), QUOTE_NONNUMERIC (2) or QUOTE_NONE (3).
    encoding="utf-8"
)


# 4. basic statistics

In [17]:
bs = BasicStatistics()

In [18]:
df_describe_bf = pd.concat(
    [
        bs.get_unique_and_null_count(df_walmart), 
        bs.get_and_shape_unique_value_count(df_walmart, max_seq=10)
    ], axis=1
)

In [19]:
df_describe_bf = df_describe_bf.reset_index().rename(columns={"level_0":"column_id", "level_1":"column", "level_2":"DUMMY"})
df_describe_bf.columns = [str(s) for s in df_describe_bf.columns]

# 4. dtype convert

In [20]:
dc = DataConverter()

## 4.0. test

### 4.0.1. generate test data

In [21]:
# df_in = df_walmart.copy()

rng = np.random.default_rng()
numofrecord = 100
df_in = pd.concat(
    [
        pd.DataFrame(pd.Series([s for s in range(0, numofrecord, 1)]), columns=["id"]), 
        pd.DataFrame(pd.Series([rng.choice([chr(ord("A")+s) for s in range(0, 26, 1)]) for s in range(0, numofrecord, 1)]), columns=["category"]), 
        pd.DataFrame(pd.Series([pd.to_datetime("20240810") + relativedelta(months=s) for s in range(0, numofrecord, 1)]), columns=["date"]), 
        pd.DataFrame(pd.Series([int(rng.normal(1000, 10)) for s in range(0, numofrecord, 1)]), columns=["price"])
    ], axis=1
)

In [22]:
df_in.dtypes

id                   int64
category            object
date        datetime64[ns]
price                int64
dtype: object

### 4.0.2. convert to string 

In [23]:
dtype_def ={
    "int64": [], 
    "float64": [], 
    "datetime64": []
}

dc.convert_dtype_with_dtype_def(df_in, dtype_def)

In [24]:
type(df_in.loc[0, "date"])

str

### 4.0.3. convert to appropriate dtype

In [25]:
dtype_def ={
    "int64": [], 
    "float64": ["price"], 
    "datetime64": ["date"]
}

dc.convert_dtype_with_dtype_def(df_in, dtype_def)

In [26]:
df_in.dtypes

id                  object
category            object
date        datetime64[ns]
price              float64
dtype: object

## 4.1. apply to walmart data

In [27]:
dtype_def ={
    "int64": ["Marital_Status", "Occupation"], 
    "float64": ["Purchase"], 
    "datetime64": []
}

dc.convert_dtype_with_dtype_def(df_walmart, dtype_def)

## 4.2. get describe after convertt dtype

In [28]:
df_describe_af = pd.concat(
    [
        bs.get_unique_and_null_count(df_walmart), 
        bs.get_stats_numerical_column(df_walmart), 
        bs.get_and_shape_unique_value_count(df_walmart, max_seq=50)
    ], axis=1
)

In [29]:
df_describe_af = df_describe_af.reset_index().rename(columns={"level_0":"column_id", "level_1":"column", "level_2":"DUMMY"})
df_describe_af.columns = [str(s) for s in df_describe_af.columns]

# 5. output

## 5.1. align dtype of describe

In [30]:
dtype_def_describe = dc.invert_dict_key_value(
    {k:str(v) for k, v in df_describe_bf.dtypes.items()}, 
    value_type="str"
)
_ = dtype_def_describe.pop("object")
dc.convert_dtype_with_dtype_def(df_describe_bf, dtype_def_describe)

In [31]:
dtype_def_describe = dc.invert_dict_key_value(
    {k:str(v) for k, v in df_describe_af.dtypes.items()}, 
    value_type="str"
)
_ = dtype_def_describe.pop("object")
dc.convert_dtype_with_dtype_def(df_describe_af, dtype_def_describe)

## 5.2. set output specification

In [32]:
path_out = "../data"
data_out_ = {
    "walmart": df_walmart, 
    "desc_01_bf": df_describe_bf, 
    "desc_02_af": df_describe_af
}

## 5.3. do

In [33]:
for k, v in data_out_.items(): 
    v.to_parquet("{}/{}.parquet".format(path_out, k))

    v.to_csv(
        path_or_buf="{}/{}.csv".format(path_out, k), 
        sep="\t", 
        quotechar="\"", 
        quoting=1, ## quoting = QUOTE_MINIMAL (0), QUOTE_ALL (1), QUOTE_NONNUMERIC (2) or QUOTE_NONE (3).
        encoding="utf-8", 
        header=True, 
        index=False
    )


# 6. read parquet file

## 6.1.  with dd

In [34]:
import dask.dataframe as dd

In [35]:
dskdf_ = {}
for k, v in data_out_.items(): 
    dskdf_[k] = dd.read_parquet(("{}/{}.parquet".format(path_out, k)))

In [36]:
df_walmart = dskdf_["walmart"].compute()

In [37]:
df_walmart.dtypes

User_ID                       string[pyarrow]
Product_ID                    string[pyarrow]
Gender                        string[pyarrow]
Age                           string[pyarrow]
Occupation                              int64
City_Category                 string[pyarrow]
Stay_In_Current_City_Years    string[pyarrow]
Marital_Status                          int64
Product_Category              string[pyarrow]
Purchase                              float64
dtype: object

## 6.2.  with duck DB

In [38]:
# !pip install duckdb ## ENABLE when factory reset
import duckdb

In [39]:
query_str = """
(
    SELECT * FROM parquet_scan('{train_path}') 
)
""".format(
    train_path="{}/{}.parquet".format(path_out, "walmart")
)
print(query_str)


(
    SELECT * FROM parquet_scan('../data/walmart.parquet') 
)



In [40]:
dckdf_ = {}

con = duckdb.connect()
dckdf_["walmart"] = con.query(query_str).df()
con.close()

In [41]:
dckdf_["walmart"]

Unnamed: 0,User_ID,Product_ID,Gender,Age,Occupation,City_Category,Stay_In_Current_City_Years,Marital_Status,Product_Category,Purchase
0,1000001,P00069042,F,0-17,10,A,2,0,3,8370.0
1,1000001,P00248942,F,0-17,10,A,2,0,1,15200.0
2,1000001,P00087842,F,0-17,10,A,2,0,12,1422.0
3,1000001,P00085442,F,0-17,10,A,2,0,12,1057.0
4,1000002,P00285442,M,55+,16,C,4+,0,8,7969.0
...,...,...,...,...,...,...,...,...,...,...
550063,1006033,P00372445,M,51-55,13,B,1,1,20,368.0
550064,1006035,P00375436,F,26-35,1,C,3,0,20,371.0
550065,1006036,P00375436,F,26-35,15,B,4+,1,20,137.0
550066,1006038,P00375436,F,55+,1,C,2,0,20,365.0
