In [None]:
!pip install kafka-python



In [None]:
import json
import os
import random
import time
from datetime import datetime, timedelta

import numpy as np
import pandas as pd
from kafka import KafkaProducer


# Kafka connection settings. The environment variables KAFKA_BROKER and
# KAFKA_TOPIC override the defaults, so the notebook can target another
# broker/topic without editing code (the default broker is a raw IP address).
KAFKA_BROKER = os.environ.get('KAFKA_BROKER', '13.201.228.245:9092')
KAFKA_TOPIC = os.environ.get('KAFKA_TOPIC', 'roro1')

# Each record value is serialized as UTF-8 encoded JSON.
producer = KafkaProducer(bootstrap_servers=KAFKA_BROKER, value_serializer=lambda v: json.dumps(v).encode('utf-8'))

# Pools of synthetic identifiers sampled for every generated record.
device_ids = [f'Router_{i}' for i in range(1, 6)]       # Router_1 .. Router_5
customer_ids = [f'Customer_{i}' for i in range(1, 21)]  # Customer_1 .. Customer_20
location_ids = [f'Location_{i}' for i in range(1, 4)]   # Location_1 .. Location_3

# Function to introduce anomalies (e.g., None values) in the data
def introduce_anomalies(data):
    # Randomly introduce None values for certain fields
    if random.random() < 0.1:  # 10% chance to make Signal_Strength None
        data["Signal_Strength"] = None
    if random.random() < 0.05:  # 5% chance to make Data_Transfer_Rate None
        data["Data_Transfer_Rate"] = None
    if random.random() < 0.05:  # 5% chance to make Call_Drop_Rate None
        data["Call_Drop_Rate"] = None
    if random.random() < 0.03:  # 3% chance to make Timestamp None
        data["Timestamp"] = None
    return data

def _generate_reading():
    """Build one synthetic network-quality record, before anomaly injection.

    Signal strength, data transfer rate, call-drop rate and latency are drawn
    at random. A weak signal (below -70 dBm) lowers the data rate and raises
    the call-drop rate, so both metrics correlate with signal quality.
    Customer, device and location are sampled from the module-level
    ``customer_ids`` / ``device_ids`` / ``location_ids`` pools.

    Returns:
        dict with keys Customer_ID, Device_ID, Timestamp (local time,
        '%Y-%m-%d %H:%M:%S'), Signal_Strength (int, dBm), Call_Drop_Rate
        (float in [0, 1]), Data_Transfer_Rate (int, Mbps, >= 1),
        Latency (int, ms) and Location_ID.
    """
    # int()/float() guarantee plain Python scalars; json.dumps (the producer's
    # serializer) cannot encode numpy scalar types.
    signal_strength = int(np.random.randint(-110, -29))    # dBm, -110 .. -30
    data_transfer_speed = int(np.random.randint(1, 1001))  # Mbps, 1 .. 1000
    call_drop_rate = float(np.random.uniform(0, 1))        # fraction, not percent

    # Correlate a weak signal with slower data and more dropped calls.
    if signal_strength < -70:
        data_transfer_speed -= random.uniform(50, 150)
        call_drop_rate += random.uniform(0.2, 0.5)

    # Clamp to valid ranges while keeping each field a single JSON type. The
    # adjustment above makes the rate a float, and integer bounds in min/max
    # could otherwise turn the drop rate into the int 1.
    data_transfer_speed = max(1, int(round(data_transfer_speed)))
    call_drop_rate = min(1.0, max(0.0, call_drop_rate))

    # Field order (and therefore random-draw order) matches the record layout.
    return {
        "Customer_ID": random.choice(customer_ids),
        "Device_ID": random.choice(device_ids),
        "Timestamp": datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
        "Signal_Strength": signal_strength,
        "Call_Drop_Rate": call_drop_rate,
        "Data_Transfer_Rate": data_transfer_speed,
        "Latency": int(np.random.randint(1, 250)),  # ms, 1 .. 249
        "Location_ID": random.choice(location_ids),
    }


def _report_send_failure(exc):
    """Print a Kafka delivery failure that would otherwise go unnoticed."""
    print(f"Failed to send data to Kafka: {exc}")


# Stream one record per second until the cell is interrupted.
try:
    while True:
        data = introduce_anomalies(_generate_reading())
        # send() only queues the record for background delivery; attach an
        # errback so failed deliveries are reported instead of silently lost.
        producer.send(KAFKA_TOPIC, value=data).add_errback(_report_send_failure)
        print(f"Sent data to Kafka: {data}")

        # Wait before sending the next data point.
        time.sleep(1)

except KeyboardInterrupt:
    print("Live data generation stopped.")

finally:
    # Always release the producer, even if the loop raised.
    producer.close()

{'Customer_ID': 'Customer_18', 'Device_ID': 'Router_1', 'Timestamp': '2024-09-24 05:55:43', 'Signal_Strength': -63, 'Call_Drop_Rate': None, 'Data_Transfer_Rate': 53, 'Latency': 248, 'Location_ID': 'Location_1'}
Sent data to Kafka: {'Customer_ID': 'Customer_18', 'Device_ID': 'Router_1', 'Timestamp': '2024-09-24 05:55:43', 'Signal_Strength': -63, 'Call_Drop_Rate': None, 'Data_Transfer_Rate': 53, 'Latency': 248, 'Location_ID': 'Location_1'}
{'Customer_ID': 'Customer_10', 'Device_ID': 'Router_5', 'Timestamp': '2024-09-24 05:55:44', 'Signal_Strength': -65, 'Call_Drop_Rate': 0.027591918477896882, 'Data_Transfer_Rate': 246, 'Latency': 169, 'Location_ID': 'Location_1'}
Sent data to Kafka: {'Customer_ID': 'Customer_10', 'Device_ID': 'Router_5', 'Timestamp': '2024-09-24 05:55:44', 'Signal_Strength': -65, 'Call_Drop_Rate': 0.027591918477896882, 'Data_Transfer_Rate': 246, 'Latency': 169, 'Location_ID': 'Location_1'}
{'Customer_ID': 'Customer_9', 'Device_ID': 'Router_4', 'Timestamp': '2024-09-24 05