In [0]:
%flink.ssql(type=update)

CREATE TABLE stock_table (
ticker VARCHAR(6),
price DOUBLE,
event_time TIMESTAMP(3),
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
)
PARTITIONED BY (ticker)
WITH (
'connector' = 'kinesis',
'stream' = 'input-stream',
'aws.region' = 'ap-south-1',
'scan.stream.initpos' = 'LATEST',
'format' = 'json',
'json.timestamp-format.standard' = 'ISO-8601')

In [1]:
%flink.ssql(type=update)
SELECT * FROM stock_table;



In [2]:

%flink.ssql(type=update)
SELECT * FROM stock_table WHERE price > 50;

In [3]:
%flink.ssql(type=update)

SELECT * FROM stock_table WHERE ticker LIKE 'AM%'

In [4]:
%flink.ssql(type=update)
SELECT
    ticker,
    MAX(price) - MIN(price) AS price_growth
FROM stock_table
GROUP BY ticker
ORDER BY price_growth DESC
LIMIT 5;


In [5]:
%flink.ssql(type=update)
SELECT
    ticker,
    STDDEV_POP(price) AS price_volatility
FROM stock_input
GROUP BY ticker
ORDER BY price_volatility DESC
LIMIT 5;



In [6]:
%flink.ssql(type=update)

SELECT ticker, COUNT(ticker) AS ticker_count
FROM stock_table
GROUP BY TUMBLE(event_time, INTERVAL '10' second), ticker;

In [7]:
%flink.ssql(type=update)

SELECT ticker, COUNT(ticker) AS ticker_count
FROM stock_table
GROUP BY HOP(event_time, INTERVAL '5' second, INTERVAL '10' second), ticker;


In [8]:
%flink.ssql(type=update)
SELECT
    ticker,
    TUMBLE_START(event_time, INTERVAL '1' MINUTE) AS window_start,
    AVG(price) AS moving_avg_price
FROM stock_table
GROUP BY ticker, TUMBLE(event_time, INTERVAL '1' MINUTE);


In [9]:
%flink.ssql(type=update)

SELECT *
FROM stock_table
    MATCH_RECOGNIZE(
        PARTITION BY ticker
        ORDER BY event_time
        MEASURES
            A.event_time AS initialPriceTime,
            C.event_time AS dropTime,
            A.price - C.price AS dropDiff,
            A.price as initialPrice,
            C.price as lastPrice
        ONE ROW PER MATCH
        AFTER MATCH SKIP PAST LAST ROW
        PATTERN (A B* C) WITHIN INTERVAL '10' MINUTES
        DEFINE
            B AS B.price > A.price - 500
    )

In [10]:
%flink.ssql(type=update)
CREATE TABLE output_table (
ticker VARCHAR(6),
max_price DOUBLE,
min_price DOUBLE,
avg_price DOUBLE

)

WITH (
'connector' = 'kinesis',
'stream' = 'output-stream',
'aws.region' = 'ap-south-1',
'scan.stream.initpos' = 'LATEST',
'format' = 'json')






In [11]:
%flink.ssql(type=update)

INSERT INTO output_table
SELECT
    ticker,
    MAX(price) AS max_price,
    MIN(price) AS min_price,
    AVG(price) AS avg_price
FROM stock_table
GROUP BY ticker, TUMBLE(PROCTIME(), INTERVAL '1' MINUTE);



In [12]:
%flink.ssql(type=update)

CREATE TABLE stock_input (
    ticker STRING,
    price DOUBLE,
    event_time STRING
) WITH (
    'connector' = 'kinesis',
    'stream' = 'input-stream',
    'aws.region' = 'ap-south-1',
    'format' = 'json',
    'json.fail-on-missing-field' = 'false',
    'scan.stream.initpos' = 'LATEST'
);


In [13]:
%flink.ssql(type=update)

CREATE TABLE stock_output (
    ticker STRING,
    max_price DOUBLE,
    min_price DOUBLE,
    avg_price DOUBLE
) WITH (
    'connector' = 'kinesis',
    'stream' = 'output-stream',
    'aws.region' = 'ap-south-1',
    'format' = 'json',
    'json.fail-on-missing-field' = 'false',
    'sink.partitioner' = 'random'
);



In [14]:
%flink.ssql(type=update)

INSERT INTO stock_output
SELECT
    ticker,
    MAX(price) AS max_price,
    MIN(price) AS min_price,
    AVG(price) AS avg_price
FROM stock_table
GROUP BY ticker, TUMBLE(PROCTIME(), INTERVAL '1' MINUTE);



In [15]:
%flink.ssql(type=update)

SELECT * FROM stock_output;


In [16]:
%flink.ssql
