
# Kafka stream processing example

Stream processing is beneficial in scenarios where real-time or near-real-time data processing is required. Here are some situations where it can be valuable:

- **Real-time Analytics**: Stream processing is ideal for performing real-time analytics on streaming data. It allows you to continuously process and analyze data as it arrives, enabling timely insights and immediate actions based on the data.
- **Event-Driven Architectures**: Stream processing fits well in event-driven architectures where events are generated and consumed asynchronously. It enables seamless event processing, event-driven workflows, and event-driven integrations across different systems.
- **IoT Data Processing**: With the growing adoption of the Internet of Things (IoT), stream processing plays a vital role in processing and analyzing the massive volumes of streaming data generated by IoT devices. It enables real-time monitoring, device management, and decision-making based on the IoT data.

In this example, based on the InsideOut sample solution, the `SensorsTableConsumer` class represents a Kafka consumer that consumes messages from the underlying topic of a ksqlDB table within a consumer group. The ksqlDB table is created with the `CreateOrReplaceTableStatement` and `ExecuteStatementAsync` methods from the ksqlDB.RestApi.Client library.

```
Install-Package ksqlDB.RestApi.Client
```

```C#
using System.Threading.Tasks;
using ksqlDB.RestApi.Client.KSql.Linq.Statements;
using ksqlDB.RestApi.Client.KSql.Query.Context;
using ksqlDB.RestApi.Client.KSql.RestApi.Extensions;
using ksqlDB.RestApi.Client.KSql.RestApi.Statements.Annotations;

private async Task CreateOrReplaceMaterializedTableAsync()
{
  string ksqlDbUrl = "http://localhost:8088";

  await using var context = new KSqlDBContext(ksqlDbUrl);

  // Materialize a table from the IotSensors stream: filter out Sensor-5,
  // group by sensor id, and project the message count and average value.
  var statement = context.CreateOrReplaceTableStatement(tableName: "SENSORSTABLE")
    .As<IoTSensor>("IotSensors")
    .Where(c => c.SensorId != "Sensor-5")
    .GroupBy(c => c.SensorId)
    .Select(c => new { SensorId = c.Key, Count = c.Count(), AvgValue = c.Avg(a => a.Value) });

  var httpResponseMessage = await statement.ExecuteStatementAsync();

  if (!httpResponseMessage.IsSuccessStatusCode)
  {
    // Inspect the error details returned by ksqlDB.
    var statementResponse = httpResponseMessage.ToStatementResponse();
  }
}

public record IoTSensor
{
  [Key]
  public string SensorId { get; set; }
  public int Value { get; set; }
}
```

The following code snippets define the `SensorsTableConsumer` class and the `IoTSensorStats` record.

```C#
using Confluent.Kafka;
using InsideOut.Consumer;

public class SensorsTableConsumer : KafkaConsumer<string, IoTSensorStats>
{
  public SensorsTableConsumer(ConsumerConfig consumerConfig)
    : base("SENSORSTABLE", consumerConfig)
  {
  }
}
```

The `IoTSensorStats` record is a Plain Old CLR Object (POCO) into which the consumed messages are deserialized; its property names match the columns projected by the table's `Select` clause.

```C#
public record IoTSensorStats
{
  public string SensorId { get; set; }
  public double AvgValue { get; set; }
  public int Count { get; set; }
}
```

Committing offsets directly through the ksqlDB REST API is not supported. Kafka provides different strategies for committing offsets, allowing consumers to track their progress in consuming messages from topics.

With automatic committing, the consumer periodically commits offsets to Kafka without any explicit action from the application. The frequency of automatic commits is determined by the `auto.commit.interval.ms` configuration property. This strategy is simple to use but provides less control over the commit process.
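
With the Confluent.Kafka client, automatic committing can be configured as in the following minimal sketch; the broker address, group id, and interval are illustrative values, not taken from the sample solution:

```C#
using Confluent.Kafka;

// A minimal sketch: enable periodic automatic offset commits.
var autoCommitConfig = new ConsumerConfig
{
  BootstrapServers = "localhost:29092",
  GroupId = "Client-01",
  AutoOffsetReset = AutoOffsetReset.Latest,
  EnableAutoCommit = true,      // commit offsets periodically in the background
  AutoCommitIntervalMs = 5000   // maps to the auto.commit.interval.ms property
};
```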

The choice of offset committing strategy depends on the requirements of the application:

- Automatic committing is simple to use but provides less control.
- Manual committing allows for precise control but requires explicit management of commit points (see the sketch after this list).
- Synchronous committing provides strong guarantees at the cost of some latency, while asynchronous committing reduces potential delays.
- Transactional committing is useful when consuming messages as part of larger atomic operations.
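
The following is a minimal sketch of manual, synchronous committing with the Confluent.Kafka client; the topic name and configuration values are illustrative, not taken from the sample solution:

```C#
using System;
using Confluent.Kafka;

var manualCommitConfig = new ConsumerConfig
{
  BootstrapServers = "localhost:29092",
  GroupId = "Client-01",
  AutoOffsetReset = AutoOffsetReset.Latest,
  EnableAutoCommit = false // the application takes over commit responsibility
};

using var consumer = new ConsumerBuilder<string, string>(manualCommitConfig).Build();
consumer.Subscribe("SENSORSTABLE"); // illustrative topic name

while (true)
{
  var consumeResult = consumer.Consume();

  Console.WriteLine(consumeResult.Message.Value);

  // Synchronous commit: blocks until the broker acknowledges the offset,
  // trading some latency for a stronger delivery guarantee.
  consumer.Commit(consumeResult);
}
```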

The following code snippet demonstrates a basic example of consuming messages from a Kafka topic using a Kafka consumer class. You can customize the consumer configuration and the processing logic inside the `await foreach` loop to fit your specific requirements.

```
Install-Package System.Interactive.Async -Version 5.0.0
```

```C#
using System;
using System.Linq;
using System.Threading.Tasks;
using Confluent.Kafka;
using InsideOut.Consumer;

const string bootstrapServers = "localhost:29092";

static async Task Main(string[] args)
{
  var consumerConfig = new ConsumerConfig
                       {
                         BootstrapServers = bootstrapServers,
                         GroupId = "Client-01",
                         AutoOffsetReset = AutoOffsetReset.Latest
                       };

  // Dispose of the consumer when the method completes.
  using var kafkaConsumer = new KafkaConsumer<string, IoTSensorStats>("IoTSensors", consumerConfig);

  // Consume the first 10 messages from the topic and print them.
  await foreach (var consumeResult in kafkaConsumer.ConnectToTopic().ToAsyncEnumerable().Take(10))
  {
    Console.WriteLine(consumeResult.Message);
  }
}
```

For additional information and a more in-depth understanding, you can explore the Blazor server-side example in InsideOut.sln.