In [1]:
import argparse
import logging
import sys

from pyflink.common import Row
from pyflink.table import (EnvironmentSettings, TableEnvironment, TableDescriptor, Schema,
                           DataTypes, FormatDescriptor)
from pyflink.table.expressions import lit, col
from pyflink.table.udf import udtf

In [2]:
# The TableEnvironment is the main entry point for interacting with the Flink runtime. It can be used to set
# execution parameters such as the restart strategy and default parallelism; its TableConfig holds Table API-specific settings.

In [14]:
# Streaming-mode TableEnvironment, with the default parallelism set to 1 for every job it runs.
# The builder form below is the long-hand spelling of EnvironmentSettings.in_streaming_mode().
t_env = TableEnvironment.create(
    EnvironmentSettings.new_instance().in_streaming_mode().build()
)
t_env.get_config().set("parallelism.default", "1")

<pyflink.table.table_config.TableConfig at 0x167ce4990>

In [15]:
# Temporary table `source`: a single STRING column `word`, read as CSV from
# input.csv through the filesystem connector. `tab` is the Table API handle
# on it that the word-count pipeline starts from.
t_env.create_temporary_table(
    'source',
    TableDescriptor.for_connector('filesystem')
        .schema(
            Schema.new_builder()
                .column('word', DataTypes.STRING())
                .build()
        )
        .format('csv')
        .option('path', 'input.csv')
        .build(),
)
tab = t_env.from_path('source')

In [16]:
# Temporary table `sink`: (word STRING, count BIGINT), written under the
# `output` path by the filesystem connector. canal-json is presumably chosen
# because it is a changelog format able to carry the updating results of the
# grouped count (confirm before switching to an append-only format).
t_env.create_temporary_table(
    'sink',
    TableDescriptor.for_connector('filesystem')
        .schema(
            Schema.new_builder()
                .column('word', DataTypes.STRING())
                .column('count', DataTypes.BIGINT())
                .build()
        )
        .format(FormatDescriptor.for_format('canal-json').build())
        .option('path', 'output')
        .build(),
)

In [17]:
# SQL-DDL alternative to the Table API definition of the `source` table.
# `temporary ... if not exists`: when a temporary `source` is already registered,
# executing this is a no-op. It does not create a second, permanent catalog table
# that the temporary one would shadow, and the cell that executes it can be
# re-run without a "table already exists" error.
my_source_ddl = """
    create temporary table if not exists source (
        word STRING
    ) with (
        'connector' = 'filesystem',
        'format' = 'csv',
        'path' = '{}'
    )
""".format("input.csv")

In [18]:
# SQL-DDL alternative to the Table API definition of the `sink` table.
# `temporary ... if not exists`: when a temporary `sink` is already registered,
# executing this is a no-op. It does not create a second, permanent catalog table
# that the temporary one would shadow, and the cell that executes it can be
# re-run without a "table already exists" error.
# `count` is back-quoted because COUNT is a reserved word in Flink SQL.
my_sink_ddl = """
    create temporary table if not exists sink (
        word STRING,
        `count` BIGINT
    ) with (
        'connector' = 'filesystem',
        'format' = 'canal-json',
        'path' = '{}'
    )
""".format("output")

In [19]:
# Execute the `source` DDL; the returned TableResult is displayed as the cell output.
t_env.execute_sql(stmt=my_source_ddl)

<pyflink.table.table_result.TableResult at 0x1685bf490>

In [20]:
# Execute the `sink` DDL; the returned TableResult is displayed as the cell output.
t_env.execute_sql(stmt=my_sink_ddl)

<pyflink.table.table_result.TableResult at 0x167ce3f50>

In [21]:
@udtf(result_types=[DataTypes.STRING()])
def split(line: Row):
    """Table function emitting one single-column row per whitespace-separated word.

    It is applied via ``Table.flat_map``, so ``line`` is the whole input row and
    its first field holds the text to split.

    Args:
        line: Input row whose field 0 is a string, or None for SQL NULL.

    Yields:
        ``Row(word)`` for each word produced by ``str.split()``. An empty or
        whitespace-only field yields nothing.
    """
    text = line[0]
    # The source column is declared as a nullable STRING, so a NULL value is
    # possible. Emit no words instead of failing the job with AttributeError
    # on None.split().
    if text is None:
        return
    for word in text.split():
        yield Row(word)

In [22]:
# Word count: explode each input row into words, name that column `word`,
# count rows per word, and write the (word, count) stream into the `sink`
# table. `.wait()` blocks until the insert job finishes.
(
    tab.flat_map(split)
    .alias('word')
    .group_by(col('word'))
    .select(col('word'), lit(1).count)
    .execute_insert('sink')
    .wait()
)