Skip to content

oznakdn/Practical-Kafka

Repository files navigation

kafka

Nuget package

Confluent.Kafka

docker-compose

version: '3.8'

services:
  kafka:
    image: confluentinc/cp-kafka:6.0.14
    depends_on:
      - zookeeper
    ports:
      - '29092:29092'
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka:9092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    ports:
      - 8085:8080
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
      DYNAMIC_CONFIG_ENABLED: 'true'

  zookeeper:
    image: confluentinc/cp-zookeeper:6.0.14
    ports:
      - '22181:2181'
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

Producer (Publisher)

Message Service

public class MessageProducerService
{
    public async Task SendMessageAsync(string topic, CustomerDto customer)
    {
        var config = new ProducerConfig
        {
            BootstrapServers = "localhost:29092",
            ClientId = "OrderClient",
            Acks = Acks.All
        };

        var message = new Message<string, string>
        {
            Key = customer.Id,
            Value = JsonSerializer.Serialize(customer)
        };

        using var producer = new ProducerBuilder<string, string>(config).Build();
        await producer.ProduceAsync(topic, message);
    }
}

Api Controller

[Route("api/[controller]")]
[ApiController]
public class CustomersController(CustomerDbContext context, MessageProducerService producerService) : ControllerBase
{

    [HttpGet]
    public async Task<IActionResult>GetCustomers()
    {
        return Ok(await context.Customers.ToListAsync());
    }


    [HttpGet("{customerId}")]
    public async Task<IActionResult>CreateCustomerOrder(string customerId)
    {
        var customer = await context.Customers.SingleOrDefaultAsync(_ => _.Id == customerId);
        if (customer is null) return NotFound();

        await producerService.SendMessageAsync(MessageTopic.CREATE_ORDER, new CustomerDto(customer.Id, customer.Name, customer.Surname, customer.Balance));
        return Ok();
    }
}

Consumer (Subscriber)

Worker Service

public class Worker : BackgroundService
    {
        private readonly ILogger<Worker> _logger;
        private readonly OrderDbContext _context;

        public Worker(ILogger<Worker> logger, OrderDbContext context)
        {
            _logger = logger;
            _context = context;
        }

        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            var config = new ConsumerConfig
            {
                BootstrapServers = "localhost:29092",
                AutoOffsetReset = AutoOffsetReset.Earliest,
                ClientId = "OrderClient",
                GroupId = "OrderGroup"
            };

            using var consumer = new ConsumerBuilder<string, string>(config).Build();
            consumer.Subscribe(MessageTopic.CREATE_ORDER);
            _logger.LogInformation("Connected kafka");

            while (!stoppingToken.IsCancellationRequested)
            {
                var data = consumer.Consume();

                if (data is not null)
                {
                    var customer = JsonSerializer.Deserialize<CustomerDto>(data.Message.Value);

                    if (customer!.Balance > 0)
                    {
                        _context.Orders.Add(new Order(customer.Id, DateTime.Now));
                        await _context.SaveChangesAsync(stoppingToken);
                        _logger.LogInformation($"{customer.Id} Customer's order has been created successfully.");
                    }
                    else
                    {
                        _logger.LogInformation($"{customer.Id} Customer's balance is less than 0!");

                    }
                }

                _logger.LogInformation($"There is no message");
            }
        }
    }

Kafka UI

Screenshot_1