In [0]:
/*
Create a sensor_data table describing the format of the data in the stream.
The first line of the next cell tells Apache Zeppelin to provide a streaming SQL environment (%flink.ssql) for the Apache Flink interpreter.

The first part of the CREATE TABLE statement is familiar to anyone who has used SQL with a database. 
A table is created to store the sensor data in the stream. 
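The WATERMARK clause tracks progress in event time. Here the watermark trails event_time by 5 seconds,
so events that arrive up to 5 seconds out of order are still taken into account,
as described in the Event Time and Watermarks section of the Apache Flink documentation.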

The second part of the CREATE TABLE statement describes the connector used to receive data in the table 
(for example, kinesis or kafka), the name of the stream, the AWS Region, 
the overall data format of the stream (such as json or csv), and the syntax used for timestamps (in this case, ISO 8601).
*/

In [1]:
%flink.ssql
CREATE TABLE sensor_data (
    sensor_id INTEGER,
    current_temperature DOUBLE,
    status VARCHAR(6),
    event_time TIMESTAMP(3),
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
)
PARTITIONED BY (sensor_id)
WITH (
    'connector' = 'kinesis',
    'stream' = 'my-input-stream',
    'aws.region' = 'us-east-1',
    'scan.stream.initpos' = 'LATEST',
    'format' = 'json',
    'json.timestamp-format.standard' = 'ISO-8601'
)
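
In [ ]:
/*
Optional check, not part of the original notebook: DESCRIBE lists the columns and types
that Flink registered for sensor_data. This sketch assumes the CREATE TABLE statement above
has already run in the same session. It only inspects table metadata, so no data is read from the stream.
*/

In [ ]:
%flink.ssql
DESCRIBE sensor_data;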

In [2]:
/*
A simple SELECT that returns all the content of the sensor_data table.
The first line of the command has a parameter (type=update) so that the output of the SELECT, 
which is more than one row, is continuously updated when new data arrives.
*/

In [3]:
%flink.ssql(type=update)
SELECT * FROM sensor_data;

In [4]:
/*
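Run the aggregation explicitly in SQL, using the HOP function in the GROUP BY clause of the SELECT statement.
HOP defines a hopping (sliding) window: here each window is 1 minute long, and a new window starts every 10 seconds.
To add the window time to the output of the SELECT, use the HOP_ROWTIME function.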
*/

In [5]:
%flink.ssql(type=update)

SELECT sensor_data.status,
       COUNT(*) AS num,
       AVG(sensor_data.current_temperature) AS avg_current_temperature,
       HOP_ROWTIME(event_time, INTERVAL '10' SECOND, INTERVAL '1' MINUTE) AS hop_time
  FROM sensor_data
 GROUP BY HOP(event_time, INTERVAL '10' SECOND, INTERVAL '1' MINUTE), sensor_data.status;
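
In [ ]:
/*
A variation on the query above, as a sketch: HOP_START and HOP_END expose the bounds of each window,
which makes the overlap between consecutive hopping windows visible in the output.
The column aliases window_start and window_end are illustrative names, not part of the original notebook.
*/

In [ ]:
%flink.ssql(type=update)

SELECT sensor_data.status,
       COUNT(*) AS num,
       HOP_START(event_time, INTERVAL '10' SECOND, INTERVAL '1' MINUTE) AS window_start,
       HOP_END(event_time, INTERVAL '10' SECOND, INTERVAL '1' MINUTE) AS window_end
  FROM sensor_data
 GROUP BY HOP(event_time, INTERVAL '10' SECOND, INTERVAL '1' MINUTE), sensor_data.status;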


In [6]:
/*
Create a sensor_state table connected to my-output-stream.
*/


In [7]:
%flink.ssql

CREATE TABLE sensor_state (
    status VARCHAR(6),
    num BIGINT,  -- COUNT(*) returns BIGINT
    avg_current_temperature DOUBLE,
    hop_time TIMESTAMP(3)
)
WITH (
    'connector' = 'kinesis',
    'stream' = 'my-output-stream',
    'aws.region' = 'us-east-1',
    'scan.stream.initpos' = 'LATEST',
    'format' = 'json',
    'json.timestamp-format.standard' = 'ISO-8601'
);

In [8]:
/*
Use this INSERT INTO statement to continuously insert the result of the select into the sensor_state table.
*/


In [9]:
%flink.ssql(type=update)

INSERT INTO sensor_state
SELECT sensor_data.status,
    COUNT(*) AS num,
    AVG(sensor_data.current_temperature) AS avg_current_temperature,
    HOP_ROWTIME(event_time, INTERVAL '10' SECOND, INTERVAL '1' MINUTE) AS hop_time
FROM sensor_data
GROUP BY HOP(event_time, INTERVAL '10' SECOND, INTERVAL '1' MINUTE), sensor_data.status;


In [10]:
/*
The aggregated data is now sent to the destination Kinesis data stream (my-output-stream)
so that it can be used by other applications.
For example, the data in the destination stream can be used to update a real-time dashboard.
*/
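
In [ ]:
/*
One way to check the output, as a sketch: because sensor_state is backed by a Kinesis data stream,
it can also be queried as a source. This assumes the INSERT INTO job above is still running.
With 'scan.stream.initpos' = 'LATEST', only records written after this query starts are shown.
*/

In [ ]:
%flink.ssql(type=update)
SELECT * FROM sensor_state;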