# Week 3 Lab 2: Streaming Queries with Apache Flink

Welcome to Apache Zeppelin. Zeppelin is a web-based, multi-purpose notebook that enables interactive data analysis and visualization. It supports multiple languages, including Python, Scala, and SQL.

Zeppelin is built around the concept of interpreters. An interpreter allows any language or data-processing backend to be plugged into Zeppelin, so code written in different languages, or for different data processing engines, can run within the same notebook. Each interpreter is associated with a specific language or processing engine and is invoked with a special prefix (e.g., %python, %spark, %sql) that indicates the type of code to be executed. For example, this cell uses the markdown interpreter `%md`.

During this lab you will use Apache Flink to process streaming data; in particular, you will use [PyFlink](https://nightlies.apache.org/flink/flink-docs-master/api/python/). Apache Flink is an open-source stream processing framework that enables scalable, high-throughput, low-latency stream and batch processing, and PyFlink is its Python API, which lets you write Flink applications in Python.


In this notebook you will interact with PyFlink through the interpreter `%flink.pyflink`; you will see this command at the beginning of each of the following cells. First, you will import some necessary packages from PyFlink.

In [1]:
%flink.pyflink
import os
import json

from datetime import datetime
from pyflink.common import Row
from pyflink.table.expressions import col, lit
from pyflink.table import (EnvironmentSettings, StreamTableEnvironment, TableEnvironment, TableDescriptor, Schema,
                           DataTypes, FormatDescriptor)
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table.window import Slide
from pyflink.table.udf import udf

During this lab you will use PyFlink's [Table API](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/python/table/intro_to_table_api/). The Table API is a high-level API for batch and stream processing that is part of Apache Flink. It provides a SQL-like interface to define and execute data processing pipelines, and it lets you work with structured data in a declarative manner, making it easier to write complex data transformations and aggregations. PyFlink also offers the DataStream API, a lower-level API designed for complex event-driven applications and stream processing, but it is beyond the scope of this lab. If you want to know more about it, you can check the [documentation](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/python/datastream/intro_to_datastream_api/#create-using-table--sql-connectors).


In the Table API there are some important concepts:
* `TableEnvironment` is the central concept of the Table API in PyFlink. It is responsible for managing tables and metadata, configuration, and the execution of table operations.
* A `Table` represents a logical set of rows with a schema. Tables are the primary abstraction for performing data transformations. A table does not contain the data itself; instead, it describes how to read data from a table source and how to eventually write data to a table sink.
* Expressions are used to define operations on columns in a `Table`.

First, set up the environment for executing table programs in streaming mode with the following lines:

In [3]:
%flink.pyflink
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)

 
The first line of the previous cell creates an `EnvironmentSettings` object configured for streaming mode and stores it in the variable `env_settings`. `EnvironmentSettings` is used to specify the characteristics of the execution environment for Flink's Table API.

The second line creates a `TableEnvironment` based on the settings in `env_settings` and stores it in `table_env`. Now, you will use this table environment to define the tables and queries for your streaming dataset. First, you will create a table named `source_stream`. Remember that tables do not contain the data itself; they describe how to read the data from the source stream. For that, you will create a table with the schema of the records that arrive from your input stream. As an example, the data that you will ingest looks like this:

```json
{
    "order_id": "1f4da8b2-73d0-49d5-9762-3e2e0a3cf004",
    "order_timestamp": "2024-04-04T15:32:03",
    "order_date": "2024-04-04",
    "customer_number": 198,
    "customer_visit_number": 1,
    "customer_city": "Makati City",
    "customer_country": "Philippines",
    "customer_credit_limit": 60237,
    "device_type": "desktop",
    "browser": "Opera/9.32.(Windows 98; tig-ER) Presto/2.9.179 Version/10.00",
    "operating_system": "Android",
    "product_code": "S32_1268",
    "product_line": "Trucks and Buses",
    "product_unitary_price": 96.31,
    "quantity": 10,
    "total_price": 963.1,
    "traffic_source": "www.hardin-green.com"
}
```

Identify the [data type](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/python/table/python_types/) of each field and see how the table will be created below. Note that you can set the `order_timestamp` field as `TIMESTAMP(0)`; this data type means the timestamp will have zero fractional digits in the seconds field, effectively truncating any millisecond, microsecond, or nanosecond parts. This means it represents a precise date and time down to the second.
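To make the `TIMESTAMP(0)` semantics concrete, here is a minimal plain-Python sketch of the truncation described above. It is only a model of the behavior: the function name `to_timestamp_0` is hypothetical, and this is not how Flink itself parses timestamps.

```python
from datetime import datetime


def to_timestamp_0(iso_string: str) -> datetime:
    """Parse an ISO-8601 timestamp and drop its fractional seconds.

    Hypothetical helper that models TIMESTAMP(0): zero fractional digits are kept.
    """
    parsed = datetime.fromisoformat(iso_string)
    # Truncate (not round) anything below one second.
    return parsed.replace(microsecond=0)


print(to_timestamp_0("2024-04-04T15:32:03"))      # prints 2024-04-04 15:32:03
print(to_timestamp_0("2024-04-04T15:32:03.987"))  # prints 2024-04-04 15:32:03
```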

In [5]:
%flink.pyflink
table_env.execute_sql("DROP TABLE IF EXISTS source_stream;")

In [6]:
%flink.pyflink

input_table_name = "source_stream"
table_env.execute_sql(f"DROP TABLE IF EXISTS {input_table_name};")

input_stream_name = "de-c3w3lab2-kinesis-input-stream"

region="us-east-1"
source_table_ddl = """
  CREATE TABLE {0} (
    order_id STRING,
    order_timestamp TIMESTAMP(0),
    order_date STRING,
    customer_number INT,
    customer_visit_number INT,
    customer_city STRING,
    customer_country STRING,
    customer_credit_limit INT,
    device_type STRING,
    browser STRING,
    operating_system STRING,
    product_code STRING, 
    product_line STRING,
    product_unitary_price NUMERIC,
    quantity INT, 
    total_price NUMERIC,
    traffic_source STRING,
    WATERMARK FOR order_timestamp AS order_timestamp - INTERVAL '5' MINUTES)
    PARTITIONED BY (order_id)
    WITH (
    'connector' = 'kinesis',
    'stream' = '{1}',
    'aws.region' = '{2}',
    'format' = 'json',
    'scan.stream.initpos' = 'TRIM_HORIZON',
    'json.timestamp-format.standard' = 'ISO-8601'
    ) """.format(input_table_name, input_stream_name, region)
table_env.execute_sql(source_table_ddl)

In the previous cell, part of the `CREATE TABLE` statement was already given to you. It contains the following parts:
* The `WATERMARK` clause, which is used in stream processing to handle late data and out-of-order events. Watermarks indicate the progress of event time in a stream.
    * The `FOR` clause specifies that the watermark is associated with the `order_timestamp` column.
    * The `AS order_timestamp - INTERVAL '5' MINUTES` clause defines a bounded out-of-orderness strategy: the watermark trails the largest `order_timestamp` seen so far by 5 minutes, so events can arrive up to 5 minutes out of order. An event whose timestamp is at or behind the current watermark is considered late.
* The `WITH` clause specifies the table's connector and other properties that define how Flink connects to the data source. Among the most relevant are:
    * `'connector' = 'kinesis'` specifies that the table will read from an AWS Kinesis stream.
    * `'format' = 'json'` specifies that the data format of the stream is JSON.
    * `'scan.stream.initpos' = 'TRIM_HORIZON'` specifies the initial position in the stream from which to start reading. `'TRIM_HORIZON'` means it will start reading from the earliest available record in the stream.
    * `'json.timestamp-format.standard' = 'ISO-8601'` specifies that the timestamps in the JSON data use the ISO-8601 format.

If you want to know more about the connectors, you can check the [documentation](https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/kinesis/).
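The watermark rule above can be modeled in a few lines of plain Python. This sketch assumes the watermark advances after every event, whereas Flink emits watermarks periodically, so treat it as an illustration of the rule (watermark = largest timestamp seen minus 5 minutes; an event at or behind the watermark is late), not of Flink's internals. `classify_events` is a hypothetical name.

```python
from datetime import datetime, timedelta

# Mirrors INTERVAL '5' MINUTES in the WATERMARK clause of source_stream.
MAX_OUT_OF_ORDERNESS = timedelta(minutes=5)


def classify_events(timestamps, delay=MAX_OUT_OF_ORDERNESS):
    """Label each event timestamp as 'on time' or 'late' (hypothetical helper)."""
    watermark = None
    labels = []
    for ts in timestamps:
        # An event is late if its timestamp is at or behind the current watermark.
        if watermark is not None and ts <= watermark:
            labels.append("late")
        else:
            labels.append("on time")
        # The watermark trails the largest timestamp seen so far and never moves back.
        candidate = ts - delay
        if watermark is None or candidate > watermark:
            watermark = candidate
    return labels


def at(minute):
    """Build a timestamp at 15:<minute> on the sample date."""
    return datetime(2024, 4, 4, 15, minute)


print(classify_events([at(0), at(10), at(7), at(4), at(6)]))
# ['on time', 'on time', 'on time', 'late', 'on time']
```

The 15:07 event arrives after the 15:10 one but is still ahead of the 15:05 watermark, so it is on time; the 15:04 event is not.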




Now, create a user-defined function (UDF) that converts timestamps into strings. This is a workaround for writing timestamps to AWS Kinesis.


In [9]:
%flink.pyflink
@udf(input_types=[DataTypes.TIMESTAMP(3)], result_type=DataTypes.STRING())
def to_string(i):
    return str(i)
table_env.create_temporary_system_function("to_string", to_string)
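
Because the UDF body is just `str(i)`, and PyFlink passes `TIMESTAMP` values to Python UDFs as `datetime.datetime` objects, you can check the string format it produces outside Flink. The snippet below calls the same logic without the `@udf` decorator; the first result matches the `event_time` format shown in the consumer output later in this lab.

```python
from datetime import datetime


def to_string(i):
    # Same body as the PyFlink UDF above, minus the @udf decorator.
    return str(i)


# Window ends fall on whole minutes, so no fractional part is printed.
print(to_string(datetime(2024, 6, 1, 1, 0, 0)))  # 2024-06-01 01:00:00
# A non-zero fractional part would be kept, padded to microseconds.
print(to_string(datetime(2024, 6, 1, 1, 0, 0, 123000)))  # 2024-06-01 01:00:00.123000
```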


 
Now, you will create a sliding window table from your `source_stream`. You can reuse the sliding window query that you created when working with the local data sample to compute total sales (the sum of `total_price`).
In this case, set the window size to 6 minutes and the window slide to 3 minutes.

In [11]:
%flink.pyflink
# Sliding window query
input_table = table_env.from_path("source_stream")
sliding_window_table = (
        input_table.window(
            Slide.over(lit(6).minute)
            .every(lit(3).minutes)
            .on(col("order_timestamp"))
            .alias("six_minute_window")
        )
        .group_by(col("six_minute_window"))
        .select(to_string(col("six_minute_window").end).alias("event_time"), col("total_price").sum.alias("total_sales"))
    )
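
To see why a 6-minute window sliding every 3 minutes counts each order in two windows, here is a plain-Python sketch of sliding window assignment followed by a per-window sum. It assumes windows start at multiples of the slide measured from the Unix epoch with no offset, and it uses floats instead of Flink's `NUMERIC`; the function names are hypothetical.

```python
from collections import defaultdict
from datetime import datetime, timedelta

SIZE = timedelta(minutes=6)
SLIDE = timedelta(minutes=3)
EPOCH = datetime(1970, 1, 1)


def assign_sliding_windows(ts, size=SIZE, slide=SLIDE):
    """Return the (start, end) of every epoch-aligned sliding window containing ts."""
    start = ts - (ts - EPOCH) % slide  # latest window start at or before ts
    windows = []
    while start > ts - size:  # window [start, start + size) still covers ts
        windows.append((start, start + size))
        start -= slide
    return windows


def total_sales_per_window(orders):
    """Sum total_price per window end, like the sliding_window_table query."""
    totals = defaultdict(float)
    for ts, total_price in orders:
        for _, end in assign_sliding_windows(ts):
            totals[end] += total_price
    return dict(totals)


def at(hour, minute):
    return datetime(2024, 6, 1, hour, minute)


for end, total in sorted(total_sales_per_window([(at(1, 1), 100.0), (at(1, 4), 50.0)]).items()):
    print(end, total)
```

Each order lands in `SIZE / SLIDE = 2` windows, which is why consecutive sliding-window totals overlap.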

Create a temporary view in your table environment based on the sliding window table that you created.


In [13]:
%flink.pyflink
table_env.create_temporary_view("sliding_window_table", sliding_window_table)

 
In the next cell, you have to define the schema of your sink table, which writes to one of the output AWS Kinesis data streams that have been created for you. For simplicity, you can set the `event_time` field as a string. Use the `de-c3w3lab2-kinesis-total-sales-slide-output-stream` output stream and name your output table `"output_sliding_sales_stream"`.

In [15]:
%flink.pyflink

table_name = "output_sliding_sales_stream"
table_env.execute_sql(f"DROP TABLE IF EXISTS {table_name};")

stream_name = "de-c3w3lab2-kinesis-total-sales-slide-output-stream"

region="us-east-1"
# DDL for the sink table backed by the output Kinesis stream
sink_table_ddl = """
 CREATE TABLE {0} (
    event_time STRING,
    total_sales NUMERIC)
    WITH (
    'connector' = 'kinesis',
    'stream' = '{1}',
    'aws.region' = '{2}',
    'format' = 'json',
    'json.timestamp-format.standard' = 'ISO-8601'
    ) """.format(table_name, stream_name, region)
table_env.execute_sql(sink_table_ddl)
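
The sink tables in this lab share the same `WITH` clause and differ only in their name, columns, and stream. If you wanted to avoid repeating the DDL, a small helper could render it. `build_kinesis_sink_ddl` below is hypothetical (not part of PyFlink or the lab materials); it only builds the SQL string, which you would then pass to `table_env.execute_sql`.

```python
def build_kinesis_sink_ddl(table_name, columns, stream_name, region="us-east-1"):
    """Render a CREATE TABLE statement for a JSON-formatted Kinesis sink.

    columns is a list of (column_name, sql_type) pairs.
    """
    column_sql = ",\n    ".join(f"{name} {sql_type}" for name, sql_type in columns)
    return f"""
CREATE TABLE {table_name} (
    {column_sql})
WITH (
    'connector' = 'kinesis',
    'stream' = '{stream_name}',
    'aws.region' = '{region}',
    'format' = 'json',
    'json.timestamp-format.standard' = 'ISO-8601'
)"""


ddl = build_kinesis_sink_ddl(
    "output_sliding_sales_stream",
    [("event_time", "STRING"), ("total_sales", "NUMERIC")],
    "de-c3w3lab2-kinesis-total-sales-slide-output-stream",
)
print(ddl)
```

In the notebook, the sink cell would then reduce to `table_env.execute_sql(ddl)` after dropping the old table.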

 
After you have defined the schema in your output table, insert the data from the sliding window table into your sink:

In [17]:
%flink.pyflink
table_result = table_env.execute_sql("INSERT INTO {0} SELECT * FROM {1}"
                                     .format("output_sliding_sales_stream", "sliding_window_table"))


 
After you execute the previous cell, an option to open the `Flink Job` appears in the upper-right corner of that cell. Click it to open a new tab with the Flink web UI.
This interface shows the state of the Flink jobs and even lets you take a look at the data.

In this lab, you will use the consumer provided to you. Go to VSCode and run the following command to start the consumer script:

```bash
python3 scripts/consumer/src/consumer.py de-c3w3lab2-kinesis-total-sales-slide-output-stream
```

You should start seeing processed records with the schema that you just defined. They should look similar to this output:

```json
{'event_time': '2024-06-01 01:00:00', 'total_sales': 25385}
```
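The provided `consumer.py` is not shown in this notebook, so its internals are not described here. As a hedged illustration, assuming each Kinesis record's data is the UTF-8 JSON object written by the `'format' = 'json'` sink, decoding record payloads could look like this (`decode_records` and the sample payload are hypothetical, shaped after the output above):

```python
import json


def decode_records(payloads):
    """Decode raw Kinesis record payloads (bytes) into Python dicts."""
    return [json.loads(payload.decode("utf-8")) for payload in payloads]


sample = [b'{"event_time": "2024-06-01 01:00:00", "total_sales": 25385}']
for record in decode_records(sample):
    # Printing a dict uses Python's repr, hence the single quotes in the consumer output.
    print(record)
```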

Great! You have created your first sliding window query over a data stream! The next step is to create another sliding window query that gets the number of orders per device type. For that, follow these instructions:
* Create an `input_table` from your `"source_stream"` table.
* From `input_table`, create a window with a size of 6 minutes and a slide of 3 minutes on the `order_timestamp` column. Name this window `six_minute_window`.
* Then, group by the columns `six_minute_window` and `device_type`. Remember to use the `col` function to reference columns in the table.
* Then, select the end of `six_minute_window`, convert it to a string with the `to_string` UDF, and save it as `event_time`. Also select your `device_type` column and count `order_id`; save the count as `orders_count`.

Save your query table to `orders_by_device_window_table`.

In [20]:
%flink.pyflink
input_table = table_env.from_path("source_stream")

orders_by_device_window_table = (
    input_table.window(
        Slide.over(lit(6).minute)
        .every(lit(3).minutes)
        .on(col("order_timestamp"))
        .alias("six_minute_window")
    )
    .group_by(col("six_minute_window"), col("device_type"))
    .select(
        to_string(col("six_minute_window").end).alias("event_time"),
        col("device_type"),
        col("order_id").count.alias("orders_count")
    )
)
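
The device-type query differs from the sales query only in its grouping key: rows are grouped by the pair (window, `device_type`), and `COUNT(order_id)` counts the non-null order IDs in each group. Here is a plain-Python sketch of that logic, assuming epoch-aligned windows with no offset; the function names are hypothetical.

```python
from collections import Counter
from datetime import datetime, timedelta

SIZE = timedelta(minutes=6)
SLIDE = timedelta(minutes=3)
EPOCH = datetime(1970, 1, 1)


def window_ends(ts, size=SIZE, slide=SLIDE):
    """Return the end of every epoch-aligned sliding window that contains ts."""
    start = ts - (ts - EPOCH) % slide
    ends = []
    while start > ts - size:
        ends.append(start + size)
        start -= slide
    return ends


def orders_by_device(orders):
    """Count orders per (event_time, device_type), like orders_by_device_window_table."""
    counts = Counter()
    for ts, device_type, order_id in orders:
        if order_id is None:
            continue  # COUNT(order_id) ignores NULLs
        for end in window_ends(ts):
            counts[(str(end), device_type)] += 1
    return dict(counts)


def at(minute):
    return datetime(2024, 6, 1, 1, minute)


orders = [(at(1), "desktop", "a"), (at(2), "mobile", "b"), (at(4), "desktop", "c")]
for key, count in sorted(orders_by_device(orders).items()):
    print(key, count)
```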


Create a temporary view from your query table. Name it `orders_by_device_window_table`.

In [22]:
%flink.pyflink
table_env.create_temporary_view("orders_by_device_window_table", orders_by_device_window_table)


Now, create the schema of your output: 
* The table name should be `output_device_sliding_stream`
* Your schema should include the following columns: `event_time` as string, `device_type` also as string, and `orders_count` as a numeric value.
* Remember to set your connector to `kinesis`.
* Your output stream name is `de-c3w3lab2-kinesis-devices-output-stream`.

In [24]:
%flink.pyflink

table_name = "output_device_sliding_stream"
table_env.execute_sql(f"DROP TABLE IF EXISTS {table_name};")

stream_name = "de-c3w3lab2-kinesis-devices-output-stream"

region="us-east-1"
# DDL for the sink table backed by the output Kinesis stream
sink_table_ddl = """
 CREATE TABLE {0} (
    event_time STRING,
    device_type STRING,
    orders_count NUMERIC)
    WITH (
    'connector' = 'kinesis',
    'stream' = '{1}',
    'aws.region' = '{2}',
    'format' = 'json',
    'json.timestamp-format.standard' = 'ISO-8601'
    ) """.format(table_name, stream_name, region)
table_env.execute_sql(sink_table_ddl)

With your output schema created, start inserting data from your sliding window table into your output table. Remember to open the `Flink Job` once it is available and to run your consumer from the VSCode terminal with:

```bash
python3 scripts/consumer/src/consumer.py de-c3w3lab2-kinesis-devices-output-stream
```


In [26]:
%flink.pyflink
table_result = table_env.execute_sql("INSERT INTO {0} SELECT * FROM {1}"
                                     .format("output_device_sliding_stream", "orders_by_device_window_table"))

In this lab, you explored the capabilities of PyFlink within a Zeppelin notebook to perform advanced stream processing tasks. By creating sliding window and tumbling window queries, you gained hands-on experience with two fundamental types of window operations used in stream processing:
* Sliding windows: You learned how to create sliding window queries with overlapping time intervals, providing a continuous and detailed view of your data over time. This is useful for real-time monitoring and trend detection.
* Tumbling windows: You implemented tumbling window queries, which divide the stream into non-overlapping, fixed-size windows. This is ideal for periodic reporting and aggregating data over uniform intervals.
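
The difference between the two window types comes down to how many windows each event is assigned to. The plain-Python sketch below, assuming epoch-aligned windows with no offset and using hypothetical function names, assigns one event to a 6-minute tumbling window and to 6-minute windows sliding every 3 minutes. (In the PyFlink Table API, tumbling windows are defined with `Tumble` from `pyflink.table.window`, analogous to the `Slide` used in this lab.)

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def tumbling_window(ts, size):
    """Return the single (start, end) tumbling window that contains ts."""
    start = ts - (ts - EPOCH) % size
    return (start, start + size)


def sliding_windows(ts, size, slide):
    """Return every (start, end) sliding window that contains ts."""
    start = ts - (ts - EPOCH) % slide
    windows = []
    while start > ts - size:
        windows.append((start, start + size))
        start -= slide
    return windows


event = datetime(2024, 6, 1, 1, 4)
six, three = timedelta(minutes=6), timedelta(minutes=3)
print("tumbling:", tumbling_window(event, six))
print("sliding:", sliding_windows(event, six, three))
```

When `slide` equals `size`, the sliding assignment degenerates into the tumbling one: each event falls into exactly one window.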
 
You applied windowing functions to simulate real-time analytics scenarios, such as summarizing data over time windows. By working with event time attributes and defining watermarks, you ensured accurate and timely processing of out-of-order events, which is critical for reliable stream processing.

The ability to process and analyze streaming data in real time is a valuable asset in today's data-driven world, opening up opportunities for innovation and insights in various domains.
