# Comprehensive Multi‑Dataset ML Workflow
A reproducible pipeline with artifact logging (plots, tables, and metrics) covering wine classification, chick-feed recommendations, and US-arrests clustering.

In [None]:
"""Comprehensive multi-dataset ML workflow with reproducibility and artifact logging."""
from __future__ import annotations

import json
import logging
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, Mapping, Sequence, Tuple

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from matplotlib.figure import Figure
from sklearn.decomposition import PCA
from sklearn.metrics import accuracy_score, calinski_harabasz_score, classification_report
from sklearn.metrics import davies_bouldin_score, silhouette_score
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.model_selection import GridSearchCV, StratifiedKFold, train_test_split
from sklearn.mixture import GaussianMixture
from sklearn.neighbors import KNeighborsClassifier
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.cluster import KMeans

# Seed passed to the splits, PCA, and clustering models in this file, and to NumPy's global RNG in main().
RANDOM_STATE: int = 42
# Artifact output locations; the relative root resolves against the current working directory.
ARTIFACT_DIR = Path("artifacts")
PLOT_DIR = ARTIFACT_DIR / "plots"  # figures written by _save_plot
TABLE_DIR = ARTIFACT_DIR / "tables"  # CSV files written by _save_table
METRIC_DIR = ARTIFACT_DIR / "metrics"  # JSON files written by _save_metrics


@dataclass
class ClassificationResult:
    """Container for wine classification outputs.

    Populated by ``run_wine_classification``.
    """

    # Accuracy of the best grid-search estimator on the held-out test split.
    accuracy: float
    # Plain-text output of sklearn's ``classification_report`` for the same test predictions.
    classification_report: str
    # Winning hyper-parameter combination (``GridSearchCV.best_params_``).
    best_params: Mapping[str, object]
    # Original class labels seen by the LabelEncoder; empty when the target was already numeric.
    label_encoder_classes: Sequence[str]


@dataclass
class RecommendationResult:
    """Container for feed recommendation embeddings and similarities.

    Populated by ``run_feed_recommendations``.
    """

    # One row per feed name with a single ``pc1`` column (the 1-component PCA projection).
    feed_embeddings: pd.DataFrame
    # Square feed-by-feed matrix of cosine similarities between the embeddings.
    cosine_similarity: pd.DataFrame
    # For each feed, up to three other feeds mapped to their similarity score.
    top_recommendations: Mapping[str, Mapping[str, float]]


@dataclass
class ClusteringResult:
    """Container for clustering model outputs and diagnostics.

    Populated by ``run_arrests_clustering``.
    """

    # Cluster index per input row from the final K-Means model.
    kmeans_labels: np.ndarray
    # Component index per input row from the final Gaussian mixture model.
    gmm_labels: np.ndarray
    # Highest component membership probability per row (one value per row, not the full matrix).
    gmm_probabilities: np.ndarray
    # Silhouette score of each final model, keyed by "kmeans" and "gmm".
    silhouette_scores: Dict[str, float]
    # Per-k K-Means diagnostics: k, inertia, silhouette, calinski_harabasz, davies_bouldin.
    metric_table: pd.DataFrame


def _resolve_dataset_path(mac_path: str, workspace_path: str) -> Path:
    """Return the first of the two candidate paths that exists on disk.

    Args:
        mac_path: Preferred location; checked first.
        workspace_path: Fallback location; checked only if the first is missing.

    Returns:
        The first existing candidate as a ``Path``.

    Raises:
        FileNotFoundError: If neither candidate exists.
    """

    candidates = map(Path, (mac_path, workspace_path))
    found = next((path for path in candidates if path.exists()), None)
    if found is None:
        raise FileNotFoundError(f"None of the provided paths exist: {mac_path!r}, {workspace_path!r}")
    return found


def _ensure_directories(*directories: Path) -> None:
    """Make sure every given directory exists, creating missing parents as needed.

    Directories that already exist are left untouched.
    """

    for target in directories:
        target.mkdir(exist_ok=True, parents=True)


def summarize_dataframe(name: str, df: pd.DataFrame) -> Dict[str, object]:
    """Build, log, and return a small audit summary of a DataFrame.

    Args:
        name: Label used in the summary and in the log line.
        df: Frame to summarize.

    Returns:
        A dict with the ``name``, the frame's ``shape`` tuple, and
        ``null_counts`` mapping each column to its number of missing values.
    """

    missing_per_column = df.isna().sum().to_dict()
    summary = {"name": name, "shape": df.shape, "null_counts": missing_per_column}
    logging.info("%s summary: %s", name, summary)
    return summary


def clean_dataframe(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of ``df`` with missing values imputed column by column.

    Numeric columns (bool/int/uint/float/complex dtype kinds) are filled with
    the column mean; every other column is filled with its most frequent
    value, or the literal ``"unknown"`` when it has no non-missing values.
    The input frame is not modified.
    """

    result = df.copy()
    for col in result.columns:
        series = result[col]
        if not series.isna().any():
            continue  # nothing to impute in this column
        if series.dtype.kind in "biufc":
            replacement = series.mean()
        else:
            most_common = series.mode(dropna=True)
            replacement = "unknown" if most_common.empty else most_common.iloc[0]
        result[col] = series.fillna(replacement)
    return result


def _save_plot(fig: Figure, filename: str) -> None:
    """Write ``fig`` into ``PLOT_DIR`` under ``filename`` and release it.

    The layout is tightened before saving at 200 dpi; the figure is closed
    afterwards so it no longer holds memory.
    """

    destination = PLOT_DIR.joinpath(filename)
    fig.tight_layout()
    fig.savefig(destination, dpi=200)
    plt.close(fig)
    logging.info("Saved plot to %s", destination)


def _save_table(df: pd.DataFrame, filename: str) -> None:
    """Write ``df`` (including its index) as a CSV file inside ``TABLE_DIR``."""

    destination = TABLE_DIR.joinpath(filename)
    df.to_csv(destination, index=True)
    logging.info("Saved table to %s", destination)


def _json_default(value: object) -> object:
    """Convert NumPy scalars and arrays into JSON-serializable built-ins.

    Passed as ``default=`` to ``json.dumps``, so it is only consulted for
    objects the encoder cannot serialize on its own.

    Raises:
        TypeError: If ``value`` is neither a NumPy scalar nor a NumPy array.
    """

    if isinstance(value, np.generic):
        return value.item()
    if isinstance(value, np.ndarray):
        return value.tolist()
    raise TypeError(f"Object of type {type(value).__name__} is not JSON serializable")


def _save_metrics(metrics: Mapping[str, object], filename: str) -> None:
    """Persist JSON metrics to ``METRIC_DIR / filename``.

    NumPy scalars and arrays inside ``metrics`` are converted to plain Python
    values, since metrics here are assembled from NumPy/scikit-learn outputs.

    Args:
        metrics: JSON-compatible mapping (NumPy values allowed) to write.
        filename: File name relative to ``METRIC_DIR``.

    Raises:
        TypeError: If ``metrics`` contains a value that cannot be serialized.
    """

    filepath = METRIC_DIR / filename
    # Serialize completely before opening the file so a serialization error
    # cannot leave a truncated or empty JSON artifact behind.
    payload = json.dumps(metrics, indent=2, default=_json_default)
    filepath.write_text(payload, encoding="utf-8")
    logging.info("Saved metrics to %s", filepath)


def _encode_target(target: pd.Series) -> Tuple[pd.Series, Sequence[str]]:
    """Return a numeric version of ``target`` together with its class labels.

    A target whose dtype kind is already numeric (bool/int/uint/float/complex)
    is returned unchanged with an empty class list. Any other target is run
    through a ``LabelEncoder``; the encoded values keep the original index and
    name, and the encoder's ``classes_`` are returned alongside.
    """

    already_numeric = target.dtype.kind in "biufc"
    if not already_numeric:
        label_encoder = LabelEncoder()
        codes = label_encoder.fit_transform(target)
        encoded_series = pd.Series(codes, index=target.index, name=target.name)
        return encoded_series, label_encoder.classes_
    return target, []


def run_wine_classification(df: pd.DataFrame) -> ClassificationResult:
    """Train and evaluate a PCA + k-NN pipeline on the wine dataset.

    The data is split 75/25 (stratified), a scaler -> PCA (95% variance) ->
    k-NN pipeline is tuned with 5-fold stratified grid search on the training
    split, and the best estimator is scored on the held-out split. Metrics are
    written to ``wine_classification.json`` via ``_save_metrics``.

    Args:
        df: Wine data with a ``target`` column; every other column is used as
            a feature.

    Returns:
        A ``ClassificationResult`` with test accuracy, the text classification
        report, the best hyper-parameters, and the encoder's class labels.

    Raises:
        KeyError: If ``df`` has no ``target`` column.
    """

    features = df.drop(columns=["target"]).copy()
    target, classes = _encode_target(df["target"].copy())

    X_train, X_test, y_train, y_test = train_test_split(
        features,
        target,
        test_size=0.25,
        random_state=RANDOM_STATE,
        stratify=target,
    )

    # Scaling and PCA live inside the pipeline so each CV fold fits them on
    # its own training portion only.
    pipeline = Pipeline(
        [
            ("scaler", StandardScaler()),
            ("pca", PCA(n_components=0.95, random_state=RANDOM_STATE)),
            ("knn", KNeighborsClassifier()),
        ]
    )

    param_grid = {
        "knn__n_neighbors": [1, 3, 5, 7, 9, 11],
        # "minkowski" is omitted on purpose: with the classifier's default
        # p=2 it is the Euclidean distance, so it only duplicated the
        # "euclidean" candidates (and, listed after them, could never be
        # selected on a tie) while adding a third more CV fits.
        "knn__metric": ["euclidean", "manhattan"],
        "knn__weights": ["uniform", "distance"],
    }

    splitter = StratifiedKFold(n_splits=5, shuffle=True, random_state=RANDOM_STATE)
    grid = GridSearchCV(
        pipeline,
        param_grid=param_grid,
        cv=splitter,
        scoring="accuracy",
        n_jobs=-1,
    )
    grid.fit(X_train, y_train)

    predictions = grid.best_estimator_.predict(X_test)
    # Cast to a built-in float so the value matches the declared field type.
    accuracy = float(accuracy_score(y_test, predictions))
    report = classification_report(y_test, predictions)
    report_dict = classification_report(y_test, predictions, output_dict=True)

    metrics = {
        "accuracy": accuracy,
        "cv_mean_accuracy": float(grid.best_score_),
        "best_params": grid.best_params_,
        "classification_report": report_dict,
        "label_classes": list(classes),
    }
    _save_metrics(metrics, "wine_classification.json")

    return ClassificationResult(
        accuracy=accuracy,
        classification_report=report,
        best_params=grid.best_params_,
        label_encoder_classes=classes,
    )


def run_feed_recommendations(df: pd.DataFrame) -> RecommendationResult:
    """Build per-feed 1-D PCA embeddings and rank feeds by cosine similarity.

    All numeric columns are standardized, averaged per ``feed_name``, and
    projected onto a single principal component. The embeddings, the pairwise
    cosine similarity matrix, and each feed's top three neighbours are saved
    as artifacts and returned.

    Args:
        df: Chick feed data with a ``feed_name`` column and at least one
            numeric column.

    Returns:
        A ``RecommendationResult`` holding embeddings, similarities, and the
        per-feed recommendations.

    Raises:
        ValueError: If ``feed_name`` is missing or there is no numeric column.
    """

    if "feed_name" not in df.columns:
        raise ValueError("Expected 'feed_name' column in chick feed dataset.")

    numeric_columns = df.select_dtypes(include=["number"]).columns.tolist()
    if not numeric_columns:
        raise ValueError("Expected at least one numeric column for recommendations.")

    # Standardize first, then average per feed to get one profile row per feed.
    scaled_values = StandardScaler().fit_transform(df[numeric_columns])
    scaled_frame = pd.DataFrame(scaled_values, columns=numeric_columns)
    scaled_frame["feed_name"] = df["feed_name"].values
    per_feed_means = scaled_frame.groupby("feed_name").mean()
    feed_names = per_feed_means.index

    projector = PCA(n_components=1, random_state=RANDOM_STATE)
    embed_df = pd.DataFrame(
        projector.fit_transform(per_feed_means.values),
        index=feed_names,
        columns=["pc1"],
    )

    # NOTE(review): the embedding has a single column, so each cosine
    # similarity is just the sign agreement of two scalars (-1, 0, or 1);
    # the top-3 ranking below is therefore full of ties. Confirm this is the
    # intended similarity measure.
    cosine_df = pd.DataFrame(cosine_similarity(embed_df), index=feed_names, columns=feed_names)

    # For every feed keep the three highest-scoring *other* feeds.
    recommendations: Dict[str, Dict[str, float]] = {
        feed: cosine_df.loc[feed].drop(feed).sort_values(ascending=False).head(3).to_dict()
        for feed in cosine_df.index
    }

    _save_table(embed_df, "feed_embeddings.csv")
    _save_table(cosine_df, "feed_cosine_similarity.csv")
    _save_metrics(recommendations, "feed_recommendations.json")

    return RecommendationResult(
        feed_embeddings=embed_df,
        cosine_similarity=cosine_df,
        top_recommendations=recommendations,
    )


def _compute_line_distance(points: np.ndarray) -> int:
    """Return the index of the point farthest from the chord joining the endpoints.

    Each point is projected onto the straight line through the first and last
    point; the index with the largest perpendicular offset is the elbow.
    When the endpoints coincide there is no line to measure against and 0 is
    returned.
    """

    origin, terminus = points[0], points[-1]
    chord = terminus - origin
    chord_length = np.linalg.norm(chord)
    if chord_length == 0:
        return 0
    unit = chord / chord_length

    def _offset(point: np.ndarray) -> float:
        # Perpendicular distance: remove the component that lies along the chord.
        along = np.dot(point - origin, unit)
        foot = origin + along * unit
        return np.linalg.norm(point - foot)

    return int(np.argmax([_offset(point) for point in points]))


def _select_k_via_elbow(inertias: Sequence[float], ks: Sequence[int]) -> int:
    """Pick the cluster count at the elbow of the inertia curve.

    The (k, inertia) pairs are handed to ``_compute_line_distance``; the k at
    the returned index is the choice. If that k is below 2 and a second
    candidate exists, the second candidate is used instead.
    """

    curve = np.column_stack((ks, inertias))
    elbow_index = _compute_line_distance(curve)
    candidate = ks[elbow_index]
    needs_fallback = candidate < 2 and len(ks) > 1
    return ks[1] if needs_fallback else candidate


def _evaluate_cluster_metrics(X: np.ndarray, labels: np.ndarray) -> Dict[str, float]:
    """Compute silhouette, Calinski-Harabasz, and Davies-Bouldin for a labelling.

    Args:
        X: Sample matrix the labels were produced for.
        labels: One cluster label per row of ``X``.

    Returns:
        A dict keyed by ``silhouette``, ``calinski_harabasz`` and
        ``davies_bouldin`` holding built-in floats. All three are NaN when the
        labelling is degenerate (fewer than two clusters, or as many clusters
        as samples), because the metrics are undefined there.
    """

    n_clusters = len(np.unique(labels))
    n_samples = len(labels)
    # scikit-learn accepts only 2 <= n_clusters <= n_samples - 1 and raises
    # ValueError otherwise; report NaN for both out-of-range ends.
    if n_clusters < 2 or n_clusters >= n_samples:
        nan = float("nan")
        return {"silhouette": nan, "calinski_harabasz": nan, "davies_bouldin": nan}
    # Cast to built-in floats so the values are always JSON-serializable,
    # whatever NumPy float type the metric functions return.
    return {
        "silhouette": float(silhouette_score(X, labels)),
        "calinski_harabasz": float(calinski_harabasz_score(X, labels)),
        "davies_bouldin": float(davies_bouldin_score(X, labels)),
    }


def _plot_k_selection_curve(
    ks: Sequence[int],
    scores: Sequence[float],
    selected_k: int,
    *,
    title: str,
    x_label: str,
    y_label: str,
    marker_label: str,
    filename: str,
) -> None:
    """Plot a per-k score curve with the chosen k marked, then save it via ``_save_plot``."""

    fig, ax = plt.subplots(figsize=(6, 4))
    ax.plot(ks, scores, marker="o")
    ax.axvline(selected_k, color="red", linestyle="--", label=marker_label)
    ax.set_title(title)
    ax.set_xlabel(x_label)
    ax.set_ylabel(y_label)
    ax.legend()
    _save_plot(fig, filename)


def _plot_cluster_scatter(
    embeddings: np.ndarray,
    kmeans_labels: np.ndarray,
    gmm_labels: np.ndarray,
    elbow_k: int,
    gmm_k: int,
) -> None:
    """Draw side-by-side PC1/PC2 scatter plots coloured by K-Means and GMM labels and save them."""

    fig, axes = plt.subplots(1, 2, figsize=(12, 5), sharex=True, sharey=True)
    scatter_kwargs = dict(cmap="viridis", edgecolor="k")
    axes[0].scatter(embeddings[:, 0], embeddings[:, 1], c=kmeans_labels, **scatter_kwargs)
    axes[0].set_title(f"K-Means Clusters (k={elbow_k})")
    axes[0].set_xlabel("PC1")
    axes[0].set_ylabel("PC2")
    axes[1].scatter(embeddings[:, 0], embeddings[:, 1], c=gmm_labels, **scatter_kwargs)
    axes[1].set_title(f"GMM Clusters (k={gmm_k})")
    axes[1].set_xlabel("PC1")
    _save_plot(fig, "arrests_cluster_scatter.png")


def run_arrests_clustering(df: pd.DataFrame) -> ClusteringResult:
    """Cluster the USArrests dataset with multiple diagnostics and artifact logging.

    The three highest-variance features among Murder, Assault, UrbanPop and
    Rape are standardized and projected to two principal components. K-Means
    is fitted for k = 2..9 and its k chosen with the elbow heuristic; Gaussian
    mixtures are fitted over the same range and their k chosen by lowest BIC.
    Metric tables, cluster assignments, a JSON summary, and three plots are
    written as artifacts.

    Args:
        df: Arrests data containing the four feature columns and a ``State``
            column.

    Returns:
        A ``ClusteringResult`` with both label vectors, the per-row maximum
        GMM membership probability, the final silhouette scores, and the
        per-k K-Means metric table.

    Raises:
        KeyError: If a required column is missing from ``df``.
    """

    features = ["Murder", "Assault", "UrbanPop", "Rape"]
    # Variance is ranked on the raw (unscaled) columns.
    variance_rank = df[features].var().sort_values(ascending=False)
    top_features = variance_rank.index.tolist()[:3]
    logging.info("Top variance features: %s", top_features)

    scaler = StandardScaler()
    standardized = scaler.fit_transform(df[top_features])

    pca = PCA(n_components=2, random_state=RANDOM_STATE)
    embeddings = pca.fit_transform(standardized)

    ks = list(range(2, 10))
    inertia_values: List[float] = []
    silhouette_values: List[float] = []
    calinski_values: List[float] = []
    davies_values: List[float] = []
    # Keep every fitted model so the selected one is reused instead of being
    # refitted; this also guarantees the saved diagnostics describe exactly
    # the model that produced the final labels.
    kmeans_models: Dict[int, KMeans] = {}

    for k in ks:
        kmeans = KMeans(n_clusters=k, n_init=20, random_state=RANDOM_STATE)
        labels = kmeans.fit_predict(embeddings)
        kmeans_models[k] = kmeans
        inertia_values.append(kmeans.inertia_)
        metrics = _evaluate_cluster_metrics(embeddings, labels)
        silhouette_values.append(metrics["silhouette"])
        calinski_values.append(metrics["calinski_harabasz"])
        davies_values.append(metrics["davies_bouldin"])

    metrics_df = pd.DataFrame(
        {
            "k": ks,
            "inertia": inertia_values,
            "silhouette": silhouette_values,
            "calinski_harabasz": calinski_values,
            "davies_bouldin": davies_values,
        }
    )
    _save_table(metrics_df, "arrests_cluster_metrics.csv")

    elbow_k = _select_k_via_elbow(inertia_values, ks)
    logging.info("Elbow-selected k: %s", elbow_k)

    bic_scores = []
    gmm_models: Dict[int, GaussianMixture] = {}
    for k in ks:
        gmm = GaussianMixture(n_components=k, random_state=RANDOM_STATE, n_init=10)
        gmm.fit(embeddings)
        gmm_models[k] = gmm
        bic_scores.append(gmm.bic(embeddings))
    bic_df = pd.DataFrame({"k": ks, "bic": bic_scores})
    _save_table(bic_df, "arrests_bic_scores.csv")
    # Lowest BIC wins.
    gmm_k = int(bic_df.sort_values("bic").iloc[0]["k"])

    kmeans_final = kmeans_models[elbow_k]
    gmm_final = gmm_models[gmm_k]

    kmeans_labels = kmeans_final.predict(embeddings)
    gmm_labels = gmm_final.predict(embeddings)
    # Only the winning component's probability is kept for each row.
    gmm_probabilities = gmm_final.predict_proba(embeddings).max(axis=1)

    kmeans_metrics = _evaluate_cluster_metrics(embeddings, kmeans_labels)
    gmm_metrics = _evaluate_cluster_metrics(embeddings, gmm_labels)
    silhouette_scores = {
        "kmeans": kmeans_metrics.get("silhouette", float("nan")),
        "gmm": gmm_metrics.get("silhouette", float("nan")),
    }
    _save_metrics(
        {
            "silhouette": silhouette_scores,
            "elbow_k": elbow_k,
            "gmm_k": gmm_k,
            "kmeans_metrics": kmeans_metrics,
            "gmm_metrics": gmm_metrics,
        },
        "arrests_clustering.json",
    )

    cluster_table = df[["State"]].copy()
    cluster_table["kmeans_cluster"] = kmeans_labels
    cluster_table["gmm_cluster"] = gmm_labels
    cluster_table["gmm_probability"] = gmm_probabilities
    _save_table(cluster_table, "arrests_clusters.csv")

    _plot_k_selection_curve(
        ks,
        inertia_values,
        elbow_k,
        title="K-Means Elbow Plot",
        x_label="Number of clusters (k)",
        y_label="WCSS",
        marker_label=f"Elbow k={elbow_k}",
        filename="arrests_elbow.png",
    )
    _plot_k_selection_curve(
        ks,
        bic_scores,
        gmm_k,
        title="GMM BIC Scores",
        x_label="Number of components (k)",
        y_label="BIC",
        marker_label=f"BIC k={gmm_k}",
        filename="arrests_bic.png",
    )
    _plot_cluster_scatter(embeddings, kmeans_labels, gmm_labels, elbow_k, gmm_k)

    return ClusteringResult(
        kmeans_labels=kmeans_labels,
        gmm_labels=gmm_labels,
        gmm_probabilities=gmm_probabilities,
        silhouette_scores=silhouette_scores,
        metric_table=metrics_df,
    )


def main() -> None:
    """Run the three workflows end-to-end and log their headline results.

    Configures logging, seeds NumPy, creates the artifact directories, locates
    and cleans the three CSV datasets, saves a summary of each, and then runs
    wine classification, feed recommendations, and arrests clustering in that
    order.

    Raises:
        FileNotFoundError: If neither candidate path exists for a dataset.
    """

    logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
    np.random.seed(RANDOM_STATE)

    _ensure_directories(ARTIFACT_DIR, PLOT_DIR, TABLE_DIR, METRIC_DIR)

    # Each entry is (preferred local path, workspace fallback).
    path_candidates = {
        "wine": (
            "/Users/karlkurzius/Downloads/wine_data.csv",
            "/mnt/data/wine_data.csv",
        ),
        "chickwts": (
            "/Users/karlkurzius/Downloads/chickwts_data.csv",
            "/mnt/data/chickwts_data.csv",
        ),
        # NOTE(review): the two candidates use different file names
        # ("arrest_data.csv" vs "arrests_data.csv") — confirm both are intended.
        "us_arrests": (
            "/Users/karlkurzius/Downloads/arrest_data.csv",
            "/mnt/data/arrests_data.csv",
        ),
    }
    # All paths are resolved before anything is read, so a missing dataset
    # fails fast.
    resolved = {name: _resolve_dataset_path(*pair) for name, pair in path_candidates.items()}

    wine_df = clean_dataframe(pd.read_csv(resolved["wine"]))
    chick_df = clean_dataframe(pd.read_csv(resolved["chickwts"]))
    # The arrests CSV's "Unnamed: 0" column is renamed to "State" before cleaning.
    raw_arrests = pd.read_csv(resolved["us_arrests"])
    arrest_df = clean_dataframe(raw_arrests.rename(columns={"Unnamed: 0": "State"}))

    frames = {"wine": wine_df, "chickwts": chick_df, "us_arrests": arrest_df}
    summaries = {name: summarize_dataframe(name, frame) for name, frame in frames.items()}
    _save_metrics(summaries, "dataset_summaries.json")

    wine_result = run_wine_classification(wine_df)
    logging.info("Wine accuracy: %.3f", wine_result.accuracy)
    logging.info("Wine best params: %s", wine_result.best_params)

    feed_result = run_feed_recommendations(chick_df)
    logging.info("Feed cosine similarity matrix saved with shape %s", feed_result.cosine_similarity.shape)

    arrests_result = run_arrests_clustering(arrest_df)
    for log_name, score_key in (("KMeans", "kmeans"), ("GMM", "gmm")):
        logging.info("%s silhouette: %%.3f" % log_name, arrests_result.silhouette_scores[score_key])


# Run the full workflow only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()