
tspannhw/Flink2Kafka

This branch is 7 commits behind BrooksIan:master.

There And Back Again, a Story of Apache Kafka & Apache Flink.


Project Details

This project was heavily inspired by two existing efforts: Data In Motion's FLaNK Stack and Data Artisans' blog on stateful streaming applications. The goal of this project is to provide insight into connecting an Apache Flink application to Apache Kafka.

FLaNK Stack

Data Artisan's

NYC Taxi Ride Data Set

Project Scope

This project includes the Apache Flink application code and the NiFi flow required to move data into and out of Apache Kafka. It doesn't include installation steps for NiFi, Kafka, or Flink, but links to installation documentation are provided below.

Project Prerequisites

  1. Apache NiFi local server
  2. Apache Kafka with two empty topics called "rawInput" and "enriched"
  3. IntelliJ IDE with the Scala plug-in installed
  4. A cloned copy of this Git repository

Project Set Up and Run

Apache NiFi to Apache Kafka Setup

With Apache NiFi, the records from the source CSV file will be converted into individual JSON records. These records will be written to an Apache Kafka topic called "rawInput".
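In this project the conversion happens inside the NiFi flow. The minimal plain-Java sketch below only illustrates the shape of each resulting JSON record. It maps one CSV line to a flat JSON object, using the field names from the Flink table schema later in this README.

The sketch rests on several assumptions:
- The CSV columns appear in the same order as those schema fields.
- No value contains a comma, double quote, or backslash.
- The class name TaxiCsvToJson is hypothetical and is not part of this repository.

```java
import java.util.List;
import java.util.StringJoiner;

/**
 * Hypothetical illustration (not the NiFi implementation): turns one taxi CSV
 * line into a flat JSON object keyed by the Flink "TaxiRides" field names.
 * Assumes column order matches FIELDS and values need no JSON escaping.
 */
final class TaxiCsvToJson {

    // Field names copied from the Flink table schema in this README.
    static final List<String> FIELDS = List.of(
            "medallion", "licenseId", "pickUpTime", "dropOffTime",
            "trip_time_in_secs", "trip_distance", "pickUpLon", "pickUpLat",
            "dropOffLon", "dropOffLat", "payment_type", "fare_amount",
            "surcharge", "mta_tax", "tip_amount", "tolls_amount", "total");

    // Fields declared as STRING in the schema; all other fields are numeric.
    static final List<String> STRING_FIELDS = List.of(
            "medallion", "licenseId", "pickUpTime", "dropOffTime", "payment_type");

    static String toJson(String csvLine) {
        // The -1 limit keeps trailing empty columns instead of dropping them.
        String[] values = csvLine.split(",", -1);
        if (values.length != FIELDS.size()) {
            throw new IllegalArgumentException(
                    "expected " + FIELDS.size() + " columns but got " + values.length);
        }
        StringJoiner json = new StringJoiner(",", "{", "}");
        for (int i = 0; i < values.length; i++) {
            String name = FIELDS.get(i);
            String value = values[i].trim();
            String rendered;
            if (STRING_FIELDS.contains(name)) {
                rendered = "\"" + value + "\"";           // quote string columns
            } else {
                rendered = value.isEmpty() ? "null" : value; // bare JSON number or null
            }
            json.add("\"" + name + "\":" + rendered);
        }
        return json.toString();
    }
}
```

Each call returns one JSON object string, which corresponds to one Kafka message on the rawInput topic. Empty numeric columns become JSON null rather than producing invalid JSON.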

  • In the NiFi UI, import the NiFi flow template (the XML file in this Git repo). For help, please review the Cloudera documentation.

  • Once the NiFi template is loaded, the left side of the NiFi flow will look like this.

[Screenshot: NiFi flow]

  • Right-click the GetFileCSV processor, open the Properties tab, and set the Input Directory option to the directory that contains the source CSV file. The CSV file is located in the data directory of this Git repo.

[Screenshot: GetFileCSV properties]

  • Right-click the PublishKafkaRecord processor, open the Properties tab, and verify the location of your Kafka broker and the topic name.

[Screenshot: PublishKafkaRecord properties]

  • Verify that the JSON records are being written to the rawInput Kafka topic. You can do this with the right side of the NiFi flow. Once this has been verified, turn off the Kafka Consumer processor.

[Screenshot: records consumed from rawInput]

Flink Application Development In IntelliJ

A running Flink cluster isn't required for application development. This application was built inside the IntelliJ IDE. When the application is launched from the IDE, StreamExecutionEnvironment.getExecutionEnvironment() starts an embedded local Flink environment for the run and shuts it down when the application exits. Using IntelliJ isn't required, but it does make development easier.

Flink Application - Connect to Kafka Topic

Once JSON records are being written to the Kafka topic, Flink can connect to the topic and register a Flink table on top of it, which can then be queried with SQL. This GitHub repository contains a Flink application that demonstrates this capability.

Java Libraries Required

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;

Define Flink Streaming Environment

  • In Flink, the following Java code defines the Flink Stream Execution and Stream Table Environments
    // class member static variables
    static StreamExecutionEnvironment fsEnv;
    static StreamTableEnvironment fsTableEnv;
    static EnvironmentSettings fsSettings;

    // configure the Blink planner in streaming mode
    fsSettings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .withBuiltInCatalogName("default_catalog")
        .withBuiltInDatabaseName("default_database")
        .build();

    // create the stream execution environment
    // (an embedded local environment when launched from the IDE)
    fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();

    // configure event time and watermarks
    fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    fsEnv.getConfig().enableForceAvro();                 // use the Avro serializer for POJO types
    fsEnv.getConfig().setAutoWatermarkInterval(1000L);   // emit watermarks every 1000 ms

    // create the streaming table environment on top of the stream environment
    fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);

Establish Flink Table Connection

  • In Flink, the following Java code registers the Kafka topic "rawInput" as the Flink table "TaxiRides". Note that the format is set to JSON and the table schema is declared explicitly.
// register the "rawInput" topic as the source table "TaxiRides"
       fsTableEnv.connect(
               new Kafka()
                       .version("universal")
                       .topic("rawInput")
                       .startFromLatest()
                       .property("zookeeper.connect", "localhost:2181")
                       .property("bootstrap.servers", "localhost:9092")
                       .property("group.id", "test")
       )
       // declare a format for this system
       .withFormat(
          new Json()
       )
       // declare the schema of the table
       .withSchema(
          new Schema()
               .field("medallion", DataTypes.STRING())
               .field("licenseId", DataTypes.STRING())
               .field("pickUpTime", DataTypes.STRING())
               .field("dropOffTime", DataTypes.STRING())
               .field("trip_time_in_secs", DataTypes.BIGINT())
               .field("trip_distance", DataTypes.FLOAT())
               .field("pickUpLon", DataTypes.FLOAT())
               .field("pickUpLat", DataTypes.FLOAT())
               .field("dropOffLon", DataTypes.FLOAT())
               .field("dropOffLat", DataTypes.FLOAT())
               .field("payment_type", DataTypes.STRING())
               .field("fare_amount", DataTypes.FLOAT())
               .field("surcharge", DataTypes.FLOAT())
               .field("mta_tax", DataTypes.FLOAT())
               .field("tip_amount", DataTypes.FLOAT())
               .field("tolls_amount", DataTypes.FLOAT())
               .field("total", DataTypes.FLOAT())
       )
       .inAppendMode()
       // create a table with given name
       .createTemporaryTable("TaxiRides");
  • The Flink application will display the following if everything is working as expected.

[Screenshot: Flink console output reading from rawInput]
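To make the schema's types concrete, here is a hypothetical Java record that mirrors the TaxiRides columns. Each component uses the default Java conversion class of the declared Flink data type: STRING maps to String, BIGINT to Long, and FLOAT to Float.

The record and its camelCase component names are illustrative only. The Flink application itself does not use them; it works with generic Row objects. Compiling a record requires Java 16 or later.

```java
/**
 * Hypothetical typed view of one "TaxiRides" row (not used by the Flink job).
 * Components use the default conversion classes of the declared Flink types:
 * STRING -> String, BIGINT -> Long, FLOAT -> Float.
 */
record TaxiRide(
        String medallion,
        String licenseId,
        String pickUpTime,      // declared as STRING, so not parsed into a timestamp
        String dropOffTime,     // declared as STRING, so not parsed into a timestamp
        Long tripTimeInSecs,    // trip_time_in_secs BIGINT
        Float tripDistance,     // trip_distance FLOAT
        Float pickUpLon,
        Float pickUpLat,
        Float dropOffLon,
        Float dropOffLat,
        String paymentType,     // payment_type STRING
        Float fareAmount,
        Float surcharge,
        Float mtaTax,
        Float tipAmount,
        Float tollsAmount,
        Float total) {
}
```

The record uses boxed types rather than primitives because any column in a Flink SQL row may be NULL.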

Query Flink Table Built On Kafka Topic

  • In Flink, the following Java code queries the newly registered Flink table and prints each row to the console.
    // select every column of every ride read from the rawInput topic
    Table result = fsTableEnv.sqlQuery(
            "SELECT " +
            " * " +
            "FROM TaxiRides"
    );

    // convert the result table into an append-only stream and print it
    fsTableEnv.toAppendStream(result, Row.class).print();

    // nothing runs until the job is submitted
    fsEnv.execute("Read TaxiRides from Kafka");

Establish a Connection to Destination Kafka Topic

  • In Flink, the following Java code registers the Kafka topic "enriched" as the sink table "KafkaSink". Note that the format is set to JSON and the table schema is declared explicitly.
 // register the "enriched" topic as the sink table "KafkaSink"
        fsTableEnv.connect(
            new Kafka()
            .version("universal")
            .topic("enriched")
            .startFromLatest()
            .property("zookeeper.connect", "localhost:2181")
            .property("bootstrap.servers", "localhost:9092")
            .property("group.id", "test")
        )
        // declare a format for this system
        .withFormat(
            new Json()
        )
        // declare the schema of the table
        .withSchema(
            new Schema()
                .field("medallion", DataTypes.STRING())
                .field("TimeStamp", DataTypes.TIMESTAMP(3))
        )
        .inAppendMode()
        // create a table with given name
        .createTemporaryTable("KafkaSink");

Write to Kafka Topic From Flink Query

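  • In Flink, the following code writes the query results to the "enriched" Kafka topic that was registered in the previous step.
        // keep the medallion of each ride and stamp it with the current time
        Table enriched = fsTableEnv.sqlQuery(
                "SELECT " +
                " medallion, CURRENT_TIMESTAMP " +
                " FROM TaxiRides"
        );

        // the two query columns map, in order, to the KafkaSink columns (medallion, TimeStamp)
        enriched.insertInto("KafkaSink");

        // submit the job so the INSERT actually runs
        fsTableEnv.execute("Write enriched rides to Kafka");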
  • The following output is expected in the application. Please note that the last value shown in this image was removed from the code example.

[Screenshot: Flink application output]
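The enrichment query keeps only the medallion and attaches, roughly, the time at which each row is processed. The sketch below illustrates that per-row effect in plain Java. It is not Flink code, since Flink evaluates CURRENT_TIMESTAMP itself.

A few details of the sketch are assumptions or choices made for illustration:
- The names EnrichSketch, EnrichedRide, and enrich are hypothetical.
- The time comes from an injectable java.time.Clock so the result is deterministic.
- The time is truncated to milliseconds to match the TIMESTAMP(3) column of KafkaSink.
- The time zone Flink uses for CURRENT_TIMESTAMP depends on the Flink version, so the zone of the Clock is an assumption.

```java
import java.time.Clock;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;

/**
 * Plain-Java sketch of the per-row effect of
 *   SELECT medallion, CURRENT_TIMESTAMP FROM TaxiRides
 * Hypothetical helper for illustration; the real work is done by Flink SQL.
 */
final class EnrichSketch {

    /** Hypothetical value type matching the KafkaSink schema (medallion, TimeStamp). */
    record EnrichedRide(String medallion, LocalDateTime timeStamp) {}

    static EnrichedRide enrich(String medallion, Clock clock) {
        // TIMESTAMP(3) holds millisecond precision, so drop anything finer.
        LocalDateTime now = LocalDateTime.now(clock).truncatedTo(ChronoUnit.MILLIS);
        return new EnrichedRide(medallion, now);
    }
}
```

Passing the Clock in, rather than calling LocalDateTime.now() directly, keeps the sketch testable. In production code, Clock.systemUTC() would be a natural choice.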

Apache Kafka to NiFi

Read from Kafka Topic "enriched"

  • In the NiFi UI, find the following section of the flow.

[Screenshot: NiFi flow section consuming from Kafka]

  • Validate that the Kafka settings are correct.

[Screenshot: Kafka consumer processor settings]

  • Activate the Kafka consumer processor and validate the results.

Helpful Installation Links

Apache NiFi

Apache Kafka

Apache Flink

IntelliJ

Additional Helpful Links

Apache Flink

Apache Kafka + Apache Flink

Apache Flink and Apache Kafka Code Examples

Apache Kafka + Apache Druid

Additional Apache Project Install Links

Additional Apache Projects On Docker

About

A demo application for reading from and writing to Apache Kafka with Apache Flink


Languages

  • Java 91.8%
  • Python 8.2%