In [1]:
pip install kafka-python

Note: you may need to restart the kernel to use updated packages.


In [2]:
import pandas as pd
from kafka import KafkaConsumer, KafkaProducer
import time 
from datetime import datetime
from json import dumps
import json
import requests
from configparser import RawConfigParser

In [3]:
# Load the local settings file; later cells read the 'Host' and 'CryptoCoinAPI'
# sections from it.
config_local = RawConfigParser()
loaded_files = config_local.read("config_local.conf")

# read() silently ignores files it cannot open, which would otherwise only
# surface later as a confusing KeyError on the first section lookup.
if not loaded_files:
    raise FileNotFoundError("config_local.conf could not be read")

# Display the list of files that were actually parsed.
loaded_files

['config_local.conf']

Kafka Producer Initialization:
- The `KafkaProducer` is initialized with `bootstrap_servers` pointing to the Kafka brokers.
- `value_serializer` serializes each value (a Python object) to JSON and encodes it as UTF-8 bytes before it is sent to Kafka.

In [5]:
# Build the broker address from the local config and create a Kafka producer.
# Every outgoing value is JSON-serialized and then UTF-8 encoded, so consumers
# can decode the payload as UTF-8 JSON.

host_settings = config_local['Host']
broker_ip, port = host_settings['ip'], host_settings['port']
server = [f"{broker_ip}:{port}"]


def _json_utf8(value):
    """Serialize ``value`` to JSON and encode the result as UTF-8 bytes."""
    return dumps(value).encode('utf-8')


producer = KafkaProducer(
    bootstrap_servers=server,
    value_serializer=_json_utf8,
)

In [6]:
# Smoke test: publish one small message to the topic.
# Pass a dict rather than a pre-formatted string: the producer's value_serializer
# already applies dumps(), so a string would be delivered as a quoted JSON string
# instead of a JSON object.
producer.send('crypto-currency', value={'gaurav': 'bar'})

<kafka.producer.future.FutureRecordMetadata at 0x128eb4250>

In [7]:
# Function to fetch cryptocurrency data from the API 

def cryptoApi():
    """Fetch crypto data from the configured API and publish it to Kafka.

    Reads the endpoint from ``config_local['CryptoCoinAPI']['url']``, keeps the
    first 10 entries of the response's ``data`` field, stamps them with the
    current local time and sends the result to the ``crypto-currency`` topic.

    Returns:
        dict: ``{'timestamp': 'YYYY-MM-DD HH:MM:SS', 'data': [...]}``, the same
        object that was handed to the producer.

    Raises:
        requests.HTTPError: if the API answers with a 4xx/5xx status.
        requests.Timeout: if the API does not respond within the timeout.
    """
    url = config_local['CryptoCoinAPI']['url']

    # A timeout keeps a stalled connection from blocking the caller forever.
    response = requests.get(url, timeout=10)
    # Fail loudly on an HTTP error instead of trying to parse an error body.
    response.raise_for_status()
    result = response.json()

    top_10 = {
        'timestamp': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        'data': result['data'][:10],   # first 10 entries of the payload
    }

    # Queue the record for the Kafka topic (send() is asynchronous).
    producer.send('crypto-currency', value=top_10)
    return top_10
     

In [8]:
# Poll the crypto API a fixed number of times, pausing between consecutive calls.
# To stream continuously, iterate over itertools.count(1) (after importing
# itertools) instead of the range below.

NUM_CALLS = 10
INTERVAL_SECONDS = 15

for counter in range(1, NUM_CALLS + 1):
    # Wait only *between* calls, so the run does not end with an idle sleep.
    if counter > 1:
        print(f"Sleep for {INTERVAL_SECONDS} seconds...")
        time.sleep(INTERVAL_SECONDS)
    print(f"Calling crypto API for Counter: {counter}")
    cryptoApi()

# send() only queues records; flush so nothing is left buffered once the loop ends.
producer.flush()


Calling crypto API for Counter: 1
Sleep for 15 seconds...
Calling crypto API for Counter: 2
Sleep for 15 seconds...
Calling crypto API for Counter: 3
Sleep for 15 seconds...
Calling crypto API for Counter: 4
Sleep for 15 seconds...
Calling crypto API for Counter: 5
Sleep for 15 seconds...
Calling crypto API for Counter: 6
Sleep for 15 seconds...
Calling crypto API for Counter: 7
Sleep for 15 seconds...
Calling crypto API for Counter: 8
Sleep for 15 seconds...
Calling crypto API for Counter: 9
Sleep for 15 seconds...
Calling crypto API for Counter: 10
Sleep for 15 seconds...
