In [1]:
import os
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

import get_env
# Remote execution environment, with a Table API environment layered on top.
env = get_env.get_remote_env()
t_env = StreamTableEnvironment.create(env)
t_env.get_config().set("parallelism.default", "1")

# File-system / GCS options, applied in the order listed.
conf = t_env.get_config().get_configuration()
for _option_key, _option_value in (
    ("fs.allowed-fallback-filesystems", "hadoop"),
    ("fs.gs.project.id", "flink-demo-470113"),
    ("fs.gs.auth.service.account.json.keyfile", "/etc/gcp/key.json"),
):
    conf.set_string(_option_key, _option_value)

# Iceberg needs checkpointing to be enabled for writes.
# Alternative way to configure the same interval:
#   conf.set_string("execution.checkpointing.interval", "10 s")
env.enable_checkpointing(10000)  # interval in milliseconds

2025-12-04T20:59:06.036316Z main ERROR Reconfiguration failed: No configuration found for '567d299b' at 'null' in 'null'
2025-12-04T20:59:07.160898Z Thread-3 ERROR Reconfiguration failed: No configuration found for '7b8574d6' at 'null' in 'null'


<pyflink.datastream.stream_execution_environment.StreamExecutionEnvironment at 0x7fbf38dd1ae0>

In [2]:
# 1|{"customer_id":1,"full_name":"Vel","email":"vel@example.com","signup_date":"2025-01-10","country":"India","city":"Chennai","segment":"Prime"}
#  docker logs -f flink-taskmanager

In [3]:
# Shared settings used by the cells below.
DATALAKE_WAREHOUSE = "gs://gks-datalake/iceberg-warehouse/"  # warehouse path of the Iceberg catalog
BOOTSTRAP = "broker:9092"  # Kafka bootstrap servers; change if needed
GROUP_ID_PREFIX = "ecomm-flink-tr-12"
TOPIC_PREFIX = "gks"

In [4]:
# To start over, drop the previous catalog first:
# t_env.execute_sql("DROP CATALOG IF EXISTS ecomm")

# Register the Iceberg catalog 'ecomm' (hadoop catalog type) with its
# warehouse at DATALAKE_WAREHOUSE.
CREATE_ECOMM_CATALOG_SQL = f"""
CREATE CATALOG IF NOT EXISTS ecomm WITH (
  'type' = 'iceberg',
  'catalog-type' = 'hadoop',
  -- GCS path for metadata/warehouse; bucket must exist and be writable
  'warehouse' = '{DATALAKE_WAREHOUSE}',
  'property-version' = '1'
)
"""
t_env.execute_sql(CREATE_ECOMM_CATALOG_SQL)


<pyflink.table.table_result.TableResult at 0x7fbf38dd2e60>

In [5]:
# 0) Clean up so this cell can be re-run: drop earlier temporary definitions.
t_env.execute_sql("DROP TEMPORARY TABLE IF EXISTS dim_customers_kafka")
t_env.execute_sql("DROP TEMPORARY TABLE IF EXISTS dim_customers_print")

# 1) Kafka source table for the dim_customers topic (JSON values).
# BOOTSTRAP is taken from the settings cell and is intentionally NOT
# reassigned here, so that changing it in one place affects this table too.
DIM_CUSTOMERS_KAFKA_DDL = f"""
CREATE TEMPORARY TABLE dim_customers_kafka (
  customer_id INT,
  full_name   STRING,
  email       STRING,
  signup_date STRING,
  country     STRING,
  city        STRING,
  segment     STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'dim_customers',
  'properties.bootstrap.servers' = '{BOOTSTRAP}',
  'properties.group.id' = 'test-dim-customers',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json',
  'json.fail-on-missing-field' = 'false',
  'json.ignore-parse-errors' = 'true'
)
"""

print("Creating dim_customers_kafka…")
t_env.execute_sql(DIM_CUSTOMERS_KAFKA_DDL)

# (optional) verify it is actually registered
print("Tables now:", t_env.list_tables())

# 2) PRINT sink with the same columns as the source; used for debugging.
DIM_CUSTOMERS_PRINT_DDL = """
CREATE TEMPORARY TABLE dim_customers_print (
  customer_id INT,
  full_name   STRING,
  email       STRING,
  signup_date STRING,
  country     STRING,
  city        STRING,
  segment     STRING
) WITH (
  'connector' = 'print'
)
"""

print("Creating dim_customers_print…")
t_env.execute_sql(DIM_CUSTOMERS_PRINT_DDL)

# 3) Insert from Kafka -> print sink  (this starts a job)
# print("Starting INSERT job…")
# insert_result = t_env.execute_sql("""
# INSERT INTO dim_customers_print
# SELECT
#   customer_id,
#   full_name,
#   email,
#   signup_date,
#   country,
#   city,
#   segment
# FROM dim_customers_kafka
# """)

# print("Job submitted:", insert_result)

Creating dim_customers_kafka…
Tables now: ['dim_customers_kafka']
Creating dim_customers_print…


<pyflink.table.table_result.TableResult at 0x7fbf38dd3640>

In [6]:
# t_env.sql_query("SELECT * FROM dim_customers_kafka LIMIT 1 ").execute().print()

In [7]:
# Kafka source table for the dim_products topic (JSON values).
# The broker address comes from BOOTSTRAP (settings cell) instead of being
# hard-coded, so all Kafka tables follow a single setting.
DIM_PRODUCTS_KAFKA_DDL = f"""
CREATE TEMPORARY TABLE dim_products_kafka (
  product_id   INT,
  product_name STRING,
  category     STRING,
  subcategory  STRING,
  brand        STRING,
  unit_price   DECIMAL(10,2),
  active       BOOLEAN
) WITH (
  'connector' = 'kafka',
  'topic' = 'dim_products',
  'properties.bootstrap.servers' = '{BOOTSTRAP}',
  'properties.group.id' = 'ecomm-flink-dim-products',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json',
  'json.fail-on-missing-field' = 'false',
  'json.ignore-parse-errors' = 'true'
)
"""

# Drop first so the cell can be re-run, then (re)create the table.
t_env.execute_sql("DROP TEMPORARY TABLE IF EXISTS dim_products_kafka;").wait()

t_env.execute_sql(DIM_PRODUCTS_KAFKA_DDL).wait()

In [8]:
#t_env.sql_query("SELECT * FROM dim_products_kafka LIMIT 1 ").execute().print()

In [9]:
# Watermarks are not declared yet (to be discussed later). The clause that
# would go into the column list is:
#   WATERMARK FOR order_datetime AS order_datetime - INTERVAL '5' SECOND
#
# Kafka source table for the fact_orders topic (JSON values, ISO-8601
# timestamps). The broker address comes from BOOTSTRAP (settings cell)
# instead of being hard-coded.
FACT_ORDERS_KAFKA_DDL = f"""
CREATE TEMPORARY TABLE fact_orders_kafka (
  order_id       INT,
  customer_id    INT,
  order_datetime TIMESTAMP(3),
  order_status   STRING,
  payment_method STRING,
  shipping_fee   DECIMAL(10,2),
  discount_amt   DECIMAL(10,2)
) WITH (
  'connector' = 'kafka',
  'topic' = 'fact_orders',
  'properties.bootstrap.servers' = '{BOOTSTRAP}',
  'properties.group.id' = 'ecomm-flink-fact-orders',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json',
  'json.timestamp-format.standard' = 'ISO-8601',
  'json.fail-on-missing-field' = 'false',
  'json.ignore-parse-errors' = 'true'
)
"""

# Drop first so the cell can be re-run, then (re)create the table.
t_env.execute_sql("DROP TEMPORARY TABLE IF EXISTS fact_orders_kafka;").wait()

t_env.execute_sql(FACT_ORDERS_KAFKA_DDL).wait()

In [10]:
# t_env.sql_query("SELECT * FROM fact_orders_kafka LIMIT 1 ").execute().print()

In [11]:
# Kafka source table for the fact_order_items topic (JSON values).
# The broker address comes from BOOTSTRAP (settings cell) instead of being
# hard-coded, so all Kafka tables follow a single setting.
FACT_ORDERS_ITEMS_KAFKA_DDL = f"""
CREATE TEMPORARY TABLE fact_order_items_kafka (
  order_item_id INT,
  order_id      INT,
  product_id    INT,
  quantity      INT,
  item_price    DECIMAL(10,2)
) WITH (
  'connector' = 'kafka',
  'topic' = 'fact_order_items',
  'properties.bootstrap.servers' = '{BOOTSTRAP}',
  'properties.group.id' = 'ecomm-flink-fact-order-items',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json',
  'json.timestamp-format.standard' = 'ISO-8601',
  'json.fail-on-missing-field' = 'false',
  'json.ignore-parse-errors' = 'true'
)
"""

# Drop first so the cell can be re-run, then (re)create the table.
t_env.execute_sql("DROP TEMPORARY TABLE IF EXISTS fact_order_items_kafka;").wait()
t_env.execute_sql(FACT_ORDERS_ITEMS_KAFKA_DDL).wait()

In [12]:
# t_env.sql_query("SELECT * FROM fact_order_items_kafka LIMIT 1 ").execute().print()

In [13]:
# t_env.execute_sql("USE CATALOG ecomm").wait()

# Make sure the target database exists in the Iceberg catalog.
t_env.execute_sql("CREATE DATABASE IF NOT EXISTS ecomm.ecommdb").wait()

# Start from a fresh table definition on every run.
t_env.execute_sql("DROP TABLE IF EXISTS ecomm.ecommdb.customer_revenue_kafka").wait()

# Iceberg v2 table with upsert writes, keyed by customer_id.
# The two maintenance settings use Iceberg's documented property names;
# the previous keys ('snapshot.retention.days' and
# 'write.metadata.delete-after-commit') are not Iceberg table properties,
# so they were stored on the table without having any effect.
t_env.execute_sql("""
CREATE TABLE ecomm.ecommdb.customer_revenue_kafka (
  customer_id   INT,
  full_name     STRING,
  total_revenue DECIMAL(18, 2),
  total_orders  BIGINT,
  last_order_ts TIMESTAMP(3),
  PRIMARY KEY (customer_id) NOT ENFORCED
) WITH (
  'format-version' = '2',
  'write.upsert.enabled' = 'true',
  -- 7 days = 7 * 24 * 60 * 60 * 1000 ms; applied when snapshots are expired
  'history.expire.max-snapshot-age-ms' = '604800000',
  'write.metadata.delete-after-commit.enabled' = 'false'
)
""").wait()

In [14]:
# Per-customer revenue over PAID orders: customers joined to their orders and
# order items, then aggregated per (customer_id, full_name).
# NOTE(review): these are regular joins over Kafka-backed tables; presumably
# each customer_id appears once in dim_customers — confirm, otherwise rows
# would be counted more than once.
CUSTOMER_REVENUE_VIEW_SQL = """
CREATE TEMPORARY VIEW v_customer_revenue_kafka AS
SELECT
  c.customer_id,
  c.full_name,
  SUM(i.quantity * i.item_price) AS total_revenue,
  COUNT(DISTINCT o.order_id)     AS total_orders,
  MAX(o.order_datetime)          AS last_order_ts
FROM dim_customers_kafka AS c
JOIN fact_orders_kafka AS o
  ON c.customer_id = o.customer_id
JOIN fact_order_items_kafka AS i
  ON o.order_id = i.order_id
WHERE o.order_status = 'PAID'
GROUP BY c.customer_id, c.full_name
"""
t_env.execute_sql(CUSTOMER_REVENUE_VIEW_SQL)

<pyflink.table.table_result.TableResult at 0x7fbf38dd22c0>

In [15]:
# t_env.sql_query("SELECT * FROM v_customer_revenue_kafka LIMIT 1").execute().print()

In [16]:
# Target columns of ecomm.ecommdb.customer_revenue_kafka, for reference:
#   customer_id   INT,
#   full_name     STRING,
#   total_revenue DECIMAL(18, 2),
#   total_orders  BIGINT,
#   last_order_ts TIMESTAMP(3)
#
# Stream the aggregated view into the Iceberg table. total_revenue is cast
# explicitly to the sink's DECIMAL(18, 2), matching the other INSERT
# statements in this notebook that read from the same view.
t_env.execute_sql("""
INSERT INTO ecomm.ecommdb.customer_revenue_kafka
SELECT
  customer_id,
  full_name,
  CAST(total_revenue AS DECIMAL(18, 2)) AS total_revenue,
  total_orders,
  last_order_ts
FROM v_customer_revenue_kafka
""")

<pyflink.table.table_result.TableResult at 0x7fbf38dd2620>

In [17]:
# t_env.sql_query("SELECT * FROM ecomm.ecommdb.customer_revenue_kafka").execute().print()

In [18]:
# Drop first so the cell can be re-run.
t_env.execute_sql("DROP TEMPORARY TABLE IF EXISTS customer_revenue_kafka_out;").wait()

# Options that were tried for a keyed / plain-JSON layout, kept for
# reference only (not applied):
#   'key.fields' = 'customer_id',
#   'key.format' = 'raw',        -- or 'json'
#   'value.format' = 'json',
#   'value.json.fail-on-missing-field' = 'false'
#
# Kafka sink for topic customer_revenue, written in debezium-json so that the
# updating result of the aggregated view can be encoded as change records.
# The broker address comes from BOOTSTRAP (settings cell) instead of being
# hard-coded.
t_env.execute_sql(f"""
CREATE TEMPORARY TABLE customer_revenue_kafka_out (
  customer_id   INT,
  full_name     STRING,
  total_revenue DECIMAL(18, 2),
  total_orders  BIGINT,
  last_order_ts TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'customer_revenue',
  'properties.bootstrap.servers' = '{BOOTSTRAP}',
  'format' = 'debezium-json',
  'debezium-json.schema-include' = 'false',
  'debezium-json.ignore-parse-errors' = 'true'
)
""")

<pyflink.table.table_result.TableResult at 0x7fbf38dd2920>

In [19]:
# Stream the aggregated view into the debezium-json Kafka sink table.
INSERT_CUSTOMER_REVENUE_OUT_SQL = """
INSERT INTO customer_revenue_kafka_out
SELECT
  c.customer_id,
  c.full_name,
  CAST(total_revenue AS DECIMAL(18, 2)) AS total_revenue,
  total_orders,
  last_order_ts
FROM v_customer_revenue_kafka AS c
"""
t_env.execute_sql(INSERT_CUSTOMER_REVENUE_OUT_SQL)

<pyflink.table.table_result.TableResult at 0x7fbf3d3d79d0>

In [20]:
# Drop first so the cell can be re-run.
t_env.execute_sql("DROP TEMPORARY TABLE IF EXISTS customer_revenue_kafka_upsert").wait()

# upsert-kafka table on topic customer_revenue_upsert, keyed by customer_id;
# this is the table the aggregated view is written into.
# The broker address comes from BOOTSTRAP (settings cell) instead of being
# hard-coded.
t_env.execute_sql(f"""
CREATE TEMPORARY TABLE customer_revenue_kafka_upsert (
  customer_id   INT,
  full_name     STRING,
  total_revenue DECIMAL(18, 2),
  total_orders  BIGINT,
  last_order_ts TIMESTAMP(3),
  PRIMARY KEY (customer_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'customer_revenue_upsert',
  'properties.bootstrap.servers' = '{BOOTSTRAP}',

  -- key: derived from PRIMARY KEY (customer_id)
  'key.format' = 'json',
  'key.json.ignore-parse-errors' = 'true',

  -- value: flat JSON row
  'value.format' = 'json',
  'value.json.fail-on-missing-field' = 'false',
  'value.json.ignore-parse-errors' = 'true'
)
""").wait()

In [21]:
# Stream the aggregated view into the upsert-kafka table.
INSERT_CUSTOMER_REVENUE_UPSERT_KAFKA_SQL = """
INSERT INTO customer_revenue_kafka_upsert
SELECT
  customer_id,
  full_name,
  CAST(total_revenue AS DECIMAL(18, 2)) AS total_revenue,
  total_orders,
  last_order_ts
FROM v_customer_revenue_kafka
"""
t_env.execute_sql(INSERT_CUSTOMER_REVENUE_UPSERT_KAFKA_SQL)


<pyflink.table.table_result.TableResult at 0x7fbf38dd2e00>

In [22]:
# Drop first so the cell can be re-run.
t_env.execute_sql("DROP TEMPORARY TABLE IF EXISTS customer_revenue_upsert_kafka").wait()

# upsert-kafka table on topic customer_revenue_upsert, keyed by customer_id;
# this is the table that is later read from to fill the Iceberg table.
# It points at the same topic as customer_revenue_kafka_upsert.
# The broker address comes from BOOTSTRAP (settings cell) instead of being
# hard-coded.
t_env.execute_sql(f"""
CREATE TEMPORARY TABLE customer_revenue_upsert_kafka (
  customer_id   INT,
  full_name     STRING,
  total_revenue DECIMAL(18, 2),
  total_orders  BIGINT,
  last_order_ts TIMESTAMP(3),
  PRIMARY KEY (customer_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'customer_revenue_upsert',
  'properties.bootstrap.servers' = '{BOOTSTRAP}',

  -- key: JSON object holding the primary key column customer_id
  'key.format' = 'json',
  'key.json.ignore-parse-errors' = 'true',

  -- value: flat JSON row
  'value.format' = 'json',
  'value.json.fail-on-missing-field' = 'false',
  'value.json.ignore-parse-errors' = 'true'
)
""").wait()

In [23]:
# Optional, for clarity only:
# t_env.execute_sql("USE CATALOG ecomm").wait()

# Iceberg v2 table with upsert writes, keyed by customer_id; created only if
# it does not exist yet.
CUSTOMER_REVENUE_UPSERT_DDL = """
CREATE TABLE IF NOT EXISTS ecomm.ecommdb.customer_revenue_upsert (
  customer_id   INT,
  full_name     STRING,
  total_revenue DECIMAL(18, 2),
  total_orders  BIGINT,
  last_order_ts TIMESTAMP(3),
  PRIMARY KEY (customer_id) NOT ENFORCED
) WITH (
  'format-version' = '2',
  'write.upsert.enabled' = 'true'
)
"""
create_upsert_table_result = t_env.execute_sql(CUSTOMER_REVENUE_UPSERT_DDL)
create_upsert_table_result.wait()


In [24]:
# Stream the upsert-kafka table into the Iceberg upsert table.
INSERT_CUSTOMER_REVENUE_UPSERT_SQL = """
INSERT INTO ecomm.ecommdb.customer_revenue_upsert
SELECT
  customer_id,
  full_name,
  total_revenue,
  total_orders,
  last_order_ts
FROM customer_revenue_upsert_kafka
"""
t_env.execute_sql(INSERT_CUSTOMER_REVENUE_UPSERT_SQL)


<pyflink.table.table_result.TableResult at 0x7fbf38dd3d60>

In [30]:
# Print the current contents of the Iceberg table customer_revenue_upsert.
customer_revenue_upsert_rows = t_env.sql_query(
    "SELECT * FROM ecomm.ecommdb.customer_revenue_upsert"
)
customer_revenue_upsert_rows.execute().print()

+----+-------------+--------------------------------+----------------------+----------------------+----------------------------+
| op | customer_id |                      full_name |        total_revenue |         total_orders |              last_order_ts |
+----+-------------+--------------------------------+----------------------+----------------------+----------------------------+
| +I |           1 |                            Vel |              2026.95 |                    3 | 2025-11-01 18:45:00.000000 |
| +I |           2 |                           Shiv |               529.49 |                    1 | 2025-10-20 11:22:00.000000 |
+----+-------------+--------------------------------+----------------------+----------------------+----------------------------+
2 rows in set


In [31]:
# Print the current contents of the Iceberg table customer_revenue_kafka.
customer_revenue_kafka_rows = t_env.sql_query(
    "SELECT * FROM ecomm.ecommdb.customer_revenue_kafka"
)
customer_revenue_kafka_rows.execute().print()

+----+-------------+--------------------------------+----------------------+----------------------+----------------------------+
| op | customer_id |                      full_name |        total_revenue |         total_orders |              last_order_ts |
+----+-------------+--------------------------------+----------------------+----------------------+----------------------------+
| +I |           1 |                            Vel |              2026.95 |                    3 | 2025-11-01 18:45:00.000000 |
| +I |           2 |                           Shiv |               529.49 |                    1 | 2025-10-20 11:22:00.000000 |
+----+-------------+--------------------------------+----------------------+----------------------+----------------------------+
2 rows in set
