This repository has been archived by the owner on Nov 26, 2021. It is now read-only.

Introduction to SQL on Flink

Fabian Hueske edited this page Feb 25, 2020 · 16 revisions

In this session you will learn about:

  • The design goals of SQL on Flink
  • Flink SQL's approach to unified stream/batch processing
  • Targeted use cases for SQL on Flink
  • Flink's interactive SQL CLI client

Slides

Documentation

The following documentation pages might be useful during the training:

  • Streaming Concepts - Streaming-specific documentation for Flink SQL such as configuration of time attributes and handling of updating results.
  • Flink SQL - Documentation of SQL coverage.
  • Built-In Functions - Documentation of built-in functions.
  • Flink SQL Client - Documentation for the SQL Client and its configuration.

Introduction to the Training Environment

The SQL CLI client is configured with an environment configuration file that specifies existing (source and sink) tables and user-defined functions. You will not need to add or remove tables or functions. However, you will learn how to create and drop views.

In the following, we show how to use the CLI client and present the tables and functions of the training environment.

Provided Tables

You can list all available tables using the SHOW TABLES command. It lists table sources and sinks as well as views.

Flink SQL> SHOW TABLES;
Rides
Fares
DriverChanges
Sink_TenMinPsgCnts
Sink_AreaCnts

You can check the schema of a table using the DESCRIBE command as shown in the following:

Flink SQL> DESCRIBE Rides;
root
 |-- rideId: BIGINT
 |-- taxiId: BIGINT
 |-- isStart: BOOLEAN
 |-- lon: FLOAT
 |-- lat: FLOAT
 |-- rideTime: TIMESTAMP(3) *ROWTIME*
 |-- psgCnt: INT

Source Tables

Source tables can be used in the FROM clause of a SQL query. They cannot be used as the target of an INSERT INTO clause in a SQL query.

All source tables are external tables stored in separate Apache Kafka topics. The Kafka records are encoded in JSON. Each record that is read from a topic is appended to the corresponding source table.

Note: The source tables are not fully available as soon as the training environment is started. Their records are continuously ingested into their Kafka topics. The ingestion begins when the training environment is started (docker-compose up -d) and runs at 10x fast-forward speed, i.e., it takes 1 minute to ingest 10 minutes of data (based on the timestamps of the records).

Rides

We will use the Rides source table for all of the exercises. It contains information about taxi rides that took place in New York City at the beginning of 2013. Each ride is represented by two event records: a start ride event and an end ride event.

The schema of the Rides table is as follows:

rideId: BIGINT       // the unique id of a ride (note, Rides contains two records per ride)
taxiId: BIGINT       // the unique id of the taxi
isStart: BOOLEAN     // flag for pick-up (true) or drop-off (false) event
lon: FLOAT           // the longitude of the pick-up or drop-off location
lat: FLOAT           // the latitude of the pick-up or drop-off location
rideTime: TIMESTAMP  // the time of the pick-up or drop-off event
psgCnt: INT          // the number of passengers on the ride

Fares

The Fares source table contains information about the fares paid for the taxi rides. The table contains one record for each taxi ride.

The schema of the Fares table is as follows:

rideId: BIGINT      // the unique id of the ride
payTime: TIMESTAMP  // the time when the payment was made (same as timestamp of ride end event in Rides table)
payMethod: VARCHAR  // the method of payment (CSH, CRD, DIS, NOC, UNK)
tip: FLOAT          // the amount of paid tip
toll: FLOAT         // the amount of paid toll
fare: FLOAT         // the amount of paid fare
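As a small illustration of the Fares schema, the following query is a sketch that sums the three amount columns for rides paid by card. The alias totalPaid is a hypothetical name chosen for this example, and the reading of CRD as "card" is an assumption based on the payment method codes listed above.

```sql
-- Sketch: total amount paid per ride for one payment method.
-- 'CRD' is one of the payMethod codes listed in the schema above;
-- totalPaid is an illustrative alias, not a column of Fares.
SELECT
  rideId,
  payTime,
  tip + toll + fare AS totalPaid
FROM Fares
WHERE payMethod = 'CRD';
```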

DriverChanges

The DriverChanges table contains one record for each event in which a taxi is driven by a different driver than before, i.e., when the driver of a taxi changes. This might happen when a driver starts a new shift.

The schema of the DriverChanges table is as follows:

taxiId: BIGINT             // the unique id of the taxi
driverId: BIGINT           // the unique id of the driver who starts using the taxi
usageStartTime: TIMESTAMP  // the time when the driver starts using the taxi

Sink Tables

Sink tables can be used as the target of an INSERT INTO clause in a SQL query. They cannot be used in the FROM clause of a SQL query.

Sink_TenMinPsgCnts

The Sink_TenMinPsgCnts table represents an external append sink that writes to the Apache Kafka topic TenMinPsgCnts.

It consists of a start time, end time, and count:

Flink SQL> DESCRIBE Sink_TenMinPsgCnts;
root
 |-- cntStart: TIMESTAMP(3)
 |-- cntEnd: TIMESTAMP(3)
 |-- cnt: BIGINT
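
A query that writes to this sink has to produce rows matching the three fields above. The following is a minimal sketch, assuming the sink is meant to hold the number of passengers per ten-minute tumbling window (this is inferred from the table name, not stated here). The explicit CAST is there so that the aggregate matches the BIGINT type of cnt.

```sql
-- Sketch: one row per 10-minute window with the summed passenger count.
-- The window size and the choice of SUM(psgCnt) are assumptions.
INSERT INTO Sink_TenMinPsgCnts
SELECT
  TUMBLE_START(rideTime, INTERVAL '10' MINUTE) AS cntStart,
  TUMBLE_END(rideTime, INTERVAL '10' MINUTE) AS cntEnd,
  CAST(SUM(psgCnt) AS BIGINT) AS cnt
FROM Rides
GROUP BY TUMBLE(rideTime, INTERVAL '10' MINUTE);
```

Because each window result is final once the window closes, a query of this shape only appends rows, which fits an append sink.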

You can monitor the Kafka topic of the Sink_TenMinPsgCnts table by running the following command in the folder that contains the docker-compose.yml file:

docker-compose exec kafka kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic TenMinPsgCnts --from-beginning

Sink_AreaCnts

The Sink_AreaCnts table represents an external upsert sink that writes to the Elasticsearch index area-cnts.

It consists of an area identifier and count:

Flink SQL> DESCRIBE Sink_AreaCnts;
root
 |-- areaId: INT
 |-- cnt: BIGINT
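
A query that fits this schema could look like the following sketch, which counts ride events per area. What exactly should be counted is an assumption made for illustration. Unlike a windowed aggregation, this non-windowed GROUP BY produces a continuously updating count per areaId, which is why an upsert sink is needed.

```sql
-- Sketch: a running count of NYC ride events per area.
-- Each new ride event updates the row for its areaId.
INSERT INTO Sink_AreaCnts
SELECT
  toAreaId(lon, lat) AS areaId,
  COUNT(*) AS cnt
FROM Rides
WHERE isInNYC(lon, lat)
GROUP BY toAreaId(lon, lat);
```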

You can check the state of the sink index by accessing Elasticsearch's REST API.

Note: The area-cnts index is automatically created when the first query writes to it.

Explore the Training Data

In order to explore the data of the Rides table, execute a simple query:

SELECT * FROM Rides;

The CLI client will enter the result visualization mode and display the results.

rideId        taxiId   isStart           lon           lat                  rideTime   psgCnt
150156    2013003948     false     -73.98211      40.74796     2013-01-01 06:49:26.0        2
150538    2013003570     false    -74.004684      40.72859     2013-01-01 06:49:26.0        1
151066    2013005078      true     -73.97712     40.752007     2013-01-01 06:49:26.0        1
147794    2013010015     false     -73.87098     40.774143     2013-01-01 06:49:27.0        1
148680    2013003578     false     -73.96466     40.680794     2013-01-01 06:49:27.0        1
151067    2013002010      true    -73.992256     40.750004     2013-01-01 06:49:27.0        2

You can leave the result visualization mode by pressing q. The mode provides more functionality, such as paging through the results or increasing and decreasing the update rate, as shown at the bottom of the screen.
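
To narrow down what is displayed, the same kind of query can project a few columns and filter rows. The following sketch shows only the start events of rides with several passengers; the threshold of 4 is an arbitrary value picked for illustration.

```sql
-- Sketch: pick-up events of rides with at least 4 passengers.
SELECT rideId, taxiId, rideTime, psgCnt
FROM Rides
WHERE isStart AND psgCnt >= 4;
```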

Provided User-Defined Functions

The training environment provides a set of user-defined functions for the training data.

In order to list all user-defined functions use the SHOW FUNCTIONS statement:

Flink SQL> SHOW FUNCTIONS;
timeDiff
toCoords
isInNYC
toAreaId
Drivers

The functions are defined as follows:

  • timeDiff(t1: TIMESTAMP, t2: TIMESTAMP): BIGINT: Converts both timestamps to UTC and returns the difference t1 - t2 in milliseconds.

  • isInNYC(lon: FLOAT, lat: FLOAT): BOOLEAN: Checks if a location is within the NYC area.

  • toAreaId(lon: FLOAT, lat: FLOAT): INT: Maps a location (longitude, latitude) to an area id that represents a cell of approximately 100x100 meters.

  • toCoords(areaId: INT): [lon: FLOAT, lat: FLOAT]: Inverse of toAreaId. Computes the longitude and latitude of the center of an area cell.

  • Drivers(ts: TIMESTAMP): Table(taxiId: BIGINT, driverId: BIGINT, usageStartTime: TIMESTAMP): A special table-valued function, a so-called Temporal Table Function. For a given timestamp ts, it returns for every taxi the driver who most recently started using it. This function will be discussed in the context of joins.
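
The functions listed above are called like built-in functions. The following sketch shows two of the scalar functions in a projection, followed by one plausible way to call the Drivers temporal table function with Flink's LATERAL TABLE join syntax. The column aliases are hypothetical names chosen for this example, and the join is only a preview under these assumptions, not the solution of a later exercise.

```sql
-- Sketch: scalar UDFs used in the SELECT list.
SELECT
  rideId,
  isInNYC(lon, lat) AS inNYC,
  toAreaId(lon, lat) AS areaId
FROM Rides;

-- Sketch: look up the driver of each taxi as of the ride's event time.
SELECT r.rideId, r.taxiId, d.driverId, r.rideTime
FROM Rides AS r,
  LATERAL TABLE (Drivers(r.rideTime)) AS d
WHERE r.taxiId = d.taxiId AND r.isStart;
```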

Views

Views allow you to define virtual tables from SQL queries. The view definition is parsed and syntactically validated immediately. However, the query of a view is only executed when the view is referenced by a SELECT or INSERT INTO statement that is executed.

Views can be created within a CLI session using the CREATE VIEW statement:

CREATE VIEW RideStarts AS SELECT * FROM Rides WHERE isStart;
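
Once defined, a view can be referenced in the FROM clause like any source table. As a short sketch, the following query reads from the RideStarts view created above; the filter on psgCnt is an arbitrary condition added for illustration.

```sql
-- Sketch: query the view like a table. The view's query
-- (SELECT * FROM Rides WHERE isStart) is only evaluated now.
SELECT rideId, taxiId, rideTime
FROM RideStarts
WHERE psgCnt > 2;
```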

Views created within a CLI session can be removed again using the DROP VIEW statement:

DROP VIEW RideStarts;

Exercises

Find a Particular Ride

Write a query that outputs the start and end event of ride 123.

The output should look similar to:

isStart                  rideTime
   true     2013-01-01 00:01:00.0
  false     2013-01-01 00:07:00.0

Solution:

SELECT isStart, rideTime FROM Rides WHERE rideId=123;

The query filters by the ride id.


Cleanse the Rides

The task of this exercise is to cleanse the table of ride events by removing events that do not start or end in New York City.

The output should look similar to:

rideId           taxiId          isStart              lon              lat                  rideTime    psgCnt
     1       2013000001             true        -73.99078         40.76088     2013-01-01 00:00:00.0         1
     2       2013000002             true       -73.978325         40.77809     2013-01-01 00:00:00.0         5
     3       2013000003             true        -73.98962         40.72999     2013-01-01 00:00:00.0         1
     4       2013000004             true       -73.981575         40.76763     2013-01-01 00:00:00.0         2
     5       2013000005             true        -74.00053        40.737343     2013-01-01 00:00:00.0         4
     6       2013000006             true       -73.866135         40.77109     2013-01-01 00:00:00.0         6
     7       2013000007             true        -74.00693        40.740765     2013-01-01 00:00:00.0         6
     8       2013000008             true       -73.955925        40.781887     2013-01-01 00:00:00.0         3
     9       2013000009             true        -73.99988        40.743343     2013-01-01 00:00:00.0         1
    10       2013000010             true       -73.989845         40.75804     2013-01-01 00:00:00.0         3
    11       2013000011             true       -73.870834         40.77377     2013-01-01 00:00:00.0         1

Solution:

SELECT * FROM Rides WHERE isInNYC(lon, lat);

The query filters by using the UDF isInNYC.


NYC Rides View

Create a view nyc_view for all ride events that happened in New York City.

The output of SELECT * FROM nyc_view should be equal to the output of the previous exercise.

Solution:

CREATE VIEW nyc_view AS SELECT * FROM Rides WHERE isInNYC(lon, lat);

The statement creates a view from the query of the previous exercise.


NYC Areas Rides View

Create a view nyc_area_view that enriches all ride events in New York City with their area id.

The output of SELECT * FROM nyc_area_view should look similar to:

rideId            taxiId           isStart            areaId                  rideTime      psgCnt
     1        2013000001              true             47792     2013-01-01 00:00:00.0           1
     2        2013000002              true             44301     2013-01-01 00:00:00.0           5
     3        2013000003              true             54043     2013-01-01 00:00:00.0           1
     4        2013000004              true             46298     2013-01-01 00:00:00.0           2
     5        2013000005              true             52535     2013-01-01 00:00:00.0           4
     6        2013000006              true             45881     2013-01-01 00:00:00.0           6
     7        2013000007              true             51780     2013-01-01 00:00:00.0           6
     8        2013000008              true             43567     2013-01-01 00:00:00.0           3
     9        2013000009              true             51285     2013-01-01 00:00:00.0           1
    10        2013000010              true             48292     2013-01-01 00:00:00.0           3

Solution:

CREATE VIEW nyc_area_view AS
SELECT rideId, taxiId, isStart, toAreaId(lon, lat) AS areaId, rideTime, psgCnt
FROM nyc_view;

The statement creates a view using the view from the previous exercise. The UDF converts the coordinates to area ids.