# Learning Analytics Platform - Student Performance Analysis

This notebook demonstrates a complete learning analytics pipeline using AWS services:
- Generate and upload student activity data to S3
- Trigger Lambda functions for analytics processing
- Query results from DynamoDB
- Run SQL queries with Athena
- Visualize learning insights and identify at-risk students

**Duration:** 60-90 minutes  
**Cost:** $6-11

## Setup and Configuration

In [None]:
# Install required packages (if not already installed)
!pip install -q boto3 pandas numpy scipy matplotlib seaborn python-dotenv tabulate

In [None]:
import json
import os
import sys
import time
from collections import Counter
from decimal import Decimal
from pathlib import Path

import boto3
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
from dotenv import load_dotenv

# Configure plotting
# NOTE(review): the three statements below are order-sensitive — the seaborn
# palette is set after the matplotlib style sheet, and the inline backend is
# activated last. Presumably this keeps the palette from being overwritten by
# the style sheet; confirm before reordering.
plt.style.use("seaborn-v0_8-darkgrid")
sns.set_palette("husl")
%matplotlib inline

# Load environment variables
# load_dotenv() is called with no arguments, so it relies on python-dotenv's
# default .env discovery; the AWS settings read via os.getenv in the next cell
# can therefore be supplied from a .env file.
load_dotenv()

print("✓ Imports successful")

In [None]:
# AWS settings: each value comes from the environment (or .env) and falls back
# to the tutorial default when the variable is unset.
AWS_REGION = os.getenv("AWS_REGION", "us-east-1")
S3_BUCKET = os.getenv("S3_BUCKET_NAME", "learning-analytics-your-user-id")
DYNAMODB_TABLE = os.getenv("DYNAMODB_TABLE", "StudentAnalytics")
LAMBDA_FUNCTION = os.getenv("LAMBDA_FUNCTION_NAME", "analyze-student-performance")

# Low-level clients, all bound to the same region.
s3_client, lambda_client, athena_client = (
    boto3.client(service, region_name=AWS_REGION) for service in ("s3", "lambda", "athena")
)

# DynamoDB is accessed through the resource API so the table can be scanned directly.
dynamodb = boto3.resource("dynamodb", region_name=AWS_REGION)
table = dynamodb.Table(DYNAMODB_TABLE)

# Echo the effective configuration so a wrong bucket/table is spotted early.
print(
    "✓ AWS Configuration:",
    f"  Region: {AWS_REGION}",
    f"  S3 Bucket: {S3_BUCKET}",
    f"  DynamoDB Table: {DYNAMODB_TABLE}",
    f"  Lambda Function: {LAMBDA_FUNCTION}",
    sep="\n",
)

## Step 1: Generate Sample Student Data

We'll generate synthetic student activity data including:
- Quiz scores
- Assignment submissions
- Engagement metrics (time on task, resource views)
- Multiple courses and assessment types

In [None]:
# The generator lives in ../scripts; put that directory first on the import path.
sys.path.insert(0, "../scripts")
from upload_to_s3 import generate_sample_data

# Size of the synthetic cohort (reused by the summary cell at the end).
NUM_STUDENTS = 500
NUM_COURSES = 3
ASSESSMENTS_PER_COURSE = 10

generator_settings = {
    "num_students": NUM_STUDENTS,
    "num_courses": NUM_COURSES,
    "assessments_per_course": ASSESSMENTS_PER_COURSE,
}

print(f"Generating sample data for {NUM_STUDENTS} students...")
df = generate_sample_data(**generator_settings)

print(f"\n✓ Generated {len(df)} activity records", "\nSample data:", sep="\n")
df.head(10)

In [None]:
# Quick sanity summary of the generated activity records.
submission_dates = df["submission_date"]
submitted_scores = df[df["submitted"]]["score"]  # scores of submitted work only

overview_lines = [
    "Dataset Overview:",
    f"  Total records: {len(df):,}",
    f"  Students: {df['student_id'].nunique()}",
    f"  Courses: {df['course_id'].nunique()}",
    f"  Assessment types: {df['assessment_type'].unique()}",
    f"  Date range: {submission_dates.min()} to {submission_dates.max()}",
    f"\nSubmission rate: {df['submitted'].mean() * 100:.1f}%",
    f"Average score (submitted): {submitted_scores.mean():.2f}",
]
print(*overview_lines, sep="\n")

## Step 2: Upload Data to S3

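Upload student data to S3. If an S3 event trigger is configured on the bucket, each upload starts Lambda processing automatically; otherwise Step 3 invokes Lambda manually.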

In [None]:
# Write one CSV per course into ../generated_data (mirrors a realistic
# per-course export) and remember every path that was written.
output_dir = Path("../generated_data")
output_dir.mkdir(exist_ok=True)

uploaded_files = []

for course_id in df["course_id"].unique():
    # Keep only this course's rows and strip the non-anonymized identifier;
    # errors="ignore" makes the drop a no-op when the column is absent.
    course_df = df[df["course_id"] == course_id].drop(columns=["original_id"], errors="ignore")

    filename = f"student_data_{course_id}.csv"
    filepath = output_dir / filename
    course_df.to_csv(filepath, index=False)

    uploaded_files.append(filepath)
    print(f"✓ Saved {filename} ({len(course_df)} records)")

In [None]:
# Push everything in the output directory to the bucket under raw-data/.
from upload_to_s3 import S3DataUploader

uploader = S3DataUploader(S3_BUCKET, region=AWS_REGION)

print(f"Uploading files to s3://{S3_BUCKET}/raw-data/")
results = uploader.upload_directory(str(output_dir), s3_prefix="raw-data/")

# `results` is read again by the summary cell at the end of the notebook.
print(
    "\n✓ Upload complete:",
    f"  Successful: {results['successful']}",
    f"  Failed: {results['failed']}",
    f"  Total records: {results['total_records']:,}",
    sep="\n",
)

## Step 3: Trigger Lambda Processing

If S3 event triggers are configured, Lambda runs automatically on upload and you can skip the cell below. Otherwise, run it to invoke the Lambda function manually for each uploaded file.

In [None]:
# Manual Lambda invocation (if S3 trigger not configured)
def invoke_lambda_for_file(s3_key: str):
    """Synchronously run the analytics Lambda for one object in S3_BUCKET.

    A minimal S3-event-shaped payload (a single record carrying the bucket
    name and object key) is sent with InvocationType="RequestResponse", so the
    call blocks until the function returns.

    Args:
        s3_key: Key of the object, inside S3_BUCKET, that should be processed.

    Returns:
        The function's response payload decoded from JSON.
    """
    s3_record = {"s3": {"bucket": {"name": S3_BUCKET}, "object": {"key": s3_key}}}

    response = lambda_client.invoke(
        FunctionName=LAMBDA_FUNCTION,
        InvocationType="RequestResponse",
        Payload=json.dumps({"Records": [s3_record]}),
    )

    return json.loads(response["Payload"].read())


# Invoke for each uploaded file
print("Invoking Lambda for uploaded files...\n")
failed_keys = []
for course_id in df["course_id"].unique():
    s3_key = f"raw-data/student_data_{course_id}.csv"
    print(f"Processing {s3_key}...")
    result = invoke_lambda_for_file(s3_key)

    # A handler that returns nothing produces JSON null (None), so check the
    # type before calling dict methods on the payload.
    if isinstance(result, dict) and result.get("statusCode") == 200:
        # The body may be a JSON string or an already-decoded object,
        # depending on how the handler builds its response; accept both.
        body = result.get("body") or {}
        if isinstance(body, str):
            body = json.loads(body)
        processed = body.get("students_processed", 0) if isinstance(body, dict) else 0
        print(f"  ✓ Success: {processed} students processed")
    else:
        failed_keys.append(s3_key)
        print(f"  ✗ Error: {result}")
    print()

# Only claim success when every file was processed.
if failed_keys:
    print(f"✗ Lambda processing finished with {len(failed_keys)} failed file(s): {failed_keys}")
else:
    print("✓ Lambda processing complete")

## Step 4: Query Results from DynamoDB

Retrieve student analytics from DynamoDB.

In [None]:
# Wait a moment for DynamoDB writes to complete
print("Waiting 5 seconds for processing...")
time.sleep(5)

# Scan the whole table. A single scan call returns one page only, so keep
# following LastEvaluatedKey until DynamoDB stops returning one.
items = []
scan_kwargs = {}
while True:
    response = table.scan(**scan_kwargs)
    items.extend(response.get("Items", []))
    if "LastEvaluatedKey" not in response:
        break
    scan_kwargs["ExclusiveStartKey"] = response["LastEvaluatedKey"]

print(f"✓ Retrieved {len(items)} student records from DynamoDB")

# Convert to DataFrame
students_df = pd.DataFrame(items)

# boto3's resource API returns DynamoDB Number attributes as decimal.Decimal,
# which pandas stores as object dtype. Convert every all-Decimal column to
# float so the statistics, correlations and plots below work on numeric data.
for column in students_df.columns:
    present = students_df[column].dropna()
    if not present.empty and present.map(lambda value: isinstance(value, Decimal)).all():
        students_df[column] = students_df[column].astype(float)

students_df.head()

In [None]:
# Headline numbers for the whole cohort.
grades = students_df["avg_grade"]
total_students = len(students_df)

print(
    "CLASS STATISTICS",
    "=" * 60,
    f"Total Students:      {total_students}",
    f"Average Grade:       {grades.mean():.2f} ± {grades.std():.2f}",
    f"Median Grade:        {grades.median():.2f}",
    f"Completion Rate:     {students_df['completion_rate'].mean():.2f}%",
    f"Engagement Score:    {students_df['engagement_score'].mean():.2f}",
    f"Mastery Level:       {students_df['mastery_level'].mean():.2f}%",
    sep="\n",
)

# How many students fall into each risk bucket, most severe first.
print("\nRisk Distribution:")
risk_counts = students_df["risk_level"].value_counts()
for level in ("high", "medium", "low", "none"):
    count = risk_counts.get(level, 0)
    # Guard against division by zero on an empty table.
    pct = count / total_students * 100 if total_students > 0 else 0
    print(f"  {level:10s}: {count:3d} ({pct:5.1f}%)")

## Step 5: Visualize Learning Analytics

Create dashboards and visualizations for educational insights.

In [None]:
# Four-panel dashboard: grade histogram, risk levels, engagement vs. grade,
# and mastery histogram.
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
(grade_ax, risk_ax), (engagement_ax, mastery_ax) = axes

# 1. Grade distribution histogram with mean / median reference lines
grades = students_df["avg_grade"]
grade_mean, grade_median = grades.mean(), grades.median()
grade_ax.hist(grades, bins=30, edgecolor="black", alpha=0.7)
for position, line_color, caption in (
    (grade_mean, "red", f"Mean: {grade_mean:.1f}"),
    (grade_median, "green", f"Median: {grade_median:.1f}"),
):
    grade_ax.axvline(position, color=line_color, linestyle="--", label=caption)
grade_ax.set(xlabel="Average Grade", ylabel="Number of Students", title="Grade Distribution")
grade_ax.legend()
grade_ax.grid(True, alpha=0.3)

# 2. Risk level distribution (fixed order; absent levels are shown as zero)
risk_order = ["none", "low", "medium", "high"]
risk_colors = {"none": "green", "low": "yellow", "medium": "orange", "high": "red"}
risk_data = students_df["risk_level"].value_counts().reindex(risk_order, fill_value=0)
bar_colors = [risk_colors.get(level, "gray") for level in risk_data.index]
risk_ax.bar(risk_data.index, risk_data.values, color=bar_colors)
risk_ax.set(xlabel="Risk Level", ylabel="Number of Students", title="Student Risk Levels")
risk_ax.grid(True, alpha=0.3)

# 3. Engagement vs Grade scatter, coloured by completion rate
scatter = engagement_ax.scatter(
    students_df["engagement_score"],
    grades,
    c=students_df["completion_rate"],
    cmap="viridis",
    alpha=0.6,
)
engagement_ax.set(
    xlabel="Engagement Score", ylabel="Average Grade", title="Engagement vs Performance"
)
engagement_ax.grid(True, alpha=0.3)
plt.colorbar(scatter, ax=engagement_ax, label="Completion Rate")

# 4. Mastery level distribution with its mean marked
mastery = students_df["mastery_level"]
mastery_mean = mastery.mean()
mastery_ax.hist(mastery, bins=20, edgecolor="black", alpha=0.7)
mastery_ax.axvline(mastery_mean, color="red", linestyle="--", label=f"Mean: {mastery_mean:.1f}%")
mastery_ax.set(
    xlabel="Mastery Level (%)",
    ylabel="Number of Students",
    title="Mastery Learning Distribution",
)
mastery_ax.legend()
mastery_ax.grid(True, alpha=0.3)

# Save before show(): the PNG is written from the still-open figure.
plt.tight_layout()
plt.savefig("learning_analytics_dashboard.png", dpi=300, bbox_inches="tight")
plt.show()

print("✓ Dashboard saved as 'learning_analytics_dashboard.png'")

In [None]:
# Pairwise correlation between the per-student learning metrics.
plt.figure(figsize=(10, 8))

# Metrics that take part in the correlation
numeric_cols = [
    "avg_grade",
    "completion_rate",
    "engagement_score",
    "mastery_level",
    "grade_trend",
    "total_time_minutes",
]
correlation_matrix = students_df[numeric_cols].corr()

# center=0 keeps the diverging colour map symmetric around "no correlation".
heatmap_options = {
    "annot": True,
    "fmt": ".2f",
    "cmap": "coolwarm",
    "center": 0,
    "square": True,
    "linewidths": 1,
}
sns.heatmap(correlation_matrix, **heatmap_options)
plt.title("Learning Metrics Correlation Matrix")
plt.tight_layout()
plt.savefig("correlation_heatmap.png", dpi=300, bbox_inches="tight")
plt.show()

print("✓ Correlation heatmap saved as 'correlation_heatmap.png'")

## Step 6: Identify At-Risk Students

Generate actionable reports for educators.

In [None]:
def _parse_risk_factors(factors):
    """Normalise one stored ``risk_factors`` value into a list of factor names.

    Accepts a JSON array string, a comma-separated string, or a native
    list/tuple/set (a DynamoDB list or set attribute arrives as a Python
    collection, not as a string). Anything else, such as a missing value,
    yields an empty list. Names are stripped of whitespace and de-duplicated
    so that each student contributes a factor at most once.
    """
    if isinstance(factors, str):
        text = factors.strip()
        if text.startswith("["):
            try:
                parsed = json.loads(text)
            except json.JSONDecodeError:
                # Not valid JSON after all; fall back to a plain comma split.
                parsed = text.split(",")
        else:
            parsed = text.split(",")
    elif isinstance(factors, (list, tuple, set, frozenset)):
        parsed = factors
    else:
        return []

    if not isinstance(parsed, (list, tuple, set, frozenset)):
        parsed = [parsed]
    names = (str(item).strip() for item in parsed)
    # dict.fromkeys de-duplicates while keeping first-seen order.
    return list(dict.fromkeys(name for name in names if name))


# High-risk students, weakest average grade first
high_risk = students_df[students_df["risk_level"] == "high"].sort_values("avg_grade")

print("HIGH-RISK STUDENTS REPORT")
print("=" * 80)
print(f"Total high-risk students: {len(high_risk)}\n")

if len(high_risk) > 0:
    display_cols = [
        "student_id",
        "course_id",
        "avg_grade",
        "completion_rate",
        "engagement_score",
        "risk_factors",
    ]
    print(high_risk[display_cols].head(10).to_string(index=False))

    # Common risk factors across all high-risk students
    all_risk_factors = []
    for factors in high_risk["risk_factors"]:
        all_risk_factors.extend(_parse_risk_factors(factors))

    if all_risk_factors:
        print("\nMost Common Risk Factors:")
        factor_counts = Counter(all_risk_factors)
        for factor, count in factor_counts.most_common(5):
            print(f"  {factor}: {count} students ({count / len(high_risk) * 100:.1f}%)")
else:
    print("No high-risk students identified!")

In [None]:
# Intervention recommendations
def recommend_intervention(row):
    """Build a "; "-separated list of suggested interventions for one student.

    Args:
        row: Mapping-like record (e.g. a DataFrame row) providing
            "avg_grade", "completion_rate", "engagement_score" and
            "grade_trend".

    Returns:
        The applicable recommendations joined with "; ", or
        "Continue current support" when no rule fires.
    """
    # The grade rules are mutually exclusive: only the most severe one applies.
    grade = row["avg_grade"]
    if grade < 60:
        advice = ["Urgent academic support needed"]
    elif grade < 70:
        advice = ["Schedule tutoring session"]
    else:
        advice = []

    # Independent "metric below limit" rules, evaluated in display order.
    threshold_rules = (
        ("completion_rate", 70, "Address assignment completion issues"),
        ("engagement_score", 50, "Increase student engagement"),
        ("grade_trend", -2, "Monitor declining performance"),
    )
    advice.extend(message for metric, limit, message in threshold_rules if row[metric] < limit)

    return "; ".join(advice) or "Continue current support"


# Apply to at-risk students
at_risk = students_df[students_df["risk_level"].isin(["high", "medium"])].copy()

# Build the column from a plain list rather than DataFrame.apply(axis=1):
# on an empty frame apply() returns an empty DataFrame instead of a Series,
# and assigning that to a single column raises. The list form also works
# when no student is at risk.
at_risk["recommendations"] = [recommend_intervention(row) for _, row in at_risk.iterrows()]

print("\nINTERVENTION RECOMMENDATIONS")
print("=" * 80)
print(f"Students needing intervention: {len(at_risk)}\n")

for _idx, row in at_risk.head(10).iterrows():
    # str() keeps the truncation working even if ids are stored as numbers.
    print(f"Student: {str(row['student_id'])[:12]}... | Course: {row['course_id']}")
    print(
        f"  Grade: {row['avg_grade']:.1f} | Completion: {row['completion_rate']:.1f}% | Engagement: {row['engagement_score']:.1f}"
    )
    print(f"  → {row['recommendations']}")
    print()

## Step 7: Query with Athena (Optional)

Run SQL queries on processed data in S3.

In [None]:
# Example Athena query
# Like the other AWS settings in this notebook, both values can be overridden
# through the environment (or .env); the defaults match the original setup.
ATHENA_DATABASE = os.getenv("ATHENA_DATABASE", "learning_analytics")
ATHENA_OUTPUT = os.getenv("ATHENA_OUTPUT_LOCATION", f"s3://{S3_BUCKET}/athena-results/")


def _collect_athena_results(query_id: str) -> pd.DataFrame:
    """Download every result page of a finished query into one DataFrame.

    The first row of the first page is used as the column header; every other
    row becomes data, with cells lacking a value read as "". All values are
    returned as strings, exactly as Athena reports them.
    """
    paginator = athena_client.get_paginator("get_query_results")

    headers = None
    data = []
    for page in paginator.paginate(QueryExecutionId=query_id):
        for row in page["ResultSet"]["Rows"]:
            values = [col.get("VarCharValue", "") for col in row["Data"]]
            if headers is None:
                headers = values
            else:
                data.append(values)

    if headers is None:
        return pd.DataFrame()
    # A header-only result still keeps its column names.
    return pd.DataFrame(data, columns=headers)


def run_athena_query(query: str, max_wait: int = 30):
    """Execute an Athena query and return its results as a DataFrame.

    Args:
        query: SQL text to run against ATHENA_DATABASE.
        max_wait: Maximum number of status polls, one second apart.

    Returns:
        DataFrame of string values, one column per result column. It is empty
        (without columns) when the query returned no rows at all.

    Raises:
        RuntimeError: The query ended in the FAILED or CANCELLED state.
        TimeoutError: The query did not finish within ``max_wait`` polls.
    """
    # Start query execution
    response = athena_client.start_query_execution(
        QueryString=query,
        QueryExecutionContext={"Database": ATHENA_DATABASE},
        ResultConfiguration={"OutputLocation": ATHENA_OUTPUT},
    )
    query_id = response["QueryExecutionId"]

    # Poll until the query reaches a terminal state or the budget is used up.
    for _ in range(max_wait):
        status = athena_client.get_query_execution(QueryExecutionId=query_id)["QueryExecution"][
            "Status"
        ]
        state = status["State"]

        if state == "SUCCEEDED":
            # get_query_results is paginated; read all pages so large results
            # are not silently truncated to the first page.
            return _collect_athena_results(query_id)

        if state in ("FAILED", "CANCELLED"):
            raise RuntimeError(
                f"Query {state}: {status.get('StateChangeReason', 'Unknown error')}"
            )

        time.sleep(1)

    raise TimeoutError("Query timed out")


# Example query: Average grade by course
query = """
SELECT
    course_id,
    COUNT(*) as student_count,
    AVG(avg_grade) as avg_grade,
    AVG(completion_rate) as avg_completion,
    AVG(engagement_score) as avg_engagement
FROM student_metrics
GROUP BY course_id
ORDER BY avg_grade DESC
"""

# Athena is optional in this walkthrough, so any failure is reported and the
# notebook carries on.
try:
    print("Running Athena query...")
    # Use a dedicated name: `results` holds the S3 upload summary that the
    # final summary cell still reads, so it must not be overwritten here.
    athena_results = run_athena_query(query)
    print("\nCOURSE-LEVEL STATISTICS (from Athena)")
    print("=" * 80)
    print(athena_results.to_string(index=False))
except Exception as e:
    print(f"Note: Athena query skipped or failed: {e}")
    print("This is optional - you may need to configure Athena first (see setup_guide.md)")

## Step 8: Export Reports

Save analysis results for sharing with educators.

In [None]:
# --- At-risk students report -------------------------------------------------
report_columns = [
    "student_id",
    "course_id",
    "avg_grade",
    "completion_rate",
    "engagement_score",
    "risk_level",
    "recommendations",
]
at_risk_report = at_risk[report_columns].copy()
at_risk_report.to_csv("at_risk_students_report.csv", index=False)
print("✓ At-risk students report saved: 'at_risk_students_report.csv'")

# --- Per-course statistics ---------------------------------------------------
# Student head-count plus central tendency / spread of the key metrics,
# rounded to two decimals for readability.
course_aggregations = {
    "student_id": "count",
    "avg_grade": ["mean", "median", "std"],
    "completion_rate": "mean",
    "engagement_score": "mean",
    "mastery_level": "mean",
}
class_stats = students_df.groupby("course_id").agg(course_aggregations).round(2)

# The index (course_id) is kept so each row stays identifiable in the CSV.
class_stats.to_csv("class_statistics.csv")
print("✓ Class statistics saved: 'class_statistics.csv'")

print("\n✓ Analysis complete! Reports ready for review.")

## Summary and Cost Estimate

Review what we accomplished and estimate AWS costs.

In [None]:
print("LEARNING ANALYTICS PIPELINE SUMMARY")
print("=" * 80)
print("\n1. Data Generation:")
print(f"   - Students: {NUM_STUDENTS}")
print(f"   - Courses: {NUM_COURSES}")
print(f"   - Activity records: {len(df):,}")

print("\n2. AWS Processing:")
print(f"   - Files uploaded to S3: {results['successful']}")
print(f"   - Lambda invocations: {NUM_COURSES}")
print(f"   - DynamoDB records: {len(students_df)}")

print("\n3. Analytics Results:")
print(f"   - Average grade: {students_df['avg_grade'].mean():.2f}")
print(f"   - Completion rate: {students_df['completion_rate'].mean():.2f}%")
print(f"   - High-risk students: {len(high_risk)}")
print(f"   - Students needing intervention: {len(at_risk)}")

print("\n4. Estimated AWS Costs:")
# Rough list prices; check the AWS pricing pages for your region.
S3_PRICE_PER_GB_MONTH = 0.023
LAMBDA_PRICE_PER_REQUEST = 0.0000002  # $0.20 per million requests
LAMBDA_PRICE_PER_GB_SECOND = 0.0000166667
LAMBDA_SECONDS_PER_INVOCATION = 20  # assumed run time per invocation
LAMBDA_MEMORY_GB = 128 / 1024  # assumes a 128 MB function - adjust to your configuration
DYNAMODB_PRICE_PER_WRITE = 0.00000125  # on-demand write request

data_size_mb = len(df) * 0.001  # Rough estimate
s3_cost = data_size_mb * S3_PRICE_PER_GB_MONTH / 1024
# Lambda bills a flat fee per request plus compute in GB-seconds; the request
# price must not be multiplied by the duration.
lambda_cost = NUM_COURSES * (
    LAMBDA_PRICE_PER_REQUEST
    + LAMBDA_SECONDS_PER_INVOCATION * LAMBDA_MEMORY_GB * LAMBDA_PRICE_PER_GB_SECOND
)
dynamodb_cost = len(students_df) * DYNAMODB_PRICE_PER_WRITE
total_cost = s3_cost + lambda_cost + dynamodb_cost

print(f"   - S3 storage: ${s3_cost:.4f}")
print(f"   - Lambda compute: ${lambda_cost:.4f}")
print(f"   - DynamoDB writes: ${dynamodb_cost:.4f}")
print(f"   - Total (for this run): ${total_cost:.4f}")
print("\n   Note: Actual costs may vary. Monitor AWS Cost Explorer.")

print("\n5. Next Steps:")
print("   - Review at-risk students report")
print("   - Implement recommended interventions")
print("   - Run cleanup script to delete AWS resources")
print("   - See cleanup_guide.md for instructions")

print("\n✓ Learning analytics pipeline complete!")