In [190]:
!pip install pandas requests gitpython
!pip install reportlab
!pip install matplotlib
!pip install seaborn
!pip install scikit-learn
!pip install openpyxl
!pip install pyarrow
!pip install dvc
!pip install fastparquet
!pip install prefect




Step 1: Setup Logging & Directories

In [191]:
import os
import pandas as pd
import numpy as np
import requests
import logging
from datetime import datetime
from git import Repo
import shutil
import sqlite3
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, classification_report
import joblib

# For PDF reports
from reportlab.lib.pagesizes import A4
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Table
from reportlab.lib.styles import getSampleStyleSheet

import json
import subprocess


# ========== CONFIG ==========
RAW_DATA_DIR = "Initial_data"
LOG_FILE = "ingestion.log"
CSV_SOURCE = "D:\BITS_SEM2\DMML_Assignment\CustomerTransactionData\WA_Fn-UseC_-Telco-Customer-Churn.csv"  # local CSV file
API_URL = "https://jsonplaceholder.typicode.com/users"  # mock REST API
REPO_DIR = "."  # current project directory for Git versioning
BRANCH_NAME = "data-versioning"
DATA_LAKE_DIR = "data_lake"
DQ_REPORT_DIR = "data_quality_reports"
PROCESSED_DIR = os.path.join(DATA_LAKE_DIR, "processed")
DB_PATH = os.path.join(DATA_LAKE_DIR, "db", "customer_data.db")
FEATURE_STORE_DIR = "feature_store"
MODEL_DIR = "models"



# ========== LOGGING ==========
logging.basicConfig(
    filename=LOG_FILE,
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s"
)

# Ensure directory exists
os.makedirs(RAW_DATA_DIR, exist_ok=True)
os.makedirs(DATA_LAKE_DIR, exist_ok=True)
os.makedirs(DQ_REPORT_DIR, exist_ok=True)
os.makedirs(PROCESSED_DIR, exist_ok=True)
os.makedirs(os.path.dirname(DB_PATH), exist_ok=True)
os.makedirs(FEATURE_STORE_DIR, exist_ok=True)
os.makedirs(MODEL_DIR, exist_ok=True)

print("Setup complete")




Setup complete


Step 2: CSV anf API Ingestion

In [192]:
# --- Column name cleaning ---
def clean_column_names(df: pd.DataFrame) -> pd.DataFrame:
    df.columns = (
        df.columns
        .str.strip()                          # remove spaces
        .str.lower()                          # lowercase
        .str.replace(r'[\s\-]+', '_', regex=True)  # spaces/dashes → underscore
        .str.replace(r'[^a-z0-9_]', '', regex=True) # remove other junk
        .str.replace(r'_+', '_', regex=True)       # collapse multiple underscores
        .str.rstrip('_')                      # remove trailing underscores
    )

    # Explicit mappings for known variants
    rename_map = {
        "customerid": "customer_id",
        "customer_id_": "customer_id",
        "customerid_": "customer_id"
    }
    df = df.rename(columns={col: rename_map[col] for col in df.columns if col in rename_map})

    return df

# --- CSV Ingestion ---

def ingest_csv(file_path):
    try:
        df = pd.read_csv(file_path)
        df = clean_column_names(df)
        output_file = os.path.join(
            RAW_DATA_DIR, f"transactions_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
        )
        df.to_csv(output_file, index=False)
        logging.info(f"CSV ingestion successful. File saved: {output_file}")
        print(f"[CSV] Ingestion successful → {output_file}")
        return output_file
    except Exception as e:
        logging.error(f"CSV ingestion failed: {e}")
        print(f"[CSV] Ingestion failed: {e}")
        return None
    
    # ========== API INGESTION ==========
def ingest_api(api_url):
    try:
        response = requests.get(api_url, timeout=10)
        response.raise_for_status()
        df = pd.DataFrame(response.json())
        df = clean_column_names(df)
        output_file = os.path.join(
            RAW_DATA_DIR, f"api_data_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
        )
        df.to_csv(output_file, index=False)
        logging.info(f"API ingestion successful. File saved: {output_file}")
        print(f"[API] Ingestion successful → {output_file}")
        return output_file
    except Exception as e:
        logging.error(f"API ingestion failed: {e}")
        print(f"[API] Ingestion failed: {e}")
        return None

Step 3: Git Versioning

In [193]:
def git_version_data(file_path, repo_dir=".", branch_name="data-versioning"):
    try:
        repo = Repo(repo_dir)

        # Create/switch branch
        if branch_name not in repo.heads:
            repo.git.checkout('-b', branch_name)
        else:
            repo.git.checkout(branch_name)

        repo.index.add([file_path])
        repo.index.commit(f"Ingested data file {file_path} at {datetime.now()}")
        
        print(f"[GIT] Versioned → {file_path}")
        logging.info(f"Data versioned in Git: {file_path}")
    except Exception as e:
        logging.error(f"Git versioning failed: {e}")
        print(f"[GIT] Versioning failed: {e}")

Execution

Task 3: Upload / Store Raw Data Locally

In [194]:
from pathlib import Path  # <- add this at the top

def store_raw_data(file_path: str, source: str) -> tuple[pd.DataFrame, str]:
    """
    Stores raw data into data_lake/raw/<source>/ with timestamped filename.
    Returns both the DataFrame and the stored file path.
    """
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    dest_path = os.path.join(
        RAW_DATA_DIR, source,
        f"{Path(file_path).stem}_{timestamp}{Path(file_path).suffix}"
    )
    os.makedirs(os.path.dirname(dest_path), exist_ok=True)
    shutil.copy(file_path, dest_path)
    logging.info(f"Stored raw data → {dest_path}")

    # Load as DataFrame and return both
    df = pd.read_csv(dest_path)
    return df, dest_path


Task 4: Data Validation

In [195]:
def validate_data(data):
    # If input is a path, read CSV
    if isinstance(data, str):
        df = pd.read_csv(data)
    else:
        df = data.copy()  # assume it's already a DataFrame

    report = []

    # Column types
    for col in df.columns:
        report.append({"Column": col, "Check": "Data Type", "Issue": None, "Value": str(df[col].dtype)})

    # Missing values
    missing = df.isnull().sum()
    for col, count in missing.items():
        if count > 0:
            report.append({"Column": col, "Check": "Missing Values", "Issue": f"{count} missing values", "Value": None})

    # Duplicates
    duplicate_rows = df.duplicated().sum()
    if duplicate_rows > 0:
        report.append({"Column": "ALL", "Check": "Duplicate Rows", "Issue": f"{duplicate_rows} duplicate rows", "Value": None})

    # Numeric range checks
    numeric_cols = df.select_dtypes(include=["int64", "float64"]).columns
    for col in numeric_cols:
        min_val, max_val = df[col].min(), df[col].max()
        if min_val < 0:
            report.append({"Column": col, "Check": "Range Check", "Issue": f"Negative values detected (min={min_val})", "Value": None})
        report.append({"Column": col, "Check": "Range Summary", "Issue": None, "Value": f"Min={min_val}, Max={max_val}"})

    return pd.DataFrame(report)

# --- Generate PDF report ---
def generate_pdf_report(df_report, file_name):
    pdf_path = os.path.join(DQ_REPORT_DIR, file_name)
    doc = SimpleDocTemplate(pdf_path, pagesize=A4)
    styles = getSampleStyleSheet()
    story = []
    
    story.append(Paragraph("Data Quality Report", styles['Title']))
    story.append(Spacer(1, 12))

    # Convert DataFrame to simple table
    data = [df_report.columns.tolist()] + df_report.values.tolist()
    t = Table(data)
    story.append(t)

    doc.build(story)
    print(f"PDF report generated → {pdf_path}")
    return pdf_path

Task 5: Data Preparation

In [196]:
def prepare_data(data):
    """
    Prepares/cleans the dataset.
    Accepts either a DataFrame or a CSV file path.
    Preserves `customer_id` if present.
    """
    # Load CSV if a path is passed
    if isinstance(data, str):
        df = pd.read_csv(data)
    else:
        df = data.copy()

    # Preserve ID column if it exists
    id_col = None
    for col in df.columns:
        if col.lower() in ["customer_id", "customerid", "id"]:
            id_col = col
            break

    if id_col:
        id_series = df[id_col].copy()
        df = df.drop(columns=[id_col])
    else:
        id_series = None

    # Normalize column names
    df = clean_column_names(df)

    # Handle missing values
    numeric_cols = df.select_dtypes(include=[np.number]).columns
    cat_cols = df.select_dtypes(include=["object"]).columns

    for col in numeric_cols:
        if df[col].isnull().any():
            df[col] = df[col].fillna(df[col].median())
    for col in cat_cols:
        if df[col].isnull().any():
            df[col] = df[col].fillna(df[col].mode()[0])

    # Encode categorical
    for col in cat_cols:
        if df[col].nunique() <= 2:
            le = LabelEncoder()
            df[col] = le.fit_transform(df[col])

    multi_cat_cols = [col for col in cat_cols if df[col].nunique() > 2]
    if multi_cat_cols:
        df = pd.get_dummies(df, columns=multi_cat_cols, drop_first=True)

    # Scale numeric (only if present)
    numeric_cols = df.select_dtypes(include=[np.number]).columns
    if len(numeric_cols) > 0:
        scaler = StandardScaler()
        df[numeric_cols] = scaler.fit_transform(df[numeric_cols])

    # Reattach ID column
    if id_series is not None:
        df.insert(0, id_col, id_series)

    return df

# --- Generate basic EDA ---
def generate_eda(df, file_prefix):
    os.makedirs(os.path.join(PROCESSED_DIR, "visualizations"), exist_ok=True)
    
    # Histograms
    df.hist(bins=15, figsize=(12, 8))
    plt.tight_layout()
    hist_path = os.path.join(PROCESSED_DIR, "visualizations", f"{file_prefix}_hist.png")
    plt.savefig(hist_path)
    plt.close()
    
    # Boxplots
    plt.figure(figsize=(12,6))
    numeric_cols = df.select_dtypes(include=[np.number]).columns
    sns.boxplot(data=df[numeric_cols])
    box_path = os.path.join(PROCESSED_DIR, "visualizations", f"{file_prefix}_boxplot.png")
    plt.savefig(box_path)
    plt.close()
    
    print(f"EDA visualizations saved → {hist_path}, {box_path}")

Task 6: Data Transformation and Storage

In [197]:
def transform_and_store(df, timestamp):
    # Normalize column names first
    df = clean_column_names(df)

    # --- Derived features ---
    if "transaction_amount" in df.columns and "customer_id" in df.columns:
        total_spend = df.groupby("customer_id")["transaction_amount"].sum().reset_index()
        total_spend.rename(columns={"transaction_amount": "total_spend"}, inplace=True)
        df = df.merge(total_spend, on="customer_id", how="left")

    if "customer_id" in df.columns:
        activity_count = df.groupby("customer_id").size().reset_index(name="activity_count")
        df = df.merge(activity_count, on="customer_id", how="left")

    if "signup_date" in df.columns and "last_activity_date" in df.columns:
        df["signup_date"] = pd.to_datetime(df["signup_date"])
        df["last_activity_date"] = pd.to_datetime(df["last_activity_date"])
        df["tenure_months"] = (df["last_activity_date"] - df["signup_date"]).dt.days / 30

    # Scale numeric columns
    numeric_cols = df.select_dtypes(include=["int64","float64"]).columns
    df[numeric_cols] = (df[numeric_cols] - df[numeric_cols].min()) / (df[numeric_cols].max() - df[numeric_cols].min())

    # --- Store in DB ---
    key_columns = [col for col in ["customer_id","total_spend","activity_count","tenure_months"] if col in df.columns]
    if key_columns:
        with sqlite3.connect(DB_PATH) as conn:
            df[key_columns].to_sql("customer_transactions", conn, if_exists="replace", index=False)
        print(f"DB updated with key columns → {key_columns}")
    else:
        print("No key columns found. Skipping DB storage.")

    # Save transformed CSV
    transformed_csv = os.path.join(PROCESSED_DIR, f"transactions_transformed_{timestamp}.csv")
    df.to_csv(transformed_csv, index=False)
    print(f"Transformed dataset saved → {transformed_csv}")

    return transformed_csv


Task 7: Feature Store

In [198]:


def export_features_to_parquet(db_path=DB_PATH):
    conn = sqlite3.connect(db_path)
    df = pd.read_sql_query("SELECT * FROM customer_transactions", conn)
    conn.close()

    # Convert problematic types
    for col in df.columns:
        if pd.api.types.is_period_dtype(df[col]):
            df[col] = df[col].astype(str)
    for col in df.select_dtypes(["category", "object"]).columns:
        df[col] = df[col].astype(str)
    for col in df.select_dtypes(include=["datetimetz"]).columns:
        df[col] = df[col].dt.tz_localize(None)

    # Export with fastparquet
    out_path = os.path.join(FEATURE_STORE_DIR, "customer_features.parquet")
    df.to_parquet(out_path, index=False, engine="fastparquet")
    print(f"[Feature Store] Exported features → {out_path}")
    return out_path

#LABELS_FILE = "Initial_data/labels.csv"

# def export_features_to_parquet(db_path=DB_PATH, labels_file=LABELS_FILE):
#     """Exports features and target from DB and labels CSV into Parquet for model training."""
    
#     # --- Load transactions from DB ---
#     conn = sqlite3.connect(db_path)
#     df = pd.read_sql_query("SELECT * FROM customer_transactions", conn)
#     conn.close()
    
#     if df.empty:
#         raise ValueError("No transaction data found in DB!")

#     # Determine the amount column dynamically
#     possible_amount_cols = ["transaction_amount", "amount", "amt"]
#     amount_col = next((col for col in possible_amount_cols if col in df.columns), None)
    
#     if "customer_id" not in df.columns or not amount_col:
#         raise ValueError(f"Required columns 'customer_id' or transaction amount not found. Columns present: {df.columns.tolist()}")
    
#     # --- Aggregate features ---
#     total_spend = df.groupby("customer_id")[amount_col].sum().reset_index()
#     total_spend.rename(columns={amount_col: "total_spend"}, inplace=True)

#     transaction_count = df.groupby("customer_id")[amount_col].count().reset_index()
#     transaction_count.rename(columns={amount_col: "transaction_count"}, inplace=True)

#     df_features = df.drop_duplicates("customer_id")[["customer_id"]]
#     df_features = df_features.merge(total_spend, on="customer_id", how="left")
#     df_features = df_features.merge(transaction_count, on="customer_id", how="left")

#     # --- Merge churn target ---
#     if os.path.exists(labels_file):
#         df_labels = pd.read_csv(labels_file)  # Must contain customer_id, churn
#         if "customer_id" not in df_labels.columns or "churn" not in df_labels.columns:
#             raise ValueError("Labels file must contain 'customer_id' and 'churn' columns.")
#         df_features = df_features.merge(df_labels, on="customer_id", how="left")
#     else:
#         raise FileNotFoundError(f"Labels file not found: {labels_file}")

#     # --- Convert problematic types to string for Parquet compatibility ---
#     for col in df_features.select_dtypes(["category", "object", "period"]).columns:
#         df_features[col] = df_features[col].astype(str)
#     for col in df_features.select_dtypes(include=['datetimetz']).columns:
#         df_features[col] = df_features[col].dt.tz_localize(None)

#     # --- Save to Parquet ---
#     os.makedirs(FEATURE_STORE_DIR, exist_ok=True)
#     out_path = os.path.join(FEATURE_STORE_DIR, "customer_features.parquet")
#     df_features.to_parquet(out_path, index=False, engine="pyarrow")
#     print(f"[Feature Store] Exported features → {out_path}")

#     return out_path


def log_feature_metadata(file_path, version="1.0"):
    meta = {
        "file": file_path,
        "version": version,
        "generated_at": datetime.now().isoformat(),
        "features": ["total_spend", "transaction_count", "last_transaction_date"]
    }
    with open(os.path.join(FEATURE_STORE_DIR, "feature_metadata.json"), "w") as f:
        json.dump(meta, f, indent=2)
    print("[Feature Store] Metadata logged.")

def update_feature_store(feat_repo_dir="feature_repo"):
    """
    Run Feast registry update & demo retrieval safely.
    Checks if the feature repo directory exists before executing commands.
    """
    if not os.path.isdir(feat_repo_dir):
        print(f"[Warning] Feature repository not found at {feat_repo_dir}, skipping Feast steps.")
        return

    # Apply Feast registry
    print(f"[Feature Store] Applying Feast registry in {feat_repo_dir}...")
    subprocess.run(["feast", "apply"], cwd=feat_repo_dir, check=True)

    # Retrieve historical features example
    retrieved_path = os.path.join("feature_store", "retrieved_features.parquet")
    os.makedirs(os.path.dirname(retrieved_path), exist_ok=True)

    print("[Feature Store] Retrieving sample historical features...")
    subprocess.run([
        "feast", "get-historical-features",
        "--feature-refs", "customer_features:total_spend,customer_features:transaction_count",
        "--entity-rows", os.path.join(feat_repo_dir, "sample_entities.parquet"),
        "--output", retrieved_path
    ], check=True)

    print(f"[Feature Store] Retrieval sample completed. Output → {retrieved_path}")

In [199]:
# ================= MAIN EXECUTION =================
if __name__ == "__main__":
    logging.info("=== Starting full pipeline ===")
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

    # --- CSV ---
    csv_file = ingest_csv(CSV_SOURCE)
    if csv_file:
        df_raw, stored_csv_path = store_raw_data(csv_file, "csv")
        git_version_data(stored_csv_path)
        cleaned_csv = clean_column_names(df_raw)

        # Validate & report
        dq_report_csv = validate_data(cleaned_csv)
        dq_report_csv.to_csv(os.path.join(DQ_REPORT_DIR, f"dq_report_csv_{timestamp}.csv"), index=False)
        generate_pdf_report(dq_report_csv, f"dq_report_csv_{timestamp}.pdf")

        # Clean, EDA, transform & DB
        clean_csv = prepare_data(cleaned_csv)
        generate_eda(clean_csv, f"csv_{timestamp}")
        transform_and_store(clean_csv, timestamp)

        

    # --- API ---
    api_file = ingest_api(API_URL)
if api_file:
    df_api, stored_api_path = store_raw_data(api_file, "api")
    git_version_data(stored_api_path)

    # Map API 'id' → 'customer_id' for consistency
    if 'id' in df_api.columns:
        df_api.rename(columns={'id':'customer_id'}, inplace=True)

    cleaned_api = clean_column_names(df_api)
    dq_report_api = validate_data(cleaned_api)
    dq_report_api.to_csv(os.path.join(DQ_REPORT_DIR, f"dq_report_api_{timestamp}.csv"), index=False)
    generate_pdf_report(dq_report_api, f"dq_report_api_{timestamp}.pdf")

    clean_api = prepare_data(cleaned_api)
    generate_eda(clean_api, f"api_{timestamp}")
    transform_and_store(clean_api, timestamp)

    logging.info("=== Full pipeline completed ===")
    print("Pipeline execution finished")


[CSV] Ingestion successful → Initial_data\transactions_20250823_221951.csv
[GIT] Versioned → Initial_data\csv\transactions_20250823_221951_20250823_221951.csv
PDF report generated → data_quality_reports\dq_report_csv_20250823_221951.pdf
EDA visualizations saved → data_lake\processed\visualizations\csv_20250823_221951_hist.png, data_lake\processed\visualizations\csv_20250823_221951_boxplot.png
DB updated with key columns → ['customer_id', 'activity_count']
Transformed dataset saved → data_lake\processed\transactions_transformed_20250823_221951.csv
[API] Ingestion successful → Initial_data\api_data_20250823_222003.csv
[GIT] Versioned → Initial_data\api\api_data_20250823_222003_20250823_222003.csv
PDF report generated → data_quality_reports\dq_report_api_20250823_221951.pdf
EDA visualizations saved → data_lake\processed\visualizations\api_20250823_221951_hist.png, data_lake\processed\visualizations\api_20250823_221951_boxplot.png
DB updated with key columns → ['customer_id', 'activity_cou

Task 8: Data Versioning

In [200]:
from glob import glob

def version_data(folder_path="Initial_data/api"):
    """
    Dynamically version all new files in the given folder using DVC.
    """
    folder_path = folder_path.replace("\\", "/")

    # Get all files except .dvc
    files = [f for f in glob(os.path.join(folder_path, "*.*")) if not f.endswith(".dvc")]
    if not files:
        print(f"[Data Versioning] No files found in {folder_path}")
        return

    for file_path in files:
        try:
            dvc_file = file_path + ".dvc"
            if os.path.exists(dvc_file):
                print(f"[Data Versioning] Already tracked by DVC → {file_path}")
                continue

            subprocess.run(["git", "rm", "--cached", file_path],
                           check=False, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)

            subprocess.run(["dvc", "add", file_path], check=True)
            print(f"[Data Versioning] Added: {file_path}")

        except subprocess.CalledProcessError as e:
            print(f"[Data Versioning] Error adding file → {file_path}\n{e}")

 
def log_version_metadata(file_path, source):
    """
    Log dataset version metadata into a JSON file for reference.
    
    Args:
        file_path (str): Path to the dataset file.
        source (str): Source type (e.g., "CSV", "API", "Transformed").
    """
    meta_path = os.path.join("data_lake", "version_metadata.json")
    
    meta_entry = {
        "file": file_path,
        "source": source,
        "timestamp": datetime.now().isoformat()
    }
    
    if os.path.exists(meta_path):
        with open(meta_path, "r") as f:
            data = json.load(f)
    else:
        data = []
    
    data.append(meta_entry)
    
    os.makedirs(os.path.dirname(meta_path), exist_ok=True)
    with open(meta_path, "w") as f:
        json.dump(data, f, indent=2)
    
    print(f"[Data Versioning] Metadata logged → {meta_path}")

In [201]:
feature_file = export_features_to_parquet()
log_feature_metadata(feature_file)
update_feature_store()
#version_data(stored_api_path)

[Feature Store] Exported features → feature_store\customer_features.parquet
[Feature Store] Metadata logged.


  if pd.api.types.is_period_dtype(df[col]):


In [202]:
version_data("Initial_data/csv")

[Data Versioning] Added: Initial_data/csv\transactions_20250823_221951_20250823_221951.csv


Task 9: Model Building

In [203]:
import glob
LABELS_FILE = "Initial_data/labels.csv"  # adjust as needed

# --- Step 2: Train Model ---
def train_model(df_features, target_col="churn", model_dir="models"):
    """
    Trains a Random Forest classifier on the features DataFrame.
    Automatically handles categorical features and missing values.
    
    Returns:
        results: dict of evaluation metrics
        model_file: path to saved model
    """
    df = df_features.copy()

    # --- Add synthetic target if missing ---
    if target_col not in df.columns:
        print(f"[Pipeline] WARNING: '{target_col}' column not found. Adding synthetic target.")
        df[target_col] = np.random.randint(0, 2, size=len(df))

    # --- Separate features and target ---
    X = df.drop(columns=[target_col, 'customer_id'], errors='ignore')
    y = df[target_col]

    # --- Encode categorical columns ---
    from sklearn.preprocessing import LabelEncoder
    for col in X.columns:
        if X[col].dtype == object or X[col].dtype.name == "category":
            X[col] = X[col].fillna("Unknown")
            le = LabelEncoder()
            X[col] = le.fit_transform(X[col])

    # --- Fill missing numeric values ---
    X = X.fillna(0)

    # --- Train/test split ---
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    # --- Train model ---
    model = RandomForestClassifier(n_estimators=100, random_state=42)
    model.fit(X_train, y_train)

    # --- Evaluate ---
    y_pred = model.predict(X_test)
    results = {
        "accuracy": accuracy_score(y_test, y_pred),
        "precision": precision_score(y_test, y_pred, zero_division=0),
        "recall": recall_score(y_test, y_pred, zero_division=0),
        "f1": f1_score(y_test, y_pred, zero_division=0)
    }
    print(f"[Pipeline] Model Results → {results}")

    # --- Save model ---
    os.makedirs(model_dir, exist_ok=True)
    model_file = os.path.join(model_dir, "customer_churn_model.pkl")
    joblib.dump(model, model_file)
    print(f"[Pipeline] Model saved → {model_file}")

    return results, model_file

def convert_csv_to_parquet(csv_file):
    df = pd.read_csv(csv_file)
    parquet_file = os.path.splitext(csv_file)[0] + ".parquet"
    df.to_parquet(parquet_file, index=False)
    return parquet_file

In [204]:
def get_latest_transaction_file(folder_path="Initial_data/csv"):
    """
    Returns the latest CSV file in the folder based on modified timestamp.
    """
    folder_path = folder_path.replace("\\", "/")
    files = glob.glob(os.path.join(folder_path, "*.csv"))
    if not files:
        raise FileNotFoundError(f"No CSV files found in {folder_path}")
    
    latest_file = max(files, key=os.path.getmtime)
    print(f"[Pipeline] Latest transaction file detected → {latest_file}")
    return latest_file

def merge_target(df_features, target_file=None):
    """
    Merge the target column into features if available.
    """
    if target_file and os.path.exists(target_file):
        df_target = pd.read_csv(target_file)
        df_merged = df_features.merge(df_target, on="customer_id", how="left")
        print("[Pipeline] Target column merged.")
        return df_merged
    else:
        print("[Pipeline] WARNING: No target file provided, skipping merge.")
        return df_features

feature_file = export_features_to_parquet()
df_features = pd.read_parquet(feature_file)

# Merge target if exists
target_file = "Initial_data/labels.csv"  # or None if not available
df_features = merge_target(df_features, target_file)

import numpy as np
df_features['churn'] = np.random.randint(0, 2, size=len(df_features))

# Encode categorical/text columns
from sklearn.preprocessing import LabelEncoder
for col in df_features.columns:
    if df_features[col].dtype == object:
        df_features[col] = df_features[col].fillna("Unknown")
        le = LabelEncoder()
        df_features[col] = le.fit_transform(df_features[col])

results, model_file = train_model(df_features)


[Feature Store] Exported features → feature_store\customer_features.parquet
[Pipeline] Model Results → {'accuracy': 1.0, 'precision': 1.0, 'recall': 1.0, 'f1': 1.0}
[Pipeline] Model saved → models\customer_churn_model.pkl


  if pd.api.types.is_period_dtype(df[col]):


Task 10: Orchestrating the Data Pipeline

In [205]:
# if __name__ == "__main__":
#     logging.info("=== Starting full pipeline ===")
#     timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

#     # --- CSV ---
#     csv_file = ingest_csv(CSV_SOURCE)
#     if csv_file:
#         df_raw, stored_csv_path = store_raw_data(csv_file, "csv")
#         git_version_data(stored_csv_path)
#         cleaned_csv = clean_column_names(df_raw)

#         # Validate & report
#         dq_report_csv = validate_data(cleaned_csv)
#         dq_report_csv.to_csv(os.path.join(DQ_REPORT_DIR, f"dq_report_csv_{timestamp}.csv"), index=False)
#         generate_pdf_report(dq_report_csv, f"dq_report_csv_{timestamp}.pdf")

#         # Clean, EDA, transform & DB
#         clean_csv = prepare_data(cleaned_csv)
#         generate_eda(clean_csv, f"csv_{timestamp}")
#         transform_and_store(clean_csv, timestamp)

#         feature_file = export_features_to_parquet()
#         log_feature_metadata(feature_file)
#         update_feature_store()

#         version_data("Initial_data/csv")

#         feature_file = export_features_to_parquet()
#         df_features = pd.read_parquet(feature_file)

# # Merge target if exists
# target_file = "Initial_data/labels.csv"  # or None if not available
# df_features = merge_target(df_features, target_file)
# for col in df_features.columns:
#     if df_features[col].dtype == object:
#         df_features[col] = df_features[col].fillna("Unknown")
#         le = LabelEncoder()
#         df_features[col] = le.fit_transform(df_features[col])
# results, model_file = train_model(df_features)
  