In [1]:
import pandas as pd
import dask.dataframe as dd
import matplotlib.pyplot as plt
import seaborn as sns
sns.set_theme()

### Load Data

In [2]:
# Read each monthly pipe-delimited extract with dask and stack them into a
# single frame (dask is used here to keep memory usage down)
monthly_files = [
    '../data/TOL_202309.txt',
    '../data/TOL_202310.txt',
    '../data/TOL_202311.txt',
]
monthly_frames = [dd.read_csv(path, delimiter='|') for path in monthly_files]
df = dd.concat(monthly_frames, ignore_index=True)

In [3]:
# Parse the BUY_DATE column into a datetime dtype
df = df.assign(BUY_DATE=dd.to_datetime(df['BUY_DATE']))

In [4]:
# Set date as index
# df = df.set_index('BUY_DATE')

### Preprocessing

In [2]:
# Peek at the first few rows of the frame
df.head()

In [6]:
# Inspect the dtype of every column
df.dtypes

PACKAGE_CODE      string[pyarrow]
PREFIX            string[pyarrow]
CHARGING_TYPE     string[pyarrow]
CHANNEL           string[pyarrow]
BUY_DATE           datetime64[ns]
FUNCTION_VAS      string[pyarrow]
GROUP_CHANNEL     string[pyarrow]
BUY_TM_KEY_MTH              int64
MSISDN                      int64
dtype: object

In [3]:
# List the available column names
df.columns

In [8]:
# Remove the columns considered unnecessary / redundant for this analysis
redundant_columns = ['BUY_TM_KEY_MTH', 'PREFIX', 'CHARGING_TYPE', 'GROUP_CHANNEL']
df = df.drop(columns=redundant_columns)

In [9]:
# Create premium number flag: 1 when the raw transaction function is one of the
# premium-number functions, 0 otherwise (missing values count as 0).
# The flag is built with a vectorised membership test and cast to int64 so that
# the dtype dask reports for the column matches the integer values it holds.
premium_functions = ['PREM_ADD_PACKAGE', 'PREM_CANCEL']
df['Premium_Number'] = df['FUNCTION_VAS'].isin(premium_functions).astype('int64')

In [10]:
# Collapse each raw function (transaction type) into its group label.
# Functions that have no entry in the lookup come back as missing values.
function_groups = {
    # Actual function name removed (confidential data)
    'FUNCTION 1': 'RC',
    'FUNCTION 2': 'BUY',
    'FUNCTION 3': 'CANCEL',
}
df['FUNCTION_VAS'] = df['FUNCTION_VAS'].map(function_groups)

In [11]:
# Filter out rows whose channel is incorrect or excluded from the analysis
# Actual channel name removed (confidential data)
excluded_channels = ['CHANNEL 1', 'CHANNEL 2', 'CHANNEL 3', 'CHANNEL 4']
is_excluded = df['CHANNEL'].isin(excluded_channels)
df = df[~is_excluded]

In [4]:
# Peek at the first few rows of the filtered frame
df.head()

In [13]:
# Translate each raw channel name into its channel group
mapping = {
    # Actual channel name removed (confidential data)
    'CHANNEL 1': 'System',
    'CHANNEL 2': 'Digital',
    'CHANNEL 3': 'Kiosk/POS',
    'CHANNEL 4': 'Digital',
    'CHANNEL 5': 'Legacy',
    'CHANNEL 6': 'Other'
}


def _channel_group(channel):
    """Return the group for a raw channel name.

    Raises KeyError when the name has no entry in ``mapping``.
    """
    return mapping[channel]


df['CHANNEL'] = df['CHANNEL'].map(_channel_group, meta=('x', 'string[pyarrow]'))

In [14]:
# Remove the channel groups that are out of focus for this analysis
unfocused_groups = ['Customer Support', 'Campaign', 'Borrow']
in_unfocused_group = df['CHANNEL'].isin(unfocused_groups)
df = df[~in_unfocused_group]

In [15]:
# Keep only the BUY and RC transaction groups
is_buy = df['FUNCTION_VAS'] == 'BUY'
is_rc = df['FUNCTION_VAS'] == 'RC'
df = df[is_buy | is_rc]

In [16]:
# Check which columns remain in the frame
df.columns

Index(['PACKAGE_CODE', 'CHANNEL', 'BUY_DATE', 'FUNCTION_VAS', 'MSISDN',
       'Premium_Number'],
      dtype='object')

In [17]:
# Materialise the distinct CHANNEL values (the .compute() call triggers the dask computation)
df['CHANNEL'].unique().compute()

In [18]:
# Prepare aggregation dict
# agg_dict = {col: 'first' for col in ['FUNCTION_VAS', 'Premium_Number']}
# agg_dict.update({col: 'sum' for col in ['SYS_TRX', 'DIGITAL_TRX', 'KIOSK_POS_TRX', 'LEGACY_TRX', 'AGENT_TRX']})

# temp_df = df[df['BUY_DATE'].dt.month == 9]

# # Set index
# temp_df['BUY_DATE'] = pd.to_datetime(temp_df['BUY_DATE'])
# temp_df.set_index('BUY_DATE', inplace=True)

In [22]:
# Dummified Trx count from each channel as columns
# temp_df['SYS_TRX'] = temp_df['CHANNEL'].map(lambda x: 1 if x == 'System' else 0)
# temp_df['DIGITAL_TRX'] = temp_df['CHANNEL'].map(lambda x: 1 if x == 'Digital' else 0)
# temp_df['KIOSK_POS_TRX'] = temp_df['CHANNEL'].map(lambda x: 1 if x == 'Kiosk/POS' else 0)
# temp_df['LEGACY_TRX'] = temp_df['CHANNEL'].map(lambda x: 1 if x == 'Legacy' else 0)
# temp_df['AGENT_TRX'] = temp_df['CHANNEL'].map(lambda x: 1 if x == 'Agent' else 0)

# temp_df.drop(columns='CHANNEL', inplace=True)

In [19]:
# Count number of transactions for each package/MSISDN
# temp_df = temp_df.groupby(['PACKAGE_CODE', 'MSISDN']).resample('W').agg(agg_dict).reset_index().drop(columns='BUY_DATE')

In [None]:
# push to file
# temp_df.to_parquet(f'../data/2013_09_parquet')

### Monthly Aggregation

In [17]:
months = [9, 10, 11]

# Indicator column -> channel group whose transactions it counts
channel_flags = {
    'SYS_TRX': 'System',
    'DIGITAL_TRX': 'Digital',
    'KIOSK_POS_TRX': 'Kiosk/POS',
    'LEGACY_TRX': 'Legacy',
    'AGENT_TRX': 'Agent',
}

# Prepare aggregation dict: keep the first function / premium flag seen for a
# (package, MSISDN) pair and total the per-channel indicators
agg_dict = {col: 'first' for col in ['FUNCTION_VAS', 'Premium_Number']}
agg_dict.update({col: 'sum' for col in channel_flags})

# Push one month at a time into a pandas dataframe for calculation
for month in months:
    temp_df = df[df['BUY_DATE'].dt.month == month].compute()

    # Dummified Trx count from each channel as columns: a vectorised 0/1
    # indicator per channel group. CHANNEL and BUY_DATE are not part of the
    # aggregated output, so they are dropped before grouping.
    temp_df = temp_df.assign(**{
        flag: temp_df['CHANNEL'].eq(group).astype('int64')
        for flag, group in channel_flags.items()
    }).drop(columns=['CHANNEL', 'BUY_DATE'])

    # Count number of transactions for each package/MSISDN
    temp_df = temp_df.groupby(['PACKAGE_CODE', 'MSISDN']).agg(agg_dict).reset_index()

    # push to file
    temp_df.to_parquet(f'../data/2023_{month}_parquet')

### Weekly Aggregation

In [1]:
# months = [9, 10, 11]
# day_range = [(1, 5), (6, 10), (11, 15), (16, 20), (21, 25), (26, 31)]

# # Prepare aggregation dict
# agg_dict = {col: 'first' for col in ['FUNCTION_VAS', 'Premium_Number']}
# agg_dict.update({col: 'sum' for col in ['SYS_TRX', 'DIGITAL_TRX', 'KIOSK_POS_TRX', 'LEGACY_TRX', 'AGENT_TRX']})

# # Push into pandas dataframe for calculation
# for month in months:
#     for day in day_range:
#         temp_df = df[(df['BUY_DATE'].dt.month == month) & (df['BUY_DATE'].dt.day >= day[0]) & (df['BUY_DATE'].dt.day <= day[1])].compute()

#         # Set index
#         temp_df['BUY_DATE'] = pd.to_datetime(temp_df['BUY_DATE'])
#         temp_df.set_index('BUY_DATE', inplace=True)

#         # Dummified Trx count from each channel as columns
#         temp_df['SYS_TRX'] = temp_df['CHANNEL'].map(lambda x: 1 if x == 'System' else 0)
#         temp_df['DIGITAL_TRX'] = temp_df['CHANNEL'].map(lambda x: 1 if x == 'Digital' else 0)
#         temp_df['KIOSK_POS_TRX'] = temp_df['CHANNEL'].map(lambda x: 1 if x == 'Kiosk/POS' else 0)
#         temp_df['LEGACY_TRX'] = temp_df['CHANNEL'].map(lambda x: 1 if x == 'Legacy' else 0)
#         temp_df['AGENT_TRX'] = temp_df['CHANNEL'].map(lambda x: 1 if x == 'Agent' else 0)

#         temp_df.drop(columns='CHANNEL', inplace=True)

#         # Count number of transactions for each package/MSISDN
#         temp_df = temp_df.groupby(['PACKAGE_CODE', 'MSISDN']).resample('W').agg(agg_dict).reset_index().drop(columns='BUY_DATE')

#         # push to file
#         temp_df.to_parquet(f'../data/2023_{month}_{day[0]}_{day[1]}_weekly_parquet')

### Merge

In [None]:
# Load the package detail table as an in-memory pandas DataFrame
pack_detail_df = pd.read_csv('../data/PACKAGE_DETAIL_FINAL.csv')

In [None]:
# Give the package key the same column name as in the transaction data so the
# two tables can be joined on it
pack_detail_df = pack_detail_df.rename({'IPK_CODE': 'PACKAGE_CODE'}, axis='columns')

In [None]:
# Total number of rows. `len` returns a concrete integer for both dask and
# pandas frames, whereas `shape[0]` on a dask frame is only a lazy scalar.
len(df)

19802699

In [None]:
# Merge all data
# `df` is a lazy dask DataFrame, and dask's `iloc` cannot slice rows by
# position. The merged result is therefore split into chunks by partition
# rather than by hard-coded row offsets, so no rows are skipped at chunk
# boundaries and the code does not depend on a previously observed row count.
n_chunks = 3

merged = df.merge(pack_detail_df, on='PACKAGE_CODE', how='left')

# Drop null
merged = merged.dropna()

# One dask partition per output file
merged = merged.repartition(npartitions=n_chunks)

for count in range(1, n_chunks + 1):
    # Only the current chunk is materialised in memory
    temp = merged.partitions[count - 1].compute()
    temp.to_parquet(f'../data/MERGED_DATA_{count}_parquet')

In [10]:
# Export to parquet
# df.to_parquet('../data/ALL_DATA_parquet')