In [1]:
from pyflink.table import EnvironmentSettings, TableEnvironment
import os
import json
from pyflink.table.expressions import *
from pyflink.table.window import *

# Line only for Local Development
os.environ["IS_LOCAL"] = "True"

print(os.path.abspath(os.getcwd()))
#https://docs.aws.amazon.com/pt_br/managed-flink/latest/java/gs-python-createapp.html
#https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/python/FirehoseSink/streaming-firehose-sink.py
#https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/python/S3Sink/streaming-file-sink.py



/Users/viniciusdeoliveiramartucci/Documents/GitHub/martucci-glue-streaming/auxiliar-scripts/apache-flink/tutorial-aws-flink/kinesis-stream-to-stream-windowing


In [2]:
# 1. Creates a Table Environment
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)

statement_set = table_env.create_statement_set()

APPLICATION_PROPERTIES_FILE_PATH = "/etc/flink/application_properties.json"  # on Kinesis Data Analytics

is_local = (
    True if os.environ.get("IS_LOCAL") else False
)  # set this env var in your local environment

if is_local:
    # only for local, overwrite variable to properties and pass in your jars delimited by a semicolon (;)
    print("Running Flink locally...")
    APPLICATION_PROPERTIES_FILE_PATH = "application_properties.json"  # local

    CURRENT_DIR = os.path.abspath(os.getcwd())
    table_env.get_config().set(
        "pipeline.jars",
        f"file:///{CURRENT_DIR}/lib/flink-sql-connector-kinesis-4.1.0-1.17.jar"
    )


Running Flink locally...


In [7]:
def get_application_properties():
    if os.path.isfile(APPLICATION_PROPERTIES_FILE_PATH):
        with open(APPLICATION_PROPERTIES_FILE_PATH, "r") as file:
            contents = file.read()
            properties = json.loads(contents)
            return properties
    else:
        print('A file at "{}" was not found'.format(APPLICATION_PROPERTIES_FILE_PATH))


def property_map(props, property_group_id):
    for prop in props:
        if prop["PropertyGroupId"] == property_group_id:
            return prop["PropertyMap"]

def create_source_table(table_name, stream_name, region, stream_initpos):
    return """ CREATE TABLE IF NOT EXISTS {0} (
                ticker VARCHAR(6),
                price DOUBLE,
                event_time TIMESTAMP(3),
                WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND

              )
              PARTITIONED BY (ticker)
              WITH (
                'connector' = 'kinesis',
                'stream' = '{1}',
                'aws.region' = '{2}',
                'scan.stream.initpos' = '{3}',
                'format' = 'json',
                'json.timestamp-format.standard' = 'ISO-8601'
              ) """.format(
        table_name, stream_name, region, stream_initpos
    )


def create_print_table(table_name, stream_name, region, stream_initpos):
    return """ CREATE TABLE IF NOT EXISTS {0} (
                ticker VARCHAR(6),
                price DOUBLE,
                event_time TIMESTAMP(3),
                WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND

              )
              WITH (
                'connector' = 'print'
              ) """.format(
        table_name, stream_name, region, stream_initpos
    )

def create_output_table(table_name, stream_name, region):
    return """ CREATE TABLE IF NOT EXISTS {0} (
                ticker VARCHAR(6),
                start_timestamp TIMESTAMP(3),
                end_timestamp TIMESTAMP(3),
                rowtime_timestamp TIMESTAMP(3),
                average_price DOUBLE

              )
              PARTITIONED BY (ticker)
              WITH (
                'connector' = 'kinesis',
                'stream' = '{1}',
                'aws.region' = '{2}',
                'sink.partitioner-field-delimiter' = ';',
                'sink.batch.max-size' = '100',
                'format' = 'json',
                'json.timestamp-format.standard' = 'ISO-8601'
              ) """.format(
        table_name, stream_name, region
    )

def create_fake_source_table(table_name):
    return """ CREATE TABLE IF NOT EXISTS {0} (
                ticker VARCHAR(6),
                price DOUBLE,
                event_time TIMESTAMP(3),
                WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND

              )
              PARTITIONED BY (ticker)
              WITH (
                'connector' = 'datagen',
                'number-of-rows' = '10'
              ) """.format(table_name)

In [8]:
#def main():
# Application Property Keys
input_property_group_key = "consumer.config.0"
producer_property_group_key = "producer.config.0"

input_stream_key = "input.stream.name"
input_region_key = "aws.region"
input_starting_position_key = "flink.stream.initpos"

output_stream_key = "output.stream.name"
output_region_key = "aws.region"

# tables
input_table_name = "input_table"
output_table_name = "output_table"

# get application properties
props = get_application_properties()

input_property_map = property_map(props, input_property_group_key)
output_property_map = property_map(props, producer_property_group_key)

input_stream = input_property_map[input_stream_key]
input_region = input_property_map[input_region_key]
stream_initpos = input_property_map[input_starting_position_key]

output_stream = output_property_map[output_stream_key]
output_region = output_property_map[output_region_key]

# 0. Creates fake source table from datagen
input_table_fake_name = "fake_source_table"
table_env.execute_sql(
    create_fake_source_table(input_table_fake_name)
)

# 2. Creates a source table from a Kinesis Data Stream
print(f" Creating source table {input_table_name} from source stream {input_stream} in region {input_region} using {stream_initpos}")
table_env.execute_sql(
    create_source_table(input_table_name, input_stream, input_region, stream_initpos)
)


# 3. Creates a print to check Data
print_output_table = output_table_name + "_print"
print(f" Creating print table {output_table_name} from source stream {input_stream} in region {input_region}.")
table_env.execute_sql(
    create_print_table(print_output_table, output_stream, output_region, stream_initpos)
)

# 4. Creates a sink table writing to a Kinesis Firehose Delivery Strem
print(f" Creating destination table {output_table_name} to destination stream {output_stream} in region {output_region}.")
table_env.execute_sql(
    create_output_table(output_table_name,output_stream, output_region)
)


# specify table program
tickets = table_env.from_path(input_table_name)  # schema (a, b, c, rowtime)

(tickets
    .window(Tumble.over(lit(15).seconds).on(col('event_time')).alias("w"))
    .group_by(col('w'), col('ticker')) 
    .select(col('ticker'), 
            col('w').start.alias('start_timestamp'), 
            col('w').end.alias('end_timestamp'), 
            col('w').rowtime.alias('rowtime_timestamp'), 
            col('price').avg.alias('average_price')
            )
             
).execute_insert(output_table_name).wait()


 Creating source table input_table from source stream martucci-kinesis-stream-firehose in region us-east-1 using LATEST
 Creating print table output_table from source stream martucci-kinesis-stream-firehose in region us-east-1.
 Creating destination table output_table to destination stream martucci-kinesis-destination in region us-east-1.


Py4JJavaError: An error occurred while calling o313.executeInsert.
: org.apache.flink.table.api.ValidationException: Column types of query result and sink for 'default_catalog.default_database.output_table' do not match.
Cause: Different number of columns.

Query schema: [ticker: VARCHAR(6), start_timestamp: TIMESTAMP(3) NOT NULL, end_timestamp: TIMESTAMP(3) NOT NULL, rowtime_timestamp: TIMESTAMP(3) *ROWTIME*, average_price: DOUBLE]
Sink schema:  [ticker: VARCHAR(6), price: DOUBLE, event_time: TIMESTAMP(3)]
	at org.apache.flink.table.planner.connectors.DynamicSinkUtils.createSchemaMismatchException(DynamicSinkUtils.java:1010)
	at org.apache.flink.table.planner.connectors.DynamicSinkUtils.validateSchemaAndApplyImplicitCast(DynamicSinkUtils.java:345)
	at org.apache.flink.table.planner.connectors.DynamicSinkUtils.validateSchemaAndApplyImplicitCast(DynamicSinkUtils.java:312)
	at org.apache.flink.table.planner.connectors.DynamicSinkUtils.convertSinkToRel(DynamicSinkUtils.java:272)
	at org.apache.flink.table.planner.connectors.DynamicSinkUtils.convertSinkToRel(DynamicSinkUtils.java:197)
	at org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translateToRel$1(PlannerBase.scala:275)
	at scala.Option.map(Option.scala:146)
	at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:231)
	at org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:181)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
	at scala.collection.Iterator.foreach(Iterator.scala:937)
	at scala.collection.Iterator.foreach$(Iterator.scala:937)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
	at scala.collection.IterableLike.foreach(IterableLike.scala:70)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at scala.collection.TraversableLike.map(TraversableLike.scala:233)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
	at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:181)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1277)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:862)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1097)
	at org.apache.flink.table.api.internal.TablePipelineImpl.execute(TablePipelineImpl.java:57)
	at org.apache.flink.table.api.Table.executeInsert(Table.java:1074)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
	at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
	at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:750)
