In [None]:
%flink.ssql(type=update)
-- CREATE TABLE IN GLUE CATALOG TO STORE STREAMING DATA
--
-- The table definition lives in a persistent catalog, so a plain CREATE TABLE
-- fails with "table already exists" whenever this paragraph is run again.
-- Dropping any earlier definition first makes the paragraph safe to re-run.
-- Only the catalog entry is dropped and recreated here.
--
-- Columns: ticker, price, event_time. The WATERMARK clause declares event_time
-- as the event-time attribute, with the watermark trailing it by 5 seconds.
-- Records are read as JSON from the Kinesis stream 'my-input-stream' in
-- eu-west-1, starting at the LATEST position, with ISO-8601 timestamps.

DROP TABLE IF EXISTS stock_table;

CREATE TABLE stock_table(
    ticker VARCHAR(6),
    price DOUBLE,
    event_time TIMESTAMP(3),
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
    )
    PARTITIONED BY (ticker)
    WITH(
        'connector'='kinesis',
        'stream'='my-input-stream',
        'aws.region'='eu-west-1',
        'scan.stream.initpos'='LATEST',
        'format'='json',
        'json.timestamp-format.standard'='ISO-8601'
        );

In [1]:
%flink
// CREATE TWO UDFs (User Defined Functions) AND REGISTER THEM WITH OUR TABLE ENVIRONMENT

import java.time.LocalDateTime
import java.time.format.DateTimeFormatter._
import java.time.ZoneOffset

// Scalar UDF "dt_to_epoch": converts a LocalDateTime to whole seconds since the
// Unix epoch, interpreting the value as UTC.
class DateTimeToEpoch extends ScalarFunction {
    // datetime: the timestamp to convert.
    // Returns the epoch second of `datetime` at offset UTC.
    def eval(datetime: LocalDateTime): Long = {
        datetime.toEpochSecond(ZoneOffset.UTC)
    }
}
// Register the UDF under the name used in Table API / SQL expressions.
// NOTE(review): registered on btenv while the PyFlink cell queries through st_env;
// presumably the interpreter shares one function catalog between them - confirm.
btenv.registerFunction("dt_to_epoch", new DateTimeToEpoch)

// CONVERT ALL STRINGS TO LOWER CASE
// Scalar UDF "to_lower": lower-cases a string using locale-independent rules.
// A null input is passed through as null instead of raising an exception.
class ScalarLowerCase extends ScalarFunction{
    // str: the text to convert; may be null when the column value is NULL.
    // Returns the lower-cased text, or null when `str` is null.
    def eval(str: String): String = {
        // Calling toLowerCase on null would throw a NullPointerException, so guard first.
        // Locale.ROOT keeps the result independent of the JVM default locale
        // (under a Turkish default locale, "I" would lower-case to a dotless i).
        if (str == null) null else str.toLowerCase(java.util.Locale.ROOT)
    }
}
// Register the UDF under the name used in Table API / SQL expressions.
btenv.registerFunction("to_lower", new ScalarLowerCase())

In [2]:
%flink.pyflink

# grab stock table
# create a sliding window and emit results every 5 seconds over 1 minute for every window
# group by ticker and use the UDF defined earlier

input_table = st_env.from_path("stock_table")

new_table = input_table.window(Slide.over("1.minute").every("5.seconds").on("event_time").alias("one_minute_window")) \
            .group_by("one_minute_window, ticker") \
            .select("to_lower(ticker) as ticker, price.max as max_price, dt_to_epoch(one_minute_window.end) as epoch_time")


In [3]:
%flink.pyflink

z.show(new_table, stream_type="update")

In [4]:
%flink.ssql(type=update)

-- Show every column of the records read from stock_table.
SELECT *
FROM stock_table
