# Gold Layer - Analytics & Clustering

This notebook creates business-ready aggregates and demonstrates K-Means clustering for student segmentation.

**Key Concepts:**
- Gold layer aggregations
- Complex SQL analytics
- K-Means clustering for student segmentation
- MLflow experiment tracking (optional, not available in Free Edition)

---

**Prerequisites:** Run `02_silver_layer` first to create the silver tables.

> **Note**: MLflow is not available in Databricks Free Edition. The clustering model training works without MLflow tracking. If you're using a paid edition, MLflow tracking will be enabled automatically.

## Setup

In [None]:
# Type stub for Databricks runtime variable
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from pyspark.sql import SparkSession  # pyright: ignore[reportMissingImports]
    spark: SparkSession  # Provided by Databricks runtime

from pyspark.sql import functions as F  # type: ignore[reportMissingImports]  # Databricks-only module

# Try to import MLflow (not available in Free Edition)
try:
    import mlflow  # type: ignore[reportMissingImports]  # Databricks-only module
    import mlflow.spark  # type: ignore[reportMissingImports]  # Databricks-only module
    MLFLOW_AVAILABLE = True
except ImportError:
    MLFLOW_AVAILABLE = False
    print("[INFO] MLflow not available (Free Edition). Clustering will work without MLflow tracking.")

# Configuration
SILVER_DB = "babblr_silver"
GOLD_DB = "babblr_gold"

# Create gold database
spark.sql(f"CREATE DATABASE IF NOT EXISTS {GOLD_DB}")  # Provided by Databricks runtime
print(f"Gold database: {GOLD_DB}")

## 1. Daily Learning Metrics

Aggregate daily metrics for dashboard KPIs.

In [None]:
%%sql
CREATE OR REPLACE TABLE babblr_gold.daily_metrics AS
SELECT
    DATE(created_at) as activity_date,
    language,
    COUNT(DISTINCT user_id) as active_users,
    COUNT(*) as conversations,
    SUM(message_count) as total_messages,
    ROUND(AVG(duration_minutes), 1) as avg_session_minutes,
    ROUND(AVG(error_rate), 3) as avg_error_rate
FROM babblr_silver.conversations
GROUP BY DATE(created_at), language
ORDER BY activity_date, language
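To make the grouping explicit, here is a minimal pure-Python sketch of what the query computes for each `(activity_date, language)` pair. The sample rows are hypothetical. The sketch also assumes there are no NULL values: SQL `AVG` skips NULLs, but this code does not.

```python
from collections import defaultdict
from datetime import datetime


def daily_metrics(conversations):
    """Group conversation rows by (date, language), mirroring the daily_metrics query."""
    groups = defaultdict(list)
    for conv in conversations:
        # DATE(created_at): drop the time-of-day component
        groups[(conv["created_at"].date(), conv["language"])].append(conv)

    result = []
    for (activity_date, language), rows in sorted(groups.items(), key=lambda item: item[0]):
        n = len(rows)
        result.append({
            "activity_date": activity_date,
            "language": language,
            "active_users": len({r["user_id"] for r in rows}),  # COUNT(DISTINCT user_id)
            "conversations": n,                                  # COUNT(*)
            "total_messages": sum(r["message_count"] for r in rows),
            # Python's round() rounds halves to even; Spark's ROUND uses HALF_UP
            "avg_session_minutes": round(sum(r["duration_minutes"] for r in rows) / n, 1),
            "avg_error_rate": round(sum(r["error_rate"] for r in rows) / n, 3),
        })
    return result


# Hypothetical conversation rows: user u1 has two sessions on the same day
sample = [
    {"created_at": datetime(2024, 5, 1, 9, 30), "language": "spanish", "user_id": "u1",
     "message_count": 12, "duration_minutes": 10.0, "error_rate": 0.10},
    {"created_at": datetime(2024, 5, 1, 18, 0), "language": "spanish", "user_id": "u1",
     "message_count": 8, "duration_minutes": 6.0, "error_rate": 0.20},
    {"created_at": datetime(2024, 5, 1, 12, 0), "language": "spanish", "user_id": "u2",
     "message_count": 5, "duration_minutes": 4.0, "error_rate": 0.05},
    {"created_at": datetime(2024, 5, 2, 8, 0), "language": "french", "user_id": "u3",
     "message_count": 3, "duration_minutes": 2.5, "error_rate": 0.00},
]

for row in daily_metrics(sample):
    print(row)
```

The two sessions by `u1` on the same day add to `conversations` but count only once toward `active_users`.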

In [None]:
%%sql
-- Preview daily metrics
SELECT * FROM babblr_gold.daily_metrics
ORDER BY activity_date DESC
LIMIT 10

## 2. Lesson Effectiveness Scores

Calculate which lessons lead to the best learning outcomes.

In [None]:
%%sql
CREATE OR REPLACE TABLE babblr_gold.lesson_effectiveness AS
WITH lesson_stats AS (
    SELECT
        lesson_id,
        lesson_type,
        subject,
        lesson_difficulty,
        COUNT(*) as total_attempts,
        SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END) as completions,
        AVG(CASE WHEN status = 'completed' THEN mastery_score END) as avg_mastery,
        AVG(time_to_complete_hours) as avg_completion_time
    FROM babblr_silver.lesson_progress
    GROUP BY lesson_id, lesson_type, subject, lesson_difficulty
)
SELECT
    *,
    ROUND(completions * 1.0 / total_attempts, 3) as completion_rate,
    -- Effectiveness score: weighted combination of completion rate and mastery
    ROUND(
        (completions * 1.0 / total_attempts) * 0.4 +
        COALESCE(avg_mastery, 0) * 0.6,
        3
    ) as effectiveness_score
FROM lesson_stats
WHERE total_attempts >= 3
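The score is 0.4 × completion rate + 0.6 × average mastery of completed attempts. It is computed only for lessons with at least three attempts. The sketch below reproduces that arithmetic for a single lesson. It assumes `mastery_score` is on a 0 to 1 scale like the completion rate, since the weights only balance if it is. The input numbers are hypothetical.

```python
def effectiveness_score(total_attempts, completions, avg_mastery, min_attempts=3):
    """Reproduce the lesson_effectiveness arithmetic for one lesson.

    Returns None for lessons the query filters out (fewer than min_attempts).
    avg_mastery is the mean mastery_score over completed attempts, or None
    when nothing was completed (SQL AVG over no values is NULL).
    """
    if total_attempts < min_attempts:
        return None  # WHERE total_attempts >= 3
    completion_rate = completions / total_attempts
    mastery = 0.0 if avg_mastery is None else avg_mastery  # COALESCE(avg_mastery, 0)
    return round(completion_rate * 0.4 + mastery * 0.6, 3)


print(effectiveness_score(10, 8, 0.75))  # 0.4 * 0.8 + 0.6 * 0.75
print(effectiveness_score(5, 0, None))   # no completions: mastery counts as 0
print(effectiveness_score(2, 2, 0.9))    # filtered out by the attempt threshold
```

A lesson completed 8 out of 10 times with mean mastery 0.75 scores 0.32 + 0.45 = 0.77. A lesson nobody completed gets no mastery credit, because `COALESCE` turns its NULL average into 0.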

In [None]:
%%sql
-- Top 10 most effective lessons
SELECT
    lesson_type,
    subject,
    lesson_difficulty,
    total_attempts,
    completion_rate,
    ROUND(avg_mastery, 3) as avg_mastery,
    effectiveness_score
FROM babblr_gold.lesson_effectiveness
ORDER BY effectiveness_score DESC
LIMIT 10

## 3. CEFR Level Progression Funnel

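Count users and assessments at each CEFR level recommended by the assessment attempts. `users_at_level` counts distinct users per recommended level. A user who took several assessments can therefore appear at more than one level, so the table describes the distribution across levels rather than a strict funnel.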

In [None]:
%%sql
CREATE OR REPLACE TABLE babblr_gold.cefr_funnel AS
SELECT
    language,
    recommended_level as cefr_level,
    COUNT(DISTINCT user_id) as users_at_level,
    ROUND(AVG(score), 1) as avg_assessment_score,
    COUNT(*) as total_assessments
FROM babblr_silver.assessment_attempts
GROUP BY language, recommended_level
ORDER BY language,
    CASE recommended_level
        WHEN 'A1' THEN 1
        WHEN 'A2' THEN 2
        WHEN 'B1' THEN 3
        WHEN 'B2' THEN 4
        WHEN 'C1' THEN 5
        WHEN 'C2' THEN 6
    END
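A true cumulative funnel shows how many users reached *at least* each level. One option is to take each user's highest recommended level and accumulate counts from the top level down. The following is a hypothetical pure-Python sketch and is not part of the pipeline. For the sample below, the per-level distinct counts produced by the SQL above would be A1 = 2, A2 = 2 and B1 = 1. The cumulative version reports 3, 2 and 1.

```python
from collections import Counter

CEFR_LEVELS = ["A1", "A2", "B1", "B2", "C1", "C2"]
CEFR_RANK = {level: rank for rank, level in enumerate(CEFR_LEVELS)}


def cumulative_funnel(assessments):
    """Count users whose highest recommended level is at least each CEFR level.

    `assessments` holds (user_id, recommended_level) pairs for one language;
    unrecognized levels are skipped.
    """
    highest = {}
    for user_id, level in assessments:
        rank = CEFR_RANK.get(level)
        if rank is None:
            continue
        highest[user_id] = max(rank, highest.get(user_id, rank))

    users_at_best = Counter(highest.values())  # users whose best level is exactly this rank
    funnel = {}
    reached = 0
    # Walk from C2 down to A1 so each level also includes everyone above it
    for rank in reversed(range(len(CEFR_LEVELS))):
        reached += users_at_best[rank]
        funnel[CEFR_LEVELS[rank]] = reached
    return {level: funnel[level] for level in CEFR_LEVELS}


# Hypothetical assessment attempts for one language
sample = [("u1", "A1"), ("u1", "A2"), ("u2", "A1"), ("u3", "B1"), ("u3", "A2")]
print(cumulative_funnel(sample))
```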

In [None]:
%%sql
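-- CEFR funnel visualization data (table rows carry no guaranteed order, so sort by level)
SELECT * FROM babblr_gold.cefr_funnel
WHERE language = 'spanish'
ORDER BY CASE cefr_level
    WHEN 'A1' THEN 1
    WHEN 'A2' THEN 2
    WHEN 'B1' THEN 3
    WHEN 'B2' THEN 4
    WHEN 'C1' THEN 5
    WHEN 'C2' THEN 6
END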

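## 4. Topic Engagement Analysis

Measure conversation volume, session length and error rates for each conversation topic and language.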

In [None]:
%%sql
CREATE OR REPLACE TABLE babblr_gold.topic_engagement AS
SELECT
    topic_id,
    language,
    COUNT(*) as conversation_count,
    COUNT(DISTINCT user_id) as unique_users,
    ROUND(AVG(message_count), 1) as avg_messages_per_conv,
    ROUND(AVG(duration_minutes), 1) as avg_duration_min,
    ROUND(AVG(error_rate), 3) as avg_error_rate
FROM babblr_silver.conversations
WHERE topic_id IS NOT NULL
GROUP BY topic_id, language

In [None]:
%%sql
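-- Most engaging topics (by session duration). Per-language averages are weighted
-- by conversation count so languages with few conversations don't dominate.
SELECT
    topic_id,
    SUM(conversation_count) as total_conversations,
    ROUND(SUM(avg_duration_min * conversation_count) / SUM(conversation_count), 1) as avg_duration,
    ROUND(SUM(avg_messages_per_conv * conversation_count) / SUM(conversation_count), 1) as avg_messages
FROM babblr_gold.topic_engagement
GROUP BY topic_id
ORDER BY avg_duration DESC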
LIMIT 10
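There are two ways to roll the per-language rows of `topic_engagement` up to one row per topic. A plain `AVG` over the per-language averages gives every language equal weight, so a language with two conversations counts as much as one with two hundred. Weighting by `conversation_count` instead recovers the per-conversation mean, up to rounding and assuming no NULL durations. Here is a small comparison with hypothetical numbers:

```python
def unweighted_mean(rows, value_key):
    """Plain AVG over per-language rows: each language counts equally."""
    return sum(r[value_key] for r in rows) / len(rows)


def weighted_mean(rows, value_key, weight_key="conversation_count"):
    """Per-language averages weighted by how many conversations produced them."""
    total_weight = sum(r[weight_key] for r in rows)
    return sum(r[value_key] * r[weight_key] for r in rows) / total_weight


# Hypothetical per-language rows for a single topic
topic_rows = [
    {"language": "spanish", "conversation_count": 200, "avg_duration_min": 8.0},
    {"language": "french", "conversation_count": 2, "avg_duration_min": 30.0},
]

print(round(unweighted_mean(topic_rows, "avg_duration_min"), 1))
print(round(weighted_mean(topic_rows, "avg_duration_min"), 1))
```

Here the unweighted mean is 19.0 minutes. The weighted mean is about 8.2 minutes, which is much closer to a typical conversation on this topic.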

## 5. Student Clustering

Demonstrate K-Means clustering to segment students by learning patterns. MLflow tracking is optional (not available in Free Edition).

In [None]:
from pyspark.ml.feature import VectorAssembler, StandardScaler  # type: ignore[reportMissingImports]  # Databricks-only module
from pyspark.ml.clustering import KMeans  # type: ignore[reportMissingImports]  # Databricks-only module
from pyspark.ml import Pipeline  # type: ignore[reportMissingImports]  # Databricks-only module

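# Prepare features for clustering (`spark` is provided by the Databricks runtime).
# Keep Python comments outside the SQL string: Spark SQL does not treat `#` as a comment.
user_features = spark.sql(  # type: ignore[reportUndefinedVariable]
    """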
    SELECT
        up.user_id,
        up.total_assessments,
        up.avg_score,
        COALESCE(up.avg_improvement, 0) as avg_improvement,
        up.proficiency_score,
        COALESCE(conv.total_conversations, 0) as total_conversations,
        COALESCE(conv.avg_error_rate, 0) as avg_error_rate,
        COALESCE(lp.completed_lessons, 0) as completed_lessons,
        COALESCE(lp.avg_mastery, 0) as avg_mastery
    FROM babblr_silver.user_profiles up
    LEFT JOIN (
        SELECT user_id, COUNT(*) as total_conversations, AVG(error_rate) as avg_error_rate
        FROM babblr_silver.conversations
        GROUP BY user_id
    ) conv ON up.user_id = conv.user_id
    LEFT JOIN (
        SELECT user_id, COUNT(*) as completed_lessons, AVG(mastery_score) as avg_mastery
        FROM babblr_silver.lesson_progress
        WHERE status = 'completed'
        GROUP BY user_id
    ) lp ON up.user_id = lp.user_id
""")

print(f"Users for clustering: {user_features.count()}")
user_features.show(5)
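These features mix counts (`total_conversations`, `completed_lessons`) with rates (`avg_error_rate`). K-Means compares users by Euclidean distance, so unscaled columns with large ranges would dominate the clustering. For that reason, the training pipeline standardizes the features with `StandardScaler(withMean=True, withStd=True)`. The following is a minimal pure-Python sketch of that transformation. It assumes Spark's documented behavior: dividing by the corrected (n - 1) sample standard deviation and mapping zero-variance features to 0.0. The user rows are hypothetical.

```python
import math


def standardize(rows):
    """Z-score each column: (x - mean) / sample std, like StandardScaler(withMean=True, withStd=True).

    Needs at least two rows.
    """
    n = len(rows)
    n_cols = len(rows[0])
    means = [sum(r[j] for r in rows) / n for j in range(n_cols)]
    # Corrected (n - 1) sample standard deviation per column
    stds = [
        math.sqrt(sum((r[j] - means[j]) ** 2 for r in rows) / (n - 1))
        for j in range(n_cols)
    ]
    # Zero-variance columns are mapped to 0.0 instead of dividing by zero
    return [
        [(r[j] - means[j]) / stds[j] if stds[j] > 0 else 0.0 for j in range(n_cols)]
        for r in rows
    ]


# Hypothetical users: [total_conversations, avg_error_rate]
users = [[2.0, 0.30], [40.0, 0.05], [10.0, 0.12]]
scaled = standardize(users)
for raw, z in zip(users, scaled):
    print(raw, "->", [round(v, 2) for v in z])
```

Before scaling, the distance between two users is driven almost entirely by `total_conversations`. After scaling, both columns have mean 0 and standard deviation 1, so each contributes comparably.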

### MLflow Experiment Setup (Optional - Not Available in Free Edition)

In [None]:
# Set experiment name (only if MLflow is available)
if MLFLOW_AVAILABLE:
    experiment_name = "/Shared/babblr_student_segmentation"
    mlflow.set_experiment(experiment_name)
    print(f"MLflow experiment: {experiment_name}")
else:
    print("[INFO] MLflow not available - skipping experiment setup. Clustering will work without tracking.")

### Train Clustering Model (with Optional MLflow Tracking)

In [None]:
from contextlib import nullcontext

# Feature columns for clustering
feature_cols = [
    "total_assessments", "avg_score", "avg_improvement",
    "total_conversations", "avg_error_rate", "completed_lessons", "avg_mastery"
]

# Build pipeline: assemble feature vector -> standardize -> K-Means
k_clusters = 4
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features_raw")
scaler = StandardScaler(inputCol="features_raw", outputCol="features", withStd=True, withMean=True)
kmeans = KMeans(k=k_clusters, seed=42, featuresCol="features", predictionCol="cluster")

pipeline = Pipeline(stages=[assembler, scaler, kmeans])

# Open an MLflow run only when MLflow is available; otherwise use a no-op context
run_context = (
    mlflow.start_run(run_name="student_clustering_v1") if MLFLOW_AVAILABLE else nullcontext()
)

with run_context:
    if MLFLOW_AVAILABLE:
        mlflow.log_param("k_clusters", k_clusters)
        mlflow.log_param("features", feature_cols)
        mlflow.log_param("n_users", user_features.count())

    # Fit the pipeline and assign every user to a cluster
    model = pipeline.fit(user_features)
    predictions = model.transform(user_features)

    # Per-cluster summary statistics
    cluster_stats = predictions.groupBy("cluster").agg(
        F.count("*").alias("user_count"),
        F.round(F.avg("avg_score"), 1).alias("avg_score"),
        F.round(F.avg("total_conversations"), 1).alias("avg_conversations"),
        F.round(F.avg("completed_lessons"), 1).alias("avg_completed_lessons")
    ).orderBy("cluster")

    # trainingCost: sum of squared distances from each user to its cluster
    # centroid, measured in the standardized feature space
    kmeans_model = model.stages[-1]
    inertia = kmeans_model.summary.trainingCost

    if MLFLOW_AVAILABLE:
        mlflow.log_metric("inertia", inertia)
        for row in cluster_stats.collect():
            mlflow.log_metric(f"cluster_{row['cluster']}_size", row["user_count"])
            mlflow.log_metric(f"cluster_{row['cluster']}_avg_score", row["avg_score"])
        mlflow.spark.log_model(model, "student_clustering_model")

tracking = "with" if MLFLOW_AVAILABLE else "without"
print(f"Model training complete ({tracking} MLflow tracking)!")
print(f"Inertia (within-cluster sum of squares): {inertia:.2f}")

### Cluster Analysis

In [None]:
# Show cluster statistics
cluster_stats.show()
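`trainingCost`, reported above as inertia, is the quantity K-Means minimizes: the sum of squared distances from each point to its assigned centroid. The sketch below implements plain Lloyd's algorithm in Python to show the assignment and update steps. For brevity it uses random initial centroids, whereas Spark's `KMeans` defaults to k-means|| initialization, so it will not reproduce Spark's clusters exactly. The points are hypothetical two-dimensional stand-ins for standardized features.

```python
import random


def squared_distance(a, b):
    return sum((x - y) ** 2 for x, y in zip(a, b))


def kmeans(points, k, max_iter=20, seed=42):
    """Lloyd's algorithm with random initial centroids.

    Returns (centroids, labels, inertia); inertia is the sum of squared
    distances from each point to its assigned centroid.
    """
    rng = random.Random(seed)
    centroids = [list(p) for p in rng.sample(points, k)]
    labels = None
    for _ in range(max_iter):
        # Assignment step: each point joins its nearest centroid
        new_labels = [
            min(range(k), key=lambda c: squared_distance(p, centroids[c]))
            for p in points
        ]
        # Update step: move each centroid to the mean of its members
        for c in range(k):
            members = [p for p, lbl in zip(points, new_labels) if lbl == c]
            if members:  # an empty cluster keeps its previous centroid
                centroids[c] = [sum(col) / len(members) for col in zip(*members)]
        if new_labels == labels:
            break  # assignments stopped changing
        labels = new_labels
    inertia = sum(squared_distance(p, centroids[lbl]) for p, lbl in zip(points, labels))
    return centroids, labels, inertia


# Hypothetical standardized feature vectors forming two well-separated groups
points = [(0, 0), (0, 1), (1, 0), (10, 10), (10, 11), (11, 10)]
centroids, labels, inertia = kmeans(points, k=2)
print(labels, round(inertia, 3))
```

The two groups end up in separate clusters. Which group receives ID 0, however, depends on the initialization, which is one reason cluster IDs should not be treated as fixed labels.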

In [None]:
# Analyze cluster characteristics
cluster_analysis = predictions.groupBy("cluster").agg(
    F.count("*").alias("users"),
    F.round(F.avg("avg_score"), 1).alias("avg_assessment_score"),
    F.round(F.avg("avg_error_rate"), 3).alias("avg_error_rate"),
    F.round(F.avg("completed_lessons"), 1).alias("avg_lessons_completed"),
    F.round(F.avg("avg_mastery"), 3).alias("avg_mastery_score")
).orderBy("cluster")

display(cluster_analysis)
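`k_clusters = 4` is a fixed choice. One way to compare candidate values of k is the silhouette score. Spark exposes it through `pyspark.ml.evaluation.ClusteringEvaluator`, which computes the silhouette with squared Euclidean distance by default, e.g. `ClusteringEvaluator(featuresCol="features", predictionCol="cluster").evaluate(predictions)`. To show what the metric measures, here is a small pure-Python version that uses the same squared Euclidean distance. The points and labelings are hypothetical, and the function requires at least two clusters.

```python
def squared_distance(a, b):
    return sum((x - y) ** 2 for x, y in zip(a, b))


def silhouette(points, labels):
    """Mean silhouette coefficient using squared Euclidean distance.

    For each point: a = mean distance to the other members of its cluster,
    b = smallest mean distance to the members of any other cluster,
    s = (b - a) / max(a, b). Points in singleton clusters score 0.
    """
    clusters = {}
    for p, lbl in zip(points, labels):
        clusters.setdefault(lbl, []).append(p)

    scores = []
    for p, lbl in zip(points, labels):
        own = clusters[lbl]
        if len(own) == 1:
            scores.append(0.0)
            continue
        # Distance to itself is 0, so dividing by len(own) - 1 excludes the point
        a = sum(squared_distance(p, q) for q in own) / (len(own) - 1)
        b = min(
            sum(squared_distance(p, q) for q in members) / len(members)
            for other, members in clusters.items()
            if other != lbl
        )
        scores.append((b - a) / max(a, b))
    return sum(scores) / len(scores)


points = [(0, 0), (0, 1), (1, 0), (10, 10), (10, 11), (11, 10)]
print(round(silhouette(points, [0, 0, 0, 1, 1, 1]), 3))  # groups kept together
print(round(silhouette(points, [0, 1, 0, 1, 0, 1]), 3))  # groups mixed up
```

Scores near 1 mean points sit much closer to their own cluster than to the nearest other one. Values near 0 or below suggest overlapping or poorly separated segments. A common approach is to refit for several values of k and prefer those with a higher silhouette, as long as the resulting segments are still interpretable.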

### Interpret Clusters

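K-Means cluster IDs are arbitrary: they depend on the data and on the initialization, so check the statistics above before naming the segments. For example, the clusters might be labeled: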
- **Cluster 0**: High performers (high scores, many completed lessons)
- **Cluster 1**: Active learners (many conversations, moderate scores)
- **Cluster 2**: Struggling students (high error rates, lower scores)
- **Cluster 3**: Casual users (low engagement, few assessments)
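Because the IDs can change when the model is retrained, hardcoding "cluster 0 = high performers" is fragile. A sketch of an alternative is to derive the labels from per-cluster statistics. The ranking rules below are hypothetical and only loosely follow the descriptions above. In the notebook, a dict like `example_stats` could be assembled from `cluster_stats` (average score, conversations) and `cluster_analysis` (error rate) via `.collect()`. The numbers here are made up.

```python
# Hypothetical rules: each segment claims the remaining cluster with the
# highest value of one statistic, applied in this order
SEGMENT_RULES = [
    ("High performers", "avg_score"),
    ("Struggling students", "avg_error_rate"),
    ("Active learners", "avg_conversations"),
]


def label_clusters(stats, fallback="Casual users"):
    """Map cluster IDs to segment names based on per-cluster statistics."""
    remaining = dict(stats)
    labels = {}
    for name, metric in SEGMENT_RULES:
        if not remaining:
            break
        cluster_id = max(remaining, key=lambda c: remaining[c][metric])
        labels[cluster_id] = name
        del remaining[cluster_id]
    # Any clusters left over get the fallback label
    for cluster_id in remaining:
        labels[cluster_id] = fallback
    return labels


# Hypothetical per-cluster averages
example_stats = {
    0: {"avg_score": 55.0, "avg_error_rate": 0.31, "avg_conversations": 6.0},
    1: {"avg_score": 82.0, "avg_error_rate": 0.08, "avg_conversations": 14.0},
    2: {"avg_score": 64.0, "avg_error_rate": 0.15, "avg_conversations": 22.0},
    3: {"avg_score": 60.0, "avg_error_rate": 0.12, "avg_conversations": 2.0},
}
print(label_clusters(example_stats))
```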

In [None]:
# Save cluster assignments to gold layer
cluster_assignments = predictions.select("user_id", "cluster")
cluster_assignments.write.format("delta").mode("overwrite").saveAsTable(f"{GOLD_DB}.user_clusters")

print(f"Saved {cluster_assignments.count()} user cluster assignments")

## 6. Gold Layer Summary

In [None]:
%%sql
-- Summary of gold layer tables
SELECT 'daily_metrics' as table_name, COUNT(*) as row_count FROM babblr_gold.daily_metrics
UNION ALL
SELECT 'lesson_effectiveness', COUNT(*) FROM babblr_gold.lesson_effectiveness
UNION ALL
SELECT 'cefr_funnel', COUNT(*) FROM babblr_gold.cefr_funnel
UNION ALL
SELECT 'topic_engagement', COUNT(*) FROM babblr_gold.topic_engagement
UNION ALL
SELECT 'user_clusters', COUNT(*) FROM babblr_gold.user_clusters
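The summary query lists each gold table by hand. A small Python helper (hypothetical, not part of the original notebook) can generate the same `UNION ALL` query from a list, so a new gold table only needs to be added in one place. In the notebook, the result could be run with `spark.sql(query)`.

```python
GOLD_DB = "babblr_gold"
GOLD_TABLES = [
    "daily_metrics", "lesson_effectiveness", "cefr_funnel",
    "topic_engagement", "user_clusters",
]


def row_count_query(db, tables):
    """Build a UNION ALL query returning (table_name, row_count) for each table."""
    parts = [
        f"SELECT '{name}' AS table_name, COUNT(*) AS row_count FROM {db}.{name}"
        for name in tables
    ]
    return "\nUNION ALL\n".join(parts)


query = row_count_query(GOLD_DB, GOLD_TABLES)
print(query)
```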

## Summary

In this notebook we:
1. Created daily metrics aggregates
2. Calculated lesson effectiveness scores
3. Built a CEFR progression funnel
4. Analyzed topic engagement
5. **Trained K-Means clustering** (with optional MLflow tracking in paid editions)

**Note:** MLflow is not available in Free Edition. The clustering model works without MLflow tracking. In paid editions, MLflow provides:
- Parameter logging
- Metric tracking
- Model versioning
- Reproducible experiments

**Next:** Run `04_dashboard` for visualizations.