### Message Broker Connection

Importing Libraries

In [1]:
import json
import random
import time
from datetime import datetime, timezone

import pandas as pd
from kafka import KafkaProducer, KafkaConsumer, KafkaAdminClient
from kafka.admin import NewTopic
from kafka.errors import KafkaError, TopicAlreadyExistsError

Defining Kafka Broker Address

In [40]:
# Bootstrap address handed to the Kafka producer, admin client and consumer in the cells below.
broker_address = 'localhost:9092'
# Topic used for the producer/consumer connectivity check.
topic_name = 'test_topic'

Creating the Kafka Producer and Testing Topic Creation

In [41]:
# Create Kafka producer
producer = KafkaProducer(bootstrap_servers=broker_address)

# Create Kafka admin client to create the topic
admin_client = KafkaAdminClient(bootstrap_servers=broker_address)

# Create the topic only if it does not exist yet. An already-existing topic is
# the expected outcome when this cell is re-run, so it is reported as such
# instead of being lumped together with genuine creation failures.
try:
    admin_client.create_topics([NewTopic(name=topic_name, num_partitions=1, replication_factor=1)])
    print(f"Topic '{topic_name}' created.")
except TopicAlreadyExistsError:
    print(f"Topic '{topic_name}' already exists.")
except Exception as e:
    # Best-effort cell: report any other problem without aborting the notebook.
    print(f"Topic creation failed: {e}")


Topic 'test_topic' created.


Sending Message from Producer

In [42]:
# Produce a test message
try:
    # send() is asynchronous and returns a future. Blocking on that future
    # (up to 10 s) is what raises when delivery fails, so the success line
    # below is only printed once the broker has acknowledged the record.
    producer.send(topic_name, b'Test message').get(timeout=10)
    print(f"Message sent to topic '{topic_name}'.")
except Exception as e:
    print(f"Failed to send message: {e}")

Message sent to topic 'test_topic'.


Creating the Kafka Consumer and Receiving the Message

In [43]:
# Create Kafka consumer
consumer = KafkaConsumer(topic_name, bootstrap_servers=broker_address, auto_offset_reset='earliest', consumer_timeout_ms=1000)

# Consume the message
try:
    for message in consumer:
        print(f"Received message: {message.value}")
        break  # one message is enough to prove the round trip works
    else:
        # The loop ended without `break`: consumer_timeout_ms elapsed before
        # any message arrived. Say so instead of finishing silently.
        print(f"No message received from topic '{topic_name}' within the timeout.")
except Exception as e:
    print(f"Failed to consume message: {e}")
finally:
    consumer.close()


Received message: b'Test message'


Checking Topics Present

In [44]:
# The `consumer` created in the previous cell is closed in that cell's
# `finally` block, so the topic list is fetched through a fresh, short-lived
# consumer instead of querying a closed client.
topics_consumer = KafkaConsumer(bootstrap_servers=broker_address)
try:
    topics = topics_consumer.topics()
finally:
    topics_consumer.close()

if not topics:
    # An empty set means the topic created earlier is not visible on the broker.
    raise RuntimeError(f"No topics found on broker '{broker_address}'.")
else:
    print(topics)

{'test_topic'}


### Periodically Sending Messages

In [45]:
# Define the connection parameters
kafka_broker = 'localhost:9092'  # broker address passed to KafkaProducer inside run_producer()
main_topic = 'measurements'  # topic that send_measurement_to_kafka() publishes to
i = 0  # NOTE(review): run_producer() assigns its own local `i`, so it never reads this module-level value

# Function to create random measurement data
def create_random_measurement():
    """Build one synthetic weather measurement.

    Returns:
        dict: ``timestamp`` (current UTC time as a naive ISO-8601 string) plus
        ``temperature`` in [-20, 40], ``moisture`` in [0, 100] and
        ``wind_speed`` in [0, 20], each drawn uniformly and rounded to two
        decimals.
    """
    # datetime.utcnow() is deprecated since Python 3.12. Taking an aware UTC
    # "now" and dropping tzinfo produces the same naive timestamp string, so
    # the payload format is unchanged for consumers.
    utc_now = datetime.now(timezone.utc).replace(tzinfo=None)
    return {
        'timestamp': utc_now.isoformat(),
        'temperature': round(random.uniform(-20.0, 40.0), 2),
        'moisture': round(random.uniform(0.0, 100.0), 2),
        'wind_speed': round(random.uniform(0.0, 20.0), 2)
    }

# Function to send measurement data to Kafka
def send_measurement_to_kafka(producer, measurement):
    """Publish one measurement to ``main_topic`` and wait for it to be acknowledged.

    Args:
        producer: Object whose ``send(topic, value=...)`` returns a future
            exposing ``get(timeout=...)``; run_producer() passes a KafkaProducer.
        measurement: Payload handed to ``producer.send`` as ``value``.

    Returns:
        None. Success and failure are both reported on stdout; errors derived
        from KafkaError are caught here rather than propagated.
    """
    try:
        # Block on the returned future (up to 10 s) so a failed delivery
        # raises here. The metadata it returns is not needed, so it is not
        # bound to a name.
        producer.send(main_topic, value=measurement).get(timeout=10)
    except KafkaError as e:
        print(f"Error sending measurement to Kafka: {e}")
    else:
        print(f"Measurement sent to Kafka: {measurement}")

# Main function to run the producer
def run_producer(num_messages=15, interval_seconds=5):
    """Send a fixed number of random measurements to Kafka at a steady pace.

    Args:
        num_messages: How many measurements to send (default 15).
        interval_seconds: Pause after each send, in seconds (default 5).

    Returns:
        None. Progress and errors are reported on stdout; KafkaError raised
        while creating or using the producer is caught and printed.
    """
    # Bound before the `try` so the `finally` clause can tell whether the
    # constructor succeeded. Without this, a failed connection would raise
    # UnboundLocalError from `finally` and hide the real error.
    producer = None
    try:
        # Create a Kafka producer that serialises each measurement dict as UTF-8 JSON
        producer = KafkaProducer(
            bootstrap_servers=[kafka_broker],
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )

        print("Starting to send measurements to Kafka...")
        for _ in range(num_messages):
            send_measurement_to_kafka(producer, create_random_measurement())
            time.sleep(interval_seconds)  # pace the stream: one measurement per interval

    except KafkaError as e:
        print(f"Error connecting to Kafka Producer: {e}")
    finally:
        # Only close (and report closing) a producer that was actually created.
        if producer is not None:
            producer.close()
            print("Kafka Producer closed.")

# Run the producer
# Entry-point guard: run_producer() is called only when this code executes as __main__
# (the recorded output below shows the guard passed when this cell was run).
if __name__ == "__main__":
    run_producer()

Starting to send measurements to Kafka...
Measurement sent to Kafka: {'timestamp': '2024-06-15T17:27:39.896385', 'temperature': 32.42, 'moisture': 15.24, 'wind_speed': 10.44}
Measurement sent to Kafka: {'timestamp': '2024-06-15T17:27:45.526374', 'temperature': 16.69, 'moisture': 33.72, 'wind_speed': 1.8}
Measurement sent to Kafka: {'timestamp': '2024-06-15T17:27:50.530230', 'temperature': 26.52, 'moisture': 90.07, 'wind_speed': 6.4}
Measurement sent to Kafka: {'timestamp': '2024-06-15T17:27:55.536408', 'temperature': -17.71, 'moisture': 63.11, 'wind_speed': 14.75}
Measurement sent to Kafka: {'timestamp': '2024-06-15T17:28:00.544421', 'temperature': 29.75, 'moisture': 70.88, 'wind_speed': 0.72}
Measurement sent to Kafka: {'timestamp': '2024-06-15T17:28:05.552621', 'temperature': 11.98, 'moisture': 19.83, 'wind_speed': 17.47}
Measurement sent to Kafka: {'timestamp': '2024-06-15T17:28:10.555439', 'temperature': 31.05, 'moisture': 7.95, 'wind_speed': 12.69}
Measurement sent to Kafka: {'tim

### Receiving Messages Periodically

In [14]:
# Module-level accumulators filled by consume_messages().
measurement_dict = {}  # column name -> list of values; assigned in consume_messages()'s `finally` block
timestampt_list = []  # 'timestamp' field of each received message
temp_list = []  # 'temperature' field of each received message
moist_list = []  # 'moisture' field of each received message
wind_list = []  # 'wind_speed' field of each received message

In [15]:
kafka_broker = 'localhost:9092'  # broker address passed to KafkaConsumer inside consume_messages()
main_topic = 'measurements'  # topic that consume_messages() subscribes to
i = 0  # NOTE(review): consume_messages() does not reference `i`; it appears unused in this section

In [16]:
# Function to consume and display messages from Kafka
def consume_messages(max_messages=15):
    """Consume measurements from Kafka, print them, and collect them for a DataFrame.

    Each message received from ``main_topic`` is printed and its fields are
    appended to the module-level lists ``timestampt_list``, ``temp_list``,
    ``moist_list`` and ``wind_list``. Consumption stops once
    ``timestampt_list`` holds at least ``max_messages`` entries, or on
    KeyboardInterrupt. No ``consumer_timeout_ms`` is set, so the loop keeps
    waiting for messages until one of those happens.

    Args:
        max_messages: Number of collected measurements at which to stop
            (default 15).

    Returns:
        None. ``measurement_dict`` is refreshed with the four lists on every
        exit path, including errors and interrupts.
    """
    # Bound before the `try` so the `finally` clause can tell whether the
    # constructor succeeded. Without this, a failed connection would raise
    # UnboundLocalError from `finally` and hide the real error.
    consumer_measurement = None
    try:
        consumer_measurement = KafkaConsumer(
            main_topic,
            bootstrap_servers=[kafka_broker],
            auto_offset_reset='earliest',
            enable_auto_commit=True,
            group_id='measurement-consumer-group-first',
            value_deserializer=lambda x: json.loads(x.decode('utf-8'))
        )

        print("Starting to consume messages from Kafka...")
        for message in consumer_measurement:
            measurement = message.value
            print(f"Measurement received from Kafka:\n"
                  f"Timestamp: {measurement['timestamp']}\n"
                  f"Temperature: {measurement['temperature']}°C\n"
                  f"Moisture: {measurement['moisture']}%\n"
                  f"Wind Speed: {measurement['wind_speed']} m/s\n")
            timestampt_list.append(measurement['timestamp'])
            temp_list.append(measurement['temperature'])
            moist_list.append(measurement['moisture'])
            wind_list.append(measurement['wind_speed'])

            # `>=` rather than `==`: the lists are module-level and may already
            # hold entries from an earlier run of this cell. An equality test
            # would then never fire and the loop would never stop.
            if len(timestampt_list) >= max_messages:
                break

    except KeyboardInterrupt:
        print("\nStopping the consumer...")

    except KafkaError as e:
        print(f"Error connecting to Kafka Consumer: {e}")

    finally:
        # Publish whatever was collected, even after an error or interrupt.
        measurement_dict['timestamp'] = timestampt_list
        measurement_dict['temperature'] = temp_list
        measurement_dict['moisture'] = moist_list
        measurement_dict['wind_speed'] = wind_list
        # Only close (and report closing) a consumer that was actually created.
        if consumer_measurement is not None:
            consumer_measurement.close()
            print("Kafka Consumer closed.")

# Run the consumer
# Entry-point guard: consume_messages() is called only when this code executes as __main__
# (the recorded output below shows the guard passed when this cell was run).
if __name__ == "__main__":

    consume_messages()

Starting to consume messages from Kafka...
Measurement received from Kafka:
Timestamp: 2024-06-15T17:27:39.896385
Temperature: 32.42°C
Moisture: 15.24%
Wind Speed: 10.44 m/s

Measurement received from Kafka:
Timestamp: 2024-06-15T17:27:45.526374
Temperature: 16.69°C
Moisture: 33.72%
Wind Speed: 1.8 m/s

Measurement received from Kafka:
Timestamp: 2024-06-15T17:27:50.530230
Temperature: 26.52°C
Moisture: 90.07%
Wind Speed: 6.4 m/s

Measurement received from Kafka:
Timestamp: 2024-06-15T17:27:55.536408
Temperature: -17.71°C
Moisture: 63.11%
Wind Speed: 14.75 m/s

Measurement received from Kafka:
Timestamp: 2024-06-15T17:28:00.544421
Temperature: 29.75°C
Moisture: 70.88%
Wind Speed: 0.72 m/s

Measurement received from Kafka:
Timestamp: 2024-06-15T17:28:05.552621
Temperature: 11.98°C
Moisture: 19.83%
Wind Speed: 17.47 m/s

Measurement received from Kafka:
Timestamp: 2024-06-15T17:28:10.555439
Temperature: 31.05°C
Moisture: 7.95%
Wind Speed: 12.69 m/s

Measurement received from Kafka:
Times

Converting the Received Data into a Pandas DataFrame

In [18]:
# Build a DataFrame from the lists gathered by consume_messages(); each key of measurement_dict becomes a column.
measurement_df = pd.DataFrame(measurement_dict)

In [19]:
# Bare last expression so the notebook displays the frame as the cell's output.
measurement_df

Unnamed: 0,timestamp,temperature,moisture,wind_speed
0,2024-06-15T17:27:39.896385,32.42,15.24,10.44
1,2024-06-15T17:27:45.526374,16.69,33.72,1.8
2,2024-06-15T17:27:50.530230,26.52,90.07,6.4
3,2024-06-15T17:27:55.536408,-17.71,63.11,14.75
4,2024-06-15T17:28:00.544421,29.75,70.88,0.72
5,2024-06-15T17:28:05.552621,11.98,19.83,17.47
6,2024-06-15T17:28:10.555439,31.05,7.95,12.69
7,2024-06-15T17:28:15.558146,-11.52,18.44,7.79
8,2024-06-15T17:28:20.565474,13.51,76.94,2.81
9,2024-06-15T17:28:25.570438,-16.38,74.04,12.82
