##  Sample 1
### Basic Example of KafkaProducer

In [7]:
# Basic KafkaProducer usage with the kafka-python client.
#
# Install the `kafka-python` distribution only. NOTE(review): the similarly
# named `kafka` package on PyPI appears to be an outdated snapshot that
# installs the same `kafka` module -- confirm before installing both.
#pip install kafka-python

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='10.222.68.223:9092')
try:
    # send() only queues the record; flush() blocks until queued records
    # have been handed to the broker.
    producer.send('Test10', b'Hello, Kafka!')
    producer.flush()
finally:
    # Release the client's connections; later cells rebind `producer`, which
    # would otherwise leave this instance open for the life of the kernel.
    producer.close()


## Sample 2

In [6]:
from kafka import KafkaProducer

# Keys and values are handed to send() as `str`; str.encode converts them to
# bytes (UTF-8 by default) before they go on the wire.
kafka_props = {
    'bootstrap_servers': '10.222.68.223:9092',
    'key_serializer': str.encode,
    'value_serializer': str.encode
}

producer = KafkaProducer(**kafka_props)

try:
    # send() is asynchronous and returns a future. Waiting on it makes sure the
    # record is really delivered and turns a delivery failure into an exception
    # that the handler below can report (without this the cell could finish
    # before anything was sent).
    producer.send("CustomerCountry", key="Precision Products", value="France").get(timeout=10)
except Exception as e:
    print(e)
finally:
    # Always release the client's connections, even when delivery failed.
    producer.close()
    

## Sample 3
### Basic Example of KafkaConsumer

In [None]:

#Basic Example of KafkaConsumer

from kafka import KafkaConsumer

# Create a consumer instance
consumer = KafkaConsumer(
    'Test10',
    bootstrap_servers='10.222.68.223:9092',
    auto_offset_reset='earliest', # or 'latest'
    enable_auto_commit=True,
    group_id='my-group',
    # Records may carry no value (e.g. tombstones); pass None through instead
    # of failing on None.decode().
    value_deserializer=lambda x: x.decode('utf-8') if x is not None else None
)

# Consume messages. Iterating the consumer blocks indefinitely, so the loop is
# expected to be stopped by interrupting the kernel.
try:
    for message in consumer:
        print(f"Topic: {message.topic}, Partition: {message.partition}, Offset: {message.offset}")
        print(f"Key: {message.key}, Value: {message.value}")
except KeyboardInterrupt:
    # Interrupting the cell is the normal way to stop consuming.
    pass
finally:
    # Close the consumer so its connections are released however the loop ends.
    consumer.close()


## Sample 4
### Using Confluent Kafka

In [16]:
#pip install confluent-kafka


Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 24.3.1 -> 25.0.1
[notice] To update, run: python.exe -m pip install --upgrade pip


In [17]:
from confluent_kafka import Producer

# Client settings for the confluent-kafka Producer: the broker to bootstrap
# from and the client id this producer announces.
conf = {'bootstrap.servers': '10.222.68.223:9092', 'client.id': 'my_producer'}
producer = Producer(conf)

In [18]:
# produce() only places the message on the client's local queue; flush() blocks
# until queued messages have been delivered (or have failed), so the record is
# not lost when the cell finishes.
producer.produce(topic='my_topic', key='my_key', value='my_value')
producer.flush()

## Sample 5
### Using Callback

In [19]:
from confluent_kafka import Producer

class DemoProducerCallback:
    """Callable delivery report handler.

    Instances are passed as the ``callback`` argument of ``produce()`` and are
    invoked with ``(err, msg)``.
    """

    def __call__(self, err, msg):
        """Print the error, or the topic/partition/offset of the delivered message.

        Args:
            err: Delivery error, or ``None`` when the message was delivered.
            msg: Message object exposing ``topic()``, ``partition()`` and
                ``offset()``; only consulted when ``err`` is ``None``.
        """
        if err is None:
            print(f"Produced message to {msg.topic()} [{msg.partition()}] @ offset {msg.offset()}")
            return
        print(f"Error: {err}")

# Producer configured with the broker address only.
producer = Producer({'bootstrap.servers': '10.222.68.223:9092'})

# Destination topic, key and value of the single record sent by this sample.
record = dict(topic='CustomerCountry', key='Biomedical Materials', value='BRAZIL')

# The callback instance is invoked with the delivery result for this record.
producer.produce(
    record['topic'],
    key=record['key'],
    value=record['value'],
    callback=DemoProducerCallback(),
)

# Kept as the cell's last expression so the notebook echoes flush()'s return value.
producer.flush()

Produced message to CustomerCountry [0] @ offset 1


0

## Sample 6

In [None]:
pip install confluent-kafka[avro] faker

In [27]:

# Avro schema for a `Customer` record (namespace com.example), written as a
# Python dict. The literal is the cell's last expression, so the notebook only
# echoes it; no variable is bound here.
{
    "type": "record",
    "name": "Customer",
    "namespace": "com.example",
    "fields": [
        {"name": "id", "type": "string"},
        {"name": "name", "type": "string"},
        {"name": "email", "type": "string"},
        {"name": "age", "type": "int"},
    ],
}


{'type': 'record',
 'name': 'Customer',
 'namespace': 'com.example',
 'fields': [{'name': 'id', 'type': 'string'},
  {'name': 'name', 'type': 'string'},
  {'name': 'email', 'type': 'string'},
  {'name': 'age', 'type': 'int'}]}

In [29]:
# Create customer_generator.py
#
# The module source is held in a string and written to the working directory
# as customer_generator.py.
customer_generator_code = """
from faker import Faker

class CustomerGenerator:
    def __init__(self):
        self.fake = Faker()

    def generate_customer(self):
        return {
            'name': self.fake.name(),
            'email': self.fake.email(),
            'address': self.fake.address(),
            'phone_number': self.fake.phone_number(),
            'birthdate': self.fake.date_of_birth(minimum_age=18, maximum_age=90).isoformat()
        }
"""

# Write with an explicit encoding: without it open() uses the platform's
# default encoding, which differs between machines.
with open('customer_generator.py', 'w', encoding='utf-8') as f:
    f.write(customer_generator_code)

In [30]:
# Create kafka_producer.py
#
# The script source is held in a string and written to the working directory.
# The generated script imports CustomerGenerator from customer_generator, so
# that module must sit next to it.
kafka_producer_code = """
from confluent_kafka import SerializingProducer
from confluent_kafka.serialization import StringSerializer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from customer_generator import CustomerGenerator

# Define the Avro schema for customer data
customer_schema_str = '''
{
    "type": "record",
    "name": "Customer",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "email", "type": "string"},
        {"name": "address", "type": "string"},
        {"name": "phone_number", "type": "string"},
        {"name": "birthdate", "type": "string"}
    ]
}
'''

# Configuration for Schema Registry
schema_registry_conf = {
    'url': 'http://10.222.68.223:8081'  # Replace with your Schema Registry URL
}
schema_registry_client = SchemaRegistryClient(schema_registry_conf)

# Avro serializer for customer data
avro_serializer = AvroSerializer(
    schema_registry_client,
    customer_schema_str,
    to_dict=lambda obj, ctx: obj  # Assuming the customer data is already in dictionary format
)

# Configuration for Kafka producer
producer_conf = {
    'bootstrap.servers': '10.222.68.223:9092',  # Replace with your Kafka broker URL
    'key.serializer': StringSerializer('utf_8'),
    'value.serializer': avro_serializer
}

producer = SerializingProducer(producer_conf)

# Function to produce customer data to Kafka
def produce_customer_data():
    customer_generator = CustomerGenerator()
    customer_data = customer_generator.generate_customer()
    producer.produce(topic='customer_topic', key=customer_data['email'], value=customer_data)
    producer.flush()

if __name__ == '__main__':
    produce_customer_data()
"""

# The broker address in the generated script now uses the same host as its
# Schema Registry URL (it previously pointed at localhost while the registry
# pointed at 10.222.68.223).
# Write with an explicit encoding: without it open() uses the platform's
# default encoding, which differs between machines.
with open('kafka_producer.py', 'w', encoding='utf-8') as f:
    f.write(kafka_producer_code)

print("Files 'customer_generator.py' and 'kafka_producer.py' have been created.")

Files 'customer_generator.py' and 'kafka_producer.py' have been created.


## Sample 7

In [15]:
pip install fastavro

Collecting fastavroNote: you may need to restart the kernel to use updated packages.


You should consider upgrading via the 'c:\Users\Administrator\AppData\Local\Programs\Python\Python38\python.exe -m pip install --upgrade pip' command.



  Downloading fastavro-1.9.5-cp38-cp38-win_amd64.whl (550 kB)
Installing collected packages: fastavro
Successfully installed fastavro-1.9.5


In [7]:
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import StringSerializer
from confluent_kafka import SerializingProducer
import json

In [8]:

# Schema Registry client, configured with the registry's URL.
schema_registry_conf = dict(url='http://10.222.68.223:8081')
schema_registry_client = SchemaRegistryClient(schema_registry_conf)


In [None]:
pip install requests

In [12]:

# Define your Avro schema (as a string or load from file).
# This one is JSON text describing a `Customer` record with three string
# fields: id, name and email. Values sent with this schema must supply exactly
# those fields.
avro_schema_str = """
{
  "type": "record",
  "name": "Customer",
  "fields": [
    {"name": "id", "type": "string"},
    {"name": "name", "type": "string"},
    {"name": "email", "type": "string"}
     ]
} """


In [13]:

# Value serializer: built from the Schema Registry client and the Customer
# schema string defined in the earlier cells.
avro_serializer = AvroSerializer(schema_registry_client, avro_schema_str)

# Key serializer: keys are plain strings.
key_serializer = StringSerializer('utf_8')

# Settings handed to the producer: broker address plus both serializers.
producer_config = {
    'bootstrap.servers': '10.222.68.223:9092',
    'key.serializer': key_serializer,
    'value.serializer': avro_serializer,
}


In [14]:

# Build the producer from `producer_config` (broker address plus the key and
# value serializers assembled in the previous cell).
producer = SerializingProducer(producer_config)


In [15]:

import uuid

# Record to publish. Its keys mirror the fields of the Customer schema
# (id, name, email); the id is a freshly generated random UUID rendered as text.
record_id = str(uuid.uuid4())
customer_data = dict(
    id=record_id,
    name="Alice Johnson",
    email="alice.johnson@example.com",
)


In [16]:

def _report_delivery(err, msg):
    """Print whether delivery failed (with the error) or succeeded (with the message key)."""
    if err:
        outcome = 'failed: ' + str(err)
    else:
        outcome = 'successful: ' + msg.key().decode('utf-8')
    print(f"Delivery {outcome}")


# Send the record, keyed by its id; the delivery report goes to the helper above.
producer.produce(
    topic="customers",
    key=customer_data["id"],
    value=customer_data,
    on_delivery=_report_delivery,
)

# Block until every queued message has been delivered (or has failed).
producer.flush()


Delivery successful: 099eedf5-ff04-4af3-9977-524eb350346d


0

## Sample 8

In [17]:
#pip install avro

Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 24.3.1 -> 25.0.1
[notice] To update, run: python.exe -m pip install --upgrade pip


In [1]:
# Endpoints for this sample: the Schema Registry URL and the Kafka broker address.
schema_registry_url = 'http://10.222.68.223:8081/'
bootstrap_servers = '10.222.68.223:9092'

In [2]:
# Field list of the example record; add other fields as needed.
_my_record_fields = [
    {"name": "field1", "type": "string"},
]

# Avro schema for `MyRecord`, kept as a Python dict.
avro_schema = {
    "type": "record",
    "name": "MyRecord",
    "fields": _my_record_fields,
}

In [3]:
# Producer settings assembled from the two endpoint variables defined earlier
# in this sample: the broker address and the Schema Registry URL.
producer_config = {}
producer_config['bootstrap.servers'] = bootstrap_servers
producer_config['schema.registry.url'] = schema_registry_url

In [10]:
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer
from confluent_kafka.avro.serializer import SerializerError
from confluent_kafka.avro.serializer.message_serializer import MessageSerializer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer, AvroDeserializer
from confluent_kafka.serialization import SerializationContext, MessageField





In [11]:
# Schema Registry client used by the serializer and deserializer cells that follow.
schema_registry_conf = dict(url='http://10.222.68.223:8081')
schema_registry_client = SchemaRegistryClient(schema_registry_conf)


In [12]:

# Define schema and data
#
# JSON text of the Avro schema for a `User` record in the `example.avro`
# namespace. `name` is a plain string; `favorite_number` and `favorite_color`
# are unions, so each accepts either a value of the listed type or null.

value_schema_str = """
{
    "namespace": "example.avro",
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "favorite_number", "type": ["int", "null"]},
        {"name": "favorite_color", "type": ["string", "null"]}
    ]
}
"""


In [13]:

# Create Avro serializer from the Schema Registry client and the `User`
# schema string defined in the preceding cells.
avro_serializer = AvroSerializer(schema_registry_client, value_schema_str)



In [14]:
# Sample record whose keys match the fields of the User schema.
user_data = dict(name="Alice", favorite_number=42, favorite_color="blue")

# The serializer is called directly with the record and a SerializationContext
# naming the topic and marking the payload as a message value (this replaces
# the older encode_record_with_schema-style call that used to be sketched here).
value_context = SerializationContext("MyTopic3", MessageField.VALUE)

# Serialize the data
serialized_data = avro_serializer(user_data, value_context)



In [15]:
# Decode the bytes produced by the serializer cell, using the same topic and
# field context that was used to serialize them.
avro_deserializer = AvroDeserializer(schema_registry_client)
deserialized_data = avro_deserializer(serialized_data, SerializationContext("MyTopic3", MessageField.VALUE))

# Round-trip check: decoding must give back the record that was serialized.
assert deserialized_data == user_data, "Avro round trip changed the record"

# Last expression of the cell, so the notebook displays the decoded record.
deserialized_data


## Sample 9

In [None]:
```bash
#pip install confluent-kafka[avro]
```



---

#### 📦 Step 1: Define the Avro Schema


In [24]:


#user.avsc
import json

# Avro schema for a `User` record in the `example.avro` namespace.
user_schema = {
  "namespace": "example.avro",
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "age", "type": "int"},
    {"name": "email", "type": "string"}
  ]
}

# Step 2 loads the schema with avro.load('user.avsc'), so the file has to exist
# on disk; write it here instead of relying on a file created outside the notebook.
with open('user.avsc', 'w', encoding='utf-8') as f:
    json.dump(user_schema, f, indent=2)

# Last expression of the cell, so the notebook still displays the schema.
user_schema


{'namespace': 'example.avro',
 'type': 'record',
 'name': 'User',
 'fields': [{'name': 'name', 'type': 'string'},
  {'name': 'age', 'type': 'int'},
  {'name': 'email', 'type': 'string'}]}


#### 📝 Step 2: Producer Code (Serialize and Send)


In [25]:


from confluent_kafka.avro import AvroProducer
from confluent_kafka.avro.serializer import SerializerError
from confluent_kafka import avro

# Connection settings: Kafka broker plus the Schema Registry URL.
producer_config = {
    'bootstrap.servers': '10.222.68.223:9092',
    'schema.registry.url': 'http://10.222.68.223:8081',
}

# Value schema is read from the user.avsc file in the working directory and
# used as the producer's default value schema.
value_schema = avro.load('user.avsc')

producer = AvroProducer(producer_config, default_value_schema=value_schema)

# Record to publish; its keys match the fields of the User schema from Step 1.
value = dict(name="Alice", age=30, email="alice@example.com")

try:
    producer.produce(topic='users', value=value)
    producer.flush()
except SerializerError as e:
    print(f"Message serialization failed: {e}")
else:
    # Reached only when produce() and flush() completed without raising.
    print("Message produced successfully.")



  producer = AvroProducer(producer_config, default_value_schema=value_schema)


Message produced successfully.



#### 📥 Step 3: Consumer Code (Deserialize and Read)


In [26]:

from confluent_kafka.avro import AvroConsumer

# Consumer settings: broker, consumer group, where to start when the group has
# no committed offset, and the Schema Registry URL.
consumer_config = {
    'bootstrap.servers': '10.222.68.223:9092',
    'group.id': 'user-consumer-group',
    'auto.offset.reset': 'earliest',
    'schema.registry.url': 'http://10.222.68.223:8081'
}

consumer = AvroConsumer(consumer_config)
consumer.subscribe(['users'])

print("Waiting for messages...")

# Poll until the cell is interrupted. The finally clause closes the consumer
# however the loop ends, including when poll() raises.
try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            # Nothing arrived within the one-second poll timeout.
            continue
        if msg.error():
            print(f"Consumer error: {msg.error()}")
            continue

        print(f"Received message: {msg.value()}")
except KeyboardInterrupt:
    # Interrupting the kernel is the intended way to stop the loop.
    pass
finally:
    consumer.close()

  consumer = AvroConsumer(consumer_config)


Waiting for messages...
Received message: {'name': 'Alice', 'age': 30, 'email': 'alice@example.com'}
