Xenon connector for Kafka
QuickStart Guide

The following guide provides instructions for integrating Xenon with Kafka. KafkaXenonSinkConnector is a Kafka sink connector for loading data from Kafka into Xenon, with support for multiple data formats: JSON (with and without schema) and Avro.
The connector makes Xenon accessible to Kafka, so that data can be streamed from Kafka into Xenon via Kafka Connect.


Streaming Data from Kafka into Xenon

Core Types

Currently the connector can process Kafka Connect SinkRecords with support for the following Schema.Type values: INT8, INT16, INT32, INT64, FLOAT32, FLOAT64, BOOLEAN, STRING, BYTES, ARRAY, MAP and STRUCT.

Logical Types

Besides the core types, the connector checks the schema name to support the logical types Decimal, Date, Time (millis/micros) and Timestamp (millis/micros). Because logical types require a schema name, they are only supported for Avro or JSON + Schema data.
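As an illustration, a logical type is declared in an Avro schema by annotating an underlying core type with a schema name; the record and field names below are made up for the example:

```json
{
  "type": "record",
  "name": "Example",
  "fields": [
    {"name": "price",
     "type": {"type": "bytes", "logicalType": "decimal",
              "precision": 10, "scale": 2}},
    {"name": "ts",
     "type": {"type": "long", "logicalType": "timestamp-millis"}}
  ]
}
```

Here "price" is carried as BYTES but interpreted as a Decimal, and "ts" is carried as INT64 but interpreted as a millisecond Timestamp.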

Supported Data Formats

The sink connector implementation is configurable in order to support:

  • AVRO (makes use of Confluent's Kafka Schema Registry)
  • JSON with Schema (offers JSON record structure with explicit schema information)
  • JSON without Schema (offers JSON record structure without any attached schema)

Since these settings can be configured independently, different combinations are possible.
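For example, the format is selected through the worker's converter settings; the Schema Registry URL below assumes a default local Confluent install:

```properties
# AVRO (values go through Confluent's Kafka Schema Registry)
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

# JSON with Schema (each record carries explicit schema information)
# value.converter=org.apache.kafka.connect.json.JsonConverter
# value.converter.schemas.enable=true

# JSON without Schema (plain JSON records)
# value.converter=org.apache.kafka.connect.json.JsonConverter
# value.converter.schemas.enable=false
```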


Packaging

  • Install XenonClient jar and pom in local maven repository:
mvn install:install-file -Dfile=$KAFKA_CONNECTOR_HOME/src/main/resources/XenonClient-1.0.0.jar -DpomFile=$KAFKA_CONNECTOR_HOME/src/main/resources/pom.xml 
  • mvn clean package or mvn clean package -DskipTests (to skip running the tests)

  • JAR file produced by this project:
    kafka-connect-xenon-1.0.0.jar - default JAR (to be used)

  • Copy kafka-connect-xenon-1.0.0.jar and XenonClient-1.0.0.jar to target directory kafka-connect-xenon.


Deployment

  • Below is a simple deployment example. For more details, please contact info@levyx.com.
  • Start Xenon using Docker as shown below:
    Log in to Docker Hub with your user ID and password.
    docker login
    
    Download image.
    docker pull levyx/xenon
    
    Remove any existing xenon container.
    docker rm -f xenon 
    
    List all containers (including stopped ones).
    docker ps -a 
    
    Xenon requires a raw block device and needs write permission on it. In addition, the user running Docker must
    either be a member of the docker group or have sudo/root access.
    In this example we run as root and have full access to /dev/vdb on our VM. This device, /dev/vdb, is then
    mapped into the container as /dev/sdb.
    docker run -d -t -p 0.0.0.0:41000:41000/tcp \
      --device /dev/vdb:/dev/sdb \
      --hostname xenon \
      --name xenon levyx/xenon \
      init.sh /dev/sdb 0.0.0.0 41000 0.0.0.0:41000 
    
    Check that Xenon is running and inspect the container logs.
    docker logs xenon 
    
    List all containers (including stopped ones).
    docker ps -a 
  • Download and install Confluent Platform on the host machine.
  • Download kafka-connect-xenon on the host machine.
  • Create a configuration file (connect-xenon.properties) for the sink connector (example below):
name=xenon-sink-connector
connector.class=XenonSinkConnector
topics=testOne
  • Create directory kafka-connect-xenon and copy kafka-connect-xenon-1.0.0.jar, XenonClient-1.0.0.jar and connect-xenon.properties from the project build location to $CONFLUENT_HOME/share/java/kafka-connect-xenon
mkdir $CONFLUENT_HOME/share/java/kafka-connect-xenon
cp target/kafka-connect-xenon-1.0.0.jar  $CONFLUENT_HOME/share/java/kafka-connect-xenon/
cp $KAFKA_CONNECTOR_HOME/src/main/resources/XenonClient-1.0.0.jar $CONFLUENT_HOME/share/java/kafka-connect-xenon/
cp connect-xenon.properties $CONFLUENT_HOME/share/java/kafka-connect-xenon/

JSON without Schema setup

  • Start ZooKeeper and Kafka using the Confluent CLI.
export PATH=$CONFLUENT_HOME/confluent-3.3.0/bin:$PATH
confluent start kafka
  • Sample input data (trial.txt) for the JSON without Schema setup:
{"f1":1, "f2":70000000, "f3": "sd", "f4":89.78, "f5":true}
  • Create topic testOne (replication factor 1, partitions 4):
$CONFLUENT_HOME/bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 4 --topic testOne
  • Verify the correctness of the records delivered by running kafka-console-consumer in another terminal:
$CONFLUENT_HOME/bin/kafka-console-consumer \
--bootstrap-server=localhost:9092 --topic testOne
  • Configure the standalone or distributed worker properties file.
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false

value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

plugin.path=$CONFLUENT_HOME/share/java/kafka-connect-xenon
  • Start the connector on the sink (Standalone/Remote mode).

Standalone mode

$CONFLUENT_HOME/bin/connect-standalone etc/kafka/connect-standalone.properties share/java/kafka-connect-xenon/connect-xenon.properties 

Remote mode

$CONFLUENT_HOME/bin/connect-distributed etc/kafka/connect-distributed.properties
Configure the connector:
curl -X POST -H "Content-Type: application/json" 'localhost:8083/connectors' -d '{
"name":"xenon-sink-connector",
"config" : {
 "connector.class":"XenonSinkConnector",
 "topics":"testOne"
 }
}'
  • Start kafka-console-producer and write the contents of the file to the topic:
$CONFLUENT_HOME/bin/kafka-console-producer \
--broker-list localhost:9092 --topic testOne \
< ~/trial.txt &

Configuration

The configuration for kafka-connect-xenon accepts the following parameters:

  • name: name assigned to the sink connector.
  • host: Xenon host.
  • port: Xenon port.
  • buffer.capacity.bytes: size of the ByteBuffer (in bytes) pushed to Xenon.
  • connector.class: class implementing the SinkConnector.
  • topics: comma-separated list of topics to be used as the source of data.
  • tasks.max: maximum number of tasks to be created.
  • dataset.name: name of the dataset to be opened in Xenon.
  • dataset.schema: schema associated with the dataset.
  • schema.version: schema version of the records being sent (mainly for records with schema).
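Putting these parameters together, a minimal connect-xenon.properties might look like the following; the host, port, task count and dataset values are illustrative (41000 matches the Docker example above), not required defaults:

```properties
name=xenon-sink-connector
connector.class=XenonSinkConnector
topics=testOne
tasks.max=4
host=localhost
port=41000
buffer.capacity.bytes=1048576
dataset.name=testOne
# dataset.schema and schema.version depend on the records being sent
```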

Architecture

Kafka Connect provides APIs for both source and sink connectors. Currently we have implemented only the sink connector for Xenon, so only the sink connector is discussed below.

Modules

XenonSinkConnector is composed of the following modules:

  • XenonSinkConnectorConfig (Helper that validates config file)
  • XenonWrapper (Major implementation)
  • XenonSinkConnector(API) (Read config file)
  • XenonSinkTask(API) (Call XenonWrapper APIs)

Client APIs

  • XenonSinkConnector (extends SinkConnector)
    • config() (Build config for the topic)
  • XenonSinkTask (extends SinkTask)
    • start(config) (Start task + connect to Xenon)
    • put(Collection<SinkRecords>) (Save to Xenon)
    • stop() (Disconnect from Xenon + stop task)

Parallelism

Parallelism is naturally handled by SinkTask: each topic may be associated with multiple tasks, and each task is a dedicated thread attached to a XenonWrapper object, which corresponds to a thread communicating with the Xenon server.

Partition Mapping

We use a simple hash for partition mapping: p => p % fanout, where p is the partition number associated with a SinkRecord and fanout is the Xenon fanout, normally equal to the number of CPU cores used by Xenon.
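The mapping above can be sketched in a few lines of plain Java (this is an illustration of the scheme, not the connector's actual source):

```java
// Sketch: map a SinkRecord's Kafka partition to a Xenon fanout slot.
public class PartitionMapping {

    // fanout is normally the number of CPU cores used by Xenon
    static int mapPartition(int partition, int fanout) {
        return partition % fanout;
    }

    public static void main(String[] args) {
        // With fanout 4, Kafka partitions 0..5 map to slots 0,1,2,3,0,1.
        for (int p = 0; p < 6; p++) {
            System.out.println(p + " -> " + mapPartition(p, 4));
        }
    }
}
```

Records from different Kafka partitions that hash to the same slot simply share that Xenon fanout slot.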

Data Delivery

The Xenon connector delivers sink records exactly once. After data is sent, Xenon returns the number of records it has saved; if the returned count does not match the number of records sent, the connector throws an exception.
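The count check can be sketched as follows; the class and method names are hypothetical, chosen only to illustrate the behavior described above:

```java
// Hypothetical sketch of the delivery count check: compare the count
// Xenon reports against the number of records sent in the batch.
public class DeliveryCheck {

    static void verifySaved(int sent, long saved) {
        if (saved != sent) {
            throw new IllegalStateException(
                "Xenon reported " + saved + " saved records, expected " + sent);
        }
    }

    public static void main(String[] args) {
        verifySaved(100, 100);    // matching counts: no exception
        try {
            verifySaved(100, 97); // mismatch: exception is thrown
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```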

Flush Control

Using SinkTask.put(), we flush each collection of SinkRecords received by the connector. The maximum number of records in each collection can be set to tune the performance of the connector.


Tests

Unit Tests

XenonSinkConnector includes three test classes:

  • XenonSinkConnectorTest (Verify config loading)
  • XenonWrapperTest (Verify Xenon connection and Xenon file operations)
  • XenonSinkTaskTest (Validate the functionality of the connector)

A typical functional test includes the following steps:

  • Start connector and build config using a map
  • Create synthetic records
  • Start a new task using config
  • Call Task.put to pass records to Xenon
  • Stop the task
  • Verify data saved on Xenon

System Tests

For tests that integrate with a running Kafka system, see the Deployment example above.