In [1]:
from pyflink.common import Row
from pyflink.table import (EnvironmentSettings, TableEnvironment, DataTypes)
from pyflink.table.expressions import lit, col
from pyflink.table.udf import udtf

# Batch-mode Table API environment. `use_blink_planner()` is intentionally
# omitted: the former "Blink" planner is Flink's default planner (the only one
# since Flink 1.14) and that builder method is deprecated. This notebook
# already requires Flink >= 1.13 (it uses TIMESTAMP_LTZ), so nothing is lost.
settings = EnvironmentSettings.new_instance().in_batch_mode().build()
t_env = TableEnvironment.create(settings)

In [2]:
# Set the default parallelism of jobs built from this environment to 1.
# set_string returns the Configuration itself, which becomes the cell output.
table_config = t_env.get_config().get_configuration()
table_config.set_string("parallelism.default", "1")

<pyflink.common.configuration.Configuration at 0x7fa9def1c290>

In [3]:
# define the source: four in-memory (id, JSON document) rows
json_rows = [
    (1, '{"name": "Flink", "tel": 123, "addr": {"country": "Germany", "city": "Berlin"}}'),
    (2, '{"name": "hello", "tel": 135, "addr": {"country": "China", "city": "Shanghai"}}'),
    (3, '{"name": "world", "tel": 124, "addr": {"country": "USA", "city": "NewYork"}}'),
    (4, '{"name": "PyFlink", "tel": 32, "addr": {"country": "China", "city": "Hangzhou"}}'),
]
table = t_env.from_elements(elements=json_rows, schema=['id', 'data'])

In [4]:
# define the sink: a `print`-connector table with the same (id, data) layout
# as the source, so inserted rows show up on the console
sink_ddl = """
        CREATE TABLE sink (
            id BIGINT,
            data STRING
        ) with (
            'connector' = 'print'
        )
    """
t_env.execute_sql(sink_ddl)

<pyflink.table.table_result.TableResult at 0x7fa9e1795e10>

In [5]:
import json

from pyflink.table import (EnvironmentSettings, TableEnvironment, DataTypes)
from pyflink.table.udf import udf

# update json columns
@udf(result_type=DataTypes.STRING())
def update_tel(data):
    """Increment the ``tel`` field of a JSON document by one.

    :param data: a JSON object serialized as a string; may be None
                 (assumed to be how a SQL NULL reaches the UDF).
    :return: the re-serialized JSON document with ``tel`` incremented,
             or None when ``data`` is None.
    :raises json.JSONDecodeError: if ``data`` is not valid JSON.
    :raises KeyError: if the document has no ``tel`` field.
    """
    if data is None:
        # NULL in, NULL out -- json.loads(None) would raise TypeError.
        return None
    json_data = json.loads(data)
    json_data['tel'] += 1
    return json.dumps(json_data)

In [6]:
# Keep `id` as-is and replace `data` with its tel-incremented JSON.
table = table.select(col('id'), update_tel(col('data')))

In [7]:
# Submit the INSERT into `sink`, then block until the job finishes.
insert_result = table.execute_insert('sink')
insert_result.wait()
# Drop the .wait() call when submitting to a remote cluster; see
# https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/python/faq/#wait-for-jobs-to-finish-when-executing-jobs-in-mini-cluster
# for more details.

# Output is printed to the console.

In [8]:
# define the source with watermark definition: a bounded datagen table of
# 10 rows whose event-time column `ts` carries a watermark lagging 3 seconds
source_ddl = """
        CREATE TABLE source (
            ts TIMESTAMP_LTZ(3),
            name STRING,
            price FLOAT,
            watermark FOR ts as ts - INTERVAL '3' SECOND
        ) with (
            'connector' = 'datagen',
            'number-of-rows' = '10'
        )
    """
t_env.execute_sql(source_ddl)

<pyflink.table.table_result.TableResult at 0x7fa9e2329fd0>

In [9]:
# define the sink for the windowed aggregate (print connector).
# NOTE(review): the insert into this table selects mean_udaf(price) as its
# second column, so `total_price` actually holds a per-window average --
# consider renaming the column.
sink_price_ddl = """
        CREATE TABLE sink_price (
            name STRING,
            total_price FLOAT,
            w_start TIMESTAMP(3),
            w_end TIMESTAMP(3)
        ) with (
            'connector' = 'print'
        )
    """
t_env.execute_sql(sink_price_ddl)

<pyflink.table.table_result.TableResult at 0x7fa9e27463d0>

In [10]:
# Read the `source` table created by the CREATE TABLE statement above.
source_path = "source"
table = t_env.from_path(source_path)

In [11]:
from pyflink.table.udf import udaf
from pyflink.table.window import Tumble

In [12]:
@udaf(result_type=DataTypes.FLOAT(), func_type="pandas")
def mean_udaf(v):
    """Vectorized aggregate returning the mean of one group's values.

    :param v: the group's column values -- presumably a pandas Series,
              given func_type="pandas".
    :return: ``v.mean()``, declared to Flink as FLOAT.
    """
    group_mean = v.mean()
    return group_mean

In [13]:
# define the tumble window operation: 5-second windows over the event-time
# column `ts`, one row per (name, window) with the mean price and the
# window's start/end
five_second_window = Tumble.over(lit(5).seconds).on(col("ts")).alias("w")
table = (
    table.window(five_second_window)
    .group_by(col("name"), col("w"))
    .select(col("name"), mean_udaf(col("price")), col("w").start, col("w").end)
)

In [14]:
# submit for execution: insert the windowed rows into `sink_price` and block
# until the job completes
window_insert = table.execute_insert('sink_price')
window_insert.wait()

  return pa.RecordBatch.from_arrays(arrays, schema)
