### Mount Google Drive, Import Libraries and Define Paths

In [19]:
# =============================================================================
# ENVIRONMENT SETUP + PATH CONFIGURATION (SERVER / COLAB COMPATIBLE)
# =============================================================================

import os
import sys
import importlib
from pathlib import Path
import string
import re
import gc

# -----------------------------------------------------------------------------
# 0) HARD SAFETY: cap native thread usage (prevents pthread_create EAGAIN)
#    MUST be set before importing numpy / scipy / pandas
# -----------------------------------------------------------------------------
os.environ["OMP_NUM_THREADS"] = "1"
os.environ["OPENBLAS_NUM_THREADS"] = "1"
os.environ["MKL_NUM_THREADS"] = "1"
os.environ["NUMEXPR_MAX_THREADS"] = "1"
os.environ["VECLIB_MAXIMUM_THREADS"] = "1"
os.environ["BLIS_NUM_THREADS"] = "1"

# -----------------------------------------------------------------------------
# 1) Detect environment
# -----------------------------------------------------------------------------
IN_COLAB = "google.colab" in sys.modules

# -----------------------------------------------------------------------------
# 2) (Colab only) Mount Google Drive
# -----------------------------------------------------------------------------
if IN_COLAB:
    from google.colab import drive
    drive.mount("/content/drive")
    BASE_PATH = "/content/drive/MyDrive/Colab Notebooks"
else:
    # Server base path (your target)
    BASE_PATH = "/home/jovyan/work/hpool1/pseidel/test"

print("IN_COLAB:", IN_COLAB)
print("BASE_PATH:", BASE_PATH)

# -----------------------------------------------------------------------------
# 3) Sanity checks: path exists + write permission
# -----------------------------------------------------------------------------
BASE = Path(BASE_PATH)
if not BASE.exists():
    raise FileNotFoundError(f"BASE_PATH does not exist: {BASE}")

# quick write test (fails fast if you don't have permissions)
test_file = BASE / ".write_test_tmp"
try:
    test_file.write_text("ok", encoding="utf-8")
    test_file.unlink()
except Exception as e:
    raise PermissionError(f"No write permission in {BASE}. Error: {e}")

# -----------------------------------------------------------------------------
# 4) Environment check: ensure required packages import cleanly
# -----------------------------------------------------------------------------
required_packages = ["numpy", "scipy", "pandas", "linearmodels", "xlsxwriter"]

for pkg in required_packages:
    print(f"Importing {pkg} ...")
    importlib.import_module(pkg)
    print(f"{pkg} OK")

# -----------------------------------------------------------------------------
# 5) Base paths and input/output locations
# -----------------------------------------------------------------------------
Input_file_path   = str(BASE / "Input")
Temp_file_path    = str(BASE / "Temp")
Output_file_path  = str(BASE / "Output")

Fundamentals_file_path = f"{Input_file_path}/WSFV_f_20250131.txt"
Current_file_path      = f"{Input_file_path}/WSCurrent_f_20250131.txt"
Calendar_file_path     = f"{Input_file_path}/WSCalendarPrd_f_20250131.txt"
Meta_file_path         = f"{Input_file_path}/WSMetaData_f_20250131.txt"
Excel_file_path        = f"{Input_file_path}/WS PIT Table Definitions V5 with start dates.xls"

MarketValues_file_path          = f"{Input_file_path}/Daily MV USD"
MarketValues_file_path_LC       = f"{Input_file_path}/Daily MV LC"
DailyTotalReturns_file_path     = f"{Input_file_path}/Daily Returns USD"
DailyIndexReturns_file_path     = f"{Input_file_path}/Daily Index Returns USD"
Constituents_file_path          = f"{Input_file_path}/Constituents.01.csv"
UniversalMatching_file_path     = f"{Input_file_path}/Universal Matching File"

Temp_file_path_GO  = f"{Temp_file_path}/TempGeneralOverview"
Temp_file_path_EoC = f"{Temp_file_path}/TempExtractionofCharacteristics"
Temp_file_path_DP  = f"{Temp_file_path}/TempDataPreparation"
Temp_file_path_A   = f"{Temp_file_path}/TempAnomalies"
Temp_file_path_R   = f"{Temp_file_path}/TempRegressionModel"

Relevant_items_path   = f"{Input_file_path}/RelevantItems.txt"
Relevant_items_path_A = f"{Input_file_path}/RelevantItems.txt"
Relevant_items_path_B = f"{Input_file_path}/RelevantItemsB.txt"
Relevant_items_path_C = f"{Input_file_path}/RelevantItemsC.txt"
Relevant_items_path_D = f"{Input_file_path}/RelevantItemsD.txt"

Subset_file_path = f"{Temp_file_path_GO}/Subsets"
Fundamentals_clean_file_path = f"{Temp_file_path_GO}/Fundamentals_clean.txt"
Current_clean_file_path      = f"{Temp_file_path_GO}/Current_clean.txt"
Calendar_clean_file_path     = f"{Temp_file_path_GO}/Input/Calendar_clean.txt"
Meta_clean_file_path         = f"{Temp_file_path_GO}/Input/Meta_clean.txt"

# -----------------------------------------------------------------------------
# 6) Ensure required directories exist
# -----------------------------------------------------------------------------
Path(Output_file_path).mkdir(parents=True, exist_ok=True)
Path(Temp_file_path_GO).mkdir(parents=True, exist_ok=True)
Path(Temp_file_path_EoC).mkdir(parents=True, exist_ok=True)
Path(Temp_file_path_DP).mkdir(parents=True, exist_ok=True)
Path(Temp_file_path_A).mkdir(parents=True, exist_ok=True)
Path(Temp_file_path_R).mkdir(parents=True, exist_ok=True)
Path(Subset_file_path).mkdir(parents=True, exist_ok=True)
Path(Path(Calendar_clean_file_path).parent).mkdir(parents=True, exist_ok=True)

# -----------------------------------------------------------------------------
# 7) Streaming / deduplication settings
# -----------------------------------------------------------------------------
CHUNK_SIZE = 2_000_000
DATE_COL = "PIT Date"
DEDUP_KEYS = ["ID", "ItemCode", DATE_COL]

print("Paths configured. Temp outputs ->", Temp_file_path_GO)
print("Example input path ->", Fundamentals_file_path)


IN_COLAB: False
BASE_PATH: /home/jovyan/work/hpool1/pseidel/test
Importing numpy ...
numpy OK
Importing scipy ...
scipy OK
Importing pandas ...
pandas OK
Importing linearmodels ...
linearmodels OK
Importing xlsxwriter ...
xlsxwriter OK
Paths configured. Temp outputs -> /home/jovyan/work/hpool1/pseidel/test/Temp/TempGeneralOverview
Example input path -> /home/jovyan/work/hpool1/pseidel/test/Input/WSFV_f_20250131.txt


In [43]:
!free -h


               total        used        free      shared  buff/cache   available
Mem:           754Gi       192Gi       313Gi        69Mi       256Gi       561Gi
Swap:             0B          0B          0B


# 1.0. Worldscope PIT

### Load relevant ItemCodes

In [21]:
# =============================================================================
# CELL SUMMARY
# -----------------------------------------------------------------------------
# This cell loads multiple sets of relevant item codes (A, B, C, D) from
# text files into memory as Python sets of integers. The process is designed
# to be robust and memory-efficient:
#   - It first inspects the file to detect the correct column containing
#     item codes, preferring a column explicitly named "ItemCode" and
#     falling back to the first column otherwise.
#   - It reads the full file in chunks to avoid loading the entire file
#     into memory at once, which is important for large input files.
#   - Within each chunk, it filters the column by coercing values to
#     numeric, dropping any non-numeric or invalid entries, and casting
#     valid values to 64-bit integers.
#   - The item codes from all chunks are concatenated and converted to a
#     set, enforcing uniqueness and providing fast membership tests for
#     downstream filtering operations.
#   - Four different relevant-item lists (A/B/C/D) are loaded and stored
#     as separate sets to allow different filtering configurations in later
#     processing steps.
# =============================================================================

# ---------- Load relevant item sets for A/B/C/D ----------

def load_relevant_set(txt_path: str) -> set[int]:
    """
    Loads a list of item codes (header present) and returns a set[int].
    Detects a numeric column: 'ItemCode' if present else first column.
    """
    import pandas as pd  # Local import to keep the function self-contained

    # Attempt to read a small sample of rows assuming pipe-delimited format
    # to infer the structure and available column names
    try:
        df = pd.read_csv(
            txt_path,
            sep="|",               # Assume pipe as the primary delimiter
            engine="c",            # Use C engine for faster parsing
            dtype=str,             # Read all columns as strings initially
            encoding="latin1",     # Encoding that can handle extended characters
            nrows=50               # Only read a small sample to inspect header
        )
    except Exception:
        # If reading with pipe delimiter fails, fall back to default delimiter
        # (comma) to maximize robustness to different file formats
        df = pd.read_csv(
            txt_path,
            engine="c",
            dtype=str,
            encoding="latin1",
            nrows=50
        )

    # Identify columns whose name matches "itemcode" (case-insensitive)
    cols = [c for c in df.columns if c.lower() == "itemcode"]

    # Prefer a column explicitly named "ItemCode"; otherwise use the first column
    # as the source of item codes
    use_col = cols[0] if cols else df.columns[0]

    # Prepare a list to accumulate cleaned numeric item codes from all chunks
    codes = []

    # Stream the full file in chunks using the detected column
    for ch in pd.read_csv(
        txt_path,
        sep="|",                  # Use pipe delimiter when streaming
        engine="c",
        dtype=str,                # Read column as string before numeric conversion
        encoding="latin1",
        usecols=[use_col],        # Only load the column with item codes
        chunksize=200_000,        # Number of rows per chunk for streaming
        on_bad_lines="skip"       # Skip lines that cannot be parsed correctly
    ):
        # Convert the selected column to numeric, coercing invalid entries to NaN
        numeric_series = pd.to_numeric(ch[use_col], errors="coerce")

        # Drop NaN values to retain only valid numeric item codes
        numeric_series = numeric_series.dropna()

        # Cast the numeric values to int64 for consistent integer representation
        numeric_series = numeric_series.astype("int64")

        # Append this cleaned series of item codes to the accumulator list
        codes.append(numeric_series)

    # If no valid codes were accumulated, return an empty set
    if not codes:
        return set()

    # Concatenate all numeric item code series into a single Series,
    # convert to a Python list, and then to a set to ensure uniqueness
    return set(pd.concat(codes, ignore_index=True).tolist())

# Load relevant item code sets for each configuration (A, B, C, D)
RELEVANT_ITEMCODES_A = load_relevant_set(Relevant_items_path_A)
RELEVANT_ITEMCODES_B = load_relevant_set(Relevant_items_path_B)
RELEVANT_ITEMCODES_C = load_relevant_set(Relevant_items_path_C)
RELEVANT_ITEMCODES_D = load_relevant_set(Relevant_items_path_D)

# Report the size of each loaded relevant item set for verification
print(
    "Relevant sets loaded:",
    f"A={len(RELEVANT_ITEMCODES_A)}  B={len(RELEVANT_ITEMCODES_B)}  "
    f"C={len(RELEVANT_ITEMCODES_C)}  D={len(RELEVANT_ITEMCODES_D)}"
)


Relevant sets loaded: A=49  B=7  C=1  D=1


### Define Helper Functions

In [22]:
# =============================================================================
# CELL SUMMARY
# -----------------------------------------------------------------------------
# This cell defines reusable helper functions for data cleaning, type coercion,
# file output, and audit tracking. In particular:
#   - `standardize_strings` performs conservative text standardization:
#       * keeps the "ID" column as a string without modifying content
#       * for other text-like columns, trims surrounding whitespace and converts
#         empty strings to missing values (<NA>). This is a key transformation
#         step for normalizing string fields.
#   - `coerce_types_schema` enforces a specific schema ("A_num", "A_str", "B")
#       * converts core identifier and PIT date columns into appropriate string
#         and datetime types
#       * converts numeric fields like ItemCode and FiscalPeriod to nullable
#         integer types (Int64)
#       * converts Value either to numeric or string depending on the chosen
#         schema. This ensures consistent typing across datasets prior to
#         further filtering and aggregation.
#   - `save_append_txt` appends DataFrames to pipe-separated text files with
#     controlled date formatting. This supports incremental, chunk-based
#     writing of large processed outputs.
#   - `_na_increase` and `_trim_changes` are audit helpers:
#       * `_na_increase` computes, per column, how many additional missing
#         values were introduced by a transformation step.
#       * `_trim_changes` counts how many cells in string-like columns changed
#         after trimming operations (excluding the ID column), making it
#         possible to quantify the impact of text-cleanup transformations.
# =============================================================================

# Helper imports and constants
import pandas as pd                     # DataFrame operations and data manipulation

OUTPUT_SEP = "|"   # Delimiter used for all .txt outputs written by this pipeline


def standardize_strings(df: pd.DataFrame) -> pd.DataFrame:
    """
    Conservative text cleanup:
      - 'ID': keep as string; DO NOT strip/alter content.
      - other object/string cols: cast to 'string', strip whitespace, empty -> <NA>.
    """
    # Iterate over all columns to apply column-specific string standardization
    for col in df.columns:
        if col == "ID":
            # Ensure ID is stored as pandas StringDtype without altering values
            df[col] = df[col].astype("string")
            continue
        # For all non-ID columns with object or string-like dtype, apply trimming
        if pd.api.types.is_object_dtype(df[col]) or pd.api.types.is_string_dtype(df[col]):
            # Convert column to pandas StringDtype and strip leading/trailing whitespace
            df[col] = df[col].astype("string").str.strip()
            # Replace pure whitespace or empty strings with missing values (<NA>)
            df[col] = df[col].replace(r"^\s*$", pd.NA, regex=True)
    # Return the standardized DataFrame
    return df


def coerce_types_schema(df: pd.DataFrame, schema: str) -> pd.DataFrame:
    """
    schema:
      - "A_num": ID str, PIT Date datetime, ItemCode Int64, Frequency str, FiscalPeriod Int64, Value numeric
      - "A_str": ID str, PIT Date datetime, ItemCode Int64, Frequency str, FiscalPeriod Int64, Value string
      - "B":     ID str, PIT Date datetime, ItemCode Int64, Value string
    Dates: use format='mixed' to avoid per-element parsing warnings; later we CUT to 'YYYY-MM-DD'.
    """
    # Ensure ID column (if present) is handled as a pandas string type
    if "ID" in df.columns:
        df["ID"] = df["ID"].astype("string")

    # Convert PIT Date column to datetime using mixed formats, coercing invalid values to NaT
    if "PIT Date" in df.columns:
        df["PIT Date"] = pd.to_datetime(df["PIT Date"], errors="coerce", utc=False, format="mixed")

    # Convert ItemCode to a nullable integer type (Int64) after numeric coercion
    if "ItemCode" in df.columns:
        df["ItemCode"] = pd.to_numeric(df["ItemCode"], errors="coerce").astype("Int64")

    # Handle schema-specific field typing for schemas A_num and A_str
    if schema in ("A_num", "A_str"):
        # Convert Frequency to string if present
        if "Frequency" in df.columns:
            df["Frequency"] = df["Frequency"].astype("string")
        # Convert FiscalPeriod to nullable integer if present
        if "FiscalPeriod" in df.columns:
            df["FiscalPeriod"] = pd.to_numeric(df["FiscalPeriod"], errors="coerce").astype("Int64")

        # For numeric-valued schema (A_num), coerce Value to numeric type
        if schema == "A_num":
            if "Value" in df.columns:
                df["Value"] = pd.to_numeric(df["Value"], errors="coerce")
        else:  # For string-valued schema (A_str), keep Value as string
            if "Value" in df.columns:
                df["Value"] = df["Value"].astype("string")

    # Handle schema "B" where Value should be treated as string
    elif schema == "B":
        if "Value" in df.columns:
            df["Value"] = df["Value"].astype("string")

    # Return DataFrame with coerced dtypes based on the selected schema
    return df


def save_append_txt(df: pd.DataFrame, out_path: str, write_header: bool):
    """
    Append to a single .txt file using OUTPUT_SEP (|).
    Dates will be formatted compactly; we already cut PIT Date earlier to 'YYYY-MM-DD'.
    """
    # Write the DataFrame to a pipe-separated text file in append mode
    df.to_csv(
        out_path,
        index=False,                 # Do not write the index column
        sep=OUTPUT_SEP,              # Use the global separator for consistency
        mode="a",                    # Append to existing file instead of overwriting
        header=write_header,         # Conditionally write header (for first chunk only)
        lineterminator="\n",         # Use newline as row terminator
        date_format="%Y-%m-%d"       # Format datetime columns in a compact date format
    )


# Audit helpers used to analyze changes introduced by cleaning steps
def _na_increase(before_df: pd.DataFrame, after_df: pd.DataFrame) -> dict:
    """
    Compute the increase in missing values per column between two DataFrames.
    Only columns that are present in both DataFrames are considered.
    """
    # Determine the set of columns shared between the two DataFrames
    cols = [c for c in after_df.columns if c in before_df.columns]
    if not cols:
        # If there are no common columns, return an empty dictionary
        return {}
    # Count missing values in the "before" DataFrame for the common columns
    b = before_df[cols].isna().sum()
    # Count missing values in the "after" DataFrame for the common columns
    a = after_df[cols].isna().sum()
    # Compute the increase in missing values per column and cast to int
    inc = (a - b).astype(int)
    # Keep only columns where the number of missing values increased
    inc = inc[inc > 0]
    # Return the increase as a plain dictionary {column_name: increase}
    return inc.to_dict()


def _trim_changes(before_df: pd.DataFrame, after_df: pd.DataFrame) -> dict:
    """
    Count cells changed by trimming in string-like columns (excluding 'ID').
    """
    # Dictionary to accumulate the count of changed cells per column
    changes = {}
    # Determine columns present in both DataFrames, excluding the ID column
    common_cols = [c for c in after_df.columns if c in before_df.columns and c != "ID"]
    for c in common_cols:
        # Only consider columns with object or string-like dtype in the "before" DataFrame
        if not (pd.api.types.is_object_dtype(before_df[c]) or pd.api.types.is_string_dtype(before_df[c])):
            continue
        # Convert the column in both DataFrames to pandas StringDtype for comparison
        b = before_df[c].astype("string")
        a = after_df[c].astype("string")
        # Build a mask of cells that are non-missing in both and where the value changed
        mask = b.notna() & a.notna() & (b != a)
        # Count how many values changed due to trimming or other string operations
        cnt = int(mask.sum())
        if cnt > 0:
            # Record the number of changed cells for this column
            changes[c] = changes.get(c, 0) + cnt
    # Return a dictionary mapping column names to number of changed cells
    return changes


### Config

In [23]:
# =============================================================================
# CELL SUMMARY
# -----------------------------------------------------------------------------
# This cell defines configuration parameters controlling how large input files
# are processed and where intermediate cleaned data is written:
#   - `CHUNK_SIZE` determines how many rows are read per chunk when streaming
#     large files, which directly influences memory usage and I/O efficiency.
#   - `REQUIRE_VALUE_AND_DATE` is a flag that indicates whether downstream
#     cleaning steps must enforce the presence of both Value and PIT Date
#     columns (e.g., filtering out rows with missing Value or PIT Date).
#   - `USE_LOCAL_TMP` toggles whether intermediate files are first written to
#     a local temporary directory in the Colab runtime for faster writes,
#     before later being moved to Google Drive.
#   - `LOCAL_TMP_DIR` specifies the local temporary directory path and is
#     created if it does not already exist, ensuring that subsequent write
#     operations to this location succeed without directory-related errors.
# =============================================================================

from pathlib import Path
import sys

# Maximum number of rows to process per chunk when reading large datasets
CHUNK_SIZE = 1_000_000

# Flag indicating whether cleaned outputs must have non-missing Value and PIT Date
REQUIRE_VALUE_AND_DATE = True

# Detect environment
IN_COLAB = "google.colab" in sys.modules

# Configure whether to use a local temporary directory for faster intermediate writes
USE_LOCAL_TMP = True

# IMPORTANT:
# - In Colab, /content is valid.
# - On your server, /content is not writable/doesn't exist.
# So we pick a temp directory that is always valid.
if USE_LOCAL_TMP:
    if IN_COLAB:
        LOCAL_TMP_DIR = Path("/content/tmp_clean")
    else:
        # Use your project temp folder (must already be defined by your setup cell)
        # Temp_file_path should be like: /home/jovyan/work/hpool1/pseidel/KGWMAT/Temp
        LOCAL_TMP_DIR = Path(Temp_file_path) / "tmp_clean"

    LOCAL_TMP_DIR.mkdir(parents=True, exist_ok=True)

print("IN_COLAB:", IN_COLAB)
print("USE_LOCAL_TMP:", USE_LOCAL_TMP)
print("LOCAL_TMP_DIR:", LOCAL_TMP_DIR if USE_LOCAL_TMP else None)
print("CHUNK_SIZE:", CHUNK_SIZE)


IN_COLAB: False
USE_LOCAL_TMP: True
LOCAL_TMP_DIR: /home/jovyan/work/hpool1/pseidel/test/Temp/tmp_clean
CHUNK_SIZE: 1000000


### Extract Relevant Rows Only

In [24]:
# =============================================================================
# CELL SUMMARY
# -----------------------------------------------------------------------------
# This cell performs the first main extraction step for the pipeline:
#   - It defines `extract_relevant_rows_only`, a chunk-based reader that:
#       * streams large input text files row by row in chunks to control memory usage
#       * assigns column names depending on whether the file is "A_like" or "B_like"
#       * converts the ItemCode column to integers and keeps only rows whose
#         ItemCode is contained in a precomputed set of relevant item codes
#       * optionally filters out rows with certain Frequency values (E, L, R, U)
#         for A_like files. This is a key filtering transformation that reduces
#         the dataset to only relevant data points.
#       * writes the filtered subset to an output file, appending chunk by chunk
#         via `save_append_txt`.
#       * keeps counters of how many rows are retained and how many are dropped
#         due to item code or frequency filtering to provide transparency on
#         the impact of the filtering.
#       * optionally writes to a local temporary file first and then moves it
#         to the final Google Drive location for faster I/O.
#   - It then calls this function four times to produce A/B/C/D "relevant_raw"
#     files, each based on a different input source and relevant item set:
#       * A: fundamentals dataset (A_like schema, frequency filtering enabled)
#       * B: current dataset (B_like schema, no frequency filtering)
#       * C: calendar dataset (A_like schema, frequency filtering enabled)
#       * D: metadata dataset (A_like schema, frequency filtering enabled)
# =============================================================================

# ---------- STEP 1 — Extract relevant rows for A, B, C, D ----------

import gc, shutil  # gc for manual garbage collection, shutil for file moving utilities


def extract_relevant_rows_only(
    input_path: str,
    schema: str,
    out_dir: str,
    out_filename: str,
    relevant_itemcodes: set[int],
    drop_freq: bool = False
):
    """
    schema: "A_like" (ID,PIT Date,Frequency,FiscalPeriod,ItemCode,Value)
            "B_like" (ID,PIT Date,ItemCode,Value)
    drop_freq: if True, drop Frequency in {"E","L","R","U"} (use for A_like datasets)
    """
    # Ensure out_dir is a Path object and create the directory if needed
    out_dir = Path(out_dir)
    out_dir.mkdir(parents=True, exist_ok=True)

    # Construct the final output file path
    final_path = out_dir / out_filename

    # Decide whether to use a local temporary path for faster writes
    if USE_LOCAL_TMP:
        # Temporary partial file path in local storage
        part_path = LOCAL_TMP_DIR / (out_filename + ".part")
        # Remove any existing temporary file from previous runs
        if part_path.exists():
            part_path.unlink()
        # Remove any existing final file to avoid appending to old data
        if final_path.exists():
            final_path.unlink()
        # Write output initially to the temporary path
        write_path = part_path
    else:
        # If not using local temp, remove existing final file if present
        if final_path.exists():
            final_path.unlink()
        # Write output directly to the final path
        write_path = final_path

    # Select appropriate column name layout based on schema
    if schema == "A_like":
        # A_like layout: includes Frequency and FiscalPeriod
        column_names = ["ID", "PIT Date", "Frequency", "FiscalPeriod", "ItemCode", "Value"]
    else:
        # B_like layout: only ID, PIT Date, ItemCode, Value
        column_names = ["ID", "PIT Date", "ItemCode", "Value"]

    # Track whether a header has already been written to the output file
    wrote_header = False

    # Counters for auditing how many rows are kept or dropped by filters
    total_kept = 0
    total_dropped_itemcode = 0
    total_dropped_frequency = 0
    chunk_idx = 0

    # Stream the input file in chunks to limit memory usage
    for chunk in pd.read_csv(
        input_path,
        sep="|",              # Input data is pipe-delimited
        header=None,          # No header row in the input file
        names=column_names,   # Assign explicit column names
        dtype=str,            # Read all columns as strings initially
        engine="c",           # Use faster C parser
        on_bad_lines="skip",  # Skip lines that cannot be parsed correctly
        chunksize=CHUNK_SIZE, # Number of rows per chunk
        encoding="latin1"     # Encoding for extended characters
    ):
        # Increment chunk index for progress reporting
        chunk_idx += 1

        # Convert ItemCode column to nullable integer type for filtering
        itemc = pd.to_numeric(chunk["ItemCode"], errors="coerce").astype("Int64")

        # Mask indicating rows whose ItemCode is in the relevant item set
        mask_item = itemc.isin(relevant_itemcodes)

        # If we are dropping certain frequency codes and Frequency column exists
        if drop_freq and ("Frequency" in chunk.columns):
            # Keep frequencies that are not in the exclusion set {E, L, R, U}
            mask_freq = ~chunk["Frequency"].isin(["E", "L", "R", "U"])

            # Final mask: rows that satisfy both itemcode and frequency conditions
            mask_final = mask_item & mask_freq

            # Count how many rows were dropped purely due to excluded frequencies
            total_dropped_frequency += int((~mask_freq).sum())
        else:
            # If not filtering by frequency, only use the item code mask
            mask_final = mask_item

        # Apply the final combined mask to filter the chunk down to relevant rows
        subset = chunk[mask_final]

        # Count how many rows were dropped due to item code irrelevance
        total_dropped_itemcode += int((~mask_item).sum())

        # If there are any relevant rows in this chunk, append them to the output file
        if not subset.empty:
            # Write filtered rows, appending to the target file
            save_append_txt(subset, str(write_path), write_header=not wrote_header)
            # After the first write, ensure header is not written again
            wrote_header = True
            # Update kept row counter
            total_kept += len(subset)

        # Explicitly delete large DataFrames to free memory
        del chunk, subset
        gc.collect()

        # Print progress information for monitoring extraction
        print(
            f"[extract] {out_filename}  chunk={chunk_idx:,}  kept={total_kept:,}  "
            f"dropped_itemcode={total_dropped_itemcode:,}  dropped_freq={total_dropped_frequency:,}"
        )

    # If using local temporary storage and the partial file exists, move it to final location
    if USE_LOCAL_TMP and write_path.exists():
        shutil.move(str(write_path), str(final_path))
        print(f"[extract] moved final file to {final_path}")

    # Final summary of how many rows were written to the output file
    print(f"[extract] DONE: wrote {total_kept:,} rows to {final_path}")
    return str(final_path)


# Run STEP 1 for all datasets using the precomputed relevant item sets
A_RELEVANT_PATH = extract_relevant_rows_only(
    Fundamentals_file_path,
    schema="A_like",
    out_dir=Temp_file_path_GO,
    out_filename="A_relevant_raw.txt",
    relevant_itemcodes=RELEVANT_ITEMCODES_A,
    drop_freq=True
)

B_RELEVANT_PATH = extract_relevant_rows_only(
    Current_file_path,
    schema="B_like",
    out_dir=Temp_file_path_GO,
    out_filename="B_relevant_raw.txt",
    relevant_itemcodes=RELEVANT_ITEMCODES_B,
    drop_freq=False
)

C_RELEVANT_PATH = extract_relevant_rows_only(
    Calendar_file_path,
    schema="A_like",
    out_dir=Temp_file_path_GO,
    out_filename="C_relevant_raw.txt",
    relevant_itemcodes=RELEVANT_ITEMCODES_C,
    drop_freq=True
)

D_RELEVANT_PATH = extract_relevant_rows_only(
    Meta_file_path,
    schema="A_like",
    out_dir=Temp_file_path_GO,
    out_filename="D_relevant_raw.txt",
    relevant_itemcodes=RELEVANT_ITEMCODES_D,
    drop_freq=True
)


[extract] A_relevant_raw.txt  chunk=1  kept=196,067  dropped_itemcode=738,377  dropped_freq=256,745
[extract] A_relevant_raw.txt  chunk=2  kept=405,552  dropped_itemcode=1,454,354  dropped_freq=529,289
[extract] A_relevant_raw.txt  chunk=3  kept=625,601  dropped_itemcode=2,168,525  dropped_freq=774,061
[extract] A_relevant_raw.txt  chunk=4  kept=913,515  dropped_itemcode=2,878,961  dropped_freq=778,813
[extract] A_relevant_raw.txt  chunk=5  kept=1,208,871  dropped_itemcode=3,582,632  dropped_freq=781,806
[extract] A_relevant_raw.txt  chunk=6  kept=1,508,274  dropped_itemcode=4,282,589  dropped_freq=783,654
[extract] A_relevant_raw.txt  chunk=7  kept=1,798,822  dropped_itemcode=4,990,889  dropped_freq=787,365
[extract] A_relevant_raw.txt  chunk=8  kept=2,102,846  dropped_itemcode=5,686,378  dropped_freq=788,956
[extract] A_relevant_raw.txt  chunk=9  kept=2,402,453  dropped_itemcode=6,385,760  dropped_freq=792,143
[extract] A_relevant_raw.txt  chunk=10  kept=2,698,166  dropped_itemcode=7

### Clean + Coerce + Require Fields -> Final Clean Files + Audit CSV

In [25]:
# =============================================================================
# CELL SUMMARY
# -----------------------------------------------------------------------------
# This cell implements STEP 2 of the pipeline: cleaning, type coercion, and
# strict separation of clean vs. error rows for all datasets (A, B, C, D).
#
# Core behavior and transformations:
#   - Reads the STEP 1 "relevant_raw" files line by line, manually splitting
#     each row into columns and detecting malformed rows based on column count.
#   - For well-formed rows:
#       * Applies `standardize_strings` to normalize string columns while
#         preserving the ID column. This includes trimming whitespace and
#         converting empty strings to missing values.
#       * Applies `coerce_types_schema` to enforce dataset-specific schemas
#         ("A_num", "A_str", "B") by converting types of ID, PIT Date,
#         ItemCode, Frequency, FiscalPeriod, and Value.
#   - If `REQUIRE_VALUE_AND_DATE` is True:
#       * Classifies rows as "clean" only if both Value and PIT Date are
#         present and non-missing after coercion.
#       * All other rows, including malformed lines, preexisting missing
#         values, or values that become missing after coercion, are written
#         to an error file with detailed ErrorReason fields and, where
#         applicable, original uncoerced Value/PIT Date for debugging.
#   - Every successfully read line is written to exactly one of:
#       * {final_basename}.txt        -> clean rows
#       * {final_basename}_errors.txt -> non-clean rows and malformed rows
#   - PIT Date is normalized to the compact 'YYYY-MM-DD' format in both
#     clean and error outputs.
#   - For each dataset (Fundamentals, Current, Calendar, Meta), this cell
#     runs the cleaning function, collects audit metrics (new NAs from
#     standardization/coercion, trimming changes, totals), and writes a
#     combined audit summary CSV to disk.
# =============================================================================

import gc, shutil
from pathlib import Path

def clean_coerce_and_require(input_path: str, schema: str, out_dir: str, final_basename: str):
    """
    schema:
      - "A_num" -> Value numeric, has Frequency/FiscalPeriod
      - "A_str" -> Value string, has Frequency/FiscalPeriod
      - "B"     -> Value string, no Frequency/FiscalPeriod

    Behavior:
      - Reads input line-by-line, manually detects malformed lines (bad column count).
      - Applies standardize_strings + coerce_types_schema (no row drops).
      - If REQUIRE_VALUE_AND_DATE:
           Clean rows:   Value != NA AND PIT Date != NA after coercion
           Error rows:   all others (incl. malformed, preexisting missing, coercion-induced NAs)
      - Every successfully read line is written to EXACTLY ONE of:
           {final_basename}.txt          (clean rows)
           {final_basename}_errors.txt   (all non-clean rows, with ErrorReason)
      - No {final_basename}_dropped_* side files anymore.

    Returns:
      (final_clean_path, totals_dict, na_std_dict, na_coerce_dict, trim_changes_dict)
    """

    # Ensure output directory exists
    out_dir = Path(out_dir)
    out_dir.mkdir(parents=True, exist_ok=True)

    # Define expected columns based on schema
    if schema in ("A_num", "A_str"):
        # Schema with Frequency and FiscalPeriod
        column_names = ["ID", "PIT Date", "Frequency", "FiscalPeriod", "ItemCode", "Value"]
    else:  # "B"
        # Simpler schema without Frequency/FiscalPeriod
        column_names = ["ID", "PIT Date", "ItemCode", "Value"]

    n_cols = len(column_names)

    # Construct output paths for clean and error files
    final_path  = out_dir / f"{final_basename}.txt"
    errors_path = out_dir / f"{final_basename}_errors.txt"

    # Prepare clean writer target, potentially via a local temporary path
    if USE_LOCAL_TMP:
        part_path = LOCAL_TMP_DIR / (final_path.name + ".part")
        # Remove any existing partial file from previous runs
        if part_path.exists():
            part_path.unlink()
        # Remove existing final file to avoid appending to stale data
        if final_path.exists():
            final_path.unlink()
        clean_write_path = part_path
    else:
        # Direct write to final path, removing previous file if any
        if final_path.exists():
            final_path.unlink()
        clean_write_path = final_path

    # Remove any existing error file to avoid mixing runs
    if errors_path.exists():
        errors_path.unlink()

    # Header flags to ensure header is written exactly once per file
    clean_header_written = False
    err_header_written = False

    # Counters to track row statistics for auditing
    totals = {
        "rows_in": 0,           # number of well-formed rows processed
        "malformed_rows": 0,    # rows with wrong column count
        "rows_clean": 0,        # rows written to clean file
        "rows_error": 0         # rows written to error file (incl. malformed)
    }

    # Dictionaries to aggregate NA increases and trimming changes over all chunks
    na_std, na_coerce, trimmed = {}, {}, {}

    def _acc(dst: dict, delta: dict):
        """
        Accumulate counts from delta into dst, summing values per key.
        """
        for k, v in (delta or {}).items():
            dst[k] = dst.get(k, 0) + int(v)

    # Small helper to append a DataFrame to a target file (clean or error)
    def _append_df(df: pd.DataFrame, path: Path, header_flag_attr: str):
        nonlocal clean_header_written, err_header_written
        if df.empty:
            return

        # Decide whether to write header based on file type and previous writes
        if header_flag_attr == "clean":
            write_header = not clean_header_written
            clean_header_written = True
        else:
            write_header = not err_header_written
            err_header_written = True

        # Append DataFrame to target CSV with consistent formatting
        df.to_csv(
            path,
            index=False,
            sep=OUTPUT_SEP,
            mode="a",
            header=write_header,
            lineterminator="\n",
            date_format="%Y-%m-%d"
        )

    # Buffer to collect well-formed rows before processing them as a chunk
    buffer_rows = []   # list of lists, each of length n_cols

    def process_buffer():
        """
        Process the current buffer_rows as one chunk: standardize, coerce, and
        split rows into clean vs. error groups based on required fields.
        """
        nonlocal buffer_rows
        if not buffer_rows:
            return

        # Convert buffer into a DataFrame using the known column names
        df = pd.DataFrame(buffer_rows, columns=column_names)
        totals["rows_in"] += len(df)

        # 1) Standardize string columns (ID preserved, other string columns trimmed)
        b_std = df.copy(deep=False)
        df = standardize_strings(df)
        _acc(na_std, _na_increase(b_std, df))
        _acc(trimmed, _trim_changes(b_std, df))

        # 2) Coerce types based on schema (ID, PIT Date, ItemCode, etc.)
        b_co = df.copy(deep=False)
        df = coerce_types_schema(df, schema=schema)
        _acc(na_coerce, _na_increase(b_co, df))

        # 3) Classify rows as clean or error, depending on required fields
        if REQUIRE_VALUE_AND_DATE:
            has_value = "Value" in df.columns

            # Track missingness for Value and PIT Date before coercion
            pre_missing_val   = (b_co["Value"].isna()    if has_value else pd.Series(False, index=df.index))
            pre_missing_date  = (b_co["PIT Date"].isna() if "PIT Date" in b_co.columns else pd.Series(False, index=df.index))

            # Track missingness for Value and PIT Date after coercion
            post_missing_val  = (df["Value"].isna()    if has_value else pd.Series(False, index=df.index))
            post_missing_date = (df["PIT Date"].isna() if "PIT Date" in df.columns else pd.Series(False, index=df.index))

            # Rows failing the required Value/PIT Date constraint after coercion
            bad_req = post_missing_val | post_missing_date

            # Initialize a string-based error reason for each row
            err_reason = pd.Series("", index=df.index, dtype="string")

            if has_value:
                # Preexisting missing Value
                mask = pre_missing_val & post_missing_val
                err_reason[mask] = err_reason[mask] + "Value_missing_preexisting;"
                # Value that became missing due to coercion
                mask = (~pre_missing_val) & post_missing_val
                err_reason[mask] = err_reason[mask] + "Value_became_NA_after_coerce;"

            if "PIT Date" in df.columns:
                # Preexisting missing PIT Date
                mask = pre_missing_date & post_missing_date
                err_reason[mask] = err_reason[mask] + "PITDate_missing_preexisting;"
                # PIT Date that became missing due to coercion
                mask = (~pre_missing_date) & post_missing_date
                err_reason[mask] = err_reason[mask] + "PITDate_became_NA_after_coerce;"

            # Default label for rows failing requirements without a more specific reason
            err_reason[(bad_req) & (err_reason == "")] = "Missing_required_fields"

            # Build explicit clean and error masks
            error_mask = bad_req
            clean_mask = ~error_mask

            # Split into clean and error chunks
            clean_chunk = df[clean_mask].copy()
            error_chunk = df[error_mask].copy()

            # Attach diagnostic columns to error rows only
            if not error_chunk.empty:
                # Trim trailing semicolons from concatenated reasons
                error_chunk["ErrorReason"] = err_reason[error_mask].str.rstrip(";")
                if has_value:
                    # Preserve original string representation of Value
                    error_chunk["OriginalValue"] = b_co.loc[error_mask, "Value"].astype("string")
                if "PIT Date" in b_co.columns:
                    # Preserve original PIT Date as string
                    error_chunk["OriginalPITDate"] = b_co.loc[error_mask, "PIT Date"].astype("string")

        else:
            # If no required-field enforcement, all rows are considered clean
            clean_chunk = df.copy()
            error_chunk = df.iloc[0:0].copy()  # empty DataFrame for errors

        # 4) Normalize PIT Date format in both clean and error outputs
        for out_df in (clean_chunk, error_chunk):
            if not out_df.empty and "PIT Date" in out_df.columns:
                d = pd.to_datetime(out_df["PIT Date"], errors="coerce", utc=False, format="mixed")
                out_df["PIT Date"] = d.dt.strftime("%Y-%m-%d")

        # 5) Write clean and error chunks to their respective files
        if not clean_chunk.empty:
            _append_df(clean_chunk, clean_write_path, header_flag_attr="clean")
            totals["rows_clean"] += len(clean_chunk)

        if not error_chunk.empty:
            _append_df(error_chunk, errors_path, header_flag_attr="err")
            totals["rows_error"] += len(error_chunk)

        # Clear the buffer and trigger garbage collection
        buffer_rows = []
        gc.collect()

    # --------- STREAM INPUT & COLLECT MALFORMED + GOOD ROWS ---------

    with open(input_path, encoding="latin1") as f_in:
        # Read and discard the first line as header (structure derived from schema)
        header_line = f_in.readline()
        if not header_line:
            print(f"[WARN] Empty input file: {input_path}")
        # Subsequent lines are data lines

        for line_no, line in enumerate(f_in, start=2):
            # Remove trailing newline character
            raw = line.rstrip("\n")
            # Split line into fields using the global output separator
            parts = raw.split(OUTPUT_SEP)

            # If column count does not match expectation, treat row as malformed
            if len(parts) != n_cols:
                totals["malformed_rows"] += 1

                # Construct a minimal row aligned to our schema, padding missing fields with empty strings
                row_dict = {col: (parts[i] if i < len(parts) else "") for i, col in enumerate(column_names)}
                row_dict["ErrorReason"] = f"MalformedRow_wrong_column_count_expected_{n_cols}_got_{len(parts)}"
                row_dict["RawLine"] = raw

                # Write malformed row directly to the error file
                err_df = pd.DataFrame([row_dict])
                _append_df(err_df, errors_path, header_flag_attr="err")
                totals["rows_error"] += 1
                continue

            # Well-formed row: add to buffer for further processing
            buffer_rows.append(parts)

            # If buffer reaches chunk size, process it as a unit
            if len(buffer_rows) >= CHUNK_SIZE:
                process_buffer()

        # Process any remaining rows left in the buffer
        process_buffer()

    # If local temp storage is used, move the fully written file to its final location
    if USE_LOCAL_TMP and clean_write_path.exists():
        shutil.move(str(clean_write_path), str(final_path))
        print(f"[clean:{final_basename}] moved final file to {final_path}")

    # ---- Sanity check: ensure all visible rows are accounted for ----
    total_visible = totals["rows_clean"] + totals["rows_error"]
    if total_visible != (totals["rows_in"] + totals["malformed_rows"]):
        print(
            "[WARN] Row mismatch in clean_coerce_and_require: "
            f"rows_in={totals['rows_in']} + malformed={totals['malformed_rows']} "
            f"!= clean({totals['rows_clean']}) + error({totals['rows_error']})"
        )
    else:
        print(
            f"[OK] All rows accounted for for {final_basename}: "
            f"clean={totals['rows_clean']}, errors={totals['rows_error']}, "
            f"malformed={totals['malformed_rows']}"
        )

    # Return final clean file path and audit dictionaries
    return str(final_path), totals, na_std, na_coerce, trimmed


# ---------- Run STEP 2 for all datasets ----------

# Clean, coerce, and classify rows for Fundamentals (A_num schema)
A_SINGLE_PATH, A_TOTALS, A_NA_STD, A_NA_COERCE, A_TRIM = clean_coerce_and_require(
    A_RELEVANT_PATH, schema="A_num", out_dir=Temp_file_path_GO, final_basename="Fundamentals_clean"
)

# Clean, coerce, and classify rows for Current (B schema)
B_SINGLE_PATH, B_TOTALS, B_NA_STD, B_NA_COERCE, B_TRIM = clean_coerce_and_require(
    B_RELEVANT_PATH, schema="B",     out_dir=Temp_file_path_GO, final_basename="Current_clean"
)

# Clean, coerce, and classify rows for Calendar (A_str schema)
C_SINGLE_PATH, C_TOTALS, C_NA_STD, C_NA_COERCE, C_TRIM = clean_coerce_and_require(
    C_RELEVANT_PATH, schema="A_str", out_dir=Temp_file_path_GO, final_basename="Calendar_clean"
)

# Clean, coerce, and classify rows for Meta (A_str schema)
D_SINGLE_PATH, D_TOTALS, D_NA_STD, D_NA_COERCE, D_TRIM = clean_coerce_and_require(
    D_RELEVANT_PATH, schema="A_str", out_dir=Temp_file_path_GO, final_basename="Meta_clean"
)

# ---------- Audit summary aggregation ----------

def _totals_to_rows(ds, totals):
    """
    Convert a totals dictionary into a list of row dicts for the audit summary.
    """
    return [{"dataset": ds, "section": "totals", "metric": k, "value": v} for k, v in totals.items()]

def _dict_to_rows(ds, section, d):
    """
    Convert a generic dictionary into sorted row dicts for the audit summary.
    """
    return [{"dataset": ds, "section": section, "metric": k, "value": v} for k, v in sorted((d or {}).items())]

# Collect all audit entries for each dataset and category into a single list
rows = []
rows += _totals_to_rows("Fundamentals", A_TOTALS) \
     + _dict_to_rows("Fundamentals", "new_NA_from_standardize", A_NA_STD) \
     + _dict_to_rows("Fundamentals", "new_NA_from_coercion", A_NA_COERCE) \
     + _dict_to_rows("Fundamentals", "string_trim_changes", A_TRIM)

rows += _totals_to_rows("Current", B_TOTALS) \
     + _dict_to_rows("Current", "new_NA_from_standardize", B_NA_STD) \
     + _dict_to_rows("Current", "new_NA_from_coercion", B_NA_COERCE) \
     + _dict_to_rows("Current", "string_trim_changes", B_TRIM)

rows += _totals_to_rows("Calendar", C_TOTALS) \
     + _dict_to_rows("Calendar", "new_NA_from_standardize", C_NA_STD) \
     + _dict_to_rows("Calendar", "new_NA_from_coercion", C_NA_COERCE) \
     + _dict_to_rows("Calendar", "string_trim_changes", C_TRIM)

rows += _totals_to_rows("Meta", D_TOTALS) \
     + _dict_to_rows("Meta", "new_NA_from_standardize", D_NA_STD) \
     + _dict_to_rows("Meta", "new_NA_from_coercion", D_NA_COERCE) \
     + _dict_to_rows("Meta", "string_trim_changes", D_TRIM)

# Build a consolidated audit summary DataFrame with a fixed column order
audit_summary_df = pd.DataFrame(rows, columns=["dataset", "section", "metric", "value"])

# Persist the audit summary as a CSV in the general overview temp directory
AUDIT_SUMMARY_CSV = Path(Temp_file_path_GO) / "Audit_Summary_all.csv"
audit_summary_df.to_csv(AUDIT_SUMMARY_CSV, index=False)
print("[OK] Wrote audit summary CSV to:", AUDIT_SUMMARY_CSV)

# Print paths to the clean files for quick reference
print("Clean files:")
print("  -", A_SINGLE_PATH)
print("  -", B_SINGLE_PATH)
print("  -", C_SINGLE_PATH)
print("  -", D_SINGLE_PATH)


[clean:Fundamentals_clean] moved final file to /home/jovyan/work/hpool1/pseidel/test/Temp/TempGeneralOverview/Fundamentals_clean.txt
[OK] All rows accounted for for Fundamentals_clean: clean=224547306, errors=5308192, malformed=0
[clean:Current_clean] moved final file to /home/jovyan/work/hpool1/pseidel/test/Temp/TempGeneralOverview/Current_clean.txt
[OK] All rows accounted for for Current_clean: clean=964684, errors=0, malformed=0
[clean:Calendar_clean] moved final file to /home/jovyan/work/hpool1/pseidel/test/Temp/TempGeneralOverview/Calendar_clean.txt
[OK] All rows accounted for for Calendar_clean: clean=8418018, errors=0, malformed=0
[clean:Meta_clean] moved final file to /home/jovyan/work/hpool1/pseidel/test/Temp/TempGeneralOverview/Meta_clean.txt
[OK] All rows accounted for for Meta_clean: clean=7162436, errors=0, malformed=0
[OK] Wrote audit summary CSV to: /home/jovyan/work/hpool1/pseidel/test/Temp/TempGeneralOverview/Audit_Summary_all.csv
Clean files:
  - /home/jovyan/work/hpo

### Overview = unique IDs per Year (from Dataset A's FiscalPeriod)

In [26]:
# =============================================================================
# CELL SUMMARY
# -----------------------------------------------------------------------------
# This cell computes dataset-level overviews for all four cleaned datasets:
# Fundamentals, Current, Calendar, and Meta.
#
# Main transformations and derived metrics:
#   - For A-like datasets (Fundamentals, Calendar, Meta):
#       * Stream the cleaned file in chunks to handle large data efficiently.
#       * Track total row count and per-column missing value counts.
#       * Collect the set of unique firm IDs and unique (firm, FiscalPeriod)
#         pairs, giving a sense of panel structure and coverage.
#       * Derive "firm-year" counts by extracting year information from
#         FiscalPeriod using a regex and deduplicating (ID, Year) combinations.
#       * Compute minimum and maximum PIT Date values across the entire file.
#       * Compute minimum and maximum FiscalPeriod values across non-missing rows.
#       * Aggregate Frequency value counts to understand the distribution of
#         reporting frequencies.
#       * Return:
#           1) a compact overview table of totals, missing counts, date ranges,
#              FiscalPeriod ranges, and frequency counts
#           2) a per-year table with the number of unique firms per year.
#
#   - For B-like datasets (Current):
#       * Stream the cleaned file in chunks.
#       * Track total row count, missing values per column, and unique IDs.
#       * Compute the PIT Date range (min/max) across the whole dataset.
#       * Return a compact overview table similar to A-like, but without
#         frequency and FiscalPeriod information.
#
#   - All computed overviews are written as pipe-separated text files to the
#     general overview temp directory for later inspection and diagnostics.
# =============================================================================

import re

def overview_A_like(clean_path: str) -> tuple[pd.DataFrame, pd.DataFrame]:
    # Initialize aggregators and tracking structures for A-like datasets
    total_rows = 0
    missing = {}
    unique_ids = set()
    freq_counts = {}
    firm_year_pairs = set()
    per_year_counts = {}
    min_pit, max_pit, min_fp, max_fp = None, None, None, None

    # Regex to extract 4-digit year from FiscalPeriod strings
    year_re = re.compile(r"(\d{4})")

    # Columns expected in A-like cleaned files
    usecols = ["ID", "PIT Date", "Frequency", "FiscalPeriod", "ItemCode", "Value"]

    # Stream the dataset in chunks to limit memory usage
    for ch in pd.read_csv(
        clean_path,
        sep=OUTPUT_SEP,
        usecols=usecols,
        dtype=str,
        engine="c",
        on_bad_lines="skip",
        chunksize=1_000_000,
        low_memory=False,
        encoding="latin1"
    ):
        # Count total number of rows processed
        total_rows += len(ch)

        # Accumulate missing-value counts per column for this chunk
        m = ch.isna().sum()
        for c, v in m.items():
            missing[c] = missing.get(c, 0) + int(v)

        # Update the set of unique firm IDs (excluding missing values)
        unique_ids.update(ch["ID"].dropna().astype("string").unique().tolist())

        # If FiscalPeriod is present, derive firm-year structures and coverage
        if "FiscalPeriod" in ch.columns:
            # Build (ID, FiscalPeriod) pairs and add to global set for uniqueness
            pairs = ch[["ID", "FiscalPeriod"]].dropna().astype("string").drop_duplicates()
            for rid, fp in pairs.itertuples(index=False):
                firm_year_pairs.add((rid, fp))

            # Extract a 4-digit year from FiscalPeriod for firm-year counts
            yrs = ch["FiscalPeriod"].astype("string").str.extract(year_re, expand=False)
            tmp = pd.DataFrame({"ID": ch["ID"].astype("string"), "Year": yrs})

            # Keep non-missing (ID, Year) combinations and drop duplicates
            tmp = tmp.dropna(subset=["ID", "Year"]).drop_duplicates(subset=["ID", "Year"])

            # Count occurrences per year within this chunk
            counts = tmp["Year"].value_counts()
            for y, cnt in counts.items():
                per_year_counts[y] = per_year_counts.get(y, 0) + int(cnt)

            # Track minimum and maximum FiscalPeriod (string comparison)
            fp_nonnull = ch["FiscalPeriod"].dropna().astype("string")
            if len(fp_nonnull):
                min_fp = fp_nonnull.min() if min_fp is None else min(min_fp, fp_nonnull.min())
                max_fp = fp_nonnull.max() if max_fp is None else max(max_fp, fp_nonnull.max())

        # Convert PIT Date to datetime and update global min/max PIT Date
        d = pd.to_datetime(ch["PIT Date"], errors="coerce", utc=False, format="mixed")
        if d.notna().any():
            dmin, dmax = d.min(skipna=True), d.max(skipna=True)
            if pd.notna(dmin):
                min_pit = dmin if min_pit is None else min(min_pit, dmin)
            if pd.notna(dmax):
                max_pit = dmax if max_pit is None else max(max_pit, dmax)

        # Count Frequency occurrences in this chunk and update global frequency counts
        fq = ch["Frequency"].astype("string").dropna()
        vc = fq.value_counts()
        for k, v in vc.items():
            freq_counts[str(k)] = freq_counts.get(str(k), 0) + int(v)

        # Explicitly delete chunk reference to help garbage collection
        del ch

    # Build per-year summary DataFrame (Year vs number of unique IDs)
    fpy = (
        pd.DataFrame(sorted(per_year_counts.items()), columns=["Year", "n_unique_IDs"])
        if per_year_counts else
        pd.DataFrame(columns=["Year", "n_unique_IDs"])
    )

    # Sum over all per-year firm counts to get a grand total
    grand_total_firms_per_year = int(fpy["n_unique_IDs"].sum()) if not fpy.empty else 0

    # Assemble summary metrics into a list of rows
    rows = []
    rows += [
        ("totals", "total_rows", total_rows),
        ("totals", "unique_IDs", len(unique_ids)),
        ("totals", "firm_years_unique", len(firm_year_pairs)),
        ("totals", "firms_per_year_SUM", grand_total_firms_per_year),
    ]
    rows += [("missing", c, int(v)) for c, v in sorted(missing.items())]
    rows += [
        ("dates", "PIT_Date_min", None if min_pit is None else min_pit.strftime("%Y-%m-%d")),
        ("dates", "PIT_Date_max", None if max_pit is None else max_pit.strftime("%Y-%m-%d")),
        ("fiscalperiod", "FiscalPeriod_min", min_fp),
        ("fiscalperiod", "FiscalPeriod_max", max_fp),
    ]
    rows += [("frequency_rows", k, int(v)) for k, v in sorted(freq_counts.items())]

    # Return overview table and per-year firm-count table
    return pd.DataFrame(rows, columns=["section", "metric", "value"]), fpy


def overview_B_like(clean_path: str) -> pd.DataFrame:
    # Initialize aggregators and tracking structures for B-like datasets
    total_rows = 0
    missing = {}
    unique_ids = set()
    min_pit, max_pit = None, None

    # Columns expected in B-like cleaned files
    usecols = ["ID", "PIT Date", "ItemCode", "Value"]

    # Stream the dataset in chunks to limit memory usage
    for ch in pd.read_csv(
        clean_path,
        sep=OUTPUT_SEP,
        usecols=usecols,
        dtype=str,
        engine="c",
        on_bad_lines="skip",
        chunksize=1_000_000,
        low_memory=False,
        encoding="latin1"
    ):
        # Count total rows seen in this chunk
        total_rows += len(ch)

        # Aggregate missing value counts per column
        m = ch.isna().sum()
        for c, v in m.items():
            missing[c] = missing.get(c, 0) + int(v)

        # Update unique ID set, ignoring missing IDs
        unique_ids.update(ch["ID"].dropna().astype("string").unique().tolist())

        # Convert PIT Date and update global min/max PIT Date
        d = pd.to_datetime(ch["PIT Date"], errors="coerce", utc=False, format="mixed")
        if d.notna().any():
            dmin, dmax = d.min(skipna=True), d.max(skipna=True)
            if pd.notna(dmin):
                min_pit = dmin if min_pit is None else min(min_pit, dmin)
            if pd.notna(dmax):
                max_pit = dmax if max_pit is None else max(max_pit, dmax)

        # Explicitly delete chunk reference to help garbage collection
        del ch

    # Build summary rows for B-like overview
    rows = [
        ("totals", "total_rows", total_rows),
        ("totals", "unique_IDs", len(unique_ids)),
        ("dates", "PIT_Date_min", None if min_pit is None else min_pit.strftime("%Y-%m-%d")),
        ("dates", "PIT_Date_max", None if max_pit is None else max_pit.strftime("%Y-%m-%d")),
    ]
    rows += [("missing", c, int(v)) for c, v in sorted(missing.items())]

    # Return overview table as DataFrame
    return pd.DataFrame(rows, columns=["section", "metric", "value"])


# Compute overviews for each cleaned dataset using the paths from the cleaning step
over_A_df, over_A_fpy = overview_A_like(A_SINGLE_PATH)        # Fundamentals_clean (A-like schema)
over_B_df              = overview_B_like(B_SINGLE_PATH)       # Current_clean (B-like schema)
over_C_df, over_C_fpy = overview_A_like(C_SINGLE_PATH)        # Calendar_clean (A-like schema)
over_D_df, over_D_fpy = overview_A_like(D_SINGLE_PATH)        # Meta_clean (A-like schema)


# Construct output paths for the overview and firms-per-year files
A_OV_PATH  = Path(Temp_file_path_GO) / "Fundamentals_clean_overview.txt"
A_FPY_PATH = Path(Temp_file_path_GO) / "Fundamentals_firms_per_year.txt"
B_OV_PATH  = Path(Temp_file_path_GO) / "Current_clean_overview.txt"
C_OV_PATH  = Path(Temp_file_path_GO) / "Calendar_clean_overview.txt"
C_FPY_PATH = Path(Temp_file_path_GO) / "Calendar_firms_per_year.txt"
D_OV_PATH  = Path(Temp_file_path_GO) / "Meta_clean_overview.txt"
D_FPY_PATH = Path(Temp_file_path_GO) / "Meta_firms_per_year.txt"

# Save all overviews to pipe-separated text files for downstream inspection
over_A_df.to_csv(A_OV_PATH, index=False, sep=OUTPUT_SEP)
over_A_fpy.to_csv(A_FPY_PATH, index=False, sep=OUTPUT_SEP)
over_B_df.to_csv(B_OV_PATH, index=False, sep=OUTPUT_SEP)
over_C_df.to_csv(C_OV_PATH, index=False, sep=OUTPUT_SEP)
over_C_fpy.to_csv(C_FPY_PATH, index=False, sep=OUTPUT_SEP)
over_D_df.to_csv(D_OV_PATH, index=False, sep=OUTPUT_SEP)
over_D_fpy.to_csv(D_FPY_PATH, index=False, sep=OUTPUT_SEP)

# Print a short summary of where the overview files were written
print("[OK] Saved overviews:")
print("  -", A_OV_PATH)
print("  -", A_FPY_PATH)
print("  -", B_OV_PATH)
print("  -", C_OV_PATH)
print("  -", C_FPY_PATH)
print("  -", D_OV_PATH)
print("  -", D_FPY_PATH)


[OK] Saved overviews:
  - /home/jovyan/work/hpool1/pseidel/test/Temp/TempGeneralOverview/Fundamentals_clean_overview.txt
  - /home/jovyan/work/hpool1/pseidel/test/Temp/TempGeneralOverview/Fundamentals_firms_per_year.txt
  - /home/jovyan/work/hpool1/pseidel/test/Temp/TempGeneralOverview/Current_clean_overview.txt
  - /home/jovyan/work/hpool1/pseidel/test/Temp/TempGeneralOverview/Calendar_clean_overview.txt
  - /home/jovyan/work/hpool1/pseidel/test/Temp/TempGeneralOverview/Calendar_firms_per_year.txt
  - /home/jovyan/work/hpool1/pseidel/test/Temp/TempGeneralOverview/Meta_clean_overview.txt
  - /home/jovyan/work/hpool1/pseidel/test/Temp/TempGeneralOverview/Meta_firms_per_year.txt


### Summaries grouped by (ID, ItemCode)

In [27]:
# =============================================================================
# CELL SUMMARY
# -----------------------------------------------------------------------------
# This cell builds grouped summaries at the (ID, ItemCode) level for each of the
# four cleaned datasets (Fundamentals, Current, Calendar, Meta).
#
# Main filtering and transformation logic:
#   - Reads each cleaned file in chunks to handle large data efficiently.
#   - For every chunk:
#       * Parses PIT Date into datetime for temporal aggregation.
#       * Converts ItemCode to a nullable integer type for stable grouping.
#       * Depending on the schema:
#           - "A_num" (numeric Value, e.g. Fundamentals):
#               · Converts Value to numeric and groups by (ID, ItemCode).
#               · Computes per-group statistics:
#                   n_rows, n_valid_value, min/max/mean/std of Value,
#                   first_date and last_date (min/max PIT Date).
#           - "A_str" / "B" (string Value, e.g. Current, Calendar, Meta):
#               · Treats Value as string and groups by (ID, ItemCode).
#               · Computes per-group statistics:
#                   n_rows, n_nonnull_value, first_date, last_date.
#       * Each chunk-level grouped result is appended to a list.
#   - After all chunks:
#       * Concatenates all chunk-level group results.
#       * Re-aggregates at (ID, ItemCode) to combine across chunks:
#           - Numeric schema ("A_num"): sums counts, recomputes global min/max
#             for value and dates, and averages mean/std across chunks.
#           - String schema ("A_str"/"B"): sums counts and recomputes min/max dates.
#       * Normalizes first_date and last_date into 'YYYY-MM-DD' string format.
#   - Finally, it runs this summarization for all four datasets and writes the
#     grouped summaries to pipe-separated text files, each containing one row
#     per (ID, ItemCode) combination with the corresponding aggregation metrics.
# =============================================================================

def summarize_from_clean(single_path: str, schema: str) -> pd.DataFrame:
    # List to collect per-chunk grouped DataFrames
    frames = []

    # Columns required from the clean file for aggregation
    usecols = ["ID", "ItemCode", "Value", "PIT Date"]

    # Stream the cleaned file in chunks to control memory usage
    for ch in pd.read_csv(
        single_path,
        sep=OUTPUT_SEP,
        usecols=usecols,
        dtype=str,
        engine="c",
        on_bad_lines="skip",
        chunksize=1_000_000,
        low_memory=False,
        encoding="latin1"
    ):
        # Parse PIT Date column to datetime for temporal aggregations
        d = pd.to_datetime(ch["PIT Date"], errors="coerce", utc=False, format="mixed")

        if schema in ("A_num",):
            # For numeric schema: convert Value to numeric for statistical aggregation
            v = pd.to_numeric(ch["Value"], errors="coerce")

            # Build a DataFrame with converted columns for grouping
            g = (
                pd.DataFrame({
                    "ID": ch["ID"].astype("string"),
                    "ItemCode": pd.to_numeric(ch["ItemCode"], errors="coerce").astype("Int64"),
                    "Value": v,
                    "PIT Date": d,
                })
                # Group by (ID, ItemCode), keeping rows with possible missing keys
                .groupby(["ID", "ItemCode"], dropna=False)
                # Compute per-group numeric statistics and date range
                .agg(
                    n_rows=("Value", "size"),         # Total rows in group
                    n_valid_value=("Value", "count"), # Number of non-NA Value entries
                    min_value=("Value", "min"),       # Minimum Value
                    max_value=("Value", "max"),       # Maximum Value
                    mean_value=("Value", "mean"),     # Mean Value
                    std_value=("Value", "std"),       # Standard deviation of Value
                    first_date=("PIT Date", "min"),   # Earliest PIT Date
                    last_date=("PIT Date", "max"),    # Latest PIT Date
                )
                .reset_index()
            )
        else:
            # For string schema: keep Value as string, focus on counts and date range
            g = (
                pd.DataFrame({
                    "ID": ch["ID"].astype("string"),
                    "ItemCode": pd.to_numeric(ch["ItemCode"], errors="coerce").astype("Int64"),
                    "Value": ch["Value"].astype("string"),
                    "PIT Date": d,
                })
                # Group by (ID, ItemCode), keeping rows with possible missing keys
                .groupby(["ID", "ItemCode"], dropna=False)
                # Compute per-group counts and date range
                .agg(
                    n_rows=("Value", "size"),          # Total rows in group
                    n_nonnull_value=("Value", "count"),# Number of non-NA Value entries
                    first_date=("PIT Date", "min"),    # Earliest PIT Date
                    last_date=("PIT Date", "max"),     # Latest PIT Date
                )
                .reset_index()
            )

        # Append per-chunk grouped result to the list
        frames.append(g)
        # Clean up chunk-level objects to free memory
        del ch, g

    # If no frames were produced (e.g., empty file), return an empty structure
    if not frames:
        return pd.DataFrame(columns=["ID", "ItemCode"])

    # Concatenate per-chunk grouped DataFrames into a single DataFrame
    allg = pd.concat(frames, ignore_index=True)

    if schema in ("A_num",):
        # For numeric schema, aggregate again across chunks at (ID, ItemCode) level
        out = (
            allg.groupby(["ID", "ItemCode"], dropna=False)
                .agg(
                    n_rows=("n_rows", "sum"),              # Sum of rows across chunks
                    n_valid_value=("n_valid_value", "sum"),# Sum of valid Value counts
                    min_value=("min_value", "min"),        # Global minimum Value
                    max_value=("max_value", "max"),        # Global maximum Value
                    first_date=("first_date", "min"),      # Earliest observed PIT Date
                    last_date=("last_date", "max"),        # Latest observed PIT Date
                    mean_value=("mean_value", "mean"),     # Average of chunk-level means
                    std_value=("std_value", "mean"),       # Average of chunk-level std devs
                )
                .reset_index()
        )
    else:
        # For string schema, aggregate counts and date ranges across chunks
        out = (
            allg.groupby(["ID", "ItemCode"], dropna=False)
                .agg(
                    n_rows=("n_rows", "sum"),                  # Sum of rows across chunks
                    n_nonnull_value=("n_nonnull_value", "sum"),# Sum of non-null Value counts
                    first_date=("first_date", "min"),          # Earliest observed PIT Date
                    last_date=("last_date", "max"),            # Latest observed PIT Date
                )
                .reset_index()
        )

    # Normalize first_date and last_date to 'YYYY-MM-DD' string format if present
    if "first_date" in out.columns:
        out["first_date"] = pd.to_datetime(out["first_date"], errors="coerce").dt.strftime("%Y-%m-%d")
    if "last_date" in out.columns:
        out["last_date"] = pd.to_datetime(out["last_date"], errors="coerce").dt.strftime("%Y-%m-%d")

    # Return the final grouped summary DataFrame
    return out


# Build grouped summaries for all four datasets using the cleaned file paths
groupA = summarize_from_clean(A_SINGLE_PATH, schema="A_num")  # Fundamentals (numeric Value)
groupB = summarize_from_clean(B_SINGLE_PATH, schema="B")      # Current (string Value)
groupC = summarize_from_clean(C_SINGLE_PATH, schema="A_str")  # Calendar (string Value)
groupD = summarize_from_clean(D_SINGLE_PATH, schema="A_str")  # Meta (string Value)

# Define output paths for the grouped summaries
outA = Path(Temp_file_path_GO) / "Fundamentals_grouped_by_ID_ItemCode.txt"
outB = Path(Temp_file_path_GO) / "Current_grouped_by_ID_ItemCode.txt"
outC = Path(Temp_file_path_GO) / "Calendar_grouped_by_ID_ItemCode.txt"
outD = Path(Temp_file_path_GO) / "Meta_grouped_by_ID_ItemCode.txt"

# Write grouped summaries to pipe-separated text files with Unix line endings
groupA.to_csv(outA, index=False, sep=OUTPUT_SEP, lineterminator="\n")
groupB.to_csv(outB, index=False, sep=OUTPUT_SEP, lineterminator="\n")
groupC.to_csv(outC, index=False, sep=OUTPUT_SEP, lineterminator="\n")
groupD.to_csv(outD, index=False, sep=OUTPUT_SEP, lineterminator="\n")

# Print a short status report listing the generated summary files
print("[OK] Saved grouped summaries:")
print("  -", outA)
print("  -", outB)
print("  -", outC)
print("  -", outD)


[OK] Saved grouped summaries:
  - /home/jovyan/work/hpool1/pseidel/test/Temp/TempGeneralOverview/Fundamentals_grouped_by_ID_ItemCode.txt
  - /home/jovyan/work/hpool1/pseidel/test/Temp/TempGeneralOverview/Current_grouped_by_ID_ItemCode.txt
  - /home/jovyan/work/hpool1/pseidel/test/Temp/TempGeneralOverview/Calendar_grouped_by_ID_ItemCode.txt
  - /home/jovyan/work/hpool1/pseidel/test/Temp/TempGeneralOverview/Meta_grouped_by_ID_ItemCode.txt


### Save Subsets

In [28]:
# =============================================================================
# CELL SUMMARY
# -----------------------------------------------------------------------------
# This cell creates per-ItemCode subset files from each of the four cleaned
# datasets (Fundamentals, Current, Calendar, Meta) and validates that the
# subset rows sum back to the original row counts.
#
# This version overwrites existing subset files on first write within a run,
# instead of raising a RuntimeError, when ALLOW_APPEND_ON_EXIST=False.
# =============================================================================

ALLOW_APPEND_ON_EXIST = False  # Overwrite existing subset files by default

subset_dir = Path(Subset_file_path)
for p in subset_dir.glob("subset_*.txt"):
    os.remove(p)

def _format_itemcode_for_filename(code_val) -> str:
    """
    For filenames only:
      - If ItemCode is exactly 4 digits -> prefix a '0' (e.g., 2048 -> '02048').
      - Otherwise, keep numeric digits as-is.
      - Non-numeric codes -> return None (skip).
    """
    if pd.isna(code_val):
        return None
    try:
        s = str(int(code_val))
    except Exception:
        return None
    return "0" + s if len(s) == 4 else s


def _write_subset(df: pd.DataFrame, out_path: Path, wrote_headers: set):
    """
    Writes df to its subset file.

    Behavior:
      - First write to this file *in this run*:
          * If file exists:
              - ALLOW_APPEND_ON_EXIST = False -> OVERWRITE file (mode='w', write header)
              - ALLOW_APPEND_ON_EXIST = True  -> APPEND (mode='a', no header)
          * If file does not exist:
              - Create new file (mode='w', header)
      - Later writes in this run:
          - Always append without header.
    """
    fname = out_path.name
    first_touch = (fname not in wrote_headers)

    if first_touch:
        if out_path.exists():
            if ALLOW_APPEND_ON_EXIST:
                mode = "a"
                write_header = False
            else:
                # OVERWRITE existing file
                mode = "w"
                write_header = True
        else:
            mode = "w"
            write_header = True

        wrote_headers.add(fname)

    else:
        mode = "a"
        write_header = False

    df.to_csv(
        out_path,
        index=False,
        sep=OUTPUT_SEP,
        mode=mode,
        header=write_header,
        lineterminator="\n"
    )


def create_subsets_for_dataset_flat(clean_path: str, dataset_label: str, schema: str, subset_base_dir: str):
    """
    Builds one file per ItemCode from a clean dataset.
    """
    print(f"[subsets:{dataset_label}] reading -> {clean_path}")

    subset_dir = Path(subset_base_dir)
    subset_dir.mkdir(parents=True, exist_ok=True)

    if schema in ("A_num", "A_str"):
        usecols = ["ID", "PIT Date", "Frequency", "FiscalPeriod", "ItemCode", "Value"]
    else:
        usecols = ["ID", "PIT Date", "ItemCode", "Value"]

    total_rows_in_clean = 0
    total_rows_written  = 0
    wrote_headers = set()
    chunk_idx = 0

    for ch in pd.read_csv(
        clean_path,
        sep=OUTPUT_SEP,
        usecols=usecols,
        dtype=str,
        engine="c",
        on_bad_lines="skip",
        chunksize=1_000_000,
        low_memory=False,
        encoding="latin1"
    ):
        chunk_idx += 1
        n_chunk = len(ch)
        total_rows_in_clean += n_chunk

        if "PIT Date" in ch.columns:
            d = pd.to_datetime(ch["PIT Date"], errors="coerce", utc=False, format="mixed")
            ch = ch.copy()
            ch["PIT Date"] = d.dt.strftime("%Y-%m-%d")

        item_numeric = pd.to_numeric(ch["ItemCode"], errors="coerce").astype("Int64")
        ch = ch[item_numeric.notna()].copy()
        if ch.empty:
            print(f"[subsets:{dataset_label}] chunk {chunk_idx:,}: no valid ItemCode rows, skipping")
            continue

        ch["__ic__"] = item_numeric[item_numeric.notna()]

        for ic_val, df_ic in ch.groupby("__ic__", sort=False, dropna=True):
            code_str = _format_itemcode_for_filename(ic_val)
            if not code_str:
                continue

            out_path = Path(Subset_file_path) / f"subset_{code_str}.txt"

            _write_subset(
                df_ic.drop(columns="__ic__", errors="ignore"),
                out_path,
                wrote_headers
            )
            total_rows_written += len(df_ic)

        del ch
        gc.collect()

        print(
            f"[subsets:{dataset_label}] chunk={chunk_idx:,}  "
            f"total_rows_in_clean={total_rows_in_clean:,}  "
            f"total_rows_in_subsets={total_rows_written:,}"
        )

    ok = (total_rows_in_clean == total_rows_written)
    status = "OK" if ok else "MISMATCH"
    print(f"[subsets:{dataset_label}] validation: sum(subset rows) == rows in clean file -> {status}")
    if not ok:
        print(f"  rows_in_clean={total_rows_in_clean:,}, rows_in_subsets={total_rows_written:,}")

    return ok


# Require that clean file paths exist
assert all(name in globals() for name in ["A_SINGLE_PATH", "B_SINGLE_PATH", "C_SINGLE_PATH", "D_SINGLE_PATH"]), \
    "Missing clean file paths A_SINGLE_PATH/B_SINGLE_PATH/C_SINGLE_PATH/D_SINGLE_PATH."

# Build subsets for all four clean datasets
okA = create_subsets_for_dataset_flat(A_SINGLE_PATH, "Fundamentals", "A_num", Subset_file_path)
okB = create_subsets_for_dataset_flat(B_SINGLE_PATH, "Current",      "B",     Subset_file_path)
okC = create_subsets_for_dataset_flat(C_SINGLE_PATH, "Calendar",     "A_str", Subset_file_path)
okD = create_subsets_for_dataset_flat(D_SINGLE_PATH, "Meta",         "A_str", Subset_file_path)

print("\nSubset build results (flat folder):")
print("  Fundamentals:", "OK" if okA else "MISMATCH")
print("  Current     :", "OK" if okB else "MISMATCH")
print("  Calendar    :", "OK" if okC else "MISMATCH")
print("  Meta        :", "OK" if okD else "MISMATCH")


[subsets:Fundamentals] reading -> /home/jovyan/work/hpool1/pseidel/test/Temp/TempGeneralOverview/Fundamentals_clean.txt
[subsets:Fundamentals] chunk=1  total_rows_in_clean=1,000,000  total_rows_in_subsets=1,000,000
[subsets:Fundamentals] chunk=2  total_rows_in_clean=2,000,000  total_rows_in_subsets=2,000,000
[subsets:Fundamentals] chunk=3  total_rows_in_clean=3,000,000  total_rows_in_subsets=3,000,000
[subsets:Fundamentals] chunk=4  total_rows_in_clean=4,000,000  total_rows_in_subsets=4,000,000
[subsets:Fundamentals] chunk=5  total_rows_in_clean=5,000,000  total_rows_in_subsets=5,000,000
[subsets:Fundamentals] chunk=6  total_rows_in_clean=6,000,000  total_rows_in_subsets=6,000,000
[subsets:Fundamentals] chunk=7  total_rows_in_clean=7,000,000  total_rows_in_subsets=7,000,000
[subsets:Fundamentals] chunk=8  total_rows_in_clean=8,000,000  total_rows_in_subsets=8,000,000
[subsets:Fundamentals] chunk=9  total_rows_in_clean=9,000,000  total_rows_in_subsets=9,000,000
[subsets:Fundamentals] ch

### Final Check on Removed Rows

In [29]:
import pandas as pd
from pathlib import Path
from collections import Counter

# =============================================================================
# CELL SUMMARY
# -----------------------------------------------------------------------------
# This cell implements an audit routine for the two-file setup (clean + errors)
# of the pipeline. For each of the four datasets (Fundamentals, Current,
# Calendar, Meta), it:
#
#   - Counts data rows in the relevant, clean, and error files (excluding
#     header rows) and checks that:
#       rows_clean + rows_errors == rows_relevant
#     This validates that each row from the "relevant_raw" input is either
#     cleaned or classified as an error, with no silent drops.
#
#   - If an errors file exists and contains rows, it scans the error file
#     in chunks and derives several diagnostic summaries:
#       * The set size of unique 'Value' entries appearing in error rows.
#       * The set size of unique 'OriginalValue' entries (if that column
#         exists), which captures pre-coercion values.
#       * The most frequent ItemCodes among error rows, highlighting which
#         items drive most of the issues.
#       * The most frequent country codes extracted from positions 2–4 of
#         the ID string, indicating which regions have higher error rates.
#
#   - It prints human-readable summaries (row counts and top offenders) and
#     returns a structured dictionary with key results for potential programmatic
#     inspection, including row counts, uniqueness counts, and the top-n
#     ItemCodes and countries.
# =============================================================================


# ======================
# CONFIG
# ======================

OUTPUT_SEP = "|"           # Delimiter used in the pipeline's text files
CHUNK_SIZE = 1_000_000     # Chunk size to safely read large CSVs in memory


# ======================
# HELPERS
# ======================

def count_data_rows(path: Path) -> int:
    """
    Count data rows (excluding header). Returns 0 if file doesn't exist.
    """
    # If the file does not exist, there are no data rows
    if not path.exists():
        return 0

    # Count total lines in the file
    with open(path, encoding="latin1") as f:
        total = sum(1 for _ in f)

    # Subtract one for the header line, but never return negative values
    return max(total - 1, 0)


def get_country_code(id_val: str) -> str:
    """
    Extract country code as chars 2–4 (1-based) from alphanumeric ID.
    If ID too short or missing, return ''.
    """
    # Ensure the ID is a string; otherwise, return empty code
    if not isinstance(id_val, str):
        return ""
    # If ID has at least 4 characters, return positions 2–4; else return empty
    if len(id_val) >= 4:
        return id_val[1:4]
    return ""


def audit_new_pipeline(
    temp_dir: str,
    relevant_filename: str,
    clean_filename: str,
    errors_filename: str,
    label: str,
    top_n: int = 20,
):
    """
    For the 2-file setup (clean + errors):

    - Check: rows_clean + rows_errors == rows_relevant
    - Report:
        * Unique 'Value' in errors
        * Unique 'OriginalValue' in errors (if present)
        * Top ItemCodes by error count
        * Top country codes by error count (from ID[2–4])
    """

    # Base directory where all intermediate files of this dataset are stored
    base = Path(temp_dir)

    # Construct full paths to relevant, clean, and errors files
    relevant_path = base / relevant_filename
    clean_path    = base / clean_filename
    errors_path   = base / errors_filename

    # High-level context output
    print(f"\n=========== AUDIT: {label} ===========")
    print("Relevant :", relevant_path)
    print("Clean    :", clean_path)
    print("Errors   :", errors_path)

    # ----- A) Row check -----
    # Count rows in each of the three files, excluding headers
    n_rel   = count_data_rows(relevant_path)
    n_clean = count_data_rows(clean_path)
    n_err   = count_data_rows(errors_path)

    print("\n[A] ROW CHECK (clean + errors vs relevant)")
    print(f"Rows Relevant : {n_rel}")
    print(f"Rows Clean    : {n_clean}")
    print(f"Rows Errors   : {n_err}")

    # Total rows explained by clean + error outputs
    explained = n_clean + n_err
    # Difference between relevant rows and explained rows
    diff = n_rel - explained

    print(f"Rows Explained: {explained}")
    print(f"Difference    : {diff}")

    # Interpret the row-balance condition and warn if mismatch exists
    if n_rel > 0 and diff == 0:
        print("[OK] All rows accounted for: clean + errors == relevant")
    elif n_rel == 0:
        print("[WARN] Relevant file seems empty or missing.")
    else:
        print("[WARN] Mismatch: investigate pipeline / filenames.")

    # If there are no error rows or the error file is absent, skip detailed analysis
    if not errors_path.exists() or n_err == 0:
        print("\n[INFO] No errors found or errors file missing; skipping error-detail analysis.")
        return {
            "rows": {
                "relevant": n_rel,
                "clean": n_clean,
                "errors": n_err,
                "explained": explained,
                "diff": diff,
            },
            "error_value_unique_count": 0,
            "error_original_value_unique_count": 0,
            "top_itemcodes": [],
            "top_countries": [],
        }

    # ----- B) Scan errors file (chunked) -----
    # Sets to capture unique string representations of problematic values
    value_uniques = set()
    original_value_uniques = set()

    # Counters for ItemCodes and country codes observed in error rows
    item_counter = Counter()
    country_counter = Counter()

    # Read the error file in chunks to handle large files
    for chunk in pd.read_csv(
        errors_path,
        sep=OUTPUT_SEP,
        dtype=str,
        engine="c",
        encoding="latin1",
        chunksize=CHUNK_SIZE,
        on_bad_lines="skip"
    ):
        # Unique Value entries observed in the error rows
        if "Value" in chunk.columns:
            vals = chunk["Value"].dropna().astype(str)
            value_uniques.update(vals.unique())

        # Unique OriginalValue entries if column is present (before coercion)
        if "OriginalValue" in chunk.columns:
            ovals = chunk["OriginalValue"].dropna().astype(str)
            original_value_uniques.update(ovals.unique())

        # ItemCode distribution: count occurrences of each ItemCode in errors
        if "ItemCode" in chunk.columns:
            ic = chunk["ItemCode"].dropna().astype(str)
            item_counter.update(ic)

        # Country distribution: derive country code from ID[2–4] for each error row
        if "ID" in chunk.columns:
            ids = chunk["ID"].dropna().astype(str)
            countries = [get_country_code(x) for x in ids if len(x) >= 4]
            country_counter.update(countries)

    # ----- C) Print summaries -----

    print("\n[B] UNIQUE 'Value' ENTRIES IN ERRORS")
    print(f"Unique non-empty Value entries: {len(value_uniques)}")
    for v in list(value_uniques)[:20]:
        print(f"  {v}")

    print("\n[C] UNIQUE 'OriginalValue' ENTRIES IN ERRORS")
    print(f"Unique non-empty OriginalValue entries: {len(original_value_uniques)}")
    for v in list(original_value_uniques)[:20]:
        print(f"  {v}")

    print(f"\n[D] TOP {top_n} ITEMCODES BY ERROR ROWS")
    if item_counter:
        for item, cnt in item_counter.most_common(top_n):
            print(f"  {item}: {cnt}")
    else:
        print("  No ItemCode column or no error rows with ItemCode.")

    print(f"\n[E] TOP {top_n} COUNTRY CODES BY ERROR ROWS (from ID[2–4])")
    if country_counter:
        for cc, cnt in country_counter.most_common(top_n):
            print(f"  {cc}: {cnt}")
    else:
        print("  No valid country codes derived from ID in errors.")

    # Return a structured summary of the audit results
    return {
        "rows": {
            "relevant": n_rel,
            "clean": n_clean,
            "errors": n_err,
            "explained": explained,
            "diff": diff,
        },
        "error_value_unique_count": len(value_uniques),
        "error_original_value_unique_count": len(original_value_uniques),
        "top_itemcodes": item_counter.most_common(top_n),
        "top_countries": country_counter.most_common(top_n),
        "paths": {
            "relevant": str(relevant_path),
            "clean": str(clean_path),
            "errors": str(errors_path),
        },
    }


# ======================
# RUN AUDIT FOR ALL 4 DATASETS
# ======================

# Run audit on Fundamentals dataset and collect summary
fundamentals_result = audit_new_pipeline(
    Temp_file_path_GO,
    "A_relevant_raw.txt",
    "Fundamentals_clean.txt",
    "Fundamentals_clean_errors.txt",
    label="Fundamentals"
)

# Run audit on Current dataset and collect summary
current_result = audit_new_pipeline(
    Temp_file_path_GO,
    "B_relevant_raw.txt",
    "Current_clean.txt",
    "Current_clean_errors.txt",
    label="Current"
)

# Run audit on Calendar dataset and collect summary
calendar_result = audit_new_pipeline(
    Temp_file_path_GO,
    "C_relevant_raw.txt",
    "Calendar_clean.txt",
    "Calendar_clean_errors.txt",
    label="Calendar"
)

# Run audit on Meta dataset and collect summary
meta_result = audit_new_pipeline(
    Temp_file_path_GO,
    "D_relevant_raw.txt",
    "Meta_clean.txt",
    "Meta_clean_errors.txt",
    label="Meta"
)



Relevant : /home/jovyan/work/hpool1/pseidel/test/Temp/TempGeneralOverview/A_relevant_raw.txt
Clean    : /home/jovyan/work/hpool1/pseidel/test/Temp/TempGeneralOverview/Fundamentals_clean.txt
Errors   : /home/jovyan/work/hpool1/pseidel/test/Temp/TempGeneralOverview/Fundamentals_clean_errors.txt

[A] ROW CHECK (clean + errors vs relevant)
Rows Relevant : 229855498
Rows Clean    : 224547306
Rows Errors   : 5308192
Rows Explained: 229855498
Difference    : 0
[OK] All rows accounted for: clean + errors == relevant

[B] UNIQUE 'Value' ENTRIES IN ERRORS
Unique non-empty Value entries: 0

[C] UNIQUE 'OriginalValue' ENTRIES IN ERRORS
Unique non-empty OriginalValue entries: 1
  n

[D] TOP 20 ITEMCODES BY ERROR ROWS
  3273: 253028
  2652: 227161
  5202: 189514
  3051: 182258
  3451: 169624
  1250: 167469
  3251: 165963
  1001: 145255
  3426: 142845
  2256: 141608
  3066: 138575
  1051: 138149
  1551: 136511
  1706: 132036
  1451: 131602
  1151: 128217
  2501: 125291
  3501: 122574
  2999: 121818


# 2.0. Datastream (Restart Kernel before)

### Transform Constituents File

In [30]:
# -----------------------------------------------------------------------------
# Summary of this cell
# -----------------------------------------------------------------------------
# 1. Read the constituents CSV:
#    - column 0 → DS_ID
#    - column 2 (Excel "C") → EQ flag
#    - column 4 (Excel "E") → filter column (e.g. 'N')
#    - column 7 → ID
# 2. Keep only rows where column C == "EQ"; others → error file (reason).
# 3. Add NationCode (digits at positions 2–4 of ID) as a helper column.
# 4. Drop all rows where column E == "N" (these go to the error file with reason).
# 5. Remove duplicate (DS_ID, ID) combinations, keeping the first; dropped rows → error file.
# 6. Drop rows whose ID does not match the required pattern ^C[a-zA-Z0-9]{8}$; → error file.
# 7. Drop rows whose ID occurs more than once (non-unique IDs); all such rows → error file.
# 8. Save:
#    - Clean rows → ID_mapping_clean.txt  (without NationCode column)
#    - All dropped rows + DropReason → ID_mapping_errors.txt  (without NationCode column)
# 9. Print:
#    - Initial totals and unique-ID counts
#    - How many rows were dropped for each reason
#    - Final clean-row count and unique-ID count
#    - Error-row counts per NationCode
#    - Clean-row counts per NationCode
# -----------------------------------------------------------------------------

import os
import pandas as pd
import re

# -----------------------------------------------------------------------------
# 1. Define input and output paths
# -----------------------------------------------------------------------------
input_file = Constituents_file_path
output_folder = Temp_file_path_GO

# Create output folder if missing
os.makedirs(output_folder, exist_ok=True)

# -----------------------------------------------------------------------------
# 2. Read the CSV with fallback encoding
#    We need:
#      - column 0 → DS_ID
#      - column 2 (Excel "C") → EQ flag
#      - column 4 (Excel "E") → filter column (e.g. 'N')
#      - column 7 → ID
# -----------------------------------------------------------------------------
try:
    df = pd.read_csv(
        input_file,
        header=0,
        usecols=[0, 2, 4, 7]
    )
except UnicodeDecodeError:
    df = pd.read_csv(
        input_file,
        header=0,
        usecols=[0, 2, 4, 7],
        encoding="ISO-8859-1"
    )

# Keep original column names so we can rename only what we need
orig_cols = df.columns.tolist()
ds_col   = orig_cols[0]   # DS_ID column (col 0)
col_c    = orig_cols[1]   # column C (EQ flag)
flag_col = orig_cols[2]   # column E (used for 'N' filter)
id_col   = orig_cols[3]   # ID column (col 7)

# Rename DS_ID and ID; keep the original names of column C and column E
df = df.rename(columns={ds_col: "DS_ID", id_col: "ID"})
# col_c and flag_col stay as they are

# Store initial stats (before any filtering)
initial_total_rows = len(df)
initial_unique_ids = df["ID"].nunique()

# -----------------------------------------------------------------------------
# 3. Add NationCode helper column (positions 2–4 of ID)
# -----------------------------------------------------------------------------
df["NationCode"] = df["ID"].astype(str).str[1:4]

# List to collect all dropped rows
error_frames = []

# -----------------------------------------------------------------------------
# 4. NEW: Filter on column C == "EQ" (keep only "EQ")
# -----------------------------------------------------------------------------
mask_c_not_eq = df[col_c].astype(str) != "EQ"
dropped_c_not_eq = df[mask_c_not_eq].copy()
if not dropped_c_not_eq.empty:
    dropped_c_not_eq["DropReason"] = "C_not_EQ"
    error_frames.append(dropped_c_not_eq)

df = df[~mask_c_not_eq]

# -----------------------------------------------------------------------------
# 5. Drop rows with 'N' in column E
# -----------------------------------------------------------------------------
mask_flag_n = df[flag_col].astype(str) == "N"
dropped_flag_n = df[mask_flag_n].copy()
if not dropped_flag_n.empty:
    dropped_flag_n["DropReason"] = "Flag_N"
    error_frames.append(dropped_flag_n)

df = df[~mask_flag_n]

# -----------------------------------------------------------------------------
# 6. Remove duplicate DS_ID+ID combinations (keep first)
# -----------------------------------------------------------------------------
mask_dups_ds_id = df.duplicated(subset=["DS_ID", "ID"], keep="first")
dropped_dups_ds_id = df[mask_dups_ds_id].copy()
if not dropped_dups_ds_id.empty:
    dropped_dups_ds_id["DropReason"] = "Duplicate_DS_ID_ID"
    error_frames.append(dropped_dups_ds_id)

df = df[~mask_dups_ds_id]

# -----------------------------------------------------------------------------
# 7. Validate ID pattern: must match ^C[a-zA-Z0-9]{8}$
# -----------------------------------------------------------------------------
pattern = re.compile(r"^C[a-zA-Z0-9]{8}$")
mask_invalid = ~df["ID"].astype(str).str.match(pattern)

dropped_invalid = df[mask_invalid].copy()
if not dropped_invalid.empty:
    dropped_invalid["DropReason"] = "Invalid_ID_format"
    error_frames.append(dropped_invalid)

df = df[~mask_invalid]

# -----------------------------------------------------------------------------
# 8. Drop non-unique IDs (IDs that appear more than once)
# -----------------------------------------------------------------------------
id_counts = df["ID"].value_counts()
nonunique_ids = id_counts[id_counts > 1].index

mask_nonunique_ids = df["ID"].isin(nonunique_ids)
dropped_nonunique = df[mask_nonunique_ids].copy()
if not dropped_nonunique.empty:
    dropped_nonunique["DropReason"] = "Non_unique_ID"
    error_frames.append(dropped_nonunique)

df_clean = df[~mask_nonunique_ids].copy()

# -----------------------------------------------------------------------------
# 9. Combine error rows
# -----------------------------------------------------------------------------
if error_frames:
    df_errors = pd.concat(error_frames, ignore_index=True)
else:
    # Empty frame with expected columns if no errors (unlikely, but safe)
    df_errors = pd.DataFrame(columns=list(df_clean.columns) + ["DropReason"])

# -----------------------------------------------------------------------------
# 10. Final stats and NationCode summaries
# -----------------------------------------------------------------------------
final_total_rows = len(df_clean)
final_unique_ids = df_clean["ID"].nunique()

print(f"Initial total rows: {initial_total_rows}")
print(f"Initial unique IDs: {initial_unique_ids}")
print("Dropped rows by reason:")
print(df_errors["DropReason"].value_counts())

print(f"\nFinal clean rows: {final_total_rows}")
print(f"Unique IDs in clean output: {final_unique_ids}")

# Error counts per NationCode
if not df_errors.empty:
    print("\nError rows per NationCode:")
    error_nat_counts = (
        df_errors.groupby("NationCode")
        .size()
        .reset_index(name="ErrorRowCount")
        .sort_values(by="ErrorRowCount", ascending=False)
    )
    print(error_nat_counts.to_string(index=False))
else:
    print("\nNo error rows; therefore no NationCode error breakdown.")

# Clean counts per NationCode
if not df_clean.empty:
    print("\nClean rows per NationCode:")
    clean_nat_counts = (
        df_clean.groupby("NationCode")
        .size()
        .reset_index(name="CleanRowCount")
        .sort_values(by="CleanRowCount", ascending=False)
    )
    print(clean_nat_counts.to_string(index=False))
else:
    print("\nNo clean rows; therefore no NationCode clean breakdown.")

# -----------------------------------------------------------------------------
# 11. Drop helper column NationCode before saving
# -----------------------------------------------------------------------------
# Be defensive: only drop columns that actually exist (e.g. MAJOR might not be present)
cols_to_drop = [col for col in ["NationCode", "MAJOR", "TYPE"] if col in df_clean.columns]
df_clean_to_save = df_clean.drop(columns=cols_to_drop)

cols_to_drop_errors = [col for col in ["NationCode", "MAJOR"] if col in df_errors.columns]
df_errors_to_save = df_errors.drop(columns=cols_to_drop_errors)

# -----------------------------------------------------------------------------
# 12. Write pipe-separated output files
# -----------------------------------------------------------------------------
output_clean_file = os.path.join(output_folder, "ID_mapping_clean.txt")
output_error_file = os.path.join(output_folder, "ID_mapping_errors.txt")

df_clean_to_save.to_csv(
    output_clean_file,
    sep="|",
    index=False
)

df_errors_to_save.to_csv(
    output_error_file,
    sep="|",
    index=False
)

print(f"\nSaved {len(df_errors_to_save)} error rows → {output_error_file}")
print(f"Saved {len(df_clean_to_save)} clean rows → {output_clean_file}")
print(f"Unique IDs in clean output: {final_unique_ids}")


Initial total rows: 167649
Initial unique IDs: 102113
Dropped rows by reason:
DropReason
Duplicate_DS_ID_ID    53992
Flag_N                10525
C_not_EQ               6380
Non_unique_ID          1528
Invalid_ID_format       120
Name: count, dtype: int64

Final clean rows: 95104
Unique IDs in clean output: 95104

Error rows per NationCode:
NationCode  ErrorRowCount
       156           8805
       826           7276
       356           6946
       124           6883
       392           6047
       840           5682
       760           4536
       036           4060
       410           3074
       344           2271
       250           2196
       280           2189
       643           1557
       458           1545
       764            987
       076            968
        an            936
       752            641
       617            567
       796            502
       484            432
       756            378
       208            337
       300            302
       7

### Check for Duplicates

In [31]:
import pandas as pd
import os

base_folder_go = Temp_file_path_GO
mapping_file = os.path.join(base_folder_go, "ID_mapping_clean.txt")

print(f"Checking for standalone duplicate IDs and DS_IDs in: {mapping_file}")

try:
    # Load the mapping file, explicitly typing columns as strings to prevent mixed types
    mapping_df = pd.read_csv(
        mapping_file,
        sep="|",
        dtype={"DS_ID": "string", "ID": "string"},
        low_memory=False
    )

    print("\n--- Checking 'ID' column for duplicates ---")
    # Find duplicate IDs
    duplicate_ids = mapping_df[mapping_df['ID'].duplicated(keep=False)].sort_values('ID')

    if not duplicate_ids.empty:
        print(f"Found {len(duplicate_ids['ID'].unique())} unique IDs that appear more than once in the 'ID' column:")
        display(duplicate_ids[['ID']].drop_duplicates().reset_index(drop=True))
    else:
        print("No standalone duplicate values found in the 'ID' column.")

    print("\n--- Checking 'DS_ID' column for duplicates ---")
    # Find duplicate DS_IDs
    duplicate_ds_ids = mapping_df[mapping_df['DS_ID'].duplicated(keep=False)].sort_values('DS_ID')

    if not duplicate_ds_ids.empty:
        print(f"Found {len(duplicate_ds_ids['DS_ID'].unique())} unique DS_IDs that appear more than once in the 'DS_ID' column:")
        display(duplicate_ds_ids[['DS_ID']].drop_duplicates().reset_index(drop=True))
    else:
        print("No standalone duplicate values found in the 'DS_ID' column.")

except FileNotFoundError:
    print(f"Error: The file {mapping_file} was not found. Please ensure the path is correct.")
except Exception as e:
    print(f"An error occurred: {e}")

Checking for standalone duplicate IDs and DS_IDs in: /home/jovyan/work/hpool1/pseidel/test/Temp/TempGeneralOverview/ID_mapping_clean.txt

--- Checking 'ID' column for duplicates ---
No standalone duplicate values found in the 'ID' column.

--- Checking 'DS_ID' column for duplicates ---
No standalone duplicate values found in the 'DS_ID' column.


### Transform MarketValues and Drop Small Companies in Accordance with Bartram & Grinblatt (2021)(MV)

===> Hier mit USD MVs, damit <10 Mio rausgefiltert werden. Danach is USD MV erst wieder für Übersichtstabellen relevant

In [32]:
# =============================================================================
# CELL SUMMARY
# -----------------------------------------------------------------------------
# PURPOSE
# -------
# This cell processes a folder of raw daily market value CSV files and produces
# one standardized long-format output file, while logging every dropped row.
#
# It is designed to remove all market values below 10 million (MV < 10) without
# accidentally excluding firms that are above 10 million most of the time.
#
# INPUT
# -----
# - Multiple raw CSV files in `MarketValues_file_path` (processed sequentially).
# - Each file structure:
#     * First row = metadata (skipped)
#     * Next row contains date labels starting at `start_col`
#     * Subsequent rows contain DS_ID in column 0 and market values in date columns
#
# PROCESSING OVERVIEW (per file)
# ------------------------------
# 1) Read CSV (delimiter inferred), skip first metadata row.
# 2) Drop fully empty rows/columns.
# 3) Reshape wide matrix (DS_ID × date columns) into long format:
#       DS_ID | DayDate | MV
# 4) Date cleaning:
#       - Parse DayDate as datetime (day-first)
#       - Drop & log unparseable dates as "Invalid DayDate"
# 5) MV cleaning:
#       - Convert MV to numeric
#       - Drop & log non-numeric MV as "Invalid MV"
# 6) Enforce minimum MV rule:
#       - Drop & log all rows with MV < 10 as "MV below 10M"
# 7) Firm eligibility decision (firm-level):
#       - This step decides whether to keep the firm's remaining MV>=10 rows
#         or exclude the firm entirely.
#       - Eligibility metrics are computed using all valid MV observations
#         (including MV<10) to avoid skew:
#           * n_obs: number of observations for the firm in this file
#           * frac_ge10: fraction of observations where MV >= 10
#       - A firm is eligible if:
#           * n_obs >= MIN_OBS
#           * frac_ge10 >= KEEP_FRAC
#       - Firms that do not meet these thresholds are excluded entirely, and any
#         MV>=10 rows that would otherwise remain are logged as:
#           "Firm excluded by stability thresholds"
#
# OUTPUT
# ------
# - MV_raw.txt:
#     DS_ID|DayDate|MV   (only MV>=10 for eligible firms)
# - MV_cleaning_errors.txt:
#     DS_ID|DayDate|MV|Reason|SourceFile (all dropped rows)
#
# AUDITABILITY
# ------------
# Per file, prints row accounting so:
#   initial_rows = written + invalid_dates + invalid_mv + mv_below_10 + mv_ge10_excluded_firm
# =============================================================================

import os
import glob
import pandas as pd
from pandas.errors import ParserError

# --------------------------
# User-tunable thresholds
# --------------------------
KEEP_FRAC = 0.80  # keep firms that are >=10M on at least 80% of observations
MIN_OBS = 30      # require at least 30 observations per firm in a file

# Input / output configuration
input_folder = MarketValues_file_path
output_folder = Temp_file_path_GO
start_col = 7

os.makedirs(output_folder, exist_ok=True)

output_file = os.path.join(output_folder, "MV_raw.txt")
error_file = os.path.join(output_folder, "MV_cleaning_errors.txt")

# Remove old files to start clean
for f in [output_file, error_file]:
    if os.path.exists(f):
        os.remove(f)

# Create output files with headers
with open(output_file, "w", encoding="utf-8") as f:
    f.write("DS_ID|DayDate|MV\n")

with open(error_file, "w", encoding="utf-8") as f:
    f.write("DS_ID|DayDate|MV|Reason|SourceFile\n")

csv_files = sorted(glob.glob(os.path.join(input_folder, "*.csv")))

for input_file in csv_files:
    print(f"Processing: {input_file}")
    source_name = os.path.basename(input_file)

    # -----------------------------
    # Read raw file (robust parsing)
    # -----------------------------
    try:
        df = pd.read_csv(input_file, header=None, sep=None, engine="python", skiprows=1)
    except ParserError:
        df = pd.read_csv(
            input_file,
            header=None,
            sep=None,
            engine="python",
            skiprows=1,
            on_bad_lines="skip"
        )

    # Drop empty rows and columns
    df = df.dropna(how="all").dropna(axis=1, how="all")

    # -----------------------------------
    # Restructure wide -> long (DS_ID, DayDate, MV)
    # -----------------------------------
    dates = df.iloc[0, start_col:]
    ids = df.iloc[1:, 0]
    values = df.iloc[1:, start_col:]
    values.columns = dates.values

    id_map = ids.to_dict()

    long_df = values.stack().reset_index()
    long_df.columns = ["row_idx", "DayDate", "MV"]
    long_df["DS_ID"] = long_df["row_idx"].map(id_map)

    # Row accounting: initial rows after reshaping (before cleaning)
    initial_rows = len(long_df)

    # Initialize counters for removals
    invalid_daydate_count = 0
    invalid_mv_count = 0
    low_mv_count = 0
    firm_excluded_mv_ge10_count = 0

    # ---------------------------------------------------------------------
    # 1) Date parsing
    # ---------------------------------------------------------------------
    long_df["DayDate"] = pd.to_datetime(long_df["DayDate"], errors="coerce", dayfirst=True)

    invalid_daydate = long_df[long_df["DayDate"].isna()].copy()
    invalid_daydate_count = len(invalid_daydate)

    if invalid_daydate_count > 0:
        invalid_daydate["Reason"] = "Invalid DayDate"
        invalid_daydate["SourceFile"] = source_name
        invalid_daydate[["DS_ID", "DayDate", "MV", "Reason", "SourceFile"]].to_csv(
            error_file, sep="|", index=False, mode="a", header=False
        )
        print(f"  Warning: {invalid_daydate_count} invalid dates")

    # Keep only valid dates
    long_df = long_df[long_df["DayDate"].notna()].copy()
    long_df["DayDate"] = long_df["DayDate"].dt.strftime("%Y-%m-%d")

    # ---------------------------------------------------------------------
    # 2) Market value parsing
    # ---------------------------------------------------------------------
    long_df["MV"] = pd.to_numeric(long_df["MV"], errors="coerce")

    invalid_mv = long_df[long_df["MV"].isna()].copy()
    invalid_mv_count = len(invalid_mv)

    if invalid_mv_count > 0:
        invalid_mv["Reason"] = "Invalid MV"
        invalid_mv["SourceFile"] = source_name
        invalid_mv[["DS_ID", "DayDate", "MV", "Reason", "SourceFile"]].to_csv(
            error_file, sep="|", index=False, mode="a", header=False
        )
        print(f"  Warning: {invalid_mv_count} invalid MV values")

    # Keep only valid numeric MV
    long_df = long_df[long_df["MV"].notna()].copy()

    # ---------------------------------------------------------------------
    # 3) Always remove MV < 10 (row-level)
    # ---------------------------------------------------------------------
    low_mv = long_df[long_df["MV"] < 10].copy()
    low_mv_count = len(low_mv)

    if low_mv_count > 0:
        low_mv["Reason"] = "MV below 10M"
        low_mv["SourceFile"] = source_name
        low_mv[["DS_ID", "DayDate", "MV", "Reason", "SourceFile"]].to_csv(
            error_file, sep="|", index=False, mode="a", header=False
        )
        print(f"  Warning: {low_mv_count} MV < 10M")

    # ---------------------------------------------------------------------
    # 4) Firm-level eligibility using n_obs and frac_ge10 ONLY
    # ---------------------------------------------------------------------
    # Compute metrics on all valid MV rows (including <10) so frac_ge10 is meaningful.
    metrics = long_df.groupby("DS_ID")["MV"].agg(
        n_obs="size",
        frac_ge10=lambda x: (x >= 10).mean()
    )

    eligible_ids = metrics[
        (metrics["n_obs"] >= MIN_OBS) &
        (metrics["frac_ge10"] >= KEEP_FRAC)
    ].index

    # Log MV>=10 rows that are removed solely because the firm is excluded
    ineligible_mv_ge10 = long_df[
        (~long_df["DS_ID"].isin(eligible_ids)) &
        (long_df["MV"] >= 10)
    ].copy()
    firm_excluded_mv_ge10_count = len(ineligible_mv_ge10)

    if firm_excluded_mv_ge10_count > 0:
        ineligible_mv_ge10["Reason"] = "Firm excluded by stability thresholds"
        ineligible_mv_ge10["SourceFile"] = source_name
        ineligible_mv_ge10[["DS_ID", "DayDate", "MV", "Reason", "SourceFile"]].to_csv(
            error_file, sep="|", index=False, mode="a", header=False
        )
        print(f"  Warning: {firm_excluded_mv_ge10_count} MV>=10 rows removed (firm excluded)")

    # Final kept dataset: eligible firms + MV >= 10 only
    result = long_df[
        (long_df["DS_ID"].isin(eligible_ids)) &
        (long_df["MV"] >= 10)
    ][["DS_ID", "DayDate", "MV"]].copy()

    rows_written = len(result)

    # ---------------------------------------------------------------------
    # Row accounting reconciliation
    # ---------------------------------------------------------------------
    total_removed = (
        invalid_daydate_count
        + invalid_mv_count
        + low_mv_count
        + firm_excluded_mv_ge10_count
    )
    check_value = initial_rows - total_removed  # should equal rows_written

    print("  Row accounting:")
    print(f"    Initial rows                          : {initial_rows}")
    print(f"    Invalid dates removed                 : {invalid_daydate_count}")
    print(f"    Invalid MV removed                    : {invalid_mv_count}")
    print(f"    MV < 10M removed (row-level)          : {low_mv_count}")
    print(f"    MV>=10 removed (firm excluded)        : {firm_excluded_mv_ge10_count}")
    print(f"    Rows written                          : {rows_written}")
    print(f"    Total removed                         : {total_removed}")
    print(f"    Check (initial-removed)               : {check_value}")

    if check_value != rows_written:
        print("  WARNING: Row accounting mismatch detected (please review pipeline).")

    # Append output
    if not result.empty:
        result.to_csv(output_file, sep="|", index=False, mode="a", header=False)

print(f"Done. Combined file written to: {output_file}")
print(f"All dropped rows logged to: {error_file}")


Processing: /home/jovyan/work/hpool1/pseidel/test/Input/Daily MV USD/DailyMVUSD19911995.01.csv
  Row accounting:
    Initial rows                          : 1643017
    Invalid dates removed                 : 0
    Invalid MV removed                    : 0
    MV < 10M removed (row-level)          : 339246
    MV>=10 removed (firm excluded)        : 107557
    Rows written                          : 1196214
    Total removed                         : 446803
    Check (initial-removed)               : 1196214
Processing: /home/jovyan/work/hpool1/pseidel/test/Input/Daily MV USD/DailyMVUSD19911995.02.csv
  Row accounting:
    Initial rows                          : 970197
    Invalid dates removed                 : 0
    Invalid MV removed                    : 0
    MV < 10M removed (row-level)          : 123829
    MV>=10 removed (firm excluded)        : 39930
    Rows written                          : 806438
    Total removed                         : 163759
    Check (initial-removed)

### Remove Delisted Comp's MVs

In [33]:
# =============================================================================
# CELL SUMMARY
# -----------------------------------------------------------------------------
# This cell takes the standardized long-format market value file `MV_raw.txt`
# and removes "dead" tails for instruments that appear to be delisted.
#
# Delisting detection logic (per DS_ID):
#   - Sort observations by DayDate ascending.
#   - Look from the end of the series backwards and compute the length of the
#     final run of identical MV values (a constant tail).
#   - If this final constant run is at least 20 consecutive rows, and the last
#     MV equals the MV in this run (which it does by construction), we treat
#     the instrument as delisted.
#   - We then DROP ALL ROWS from the first date of this final constant run
#     onwards (i.e., the entire flat tail including the initial 20 days).
#
# Outputs:
#   - `MV_cleaned.txt`  : same structure as MV_raw (DS_ID|DayDate|MV) but with
#                         delisted tails removed.
#   - `MV_delisting_log.txt` : list of instruments identified as delisted with
#                              the date from which they are excluded.
# =============================================================================

import os
import pandas as pd

# Input/output folder (same as in previous cell)
output_folder = Temp_file_path_GO

# Input standardized market value file
mv_input_file = os.path.join(output_folder, "MV_raw.txt")

# Output files
mv_cleaned_file = os.path.join(output_folder, "MV_raw_excl_delisted.txt")
delist_log_file = os.path.join(output_folder, "MV_delisting_audit.txt")

# Remove old outputs if they exist
for path in [mv_cleaned_file, delist_log_file]:
    if os.path.exists(path):
        os.remove(path)

# Read the standardized long-format MV file
# DS_ID is kept as string to avoid losing leading zeros etc.
mv_df = pd.read_csv(
    mv_input_file,
    sep="|",
    dtype={"DS_ID": str}
)

# Parse DayDate to datetime for proper sorting and comparison
mv_df["DayDate"] = pd.to_datetime(mv_df["DayDate"], format="%Y-%m-%d", errors="coerce")

# Drop any rows where DayDate could not be parsed (should normally not occur)
mv_df = mv_df[mv_df["DayDate"].notna()].copy()

# Create output files with headers
with open(mv_cleaned_file, "w", encoding="utf-8") as f_out:
    f_out.write("DS_ID|DayDate|MV\n")

with open(delist_log_file, "w", encoding="utf-8") as f_log:
    f_log.write("DS_ID|DelistFromDate\n")

# Minimum tail length (in consecutive rows) to declare delisting
MIN_TAIL_LENGTH = 20

# Process each instrument separately
for ds_id, grp in mv_df.groupby("DS_ID"):
    g = grp.sort_values("DayDate").copy()
    mv_values = g["MV"].values
    n = len(mv_values)

    # Default: keep full history
    cutoff_date = None

    if n >= MIN_TAIL_LENGTH:
        # Identify the final run of identical MV values from the end backwards
        last_mv = mv_values[-1]
        tail_length = 1

        for i in range(n - 2, -1, -1):
            if mv_values[i] == last_mv:
                tail_length += 1
            else:
                break

        # If the final constant tail is long enough, declare delisting
        if tail_length >= MIN_TAIL_LENGTH:
            # Index (within the sorted group) of the first obs in the final constant run
            start_tail_idx = n - tail_length
            cutoff_date = g.iloc[start_tail_idx]["DayDate"]

    if cutoff_date is not None:
        # Log delisting info: instrument and the date from which we drop values
        with open(delist_log_file, "a", encoding="utf-8") as f_log:
            f_log.write(f"{ds_id}|{cutoff_date.strftime('%Y-%m-%d')}\n")

        # Exclude all rows from cutoff_date onwards (including the tail itself)
        g_filtered = g[g["DayDate"] < cutoff_date].copy()
    else:
        # No delisting detected: keep full history
        g_filtered = g

    # Append remaining rows for this DS_ID to cleaned output file
    if not g_filtered.empty:
        # Convert DayDate back to YYYY-MM-DD string format
        g_filtered["DayDate"] = g_filtered["DayDate"].dt.strftime("%Y-%m-%d")
        g_filtered[["DS_ID", "DayDate", "MV"]].to_csv(
            mv_cleaned_file,
            sep="|",
            index=False,
            header=False,
            mode="a"
        )

print(f"Done. Cleaned MV file written to: {mv_cleaned_file}")
print(f"Delisting log written to: {delist_log_file}")

# -----------------------------
# Cleanup: free memory from this cell
# -----------------------------


# Delete large objects
del mv_df, g, grp, mv_values, g_filtered

# If still in scope
try:
    del ds_id
except:
    pass

import gc
gc.collect()


Done. Cleaned MV file written to: /home/jovyan/work/hpool1/pseidel/test/Temp/TempGeneralOverview/MV_raw_excl_delisted.txt
Delisting log written to: /home/jovyan/work/hpool1/pseidel/test/Temp/TempGeneralOverview/MV_delisting_audit.txt


652

### Transform Total Return Index (TRI)

In [34]:
# =============================================================================
# CELL SUMMARY
# -----------------------------------------------------------------------------
# This cell standardizes a collection of raw daily TRI CSV files into a single
# long-format text file while explicitly logging all dropped rows:
#
#   - Reads all TRI CSV files from a given folder one by one to keep memory
#     usage controlled.
#   - Skips the first metadata row, infers the separator for each CSV, and
#     removes fully empty rows and columns.
#   - Interprets the first remaining row (from start_col onward) as date labels,
#     and the following rows as IDs with TRI values across those date columns.
#   - Reshapes the wide date matrix into long format with columns:
#         DS_ID, DayDate, TRI
#   - Cleans and filters:
#       * Parses DayDate as datetime (day-first). Rows where the date fails to
#         parse are written to TRI_cleaning_errors.txt with reason
#         "Invalid DayDate".
#       * Converts TRI values to numeric. Rows with non-numeric TRI are written
#         to the same error file with reason "Invalid TRI".
#       * Valid rows keep a normalized DayDate in 'YYYY-MM-DD' format.
#   - Appends all valid rows into TRI_raw.txt, using a pipe separator.
#   - Appends all invalid rows into TRI_cleaning_errors.txt, including the
#     originating source file name and a reason for filtering.
# =============================================================================

import os                       # Filesystem operations (paths, existence checks, removals)
import glob                     # Pattern-based file listing for CSV files
import pandas as pd             # DataFrame handling and CSV parsing
from pandas.errors import ParserError  # Specific exception type for CSV parsing issues

# Input folder containing all raw CSV files
input_folder = DailyTotalReturns_file_path

# Output folder where the combined result will be written
output_folder = Temp_file_path_GO

# Column index (0-based) where the date columns begin (column H)
start_col = 7

# Ensure the output directory exists (create if missing)
os.makedirs(output_folder, exist_ok=True)

# Path of the combined output file
output_file = os.path.join(output_folder, "TRI_raw.txt")

# Path for the cleaning error log
error_file = os.path.join(output_folder, "TRI_cleaning_errors.txt")

# Remove old output files if they already exist so this run starts fresh
if os.path.exists(output_file):
    os.remove(output_file)

if os.path.exists(error_file):
    os.remove(error_file)

# Create the output file with header describing the standardized columns
with open(output_file, "w", encoding="utf-8") as f:
    f.write("DS_ID|DayDate|TRI\n")

# Create the error log file with header describing diagnostic columns
with open(error_file, "w", encoding="utf-8") as f:
    f.write("DS_ID|DayDate|TRI|Reason|SourceFile\n")

# Collect all CSV files in sorted order for deterministic processing
csv_files = sorted(glob.glob(os.path.join(input_folder, "*.csv")))

# Main loop: process one file at a time to limit memory usage
for input_file in csv_files:
    # Log which file is being processed
    print(f"Processing: {input_file}")
    # Extract just the file name to store as the SourceFile in logs
    basename = os.path.basename(input_file)

    try:
        # Attempt to read the CSV, skipping the first metadata row
        # sep=None lets pandas infer the delimiter dynamically
        df = pd.read_csv(
            input_file,
            header=None,
            sep=None,
            engine="python",
            skiprows=1
        )
    except ParserError:
        # If parsing fails due to malformed lines, retry while skipping bad lines
        df = pd.read_csv(
            input_file,
            header=None,
            sep=None,
            engine="python",
            skiprows=1,
            on_bad_lines="skip"
        )

    # Remove fully empty rows
    df = df.dropna(how="all")
    # Remove fully empty columns
    df = df.dropna(axis=1, how="all")

    # Extract dates (header from start_col onward), IDs (first column), and TRI values matrix
    dates = df.iloc[0, start_col:]      # First row after metadata contains date labels from start_col
    ids = df.iloc[1:, 0]                # First column (excluding the date row) holds DS_ID values
    values = df.iloc[1:, start_col:]    # Remaining rows and columns hold TRI values

    # Assign date strings as column names on the values DataFrame
    values.columns = dates.values

    # Map row indices in values to the corresponding DS_IDs from ids
    id_map = ids.to_dict()

    # Convert from wide (IDs x dates) to long format:
    #   stack -> MultiIndex (row_idx, DayDate) with TRI as values
    long_df = values.stack().reset_index()
    long_df.columns = ["row_idx", "DayDate", "TRI"]

    # Insert DS_ID column using the row index mapping
    long_df["DS_ID"] = long_df["row_idx"].map(id_map)

    # Record originating file for each row (used in error logging)
    long_df["SourceFile"] = basename

    # -------------------------------------------------------------------------
    # 1) Validate DayDate: keep invalid rows separately with reason
    # -------------------------------------------------------------------------
    # Parse DayDate strings as datetime in day-first format (e.g., DD/MM/YYYY)
    parsed_dates = pd.to_datetime(
        long_df["DayDate"],
        errors="coerce",
        dayfirst=True
    )
    # Boolean mask of rows where date parsing failed (NaT)
    invalid_date_mask = parsed_dates.isna()
    # Subset of rows with invalid dates
    invalid_daydate = long_df[invalid_date_mask].copy()

    if not invalid_daydate.empty:
        # Label invalid date rows with a descriptive reason
        invalid_daydate["Reason"] = "Invalid DayDate"
        # Select and order the columns to match the error file header
        invalid_daydate[["DS_ID", "DayDate", "TRI", "Reason", "SourceFile"]].to_csv(
            error_file,
            sep="|",
            index=False,
            mode="a",
            header=False
        )
        # Report count of invalid-date rows for this file
        print(f"  Warning: {len(invalid_daydate)} invalid date entries detected in this file.")

    # Keep only rows with successfully parsed dates for further processing
    valid_df = long_df[~invalid_date_mask].copy()
    # Attach the parsed datetime values for valid rows
    valid_df["DayDate"] = parsed_dates[~invalid_date_mask]

    # Standardize the date representation to 'YYYY-MM-DD' strings
    valid_df["DayDate"] = valid_df["DayDate"].dt.strftime("%Y-%m-%d")

    # -------------------------------------------------------------------------
    # 2) Validate TRI: must be numeric; log invalid rows with reason
    # -------------------------------------------------------------------------
    # Convert TRI values to numeric; non-convertible entries become NaN
    tri_numeric = pd.to_numeric(valid_df["TRI"], errors="coerce")
    # Mask of rows where TRI is invalid (NaN after coercion)
    invalid_tri_mask = tri_numeric.isna()
    # Rows with invalid TRI values
    invalid_tri = valid_df[invalid_tri_mask].copy()

    if not invalid_tri.empty:
        # Label these rows with reason indicating TRI parsing issues
        invalid_tri["Reason"] = "Invalid TRI"
        # Save invalid TRI rows to the error log file
        invalid_tri[["DS_ID", "DayDate", "TRI", "Reason", "SourceFile"]].to_csv(
            error_file,
            sep="|",
            index=False,
            mode="a",
            header=False
        )
        # Report how many invalid TRI entries were found for this file
        print(f"  Warning: {len(invalid_tri)} invalid TRI entries detected in this file.")

    # Keep only rows with valid numeric TRI values
    clean_df = valid_df[~invalid_tri_mask].copy()
    # Assign the numeric TRI series to the clean DataFrame
    clean_df["TRI"] = tri_numeric[~invalid_tri_mask]

    # Select output columns for the standardized long-format result
    result = clean_df[["DS_ID", "DayDate", "TRI"]]

    # Report how many cleaned rows will be written for this file
    print(f"  Rows written: {len(result)}")

    # If there are no valid rows, skip appending to the combined output
    if len(result) == 0:
        continue

    # Append valid rows to the combined TRI_raw output file without header
    result.to_csv(
        output_file,
        sep="|",
        index=False,
        mode="a",
        header=False
    )

# Final summary logs with paths to the combined outputs
print(f"Done. Combined file written to: {output_file}")
print(f"Dropped rows with reasons logged in: {error_file}")


Processing: /home/jovyan/work/hpool1/pseidel/test/Input/Daily Returns USD/DailyReturnsUSD19911995.01.csv
  Rows written: 1571667
Processing: /home/jovyan/work/hpool1/pseidel/test/Input/Daily Returns USD/DailyReturnsUSD19911995.02.csv
  Rows written: 973685
Processing: /home/jovyan/work/hpool1/pseidel/test/Input/Daily Returns USD/DailyReturnsUSD19911995.03.csv
  Rows written: 2035848
Processing: /home/jovyan/work/hpool1/pseidel/test/Input/Daily Returns USD/DailyReturnsUSD19911995.04.csv
  Rows written: 1542081
Processing: /home/jovyan/work/hpool1/pseidel/test/Input/Daily Returns USD/DailyReturnsUSD19911995.05.csv
  Rows written: 1839856
Processing: /home/jovyan/work/hpool1/pseidel/test/Input/Daily Returns USD/DailyReturnsUSD19911995.06.csv
  Rows written: 2742617
Processing: /home/jovyan/work/hpool1/pseidel/test/Input/Daily Returns USD/DailyReturnsUSD19911995.07.csv
  Rows written: 1940603
Processing: /home/jovyan/work/hpool1/pseidel/test/Input/Daily Returns USD/DailyReturnsUSD19911995.

### Remove Delisted Comp's TRIs

In [35]:
# =============================================================================
# CELL SUMMARY
# -----------------------------------------------------------------------------
# This cell takes the standardized long-format TRI file `TRI_raw.txt`
# and removes "dead" tails for instruments that appear to be delisted.
#
# Delisting detection logic (per DS_ID):
#   - Sort observations by DayDate ascending.
#   - Look from the end of the series backwards and compute the length of the
#     final run of identical TRI values (a constant tail).
#   - If this final constant run is at least 20 consecutive rows, and the last
#     TRI equals the TRI in this run (true by construction), we treat the
#     instrument as delisted.
#   - We then DROP ALL ROWS from the first date of this final constant run
#     onwards (i.e., the entire flat tail including the initial 20 days).
#
# Outputs:
#   - `TRI_cleaned.txt`  : same structure as TRI_raw (DS_ID|DayDate|TRI) but with
#                          delisted tails removed.
#   - `TRI_delisting_log.txt` : list of instruments identified as delisted with
#                               the date from which they are excluded.
# =============================================================================

import os
import pandas as pd

# Output folder (same as in previous cells)
output_folder = Temp_file_path_GO

# Input standardized TRI file
tri_input_file = os.path.join(output_folder, "TRI_raw.txt")

# Output files
tri_cleaned_file = os.path.join(output_folder, "TRI_raw_excl_delisted.txt")
tri_delist_log_file = os.path.join(output_folder, "TRI_delisting_log.txt")

# Remove old outputs if they exist
for path in [tri_cleaned_file, tri_delist_log_file]:
    if os.path.exists(path):
        os.remove(path)

# Read the standardized long-format TRI file
# DS_ID is kept as string to avoid losing leading zeros etc.
tri_df = pd.read_csv(
    tri_input_file,
    sep="|",
    dtype={"DS_ID": str}
)

# Parse DayDate to datetime for proper sorting and comparison
tri_df["DayDate"] = pd.to_datetime(tri_df["DayDate"], format="%Y-%m-%d", errors="coerce")

# Drop any rows where DayDate could not be parsed (should normally not occur)
tri_df = tri_df[tri_df["DayDate"].notna()].copy()

# Create output files with headers
with open(tri_cleaned_file, "w", encoding="utf-8") as f_out:
    f_out.write("DS_ID|DayDate|TRI\n")

with open(tri_delist_log_file, "w", encoding="utf-8") as f_log:
    f_log.write("DS_ID|DelistFromDate\n")

# Minimum tail length (in consecutive rows) to declare delisting
MIN_TAIL_LENGTH = 20

# Process each instrument separately
for ds_id, grp in tri_df.groupby("DS_ID"):
    g = grp.sort_values("DayDate").copy()
    tri_values = g["TRI"].values
    n = len(tri_values)

    # Default: keep full history
    cutoff_date = None

    if n >= MIN_TAIL_LENGTH:
        # Identify the final run of identical TRI values from the end backwards
        last_tri = tri_values[-1]
        tail_length = 1

        for i in range(n - 2, -1, -1):
            if tri_values[i] == last_tri:
                tail_length += 1
            else:
                break

        # If the final constant tail is long enough, declare delisting
        if tail_length >= MIN_TAIL_LENGTH:
            # Index (within the sorted group) of the first obs in the final constant run
            start_tail_idx = n - tail_length
            cutoff_date = g.iloc[start_tail_idx]["DayDate"]

    if cutoff_date is not None:
        # Log delisting info: instrument and the date from which we drop values
        with open(tri_delist_log_file, "a", encoding="utf-8") as f_log:
            f_log.write(f"{ds_id}|{cutoff_date.strftime('%Y-%m-%d')}\n")

        # Exclude all rows from cutoff_date onwards (including the tail itself)
        g_filtered = g[g["DayDate"] < cutoff_date].copy()
    else:
        # No delisting detected: keep full history
        g_filtered = g

    # Append remaining rows for this DS_ID to cleaned output file
    if not g_filtered.empty:
        # Convert DayDate back to YYYY-MM-DD string format
        g_filtered["DayDate"] = g_filtered["DayDate"].dt.strftime("%Y-%m-%d")
        g_filtered[["DS_ID", "DayDate", "TRI"]].to_csv(
            tri_cleaned_file,
            sep="|",
            index=False,
            header=False,
            mode="a"
        )

print(f"Done. Cleaned TRI file written to: {tri_cleaned_file}")
print(f"Delisting log written to: {tri_delist_log_file}")



# -----------------------------
# Cleanup: free memory from this cell
# -----------------------------
import gc

# Delete the large / important objects from this cell
for name in [
    "tri_df", "g", "grp", "tri_values", "g_filtered",
    "ds_id", "last_tri", "tail_length", "cutoff_date"
]:
    if name in globals():
        del globals()[name]

gc.collect()


Done. Cleaned TRI file written to: /home/jovyan/work/hpool1/pseidel/test/Temp/TempGeneralOverview/TRI_raw_excl_delisted.txt
Delisting log written to: /home/jovyan/work/hpool1/pseidel/test/Temp/TempGeneralOverview/TRI_delisting_log.txt


445

### ID Merge

In [36]:
# =============================================================================
# CELL SUMMARY
# -----------------------------------------------------------------------------
# This cell takes the raw market value (MV_raw_excl_delisted.txt) and total return index
# (TRI_raw_excl_delisted.txt) files and reconciles them with a clean ID mapping table:
#
#   - Loads ID_mapping_clean.txt and forces DS_ID and ID to be strings to avoid
#     mixed-type issues during merges.
#
#   - Defines a chunked helper function `merge_with_mapping` that:
#       * Reads the raw MV/TRI files in chunks to control memory usage.
#       * Left-joins each chunk with the mapping on DS_ID so that all raw rows
#         are retained and rows failing to map can be identified.
#       * Flags rows to be dropped based on:
#           - Missing mapping for DS_ID (no row in the mapping table).
#           - Empty or effectively missing ID after the merge.
#       * Splits each chunk into:
#           - Clean rows with valid IDs, where DS_ID is dropped from the output.
#           - Error rows with a DropReason explaining why they were removed.
#       * Appends clean rows to MV_clean.txt or TRI_clean.txt and error rows
#         to MV_mapping_errors.txt or TRI_mapping_errors.txt, writing headers
#         only once per file.
#       * Ensures that all four output files (MV_clean.txt, TRI_clean.txt,
#         MV_mapping_errors.txt, TRI_mapping_errors.txt) contain a single
#         header row at the top of the file.
#       * Accumulates counts of dropped rows by reason for reporting.
#
#   - Runs the merge function for both MV_raw_excl_delisted.txt and
#     TRI_raw_excl_delisted.txt and prints a summary of total rows read, kept,
#     and dropped, along with a breakdown of drop reasons. This step ensures
#     that only rows with valid mapped IDs are propagated into the cleaned MV
#     and TRI outputs, with all removals fully logged and traceable.
# =============================================================================

import os
import pandas as pd
from collections import Counter

# Base folder where all intermediate files are stored
base_folder = Temp_file_path_GO

# -------------------------------------------------------------------------
# 1. Load ID mapping (force DS_ID and ID to string to avoid mixed-type issues)
# -------------------------------------------------------------------------
mapping_file = os.path.join(base_folder, "ID_mapping_clean.txt")

# Read the mapping file; DS_ID and ID are explicitly typed as strings
mapping_df = pd.read_csv(
    mapping_file,
    sep="|",
    dtype={"DS_ID": "string", "ID": "string"},
    low_memory=False,
)

# Log basic information about the mapping file for traceability
print(f"Loaded mapping file: {mapping_file}")
print("Mapping columns:", list(mapping_df.columns))

# -------------------------------------------------------------------------
# 2. Helper function to merge a raw file with the mapping and clean IDs (chunked)
# -------------------------------------------------------------------------
def merge_with_mapping(raw_filename, clean_filename, error_filename, chunksize=1_000_000):
    """
    Merge a raw DS_ID-based file with the ID mapping and produce:
      - a cleaned file with valid IDs and DS_ID removed
      - an error file with dropped rows and their drop reasons

    The function:
      - Processes the raw file in chunks.
      - Performs a left merge on DS_ID.
      - Flags rows with missing mappings or empty IDs.
      - Writes clean and error rows to separate outputs, ensuring that each
        output file receives its header exactly once.
    """
    # Construct full paths for raw input and clean/error outputs
    raw_path = os.path.join(base_folder, raw_filename)
    clean_path = os.path.join(base_folder, clean_filename)
    error_path = os.path.join(base_folder, error_filename)

    # If the raw file is missing, log a warning and skip processing
    if not os.path.exists(raw_path):
        print(f"WARNING: {raw_path} not found. Skipping.")
        return

    # High-level status log for this raw file
    print(f"\n=== Processing {raw_filename} in chunks of {chunksize} rows ===")

    # Modes for writing clean and error outputs; start with write mode, then switch to append
    clean_mode = "w"
    error_mode = "w"

    # Flags to ensure headers are only written once per output file
    header_written_clean = False
    header_written_error = False

    # Counters for auditing how many rows are processed, kept, and dropped
    total_rows = 0
    total_clean = 0
    total_error = 0

    # Counter to accumulate drop reasons and their frequencies
    drop_reason_counts = Counter()

    # Read raw file in chunks to avoid loading the entire file into memory
    for chunk_idx, chunk in enumerate(
        pd.read_csv(
            raw_path,
            sep="|",
            dtype={"DS_ID": "string"},
            low_memory=False,
            chunksize=chunksize,
        )
    ):
        # Count rows in the current chunk and add to total
        chunk_rows = len(chunk)
        total_rows += chunk_rows
        print(f"  Chunk {chunk_idx + 1}: {chunk_rows} rows")

        # Merge raw chunk with mapping on DS_ID using a left join to preserve all raw rows
        df_merged = chunk.merge(mapping_df, on="DS_ID", how="left", indicator=True)

        # Helper column for ID checks: normalized string representation of ID
        df_merged["ID_str"] = df_merged["ID"].astype(str).str.strip()

        # Initialize DropReason as empty for all rows
        df_merged["DropReason"] = ""

        # 1) Rows with no matching DS_ID in the mapping:
        #    - ID is NaN and merge indicator is "left_only"
        mask_no_mapping = df_merged["ID"].isna() & (df_merged["_merge"] == "left_only")
        df_merged.loc[mask_no_mapping, "DropReason"] = "No matching DS_ID in ID_mapping_clean"

        # 2) Rows where ID is present in mapping but becomes empty or "nan" as a string
        mask_empty_id = (df_merged["DropReason"] == "") & (
            (df_merged["ID_str"] == "") | (df_merged["ID_str"].str.lower() == "nan")
        )
        df_merged.loc[mask_empty_id, "DropReason"] = "Empty ID after merge"

        # Split into error rows (with a DropReason) and clean rows (with no DropReason)
        error_df = df_merged[df_merged["DropReason"] != ""].copy()
        clean_df = df_merged[df_merged["DropReason"] == ""].copy()

        # Drop helper and merge-indicator columns from the clean subset
        clean_df = clean_df.drop(columns=["DropReason", "ID_str", "_merge"], errors="ignore")

        # Drop helper columns (except DropReason) from the error subset
        error_df = error_df.drop(columns=["ID_str", "_merge"], errors="ignore")

        # Remove DS_ID from the clean output; ID now serves as the main identifier
        if "DS_ID" in clean_df.columns:
            clean_df = clean_df.drop(columns=["DS_ID"])

        # Update cumulative counts of clean and error rows
        total_clean += len(clean_df)
        total_error += len(error_df)

        # If there are error rows, update the drop reason frequency counts
        if not error_df.empty:
            drop_reason_counts.update(error_df["DropReason"].value_counts().to_dict())

        # Append clean rows to the clean output file, writing header only once
        if len(clean_df) > 0:
            clean_df.to_csv(
                clean_path,
                sep="|",
                index=False,
                mode=clean_mode,
                header=not header_written_clean,
            )
            header_written_clean = True
            clean_mode = "a"  # Switch to append mode after first write

        # Append error rows to the error output file, writing header only once
        if len(error_df) > 0:
            error_df.to_csv(
                error_path,
                sep="|",
                index=False,
                mode=error_mode,
                header=not header_written_error,
            )
            header_written_error = True
            error_mode = "a"  # Switch to append mode after first write

    # Summary of processing for this raw file
    print(f"\nSummary for {raw_filename}:")
    print(f"  Total rows read: {total_rows}")
    print(f"  Rows kept for {clean_filename}: {total_clean}")
    print(f"  Rows dropped for {error_filename}: {total_error}")

    # If rows were dropped, print breakdown by drop reason
    if total_error > 0:
        print("  Dropped rows by reason:")
        for reason, count in drop_reason_counts.items():
            print(f"    {reason}: {count}")

    # Log paths of generated clean and error files
    print(f"Saved clean file → {clean_path}")
    print(f"Saved error file → {error_path}")

# -------------------------------------------------------------------------
# 3. Run for MV_raw and TRI_raw
# -------------------------------------------------------------------------
merge_with_mapping(
    raw_filename="MV_raw_excl_delisted.txt",
    clean_filename="MV_clean.txt",
    error_filename="MV_mapping_errors.txt",
)

merge_with_mapping(
    raw_filename="TRI_raw_excl_delisted.txt",
    clean_filename="TRI_clean.txt",
    error_filename="TRI_mapping_errors.txt",
)

print("\nDone merging and cleaning MV/TRI with ID mapping.")


Loaded mapping file: /home/jovyan/work/hpool1/pseidel/test/Temp/TempGeneralOverview/ID_mapping_clean.txt
Mapping columns: ['DS_ID', 'ID']

=== Processing MV_raw_excl_delisted.txt in chunks of 1000000 rows ===
  Chunk 1: 1000000 rows
  Chunk 2: 1000000 rows
  Chunk 3: 1000000 rows
  Chunk 4: 1000000 rows
  Chunk 5: 1000000 rows
  Chunk 6: 1000000 rows
  Chunk 7: 1000000 rows
  Chunk 8: 1000000 rows
  Chunk 9: 1000000 rows
  Chunk 10: 1000000 rows
  Chunk 11: 1000000 rows
  Chunk 12: 1000000 rows
  Chunk 13: 1000000 rows
  Chunk 14: 1000000 rows
  Chunk 15: 1000000 rows
  Chunk 16: 1000000 rows
  Chunk 17: 1000000 rows
  Chunk 18: 1000000 rows
  Chunk 19: 1000000 rows
  Chunk 20: 1000000 rows
  Chunk 21: 1000000 rows
  Chunk 22: 1000000 rows
  Chunk 23: 1000000 rows
  Chunk 24: 1000000 rows
  Chunk 25: 1000000 rows
  Chunk 26: 1000000 rows
  Chunk 27: 1000000 rows
  Chunk 28: 1000000 rows
  Chunk 29: 1000000 rows
  Chunk 30: 1000000 rows
  Chunk 31: 1000000 rows
  Chunk 32: 1000000 rows


### View Output

In [37]:
# =============================================================================
# CELL SUMMARY
# -----------------------------------------------------------------------------
# This cell performs a quick visual sanity check of the cleaned market value
# and TRI datasets by reading and displaying the first 50 rows of:
#   - MV_clean.txt (cleaned market values with mapped IDs)
#   - TRI_clean.txt (cleaned total return index values with mapped IDs)
#
# Behavior:
#   - Constructs full file paths for MV_clean.txt and TRI_clean.txt based on
#     the Temp_file_path_GO base folder.
#   - Attempts to read up to 50 rows from each file using a pipe separator.
#   - If a file is present, loads its first rows into a DataFrame and displays
#     it in a notebook-friendly format for manual inspection.
#   - If a file is missing, prints a clear error message instead of failing.
# This provides a lightweight way to confirm that previous cleaning and
# merging steps produced outputs in the expected structure and format.
# =============================================================================

import pandas as pd
import os

# Base folder where the general overview and cleaned files are stored
base_folder_go = Temp_file_path_GO

# Full paths for the cleaned MV and TRI files
mv_clean_path = os.path.join(base_folder_go, "MV_clean.txt")
tri_clean_path = os.path.join(base_folder_go, "TRI_clean.txt")

# -------------------------------------------------------------------------
# Inspect first 50 rows of MV_clean.txt
# -------------------------------------------------------------------------
print("First 50 rows of MV_clean.txt:")
try:
    # Read the first 50 rows from the cleaned MV file (pipe-separated)
    df_mv_head = pd.read_csv(mv_clean_path, sep="|", nrows=50)
    # Display the resulting DataFrame for visual inspection in the notebook
    display(df_mv_head)
except FileNotFoundError:
    # If the MV clean file does not exist, inform the user instead of raising
    print(f"Error: {mv_clean_path} not found.")

# -------------------------------------------------------------------------
# Inspect first 50 rows of TRI_clean.txt
# -------------------------------------------------------------------------
print("\nFirst 50 rows of TRI_clean.txt:")
try:
    # Read the first 50 rows from the cleaned TRI file (pipe-separated)
    df_tri_head = pd.read_csv(tri_clean_path, sep="|", nrows=50)
    # Display the resulting DataFrame for visual inspection in the notebook
    display(df_tri_head)
except FileNotFoundError:
    # If the TRI clean file does not exist, inform the user instead of raising
    print(f"Error: {tri_clean_path} not found.")


First 50 rows of MV_clean.txt:


Unnamed: 0,DayDate,MV,ID
0,1994-02-17,19.44,C840P9270
1,1994-02-18,20.02,C840P9270
2,1994-02-21,20.02,C840P9270
3,1994-02-22,20.02,C840P9270
4,1994-02-23,18.59,C840P9270
5,1994-02-24,18.01,C840P9270
6,1994-02-25,17.59,C840P9270
7,1994-02-28,17.16,C840P9270
8,1994-03-01,17.59,C840P9270
9,1994-03-02,17.16,C840P9270



First 50 rows of TRI_clean.txt:


Unnamed: 0,DayDate,TRI,ID
0,1994-02-17,100.0,C840P9270
1,1994-02-18,102.94,C840P9270
2,1994-02-21,102.94,C840P9270
3,1994-02-22,102.94,C840P9270
4,1994-02-23,95.59,C840P9270
5,1994-02-24,92.65,C840P9270
6,1994-02-25,90.44,C840P9270
7,1994-02-28,88.24,C840P9270
8,1994-03-01,90.44,C840P9270
9,1994-03-02,88.24,C840P9270


## MVs in LC

### For Local Currency, Check for Ambiguous Currency Links (Not Problems Detected)

In [38]:
import pandas as pd
import os

# =============================================================================
# SUMMARY OF THIS SCRIPT
# -----------------------------------------------------------------------------
# This script identifies firms with conflicting currency data (Multiple PCURs).
#
# LOGIC:
# 1. Loads a specific Target CSV file (e.g., "Constituents.01.csv").
# 2. Loads the Clean ID Mapping File ("ID_mapping_clean.txt").
# 3. FILTERS the Target CSV to keep only rows where the 'Code' exists in the 
#    mapping file (ensuring we only check relevant companies).
# 4. CHECKS if any remaining 'Code' has more than one unique value in the 
#    'PCUR' (Price Currency) column.
# 5. SAVES a report ("MultipleCurrencyMVs.xlsx") containing all rows for 
#    any companies found with multiple currencies, so you can inspect them.
# =============================================================================

# ==========================================
# CONFIGURATION
# ==========================================
CONFIG = {
    # 1. Name of the CSV file you want to check
    'target_csv_name': "Constituents.01.csv", 
    
    # 2. Path to the folder containing that CSV
    'input_folder': f"{Input_file_path}",

    # 3. Path where the output Excel will be saved
    'output_folder': f"{Temp_file_path_GO}",

    # 4. Path to the ID mapping file
    'mapping_file_path': f"{Temp_file_path_GO}/ID_mapping_clean.txt",
    
    # 5. Output file name for the results
    'output_file': 'MultipleCurrencyMVs.xlsx'
}

def check_multiple_pcur(config):
    # Construct full path for the input CSV
    csv_path = os.path.join(config['input_folder'], config['target_csv_name'])
    
    # ---------------------------------------------------------
    # 1. Load the Target CSV
    # ---------------------------------------------------------
    print(f"Loading {config['target_csv_name']} from {config['input_folder']}...")
    try:
        df = pd.read_csv(
            csv_path, 
            low_memory=False, 
            encoding='ISO-8859-1' # Safe encoding for your files
        )
        print(f"  - Total rows in CSV: {len(df)}")
    except FileNotFoundError:
        print(f"Error: Could not find {csv_path}")
        return

    # ---------------------------------------------------------
    # 2. Load the ID Mapping File
    # ---------------------------------------------------------
    print(f"Loading Mapping file...")
    try:
        df_map = pd.read_csv(
            config['mapping_file_path'], 
            sep='|', 
            low_memory=False, 
            encoding='ISO-8859-1'
        )
        # Get valid IDs as a set of strings
        valid_ids = set(df_map['DS_ID'].astype(str))
        
        # --- NEW: Print unique count in mapping file ---
        print(f"  - Unique IDs in Mapping File: {len(valid_ids)}")
        
    except Exception as e:
        print(f"Error reading mapping file: {e}")
        return

    # ---------------------------------------------------------
    # 3. Filter: Keep only Codes present in Mapping File
    # ---------------------------------------------------------
    if 'Code' not in df.columns:
        print("Error: Column 'Code' not found in the CSV.")
        return

    # Ensure Code is string for matching
    df['Code'] = df['Code'].astype(str)
    
    # Filter
    df_subset = df[df['Code'].isin(valid_ids)].copy()
    
    # --- NEW: Print stats after intersection ---
    unique_codes_in_subset = df_subset['Code'].nunique()
    print(f"  - Rows after matching with ID Mapping: {len(df_subset)}")
    print(f"  - Unique Codes in Target CSV after intersection: {unique_codes_in_subset}")

    if df_subset.empty:
        print("No matching Codes found. Stopping.")
        return

    # ---------------------------------------------------------
    # 4. Check for Multiple PCURs
    # ---------------------------------------------------------
    if 'PCUR' not in df_subset.columns:
        print("Error: Column 'PCUR' not found in the CSV.")
        return

    # Group by Code and count unique PCUR values
    pcur_counts = df_subset.groupby('Code')['PCUR'].nunique()
    
    # Filter for Codes that have > 1 unique PCUR
    multi_pcur_codes = pcur_counts[pcur_counts > 1].index
    
    if len(multi_pcur_codes) > 0:
        print(f"\nFOUND {len(multi_pcur_codes)} CODES WITH MULTIPLE PCURs!")
        
        # Extract the specific rows for inspection
        result_df = df_subset[df_subset['Code'].isin(multi_pcur_codes)].sort_values(by=['Code', 'PCUR'])
        
        # Save to Excel
        output_path = os.path.join(config['output_folder'], config['output_file'])
        result_df.to_excel(output_path, index=False)
        
        print(f"Details saved to: {output_path}")
        print("\nPreview of Codes with multiple PCURs:")
        print(result_df[['Code', 'PCUR']].drop_duplicates().head(10))
    else:
        print("\nNo Codes were found with multiple PCUR values.")

# Run the function
check_multiple_pcur(CONFIG)

Loading Constituents.01.csv from /home/jovyan/work/hpool1/pseidel/test/Input...
  - Total rows in CSV: 167649
Loading Mapping file...
  - Unique IDs in Mapping File: 95104
  - Rows after matching with ID Mapping: 149033
  - Unique Codes in Target CSV after intersection: 95104

No Codes were found with multiple PCUR values.


### Transform Total MarketValues in Local Currency (CL_MV)

In [39]:
# =============================================================================
# CELL SUMMARY
# -----------------------------------------------------------------------------
# This cell standardizes a collection of raw daily LC_MV CSV files into a single
# long-format text file while explicitly logging all dropped rows:
#
#   - Reads all LC_MV CSV files from a given folder one by one to keep memory
#     usage controlled.
#   - Skips the first metadata row, infers the separator for each CSV, and
#     removes fully empty rows and columns.
#   - Interprets the first remaining row (from start_col onward) as date labels,
#     and the following rows as IDs with LC_MV values across those date columns.
#   - Reshapes the wide date matrix into long format with columns:
#         DS_ID, DayDate, LC_MV
#   - Cleans and filters:
#       * Parses DayDate as datetime (day-first). Rows where the date fails to
#         parse are written to LC_MV_cleaning_errors.txt with reason
#         "Invalid DayDate".
#       * Converts LC_MV values to numeric. Rows with non-numeric TRLC_MVI are written
#         to the same error file with reason "Invalid LC_MV".
#       * Valid rows keep a normalized DayDate in 'YYYY-MM-DD' format.
#   - Appends all valid rows into LC_MV_raw.txt, using a pipe separator.
#   - Appends all invalid rows into LC_MV_cleaning_errors.txt, including the
#     originating source file name and a reason for filtering.
# =============================================================================

import os                       # Filesystem operations (paths, existence checks, removals)
import glob                     # Pattern-based file listing for CSV files
import pandas as pd             # DataFrame handling and CSV parsing
from pandas.errors import ParserError  # Specific exception type for CSV parsing issues

# Input folder containing all raw CSV files
input_folder = MarketValues_file_path_LC

# Output folder where the combined result will be written
output_folder = Temp_file_path_GO

# Column index (0-based) where the date columns begin (column H)
start_col = 7

# Ensure the output directory exists (create if missing)
os.makedirs(output_folder, exist_ok=True)

# Path of the combined output file
output_file = os.path.join(output_folder, "LC_MV_raw.txt")

# Path for the cleaning error log
error_file = os.path.join(output_folder, "LC_MV_cleaning_errors.txt")

# Remove old output files if they already exist so this run starts fresh
if os.path.exists(output_file):
    os.remove(output_file)

if os.path.exists(error_file):
    os.remove(error_file)

# Create the output file with header describing the standardized columns
with open(output_file, "w", encoding="utf-8") as f:
    f.write("DS_ID|DayDate|LC_MV\n")

# Create the error log file with header describing diagnostic columns
with open(error_file, "w", encoding="utf-8") as f:
    f.write("DS_ID|DayDate|LC_MV|Reason|SourceFile\n")

# Collect all CSV files in sorted order for deterministic processing
csv_files = sorted(glob.glob(os.path.join(input_folder, "*.csv")))

# Main loop: process one file at a time to limit memory usage
for input_file in csv_files:
    # Log which file is being processed
    print(f"Processing: {input_file}")
    # Extract just the file name to store as the SourceFile in logs
    basename = os.path.basename(input_file)

    try:
        # Attempt to read the CSV, skipping the first metadata row
        # sep=None lets pandas infer the delimiter dynamically
        df = pd.read_csv(
            input_file,
            header=None,
            sep=None,
            engine="python",
            skiprows=1
        )
    except ParserError:
        # If parsing fails due to malformed lines, retry while skipping bad lines
        df = pd.read_csv(
            input_file,
            header=None,
            sep=None,
            engine="python",
            skiprows=1,
            on_bad_lines="skip"
        )

    # Remove fully empty rows
    df = df.dropna(how="all")
    # Remove fully empty columns
    df = df.dropna(axis=1, how="all")

    # Extract dates (header from start_col onward), IDs (first column), and LC_MV values matrix
    dates = df.iloc[0, start_col:]      # First row after metadata contains date labels from start_col
    ids = df.iloc[1:, 0]                # First column (excluding the date row) holds DS_ID values
    values = df.iloc[1:, start_col:]    # Remaining rows and columns hold LC_MV values

    # Assign date strings as column names on the values DataFrame
    values.columns = dates.values

    # Map row indices in values to the corresponding DS_IDs from ids
    id_map = ids.to_dict()

    # Convert from wide (IDs x dates) to long format:
    #   stack -> MultiIndex (row_idx, DayDate) with LC_MV as values
    long_df = values.stack().reset_index()
    long_df.columns = ["row_idx", "DayDate", "LC_MV"]

    # Insert DS_ID column using the row index mapping
    long_df["DS_ID"] = long_df["row_idx"].map(id_map)

    # Record originating file for each row (used in error logging)
    long_df["SourceFile"] = basename

    # -------------------------------------------------------------------------
    # 1) Validate DayDate: keep invalid rows separately with reason
    # -------------------------------------------------------------------------
    # Parse DayDate strings as datetime in day-first format (e.g., DD/MM/YYYY)
    parsed_dates = pd.to_datetime(
        long_df["DayDate"],
        errors="coerce",
        dayfirst=True
    )
    # Boolean mask of rows where date parsing failed (NaT)
    invalid_date_mask = parsed_dates.isna()
    # Subset of rows with invalid dates
    invalid_daydate = long_df[invalid_date_mask].copy()

    if not invalid_daydate.empty:
        # Label invalid date rows with a descriptive reason
        invalid_daydate["Reason"] = "Invalid DayDate"
        # Select and order the columns to match the error file header
        invalid_daydate[["DS_ID", "DayDate", "LC_MV", "Reason", "SourceFile"]].to_csv(
            error_file,
            sep="|",
            index=False,
            mode="a",
            header=False
        )
        # Report count of invalid-date rows for this file
        print(f"  Warning: {len(invalid_daydate)} invalid date entries detected in this file.")

    # Keep only rows with successfully parsed dates for further processing
    valid_df = long_df[~invalid_date_mask].copy()
    # Attach the parsed datetime values for valid rows
    valid_df["DayDate"] = parsed_dates[~invalid_date_mask]

    # Standardize the date representation to 'YYYY-MM-DD' strings
    valid_df["DayDate"] = valid_df["DayDate"].dt.strftime("%Y-%m-%d")

    # -------------------------------------------------------------------------
    # 2) Validate LC_MV: must be numeric; log invalid rows with reason
    # -------------------------------------------------------------------------
    # Convert LC_MV values to numeric; non-convertible entries become NaN
    tri_numeric = pd.to_numeric(valid_df["LC_MV"], errors="coerce")
    # Mask of rows where LC_MV is invalid (NaN after coercion)
    invalid_tri_mask = tri_numeric.isna()
    # Rows with invalid LC_MV values
    invalid_tri = valid_df[invalid_tri_mask].copy()

    if not invalid_tri.empty:
        # Label these rows with reason indicating LC_MV parsing issues
        invalid_tri["Reason"] = "Invalid LC_MV"
        # Save invalid LC_MV rows to the error log file
        invalid_tri[["DS_ID", "DayDate", "LC_MV", "Reason", "SourceFile"]].to_csv(
            error_file,
            sep="|",
            index=False,
            mode="a",
            header=False
        )
        # Report how many invalid LC_MV entries were found for this file
        print(f"  Warning: {len(invalid_tri)} invalid LC_MV entries detected in this file.")

    # Keep only rows with valid numeric LC_MV values
    clean_df = valid_df[~invalid_tri_mask].copy()
    # Assign the numeric LC_MV series to the clean DataFrame
    clean_df["LC_MV"] = tri_numeric[~invalid_tri_mask]

    # Select output columns for the standardized long-format result
    result = clean_df[["DS_ID", "DayDate", "LC_MV"]]

    # Report how many cleaned rows will be written for this file
    print(f"  Rows written: {len(result)}")

    # If there are no valid rows, skip appending to the combined output
    if len(result) == 0:
        continue

    # Append valid rows to the combined TRI_raw output file without header
    result.to_csv(
        output_file,
        sep="|",
        index=False,
        mode="a",
        header=False
    )

# Final summary logs with paths to the combined outputs
print(f"Done. Combined file written to: {output_file}")
print(f"Dropped rows with reasons logged in: {error_file}")


Processing: /home/jovyan/work/hpool1/pseidel/test/Input/Daily MV LC/DailyMVLC19911995.01.csv
  Rows written: 1643017
Processing: /home/jovyan/work/hpool1/pseidel/test/Input/Daily MV LC/DailyMVLC19911995.02.csv
  Rows written: 970197
Processing: /home/jovyan/work/hpool1/pseidel/test/Input/Daily MV LC/DailyMVLC19911995.03.csv
  Rows written: 1882282
Processing: /home/jovyan/work/hpool1/pseidel/test/Input/Daily MV LC/DailyMVLC19911995.04.csv
  Rows written: 1538379
Processing: /home/jovyan/work/hpool1/pseidel/test/Input/Daily MV LC/DailyMVLC19911995.05.csv
  Rows written: 1836862
Processing: /home/jovyan/work/hpool1/pseidel/test/Input/Daily MV LC/DailyMVLC19911995.06.csv
  Rows written: 2742539
Processing: /home/jovyan/work/hpool1/pseidel/test/Input/Daily MV LC/DailyMVLC19911995.07.csv
  Rows written: 1933145
Processing: /home/jovyan/work/hpool1/pseidel/test/Input/Daily MV LC/DailyMVLC19911995.08.csv
  Rows written: 1101462
Processing: /home/jovyan/work/hpool1/pseidel/test/Input/Daily MV 

### Remove Delisted Comp's MVs

In [40]:
# =============================================================================
# CELL SUMMARY
# -----------------------------------------------------------------------------
# This cell takes the standardized long-format market value file `LC_MV_raw.txt`
# and removes "dead" tails for instruments that appear to be delisted.
#
# Delisting detection logic (per DS_ID):
#   - Sort observations by DayDate ascending.
#   - Look from the end of the series backwards and compute the length of the
#     final run of identical MV values (a constant tail).
#   - If this final constant run is at least 20 consecutive rows, and the last
#     MV equals the MV in this run (which it does by construction), we treat
#     the instrument as delisted.
#   - We then DROP ALL ROWS from the first date of this final constant run
#     onwards (i.e., the entire flat tail including the initial 20 days).
#
# Outputs:
#   - `LC_MV_cleaned.txt`  : same structure as LC_MV_raw (DS_ID|DayDate|MV) but with
#                         delisted tails removed.
#   - `LC_MV_delisting_log.txt` : list of instruments identified as delisted with
#                              the date from which they are excluded.
# =============================================================================

import os
import pandas as pd

# Input/output folder (same as in previous cell)
output_folder = Temp_file_path_GO

# Input standardized market value file
mv_input_file = os.path.join(output_folder, "LC_MV_raw.txt")

# Output files
mv_cleaned_file = os.path.join(output_folder, "LC_MV_raw_excl_delisted.txt")
delist_log_file = os.path.join(output_folder, "LC_MV_delisting_audit.txt")

# Remove old outputs if they exist
for path in [mv_cleaned_file, delist_log_file]:
    if os.path.exists(path):
        os.remove(path)

# Read the standardized long-format MV file
# DS_ID is kept as string to avoid losing leading zeros etc.
mv_df = pd.read_csv(
    mv_input_file,
    sep="|",
    dtype={"DS_ID": str}
)

# Parse DayDate to datetime for proper sorting and comparison
mv_df["DayDate"] = pd.to_datetime(mv_df["DayDate"], format="%Y-%m-%d", errors="coerce")

# Drop any rows where DayDate could not be parsed (should normally not occur)
mv_df = mv_df[mv_df["DayDate"].notna()].copy()

# Create output files with headers
with open(mv_cleaned_file, "w", encoding="utf-8") as f_out:
    f_out.write("DS_ID|DayDate|MV\n")

with open(delist_log_file, "w", encoding="utf-8") as f_log:
    f_log.write("DS_ID|DelistFromDate\n")

# Minimum tail length (in consecutive rows) to declare delisting
MIN_TAIL_LENGTH = 20

# Process each instrument separately
for ds_id, grp in mv_df.groupby("DS_ID"):
    g = grp.sort_values("DayDate").copy()
    mv_values = g["LC_MV"].values
    n = len(mv_values)

    # Default: keep full history
    cutoff_date = None

    if n >= MIN_TAIL_LENGTH:
        # Identify the final run of identical MV values from the end backwards
        last_mv = mv_values[-1]
        tail_length = 1

        for i in range(n - 2, -1, -1):
            if mv_values[i] == last_mv:
                tail_length += 1
            else:
                break

        # If the final constant tail is long enough, declare delisting
        if tail_length >= MIN_TAIL_LENGTH:
            # Index (within the sorted group) of the first obs in the final constant run
            start_tail_idx = n - tail_length
            cutoff_date = g.iloc[start_tail_idx]["DayDate"]

    if cutoff_date is not None:
        # Log delisting info: instrument and the date from which we drop values
        with open(delist_log_file, "a", encoding="utf-8") as f_log:
            f_log.write(f"{ds_id}|{cutoff_date.strftime('%Y-%m-%d')}\n")

        # Exclude all rows from cutoff_date onwards (including the tail itself)
        g_filtered = g[g["DayDate"] < cutoff_date].copy()
    else:
        # No delisting detected: keep full history
        g_filtered = g

    # Append remaining rows for this DS_ID to cleaned output file
    if not g_filtered.empty:
        # Convert DayDate back to YYYY-MM-DD string format
        g_filtered["DayDate"] = g_filtered["DayDate"].dt.strftime("%Y-%m-%d")
        g_filtered[["DS_ID", "DayDate", "LC_MV"]].to_csv(
            mv_cleaned_file,
            sep="|",
            index=False,
            header=False,
            mode="a"
        )

print(f"Done. Cleaned MV file written to: {mv_cleaned_file}")
print(f"Delisting log written to: {delist_log_file}")

# -----------------------------
# Cleanup: free memory from this cell
# -----------------------------


# Delete large objects
del mv_df, g, grp, mv_values, g_filtered

# If still in scope
try:
    del ds_id
except:
    pass

import gc
gc.collect()


Done. Cleaned MV file written to: /home/jovyan/work/hpool1/pseidel/test/Temp/TempGeneralOverview/LC_MV_raw_excl_delisted.txt
Delisting log written to: /home/jovyan/work/hpool1/pseidel/test/Temp/TempGeneralOverview/LC_MV_delisting_audit.txt


0

### Merge the Local Currency of MV (1:Many Relationship as Wanted; Few Rows Lost Due to Missing PCUR)

In [41]:
import pandas as pd
import os

# =============================================================================
# SUMMARY OF THIS SCRIPT
# -----------------------------------------------------------------------------
# 1. Loads the "Constituents" CSV and extracts 'Code' & 'PCUR'.
# 2. DEDUPLICATES the Constituents file so each Code appears only once.
# 3. Loads the "Stock Data" TXT (pipe-separated).
# 4. Analyzes the merge keys to confirm the relationship is now One-to-Many.
# 5. Merges the two datasets on Code = DS_ID.
# 6. Saves the merged result to a PIPE-SEPARATED file.
# =============================================================================

# ==========================================
# CONFIGURATION
# ==========================================
CONFIG = {
    # 1. Path to the Constituents CSV (Source of Code & PCUR)
    'constituents_csv_path': f"{Input_file_path}/Constituents.01.csv",
    
    # 2. Path to the Daily Stock Data TXT (Pipe-separated)
    'stock_data_txt_path': f"{Temp_file_path_GO}/LC_MV_raw_excl_delisted.txt",
    
    # 3. Output file path
    'output_file_path': f"{Temp_file_path_GO}/LC_MV_pre_clean.txt",
    
    # 4. Join Keys
    'merge_key_csv': 'Code',
    'merge_key_txt': 'DS_ID'
}

def merge_and_track_relations(config):
    # ---------------------------------------------------------
    # 1. Load Constituents File (Left Side)
    # ---------------------------------------------------------
    print(f"Loading CSV: {config['constituents_csv_path']}...")
    try:
        df_csv = pd.read_csv(
            config['constituents_csv_path'], 
            low_memory=False, 
            encoding='ISO-8859-1'
        )
        
        if 'Code' not in df_csv.columns or 'PCUR' not in df_csv.columns:
            print(f"Error: Columns 'Code' and 'PCUR' required in CSV.")
            return

        # Extract only relevant columns
        df_left = df_csv[['Code', 'PCUR']].copy()
        df_left[config['merge_key_csv']] = df_left[config['merge_key_csv']].astype(str)
        
        print(f"  - Loaded {len(df_left)} rows. Columns extracted: Code, PCUR")

        # --- NEW: DEDUPLICATE ---
        rows_before = len(df_left)
        # Drop duplicates based on the 'Code' column, keeping the first occurrence
        df_left = df_left.drop_duplicates(subset=[config['merge_key_csv']], keep='first')
        rows_after = len(df_left)
        
        if rows_before > rows_after:
            print(f"  - DEDUPLICATION: Removed {rows_before - rows_after} duplicate Codes.")
            print(f"  - Unique rows remaining: {rows_after}")
        else:
            print(f"  - No duplicates found in CSV.")
        
    except FileNotFoundError:
        print(f"Error: CSV file not found at {config['constituents_csv_path']}")
        return

    # ---------------------------------------------------------
    # 2. Load Stock Data File (Right Side)
    # ---------------------------------------------------------
    print(f"Loading TXT: {config['stock_data_txt_path']}...")
    try:
        df_right = pd.read_csv(
            config['stock_data_txt_path'], 
            sep='|', 
            low_memory=False, 
            encoding='ISO-8859-1'
        )
        
        if config['merge_key_txt'] not in df_right.columns:
            print(f"Error: Merge key '{config['merge_key_txt']}' not found in TXT file.")
            return

        df_right[config['merge_key_txt']] = df_right[config['merge_key_txt']].astype(str)
        print(f"  - Loaded {len(df_right)} rows.")

    except FileNotFoundError:
        print(f"Error: TXT file not found at {config['stock_data_txt_path']}")
        return

    # ---------------------------------------------------------
    # 3. RELATIONSHIP TRACKING / ANALYSIS
    # ---------------------------------------------------------
    print("\n" + "="*40)
    print("RELATIONSHIP ANALYSIS")
    print("="*40)

    # Calculate max occurrences
    left_key_counts = df_left[config['merge_key_csv']].value_counts()
    right_key_counts = df_right[config['merge_key_txt']].value_counts()

    max_left_dups = left_key_counts.max() if not left_key_counts.empty else 0
    max_right_dups = right_key_counts.max() if not right_key_counts.empty else 0

    left_is_unique = (max_left_dups <= 1)
    right_is_unique = (max_right_dups <= 1)

    # Determine Type
    relation_type = "Unknown"
    if left_is_unique and right_is_unique:
        relation_type = "One-to-One (1:1)"
    elif left_is_unique and not right_is_unique:
        relation_type = "One-to-Many (1:m)"
    elif not left_is_unique and right_is_unique:
        relation_type = "Many-to-One (m:1)"
    else:
        relation_type = "Many-to-Many (m:m)"

    print(f"Merge Key (CSV): '{config['merge_key_csv']}'")
    print(f"Merge Key (TXT): '{config['merge_key_txt']}'")
    print("-" * 30)
    print(f"Max duplicates in CSV (Code):  {max_left_dups}  [{'Unique' if left_is_unique else 'Duplicates Present'}]")
    print(f"Max duplicates in TXT (DS_ID): {max_right_dups}  [{'Unique' if right_is_unique else 'Duplicates Present'}]")
    print("-" * 30)
    print(f"DETECTED RELATIONSHIP: {relation_type}")
    print("-" * 30)
    
    # ---------------------------------------------------------
    # 4. Merge
    # ---------------------------------------------------------
    print("Merging datasets...")
    
    merged_df = pd.merge(
        df_left,
        df_right,
        left_on=config['merge_key_csv'],
        right_on=config['merge_key_txt'],
        how='inner' 
    )
    
    print(f"  - Rows in CSV: {len(df_left)}")
    print(f"  - Rows in TXT: {len(df_right)}")
    print(f"  - Rows in Merged Result: {len(merged_df)}")

    # ---------------------------------------------------------
    # 5. Save
    # ---------------------------------------------------------
    if not merged_df.empty:
        # Create directory if it doesn't exist
        out_dir = os.path.dirname(config['output_file_path'])
        if out_dir and not os.path.exists(out_dir):
            os.makedirs(out_dir)
            
        # Save as pipe-separated
        merged_df.to_csv(config['output_file_path'], index=False, sep='|')
        print(f"\nSuccess! File saved to: {config['output_file_path']}")
    else:
        print("\nWarning: Merged dataframe is empty. No file was saved.")

# Run the function
merge_and_track_relations(CONFIG)

Loading CSV: /home/jovyan/work/hpool1/pseidel/test/Input/Constituents.01.csv...
  - Loaded 167649 rows. Columns extracted: Code, PCUR
  - DEDUPLICATION: Removed 56299 duplicate Codes.
  - Unique rows remaining: 111350
Loading TXT: /home/jovyan/work/hpool1/pseidel/test/Temp/TempGeneralOverview/LC_MV_raw_excl_delisted.txt...
  - Loaded 337375075 rows.

RELATIONSHIP ANALYSIS
Merge Key (CSV): 'Code'
Merge Key (TXT): 'DS_ID'
------------------------------
Max duplicates in CSV (Code):  1  [Unique]
Max duplicates in TXT (DS_ID): 17310  [Duplicates Present]
------------------------------
DETECTED RELATIONSHIP: One-to-Many (1:m)
------------------------------
Merging datasets...
  - Rows in CSV: 111350
  - Rows in TXT: 337375075
  - Rows in Merged Result: 337233221

Success! File saved to: /home/jovyan/work/hpool1/pseidel/test/Temp/TempGeneralOverview/LC_MV_pre_clean.txt


### Remove Non-mappable IDs and Add ID Column

In [42]:
import pandas as pd
import os
import gc

# =============================================================================
# SUMMARY OF THIS SCRIPT
# -----------------------------------------------------------------------------
# This script filters and standardizes the pre-cleaned stock data using a
# high-performance dictionary mapping approach (optimized for High RAM).
#
# LOGIC:
# 1. Loads "ID_mapping_clean.txt" into a Python Dictionary (Hash Map).
#    - Key: DS_ID, Value: ID
# 2. Loads "LC_MV_pre_clean.txt" (Dataset with DS_ID, DayDate, LC_MV).
# 3. Maps 'DS_ID' to 'ID' using the dictionary.
#    - Fast vectorized lookup.
#    - IDs not found in the dictionary become NaN.
# 4. Filters out rows where ID is NaN (unmapped rows).
# 5. Removes the old 'DS_ID' and 'Code' columns.
# 6. Reorders columns to put 'ID' first and saves to "LC_MV_clean.txt".
# =============================================================================

# ==========================================
# CONFIGURATION
# ==========================================
CONFIG = {
    'input_file': f"{Temp_file_path_GO}/LC_MV_pre_clean.txt",
    'mapping_file': f"{Temp_file_path_GO}/ID_mapping_clean.txt",
    'output_file': f"{Temp_file_path_GO}/LC_MV_clean.txt",
    'sep': '|'
}

def fast_filter_and_replace(config):
    # ---------------------------------------------------------
    # 1. Load ID Mapping directly into a Dictionary
    # ---------------------------------------------------------
    print(f"1. Loading Mapping Dict: {config['mapping_file']}...")
    
    # Read just the two needed columns
    df_map = pd.read_csv(
        config['mapping_file'], 
        sep=config['sep'], 
        usecols=['ID', 'DS_ID'],
        dtype=str
    )
    
    # Convert to dictionary: { 'DS_ID_Value': 'ID_Value' }
    # This hash map is extremely fast for lookups
    id_mapper = dict(zip(df_map['DS_ID'], df_map['ID']))
    
    del df_map
    gc.collect()
    print(f"   Mapped loaded. Contains {len(id_mapper):,} keys.")

    # ---------------------------------------------------------
    # 2. Load Stock Data
    # ---------------------------------------------------------
    print(f"2. Loading Stock Data: {config['input_file']}...")
    df_main = pd.read_csv(
        config['input_file'], 
        sep=config['sep'], 
        low_memory=False, 
        dtype={'DS_ID': str} # Important for matching
    )
    initial_rows = len(df_main)
    print(f"   Loaded {initial_rows:,} rows.")

    # ---------------------------------------------------------
    # 3. FAST OPERATION: MAP & DROP
    # ---------------------------------------------------------
    print("3. Mapping IDs (Vectorized)...")

    # This single line does the lookup. 
    # If DS_ID is in the dict, it gives the ID. 
    # If NOT, it returns NaN.
    df_main['ID'] = df_main['DS_ID'].map(id_mapper)

    # Filter: Drop rows where ID is NaN (meaning DS_ID was not in map)
    before_drop = len(df_main)
    df_main.dropna(subset=['ID'], inplace=True)
    rows_dropped = initial_rows - len(df_main)
    
    print(f"   Mapping complete. Dropped {rows_dropped:,} unmapped rows.")

    # ---------------------------------------------------------
    # 4. Cleanup & Save
    # ---------------------------------------------------------
    # Drop old columns
    cols_to_drop = ['DS_ID', 'Code']
    df_main.drop(columns=[c for c in cols_to_drop if c in df_main.columns], inplace=True)

    # Reorder columns: ID first
    cols = ['ID'] + [c for c in df_main.columns if c != 'ID']
    df_main = df_main[cols]

    print(f"4. Saving to {config['output_file']}...")
    df_main.to_csv(config['output_file'], index=False, sep=config['sep'])

    # Final Stats
    print("\n" + "="*40)
    print("FINAL STATS")
    print("="*40)
    print(f"Total Rows:   {len(df_main):,}")
    print(f"Unique IDs:   {df_main['ID'].nunique():,}")
    print("Header:       ", df_main.columns.tolist())

# Run
fast_filter_and_replace(CONFIG)

1. Loading Mapping Dict: /home/jovyan/work/hpool1/pseidel/test/Temp/TempGeneralOverview/ID_mapping_clean.txt...
   Mapped loaded. Contains 95,104 keys.
2. Loading Stock Data: /home/jovyan/work/hpool1/pseidel/test/Temp/TempGeneralOverview/LC_MV_pre_clean.txt...
   Loaded 337,233,221 rows.
3. Mapping IDs (Vectorized)...
   Mapping complete. Dropped 21,994,946 unmapped rows.
4. Saving to /home/jovyan/work/hpool1/pseidel/test/Temp/TempGeneralOverview/LC_MV_clean.txt...

FINAL STATS
Total Rows:   315,238,275
Unique IDs:   85,049
Header:        ['ID', 'PCUR', 'DayDate', 'MV']
