# APACHE FLINK PYTHON TABLE API

This notebook is a short introduction to the PyFlink Table API, intended to help new users quickly understand its basic usage.

Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. 
You can follow this <a href="https://flink.apache.org/"> link </a> for more details.


Apache Flink supports different runtime execution modes from which you can choose depending on the requirements of your use case and the characteristics of your job.

One of these modes is the _streaming mode_, which should be used for unbounded jobs that require continuous incremental processing and are expected to stay online indefinitely.

The other one is the _batch mode_, which executes jobs in a way that is more reminiscent of batch processing frameworks such as MapReduce. This should be used for bounded jobs for which you have a known fixed input and which do not run continuously.

<img src="img/flink_batch_streaming.png" >

**Pipeline Components:**

* Apache Flink (Open Source)

# Basic PyFlink

TableEnvironment is a central concept of the Table API and SQL integration.

Create a Table from elements:

In [None]:
from pyflink.table import TableEnvironment, EnvironmentSettings

# Create a TableEnvironment configured for batch mode.
env_settings = EnvironmentSettings.in_batch_mode()
table_env = TableEnvironment.create(env_settings)

# Two (int, str) rows; no column names are passed to from_elements here.
rows = [(1, 'Hi'), (2, 'Hello')]
table = table_env.from_elements(rows)

# Last expression of the cell: the table converted to a pandas DataFrame.
table.to_pandas()

Define column name

In [None]:
# Pass the column names as the second argument of from_elements.
column_names = ['id', 'data']
table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], column_names)
table.to_pandas()

Auto Schema

In [None]:
# Only column names are supplied here; no column types are specified.
table_without_schema = table_env.from_elements(
    [(1, 'Hi'), (2, 'Hello')],
    ['id', 'data'],
)

# By default the type of the "id" column is a 64-bit int.
id_column = table_without_schema.to_pandas()["id"]
default_type = id_column.dtype
print('By default the type of the "id" column is %s.' % default_type)

Manual Schema

In [None]:
from pyflink.table import DataTypes

# Declare the row type explicitly: "id" is a TINYINT and "data" is a STRING.
row_type = DataTypes.ROW([
    DataTypes.FIELD("id", DataTypes.TINYINT()),
    DataTypes.FIELD("data", DataTypes.STRING()),
])
table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], row_type)

# Now the type of the "id" column is an 8-bit int.
# The dtype is kept under a descriptive name instead of `type`: assigning to
# `type` would shadow the builtin type() for every later cell of the notebook.
id_dtype = table.to_pandas()["id"].dtype
print('Now the type of the "id" column is %s.' % id_dtype)

# Operations

In [None]:
# Use a batch table environment to execute the queries.
env_settings = EnvironmentSettings.in_batch_mode()
table_env = TableEnvironment.create(env_settings)

# Three (name, country, revenue) rows; 'Jack' appears twice.
order_rows = [
    ('Jack', 'FRANCE', 10),
    ('Rose', 'ENGLAND', 30),
    ('Jack', 'FRANCE', 20),
]
orders = table_env.from_elements(order_rows, ['name', 'country', 'revenue'])

In [None]:
# Convert the orders table to a pandas DataFrame (the cell's last expression).
orders.to_pandas()

In [None]:
# Compute the revenue of all customers from France:
# project the columns, keep the 'FRANCE' rows, group by name and sum revenue.
revenue = (
    orders
    .select(orders.name, orders.country, orders.revenue)
    .where(orders.country == 'FRANCE')
    .group_by(orders.name)
    .select(orders.name, orders.revenue.sum.alias('rev_sum'))
)

revenue.to_pandas()

UDF (User-Defined Function): Row-Based Operations

In [None]:
from pyflink.table import TableEnvironment, EnvironmentSettings
from pyflink.table import DataTypes
from pyflink.table.udf import udf
import pandas as pd

# Run the queries on a batch table environment.
env_settings = EnvironmentSettings.in_batch_mode()
table_env = TableEnvironment.create(env_settings)

# Three (name, country, revenue) rows used as input for the UDF.
order_rows = [
    ('Jack', 'FRANCE', 10),
    ('Rose', 'ENGLAND', 30),
    ('Jack', 'FRANCE', 20),
]
orders = table_env.from_elements(order_rows, ['name', 'country', 'revenue'])

# Row type declared for the UDF result: a STRING name and a BIGINT revenue.
mapped_row_type = DataTypes.ROW([
    DataTypes.FIELD("name", DataTypes.STRING()),
    DataTypes.FIELD("revenue", DataTypes.BIGINT()),
])

# User-defined function, declared with func_type="pandas": it concatenates
# the name column with the revenue column multiplied by 10 (axis=1).
map_function = udf(
    lambda x: pd.concat([x.name, x.revenue * 10], axis=1),
    result_type=mapped_row_type,
    func_type="pandas",
)

# Apply the UDF to the table and name the two resulting columns.
orders.map(map_function).alias('name', 'revenue').to_pandas()

# SQL

In [None]:
source = table_env.from_elements(
    [(1, "Hi", "Hello"), (2, "Hello", "Hello")],
    ["a", "b", "c"],
)

# Get the TableResult; formatting `source` with %s inserts str(source) into
# the query text.
query = "select a + 1, b, c from %s" % source
res = table_env.execute_sql(query)

# Traverse the result rows and print each one.
with res.collect() as results:
    for row in results:
        print(row)

In [None]:
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)

# DDL for a SQL source table using the 'datagen' connector; both fields are
# generated as sequences (id from 1 to 4, data from 4 to 7).
create_source_ddl = """
    CREATE TABLE sql_source (
        id BIGINT,
        data TINYINT
    ) WITH (
        'connector' = 'datagen',
        'fields.id.kind' = 'sequence',
        'fields.id.start' = '1',
        'fields.id.end' = '4',
        'fields.data.kind' = 'sequence',
        'fields.data.start' = '4',
        'fields.data.end' = '7'
    )
"""
table_env.execute_sql(create_source_ddl)

# Convert the SQL table to a Table API table ...
table = table_env.from_path("sql_source")

# ... or create the table from a SQL query.
table = table_env.sql_query(" SELECT * FROM sql_source")

# Emit the table as a pandas DataFrame.
table.to_pandas()

# Tables and Explain and Lazy Operations

In [None]:
from pyflink.table import TableEnvironment, EnvironmentSettings

env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)

# Two tables built from the same literal rows and column names.
table1 = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
table2 = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])

# Keep the rows of table1 whose data matches 'H%', then append all of table2.
table = (
    table1
    .where(table1.data.like('H%'))
    .union_all(table2)
)

# Print the text returned by explain() for this table.
print(table.explain())

In [None]:
# Get the result: convert the table to a pandas DataFrame.
table.to_pandas() 

Convert Pandas DataFrame to PyFlink Table

In [None]:
import pandas as pd
import numpy as np

env_settings = EnvironmentSettings.in_batch_mode()
t_env = TableEnvironment.create(env_settings)

# Create a pandas DataFrame from a 1000 x 2 array of random values.
random_values = np.random.rand(1000, 2)
pdf = pd.DataFrame(random_values)

# Create a PyFlink Table from the pandas DataFrame and convert it back.
table = t_env.from_pandas(pdf)
table.to_pandas()

In [None]:
# Create a PyFlink Table from a pandas DataFrame with the specified column names.
column_names = ['f0', 'f1']
table = t_env.from_pandas(pdf, column_names)
table.to_pandas()

In [None]:
# Create a PyFlink Table from a pandas DataFrame with the specified column types.
column_types = [DataTypes.DOUBLE(), DataTypes.DOUBLE()]
table = t_env.from_pandas(pdf, column_types)
table.to_pandas()

In [None]:
# Create a PyFlink Table from a pandas DataFrame with the specified row type:
# two DOUBLE fields named "f0" and "f1".
row_type = DataTypes.ROW([
    DataTypes.FIELD("f0", DataTypes.DOUBLE()),
    DataTypes.FIELD("f1", DataTypes.DOUBLE()),
])
table = t_env.from_pandas(pdf, row_type)

table.to_pandas()

# Source and Sink

In [None]:
from pyflink.table import TableEnvironment, EnvironmentSettings

# Use a streaming TableEnvironment to execute the queries.
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)

# Source table using the 'datagen' connector; both fields are generated as
# sequences (id from 1 to 8, data from 4 to 11).
create_source_ddl = """
    CREATE TABLE random_source (
        id BIGINT,
        data TINYINT
    ) WITH (
        'connector' = 'datagen',
        'fields.id.kind' = 'sequence',
        'fields.id.start' = '1',
        'fields.id.end' = '8',
        'fields.data.kind' = 'sequence',
        'fields.data.start' = '4',
        'fields.data.end' = '11'
    )
"""
table_env.execute_sql(create_source_ddl)

# Sink table using the 'print' connector.
create_sink_ddl = """
    CREATE TABLE print_sink (
        id BIGINT,
        data_sum TINYINT
    ) WITH (
        'connector' = 'print'
    )
"""
table_env.execute_sql(create_sink_ddl)

# Sum `data` per `id / 2`, keep the groups with id > 1 and insert them into
# the sink.
insert_sql = """
    INSERT INTO print_sink
        SELECT id,sum(data) as data_sum FROM
            (SELECT id / 2 as id, data FROM random_source)
        WHERE id > 1
        GROUP BY id
"""
insert_result = table_env.execute_sql(insert_sql)

# Call wait() on the result of the INSERT statement.
insert_result.wait()