# Apache Kafka
Powering Real-Time Data at Scale

![Apache Kafka](images/logo.jpg)

## Introduction
This workshop aims to provide a practical understanding of Apache Kafka's core concepts and hands-on experience with its functionality.

### Objectives
- Understand the key concepts of Event Streaming and Apache Kafka.
- Learn how to produce and consume messages.
- Explore advanced features like partitions and fault tolerance.
- Conclude with a quiz to summarize and reinforce what you have learned.

[Apache Kafka Official Documentation](https://kafka.apache.org/documentation/)


# Introduction to Event Streaming

Event streaming is a technology paradigm for continuously capturing, storing, processing, and reacting to events happening across your business or applications in real time. It fundamentally changes how data is handled, moving away from batch processing to a continual flow of data.

Event streaming platforms like Apache Kafka enable the collection, integration, and analysis of massive streams of event data from multiple sources.
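
To make the contrast with batch processing concrete, here is a minimal sketch in plain Python (no Kafka involved). The payment values and the function names `batch_total` and `streaming_totals` are made up for illustration. The batch version needs the whole dataset before it can answer, while the streaming version produces an updated result after every event.

```python
from typing import Iterable, Iterator, List

def batch_total(events: List[float]) -> float:
    """Batch style: wait for the full dataset, then compute once."""
    return sum(events)

def streaming_totals(events: Iterable[float]) -> Iterator[float]:
    """Streaming style: emit an updated result as each event arrives."""
    running = 0.0
    for amount in events:
        running += amount
        yield running

# Hypothetical payment amounts arriving over time
payments = [10.0, 5.0, 20.0]

print(batch_total(payments))             # a single answer, once all data is in
print(list(streaming_totals(payments)))  # one answer per event
```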

![Event Streaming](images/event.svg)

## The Relevance of Event Streaming

Event streaming has become vital in today’s digital world, where real-time data and insights are crucial for decision making. Applications range from real-time analytics and monitoring to data integration and microservices communication.

### <span style="color:red">Question: Why do you think real-time data processing is important in modern applications?</span>


## What is Apache Kafka?

Apache Kafka is a distributed event streaming platform that provides high-throughput, highly scalable, and fault-tolerant event streaming capabilities. It is designed to handle real-time data feeds and provides a unified platform for both producing and consuming data streams.

### Key Features:
- **High Throughput**: Capable of handling millions of events per second.
- **Scalability**: Easily scales horizontally to accommodate growing data.
- **Fault Tolerance**: Replicates data across brokers, so the cluster tolerates broker failures and, when configured appropriately, avoids data loss.
- **Real-Time Processing**: Enables immediate data processing and decision making.

![Applications](images/apps.png)

# Companies Using Apache Kafka

Apache Kafka is widely adopted by numerous companies across various industries. Its ability to handle large-scale, real-time data makes it a preferred choice for modern data architectures.

## Some Notable Companies:
- **LinkedIn**: Originally developed Kafka to handle their activity stream and operational metrics.
- **Netflix**: Utilizes Kafka for real-time monitoring and event processing in their streaming service.
- **Uber**: Employs Kafka for gathering user, trip, and geospatial data for real-time analytics and decision-making.
- **Twitter**: Uses Kafka as a backbone for their event streaming architecture, handling billions of events each day.

### <span style="color:red">Question: Can you think of a scenario in your industry or field where Kafka's capabilities would be beneficial?</span>


# Transition to Kafka Key Concepts

Now that we understand the importance of event streaming and the role of Apache Kafka in this domain, let's delve into the key concepts that make Kafka a powerful tool for event-driven data processing.


## Overview of Apache Kafka

### Key Concepts

Understanding the fundamental concepts of Apache Kafka is crucial for working with this powerful streaming platform. In this section, we will explore the essential components and their roles in Kafka's architecture.

- **Topics**: Categories where records are stored.
- **Producers**: Entities that publish messages to topics.
- **Consumers**: Entities that subscribe to topics and process messages.
- **Brokers**: Servers in a Kafka cluster that store data and serve clients.  
- **Partitions**: Kafka topics are divided into partitions, which allow for data to be distributed and parallelized across multiple brokers.
- **Offsets**: Unique identifiers of records within a partition.

![Kafka Architecture](images/simple.png)


## 1. Topics

- **Definition**: A topic is a category or feed name to which records are published. It is like a channel where data is stored and distributed.
- **Characteristics**:
  - Topics are partitioned for scalability.
  - Data within a topic is immutable.
- **Use Case**: Different topics for logs, metrics, customer activities, etc.
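
The two characteristics above can be pictured with a toy in-memory model. This is only a sketch: `ToyTopic` is a hypothetical class written for this workshop, not part of any Kafka client, and it ignores brokers, replication, and retention. It shows a topic as a set of append-only lists, one per partition, where a record's position in its list is its offset.

```python
from typing import List

class ToyTopic:
    """In-memory stand-in for a topic: one append-only list per partition."""

    def __init__(self, name: str, num_partitions: int) -> None:
        self.name = name
        self.partitions: List[List[bytes]] = [[] for _ in range(num_partitions)]

    def append(self, partition: int, value: bytes) -> int:
        """Append a record and return its offset within that partition."""
        log = self.partitions[partition]
        log.append(value)
        return len(log) - 1

    def read(self, partition: int, offset: int) -> bytes:
        """Reading does not remove the record; it can be read again later."""
        return self.partitions[partition][offset]

logs = ToyTopic("logs", num_partitions=2)
first = logs.append(0, b"service started")
second = logs.append(0, b"request handled")
other = logs.append(1, b"cache miss")

print(first, second, other)  # offsets are counted separately per partition
print(logs.read(0, first))   # records stay in place after being read
```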

### <span style="color:red">Question: What purpose do partitions within a topic serve in Apache Kafka?</span>


## 2. Producers

- **Definition**: Applications or processes that publish data to Kafka topics.
- **How it Works**:
  - Producers send data to topics, optionally choosing the partition.
  - Data can be sent synchronously or asynchronously.
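- **Key Points**: The producer decides how records are distributed: it can name a partition explicitly, or attach a key that the partitioner hashes so that records with the same key go to the same partition.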
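
The partition choice can be sketched as a small decision function. This is an illustration under stated assumptions, not Kafka's actual partitioner: `choose_partition` is a hypothetical helper, `zlib.crc32` merely stands in for whatever hash a real client uses, and real clients handle records without a key with their own strategies.

```python
import zlib
from itertools import count
from typing import Optional

_round_robin = count()  # shared counter for records without a key

def choose_partition(num_partitions: int,
                     key: Optional[bytes] = None,
                     partition: Optional[int] = None) -> int:
    """Toy partition selection (not Kafka's actual partitioner)."""
    if partition is not None:
        # 1. The producer named a partition explicitly.
        return partition
    if key is not None:
        # 2. Hash the key so equal keys always map to the same partition.
        return zlib.crc32(key) % num_partitions
    # 3. No key and no partition: spread records across partitions.
    return next(_round_robin) % num_partitions

print(choose_partition(3, partition=2))
print(choose_partition(3, key=b"user-42") == choose_partition(3, key=b"user-42"))
```

The second line prints `True` because hashing is deterministic: the same key always yields the same partition as long as the partition count does not change.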

### <span style="color:red">Question: How do producers influence which partition a message is sent to in Kafka?</span>


## 3. Consumers

- **Definition**: Processes that read data from Kafka topics.
- **Consumption Patterns**:
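  - Subscribe to one or more topics and read records in offset order within each partition.
  - Track which records have been consumed using offsets.
- **Grouping**: Consumers that share a group ID form a consumer group. Each partition is assigned to exactly one consumer in the group, so the group as a whole processes each record once instead of every member processing it.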
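
The effect of grouping can be simulated without a broker. In this sketch the topic contents, the group and consumer names, and the `deliver` helper are all hypothetical, and the round-robin ownership rule is a simplification of what a real group coordinator does. It shows that separate groups each see every record, while inside one group the records are split between members.

```python
from collections import defaultdict
from typing import Dict, List

# Toy topic: partition number -> records in that partition
topic: Dict[int, List[str]] = {0: ["a0", "a1"], 1: ["b0"], 2: ["c0", "c1"]}

def deliver(topic: Dict[int, List[str]], consumers: List[str]) -> Dict[str, List[str]]:
    """Give each partition to exactly one consumer of a single group."""
    received: Dict[str, List[str]] = defaultdict(list)
    for partition, records in sorted(topic.items()):
        owner = consumers[partition % len(consumers)]  # simplified ownership rule
        received[owner].extend(records)  # order within a partition is kept
    return dict(received)

# Two hypothetical groups reading the same topic independently
analytics = deliver(topic, ["analytics-1", "analytics-2"])
billing = deliver(topic, ["billing-1"])

print(analytics)  # records are split between the two group members
print(billing)    # a single-member group receives every record
```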

### <span style="color:red">Question: Why is it important for consumers to track offsets in Kafka?</span>


![Topic](images/topic.png)

![Producers](images/producers.png)

## 4. Brokers

- **Definition**: Servers in a Kafka cluster storing data and serving clients.
- **Cluster Role**:
  - Handle load balancing and fault tolerance.
  - Manage requests from producers and serve data to consumers.
- **Replication**: Ensures data availability and durability.
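
How replication keeps data available can be sketched with a toy placement model. The helpers `place_replicas` and `pick_leader` are hypothetical and heavily simplified: a real cluster chooses leaders from the set of in-sync replicas and handles many more cases. The sketch only shows that when replicas of a partition live on different brokers, losing one broker still leaves a copy to serve.

```python
from typing import Dict, List, Optional, Set

def place_replicas(num_partitions: int, brokers: List[int],
                   replication_factor: int) -> Dict[int, List[int]]:
    """Spread each partition's replicas over distinct brokers (round-robin)."""
    if replication_factor > len(brokers):
        raise ValueError("replication factor cannot exceed the number of brokers")
    return {
        p: [brokers[(p + i) % len(brokers)] for i in range(replication_factor)]
        for p in range(num_partitions)
    }

def pick_leader(replicas: List[int], alive: Set[int]) -> Optional[int]:
    """Return the first replica hosted on a live broker, or None if all are down."""
    return next((b for b in replicas if b in alive), None)

placement = place_replicas(num_partitions=3, brokers=[1, 2, 3], replication_factor=2)
print(placement)

# Broker 1 fails: every partition still has a live replica to serve it
alive = {2, 3}
print({p: pick_leader(r, alive) for p, r in placement.items()})
```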

### <span style="color:red">Question: What is the role of a broker in the Kafka architecture?</span>


## 5. Partitions

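- **Definition**: Ordered, append-only divisions of a Kafka topic. Ordering is guaranteed within a partition, not across partitions.
- **Scaling and Performance**:
  - Enable parallel processing across nodes.
  - Can be hosted on different brokers, which spreads storage and load. Within a consumer group, each partition is read by at most one consumer, so the partition count caps the group's parallelism.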
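
The link between partition count and consumer parallelism can be checked with a few lines of plain Python. `assign_partitions` is a hypothetical helper that uses a simple round-robin rule. Real assignment strategies differ in detail, but the limit it demonstrates (one consumer per partition within a group) is the same.

```python
from typing import Dict, List

def assign_partitions(partitions: List[int], consumers: List[str]) -> Dict[str, List[int]]:
    """Round-robin assignment: every partition gets exactly one consumer."""
    assignment: Dict[str, List[int]] = {c: [] for c in consumers}
    for i, partition in enumerate(partitions):
        assignment[consumers[i % len(consumers)]].append(partition)
    return assignment

partitions = [0, 1, 2]
print(assign_partitions(partitions, ["c1", "c2"]))              # one consumer reads two partitions
print(assign_partitions(partitions, ["c1", "c2", "c3"]))        # one partition each
print(assign_partitions(partitions, ["c1", "c2", "c3", "c4"]))  # c4 gets nothing and stays idle
```

Adding consumers beyond the number of partitions does not increase throughput for that group; the extra consumers only act as standbys.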

### <span style="color:red">Question: How do partitions contribute to Kafka’s scalability and fault tolerance?</span>

![Partitions](images/partitions.png)

![More Consumers](images/more_consumers.png)

![Impossible](images/impossible.png)

![Idle Consumer](images/idle_consumer.png)

![More Groups](images/more_groups.png)

## 6. Offsets

- **Definition**: Monotonically increasing integers that uniquely identify each record within a partition.
- **Consumer Tracking**:
  - Enable consumers to keep track of consumed messages.
  - Allow resuming reading from the last consumed offset.
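
Resuming from a stored offset can be sketched with a list standing in for one partition. The names `partition_log`, `consume`, and `committed` are hypothetical, and the sketch skips everything a real client does around committing. The stored value is the offset of the next record to read, which is also how committed offsets work in Kafka.

```python
from typing import List, Tuple

partition_log = ["m0", "m1", "m2", "m3", "m4"]  # toy partition; list index = offset

def consume(log: List[str], start_offset: int, max_records: int) -> Tuple[List[str], int]:
    """Read up to max_records starting at start_offset.

    Returns the records and the next offset to read, which is the
    value a consumer would store (commit) in order to resume later.
    """
    records = log[start_offset:start_offset + max_records]
    return records, start_offset + len(records)

committed = 0  # nothing consumed yet

# First run: process two records, then pretend the consumer crashes
batch, committed = consume(partition_log, committed, max_records=2)
print(batch, committed)

# Restart: resume from the committed offset instead of rereading everything
batch, committed = consume(partition_log, committed, max_records=10)
print(batch, committed)
```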

### <span style="color:red">Question: What would happen if Kafka consumers didn't use offsets?</span>


## Kafka's Transition from ZooKeeper to KRaft

### ZooKeeper in Kafka

- **Role**: ZooKeeper has been integral to Kafka for tasks like controller election, cluster membership, topic configuration, ACLs, and quotas.

- **Challenges**:

  - **Scalability**: ZooKeeper can become a bottleneck in large Kafka clusters.

  - **Complexity**: Adds a layer of complexity and dependency to Kafka’s architecture.

  - **Consistency**: Requires a quorum of nodes to be available to process any request.
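
The quorum requirement comes down to majority arithmetic, which applies to majority-based consensus in general. The helper names below are made up for this sketch; the formulas are the standard ones for a simple majority.

```python
def quorum_size(nodes: int) -> int:
    """Smallest majority of an ensemble with the given number of nodes."""
    return nodes // 2 + 1

def tolerated_failures(nodes: int) -> int:
    """How many nodes can fail while a majority is still reachable."""
    return nodes - quorum_size(nodes)

# Columns: ensemble size, quorum size, failures tolerated
for n in (1, 3, 4, 5):
    print(n, quorum_size(n), tolerated_failures(n))
```

Note that four nodes tolerate no more failures than three, which is why such ensembles are usually sized with an odd number of nodes.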

![Zookeeper](images/zookeeper.png)

### Introduction to KRaft

- **KRaft** (Kafka Raft Metadata mode) is Kafka's built-in, Raft-based consensus protocol for managing cluster metadata, replacing ZooKeeper.

- **Benefits**:

  - **Simplicity**: Simplifies Kafka’s architecture by removing the need for ZooKeeper.

  - **Scalability**: Improves scalability by reducing the load on the metadata store.

  - **Availability**: Enhances Kafka’s availability by allowing partial failures in the metadata store.

### Transition from ZooKeeper to KRaft

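- Kafka's architecture overhaul moves cluster metadata management from ZooKeeper to KRaft. KRaft first shipped as an early-access feature in Kafka 2.8, was declared production-ready for new clusters in Kafka 3.3, and ZooKeeper mode was removed in Kafka 4.0.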

- **Considerations**:

  - **Compatibility**: Transition requires careful planning and testing due to compatibility issues.

  - **Consistency**: KRaft allows eventual consistency in some cases.

![ZooKeeper vs. KRaft](images/kraft.jpg)

### <span style="color:red">Question</span>

**What major challenge associated with ZooKeeper was addressed by Kafka's transition to KRaft?**

- A) Inability to handle large data volumes.

- B) Complexity and scalability issues.

- C) Lack of security features.

- D) Difficulty in integrating with other systems.


# Key Concepts Conclusion

These key concepts form the backbone of Apache Kafka's architecture and functionality. Understanding these will aid in grasping more advanced topics and effectively utilizing Kafka for real-time data processing.


## Exercises

## Exercise 1: Basic Messaging with Kafka

### Objective
Understand the fundamental process of producing and consuming messages using Kafka.



### Producing and Consuming Messages

#### Setup


In [3]:
!pip install --no-cache-dir confluent-kafka

from confluent_kafka import Producer, Consumer, KafkaException
import json


# Kafka Configuration
kafka_brokers = "kafka"  # Use the Docker service name as the hostname
topic_a = "topic-a"
topic_b = "topic-b"
topic_c = "topic_c"

# Function for producer creation
def create_producer(broker_url):
    conf = {'bootstrap.servers': broker_url}
    return Producer(conf)

# Function for consumer creation
def create_consumer(broker_url, group_id, auto_offset_reset='earliest'):
    conf = {
        'bootstrap.servers': broker_url,
        'group.id': group_id,
        'auto.offset.reset': auto_offset_reset
    }
    return Consumer(conf)

# Delivery callback: reports whether each produced message was delivered or failed
def delivery_report(err, msg):
    if err is not None:
        print(f"Message delivery failed: {err}")
    else:
        print(f"Message delivered to {msg.topic()} [{msg.partition()}]")
        



In [4]:
import threading
import confluent_kafka
import time


# Define the on_assign callback function
def on_assign(consumer, partitions):
    for p in partitions:
        # Reset the offset to the beginning
        p.offset = confluent_kafka.OFFSET_BEGINNING
    consumer.assign(partitions)

# List to hold messages
messages = []

# Consumer function to run in a separate thread
def consume_messages():
    try:
        while True:
            msg = consumer.poll(1.0)
            if msg is None:
                continue
            if msg.error():
                print(f"Consumer error: {msg.error()}")
                continue
            print(f"Received message: {msg.value().decode('utf-8')}")
            messages.append(msg.value().decode('utf-8'))
    except KeyboardInterrupt:
        pass
    finally:
        consumer.close()
        
# Initialize the consumer
consumer = create_consumer(kafka_brokers, "mygroup")

# Subscribe to the topic with the on_assign callback
consumer.subscribe(['topic-a'], on_assign=on_assign)

# Start consumer in a background thread
consumer_thread = threading.Thread(target=consume_messages)
consumer_thread.start()
print("Consumer started")

Consumer started


#### Producing Messages


In [5]:
# Producer setup
producer = create_producer(kafka_brokers)

for i in range(10):
    producer.produce('topic-a', value=f'Text message number {i}')
    print(f'Sent: Text message number {i}')
    time.sleep(1)

# Wait for any outstanding messages to be delivered and delivery report callbacks to be triggered
producer.flush()


Sent: Text message number 0
Received message: Text message number 0
Sent: Text message number 1
Received message: Text message number 1
Sent: Text message number 2
Received message: Text message number 2
Sent: Text message number 3
Received message: Text message number 3
Sent: Text message number 4
Received message: Text message number 4
Sent: Text message number 5
Received message: Text message number 5
Sent: Text message number 6
Received message: Text message number 6
Sent: Text message number 7
Received message: Text message number 7
Sent: Text message number 8
Received message: Text message number 8
Sent: Text message number 9
Received message: Text message number 9


0

In [6]:
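# Wait up to 1 second for the consumer thread.
# Note: join() does not stop the thread; its polling loop keeps running in the background.
consumer_thread.join(timeout=1)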

# Display the messages received
print("Messages received:")
for message in messages:
    print(message)


Messages received:
Text message number 0
Text message number 1
Text message number 2
Text message number 3
Text message number 4
Text message number 5
Text message number 6
Text message number 7
Text message number 8
Text message number 9


## Exercise 2: Working with Partitions

### Objective
Learn how to produce messages to specific partitions and consume them from individual partitions.


In [None]:
# Producing messages to specific partitions
topic_nam = "topic-b"
producer_2 = create_producer(kafka_brokers)

for i in range(10):
    partition = i % 3  # in our case we have 3 partitions
    message = f"Partition {partition} - Message {i}"
    print(partition)
    producer_2.produce(topic_nam, message.encode('utf-8'), partition=partition, callback=delivery_report)

producer_2.flush()



In [None]:
from confluent_kafka import TopicPartition, KafkaException

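def consume_from_partition(consumer, partition_id, num_messages=5, max_empty_polls=5):
    # Manually assign a single partition instead of subscribing to the topic
    consumer.assign([TopicPartition(topic_nam, partition_id)])
    received = 0
    empty_polls = 0
    try:
        # Stop after num_messages records, or after several empty polls in a row,
        # so that an empty poll does not use up one of the message slots
        while received < num_messages and empty_polls < max_empty_polls:
            msg = consumer.poll(1.0)
            if msg is None:
                empty_polls += 1
                continue
            if msg.error():
                raise KafkaException(msg.error())
            empty_polls = 0
            received += 1
            print(f"Received message from partition {partition_id}: {msg.value().decode('utf-8')}")
    except KafkaException as e:
        print(f"Error while consuming from partition {partition_id}: {e}")
    finally:
        # Reset consumer assignment rather than closing it
        consumer.unassign()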

# Creating the consumer
consumer_2 = create_consumer(kafka_brokers, "mygroup")

# Consuming from partition 0
consume_from_partition(consumer_2, 0)

# Consuming from partition 1
consume_from_partition(consumer_2, 1)

# Consuming from partition 2
consume_from_partition(consumer_2, 2)

# Close the consumer after all partitions have been consumed
consumer_2.close()
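
The distribution this exercise should produce on a fresh topic can be worked out without a broker. The sketch below only repeats the `i % 3` rule from the producing cell; `NUM_PARTITIONS` and `expected` are names introduced here for illustration. If the producing cell is run more than once, the topic will hold correspondingly more messages.

```python
from collections import defaultdict

NUM_PARTITIONS = 3  # same assumption as in the exercise

# Rebuild the messages the producing cell sends, grouped by target partition
expected = defaultdict(list)
for i in range(10):
    partition = i % NUM_PARTITIONS
    expected[partition].append(f"Partition {partition} - Message {i}")

for partition, msgs in sorted(expected.items()):
    print(partition, len(msgs), msgs)
```

Partition 0 ends up with four messages (0, 3, 6, 9) and partitions 1 and 2 with three each, so a consumer reading a single partition should never see a message labeled with a different partition number.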


## Exercise 3: Message Key-Based Routing

### Objective
Explore how message keys influence the partition to which a message is sent.

![Keys](images/keys.png)


In [None]:
consumer_3 = create_consumer(kafka_brokers, "mygroup")
producer_3 = create_producer(kafka_brokers)
# Producing messages with keys
for i in range(10):
    key = f"key-{i % 2}"  # Two keys for demonstration
    message = f"Message with key {key}"
    producer_3.produce(topic_c, key=key.encode('utf-8'), value=message.encode('utf-8'), callback=delivery_report)

producer_3.flush()


In [None]:
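# Consuming messages and displaying keys and partitions
consumer_3.subscribe([topic_c])
received = 0
empty_polls = 0
try:
    # Stop after 10 messages, or after several empty polls in a row
    while received < 10 and empty_polls < 5:
        msg = consumer_3.poll(1.0)
        if msg is None:
            empty_polls += 1
            continue
        if msg.error():
            raise KafkaException(msg.error())
        empty_polls = 0
        received += 1
        # A record may have no key, in which case msg.key() returns None
        key = msg.key().decode('utf-8') if msg.key() is not None else None
        print(f"Received message with key {key} from partition {msg.partition()}: {msg.value().decode('utf-8')}")
finally:
    consumer_3.close()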
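
One way to check the outcome of this exercise is to group the observed partitions by key. The helper `partitions_per_key` and the sample observations below are hypothetical. In a real run the pairs could be collected from `msg.key()` and `msg.partition()` in the consuming loop, and the actual partition numbers depend on the client's hash function.

```python
from collections import defaultdict
from typing import Dict, Iterable, Set, Tuple

def partitions_per_key(observations: Iterable[Tuple[str, int]]) -> Dict[str, Set[int]]:
    """Group observed (key, partition) pairs by key."""
    seen: Dict[str, Set[int]] = defaultdict(set)
    for key, partition in observations:
        seen[key].add(partition)
    return dict(seen)

# Made-up observations for illustration only
observed = [("key-0", 2), ("key-1", 0), ("key-0", 2), ("key-1", 0)]
mapping = partitions_per_key(observed)

print(mapping)
# True when every key was always routed to a single partition
print(all(len(parts) == 1 for parts in mapping.values()))
```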


## Quiz time

### As a summary of everything so far, we have prepared a short quiz

Go to this [link](https://take.quiz-maker.com/Q01Y2VNTR) for the quiz.

## Additional Resources

For further learning and exploration, check out the resources used for this workshop:

- [Apache Kafka Official Site](https://kafka.apache.org/)
- [Kafka Docker Repository by Wurstmeister](https://github.com/wurstmeister/kafka-docker)
- [Kafka Documentation](https://kafka.apache.org/documentation/)
- [Confluent Developer Courses](https://developer.confluent.io/courses/)
- [CloudKarafka: Kafka for Beginners](https://www.cloudkarafka.com/blog/part1-kafka-for-beginners-what-is-apache-kafka.html)
- [Apache Kafka Introduction - WayToEasyLearn](https://waytoeasylearn.com/learn/apache-kafka-introduction/)
- [Apache Kafka Architecture - Project Pro](https://www.projectpro.io/article/apache-kafka-architecture-/442)
- [Confluent: Kafka Introduction](https://docs.confluent.io/kafka/introduction.html)
- [The Evolution of Kafka Architecture: From Zookeeper to KRaft](https://romanglushach.medium.com/the-evolution-of-kafka-architecture-from-zookeeper-to-kraft-f42d511ba242)
- [Docker Kafka Kraft on GitHub](https://github.com/moeenz/docker-kafka-kraft)
- [Apache Kafka Architecture - JavaTpoint](https://www.javatpoint.com/apache-kafka-architecture)
- [Apache Kafka Architecture - Instaclustr](https://www.instaclustr.com/blog/apache-kafka-architecture/)
- [Apache Kafka GitHub Repository](https://github.com/apache/kafka)
- [KRaft - Confluent](https://developer.confluent.io/learn/kraft/)
- [Event Streaming](https://www.tibco.com/reference-center/what-is-event-streaming)
- [Thank you picture](https://students.ubc.ca/ubclife/develop-deliver-riveting-presentation)


## Conclusion

Thank you for participating in the Apache Kafka workshop. We hope this session provided valuable insights into Kafka and its capabilities. Feel free to explore the additional resources and apply the knowledge in your projects.

![Thank You](images/thank_you.jpg)
