In [0]:
%flink.pyflink

from pyflink.table import EnvironmentSettings, TableEnvironment, DataTypes
from pyflink.table.window import Tumble
from pyflink.table.udf import udf
import os
import json

# 1. Create a streaming-mode Table Environment.
# `table_env` is shared module-wide: the helper functions and main() below
# use it to resolve tables, register the UDF and run SQL statements.
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)


def create_input_table(table_name, stream_name, region, stream_initpos):
    """Build the CREATE TABLE statement for the Kinesis source table.

    Nothing is executed here; the caller passes the returned text to
    ``execute_sql``.

    Args:
        table_name: Name used for the table in the statement.
        stream_name: Value for the connector's 'stream' option.
        region: Value for the connector's 'aws.region' option.
        stream_initpos: Value for the connector's 'scan.stream.initpos' option.

    Returns:
        The DDL statement as a string.
    """
    # The DDL declares a watermark on `date` that trails it by 5 seconds and
    # reads JSON records through the 'kinesis' connector.
    ddl_template = """ CREATE TABLE {table} (
                `id` BIGINT,
                `date` TIMESTAMP(3),
                `block` STRING,
                `iucr` STRING,
                `primary_type` STRING,
                `description` STRING,
                `location_description` STRING,
                `arrest` STRING,
                `domestic` STRING,
                `beat` STRING,
                `district` STRING,
                `ward` STRING,
                `community_area` STRING,
                WATERMARK FOR  `date` AS  `date` - INTERVAL '5' SECOND
              )
              PARTITIONED BY (community_area)
              WITH (
                'connector' = 'kinesis',
                'stream' = '{stream}',
                'aws.region' = '{aws_region}',
                'scan.stream.initpos' = '{initpos}',
                'format' = 'json'
              ) """
    return ddl_template.format(
        table=table_name,
        stream=stream_name,
        aws_region=region,
        initpos=stream_initpos,
    )

def create_output_table(table_name, stream_name, region):
    """Build the CREATE TABLE statement for the Kinesis sink table.

    Nothing is executed here; the caller passes the returned text to
    ``execute_sql``.

    Args:
        table_name: Name used for the table in the statement.
        stream_name: Value for the connector's 'stream' option.
        region: Value for the connector's 'aws.region' option.

    Returns:
        The DDL statement as a string.
    """
    # Columns mirror what the windowed aggregation selects:
    # community_area, arrest, cnt and event_time.
    ddl_template = """ CREATE TABLE {table} (
                community_area STRING,
                arrest STRING,
                cnt BIGINT,
                event_time VARCHAR(64)
              )
              PARTITIONED BY (community_area)
              WITH (
                'connector' = 'kinesis',
                'stream' = '{stream}',
                'aws.region' = '{aws_region}',
                'format' = 'json'
              ) """
    return ddl_template.format(
        table=table_name,
        stream=stream_name,
        aws_region=region,
    )


def perform_tumbling_window_aggregation(input_table_name):
    """Count rows per community_area and arrest value over 10-second tumbling windows.

    Args:
        input_table_name: Name of a table resolvable through the module-level
            ``table_env``.

    Returns:
        A Table selecting community_area, arrest, cnt (``count(1)``) and
        event_time (the window end passed through the ``to_string`` function).
    """
    # Resolve the SQL-defined table as a Table API object.
    source_table = table_env.from_path(input_table_name)

    # 10-second tumbling window over the `date` column.
    # NOTE(review): string expressions are presumably deprecated in newer
    # PyFlink releases - confirm against the runtime version before upgrading.
    ten_second_window = Tumble.over("10.seconds").on("date").alias("ten_second_window")
    windowed = source_table.window(ten_second_window)

    # The window alias has to be part of the grouping key.
    grouped = windowed.group_by("community_area, arrest, ten_second_window")

    return grouped.select(
        "community_area, arrest, count(1) as cnt, to_string(ten_second_window.end) as event_time"
    )


@udf(input_types=[DataTypes.TIMESTAMP(3)], result_type=DataTypes.STRING())
def to_string(i):
    """Return ``str(i)``; declared as a TIMESTAMP(3) -> STRING UDF."""
    as_text = str(i)
    return as_text


# Register the UDF under the name "to_string", which is the name the
# aggregation's select expression calls.
table_env.create_temporary_system_function("to_string", to_string)

def main():
    """Create the source and sink tables and start the windowed INSERT job.

    Steps: create the Kinesis source table, create the Kinesis sink table,
    register the tumbling-window aggregation as a temporary view, insert the
    view's rows into the sink, and print the status of the submitted job.
    """
    # tables
    input_table_name = "big_crime_input_table"
    output_table_name = "output_table"

    input_stream = 'big-crime-stream-crimes'
    input_region = 'us-east-1'
    stream_initpos = 'LATEST'

    output_stream = 'big-crime-stream-summary'
    output_region = 'us-east-1'

    # 2. Creates a source table from a Kinesis Data Stream
    table_env.execute_sql(create_input_table(input_table_name, input_stream, input_region, stream_initpos))

    # 3. Creates a sink table writing to a Kinesis Data Stream
    table_env.execute_sql(create_output_table(output_table_name, output_stream, output_region))

    # 4. Queries from the Source Table and creates a tumbling window over 10 seconds to calculate the count of events
    # over the window.
    tumbling_window_table = perform_tumbling_window_aggregation(input_table_name)
    table_env.create_temporary_view("tumbling_window_table", tumbling_window_table)

    # 5. These tumbling windows are inserted into the sink table
    table_result = table_env.execute_sql("INSERT INTO {0} SELECT * FROM {1}"
                                         .format(output_table_name, "tumbling_window_table"))

    # get_job_status() hands back a future rather than the status itself, so
    # resolve it before printing; otherwise only the future object is shown.
    job_client = table_result.get_job_client()
    if job_client is None:
        # No client is attached when the statement did not submit a job.
        print("No job client available for the INSERT statement")
    else:
        print(job_client.get_job_status().result())


# Run the pipeline only when this code executes as the top-level module.
if __name__ == "__main__":
    main()

In [1]:
%flink.ssql(type=update)
-- Read back the rows of output_table, the sink table name used by main().
-- NOTE(review): output_table is created in the pyflink cell's own table_env;
-- confirm it is visible to this SQL interpreter's catalog.
select * from output_table

In [2]:
%flink.ssql
