# Library

In [29]:
import os
import pandas as pd
import re

# Path Setting

In [2]:
# Every path is anchored on the directory the notebook is run from
base_path = os.getcwd()

# Two levels up, then down into the SSKI source folder; abspath collapses the ".." parts
data_source_dir = os.path.join(base_path, "..", "..", "A. Data Source", "A.2. SSKI (Bank Indonesia)")
data_source_dir = os.path.abspath(data_source_dir)

# Additional Function

In [3]:
def read_sheet_15a_or_16a(file_path):
    """Load sheet '15a' (or, failing that, '16a') from an Excel workbook.

    The workbook is opened once, inside a context manager, so its file
    handle is released even when parsing the sheet fails.

    Args:
        file_path: Path of the workbook to read.

    Returns:
        The selected sheet as a DataFrame, or None when neither sheet exists
        or the workbook cannot be read. A message is printed in both of
        those cases.
    """
    try:
        with pd.ExcelFile(file_path) as workbook:
            # '15a' takes precedence when both sheets are present
            target_sheet = next(
                (name for name in ('15a', '16a') if name in workbook.sheet_names),
                None,
            )

            if target_sheet is None:
                print(f"No sheet '15a' or '16a' found in {file_path}")
                return None

            # Reuse the already-open workbook instead of opening the file again
            return pd.read_excel(workbook, sheet_name=target_sheet)

    except Exception as e:
        # Best-effort loader: report the problem and let the caller carry on
        print(f"Failed to read {file_path}: {e}")
        return None

In [None]:
def normalize_text(text):
    """Return ``text`` as a lowercase string with every space character removed."""
    return str(text).lower().replace(" ", "")


def merge_rows(df, merge_list, col_index=1):
    """Fold the row below each matching row into it, cell by cell.

    A row matches when its cell in column position ``col_index``, after
    ``normalize_text``, equals one of the normalized ``merge_list`` entries.
    For a matching row every cell is replaced by the text of that cell and
    of the cell directly below it, separated by one space when both are
    non-null; null cells count as empty text. The row below is then removed.

    The input frame is left untouched: all edits happen on a copy whose
    index is reset first, so positions and labels agree whatever index the
    caller's frame carries.

    Args:
        df: Frame to process.
        merge_list: Texts that mark a row to be merged with its successor.
        col_index: Position of the column that is compared to ``merge_list``.

    Returns:
        A new DataFrame with a fresh 0..n-1 index.
    """
    wanted = {normalize_text(item) for item in merge_list}

    merged_df = df.copy().reset_index(drop=True)

    rows_to_drop = []
    for i in range(len(merged_df) - 1):
        if normalize_text(merged_df.iat[i, col_index]) not in wanted:
            continue

        if not rows_to_drop:
            # Merged cells are text, so make every column able to hold it
            # before the first write (numeric columns would reject strings).
            merged_df = merged_df.astype(object)

        for j in range(merged_df.shape[1]):
            pieces = [
                "" if pd.isna(cell) else str(cell)
                for cell in (merged_df.iat[i, j], merged_df.iat[i + 1, j])
            ]
            # Join with a space only when both pieces are non-empty
            merged_df.iat[i, j] = " ".join(piece for piece in pieces if piece)

        rows_to_drop.append(i + 1)

    return merged_df.drop(index=rows_to_drop).reset_index(drop=True)

In [28]:
def cut_dataframe_target(
    df: pd.DataFrame,
    col_index: int,
    target_value: str,
    min_start_index: int = 3,
) -> pd.DataFrame:
    """Drop the rows above the first cell that matches ``target_value``.

    Cells of the column at position ``col_index`` are compared with the
    target after lowercasing and removing spaces. Non-string cells never
    match, so numeric or all-null columns are handled without error.

    Args:
        df: Frame to cut.
        col_index: Position of the column that is searched.
        target_value: Text to look for (case and spaces are ignored).
        min_start_index: The cut is applied only when the smallest matching
            index label is at least this value; otherwise every row is kept.
            NOTE(review): the default of 3 is carried over from the original
            hard-coded threshold; its rationale is not visible here.

    Returns:
        A new DataFrame with a fresh 0..n-1 index: the rows from the match
        downwards, or all rows when there is no match or the match lies
        above ``min_start_index``.
    """
    standardized_target = target_value.lower().replace(' ', '')

    def _standardize(cell):
        # Only text can match; anything else is treated as "no value"
        return cell.lower().replace(' ', '') if isinstance(cell, str) else None

    is_match = (df.iloc[:, col_index].map(_standardize) == standardized_target).to_numpy()

    if not is_match.any():
        return df.reset_index(drop=True)

    start_index = df.index[is_match].min()
    if start_index >= min_start_index:
        return df.loc[start_index:].reset_index(drop=True)

    return df.reset_index(drop=True)

In [30]:
def process_q_strings_inplace(df: pd.DataFrame):
    """Prefix 'Q...' labels in the first row with a 4-digit number found to their left.

    For every string in row 0 that starts with 'Q', the nearest cell to its
    left whose text starts with a digit is located. When that text contains
    at least four digits, the first four (presumably a year - confirm against
    the source sheets) are put in front of the label as "<digits> <label>".
    The search stops at that nearest digit-leading cell whether or not it
    qualifies.

    The frame is modified in place and also returned.
    """
    # Snapshot of the row, so lookups see the original labels, not updated ones
    header_cells = df.iloc[0].tolist()

    for position, cell in enumerate(header_cells):
        if not (isinstance(cell, str) and cell.startswith('Q')):
            continue

        # Walk from the immediate left neighbour back to the first column
        for candidate in reversed(header_cells[:position]):
            candidate_text = str(candidate)
            if re.match(r'^\d', candidate_text) is None:
                continue

            found_digits = re.findall(r'\d', candidate_text)
            if len(found_digits) >= 4:
                prefix = ''.join(found_digits[:4])
                df.iat[0, position] = f"{prefix} {cell}"
            break  # Only the nearest digit-leading cell is considered

    return df

In [38]:
def clean_dataframe_and_trim_nulls(df):
    """Trim the outer columns, relabel the frame and drop trailing empty rows.

    Steps:
      1. Remove the first column and the last two columns.
      2. Rename the remaining columns 0..k-1.
      3. Write "KOMPONEN" into the top-left cell.
      4. Remove rows from the bottom while the first-column cell is null
         (a fully null row is covered by the same test).
      5. Reset the index.

    The work is done on a copy of the column slice, so the caller's frame is
    never written to, and rows are addressed by position, so the input may
    carry any index. A frame that ends up with no rows or no columns is
    returned as is (there is no cell to label).

    Args:
        df: Frame to clean.

    Returns:
        A new DataFrame with a fresh 0..n-1 index.
    """
    # Explicit copy: writing into a bare iloc slice may hit the caller's data
    trimmed = df.iloc[:, 1:-2].copy()
    trimmed.columns = range(trimmed.shape[1])

    if trimmed.empty:
        return trimmed.reset_index(drop=True)

    trimmed.iat[0, 0] = "KOMPONEN"

    # Count how many leading rows survive, then slice once instead of
    # dropping one row per iteration.
    first_column = trimmed.iloc[:, 0]
    keep = len(trimmed)
    while keep > 0 and pd.isna(first_column.iat[keep - 1]):
        keep -= 1

    return trimmed.iloc[:keep].reset_index(drop=True)

In [53]:
def tag_section_headers(df:pd.DataFrame, search_values:list, new_col_name:str, target_col_index:int=0):
    """Replace header rows by a tag column placed in front of the frame.

    A row is a header when its cell in column position ``target_col_index``
    (counted in the incoming frame), after ``normalize_text``, equals one of
    the normalized ``search_values``. Header rows are removed; every other
    row receives, in the new first column, the original text of the closest
    header above it. Rows that come before the first header keep None.

    The caller's frame is not modified, and the frame is walked once instead
    of being dropped from and re-indexed per header.

    Args:
        df: Frame to tag.
        search_values: Texts that identify header rows.
        new_col_name: Name of the inserted tag column.
        target_col_index: Non-negative position of the column that is
            inspected.

    Returns:
        A new DataFrame with the tag column at position 0 and a fresh
        0..n-1 index.
    """
    normalized_search = {normalize_text(val) for val in search_values}

    kept_positions = []
    tags = []
    current_header = None  # original text of the most recent header row

    for position, cell in enumerate(df.iloc[:, target_col_index].tolist()):
        if normalize_text(cell) in normalized_search:
            current_header = cell
        else:
            kept_positions.append(position)
            tags.append(current_header)

    tagged = df.iloc[kept_positions].reset_index(drop=True)
    # dtype=object keeps untagged rows as None rather than NaN
    tagged.insert(0, new_col_name, pd.Series(tags, dtype=object))

    return tagged

In [56]:
def auto_tag_section_by_null_above(df: pd.DataFrame, new_col_name: str, target_col_index: int = 1):
    """Turn "title" rows that sit right below a null cell into a tag column.

    The inspected column is the one found at position ``target_col_index + 1``
    once the tag column has been inserted at position 1 (for the default this
    is the incoming frame's column 1). Rows are walked top to bottom. A row
    is a title when its cell is non-null and the cell of the last row kept so
    far is null; the title row and that null row are both removed, and the
    title text becomes the tag of the rows that follow. The very first row is
    never treated as a title because nothing precedes it.

    Behaviour fixed relative to the previous implementation:
      * after removing a title, scanning resumes at the row that followed it.
        Previously it stepped back one row too far and overwrote the tag of
        the last row of the preceding section with the new tag;
      * row 0 is no longer compared with the last row of the frame;
      * the caller's frame is not modified.

    Args:
        df: Frame to tag.
        new_col_name: Name of the tag column inserted at position 1.
        target_col_index: Selects the inspected column as described above.

    Returns:
        A new DataFrame with the tag column at position 1 and a fresh
        0..n-1 index. Rows above the first title are tagged None.
    """
    # Column layout after the insertion; None stands for the new, empty column
    layout = list(range(df.shape[1]))
    layout.insert(1, None)
    source_position = layout[target_col_index + 1]
    if source_position is None:
        cells = [None] * len(df)
    else:
        cells = df.iloc[:, source_position].tolist()

    kept_positions = []
    tags = []
    current_tag = None

    for position, cell in enumerate(cells):
        if kept_positions and pd.notna(cell) and pd.isna(cells[kept_positions[-1]]):
            # New section: remember the title, discard it and the null row above
            current_tag = cell
            kept_positions.pop()
            tags.pop()
            continue

        kept_positions.append(position)
        tags.append(current_tag or None)

    tagged = df.iloc[kept_positions].reset_index(drop=True)
    # dtype=object keeps untagged rows as None rather than NaN
    tagged.insert(1, new_col_name, pd.Series(tags, dtype=object))

    return tagged

In [68]:
def auto_tag_section_by_null_right(df: pd.DataFrame, new_col_name: str, target_col_index: int = 2):
    """Turn rows whose label has nothing to its right into a tag column.

    The tag column is inserted at position ``target_col_index``. The label
    is the cell of the incoming frame's column ``target_col_index`` (it ends
    up right after the tag column) and its right-hand neighbour is the next
    column. A row whose label is non-null while that neighbour is null is a
    tag row: it is removed and its label becomes the tag of the rows that
    follow, until the next tag row.

    The caller's frame is not modified, and the frame is walked once instead
    of being dropped from and re-indexed per tag row.

    Args:
        df: Frame to tag.
        new_col_name: Name of the inserted tag column.
        target_col_index: Non-negative insertion position of the tag column.

    Returns:
        A new DataFrame with the tag column inserted and a fresh 0..n-1
        index. Rows above the first tag row are tagged None.
    """
    labels = df.iloc[:, target_col_index].tolist()
    right_cells = df.iloc[:, target_col_index + 1].tolist()

    kept_positions = []
    tags = []
    current_tag = None

    for position, (label, right_cell) in enumerate(zip(labels, right_cells)):
        if pd.notna(label) and pd.isna(right_cell):
            # Tag row: remember its label and leave the row out of the result
            current_tag = label
            continue

        kept_positions.append(position)
        tags.append(current_tag or None)

    tagged = df.iloc[kept_positions].reset_index(drop=True)
    # dtype=object keeps untagged rows as None rather than NaN
    tagged.insert(target_col_index, new_col_name, pd.Series(tags, dtype=object))

    return tagged

In [73]:
def conditionally_clear_ket_data_level_3_per_row(df: pd.DataFrame, check_str : str) -> pd.DataFrame:
    """Blank 'ket_data_level_3' on every row whose first column differs from ``check_str``.

    The comparison uses ``normalize_text`` on both sides. The frame is
    edited in place and returned; when it has no 'ket_data_level_3' column
    it is returned unchanged.

    Rows and the target column are addressed by position, so the result does
    not depend on the frame's index labels (assumes the 'ket_data_level_3'
    label is unique among the columns).

    Args:
        df: Frame to edit.
        check_str: First-column text for which the level-3 value is kept.

    Returns:
        The same DataFrame object that was passed in.
    """
    if 'ket_data_level_3' not in df.columns:
        return df  # Do nothing if the column doesn't exist

    wanted = normalize_text(check_str)  # loop-invariant, computed once
    level_3_position = df.columns.get_loc('ket_data_level_3')

    for row_position, first_cell in enumerate(df.iloc[:, 0].tolist()):
        if normalize_text(first_cell) != wanted:
            df.iat[row_position, level_3_position] = None

    return df

In [77]:
def rename_columns_from_first_row(df: pd.DataFrame) -> pd.DataFrame:
    """Promote the first row to column names, leaving the first three names as they are.

    Columns at positions 0-2 keep their current names; from position 3
    onward each column is named after its cell in row 0. The column labels
    are assigned on the frame that was passed in; the returned frame is that
    data without row 0 and with a fresh 0..n-1 index.
    """
    # Names taken from the first row for everything right of the fixed columns
    names_from_row = df.iloc[0, 3:].tolist()
    fixed_names = df.columns.tolist()[:3]

    df.columns = fixed_names + names_from_row

    # Row 0 now lives in the header, so it is left out of the body
    body = df.iloc[1:]
    return body.reset_index(drop=True)

In [82]:
def end_to_end_df_processing(df:pd.DataFrame) -> pd.DataFrame :
    """Run the full clean-up pipeline on one raw sheet.

    Steps, in order:
      0. ``merge_rows``: merge the "KOMPONEN" row with the row below it.
      1. ``cut_dataframe_target``: cut the frame at the "KOMPONEN" row.
      2. ``process_q_strings_inplace``: complete the 'Q...' labels of the first row.
      3. ``clean_dataframe_and_trim_nulls``: trim outer columns and trailing null rows.
      4. ``tag_section_headers``: level-1 tags from the fixed header texts below.
      5. ``auto_tag_section_by_null_above``: level-2 tags.
      6. ``auto_tag_section_by_null_right``: level-3 tags.
      7. ``conditionally_clear_ket_data_level_3_per_row``: keep level-3 tags only
         on rows whose first column is "RASIO KINERJA KEUANGAN".
      8. ``rename_columns_from_first_row``: promote the first row to column names.

    Every step, including the first, receives a copy, so the frame passed in
    is never modified. This matters because the caller keeps the raw frames
    for re-runs, and merging an already merged header row a second time
    would swallow another row.

    Args:
        df: Raw sheet as read from the workbook.

    Returns:
        The processed DataFrame.
    """
    merge_key_list = [
        "KOMPONEN"
    ]

    target_value = "KOMPONEN"

    value_list = [
        "RASIO KINERJA KEUANGAN",
        "DATA KINERJA KEUANGAN",
        "INDIKATOR HASIL SURVEI 3)"
    ]

    process_df_0 = merge_rows(df.copy(), merge_key_list, 0)
    process_df_1 = cut_dataframe_target(process_df_0.copy(), 0, target_value)
    process_df_2 = process_q_strings_inplace(process_df_1.copy())
    process_df_3 = clean_dataframe_and_trim_nulls(process_df_2.copy())
    process_df_4 = tag_section_headers(process_df_3.copy(), value_list, "ket_data_level_1")
    process_df_5 = auto_tag_section_by_null_above(process_df_4.copy(), "ket_data_level_2")
    process_df_6 = auto_tag_section_by_null_right(process_df_5.copy(), "ket_data_level_3")
    process_df_7 = conditionally_clear_ket_data_level_3_per_row(process_df_6.copy(), "RASIO KINERJA KEUANGAN")
    process_df_8 = rename_columns_from_first_row(process_df_7.copy())

    return process_df_8

# Main Code

In [5]:
# Maps each workbook's file name (without extension) to its raw sheet
data_dict = {}

# Loop through all child folders (sorted so the order does not depend on the file system)
for folder in sorted(os.listdir(data_source_dir)):
    folder_path = os.path.join(data_source_dir, folder)

    # Only process if it's a directory
    if not os.path.isdir(folder_path):
        continue

    for file in sorted(os.listdir(folder_path)):
        if not file.endswith('.xlsx'):
            continue

        file_path = os.path.join(folder_path, file)
        print(f"Found excel file : {file}")

        try:
            df = read_sheet_15a_or_16a(file_path)
        except Exception as e:
            print(f"Failed to read {file_path}: {e}")
            continue

        # The reader reports its own failures and returns None for them;
        # storing that placeholder would only crash the processing step later.
        if df is None:
            continue

        key = os.path.splitext(file)[0]  # Get filename without extension
        data_dict[key] = df

Found excel file : SSKI_DESEMBER_2022.xlsx
Found excel file : SSKI_DESEMBER_2023.xlsx
Found excel file : SSKI_DESEMBER_2024.xlsx
Found excel file : SSKI_JUNI 2025.xlsx
Found excel file : SSKI_JUNI_2022.xlsx
Found excel file : SSKI_JUNI_2023.xlsx
Found excel file : SSKI_JUNI_2024.xlsx


In [6]:
# Copy every frame as well: dict.copy() alone would share the DataFrame
# objects, so an in-place edit made during processing would also change
# the backup.
data_dict_backup = {
    key: (frame.copy() if frame is not None else None)
    for key, frame in data_dict.items()
}

In [83]:
# Start again from the raw frames so this cell can be re-run safely
data_dict = data_dict_backup.copy()

# NOTE(review): not used in this cell (the pipeline defines its own list);
# kept in case later cells read it.
merge_key_list = [
    "KOMPONEN"
]

# Existing keys are only reassigned, so the dict can be updated while it is iterated
for key, df in data_dict.items():
    if df is None:
        # No sheet was loaded for this workbook; there is nothing to process
        print(f"Skipping {key}: no sheet was loaded")
        continue

    # Hand over a copy so the frames held by the backup stay pristine
    data_dict[key] = end_to_end_df_processing(df.copy())