
Flink SQL for Pulsar Folks

⚠️ Update: This repository will no longer be actively maintained. Please check the Ververica fork.

Apache Flink and Apache Pulsar share a common vision around unifying batch and streaming: on one side, batch is seen as just a special case of streaming, while on the other streams serve as a unified view on data. The main differentiating use case for Flink + Pulsar is to simplify building a one-stop-shop for storing and processing both real-time and historic data.


This demo walks you through building an analytics application using Pulsar and Flink SQL.

Docker

To keep things simple, the demo uses a Docker Compose setup that makes it easier to bundle up all the services you need.

Getting the setup up and running

```sh
docker-compose build

docker-compose up -d
```

Is everything really up and running?

```sh
docker-compose ps
```

You should be able to access the Flink Web UI (http://localhost:8081).
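If you'd rather check from the command line, the Flink REST API listens on the same port as the Web UI. This is just an optional sanity check:

```sh
# Should return a small JSON summary with the number of taskmanagers, slots and jobs
curl http://localhost:8081/overview
```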

Pulsar

You’ll use the Twitter Firehose built-in connector to consume tweets about gardening into a Pulsar topic. To create the Twitter source, run:

```sh
docker-compose exec pulsar ./bin/pulsar-admin source create \
  --name twitter \
  --source-type twitter \
  --destinationTopicName tweets \
  --source-config '{"consumerKey":"<consumerKey>","consumerSecret":"<consumerSecret>","token":"<token>","tokenSecret":"<tokenSecret>","terms":"gardening"}'
```

ℹ️ This source requires a valid Twitter authentication token, which you can generate on the Twitter Developer Portal.

After creating the source, you can check that data is flowing into the tweets topic:

```sh
docker-compose exec pulsar ./bin/pulsar-client consume -n 0 -r 0 -s test tweets
```
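As a complementary check, `pulsar-admin` can show per-topic statistics; look for a non-zero `msgRateIn` once the source is producing:

```sh
# Prints JSON stats for the topic, including ingest rate and storage size
docker-compose exec pulsar ./bin/pulsar-admin topics stats persistent://public/default/tweets
```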

At any point, you can also stop the connector:

```sh
docker-compose exec pulsar ./bin/pulsar-admin sources stop --name twitter
```
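and start it back up when you need it again with the matching `start` command:

```sh
docker-compose exec pulsar ./bin/pulsar-admin sources start --name twitter
```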

Flink SQL

Next, you can start the Flink SQL Client:

```sh
docker-compose exec sql-client ./sql-client.sh
```

and use a Pulsar catalog to access the topic directly as a table in Flink. This will make some things a lot easier afterwards, too!

```sql
CREATE CATALOG pulsar WITH (
   'type' = 'pulsar',
   'service-url' = 'pulsar://pulsar:6650',
   'admin-url' = 'http://pulsar:8080',
   'format' = 'json'
);

USE CATALOG pulsar;

SHOW TABLES;
```
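If you want to double-check what schema the catalog derived from the topic, `DESCRIBE` works here just like in any other Flink SQL catalog:

```sql
DESCRIBE tweets;
```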

You can query the tweets table right off the bat using a simple SELECT statement. So, you now have tweets making their way from the Twitter Firehose to Pulsar to Flink!
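For example, to peek at a few columns (treat the column list as a sketch: apart from `createdAt` and `id`, which show up later in the demo, the exact field names depend on the schema the Twitter connector registers):

```sql
SELECT id, createdAt, text
FROM tweets;
```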

Using Pulsar Metadata

If you look closely, most events have a null createdAt value. What now?

*(Screenshot: SELECT results over the tweets table, with null createdAt values)*
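If you want to quantify the gap, a plain aggregation over the raw table does the trick, since `COUNT` over a column only counts non-null values:

```sql
SELECT COUNT(*) AS total_tweets,
       COUNT(createdAt) AS with_created_at
FROM tweets;
```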

One way to get a relevant timestamp is to tap into Pulsar metadata to get the publishTime (i.e. ingestion time). The cool thing about using catalogs is being able to create a table with the exact same schema as the original topic by just using a CREATE TABLE LIKE statement:

```sql
CREATE TABLE default_catalog.default_database.pulsar_tweets
(
  publishTime TIMESTAMP(3) METADATA,
  WATERMARK FOR publishTime AS publishTime - INTERVAL '5' SECOND
) WITH (
  'connector' = 'pulsar',
  'topic' = 'persistent://public/default/tweets',
  'value.format' = 'json',
  'service-url' = 'pulsar://pulsar:6650',
  'admin-url' = 'http://pulsar:8080',
  'scan.startup.mode' = 'earliest'
)
LIKE tweets;
```

In the DDL above, you're using the Pulsar Flink connector, tapping into the tweets topic, and deserializing the events with the JSON format. And because you're fetching publishTime and defining a watermark on it, you now also have a notion of time in your application!
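As a quick sanity check (again assuming an `id` field in the derived schema), you can confirm that publishTime is populated for every event:

```sql
SELECT id, publishTime
FROM default_catalog.default_database.pulsar_tweets;
```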

Producing Aggregated Results to Pulsar

To close the loop, you can create a sink table backed by a new tweets_agg topic in Pulsar, and insert into it using a simple windowed aggregation:

```sql
CREATE TABLE pulsar_tweets_agg (
  tmstmp TIMESTAMP(3),
  tweet_cnt BIGINT
) WITH (
  'connector' = 'pulsar',
  'topic' = 'persistent://public/default/tweets_agg',
  'value.format' = 'json',
  'service-url' = 'pulsar://pulsar:6650',
  'admin-url' = 'http://pulsar:8080'
);

INSERT INTO pulsar_tweets_agg
SELECT TUMBLE_START(publishTime, INTERVAL '10' SECOND) AS wStart,
       COUNT(id) AS tweet_cnt
FROM pulsar_tweets
GROUP BY TUMBLE(publishTime, INTERVAL '10' SECOND);
```

This query is pretty much something you could run on a regular database: standard SQL aggregating the total number of tweets over windows of 10 seconds. Once you submit it, the query runs continuously and keeps sinking results into Pulsar as each window closes. To monitor the execution of the query or cancel it, use the Flink Web UI!
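To watch the aggregated results landing in the new topic, you can reuse the earlier consumer command (with a fresh subscription name):

```sh
docker-compose exec pulsar ./bin/pulsar-client consume -n 0 -r 0 -s agg-test tweets_agg
```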


And that's it!

For an overview of the evolution of the Flink Pulsar integration over time, check out these slides, and follow Apache Flink and Apache Pulsar on Twitter for the latest updates.
