This repository has been archived by the owner on Apr 10, 2024. It is now read-only.
Kafka streaming job from iomete. This streaming job copies data from Kafka to Iceberg.

iomete/kafka-streaming-job

iomete: Kafka Streaming Job

This is a collection of data movement capabilities. This streaming job copies data from Kafka to Iceberg.

Deserialization

Currently, two deserialization formats are supported:

  1. JSON
  2. Avro

JSON

A user-defined reference JSON schema can be set in the Spark configuration, and the system then processes the binary data according to it. Otherwise, it takes the schema of the first row and assumes that the remaining rows are compatible.
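The fallback behaviour described above can be illustrated with a minimal, plain-Python sketch. It is not the job's actual implementation (which runs on Spark); the function names `infer_schema` and `decode` and the sample records are hypothetical, and only top-level fields are considered.

```python
import json


def infer_schema(first_row: bytes) -> dict:
    """Map each top-level field of the first record to its Python type name."""
    record = json.loads(first_row)
    return {name: type(value).__name__ for name, value in record.items()}


def decode(row: bytes, schema: dict) -> dict:
    """Decode one record, keeping only schema fields; missing fields become None."""
    record = json.loads(row)
    return {name: record.get(name) for name in schema}


# Hypothetical Kafka message values (raw bytes).
rows = [
    b'{"user": "alice", "cpu_seconds": 12.5}',
    b'{"user": "bob", "cpu_seconds": 3.0, "extra": true}',
]

user_schema = None  # set to a dict to mimic a user-defined reference schema
schema = user_schema or infer_schema(rows[0])
decoded = [decode(row, schema) for row in rows]

print(schema)
print(decoded)
```

Note that the field `extra` in the second record is dropped, because it does not appear in the schema taken from the first row. This is one reason to prefer a user-defined schema when the records are not uniform.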

Avro

The job converts binary data according to a schema that is either defined by the user or retrieved from the schema registry.

[Figure: Avro deserialization diagram]
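The choice between the two schema sources can be sketched as follows. This sketch assumes that messages use the Confluent wire format (one zero "magic" byte, then a 4-byte big-endian schema ID, then the Avro payload); the README does not confirm that framing. The helpers `split_confluent_message` and `resolve_schema`, and the in-memory `fake_registry`, are hypothetical stand-ins rather than the job's code.

```python
import struct
from typing import Callable, Optional, Tuple

MAGIC_BYTE = 0  # leading byte of the Confluent wire format


def split_confluent_message(value: bytes) -> Tuple[int, bytes]:
    """Split a Confluent-framed message into (schema_id, avro_payload)."""
    if len(value) < 5:
        raise ValueError("message too short for a 5-byte header")
    # 1 magic byte followed by a 4-byte big-endian schema ID.
    magic, schema_id = struct.unpack(">bI", value[:5])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte: {magic}")
    return schema_id, value[5:]


def resolve_schema(value: bytes,
                   user_schema: Optional[str],
                   fetch_by_id: Callable[[int], str]) -> str:
    """Prefer the user-defined schema; otherwise look one up by schema ID."""
    if user_schema is not None:
        return user_schema
    schema_id, _payload = split_confluent_message(value)
    return fetch_by_id(schema_id)


# Hypothetical stand-in for a registry lookup, keyed by schema ID.
fake_registry = {42: '{"type": "string"}'}

# Header (magic byte 0, schema ID 42) followed by the Avro-encoded string "abc".
message = struct.pack(">bI", 0, 42) + b"\x06abc"

print(resolve_schema(message, None, fake_registry.__getitem__))
```

In a real deployment the lookup would go to the service configured as schema_registry_url instead of a dictionary, and the payload would be decoded by an Avro library.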

Job creation

  • Go to Spark Jobs.
  • Click on Create New.

Specify the following parameters (these are examples; change them as needed):

  • Name: kafka-streaming-job
  • Docker Image: iomete/iomete_kafka_streaming_job:0.2.1
  • Main application file: local:///app/driver.py
  • Environment Variables: LOG_LEVEL (INFO or ERROR)
  • Config file:
{
  kafka: {
    bootstrap_servers: "localhost:9092",
    topic: "usage.spark.0",
    serialization_format: json,
    group_id: group_1,
    starting_offsets: latest,
    trigger: {
      interval: 5,
      unit: seconds # or minutes
    },
    schema_registry_url: "http://127.0.0.1:8081"
  },
  database: {
    schema: default,
    table: spark_usage_20
  }
}
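To show how such settings could reach Spark, here is a minimal sketch that translates the kafka block into options of Spark's Kafka source. The option names (`kafka.bootstrap.servers`, `subscribe`, `startingOffsets`, `kafka.group.id`) are real Spark Structured Streaming options, but the mapping itself is an assumption about what the job does, and `kafka_source_options` is a hypothetical helper.

```python
def kafka_source_options(kafka_conf: dict) -> dict:
    """Translate the job's kafka settings into Spark Kafka source options."""
    return {
        "kafka.bootstrap.servers": kafka_conf["bootstrap_servers"],
        "subscribe": kafka_conf["topic"],
        "startingOffsets": kafka_conf["starting_offsets"],
        "kafka.group.id": kafka_conf["group_id"],
    }


# The example values from the config file, as a plain dict.
kafka_conf = {
    "bootstrap_servers": "localhost:9092",
    "topic": "usage.spark.0",
    "group_id": "group_1",
    "starting_offsets": "latest",
}

options = kafka_source_options(kafka_conf)
print(options)

# With a SparkSession named `spark`, the options could be applied like this:
#   stream = spark.readStream.format("kafka").options(**options).load()
```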

Configuration properties

kafka: Required properties for the Kafka connection and its configuration.

  • bootstrap_servers: Kafka broker server.
  • topic: Kafka topic name.
  • serialization_format: Serialization format of the message value (JSON or Avro).
  • group_id: Consumer group ID.
  • starting_offsets: Where to start reading from, for example latest.
  • trigger:
      • interval: Processing trigger interval.
      • unit: Processing trigger unit: seconds or minutes.
  • schema_registry_url: URL of the schema registry.

database: Destination database properties.

  • schema: The schema (database) to store the data in.
  • table: The destination table.
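The trigger settings (interval and unit) can be read as a Spark processing-time trigger. The sketch below builds the string that Spark's `trigger(processingTime=...)` accepts, for example "5 seconds". Whether the job builds it this way is an assumption, and `processing_time` is a hypothetical helper.

```python
ALLOWED_UNITS = ("seconds", "minutes")


def processing_time(trigger_conf: dict) -> str:
    """Build a Spark processing-time string from interval and unit."""
    interval = int(trigger_conf["interval"])
    unit = trigger_conf["unit"]
    if interval <= 0:
        raise ValueError("interval must be a positive number")
    if unit not in ALLOWED_UNITS:
        raise ValueError(f"unit must be one of {ALLOWED_UNITS}, got {unit!r}")
    return f"{interval} {unit}"


trigger = processing_time({"interval": 5, "unit": "seconds"})
print(trigger)

# With a streaming DataFrame named `stream`, the trigger could be applied as:
#   stream.writeStream.trigger(processingTime=trigger)
```

Validating the unit up front gives a clear error at start-up instead of a failure inside Spark once the stream is running.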

Create Spark Job

[Screenshot: Create Spark Job.png]

Create Spark Job - Instance

You can use Environment Variables to store sensitive data such as passwords and secrets, and then reference these variables in your config file using the ${ENV_NAME} syntax.
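The effect of the ${ENV_NAME} syntax can be illustrated with Python's standard `string.Template`, which uses the same placeholder form. This is only a sketch of the substitution, not the mechanism the job uses; `expand_env`, the `password` key, and the variable name KAFKA_PASSWORD are hypothetical.

```python
import os
from string import Template
from typing import Mapping, Optional


def expand_env(config_text: str, env: Optional[Mapping[str, str]] = None) -> str:
    """Replace ${NAME} placeholders with values from the environment."""
    env = os.environ if env is None else env
    # substitute() raises KeyError if a referenced variable is not set.
    return Template(config_text).substitute(env)


# Hypothetical config line and variable, passed explicitly for the demo.
line = 'password: "${KAFKA_PASSWORD}"'
expanded = expand_env(line, {"KAFKA_PASSWORD": "s3cret"})
print(expanded)
```

Failing on an unset variable is deliberate in this sketch: a missing secret is easier to diagnose at start-up than as a connection error later.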

[Screenshot: Create Spark Job.png]

Create Spark Job - Application Environment

[Screenshot: Create Spark Job - Application Config.png]

Create Spark Job - Application dependencies

[Screenshot: Create Spark Job - Application Environment.png]

Tests

Prepare the dev environment

virtualenv .env  # or: python3 -m venv .env
source .env/bin/activate

pip install -e ."[dev]"

Run the tests

pytest
