# Import
Import the text files contained in each .zip archive and create one .parquet file per archive.

In [None]:
import pandas as pd
import zipfile
import glob
import os
import re
from datetime import datetime # Added for robust date parsing

# --- Debugging Flag ---
# If True, only the first 10 zip files will be processed.
# Set to False to process all zip files.
do_debug = True

# --- List to collect debug information for each processed text file/combined file ---
# One dict per logged file is appended by log_df_metrics.
debug_data_list = []

# --- Regex for Split File Detection ---
# Matches patterns like "...(1 of 3).txt", "... ( 2 of 10 ).txt", etc.
# Whitespace is optional before the opening parenthesis and just inside both
# parentheses; at least one whitespace character is required around "of".
# Matching is case-insensitive (covers "OF" and ".TXT").
# Groups: 1=Base Name, 2=Part Number, 3=Total Parts
split_file_pattern = re.compile(r"^(.*?)\s*\(\s*(\d+)\s+of\s+(\d+)\s*\)\.txt$", re.IGNORECASE)

# --- Helper Function to Extract Reporting Quarter ---
def extract_reporting_quarter(filename):
    """
    Derive the reporting quarter label (e.g., "Q1 2023") embedded in a filename.

    Each non-overlapping 8-digit sequence found in `filename` is tried as a
    date, first as YYYYMMDD and then as MMDDYYYY. The first interpretation that
    parses and has a year between 1980 and 2050 (inclusive) determines the
    result. Returns "Unknown" when no sequence qualifies.
    """
    layouts = ('%Y%m%d', '%m%d%Y')

    for candidate in re.findall(r'\d{8}', filename):
        for layout in layouts:
            try:
                parsed = datetime.strptime(candidate, layout)
            except ValueError:
                # Digits do not form a valid date in this layout; try the next one.
                continue
            if 1980 <= parsed.year <= 2050:  # Reasonable year range
                return f"Q{(parsed.month - 1) // 3 + 1} {parsed.year}"

    return "Unknown"

# --- Helper Function to Log DataFrame Metrics ---
def log_df_metrics(df_to_log, file_identifier_for_log):
    """
    Compute debug metrics for a DataFrame and append them to `debug_data_list`.

    The appended record holds the file identifier, the reporting quarter
    derived from that identifier, the number of distinct IDRSSD values, the
    number of IDRSSD values containing non-digit characters, and the total
    count of missing cells. IDRSSD values are taken from the index when the
    index is named 'IDRSSD'; otherwise an 'IDRSSD' column is used if present.
    An empty DataFrame is logged with all counts at zero.
    """
    def _has_non_digits(text):
        # Empty strings are deliberately not counted as non-numeric.
        return bool(text) and not text.isdigit()

    bank_count = 0
    bad_id_count = 0
    missing_total = 0

    if not df_to_log.empty:
        missing_total = df_to_log.isnull().sum().sum()

        if df_to_log.index.name == 'IDRSSD':
            index_ids = pd.Series(df_to_log.index.astype(str))
            bank_count = index_ids.nunique()
            stripped_ids = index_ids.str.strip()
            bad_id_count = stripped_ids.apply(_has_non_digits).sum()
        else:
            print(f"    - Warning (Debug Logging): IDRSSD not index for {file_identifier_for_log} when logging metrics.")
            # Fallback: use an 'IDRSSD' column when the index was never set.
            if 'IDRSSD' in df_to_log.columns:
                print(f"    - Debug Logging: Found 'IDRSSD' column for {file_identifier_for_log}.")
                try:
                    column_ids = df_to_log['IDRSSD'].astype(str).str.strip()
                    non_blank_ids = column_ids[column_ids != '']
                    if not non_blank_ids.empty:
                        bank_count = non_blank_ids.nunique()
                        bad_id_count = non_blank_ids.apply(_has_non_digits).sum()
                except Exception as e:
                    print(f"    - Error processing IDRSSD column for debug log on {file_identifier_for_log}: {e}")

    record = {
        'text_file_name': file_identifier_for_log,
        'reporting_quarter': extract_reporting_quarter(file_identifier_for_log),
        'num_banks': bank_count,
        'num_non_numeric_idrssd': bad_id_count,
        'num_missing_values_total': missing_total,
    }
    debug_data_list.append(record)

# --- Find Zip Files ---
print("Searching for zip files...")
# glob returns paths in a file-system-dependent order; sort them so that the
# processing order (and the debug subset below) is the same on every run.
zip_file_paths = sorted(glob.glob('zip/*.zip'))

# --- Apply Debug Limit if Active ---
# In debug mode only the first 10 archives (in sorted order) are processed.
if do_debug:
    if zip_file_paths:
        original_zip_count = len(zip_file_paths)
        zip_file_paths = zip_file_paths[:10]
        print(f"DEBUG MODE: Processing first {len(zip_file_paths)} of {original_zip_count} zip files found.")
    else:
        print("DEBUG MODE: No zip files found to limit.")


# Dictionary to store code -> metadata mapping across all files
code_metadata_map = {}

# --- Main Processing ---
# For every zip archive: load each tab-separated .txt file, clean it and index
# it by IDRSSD, stitch split files ("(i of n)") together horizontally, outer-
# merge everything on IDRSSD, and write one Parquet file per archive. After all
# archives are processed, the debug log and the code -> description mapping are
# written to CSV files.
if not zip_file_paths:
    print("Error: No .zip files found in the 'zip' directory.")
else:
    print(f"Found zip files to process: {zip_file_paths}")

    # --- Loop through Each Zip File Independently ---
    for zip_file_path in zip_file_paths:
        print(f"\n--- Processing Zip File: {zip_file_path} ---")

        dfs_in_zip = {}         # key -> DataFrame indexed by IDRSSD, ready to merge
        split_file_groups = {}  # (base name, total parts) -> {part number: DataFrame}
        loaded_files_count = 0

        try:
            with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
                zip_namelist = zip_ref.namelist()
                print(f"  Files found inside zip: {[os.path.basename(f) for f in zip_namelist]}")

                for internal_path in zip_namelist:
                    base_filename = os.path.basename(internal_path)

                    if not base_filename.lower().endswith('.txt'):
                        continue

                    print(f"  - Attempting to load: '{base_filename}' (assuming header=0, skiprows=[1])")
                    try:
                        with zip_ref.open(internal_path) as file_in_zip:
                            # First pass: read only the first two rows (codes and their
                            # descriptions) to extend the global code -> description map.
                            # This is best-effort; a failure here must not stop the load.
                            try:
                                header_meta_df = pd.read_csv(file_in_zip, sep='\t', header=None, nrows=2, encoding='utf-8', low_memory=False, dtype=str)
                                if header_meta_df.shape[0] == 2:
                                    codes = header_meta_df.iloc[0].values
                                    descriptions = header_meta_df.iloc[1].values
                                    for code, desc in zip(codes, descriptions):
                                        code_str = str(code).strip().strip('"')
                                        desc_str = str(desc).strip().strip('"')
                                        # Keep the first description seen per code; skip IDRSSD,
                                        # missing descriptions and purely numeric "descriptions".
                                        if code_str and code_str != 'IDRSSD' and code_str not in code_metadata_map and pd.notna(desc) and not desc_str.isdigit():
                                            code_metadata_map[code_str] = desc_str
                            except Exception as e:
                                print(f"    - Warning: Could not read header/metadata rows from {base_filename}: {e}")

                            # Second pass: read the data. Row 0 is the header and row 1
                            # (the description row) is skipped.
                            file_in_zip.seek(0)
                            read_params = {
                                'sep': '\t', 'low_memory': False, 'dtype': {'IDRSSD': str},
                                'encoding': 'utf-8', 'header': 0, 'skiprows': [1]
                            }
                            try:
                                df = pd.read_csv(file_in_zip, **read_params)
                            except UnicodeDecodeError:
                                print(f"    - Warning: UTF-8 decoding failed for {base_filename}. Trying 'latin1'.")
                                read_params['encoding'] = 'latin1'
                                file_in_zip.seek(0)
                                df = pd.read_csv(file_in_zip, **read_params)
                            except ValueError as ve:
                                if 'Integer column has NA values' in str(ve) or 'cannot safely convert' in str(ve):
                                    print(f"    - Warning: Potential mixed types or NAs in IDRSSD for {base_filename}. Retrying with object dtype.")
                                    read_params.pop('dtype', None)
                                    file_in_zip.seek(0)
                                    df = pd.read_csv(file_in_zip, **read_params, dtype=object)  # Read all as object
                                else:
                                    raise

                            if df.empty:
                                print(f"    - Warning: File {base_filename} loaded as empty DataFrame. Skipping.")
                                log_df_metrics(df, base_filename)  # Log empty df state
                                continue
                            df.columns = [str(col).strip().strip('"') for col in df.columns]

                            if 'IDRSSD' not in df.columns:
                                print(f"    - Warning: 'IDRSSD' column not found in {base_filename}. Skipping this file.")
                                # Log with current df state (no IDRSSD column means banks and non-numeric IDRSSD will be 0)
                                log_df_metrics(df, base_filename)
                                continue

                            # Record which rows have an IDRSSD *before* casting to str:
                            # astype(str) turns NaN into the literal 'nan', which would
                            # otherwise survive both the NA filter and the empty-string
                            # filter and end up as a bogus bank identifier.
                            idrssd_present = df['IDRSSD'].notna()
                            df['IDRSSD'] = df['IDRSSD'].astype(str).str.strip()
                            df = df[idrssd_present & (df['IDRSSD'] != '')]

                            if df.empty:
                                print(f"    - Warning: File {base_filename} has no valid IDRSSD values after cleaning. Skipping.")
                                log_df_metrics(df, base_filename)  # Log this empty state
                                continue

                            # Drop rows in which every non-IDRSSD column is missing.
                            non_idrssd_cols = df.columns.difference(['IDRSSD'])
                            df = df.dropna(subset=non_idrssd_cols, how='all')
                            if df.empty:
                                print(f"    - Warning: File {base_filename} has no valid data after removing empty rows. Skipping.")
                                log_df_metrics(df, base_filename)  # Log this empty state
                                continue

                            if df['IDRSSD'].duplicated().any():
                                print(f"      - Warning: Duplicate 'IDRSSD' values found in {base_filename}. Keeping first occurrence.")
                                df = df.drop_duplicates(subset='IDRSSD', keep='first')

                            try:
                                df = df.set_index('IDRSSD')
                                # Log metrics AFTER 'IDRSSD' is successfully set as index
                                log_df_metrics(df, base_filename)
                            except KeyError:
                                # The file is skipped: it is added neither to dfs_in_zip
                                # nor to split_file_groups, and no metrics are logged.
                                print(f"    - Error: Could not set 'IDRSSD' as index for {base_filename}. Skipping.")
                                continue

                            match = split_file_pattern.match(base_filename)
                            if match:
                                # Part of a split file: park it until all parts are seen.
                                base_name_part = match.group(1).strip()
                                part_num = int(match.group(2))
                                total_parts = int(match.group(3))
                                group_key = (base_name_part, total_parts)
                                print(f"      Detected split file: Base='{base_name_part}', Part={part_num}, Total={total_parts}. Shape: {df.shape}")
                                if group_key not in split_file_groups:
                                    split_file_groups[group_key] = {}
                                if part_num in split_file_groups[group_key]:
                                    print(f"      - Warning: Duplicate part number {part_num} found for group {group_key}. Overwriting with data from {base_filename}.")
                                split_file_groups[group_key][part_num] = df
                                loaded_files_count += 1
                            else:
                                if df.shape[1] == 0:  # No data columns besides index
                                    print(f"    - Skipping non-split file {base_filename} as it contains no data columns after processing.")
                                    continue
                                loaded_files_count += 1
                                # Derive a key that collides neither with an already stored
                                # file nor with the base name of a split-file group.
                                file_key = os.path.splitext(base_filename)[0]
                                original_key = file_key
                                counter = 1
                                while file_key in dfs_in_zip or any(key[0] == file_key for key in split_file_groups):
                                    file_key = f"{original_key}_{counter}"
                                    counter += 1
                                if file_key != original_key:
                                    print(f"    - Warning: Potential key conflict for '{original_key}'. Using '{file_key}' instead.")
                                dfs_in_zip[file_key] = df
                                print(f"      Stored non-split file as '{file_key}' (in zip). Shape: {df.shape}")

                    except pd.errors.EmptyDataError:
                        print(f"  - Warning: File is empty inside zip: {base_filename}")
                        log_df_metrics(pd.DataFrame(), base_filename)  # Log as empty
                    except Exception as e:
                        print(f"  - Error loading or processing {base_filename} from {zip_file_path}: {e}")

            # --- Resolve split-file groups ---
            # Complete groups are concatenated column-wise; parts of incomplete
            # groups are stored individually so their data is still merged.
            print("\n  - Checking for complete split file groups...")
            # Iterate over a snapshot of the keys because groups are deleted in the loop.
            completed_group_keys = list(split_file_groups)
            for group_key in completed_group_keys:
                base_name_part, total_parts = group_key
                parts_dict = split_file_groups[group_key]
                if len(parts_dict) == total_parts and all(i in parts_dict for i in range(1, total_parts + 1)):
                    print(f"    - Found complete group: '{base_name_part}' ({total_parts} parts). Concatenating horizontally...")
                    try:
                        sorted_parts = [parts_dict[i] for i in range(1, total_parts + 1)]
                        combined_df = sorted_parts[0]
                        for i in range(1, total_parts):
                            next_part_df = sorted_parts[i]
                            # Columns already present win; duplicates in later parts are dropped.
                            overlapping_cols = combined_df.columns.intersection(next_part_df.columns)
                            if len(overlapping_cols) > 0:
                                print(f"      - Warning: Overlapping columns found in part {i+1}: {list(overlapping_cols)}. Dropping from part {i+1}.")
                                next_part_df = next_part_df.drop(columns=overlapping_cols)
                            combined_df = pd.concat([combined_df, next_part_df], axis=1)

                        combined_key = f"{base_name_part}_Combined_{total_parts}parts"
                        original_key = combined_key
                        counter = 1
                        while combined_key in dfs_in_zip:
                            combined_key = f"{original_key}_{counter}"
                            counter += 1
                        if combined_key != original_key:
                            print(f"      - Warning: Potential key conflict for combined group '{original_key}'. Using '{combined_key}'.")

                        # Log metrics for the combined DataFrame
                        log_df_metrics(combined_df, combined_key)

                        dfs_in_zip[combined_key] = combined_df
                        print(f"      - Combined group shape: {combined_df.shape}. Stored as '{combined_key}'.")
                        del split_file_groups[group_key]
                    except Exception as e:
                        print(f"      - Error during concatenation for group {group_key}: {e}. Skipping combination.")
                else:
                    print(f"    - Incomplete group found: '{base_name_part}'. Expected {total_parts}, found {len(parts_dict)}: {list(parts_dict.keys())}. Parts will not be combined.")
                    for part_num, df_part in parts_dict.items():
                        # These parts were already logged when first processed and indexed,
                        # so they are not re-logged here to avoid duplicate entries.
                        part_key = f"{base_name_part}_Part_{part_num}_of_{total_parts}"
                        original_key = part_key
                        counter = 1
                        while part_key in dfs_in_zip:
                            part_key = f"{original_key}_{counter}"
                            counter += 1
                        if part_key != original_key:
                            print(f"      - Warning: Potential key conflict for orphan part '{original_key}'. Using '{part_key}'.")
                        dfs_in_zip[part_key] = df_part  # df_part already has IDRSSD as index
                        print(f"      - Storing incomplete part {part_num} as '{part_key}'. Shape: {df_part.shape}")
                    del split_file_groups[group_key]

            # --- Merge all DataFrames of this zip on the IDRSSD index ---
            if not dfs_in_zip:
                print("  - No mergeable dataframes were loaded or derived from this zip file.")
                merged_df_for_zip = pd.DataFrame()
            else:
                print(f"\n  - Merging {len(dfs_in_zip)} DataFrames (incl. combined/orphan splits) loaded from {zip_file_path}...")
                # Base selection order: first combined split group, then the first key
                # matching a preferred name part, then the DataFrame with most cells.
                base_key = None
                combined_keys = [k for k in dfs_in_zip if "_Combined_" in k]
                if combined_keys:
                    base_key = combined_keys[0]
                if not base_key:
                    potential_base_keys = list(dfs_in_zip)
                    preferred_key_parts = ['ENT', 'POR', 'RC', 'RCA']
                    for p_key_part in preferred_key_parts:
                        for actual_key in potential_base_keys:
                            if actual_key.startswith(p_key_part) or f" {p_key_part}" in actual_key:
                                base_key = actual_key
                                break
                        if base_key:
                            break
                if not base_key and dfs_in_zip:
                    base_key = max(dfs_in_zip, key=lambda k: dfs_in_zip[k].shape[0] * dfs_in_zip[k].shape[1])

                if not base_key:
                    print("    - Error: Could not determine a base DataFrame for merging.")
                    merged_df_for_zip = pd.DataFrame()
                else:
                    try:
                        merged_df_for_zip = dfs_in_zip.pop(base_key)
                        print(f"    - Starting merge with '{base_key}' (Shape: {merged_df_for_zip.shape})")
                        # Pop each frame as it is merged so it can be freed early.
                        for key in list(dfs_in_zip):
                            df_to_merge = dfs_in_zip.pop(key)
                            print(f"    - Merging '{key}' (Shape: {df_to_merge.shape})...")
                            if df_to_merge.shape[1] > 0:  # Ensure columns to merge
                                # Outer join keeps banks that appear in only one file;
                                # clashing column names from the right get a '_<key>' suffix.
                                merged_df_for_zip = pd.merge(
                                    merged_df_for_zip, df_to_merge, left_index=True,
                                    right_index=True, how='outer', suffixes=('', f'_{key}')
                                )
                                print(f"      - Merged shape: {merged_df_for_zip.shape}")
                            else:
                                print(f"      - Skipping merge for '{key}' as it has no columns besides index.")
                        merged_df_for_zip = merged_df_for_zip.reset_index()
                        print(f"  - Merge complete for this zip. Final shape: {merged_df_for_zip.shape}")
                    except Exception as e:
                        print(f"    - Error during merge process: {e}")
                        merged_df_for_zip = pd.DataFrame()

            # --- Save the merged result as Parquet ---
            # merged_df_for_zip is bound on every path above, so no existence check is needed.
            if not merged_df_for_zip.empty:
                base_zip_name = os.path.basename(zip_file_path)
                output_filename_base, _ = os.path.splitext(base_zip_name)
                output_filename = f"parquet/{output_filename_base}.parquet"
                print(f"\n  - Saving merged data for {zip_file_path} to: {output_filename}")
                try:
                    if 'IDRSSD' in merged_df_for_zip.columns:
                        merged_df_for_zip['IDRSSD'] = merged_df_for_zip['IDRSSD'].astype(str)
                    # Ensure parquet directory exists
                    os.makedirs(os.path.dirname(output_filename), exist_ok=True)
                    merged_df_for_zip.to_parquet(output_filename, index=False, engine='pyarrow')
                    print(f"  - Successfully saved {output_filename}")
                except ImportError:
                    print("    - Error: 'pyarrow' not found. Trying 'fastparquet'.")
                    # The IDRSSD cast and directory creation above have already
                    # succeeded at this point; only the write itself is retried.
                    try:
                        merged_df_for_zip.to_parquet(output_filename, index=False, engine='fastparquet')
                        print(f"  - Successfully saved {output_filename} using fastparquet")
                    except ImportError:
                        print("    - Error: Neither 'pyarrow' nor 'fastparquet' found. Cannot save Parquet file.")
                    except Exception as e:
                        print(f"    - Error saving Parquet file {output_filename} with fastparquet: {e}")
                except Exception as e:
                    print(f"  - Error saving Parquet file {output_filename}: {e}")
            elif loaded_files_count > 0:
                print("\n  - No data to save for this zip (merge resulted in empty DataFrame or only invalid files loaded).")
            else:
                print("\n  - No valid FFIEC text files found or loaded in this zip file.")

        except zipfile.BadZipFile:
            print(f"Error: Failed to open {zip_file_path}. It might be corrupted or not a valid zip file.")
        except Exception as e:
            # Boundary handler: one broken archive must not abort the whole run.
            print(f"An unexpected error occurred while processing {zip_file_path}: {e}")

    # --- Create and Save/Display the Debug Log DataFrame ---
    if debug_data_list:
        print("\n\n--- Import Debug Log ---")
        debug_df = pd.DataFrame(debug_data_list)
        if not debug_df.empty:
            # to_string() prints every row and column without truncation.
            print(debug_df.to_string())
        else:
            print("Debug log DataFrame is empty.")
        try:
            debug_log_filename = "import_debug_log.csv"
            debug_df.to_csv(debug_log_filename, index=False)
            print(f"\nDebug log saved to {debug_log_filename}")
        except Exception as e:
            print(f"\nError saving debug log file: {e}")
    else:
        print("\n\n--- No debug log data collected ---")

    # --- Create and display the Code <-> Metadata Mapping DataFrame ---
    if code_metadata_map:
        print("\n\n--- Code to Metadata Mapping ---")
        metadata_df = pd.DataFrame(list(code_metadata_map.items()), columns=['Code', 'Metadata'])
        try:
            mapping_filename = "code_metadata_mapping.csv"
            metadata_df.to_csv(mapping_filename, index=False)
            print(f"\nMapping saved to {mapping_filename}")
        except Exception as e:
            print(f"\nError saving mapping file: {e}")
    else:
        print("\n\n--- No code-metadata mapping generated ---")

    print("\n--- Finished processing all zip files ---")

Searching for zip files...
DEBUG MODE: Processing first 10 of 96 zip files found.
Found zip files to process: ['zip/FFIEC CDR Call Bulk All Schedules 03312007.zip', 'zip/FFIEC CDR Call Bulk All Schedules 12312020.zip', 'zip/FFIEC CDR Call Bulk All Schedules 03312013.zip', 'zip/FFIEC CDR Call Bulk All Schedules 12312008.zip', 'zip/FFIEC CDR Call Bulk All Schedules 06302019.zip', 'zip/FFIEC CDR Call Bulk All Schedules 06302024.zip', 'zip/FFIEC CDR Call Bulk All Schedules 06302018.zip', 'zip/FFIEC CDR Call Bulk All Schedules 12312009.zip', 'zip/FFIEC CDR Call Bulk All Schedules 03312012.zip', 'zip/FFIEC CDR Call Bulk All Schedules 03312006.zip']

--- Processing Zip File: zip/FFIEC CDR Call Bulk All Schedules 03312007.zip ---
  Files found inside zip: ['FFIEC CDR Call Bulk POR 03312007.txt', 'FFIEC CDR Call Schedule CI 03312007.txt', 'FFIEC CDR Call Schedule ENT 03312007.txt', 'FFIEC CDR Call Schedule GI 03312007.txt', 'FFIEC CDR Call Schedule GL 03312007.txt', 'FFIEC CDR Call Schedule NAR