# Case Study: Kafka Streaming of Sensor Data

## Overview
This notebook demonstrates streaming sensor data with Apache Kafka. We will build:
1. **Producer**: Simulates sensor data (temperature, humidity, pressure)
2. **Consumer**: Reads and processes the sensor data
3. **Stream Processing**: Analyzes the sensor data in real time

## Prerequisites
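- Apache Kafka installed and running (reachable at the address set in Section 2, `localhost:9092` by default)
- Python 3.7+
- Libraries: kafka-python-ng (a fork of kafka-python that installs the same `kafka` module), pandas, matplotlib, numpy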

## 1. Setup and Library Imports

In [None]:
# Install required packages
!pip install kafka-python-ng pandas matplotlib numpy

In [None]:
import json
import time
import random
from datetime import datetime
from kafka import KafkaProducer, KafkaConsumer
from kafka.errors import KafkaError
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
from collections import deque
import threading

print("✓ All libraries imported successfully")

## 2. Kafka Configuration

In [None]:
# Kafka Configuration
KAFKA_BOOTSTRAP_SERVERS = ['localhost:9092']
TOPIC_NAME = 'sensor-data'

# Sensor Configuration
SENSOR_IDS = ['SENSOR_001', 'SENSOR_002', 'SENSOR_003']
SENSOR_LOCATIONS = ['Ruang Server', 'Ruang Produksi', 'Gudang']

print(f"Kafka Bootstrap Servers: {KAFKA_BOOTSTRAP_SERVERS}")
print(f"Topic: {TOPIC_NAME}")
print(f"Sensors: {SENSOR_IDS}")

## 3. Helper Function for Generating Sensor Data

In [None]:
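def generate_sensor_data(sensor_id, location):
    """
    Generate one realistic-looking sensor reading.
    - Temperature: location base value ±3°C (roughly 19-31°C overall)
    - Humidity: 40-80%
    - Pressure: 1000-1020 hPa
    - Status: random, 'warning' about 25% of the time (independent of the values)
    """
    # Base temperature per location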
    base_temps = {
        'Ruang Server': 22.0,
        'Ruang Produksi': 28.0,
        'Gudang': 25.0
    }
    
    base_temp = base_temps.get(location, 25.0)
    
    sensor_reading = {
        'sensor_id': sensor_id,
        'location': location,
        'timestamp': datetime.now().isoformat(),
        'temperature': round(base_temp + random.uniform(-3, 3), 2),
        'humidity': round(random.uniform(40, 80), 2),
        'pressure': round(random.uniform(1000, 1020), 2),
        'status': random.choice(['normal', 'normal', 'normal', 'warning'])  # 75% normal
    }
    
    return sensor_reading

# Test function
sample_data = generate_sensor_data('SENSOR_001', 'Ruang Server')
print("Sample sensor data:")
print(json.dumps(sample_data, indent=2))
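
Because `generate_sensor_data()` draws from the global `random` module, every run produces different values. For repeatable demos or tests, one option is to pass in a seeded `random.Random` instance. A minimal sketch; `generate_sensor_data_seeded()` and its `rng` parameter are hypothetical names, and the base temperatures are copied from the function above:

```python
import random
from datetime import datetime

# Same base temperatures as generate_sensor_data()
BASE_TEMPS = {'Ruang Server': 22.0, 'Ruang Produksi': 28.0, 'Gudang': 25.0}

def generate_sensor_data_seeded(sensor_id, location, rng):
    """Hypothetical variant that draws from its own RNG, so a fixed seed repeats the values."""
    base_temp = BASE_TEMPS.get(location, 25.0)
    return {
        'sensor_id': sensor_id,
        'location': location,
        'timestamp': datetime.now().isoformat(),  # timestamps still differ between runs
        'temperature': round(base_temp + rng.uniform(-3, 3), 2),
        'humidity': round(rng.uniform(40, 80), 2),
        'pressure': round(rng.uniform(1000, 1020), 2),
        'status': rng.choice(['normal', 'normal', 'normal', 'warning']),
    }

a = generate_sensor_data_seeded('SENSOR_001', 'Ruang Server', random.Random(42))
b = generate_sensor_data_seeded('SENSOR_001', 'Ruang Server', random.Random(42))
print(a['temperature'] == b['temperature'])  # True
```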

## 4. Kafka Producer: Simulating Sensor Data

In [None]:
class SensorProducer:
    def __init__(self, bootstrap_servers, topic_name):
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            key_serializer=lambda k: k.encode('utf-8') if k else None
        )
        self.topic_name = topic_name
        self.running = False
        
    def send_sensor_data(self, sensor_id, location):
        """Send single sensor reading to Kafka"""
        data = generate_sensor_data(sensor_id, location)
        
        try:
            future = self.producer.send(
                self.topic_name,
                key=sensor_id,
                value=data
            )
            # Block until the broker acknowledges the send (raises KafkaError on failure)
            future.get(timeout=10)
            return True, data
        except KafkaError as e:
            print(f"Error sending data: {e}")
            return False, None
    
    def start_streaming(self, duration_seconds=30, interval=2):
        """Stream sensor data for specified duration"""
        self.running = True
        start_time = time.time()
        messages_sent = 0
        
        print(f"🚀 Starting sensor data streaming for {duration_seconds} seconds...")
        print(f"Sending data every {interval} seconds\n")
        
        try:
            while self.running and (time.time() - start_time) < duration_seconds:
                # Send data from all sensors
                for sensor_id, location in zip(SENSOR_IDS, SENSOR_LOCATIONS):
                    success, data = self.send_sensor_data(sensor_id, location)
                    if success:
                        messages_sent += 1
                        print(f"✓ [{data['timestamp']}] {sensor_id} ({location}): "
                              f"Temp={data['temperature']}°C, "
                              f"Humidity={data['humidity']}%, "
                              f"Status={data['status']}")
                
                time.sleep(interval)
                
        except KeyboardInterrupt:
            print("\n⚠️ Streaming interrupted by user")
        finally:
            self.producer.flush()
            print(f"\n✅ Streaming completed. Total messages sent: {messages_sent}")
    
    def stop(self):
        """Stop streaming"""
        self.running = False
        self.producer.close()

print("✓ SensorProducer class defined")
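
`send_sensor_data()` passes `key=sensor_id`. Kafka hashes a record's key to choose its partition, so every reading from one sensor lands in the same partition and keeps its send order. The sketch below only illustrates that idea: Kafka's default partitioner (in both the Java client and kafka-python) uses a murmur2 hash, whereas `zlib.crc32` is used here purely as a stand-in, so the partition numbers will not match a real cluster. `pick_partition()` is a hypothetical helper.

```python
import zlib
from collections import defaultdict

def pick_partition(key: str, num_partitions: int) -> int:
    """Illustrative key -> partition mapping (crc32 stand-in, NOT Kafka's murmur2).

    Unlike Python's built-in hash(), crc32 gives the same result in every process.
    """
    return zlib.crc32(key.encode('utf-8')) % num_partitions

readings = [('SENSOR_001', 1), ('SENSOR_002', 1), ('SENSOR_001', 2),
            ('SENSOR_003', 1), ('SENSOR_001', 3), ('SENSOR_002', 2)]

# Append each reading to the "log" of its partition, as a broker would
partitions = defaultdict(list)
for sensor_id, seq in readings:
    partitions[pick_partition(sensor_id, 3)].append((sensor_id, seq))

# Inside its partition, SENSOR_001's readings keep their send order
p = pick_partition('SENSOR_001', 3)
print([seq for sid, seq in partitions[p] if sid == 'SENSOR_001'])  # [1, 2, 3]
```

Ordering is guaranteed only within a partition; across different sensors there is no global order.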

## 5. Testing the Producer: Sending Sensor Data

In [None]:
# Initialize producer
producer = SensorProducer(KAFKA_BOOTSTRAP_SERVERS, TOPIC_NAME)

# Stream data for 30 seconds (interval 2 seconds)
producer.start_streaming(duration_seconds=30, interval=2)

# Clean up
producer.stop()
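
How many messages does this cell produce? Ignoring the time spent sending, the loop starts a round at t = 0, 2, ..., 28 s, so it runs ceil(30 / 2) = 15 rounds of 3 sensors: at most 45 messages. This matters when reading them back: a consumer that waits for more messages than exist, with no timeout, blocks indefinitely. A quick check, where `max_messages_sent()` is a hypothetical helper:

```python
import math

def max_messages_sent(duration_seconds, interval, n_sensors):
    """Upper bound on messages from start_streaming(), ignoring send latency."""
    # Rounds start at 0, interval, 2*interval, ... while elapsed < duration_seconds
    rounds = math.ceil(duration_seconds / interval)
    return rounds * n_sensors

print(max_messages_sent(30, 2, 3))  # 45
```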

## 6. Kafka Consumer: Reading Sensor Data

In [None]:
class SensorConsumer:
    def __init__(self, bootstrap_servers, topic_name, group_id='sensor-consumer-group',
                 consumer_timeout_ms=10000):
        self.consumer = KafkaConsumer(
            topic_name,
            bootstrap_servers=bootstrap_servers,
            group_id=group_id,
            auto_offset_reset='earliest',  # Start from the beginning if the group has no committed offset
            enable_auto_commit=True,
            # Stop iterating after this many ms without new messages,
            # instead of blocking forever when fewer than max_messages exist
            consumer_timeout_ms=consumer_timeout_ms,
            value_deserializer=lambda m: json.loads(m.decode('utf-8'))
        )
        self.data_buffer = []
        self.running = False
        
    def consume_messages(self, max_messages=50):
        """Consume up to max_messages messages from Kafka"""
        self.running = True
        messages_consumed = 0
        
        print(f"📥 Starting to consume messages (max: {max_messages})...\n")
        
        try:
            for message in self.consumer:
                data = message.value
                self.data_buffer.append(data)
                messages_consumed += 1
                
                # Display message
                status_icon = "⚠️" if data['status'] == 'warning' else "✓"
                print(f"{status_icon} Message {messages_consumed}: "
                      f"{data['sensor_id']} - Temp: {data['temperature']}°C, "
                      f"Humidity: {data['humidity']}%, "
                      f"Status: {data['status']}")
                
                # Check the limit after processing, so no extra message is fetched
                # (and committed) without being handled
                if not self.running or messages_consumed >= max_messages:
                    break
                
        except KeyboardInterrupt:
            print("\n⚠️ Consumption interrupted by user")
        finally:
            print(f"\n✅ Consumed {messages_consumed} messages")
        # Return outside `finally` so real errors are not silently swallowed
        return self.data_buffer
    
    def get_dataframe(self):
        """Convert buffer to pandas DataFrame"""
        if not self.data_buffer:
            return pd.DataFrame()
        return pd.DataFrame(self.data_buffer)
    
    def stop(self):
        """Stop consumer"""
        self.running = False
        self.consumer.close()

print("✓ SensorConsumer class defined")
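
`auto_offset_reset='earliest'` only applies when the consumer group has no committed offset yet. Because `enable_auto_commit=True` commits progress for `sensor-consumer-group`, rerunning Section 7 continues where the previous run stopped instead of re-reading the whole topic. The simplified model below captures that rule for one partition; it is plain Python, not a kafka-python API, and `starting_offset()` is a hypothetical name:

```python
from typing import Optional

def starting_offset(committed: Optional[int], auto_offset_reset: str,
                    log_start: int, log_end: int) -> int:
    """Simplified model of where a group consumer starts reading one partition."""
    if committed is not None and log_start <= committed <= log_end:
        return committed              # resume from the group's committed position
    if auto_offset_reset == 'earliest':
        return log_start              # oldest record still retained
    if auto_offset_reset == 'latest':
        return log_end                # only records produced from now on
    raise ValueError("no valid committed offset and no reset policy")

# First run of a new group: nothing committed, so 'earliest' starts at the beginning
print(starting_offset(None, 'earliest', 0, 45))   # 0
# Second run: 45 records already committed, so nothing old is re-read
print(starting_offset(45, 'earliest', 0, 45))     # 45
```

To re-read everything, use a new `group_id` or reset the group's offsets with Kafka's consumer-group tooling.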

## 7. Testing the Consumer: Reading Sensor Data

In [None]:
# Initialize consumer
consumer = SensorConsumer(KAFKA_BOOTSTRAP_SERVERS, TOPIC_NAME)

# Consume messages
messages = consumer.consume_messages(max_messages=50)

# Convert to DataFrame
df = consumer.get_dataframe()

# Display summary
print("\n" + "="*60)
print("DATA SUMMARY")
print("="*60)
print(f"Total records: {len(df)}")
print(f"\nFirst 5 records:")
print(df.head())

# Clean up
consumer.stop()

## 8. Sensor Data Analysis

In [None]:
# Basic statistics
if not df.empty:
    print("📊 SENSOR DATA STATISTICS")
    print("="*60)
    
    # Group by sensor
    sensor_stats = df.groupby('sensor_id').agg({
        'temperature': ['mean', 'min', 'max', 'std'],
        'humidity': ['mean', 'min', 'max'],
        'pressure': ['mean', 'min', 'max']
    }).round(2)
    
    print(sensor_stats)
    
    # Status distribution
    print("\n📈 STATUS DISTRIBUTION")
    print("="*60)
    status_counts = df['status'].value_counts()
    print(status_counts)
    
    # Warnings by sensor
    print("\n⚠️ WARNING COUNT PER SENSOR")
    print("="*60)
    warnings = df[df['status'] == 'warning'].groupby('sensor_id').size()
    print(warnings)
else:
    print("⚠️ No data available for analysis")

## 9. Sensor Data Visualization

In [None]:
if not df.empty:
    # Convert timestamp to datetime
    df['timestamp'] = pd.to_datetime(df['timestamp'])
    df = df.sort_values('timestamp')
    
    # Create subplots
    # Emoji are left out of plot titles: matplotlib's default font (DejaVu Sans)
    # has no emoji glyphs and would render empty boxes with warnings
    fig, axes = plt.subplots(3, 1, figsize=(14, 12))
    fig.suptitle('Real-time Sensor Data Monitoring', fontsize=16, fontweight='bold')
    
    # Plot 1: Temperature
    for sensor_id in SENSOR_IDS:
        sensor_data = df[df['sensor_id'] == sensor_id]
        axes[0].plot(sensor_data['timestamp'], sensor_data['temperature'], 
                    marker='o', label=sensor_id, linewidth=2, markersize=4)
    # Draw the threshold line before legend() so its label appears in the legend
    axes[0].axhline(y=30, color='r', linestyle='--', alpha=0.5, label='Warning Threshold')
    axes[0].set_title('Temperature Over Time', fontsize=12, fontweight='bold')
    axes[0].set_ylabel('Temperature (°C)')
    axes[0].legend()
    axes[0].grid(True, alpha=0.3)
    
    # Plot 2: Humidity
    for sensor_id in SENSOR_IDS:
        sensor_data = df[df['sensor_id'] == sensor_id]
        axes[1].plot(sensor_data['timestamp'], sensor_data['humidity'], 
                    marker='s', label=sensor_id, linewidth=2, markersize=4)
    axes[1].set_title('Humidity Over Time', fontsize=12, fontweight='bold')
    axes[1].set_ylabel('Humidity (%)')
    axes[1].legend()
    axes[1].grid(True, alpha=0.3)
    
    # Plot 3: Pressure
    for sensor_id in SENSOR_IDS:
        sensor_data = df[df['sensor_id'] == sensor_id]
        axes[2].plot(sensor_data['timestamp'], sensor_data['pressure'], 
                    marker='^', label=sensor_id, linewidth=2, markersize=4)
    axes[2].set_title('Pressure Over Time', fontsize=12, fontweight='bold')
    axes[2].set_xlabel('Timestamp')
    axes[2].set_ylabel('Pressure (hPa)')
    axes[2].legend()
    axes[2].grid(True, alpha=0.3)
    
    # Rotate x-axis labels
    for ax in axes:
        ax.tick_params(axis='x', rotation=45)
    
    plt.tight_layout()
    plt.show()
    
    # Additional visualization: Box plots
    fig, axes = plt.subplots(1, 3, figsize=(15, 5))
    
    # Temperature box plot
    df.boxplot(column='temperature', by='location', ax=axes[0])
    axes[0].set_title('Temperature Distribution')
    axes[0].set_ylabel('Temperature (°C)')
    axes[0].set_xlabel('Location')
    
    # Humidity box plot
    df.boxplot(column='humidity', by='location', ax=axes[1])
    axes[1].set_title('Humidity Distribution')
    axes[1].set_ylabel('Humidity (%)')
    axes[1].set_xlabel('Location')
    
    # Pressure box plot
    df.boxplot(column='pressure', by='location', ax=axes[2])
    axes[2].set_title('Pressure Distribution')
    axes[2].set_ylabel('Pressure (hPa)')
    axes[2].set_xlabel('Location')
    
    # Set the figure title after the box plots: boxplot(by=...) replaces it
    # with an automatic "Boxplot grouped by location" title
    fig.suptitle('Sensor Data Distribution by Location', fontsize=16, fontweight='bold')
    
    plt.tight_layout()
    plt.show()
else:
    print("⚠️ No data available for visualization")

## 10. Stream Processing with Windowing

In [None]:
class StreamProcessor:
    def __init__(self, window_size=10):
        self.window_size = window_size
        self.windows = {
            sensor_id: {
                'temperature': deque(maxlen=window_size),
                'humidity': deque(maxlen=window_size),
                'pressure': deque(maxlen=window_size)
            }
            for sensor_id in SENSOR_IDS
        }
        self.alerts = []
    
    def process_message(self, data):
        """Process incoming sensor message"""
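        sensor_id = data['sensor_id']
        # Create a window on the fly for sensors not listed in SENSOR_IDS
        # (e.g. old messages from a previous configuration) instead of raising KeyError
        if sensor_id not in self.windows:
            self.windows[sensor_id] = {
                'temperature': deque(maxlen=self.window_size),
                'humidity': deque(maxlen=self.window_size),
                'pressure': deque(maxlen=self.window_size)
            }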
        
        # Add to window
        self.windows[sensor_id]['temperature'].append(data['temperature'])
        self.windows[sensor_id]['humidity'].append(data['humidity'])
        self.windows[sensor_id]['pressure'].append(data['pressure'])
        
        # Calculate moving averages
        avg_temp = np.mean(self.windows[sensor_id]['temperature'])
        avg_humidity = np.mean(self.windows[sensor_id]['humidity'])
        avg_pressure = np.mean(self.windows[sensor_id]['pressure'])
        
        # Check for anomalies
        alerts = []
        if avg_temp > 30:
            alerts.append(f"🔥 High temperature alert: {avg_temp:.2f}°C")
        if avg_humidity > 75:
            alerts.append(f"💧 High humidity alert: {avg_humidity:.2f}%")
        if avg_temp < 20:
            alerts.append(f"❄️ Low temperature alert: {avg_temp:.2f}°C")
        
        result = {
            'sensor_id': sensor_id,
            'location': data['location'],
            'timestamp': data['timestamp'],
            'current': {
                'temperature': data['temperature'],
                'humidity': data['humidity'],
                'pressure': data['pressure']
            },
            'moving_avg': {
                'temperature': round(avg_temp, 2),
                'humidity': round(avg_humidity, 2),
                'pressure': round(avg_pressure, 2)
            },
            'window_size': len(self.windows[sensor_id]['temperature']),
            'alerts': alerts
        }
        
        if alerts:
            self.alerts.append(result)
        
        return result
    
    def get_statistics(self, sensor_id):
        """Get statistics for a sensor"""
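        # No window yet, or an empty one: np.min/np.max would raise on empty data
        if sensor_id not in self.windows or not self.windows[sensor_id]['temperature']:
            return None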
        
        window = self.windows[sensor_id]
        return {
            'sensor_id': sensor_id,
            'temperature': {
                'mean': np.mean(window['temperature']),
                'std': np.std(window['temperature']),
                'min': np.min(window['temperature']),
                'max': np.max(window['temperature'])
            },
            'humidity': {
                'mean': np.mean(window['humidity']),
                'std': np.std(window['humidity'])
            },
            'pressure': {
                'mean': np.mean(window['pressure']),
                'std': np.std(window['pressure'])
            }
        }

print("✓ StreamProcessor class defined")
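
`process_message()` recomputes `np.mean` over the whole deque on every message, which costs O(window_size) per update. For larger windows, a running sum gives the same count-based moving average in O(1) per update. A minimal stdlib sketch; `SlidingMean` is a hypothetical class, not part of the notebook:

```python
from collections import deque

class SlidingMean:
    """Count-based moving average over the last `size` values, O(1) per update."""

    def __init__(self, size):
        self.values = deque()
        self.size = size
        self.total = 0.0

    def add(self, value):
        self.values.append(value)
        self.total += value
        if len(self.values) > self.size:
            self.total -= self.values.popleft()  # evict the oldest value
        return self.total / len(self.values)

window = SlidingMean(size=3)
print([round(window.add(v), 2) for v in [22.0, 24.0, 26.0, 28.0]])  # [22.0, 23.0, 24.0, 26.0]
```

Floating-point drift in the running sum is negligible for demo-length runs; for very long streams you could occasionally recompute `total` from the deque.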

## 11. Test Stream Processing

In [None]:
# Initialize stream processor
processor = StreamProcessor(window_size=5)

print("🔄 Processing sensor data with windowing...\n")

# Process existing data
if not df.empty:
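    # Use enumerate() as the counter: df's index may no longer be sequential
    # because Section 9 re-sorts the DataFrame by timestamp
    for msg_no, (_, row) in enumerate(df.iterrows(), start=1):
        data = row.to_dict()
        result = processor.process_message(data)
        
        # Print results for every 5th message
        if msg_no % 5 == 0:
            print(f"\n{'='*70}")
            print(f"Message {msg_no}: {result['sensor_id']} - {result['location']}")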
            print(f"{'='*70}")
            print(f"Current:     Temp={result['current']['temperature']}°C, "
                  f"Humidity={result['current']['humidity']}%, "
                  f"Pressure={result['current']['pressure']} hPa")
            print(f"Moving Avg:  Temp={result['moving_avg']['temperature']}°C, "
                  f"Humidity={result['moving_avg']['humidity']}%, "
                  f"Pressure={result['moving_avg']['pressure']} hPa")
            print(f"Window Size: {result['window_size']}")
            
            if result['alerts']:
                print("\n⚠️ ALERTS:")
                for alert in result['alerts']:
                    print(f"  - {alert}")
    
    # Show all alerts
    if processor.alerts:
        print(f"\n\n{'='*70}")
        print(f"📋 TOTAL ALERTS: {len(processor.alerts)}")
        print(f"{'='*70}")
        for alert_data in processor.alerts[-5:]:  # Show last 5 alerts
            print(f"\n{alert_data['sensor_id']} - {alert_data['location']}:")
            for alert in alert_data['alerts']:
                print(f"  {alert}")
    
    # Show statistics for each sensor
    print(f"\n\n{'='*70}")
    print("📊 SENSOR STATISTICS (Window-based)")
    print(f"{'='*70}")
    for sensor_id in SENSOR_IDS:
        stats = processor.get_statistics(sensor_id)
        if stats:
            print(f"\n{sensor_id}:")
            print(f"  Temperature: {stats['temperature']['mean']:.2f}°C "
                  f"(±{stats['temperature']['std']:.2f}, "
                  f"range: {stats['temperature']['min']:.2f}-{stats['temperature']['max']:.2f})")
            print(f"  Humidity:    {stats['humidity']['mean']:.2f}% "
                  f"(±{stats['humidity']['std']:.2f})")
            print(f"  Pressure:    {stats['pressure']['mean']:.2f} hPa "
                  f"(±{stats['pressure']['std']:.2f})")
else:
    print("⚠️ No data available for processing")
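
The processor above uses count-based sliding windows (the last N readings per sensor). Stream processors also commonly use time-based tumbling windows: fixed, non-overlapping intervals (for example 10 seconds), each aggregated once. A minimal batch sketch over ISO-8601 timestamps like those from `generate_sensor_data()`; the function name and the 10-second width are assumptions:

```python
from collections import defaultdict
from datetime import datetime

def tumbling_window_means(readings, width_seconds=10, field='temperature'):
    """Mean of `field` per (sensor_id, window_start) over fixed, non-overlapping windows."""
    acc = defaultdict(lambda: [0.0, 0])  # (sensor_id, window_start) -> [sum, count]
    for r in readings:
        # Naive ISO timestamps (no time zone) are interpreted in local time
        ts = datetime.fromisoformat(r['timestamp']).timestamp()
        window_start = int(ts // width_seconds) * width_seconds
        bucket = acc[(r['sensor_id'], window_start)]
        bucket[0] += r[field]
        bucket[1] += 1
    return {key: total / count for key, (total, count) in acc.items()}

readings = [
    {'sensor_id': 'SENSOR_001', 'timestamp': '2024-01-01T10:00:01', 'temperature': 22.0},
    {'sensor_id': 'SENSOR_001', 'timestamp': '2024-01-01T10:00:07', 'temperature': 24.0},
    {'sensor_id': 'SENSOR_001', 'timestamp': '2024-01-01T10:00:12', 'temperature': 30.0},
]
print(sorted(tumbling_window_means(readings).values()))  # [23.0, 30.0]
```

In a live stream, a window is typically emitted once a reading past its end arrives; this batch version only shows the bucketing rule.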

## 12. End-to-End Demo: Producer + Consumer + Processing

In [None]:
def run_full_demo(duration=20, interval=2):
    """
    Run the complete demo:
    1. The producer sends data
    2. The consumer reads the data
    3. The stream processor analyzes it in real time
    """
    print("🚀 Starting Full Kafka Stream Demo...\n")
    print(f"Duration: {duration} seconds")
    print(f"Interval: {interval} seconds")
    print("="*70 + "\n")
    
    # Initialize components
    producer = SensorProducer(KAFKA_BOOTSTRAP_SERVERS, TOPIC_NAME)
    consumer = KafkaConsumer(
        TOPIC_NAME,
        bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
        auto_offset_reset='latest',
        enable_auto_commit=True,
        value_deserializer=lambda m: json.loads(m.decode('utf-8')),
        consumer_timeout_ms=1000
    )
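    processor = StreamProcessor(window_size=3)
    
    # Warm-up poll: with auto_offset_reset='latest', the start position is resolved
    # on the first poll, so messages produced before it would otherwise be skipped
    consumer.poll(timeout_ms=1000)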
    
    results = []
    start_time = time.time()
    
    try:
        while (time.time() - start_time) < duration:
            # Produce data
            print(f"\n⏱️  Time: {int(time.time() - start_time)}s")
            print("-" * 70)
            
            for sensor_id, location in zip(SENSOR_IDS, SENSOR_LOCATIONS):
                success, data = producer.send_sensor_data(sensor_id, location)
                if success:
                    print(f"📤 Sent: {sensor_id} - Temp={data['temperature']}°C")
            
            # Small delay to allow message propagation
            time.sleep(0.5)
            
            # Consume and process
            for message in consumer:
                data = message.value
                result = processor.process_message(data)
                results.append(result)
                
                status = "⚠️" if result['alerts'] else "✓"
                print(f"{status} Processed: {result['sensor_id']} - "
                      f"MA Temp={result['moving_avg']['temperature']}°C")
                
                if result['alerts']:
                    for alert in result['alerts']:
                        print(f"  🚨 {alert}")
            
            time.sleep(max(0, interval - 0.5))  # avoid a negative sleep when interval < 0.5
    
    except KeyboardInterrupt:
        print("\n⚠️ Demo interrupted by user")
    
    finally:
        producer.stop()
        consumer.close()
        
        print(f"\n\n{'='*70}")
        print("✅ DEMO COMPLETED")
        print(f"{'='*70}")
        print(f"Total messages processed: {len(results)}")
        print(f"Total alerts generated: {len(processor.alerts)}")
    
    # Return outside `finally` so unexpected errors are not silently swallowed
    return pd.DataFrame(results)

# Run the demo
# Uncomment to run:
# results_df = run_full_demo(duration=20, interval=2)
# print("\nResults DataFrame:")
# print(results_df.head(10))

print("✓ Demo function ready. Uncomment the last lines to run the full demo.")

## 13. 📝 TODO: Adapting This Notebook to Your Own Sensor Data

### 🎯 **Tasks for Students**

Follow the steps below to adapt this code to your own sensor data:

---

#### **TODO 1: Configure Your Sensors** ⚙️
**File/Cell: Section 2 - Kafka Configuration**

- [ ] **Rename the topic** to match your use case
  - Examples: `'temperature-monitor'`, `'air-quality'`, `'machine-vibration'`
  
- [ ] **Define your own sensor IDs**
  - Examples: `['TEMP_01', 'TEMP_02']`, `['AQ_SENSOR_A', 'AQ_SENSOR_B']`
  
- [ ] **Choose relevant sensor locations**, one per sensor ID: the producer pairs the two lists with `zip()`, which silently drops any extra items
  - Examples: `['Floor 1', 'Floor 2']`, `['Indoor', 'Outdoor']`

```python
# Example modification:
TOPIC_NAME = 'suhu-ruangan-kampus'
SENSOR_IDS = ['TEMP_A01', 'TEMP_A02', 'TEMP_B01']
SENSOR_LOCATIONS = ['Ruang Kelas A', 'Lab Komputer', 'Perpustakaan']
```

---

#### **TODO 2: Modify the Sensor Data** 📊
**File/Cell: Section 3 - Helper Function for Generating Sensor Data**

- [ ] **Adapt the sensor parameters** to your kind of data:
  - Change the value ranges (min-max)
  - Add or remove parameters
  - Adjust the units of measurement
  - Update `base_temps` if you rename the locations; unknown locations fall back to 25.0°C

- [ ] **Example modification for a CO2 sensor:**
```python
# Compute values that other fields depend on first
co2_ppm = round(random.uniform(400, 1000), 2)  # CO2 in ppm

sensor_reading = {
    'sensor_id': sensor_id,
    'location': location,
    'timestamp': datetime.now().isoformat(),
    'co2_ppm': co2_ppm,
    'temperature': round(base_temp + random.uniform(-2, 2), 2),
    'noise_level': round(random.uniform(30, 80), 2),  # dB
    'status': 'normal' if co2_ppm < 800 else 'warning'
}
```

- [ ] **Example for a machine vibration sensor:**
```python
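# `machine_name` would be a new parameter of generate_sensor_data(); the later cells
# read data['location'], so either keep a 'location' key as well or update those cells
sensor_reading = {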
    'sensor_id': sensor_id,
    'machine_name': machine_name,
    'timestamp': datetime.now().isoformat(),
    'vibration_x': round(random.uniform(0, 5), 3),  # mm/s
    'vibration_y': round(random.uniform(0, 5), 3),
    'vibration_z': round(random.uniform(0, 5), 3),
    'rpm': round(random.uniform(1000, 3000), 0),
    'status': 'normal'
}
```
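
If you change parameters often, one option is to describe each metric by a (min, max, decimals) entry and build the reading from that table, so a new sensor type only means editing a dictionary. A sketch, where `SENSOR_SPECS`, `generate_from_spec()` and `status_rule` are hypothetical names and the ranges come from the CO2 example above. It keeps the `sensor_id`, `location`, `timestamp` and `status` keys so the later cells keep working:

```python
import random
from datetime import datetime

# Hypothetical spec: metric name -> (min, max, decimals)
SENSOR_SPECS = {
    'co2_ppm': (400, 1000, 2),
    'noise_level': (30, 80, 2),
}

def generate_from_spec(sensor_id, location, specs, status_rule=None):
    """Build one reading from a metric spec; status_rule maps a reading to 'normal'/'warning'."""
    reading = {
        'sensor_id': sensor_id,
        'location': location,
        'timestamp': datetime.now().isoformat(),
    }
    for metric, (low, high, decimals) in specs.items():
        reading[metric] = round(random.uniform(low, high), decimals)
    reading['status'] = status_rule(reading) if status_rule else 'normal'
    return reading

reading = generate_from_spec(
    'AQ_SENSOR_A', 'Lab Komputer', SENSOR_SPECS,
    status_rule=lambda r: 'normal' if r['co2_ppm'] < 800 else 'warning',
)
print(reading)
```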

---

#### **TODO 3: Change the Alert Thresholds** 🚨
**File/Cell: Section 10 - Stream Processing with Windowing**

- [ ] **Adjust the alert conditions** to your use case
- [ ] **Choose appropriate thresholds** for your data

```python
# Example modification in process_message()
# (assumes avg_co2, avg_noise, avg_vibration and avg_rpm are computed from their own
#  windows the same way avg_temp is, with matching deques added in __init__):
alerts = []

# For air-quality monitoring:
if avg_co2 > 800:
    alerts.append(f"⚠️ High CO2: {avg_co2:.0f} ppm")
if avg_temp > 28:
    alerts.append(f"🔥 Uncomfortable temperature: {avg_temp:.1f}°C")
if avg_noise > 70:
    alerts.append(f"🔊 High noise level: {avg_noise:.0f} dB")

# For machine monitoring:
if avg_vibration > 3.5:
    alerts.append(f"⚡ Abnormal vibration: {avg_vibration:.2f} mm/s")
if avg_rpm < 1500 or avg_rpm > 2500:
    alerts.append(f"⚙️ RPM out of range: {avg_rpm:.0f}")
```
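
Instead of a growing chain of `if` statements, the thresholds can live in a rule table evaluated against the moving averages. A sketch with hypothetical names (`ALERT_RULES`, `check_alerts`), using the air-quality thresholds from the example above:

```python
import operator

# Hypothetical rule table: (metric, comparison, threshold, message template)
ALERT_RULES = [
    ('co2_ppm', operator.gt, 800, "⚠️ High CO2: {value:.0f} ppm"),
    ('temperature', operator.gt, 28, "🔥 Uncomfortable temperature: {value:.1f}°C"),
    ('noise_level', operator.gt, 70, "🔊 High noise level: {value:.0f} dB"),
]

def check_alerts(moving_avg, rules=ALERT_RULES):
    """Return messages for every rule that fires; metrics missing from moving_avg are skipped."""
    alerts = []
    for metric, compare, threshold, template in rules:
        value = moving_avg.get(metric)
        if value is not None and compare(value, threshold):
            alerts.append(template.format(value=value))
    return alerts

alerts = check_alerts({'co2_ppm': 950.0, 'temperature': 26.5, 'noise_level': 72.0})
print(alerts)  # two alerts: CO2 and noise
```

A range check such as the RPM example becomes two rules, `('rpm', operator.lt, 1500, ...)` and `('rpm', operator.gt, 2500, ...)`.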

---

#### **TODO 4: Adapt the Visualization** 📈
**File/Cell: Section 9 - Sensor Data Visualization**

- [ ] **Change the plot titles** to match your parameters
- [ ] **Replace the axis labels** with the appropriate units
- [ ] **Add or remove subplots** to match the number of parameters

```python
# Example for 2 parameters (CO2 and temperature):
fig, axes = plt.subplots(2, 1, figsize=(14, 8))

# Plot 1: CO2
axes[0].set_title('CO2 Levels Over Time')
axes[0].set_ylabel('CO2 (ppm)')
axes[0].axhline(y=800, color='r', linestyle='--', label='Safe Limit')
axes[0].legend()

# Plot 2: Temperature
axes[1].set_title('Temperature Over Time')
axes[1].set_ylabel('Temperature (°C)')
axes[1].axhline(y=28, color='r', linestyle='--', label='Comfort Limit')
axes[1].legend()
```

---

#### **TODO 5: Change the Window Size** 🪟
**File/Cell: Section 10 & 11**

- [ ] **Choose a window_size** that suits your analysis. The window counts readings, not seconds: with one reading per sensor every 2 seconds, `window_size=10` covers about 20 seconds.
  - Small window (3-5): detects sudden changes quickly
  - Medium window (10-20): smooths out noise
  - Large window (50+): shows long-term trends

```python
# Example:
processor = StreamProcessor(window_size=10)  # Adjust as needed
```
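
To see the trade-off, compare two window sizes on the same series containing one short spike: the small window reacts to it, the large one smooths it away. The series and the 30°C threshold below are made up for illustration, and `moving_averages()` is a hypothetical helper using the same deque idea as `StreamProcessor`:

```python
from collections import deque

def moving_averages(values, window_size):
    """Count-based moving average, like StreamProcessor's deques."""
    window = deque(maxlen=window_size)
    out = []
    for v in values:
        window.append(v)
        out.append(sum(window) / len(window))
    return out

# Nine readings at 25°C, then a two-reading spike to 40°C
series = [25.0] * 9 + [40.0, 40.0]
small = moving_averages(series, window_size=3)
large = moving_averages(series, window_size=10)
print(max(small) > 30, max(large) > 30)  # True False
```

Note that a window of size N averages fewer values until N readings have arrived, which is what the `window_size` field in the processor's output reports.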

---

#### **TODO 6: Configure the Kafka Connection** 🔌
**File/Cell: Section 2**

- [ ] **Change the Kafka server address** if you use a different server
- [ ] **Add security settings** if your cluster requires them

```python
# If Kafka runs on another server:
KAFKA_BOOTSTRAP_SERVERS = ['192.168.1.100:9092']

# If the cluster requires authentication (avoid hard-coding real credentials):
producer = KafkaProducer(
    bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
    security_protocol='SASL_SSL',
    sasl_mechanism='PLAIN',
    sasl_plain_username='your_username',
    sasl_plain_password='your_password',
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
```
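
Rather than writing a username and password into the notebook, you can read them from environment variables. A sketch, where the variable names (`KAFKA_BOOTSTRAP_SERVERS`, `KAFKA_USERNAME`, `KAFKA_PASSWORD`) and the function name are assumptions; the returned keys are the keyword arguments used in the `KafkaProducer(...)` call above:

```python
import os

def kafka_settings_from_env(env=None):
    """Build KafkaProducer/KafkaConsumer keyword arguments from environment variables."""
    env = os.environ if env is None else env
    settings = {
        # Comma-separated list, e.g. "broker1:9092,broker2:9092"
        'bootstrap_servers': env.get('KAFKA_BOOTSTRAP_SERVERS', 'localhost:9092').split(','),
    }
    username = env.get('KAFKA_USERNAME')
    if username:  # enable SASL only when credentials are provided
        settings.update(
            security_protocol='SASL_SSL',
            sasl_mechanism='PLAIN',
            sasl_plain_username=username,
            sasl_plain_password=env.get('KAFKA_PASSWORD', ''),
        )
    return settings

# Usage: producer = KafkaProducer(**kafka_settings_from_env(), value_serializer=...)
```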

---

#### **TODO 7: Add Real Data from Hardware** 🔧 (if you have the hardware; otherwise, create dummy data)
**File/Cell: Create a new cell**

- [ ] **Integrate with physical sensors** (Arduino, Raspberry Pi, ESP32)
- [ ] **Replace generate_sensor_data()** with real sensor readings

```python
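# Example integration with a Raspberry Pi and a DHT22 sensor.
# Note: Adafruit_DHT is Adafruit's legacy library and is deprecated;
# newer projects usually use adafruit-circuitpython-dht instead.
import Adafruit_DHT
from datetime import datetime

def read_real_sensor_data(sensor_id, gpio_pin):
    """Read one measurement from a DHT22 sensor"""
    sensor = Adafruit_DHT.DHT22
    # read_retry() retries the read several times and returns (None, None) if all attempts fail
    humidity, temperature = Adafruit_DHT.read_retry(sensor, gpio_pin)
    
    if humidity is not None and temperature is not None:
        return {
            'sensor_id': sensor_id,
            'location': 'Lab IoT',
            'timestamp': datetime.now().isoformat(),
            'temperature': round(temperature, 2),
            'humidity': round(humidity, 2),
            'status': 'normal' if temperature < 30 else 'warning'
        }
    else:
        return None

# Use it in the producer (skip failed reads before sending):
# data = read_real_sensor_data('SENSOR_001', gpio_pin=4)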
```
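
Without hardware, a random walk gives more realistic dummy data than independent random values: each reading drifts slightly from the previous one, as a real room temperature would. A sketch producing the same dictionary shape as `read_real_sensor_data()`; the `DummyDHT22` class, step sizes, and bounds are assumptions:

```python
import random
from datetime import datetime

class DummyDHT22:
    """Hypothetical DHT22 stand-in: temperature and humidity follow a bounded random walk."""

    def __init__(self, temperature=25.0, humidity=60.0, seed=None):
        self.temperature = temperature
        self.humidity = humidity
        self.rng = random.Random(seed)

    def read(self, sensor_id, location='Lab IoT'):
        # Small step from the previous value, clamped to a plausible range
        self.temperature = min(40.0, max(15.0, self.temperature + self.rng.uniform(-0.5, 0.5)))
        self.humidity = min(95.0, max(20.0, self.humidity + self.rng.uniform(-1.0, 1.0)))
        return {
            'sensor_id': sensor_id,
            'location': location,
            'timestamp': datetime.now().isoformat(),
            'temperature': round(self.temperature, 2),
            'humidity': round(self.humidity, 2),
            'status': 'normal' if self.temperature < 30 else 'warning',
        }

dummy = DummyDHT22(seed=1)
samples = [dummy.read('SENSOR_001') for _ in range(5)]
print(samples[0])
```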

---

#### **TODO 8: Save to a Database** 💾
**File/Cell: Create a new cell**

- [ ] **Add a database connection** to store historical data
- [ ] **Choose a suitable database**: PostgreSQL, MongoDB, InfluxDB, TimescaleDB

```python
# Example with PostgreSQL (assumes the sensor_readings table already exists):
import psycopg2

def save_to_database(data):
    conn = psycopg2.connect(
        host="localhost",
        database="sensor_db",
        user="your_user",
        password="your_password"
    )
    try:
        # `with conn` commits on success and rolls back on error;
        # `with conn.cursor()` closes the cursor afterwards
        with conn, conn.cursor() as cur:
            cur.execute("""
                INSERT INTO sensor_readings 
                (sensor_id, location, timestamp, temperature, humidity, status)
                VALUES (%s, %s, %s, %s, %s, %s)
            """, (
                data['sensor_id'],
                data['location'],
                data['timestamp'],
                data['temperature'],
                data['humidity'],
                data['status']
            ))
    finally:
        conn.close()  # `with conn` does not close the connection itself

# Call it in the consumer after receiving a message:
# save_to_database(message.value)
```
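
Opening a new connection for every message becomes slow once data arrives continuously. The same idea with Python's built-in `sqlite3` module (chosen here only because it needs no server; the table layout mirrors the PostgreSQL example) keeps one connection open and inserts readings in batches. `open_store()` and `save_batch()` are hypothetical helpers:

```python
import sqlite3

def open_store(path=':memory:'):
    """Open (or create) the database and make sure the table exists."""
    conn = sqlite3.connect(path)
    conn.execute("""
        CREATE TABLE IF NOT EXISTS sensor_readings (
            sensor_id TEXT, location TEXT, timestamp TEXT,
            temperature REAL, humidity REAL, status TEXT
        )
    """)
    return conn

def save_batch(conn, readings):
    """Insert many readings in one transaction (commit on success, rollback on error)."""
    rows = [(r['sensor_id'], r['location'], r['timestamp'],
             r['temperature'], r['humidity'], r['status']) for r in readings]
    with conn:  # the connection's context manager wraps a transaction
        conn.executemany("INSERT INTO sensor_readings VALUES (?, ?, ?, ?, ?, ?)", rows)
    return len(rows)

conn = open_store()
batch = [
    {'sensor_id': 'SENSOR_001', 'location': 'Gudang', 'timestamp': '2024-01-01T10:00:00',
     'temperature': 25.1, 'humidity': 60.0, 'status': 'normal'},
    {'sensor_id': 'SENSOR_002', 'location': 'Gudang', 'timestamp': '2024-01-01T10:00:02',
     'temperature': 31.4, 'humidity': 55.0, 'status': 'warning'},
]
save_batch(conn, batch)
print(conn.execute("SELECT COUNT(*) FROM sensor_readings").fetchone()[0])  # 2
```

With psycopg2 the same batching works through `cursor.executemany()`.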

---

#### **TODO 9: Build a Real-time Dashboard** 📊
**File/Cell: Create a new cell**

- [ ] **Use Plotly/Dash** for an interactive dashboard
- [ ] **Or use Streamlit** for a simple web app

```python
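# Example with Plotly for live updates in Jupyter:
import time
import pandas as pd
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from IPython.display import clear_output

def create_live_dashboard(data_buffer, update_interval=2):
    """Redraw the dashboard every update_interval seconds (stop with Interrupt Kernel).

    data_buffer is expected to be filled concurrently, e.g. by a consumer thread.
    """
    try:
        while True:
            df = pd.DataFrame(list(data_buffer))
            if not df.empty:
                df['timestamp'] = pd.to_datetime(df['timestamp'])
                # Build a fresh figure each time so traces do not pile up between updates
                fig = make_subplots(rows=2, cols=1, subplot_titles=('Temperature', 'Humidity'))
                for sensor_id, group in df.groupby('sensor_id'):
                    fig.add_trace(go.Scatter(x=group['timestamp'], y=group['temperature'],
                                             mode='lines', name=f'{sensor_id} temp'), row=1, col=1)
                    fig.add_trace(go.Scatter(x=group['timestamp'], y=group['humidity'],
                                             mode='lines', name=f'{sensor_id} humidity'), row=2, col=1)
                clear_output(wait=True)
                fig.show()
            time.sleep(update_interval)
    except KeyboardInterrupt:
        print("Dashboard stopped")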
```
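
`create_live_dashboard()` reads `data_buffer` while something else keeps filling it, typically a consumer running in a background thread (the notebook already imports `threading`). A sketch of a bounded, lock-protected buffer for that hand-off; `SharedBuffer` is a hypothetical class, and in the notebook the `fill()` loop would iterate over a `KafkaConsumer` instead of a list. The lock matters because copying a deque while another thread appends to it can raise `RuntimeError: deque mutated during iteration`.

```python
import threading
from collections import deque

class SharedBuffer:
    """Bounded buffer: a background thread appends, the dashboard takes snapshots."""

    def __init__(self, maxlen=500):
        self._items = deque(maxlen=maxlen)  # oldest readings drop off automatically
        self._lock = threading.Lock()

    def append(self, item):
        with self._lock:
            self._items.append(item)

    def snapshot(self):
        with self._lock:
            return list(self._items)  # a copy, safe to pass to pd.DataFrame()

buffer = SharedBuffer(maxlen=3)

def fill(messages):
    # In the notebook: for message in consumer: buffer.append(message.value)
    for value in messages:
        buffer.append(value)

worker = threading.Thread(target=fill, args=([{'temperature': t} for t in (22, 23, 24, 25)],),
                          daemon=True)
worker.start()
worker.join()
print(buffer.snapshot())  # [{'temperature': 23}, {'temperature': 24}, {'temperature': 25}]
```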

---

#### **TODO 10: Testing and Validation** ✅
**File/Cell: Create a new cell**

- [ ] **Write unit tests** for the important functions
- [ ] **Validate data** before sending it to Kafka
- [ ] **Test the error handling**

```python
# Example data validation:
from datetime import datetime

def validate_sensor_data(data):
    """Validate a sensor reading before sending it; raises ValueError if invalid"""
    required_fields = ['sensor_id', 'timestamp', 'temperature']
    
    # Check required fields
    for field in required_fields:
        if field not in data:
            raise ValueError(f"Missing required field: {field}")
    
    # Validate temperature range
    if not -50 <= data['temperature'] <= 100:
        raise ValueError(f"Temperature out of range: {data['temperature']}")
    
    # Validate timestamp format
    try:
        datetime.fromisoformat(data['timestamp'])
    except ValueError:
        raise ValueError("Invalid timestamp format") from None
    
    return True

# Use it before sending (invalid data raises ValueError rather than returning False):
# try:
#     validate_sensor_data(data)
#     producer.send(topic, data)
# except ValueError as e:
#     print(f"Skipping invalid reading: {e}")
```
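
For the "test the error handling" item, one approach is to test your own code against a fake that fails on purpose, so no Kafka broker is needed. The sketch uses the standard `unittest` module; `send_with_retry()` and `FlakySender` are hypothetical helpers, not part of kafka-python. In the notebook, `send` could wrap `producer.send(...).get(timeout=10)`, with `KafkaError` caught instead of `ConnectionError`.

```python
import unittest

def send_with_retry(send, payload, retries=3):
    """Call send(payload), retrying on ConnectionError; re-raise after the last attempt."""
    for attempt in range(1, retries + 1):
        try:
            return send(payload)
        except ConnectionError:
            if attempt == retries:
                raise

class FlakySender:
    """Fake sender that fails a given number of times before succeeding."""

    def __init__(self, failures):
        self.failures = failures
        self.calls = 0

    def __call__(self, payload):
        self.calls += 1
        if self.calls <= self.failures:
            raise ConnectionError("broker unavailable")
        return 'ok'

class SendWithRetryTest(unittest.TestCase):
    def test_succeeds_after_transient_failures(self):
        sender = FlakySender(failures=2)
        self.assertEqual(send_with_retry(sender, {'temperature': 25}), 'ok')
        self.assertEqual(sender.calls, 3)

    def test_gives_up_after_max_retries(self):
        sender = FlakySender(failures=5)
        with self.assertRaises(ConnectionError):
            send_with_retry(sender, {'temperature': 25}, retries=3)
        self.assertEqual(sender.calls, 3)

# In a notebook cell: unittest.main(argv=['ignored'], exit=False)
```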

---

### 🎓 **Tips for Students:**

1. **Start simple**: change one parameter first, test it, then move on
2. **Documentation**: keep a record of every change you make
3. **Version control**: use Git to track your changes
4. **Testing**: always test after each modification
5. **Collaboration**: discuss with classmates or your lecturer when you get stuck

---

### 📚 **Additional References:**

- **Kafka Documentation**: https://kafka.apache.org/documentation/
- **IoT Sensor Integration**: 
  - librdkafka, the C/C++ Kafka client library: https://github.com/edenhill/librdkafka
  - Raspberry Pi + Kafka: https://www.confluent.io/blog/raspberry-pi-kafka/
- **Data Visualization**:
  - Plotly: https://plotly.com/python/
  - Streamlit: https://streamlit.io/
- **Time Series Database**:
  - InfluxDB: https://www.influxdata.com/
  - TimescaleDB: https://www.timescale.com/

---

### ✨ **Ideas for Follow-up Projects:**

1. **Predictive Maintenance**: use ML to predict machine failures
2. **Multi-location Monitoring**: a dashboard for monitoring many locations
3. **Alert Notification**: send notifications via email/Telegram/WhatsApp
4. **Data Analytics**: analyze long-term patterns and trends
5. **Mobile App**: build a mobile app for monitoring
6. **Edge Computing**: process data on the edge device before sending it to Kafka
7. **Anomaly Detection**: use ML for automatic anomaly detection

---

**Good luck! 🚀 Have fun experimenting with your sensor data!**