In [None]:
import pandas as pd
import numpy as np
from typing import Dict, Any, List, Union, Optional
import logging
import pandas as pd
import numpy as np
from typing import Dict, Any, List, Union, Optional
import logging

In [None]:
# Configure the root logger once for the notebook: INFO level, with a
# timestamp and the level name in front of every message.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Module-level logger used by the helper functions defined below.
logger = logging.getLogger(__name__)

In [None]:
# Case-type labels that are treated as criminal matters.
# Presumably passed as the `criminal_cases` argument of `add_case_nature`
# (any case type not listed here is then labelled 'Civil') — confirm at the
# call site, which is not part of this section.
CRIMINAL_CASES: List[str] = [
    'Murder Case',
    'Criminal Revision',
    'Criminal Appeal',
    'Murder - Gender Justice Criminal Case',
    'Criminal Court Martial Appeal',
    'Anti-Corruption and Economic Crimes Revision',
    'Criminal Miscellaneous Application',
    'Criminal Applications',
    'COA Criminal Appeal'
]

# Mapping of broad case category -> the specific case-type labels that belong
# to it. `find_matching_keys` / `add_broad_category` look a case type up in
# the value lists and return the matching key(s), so every label must match
# the source data exactly (including spacing) and should appear only once.
BROAD_CASE_TYPES: Dict[str, List[str]] = {
    'Civil Suit': [
        'Civil Suit',
        'Anti-Corruption and Economic Crimes Suit',
        'Family Originating Summons',
        'Family Civil Case',
        'HCC(OS) Family',
        'Commercial Admiralty',
        'Commercial Matters',
    ],

    'Adoption': ['Family Adoption'],

    'Divorce': ['Family Divorce Cause'],

    'Criminal Application': ['Criminal Miscellaneous Application'],

    'Miscellaneous Application': [
        'Civil Case Miscellaneous',
        'Judicial Review Miscellaneous',
        # NOTE(review): the double space is kept as-is; confirm it matches
        # the label found in the source data.
        'JR  Petition Miscellaneous',
        'Anti-Corruption and Economic Crimes Miscellaneous',
        'Commercial Miscellaneous',
        'Constitution and Human Rights Petitions Miscellaneous',
        'Family Miscellaneous',
        'Commercial Arbitration',
    ],

    'Judicial Review': [
        'Anti-Corruption and Economic Crime Judicial review',
        'Judicial Review ELC',
        'Judicial Review',
    ],

    'Criminal Revision': [
        'Criminal Revision',
        'Anti-Corruption and Economic Crimes Revision',
    ],

    'Criminal Appeal': [
        'Criminal Appeal',
        'Criminal Court Martial Appeal',
        'Anti-Corruption and Economic Crimes Appeal',
    ],

    'Civil Appeal': [
        'Family Appeal',
        'Civil Appeal',
        'Commercial Appeal',
        'Constitution and Human Rights Election Petition Appeal',
        'Constitution and Human Rights Petition Appeal',
        'Gender Justice Civil Appeal',
        'Constitution and Human Rights Miscellaneous Election Petition Appeal (MEPA)',
    ],

    'Constitution Petition': [
        'Anti Corruption and Economic Crimes Petition',
        'High Court Criminal Petition',
        'Constitution and Human Rights Petition (Civil)',
        'Constitution and Human Rights Election Petition',
        'High Court Constitution and Human Rights Petitions (Criminal)',
        'Commercial Petition',
    ],

    'Probate Administration': [
        'Family P&A Intestate',
        'Family P&A Ad Litem',
        'Family P&A Ad Colligenda',
        'Family P&A Citation',
        'Family P&A Testate',
        'Family P&A Resealing of Grant',
        'Family P&A De Bonis Non',
        'Resealing of Grant',
        'Citation-Family',
    ],

    'Murder': [
        'Murder Case',
        'Murder - Gender Justice Criminal Case',
    ],

    'Tax Appeal': [
        # Fixed: the label previously had a stray "period_start" pasted into
        # the middle of the word "Appeal", so it could never match.
        'Commercial Income Tax Appeal',
        'Commercial Custom Tax Appeal',
    ],

    'Bankruptcy and Insolvency': [
        'Commercial Insolvency Notice Petition',
        'Commercial Insolvency Petition',
        'Commercial Bankruptcy Notice',
        'Commercial Insolvency Cause',
        'Commercial Insolvency Notice',
        'Commercial Bankruptcy Cause',
        'Commercial Winding Up Cause',
    ]
}


# Outcome labels that mark a case as resolved.
# Presumably supplied as `resolved_outcomes` to `add_conclusion`, where
# `is_concluded` compares them case-insensitively — confirm at the call site.
RESOLVED_OUTCOMES: List[str] = [
    'Ruling Delivered- Case Closed',
    'Terminated',
    'Matter Settled- Case Closed',
    'Application Dismissed - Case Closed',
    'Judgment Delivered- Case Closed',
    'Matter Withdrawn',
    'Application Allowed - Case Closed',
    'Application Withdrawn - Case Closed',
    'Judgment Delivered- Convicted',
    'Placed In Probation',
    'Dismissed',
    'Judgment Delivered',
    'Judgment Delivered- Acquittal',
    'Ruling Delivered- Accused Discharged',
    'Abated',
    'Consolidated- Case Closed',
    'Grant Confirmed',
    'Limited Grant Issued',
    'Struck Out',
    'Grant Revoked',
    'Consent Recorded - Case Closed',
    'Dismissed For Want Of Prosecution - Case Closed',
    'Out Of Court Settlement Reached',
    'Appeal Dismissed',
    'Retrial',
    'Appeal Rejected',
    'Sentence Commuted',
    'Ruling Delivered- Application Closed',
    'Probation Orders Issued',
    'Order Issued - Case Closed',
    'Revision Declined'
]


# Outcome labels that indicate a file was transferred.
# The spelling and casing of each label (e.g. 'Transfered', '-case') is kept
# exactly as written — presumably it mirrors the source data; confirm.
# NOTE(review): not referenced by any function visible in this section.
TRANSFERED_CASES: List[str] = [
    'File Transfered -case Closed',
    'File Transferred',
]


# Mapping of outcome group -> the outcome labels that belong to that group.
# NOTE(review): 'Order issued - Case closed' differs in letter case from
# 'Order Issued - Case Closed' in RESOLVED_OUTCOMES, and
# 'Matter Settled Through Mediation' / 'Interlocutory Judgement Entered' do
# not appear in RESOLVED_OUTCOMES at all — confirm which spellings match the
# source data before relying on exact-match lookups.
MERIT_CATEGORY: Dict[str, List[str]] = {
    'Judgment Delivered': [
        'Judgment Delivered- Case Closed',
        'Judgment Delivered',
        'Judgment Delivered- Acquittal',
        'Judgment Delivered- Convicted',
        'Grant Revoked',
        'Retrial'
    ],
    'Ruling Case Closed': [
        'Ruling Delivered- Case Closed',
        'Ruling Delivered- Accused Discharged',
    ],
    'Final Grant': [
        'Grant Confirmed',
        'Limited Grant Issued',
    ],
    'Case Withdrawn': [
        'Matter Withdrawn',
        'Application Withdrawn - Case Closed',
    ],
    'Out Of Court Settlement': [
        'Consent Recorded - Case Closed',
        'Matter Settled Through Mediation',
        'Out Of Court Settlement Reached',
    ],
    'Dismissed': [
        'Dismissed For Want Of Prosecution - Case Closed',
        'Dismissed',
        'Appeal Dismissed',
        'Terminated'
    ],

    'Case Closed': [
        'Struck Out',
        'Application Dismissed - Case Closed',
        'Application Allowed - Case Closed',
        'Matter Settled- Case Closed',
        'Ruling Delivered- Application Closed',
        'Consolidated- Case Closed',
        'Abated',
        'Placed In Probation',
        'Revision Declined',
        'Probation Orders Issued',
        'Appeal Rejected',
        'Interlocutory Judgement Entered',
        'Order issued - Case closed'
    ],
}


# Time limit per case category, compared against `case_age` in
# `process_case_time_limits` (presumably expressed in days, like `case_age`).
# Categories without an entry get a limit of 0 there.
# NOTE(review): 'Civil Matter', 'Anti Corruption', 'Income Tax Appeal',
# 'Matrimonial Property' and 'Succession' are not keys of BROAD_CASE_TYPES,
# so they will not match values of a `broad_case_type` column built from that
# mapping — confirm the intended category names.
TIME_LIMITS: Dict[str, int] = {
    'Murder': 360, 
    'Constitution Petition': 360, 
    'Criminal Revision': 90, 
    'Judicial Review': 360, 
    'Civil Matter': 360,
    'Anti Corruption': 360,
    'Income Tax Appeal': 180,
    'Matrimonial Property': 360,
    'Succession': 180,
}


# 'comingfor' activity labels that cannot be adjourned. `get_adjournment`
# excludes events with these labels from both the adjourned and the
# adjournable counts. Labels are matched exactly after stripping surrounding
# whitespace, so internal spacing (e.g. 'Appointments of  Mediator') is kept
# as written.
NON_ADJOURNABLE: List[str] = [
    'Taxation and Issuance of Certificates',
    'Orders',
    'Appointments of  Mediator',
    'Screening of files for Mediation',
    'Post-judgment',
    'Re-activation',
    'Reactivation',
    'Notice of Taxation',
    'Entering Interlocutory Judgments',
    'Approval by DR',
    'Registration/Filing-Application',
    'Registration/Filing',
]


# Outcome labels counted as merit-based conclusions; the entries are the
# union of the 'Judgment Delivered' and 'Ruling Case Closed' groups of
# MERIT_CATEGORY. Presumably passed as `merit_outcomes` to
# `add_productivity` — confirm at the call site.
MERIT_OUTCOMES: List[str] = [
    'Ruling Delivered- Case Closed',
    'Judgment Delivered- Case Closed',
    'Judgment Delivered',
    'Judgment Delivered- Acquittal',
    'Judgment Delivered- Convicted',
    'Grant Revoked',
    'Ruling Delivered- Accused Discharged',
    'Retrial'
]


In [None]:
def drop_nan_columns(df: pd.DataFrame, columns: List[str]) -> pd.DataFrame:
    """
    Return the rows of ``df`` that have no NaN in any of the given columns.

    The per-column NaN counts and the total number of removed rows are
    written to the module logger.

    Args:
        df (pd.DataFrame): Frame to filter.
        columns (List[str]): Columns that must be non-null for a row to be kept.

    Returns:
        pd.DataFrame: The filtered frame (result of ``DataFrame.dropna``).

    Raises:
        ValueError: If one or more of ``columns`` is not a column of ``df``.
    """
    absent = set(columns) - set(df.columns)
    if absent:
        raise ValueError(f"Columns not found in DataFrame: {', '.join(absent)}")

    # One boolean mask serves both the "which columns" and "how many" reports.
    null_mask = df[columns].isna()
    affected = df[columns].columns[null_mask.any()].tolist()

    if affected:
        per_column = null_mask.sum()
        logger.info("Dropping rows with NaN values:")
        for name in affected:
            logger.info(f"  {name}: {per_column[name]} rows")

    result = df.dropna(subset=columns)
    removed = len(df) - len(result)

    if removed > 0:
        logger.info(f"Total rows dropped: {removed}")
    else:
        logger.info("No rows were dropped.")

    return result

def remove_duplicates(data: pd.DataFrame) -> pd.DataFrame:
    """
    Drop fully duplicated rows, keeping the first occurrence of each.

    Args:
        data (pd.DataFrame): Input DataFrame.

    Returns:
        pd.DataFrame: ``data`` itself when it holds no duplicates; otherwise a
        de-duplicated frame with a fresh 0..n-1 index.
    """
    duplicate_total = data.duplicated().sum()

    # Nothing to do: hand the original object back untouched.
    if duplicate_total == 0:
        logging.info("No duplicates found.")
        return data

    logging.info(f"{duplicate_total} duplicates found.")
    deduplicated = data.drop_duplicates(keep="first").reset_index(drop=True)
    logging.info(f"{duplicate_total} duplicates removed.")
    return deduplicated


def drop_null_values(df: pd.DataFrame, column_name: str = 'outcome') -> pd.DataFrame:
    """
    Strip whitespace from a text column and drop the rows where it is null.

    The whitespace strip is applied to ``column_name`` (previously it was
    always applied to 'outcome', whatever column was requested). The input
    frame is left unmodified; the stripped values only appear in the
    returned frame.

    Args:
        df (pd.DataFrame): The DataFrame from which to drop rows.
        column_name (str): The text column to strip and check for null values.
            Default is 'outcome'.

    Returns:
        pd.DataFrame: A new DataFrame with ``column_name`` stripped and rows
        holding null values in that column removed.

    Raises:
        KeyError: If ``column_name`` is not a column of ``df``.
        AttributeError: If ``column_name`` does not hold string values.
    """
    stripped = df[column_name].str.strip()

    # `assign` returns a new frame, so the caller's DataFrame is not mutated.
    cleaned_df: pd.DataFrame = df.assign(**{column_name: stripped}).dropna(subset=[column_name])
    dropped_row_count: int = df.shape[0] - cleaned_df.shape[0]

    if dropped_row_count > 0:
        logger.info(f"Total dropped rows with null values in '{column_name}': {dropped_row_count}")
    else:
        logger.info(f"No rows dropped with null values in '{column_name}'")
    return cleaned_df


def strip_dataframe_columns(df):
    """Cast every column to ``str`` and strip leading/trailing whitespace.

    Args:
        df (pandas.DataFrame): The DataFrame to process.

    Returns:
        pandas.DataFrame: A new DataFrame whose columns are all strings with
        surrounding whitespace removed, or ``None`` if the conversion failed
        (the error is logged, not raised).
    """
    # NOTE(review): because of the str cast, non-text columns come back as
    # text, and missing values are likely turned into literal strings such as
    # 'nan' (pandas-version dependent) — confirm downstream code expects this.
    try:
        as_text = df.astype(str)
        df = as_text.apply(lambda column: column.str.strip())
        logger.info("str.strip() applied successfully to all columns.")
    except Exception as e:
        logger.error(f"Error applying str.strip(): {e}")
        return None
    return df

def convert_to_title_case(df: pd.DataFrame, column: str) -> pd.DataFrame:
    """
    Title-case the values of one text column, in place.

    Args:
        df (pd.DataFrame): The input DataFrame (modified in place).
        column (str): The name of the column to title-case.

    Returns:
        pd.DataFrame: The same DataFrame object. If ``column`` does not exist
        an error is logged and the frame is returned unchanged.
    """
    if column in df.columns:
        df[column] = df[column].str.title()
    else:
        logger.error(f"'{column}' column not found in the DataFrame")

    return df

In [None]:
def validate_columns(df: pd.DataFrame, required_columns: Union[str, List[str]]) -> None:
    """
    Ensure that every required column is present in the DataFrame.

    Args:
        df (pd.DataFrame): The input DataFrame.
        required_columns (Union[str, List[str]]): One column name, or a list
            of column names, that must exist in ``df``.

    Raises:
        ValueError: If any required columns are missing.
    """
    # A bare string is treated as a single column name, not as characters.
    wanted = [required_columns] if isinstance(required_columns, str) else required_columns

    absent = set(wanted) - set(df.columns)
    if absent:
        raise ValueError(f"Missing required columns: {', '.join(absent)}")



def add_date(df: pd.DataFrame, column_names: List[str], new_col: str) -> pd.DataFrame:
    """
    Build a datetime column from separate year, month and day columns.

    The year and day columns are converted to integers (via float) in the
    returned frame; the month column is used as-is. The three values are
    joined as 'year-month-day' and parsed with ``pd.to_datetime``;
    unparseable values become NaT.

    Args:
        df (pd.DataFrame): The DataFrame containing the data. It is copied,
            not modified.
        column_names (List[str]): Exactly three column names, in the order
            [year, month, day].
        new_col (str): The name of the new date column to be created.

    Returns:
        pd.DataFrame: A copy of ``df`` with the new date column added.

    Raises:
        ValueError: If ``column_names`` does not hold exactly three names, if
            any of them is missing from ``df``, or if a year/day value cannot
            be converted to a number.
    """
    if len(column_names) != 3:
        raise ValueError("column_names must contain exactly three elements: [year, month, day]")

    year_col, month_col, day_col = column_names

    absent = set(column_names) - set(df.columns)
    if absent:
        raise ValueError(f"Missing columns in DataFrame: {', '.join(absent)}")

    # Work on a copy so the caller's frame is untouched.
    df = df.copy()

    try:
        # Going through float lets values such as '2020.0' become 2020.
        for numeric_col in (year_col, day_col):
            df[numeric_col] = df[numeric_col].astype(float).astype(int)

        year_text, month_text, day_text = (
            df[name].astype(str) for name in (year_col, month_col, day_col)
        )
        date_text = year_text + '-' + month_text + '-' + day_text

        df[new_col] = pd.to_datetime(date_text, errors='coerce')

        parsed_total = df[new_col].notna().sum()
        logger.info(f"Created new date column '{new_col}'. Valid dates: {parsed_total}/{len(df)}")

    except Exception as e:
        logger.error(f"Error creating date column: {str(e)}")
        raise

    return df

def add_case_number(df: pd.DataFrame, court_col: str, caseid_type_col: str, caseid_no_col: str, filed_yyyy_col: str, new_col='case_number') -> pd.DataFrame:
    """
    Add a case-number column of the form 'court/caseid_type/caseid_no/filed_yyyy'.

    Only the filing-year column is cast to ``str``; the other three columns
    are concatenated as-is, so they must already hold strings. The DataFrame
    is modified in place.

    Args:
        df (pd.DataFrame): The dataframe containing the necessary columns.
        court_col (str): The name of the column containing court information.
        caseid_type_col (str): The name of the column containing case ID type.
        caseid_no_col (str): The name of the column containing case ID number.
        filed_yyyy_col (str): The name of the column containing the year the case was filed.
        new_col (str): The name of the new column to be created for the case number. Default is 'case_number'.

    Returns:
        pd.DataFrame: The same DataFrame with the new case number column.
    """
    year_text = df[filed_yyyy_col].astype(str)
    prefix = df[court_col] + '/' + df[caseid_type_col] + '/' + df[caseid_no_col]
    df[new_col] = prefix + '/' + year_text
    return df

In [None]:
def add_case_age(df: pd.DataFrame, filed_date_column: str = 'filed_date', activity_date_column: str = 'activity_date', age_column: str = 'case_age') -> pd.DataFrame:
    """
    Add the number of days between the filing date and the activity date.

    The DataFrame is modified in place: ``age_column`` is set to
    ``(activity date - filed date)`` expressed in whole days.

    Args:
        df (pd.DataFrame): The input DataFrame containing case data.
        filed_date_column (str): The column name containing the filed dates.
        activity_date_column (str): The column name containing the activity dates.
        age_column (str): The name of the new column to store the case age in days.

    Returns:
        pd.DataFrame: The same DataFrame with ``age_column`` added.

    Raises:
        ValueError: If the required columns are missing in the DataFrame.
    """
    absent = [
        name for name in (filed_date_column, activity_date_column)
        if name not in df.columns
    ]
    if absent:
        logger.error(f"Missing required columns: {', '.join(absent)}")
        raise ValueError(f"Missing required columns: {', '.join(absent)}")

    try:
        # Subtraction must yield timedeltas for `.dt.days` to work, so both
        # columns are expected to be datetime-like.
        elapsed = df[activity_date_column] - df[filed_date_column]
        df[age_column] = elapsed.dt.days

        logger.info(f"Successfully added '{age_column}' column to the DataFrame.")

    except Exception as e:
        logger.error(f"Error calculating case age: {e}")
        raise

    return df


def categorize_case(case_type: str, criminal_cases: Optional[List[str]]) -> str:
    """
    Label a case type as 'Criminal' or 'Civil'.

    Args:
        case_type (str): The type of the case.
        criminal_cases (Optional[List[str]]): Case types considered criminal.
            When ``None``, every case is labelled 'Criminal'.

    Returns:
        str: 'Civil' only when a list is supplied and ``case_type`` is not in
        it; 'Criminal' otherwise.
    """
    if criminal_cases is not None and case_type not in criminal_cases:
        return 'Civil'
    return 'Criminal'

def add_case_nature(df: pd.DataFrame, criminal_cases: Optional[List[str]] = None) -> pd.DataFrame:
    """
    Add a 'nature' column labelling every row as 'Criminal' or 'Civil'.

    The label is derived from the 'case_type' column via ``categorize_case``.
    A warning is logged for each of the two labels that does not occur in the
    result. The DataFrame is modified in place.

    Args:
        df (pd.DataFrame): The input DataFrame; must contain 'case_type'.
        criminal_cases (Optional[List[str]]): List of case types considered as criminal.
            If None, all cases are categorized as 'Criminal'.

    Returns:
        pd.DataFrame: The same DataFrame with the 'nature' column added.
    """
    natures = df['case_type'].apply(
        lambda case_type: categorize_case(case_type, criminal_cases)
    )
    df['nature'] = natures

    # Warn about each label that never appears in the data.
    present_labels = df['nature'].values
    expectations = (
        ('Criminal', "No criminal cases found in the DataFrame."),
        ('Civil', "No civil cases found in the DataFrame."),
    )
    for label, warning_text in expectations:
        if label not in present_labels:
            logging.warning(warning_text)

    return df

def find_matching_keys(value: Any, mapping: Dict[str, Union[str, List[Any]]]) -> Optional[Union[str, List[str]]]:
    """
    Look up which keys of ``mapping`` hold ``value``.

    A key matches when its entry is a string equal to ``value`` or a list
    containing ``value``. Entries of any other type never match.

    Args:
        value (Any): The value to search for.
        mapping (Dict[str, Union[str, List[Any]]]): Keys mapped to either a
            single string or a list of values.

    Returns:
        Optional[Union[str, List[str]]]: The key itself when exactly one key
        matches, a list of keys (in mapping order) when several match, or
        None when nothing matches.
    """
    hits = []
    for key, candidates in mapping.items():
        if isinstance(candidates, str):
            if candidates == value:
                hits.append(key)
        elif isinstance(candidates, list):
            if value in candidates:
                hits.append(key)

    if len(hits) == 1:
        return hits[0]
    # Empty -> None, several -> the full list.
    return hits or None



def add_broad_category(
    df: pd.DataFrame, 
    case_type_column: str, 
    broad_case_type_column: str, 
    mapping: Dict[str, Union[str, List[Any]]]
) -> pd.DataFrame:
    """
    Add a column holding the broad category of each row's case type.

    Every value of ``case_type_column`` is looked up with
    ``find_matching_keys``; the new column therefore holds a single key, a
    list of keys (when several categories match) or None (when none does).
    The DataFrame is modified in place.

    Args:
        df (pd.DataFrame): The DataFrame containing the data.
        case_type_column (str): The column name containing case types to be mapped.
        broad_case_type_column (str): The name of the new column for the broad case categories.
        mapping (Dict[str, Union[str, List[Any]]]): Broad categories as keys,
            specific case types as values.

    Returns:
        pd.DataFrame: The same DataFrame with the new column added.

    Raises:
        ValueError: If the case_type_column is not found in the DataFrame.
    """
    if case_type_column not in df.columns:
        logger.error(f"Column '{case_type_column}' not found in DataFrame")
        raise ValueError(f"Column '{case_type_column}' not found in the DataFrame")

    def _lookup(case_type):
        return find_matching_keys(case_type, mapping)

    df[broad_case_type_column] = df[case_type_column].apply(_lookup)

    logger.info(f"Successfully mapped case types to broad categories in '{broad_case_type_column}' column.")
    return df


def analyze_court_outcomes(df: pd.DataFrame, start_date: str, end_date: str, outcome: str) -> pd.DataFrame:
    """
    Count, per court and broad case type, the cases flagged with an outcome
    within a date window.

    Rows are kept when 'activity_date' lies in [start_date, end_date]
    (both ends inclusive) and the ``outcome`` column equals 1.

    Args:
        df (pd.DataFrame): A pandas DataFrame containing the data; needs the
            columns 'court', 'broad_case_type', 'activity_date' and ``outcome``.
        start_date (str): The starting date of the period (YYYY-MM-DD format).
        end_date (str): The ending date of the period (YYYY-MM-DD format).
        outcome (str): Name of the indicator column for the outcome of interest.

    Returns:
        pd.DataFrame: Courts as rows, broad case types as columns, case
        counts as values (0 where a combination does not occur).

    Raises:
        ValueError: If ``start_date`` is later than ``end_date``.
        KeyError: If a required column is missing.
    """
    try:
        period_start, period_end = pd.to_datetime(start_date), pd.to_datetime(end_date)

        if period_start > period_end:
            raise ValueError("start_date must be earlier than end_date")

        absent = {'court', 'broad_case_type', 'activity_date', outcome} - set(df.columns)
        if absent:
            raise KeyError(f"Missing required columns: {absent}")

        on_or_after_start = df['activity_date'] >= period_start
        on_or_before_end = df['activity_date'] <= period_end
        has_outcome = df[outcome] == 1
        selected = df[on_or_after_start & on_or_before_end & has_outcome]

        if selected.empty:
            logging.warning("No cases found for the given date range and outcome.")

        # Long format first: one row per (court, broad case type) with its count.
        counts_long = selected.groupby(['court', 'broad_case_type']).size()
        counts_long = counts_long.reset_index(name='num_cases')

        # Then spread the case types out into columns.
        counts_wide = counts_long.pivot_table(
            index='court',
            columns='broad_case_type',
            values='num_cases',
            fill_value=0
        )

        logging.info("Successfully calculated case outcomes per court.")
        return counts_wide

    except Exception as e:
        logging.error(f"An error occurred: {e}")
        raise


def process_case_time_limits(df: pd.DataFrame, time_limits: Dict[str, int]) -> pd.DataFrame:
    """
    Flag the cases that were concluded within their category's time limit.

    Adds a 'within_time_limit' column (in place) that is true when the row is
    concluded and its 'case_age' does not exceed the limit configured for its
    'broad_case_type'. Categories missing from ``time_limits`` get a limit
    of 0.

    Args:
        df (pd.DataFrame): The input DataFrame containing case data.
                           Required columns: 'case_age', 'broad_case_type', 'concluded'
        time_limits (Dict[str, int]): A dictionary with case categories as keys and time limits as values.

    Returns:
        pd.DataFrame: The same DataFrame with the 'within_time_limit' column added.
    """
    # Per-row limit; categories without a configured limit fall back to 0.
    row_limits = df['broad_case_type'].map(time_limits).fillna(0)
    on_time = df['case_age'] <= row_limits

    df['within_time_limit'] = on_time & df['concluded']

    return df


def is_concluded(outcome: str, resolved_outcomes: List[str]) -> bool:
    """
    Determine if a case outcome is resolved.

    The comparison ignores letter case but is otherwise exact (no whitespace
    trimming). A missing or non-string outcome (``None``, NaN, ...) is
    treated as not concluded instead of raising.

    Args:
        outcome (str): The outcome of the case.
        resolved_outcomes (List[str]): List of outcomes that indicate resolution.

    Returns:
        bool: True if the outcome is considered resolved, otherwise False.
    """
    # Missing values reach this function as None/float NaN, which have no
    # `.lower()`; they can never match a resolved outcome.
    if not isinstance(outcome, str):
        return False

    needle = outcome.lower()
    return any(needle == resolved.lower() for resolved in resolved_outcomes)


def is_case_registered(outcome: str, activity_date: Union[str, pd.Timestamp], filed_date: Union[str, pd.Timestamp]) -> bool:
    """
    Check if a case is registered based on outcome and whether activity and filed dates match.

    The outcome implies registration when, after trimming and lower-casing,
    it contains 'registered' or 'filed'. A missing or non-string outcome is
    treated as not registered instead of raising.

    Args:
        outcome (str): The outcome of the case.
        activity_date (Union[str, pd.Timestamp]): The date of the activity.
        filed_date (Union[str, pd.Timestamp]): The date the case was filed.

    Returns:
        bool: True if the outcome implies registration and activity_date matches filed_date, otherwise False.
    """
    # None/NaN outcomes have no `.strip()`; they cannot denote a registration.
    if not isinstance(outcome, str):
        return False

    normalized = outcome.strip().lower()
    if 'registered' not in normalized and 'filed' not in normalized:
        return False

    # A missing date on either side can never count as a same-day match.
    if pd.isna(activity_date) or pd.isna(filed_date):
        return False

    # Coerce so callers always receive a plain Python bool.
    return bool(activity_date == filed_date)



def add_conclusion(df: pd.DataFrame, resolved_outcomes: List[str]) -> pd.DataFrame:
    """
    Add the 'concluded' column to the DataFrame based on resolved outcomes.

    A row is concluded when its 'outcome' equals one of ``resolved_outcomes``
    ignoring letter case (no whitespace trimming). Rows with a missing
    outcome are marked False. The DataFrame is modified in place.

    Args:
        df (pd.DataFrame): The DataFrame with case data; must contain a text
            column named 'outcome'.
        resolved_outcomes (List[str]): List of outcomes considered resolved.

    Returns:
        pd.DataFrame: The same DataFrame with a boolean 'concluded' column added.
    """
    # Lower-case the reference list once instead of once per row.
    resolved_lower = {resolved.lower() for resolved in resolved_outcomes}

    # Vectorised, case-insensitive membership test; missing outcomes stay
    # missing after `.str.lower()` and therefore evaluate to False in `isin`.
    df['concluded'] = df['outcome'].str.lower().isin(resolved_lower)
    return df


def add_registration(df: pd.DataFrame) -> pd.DataFrame:
    """
    Add a 'registered' column flagging rows that represent a case registration.

    Each row is evaluated with ``is_case_registered`` using its 'outcome',
    'activity_date' and 'filed_date' values. The DataFrame is modified in
    place.

    Args:
        df (pd.DataFrame): The DataFrame with case data.

    Returns:
        pd.DataFrame: The same DataFrame with the 'registered' column added.
    """
    def _row_is_registered(row):
        return is_case_registered(row['outcome'], row['activity_date'], row['filed_date'])

    # axis=1: evaluate row by row, since three columns are needed together.
    df['registered'] = df.apply(_row_is_registered, axis=1)
    return df

def add_productivity(df: pd.DataFrame, merit_outcomes: list) -> pd.DataFrame:
    """
    Label each concluded row as a 'Merit' or 'Non-Merit' conclusion.

    Rows whose 'concluded' value is not equal to 1 receive no label. The
    DataFrame is modified in place.

    Args:
        df (pd.DataFrame): The input DataFrame containing 'outcome' and 'concluded' columns.
        merit_outcomes (list): A list of outcomes considered merit outcomes.

    Returns:
        pd.DataFrame: The same DataFrame with an additional 'productivity_category' column.
    """
    def _label(row):
        # Only concluded cases are categorised at all.
        if row['concluded'] != 1:
            return None
        return 'Merit' if row['outcome'] in merit_outcomes else 'Non-Merit'

    df['productivity_category'] = df.apply(_label, axis=1)

    return df

def get_productivity(df: pd.DataFrame) -> pd.DataFrame:
    """
    Count cases per court and productivity category.

    Args:
        df (pd.DataFrame): The input DataFrame with 'court', 'concluded' and
            'productivity_category' columns.

    Returns:
        pd.DataFrame: One row per court and one column per productivity
        category, holding the number of non-null 'concluded' values
        (0 where a combination does not occur).
    """
    counts = df.pivot_table(
        index='court',
        columns='productivity_category',
        values='concluded',
        aggfunc='count',
        fill_value=0,
    )

    # NOTE(review): these rename keys are lower-case, whereas the labels
    # written by `add_productivity` are 'Merit' / 'Non-Merit', so for that
    # input the rename changes nothing — confirm the intended column names.
    column_labels = {'merit': 'Merit', 'non-merit': 'Non_Merit'}
    return counts.rename(columns=column_labels)

def get_adjournment(df: pd.DataFrame, non_adjournable: list[str]) -> pd.DataFrame:
    """
    Compute the adjournment rate per court.

    Side effects on ``df`` (other helpers read these columns afterwards):
    'comingfor' is stripped of surrounding whitespace, and two 0/1 indicator
    columns are added:

    * 'adjourned'   - 'reason_adj' is not null and the event is adjournable
    * 'adjournable' - 'comingfor' is not listed in ``non_adjournable``

    Args:
        df (pd.DataFrame): The input DataFrame containing 'reason_adj', 'comingfor', and 'court' columns.
        non_adjournable (list): A list of 'comingfor' values that are considered non-adjournable.

    Returns:
        pd.DataFrame: One row per court with 'court', 'total_adjourned',
        'total_adjournable' and 'adjourn_proportion' (adjourned as a
        percentage of adjournable; NaN when a court has no adjournable events).

    Raises:
        ValueError: If any of the required columns is missing.
    """
    if not all(col in df.columns for col in ('reason_adj', 'comingfor', 'court')):
        raise ValueError("Input DataFrame must contain 'reason_adj', 'comingfor', and 'court' columns.")

    df['comingfor'] = df['comingfor'].str.strip()

    # Evaluate adjournability once and reuse it for both indicator columns.
    can_adjourn = ~df['comingfor'].isin(non_adjournable)
    df['adjourned'] = (df['reason_adj'].notnull() & can_adjourn).astype(int)
    df['adjournable'] = can_adjourn.astype(int)

    # Per-court totals of both indicators in a single pass.
    adjourn_proportion = (
        df.groupby('court')
        .agg(
            total_adjourned=('adjourned', 'sum'),
            total_adjournable=('adjournable', 'sum'),
        )
        .reset_index()
    )
    adjourn_proportion['adjourn_proportion'] = (
        adjourn_proportion['total_adjourned'] / adjourn_proportion['total_adjournable']
    ) * 100

    return adjourn_proportion


def get_monthly_case_stats(df, registered_col, concluded_col):
    """Sum registered and concluded cases per court, month and case type.

    Rows are grouped by the 'court', 'date_mon' and 'case_type' columns.

    Args:
        df (pandas.DataFrame): The input DataFrame containing case data.
        registered_col (str): The name of the column containing registered cases.
        concluded_col (str): The name of the column containing concluded cases.

    Returns:
        pandas.DataFrame: One row per (court, date_mon, case_type) with the
        summed values in 'registered' and 'concluded' columns.
    """
    # Named aggregations: output column -> (input column, reducer).
    aggregations = {
        'registered': (registered_col, 'sum'),
        'concluded': (concluded_col, 'sum'),
    }
    grouped = df.groupby(['court', 'date_mon', 'case_type'])

    return grouped.agg(**aggregations).reset_index()

def get_cases_per_quarter(df, column):
    # Group by quarters and count cases
    cases_per_quarter = df.groupby(pd.Grouper(key='activity_date', freq='QE'))[column].sum()

    # Reset index to make the quarters a column
    cases_per_quarter = cases_per_quarter.reset_index()

    # Rename the columns
    cases_per_quarter.columns = ['quarter', f'cases_{column}']

    return cases_per_quarter


def get_quarterly_stats(df: pd.DataFrame) -> pd.DataFrame:
    """
    Combine the quarterly totals of the 'adjourned', 'adjournable',
    'concluded' and 'registered' columns into one table.

    Args:
        df (pd.DataFrame): The input DataFrame containing case data, including
            the four indicator columns above and 'activity_date'.

    Returns:
        pd.DataFrame: One row per quarter with a 'quarter' column followed by
        'cases_adjourned', 'cases_adjournable', 'cases_concluded' and
        'cases_registered'.
    """
    quarterly_stats = None
    for indicator in ('adjourned', 'adjournable', 'concluded', 'registered'):
        per_quarter = get_cases_per_quarter(df, indicator)
        if quarterly_stats is None:
            # First table seeds the result; the rest are joined onto it.
            quarterly_stats = per_quarter
        else:
            quarterly_stats = quarterly_stats.merge(per_quarter, on='quarter')

    return quarterly_stats


def calculate_timeline_proportion_per_court(
    df: pd.DataFrame, 
    time_limit_column: str = 'within_time_limit', 
    concluded_column: str = 'concluded', 
    court_column: str = 'court', 
    case_type_column: str = 'broad_case_type'
) -> pd.DataFrame:
    """
    Calculate the proportion of concluded cases resolved within the time limit per court and case type.

    The numerator counts rows where ``time_limit_column == 1``; the denominator
    counts rows where ``concluded_column == 1`` (rows whose ``time_limit_column``
    is null are not counted). A court/case-type pair that has concluded cases
    but none within the limit yields 0.0. A pair with no concluded cases at all
    yields NaN, because the proportion is undefined there.

    Args:
        df (pd.DataFrame): The DataFrame containing the case data.
        time_limit_column (str): Binary column, 1 when the case was resolved within the time limit.
        concluded_column (str): Binary column, 1 when the case was concluded.
        court_column (str): The column name representing courts.
        case_type_column (str): The column representing broad case types.

    Returns:
        pd.DataFrame: Courts as rows, case types as columns, proportions as values.

    Raises:
        Exception: Any error raised while filtering or pivoting (for example a
            missing column) is logged and re-raised unchanged.
    """
    try:
        def _count_by_court_and_type(rows: pd.DataFrame) -> pd.DataFrame:
            # Court x case-type table of non-null `time_limit_column` counts.
            return rows.pivot_table(
                index=court_column,
                columns=case_type_column,
                values=time_limit_column,
                aggfunc='count',
                fill_value=0
            )

        within_limit_counts = _count_by_court_and_type(df[df[time_limit_column] == 1])
        concluded_counts = _count_by_court_and_type(df[df[concluded_column] == 1])

        # Courts/case types without a single within-limit case are absent from
        # the numerator table; without this step the division would align them
        # to NaN instead of reporting a proportion of 0.
        all_courts = concluded_counts.index.union(within_limit_counts.index)
        all_case_types = concluded_counts.columns.union(within_limit_counts.columns)
        within_limit_counts = within_limit_counts.reindex(
            index=all_courts, columns=all_case_types, fill_value=0
        )

        timeline_proportion_per_court = within_limit_counts / concluded_counts

        logger.info("Successfully calculated timeline proportion per court.")

        return timeline_proportion_per_court

    except Exception as e:
        logger.error(f"Error calculating timeline proportions: {e}")
        raise

### Reclassify rulings

If an outcome is "Ruling delivered- case closed", check that the matter was coming for ruling (i.e. `comingfor` is ruling), meaning the ruling had already been scheduled for delivery.


In [None]:
def reclassify_ruling(df, cutoff_date=None):
    """Classify each scheduled ruling as delivered on time, delivered late, or overdue.

    A case is "scheduled" when it has a 'Ruling date given' row whose
    ``next_date`` is on or before the cutoff; the first such row (ordered by
    ``activity_date``) supplies the case's ``set_date``. Delivery is looked up
    among the case's 'Ruling delivered- case closed' rows:

    * first delivery on or after ``set_date``: status 'Delivered', category
      'On Time' when it falls exactly on ``set_date``, otherwise 'Delayed';
    * only deliveries before ``set_date``: status 'Delivered', category
      'On Time', using the earliest delivery;
    * no delivery at all: status and category 'Delayed' (the set date has
      already passed, since only set dates up to the cutoff are considered).

    Note:
        ``df`` is modified in place: the 'judgment_status', 'set_date',
        'delivery_date' and 'delivery_category' columns are added or
        overwritten. The 'judgment_status' column name is kept as-is so that
        existing consumers of the output keep working.

    Args:
        df (pandas.DataFrame): Activity rows with 'case_number', 'outcome',
            'activity_date' and 'next_date' columns. Both date columns must be
            datetime-typed.
        cutoff_date: Latest scheduled ruling date to consider (anything
            ``pandas.Timestamp`` accepts). Defaults to the current time.
            NOTE(review): the original body read an undefined ``cutoff_date``
            name; pass the reporting cutoff explicitly if one is intended.

    Returns:
        pandas.DataFrame: The rows of ``df`` that belong to scheduled cases.
    """
    ruling_date_set_outcomes = ['Ruling date given']
    ruling_delivered_outcomes = ['Ruling delivered- case closed']

    cutoff = pd.Timestamp.now() if cutoff_date is None else pd.Timestamp(cutoff_date)

    # Only ruling-related rows matter; sorting makes "first" mean "earliest activity".
    ruling_rows = df[df['outcome'].isin(ruling_date_set_outcomes + ruling_delivered_outcomes)]
    ruling_rows = ruling_rows.sort_values(by=['case_number', 'activity_date'])

    # Scheduled ruling date per case, taken from the first date-set row.
    date_set_rows = ruling_rows[
        ruling_rows['outcome'].isin(ruling_date_set_outcomes)
        & (ruling_rows['next_date'] <= cutoff)
    ]
    set_dates = date_set_rows.groupby('case_number')['next_date'].first()

    # Delivery dates per case, in chronological order. Rows without an
    # activity date cannot be compared against the set date, so they are ignored.
    delivered_rows = ruling_rows[
        ruling_rows['outcome'].isin(ruling_delivered_outcomes)
        & ruling_rows['activity_date'].notna()
    ]
    deliveries_by_case = {
        case_number: dates
        for case_number, dates in delivered_rows.groupby('case_number')['activity_date']
    }

    # Collect per-case results first and write them to df once at the end,
    # instead of scanning the whole frame for every case.
    delivery_dates = {}
    statuses = {}
    categories = {}
    for case_number, set_date in set_dates.items():
        dates = deliveries_by_case.get(case_number)
        if dates is None:
            # Scheduled date has passed (set_date <= cutoff) and nothing was delivered.
            statuses[case_number] = 'Delayed'
            categories[case_number] = 'Delayed'
            continue

        on_or_after_set_date = dates[dates >= set_date]
        if not on_or_after_set_date.empty:
            delivery_date = on_or_after_set_date.iloc[0]
            categories[case_number] = 'On Time' if delivery_date <= set_date else 'Delayed'
        else:
            # Every delivery happened before the scheduled date.
            delivery_date = dates.iloc[0]
            categories[case_number] = 'On Time'

        delivery_dates[case_number] = delivery_date
        statuses[case_number] = 'Delivered'

    case_numbers = df['case_number']
    df['judgment_status'] = case_numbers.map(statuses).fillna('Not Scheduled')
    df['set_date'] = case_numbers.map(set_dates)
    df['delivery_date'] = pd.to_datetime(case_numbers.map(delivery_dates))
    df['delivery_category'] = case_numbers.map(categories).fillna('')

    return df[df['set_date'].notna()]


In [None]:
# Base directory of the input data; tribunal.csv is read from here.
file_path = '/home/stanoo/Projects/data/tribunal' 
#file_path = "/home/stanoo/dcrt/data/API/Hc/hc_23-24_data.csv"
# Load the raw case data into a DataFrame.
raw_df = pd.read_csv(f'{file_path}/tribunal.csv')

In [None]:
# Work on a copy so raw_df keeps the data exactly as loaded.
df = raw_df.copy()

In [None]:
# outcomes=raw_df.groupby('outcome')['outcome'].count().sort_values(ascending=False)

In [None]:
# revision_outcomes = raw_df[raw_df['case_type'] == 'Criminal Revision'].groupby('outcome')['outcome'].count().sort_values(ascending=False)

In [None]:
# Check for the expected columns. A ValueError from validate_columns is only
# logged, so the steps below run even when validation fails.
try:
    validate_columns(df, ['outcome', 'activity_date', 'filed_date'])
    logger.info("Validation passed.")
except ValueError as e:
    logger.error(e)
# Add the activity, filing and next dates.
# NOTE(review): presumably add_date builds each date column from the listed
# day/month/year columns — confirm against add_date.
df = add_date(df.copy(), ['date_dd', 'date_mon', 'date_yyyy'], 'activity_date')
df = add_date(df.copy(), ['filed_dd', 'filed_mon', 'filed_yyyy'], 'filed_date')
df = add_date(df.copy(), ['next_dd', 'next_mon','next_yyyy'], 'next_date')
# NOTE(review): presumably composes a case identifier from the court, case-id
# type, case-id number and filing year — confirm against add_case_number.
df = add_case_number(df, 'court', 'caseid_type', 'caseid_no', 'filed_yyyy')
df = add_case_age(df)

In [None]:
# Add the derived columns used by the reports below.
# NOTE(review): presumably these helpers add the 'concluded' and 'registered'
# flags and a productivity measure — confirm against their definitions.
#df = add_case_nature(df, CRIMINAL_CASES)
df = add_conclusion(df, RESOLVED_OUTCOMES)
df = add_registration(df)
df = add_productivity(df, MERIT_OUTCOMES)
# NOTE(review): the two steps below are disabled, yet later cells read
# 'broad_case_type' and 'within_time_limit' — confirm those columns already
# exist in the loaded data.
#df = add_broad_category(df, 'case_type', 'broad_case_type', BROAD_CASE_TYPES)
#df = process_case_time_limits(df, TIME_LIMITS)

In [None]:
# Spot check: show every row recorded for one specific case number.
df[df['case_number']=='Eldoret High Court_High Court Criminal/HCCRREV/E223/2022']

In [None]:
# Sum of the 'concluded' column per court and broad case type.
df.groupby(['court', 'broad_case_type'])['concluded'].sum()

In [None]:
# Sum of the 'concluded' column per court, restricted to Criminal Revision cases.
df[df['broad_case_type']=='Criminal Revision'].groupby('court')['concluded'].sum()

In [None]:
# Sum of the 'within_time_limit' column per court, restricted to Criminal
# Revision cases (grouped by court only).
df[df['broad_case_type']=='Criminal Revision'].groupby('court')['within_time_limit'].sum()

In [None]:
# Per-court report tables: share of cases concluded within the time limit,
# productivity, and adjournment statistics.
# NOTE(review): the exact contents of court_productivity and adjourned_stats
# depend on get_productivity / get_adjournment — confirm against them.
performance_timeline = calculate_timeline_proportion_per_court(df)
court_productivity = get_productivity(df)
adjourned_stats = get_adjournment(df, NON_ADJOURNABLE) 

In [None]:
# monthly_filed_cases = analysis.get_monthly_cases(df, 'registered')
# monthly_concluded_cases = analysis.get_monthly_cases(df, 'concluded')
# average_time_to_conclude = analysis.get_average_time_to_conclude(df)

In [None]:
# Registered and concluded case tables.
# NOTE(review): the two date strings presumably bound the reporting period
# (1 July 2023 to 30 June 2024) — confirm against analyze_court_outcomes.
filed_cases = analyze_court_outcomes(df, '2023-07-01', '2024-06-30', 'registered')
resolved_cases = analyze_court_outcomes(df, '2023-07-01', '2024-06-30', 'concluded')
# Monthly and quarterly totals built from the 'registered'/'concluded' flags.
monthly_stats = get_monthly_case_stats(df, 'registered', 'concluded')
merged_quarterly = get_quarterly_stats(df)

In [None]:
# Display the within-time-limit proportions for inspection.
performance_timeline

In [None]:
# Write each report table to a CSV file under <file_path>/reports.
filed_cases.to_csv(f'{file_path}/reports/filed_cases.csv')
resolved_cases.to_csv(f'{file_path}/reports/resolved_cases.csv')
monthly_stats.to_csv(f'{file_path}/reports/monthly_stats.csv', index=False)
court_productivity.to_csv(f'{file_path}/reports/court_productivity.csv')
# `average_time_to_conclude` is only assigned in a commented-out line of an
# earlier cell, so exporting it raised NameError and stopped the two exports
# below from running. Re-enable this line together with that computation.
# average_time_to_conclude.to_csv(f'{file_path}/reports/average_time_to_conclude.csv')
performance_timeline.to_csv(f'{file_path}/reports/performance_timeline.csv')
adjourned_stats.to_csv(f'{file_path}/reports/adjourned_stats.csv', index=False)

In [None]:
# Display the merged quarterly statistics for inspection.
merged_quarterly

In [None]:
# Share of concluded cases flagged in 'time_lines', per court and broad case type.
# NOTE(review): this cell reads a 'time_lines' column while
# calculate_timeline_proportion_per_court defaults to 'within_time_limit' —
# confirm which column the data actually carries.
resolved_within_timelimit = df[df['time_lines'] == 1].pivot_table(index='court', columns='broad_case_type', values='time_lines', aggfunc='count', fill_value=0)
total_concluded_per_court = df[df['concluded'] == 1].pivot_table(index='court', columns='broad_case_type', values='time_lines', aggfunc='count', fill_value=0)
# Divide the two tables built above; the original referenced
# `..._per_court_pivot` names that are never assigned, which raised NameError.
timeline_per_court = resolved_within_timelimit / total_concluded_per_court
