# 📊 Lightweight Data Analysis - Diabetes Prediction
### Alternatif Ringan untuk Hive menggunakan PySpark dan Jupyter

Notebook ini memberikan alternatif yang lebih ringan dibandingkan Hive untuk:
- Query data di HDFS
- Analisis data interaktif
- Visualisasi sederhana
- Export metrics untuk Grafana

In [None]:
# Setup PySpark dengan konfigurasi ringan
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import json
from datetime import datetime

# Konfigurasi Spark yang ringan
spark = SparkSession.builder \
    .appName("DiabetesAnalysisNotebook") \
    .config("spark.executor.memory", "512m") \
    .config("spark.driver.memory", "256m") \
    .config("spark.executor.cores", "1") \
    .config("spark.sql.adaptive.enabled", "true") \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")
print("✅ Spark Session initialized with lightweight configuration")

In [None]:
# Load data dari HDFS (Bronze layer)
try:
    df = spark.read.option("header", "true").option("inferSchema", "true") \
        .csv("hdfs://namenode:9000/data/bronze/diabetes.csv")
    
    print(f"✅ Data loaded successfully: {df.count()} records")
    
    # Show schema
    print("\n📋 Data Schema:")
    df.printSchema()
    
    # Show sample data
    print("\n📊 Sample Data:")
    df.show(5)
    
except Exception as e:
    print(f"❌ Error loading data: {e}")
    # Fallback to local data
    print("🔄 Trying to load from local path...")
    df = spark.read.option("header", "true").option("inferSchema", "true") \
        .csv("/opt/data/diabetes.csv")

In [None]:
# Buat temporary view untuk SQL queries (Alternatif Hive Tables)
df.createOrReplaceTempView("diabetes")
print("✅ Temporary view 'diabetes' created")

# Basic statistics menggunakan SQL
basic_stats = spark.sql("""
    SELECT 
        COUNT(*) as total_records,
        SUM(CASE WHEN Outcome = 1 THEN 1 ELSE 0 END) as diabetic_count,
        SUM(CASE WHEN Outcome = 0 THEN 1 ELSE 0 END) as non_diabetic_count,
        ROUND(AVG(Glucose), 2) as avg_glucose,
        ROUND(AVG(BMI), 2) as avg_bmi,
        ROUND(AVG(Age), 2) as avg_age,
        ROUND(AVG(BloodPressure), 2) as avg_blood_pressure
    FROM diabetes
""")

print("\n📈 Basic Statistics:")
basic_stats.show()

In [None]:
# Query analisis usia (seperti Hive table queries)
age_analysis = spark.sql("""
    SELECT 
        CASE 
            WHEN Age < 30 THEN '20-29'
            WHEN Age < 40 THEN '30-39'
            WHEN Age < 50 THEN '40-49'
            WHEN Age < 60 THEN '50-59'
            ELSE '60+'
        END as age_group,
        COUNT(*) as total_count,
        SUM(CASE WHEN Outcome = 1 THEN 1 ELSE 0 END) as diabetic_count,
        ROUND(AVG(Glucose), 2) as avg_glucose,
        ROUND(AVG(BMI), 2) as avg_bmi
    FROM diabetes
    GROUP BY age_group
    ORDER BY age_group
""")

print("👥 Age Group Analysis:")
age_analysis.show()

In [None]:
# Visualisasi sederhana (tanpa beban berat)
# Convert ke Pandas untuk plotting ringan
age_data = age_analysis.toPandas()

# Plot distribusi usia
plt.figure(figsize=(12, 5))

plt.subplot(1, 2, 1)
plt.bar(age_data['age_group'], age_data['total_count'], color='lightblue', alpha=0.7)
plt.title('Distribution by Age Group')
plt.xlabel('Age Group')
plt.ylabel('Count')
plt.xticks(rotation=45)

plt.subplot(1, 2, 2)
plt.bar(age_data['age_group'], age_data['diabetic_count'], color='orange', alpha=0.7)
plt.title('Diabetic Cases by Age Group')
plt.xlabel('Age Group')
plt.ylabel('Diabetic Count')
plt.xticks(rotation=45)

plt.tight_layout()
plt.show()

print("📊 Visualizations created successfully")

In [None]:
# Query lanjutan untuk korelasi (seperti advanced Hive queries)
correlation_analysis = spark.sql("""
    SELECT 
        CASE 
            WHEN Glucose < 100 THEN 'Normal (<100)'
            WHEN Glucose < 140 THEN 'Pre-diabetic (100-139)'
            ELSE 'Diabetic (140+)'
        END as glucose_category,
        CASE 
            WHEN BMI < 18.5 THEN 'Underweight'
            WHEN BMI < 25 THEN 'Normal'
            WHEN BMI < 30 THEN 'Overweight'
            ELSE 'Obese'
        END as bmi_category,
        COUNT(*) as count,
        SUM(CASE WHEN Outcome = 1 THEN 1 ELSE 0 END) as diabetic_cases,
        ROUND((SUM(CASE WHEN Outcome = 1 THEN 1 ELSE 0 END) * 100.0 / COUNT(*)), 2) as diabetes_rate
    FROM diabetes
    WHERE Glucose > 0 AND BMI > 0
    GROUP BY glucose_category, bmi_category
    ORDER BY diabetes_rate DESC
""")

print("🔍 Glucose-BMI Correlation Analysis:")
correlation_analysis.show(20)

In [None]:
# Export metrics untuk Grafana (seperti Hive export)
def export_metrics_for_grafana():
    # Collect data untuk export
    basic_stats_dict = basic_stats.collect()[0].asDict()
    age_distribution = [row.asDict() for row in age_analysis.collect()]
    correlation_data = [row.asDict() for row in correlation_analysis.collect()]
    
    # Prepare metrics JSON
    metrics = {
        "basic_stats": basic_stats_dict,
        "age_distribution": age_distribution,
        "correlation_analysis": correlation_data,
        "daily_metrics": {
            "records_processed": basic_stats_dict["total_records"],
            "avg_glucose": basic_stats_dict["avg_glucose"],
            "avg_bmi": basic_stats_dict["avg_bmi"],
            "positive_predictions": basic_stats_dict["diabetic_count"],
            "timestamp": datetime.now().isoformat(),
            "date": datetime.now().strftime('%Y-%m-%d')
        },
        "timestamp": datetime.now().isoformat()
    }
    
    # Save to JSON file
    output_path = "/tmp/diabetes_metrics.json"
    with open(output_path, 'w') as f:
        json.dump(metrics, f, indent=2, default=str)
    
    print(f"✅ Metrics exported to {output_path}")
    print("📊 Summary:")
    print(f"  - Total Records: {basic_stats_dict['total_records']}")
    print(f"  - Diabetic Cases: {basic_stats_dict['diabetic_count']}")
    print(f"  - Average Glucose: {basic_stats_dict['avg_glucose']}")
    print(f"  - Average BMI: {basic_stats_dict['avg_bmi']}")
    
    return metrics

# Export metrics
exported_metrics = export_metrics_for_grafana()

In [None]:
# Custom queries (seperti ad-hoc Hive queries)
print("🔧 Custom Query Examples:")
print("\n1. High Risk Patients (Glucose > 140 AND BMI > 30):")

high_risk = spark.sql("""
    SELECT 
        COUNT(*) as high_risk_count,
        SUM(CASE WHEN Outcome = 1 THEN 1 ELSE 0 END) as confirmed_diabetic,
        ROUND(AVG(Age), 2) as avg_age,
        ROUND(AVG(Pregnancies), 2) as avg_pregnancies
    FROM diabetes
    WHERE Glucose > 140 AND BMI > 30
""")

high_risk.show()

print("\n2. Pregnancy Impact Analysis:")
pregnancy_impact = spark.sql("""
    SELECT 
        CASE 
            WHEN Pregnancies = 0 THEN 'No Pregnancy'
            WHEN Pregnancies <= 2 THEN '1-2 Pregnancies'
            WHEN Pregnancies <= 5 THEN '3-5 Pregnancies'
            ELSE '6+ Pregnancies'
        END as pregnancy_group,
        COUNT(*) as total,
        SUM(CASE WHEN Outcome = 1 THEN 1 ELSE 0 END) as diabetic,
        ROUND((SUM(CASE WHEN Outcome = 1 THEN 1 ELSE 0 END) * 100.0 / COUNT(*)), 2) as diabetes_rate
    FROM diabetes
    GROUP BY pregnancy_group
    ORDER BY diabetes_rate DESC
""")

pregnancy_impact.show()

In [None]:
# Clean up
print("🧹 Cleaning up resources...")
spark.stop()
print("✅ Spark session stopped")
print("\n📋 Summary:")
print("  ✅ Data loaded and analyzed successfully")
print("  ✅ Metrics exported for Grafana")
print("  ✅ Alternative to Hive implemented using PySpark")
print("\n🎯 Next Steps:")
print("  1. Run the export script: ./scripts/query/export_for_grafana.sh")
print("  2. Access Grafana: http://localhost:3000")
print("  3. Login with: admin / grafana_admin_2025")