# **Final Assignment (Part 2) - Creating Streaming Data Pipelines using Kafka**

# **Scenario**

You are a data engineer at a data analytics consulting company. You have been assigned to a project that aims to decongest the national highways by analyzing road traffic data from different toll plazas. As a vehicle passes a toll plaza, the vehicle’s data, such as `vehicle_id`, `vehicle_type`, `toll_plaza_id`, and `timestamp`, is streamed to Kafka. Your job is to create a data pipeline that collects the streaming data and loads it into a database.

## **Objectives**

In this assignment you will create a streaming data pipeline by performing these steps:

- Start a MySQL Database server.
- Create a table to hold the toll data.
- Start the Kafka server.
- Install the Kafka Python driver.
- Install the MySQL Python driver.
- Create a topic named `toll` in Kafka.
- Download the streaming data generator program.
- Customize the generator program to stream to the `toll` topic.
- Download the streaming data consumer program.
- Customize the consumer program to write into a MySQL database table.
- Verify that streamed data is being collected in the database table.

# **Exercise 1 - Prepare the lab environment**

Before you start the assignment, complete the following steps to set up the lab:

Step 1: Download Kafka.

In [1]:
!curl -O https://archive.apache.org/dist/kafka/2.8.0/kafka_2.12-2.8.0.tgz

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 68.2M  100 68.2M    0     0  4303k      0  0:00:16  0:00:16 --:--:-- 9961k


In [19]:
!curl -O https://archive.apache.org/dist/kafka/3.7.0/kafka_2.13-3.7.0.tgz

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  113M  100  113M    0     0  4811k      0  0:00:24  0:00:24 --:--:-- 10.7M


Step 2: Extract Kafka.

In [2]:
!tar -xzf kafka_2.12-2.8.0.tgz

In [20]:
!tar -xzf kafka_2.13-3.7.0.tgz

Step 3: Start MySQL server.

In [10]:
import os
from dotenv import load_dotenv

# Load environment variables from the .env file
load_dotenv()

# Read the password from the environment variable
password = os.getenv("DB_PASSWORD")

In [11]:
path = "/usr/local/mysql-8.0.31-macos12-arm64/bin/"

Step 5: Create a database named tolldata.

In [4]:
!{path}mysql --host=127.0.0.1 --port=3306 --user=root --password={password} --execute="CREATE DATABASE tolldata" 2>/dev/null;

In [5]:
!{path}mysql --host=127.0.0.1 --port=3306 --user=root --password={password} --execute="SHOW DATABASES" 2>/dev/null;

+--------------------+
| Database           |
+--------------------+
| employees          |
| information_schema |
| mysql              |
| performance_schema |
| sakila             |
| sys                |
| tolldata           |
| world              |
+--------------------+


In [12]:
%load_ext sql

In [13]:
# Create the connection URL
%sql mysql+pymysql://root:{password}@localhost:3306/tolldata
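
One caveat when interpolating `{password}` directly into the connection URL: characters such as `@`, `/`, or `:` in the password change how the URL is parsed. A minimal sketch of percent-encoding the password first, using only the standard library. The helper name `build_mysql_url` is hypothetical, the example password is made up, and the defaults mirror the values used in this notebook.

```python
from urllib.parse import quote_plus


def build_mysql_url(password, user="root", host="localhost", port=3306, database="tolldata"):
    """Return a SQLAlchemy-style URL for the PyMySQL driver with the password percent-encoded."""
    return f"mysql+pymysql://{user}:{quote_plus(password)}@{host}:{port}/{database}"


# The example password is made up; its '@' and '/' would otherwise be read as URL delimiters.
print(build_mysql_url("p@ss/word"))
```

Assigning the result to a variable (for example `url = build_mysql_url(password)`) and passing `{url}` to `%sql` should then work the same way `{password}` is used above.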

In [8]:
%sql SHOW TABLES;

 * mysql+pymysql://root:***@localhost:3306/tolldata
0 rows affected.


Tables_in_tolldata


In [9]:
%sql create table livetolldata(timestamp datetime,vehicle_id int,vehicle_type char(15),toll_plaza_id smallint);

 * mysql+pymysql://root:***@localhost:3306/tolldata
0 rows affected.


[]

In [10]:
%sql SHOW TABLES;

 * mysql+pymysql://root:***@localhost:3306/tolldata
1 rows affected.


Tables_in_tolldata
livetolldata


Step 8: Install the Python module `kafka-python` using the `pip` command.

In [12]:
!python3 -m pip install kafka-python

Collecting kafka-python
  Downloading kafka_python-2.0.2-py2.py3-none-any.whl.metadata (7.8 kB)
Downloading kafka_python-2.0.2-py2.py3-none-any.whl (246 kB)
DEPRECATION: pytorch-lightning 1.6.5 has a non-standard dependency specifier torch>=1.8.*. pip 24.0 will enforce this behaviour change. A possible replacement is to upgrade to a newer version of pytorch-lightning or contact the author to suggest that they release a version with a conforming dependency specifiers. Discussion can be found at https://github.com/pypa/pip/issues/12063
Installing collected packages: kafka-python
Successfully installed kafka-python-2.0.2

Step 9: Install the Python module `mysql-connector-python` using the `pip` command.

In [13]:
!python3 -m pip install mysql-connector-python==8.0.31

Collecting mysql-connector-python==8.0.31
  Downloading mysql_connector_python-8.0.31-cp39-cp39-macosx_11_0_arm64.whl.metadata (1.8 kB)
Collecting protobuf<=3.20.1,>=3.11.0 (from mysql-connector-python==8.0.31)
  Downloading protobuf-3.20.1-py2.py3-none-any.whl.metadata (720 bytes)
Downloading mysql_connector_python-8.0.31-cp39-cp39-macosx_11_0_arm64.whl (4.6 MB)
Downloading protobuf-3.20.1-py2.py3-none-any.whl (162 kB)
DEPRECATION: pytorch-lightning 1.6.5 has a non-standard dependency specifier torch>=1.8.*. pip 24.0 will enforce this behaviour change. A possible replacement is to upgrade to a newer version of pytorch-lightning or contact the author to suggest that they release a version with a conforming dependency

# **Exercise 2 - Start Kafka**

### **Task 2.1 - Start Zookeeper**

Start the ZooKeeper server.

Take a screenshot of the command you run.

Name the screenshot `start_zookeeper.jpg`. (Images can be saved with either the .jpg or .png extension.)

### **Task 2.2 - Start Kafka server**

Start the Kafka server.

Take a screenshot of the command you run.

Name the screenshot `start_kafka.jpg`. (Images can be saved with either the .jpg or .png extension.)
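
For Tasks 2.1 and 2.2, here is a minimal sketch of the start commands. It assumes the Kafka 2.8.0 archive extracted in Step 2 and the start scripts and default config files that ship with Kafka distributions (`bin/zookeeper-server-start.sh`, `bin/kafka-server-start.sh`). The helper names are hypothetical. This notebook itself runs Kafka through Docker Compose instead, as the next section shows.

```python
import subprocess

KAFKA_HOME = "kafka_2.12-2.8.0"  # directory created by the tar command in Step 2


def zookeeper_command(kafka_home=KAFKA_HOME):
    """Command line that starts ZooKeeper with the bundled default config."""
    return [f"{kafka_home}/bin/zookeeper-server-start.sh",
            f"{kafka_home}/config/zookeeper.properties"]


def kafka_command(kafka_home=KAFKA_HOME):
    """Command line that starts a Kafka broker with the bundled default config."""
    return [f"{kafka_home}/bin/kafka-server-start.sh",
            f"{kafka_home}/config/server.properties"]


def start_services():
    """Launch both servers as background processes, ZooKeeper first."""
    zookeeper = subprocess.Popen(zookeeper_command())
    kafka = subprocess.Popen(kafka_command())
    return zookeeper, kafka


# Printing the commands has no side effects; call start_services() to actually launch them.
print(" ".join(zookeeper_command()))
print(" ".join(kafka_command()))
```

The printed lines are the commands to run (and screenshot) in two separate terminals. If you launch them with `start_services()`, it may help to give ZooKeeper a few seconds before expecting the broker to be ready.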

### **Create docker-compose.yml**

In [2]:
# version: '2'

# services:
#   zookeeper:
#     image: arm64v8/zookeeper
#     ports:
#       - "2181:2181"

#   kafka:
#     image: bitnami/kafka:latest
#     ports:
#       - "9092:9092"
#     expose:
#       - "9093"
#     environment:
#       KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9093,OUTSIDE://localhost:9092
#       KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
#       KAFKA_LISTENERS: INSIDE://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
#       KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
#       KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
#       KAFKA_CREATE_TOPICS: "my-topic:1:1"
#     depends_on:
#       - zookeeper
#     volumes:
#       - /var/run/docker.sock:/var/run/docker.sock

In [3]:
#docker-compose up -d

### **Task 2.3 - Create a topic named toll**

In [1]:
#docker exec -it <kafka-container-id> /opt/bitnami/kafka/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic toll
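
The topic can also be created from Python with the `kafka-python` package installed in Step 8. The following is a sketch, assuming the broker is reachable on `localhost:9092`. The function name `create_toll_topic` is hypothetical; `KafkaAdminClient` and `NewTopic` come from `kafka.admin`.

```python
TOPIC_NAME = "toll"  # the topic name the assignment asks for


def create_toll_topic(bootstrap_servers="localhost:9092"):
    """Create the topic with one partition and a replication factor of 1.

    Returns True if the topic was created, False if it already existed.
    """
    # Imported here so this cell can be defined without a broker or kafka-python loaded.
    from kafka.admin import KafkaAdminClient, NewTopic
    from kafka.errors import TopicAlreadyExistsError

    admin = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
    try:
        admin.create_topics([NewTopic(name=TOPIC_NAME, num_partitions=1, replication_factor=1)])
        return True
    except TopicAlreadyExistsError:
        return False
    finally:
        admin.close()
```

Calling `create_toll_topic()` once the broker is up should be enough. A second call returns `False` instead of raising.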

### **Task 2.4 - Download the Toll Traffic Simulator**

In [4]:
!curl -O https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-DB0250EN-SkillsNetwork/labs/Final%20Assignment/toll_traffic_generator.py

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   828  100   828    0     0   1071      0 --:--:-- --:--:-- --:--:--  1085


### **Task 2.5 - Configure the Toll Traffic Simulator**

Open `toll_traffic_generator.py` and set the topic to `toll`.
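
The downloaded script is not reproduced here. As a rough sketch of what a producer configured for the `toll` topic might look like, the code below assumes comma-separated messages in the column order of `livetolldata`, `ctime`-style timestamps, and a broker on `localhost:9092`. The names `build_message` and `run_generator`, and the value ranges, are illustrative assumptions rather than the course script's code.

```python
from random import choice, randint, random
from time import ctime, sleep, time

TOPIC = "toll"  # the value Task 2.5 asks you to set
VEHICLE_TYPES = ("car", "truck", "van")  # types that appear in this notebook's sample output


def build_message(timestamp, vehicle_id, vehicle_type, plaza_id):
    """Encode one toll event as a comma-separated UTF-8 payload."""
    return f"{timestamp},{vehicle_id},{vehicle_type},{plaza_id}".encode("utf-8")


def run_generator(events=10, bootstrap_servers="localhost:9092"):
    """Send a few random toll events to the topic (needs a running broker)."""
    from kafka import KafkaProducer  # provided by the kafka-python package

    producer = KafkaProducer(bootstrap_servers=bootstrap_servers)
    for _ in range(events):
        message = build_message(ctime(time()), randint(10000, 10000000),
                                choice(VEHICLE_TYPES), randint(4001, 4010))
        producer.send(TOPIC, message)
        sleep(random() * 2)  # pause up to two seconds between vehicles
    producer.flush()  # deliver any buffered messages before exiting
    producer.close()
```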

### **Task 2.6 - Run the Toll Traffic Simulator**

In [5]:
#python3 toll_traffic_generator.py

### **Task 2.7 - Configure streaming_data_reader.py**

Download `streaming_data_reader.py` from the URL below. The assignment suggests `wget`; the cell further down uses `curl -O`, which fetches the same file.

`https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-DB0250EN-SkillsNetwork/labs/Final%20Assignment/streaming_data_reader.py`

Open `streaming_data_reader.py` and modify the following settings so that the program can connect to your MySQL server:

`TOPIC`

`DATABASE`

`USERNAME`

`PASSWORD`

In [6]:
!curl -O https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-DB0250EN-SkillsNetwork/labs/Final%20Assignment/streaming_data_reader.py

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  1364  100  1364    0     0   1410      0 --:--:-- --:--:-- --:--:--  1515
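
To illustrate how the four settings (`TOPIC`, `DATABASE`, `USERNAME`, `PASSWORD`) fit together, here is a hedged sketch of a consumer. It is not the downloaded script. It assumes comma-separated messages with a `ctime`-style timestamp first (adjust `timestamp_format` to whatever your generator sends) and reads the password from the same `DB_PASSWORD` variable this notebook loads from `.env`. The function names are hypothetical.

```python
import os
from datetime import datetime

TOPIC = "toll"
DATABASE = "tolldata"
USERNAME = "root"
PASSWORD = os.getenv("DB_PASSWORD", "")  # avoids hard-coding the password in the script

INSERT_SQL = "INSERT INTO livetolldata VALUES (%s, %s, %s, %s)"


def parse_message(raw, timestamp_format="%a %b %d %H:%M:%S %Y"):
    """Turn a payload into a row (timestamp, vehicle_id, vehicle_type, toll_plaza_id)."""
    timestamp, vehicle_id, vehicle_type, plaza_id = raw.decode("utf-8").split(",")
    parsed = datetime.strptime(timestamp, timestamp_format)
    # MySQL DATETIME columns accept the 'YYYY-MM-DD HH:MM:SS' form.
    return (parsed.strftime("%Y-%m-%d %H:%M:%S"), int(vehicle_id), vehicle_type, int(plaza_id))


def run_consumer(bootstrap_servers="localhost:9092"):
    """Read messages from Kafka and insert each one into MySQL."""
    from kafka import KafkaConsumer
    import mysql.connector

    connection = mysql.connector.connect(host="localhost", database=DATABASE,
                                         user=USERNAME, password=PASSWORD)
    cursor = connection.cursor()
    consumer = KafkaConsumer(TOPIC, bootstrap_servers=bootstrap_servers)
    try:
        for msg in consumer:
            cursor.execute(INSERT_SQL, parse_message(msg.value))
            connection.commit()  # commit per message so rows are visible right away
    finally:
        consumer.close()
        connection.close()
```

Keeping the parsing in its own function makes it easy to check the timestamp conversion without a broker or database running.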


### **Task 2.8 - Run streaming_data_reader.py**

In [7]:
#python3 streaming_data_reader.py

### **Task 2.9 - Health check of the streaming data pipeline**

In [14]:
%sql SELECT * FROM livetolldata LIMIT 10

 * mysql+pymysql://root:***@localhost:3306/tolldata
10 rows affected.


timestamp,vehicle_id,vehicle_type,toll_plaza_id
2024-05-04 01:22:05,5809462,car,4007
2024-05-04 01:22:07,7842899,truck,4004
2024-05-04 01:22:08,2078344,car,4001
2024-05-04 01:22:09,8338187,car,4010
2024-05-04 01:22:10,1096077,car,4001
2024-05-04 01:22:10,5138689,car,4005
2024-05-04 01:22:11,8376169,car,4001
2024-05-04 01:22:11,6546105,truck,4002
2024-05-04 01:22:12,9569143,truck,4001
2024-05-04 01:22:12,6540270,van,4010


In [15]:
%sql SELECT COUNT(*) FROM livetolldata 

 * mysql+pymysql://root:***@localhost:3306/tolldata
1 rows affected.


COUNT(*)
54
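
A single count only shows that some rows arrived. To check that the pipeline is still flowing, one option is to count twice and compare. The sketch below takes any DB-API connection (for example one from `mysql.connector.connect(...)`); the function names and the five-second wait are arbitrary illustrative choices.

```python
import time


def count_rows(connection, table="livetolldata"):
    """Return the number of rows in the table using a DB-API connection."""
    cursor = connection.cursor()
    try:
        # The table name is a trusted constant here, never user input.
        cursor.execute(f"SELECT COUNT(*) FROM {table}")
        return cursor.fetchone()[0]
    finally:
        cursor.close()


def pipeline_is_flowing(connection, wait_seconds=5):
    """Count rows twice, wait_seconds apart, and report whether the count grew."""
    before = count_rows(connection)
    time.sleep(wait_seconds)
    connection.commit()  # end the open transaction so the second read sees new rows
    after = count_rows(connection)
    return after > before
```

With the generator and the reader both running, `pipeline_is_flowing(connection)` should return `True`. A `False` result suggests that one of the two scripts has stopped.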
