In [1]:
import os
from pyflink.dataset import ExecutionEnvironment
from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment, StreamTableEnvironment
from pyflink.table.descriptors import Schema, OldCsv, FileSystem
from pyflink.table.udf import udf

# Source and Sink FileSystem StreamTable Environment 

In [32]:
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.table import StreamTableEnvironment, EnvironmentSettings

# Streaming execution environment: event-time characteristic, parallelism of 1.
s_env = StreamExecutionEnvironment.get_execution_environment()
s_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
s_env.set_parallelism(1)

# Table environment layered on top of it: streaming mode, Blink planner.
blink_streaming_settings = (
    EnvironmentSettings.new_instance()
    .in_streaming_mode()
    .use_blink_planner()
    .build()
)
st_env = StreamTableEnvironment.create(s_env, environment_settings=blink_streaming_settings)


# Credentials for the MySQL JDBC sinks. They are read from the environment so
# that real secrets do not have to be committed with the notebook; the
# fallbacks are the values this notebook used before.
MYSQL_USER = os.environ.get("MYSQL_USER", "root")
MYSQL_PASSWORD = os.environ.get("MYSQL_PASSWORD", "my-secret-pw")


def _sql_string(value):
    """Escape ``value`` for use inside a single-quoted SQL string literal."""
    return value.replace("'", "''")


def _mysql_sink_ddl(table_name, columns_sql, mysql_table):
    """Build the CREATE TABLE statement for a JDBC sink.

    Args:
        table_name: Name under which the table is registered in Flink.
        columns_sql: Column list placed between the parentheses of CREATE TABLE.
        mysql_table: Value of the 'connector.table' option.

    Returns:
        The DDL string; connection URL, driver and flush interval are the same
        for every sink, the credentials come from MYSQL_USER / MYSQL_PASSWORD.
    """
    return f"""CREATE TABLE {table_name} (
    {columns_sql}) WITH (
        'connector.type' = 'jdbc',
        'connector.url' = 'jdbc:mysql://mysql:3306/test',
        'connector.table' = '{mysql_table}',
        'connector.driver' = 'com.mysql.jdbc.Driver',
        'connector.write.flush.interval' = '10',
        'connector.username' = '{_sql_string(MYSQL_USER)}',
        'connector.password' = '{_sql_string(MYSQL_PASSWORD)}')
"""


# FileSystem source
source_ddl = """CREATE TABLE MySourceTable (word varchar) WITH (
        'connector.type' = 'filesystem',
        'format.type' = 'csv',
        'connector.path' = '/opt/flink/notebooks/data/word_count_input')
"""

# FileSystem sink
sink_ddl = """CREATE TABLE MySinkTable (
    word varchar,
    cnt bigint) WITH (
        'connector.type' = 'filesystem',
        'format.type' = 'csv',
        'connector.path' = '/opt/flink/notebooks/data/word_count_output1')
"""

# Mysql sink for (word, cnt) rows, stored in the `word_count` table
sinkdb_ddl = _mysql_sink_ddl(
    "MySinkDbTable",
    "word varchar,\n    cnt bigint",
    "word_count",
)

# Mysql sink for (smstext, smstype) rows, stored in the `sms` table
sinkdbsms_ddl = _mysql_sink_ddl(
    "MySinkDbSmsTable",
    "smstext varchar,\n    smstype varchar",
    "sms",
)

# Kafka source
source_kafka_ddl = """CREATE TABLE MySourceKafkaTable (word varchar) WITH (
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.topic' = 'test',
    'connector.startup-mode' = 'latest-offset',
    'connector.properties.bootstrap.servers' = 'kafka:9092',
    'connector.properties.group.id' = 'test',
    'format.type' = 'csv'
        )
"""

# Register the table definitions in the original order: Kafka source first,
# then the filesystem source and sink, then the JDBC sinks.
for ddl in (source_kafka_ddl, source_ddl, sink_ddl, sinkdb_ddl):
    st_env.execute_sql(ddl)

# Kept as the cell's final expression so its TableResult is still displayed.
st_env.execute_sql(sinkdbsms_ddl)

<pyflink.table.table_result.TableResult at 0x7f9d13b16080>

# Define UDF function using PyFlink

Let's use an MLflow model, which gives us an abstraction over multiple ML frameworks.

In [34]:
from pyflink.table.expressions import call 
from pyflink.table.udf import ScalarFunction

class SpamClassifier(ScalarFunction):
    """Scalar UDF that labels a text with the prediction of an MLflow-stored scikit-learn model."""

    def __init__(self, model_id):
        """Load the model stored for ``model_id``.

        Args:
            model_id: Name of the directory below ``/mlflow/mlruns/2`` whose
                ``artifacts/model`` folder holds the model.
        """
        # Imported locally so that mlflow is only required when a classifier
        # is actually constructed.
        import mlflow.sklearn
        self.model = mlflow.sklearn.load_model(f"/mlflow/mlruns/2/{model_id}/artifacts/model")

    def eval(self, s):
        """Return the model's prediction for the single text ``s``.

        Args:
            s: Input text. ``None`` (a SQL NULL) is passed through unchanged.

        Returns:
            The first element of ``self.model.predict([s])`` converted to
            ``str``, or ``None`` when ``s`` is ``None``.
        """
        if s is None:
            # Do not hand NULL rows to the model; propagate NULL instead of
            # letting predict() raise in the middle of the streaming job.
            return None
        # predict() works on a batch, so wrap the single text in a list.
        prediction = self.model.predict([s])
        # The UDF is registered with a STRING result type, so return a plain
        # str whatever element type the model's output array has.
        return str(prediction[0])

# Wrap a classifier instance as a scalar UDF taking one STRING and returning a STRING.
spam_classifier = udf(
    SpamClassifier("64a89b0a6b7346498316bfae4c298535"),
    input_types=[DataTypes.STRING()],
    result_type=DataTypes.STRING(),
)

# Make the UDF callable from SQL and Table API expressions as SPAM_CLASSIFIER.
st_env.register_function("SPAM_CLASSIFIER", spam_classifier)

# Define pipeline using SQL

In [7]:
# Classify every Kafka message and write (text, label) rows to the filesystem sink.
# NOTE(review): MySinkTable declares `cnt bigint` while SPAM_CLASSIFIER is
# registered with a STRING result type - confirm the sink schema matches.
kafka_to_file_sql = (
    "INSERT INTO MySinkTable "
    "SELECT word as smstext, SPAM_CLASSIFIER(word) as smstype "
    "FROM MySourceKafkaTable"
)
st_env.execute_sql(kafka_to_file_sql)

<pyflink.table.table_result.TableResult at 0x7f05fb4c36a0>

In [35]:
# Classify every Kafka message and write (smstext, smstype) rows to the JDBC sink
# registered as MySinkDbSmsTable.
kafka_to_mysql_sql = (
    "INSERT INTO MySinkDbSmsTable "
    "SELECT word as smstext, SPAM_CLASSIFIER(word) as smstype "
    "FROM MySourceKafkaTable"
)
st_env.execute_sql(kafka_to_mysql_sql)

<pyflink.table.table_result.TableResult at 0x7f9dae2e9470>

# Define pipeline using PyFlink

In [None]:
# Table API version of the SQL pipeline: classify each Kafka message and write
# (smstext, smstype) rows to the JDBC sink.
# MySourceKafkaTable only has a `word` column, so it is aliased to `smstext`
# here (selecting `smstext` directly cannot be resolved). The rows go to
# MySinkDbSmsTable, whose two varchar columns match the STRING classifier
# output; MySinkDbTable declares a bigint second column and does not.
st_env.from_path('MySourceKafkaTable') \
    .select('word as smstext, SPAM_CLASSIFIER(word) as smstype') \
    .insert_into('MySinkDbSmsTable')

# Run the pipeline defined above under this job name.
st_env.execute("5-word_count-mysql")

# Drop source and sink definitions

In [None]:
# Remove every table this notebook registered. MySinkDbSmsTable is included:
# it is created together with the others but was previously never dropped.
# IF EXISTS keeps the cell re-runnable when some tables are already gone.
for table_name in (
    "MySourceTable",
    "MySinkTable",
    "MySinkDbTable",
    "MySinkDbSmsTable",
    "MySourceKafkaTable",
):
    st_env.execute_sql(f"DROP TABLE IF EXISTS {table_name}")