Pulsar Flink SQL Examples
Pulsar is a highly magnetized rotating neutron star
Flink SQL's SELECT * is a natural for Selecting Pulsar π
Environment Setup and Instructions
Spin up Pulsar and Flink Clusters
docker-compose upCreate a Pulsar topic and add retention policies to keep data around
./setup.shAt this point run the EventsProducer class to generate some event data. You can find the Producer here
Launch Flink SQL Cli
docker exec -it jobmanager ./bin/sql-client.shCreate a Pulsar Catalog
CREATE CATALOG pulsar WITH (
'type' = 'pulsar-catalog',
'catalog-admin-url' = 'http://pulsar:8080',
'catalog-service-url' = 'pulsar://pulsar:6650'
);Check the available databases and tables
USE CATALOGS pulsar;
SHOW DATABASES;
USE `public/default`;
SHOW TABLES;
DESCRIBE events;Read all the events we have ingested
SELECT * FROM events;Create a new database
CREATE DATABASE IF NOT EXISTS processing;
USE processing;Create a new table
CREATE TABLE click_events (
eventType STRING,
productId STRING,
categoryId STRING,
categoryCode STRING,
brand STRING,
price DOUBLE,
userid STRING,
userSession STRING,
`event_time` TIMESTAMP_LTZ(3) METADATA,
`key` STRING,
WATERMARK FOR `event_time` AS `event_time` - INTERVAL '1' SECOND
) WITH (
'connector' = 'pulsar',
'topics' = 'persistent://public/default/events',
'service-url' = 'pulsar://pulsar:6650',
'admin-url' = 'http://pulsar:8080',
'source.start.message-id' = 'earliest' ,
'format' = 'json'
);DESCRIBE click_events;SELECT * FROM click_events;