In [None]:
import os 
from pyflink.table import EnvironmentSettings, TableEnvironment

# Build the Table API entry point in streaming mode; every statement in the
# rest of this script is issued through `t_env`.
env_settings = EnvironmentSettings.in_streaming_mode()
t_env = TableEnvironment.create(env_settings)

# Make every jar found in the Flink lib directory available to the job by
# listing them, as `file://` URLs, in the `pipeline.jars` option.
flink_lib_path = "/opt/third/flink/lib"
# os.listdir() already yields bare entry names (so no basename() call is
# needed) but in arbitrary order; sort to keep the jar list stable across runs.
jars = sorted(name for name in os.listdir(flink_lib_path) if name.endswith('.jar'))
# The option value is a single string with the URLs joined by ';'.
str_jars = ';'.join(f'file://{flink_lib_path}/{jar}' for jar in jars)
if str_jars:
    # When no jar was found, leave the option untouched instead of
    # overwriting it with an empty string.
    t_env.get_config().get_configuration().set_string("pipeline.jars", str_jars)

from pyflink.table.catalog import HiveCatalog

# Build a HiveCatalog from the Hive configuration directory, register it with
# the table environment under `catalog_name`, and make it the current catalog
# for the statements that follow.
catalog_name = "hive"
default_database = "default"
hive_conf_dir = "/opt/third/hive/conf"

catalog = HiveCatalog(catalog_name, default_database, hive_conf_dir)
t_env.register_catalog(catalog_name, catalog)
t_env.use_catalog(catalog_name)

# Recreate the source table from scratch: drop any earlier definition, then
# declare a 'datagen' table whose `id` column is the sequence 1..8 and whose
# `data` column is the sequence 4..11.
source_table_statements = (
    "DROP TABLE IF EXISTS random_source_pyflink",
    """
    CREATE TABLE IF NOT EXISTS random_source_pyflink (
        id BIGINT, 
        data TINYINT
    ) WITH (
        'connector' = 'datagen',
        'fields.id.kind'='sequence',
        'fields.id.start'='1',
        'fields.id.end'='8',
        'fields.data.kind'='sequence',
        'fields.data.start'='4',
        'fields.data.end'='11'
    )
""",
)
# Order matters: the DROP must run before the CREATE.
for statement in source_table_statements:
    t_env.execute_sql(statement)

# Recreate the sink table from scratch: drop any earlier definition, then
# declare a table on the 'print' connector with an `id` and a `data_sum` column.
drop_sink_sql = "DROP TABLE IF EXISTS print_sink_pyflink"
create_sink_sql = """
    CREATE TABLE IF NOT EXISTS print_sink_pyflink  (
        id BIGINT, 
        data_sum TINYINT 
    ) WITH (
        'connector' = 'print'
    )
"""
t_env.execute_sql(drop_sink_sql)
t_env.execute_sql(create_sink_sql)

# Pipeline: halve each source `id`, keep rows whose halved id is greater than
# 1, sum `data` per halved id, and write the result into the print sink.
insert_sql = """
    INSERT INTO print_sink_pyflink 
        SELECT id, sum(data) as data_sum FROM 
            (SELECT id / 2 as id, data FROM random_source_pyflink )
        WHERE id > 1
        GROUP BY id
"""
insert_job = t_env.execute_sql(insert_sql)
# Wait on the returned result so the script does not move on (or exit) while
# the INSERT is still running.
insert_job.wait()