In [1]:
import pandas as pd
import numpy as np
import datetime
import json

import io

In [2]:
# comment this out, the next box below is more dynamic
# import sys
# sys.path.append('/mnt/batch/tasks/shared/LS_root/mounts/clusters/aml-scoring-vm5/code/Users/0szeng/gitrepos/LifeCycle/Lib')

In [3]:
import sys
import os

# 获取 Lifecycle 的路径（Lib 的上一级）
upper_level_folder = os.path.abspath(os.path.join(os.getcwd(), '..'))
sys.path.append(upper_level_folder)

# 现在可以像导入包一样导入 Lib
# from Lib import aml_datalake_loader, logger
from Lib import *

In [4]:
# no need for below because of __init__.py in Lib folder
# from Lib.logger import Logger
# from Lib.connectors import DataConnector
# from Lib.aml_datalake_loader import DataLoader

In [None]:
# import os
# from aml_datalake_loader import DataLoader

# 拆分路径和文件名
path_n = '/home/azureuser/mycode/gitrepos/AzureMachineLearning/LifeCycle/Scripts'
folder_n = 'dataprep_input'
filenames = ['claim_rollup.csv', 'case_bill.csv', 'claim_rollup_mapping.csv']

# 拼接输出路径
output_dir = os.path.join(path_n, folder_n)

# 初始化并加载数据
for filename in filenames:
    loader = DataLoader(source='datalake', filename=filename, output_dir=output_dir)
    loader.load_data()

INFO:azure.identity._credentials.environment:No environment configuration found.
INFO:azure.identity._credentials.managed_identity:ManagedIdentityCredential will use Azure ML managed identity


INFO:azure.identity._credentials.environment:No environment configuration found.
INFO:azure.identity._credentials.managed_identity:ManagedIdentityCredential will use Azure ML managed identity
INFO:azure.identity._credentials.chained:DefaultAzureCredential acquired a token from ManagedIdentityCredential
INFO:azure.identity._internal.msal_managed_identity_client:AzureMLCredential.get_token_info succeeded
INFO:azure.identity._internal.decorators:ManagedIdentityCredential.get_token_info succeeded
INFO:azure.identity._credentials.default:DefaultAzureCredential acquired a token from ManagedIdentityCredential
INFO:azure.monitor.opentelemetry.exporter.export._base:Transmission succeeded: Item received: 1. Items accepted: 1
INFO:azure.monitor.opentelemetry.exporter.export._base:Transmission succeeded: Item received: 4. Items accepted: 4
INFO:azure.identity._credentials.environment:No environment configuration found.
INFO:azure.identity._credentials.managed_identity:ManagedIdentityCredential wil

INFO:azure.monitor.opentelemetry.exporter.export._base:Transmission succeeded: Item received: 7. Items accepted: 7


In [6]:
use_local_data = True
exclude_travel = False

In [7]:
def get_weekly_estimates(logger, cost, claims, exclude_travel):

    logger.debug('Getting weekly estimates', True)

    max_weeks = 20

    exclude_types = ['Travel', 'Reporting']

    # exclude travel activities if required
    if exclude_travel:
        logger.debug('Removing travel activities')
        cost = cost[~cost['type'].isin(exclude_types)]

    # Prepare claim dates
    claims_df = claims[['ClaimNo', 'first_referral', 'DateClosedLast']]
    claims_df['first_referral'] = pd.to_datetime(claims_df['first_referral'])
    claims_df['date_closed'] = pd.to_datetime(claims_df['DateClosedLast'])

    # Work out day differences from date of referral
    activity_df = cost[cost['BillDate'].notnull()]
    activity_df = activity_df.merge(claims_df[['ClaimNo', 'first_referral']], on='ClaimNo')
    activity_df['activity_date'] = pd.to_datetime(activity_df['BillDate'].str[:10])
    activity_df['diff_days'] = (activity_df['activity_date'] - activity_df['first_referral']).dt.days
    activity_df = activity_df[activity_df['diff_days']>=0]

    # Get weekly diffs and aggregate
    activity_df['diff_weeks'] = (np.floor(activity_df['diff_days']/7)+1).astype('int')
    agg = {
        'Id': 'count',
        'CostsTotalExTax':'sum',
        'Duration':'sum'
    }
    weekly_df = activity_df.groupby(['ClaimNo', 'diff_weeks'], as_index=False).agg(agg)
    weekly_df.rename(columns={'Id': 'n_activities', 'CostsTotalExTax':'total_cost', 'Duration':'total_duration'}, inplace=True)

    # Create a separate aggregation by cost type
    weekly_df_by_type = activity_df.groupby(['ClaimNo', 'diff_weeks', 'type'], as_index=False).agg(agg)
    weekly_df_by_type.rename(columns={'Id': 'n_activities', 'CostsTotalExTax':'total_cost', 'Duration':'total_duration'}, inplace=True)
    weekly_df_by_type_pivot = pd.pivot(weekly_df_by_type, index=['ClaimNo', 'diff_weeks'], values=['n_activities', 'total_cost', 'total_duration'], columns=['type'])
    weekly_df_by_type_pivot.fillna(0, inplace=True)    
    weekly_df_by_type_pivot.columns = [':'.join(col) for col in weekly_df_by_type_pivot.columns]
    weekly_df_by_type_pivot.reset_index(inplace=True)

    # Create a DF will all possible week numbers and claim numbers
    week_nos = pd.DataFrame(pd.Series(range(1, max_weeks)), columns=['week_num'])
    claim_nos = pd.DataFrame(activity_df['ClaimNo'].unique(), columns=['ClaimNo'])
    week_nos['join_col']=1
    claim_nos['join_col']=1
    claim_week_nos = claim_nos.merge(week_nos).drop(columns=['join_col'])

    # remove weeks that are beyond the claim closed date
    claims_df['date_closed'] = pd.to_datetime(claims_df['date_closed'].fillna(datetime.date.today()))
    claims_df['diff_days'] = (claims_df['date_closed'] - claims_df['first_referral']).dt.days
    claims_df['week_num_max'] = (np.floor(claims_df['diff_days']/7)+1).astype('int')
    claim_week_nos = claim_week_nos.merge(claims_df[['ClaimNo', 'week_num_max']])    
    claim_week_nos = claim_week_nos[claim_week_nos['week_num'] <= claim_week_nos['week_num_max']]

    # left-join the actual data zero-ing the nulls
    weekly_df_all = claim_week_nos.merge(weekly_df, how='left', left_on=['ClaimNo', 'week_num'], right_on=['ClaimNo', 'diff_weeks'])
    weekly_df_all = weekly_df_all.merge(weekly_df_by_type_pivot, how='left', left_on=['ClaimNo', 'week_num'], right_on=['ClaimNo', 'diff_weeks'])
    weekly_df_all.fillna(0.0, inplace=True)
    
    return weekly_df_all

In [8]:
def generate_milestones(logger, cost_weekly):

    logger.debug('Generating milestones', True)

    milestone_wks = [2,4,6,8,12,16,20]
    active_mins = 10 # threshold for determinning that a week has been active (from analysis)
    cost_weekly['active'] = cost_weekly['total_duration'] >= active_mins

    milestone_dfs = []
    for milestone in milestone_wks:
        cost_weekly_subset = cost_weekly[(cost_weekly['week_num']<=milestone) & (cost_weekly['week_num_max']>=milestone)]
        milestone_df = cost_weekly_subset.groupby('ClaimNo', as_index=False).sum()
        milestone_df[f'activity_score{active_mins}'] = milestone_df['active']/milestone        

        cols_to_keep = []
        cols_to_keep_rename = []
        for col in list(milestone_df.columns):
            if 'n_activities' in col or 'total_cost' in col or 'total_duration' in col or 'activity_score' in col:
                cols_to_keep.append(col)
                cols_to_keep_rename.append(f'{col}_wk{milestone}')
        
        milestone_df.index = milestone_df['ClaimNo']
        milestone_df = milestone_df[cols_to_keep]
        milestone_df.columns = cols_to_keep_rename
        milestone_dfs.append(milestone_df)

    milestones_all = pd.concat(milestone_dfs, axis=1)
    milestones_all['ClaimNo'] = milestones_all.index
    milestones_all.reset_index(drop=True, inplace=True)
    milestones_all.fillna(-1, inplace=True) # This signifies that the milestone is beyond when the case was closed

    return milestones_all

In [9]:
def extract_cost_type(desc):

    desc = desc.lower()

    if 'travel' in desc:
        return 'Travel'

    if 'report' in desc:
        return 'Reporting'

    if 'assessment' in desc:
        return 'Assessment'

    if 'communic' in desc:
        if 'treat' in desc or 'health' in desc or 'medic' in desc:
            return 'CommunicationTreating'
        else:
            return 'CommunicationOther'

    if 'conferenc' in desc:
        return 'CaseConference'

    return 'Other'

In [10]:
def categorise_activities(logger, cost):

    logger.debug('Categorising activities', True)

    # the coding and classifications here are based on analysis conduced in May 2024.

    # categorise from activity name first
    cost['activity_lower'] = cost['ActivityName'].str.lower()
    cost['activity_lower'].fillna('unknown', inplace=True)
    cost['activity_grp'] = 'Other'
    cost.loc[cost['activity_lower'].str.contains('training'), 'activity_grp'] = 'Coaching'
    cost.loc[cost['activity_lower'].str.contains('coaching'), 'activity_grp'] = 'Coaching'
    cost.loc[cost['activity_lower'].str.contains('conference'), 'activity_grp'] = 'Case Conference'
    cost.loc[cost['activity_lower'].str.contains('liais'), 'activity_grp'] = 'Contact'
    cost.loc[cost['activity_lower'].str.contains('phone'), 'activity_grp'] = 'Contact'
    cost.loc[cost['activity_lower'].str.contains('email'), 'activity_grp'] = 'Contact'
    cost.loc[cost['activity_lower'].str.contains('communic'), 'activity_grp'] = 'Contact'
    cost.loc[cost['activity_lower'].str.contains('assessment'), 'activity_grp'] = 'Assessment'
    cost.loc[cost['activity_lower'].str.contains('report'), 'activity_grp'] = 'Report'
    cost.loc[cost['activity_lower'].str.contains('review'), 'activity_grp'] = 'Review'
    cost.loc[cost['activity_lower'].str.contains('travel'), 'activity_grp'] = 'Travel'
    
    # now categorise from template name ---------------------------------------################################### comment-out below since no CaseBillTemplate
    # cost['template_lower'] = cost['TemplateName'].str.lower()
    # cost['template_lower'].fillna('unknown', inplace=True)
    # cost['template_grp'] = 'Other'
    # cost.loc[cost['template_lower'].str.contains('assessment'), 'template_grp'] = 'Assessment'
    # cost.loc[cost['template_lower'].str.contains('training'), 'template_grp'] = 'Coaching'
    # cost.loc[cost['template_lower'].str.contains('coaching'), 'template_grp'] = 'Coaching'
    # cost.loc[cost['template_lower'].str.contains('conference'), 'template_grp'] = 'Case Conference'
    # cost.loc[cost['template_lower'].str.contains('report'), 'template_grp'] = 'Report'
    # cost.loc[cost['template_lower'].str.contains('review'), 'template_grp'] = 'Review'
    # cost.loc[cost['template_lower'].str.contains('travel'), 'template_grp'] = 'Travel'
    # cost.loc[cost['template_lower'].str.contains('contact'), 'template_grp'] = 'Contact'
    
    # and now combine them into a single classification
    cost['type'] = cost['activity_grp']
    # cost.loc[(cost['template_grp']=='Report') & (cost['activity_grp']=='Other'), 'type'] = 'Report' -----------------------  comment-out below since no CaseBillTemplate
    # cost.loc[cost['template_grp'].isin(['Coaching', 'Case Conference', 'Travel', 'Review', 'Assessment']), 'type'] = cost['template_grp'] comment-out below since no CaseBillTemplate
    cost.loc[cost['activity_grp']=='Travel', 'type'] = 'Travel'    

    return cost

In [11]:
def get_claim_totals(logger, claims_cost):

    logger.debug('Removing claims that have fixed-fee costs')

    # Figure out the total and % of the fixed-fee costs for each claim
    all_costs = claims_cost.groupby('ClaimNo').agg({'CostsTotalExTax':'sum'})
    claims_cases_cost_hourly = claims_cost[claims_cost['BillType']==1] #TODO: check this is still right
    hourly_costs = claims_cases_cost_hourly.groupby('ClaimNo').agg({'CostsTotalExTax':'sum'})
    hourly_costs.rename(columns={'CostsTotalExTax':'CostsTotalExTaxHourly'}, inplace=True)
    all_costs = all_costs.merge(hourly_costs, left_index=True, right_index=True, how='left')
    all_costs.fillna(0, inplace=True)
    all_costs = all_costs[all_costs['CostsTotalExTax']>0]
    all_costs['hourly_pctg'] = (all_costs['CostsTotalExTaxHourly'] / all_costs['CostsTotalExTax'])
           
    # Finalise DF
    all_costs['ClaimNo'] = all_costs.index
    all_costs.reset_index(drop=True, inplace=True)
    all_costs = all_costs[['ClaimNo', 'CostsTotalExTax', 'hourly_pctg']].rename(columns={'CostsTotalExTax':'claim_total_cost', 'hourly_pctg': 'claim_cost_hourly_pctg'})
    
    return all_costs

In [12]:
def data_prep():

    logger = Logger()

    with open('config.json', 'rb') as f:
        config = json.load(f)

    data_dir_in = config['data_folder_in']    
    data_dir_out = config['data_folder_out']
    ref_data_dir = config['ref_data_folder']

    logger.debug('Starting lifecycle dataprep', True)

    data_conn = DataConnector(logger, 'creds.json', data_dir_in, use_local_data, data_dir_out)    

    claims = data_conn.read_data('claim_rollup', csv=True)
    cost = data_conn.read_data('case_bill', csv=True)
    claim_rollup_mapping = data_conn.read_data('claim_rollup_mapping', csv=True)
     
    # TODO: modify the ETL to include the template name in the case_bill extract, and the delete the below two lines
    cost_templates = data_conn.read_data('CaseBillTemplate', csv=True)
    #cost = cost[['Id']].merge(cost_templates[['Id', 'TemplateName', 'ActivityName', 'BillDate', 'CostsTotalExTax']], on='Id', how='left')
    cost_templates['CostsTotalExTax'] = cost_templates['SubTotal'].fillna(0)/100
    cost_templates['Duration'] = cost_templates['Minutes'].fillna(0)
    #cost_templates = cost_templates[cost_templates['BillType']<3]
    cost = cost_templates

    cost = categorise_activities(logger, cost)
    claims_cost = claim_rollup_mapping.merge(cost, how='left', on=['CaseServiceId'])
    
    claim_totals = get_claim_totals(logger, claims_cost)
    claim_totals.to_csv(f'{data_dir_out}/claim_cost_totals.csv', index=False)

    cost_weekly = get_weekly_estimates(logger, claims_cost, claims, exclude_travel)    
    if exclude_travel:
        cost_weekly.to_csv(f'{data_dir_out}/cost_weekly_ex_travel.csv', index=False)
    else:
        cost_weekly.to_csv(f'{data_dir_out}/cost_weekly.csv', index=False)

    milestones = generate_milestones(logger, cost_weekly)

    if exclude_travel:
        milestones.to_csv(f'{data_dir_out}/milestones_ex_travel.csv', index=False)
    else:
        milestones.to_csv(f'{data_dir_out}/milestones.csv', index=False)

    data_conn.close_connections()
    
    logger.debug('Completed lifcycle dataprep', True)

#### below scripts replicate def data_prep():, but with breakdowns :

In [13]:
def test_data_loading():
    logger = Logger()
    logger.debug('Starting lifecycle dataprep', True)

    with open('config.json', 'r') as f:
        config = json.load(f)

    data_dir_in = config['data_folder_in']
    data_dir_out = config['data_folder_out']

    # ✅ Corrected keyword argument name
    # data_conn = DataConnector(logger,'creds.json',data_dir_in,use_local_data=True,local_data_dir_out=data_dir_out)

    # claims = data_conn.read_data('claim_rollup', csv=True)
    # cost = data_conn.read_data('case_bill', csv=True)
    # claim_rollup_mapping = data_conn.read_data('claim_rollup_mapping', csv=True)

    
    # ✅ Re-read with header=0 to ensure proper column names
    claims = pd.read_csv(f"{data_dir_in}/claim_rollup.csv", header=1, encoding='utf-8-sig')
    cost = pd.read_csv(f"{data_dir_in}/case_bill.csv", header=1, encoding='utf-8-sig')
    claim_rollup_mapping = pd.read_csv(f"{data_dir_in}/claim_rollup_mapping.csv", header=1, encoding='utf-8-sig')

    print("Claims shape:", claims.shape)
    print("Cost shape:", cost.shape)
    print("Claim Rollup Mapping shape:", claim_rollup_mapping.shape)

    # data_conn.close_connections()

    return claims, cost, claim_rollup_mapping, logger, data_dir_out

# ✅ Call the function and unpack the results
claims, cost, claim_rollup_mapping, logger, data_dir_out = test_data_loading()

[95m 2025-08-07 16:29:56 DEBUG Starting lifecycle dataprep [0m
Claims shape: (47311, 68)
Cost shape: (3356011, 14)
Claim Rollup Mapping shape: (94784, 5)


In [14]:
# convert claims.info() into dataframe as so many columns :
buffer = io.StringIO()
claims.info(buf=buffer)
lines = buffer.getvalue().splitlines()

data = []
for line in lines:
    parts = line.split()
    if len(parts) >= 4 and parts[0].isdigit():
        col_name = parts[1]
        non_null = parts[2]
        dtype = parts[-1]
        data.append([col_name, non_null, dtype])

info_df = pd.DataFrame(data, columns=['Column', 'Non-Null Count', 'Dtype'])
info_df


Unnamed: 0,Column,Non-Null Count,Dtype
0,index,47311,int64
1,ClaimNo,47311,object
2,n_cases,47311,int64
3,first_referral,47311,object
4,has_non_assessment,47311,bool
...,...,...,...
63,BillTo_last,24864,object
64,BillToId_last,29831,float64
65,CaseTypeId_last,30012,float64
66,BusinessUnitId_last,30012,float64


In [15]:
cost['CostsTotalExTax'] = cost['CostsTotalExTax'].fillna(0)
cost['CostsTotalExTax'] = cost['Duration'].fillna(0)

In [16]:
cost_ca = categorise_activities(logger, cost)

[95m 2025-08-07 16:30:07 DEBUG Categorising activities [0m


In [17]:
claims_cost = claim_rollup_mapping.merge(cost_ca, how='left', on=['CaseServiceId'])

In [18]:
claim_totals = get_claim_totals(logger, claims_cost)

[92m 2025-08-07 16:30:32 DEBUG Removing claims that have fixed-fee costs [0m


In [19]:
cost_weekly = get_weekly_estimates(logger, claims_cost, claims, exclude_travel)  

[95m 2025-08-07 16:30:35 DEBUG Getting weekly estimates [0m


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  claims_df['first_referral'] = pd.to_datetime(claims_df['first_referral'])
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  claims_df['date_closed'] = pd.to_datetime(claims_df['DateClosedLast'])
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  claims_df['date_closed'] = pd.to_datetime(claims_df['date_cl

In [20]:
milestones = generate_milestones(logger, cost_weekly)

[95m 2025-08-07 16:30:50 DEBUG Generating milestones [0m


In [21]:
# 保存 claim_totals
claim_totals.to_csv(f'{data_dir_out}/claim_cost_totals.csv', index=False)

# 根据 exclude_travel 设置文件名
suffix = '_ex_travel' if exclude_travel else ''
cost_weekly.to_csv(f'{data_dir_out}/cost_weekly{suffix}.csv', index=False)

# 生成并保存 milestones
milestones = generate_milestones(logger, cost_weekly)
milestones.to_csv(f'{data_dir_out}/milestones{suffix}.csv', index=False)


[95m 2025-08-07 16:31:00 DEBUG Generating milestones [0m


In [22]:
logger.debug('Completed lifcycle dataprep', True)

[95m 2025-08-07 16:31:05 DEBUG Completed lifcycle dataprep [0m


## below script is to write-back to datalake :

In [24]:
from azure.storage.filedatalake import DataLakeServiceClient

base_path = "/home/azureuser/mycode/gitrepos/AzureMachineLearning/py_for_da_course"  # Leave as "" if file is in current folder
csv_filename = "ab_query_type_df.csv"
csv_filepath = os.path.join(base_path, csv_filename)
prompt_name= csv_filename
with open(csv_filepath, 'r', encoding='utf-8') as f:
    csv_data = f.read()

# import io
# csv_buffer = io.StringIO()
# df.to_csv(csv_buffer, index=False)
# csv_data = csv_buffer.getvalue()

connection_string = "DefaultEndpointsProtocol=https;AccountName=stdataanalyticsadls001;AccountKey=ncWVOLxWUDzS9+6jmziedHHr1lXV1xTwr2B4oK58Q7osopvYepDuJbUb7drz4RPgMrzfPQEVhl+nuKtgNHwCDQ==;EndpointSuffix=core.windows.net"
service_client = DataLakeServiceClient.from_connection_string(connection_string)
file_system_client = service_client.get_file_system_client("datalakecontainer")
directory_client = file_system_client.get_directory_client("Rehab/lifecycle_")
file_client = directory_client.create_file(f"writeback_{prompt_name}.csv")
file_client.append_data(data=csv_data, offset=0, length=len(csv_data))
file_client.flush_data(len(csv_data))

{'date': datetime.datetime(2025, 8, 7, 6, 31, 57, tzinfo=datetime.timezone.utc),
 'etag': '"0x8DDD57C1C0B8E34"',
 'last_modified': datetime.datetime(2025, 8, 7, 6, 31, 57, tzinfo=datetime.timezone.utc),
 'content_length': 0,
 'client_request_id': '37d3f56e-7358-11f0-bcc0-6045bde48a8f',
 'request_id': 'd36e32f5-a01f-0008-0f64-07fea9000000',
 'version': '2025-05-05',
 'request_server_encrypted': False,
 'encryption_key_sha256': None,
 'lease_renewed': None}