In [5]:
!pip install confluent-kafka


Defaulting to user installation because normal site-packages is not writeable


In [7]:
from confluent_kafka import Producer
import json
import time
import random

BOOTSTRAP_SERVERS = 'localhost:9092'
TOPIC_INPUT = 'transactions-input'

TOTAL_TRANSACTIONS = 6000
DURATION_SECONDS = 120
INTERVAL = DURATION_SECONDS / TOTAL_TRANSACTIONS 

def delivery_report(err, msg):
    """
    Callback pour signaler le succès ou l'échec de la livraison d'un message.
    """
    if err is not None:
        print(f"Échec de l'envoi du message : {err}")
    else:
        print(f"Message envoyé avec succès au topic {msg.topic()} [partition {msg.partition()}]")

def produce_transactions():
    producer_config = {
        'bootstrap.servers': BOOTSTRAP_SERVERS,
        'linger.ms': 10,
    }
    producer = Producer(producer_config)

    print(f"Envoi de {TOTAL_TRANSACTIONS} transactions au topic '{TOPIC_INPUT}' en {DURATION_SECONDS} secondes...")

    user_ids = ['11111', '22222', '33333', '44444', '55555']
    for i in range(TOTAL_TRANSACTIONS):
        user_id = random.choice(user_ids)
        amount = random.randint(1000, 20000)
        timestamp = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
        transaction = {
            "userId": user_id,
            "amount": amount,
            "timestamp": timestamp
        }

        try:
            producer.produce(
                topic=TOPIC_INPUT,
                key=str(user_id),
                value=json.dumps(transaction),
                callback=delivery_report
            )
        except Exception as e:
            print(f"Erreur lors de l'envoi : {e}")

        time.sleep(INTERVAL)

    producer.flush()

    print("Toutes les transactions ont été envoyées avec succès.")

if __name__ == "__main__":
    produce_transactions()

Envoi de 6000 transactions au topic 'transactions-input' en 120 secondes...
Message envoyé avec succès au topic transactions-input [partition 0]
Message envoyé avec succès au topic transactions-input [partition 0]
Message envoyé avec succès au topic transactions-input [partition 1]
Message envoyé avec succès au topic transactions-input [partition 1]
Message envoyé avec succès au topic transactions-input [partition 0]
Message envoyé avec succès au topic transactions-input [partition 1]
Message envoyé avec succès au topic transactions-input [partition 0]
Message envoyé avec succès au topic transactions-input [partition 0]
Message envoyé avec succès au topic transactions-input [partition 1]
Message envoyé avec succès au topic transactions-input [partition 1]
Message envoyé avec succès au topic transactions-input [partition 1]
Message envoyé avec succès au topic transactions-input [partition 1]
Message envoyé avec succès au topic transactions-input [partition 1]
Message envoyé avec succès 