1. Setup & Target Files

In [1]:
from pathlib import Path
import pandas as pd
import numpy as np

# Input folder with the processed CSVs and output folder for the report tables.
PROC = Path("data/processed")
TBL = Path("reports/tables")
TBL.mkdir(parents=True, exist_ok=True)  # create the output folder on first run

# Default targets, in pipeline order (add or remove entries as needed).
FILES = [
    PROC/"gold_monthly_clean_2020_2025.csv",
    PROC/"btc_monthly_close_2020_2025.csv",
    PROC/"merged_gold_btc_monthly_2020_2025.csv",
    PROC/"monthly_returns_gold_btc_2020_2025.csv",
]

# Also pick up any other CSV in the processed folder. Path.glob yields entries in
# arbitrary (filesystem-dependent) order, so sort them to keep the resulting
# tables reproducible between runs and machines.
FILES += [extra for extra in sorted(PROC.glob("*.csv")) if extra not in FILES]

# Only files that are actually present are reported (and profiled later on).
print("Processed files:", [f.name for f in FILES if f.exists()])


Processed files: ['gold_monthly_clean_2020_2025.csv', 'btc_monthly_close_2020_2025.csv', 'merged_gold_btc_monthly_2020_2025.csv', 'monthly_returns_gold_btc_2020_2025.csv']


2. Profiling Functions (Column- and Dataset-Level)

In [2]:
from typing import Dict, List, Tuple, Any

def _infer_role(col: str) -> str:
    c = col.lower()
    if c == "date": return "key(time)"
    if "ret" in c:  return "feature(return)"
    if "usd" in c:  return "feature(level)"
    return "feature"

def _infer_unit(col: str) -> str:
    c = col.lower()
    if c.endswith("_usd") or "usd" in c: return "USD"
    if "ret" in c: return "log return (unitless)"
    if c == "date": return "YYYY-MM (EOM)"
    return ""

def _infer_desc(file: str, col: str) -> str:
    f = file.lower()
    c = col.lower()
    if f.startswith("gold") and c=="gold_usd": return "Monthly gold price (USD), end-of-month close"
    if f.startswith("btc")  and c=="btc_usd":  return "Monthly BTC price (USD), end-of-month close"
    if "merged" in f and c in ("gold_usd","btc_usd"): return f"Merged {col} at EOM"
    if "returns" in f and "ret" in c: return f"Log return of {col.split('_')[0].title()} (month-over-month)"
    if c=="date": return "End-of-month timestamp (EOM)"
    return ""

def _infer_transform(file_name: str) -> str:
    """Describe the processing step behind a file, judged from its name only."""
    name = file_name.lower()
    # Derived files also carry the asset names (e.g. "merged_gold_btc_..."),
    # so they must be matched before the single-asset patterns.
    if "returns" in name:
        return "Log returns (Δ ln level)"
    if "merged" in name:
        return "Merged EOM"
    if "btc" in name:
        return "Hourly→EOM monthly close"
    if "gold" in name:
        return "Monthly EOM; cleaned & filtered 2020–2025"
    # Unknown file: do not guess a transform.
    return ""


def _min_max(s: pd.Series, non_null: int) -> Tuple[Any, Any]:
    """Return (min, max) for numeric or datetime series, (NaN, NaN) otherwise."""
    types = pd.api.types
    # pandas dtype helpers also accept extension dtypes, on which
    # np.issubdtype raises TypeError. Booleans are not profiled as numbers.
    if types.is_numeric_dtype(s.dtype) and not types.is_bool_dtype(s.dtype):
        if non_null:
            return float(s.min()), float(s.max())
        return np.nan, np.nan
    if types.is_datetime64_any_dtype(s.dtype):
        return s.min(), s.max()
    return np.nan, np.nan


def profile_file(
    path: Path,
    expected_start: str = "2020-01",
    expected_end: str = "2025-12",
) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Profile one CSV file and return (columns_profile, dataset_profile).

    Parameters
    ----------
    path : CSV file to read. A "Date" column, when present, is parsed
        (unparseable values become NaT), used as the sorted index and
        normalized to month-end timestamps.
    expected_start, expected_end : first and last month ("YYYY-MM") of the
        calendar the file is checked against for missing months.

    Returns
    -------
    columns_profile : one row per column (the "Date" key first) with dtype,
        null/distinct counts, min/max, an example value and inferred
        unit/role/description/transform metadata.
    dataset_profile : a single row with row/column counts, the covered date
        span, the number of expected months absent from the index
        ("eom_missing") and the number of duplicated index entries.
    """
    df = pd.read_csv(path)
    has_date = "Date" in df.columns
    if has_date:
        df["Date"] = pd.to_datetime(df["Date"], errors="coerce")
        df = df.set_index("Date").sort_index()
        # Normalize to EOM if index is datetime
        if isinstance(df.index, pd.DatetimeIndex):
            df.index = df.index.to_period("M").to_timestamp("M")

    # Column-level: the Date key (taken from the index) first, then data columns.
    named_series: List[Tuple[str, pd.Series]] = []
    if has_date:
        named_series.append(("Date", df.index.to_series(name="Date")))
    named_series.extend((c, df[c]) for c in df.columns)

    transform = _infer_transform(path.name)  # same for every column of the file
    rows: List[Dict[str, Any]] = []
    for col, s in named_series:
        non_null = int(s.notna().sum())
        vmin, vmax = _min_max(s, non_null)
        rows.append({
            "file": path.name,
            "column": col,
            "dtype": str(s.dtype),
            "non_null": non_null,
            "nulls": int(s.isna().sum()),
            "distinct": int(s.nunique(dropna=True)),
            "min": vmin,
            "max": vmax,
            "example": s.dropna().iloc[0] if non_null > 0 else np.nan,
            "unit": _infer_unit(col),
            "role": _infer_role(col),
            "description": _infer_desc(path.name, col),   # may be edited manually later
            "source": "Processed (pipeline 02–07)",
            "transform": transform,
        })

    columns_profile = pd.DataFrame(rows)

    # Compact dataset-level
    if isinstance(df.index, pd.DatetimeIndex):
        start, end = (df.index.min(), df.index.max())
        # EOM compliance: which expected month-ends are absent from the index?
        eom_expected = pd.period_range(expected_start, expected_end, freq="M").to_timestamp("M")
        missing = eom_expected.difference(df.index)
        dupe_idx = int(pd.Index(df.index).duplicated().sum())
    else:
        start = end = pd.NaT
        missing = pd.DatetimeIndex([])
        dupe_idx = 0

    dataset_profile = pd.DataFrame([{
        "file": path.name,
        "rows": len(df),
        "n_cols": df.shape[1],   # data columns only; the Date index is not counted
        "start": start if pd.notna(start) else "",
        "end": end if pd.notna(end) else "",
        "eom_missing": len(missing),
        "dup_index": dupe_idx
    }])

    return columns_profile, dataset_profile


3. Build the Data Dictionary + Merge Manual Notes

In [3]:
# Profile every target file that exists on disk.
all_cols, all_ds = [], []
for p in FILES:
    if p.exists():
        cprof, dprof = profile_file(p)
        all_cols.append(cprof)
        all_ds.append(dprof)

# pd.concat([]) fails with an opaque "No objects to concatenate"; say what is
# actually wrong instead.
if not all_cols:
    raise FileNotFoundError(
        "None of the target files exist: " + ", ".join(str(p) for p in FILES)
    )

data_dict = pd.concat(all_cols, ignore_index=True)
dataset_summary = pd.concat(all_ds, ignore_index=True)

# (Optional) manual overrides for 'description' / 'unit' / 'role'.
# Create this file if you want to override the inferred values.
notes_path = TBL/"data_dictionary_notes.csv"
if notes_path.exists():
    # minimum columns: file, column, description (plus unit/role if any)
    notes = pd.read_csv(notes_path)
    # A repeated (file, column) key would duplicate dictionary rows in the
    # left merge below; keep only the last note for each key.
    notes = notes.drop_duplicates(subset=["file", "column"], keep="last")
    data_dict = data_dict.merge(notes, on=["file","column"], how="left", suffixes=("","_note"))
    for k in ("description","unit","role"):
        note_col = f"{k}_note"
        if note_col in data_dict.columns:
            # A note wins where present; otherwise keep the inferred value.
            data_dict[k] = data_dict[note_col].fillna(data_dict[k])
            data_dict = data_dict.drop(columns=[note_col])

data_dict.head(8), dataset_summary


(                                     file    column           dtype  non_null  \
 0        gold_monthly_clean_2020_2025.csv      Date  datetime64[ns]        67   
 1        gold_monthly_clean_2020_2025.csv  Gold_USD         float64        67   
 2         btc_monthly_close_2020_2025.csv      Date  datetime64[ns]        70   
 3         btc_monthly_close_2020_2025.csv   BTC_USD         float64        70   
 4   merged_gold_btc_monthly_2020_2025.csv      Date  datetime64[ns]        67   
 5   merged_gold_btc_monthly_2020_2025.csv  Gold_USD         float64        67   
 6   merged_gold_btc_monthly_2020_2025.csv   BTC_USD         float64        67   
 7  monthly_returns_gold_btc_2020_2025.csv      Date  datetime64[ns]        66   
 
    nulls  distinct                  min                  max  \
 0      0        67  2020-01-31 00:00:00  2025-07-31 00:00:00   
 1      0        67              1560.67              3352.66   
 2      0        70  2020-01-31 00:00:00  2025-10-31 00:00:00   


4. Save CSV + Markdown

In [4]:
# Write both tables as CSV next to each other in the report folder.
dict_path = TBL / "data_dictionary.csv"
sum_path = TBL / "dataset_summary.csv"
data_dict.to_csv(dict_path, index=False)
dataset_summary.to_csv(sum_path, index=False)
for saved in (dict_path, sum_path):
    print("Saved →", saved.resolve())

# Render a compact Markdown version for reports: one section per file,
# limited to the descriptive columns.
md_cols = ["column", "dtype", "unit", "role", "description"]
md_lines = ["# Data Dictionary (Processed)\n"]
for f, g in data_dict.groupby("file"):
    md_lines.extend([f"\n## {f}\n", g[md_cols].to_markdown(index=False)])
md = "\n".join(md_lines)

md_path = TBL / "data_dictionary.md"
md_path.write_text(md, encoding="utf-8")
print("Saved →", md_path.resolve())


Saved → C:\Users\Noveno\OneDrive\CA1-BTC-Gold-Correlation\reports\tables\data_dictionary.csv
Saved → C:\Users\Noveno\OneDrive\CA1-BTC-Gold-Correlation\reports\tables\dataset_summary.csv
Saved → C:\Users\Noveno\OneDrive\CA1-BTC-Gold-Correlation\reports\tables\data_dictionary.md
