
# Space

In [None]:
import os
import sys 
import logging
import pandas as pd 
from pprint import pprint 

# WorkSpace
# Resolve the workspace root as everything in the current working directory up
# to and including the first occurrence of KEY, then make it both the working
# directory and an import root.
KEY = 'WorkSpace'
WORKSPACE_PATH = os.getcwd().split(KEY)[0] + KEY
print(WORKSPACE_PATH)
os.chdir(WORKSPACE_PATH)
sys.path.append(WORKSPACE_PATH)

# Pipeline Space
# `proj_space` is importable only after WORKSPACE_PATH has been appended to
# sys.path above, which is why this import cannot move to the top of the cell.
from proj_space import SPACE
SPACE['WORKSPACE_PATH'] = WORKSPACE_PATH
sys.path.append(SPACE['CODE_FN'])
pprint(SPACE)

# Available Packages
import pandas as pd
from datetime import datetime

# Notebook-wide logger and the directory holding the recfldtkn configuration.
logger = logging.getLogger(__name__)
recfldtkn_config_path = os.path.join(SPACE['CODE_FN'], 'config_recfldtkn/')


# Dataset

In [None]:
import datasets
from recfldtkn.loadtools import load_ds_rec_and_info
from recfldtkn.configfn import load_cohort_args, load_record_args

# Load the cohort-level configuration from the recfldtkn config directory.
# The result is passed to the case-building calls below, which read
# base_config['SPACE'] from it.
base_config = load_cohort_args(recfldtkn_config_path, SPACE)
print(base_config)

# TriggerCaseMethod

In [None]:
from recfldtkn.pipeline_model import get_Trigger_Cases, convert_TriggerCases_to_LearningCases

# Mode switch: inference is simply the negation of learning.
use_learning = True
use_inference = not use_learning

######################################
TriggerCaseMethod = 'CGM5MinEntry'
cohort_label_list = [1]

# Ordered processing steps applied to the trigger cases. Each step is a dict
# with an 'op' (Tag / Filter / CFQ / TagCF) and a 'Name'; TagCF steps also name
# the CFQ step they depend on via 'CFQName'. Steps carrying
# 'type': 'learning-only' are dropped from the list when use_inference is True.
Trigger2LearningMethods = [

    {'op': 'Tag', 'Name': 'Tag_PttBasicWD'},
    {'op': 'Filter', 'Name': 'Filter_PttBasicWD'},

    {'op': 'CFQ', 'Name': 'CFQ_Bf24hCGMrn'},
    {'op': 'TagCF', 'Name': 'TagCF_Bf24hCGMrn', 'CFQName': 'CFQ_Bf24hCGMrn'},

    {'op': 'CFQ', 'Name': 'CFQ_Af2hCGMrn', 'type': 'learning-only'},
    {
        'op': 'TagCF',
        'Name': 'TagCF_Af2hCGMrn',
        'CFQName': 'CFQ_Af2hCGMrn',
        'type': 'learning-only',
    },

    # Currently disabled steps, kept for reference:
    # {'op':'Filter', 'Name': 'Filter_BfCGMgeq280'},
    # {'op':'Filter', 'Name': 'Filter_AfCGMgeq24',
    #  'type': 'learning-only'},

    # {'op':'CFQ',    'Name': 'CFQ_Bf1mMEDALrn',
    #  'type': 'learning-only'},
    # {'op':'TagCF',  'Name': 'TagCF_Bf1mMEDALrn', 'CFQName': 'CFQ_Bf1mMEDALrn',
    #  'type': 'learning-only'},

]
######################################

In [None]:
# Display the active mode flag (it is derived as `not use_learning`).
use_inference

In [None]:
from recfldtkn.loadtools import fetch_casetag_tools, fetch_casefilter_tools, load_module_variables

def convert_TriggerCases_to_LearningCases(df_case,
                                          cohort_label_list,
                                          Trigger2LearningMethods,
                                          base_config,
                                          use_inference):
    """Run an ordered list of tag / filter / case-feature steps over trigger cases.

    Every entry of ``Trigger2LearningMethods`` is a dict with an ``'op'`` and a
    ``'Name'``. Supported ops:

    * ``'Tag'``    -- fetch the tagging tools for ``Name`` and call their
      ``fn_case_tagging``; when the tools declare an ``'InfoRecName'`` the
      matching info dataset is loaded first and passed along (else ``None``).
    * ``'Filter'`` -- fetch the filtering tools for ``Name`` and call their
      ``fn_case_filtering``; the shape before and after is logged.
    * ``'CFQ'``    -- load ``<CODE_FN>/fn_learning/<Name>.py`` and call its
      ``fn_casefeat_querying``; the returned CaseFeatInfo is stored under
      ``Name`` for later ``TagCF`` steps.
    * ``'TagCF'``  -- load ``<CODE_FN>/fn_learning/<Name>.py`` and call its
      ``fn_case_tagging_on_casefeat`` with the CaseFeatInfo stored by the CFQ
      step named in ``'CFQName'``.

    Args:
        df_case: Case table handed from step to step; each step returns the
            table used by the next one.
        cohort_label_list: Cohort labels forwarded to ``load_ds_rec_and_info``.
        Trigger2LearningMethods: Ordered list of step dicts as described above.
        base_config: Configuration mapping; ``base_config['SPACE']`` must
            provide ``'CODE_FN'`` for CFQ / TagCF steps.
        use_inference: When truthy, steps whose ``'type'`` is
            ``'learning-only'`` are skipped.

    Returns:
        The case table after all remaining steps have been applied.

    Raises:
        KeyError: A ``TagCF`` step names a ``CFQName`` for which no CFQ step
            has run (missing, misordered, or skipped in inference mode).
        ValueError: A step has an unrecognized ``'op'``.
    """
    # CaseFeatInfo returned by each executed CFQ step, keyed by the step's Name.
    CFQ_to_CaseFeatInfo = {}

    if use_inference:
        Trigger2LearningMethods = [
            method for method in Trigger2LearningMethods
            if method.get('type') != 'learning-only'
        ]

    SPACE = base_config['SPACE']
    for method in Trigger2LearningMethods:
        op = method['op']

        if op == 'Tag':
            name = method['Name']
            logger.info(f'CaseTag: {name}')
            CaseTag_Tools = fetch_casetag_tools(name, SPACE)

            subgroup_columns = CaseTag_Tools['subgroup_columns']
            if 'InfoRecName' in CaseTag_Tools:
                InfoRecName = CaseTag_Tools['InfoRecName']
                ds_info, _ = load_ds_rec_and_info(InfoRecName, base_config, cohort_label_list)
            else:
                ds_info = None

            fn_case_tagging = CaseTag_Tools['fn_case_tagging']
            df_case = fn_case_tagging(df_case, ds_info, subgroup_columns, base_config)

        elif op == 'Filter':
            name = method['Name']
            logger.info(f'CaseFilter: {name}')
            CaseFilter_Tools = fetch_casefilter_tools(name, SPACE)
            fn_case_filtering = CaseFilter_Tools['fn_case_filtering']

            logger.info(f'Before Filter: {df_case.shape}')
            df_case = fn_case_filtering(df_case)
            logger.info(f'After Filter: {df_case.shape}')

        elif op == 'CFQ':
            name = method['Name']
            pypath = os.path.join(SPACE['CODE_FN'], 'fn_learning', f'{name}.py')
            module = load_module_variables(pypath)
            df_case, CaseFeatInfo = module.fn_casefeat_querying(df_case, base_config)
            CFQ_to_CaseFeatInfo[name] = CaseFeatInfo

        elif op == 'TagCF':
            name = method['Name']
            pypath = os.path.join(SPACE['CODE_FN'], 'fn_learning', f'{name}.py')
            module = load_module_variables(pypath)

            # Look up the info of the CFQ step this tag depends on. The registry
            # itself must stay intact so later TagCF steps can still resolve
            # their own CFQName.
            CFQName = method['CFQName']
            if CFQName not in CFQ_to_CaseFeatInfo:
                raise KeyError(
                    f'TagCF step {name!r} requires CFQ step {CFQName!r}, '
                    f'which has not been run before it'
                )
            CaseFeatInfo = CFQ_to_CaseFeatInfo[CFQName]
            df_case = module.fn_case_tagging_on_casefeat(df_case, CaseFeatInfo)

        else:
            raise ValueError(f'Unknown method: {method}')

    return df_case

In [None]:
# Containers handed to get_Trigger_Cases; both start out empty here.
# NOTE(review): presumably get_Trigger_Cases uses them as caches of loaded
# record datasets / record info keyed by record name — confirm against its source.
RecName_to_dsRec = {}
RecName_to_dsRecInfo = {}

# Build the trigger cases for the configured TriggerCaseMethod and cohorts.
df_case = get_Trigger_Cases(TriggerCaseMethod, 
                            cohort_label_list, 
                            base_config, 
                            SPACE, 
                            RecName_to_dsRec, 
                            RecName_to_dsRecInfo)

#####################
# df_case = df_case.sample(1000, random_state=42)
#####################

df_case.shape

# Split DataFrame

In [None]:
def split_dataframe(df, chunk_size):
    """Split a DataFrame into consecutive row chunks of at most ``chunk_size`` rows.

    Chunks are positional slices taken in order, so concatenating them
    reproduces ``df``. Only the last chunk may be shorter than ``chunk_size``,
    and no empty chunk is produced: when ``len(df)`` is an exact multiple of
    ``chunk_size`` the result has exactly ``len(df) // chunk_size`` chunks, and
    an empty ``df`` yields an empty list.

    Args:
        df: DataFrame to split.
        chunk_size: Maximum number of rows per chunk; must be a positive integer.

    Returns:
        List of DataFrame slices, in row order.

    Raises:
        ValueError: If ``chunk_size`` is not positive.
    """
    if chunk_size <= 0:
        raise ValueError(f'chunk_size must be a positive integer, got {chunk_size}')

    # Stepping by chunk_size over the row range avoids the trailing empty chunk
    # that a `len(df) // chunk_size + 1` count creates on exact multiples.
    return [df.iloc[start:start + chunk_size] for start in range(0, len(df), chunk_size)]

In [None]:
# NOTE(review): this bare expression is not the last statement of the cell, so
# its value is not displayed; wrap it in print() or move it to its own cell.
len(df_case)

# Keep only the first 1000 rows for the rest of the notebook (overwrites df_case).
df_case = df_case[:1000]

In [None]:
# SIZE = 1_000_000
# Rows per chunk. This was previously spelled `5_10`, which Python reads as the
# integer 510; the value is unchanged here.
# NOTE(review): confirm that 510 (and not e.g. 500) is the intended chunk size.
SIZE = 510
chunks = split_dataframe(df_case, SIZE)
print(len(chunks))

In [None]:
def process_chunk(chunk):
    """Convert one chunk of trigger cases into learning cases.

    Runs ``convert_TriggerCases_to_LearningCases`` on ``chunk`` using the
    notebook-level ``cohort_label_list``, ``Trigger2LearningMethods``,
    ``base_config`` and ``use_inference``, then drops every column whose name
    contains ``'_co.'`` and resets the index.

    Any exception is printed and re-raised so the failure is visible to the
    caller (including when the call runs inside a worker).
    """
    try:
        learning_cases = convert_TriggerCases_to_LearningCases(
            chunk,
            cohort_label_list,
            Trigger2LearningMethods,
            base_config,
            use_inference,
        )
        kept_columns = [col for col in learning_cases.columns if '_co.' not in col]
        return learning_cases[kept_columns].reset_index(drop=True)
    except Exception as e:
        print(f"Error processing chunk: {e}")
        raise  # propagate after reporting so the caller still sees the failure

In [None]:
# Take the first chunk as a small sample for a single-process trial run.
chunk = chunks[0]
chunk

In [None]:
# Run the conversion on one chunk in the notebook process itself, so that
# errors surface here with a full traceback.
process_chunk(chunk)

In [None]:
import os

# Determine the number of available CPUs
# (os.cpu_count() returns None when the count cannot be determined).
num_cpus = os.cpu_count()
num_cpus

In [None]:
import pandas as pd
import numpy as np
from concurrent.futures import ProcessPoolExecutor, as_completed


# Number of worker processes. This is now passed to the pool; previously it was
# defined but unused, so the executor fell back to its default worker count.
num_workers = 2

# NOTE(review): worker processes must be able to import the submitted callable.
# With the "spawn" start method, functions defined in a notebook cell may fail
# to unpickle in the workers — if that happens, move process_chunk and its
# dependencies into an importable .py module.
with ProcessPoolExecutor(max_workers=num_workers) as executor:
    futures = [executor.submit(process_chunk, chunk) for chunk in chunks]
    # Handle results in completion order, not submission order.
    for future in as_completed(futures):
        try:
            ds_case = future.result()
            print(ds_case)  # Or any other logic to handle the result
        except Exception as e:
            # Best effort: report the failed chunk and keep consuming the rest.
            print(f"Error processing future: {e}")