In [1]:
!pip show apache-flink

Name: apache-flink
Version: 1.19.0
Summary: Apache Flink Python API
Home-page: https://flink.apache.org
Author: Apache Software Foundation
Author-email: dev@flink.apache.org
License: https://www.apache.org/licenses/LICENSE-2.0
Location: /opt/homebrew/lib/python3.11/site-packages
Requires: apache-beam, apache-flink-libraries, avro-python3, cloudpickle, fastavro, httplib2, numpy, pandas, pemja, protobuf, py4j, pyarrow, python-dateutil, pytz, requests, ruamel.yaml
Required-by: 


In [1]:
import os
os.environ['JAVA_HOME']='/opt/homebrew/opt/openjdk@11'


from pyflink.table import EnvironmentSettings, TableEnvironment

# Create a batch TableEnvironment
env_settings = EnvironmentSettings.new_instance().in_streaming_mode().build()
table_env = TableEnvironment.create(env_settings)

# Get the current working directory
CURRENT_DIR = os.getcwd()

# Define a list of JAR file names you want to add
jar_files = [
    "flink-jar/flink-sql-avro-confluent-registry-1.19.0.jar",
    "flink-jar/flink-sql-connector-kafka-3.1.0-1.18.jar"
    
    
]


# Build the list of JAR URLs by prepending 'file:///' to each file name
jar_urls = [f"file:///{CURRENT_DIR}/{jar_file}" for jar_file in jar_files]

table_env.get_config().get_configuration().set_string(
    "pipeline.jars",
    ";".join(jar_urls)
)
table_env.get_config().get_configuration().set_string(
    "execution.checkpointing.interval",
    "5000"
)

table_env.get_config().get_configuration().set_string(
    "parallelism.default",
    "4"
)

# Configure checkpointing
table_env.get_config().get_configuration().set_string(
    "execution.checkpointing.mode",
    "EXACTLY_ONCE"
)

# Set the checkpointing directory to the current directory
table_env.get_config().get_configuration().set_string(
    "execution.checkpointing.checkpoints-directory",
    CURRENT_DIR
)


<pyflink.common.configuration.Configuration at 0x134bf3790>

# Create Source  

#### Customer Source 

In [2]:
customer_source = """
CREATE TABLE IF NOT EXISTS customer_source (
  id STRING,
    first_name STRING,
    last_name STRING,
    email STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'inventory.inventory.customers',
    'properties.bootstrap.servers' = 'localhost:7092',
    'properties.group.id' = 'sales',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'debezium-json'
)
"""
# Execute the SQL to create the sources
table_env.execute_sql(customer_source)
print("Created customer_source table.")

Created customer_source table.


#### Order Source 

In [3]:
orders_source = """
CREATE TABLE IF NOT EXISTS product_source (
  order_number STRING,
  order_date STRING,
  purchaser STRING,
  quantity STRING,
  product_id STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'inventory.inventory.orders',
    'properties.bootstrap.servers' = 'localhost:7092',
    'properties.group.id' = 'order_group',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'debezium-json'
)
"""
# Execute the SQL to create the sources
table_env.execute_sql(orders_source)
print("Created orders_source table.")


Created orders_source table.


# Join Two Streams 

In [4]:
# join_query = """
# SELECT 
#     p.order_number,
#     p.order_date,
#     c.id AS customer_id,
#     c.first_name,
#     c.last_name,
#     c.email,
#     p.quantity,
#     p.product_id
# FROM product_source AS p
# JOIN customer_source AS c ON p.purchaser = c.id
# """


# # Execute the join query
# result = table_env.execute_sql(join_query)
# result.print()

# Sink

In [5]:
orders_sink = """
CREATE TABLE IF NOT EXISTS kafka_upsert_sink (
    order_number STRING,
    order_date STRING,
    customer_id STRING,
    first_name STRING,
    last_name STRING,
    email STRING,
    quantity STRING,
    product_id STRING,
    PRIMARY KEY (order_number) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'processed_orders',
    'properties.bootstrap.servers' = 'localhost:7092',
  'key.format' = 'avro-confluent',
  'value.format' = 'avro-confluent',
  'key.avro-confluent.url' = 'http://localhost:8081',
  'value.avro-confluent.url' = 'http://localhost:8081'
)
"""
table_env.execute_sql(orders_sink)


<pyflink.table.table_result.TableResult at 0x134c17b10>

In [None]:
# Insert joined data into the Kafka sink
join_and_insert_query = """
INSERT INTO kafka_upsert_sink
SELECT 
    p.order_number,
    p.order_date,
    c.id AS customer_id,
    c.first_name,
    c.last_name,
    c.email,
    p.quantity,
    p.product_id
FROM product_source AS p
JOIN customer_source AS c ON p.purchaser = c.id
"""
table_env.execute_sql(join_and_insert_query).wait()

print("Streaming data to Kafka topic 'processed_orders'.")