In [7]:
import json
import random
import time

from kafka import KafkaProducer
from kafka.admin import KafkaAdminClient, NewTopic
from kafka.errors import TopicAlreadyExistsError

In [2]:
# Connection settings shared by the Kafka clients created in this notebook.
# One bootstrap address per local broker port.
bootstrap_servers = [f'localhost:{port}' for port in (9092, 9093, 9094)]
client_id = 'jupyter_test_client'

In [None]:
# Administrative client used below to create and list topics.
# Requests are given up to 10 seconds (value is in milliseconds).
admin_client = KafkaAdminClient(
    bootstrap_servers=bootstrap_servers,
    client_id=client_id,
    request_timeout_ms=10_000,
)

In [6]:
# Parameters for the topic that is created with the admin client in the next cell.
new_topic_name = 'pressure_data' # Name of the topic to create
num_partitions = 3               # Number of partitions for the topic
replication_factor = 3           # Replication factor (should be <= number of brokers)

In [8]:
# Describe the topic from the parameters configured above and ask the cluster
# to create it.
topic_to_create = NewTopic(
    name=new_topic_name,
    num_partitions=num_partitions,
    replication_factor=replication_factor,
)
print(f"\nAttempting to create topic '{new_topic_name}' with {num_partitions} partitions and replication factor {replication_factor}...")
try:
    create_response = admin_client.create_topics(new_topics=[topic_to_create], validate_only=False)
except TopicAlreadyExistsError:
    # Re-running this cell (e.g. Restart & Run All) must not fail just because
    # an earlier run already created the topic.
    create_response = None
    print(f"Topic '{new_topic_name}' already exists; skipping creation.")

# Keep the broker response as the cell output (nothing is shown when creation was skipped).
create_response


Attempting to create topic 'pressure_data' with 3 partitions and replication factor 3...


CreateTopicsResponse_v3(throttle_time_ms=0, topic_errors=[(topic='pressure_data', error_code=0, error_message=None)])

In [9]:
# List the topic names known to the cluster, e.g. to confirm that the topic
# created above is present.
topics = admin_client.list_topics()
print(topics)

['temperature_data', 'vibration_data', 'pressure_data', 'humidity_data']


In [11]:
from kafka import KafkaConsumer
import json

# Upper limit per measurement type; a reading strictly above its limit is
# reported as an anomaly by the monitoring loop.
# NOTE(review): units are not stated anywhere in this notebook - confirm with the producers.
THRESHOLDS = dict(
    vibration=4.0,
    humidity=80.0,
    temperature=35.0,
)


In [12]:

# Consumer for the three sensor topics, joined to the 'anomaly_group' consumer group.
# It bootstraps from the shared `bootstrap_servers` list (defined in the
# configuration cell) rather than a single hard-coded broker, so it can still
# connect when one of the listed brokers is unavailable.
# Each message value is decoded as UTF-8 and parsed as JSON.
consumer = KafkaConsumer(
    'temperature_data', 'humidity_data', 'vibration_data',
    bootstrap_servers=bootstrap_servers,
    auto_offset_reset='latest',
    group_id='anomaly_group',
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

def get_measurement_info(data):
    """Return the first numeric reading contained in a sensor message.

    The fields 'device_id' and 'timestamp' are treated as metadata and are
    skipped. The remaining fields are examined in the payload's iteration
    order, and the first one whose value can be converted with ``float()``
    is returned. Fields that cannot be converted (text labels, ``None``,
    nested structures) are skipped instead of raising.

    Args:
        data: Deserialized message payload; expected to be a mapping.

    Returns:
        A ``(name, value)`` tuple with ``value`` as a float, or
        ``(None, None)`` when the payload is not a mapping or contains no
        convertible field.
    """
    try:
        fields = data.items()
    except AttributeError:
        # Payload is not a mapping (e.g. a JSON list, string or null).
        return None, None

    for key, value in fields:
        if key in ('device_id', 'timestamp'):
            continue
        try:
            return key, float(value)
        except (TypeError, ValueError, OverflowError):
            # Not a usable number; keep looking at the remaining fields.
            continue
    return None, None


In [None]:

# Monitor the subscribed topics and report every reading that is strictly above
# the threshold configured for its measurement type. Iterating the consumer
# keeps waiting for new messages, so this cell runs until it is interrupted.
for message in consumer:
    data = message.value  # Deserialized payload

    try:
        device_id = data['device_id']
        timestamp = data['timestamp']
        measurement_type, measurement_value = get_measurement_info(data)
    except (KeyError, TypeError, ValueError, AttributeError) as exc:
        # One malformed record must not stop the monitor: report where it
        # came from and continue with the next message.
        print(
            f"WARNING: Skipping malformed message "
            f"(topic={message.topic}, partition={message.partition}, "
            f"offset={message.offset}): {exc!r}"
        )
        continue

    # Measurement types without a configured threshold are ignored.
    if measurement_type not in THRESHOLDS:
        continue

    threshold = THRESHOLDS[measurement_type]
    if measurement_value > threshold:
        # Readings within their threshold produce no output.
        print(
            f"ALERT! Anomaly Detected: Device={device_id}, "
            f"Type={measurement_type}, Value={measurement_value}, "
            f"Threshold={threshold}, Timestamp={timestamp}"
        )