In [None]:
## Detect unused CPU held by long-running jobs
import pandas as pd
import numpy as np
from datetime import datetime
import pytz
import re

# 1. Function to convert epoch to a CST datetime object
def convert_epoch_to_cst_datetime(epoch, cst):
    """Convert a Unix epoch timestamp to a timezone-aware datetime in ``cst``.

    Args:
        epoch: Seconds since 1970-01-01T00:00:00 UTC (int or float).
        cst: Target ``tzinfo``; the pipeline passes ``pytz.timezone('US/Central')``.

    Returns:
        A timezone-aware ``datetime`` for the same instant, expressed in ``cst``.
    """
    # Local import keeps the file's top-level imports unchanged.
    from datetime import timezone

    # datetime.utcfromtimestamp() is deprecated since Python 3.12 and returns a
    # naive value that must be re-tagged; fromtimestamp(..., tz=UTC) returns an
    # aware UTC datetime directly.
    utc_time = datetime.fromtimestamp(epoch, tz=timezone.utc)
    return utc_time.astimezone(cst)

# 2. Function to map a submission hour to its time-of-day label (used for the 'submitInterval' column)
def get_time_of_day(hour):
    """Return the part-of-day label for an hour of the day.

    Buckets are half-open ranges checked in order:
    [5, 12) -> 'Morning', [12, 17) -> 'Afternoon', [17, 21) -> 'Evening'.
    Any hour outside those ranges (21-23 and 0-4) is 'Night'.
    """
    buckets = ((5, 12, 'Morning'), (12, 17, 'Afternoon'), (17, 21, 'Evening'))
    for start, stop, label in buckets:
        if start <= hour < stop:
            return label
    return 'Night'

# 3. Function to read CSV file
def read_csv(file_path):
    """Load ``file_path`` into a DataFrame using pandas' default CSV options."""
    frame = pd.read_csv(file_path)
    return frame

# 4. Function to convert 'submitTime', 'startTime', and 'endTime' to CST
def convert_time_columns(df, cst):
    """Replace the epoch columns 'submitTime', 'startTime' and 'endTime' with
    timezone-aware datetimes in ``cst``.

    Each value is converted element-wise with convert_epoch_to_cst_datetime.
    ``df`` is modified in place and also returned for chaining.
    """
    for column in ('submitTime', 'startTime', 'endTime'):
        df[column] = df[column].apply(convert_epoch_to_cst_datetime, args=(cst,))
    return df

# 5. Function to calculate 'pendTime'
def calculate_pend_time(df):
    """Add 'pendTime': seconds from each job's submission to its start.

    Expects 'submitTime' and 'startTime' to be datetime columns. A negative
    value means the recorded start precedes the submission. Mutates ``df``
    in place and returns it.
    """
    wait = df['startTime'].sub(df['submitTime'])
    df['pendTime'] = wait.dt.total_seconds()
    return df

# 6. Function to add 'submitInterval' and 'submitDay' columns
def add_time_of_day_and_day(df):
    """Add ordered categorical columns 'submitInterval' and 'submitDay'.

    'submitInterval' is the get_time_of_day label of each job's submission
    hour. 'submitDay' is the weekday name of the submission. Both are ordered
    categoricals so that sorting follows calendar order rather than
    alphabetical order. Mutates ``df`` in place and returns it.
    """
    submitted = df['submitTime'].dt

    interval_labels = submitted.hour.apply(get_time_of_day)
    df['submitInterval'] = pd.Categorical(
        interval_labels,
        categories=['Morning', 'Afternoon', 'Evening', 'Night'],
        ordered=True,
    )

    df['submitDay'] = pd.Categorical(
        submitted.day_name(),
        categories=['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday'],
        ordered=True,
    )
    return df

# 7. Function to overwrite 'queue' after replacing 'rhel8_' and 'rhel88_'
def overwrite_queue(df):
    """Remove the substrings 'rhel8_' and then 'rhel88_' from every 'queue' value.

    The two tokens are stripped one after the other, and wherever they occur,
    not only as a prefix. Missing values stay missing. Mutates ``df`` in place
    and returns it.
    """
    queue = df['queue']
    for token in ('rhel8_', 'rhel88_'):
        queue = queue.str.replace(token, '', regex=False)
    df['queue'] = queue
    return df

# 8. Function to extract the requested memory from 'resReq' into a new 'requested_memory' column (rows without one are dropped)
def extract_requested_memory(df):
    """Parse the first ``mem=<number>`` in 'resReq' into a float 'requested_memory' column.

    'resReq' is coerced to ``str`` in place first. Rows where no ``mem=``
    value is found (including missing resReq) are dropped. The unit is
    whatever the scheduler wrote in resReq. NOTE(review): presumably MB;
    confirm against the data source.

    Returns:
        A new DataFrame containing only rows with a parsed requested memory.
        The input ``df`` is still mutated (resReq coerced, column added).
    """
    df['resReq'] = df['resReq'].astype(str)
    # One vectorized pass with a single capture group (the decimal part is
    # non-capturing), instead of running re.search twice per row.
    mem_text = df['resReq'].str.extract(r'mem=(\d+(?:\.\d+)?)', expand=False)
    df['requested_memory'] = mem_text.astype(float)
    return df.dropna(subset=['requested_memory'])

# 9. Function to convert 'maxRMem' from KB to MB
def convert_max_rmem(df):
    """Rescale 'maxRMem' from KB to MB (divide by 1024); mutates ``df`` and returns it."""
    df['maxRMem'] = df['maxRMem'].div(1024)
    return df

# 10. Function to create the 'userCancelJob' indicator variable
def create_user_cancel_job(df):
    """Add 'userCancelJob': 1 where 'pendTime' is negative, otherwise 0.

    Missing pendTime values compare as not-negative and get 0. Mutates ``df``
    in place and returns it.
    """
    cancelled = df['pendTime'] < 0
    df['userCancelJob'] = cancelled.astype(int)
    return df

# 11. Function to drop user-cancelled jobs (keep only rows where 'userCancelJob' == 0)
def remove_user_cancel_job(df):
    """Return only the rows whose 'userCancelJob' flag equals 0 (flagged jobs are dropped)."""
    keep = df['userCancelJob'] == 0
    return df[keep]

# 12. Function to order the 'queue' variable based on value counts
def order_queue_by_value_counts(df):
    """Make 'queue' an ordered categorical whose categories are ranked by frequency.

    The most common queue becomes the first category. Missing queue values
    are not counted and remain missing. Mutates ``df`` in place and returns it.
    """
    queue = df['queue']
    ranked = queue.value_counts(sort=True, ascending=False).index
    df['queue'] = pd.Categorical(queue, categories=ranked, ordered=True)
    return df

#13. Define Job category based on Job distribution
def define_job_category(df):
    """Bucket jobs into ordered runtime categories using 'runTime' percentiles.

    Thresholds come from the 'runTime' distribution of all rows in ``df``:

    * 'Short'     -- runTime <= 75th percentile
    * 'Medium'    -- 75th < runTime <= 95th percentile
    * 'Long'      -- 95th < runTime <= 99th percentile
    * 'Very Long' -- runTime > 99th percentile

    Rows with a missing runTime match no bucket and get a missing (NaN)
    category. Adds the ordered categorical column 'jobCategory' in place and
    returns ``df``.
    """
    categories = ['Short', 'Medium', 'Long', 'Very Long']
    run_time = df['runTime']

    # NOTE(review): thresholds use every row, not only jStatus == 'done' jobs;
    # confirm this is the intended population.
    short_threshold = run_time.quantile(0.75)
    long_threshold = run_time.quantile(0.95)
    verylong_threshold = run_time.quantile(0.99)

    conditions = [
        run_time <= short_threshold,
        (run_time > short_threshold) & (run_time <= long_threshold),
        (run_time > long_threshold) & (run_time <= verylong_threshold),
        run_time > verylong_threshold,
    ]
    # np.select's default is the integer 0; mixing it with the string labels
    # forces a str/int dtype promotion. An explicit None keeps an object array,
    # and unmatched rows become NaN in the Categorical below.
    labels = np.select(conditions, categories, default=None)

    df['jobCategory'] = pd.Categorical(labels, categories=categories, ordered=True)
    return df


###14. Calculate the memory utilization 
def calculate_and_categorize_memutilization(df):
    """Compute per-job memory utilization and bucket it by percentile.

    'memUtilization' = maxRMem / (numProcessors * requested_memory) * 100.
    NOTE(review): the formula treats requested_memory as a per-processor
    amount; confirm its units match maxRMem.

    Rows get 0 when numProcessors == 0, when the division yields +/-inf
    (zero request), or when the result is missing (e.g. NaN inputs).

    'memUtilizationCat' is then assigned by the first matching rule:
    'Low' (<= 50th pct), 'Medium' (<= 75th), 'High' (<= 95th), else 'Very High'.

    Mutates ``df`` in place and returns it.
    """
    procs = df['numProcessors']
    ratio = df['maxRMem'] / (procs * df['requested_memory']) * 100
    df.loc[procs > 0, 'memUtilization'] = ratio
    df.loc[procs == 0, 'memUtilization'] = 0

    # Reassign rather than calling replace/fillna with inplace=True on
    # df['memUtilization']: that is chained assignment, which under pandas
    # Copy-on-Write updates a temporary and leaves df unchanged.
    df['memUtilization'] = df['memUtilization'].replace([np.inf, -np.inf], 0).fillna(0)

    util = df['memUtilization']
    cuts = util.quantile([0.50, 0.75, 0.95])
    # np.select takes the first true condition, mirroring an if/elif chain.
    df['memUtilizationCat'] = np.select(
        [util <= cuts.loc[0.50], util <= cuts.loc[0.75], util <= cuts.loc[0.95]],
        ['Low', 'Medium', 'High'],
        default='Very High',
    )

    return df


# 15. Main preprocessing function
def modelProcess_csv(file_path):
    """Load a finished-jobs CSV and run the full preprocessing pipeline.

    Steps, in order:
    1. Convert the submit/start/end epochs to US/Central datetimes.
    2. Compute pendTime.
    3. Add submitInterval/submitDay.
    4. Strip 'rhel8_'/'rhel88_' from queue names.
    5. Parse requested memory (dropping rows without one).
    6. Convert maxRMem to MB.
    7. Flag jobs with negative pendTime and drop them.
    8. Order queues by frequency.
    9. Bucket runTime into jobCategory.
    10. Compute and bucket memory utilization.

    Args:
        file_path: Path (or buffer) passed through to ``pandas.read_csv``.

    Returns:
        The preprocessed DataFrame.
    """
    # 'US/Central' is DST-aware (CST in winter, CDT in summer).
    cst = pytz.timezone('US/Central')

    df = read_csv(file_path)
    df = convert_time_columns(df, cst)
    df = calculate_pend_time(df)
    df = add_time_of_day_and_day(df)
    df = overwrite_queue(df)
    df = extract_requested_memory(df)  # also drops rows with no parsable mem= request
    df = convert_max_rmem(df)
    df = create_user_cancel_job(df)
    df = remove_user_cancel_job(df)  # keep only rows where userCancelJob == 0
    df = order_queue_by_value_counts(df)  # order 'queue' categories by frequency
    df = define_job_category(df)  # Short / Medium / Long / Very Long by runTime percentiles
    df = calculate_and_categorize_memutilization(df)  # memUtilization + memUtilizationCat

    return df



In [None]:
# Input CSV of finished jobs (the file name indicates one week of data).
# The relative path is resolved against the notebook's current working directory.
file_path = '../common/finished_jobs_1_week.csv'
# Run the full preprocessing pipeline on that file.
df_preprocessed = modelProcess_csv(file_path)