# Test: receiving account creation messages

In [1]:
pip install aiokafka && pip install dotenv

Collecting aiokafka
  Downloading aiokafka-0.12.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (17 kB)
Collecting async-timeout (from aiokafka)
  Downloading async_timeout-5.0.1-py3-none-any.whl.metadata (5.1 kB)
Collecting typing-extensions>=4.10.0 (from aiokafka)
  Downloading typing_extensions-4.15.0-py3-none-any.whl.metadata (3.3 kB)
Downloading aiokafka-0.12.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.2 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.2/1.2 MB[0m [31m1.8 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25hDownloading typing_extensions-4.15.0-py3-none-any.whl (44 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m44.6/44.6 kB[0m [31m2.2 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading async_timeout-5.0.1-py3-none-any.whl (6.2 kB)
Installing collected packages: typing-extensions, async-timeout, aiokafka
  Attempting uninstall: typing-extensions
    Found existing installation: typing_e

In [2]:
import asyncio
import json
import nest_asyncio
from aiokafka import AIOKafkaProducer, AIOKafkaConsumer
from dotenv import load_dotenv
import uuid

In [3]:
# Patch asyncio so the event loop can be re-entered while it is already running
# (see the nest_asyncio documentation); applied once, before any async cell below.
nest_asyncio.apply()

In [6]:
# Broker address passed as `bootstrap_servers` to the producer and consumer.
KAFKA_BOOTSTRAP = "kafka:9092"
# Topic that produce()/consume() read at call time in this section.
TOPIC = "account_creation"

In [21]:
async def produce():
    """Send five numbered JSON test messages to TOPIC.

    Each message is ``{"id": n, "text": "message n"}`` for n in 0..4, encoded
    as UTF-8 JSON. The producer is always stopped, even if a send fails.
    """
    kafka_producer = AIOKafkaProducer(bootstrap_servers=KAFKA_BOOTSTRAP)
    await kafka_producer.start()
    try:
        payloads = ({"id": n, "text": f"message {n}"} for n in range(5))
        for payload in payloads:
            encoded = json.dumps(payload).encode("utf-8")
            # Wait for each send to complete before moving on to the next message.
            await kafka_producer.send_and_wait(TOPIC, encoded)
            print(f"sent {payload}")
    finally:
        await kafka_producer.stop()

In [7]:
async def consume():
    """Print every JSON message received from TOPIC until cancelled.

    Creates an AIOKafkaConsumer for TOPIC on KAFKA_BOOTSTRAP (configured with
    ``auto_offset_reset="earliest"`` and auto-commit enabled), then prints each
    record's value decoded as JSON. The consumer is always stopped on exit,
    including when the surrounding task is cancelled.

    Records whose value is missing or is not valid UTF-8 JSON are reported and
    skipped. This coroutine is run as a background task in this notebook, so
    letting such an error propagate would end consumption without any visible
    failure until the task is awaited.
    """
    consumer = AIOKafkaConsumer(
        TOPIC,
        bootstrap_servers=KAFKA_BOOTSTRAP,
        auto_offset_reset="earliest",
        enable_auto_commit=True,
    )
    await consumer.start()
    try:
        async for msg in consumer:
            raw = msg.value
            if raw is None:
                # A record without a value has nothing to decode.
                print("Skipped message with no value")
                continue
            try:
                payload = json.loads(raw.decode())
            except (UnicodeDecodeError, json.JSONDecodeError) as exc:
                # Keep consuming: one bad record must not end the loop.
                print(f"Skipped undecodable message {raw!r}: {exc!r}")
                continue
            print(f"Received: {payload}")
    finally:
        await consumer.stop()

In [8]:
# Schedule consume() as a background task on the running event loop; the handle
# is kept so the task can be cancelled from a later cell.
consumer_task = asyncio.create_task(consume())

Received: {'id': 370, 'name': 'АО "РПЗ"', 'fullName': None, 'inn': '5040001426', 'kpp': None, 'ogrn': '1025005116839', 'company_guid': None, 'DogovorNumber': None, 'NomenclatureItems': [{'id': 1448, 'CarModel': 'Hongqi H5', 'VIN': 'LFPH4ACP4R2A95883', 'DataCarId': 370, 'car_guid': None}, {'id': 1449, 'CarModel': 'Hongqi H5', 'VIN': 'LFPH4ACP9R2A99346', 'DataCarId': 370, 'car_guid': None}, {'id': 1450, 'CarModel': 'Haval M6 AT', 'VIN': 'LGWEF4A52RF738650', 'DataCarId': 370, 'car_guid': None}, {'id': 1451, 'CarModel': 'Haval M6 AT', 'VIN': 'LGWEF4A54RF738651', 'DataCarId': 370, 'car_guid': None}]}
Received: {'id': 659, 'name': 'АО "НЗР "ОКСИД"', 'fullName': 'АО "НЗР "ОКСИД"', 'inn': '5405441299', 'kpp': '540501001', 'ogrn': '1115476107570', 'company_guid': '3f2dbc16-5986-11ec-8ed0-001e67994f19', 'DogovorNumber': 'ТС-Д-250925-2 ', 'NomenclatureItems': [{'id': 1238, 'CarModel': 'TANK 500 Premium', 'VIN': 'LGWFF9A65RM622123', 'DataCarId': 659, 'car_guid': None}, {'id': 1239, 'CarModel': 'TA

In [12]:
# Request cancellation of the background consumer; the `finally` block in
# consume() then stops the Kafka consumer.
consumer_task.cancel()

True

# Nomenclature GUID updater

In [4]:
# Same broker address as the previous section.
KAFKA_BOOTSTRAP = "kafka:9092"
# TOPIC is rebound here: produce()/consume() read it at call time, so any call
# made after this cell targets this topic instead of "account_creation".
TOPIC = "nomenclature_guid_updater"

In [5]:
# Item ids to update; the same ids appear under 'NomenclatureItems' in the
# account_creation output shown earlier in this notebook.
# car_guids holds one GUID per id, paired by position — presumably generated
# with str(uuid.uuid4()); TODO confirm.
ids = [1448, 1451, 1239]
car_guids = ['a4a86c76-2b0f-4db7-a0ff-3dffa7bb5761', '60c57083-9e03-41d7-964d-8df01c7897d1', '2ce73a6b-463e-41d1-bbe5-649d8736a72a']

async def produce():
    """Publish one ``{"id", "car_guid"}`` JSON message per id/GUID pair to TOPIC.

    Reads the module-level ``ids`` and ``car_guids`` lists, pairs them by
    position, and sends each pair as a UTF-8 JSON message, waiting for every
    send to complete. The producer is always stopped, even if a send fails.

    Raises:
        ValueError: if ``ids`` and ``car_guids`` differ in length. This is
            checked before connecting, so a mismatch cannot silently drop
            the unpaired entries.
    """
    if len(ids) != len(car_guids):
        raise ValueError(
            f"ids and car_guids must have the same length, "
            f"got {len(ids)} and {len(car_guids)}"
        )
    producer = AIOKafkaProducer(bootstrap_servers=KAFKA_BOOTSTRAP)
    await producer.start()
    try:
        # `item_id` rather than `id`, to avoid shadowing the builtin.
        for item_id, car_guid in zip(ids, car_guids):
            msg = {"id": item_id, "car_guid": car_guid}
            await producer.send_and_wait(TOPIC, json.dumps(msg).encode("utf-8"))
            print(f"sent {msg}")
    finally:
        await producer.stop()


In [6]:
# Run the producer to completion (top-level await in the notebook cell); it
# prints one "sent ..." line per message.
await produce()

sent {'id': 1448, 'car_guid': 'a4a86c76-2b0f-4db7-a0ff-3dffa7bb5761'}
sent {'id': 1451, 'car_guid': '60c57083-9e03-41d7-964d-8df01c7897d1'}
sent {'id': 1239, 'car_guid': '2ce73a6b-463e-41d1-bbe5-649d8736a72a'}


In [13]:
async def consume():
    """Print every JSON message received from TOPIC until cancelled.

    Creates an AIOKafkaConsumer for TOPIC on KAFKA_BOOTSTRAP (configured with
    ``auto_offset_reset="earliest"`` and auto-commit enabled), then prints each
    record's value decoded as JSON. The consumer is always stopped on exit,
    including when the surrounding task is cancelled.

    Records whose value is missing or is not valid UTF-8 JSON are reported and
    skipped. This coroutine is run as a background task in this notebook, so
    letting such an error propagate would end consumption without any visible
    failure until the task is awaited.
    """
    consumer = AIOKafkaConsumer(
        TOPIC,
        bootstrap_servers=KAFKA_BOOTSTRAP,
        auto_offset_reset="earliest",
        enable_auto_commit=True,
    )
    await consumer.start()
    try:
        async for msg in consumer:
            raw = msg.value
            if raw is None:
                # A record without a value has nothing to decode.
                print("Skipped message with no value")
                continue
            try:
                payload = json.loads(raw.decode())
            except (UnicodeDecodeError, json.JSONDecodeError) as exc:
                # Keep consuming: one bad record must not end the loop.
                print(f"Skipped undecodable message {raw!r}: {exc!r}")
                continue
            print(f"Received: {payload}")
    finally:
        await consumer.stop()

In [14]:
# Start a new background consumer for the current TOPIC; this rebinds
# `consumer_task`, replacing the handle created in the earlier section.
consumer_task = asyncio.create_task(consume())

Received: {'id': 1451, 'car_guid': '60c57083-9e03-41d7-964d-8df01c7897d1'}
Received: {'id': 1448, 'car_guid': 'a4a86c76-2b0f-4db7-a0ff-3dffa7bb5761'}
Received: {'id': 1239, 'car_guid': '2ce73a6b-463e-41d1-bbe5-649d8736a72a'}
