### First, change the log and data paths: `log.dirs` in server.properties and `dataDir` in zookeeper.properties
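Java `.properties` files treat a backslash as an escape character, so a Windows path such as `C:\kafka\data` should be written with forward slashes (`C:/kafka/data`) or doubled backslashes. If you want to script this edit, here is a minimal sketch. `set_property` and `update_file` are hypothetical helper names, and the sketch only handles `key=value` lines (not the `key: value` or `key value` forms that Java also accepts).

```python
import re
from pathlib import Path


def set_property(text: str, key: str, value: str) -> str:
    """Return properties text with `key` set to `value` (hypothetical helper).

    Backslashes are converted to forward slashes because Java's
    .properties parser treats a backslash as an escape character.
    Only lines of the form key=value are recognised.
    """
    value = value.replace("\\", "/")
    pattern = re.compile(rf"^{re.escape(key)}\s*=.*$", re.MULTILINE)
    line = f"{key}={value}"
    if pattern.search(text):
        # Replace the first existing (uncommented) assignment of the key
        return pattern.sub(lambda _m: line, text, count=1)
    # Key not present: append it on its own line
    if text and not text.endswith("\n"):
        text += "\n"
    return text + line + "\n"


def update_file(path: str, key: str, value: str) -> None:
    """Rewrite one property in a properties file in place."""
    p = Path(path)
    p.write_text(set_property(p.read_text(), key, value))
```

For example (hypothetical paths), `update_file(r".\config\server.properties", "log.dirs", r"C:\kafka\kafka-logs")` writes `log.dirs=C:/kafka/kafka-logs`.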

### Start ZooKeeper:

```
.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
```

### Start the Kafka Server (in a second terminal, once ZooKeeper is running):

```
.\bin\windows\kafka-server-start.bat .\config\server.properties
```
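Before creating topics it can help to confirm that ZooKeeper (port 2181) and the broker (port 9092) are listening. This is a minimal sketch using a plain TCP connect; it only shows that something accepts connections on the port, not that it is a healthy Kafka broker. `is_port_open` is a hypothetical helper name.

```python
import socket


def is_port_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Connection refused, timed out, or host unreachable
        return False


if __name__ == "__main__":
    for name, port in [("ZooKeeper", 2181), ("Kafka broker", 9092)]:
        status = "up" if is_port_open("localhost", port) else "not reachable"
        print(f"{name} on localhost:{port}: {status}")
```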

### Create Topic:

```
.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
```

or, on Kafka 2.2 and later (required from Kafka 3.0, which removed `--zookeeper`):

```
.\bin\windows\kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
```

### List All Topics

```
.\bin\windows\kafka-topics.bat --zookeeper localhost:2181 --list
```
> or use --bootstrap-server localhost:9092 instead of --zookeeper localhost:2181 (Kafka 2.2+; required from Kafka 3.0)
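The create and list commands can also be scripted from Python. This is a minimal sketch, assuming it runs from the Kafka installation directory like the commands above and uses only the `kafka-topics` flags already shown; `create_topic_cmd`, `list_topics_cmd` and `run` are hypothetical helper names.

```python
import subprocess

KAFKA_TOPICS = r".\bin\windows\kafka-topics.bat"


def create_topic_cmd(topic, partitions=1, replication_factor=1,
                     bootstrap_server="localhost:9092"):
    """Build the kafka-topics argument list for creating a topic."""
    return [KAFKA_TOPICS, "--create",
            "--bootstrap-server", bootstrap_server,
            "--replication-factor", str(replication_factor),
            "--partitions", str(partitions),
            "--topic", topic]


def list_topics_cmd(bootstrap_server="localhost:9092"):
    """Build the kafka-topics argument list for listing topics."""
    return [KAFKA_TOPICS, "--bootstrap-server", bootstrap_server, "--list"]


def run(cmd):
    """Run a command and return its stdout; raises CalledProcessError on failure."""
    result = subprocess.run(cmd, capture_output=True, text=True, check=True)
    return result.stdout
```

With the broker running, `run(create_topic_cmd("test", partitions=3))` followed by `print(run(list_topics_cmd()))` should show the new topic.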


### For more than 1 partition:

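1. If no partition number is given, the producer picks one: kafka-python chooses a random available partition when the message has no key, and hashes the key otherwise (so messages with the same key always land in the same partition).

2. A consumer that reads several partitions receives their messages interleaved (roughly round robin); ordering is guaranteed only within each partition, not across the whole topic.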
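The partition choice in point 1 can be modelled in plain Python. This is a simplified sketch, not kafka-python's code: kafka-python's default partitioner hashes keys with murmur2, while this sketch uses `zlib.crc32` as a stand-in hash, so the partition numbers it returns will not match a real producer's. `choose_partition` is a hypothetical name.

```python
import random
import zlib
from typing import Optional


def choose_partition(num_partitions: int,
                     partition: Optional[int] = None,
                     key: Optional[bytes] = None) -> int:
    """Simplified model of how a producer picks a partition."""
    if partition is not None:
        # An explicit partition always wins
        if not 0 <= partition < num_partitions:
            raise ValueError(f"partition {partition} does not exist")
        return partition
    if key is not None:
        # Same key -> same partition (crc32 stands in for Kafka's murmur2)
        return zlib.crc32(key) % num_partitions
    # No key and no partition: pick a random partition
    return random.randrange(num_partitions)


if __name__ == "__main__":
    print([choose_partition(3) for _ in range(5)])                  # random spread over 0..2
    print([choose_partition(3, key=b"user-42") for _ in range(5)])  # the same value five times
    print(choose_partition(3, partition=2))                         # 2
```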

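### One Partition, Multiple Consumers

A partition can't be assigned to more than one consumer in the same group. If a group has more consumers than the topic has partitions, the extra consumers receive no messages.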
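A simplified model of how partitions might be spread across one consumer group, using round-robin assignment. The real assignment is computed by the group's configured assignor, so this is not kafka-python's code; it only shows why each partition has exactly one owner and why extra consumers stay idle. `assign_round_robin` and the consumer names are hypothetical.

```python
from typing import Dict, List


def assign_round_robin(partitions: List[int],
                       consumers: List[str]) -> Dict[str, List[int]]:
    """Simplified model: give every partition to exactly one consumer, in turn."""
    assignment: Dict[str, List[int]] = {c: [] for c in sorted(consumers)}
    if not consumers:
        return assignment
    ordered = sorted(consumers)
    for i, p in enumerate(sorted(partitions)):
        assignment[ordered[i % len(ordered)]].append(p)
    return assignment


if __name__ == "__main__":
    # 1 partition, 3 consumers in the same group: only one consumer gets work
    print(assign_round_robin([0], ["c1", "c2", "c3"]))  # {'c1': [0], 'c2': [], 'c3': []}
    # 3 partitions, 2 consumers: one consumer reads two partitions
    print(assign_round_robin([0, 1, 2], ["c1", "c2"]))  # {'c1': [0, 2], 'c2': [1]}
```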

## Important

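- `auto_offset_reset` decides where a consumer starts when its group has no committed offset (or the committed offset no longer exists):
   1. `earliest` = start from the oldest message still stored in the partition
   2. `latest` = read only messages sent after the consumer starts (the kafka-python default)

- Either keep `enable_auto_commit=True` (the default), which commits offsets periodically, or set it to `False` and call `consumer.commit()` yourself after processing each message
- To send data to a specific partition, pass the `partition` argument to `producer.send()`
- Create `KafkaProducer()` with a `value_serializer` callable, e.g. `lambda v: json.dumps(v).encode('utf-8')`, so dicts can be sent directly
- When `group_id=None` and `auto_offset_reset='earliest'`, every run of the consumer reads all available messages again, because a consumer without a group never commits offsets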
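The rules above for `auto_offset_reset` and committed offsets can be summarised as a small function. This is a simplified model, not kafka-python's code: `log_start` and `log_end` stand for the oldest and the next-to-be-written offset of one partition, and `committed` is the group's committed offset (`None` when there is no group or nothing has been committed yet). `starting_offset` is a hypothetical name.

```python
from typing import Optional


def starting_offset(committed: Optional[int], log_start: int, log_end: int,
                    auto_offset_reset: str = "latest") -> int:
    """Where a consumer begins reading one partition (simplified model)."""
    # A valid committed offset always wins; auto_offset_reset is not consulted
    if committed is not None and log_start <= committed <= log_end:
        return committed
    # No (valid) committed offset: fall back to auto_offset_reset
    if auto_offset_reset == "earliest":
        return log_start   # re-read everything still retained
    if auto_offset_reset == "latest":
        return log_end     # only messages produced from now on
    raise ValueError(f"unsupported auto_offset_reset: {auto_offset_reset!r}")


if __name__ == "__main__":
    # group_id=None never commits, so 'earliest' starts at the beginning on every run
    print(starting_offset(None, 0, 100, "earliest"))  # 0
    # a group that committed offset 100 resumes there whatever the setting
    print(starting_offset(100, 0, 100, "earliest"))   # 100
    print(starting_offset(None, 0, 100, "latest"))    # 100
```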

#### Producer Syntax:

```python
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         value_serializer=lambda v: json.dumps(v).encode('utf-8'))
producer.send(topic=topic, value=data, partition=partition)
```
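In kafka-python, `producer.send()` is asynchronous and returns a future; calling `.get()` on it waits for the broker's acknowledgement and returns record metadata, including the partition and offset actually used. This is a minimal sketch assuming kafka-python and a broker on localhost:9092. `send_and_confirm` and `make_producer` are hypothetical helper names, and the producer is passed in so the helper works with any object that has the same `send()` method.

```python
import json


def send_and_confirm(producer, topic, data, partition=None, timeout=10):
    """Send one record and block until the broker acknowledges it."""
    future = producer.send(topic, value=data, partition=partition)
    metadata = future.get(timeout=timeout)  # raises if delivery fails
    return metadata.partition, metadata.offset


def make_producer(bootstrap_servers=("localhost:9092",)):
    """Create a kafka-python producer that sends dicts as JSON."""
    from kafka import KafkaProducer  # imported here so send_and_confirm needs no broker
    return KafkaProducer(bootstrap_servers=list(bootstrap_servers),
                         value_serializer=lambda v: json.dumps(v).encode("utf-8"))
```

With a broker running, `send_and_confirm(make_producer(), "test", {"n": 1}, partition=0)` should return `(0, offset)`, where `offset` is the position the record was written at.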


#### Consumer Syntax:
##### Consumer subscribed to a whole topic (reads all of its partitions)
```python
consumer = KafkaConsumer(
    topic_name,
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    group_id=group_id)
```

##### Consumer reading one particular partition
```python
from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    group_id=group_id)
consumer.assign([TopicPartition(topic, partition)])  # assign consumer to a particular topic partition
```
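A sketch of using the manually assigned consumer: after `assign()`, `seek_to_beginning()` rewinds the assigned partition, and `poll()` returns a dict mapping each `TopicPartition` to a list of records. It assumes kafka-python, a broker on localhost:9092 and JSON message values; `read_partition`, `make_partition_consumer` and the `max_polls` limit (added so the loop ends) are hypothetical.

```python
import json


def read_partition(consumer, max_polls=5, timeout_ms=1000):
    """Collect (partition, offset, decoded JSON value) from the assigned partitions."""
    values = []
    for _ in range(max_polls):
        batches = consumer.poll(timeout_ms=timeout_ms)  # {TopicPartition: [records]}
        for tp, records in batches.items():
            for record in records:
                values.append((tp.partition, record.offset, json.loads(record.value)))
    return values


def make_partition_consumer(topic, partition, group_id=None,
                            bootstrap_servers=("localhost:9092",)):
    """Create a consumer pinned to one partition and rewind it to the start."""
    from kafka import KafkaConsumer, TopicPartition  # only needed with a real broker
    consumer = KafkaConsumer(bootstrap_servers=list(bootstrap_servers),
                             group_id=group_id)
    tp = TopicPartition(topic, partition)
    consumer.assign([tp])           # manual assignment: no group rebalancing
    consumer.seek_to_beginning(tp)  # start from the oldest retained message
    return consumer
```

With a broker running, `read_partition(make_partition_consumer("test", 0))` should return the records of partition 0 only.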



#### Full Example:

```python
import json
import time

from faker import Faker
from kafka import KafkaConsumer, KafkaProducer
```

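```python
fake = Faker()  # create one Faker instance and reuse it


def get_fake_data():
    """Return a fake record: name, address and a year."""
    return {
        'name': fake.name(),
        'address': fake.address(),
        'D.O.Y': fake.year(),
    }
```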

```python
def json_serializer(data):
    """value_serializer for KafkaProducer: dict -> UTF-8 encoded JSON bytes."""
    return json.dumps(data).encode('utf-8')
```

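```python
def produce_data(topic_name, n):
    """Send n fake records to topic_name, one every 5 seconds."""
    producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                             value_serializer=json_serializer)
    for i in range(n):
        print(i)
        producer.send(topic_name, get_fake_data())  # asynchronous send
        time.sleep(5)
    producer.flush()  # make sure every buffered message has been sent
    producer.close()
```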

```python
def consume_data(topic_name, group_id=None):
    # auto_offset_reset='earliest': with no committed offset, start from the oldest message
    consumer = KafkaConsumer(
        topic_name,
        bootstrap_servers=['localhost:9092'],
        auto_offset_reset='earliest',
        group_id=group_id,
    )
    print('Consumer Started..')
    try:
        for data in consumer:  # blocks waiting for new messages; stop with Ctrl+C
            time.sleep(10)  # simulate slow processing
            print(f"Data: {json.loads(data.value)}")
    finally:
        consumer.close()  # runs even when the loop is interrupted
```
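`consume_data` relies on auto-commit and never stops on its own. Below is a variant with `enable_auto_commit=False` and an explicit `consumer.commit()` after each processed message, plus `consumer_timeout_ms` so iteration ends once no message has arrived for that long. It is a sketch assuming kafka-python and a broker on localhost:9092; the function names, the optional `max_messages` limit and the 10-second timeout are hypothetical choices. Committing requires a `group_id`. Committing after every message is simple but slow; committing after a batch is a common alternative.

```python
import json


def process_and_commit(consumer, handle, max_messages=None):
    """Handle messages one by one, committing only after handling succeeds."""
    count = 0
    for message in consumer:  # a real consumer stops after consumer_timeout_ms of silence
        handle(json.loads(message.value))
        consumer.commit()  # synchronous commit of the consumed position
        count += 1
        if max_messages is not None and count >= max_messages:
            break
    return count


def make_manual_commit_consumer(topic_name, group_id,
                                bootstrap_servers=("localhost:9092",)):
    """kafka-python consumer with auto-commit disabled."""
    from kafka import KafkaConsumer  # imported here so process_and_commit runs offline
    return KafkaConsumer(topic_name,
                         bootstrap_servers=list(bootstrap_servers),
                         group_id=group_id,
                         auto_offset_reset="earliest",
                         enable_auto_commit=False,
                         consumer_timeout_ms=10_000)
```

A typical run: `consumer = make_manual_commit_consumer("test", "demo-group")`, then `process_and_commit(consumer, print)` inside a `try` block with `consumer.close()` in `finally`. Because offsets are committed under `demo-group`, a second run resumes after the last committed message instead of re-reading everything.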