## orders

In [None]:
from pyflink.table import expressions as fl
from pyflink.table import EnvironmentSettings, TableEnvironment

settings = EnvironmentSettings.in_streaming_mode()
flink = TableEnvironment.create(settings)

flink.execute_sql(r"""
CREATE TABLE IF NOT EXISTS kafka_orders (
    order_id string,
    customer_id string,
    order_status string,
    order_approved_at bigint,
    order_purchase_timestamp bigint,
    order_delivered_carrier_date bigint,
    order_delivered_customer_date bigint,
    order_estimated_delivery_date bigint
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders.public.orders',
    'properties.bootstrap.servers' = 'cluster-kafka-brokers.kafka:9092',
    'properties.group.id' = 'opensearch-orders',
    'scan.startup.mode' = 'earliest-offset',
    'value.format' = 'avro-confluent',
    'value.avro-confluent.url' = 'http://schema-registry.kafka:8081'
);
""")

flink.execute_sql("SELECT * FROM kafka_orders LIMIT 10").print()

+----+--------------------------------+--------------------------------+--------------------------------+----------------------+--------------------------+------------------------------+-------------------------------+-------------------------------+
| op |                       order_id |                    customer_id |                   order_status |    order_approved_at | order_purchase_timestamp | order_delivered_carrier_date | order_delivered_customer_date | order_estimated_delivery_date |
+----+--------------------------------+--------------------------------+--------------------------------+----------------------+--------------------------+------------------------------+-------------------------------+-------------------------------+
| +I | ec7a019261fce44180373d45b44... | c24fc5f9a446b4d8262041b9c64... |                      delivered |     1483618217000000 |         1483617366000000 |             1483706621000000 |              1484140445000000 |              148590720000000

In [None]:
from pyflink.table import expressions as fl
from pyflink.table import EnvironmentSettings, TableEnvironment

settings = EnvironmentSettings.in_streaming_mode()
flink = TableEnvironment.create(settings)

flink.execute_sql(r"""
CREATE TABLE IF NOT EXISTS kafka_orders (
    order_id string,
    customer_id string,
    order_status string,
    order_approved_at bigint,
    order_purchase_timestamp bigint,
    order_delivered_carrier_date bigint,
    order_delivered_customer_date bigint,
    order_estimated_delivery_date bigint
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders.public.orders',
    'properties.bootstrap.servers' = 'cluster-kafka-brokers.kafka:9092',
    'properties.group.id' = 'opensearch-orders',
    'scan.startup.mode' = 'earliest-offset',
    'value.format' = 'avro-confluent',
    'value.avro-confluent.url' = 'http://schema-registry.kafka:8081'
);
""")

flink.execute_sql(r"""
CREATE TABLE IF NOT EXISTS opensearch_orders (
    order_id string,
    customer_id string,
    order_status string,
    order_approved_at timestamp,
    order_purchase_timestamp timestamp,
    order_delivered_carrier_date timestamp,
    order_delivered_customer_date timestamp,
    order_estimated_delivery_date timestamp
) WITH (
    'connector' = 'opensearch-2',
    'hosts' = 'http://opensearch.io:80',
    'allow-insecure' = 'true',
    'index' = 'orders',
    'format' = 'json'
);
""")

flink.execute_sql(r"""
INSERT INTO opensearch_orders
SELECT
    order_id,
    customer_id,
    order_status,
    to_timestamp_ltz(order_approved_at / 1e3, 3) as order_approved_at,
    to_timestamp_ltz(order_purchase_timestamp / 1e3, 3) as order_purchase_timestamp,
    to_timestamp_ltz(order_delivered_carrier_date / 1e3, 3) as order_delivered_carrier_date,
    to_timestamp_ltz(order_delivered_customer_date / 1e3, 3) as order_delivered_customer_date,
    to_timestamp_ltz(order_estimated_delivery_date / 1e3, 3) as order_estimated_delivery_date
FROM kafka_orders;
""").wait()

## order_items

In [1]:
%run utils.ipynb

stream = get_trino(catalog="kafka", schema="default")
df = pl.read_database('select * from "order_items.public.order_items"', connection=stream)
df

order_id,order_item_id,product_id,seller_id,shipping_limit_date,price,freight_value
str,i64,str,str,i64,f64,f64
"""ec7a019261fce44180373d45b442d7…",1,"""f5d8f4fbc70ca2a0038b9a0010ed5c…","""48efc9d94a9834137efd9ea76b065a…",1483962966000000,10.9,8.72
"""b95a0a8bd30aece4e94e81f0591249…",1,"""6c04a068e5ab37749c980c42a036b9…","""48efc9d94a9834137efd9ea76b065a…",1483963280000000,10.9,8.72
"""38bcb524e1c38c2c1b60600a80fc89…",1,"""680cc8535be7cc69544238c1d6a83f…","""48efc9d94a9834137efd9ea76b065a…",1483963596000000,2.9,8.72
"""7a18a504c1a4b32d883e68de2e1a7d…",1,"""c0d4027067afcf9c1697cce981b8fe…","""48efc9d94a9834137efd9ea76b065a…",1483963748000000,7.9,8.72
"""6acecf438369055d9243e121045cca…",1,"""1514ddb0f4a5afc8d24104e89c7144…","""48efc9d94a9834137efd9ea76b065a…",1483963883000000,9.9,8.72
…,…,…,…,…,…,…
"""41f25580214be3f00eb80620ca9710…",2,"""c94d19d021d4a8fbae5a45f904aeea…","""701938c450705b8ae65fc923b70f35…",1486245453000000,103.99,37.24
"""0afbc110b84719faaef4887a5cff5a…",1,"""7340a3839a1de1e99d149b8cf052a2…","""4a3ca9315b744ce9f8e93743614938…",1486247473000000,69.9,11.88
"""adf684f287283640cb375a2e8ada8c…",1,"""8ce0b48cdcc4b753ae4de644979983…","""75d34ebb1bd0bd7dde40dd507b8169…",1486595902000000,131.99,15.22
"""adf684f287283640cb375a2e8ada8c…",2,"""8ce0b48cdcc4b753ae4de644979983…","""75d34ebb1bd0bd7dde40dd507b8169…",1486595902000000,131.99,15.22


In [None]:
from pyflink.table import expressions as fl
from pyflink.table import EnvironmentSettings, TableEnvironment

settings = EnvironmentSettings.in_streaming_mode()
flink = TableEnvironment.create(settings)

flink.execute_sql(r"""
CREATE TABLE IF NOT EXISTS kafka_order_items (
    order_id string,
    seller_id string,
    product_id string,
    order_item_id bigint,
    shipping_limit_date bigint,
    freight_value double,
    price double
) WITH (
    'connector' = 'kafka',
    'topic' = 'order_items.public.order_items',
    'properties.bootstrap.servers' = 'cluster-kafka-brokers.kafka:9092',
    'properties.group.id' = 'opensearch-order-items',
    'scan.startup.mode' = 'earliest-offset',
    'value.format' = 'avro-confluent',
    'value.avro-confluent.url' = 'http://schema-registry.kafka:8081'
);
""")

flink.execute_sql("SELECT * FROM kafka_order_items LIMIT 10").print()

+----+--------------------------------+--------------------------------+--------------------------------+----------------------+----------------------+--------------------------------+--------------------------------+
| op |                       order_id |                      seller_id |                     product_id |        order_item_id |  shipping_limit_date |                  freight_value |                          price |
+----+--------------------------------+--------------------------------+--------------------------------+----------------------+----------------------+--------------------------------+--------------------------------+
| +I | ec7a019261fce44180373d45b44... | 48efc9d94a9834137efd9ea76b0... | f5d8f4fbc70ca2a0038b9a0010e... |                    1 |     1483962966000000 |                           8.72 |                           10.9 |
| +I | b95a0a8bd30aece4e94e81f0591... | 48efc9d94a9834137efd9ea76b0... | 6c04a068e5ab37749c980c42a03... |                    1 |

In [None]:
from pyflink.table import expressions as fl
from pyflink.table import EnvironmentSettings, TableEnvironment

settings = EnvironmentSettings.in_streaming_mode()
flink = TableEnvironment.create(settings)

flink.execute_sql(r"""
CREATE TABLE IF NOT EXISTS kafka_order_items (
    order_id string,
    seller_id string,
    product_id string,
    order_item_id bigint,
    shipping_limit_date bigint,
    freight_value double,
    price double
) WITH (
    'connector' = 'kafka',
    'topic' = 'order_items.public.order_items',
    'properties.bootstrap.servers' = 'cluster-kafka-brokers.kafka:9092',
    'properties.group.id' = 'opensearch-order-items',
    'scan.startup.mode' = 'earliest-offset',
    'value.format' = 'avro-confluent',
    'value.avro-confluent.url' = 'http://schema-registry.kafka:8081'
);
""")

flink.execute_sql(r"""
CREATE TABLE IF NOT EXISTS opensearch_order_items (
    order_id string,
    seller_id string,
    product_id string,
    order_item_id bigint,
    shipping_limit_date timestamp(3),
    freight_value decimal,
    price decimal
) WITH (
    'connector' = 'opensearch-2',
    'hosts' = 'http://opensearch.io:80',
    'allow-insecure' = 'true',
    'index' = 'order_items',
    'format' = 'json'
);
""")

flink.execute_sql(r"""
INSERT INTO opensearch_order_items
SELECT
    order_id,
    seller_id,
    product_id,
    order_item_id,
    to_timestamp_ltz(shipping_limit_date / 1e3, 3) as shipping_limit_date,
    freight_value,
    price
FROM kafka_order_items;
""").wait()

## order_reviews

In [None]:
from pyflink.table import expressions as fl
from pyflink.table import EnvironmentSettings, TableEnvironment

settings = EnvironmentSettings.in_streaming_mode()
flink = TableEnvironment.create(settings)

flink.execute_sql(r"""
CREATE TABLE IF NOT EXISTS kafka_order_reviews (
    payload string
) WITH (
    'connector' = 'kafka',
    'topic' = 'order_reviews.ecommerce.order_reviews',
    'properties.bootstrap.servers' = 'cluster-kafka-brokers.kafka:9092',
    'properties.group.id' = 'opensearch-order-reviews',
    'scan.startup.mode' = 'earliest-offset',
    'value.format' = 'json'
);
""")

flink.execute_sql(r"""
WITH selector AS (
    SELECT
        json_value(payload, '$.order_id') as order_id,
        json_value(payload, '$.review_id') as review_id,
        json_value(payload, '$.review_score') as review_score,
        json_value(payload, '$.review_comment_title') as review_comment_title,
        json_value(payload, '$.review_comment_message') as review_comment_message,
        replace(replace(json_value(payload, '$.review_creation_date'), 'T', ' '), 'Z', '') as review_creation_date,
        replace(replace(json_value(payload, '$.review_answer_timestamp'), 'T', ' '), 'Z', '') as review_answer_timestamp
    FROM kafka_order_reviews
    )
SELECT
    order_id,
    review_id,
    cast(review_score as integer) as review_score,
    review_comment_title,
    review_comment_message,
    review_creation_date as review_creation_date,
    to_timestamp(review_answer_timestamp) as review_answer_timestamp
FROM selector
LIMIT 10;
""").print()

+----+--------------------------------+--------------------------------+--------------+--------------------------------+--------------------------------+--------------------------------+-------------------------+
| op |                       order_id |                      review_id | review_score |           review_comment_title |         review_comment_message |           review_creation_date | review_answer_timestamp |
+----+--------------------------------+--------------------------------+--------------+--------------------------------+--------------------------------+--------------------------------+-------------------------+
| +I | ec7a019261fce44180373d45b44... | 5f45d6aa32336fa26cbc254721c... |            5 |                         <NULL> | I really loved the product!... |            2017-01-12 00:00:00 | 2017-01-13 20:22:46.000 |
| +I | e1fe072ef14b519af1f0a8ed997... | 49180505e856dbdac671d368b51... |            5 |                         <NULL> | Shop recommended, product w

In [None]:
from pyflink.table import expressions as fl
from pyflink.table import EnvironmentSettings, TableEnvironment

settings = EnvironmentSettings.in_streaming_mode()
flink = TableEnvironment.create(settings)

flink.execute_sql(r"""
CREATE TABLE IF NOT EXISTS kafka_order_reviews (
    payload string
) WITH (
    'connector' = 'kafka',
    'topic' = 'order_reviews.ecommerce.order_reviews',
    'properties.bootstrap.servers' = 'cluster-kafka-brokers.kafka:9092',
    'properties.group.id' = 'opensearch-order-reviews',
    'scan.startup.mode' = 'earliest-offset',
    'value.format' = 'json'
);
""")

flink.execute_sql(r"""
CREATE TABLE IF NOT EXISTS opensearch_order_reviews (
    order_id string,
    review_id string,
    review_score integer,
    review_comment_title string,
    review_comment_message string,
    review_creation_date timestamp(3),
    review_answer_timestamp timestamp(3)
) WITH (
    'connector' = 'opensearch-2',
    'hosts' = 'http://opensearch.io:80',
    'allow-insecure' = 'true',
    'index' = 'order_reviews',
    'format' = 'json'
);
""")

flink.execute_sql(r"""
INSERT INTO opensearch_order_reviews
SELECT
    order_id,
    review_id,
    cast(review_score as integer) as review_score,
    review_comment_title,
    review_comment_message,
    to_timestamp(review_creation_date) as review_creation_date,
    to_timestamp(review_answer_timestamp) as review_answer_timestamp
FROM (
    SELECT
        json_value(payload, '$.order_id') as order_id,
        json_value(payload, '$.review_id') as review_id,
        json_value(payload, '$.review_score') as review_score,
        json_value(payload, '$.review_comment_title') as review_comment_title,
        json_value(payload, '$.review_comment_message') as review_comment_message,
        replace(replace(json_value(payload, '$.review_creation_date'), 'T', ' '), 'Z', '') as review_creation_date,
        replace(replace(json_value(payload, '$.review_answer_timestamp'), 'T', ' '), 'Z', '') as review_answer_timestamp
    FROM kafka_order_reviews
);
""").wait()

## order_payments

In [None]:
from pyflink.table import expressions as fl
from pyflink.table import EnvironmentSettings, TableEnvironment

settings = EnvironmentSettings.in_streaming_mode()
flink = TableEnvironment.create(settings)

flink.execute_sql(r"""
CREATE TABLE IF NOT EXISTS kafka_order_payments (
    order_id string,
    payment_type string,
    payment_sequential bigint,
    payment_installments bigint,
    payment_value double
) WITH (
    'connector' = 'kafka',
    'topic' = 'order_payments.public.order_payments',
    'properties.bootstrap.servers' = 'cluster-kafka-brokers.kafka:9092',
    'properties.group.id' = 'opensearch-order-payments',
    'scan.startup.mode' = 'earliest-offset',
    'value.format' = 'avro-confluent',
    'value.avro-confluent.url' = 'http://schema-registry.kafka:8081'
);
""")

flink.execute_sql("SELECT * FROM kafka_order_payments LIMIT 10").print()

+----+--------------------------------+--------------------------------+----------------------+----------------------+--------------------------------+
| op |                       order_id |                   payment_type |   payment_sequential | payment_installments |                  payment_value |
+----+--------------------------------+--------------------------------+----------------------+----------------------+--------------------------------+
| +I | 43d29c6fc78c31c80080d85e526... |                         boleto |                    1 |                    1 |                          19.62 |
| +I | ec7a019261fce44180373d45b44... |                    credit_card |                    1 |                    1 |                          19.62 |
| +I | 8a784d47854e4cbc5562362393d... |                         boleto |                    1 |                    1 |                          18.62 |
| +I | e1fe072ef14b519af1f0a8ed997... |                    credit_card |                

In [None]:
from pyflink.table import expressions as fl
from pyflink.table import EnvironmentSettings, TableEnvironment

settings = EnvironmentSettings.in_streaming_mode()
flink = TableEnvironment.create(settings)

flink.execute_sql(r"""
CREATE TABLE IF NOT EXISTS kafka_order_payments (
    order_id string,
    payment_type string,
    payment_sequential bigint,
    payment_installments bigint,
    payment_value double
) WITH (
    'connector' = 'kafka',
    'topic' = 'order_payments.public.order_payments',
    'properties.bootstrap.servers' = 'cluster-kafka-brokers.kafka:9092',
    'properties.group.id' = 'opensearch-order-payments',
    'scan.startup.mode' = 'earliest-offset',
    'value.format' = 'avro-confluent',
    'value.avro-confluent.url' = 'http://schema-registry.kafka:8081'
);
""")

flink.execute_sql(r"""
CREATE TABLE IF NOT EXISTS opensearch_order_payments (
    order_id string,
    payment_type string,
    payment_sequential bigint,
    payment_installments bigint,
    payment_value double
) WITH (
    'connector' = 'opensearch-2',
    'hosts' = 'http://opensearch.io:80',
    'allow-insecure' = 'true',
    'index' = 'order_payments',
    'format' = 'json'
);
""")

flink.execute_sql(r"""
INSERT INTO opensearch_order_payments
SELECT * FROM kafka_order_payments;
""").wait()