In [4]:
#flink-job | confluent_kafka.py
#First Approach
#Consume Kafka Events From Topic
#using Confluent Kafka

from confluent_kafka import Consumer, KafkaError
import json

conf = {
    'bootstrap.servers': 'localhost:9092',  
    'group.id': 'flink-consumer-group-confluent',        
    'auto.offset.reset': 'earliest'         
}

consumer = Consumer(conf)

consumer.subscribe(['dataeng-poc-flink-watermarking'])

try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                continue
            else:
                print(msg.error())
                break
        else:
            try:
                event = json.loads(msg.value().decode('utf-8'))
                print("Received event:")
                print(json.dumps(event, indent=4)) 
            except Exception as e:
                print("Error processing message:", e)

except KeyboardInterrupt:
    pass

finally:
    consumer.close()


Received event:
{
    "event_name": "authorizationCreated",
    "event_order": 1
}
Received event:
{
    "event_name": "authorizationCreated",
    "event_order": 1
}


In [5]:
#flink-job | consume-events-pyflink.py
#Second Approach
#Consume Kafka Events From Topic 
#using pyflink

from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer
from pyflink.table.table
import json

def main(topic, group_id):
    try:
        env = StreamExecutionEnvironment.get_execution_environment()
        env.set_parallelism(1)
        kafka_properties = {
            'bootstrap.servers': 'localhost:9092,localhost:9093,local:9094',
            'group.id': group_id
        }

        kafka_consumer = FlinkKafkaConsumer(
            topic,
            SimpleStringSchema(),  
            kafka_properties)  
        
        kafka_consumer.set_start_from_earliest()
        stream = env.add_source(kafka_consumer)
        
        def process_event(event):
            try:
                event_json = json.loads(event)
                print('Event Received:\n' + json.dumps(event_json, indent=4))
            except json.JSONDecodeError as e:
                print('Error decoding JSON:', e)

        stream.map(process_event)
        #stream.map(lambda x: print('Event Received:\n'+json.dumps(json.loads(x), indent=4)))
        
        env.execute("Consume Kafka Events")
    except Exception as e:
        print('An error occurred:', e)
    except KeyboardInterrupt:
        pass
    finally:
        print('Stopping Consumer')
        

if __name__ == '__main__':
    topic_name = 'dataeng-poc-flink-watermarking'
    group_id = 'flink-consumer-group-pyflink'   
    main(topic_name, group_id)


Event Received:
{
    "system_id": "IA123456789",
    "source_system": "AuthorisationSwitch",
    "acceptance_source_system": "GatewayAcceptance",
    "type": "PreAuth",
    "entry_type": "Debit",
    "payment_reference_id": "PR123456789",
    "original_payment_reference_id": "OPR123456789",
    "merchant_payment_reference": "MPR123456789",
    "acquirer_name": "AcquirerX",
    "processor_name": "ProcessorY",
    "stan_number": "STAN123",
    "transaction_creation_date": {
        "local": "2024-02-19T08:00:00",
        "local_timezone": "UTC+0",
        "utc": "2024-02-19T08:00:00"
    },
    "requester": {
        "origin_ip": "192.168.1.1",
        "origin_platform": "Portal",
        "origin_country_code": "US",
        "username": "user123",
        "fingerprint": "ABCDEF123456"
    },
    "merchant": {
        "id": "M123",
        "system_id": "G123",
        "external_id": "EM123",
        "scheme_merchant_id": "SMID123",
        "acquirer_id": "ACQ123",
        "terminal_id": 

In [7]:
#flink-job | basic-watermarking-stream.py
#Third Approach
#Consume Kafka Events From Topic 
#using pyflink

import sys
import json
from datetime import datetime
from pyflink.common import Types, WatermarkStrategy, Time, Encoder
from pyflink.common.watermark_strategy import TimestampAssigner
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer
from pyflink.datastream import TimeCharacteristic


class MyTimestampAssigner(TimestampAssigner):

    def extract_timestamp(self, value, record_timestamp: int) -> int:
        event_json = json.loads(value)
        timestamp_str = event_json['transaction_creation_date']['utc']
        timestamp_obj = datetime.strptime(timestamp_str, "%Y-%m-%dT%H:%M:%S")
        timestamp_int = int(timestamp_obj.timestamp())
        return timestamp_int


def main(topic, group_id):
    try:
        env = StreamExecutionEnvironment.get_execution_environment()
        env.set_parallelism(1)
        kafka_properties = {
            'bootstrap.servers': 'localhost:9092,localhost:9093,local:9094',
            'group.id': group_id
        }

        kafka_consumer = FlinkKafkaConsumer(
            topic,
            SimpleStringSchema(),  
            kafka_properties)  
        
        kafka_consumer.set_start_from_earliest()
        stream = env.add_source(kafka_consumer)

        # Assign timestamps to events and emit watermarks based on event timestamps
        stream_with_timestamps = stream.assign_timestamps_and_watermarks(
            WatermarkStrategy.for_monotonous_timestamps()
                .with_timestamp_assigner(MyTimestampAssigner())
        )

        def process_event(event):
            try:
                event_json = json.loads(event)
                print('Event Received:\n' + json.dumps(event_json, indent=4))
            except json.JSONDecodeError as e:
                print('Error decoding JSON:', e)

        stream.map(process_event)
        #stream.map(lambda x: print('Event Received:\n'+json.dumps(json.loads(x), indent=4)))
        
        env.execute("Consume Kafka Events")
    except Exception as e:
        print('An error occurred:', e)
    except KeyboardInterrupt:
        pass
    finally:
        print('Stopping Consumer')
        

if __name__ == '__main__':
    topic_name = 'dataeng-poc-flink-watermarking'
    group_id = 'flink-consumer-group'   
    main(topic_name, group_id)

Traceback (most recent call last):
  File "/Users/admin/.pyenv/versions/3.10.13/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 295, in _execute
    response = task()
  File "/Users/admin/.pyenv/versions/3.10.13/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 370, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "/Users/admin/.pyenv/versions/3.10.13/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 629, in do_instruction
    return getattr(self, request_type)(
  File "/Users/admin/.pyenv/versions/3.10.13/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 667, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/Users/admin/.pyenv/versions/3.10.13/lib/python3.10/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1061, in process_bundle
    input_op_by_transform_id[element.transform_id].process_enco

An error occurred: An error occurred while calling o209.execute.
: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
	at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
	at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)
	at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
	at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
	at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2143)
	at org.apache.flink.runtime.rpc.pekko.PekkoInvocationHandler.lambda$invokeRpc$1(PekkoInvocationHandler.java:268)
	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
	at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
	at java.base/java.util.concurrent.Completa

Traceback (most recent call last):
  File "/Users/admin/.pyenv/versions/3.10.13/lib/python3.10/site-packages/apache_beam/runners/worker/data_plane.py", line 505, in input_elements
    element = received.get(timeout=1)
  File "/Users/admin/.pyenv/versions/3.10.13/lib/python3.10/queue.py", line 179, in get
    raise Empty
_queue.Empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/admin/.pyenv/versions/3.10.13/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 295, in _execute
    response = task()
  File "/Users/admin/.pyenv/versions/3.10.13/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 370, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "/Users/admin/.pyenv/versions/3.10.13/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 629, in do_instruction
    return getattr(self, request_type)(
  File "

In [8]:
#flink-job | auth-watermarking.py

from pyflink.common import Time, WatermarkStrategy, Duration
from pyflink.common.typeinfo import Types
from pyflink.common.watermark_strategy import TimestampAssigner
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext
from pyflink.datastream.state import ValueStateDescriptor, StateTtlConfig
from datetime import datetime

class Sum(KeyedProcessFunction):

    def __init__(self):
        self.state = None

    def open(self, runtime_context: RuntimeContext):
        state_descriptor = ValueStateDescriptor("state", Types.FLOAT())
        state_ttl_config = StateTtlConfig \
            .new_builder(Time.seconds(1)) \
            .set_update_type(StateTtlConfig.UpdateType.OnReadAndWrite) \
            .disable_cleanup_in_background() \
            .build()
        state_descriptor.enable_time_to_live(state_ttl_config)
        self.state = runtime_context.get_state(state_descriptor)

    def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
        # retrieve the current count
        current = self.state.value()
        if current is None:
            current = 0

        # update the state's count
        current += value[2]
        self.state.update(current)

        # register an event time timer 2 seconds later
        ctx.timer_service().register_event_time_timer(ctx.timestamp() + 2000)

    def on_timer(self, timestamp: int, ctx: 'KeyedProcessFunction.OnTimerContext'):
        yield ctx.get_current_key(), self.state.value()


class MyTimestampAssigner(TimestampAssigner):

    def extract_timestamp(self, value, record_timestamp: int) -> int:
        timestamp_str = value[0]
        timestamp_obj = datetime.strptime(timestamp_str, "%Y-%m-%dT%H:%M:%S")
        timestamp_int = int(timestamp_obj.timestamp())
        return timestamp_int


def event_timer_timer_demo():
    env = StreamExecutionEnvironment.get_execution_environment()

    json_collection = [
        {"transaction_creation_date": "2024-02-19T08:00:00", "acquirer_name": "PayPal",     "value": 110.1},
        {"transaction_creation_date": "2024-02-19T08:00:02", "acquirer_name": "MasterCard", "value": 30.2},  # Out-of-order
        {"transaction_creation_date": "2024-02-19T08:00:03", "acquirer_name": "PayPal",     "value": 20.0},  # Out-of-order
        {"transaction_creation_date": "2024-02-19T08:00:01", "acquirer_name": "MasterCard", "value": 53.1},  # Out-of-order
        {"transaction_creation_date": "2024-02-19T08:00:04", "acquirer_name": "PayPal",     "value": 13.1},  # In-order
        {"transaction_creation_date": "2024-02-19T08:00:03", "acquirer_name": "MasterCard", "value": 3.1},  # In-order
        {"transaction_creation_date": "2024-02-19T08:00:07", "acquirer_name": "MasterCard", "value": 16.1},  # In-order
        {"transaction_creation_date": "2024-02-19T08:00:05", "acquirer_name": "PayPal",     "value": 20.1}  # In-order
    ]


    def json_to_tuple(json_data):
        return (json_data["transaction_creation_date"], json_data["acquirer_name"], json_data["value"])

    tuple_collection = [json_to_tuple(json_obj) for json_obj in json_collection]

    ds = env.from_collection(tuple_collection, type_info=Types.TUPLE([
                            Types.STRING(),
                            Types.STRING(),
                            Types.FLOAT()
                        ]))

    ds = ds.assign_timestamps_and_watermarks(
        WatermarkStrategy.for_bounded_out_of_orderness(Duration.of_seconds(2))
                         .with_timestamp_assigner(MyTimestampAssigner()))

    # apply the process function onto a keyed stream
    ds.key_by(lambda value: value[1]) \
      .process(Sum()) \
      .print()

    # submit for execution
    env.execute()


if __name__ == '__main__':
    event_timer_timer_demo()


6> ('MasterCard', 102.49999952316284)
7> ('PayPal', 163.29999923706055)
7> ('PayPal', 163.29999923706055)
7> ('PayPal', 163.29999923706055)
7> ('PayPal', 163.29999923706055)
6> ('MasterCard', 102.49999952316284)
6> ('MasterCard', 102.49999952316284)
6> ('MasterCard', 102.49999952316284)


In [9]:
#flink-job | auth-wm-tumbling-window.py
import sys

import argparse
from typing import Iterable

from pyflink.datastream.connectors.file_system import FileSink, OutputFileConfig, RollingPolicy

from pyflink.common import Types, WatermarkStrategy, Time, Encoder
from pyflink.common.watermark_strategy import TimestampAssigner
from pyflink.datastream import StreamExecutionEnvironment, ProcessWindowFunction
from pyflink.datastream.window import TumblingEventTimeWindows, TimeWindow


class MyTimestampAssigner(TimestampAssigner):
    def extract_timestamp(self, value, record_timestamp) -> int:
        return int(value[1])


class CountWindowProcessFunction(ProcessWindowFunction[tuple, tuple, str, TimeWindow]):
    def process(self,
                key: str,
                context: ProcessWindowFunction.Context[TimeWindow],
                elements: Iterable[tuple]) -> Iterable[tuple]:
        return [(key, context.window().start, context.window().end, len([e for e in elements]))]


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--output',
        dest='output',
        required=False,
        help='Output file to write results to.')

    argv = sys.argv[1:]
    known_args, _ = parser.parse_known_args(argv)
    output_path = known_args.output

    env = StreamExecutionEnvironment.get_execution_environment()
    # write all the data to one file
    env.set_parallelism(1)

    # define the source
    json_collection = [
        { "event_name": "authorizationCreated",     "event_order": 1},
        { "event_name": "schemesResponded", "event_order": 2},  # Out-of-order
        { "event_name": "authorizationCreated",     "event_order": 3},  # Out-of-order
        { "event_name": "schemesResponded", "event_order": 4},  # Out-of-order
        { "event_name": "authorizationCreated",     "event_order": 5},  # In-order
        { "event_name": "schemesResponded", "event_order": 8},  # In-order
        { "event_name": "schemesResponded", "event_order": 9},  # In-order
        { "event_name": "acquirerRequested",     "event_order": 19}, # In-order
        { "event_name": "acquirerResponded",     "event_order": 18},  # In-order
        { "event_name": "acquirerRequested",     "event_order": 16},  # In-order
        { "event_name": "acquirerResponded",     "event_order": 12},  # In-order
        { "event_name": "acquirerResponded",     "event_order": 11}
    ]


    def json_to_tuple(json_data):
        return (json_data["event_name"], json_data["event_order"])

    tuple_collection = [json_to_tuple(json_obj) for json_obj in json_collection]

    # define the watermark strategy
    watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
        .with_timestamp_assigner(MyTimestampAssigner())
    
    ds = env.from_collection(tuple_collection, type_info=Types.TUPLE([
                            Types.STRING(),
                            Types.INT()
                        ]))

    ds = ds.assign_timestamps_and_watermarks(watermark_strategy) \
        .key_by(lambda x: x[0], key_type=Types.STRING()) \
        .window(TumblingEventTimeWindows.of(Time.milliseconds(5))) \
        .process(CountWindowProcessFunction(),
                 Types.TUPLE([Types.STRING(), Types.INT(), Types.INT(), Types.INT()]))

    # define the sink
    if output_path is not None:
        ds.sink_to(
            sink=FileSink.for_row_format(
                base_path=output_path,
                encoder=Encoder.simple_string_encoder())
            .with_output_file_config(
                OutputFileConfig.builder()
                .with_part_prefix("prefix")
                .with_part_suffix(".ext")
                .build())
            .with_rolling_policy(RollingPolicy.default_rolling_policy())
            .build()
        )
    else:
        print("Printing result to stdout. Use --output to specify output path.")
        ds.print()

    # submit for execution
    env.execute()

Printing result to stdout. Use --output to specify output path.
(authorizationCreated,0,5,2)
(schemesResponded,0,5,2)
(schemesResponded,5,10,2)
(authorizationCreated,5,10,1)
(acquirerResponded,10,15,2)
(acquirerRequested,15,20,2)
(acquirerResponded,15,20,1)


In [None]:
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, DataTypes
from pyflink.table.udf import udf
from json import loads

# Kafka consumer configuration
kafka_props = {
    'bootstrap.servers': 'localhost:9092,localhost:9093,local:9094',
    'group.id': 'my_consumer_group',
    'auto.offset.reset': 'earliest'
}

# Define the watermark generation function
@udf(result_type=DataTypes.TIMESTAMP(3))
def assign_watermark(ts):
    # Add 5 seconds of allowed lateness as watermark
    return ts - 5000  # in milliseconds

# Set up Flink environment
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)  # Set parallelism to 1 for demonstration purpose
t_env = StreamTableEnvironment.create(env)

# Register watermark UDF
t_env.register_function("assign_watermark", assign_watermark)

# Kafka source table
kafka_source_ddl = f"""
    CREATE TABLE kafka_source (
        message STRING
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'dataeng-poc-flink-watermarking',
        'properties.bootstrap.servers' = '{kafka_props['bootstrap.servers']}',
        'properties.group.id' = '{kafka_props['group.id']}',
        'format' = 'json',
        'scan.startup.mode' = 'earliest-offset'
    )
"""

# Register Kafka source table
t_env.execute_sql(kafka_source_ddl)

# Define the query to process the data with watermarking
query = f"""
    SELECT
        TUMBLE_START(transaction_creation_date_utc, INTERVAL '1' MINUTE) AS window_start,
        TUMBLE_END(transaction_creation_date_utc, INTERVAL '1' MINUTE) AS window_end,
        COUNT(*) AS event_count
    FROM (
        SELECT
            CAST(JSON_VALUE(message, '$.transaction_creation_date.utc') AS TIMESTAMP) AS transaction_creation_date_utc
        FROM kafka_source
    )
    GROUP BY TUMBLE(transaction_creation_date_utc, INTERVAL '1' MINUTE)
"""

# Execute the query
result_table = t_env.sql_query(query)

# Convert the result table to a DataStream and print the results
result_table.execute_and_print()

# Start the execution
env.execute("Flink Watermarking Example")


Py4JJavaError: An error occurred while calling o72.sqlQuery.
: org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.kafka_source'.

Table options are:

'connector'='kafka'
'format'='json'
'properties.bootstrap.servers'='localhost:9092'
'properties.group.id'='my_consumer_group'
'scan.startup.mode'='earliest-offset'
'topic'='dataeng-poc-flink-watermarking'
	at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSource(FactoryUtil.java:219)
	at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSource(FactoryUtil.java:244)
	at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.createDynamicTableSource(CatalogSourceTable.java:175)
	at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:115)
	at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:4002)
	at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2872)
	at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2432)
	at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2346)
	at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2291)
	at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:728)
	at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:714)
	at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3848)
	at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2458)
	at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2346)
	at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2291)
	at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:728)
	at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:714)
	at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3848)
	at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:618)
	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:229)
	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:205)
	at org.apache.flink.table.planner.operations.SqlNodeConvertContext.toRelRoot(SqlNodeConvertContext.java:69)
	at org.apache.flink.table.planner.operations.converters.SqlQueryConverter.convertSqlNode(SqlQueryConverter.java:48)
	at org.apache.flink.table.planner.operations.converters.SqlNodeConverters.convertSqlNode(SqlNodeConverters.java:73)
	at org.apache.flink.table.planner.operations.SqlNodeToOperationConversion.convertValidatedSqlNode(SqlNodeToOperationConversion.java:272)
	at org.apache.flink.table.planner.operations.SqlNodeToOperationConversion.convert(SqlNodeToOperationConversion.java:262)
	at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:708)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:78)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:567)
	at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
	at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
	at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:831)
Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option: 'connector'='kafka'
	at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:798)
	at org.apache.flink.table.factories.FactoryUtil.discoverTableFactory(FactoryUtil.java:772)
	at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSource(FactoryUtil.java:215)
	... 38 more
Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.

Available factory identifiers are:

blackhole
datagen
filesystem
print
python-input-format
	at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:608)
	at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:794)
	... 40 more


In [2]:
import sys
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings

# Set up the environment
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)  # You can adjust the parallelism as needed
t_env = StreamTableEnvironment.create(env)

# Define the Kafka source properties
kafka_props = {
    "bootstrap.servers": "localhost:9092,localhost:9093,local:9094",
    "group.id": "flink-consumer-group",
    "auto.offset.reset": "latest"
}

# Define the schema for your Kafka topic
# Make sure the schema matches the structure of the events in your Kafka topic
# Here, I'm assuming a simple schema with a single 'value' field
kafka_source_ddl = f"""
    CREATE TABLE kafka_source (
        value STRING
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'dataeng-poc-flink-watermarking',
        'properties.bootstrap.servers' = '{kafka_props["bootstrap.servers"]}',
        'properties.group.id' = '{kafka_props["group.id"]}',
        'format' = 'json'
    )
"""

# Register the Kafka source table
t_env.execute_sql(kafka_source_ddl)

# Define your query
# Here, I'm simply selecting and printing the values from the Kafka source table
query = """
    SELECT *
    FROM kafka_source
"""

# Convert the query to a DataStream and print the results
t_env.to_append_stream(t_env.sql_query(query)).print()

# Execute the job
env.execute("Consume Kafka events in Flink")


Py4JJavaError: An error occurred while calling o24.executeSql.
: org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered "value" at line 3, column 9.
Was expecting one of:
    "CONSTRAINT" ...
    "PRIMARY" ...
    "UNIQUE" ...
    "WATERMARK" ...
    <BRACKET_QUOTED_IDENTIFIER> ...
    <QUOTED_IDENTIFIER> ...
    <BACK_QUOTED_IDENTIFIER> ...
    <BIG_QUERY_BACK_QUOTED_IDENTIFIER> ...
    <HYPHENATED_IDENTIFIER> ...
    <IDENTIFIER> ...
    <UNICODE_QUOTED_IDENTIFIER> ...
    
	at org.apache.flink.table.planner.parse.CalciteParser.parseSqlList(CalciteParser.java:81)
	at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:102)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:728)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:78)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:567)
	at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
	at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
	at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:831)
Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered "value" at line 3, column 9.
Was expecting one of:
    "CONSTRAINT" ...
    "PRIMARY" ...
    "UNIQUE" ...
    "WATERMARK" ...
    <BRACKET_QUOTED_IDENTIFIER> ...
    <QUOTED_IDENTIFIER> ...
    <BACK_QUOTED_IDENTIFIER> ...
    <BIG_QUERY_BACK_QUOTED_IDENTIFIER> ...
    <HYPHENATED_IDENTIFIER> ...
    <IDENTIFIER> ...
    <UNICODE_QUOTED_IDENTIFIER> ...
    
	at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.convertException(FlinkSqlParserImpl.java:512)
	at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.normalizeException(FlinkSqlParserImpl.java:265)
	at org.apache.calcite.sql.parser.SqlParser.handleException(SqlParser.java:156)
	at org.apache.calcite.sql.parser.SqlParser.parseStmtList(SqlParser.java:211)
	at org.apache.flink.table.planner.parse.CalciteParser.parseSqlList(CalciteParser.java:76)
	... 13 more
Caused by: org.apache.flink.sql.parser.impl.ParseException: Encountered "value" at line 3, column 9.
Was expecting one of:
    "CONSTRAINT" ...
    "PRIMARY" ...
    "UNIQUE" ...
    "WATERMARK" ...
    <BRACKET_QUOTED_IDENTIFIER> ...
    <QUOTED_IDENTIFIER> ...
    <BACK_QUOTED_IDENTIFIER> ...
    <BIG_QUERY_BACK_QUOTED_IDENTIFIER> ...
    <HYPHENATED_IDENTIFIER> ...
    <IDENTIFIER> ...
    <UNICODE_QUOTED_IDENTIFIER> ...
    
	at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.generateParseException(FlinkSqlParserImpl.java:48939)
	at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.jj_consume_token(FlinkSqlParserImpl.java:48747)
	at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.TableColumn(FlinkSqlParserImpl.java:7550)
	at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlCreateTable(FlinkSqlParserImpl.java:8759)
	at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlCreateExtended(FlinkSqlParserImpl.java:9996)
	at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlCreate(FlinkSqlParserImpl.java:27604)
	at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmt(FlinkSqlParserImpl.java:3781)
	at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmtList(FlinkSqlParserImpl.java:3154)
	at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.parseSqlStmtList(FlinkSqlParserImpl.java:321)
	at org.apache.calcite.sql.parser.SqlParser.parseStmtList(SqlParser.java:209)
	... 14 more


In [1]:
import json
import logging
import sys

from pyflink.common import Row
from pyflink.table import (DataTypes, TableEnvironment, EnvironmentSettings, ExplainDetail)
from pyflink.table.expressions import *
from pyflink.table.udf import udtf, udf, udaf, AggregateFunction, TableAggregateFunction, udtaf


def basic_operations():
    t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

    # define the source
    table = t_env.from_elements(
        elements=[
            (1, '{"name": "Flink", "tel": 123, "addr": {"country": "Germany", "city": "Berlin"}}'),
            (2, '{"name": "hello", "tel": 135, "addr": {"country": "China", "city": "Shanghai"}}'),
            (3, '{"name": "world", "tel": 124, "addr": {"country": "USA", "city": "NewYork"}}'),
            (4, '{"name": "PyFlink", "tel": 32, "addr": {"country": "China", "city": "Hangzhou"}}')
        ],
        schema=['id', 'data'])

    right_table = t_env.from_elements(elements=[(1, 18), (2, 30), (3, 25), (4, 10)],
                                      schema=['id', 'age'])

    table = table.add_columns(
                    col('data').json_value('$.name', DataTypes.STRING()).alias('name'),
                    col('data').json_value('$.tel', DataTypes.STRING()).alias('tel'),
                    col('data').json_value('$.addr.country', DataTypes.STRING()).alias('country')) \
                 .drop_columns(col('data'))
    table.execute().print()
    # +----+----------------------+--------------------------------+--------------------------------+--------------------------------+
    # | op |                   id |                           name |                            tel |                        country |
    # +----+----------------------+--------------------------------+--------------------------------+--------------------------------+
    # | +I |                    1 |                          Flink |                            123 |                        Germany |
    # | +I |                    2 |                          hello |                            135 |                          China |
    # | +I |                    3 |                          world |                            124 |                            USA |
    # | +I |                    4 |                        PyFlink |                             32 |                          China |
    # +----+----------------------+--------------------------------+--------------------------------+--------------------------------+

    # limit the number of outputs
    table.limit(3).execute().print()
    # +----+----------------------+--------------------------------+--------------------------------+--------------------------------+
    # | op |                   id |                           name |                            tel |                        country |
    # +----+----------------------+--------------------------------+--------------------------------+--------------------------------+
    # | +I |                    1 |                          Flink |                            123 |                        Germany |
    # | +I |                    2 |                          hello |                            135 |                          China |
    # | +I |                    3 |                          world |                            124 |                            USA |
    # +----+----------------------+--------------------------------+--------------------------------+--------------------------------+

    # filter
    table.filter(col('id') != 3).execute().print()
    # +----+----------------------+--------------------------------+--------------------------------+--------------------------------+
    # | op |                   id |                           name |                            tel |                        country |
    # +----+----------------------+--------------------------------+--------------------------------+--------------------------------+
    # | +I |                    1 |                          Flink |                            123 |                        Germany |
    # | +I |                    2 |                          hello |                            135 |                          China |
    # | +I |                    4 |                        PyFlink |                             32 |                          China |
    # +----+----------------------+--------------------------------+--------------------------------+--------------------------------+

    # aggregation
    table.group_by(col('country')) \
         .select(col('country'), col('id').count, col('tel').cast(DataTypes.BIGINT()).max) \
         .execute().print()
    # +----+--------------------------------+----------------------+----------------------+
    # | op |                        country |               EXPR$0 |               EXPR$1 |
    # +----+--------------------------------+----------------------+----------------------+
    # | +I |                        Germany |                    1 |                  123 |
    # | +I |                            USA |                    1 |                  124 |
    # | +I |                          China |                    1 |                  135 |
    # | -U |                          China |                    1 |                  135 |
    # | +U |                          China |                    2 |                  135 |
    # +----+--------------------------------+----------------------+----------------------+

    # distinct
    table.select(col('country')).distinct() \
         .execute().print()
    # +----+--------------------------------+
    # | op |                        country |
    # +----+--------------------------------+
    # | +I |                        Germany |
    # | +I |                          China |
    # | +I |                            USA |
    # +----+--------------------------------+

    # join
    # Note that it still doesn't support duplicate column names between the joined tables
    table.join(right_table.rename_columns(col('id').alias('r_id')), col('id') == col('r_id')) \
         .execute().print()
    # +----+----------------------+--------------------------------+--------------------------------+--------------------------------+----------------------+----------------------+
    # | op |                   id |                           name |                            tel |                        country |                 r_id |                  age |
    # +----+----------------------+--------------------------------+--------------------------------+--------------------------------+----------------------+----------------------+
    # | +I |                    4 |                        PyFlink |                             32 |                          China |                    4 |                   10 |
    # | +I |                    1 |                          Flink |                            123 |                        Germany |                    1 |                   18 |
    # | +I |                    2 |                          hello |                            135 |                          China |                    2 |                   30 |
    # | +I |                    3 |                          world |                            124 |                            USA |                    3 |                   25 |
    # +----+----------------------+--------------------------------+--------------------------------+--------------------------------+----------------------+----------------------+

    # join lateral
    @udtf(result_types=[DataTypes.STRING()])
    def split(r: Row):
        for s in r.name.split("i"):
            yield s

    table.join_lateral(split.alias('a')) \
         .execute().print()
    # +----+----------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
    # | op |                   id |                           name |                            tel |                        country |                              a |
    # +----+----------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
    # | +I |                    1 |                          Flink |                            123 |                        Germany |                             Fl |
    # | +I |                    1 |                          Flink |                            123 |                        Germany |                             nk |
    # | +I |                    2 |                          hello |                            135 |                          China |                          hello |
    # | +I |                    3 |                          world |                            124 |                            USA |                          world |
    # | +I |                    4 |                        PyFlink |                             32 |                          China |                           PyFl |
    # | +I |                    4 |                        PyFlink |                             32 |                          China |                             nk |
    # +----+----------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+

    # show schema
    table.print_schema()
    # (
    #   `id` BIGINT,
    #   `name` STRING,
    #   `tel` STRING,
    #   `country` STRING
    # )

    # show execute plan
    print(table.join_lateral(split.alias('a')).explain())
    # == Abstract Syntax Tree ==
    # LogicalCorrelate(correlation=[$cor1], joinType=[inner], requiredColumns=[{}])
    # :- LogicalProject(id=[$0], name=[JSON_VALUE($1, _UTF-16LE'$.name', FLAG(NULL), FLAG(ON EMPTY), FLAG(NULL), FLAG(ON ERROR))], tel=[JSON_VALUE($1, _UTF-16LE'$.tel', FLAG(NULL), FLAG(ON EMPTY), FLAG(NULL), FLAG(ON ERROR))], country=[JSON_VALUE($1, _UTF-16LE'$.addr.country', FLAG(NULL), FLAG(ON EMPTY), FLAG(NULL), FLAG(ON ERROR))])
    # :  +- LogicalTableScan(table=[[default_catalog, default_database, Unregistered_TableSource_249535355, source: [PythonInputFormatTableSource(id, data)]]])
    # +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.functions.python.PythonTableFunction$1f0568d1f39bef59b4c969a5d620ba46*($0, $1, $2, $3)], rowType=[RecordType(VARCHAR(2147483647) a)], elementType=[class [Ljava.lang.Object;])
    #
    # == Optimized Physical Plan ==
    # PythonCorrelate(invocation=[*org.apache.flink.table.functions.python.PythonTableFunction$1f0568d1f39bef59b4c969a5d620ba46*($0, $1, $2, $3)], correlate=[table(split(id,name,tel,country))], select=[id,name,tel,country,a], rowType=[RecordType(BIGINT id, VARCHAR(2147483647) name, VARCHAR(2147483647) tel, VARCHAR(2147483647) country, VARCHAR(2147483647) a)], joinType=[INNER])
    # +- Calc(select=[id, JSON_VALUE(data, _UTF-16LE'$.name', FLAG(NULL), FLAG(ON EMPTY), FLAG(NULL), FLAG(ON ERROR)) AS name, JSON_VALUE(data, _UTF-16LE'$.tel', FLAG(NULL), FLAG(ON EMPTY), FLAG(NULL), FLAG(ON ERROR)) AS tel, JSON_VALUE(data, _UTF-16LE'$.addr.country', FLAG(NULL), FLAG(ON EMPTY), FLAG(NULL), FLAG(ON ERROR)) AS country])
    #    +- LegacyTableSourceScan(table=[[default_catalog, default_database, Unregistered_TableSource_249535355, source: [PythonInputFormatTableSource(id, data)]]], fields=[id, data])
    #
    # == Optimized Execution Plan ==
    # PythonCorrelate(invocation=[*org.apache.flink.table.functions.python.PythonTableFunction$1f0568d1f39bef59b4c969a5d620ba46*($0, $1, $2, $3)], correlate=[table(split(id,name,tel,country))], select=[id,name,tel,country,a], rowType=[RecordType(BIGINT id, VARCHAR(2147483647) name, VARCHAR(2147483647) tel, VARCHAR(2147483647) country, VARCHAR(2147483647) a)], joinType=[INNER])
    # +- Calc(select=[id, JSON_VALUE(data, '$.name', NULL, ON EMPTY, NULL, ON ERROR) AS name, JSON_VALUE(data, '$.tel', NULL, ON EMPTY, NULL, ON ERROR) AS tel, JSON_VALUE(data, '$.addr.country', NULL, ON EMPTY, NULL, ON ERROR) AS country])
    #    +- LegacyTableSourceScan(table=[[default_catalog, default_database, Unregistered_TableSource_249535355, source: [PythonInputFormatTableSource(id, data)]]], fields=[id, data])

    # show execute plan with advice
    print(table.join_lateral(split.alias('a')).explain(ExplainDetail.PLAN_ADVICE))
    # == Abstract Syntax Tree ==
    # LogicalCorrelate(correlation=[$cor2], joinType=[inner], requiredColumns=[{}])
    # :- LogicalProject(id=[$0], name=[JSON_VALUE($1, _UTF-16LE'$.name', FLAG(NULL), FLAG(ON EMPTY), FLAG(NULL), FLAG(ON ERROR))], tel=[JSON_VALUE($1, _UTF-16LE'$.tel', FLAG(NULL), FLAG(ON EMPTY), FLAG(NULL), FLAG(ON ERROR))], country=[JSON_VALUE($1, _UTF-16LE'$.addr.country', FLAG(NULL), FLAG(ON EMPTY), FLAG(NULL), FLAG(ON ERROR))])
    # :  +- LogicalTableScan(table=[[*anonymous_python-input-format$1*]])
    # +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.functions.python.PythonTableFunction$720258394f6a31d31376164d23142f53*($0, $1, $2, $3)], rowType=[RecordType(VARCHAR(2147483647) a)])
    #
    # == Optimized Physical Plan With Advice ==
    # PythonCorrelate(invocation=[*org.apache.flink.table.functions.python.PythonTableFunction$720258394f6a31d31376164d23142f53*($0, $1, $2, $3)], correlate=[table(*org.apache.flink.table.functions.python.PythonTableFunction$720258394f6a31d31376164d23142f53*(id,name,tel,country))], select=[id,name,tel,country,a], rowType=[RecordType(BIGINT id, VARCHAR(2147483647) name, VARCHAR(2147483647) tel, VARCHAR(2147483647) country, VARCHAR(2147483647) a)], joinType=[INNER])
    # +- Calc(select=[id, JSON_VALUE(data, _UTF-16LE'$.name', FLAG(NULL), FLAG(ON EMPTY), FLAG(NULL), FLAG(ON ERROR)) AS name, JSON_VALUE(data, _UTF-16LE'$.tel', FLAG(NULL), FLAG(ON EMPTY), FLAG(NULL), FLAG(ON ERROR)) AS tel, JSON_VALUE(data, _UTF-16LE'$.addr.country', FLAG(NULL), FLAG(ON EMPTY), FLAG(NULL), FLAG(ON ERROR)) AS country])
    #    +- TableSourceScan(table=[[*anonymous_python-input-format$1*]], fields=[id, data])
    #
    # No available advice...
    #
    # == Optimized Execution Plan ==
    # PythonCorrelate(invocation=[*org.apache.flink.table.functions.python.PythonTableFunction$720258394f6a31d31376164d23142f53*($0, $1, $2, $3)], correlate=[table(*org.apache.flink.table.functions.python.PythonTableFunction$720258394f6a31d31376164d23142f53*(id,name,tel,country))], select=[id,name,tel,country,a], rowType=[RecordType(BIGINT id, VARCHAR(2147483647) name, VARCHAR(2147483647) tel, VARCHAR(2147483647) country, VARCHAR(2147483647) a)], joinType=[INNER])
    # +- Calc(select=[id, JSON_VALUE(data, '$.name', NULL, ON EMPTY, NULL, ON ERROR) AS name, JSON_VALUE(data, '$.tel', NULL, ON EMPTY, NULL, ON ERROR) AS tel, JSON_VALUE(data, '$.addr.country', NULL, ON EMPTY, NULL, ON ERROR) AS country])
    #    +- TableSourceScan(table=[[*anonymous_python-input-format$1*]], fields=[id, data])

def sql_operations():
    t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

    # define the source
    table = t_env.from_elements(
        elements=[
            (1, '{"name": "Flink", "tel": 123, "addr": {"country": "Germany", "city": "Berlin"}}'),
            (2, '{"name": "hello", "tel": 135, "addr": {"country": "China", "city": "Shanghai"}}'),
            (3, '{"name": "world", "tel": 124, "addr": {"country": "USA", "city": "NewYork"}}'),
            (4, '{"name": "PyFlink", "tel": 32, "addr": {"country": "China", "city": "Hangzhou"}}')
        ],
        schema=['id', 'data'])

    t_env.sql_query("SELECT * FROM %s" % table) \
         .execute().print()
    # +----+----------------------+--------------------------------+
    # | op |                   id |                           data |
    # +----+----------------------+--------------------------------+
    # | +I |                    1 | {"name": "Flink", "tel": 12... |
    # | +I |                    2 | {"name": "hello", "tel": 13... |
    # | +I |                    3 | {"name": "world", "tel": 12... |
    # | +I |                    4 | {"name": "PyFlink", "tel": ... |
    # +----+----------------------+--------------------------------+

    # execute sql statement
    @udtf(result_types=[DataTypes.STRING(), DataTypes.INT(), DataTypes.STRING()])
    def parse_data(data: str):
        json_data = json.loads(data)
        yield json_data['name'], json_data['tel'], json_data['addr']['country']

    t_env.create_temporary_function('parse_data', parse_data)
    t_env.execute_sql(
        """
        SELECT *
        FROM %s, LATERAL TABLE(parse_data(`data`)) t(name, tel, country)
        """ % table
    ).print()
    # +----+----------------------+--------------------------------+--------------------------------+-------------+--------------------------------+
    # | op |                   id |                           data |                           name |         tel |                        country |
    # +----+----------------------+--------------------------------+--------------------------------+-------------+--------------------------------+
    # | +I |                    1 | {"name": "Flink", "tel": 12... |                          Flink |         123 |                        Germany |
    # | +I |                    2 | {"name": "hello", "tel": 13... |                          hello |         135 |                          China |
    # | +I |                    3 | {"name": "world", "tel": 12... |                          world |         124 |                            USA |
    # | +I |                    4 | {"name": "PyFlink", "tel": ... |                        PyFlink |          32 |                          China |
    # +----+----------------------+--------------------------------+--------------------------------+-------------+--------------------------------+

    # explain sql plan
    print(t_env.explain_sql(
        """
        SELECT *
        FROM %s, LATERAL TABLE(parse_data(`data`)) t(name, tel, country)
        """ % table
    ))
    # == Abstract Syntax Tree ==
    # LogicalProject(id=[$0], data=[$1], name=[$2], tel=[$3], country=[$4])
    # +- LogicalCorrelate(correlation=[$cor1], joinType=[inner], requiredColumns=[{1}])
    #    :- LogicalTableScan(table=[[default_catalog, default_database, Unregistered_TableSource_734856049, source: [PythonInputFormatTableSource(id, data)]]])
    #    +- LogicalTableFunctionScan(invocation=[parse_data($cor1.data)], rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) f0, INTEGER f1, VARCHAR(2147483647) f2)])
    #
    # == Optimized Physical Plan ==
    # PythonCorrelate(invocation=[parse_data($1)], correlate=[table(parse_data(data))], select=[id,data,f0,f1,f2], rowType=[RecordType(BIGINT id, VARCHAR(2147483647) data, VARCHAR(2147483647) f0, INTEGER f1, VARCHAR(2147483647) f2)], joinType=[INNER])
    # +- LegacyTableSourceScan(table=[[default_catalog, default_database, Unregistered_TableSource_734856049, source: [PythonInputFormatTableSource(id, data)]]], fields=[id, data])
    #
    # == Optimized Execution Plan ==
    # PythonCorrelate(invocation=[parse_data($1)], correlate=[table(parse_data(data))], select=[id,data,f0,f1,f2], rowType=[RecordType(BIGINT id, VARCHAR(2147483647) data, VARCHAR(2147483647) f0, INTEGER f1, VARCHAR(2147483647) f2)], joinType=[INNER])
    # +- LegacyTableSourceScan(table=[[default_catalog, default_database, Unregistered_TableSource_734856049, source: [PythonInputFormatTableSource(id, data)]]], fields=[id, data])

    # explain sql plan with advice
    print(t_env.explain_sql(
        """
        SELECT *
        FROM %s, LATERAL TABLE(parse_data(`data`)) t(name, tel, country)
        """ % table, ExplainDetail.PLAN_ADVICE
    ))
    # == Abstract Syntax Tree ==
    # LogicalProject(id=[$0], data=[$1], name=[$2], tel=[$3], country=[$4])
    # +- LogicalCorrelate(correlation=[$cor1], joinType=[inner], requiredColumns=[{1}])
    #    :- LogicalTableScan(table=[[*anonymous_python-input-format$10*]])
    #    +- LogicalTableFunctionScan(invocation=[parse_data($cor2.data)], rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) f0, INTEGER f1, VARCHAR(2147483647) f2)])
    #
    # == Optimized Physical Plan With Advice ==
    # PythonCorrelate(invocation=[parse_data($1)], correlate=[table(parse_data(data))], select=[id,data,f0,f1,f2], rowType=[RecordType(BIGINT id, VARCHAR(2147483647) data, VARCHAR(2147483647) f0, INTEGER f1, VARCHAR(2147483647) f2)], joinType=[INNER])
    # +- TableSourceScan(table=[[*anonymous_python-input-format$10*]], fields=[id, data])
    #
    # No available advice...
    #
    # == Optimized Execution Plan ==
    # PythonCorrelate(invocation=[parse_data($1)], correlate=[table(parse_data(data))], select=[id,data,f0,f1,f2], rowType=[RecordType(BIGINT id, VARCHAR(2147483647) data, VARCHAR(2147483647) f0, INTEGER f1, VARCHAR(2147483647) f2)], joinType=[INNER])
    # +- TableSourceScan(table=[[*anonymous_python-input-format$10*]], fields=[id, data])

def column_operations():
    t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

    # define the source
    table = t_env.from_elements(
        elements=[
            (1, '{"name": "Flink", "tel": 123, "addr": {"country": "Germany", "city": "Berlin"}}'),
            (2, '{"name": "hello", "tel": 135, "addr": {"country": "China", "city": "Shanghai"}}'),
            (3, '{"name": "world", "tel": 124, "addr": {"country": "USA", "city": "NewYork"}}'),
            (4, '{"name": "PyFlink", "tel": 32, "addr": {"country": "China", "city": "Hangzhou"}}')
        ],
        schema=['id', 'data'])

    # add columns
    table = table.add_columns(
        col('data').json_value('$.name', DataTypes.STRING()).alias('name'),
        col('data').json_value('$.tel', DataTypes.STRING()).alias('tel'),
        col('data').json_value('$.addr.country', DataTypes.STRING()).alias('country'))

    table.execute().print()
    # +----+----------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
    # | op |                   id |                           data |                           name |                            tel |                        country |
    # +----+----------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
    # | +I |                    1 | {"name": "Flink", "tel": 12... |                          Flink |                            123 |                        Germany |
    # | +I |                    2 | {"name": "hello", "tel": 13... |                          hello |                            135 |                          China |
    # | +I |                    3 | {"name": "world", "tel": 12... |                          world |                            124 |                            USA |
    # | +I |                    4 | {"name": "PyFlink", "tel": ... |                        PyFlink |                             32 |                          China |
    # +----+----------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+

    # drop columns
    table = table.drop_columns(col('data'))
    table.execute().print()
    # +----+----------------------+--------------------------------+--------------------------------+--------------------------------+
    # | op |                   id |                           name |                            tel |                        country |
    # +----+----------------------+--------------------------------+--------------------------------+--------------------------------+
    # | +I |                    1 |                          Flink |                            123 |                        Germany |
    # | +I |                    2 |                          hello |                            135 |                          China |
    # | +I |                    3 |                          world |                            124 |                            USA |
    # | +I |                    4 |                        PyFlink |                             32 |                          China |
    # +----+----------------------+--------------------------------+--------------------------------+--------------------------------+

    # rename columns
    table = table.rename_columns(col('tel').alias('telephone'))
    table.execute().print()
    # +----+----------------------+--------------------------------+--------------------------------+--------------------------------+
    # | op |                   id |                           name |                      telephone |                        country |
    # +----+----------------------+--------------------------------+--------------------------------+--------------------------------+
    # | +I |                    1 |                          Flink |                            123 |                        Germany |
    # | +I |                    2 |                          hello |                            135 |                          China |
    # | +I |                    3 |                          world |                            124 |                            USA |
    # | +I |                    4 |                        PyFlink |                             32 |                          China |
    # +----+----------------------+--------------------------------+--------------------------------+--------------------------------+

    # replace columns
    table = table.add_or_replace_columns(
        concat(col('id').cast(DataTypes.STRING()), '_', col('name')).alias('id'))
    table.execute().print()
    # +----+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
    # | op |                             id |                           name |                      telephone |                        country |
    # +----+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
    # | +I |                        1_Flink |                          Flink |                            123 |                        Germany |
    # | +I |                        2_hello |                          hello |                            135 |                          China |
    # | +I |                        3_world |                          world |                            124 |                            USA |
    # | +I |                      4_PyFlink |                        PyFlink |                             32 |                          China |
    # +----+--------------------------------+--------------------------------+--------------------------------+--------------------------------+


def row_operations():
    t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

    # define the source
    table = t_env.from_elements(
        elements=[
            (1, '{"name": "Flink", "tel": 123, "addr": {"country": "Germany", "city": "Berlin"}}'),
            (2, '{"name": "hello", "tel": 135, "addr": {"country": "China", "city": "Shanghai"}}'),
            (3, '{"name": "world", "tel": 124, "addr": {"country": "China", "city": "NewYork"}}'),
            (4, '{"name": "PyFlink", "tel": 32, "addr": {"country": "China", "city": "Hangzhou"}}')
        ],
        schema=['id', 'data'])

    # map operation
    @udf(result_type=DataTypes.ROW([DataTypes.FIELD("id", DataTypes.BIGINT()),
                                    DataTypes.FIELD("country", DataTypes.STRING())]))
    def extract_country(input_row: Row):
        data = json.loads(input_row.data)
        return Row(input_row.id, data['addr']['country'])

    table.map(extract_country) \
         .execute().print()
    # +----+----------------------+--------------------------------+
    # | op |                   id |                        country |
    # +----+----------------------+--------------------------------+
    # | +I |                    1 |                        Germany |
    # | +I |                    2 |                          China |
    # | +I |                    3 |                          China |
    # | +I |                    4 |                          China |
    # +----+----------------------+--------------------------------+

    # flat_map operation
    @udtf(result_types=[DataTypes.BIGINT(), DataTypes.STRING()])
    def extract_city(input_row: Row):
        data = json.loads(input_row.data)
        yield input_row.id, data['addr']['city']

    table.flat_map(extract_city) \
         .execute().print()
    # +----+----------------------+--------------------------------+
    # | op |                   f0 |                             f1 |
    # +----+----------------------+--------------------------------+
    # | +I |                    1 |                         Berlin |
    # | +I |                    2 |                       Shanghai |
    # | +I |                    3 |                        NewYork |
    # | +I |                    4 |                       Hangzhou |
    # +----+----------------------+--------------------------------+

    # aggregate operation
    class CountAndSumAggregateFunction(AggregateFunction):

        def get_value(self, accumulator):
            return Row(accumulator[0], accumulator[1])

        def create_accumulator(self):
            return Row(0, 0)

        def accumulate(self, accumulator, input_row):
            accumulator[0] += 1
            accumulator[1] += int(input_row.tel)

        def retract(self, accumulator, input_row):
            accumulator[0] -= 1
            accumulator[1] -= int(input_row.tel)

        def merge(self, accumulator, accumulators):
            for other_acc in accumulators:
                accumulator[0] += other_acc[0]
                accumulator[1] += other_acc[1]

        def get_accumulator_type(self):
            return DataTypes.ROW(
                [DataTypes.FIELD("cnt", DataTypes.BIGINT()),
                 DataTypes.FIELD("sum", DataTypes.BIGINT())])

        def get_result_type(self):
            return DataTypes.ROW(
                [DataTypes.FIELD("cnt", DataTypes.BIGINT()),
                 DataTypes.FIELD("sum", DataTypes.BIGINT())])

    count_sum = udaf(CountAndSumAggregateFunction())
    table.add_columns(
            col('data').json_value('$.name', DataTypes.STRING()).alias('name'),
            col('data').json_value('$.tel', DataTypes.STRING()).alias('tel'),
            col('data').json_value('$.addr.country', DataTypes.STRING()).alias('country')) \
         .group_by(col('country')) \
         .aggregate(count_sum.alias("cnt", "sum")) \
         .select(col('country'), col('cnt'), col('sum')) \
         .execute().print()
    # +----+--------------------------------+----------------------+----------------------+
    # | op |                        country |                  cnt |                  sum |
    # +----+--------------------------------+----------------------+----------------------+
    # | +I |                          China |                    3 |                  291 |
    # | +I |                        Germany |                    1 |                  123 |
    # +----+--------------------------------+----------------------+----------------------+

    # flat_aggregate operation
    class Top2(TableAggregateFunction):

        def emit_value(self, accumulator):
            for v in accumulator:
                if v:
                    yield Row(v)

        def create_accumulator(self):
            return [None, None]

        def accumulate(self, accumulator, input_row):
            tel = int(input_row.tel)
            if accumulator[0] is None or tel > accumulator[0]:
                accumulator[1] = accumulator[0]
                accumulator[0] = tel
            elif accumulator[1] is None or tel > accumulator[1]:
                accumulator[1] = tel

        def get_accumulator_type(self):
            return DataTypes.ARRAY(DataTypes.BIGINT())

        def get_result_type(self):
            return DataTypes.ROW(
                [DataTypes.FIELD("tel", DataTypes.BIGINT())])

    top2 = udtaf(Top2())
    table.add_columns(
            col('data').json_value('$.name', DataTypes.STRING()).alias('name'),
            col('data').json_value('$.tel', DataTypes.STRING()).alias('tel'),
            col('data').json_value('$.addr.country', DataTypes.STRING()).alias('country')) \
        .group_by(col('country')) \
        .flat_aggregate(top2) \
        .select(col('country'), col('tel')) \
        .execute().print()
    # +----+--------------------------------+----------------------+
    # | op |                        country |                  tel |
    # +----+--------------------------------+----------------------+
    # | +I |                          China |                  135 |
    # | +I |                          China |                  124 |
    # | +I |                        Germany |                  123 |
    # +----+--------------------------------+----------------------+


if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")

    basic_operations()
    sql_operations()
    column_operations()
    row_operations()

Using Any for unsupported type: typing.Sequence[~T]
No module named google.cloud.bigquery_storage_v1. As a result, the ReadFromBigQuery transform *CANNOT* be used with `method=DIRECT_READ`.
+----+----------------------+--------------------------------+--------------------------------+--------------------------------+
| op |                   id |                           name |                            tel |                        country |
+----+----------------------+--------------------------------+--------------------------------+--------------------------------+
| +I |                    1 |                          Flink |                            123 |                        Germany |
| +I |                    2 |                          hello |                            135 |                          China |
| +I |                    3 |                          world |                            124 |                            USA |
| +I |                    4 |       