## Creando un tópico de Kafka

In [2]:
from kafka.admin import KafkaAdminClient, NewTopic

def create_kafka_topic(topic_name, num_partitions, replication_factor, bootstrap_servers):
    """Create a single Kafka topic and print the broker's response.

    Args:
        topic_name: Name of the topic to create.
        num_partitions: Number of partitions for the new topic.
        replication_factor: Replication factor for the new topic.
        bootstrap_servers: Broker address(es) passed to ``KafkaAdminClient``.

    Returns:
        The response returned by ``KafkaAdminClient.create_topics``
        (it is also printed, as before).
    """
    admin_client = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
    try:
        topic = NewTopic(
            name=topic_name,
            num_partitions=num_partitions,
            replication_factor=replication_factor,
        )
        response = admin_client.create_topics([topic])
        print(response)
        return response
    finally:
        # Release the admin client's connections even if the request raises.
        admin_client.close()




In [3]:
# Parameters of the topic to create (TOPIC_NAME and BOOTSTRAP_SERVERS are reused by the delete cell).
TOPIC_NAME = "tpc-test"
NUM_PARTITIONS = 1
REPLICATION_FACTOR = 1
BOOTSTRAP_SERVERS = "host.docker.internal:9092"

create_kafka_topic(
    topic_name=TOPIC_NAME,
    num_partitions=NUM_PARTITIONS,
    replication_factor=REPLICATION_FACTOR,
    bootstrap_servers=BOOTSTRAP_SERVERS,
)
print(f"Tópico {TOPIC_NAME} creado con éxito.")

CreateTopicsResponse_v3(throttle_time_ms=0, topic_errors=[(topic='tpc-test', error_code=0, error_message=None)])
Tópico tpc-test creado con éxito.


## Eliminando un tópico de Kafka

In [8]:
# Delete the topic created above; always close the admin client, even if the request fails.
admin_client = KafkaAdminClient(bootstrap_servers=BOOTSTRAP_SERVERS)
try:
    delete_response = admin_client.delete_topics([TOPIC_NAME])
finally:
    admin_client.close()
delete_response

DeleteTopicsResponse_v3(throttle_time_ms=0, topic_error_codes=[(topic='my_new_topic', error_code=0)])

## Enviando mensajes a un tópico de Kafka

In [14]:
from kafka import KafkaProducer
import json

broker = "host.docker.internal:9092"
topic = "my_new_topic"
message = {
    "productId":1,
    "product_name":"IPhone"
}
producer = KafkaProducer(
    bootstrap_servers=broker,
    # Convert each value to JSON text, then encode it to UTF-8 bytes.
    value_serializer=lambda x: json.dumps(x).encode("utf-8"),
)

try:
    # send() is asynchronous: it returns a FutureRecordMetadata immediately.
    res = producer.send(
        topic=topic,
        value=message,
    )

    # Wait (up to 5 s) for the broker's acknowledgement; raises if the send fails or times out.
    metadata = res.get(timeout=5)

    # RecordMetadata carries topic, partition, topic_partition, offset, timestamp, etc.
    print(metadata)

    # Block until every buffered / in-flight record has been sent and acknowledged.
    producer.flush()
finally:
    # Always release the producer, even if waiting for the acknowledgement raised.
    producer.close()


RecordMetadata(topic='my_new_topic', partition=0, topic_partition=TopicPartition(topic='my_new_topic', partition=0), offset=1, timestamp=1697852056496, log_start_offset=0, checksum=None, serialized_key_size=-1, serialized_value_size=42, serialized_header_size=-1)


#### Creando una función para enviar mensajes


In [15]:
from kafka import KafkaProducer
import json



def send_kafka_message(message,topic,config, timeout=5):
    """Send one message synchronously and print the broker's acknowledgement.

    Args:
        message: Value to send; it must be acceptable to the
            ``value_serializer`` in ``config`` (if one is given).
        topic: Destination topic name.
        config: Keyword arguments forwarded to ``KafkaProducer``.
        timeout: Seconds to wait for the acknowledgement (default 5,
            the previously hard-coded value).

    Returns:
        The metadata obtained by waiting on the send future (also printed).
    """
    producer = KafkaProducer(**config)
    try:
        future = producer.send(value=message, topic=topic)
        metadata = future.get(timeout=timeout)
        print(metadata)
        producer.flush()
        return metadata
    finally:
        # Close the producer even when get() raises (e.g. on timeout).
        producer.close()


In [16]:
producer_config = {
    "bootstrap_servers":"host.docker.internal:9092",
    "value_serializer":lambda x : json.dumps(x).encode("utf8")
}

message = {
    "productId":2,
    "product_name":"IPad"
}

topic = "my_new_topic"

send_kafka_message(message,topic,producer_config)

RecordMetadata(topic='my_new_topic', partition=0, topic_partition=TopicPartition(topic='my_new_topic', partition=0), offset=2, timestamp=1697852567092, log_start_offset=0, checksum=None, serialized_key_size=-1, serialized_value_size=40, serialized_header_size=-1)


## Enviando mensajes de forma asíncrona con callbacks

In [6]:
from kafka import KafkaProducer
import json

def on_success(metadata):
    """Success callback for a producer future: print the metadata it receives."""
    print(str(metadata))

def on_failure(excepcion):
    """Error callback for a producer future: print the exception it receives."""
    error_text = str(excepcion)
    print(error_text)

producer = KafkaProducer(
    bootstrap_servers = "host.docker.internal:9092",
    value_serializer=lambda x: json.dumps(x).encode("utf-8"),
)


message = {
    "productId": 3,
    "product_name": "HIJ"
}

topic = "tpc-test"

try:
    # Non-blocking send: on_success / on_failure run when the broker acknowledges or rejects it.
    producer.send(
        topic=topic,
        value=message,
    ).add_callback(on_success).add_errback(on_failure)
finally:
    # We never call .get() on the future. close() is expected to wait for in-flight records
    # before shutting down; the RecordMetadata printed by on_success in the output shows
    # the callback still fired.
    producer.close()


RecordMetadata(topic='tpc-test', partition=0, topic_partition=TopicPartition(topic='tpc-test', partition=0), offset=2, timestamp=1698112391140, log_start_offset=0, checksum=None, serialized_key_size=-1, serialized_value_size=39, serialized_header_size=-1)


## Utilizando Avro

In [18]:
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer


# Avro schema for the message value: a record with an int "productId" and a string "product_name".
value_schema_str = """
{
   "namespace": "my.test",
   "name": "value",
   "type": "record",
   "fields" : [
     {
       "name" : "productId",
       "type" : "int"
     },
     {
       "name" : "product_name",
       "type" : "string"
     }
   ]
}
"""

value_schema = avro.loads(value_schema_str)

value = {"productId": 1, "product_name": "IPhone"}

# NOTE(review): the broker is addressed as host.docker.internal but the Schema Registry
# as localhost. Confirm both resolve from the environment this notebook runs in.
# NOTE(review): confluent_kafka.avro.AvroProducer is the legacy API; newer confluent-kafka
# releases document SerializingProducer + AvroSerializer instead. Consider migrating.
avro_producer_config = {
    'bootstrap.servers': 'host.docker.internal:9092',
    'schema.registry.url': 'http://localhost:8082',  # Schema Registry URL
}

avro_producer = AvroProducer(avro_producer_config, default_value_schema=value_schema)

avro_producer.produce(topic='my_new_topic', value=value)
# Block until the queued message has been delivered.
avro_producer.flush()






<kafka.producer.future.FutureRecordMetadata at 0x7fb1d1f18370>

RecordMetadata(topic='my_new123c', partition=0, topic_partition=TopicPartition(topic='my_new123c', partition=0), offset=0, timestamp=1697853995778, log_start_offset=0, checksum=None, serialized_key_size=-1, serialized_value_size=43, serialized_header_size=-1)


## Consultando las particiones de un tópico

In [38]:
from kafka import KafkaAdminClient

admin_client = KafkaAdminClient(bootstrap_servers="host.docker.internal:9092")
try:
    # describe_topics returns a list; we asked for one topic, so take element 0.
    topic_metadata = admin_client.describe_topics(topics=["my_new_topic"])[0]
finally:
    admin_client.close()

# List of per-partition dicts (error_code, partition, leader, replicas, isr, offline_replicas).
partitions = topic_metadata["partitions"]

for partition in partitions:
    print("-----------------------------------------")
    # Distinct loop names so the global `value` from the Avro cell is not overwritten.
    for field_name, field_value in partition.items():
        print(f" {field_name} : {field_value}")



-----------------------------------------
 error_code : 0
 partition : 0
 leader : 0
 replicas : [0]
 isr : [0]
 offline_replicas : []
-----------------------------------------
 error_code : 0
 partition : 1
 leader : 0
 replicas : [0]
 isr : [0]
 offline_replicas : []
-----------------------------------------
 error_code : 0
 partition : 2
 leader : 0
 replicas : [0]
 isr : [0]
 offline_replicas : []


## Creando particiones

In [36]:
from kafka.admin import NewPartitions
from kafka import KafkaAdminClient

admin_client = KafkaAdminClient(bootstrap_servers="host.docker.internal:9092")

# total_count is the total number of partitions the topic should have, not how many to add.
# NOTE(review): once the topic already has 3 partitions, re-running this cell is expected
# to fail (the count cannot stay the same or shrink). Guard or skip it on re-runs.
new_partitions = NewPartitions(total_count=3)
try:
    create_partitions_response = admin_client.create_partitions({"my_new_topic": new_partitions})
finally:
    admin_client.close()
create_partitions_response


CreatePartitionsResponse_v1(throttle_time_ms=0, topic_errors=[(topic='my_new_topic', error_code=0, error_message=None)])

In [39]:
from kafka import KafkaProducer

import json  # used by the serializer below; don't rely on another cell's import

# NOTE(review): on_success / on_failure come from the "callbacks" cell; run it first.
producer = KafkaProducer(
    bootstrap_servers = "host.docker.internal:9092",
    value_serializer=lambda x: json.dumps(x).encode("utf-8"),
)

message = {
    "productId": 4,
    "product_name": "XYZ"
}

try:
    # Write explicitly to partition 2 (the topic was expanded to 3 partitions earlier).
    future = producer.send(
        value=message,
        topic="my_new_topic",
        partition=2,
    ).add_callback(on_success).add_errback(on_failure)
finally:
    # Previously the producer was never closed; release it once the record is handed off.
    producer.close()
future



<kafka.producer.future.FutureRecordMetadata at 0x7fb1d33183d0>

RecordMetadata(topic='my_new_topic', partition=2, topic_partition=TopicPartition(topic='my_new_topic', partition=2), offset=0, timestamp=1697870379664, log_start_offset=0, checksum=None, serialized_key_size=-1, serialized_value_size=39, serialized_header_size=-1)


Records can, in addition to key and value, also include headers. Record headers give you the ability to add some metadata about the Kafka record, without adding any extra information to the key/value pair of the record itself. Headers are often used for lineage to indicate the source of the data in the record, and for routing or tracing messages based on header information without having to parse the message itself (perhaps the message is encrypted and the router doesn’t have permissions to access the data).

## Enviando mensajes con headers

In [43]:
from kafka import KafkaProducer


import json  # used by both serializers below; don't rely on another cell's import

# Create a producer that JSON-encodes both keys and values.
producer = KafkaProducer(
    bootstrap_servers="host.docker.internal:9092",
    key_serializer=lambda k: json.dumps(k).encode('utf-8'),
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# Record headers: a list of (str key, bytes value) tuples; b"YOLO" is a bytes literal.
headers = [("privacy-level", b"YOLO")]

try:
    # Send the message with its headers.
    producer.send("my_new_topic", key="Precision Products", value="France", headers=headers)
finally:
    # Close the producer when done, even if send() raised.
    producer.close()


In [7]:
from kafka import KafkaProducer
import json
import random
import string


def random_string(length=10):
    """Return a string of ``length`` characters drawn at random from ASCII letters and digits."""
    alphabet = string.ascii_letters + string.digits
    picked = []
    for _ in range(length):
        picked.append(random.choice(alphabet))
    return "".join(picked)

def send_thousand_messages(num_messages, topic="tpc-test-cons"):
    """Send ``num_messages`` JSON messages with random names to a Kafka topic.

    Each message is ``{"ID": i, "name": <15-char random string>}`` for
    ``i`` in ``0 .. num_messages - 1``. Despite the function's name, the
    count is whatever ``num_messages`` says.

    Args:
        num_messages: How many messages to send.
        topic: Destination topic (default ``"tpc-test-cons"``, the previously
            hard-coded value).
    """
    producer = KafkaProducer(
        bootstrap_servers="host.docker.internal:9092",
        value_serializer=lambda x: json.dumps(x).encode("utf-8"),
    )
    try:
        for i in range(num_messages):
            message = {
                "ID": i,
                "name": random_string(15),
            }
            producer.send(topic=topic, value=message)
        # send() only buffers records; wait until all of them are delivered.
        producer.flush()
    finally:
        # Previously the producer was never closed, leaving it alive after the call.
        producer.close()

    

In [9]:
send_thousand_messages(num_messages=1000)  # push 1000 test messages

In [21]:

from dataclasses import dataclass


@dataclass
class StorageConfig:
    """Settings consumed by StorageProcess.

    All three fields are required, in this positional order:
    azure_account_name, azure_account_key, bootstrap_servers.
    """

    # Azure account credentials.
    azure_account_name: str
    azure_account_key: str
    # Kafka bootstrap server address(es).
    bootstrap_servers: str


# Nested configuration sketch; only bootstrap_servers is read below.
# NOTE(review): "is_required" holds placeholder values — confirm the intended structure.
schema = {
    "params":{
        "bootstrap_props":{
            "bootstrap_servers":"host.docker.internal:9092",
            "is_required":{
                "hola":"adios"
            }
        }
    }
}


# StorageConfig has three required fields; the original call omitted bootstrap_servers
# and raised TypeError. Take it from the schema above.
# The account name/key are placeholders — never hardcode real Azure keys in the notebook.
StorageConfig(
    azure_account_key="hola",
    azure_account_name="adios",
    bootstrap_servers=schema["params"]["bootstrap_props"]["bootstrap_servers"],
)













In [None]:
class Config_manager:
    """Builds configuration objects for the storage process and caches them.

    NOTE(review): loading is still stubbed — ``_get_config`` returns None and
    ``_get_storage_config`` uses placeholder values.
    """

    def __init__(self) -> None:
        self.config = self._get_config()
        # Filled in lazily by the ``storage_config`` property.
        self._storage_config = None

    @property
    def storage_config(self):
        """Return the StorageConfig, building it on first access and reusing it afterwards.

        Previously it was rebuilt on every access, which made ``_storage_config`` useless.
        """
        if self._storage_config is None:
            self._storage_config = self._get_storage_config()
        return self._storage_config

    def _get_config(self):
        """Load the raw configuration. TODO: not implemented yet; returns None."""
        pass

    def _get_storage_config(self):
        """Build a StorageConfig from placeholder values.

        TODO: read real values (e.g. from ``self.config`` or environment
        variables); never hardcode real Azure account keys here.
        """
        # bootstrap_servers is a required StorageConfig field; omitting it raised TypeError.
        # Use the broker address used throughout this notebook.
        return StorageConfig(
            azure_account_key="hola",
            azure_account_name="adios",
            bootstrap_servers="host.docker.internal:9092",
        )

In [None]:
class StorageProcess:
    """Writes data to various sinks using the settings in a StorageConfig.

    NOTE(review): every method below is still a stub that returns None.
    """

    def __init__(self,config:StorageConfig) -> None:
        self.config:StorageConfig = config

    def _get_blob_name(self):
        """TODO: not implemented; returns None."""

    def save_kafka(self):
        """TODO: not implemented; returns None.

        Presumably meant to use ``self.config.bootstrap_servers``. The previous
        stub read it into an unused local, which has been removed.
        """

    def save_parquet(self):
        """TODO: not implemented; returns None."""

    def save_csv(self):
        """TODO: not implemented; returns None."""

    def save_local(self):
        """TODO: not implemented; returns None."""

In [None]:
def main():
    """Build a Config_manager and a StorageProcess from its storage configuration.

    NOTE(review): in the visible code `s` is created but never used, and
    main() is never called — presumably still work in progress.
    """
    conf_manager = Config_manager()
    # Accessing the storage_config property builds the StorageConfig passed in here.
    s = StorageProcess(conf_manager.storage_config)