In [7]:
import pandas as pd
from datetime import datetime, date, time
from typing import List, Dict, Tuple
from pandas import DataFrame
from pydantic import BaseModel


class TimeBlock(BaseModel):
    """One time block of the day, built from one row of ``time_block.txt`` by ``load_time_block``."""
    # Time of day at which the block starts; the aggregation counts a block when this
    # start time falls inside a category interval.
    start_time: time
    # Time of day at which the block ends (only parsed here, not used by the aggregation code).
    end_time: time
    # Block identifier; used as the ``time_block`` key of the aggregated output.
    id: str
    # NOTE(review): meaning of this code is not visible in this notebook — confirm.
    demand_key_code: int


class TimeBlocks:
    """Ordered collection of ``TimeBlock`` objects with small lookup helpers."""

    def __init__(self, time_blocks: List[TimeBlock]):
        # The caller's list is kept as-is (not copied), so its order is the collection order.
        self.time_blocks = time_blocks

    def get_time_block_ids(self) -> List[str]:
        """Return the id of every time block, in collection order."""
        return [block.id for block in self.time_blocks]

    def get_time_block_to_start_time(self) -> Dict[str, time]:
        """Map each time block id to its start time (a later duplicate id overwrites an earlier one)."""
        return {block.id: block.start_time for block in self.time_blocks}

    def to_data_frame(self):
        """Return a DataFrame with one row per time block and one column per model field."""
        records = [vars(block) for block in self.time_blocks]
        return DataFrame(records)
    
def load_time_block(path: str = 'time_block.txt') -> TimeBlocks:
    """Load the time block definitions from a tab-separated file.

    Args:
        path: Tab-separated file containing at least the columns ``id``,
            ``start_time``, ``end_time`` (both ``HH:MM:SS``) and ``demand_key_code``.
            Defaults to ``time_block.txt`` in the working directory.

    Returns:
        A ``TimeBlocks`` holding one ``TimeBlock`` per file row, in file order.
    """
    usecols = ['id', 'start_time', 'end_time', 'demand_key_code']
    # Read only the needed columns; a missing column now fails here with a clear error.
    df_time_block = pd.read_csv(path, delimiter='\t', usecols=usecols)
    return TimeBlocks([
        TimeBlock(id=row['id'],
                  start_time=datetime.strptime(row['start_time'], '%H:%M:%S').time(),
                  end_time=datetime.strptime(row['end_time'], '%H:%M:%S').time(),
                  demand_key_code=row['demand_key_code'])
        for row in df_time_block.to_dict('records')])


In [26]:
# Raw inputs, pre-extracted to Feather files in the working directory.
df_mv, df_mv_ctg, df_ctg, df_ctg_tb = map(
    pd.read_feather,
    ('df_mv.feather', 'df_mv_ctg.feather', 'df_ctg.feather', 'df_ctg_tb.feather'),
)

# Training window; both bounds are inclusive calendar dates.
date_format = "%Y-%m-%d"
train_start_date, train_end_date = (
    datetime.strptime(text, date_format).date() for text in ("2021-01-01", "2023-05-31")
)

# Wards to report on: the aggregation emits rows for every ward listed here.
wards = [
    "nursingW2", "nursingW3", "nursingW4", "nursingW5", "nursingW7",
    "nursingIcu", "nursingW12", "nursingW13", "nursingW10", "nursingW11",
]
# NOTE(review): not used by the cells in this notebook.
customer_code = "STGAH_LDF_220729"
time_blocks = load_time_block()

In [16]:
import concurrent.futures
# TODO: Please revise the below functions to make aggregate_patient_category_v1 run faster
def time_blocks_spanned_v1(start_dt: datetime, end_dt: datetime,
                           time_block_to_start_time: Dict[str, time]) -> List[Tuple[date, str]]:
    """List every (date, time block id) whose block start lies inside [start_dt, end_dt].

    Both interval ends are inclusive. An empty list is returned when ``end_dt`` is
    before ``start_dt``. For multi-day intervals the result lists the fully covered
    days in between first (day by day, all blocks), then, block by block, the
    entry for the first day followed by the entry for the last day.
    """
    if end_dt < start_dt:
        return []

    first_day, last_day = start_dt.date(), end_dt.date()
    from_time, to_time = start_dt.time(), end_dt.time()

    if first_day == last_day:
        return [(first_day, tb_id)
                for tb_id, tb_start in time_block_to_start_time.items()
                if from_time <= tb_start <= to_time]

    # Days strictly between the first and the last day are covered entirely.
    spanned = [(date.fromordinal(ordinal), tb_id)
               for ordinal in range(first_day.toordinal() + 1, last_day.toordinal())
               for tb_id in time_block_to_start_time]

    # Partial days: blocks starting at/after the interval start on the first day,
    # blocks starting at/before the interval end on the last day.
    for tb_id, tb_start in time_block_to_start_time.items():
        if tb_start >= from_time:
            spanned.append((first_day, tb_id))
        if tb_start <= to_time:
            spanned.append((last_day, tb_id))
    return spanned

def __convert_mv_ctg_to_df_v1(row, end_date, time_block_to_start_time):
    """Expand one category interval into one row per (date, time block) it spans.

    Args:
        row: One row of the merged movement/category frame; reads
            ``category_start_dt``, ``category_end_dt``, ``ward``,
            ``patient_category_id`` and ``patient_id``. It is not modified.
        end_date: Last date of the aggregation window; the interval end is clipped
            to the end of this day.
        time_block_to_start_time: Mapping of time block id to its start time.

    Returns:
        DataFrame with columns ``date``, ``time_block``, ``ward``, ``category_id``,
        ``patient_id``; empty (with those columns) when no time block start is spanned.
    """
    columns = ['date', 'time_block', 'ward', 'category_id', 'patient_id']

    # Clip into a local variable instead of writing back into the caller's row.
    end_datetime = datetime.combine(end_date, time.max)
    category_end_dt = row['category_end_dt']
    if category_end_dt > end_datetime:
        category_end_dt = end_datetime

    tb_spanned = time_blocks_spanned_v1(row['category_start_dt'], category_end_dt,
                                        time_block_to_start_time)

    # Build the frame once; concatenating inside the loop was quadratic.
    records = [[the_date, tb, row.ward, row.patient_category_id, row.patient_id]
               for the_date, tb in tb_spanned]
    return pd.DataFrame(records, columns=columns)

async def aggregate_patient_category_v1(df_mv: pd.DataFrame, df_mv_ctg: pd.DataFrame, df_ctg: pd.DataFrame,
                                        df_ctg_tb: pd.DataFrame, start_date: date,
                                        end_date: date, wards: list,
                                        time_blocks: TimeBlocks) -> pd.DataFrame:
    """Count distinct patients and required nursing time per date/time block/ward/category.

    Reference (slow) implementation: each category interval is expanded separately
    via ``__convert_mv_ctg_to_df_v1`` in a thread pool. The input frames are not
    modified.

    Args:
        df_mv: Patient movements (``id``, ``start_time``, ``end_time``,
            ``workspace_code``, ...).
        df_mv_ctg: Category assignments per movement (``patient_movement_id``,
            ``start_date``, ``end_date``, ...).
        df_ctg: Patient categories (``id``, ``name``, ``patient_type_id``).
        df_ctg_tb: Required time per category and time block
            (``patient_category_id``, ``time_block_id``, ``required_time``).
        start_date, end_date: Inclusive reporting window.
        wards: Wards to include in the output grid.
        time_blocks: Time block definitions.

    Returns:
        One row per (date, time_block, ward, category_id) of the full grid, with
        ``patient_type_id``, ``patient_volume`` (distinct patients) and
        ``cat_total_time_per_nurse_h``; combinations without data get 0.
    """
    df_ctg = df_ctg.drop(columns='name')
    df_mv = df_mv.rename(
        columns={'start_time': 'movement_start_dt', 'end_time': 'movement_end_dt', 'workspace_code': 'ward'})
    # Rebind instead of renaming in place so the caller's frame is left untouched.
    df_mv_ctg = df_mv_ctg.rename(columns={'start_date': 'category_start_dt', 'end_date': 'category_end_dt'})

    def clean_start_end_dt(df, start='category_start_dt', end='category_end_dt'):
        # Within one movement, each category ends where the next one starts; the first
        # start and the last end become NaT and are filled from the movement below.
        df = df.sort_values(start)
        df[end] = df[start].shift(-1)
        df[start] = df[end].shift(1)
        return df

    df_mv_ctg = df_mv_ctg.groupby('patient_movement_id', as_index=False, group_keys=False).apply(clean_start_end_dt)
    df_mv_ctg_merged = pd.merge(df_mv_ctg, df_mv, how='inner', left_on='patient_movement_id', right_on='id')
    # Assign the filled columns back (chained ``fillna(inplace=True)`` breaks under Copy-on-Write),
    # then clip every category interval to its movement.
    df_mv_ctg_merged['category_start_dt'] = df_mv_ctg_merged['category_start_dt'].fillna(
        df_mv_ctg_merged['movement_start_dt'])
    df_mv_ctg_merged['category_end_dt'] = df_mv_ctg_merged['category_end_dt'].fillna(
        df_mv_ctg_merged['movement_end_dt'])
    df_mv_ctg_merged['category_start_dt'] = df_mv_ctg_merged[['category_start_dt', 'movement_start_dt']].max(axis=1)
    df_mv_ctg_merged['category_end_dt'] = df_mv_ctg_merged[['category_end_dt', 'movement_end_dt']].min(axis=1)

    # Keep only intervals overlapping the reporting window.
    min_datetime = datetime.combine(start_date, time.min)
    max_datetime = datetime.combine(end_date, time.max)
    in_window = ((df_mv_ctg_merged['category_start_dt'] <= max_datetime)
                 & (df_mv_ctg_merged['category_end_dt'] >= min_datetime))
    df_mv_ctg_merged = df_mv_ctg_merged.loc[in_window, ['patient_movement_id', 'patient_type_id',
                                                        'patient_category_id', 'patient_id', 'ward',
                                                        'category_start_dt', 'category_end_dt']]

    time_block_to_start_time = time_blocks.get_time_block_to_start_time()
    split_columns = ['date', 'time_block', 'ward', 'category_id', 'patient_id']

    # NOTE(review): the per-row work is Python/pandas code, so threads likely gain little under the GIL.
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = [executor.submit(__convert_mv_ctg_to_df_v1, row, end_date, time_block_to_start_time)
                   for _, row in df_mv_ctg_merged.iterrows()]
        parts = [future.result() for future in futures]
    # Concatenate once; growing the frame inside the loop was quadratic.
    df_mv_ctg_split = (pd.concat(parts, ignore_index=True) if parts
                       else pd.DataFrame(columns=split_columns))

    df_mv_ctg_split = (df_mv_ctg_split
                       .drop_duplicates()
                       .assign(date=lambda d: pd.to_datetime(d['date']),
                               category_id=lambda d: pd.to_numeric(d['category_id']),
                               patient_id=lambda d: pd.to_numeric(d['patient_id'])))

    df_mv_ctg_agg = (df_mv_ctg_split
                     .groupby(['date', 'time_block', 'ward', 'category_id'], as_index=False)['patient_id']
                     .nunique()
                     .rename(columns={'patient_id': 'patient_volume'}))

    # Full grid of date x time block x ward x category, so empty combinations appear with 0.
    date_rng = pd.date_range(start=start_date, end=end_date, freq='D')
    category_idx = df_ctg.id
    index = pd.MultiIndex.from_product([date_rng, time_blocks.get_time_block_ids(), wards, category_idx],
                                       names=['date', 'time_block', 'ward', 'category_id'])
    df_ctg_agg_full = pd.DataFrame(index=index).reset_index()
    df_ctg_agg_full = pd.merge(df_ctg_agg_full, df_ctg, how='left', left_on='category_id', right_on='id')
    df_ctg_agg_full = df_ctg_agg_full[['date', 'time_block', 'ward', 'patient_type_id', 'category_id']]
    df_mv_ctg_agg = pd.merge(df_ctg_agg_full, df_mv_ctg_agg, how='left')
    df_mv_ctg_agg['patient_volume'] = df_mv_ctg_agg['patient_volume'].fillna(0)

    # Add workload hours; convert ids on a copy so the caller's df_ctg_tb is not mutated.
    df_ctg_tb = df_ctg_tb.assign(time_block_id=df_ctg_tb['time_block_id'].astype(str))
    df_mv_ctg_agg = df_mv_ctg_agg.merge(df_ctg_tb, how='left', left_on=['category_id', 'time_block'],
                                        right_on=['patient_category_id', 'time_block_id'])
    # Category/time block pairs without a required_time contribute 0 hours.
    df_mv_ctg_agg['cat_total_time_per_nurse_h'] = (
        df_mv_ctg_agg['required_time'] * df_mv_ctg_agg['patient_volume']).fillna(0)
    df_mv_ctg_agg = df_mv_ctg_agg[['date', 'time_block', 'ward', 'patient_type_id', 'category_id', 'patient_volume',
                                   'cat_total_time_per_nurse_h']]

    print(f"Prepared patient categories, size: {len(df_mv_ctg_agg)}")

    return df_mv_ctg_agg

In [None]:
result = await aggregate_patient_category_v1(df_mv, df_mv_ctg, df_ctg, df_ctg_tb, train_start_date, train_end_date, wards, time_blocks)

# Version 2

Version 2 speeds up the aggregation of patient data by category and time block. Instead of expanding every category interval separately, it makes a single sorted pass over the timeline, so that each date is processed only once:

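- Maintain the set of currently active rows (category intervals) of `df_mv_ctg_merged`.
- Put every `category_start_dt`, every `category_end_dt`, and the start time of every time block on each date that contains at least one of these boundaries into a single list, sorted in ascending order. At equal timestamps, starts come before time blocks and time blocks before ends, so both interval endpoints are inclusive.
- Iterate through the sorted list:
    - Before handling an entry on a new date, fill every skipped date (one without any entries) by recording every time block for every row in the set.
    - If the entry is a `category_start_dt`, add that row to the set.
    - If the entry is a `category_end_dt`, remove that row from the set.
    - If the entry is a time block, record that date and time block for every row currently in the set.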

This approach eliminates the need for concurrent threads and significantly reduces resource consumption. In tests on a single machine, the first implementation took approximately 30 minutes to complete, while the optimized version took around 25 seconds. The results of the optimized version were validated against the initial implementation and are identical (see the comparison below).

In [30]:
async def aggregate_patient_category_v2(df_mv: pd.DataFrame, df_mv_ctg: pd.DataFrame, df_ctg: pd.DataFrame,
                                        df_ctg_tb: pd.DataFrame, start_date: date,
                                        end_date: date, wards: list,
                                        time_blocks: TimeBlocks) -> pd.DataFrame:
    """Count distinct patients and required nursing time per date/time block/ward/category.

    Optimized implementation: instead of expanding each category interval on its
    own, a single sweep over the sorted interval boundaries and time block starts
    records which intervals cover each (date, time block). The input frames are not
    modified.

    Args:
        df_mv: Patient movements (``id``, ``start_time``, ``end_time``,
            ``workspace_code``, ...).
        df_mv_ctg: Category assignments per movement (``patient_movement_id``,
            ``start_date``, ``end_date``, ...).
        df_ctg: Patient categories (``id``, ``name``, ``patient_type_id``).
        df_ctg_tb: Required time per category and time block
            (``patient_category_id``, ``time_block_id``, ``required_time``).
        start_date, end_date: Inclusive reporting window.
        wards: Wards to include in the output grid.
        time_blocks: Time block definitions.

    Returns:
        One row per (date, time_block, ward, category_id) of the full grid, with
        ``patient_type_id``, ``patient_volume`` (distinct patients) and
        ``cat_total_time_per_nurse_h``; combinations without data get 0.
    """
    from datetime import timedelta

    df_ctg = df_ctg.drop(columns='name')
    df_mv = df_mv.rename(
        columns={'start_time': 'movement_start_dt', 'end_time': 'movement_end_dt', 'workspace_code': 'ward'})
    # Rebind instead of renaming in place so the caller's frame is left untouched.
    df_mv_ctg = df_mv_ctg.rename(columns={'start_date': 'category_start_dt', 'end_date': 'category_end_dt'})

    def clean_start_end_dt(df, start='category_start_dt', end='category_end_dt'):
        # Within one movement, each category ends where the next one starts; the first
        # start and the last end become NaT and are filled from the movement below.
        df = df.sort_values(start)
        df[end] = df[start].shift(-1)
        df[start] = df[end].shift(1)
        return df

    df_mv_ctg = df_mv_ctg.groupby('patient_movement_id', as_index=False, group_keys=False).apply(clean_start_end_dt)
    df_mv_ctg_merged = pd.merge(df_mv_ctg, df_mv, how='inner', left_on='patient_movement_id', right_on='id')
    # Assign the filled columns back (chained ``fillna(inplace=True)`` breaks under Copy-on-Write),
    # then clip every category interval to its movement.
    df_mv_ctg_merged['category_start_dt'] = df_mv_ctg_merged['category_start_dt'].fillna(
        df_mv_ctg_merged['movement_start_dt'])
    df_mv_ctg_merged['category_end_dt'] = df_mv_ctg_merged['category_end_dt'].fillna(
        df_mv_ctg_merged['movement_end_dt'])
    df_mv_ctg_merged['category_start_dt'] = df_mv_ctg_merged[['category_start_dt', 'movement_start_dt']].max(axis=1)
    df_mv_ctg_merged['category_end_dt'] = df_mv_ctg_merged[['category_end_dt', 'movement_end_dt']].min(axis=1)

    # Keep only intervals overlapping the reporting window.
    min_datetime = datetime.combine(start_date, time.min)
    max_datetime = datetime.combine(end_date, time.max)
    in_window = ((df_mv_ctg_merged['category_start_dt'] <= max_datetime)
                 & (df_mv_ctg_merged['category_end_dt'] >= min_datetime))
    df_mv_ctg_merged = df_mv_ctg_merged.loc[in_window, ['patient_movement_id', 'patient_type_id',
                                                        'patient_category_id', 'patient_id', 'ward',
                                                        'category_start_dt', 'category_end_dt']]

    time_block_to_start_time = time_blocks.get_time_block_to_start_time()

    ###----------------------------------------------------------------------------------------------------------------
    # Sweep line. Each event is [key, timestamp, kind]; key is a row label of
    # df_mv_ctg_merged for START/END events and a time block id for TIME_BLOCK events.
    # At equal timestamps starts sort before time blocks and time blocks before ends,
    # so both interval endpoints are inclusive.
    START, TIME_BLOCK, END = 0, 1, 2

    valid = df_mv_ctg_merged[df_mv_ctg_merged['category_start_dt'] <= df_mv_ctg_merged['category_end_dt']]
    events = [[row_id, ts, START] for row_id, ts in zip(valid.index, valid['category_start_dt'])]
    events += [[row_id, ts, END] for row_id, ts in zip(valid.index, valid['category_end_dt'])]
    # Every date containing an interval boundary gets one event per time block start.
    event_dates = {ts.date() for _, ts, _ in events}
    events += [[tb, datetime.combine(day, tb_start), TIME_BLOCK]
               for day in event_dates for tb, tb_start in time_block_to_start_time.items()]
    events.sort(key=lambda event: (event[1], event[2]))

    one_day = timedelta(days=1)
    active_rows = set()
    result = []  # [date, time_block, row_id]
    # Empty when no interval overlaps the window; the loop then simply does nothing.
    current_day = events[0][1].date() if events else None
    for key, ts, kind in events:
        day = ts.date()
        # Dates strictly between two event dates have no events of their own:
        # every time block on them is covered by every active interval.
        gap_day = current_day + one_day
        while gap_day < day:
            result.extend([gap_day, tb, row_id] for tb in time_block_to_start_time for row_id in active_rows)
            gap_day += one_day
        current_day = day

        if kind == START:
            active_rows.add(key)
        elif kind == END:
            active_rows.remove(key)
        else:
            result.extend([day, key, row_id] for row_id in active_rows)

    # Attach ward/category/patient of each covered interval via its row label.
    row_attributes = (df_mv_ctg_merged[['ward', 'patient_category_id', 'patient_id']]
                      .rename_axis('row_id')
                      .reset_index())
    df_mv_ctg_split = (pd.DataFrame(result, columns=['date', 'time_block', 'row_id'])
                       .merge(row_attributes, how='inner', on='row_id')
                       [['date', 'time_block', 'ward', 'patient_category_id', 'patient_id']]
                       .rename(columns={'patient_category_id': 'category_id'}))
    ###----------------------------------------------------------------------------------------------------------------

    df_mv_ctg_split = (df_mv_ctg_split
                       .drop_duplicates()
                       .assign(date=lambda d: pd.to_datetime(d['date']),
                               category_id=lambda d: pd.to_numeric(d['category_id']),
                               patient_id=lambda d: pd.to_numeric(d['patient_id'])))

    df_mv_ctg_agg = (df_mv_ctg_split
                     .groupby(['date', 'time_block', 'ward', 'category_id'], as_index=False)['patient_id']
                     .nunique()
                     .rename(columns={'patient_id': 'patient_volume'}))

    # Full grid of date x time block x ward x category, so empty combinations appear with 0.
    date_rng = pd.date_range(start=start_date, end=end_date, freq='D')
    category_idx = df_ctg.id
    index = pd.MultiIndex.from_product([date_rng, time_blocks.get_time_block_ids(), wards, category_idx],
                                       names=['date', 'time_block', 'ward', 'category_id'])
    df_ctg_agg_full = pd.DataFrame(index=index).reset_index()
    df_ctg_agg_full = pd.merge(df_ctg_agg_full, df_ctg, how='left', left_on='category_id', right_on='id')
    df_ctg_agg_full = df_ctg_agg_full[['date', 'time_block', 'ward', 'patient_type_id', 'category_id']]
    df_mv_ctg_agg = pd.merge(df_ctg_agg_full, df_mv_ctg_agg, how='left')
    df_mv_ctg_agg['patient_volume'] = df_mv_ctg_agg['patient_volume'].fillna(0)

    # Add workload hours; convert ids on a copy so the caller's df_ctg_tb is not mutated.
    df_ctg_tb = df_ctg_tb.assign(time_block_id=df_ctg_tb['time_block_id'].astype(str))
    df_mv_ctg_agg = df_mv_ctg_agg.merge(df_ctg_tb, how='left', left_on=['category_id', 'time_block'],
                                        right_on=['patient_category_id', 'time_block_id'])
    # Category/time block pairs without a required_time contribute 0 hours.
    df_mv_ctg_agg['cat_total_time_per_nurse_h'] = (
        df_mv_ctg_agg['required_time'] * df_mv_ctg_agg['patient_volume']).fillna(0)
    df_mv_ctg_agg = df_mv_ctg_agg[['date', 'time_block', 'ward', 'patient_type_id', 'category_id', 'patient_volume',
                                   'cat_total_time_per_nurse_h']]

    print(f"Prepared patient categories, size: {len(df_mv_ctg_agg)}")

    return df_mv_ctg_agg

revised_result = await aggregate_patient_category_v2(df_mv, df_mv_ctg, df_ctg, df_ctg_tb, train_start_date, train_end_date, wards, time_blocks)

Prepared patient categories, size: 1162920


In [34]:
# Compare both versions row by row in a canonical order. `DataFrame.equals` also
# compares the index, so reset it after sorting; otherwise identical rows produced
# in a different order would be reported as different.
sort_cols = ['date', 'time_block', 'ward', 'patient_type_id', 'category_id', 'patient_volume',
             'cat_total_time_per_nurse_h']
revised_result = revised_result.sort_values(sort_cols).reset_index(drop=True)
result = result.sort_values(sort_cols).reset_index(drop=True)

# Raises with a detailed diff if the versions disagree.
pd.testing.assert_frame_equal(result, revised_result)
result.equals(revised_result)

True

Further speed-ups are possible through vectorization, list comprehensions, Cython, or Numba. However, these approaches require more invasive changes to the code, so further optimization is out of scope for now due to time constraints.