<a href="https://colab.research.google.com/github/matthewpecsok/data_engineering/blob/main/tutorials/de_streaming_kafka_tutorial.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### Install the required kafka packages

In [15]:
output_path = '/content/drive/MyDrive/Colab Notebooks/data_engineering'

In [16]:
!pip install kafka-python

Collecting kafka-python
  Downloading kafka_python-2.0.2-py2.py3-none-any.whl (246 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/246.5 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m[90m━━━━━━━━[0m [32m194.6/246.5 kB[0m [31m5.7 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m246.5/246.5 kB[0m [31m5.1 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: kafka-python
Successfully installed kafka-python-2.0.2


### Import packages

In [17]:
import os
from datetime import datetime
import time
import threading
import json
from kafka import KafkaProducer
from kafka.errors import KafkaError


## Download and setup Kafka and Zookeeper instances

For demo purposes, the following instances are setup locally:

- Kafka (Brokers: 127.0.0.1:9092)
- Zookeeper (Node: 127.0.0.1:2181)


In [1]:
!curl -sSOL https://downloads.apache.org/kafka/3.7.0/kafka_2.12-3.7.0.tgz
!tar -xzf kafka_2.12-3.7.0.tgz

Kafka with defaults

In [2]:
!./kafka_2.12-3.7.0/bin/zookeeper-server-start.sh -daemon ./kafka_2.12-3.7.0/config/zookeeper.properties
!./kafka_2.12-3.7.0/bin/kafka-server-start.sh -daemon ./kafka_2.12-3.7.0/config/server.properties
!echo "Give the processes 10 seconds to start before proceeding."
!sleep 10

Give the processes 10 seconds to start before proceeding.


Is Kafka running?

In [7]:
!ps -ef | grep java

root        6474       1  1 22:14 ?        00:00:02 java -Xmx512M -Xms512M -server -XX:+UseG1GC -XX:
root        6884       1  5 22:14 ?        00:00:09 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxG
root        7658    5951  0 22:17 ?        00:00:00 /bin/bash -c ps -ef | grep java
root        7660    7658  0 22:17 ?        00:00:00 grep java


Create the kafka topics with the following specs:

- sample-streaming-data: partitions=1

In [8]:
!./kafka_2.12-3.7.0/bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 1 --topic sample-streaming-data

Created topic sample-streaming-data.


Describe the topic for details on the configuration

In [9]:
!./kafka_2.12-3.7.0/bin/kafka-topics.sh --describe --bootstrap-server 127.0.0.1:9092 --topic sample-streaming-data

Topic: sample-streaming-data	TopicId: GOJvkxByQXKihHS9m94DQQ	PartitionCount: 1	ReplicationFactor: 1	Configs: 
	Topic: sample-streaming-data	Partition: 0	Leader: 0	Replicas: 0	Isr: 0


## generator python script

In [30]:
%%writefile generator.py

import sys
args = sys.argv  # a list of the arguments provided (str)
print("running generator.py", args)
iterations = int(args[1])
print(f'iterations: {iterations}')

def error_callback(exc):
    raise Exception('Error while sendig data to kafka: {0}'.format(str(exc)))

def write_to_kafka(topic_name, items):
  from kafka import KafkaProducer

  count=0
  producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'])
  for message, key in items:
    producer.send(topic_name, key=key.encode('utf-8'), value=message.encode('utf-8'), partition=0).add_errback(error_callback)
    count+=1
  producer.flush()
  print("Wrote {0} messages into topic: {1}".format(count, topic_name))

import random
from time import sleep

def generate_data(rows=2):

  for i in range(1,rows):

    index_num = random.randint(0,1000000)
    print(index_num)
    keys = list([f'{index_num}'])
    msg = list([f'hello world!{index_num}'])
    data = zip(msg, keys)

  return data

for i in range(iterations):
  write_to_kafka("sample-streaming-data", generate_data())
  sleep(random.randint(0,10))



Overwriting generator.py


In [37]:
!pip install xmltodict

Collecting xmltodict
  Downloading xmltodict-0.13.0-py2.py3-none-any.whl (10.0 kB)
Installing collected packages: xmltodict
Successfully installed xmltodict-0.13.0


In [103]:
token_dict = {'token':userdata.get('uta')}

In [104]:
with open('token.json', 'w') as file:
    json.dump(token_dict, file)

In [114]:
%%writefile uta_generator.py

from google.colab import userdata
import json
import xmltodict
import sys


args = sys.argv  # a list of the arguments provided (str)
print("running generator.py", args)
iterations = int(args[1])
print(f'iterations: {iterations}')

def error_callback(exc):
    raise Exception('Error while sendig data to kafka: {0}'.format(str(exc)))

def write_to_kafka(topic_name, items):
  from kafka import KafkaProducer

  count=0
  producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'])
  print(items)

  location = f'{items[0]},{items[1]}'
  producer.send(topic_name, value=location.encode('utf-8'), partition=0).add_errback(error_callback)
  count+=1
  producer.flush()
  print("Wrote {0} messages into topic: {1}".format(count, topic_name))

def get_locations(token):
    from time import sleep
    from google.colab import userdata
    import requests
    import xmltodict
    import pandas as pd
    import os

    sleep(4)

    url = f'http://api.rideuta.com/SIRI/SIRI.svc/VehicleMonitor/ByRoute?route=703&onwardcalls=true&usertoken={token}'
    print(url)
    response = requests.get(url)
    xml_dict = xmltodict.parse(response.text)
    df = pd.DataFrame(xml_dict['Siri']['VehicleMonitoringDelivery']['VehicleActivity']['MonitoredVehicleJourney'])
    df = df[df['VehicleRef']=='1107']
    location_df = pd.json_normalize(df['VehicleLocation'])

    return location_df.iloc[0]['Latitude'],location_df.iloc[0]['Longitude']

with open('token.json', 'r') as file:
    token_dict = json.load(file)

token = token_dict['token']

for i in range(iterations):
  write_to_kafka("sample-streaming-data", get_locations(token))



Overwriting uta_generator.py


In [95]:
import os
os.environ['uta_token'] = userdata.get('uta')

In [108]:
def get_locations(token):
    from time import sleep
    from google.colab import userdata
    import requests
    import xmltodict
    import pandas as pd

    sleep(4)

    token = os.environ['uta_token']
    url = f'http://api.rideuta.com/SIRI/SIRI.svc/VehicleMonitor/ByRoute?route=703&onwardcalls=true&usertoken={token}'
    response = requests.get(url)
    xml_dict = xmltodict.parse(response.text)
    df = pd.DataFrame(xml_dict['Siri']['VehicleMonitoringDelivery']['VehicleActivity']['MonitoredVehicleJourney'])
    df = df[df['VehicleRef']=='1113']
    location_df = pd.json_normalize(df['VehicleLocation'])

    return location_df.iloc[0]['Latitude'],location_df.iloc[0]['Longitude']

In [109]:
    from google.colab import userdata
    import requests
    import xmltodict
    import pandas as pd
    token = userdata.get('uta')
    url = f'http://api.rideuta.com/SIRI/SIRI.svc/VehicleMonitor/ByRoute?route=703&onwardcalls=true&usertoken={token}'
    response = requests.get(url)
    xml_dict = xmltodict.parse(response.text)
    df = pd.DataFrame(xml_dict['Siri']['VehicleMonitoringDelivery']['VehicleActivity']['MonitoredVehicleJourney'])
    df = df[df['VehicleRef']!='1113']
    location_df = pd.json_normalize(df['VehicleLocation'])

    location_df

Unnamed: 0,Longitude,Latitude
0,-112.01673383333332,40.575237666666666
1,-111.896955,40.716735166666666
2,-111.89143766666666,40.754171
3,-111.92172966666666,40.61132233333333
4,-111.89926133333331,40.6320775
5,-111.92175033333334,40.610838
6,-111.88311366666667,40.760668333333335
7,-112.01644416666667,40.54643416666666
8,-111.89684233333334,40.734614666666666


In [110]:
df

Unnamed: 0,LineRef,DirectionRef,FramedVehicleJourneyRef,PublishedLineName,OriginRef,DestinationRef,Monitored,VehicleLocation,ProgressRate,CourseOfJourneyRef,VehicleRef,Extensions
0,703,TO MEDICAL,"{'DataFrameRef': '2024-04-08T00:00:00-06:00', ...",RED LINE,TX101715,TX127252,True,"{'Longitude': '-112.01673383333333', 'Latitude...",1,399562,1107,"{'LastGPSFix': '2024-04-08T17:45:15.083', 'Sch..."
1,703,,"{'DataFrameRef': '2024-04-08T00:00:00-06:00', ...",RED LINE,0,0,True,"{'Longitude': '-111.896955', 'Latitude': '40.7...",1,400900,1112,"{'LastGPSFix': '2024-04-08T17:45:13.817', 'Sch..."
2,703,,"{'DataFrameRef': '2024-04-08T00:00:00-06:00', ...",RED LINE,0,0,True,"{'Longitude': '-111.89143766666666', 'Latitude...",1,400850,1123,"{'LastGPSFix': '2024-04-08T17:45:14.16', 'Sche..."
3,703,,"{'DataFrameRef': '2024-04-08T00:00:00-06:00', ...",RED LINE,0,0,True,"{'Longitude': '-111.92172966666666', 'Latitude...",1,400820,1128,"{'LastGPSFix': '2024-04-08T17:45:14.647', 'Sch..."
4,703,,"{'DataFrameRef': '2024-04-08T00:00:00-06:00', ...",RED LINE,0,0,True,"{'Longitude': '-111.89926133333333', 'Latitude...",1,400896,1138,"{'LastGPSFix': '2024-04-08T17:45:10.24', 'Sche..."
5,703,,"{'DataFrameRef': '2024-04-08T00:00:00-06:00', ...",RED LINE,0,0,True,"{'Longitude': '-111.92175033333334', 'Latitude...",1,400833,1141,"{'LastGPSFix': '2024-04-08T17:45:13.817', 'Sch..."
6,703,,"{'DataFrameRef': '2024-04-08T00:00:00-06:00', ...",RED LINE,0,0,True,"{'Longitude': '-111.88311366666667', 'Latitude...",1,400862,1153,"{'LastGPSFix': '2024-04-08T17:45:17.16', 'Sche..."
7,703,,"{'DataFrameRef': '2024-04-08T00:00:00-06:00', ...",RED LINE,0,0,True,"{'Longitude': '-112.01644416666667', 'Latitude...",1,414973,1155,"{'LastGPSFix': '2024-04-08T17:45:14.66', 'Sche..."
8,703,,"{'DataFrameRef': '2024-04-08T00:00:00-06:00', ...",RED LINE,0,0,True,"{'Longitude': '-111.89684233333334', 'Latitude...",1,400853,1164,"{'LastGPSFix': '2024-04-08T17:45:16.813', 'Sch..."


In [81]:
%%script bash --bg

python uta_generator.py 10

In [66]:
location_df.iloc[0]['Latitude']

'40.733855'

In [68]:
get_locations()

('40.744858', '-111.896807')

# write some data

In [31]:
%%script bash --bg

python generator.py 30

In [116]:
from kafka import KafkaConsumer

# Kafka consumer configuration
bootstrap_servers = ['localhost:9092']  # Kafka server address
topic_name = 'sample-streaming-data'  # Kafka topic you want to read from
group_id = 'some_group'  # Consumer group ID

# Create a Kafka consumer
consumer = KafkaConsumer(
    topic_name,
    bootstrap_servers=bootstrap_servers,
    auto_offset_reset='earliest',  # Start reading at the earliest message
    enable_auto_commit=True,
    group_id=group_id,
    value_deserializer=lambda x: x.decode('utf-8')  # Assuming messages are UTF-8 encoded
)

# Read and print five messages from the topic
try:
    for _ in range(5):
        message = next(consumer)
        print(f"Received message: {message.value}")
finally:
    # Clean up on exit
    consumer.close()


ERROR:kafka.consumer.fetcher:Fetch to node 0 failed: Cancelled: <BrokerConnection node_id=0 host=6272db7d31f6:9092 <connected> [IPv4 ('172.28.0.12', 9092)]>


KeyboardInterrupt: 

function for retrieve messages.

In [29]:
message

ConsumerRecord(topic='sample-streaming-data', partition=0, offset=15, timestamp=1712615482262, timestamp_type=0, key=b'hello world!404679', value='404679', headers=[], checksum=None, serialized_key_size=18, serialized_value_size=6, serialized_header_size=-1)

# retrieve messages

In [19]:
def retrieve_messages(topic='sample-streaming-data',bootstrap_servers = '127.0.0.1:9092',output_path='output_path'):

  from kafka import KafkaConsumer
  from kafka.structs import TopicPartition
  import gspread

  consumer = KafkaConsumer(
      bootstrap_servers=bootstrap_servers,
      auto_offset_reset='earliest',
      group_id='retrieve_last_items_group'
      )###,enable_auto_commit=True)
  # Read the specified partition
  consumer.assign([TopicPartition(topic, 0)])



  for msg in consumer:
      # Open the file in write mode ('a')
      file_path = 'steaming_data.csv'  # Replace with the desired file path
      file = open(file_path, 'a')

      print(f"the msg: {msg.value.decode('utf-8')}")
        # Write content to the file
      file.write(f"{msg.value.decode('utf-8')}\n")
      # Close the file
      file.close()

In [20]:
retrieve_messages()

KeyboardInterrupt: 

In [None]:
!cat steaming_data.csv

In [None]:
from google.colab.output import eval_js
print(eval_js("google.colab.kernel.proxyPort(5000)"))

In [None]:
from google.colab.output import eval_js
print(eval_js("google.colab.kernel.proxyPort(9092)"))

In [None]:
#%%write_file recommender.py

import numpy as np

# Sample user-item matrix
user_item_matrix = np.array([
    [4, 5, 0, 5, 0],
    [5, 0, 4, 0, 3],
    [0, 3, 0, 4, 5],
    [4, 0, 5, 0, 4],
    [0, 4, 0, 3, 0]
])

# Calculate item similarities using cosine similarity
def calculate_item_similarities(matrix):
    num_items = matrix.shape[1]
    similarities = np.zeros((num_items, num_items))

    for i in range(num_items):
        for j in range(num_items):
            if i != j:
                item_i = matrix[:, i]
                item_j = matrix[:, j]
                similarity = np.dot(item_i, item_j) / (np.linalg.norm(item_i) * np.linalg.norm(item_j))
                similarities[i, j] = similarity

    return similarities

# Generate item recommendations for a target user
def generate_item_recommendations(user_id, matrix, similarities, top_k=3):
    user_ratings = matrix[user_id]
    num_items = matrix.shape[1]

    item_scores = np.zeros(num_items)

    for i in range(num_items):
        if user_ratings[i] == 0:
            item_i_similarity = similarities[i]
            item_i_ratings = matrix[:, i]
            item_scores[i] = np.dot(item_i_similarity, item_i_ratings) / np.sum(item_i_similarity)
            print(item_scores)

    #item_scores = #np.argsort(item_scores)[::-1][:top_k]
    print(item_scores)

    return item_scores

# Calculate item similarities
item_similarities = calculate_item_similarities(user_item_matrix)

# Generate recommendations for user 2
user_id = 4
recommendations = generate_item_recommendations(user_id, user_item_matrix, item_similarities, top_k=6)

print(f"Recommendations for user {user_id}: {recommendations} with matrix {user_item_matrix[user_id]}")

In [None]:
np.nonzero(recommendations)

In [None]:
np.take(recommendations, np.nonzero(recommendations), axis=0)

In [None]:
import numpy as np
arr1 = np.nonzero(recommendations)
arr2 = np.take(recommendations, np.nonzero(recommendations), axis=0)

In [None]:
arr1inds = np.take(recommendations, np.nonzero(recommendations), axis=0).argsort()
sorted_arr1 = arr1[arr1inds[::-1]]
sorted_arr2 = arr2[arr1inds[::-1]]

In [None]:
sorted_arr1

In [None]:
from kafka import KafkaAdminClient, KafkaProducer
from kafka.admin import NewTopic

# Kafka broker details
bootstrap_servers = 'localhost:9092'

# Topic details
topic_name = 'my_topic'
num_partitions = 1
replication_factor = 1

# Create Kafka topic
admin_client = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
topic = NewTopic(name=topic_name, num_partitions=num_partitions, replication_factor=replication_factor)
admin_client.create_topics([topic])

# Produce messages to Kafka topic
producer = KafkaProducer(bootstrap_servers=bootstrap_servers)
messages = ['Hello', 'World', 'Kafka', 'Python']

for message in messages:
    producer.send(topic_name, value=message.encode('utf-8'))

# Close Kafka producer
producer.flush()
producer.close()

In [None]:
from kafka import KafkaConsumer

# Kafka broker details
bootstrap_servers = 'localhost:9092'

# Topic details
topic_name = 'my_topic'

# Kafka consumer configuration
consumer = KafkaConsumer(
    topic_name,
    bootstrap_servers=bootstrap_servers,
    group_id='my_consumer_group2',
    auto_offset_reset='earliest',
    #enable_auto_commit=True,
    value_deserializer=lambda x: x.decode('utf-8')
)

# Consume and process messages from Kafka topic
for message in consumer:
    print("Received message:", message.value)

# Close Kafka consumer
consumer.close()
