# Installation

## Zookeeper
https://www.apache.org/dyn/closer.lua/zookeeper/zookeeper-3.8.0/apache-zookeeper-3.8.0-bin.tar.gz 

## Java 
https://www.java.com/en/download/apple.jsp  
https://www.oracle.com/java/technologies/downloads/#jdk19-mac


## Kafka
https://kafka.apache.org/downloads

# Kafka Architeture Types

## Single Node, Single Cluster
<div>
<img src="figures/fig1.png" width="700"/>
</div>


## Single Node, Multiple Brokers
<div>
<img src="figures/fig2.png" width="700"/>
</div>

## Multiple Node, Multiple Brokers

<div>
<img src="figures/fig3.png" width="700"/>
</div>

# Kafka Command Lines Tools

* zookeeper-server-start.sh : Starts Zookeeper using properties in `config/zookeeper.properties`

* kafka-server-start.sh : Starts a Kafka server using the properties in `config/server.properties`

* kafka-topics.sh : Used to create and list topics.

* kafka-console-producer.sh : Used to send messages to Kafka cluster.

* kafka-console-consumer.sh : Used to consume messages from Kafka cluster.

# System Setup

## Single Node, Single Broker Setup

### Zookeeper

#### Getting Startd with Zookeeper
[Apache Foundation Documentation](https://zookeeper.apache.org/doc/r3.8.0/zookeeperStarted.html)  

#### Zookeeper config
After downloading zookeeper go to its conf directory and write a `conf/zoo.cfg` file. It should look like the following:

```
tickTime=2000  
dataDir=/var/lib/zookeeper  
clientPort=2181  
```

* tickTime : the basic time unit in milliseconds used by ZooKeeper. It is used to do heartbeats and the minimum   
session timeout will be twice the tickTime.

* dataDir : the location to store the in-memory database snapshots and, unless specified otherwise,   
the transaction log of updates to the database.

* clientPort : the port to listen for client connections


#### Starting zookeeper
`bin/zkServer.sh start`


*Sample Output*  
Karims-MBP:apache-zookeeper-3.8.0-bin karimitani$ sudo bin/zkServer.sh start  
/usr/bin/java  
ZooKeeper JMX enabled by default  
Using config: /Users/karimitani/Develloper/apache-zookeeper-3.8.0-bin/bin/../conf/zoo.cfg  
Starting zookeeper ... STARTED  

#### To manage Zookeeper through the CLI
`bin/zkCli.sh -server 127.0.0.1:2181`

### Kafka

#### Getting started with Kafka

[Apache Foundation Documentation](https://kafka.apache.org/documentation/)

#### Start the kafka environment  

to start the built in zookeeper :   `bin/zookeeper-server-start.sh config/zookeeper.properties`  
to start the kafka broker service :   `bin/kafka-server-start.sh config/server.properties`  

### Verification Step

run `jps`, you will see kafka running.

*sample output*  
>Karims-MBP: jps  
>63956 Jps  
>63220 Kafka  
>62828 QuorumPeerMain  


### Creating topics

run `bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic Example`  

*Example Output*  
>Karims-MBP: bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic Example 
>Created topic Example.  

### Listing Topics

run `bin/kafka-topics.sh --list --bootstrap-server localhost:9092`  

*Sample Output*  
>Karims-MBP: bin/kafka-topics.sh --list --bootstrap-server localhost:9092  
>Example  

### Creating a producer

run the following to start a producer cli session:  
`bin/kafka-console-producer.sh --topic Example --bootstrap-server localhost:9092`

*Sample Output*   
>Karims-MBP: bin/kafka-console-producer.sh --topic Example --bootstrap-server localhost:9092   
>hi  
>this  
>is    
>me 
>karim  

### Creating a consumer 

run the following to consume messages on a given topic:  
`bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic Example --from-beginning `

*Sample Output*   
>Karims-MBP: bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic Example --from-beginning
>hi  
>this  
>is   
>me
>karim 

## Single Node, Mutli Broker Setup

### Configuring multiple brokers
In order to start multiple brokers you need multiple server.properties files. This is done by running the following:   

```
cp server.properties server1.properties
cp server.properties server2.properties
```

You need to change a few settings within each of the new config file (the below example is for server1.properties):  

```
broker.id = 1
listeners=PLAINTEXT://:9093
log.dirs=/tmp/kafka-logs1
```

### Starting the new brokers
Run the following:
```
./bin/kafka-server-start.sh ./conf/server1.properties
./bin/kafka-server-start.sh ./conf/server2.properties
```

### Creating topic
```
./bin/kafka-topics.sh --create --topic Example2 --zookeeper localhost:2181 --partitions 3 --replication factor 2
```

### Inspecting the created topics
The below command will show the topic/partition details. Which topic replicat has which leader, which brokers keep replicas  
of the partition and which of those replicas are in sync (ISR = In Sync Replicas)


```
./bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic Example2
Topic: Example2	TopicId: PykfT1-2Sk6JC9E_aya0qw	PartitionCount: 3	ReplicationFactor: 2	Configs: segment.bytes=1073741824
	Topic: Example2	Partition: 0	Leader: 2	Replicas: 2,0	Isr: 2,0
	Topic: Example2	Partition: 1	Leader: 1	Replicas: 1,2	Isr: 1,2
	Topic: Example2	Partition: 2	Leader: 0	Replicas: 0,1	Isr: 0,1

```

### Creating a producer
```
./bin/kafka-console-producer.sh --broker-list localhost:9092 localhost:9093 localhost:9094 --topic Example2
```

*Sample Output*
> Karims-MacBook-Pro: ./bin/kafka-console-producer.sh --broker-list localhost:9092 localhost:9093 localhost:9094 --topic Example2  
> \>1  
> \>2  
> \>3  
> \>4  
> \>5  


### Creating consumers
When you create a consumer it automatically get put into a consumer group. That is why the consumer is getting all the  
messages that have been sent to the three partitions that we just created with topic "Example2"



```
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic Example2 --from-beginning
```


<div>
<img src="figures/fig4.png" width="400"/>
</div>

<br>
<br>
<br>
<br>
The below shows you that running two consumer will get you all the messages that we sent (above) which is not the behavior  
that we're looking for when consuming from multiple partitions.

> Karims-MacBook-Pro:kafka_2.12-3.2.3 karimitani$ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9094 --topic Example2  
>1  
>2  
>3  
>4  
>5  

> Karims-MacBook-Pro:kafka_2.12-3.2.3 karimitani$ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic Example2  
>1  
>2  
>3  
>4  
>5  


### Creating consumers with specific consumer groups
If we want to have three consumers, all of them within the same consumer group, to each be mapped to a different paritition.  
We run the following in three separate CLI tabs.

```
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9094 --topic Example2  --group example-group
```

*Sending new Messages*
> Karims-MacBook-Pro: ./bin/kafka-console-producer.sh --broker-list localhost:9092 localhost:9093 localhost:9094 --topic Example2  
> \>1  
> \>2  
> \>3  
> \>4  
> \>5 



Each of the consumer gets a non duplicate message

*From consumer 1*  
>./bin/kafka-console-consumer.sh --bootstrap-server localhost:9094 --topic Example2  --group example-group  
>1  
>2  

*From consumer 3*  
>./bin/kafka-console-consumer.sh --bootstrap-server localhost:9094 --topic Example2  --group example-group  
>5  

*From consumer 3*  
>./bin/kafka-console-consumer.sh --bootstrap-server localhost:9094 --topic Example2  --group example-group  
>3  
>4  

<div>
<img src="figures/fig5.png" width="400"/>
</div>

If we create a new consumer with its own new group we see that it gets all the messages since its by itself in the   
new consumer group

*From consumer 4 (group 2)*  
>./bin/kafka-console-consumer.sh --bootstrap-server localhost:9094 --topic Example2  --group example-group-2  
>1  
>2  
>3  
>4  
>5  

<div>
<img src="figures/fig6.png" width="400"/>
</div>


### Altering a topic

In Kafka you can't reduce the number of partitions but you can increase them. However its a best practice to `over partition`  
to the number of paritions that you need in the future ahead of time.


```
./bin/kafka-topics.sh --alter --bootstrap-server localhost:9094 --topic Example2 --partitions 4
```

You now can inspect the same topic again

```
./bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic Example2
```

*Sample Output*
>Topic: Example2	TopicId: PykfT1-2Sk6JC9E_aya0qw	PartitionCount: 4	ReplicationFactor: 2	Configs: segment.bytes=1073741824  
	>>Topic: Example2	Partition: 0	Leader: 0	Replicas: 2,0	Isr: 0,2  
	Topic: Example2	Partition: 1	Leader: 1	Replicas: 1,2	Isr: 1,2  
	Topic: Example2	Partition: 2	Leader: 0	Replicas: 0,1	Isr: 0,1  
	Topic: Example2	Partition: 3	Leader: 2	Replicas: 2,1	Isr: 2,1  


### Deleting a topic 

```
./bin/kafka-topics.sh --delete --topic Example2 --bootstrap-server localhost:9092
```

