In [1]:
import datetime as dt
import os
import shutil
from pathlib import Path

import pandas as pd
from dateutil.relativedelta import relativedelta

from azureml.core import Workspace, Dataset, Datastore
from azureml.core import Experiment
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException
from azureml.core.conda_dependencies import CondaDependencies
from azureml.core.runconfig import RunConfiguration
from azureml.core.runconfig import DockerConfiguration
from azureml.data import OutputFileDatasetConfig
from azureml.pipeline.core import PipelineData, PipelineParameter
from azureml.pipeline.core import Pipeline
from azureml.pipeline.steps import PythonScriptStep

In [2]:
# from joblib import Parallel, delayed
# import multiprocessing as mp

# from tqdm.notebook import tqdm
# tqdm.pandas()

In [3]:
# Folder (inside the blob container) that holds the full raw input files.
input_full_raw_data_path = 'InputData/FullRawFiles'

# Output folder (inside the blob container).
# Note: this notebook places new data into the Validation and Monitoring folder structure.
# Because the new files are not consistent with the original dataset, later cells convert
# data types, select columns, etc. so that the new data matches it.
# For example, in Demographics the CIF_CREATE column arrives with a different data type.
# output_raw_data_path = 'InputData/InputRawFiles'
output_raw_data_path = 'InputData/InputRawFiles_VM'

In [4]:
%%time
# Coordinates of the Azure ML workspace this notebook works against
subscription_id = 'bdea9b80-e147-4dd1-9e22-afd1228b6d1a'
resource_group = 'casa-churn-analysis-ey-demo'
workspace_name = 'AzureConnection'

# Build the Workspace handle from those coordinates
ws = Workspace(
    subscription_id=subscription_id,
    resource_group=resource_group,
    workspace_name=workspace_name,
)

CPU times: user 202 ms, sys: 13 ms, total: 215 ms
Wall time: 405 ms


In [5]:
# Prefix used when building the datastore name and the per-asset names in later cells.
user_name = 'qa'

In [6]:
blob_storage_name = 'azureconstorage3630ce4c4'
blob_container = 'azureml'

# The storage account key is a secret, so it is read from the environment instead of
# being stored in the notebook.
# NOTE(review): the key that used to be hard-coded here has been exposed to everyone
# with access to this file and should be rotated.
blob_key = os.environ.get('AZURE_BLOB_ACCOUNT_KEY')
if not blob_key:
    raise RuntimeError(
        "Set the AZURE_BLOB_ACCOUNT_KEY environment variable to the access key of "
        f"storage account '{blob_storage_name}' before running this notebook."
    )

# Datastore name: the data assets created by this notebook appear under this datastore,
# which points at the blob storage specified above.
datastore_name = f"{user_name}_blob_datastore"

In [7]:
%%time
# Expose the blob container to Azure ML as a datastore
datastore_settings = dict(
    workspace=ws,
    datastore_name=datastore_name,
    container_name=blob_container,
    account_name=blob_storage_name,
    account_key=blob_key,
)
blob_data_store = Datastore.register_azure_blob_container(**datastore_settings)

# Use the freshly registered datastore as the default one
# (alternative: default_store = ws.get_default_datastore())
default_store = blob_data_store

CPU times: user 91.7 ms, sys: 0 ns, total: 91.7 ms
Wall time: 755 ms


In [8]:
# Suffix appended to every data-asset name built in the loading cell (empty string = no suffix).
dataAssetName_suffix = ''

In [115]:
%%time
# Map of notebook variable name -> input CSV file name (CSV files only).
# NOTE(review): later cells read demographics_data_m, account_data and transaction_data;
# those names only exist in the kernel if the matching entries below are un-commented
# and this cell is run again.
map_dataAsset_inputFile = {
    # 'transaction_data' : 'TRANSACTION_VM.csv',
    # 'demographics_data' : 'Customer_Demographic.csv',
    'demographics_data_v' : 'CUSTOMER_DEMOGRAPHICS_V.csv',
    # 'demographics_data_m' : 'CUSTOMER_DEMOGRAPHICS_M.csv', # encoding = 'latin1'
    # 'account_data' : 'AR_LCS_VM.csv'
}

# Build a tabular dataset for each entry and bind it to a notebook-level variable
# named after the dictionary key (via globals()).
# NOTE(review): nothing is registered in this loop, and dataAssetNameOnAzure is
# computed but not used here.
for dataAssetName, inputFileName in map_dataAsset_inputFile.items():
    dataAssetNameOnAzure = f'{user_name}_{dataAssetName}{dataAssetName_suffix}'
    csv_path = [(blob_data_store, f"{input_full_raw_data_path}/{inputFileName}")]
    # Every file is read as latin1, whichever entry is active.
    globals()[dataAssetName] = Dataset.Tabular.from_delimited_files(path=csv_path,encoding = 'latin1')

CPU times: user 287 ms, sys: 41.3 ms, total: 328 ms
Wall time: 1.19 s


In [10]:
# Notebook-wide display settings: show every column, cap output at 50 rows,
# and render floats with three decimals.
pd.options.display.max_columns = None
pd.options.display.max_rows = 50
pd.set_option("display.float_format", '{:.3f}'.format)

In [11]:
import datetime
def fun_datetime_format(date_str, date_types=None):
    ''' Convert a date/time string into a date string formatted as "%d-%b-%y".

    Ex: "29-Oct-14 12:00:00 AM" -> "29-Oct-14"
        "03-12-07 0:00"         -> "03-Dec-07"
        "29-Oct-14"             -> "29-Oct-14" (already formatted, so re-running is harmless)

    Args:
        date_str: text holding a date, optionally followed by a time of day.
        date_types: optional strptime pattern, or iterable of patterns, tried in order.
            When omitted, the three layouts listed below are used.

    Returns:
        The date part of date_str rendered as "%d-%b-%y".

    Raises:
        ValueError: if date_str matches none of the patterns.

    Reference: https://docs.python.org/3/library/datetime.html '''
    if date_types is None:
        date_types = (
            # Original layout, kept first. With %H the AM/PM marker must be present
            # but does not shift the hour; the time is discarded anyway.
            "%d-%b-%y %H:%M:%S %p",
            # Numeric day-month-year followed by hours and minutes.
            "%d-%m-%y %H:%M",
            # Already in the target layout.
            "%d-%b-%y",
        )
    elif isinstance(date_types, str):
        date_types = (date_types,)
    else:
        # Materialise once so the patterns can be both iterated and reported.
        date_types = tuple(date_types)

    for date_type in date_types:
        try:
            datetime_obj = datetime.datetime.strptime(date_str, date_type)
        except ValueError:
            # Not this layout; try the next one.
            continue
        return datetime_obj.strftime("%d-%b-%y")

    raise ValueError(
        f"time data {date_str!r} does not match any of the formats {date_types!r}"
    )

In [12]:
def fun_convert_datetime_to_object(date_time):
    ''' Render a datetime value as a "%d-%b-%y" string.
    Ex: 2007-11-08 -> 08-Nov-07'''
    output_format = "%d-%b-%y"
    # Keep only the calendar date; the time of day is discarded.
    calendar_date = date_time.date()
    return calendar_date.strftime(output_format)

# Demographics

In [128]:
# Load the demographics_data_v dataset into a pandas DataFrame
# (the "_v" suffix presumably stands for Validation — confirm).
df_demo_v = demographics_data_v.to_pandas_dataframe()

In [163]:
# Load the demographics_data_m dataset into a pandas DataFrame
# (the "_m" suffix presumably stands for Monitoring — confirm).
# NOTE(review): demographics_data_m is only defined when its entry in
# map_dataAsset_inputFile is un-commented; on a fresh kernel this line raises NameError.
df_demo_m = demographics_data_m.to_pandas_dataframe()

In [131]:
# Lookup from raw CUSTOMER_SEGMENT labels to canonical English segment names.
# Besides the properly spelled Vietnamese labels, the keys include variants in which
# accented letters were replaced by '?' and mojibake variants, so each spelling maps
# to the same segment.
# NOTE(review): the mojibake keys look like UTF-8 text decoded with a single-byte codec,
# presumably a side effect of loading the CSV with encoding='latin1' — confirm.
customer_segment = {
    'Phổ thông':'Standard',
    'Ph? thông':'Standard',
    'Premier' : 'Premier',
    'Thân thiết':'Vip',
    'Thân thi?t':'Vip', 
    'Elite': 'Elite', 
    'Private': 'Private',
    'Chưa phân loại': 'Standard',
    'Phá»• thÃ´ng': 'Standard',
    'ThÃ¢n thiáº¿t':'Vip', 
    'ChÆ°a phÃ¢n loáº¡i': 'Standard'
}

In [132]:
# Normalise raw segment labels to canonical names; labels missing from
# customer_segment become NaN.
df_demo_v['CUSTOMER_SEGMENT'] = df_demo_v['CUSTOMER_SEGMENT'].map(customer_segment)

# Reduce the CIF_CREATE strings to the "%d-%b-%y" layout.
df_demo_v['CIF_CREATE'] = df_demo_v['CIF_CREATE'].apply(fun_datetime_format)

# The MOBILE_FLAG header arrives prefixed with 'ï»¿' (looks like a UTF-8 byte-order mark
# read through a single-byte decoding); rename it to the clean column name.
df_demo_v = df_demo_v.rename(columns={'ï»¿MOBILE_FLAG': 'MOBILE_FLAG'})

# Cast with a plain dtype: a {column: dtype} mapping is the DataFrame.astype form and
# only worked on this Series because the key happened to equal the Series name.
df_demo_v['MOBILE_FLAG'] = df_demo_v['MOBILE_FLAG'].astype('int64')

In [164]:
# Normalise raw segment labels to canonical names; labels missing from
# customer_segment become NaN.
df_demo_m['CUSTOMER_SEGMENT'] = df_demo_m['CUSTOMER_SEGMENT'].map(customer_segment)
# CIF_CREATE is converted with the datetime helper here, whereas df_demo_v uses the
# string parser — presumably because this file's column is already loaded as datetime; confirm.
df_demo_m['CIF_CREATE'] = df_demo_m['CIF_CREATE'].apply(fun_convert_datetime_to_object)

In [167]:
# cus_lst_v = set(df_demo_v.HASHED_CIF.unique())
# cus_lst_m = set(df_demo_m.HASHED_CIF.unique())
# cus_lst_same = cus_lst_v.intersection(cus_lst_m)
# tbl_m = df_demo_m[~df_demo_m['HASHED_CIF'].isin(cus_lst_same)]
# tbl_v = df_demo_v[~df_demo_v['HASHED_CIF'].isin(cus_lst_same)]
# tbl_m_same = df_demo_m[df_demo_m['HASHED_CIF'].isin(cus_lst_same)]
# tbl_v_same = df_demo_v[df_demo_v['HASHED_CIF'].isin(cus_lst_same)]

In [189]:
# Stack both demographics frames (df_demo_v rows first), keep one row per HASHED_CIF,
# and renumber the index.
df_demo = (
    pd.concat([df_demo_v, df_demo_m], ignore_index=True)
    .drop_duplicates(subset='HASHED_CIF')
    .reset_index(drop=True)
)
df_demo.shape

(394043, 19)

In [190]:
dataAssetName = 'demographic_data'
inputFileName = 'Customer_Demographic.csv'
# Stage the frame as a CSV file in a temporary local folder
local_path = f'../tempData/{inputFileName}'
file_path = f'{local_path}/{inputFileName}'
Path(local_path).mkdir(parents=True, exist_ok=True)
try:
    df_demo.to_csv(file_path, index=False, header=True,encoding='utf-8')

    # Upload the file to the blob datastore
    target_path = f'{output_raw_data_path}/{dataAssetName}'
    blob_data_store.upload_files([file_path], target_path=target_path, overwrite=True, show_progress=True)
finally:
    # Always remove the staging folder, even when writing or uploading fails.
    # shutil.rmtree avoids spawning a shell and works on every platform.
    shutil.rmtree("../tempData", ignore_errors=True)

Uploading an estimated of 1 files
Uploading ../tempData/Customer_Demographic.csv/Customer_Demographic.csv
Uploaded ../tempData/Customer_Demographic.csv/Customer_Demographic.csv, 1 files out of an estimated total of 1
Uploaded 1 files


# Account

In [14]:
# Load the account dataset into a pandas DataFrame.
# NOTE(review): account_data is only defined when its entry in map_dataAsset_inputFile
# is un-commented; on a fresh kernel this line raises NameError.
df_acc = account_data.to_pandas_dataframe()

In [15]:
# Reduce the DATE_OF_STATUS strings to the "%d-%b-%y" layout.
df_acc['DATE_OF_STATUS'] = df_acc['DATE_OF_STATUS'].apply(fun_datetime_format)

In [17]:
dataAssetName = 'account_data'
inputFileName = 'AR_LCS.csv'
# Stage the frame as a CSV file in a temporary local folder
local_path = f'../tempData/{inputFileName}'
file_path = f'{local_path}/{inputFileName}'
Path(local_path).mkdir(parents=True, exist_ok=True)
try:
    df_acc.to_csv(file_path, index=False, header=True)

    # Upload the file to the blob datastore
    target_path = f'{output_raw_data_path}/{dataAssetName}'
    blob_data_store.upload_files([file_path], target_path=target_path, overwrite=True, show_progress=True)
finally:
    # Always remove the staging folder, even when writing or uploading fails.
    # shutil.rmtree avoids spawning a shell and works on every platform.
    shutil.rmtree("../tempData", ignore_errors=True)

"datastore.upload_files" is deprecated after version 1.0.69. Please use "FileDatasetFactory.upload_directory" instead. See Dataset API change notice at https://aka.ms/dataset-deprecation.


Uploading an estimated of 1 files
Uploading ../tempData/AR_LCS.csv/AR_LCS.csv
Uploaded ../tempData/AR_LCS.csv/AR_LCS.csv, 1 files out of an estimated total of 1
Uploaded 1 files


# Transaction

In [11]:
# Load the transaction dataset into a pandas DataFrame.
# NOTE(review): transaction_data is only defined when its entry in map_dataAsset_inputFile
# is un-commented; on a fresh kernel this line raises NameError.
df_trx = transaction_data.to_pandas_dataframe()

In [None]:
# def applyParallel(dfGrouped, func, args_lst=[], njobs=2*mp.cpu_count()-1):
#     ''' Run the grouped data in parallel. '''
#     retLst = Parallel(n_jobs = njobs, verbose = 2)(delayed(func)(group, args_lst) for name, group in dfGrouped)
#     return pd.concat(retLst)

In [None]:
def fun_write_yearly_monthly_file(df_year, arg_list):
    ''' Write one (year, month) slice to a CSV file and upload it to the blob datastore.

    Args:
        df_year: DataFrame for a single period; the period is taken from the first
            distinct value of its 'NAM' (year) and 'THANG' (month) columns.
        arg_list: sequence whose first item is the table name used in the blob path.

    Returns:
        A one-row DataFrame with columns 'year', 'month' and 'blob_path'; 'month' is
        always a string, left-padded with a zero to at least two characters.

    Relies on the notebook-level names output_raw_data_path and blob_data_store.
    The local staging folder under ../tempData is left for the caller to remove. '''
    table_name = arg_list[0]
    year = df_year['NAM'].unique()[0]
    # Always a string, so the 'month' column has one type whether the month has one
    # digit or two; the generated paths are unchanged.
    month = str(df_year['THANG'].unique()[0]).zfill(2)

    # Stage the slice as a CSV file in a temporary local folder
    period = f'{year}_{month}'
    local_path = f'../tempData/{period}'
    file_path = f'{local_path}/{period}.csv'
    Path(local_path).mkdir(parents=True, exist_ok=True)
    df_year.to_csv(file_path, index=False, header=True)

    # Upload the file to the blob datastore
    target_path = f'{output_raw_data_path}/{table_name}/{period}'
    blob_data_store.upload_files([file_path], target_path=target_path, overwrite=True, show_progress=True)

    print(f'Uploaded {period}.csv to {target_path}.')

    df = pd.DataFrame({
        'year' : [year], 'month' : [month], 'blob_path' : [target_path]},
        columns=['year', 'month', 'blob_path']
    )
    return df

In [20]:
%%time
# Parallel function
table_name = 'transaction_data'
# df = applyParallel(
#     dfGrouped=df_trx.groupby(['NAM', 'THANG']), 
#     func=fun_write_yearly_monthly_file, 
#     args_lst=[table_name],
# )

# Non-parallel upload - only for testing.
# The groups are iterated explicitly instead of going through groupby.apply: the
# uploader has side effects (file write + upload), and an explicit loop guarantees
# exactly one call per (NAM, THANG) group.
try:
    status_frames = [
        fun_write_yearly_monthly_file(df_period, [table_name])
        for _period_key, df_period in df_trx.groupby(['NAM', 'THANG'])
    ]
finally:
    # Always remove the staging folder, even when an upload fails part-way through.
    shutil.rmtree("../tempData", ignore_errors=True)

if status_frames:
    df_status = pd.concat(status_frames, ignore_index=True)
else:
    # No groups at all: keep the same columns so downstream cells still work.
    df_status = pd.DataFrame(columns=['year', 'month', 'blob_path'])

"datastore.upload_files" is deprecated after version 1.0.69. Please use "FileDatasetFactory.upload_directory" instead. See Dataset API change notice at https://aka.ms/dataset-deprecation.


Uploading an estimated of 1 files
Uploading ../tempData/2022_06/2022_06.csv
Uploaded ../tempData/2022_06/2022_06.csv, 1 files out of an estimated total of 1
Uploaded 1 files
Uploaded 2022_06.csv to InputData/InputRawFiles_VM/transaction_data/2022_06.
Uploading an estimated of 1 files
Uploading ../tempData/2022_07/2022_07.csv
Uploaded ../tempData/2022_07/2022_07.csv, 1 files out of an estimated total of 1
Uploaded 1 files
Uploaded 2022_07.csv to InputData/InputRawFiles_VM/transaction_data/2022_07.
Uploading an estimated of 1 files
Uploading ../tempData/2022_08/2022_08.csv
Uploaded ../tempData/2022_08/2022_08.csv, 1 files out of an estimated total of 1
Uploaded 1 files
Uploaded 2022_08.csv to InputData/InputRawFiles_VM/transaction_data/2022_08.
Uploading an estimated of 1 files
Uploading ../tempData/2022_09/2022_09.csv
Uploaded ../tempData/2022_09/2022_09.csv, 1 files out of an estimated total of 1
Uploaded 1 files
Uploaded 2022_09.csv to InputData/InputRawFiles_VM/transaction_data/2022_

Bad pipe message: %s [b'\x96(\x81\x9ddaD\xf1\x89}\x1dyJ`\x08\x05\xb0\x14 \x13\xbf\xc4\x97\x06\xf2\x99\x8bWr\xff\xf4\xcc \x8c']
Bad pipe message: %s [b"\xaaC\xc8\xa9\xd8\xbb\xe8\xf2%\x8ex\xa7V\x9b\xf7\xb7\xd6\x8e\x00\x00|\xc0,\xc00\x00\xa3\x00\x9f\xcc\xa9\xcc\xa8\xcc\xaa\xc0\xaf\xc0\xad\xc0\xa3\xc0\x9f\xc0]\xc0a\xc0W\xc0S\xc0+\xc0/\x00\xa2\x00\x9e\xc0\xae\xc0\xac\xc0\xa2\xc0\x9e\xc0\\\xc0`\xc0V\xc0R\xc0$\xc0(\x00k\x00j\xc0#\xc0'\x00g\x00@\xc0\n\xc0\x14\x009\x008\xc0\t\xc0\x13\x003\x00", b'\x9d\xc0\xa1\xc0\x9d\xc0Q\x00\x9c\xc0\xa0\xc0\x9c\xc0P\x00=\x00<\x005\x00/\x00\x9a\x00\x99\xc0\x07\xc0\x11\x00\x96\x00\x05\x00\xff\x01\x00\x00j\x00\x00\x00\x0e\x00\x0c\x00\x00']
Bad pipe message: %s [b"\xbf\xa9\xbf\x02\x13\xa3b(\xfa\xf7\r\x06tQ\x9a~\xfe\xef\x00\x00\xa6\xc0,\xc00\x00\xa3\x00\x9f\xcc\xa9\xcc\xa8\xcc\xaa\xc0\xaf\xc0\xad\xc0\xa3\xc0\x9f\xc0]\xc0a\xc0W\xc0S\xc0+\xc0/\x00\xa2\x00\x9e\xc0\xae\xc0\xac\xc0\xa2\xc0\x9e\xc0\\\xc0`\xc0V\xc0R\xc0$\xc0(\x00k\x00j\xc0s\xc0w\x00\xc4\x00\xc3\xc0#\xc0'\

In [None]:
# Show the first 50 rows of the per-period upload summary.
df_status.head(50)