# XGB Prediction - Multi Processing and Multi Threading

In [None]:
import os
import sys
sys.path.insert(1, os.path.abspath(os.path.join(os.getcwd(), '..')))

import multiprocessing
import concurrent.futures
import threading

import argparse
import pandas as pd
import time
import random

from utils import ds_general as ds
from utils.BigQuery import BigQuery

import xgboost as xgb
import pickle
import joblib

In [None]:
def load_blob_list(config, verbose = 1):
    """
    Return the list of feature-data blob URIs to run prediction on.

    The blobs are listed through the module-level ``bq`` client (created in the
    ``__main__`` section of this script), each name is prefixed with
    ``gs://<BUCKET>/``, and the list is truncated to
    ``config['TOTAL_FILES_TO_PREDICT']`` entries when that value is truthy.

    Args:
        config: mapping providing 'BUCKET', 'FEATURE_DATA_BLOB' and
            'TOTAL_FILES_TO_PREDICT'.
        verbose: not used by this function; kept so existing callers that
            pass it keep working.

    Returns:
        list[str]: the selected ``gs://`` blob paths.

    Any exception raised while listing is handed to ``ds.terminate_prog``.
    """
    try:
        max_files = config['TOTAL_FILES_TO_PREDICT']
        bucket_prefix = f"gs://{config['BUCKET']}/"
        blobs_list = [
            bucket_prefix + b
            for b in bq.list_blobs(config['BUCKET'], config['FEATURE_DATA_BLOB'])
        ]

        # A falsy limit (None / 0) means "use every blob". Cap the reported
        # count at the number of blobs actually available so the log line
        # matches what the slice below really returns.
        n_blobs = min(max_files, len(blobs_list)) if max_files \
            else len(blobs_list)
        print(f"total num of blobs {len(blobs_list)}")
        print(f"num of the selected blobs {n_blobs}")
        return blobs_list[:n_blobs]

    except Exception as e:
        # The message used to interpolate an undefined name, which raised
        # NameError here and hid the original error.
        ds.terminate_prog("Failed to load blob list", error = e)

# load data from blob
def load_data_from_blob_thread(blob_path):
    """
    Read one or more parquet blobs and stack them into one data frame.

    Intended to run as a worker-thread job: the current thread's name is
    printed and used as the logger name.

    Args:
        blob_path: a single parquet path, or a list of such paths.

    Returns:
        pandas.DataFrame: row-wise concatenation of every blob that was read.

    Any exception raised while reading or concatenating is passed to
    ``ds.terminate_prog``.
    """
    worker_name = threading.current_thread().name
    logger = ds.get_logger(name=worker_name, level="INFO")
    print(f"Thread name: {worker_name}")

    try:
        # Accept a bare path as well as a list of paths.
        paths = blob_path if isinstance(blob_path, list) else [blob_path]

        frames = []
        for i, path in enumerate(paths):
            frame = pd.read_parquet(path)
            frames.append(frame)
            logger.debug(f"{i+1}th blob loading: {path}, shape:{frame.shape}")

        combined = pd.concat(frames, axis=0)
        logger.debug(ds.df_info(combined))
        logger.debug(combined.iloc[:2, :5])
        return combined

    except Exception as e:
        msg = "Failed to load the blob."
        ds.terminate_prog(msg, error=e)

def feature_eng(df, config):
    """
    Split a raw data frame into model features and its index columns.

    Args:
        df: input data frame; it is not modified.
        config: mapping with 'COLS_TO_DROP' (columns to remove when present
            in ``df``) and 'IDX_COLS' (columns kept aside as the row index).

    Returns:
        tuple: (feature frame without the dropped columns,
                copy of the 'IDX_COLS' columns).
    """
    # Only drop what actually exists, so absent columns do not raise.
    droppable = [name for name in config['COLS_TO_DROP'] if name in df.columns]

    index_part = df[config['IDX_COLS']].copy()
    features = df.drop(columns=droppable)

    return features, index_part


def load_xgb_model_from_file(model_fname, file_type):
    """
    Load a pre-trained model from a local file.

    Args:
        model_fname: path of the model file on local disk.
        file_type: how the file was serialised; one of
            'xgb'    - loaded with ``xgb.XGBClassifier().load_model``,
            'joblib' - loaded with ``joblib.load``,
            'pickle' - loaded with ``pickle.load``.

    Returns:
        The loaded model, or None when ``file_type`` is not recognised or
        when loading raises (both cases are logged at critical level).
    """
    process_name = multiprocessing.current_process().name
    logger = ds.get_logger(name=process_name, level="INFO")

    try:
        if file_type == 'xgb':
            print("loading model by xgboost")
            model = xgb.XGBClassifier()
            # load_model() restores the state in place and returns None, so
            # its result must not be assigned back to `model`.
            model.load_model(model_fname)
            # best_score may be unavailable on the loaded model (presumably
            # when it was trained without early stopping); this print is only
            # a diagnostic and must not abort an otherwise successful load.
            print(f"check loaded mode: {getattr(model, 'best_score', None)}")
        elif file_type == 'joblib':
            print("loading model by joblib")
            # NOTE: joblib/pickle files can execute arbitrary code when
            # loaded; only point this at trusted model files.
            model = joblib.load(model_fname)
        elif file_type == 'pickle':
            print("loading model by pickle")
            with open(model_fname, 'rb') as f:
                model = pickle.load(f)
        else:
            logger.critical("file_type: wrong file_type. Should be either 'xgb', 'joblib', or 'pickle'")
            return None

        # Compare against None explicitly: some estimators define __len__,
        # so truthiness is not a reliable "was a model loaded" check.
        if model is None:
            ds.terminate_prog(msg="Model is None!!! check the local model file")
        else:
            print(f"Loaded model from disk; {model}")
            logger.debug("Succefully loaded a pre-trained model. ")

        return model

    except Exception as e:
        # Report the file that was actually being loaded; the previous message
        # referenced an undefined name and raised NameError inside the handler.
        logger.critical(f"Failed to load pretrained xgb model from {model_fname} \nContinue without a pre-trained model")
        logger.critical(e)
        return None

In [None]:
def predict_batch(data_blob_list, config):
    """
    Score one batch of feature blobs and optionally append the result to BigQuery.

    Steps:
      1. load the model from config['MODEL_LOCAL_FILE_PATH'], falling back to
         config['TEMP_MODEL_FILE_NAME'] when that is falsy;
      2. read the parquet blobs with a pool of config['N_THREADS'] threads;
      3. run ``feature_eng`` and ``model.predict_proba``;
      4. when config['WRITE_TABLE'] is truthy, append the scores to
         config['PREDICTION_RESULT_TABLE'].

    Args:
        data_blob_list: list of blob paths that make up this batch.
        config: run configuration mapping.

    Returns:
        int: number of rows predicted for this batch.

    Failures during prediction or the table write are logged and passed to
    ``ds.terminate_prog``.
    """
    logger = ds.get_logger(
        name=multiprocessing.current_process().name, level="INFO")

    # A client is created here rather than reusing the one built in __main__.
    bq = BigQuery(project_id=config["GCP_PROJECT"])

    first_blob = data_blob_list[0].split("/")[-1]
    logger.info(f"*** PREDICTION, begining blob: {first_blob}")

    # Prefer an explicit local model file; otherwise use the temp file name.
    model_path = config['MODEL_LOCAL_FILE_PATH'] or config['TEMP_MODEL_FILE_NAME']
    print(f"loading model from {model_path}")

    model = load_xgb_model_from_file(
        model_fname=model_path,
        file_type=config['MODEL_TYPE'])

    # THREAD SUB BATCH: split this batch's blobs into one job per submit() call.
    n_threads = config['N_THREADS']
    blob_groups = ds.split_list_n_size(data_blob_list, n_threads)

    with concurrent.futures.ThreadPoolExecutor(max_workers=n_threads) as pool:
        pending = [
            pool.submit(load_data_from_blob_thread, group)
            for group in blob_groups
        ]
        # Collect frames in completion order, not submission order.
        frames = [
            done.result()
            for done in concurrent.futures.as_completed(pending)
        ]

    data_df = pd.concat(frames, axis=0)
    logger.debug(ds.df_info(data_df))

    # Separate the model features from the identifying index columns.
    X, idx_df = feature_eng(data_df, config)

    try:
        pred_y = pd.DataFrame(
            data=model.predict_proba(X),
            columns=['neg_score', 'pos_score'],
        ).reset_index(drop=True)
        # Both sides get a fresh 0..n-1 index so they line up row by row.
        pred_y = pd.concat([idx_df.reset_index(drop=True), pred_y], axis=1)
        logger.info("Prediction of the batch done.")

        logger.debug("\n" + str(pred_y.head(2)))
        logger.info(pred_y.shape)

        if config['WRITE_TABLE']:
            # Random pause before and after the write to avoid
            # rateLimitExceeded when updating the bq table.
            time.sleep(random.randrange(2,4))

            bq.df_to_table(
                dataframe=pred_y,
                destination=config['PREDICTION_RESULT_TABLE'],
                write_method='WRITE_APPEND')

            logger.debug(f"Appended the prediction to a bq table: {config['PREDICTION_RESULT_TABLE']}")

            time.sleep(random.randrange(2,4))

    except Exception as e:
        msg=f"Failed to predict, begining blob: {data_blob_list[0]}"
        logger.critical(e)
        ds.terminate_prog(msg=msg,error=e)

    return pred_y.shape[0]
    

In [None]:
def main(config):
    """
    Run the whole prediction job described by ``config``.

    Lists the feature blobs, downloads the model from the bucket when no local
    model path is configured, splits the blobs into batches and runs
    ``predict_batch`` on them in a pool of config['N_CPUS'] processes, then
    logs the per-batch and total row counts.

    Uses the module-level ``bq`` client for the model download.

    Args:
        config: run configuration mapping.
    """
    logger = ds.get_logger(
        name=multiprocessing.current_process().name, level="INFO")

    main_time = ds.time_start_end(msg="MAIN")

    logger.info("Loading prediction blobs...")
    data_blob_list = load_blob_list(config)

    # Without a local model file the model has to be fetched from the bucket.
    if not config['MODEL_LOCAL_FILE_PATH']:
        logger.info("Downloading model from a blob...")
        try:
            bq.download_file_from_blob(
                bucket_name=config['BUCKET'],
                source_blob_name=config['MODEL_BLOB'],
                destination_file_name=config['TEMP_MODEL_FILE_NAME'],
                verbose=1)
        except Exception as e:
            msg=f"Failed to load pretrained xgb model from {config['MODEL_BLOB']} \nContinue without a pre-trained model"
            logger.critical(e)
            ds.terminate_prog(msg, error=e)

    # BATCH: one (blob_list, config) argument tuple per predict_batch call.
    batches = ds.split_list_n_size(
        data_blob_list,
        config['N_FILES_IN_BATCH'])
    starmap_args = [(batch, config) for batch in batches]

    p = multiprocessing.Pool(config['N_CPUS'])
    results = p.starmap(predict_batch, starmap_args)

    p.close()
    p.join()

    print("DONE ALL multiprocessing and threading.")
    logger.info("Number of rows predicted each batch:" + str(results))

    # A batch that returned nothing counts as zero rows.
    total_rows = sum(n_rows if n_rows else 0 for n_rows in results)
    logger.info("Total number of rows predicted:" + str(total_rows))

    if config['WRITE_TABLE']:
        print(f"Prediction bq table: {config['PREDICTION_RESULT_TABLE']}")
    
    

In [None]:
if __name__ == '__main__':
    # The only command-line argument is the path of the config file.
    cli = argparse.ArgumentParser()
    cli.add_argument('config_path', help='config file path and name')
    config = ds.load_config(cli.parse_args().config_path)

    # Presumably the scratch folder for the downloaded model file - confirm
    # against config['TEMP_MODEL_FILE_NAME'].
    ds.make_folder('tmp')

    # Module-level BigQuery client: load_blob_list() and main() look it up
    # as the global name `bq`.
    bq = BigQuery(project_id=config["GCP_PROJECT"])

    main(config)
    
    

In [None]:
# run_predict.sh
# #!/bin/bash
# jupyter nbconvert xgb_prediction_multiprocessing.ipynb --to python 
# nohup python3 -u xgb_prediction_multiprocessing.py ./config/xgb_prediction_config01.yaml > model_07282023_210857_prediction_xgb_fv2_2_testset_jan_2023_08022023.log 2>&1 &