In [6]:
import os
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings

In [None]:
# Streaming runtime that backs the Table API environment created below.
env = StreamExecutionEnvironment.get_execution_environment()
tbl_env = StreamTableEnvironment.create(stream_execution_environment=env)

# Kafka SQL connector jar; "<absolute_path>" is a placeholder that must be
# replaced with the directory holding the jar on the local machine.
kafka_jar = "<absolute_path>/flink-sql-connector-kafka-3.3.0-1.20.jar"
kafka_jar_url = f"file://{kafka_jar}"

# Register the connector jar with the pipeline so the 'kafka' connector used
# by the table DDL in later cells can be resolved.
table_configuration = tbl_env.get_config().get_configuration()
table_configuration.set_string("pipeline.jars", kafka_jar_url)

<pyflink.common.configuration.Configuration at 0x15665e410>

In [None]:
#######################################################################
# Create Kafka Source Table with DDL
#######################################################################
# Registers the table `sales_usd`, backed by the Kafka topic 'trade_events'
# on localhost:9092 and decoded as JSON.
#   - `index`, `day`, `open`, `high`, `low`, `close` are the physical columns.
#   - `time` is a computed processing-time attribute (PROCTIME()).
#   - `date` is computed from the `day` string via TO_TIMESTAMP and carries a
#     watermark that lags it by 5 seconds.
# NOTE(review): no 'scan.startup.mode' option is given, so the connector's
# default start position applies - confirm that is what is wanted.
src_ddl = """
    CREATE TABLE sales_usd (
        `index` STRING,
        `day` STRING,
        `open` DOUBLE,
        `high` DOUBLE,
        `low` DOUBLE,
        `close` DOUBLE,
        `time` AS PROCTIME(),
        `date` AS TO_TIMESTAMP(`day`), 
        WATERMARK FOR `date` AS `date` - INTERVAL '5' SECOND
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'trade_events',
        'properties.bootstrap.servers' = 'localhost:9092',
        'properties.group.id' = 'trade_events_group',
        'format' = 'json'
    );
"""

# Run the DDL so `sales_usd` is registered in the table environment.
tbl_env.execute_sql(src_ddl)

# Look up the registered table so its resolved schema can be inspected.
tbl = tbl_env.from_path('sales_usd')

print('\nSource Schema')
tbl.print_schema()


Source Schema
(
  `index` STRING,
  `day` STRING,
  `open` DOUBLE,
  `high` DOUBLE,
  `low` DOUBLE,
  `close` DOUBLE,
  `time` TIMESTAMP_LTZ(3) NOT NULL *PROCTIME* AS PROCTIME(),
  `date` TIMESTAMP(3) *ROWTIME* AS TO_TIMESTAMP(`day`),
  WATERMARK FOR `date`: TIMESTAMP(3) AS `date` - INTERVAL '5' SECOND
)


In [None]:
#####################################################################
# Define Tumbling Window Aggregate Calculation
# (sum of `close` per 10-second tumbling window)
#####################################################################
# The window is defined on `time`, which the source DDL declares as
# PROCTIME(); the `date` column and its watermark are not used by this query.
# Each output row carries the window bounds and the summed `close` values.
sql = """
        SELECT
          TUMBLE_START(`time`, INTERVAL '10' SECONDS) as window_start,
          TUMBLE_END(`time`, INTERVAL '10' SECONDS) AS window_end,
          SUM(`close`) AS window_sales
        FROM sales_usd
        GROUP BY
          TUMBLE(`time`, INTERVAL '10' SECONDS)
    """
# `revenue_tbl` is the Table that a later cell inserts into the sink table.
revenue_tbl = tbl_env.sql_query(sql)

print('\nProcess Sink Schema')
revenue_tbl.print_schema()


Process Sink Schema
(
  `window_start` TIMESTAMP(3) NOT NULL,
  `window_end` TIMESTAMP(3) NOT NULL,
  `window_sales` DOUBLE
)


In [None]:
###############################################################
# Create Kafka Sink Table
###############################################################
# Registers the table `sales_euros`, which writes JSON records to the Kafka
# topic 'top_k_trades' on localhost:9092. Its three columns line up by
# position with the windowed aggregation query (window start, window end, sum).
# NOTE(review): the window columns are declared TIMESTAMP(0) here, while the
# query schema printed by the aggregation cell shows TIMESTAMP(3) - confirm
# the precision difference is intended.
sink_ddl1 = """
    CREATE TABLE sales_euros (
        window_start TIMESTAMP(0),
        window_end TIMESTAMP(0),
        window_sales DOUBLE
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'top_k_trades',
        'properties.bootstrap.servers' = 'localhost:9092',
        'format' = 'json'
    )
"""
# `r` holds the result object of the DDL statement; it is not used by any
# other cell visible in this notebook.
r = tbl_env.execute_sql(sink_ddl1)

In [13]:
# Name the job through configuration. This has to happen before submission:
# execute_insert() below submits the job by itself, so no separate
# tbl_env.execute(<job name>) call is needed afterwards.
tbl_env.get_config()\
        .get_configuration()\
        .set_string("pipeline.name", "windowed-sales")

# Write the time-windowed aggregations to the sink table. wait() blocks the
# cell for as long as the submitted job is running.
# NOTE(review): the traceback recorded for this cell contains
# jdk.internal.reflect.DirectMethodHandleAccessor frames, which presumably
# means a JDK 18+ runtime - confirm the JVM in use is a version supported by
# the installed Flink release before debugging the planner error further.
revenue_tbl.execute_insert('sales_euros').wait()

Py4JJavaError: An error occurred while calling o148.executeInsert.
: java.lang.NullPointerException: metadataHandlerProvider
	at java.base/java.util.Objects.requireNonNull(Objects.java:259)
	at org.apache.calcite.rel.metadata.RelMetadataQueryBase.getMetadataHandlerProvider(RelMetadataQueryBase.java:122)
	at org.apache.calcite.rel.metadata.RelMetadataQueryBase.revise(RelMetadataQueryBase.java:118)
	at org.apache.calcite.rel.metadata.RelMetadataQuery.getPulledUpPredicates(RelMetadataQuery.java:844)
	at org.apache.calcite.rel.rules.ReduceExpressionsRule$ProjectReduceExpressionsRule.onMatch(ReduceExpressionsRule.java:307)
	at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:337)
	at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:556)
	at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:420)
	at org.apache.calcite.plan.hep.HepPlanner.executeRuleInstance(HepPlanner.java:243)
	at org.apache.calcite.plan.hep.HepInstruction$RuleInstance$State.execute(HepInstruction.java:178)
	at org.apache.calcite.plan.hep.HepPlanner.lambda$executeProgram$0(HepPlanner.java:211)
	at org.apache.flink.calcite.shaded.com.google.common.collect.ImmutableList.forEach(ImmutableList.java:422)
	at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:210)
	at org.apache.calcite.plan.hep.HepProgram$State.execute(HepProgram.java:118)
	at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:205)
	at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:191)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:64)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:78)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59)
	at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
	at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
	at scala.collection.Iterator.foreach(Iterator.scala:937)
	at scala.collection.Iterator.foreach$(Iterator.scala:937)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
	at scala.collection.IterableLike.foreach(IterableLike.scala:70)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
	at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
	at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55)
	at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:196)
	at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeSinkBlocks(StreamCommonSubGraphBasedOptimizer.scala:83)
	at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:118)
	at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87)
	at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:320)
	at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:178)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1308)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:874)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1107)
	at org.apache.flink.table.api.internal.TablePipelineImpl.execute(TablePipelineImpl.java:59)
	at org.apache.flink.table.api.Table.executeInsert(Table.java:1074)
	at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:104)
	at java.base/java.lang.reflect.Method.invoke(Method.java:578)
	at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
	at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
	at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:1623)
