In [1]:
from pyflink.table import DataTypes, CsvTableSource, StreamTableEnvironment
from pyflink.datastream import StreamExecutionEnvironment

import datetime
from pyflink.table.expressions import col
from pyflink.table.window import Over, GroupWindow
from pyflink.table.expressions import col, UNBOUNDED_RANGE, CURRENT_RANGE
from pyflink.table.udf import udf

env_settings = StreamExecutionEnvironment.get_execution_environment()
table_env = StreamTableEnvironment.create(stream_execution_environment=env_settings)

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/flink-1.15.3/lib/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/hadoop-2.7.7/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]


In [2]:
# InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
# 536365,85123A,WHITE HANGING HEART T-LIGHT HOLDER,6,12/01/2010 8:26,2.55,17850,United Kingdom
column_names = ['InvoiceNo', 'StockCode', 'Description', 'Quantity', 
                'InvoiceDate', 'UnitPrice', 'CustomerID', 'Country']

column_types = [DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.DOUBLE(), 
                DataTypes.STRING(), DataTypes.DOUBLE(),DataTypes.STRING(), DataTypes.STRING()]

source = CsvTableSource(
   '/home/training/flink-dev/data/1k-dataset.csv',  
    column_names,
    column_types,
    ignore_first_line=True,
    quote_character='"',
    lenient=True
)

# source is data source -origin data is from
# table is flink api
# sink is target where data goes to after transformation
table_env.register_table_source('invoices', source)

# invoices is a table
# get table from source
invoices = table_env.from_path('invoices')


##############################
print('\nRegistered Tables List')
print(table_env.list_tables())

print('\nFinancial Trxs Schema')
invoices.print_schema()


Registered Tables List
['invoices']

Financial Trxs Schema
(
  `InvoiceNo` STRING,
  `StockCode` STRING,
  `Description` STRING,
  `Quantity` DOUBLE,
  `InvoiceDate` STRING,
  `UnitPrice` DOUBLE,
  `CustomerID` STRING,
  `Country` STRING
)


In [3]:
invoices.fetch(3).execute().print()



+----+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
| op |                      InvoiceNo |                      StockCode |                    Description |                       Quantity |                    InvoiceDate |                      UnitPrice |                     CustomerID |                        Country |
+----+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
| +I |                         536446 |                          22619 |      SET OF 6 SOLDIER SKITTLES |                            8.0 |               12/01/2010 12:15 |                

In [4]:
#         .column_by_expression("rowtime", "CAST(f0 AS TIMESTAMP(3))") \

# Flink understand ANSI SQL date format, UTC format

# datetime in string format for may 25 1999
input = '12/01/2010 8:26'
  
# format
format = '%d/%m/%Y %H:%M'
  
# convert from string format to datetime format
dt = datetime.datetime.strptime(input, format)
  
# get the date from the datetime using date() 
# function
print(dt)

new_format = '%Y-%m-%d %H:%M:%S'   # SQL Format, which flink can understand
dt.strftime(new_format)

2010-01-12 08:26:00


'2010-01-12 08:26:00'

In [5]:
# UDF - User Defined Function, python/scala/java
# UDF code is not optimized by flink, python udf shall run on Python VM ie python runtime
def convertDateFormat2(input):
    # format
    format = '%d/%m/%Y %H:%M'
  
    # convert from string format to datetime format
    dt = datetime.datetime.strptime(input, format)
    new_format = '%Y-%m-%d %H:%M:%S' 
    return dt.strftime(new_format)

# creating a python function as UDF, first parameter is date string, return type is date string
# Since function has input(s)/multiple args with data types, and the function return output with datatype
# the middle one, [DataTypes.STRING()] is for input arg(s), input arg is basically a string type
# last one  DataTypes.STRING() is RETURN type from the function , only one return value
convertDateFormat = udf(convertDateFormat2, [DataTypes.STRING()], DataTypes.STRING())

In [6]:
# convert data time string to SQL date time string format as new column CInvoiceDate
# drop Orignal InvoiceDate which is string 
# Cast CInvoiceDate which is SQL date time stirng to TIMESTAMP(3) 3 means precision and 
# create new Column InvoiceDate which TIMESTAMP type
# drop temp col CInvoiceDate
invoices2 = invoices.add_columns(convertDateFormat(col('InvoiceDate')).alias('CInvoiceDate'))\
                    .drop_columns(col('InvoiceDate'))\
                    .add_columns(col("CInvoiceDate").cast(DataTypes.TIMESTAMP(3)).alias("InvoiceDate"))\
                    .drop_columns(col('CInvoiceDate'))\
                    .add_columns( (col("Quantity") * col("UnitPrice")).alias("Amount"))

invoices2.print_schema()

(
  `InvoiceNo` STRING,
  `StockCode` STRING,
  `Description` STRING,
  `Quantity` DOUBLE,
  `UnitPrice` DOUBLE,
  `CustomerID` STRING,
  `Country` STRING,
  `InvoiceDate` TIMESTAMP(3),
  `Amount` DOUBLE
)


In [7]:
table_env.register_table('invoices2', invoices2)

In [8]:
result = table_env.sql_query("SELECT InvoiceNo, Quantity, UnitPrice, Amount from invoices2")
result.fetch(20).execute().print()

+----+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
| op |                      InvoiceNo |                       Quantity |                      UnitPrice |                         Amount |
+----+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
| +I |                         536477 |                           96.0 |                           0.72 |                          69.12 |
| +I |                         536477 |                          144.0 |                           0.72 |             103.67999999999999 |
| +I |                         536477 |                          144.0 |                           0.72 |             103.67999999999999 |
| +I |                         536477 |                           12.0 |                            2.1 |             25.200000000000003 |
| +I |                     

In [9]:
results = invoices2.group_by ( col("InvoiceNo") )\
                   .select( col("InvoiceNo"), 
                            col("Quantity").sum.alias("Qty"),
                            col("Amount").sum.alias("TotalAmount"),
                            col("InvoiceNo").count.alias("UniqueItems") )

results.execute().print()

+----+--------------------------------+--------------------------------+--------------------------------+----------------------+
| op |                      InvoiceNo |                            Qty |                    TotalAmount |          UniqueItems |
+----+--------------------------------+--------------------------------+--------------------------------+----------------------+
| +I |                         536370 |                           24.0 |                           90.0 |                    1 |
| -U |                         536370 |                           24.0 |                           90.0 |                    1 |
| +U |                         536370 |                           48.0 |                          180.0 |                    2 |
| -U |                         536370 |                           48.0 |                          180.0 |                    2 |
| +U |                         536370 |                           60.0 |                         

In [10]:
print(results.explain())

== Abstract Syntax Tree ==
LogicalProject(InvoiceNo=[$0], Qty=[$1], TotalAmount=[$2], UniqueItems=[$3])
+- LogicalAggregate(group=[{0}], EXPR$0=[SUM($1)], EXPR$1=[SUM($2)], EXPR$2=[COUNT($0)])
   +- LogicalProject(InvoiceNo=[$0], Quantity=[$3], Amount=[*($3, $5)])
      +- LogicalTableScan(table=[[default_catalog, default_database, invoices, source: [CsvTableSource(read fields: InvoiceNo, StockCode, Description, Quantity, InvoiceDate, UnitPrice, CustomerID, Country)]]])

== Optimized Physical Plan ==
GroupAggregate(groupBy=[InvoiceNo], select=[InvoiceNo, SUM(Quantity) AS EXPR$0, SUM(Amount) AS EXPR$1, COUNT(InvoiceNo) AS EXPR$2])
+- Exchange(distribution=[hash[InvoiceNo]])
   +- Calc(select=[InvoiceNo, Quantity, *(Quantity, UnitPrice) AS Amount])
      +- LegacyTableSourceScan(table=[[default_catalog, default_database, invoices, source: [CsvTableSource(read fields: InvoiceNo, Quantity, UnitPrice)]]], fields=[InvoiceNo, Quantity, UnitPrice])

== Optimized Execution Plan ==
GroupAggregat

In [11]:
result = table_env.sql_query("""
SELECT InvoiceNo, sum(Quantity)  as Qty, sum(Amount) as TotalAmount, 
        count(InvoiceNo) as UniqueItems
        
FROM invoices2
GROUP BY InvoiceNo
""")

result.fetch(5).execute().print()
result.print_schema()

# source table
table_env.register_table(  "analytics_results_source", result)

+----+--------------------------------+--------------------------------+--------------------------------+----------------------+
| op |                      InvoiceNo |                            Qty |                    TotalAmount |          UniqueItems |
+----+--------------------------------+--------------------------------+--------------------------------+----------------------+
| +I |                         536369 |                            3.0 |                          17.85 |                    1 |
| +I |                         536371 |                           80.0 |                          204.0 |                    1 |
| +I |                         536378 |                           10.0 |                           19.5 |                    1 |
| -D |                         536378 |                           10.0 |                           19.5 |                    1 |
| +I |                         536378 |                           20.0 |                         

In [None]:
print(result.explain())

In [None]:
# create a sink table that prints the output, later will cover mysql/postgres/kafka
# WE only created table for sinking result, that print on console as we use print connector
table_env.execute_sql("""
CREATE TABLE invoice_temp_sink (
  InvoiceNo STRING,
  Qty DOUBLE,
  TotalAmount DOUBLE,
  UniqueItems  BIGINT NOT NULL
) WITH (
    'connector' = 'print'
)
""")

In [None]:
print('\nRegistered Tables List')
print(table_env.list_tables())

In [None]:
# Create Data Flow Graph merging source [csv], operators [invoices2, groupby/sum] into sink table [invoice_temp_sink]
# now take results from analytics_results_source table and write to invoice_temp_sink
# wait should not be used in remote job, just for debugging purpose
table_env.execute_sql("INSERT INTO invoice_temp_sink SELECT * FROM analytics_results_source").wait()
# +I - Insert
# the result is printed below, because we use connector: print, which prices on console/termianl output

In [None]:
# create a sink table that prints the output, later will cover mysql/postgres/kafka


# blackhole connector doesn't print anything, does nothing with data, but ignore it
table_env.execute_sql("""
CREATE TABLE invoice_temp_sink_blackhole (
  InvoiceNo STRING,
  Qty DOUBLE,
  TotalAmount DOUBLE,
  UniqueItems  BIGINT NOT NULL
) WITH (
    'connector' = 'blackhole'
)
""")

# we have 3 sinks invoice_temp_sink, invoice_temp_sink_mysql, invoice_temp_sink_kafka

In [None]:
# below statement process data, does analytics, result is pushed into blackhole connector, that sucks the result
# , does not print

table_env.execute_sql("INSERT INTO invoice_temp_sink_blackhole SELECT * FROM analytics_results_source").wait()

In [None]:
# use statementsets to load and process analytisc once and write results into mulitple destination
# create             statementsets
statement_set = table_env.create_statement_set()

# statement set uisng python using python api

# Emit the result into sink table

statement_set.add_insert("invoice_temp_sink", result) # result is table from analytics query, print console
#statement_set.add_insert("invoice_temp_sink_mysql", result) # result is table from analytics query, print console
statement_set.add_insert("invoice_temp_sink_blackhole", result) # result is table from analytics query, blackhole, doesn't print

# now execute statement set, this load csv, perform analytics, insert data into all 3 sink tables
statement_set.execute().wait()

In [None]:
print(statement_set.explain())

In [None]:
# Now statement set using Flink SQL
# we can mix flink sql and table api
# create statemetn set

# source table: analytics_results_source
# sink target tables   invoice_temp_sink, invoice_temp_sink_mysql, invoice_temp_sink_kafka
statement_set = table_env.create_statement_set()

statement_set.add_insert_sql("INSERT INTO invoice_temp_sink SELECT * FROM analytics_results_source")
#statement_set.add_insert_sql("INSERT INTO invoice_temp_sink_mysql SELECT * FROM analytics_results_source")
statement_set.add_insert_sql("INSERT INTO invoice_temp_sink_blackhole SELECT * FROM analytics_results_source")

print(statement_set.explain())
# execute the statement set 
statement_set.execute().wait()

In [None]:
from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext
from pyflink.datastream.state import ValueStateDescriptor, StateTtlConfig
from pyflink.common import Types
from pyflink.table import (DataTypes, TableDescriptor, Schema)
from pyflink.common import Time


In [None]:
table = table_env.from_path("invoices2")

# Quantity, UnitPrice, Amount
    
table = table.select(col('Quantity'), col('UnitPrice'), col("Amount"), col("InvoiceNo"))

# convert table to datastream and perform datastream api operations
ds = table_env.to_data_stream(table)
ds4 = ds.map(lambda r: r[2] /  r[1], output_type=Types.LONG())

ds4.print()


In [None]:
 table2 = table_env.from_data_stream(
        ds4,
        Schema.new_builder().column("f0", DataTypes.BIGINT()).build())
table2.execute().print()

In [None]:
class Sum(KeyedProcessFunction):

    def __init__(self):
        self.state = None

    def open(self, runtime_context: RuntimeContext):
        state_descriptor = ValueStateDescriptor("state", Types.FLOAT())
        state_ttl_config = StateTtlConfig \
            .new_builder(Time.seconds(1)) \
            .set_update_type(StateTtlConfig.UpdateType.OnReadAndWrite) \
            .disable_cleanup_in_background() \
            .build()
        state_descriptor.enable_time_to_live(state_ttl_config)
        self.state = runtime_context.get_state(state_descriptor)

    def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
        # retrieve the current count
        current = self.state.value()
        if current is None:
            current = 0

        # update the state's count
        current += value[0] * value[1]
        self.state.update(current)

        yield value[0], current

In [None]:
ds2 = ds.key_by(lambda value: value[3]) \
      .process(Sum())


with ds2.execute_and_collect() as results:
    for result in results:
        print(result)
        
# table3 = table_env.from_data_stream(
#         ds2)

# table3.execute().print()
# submit for execution
#table_env.execute()