In [None]:
from pyflink.table import EnvironmentSettings, TableEnvironment

In [13]:
# Batch-mode settings, assembled through the settings builder, drive the
# TableEnvironment used by the batch examples in this notebook.
env_settings = EnvironmentSettings.new_instance().in_batch_mode().build()
table_env = TableEnvironment.create(env_settings)

In [None]:

# Build a table from in-memory tuples. No column names are passed, and the
# recorded output shows the columns as `_1` and `_2`.
sample_rows = [(1, 'Hi'), (2, 'Hello')]
table1 = table_env.from_elements(sample_rows)

# Execute the table and print the collected rows.
table1_result = table1.execute()
table1_result.print()


+----------------------+--------------------------------+
|                   _1 |                             _2 |
+----------------------+--------------------------------+
|                    1 |                             Hi |
|                    2 |                          Hello |
+----------------------+--------------------------------+
2 rows in set


In [None]:
# Same rows as before, this time with explicit column names.
sample_rows = [(1, 'Hi'), (2, 'Hello')]
column_names = ['id', 'data']
table2 = table_env.from_elements(sample_rows, column_names)

# Execute the table and print the collected rows.
table2_result = table2.execute()
table2_result.print()

+----------------------+--------------------------------+
|                   id |                           data |
+----------------------+--------------------------------+
|                    1 |                             Hi |
|                    2 |                          Hello |
+----------------------+--------------------------------+
2 rows in set


In [None]:
from pyflink.table import DataTypes

sample_rows = [(1, 'Hi'), (2, 'Hello')]

# Only column names are given here, so the column types are left to inference.
# by default, the type of the "id" column is BIGINT
table3 = table_env.from_elements(sample_rows, ['id', 'data'])
inferred_id_type = table3.get_schema().get_field_data_type("id")
print('By default the type of the "id" column is %s.' % inferred_id_type)

# Passing an explicit ROW type instead of column names pins each column's type.
# now the type of the "id" column is set as TINYINT
explicit_row_type = DataTypes.ROW([
    DataTypes.FIELD("id", DataTypes.TINYINT()),
    DataTypes.FIELD("data", DataTypes.STRING()),
])
table4 = table_env.from_elements(sample_rows, explicit_row_type)
explicit_id_type = table4.get_schema().get_field_data_type("id")
print('Now the type of the "id" column is %s.' % explicit_id_type)


By default the type of the "id" column is BIGINT.
Now the type of the "id" column is TINYINT.


In [None]:
# Inspect the full schema of table4. As the last expression of the cell, the
# returned schema object is rendered as the cell output.
table4.get_schema()

root
 |-- id: TINYINT
 |-- data: STRING

# Create tables using DDL statements

In [None]:
# Streaming-mode settings, assembled through the settings builder, drive the
# TableEnvironment used by the streaming examples in this notebook.
stream_env_settings = EnvironmentSettings.new_instance().in_streaming_mode().build()
stream_table_env = TableEnvironment.create(stream_env_settings)

In [None]:
# DDL for a datagen-backed source: `id` is generated as the sequence 1..3 and
# `data` as the sequence 4..6.
random_source_ddl = """
    CREATE TABLE random_source (
        id BIGINT, 
        data TINYINT 
    ) WITH (
        'connector' = 'datagen',
        'fields.id.kind'='sequence',
        'fields.id.start'='1',
        'fields.id.end'='3',
        'fields.data.kind'='sequence',
        'fields.data.start'='4',
        'fields.data.end'='6'
    )
"""
stream_table_env.execute_sql(random_source_ddl)

# Look the registered table up by name, then execute it and print the rows.
stream_table = stream_table_env.from_path("random_source")
stream_table_result = stream_table.execute()
stream_table_result.print()


+----+----------------------+--------+
| op |                   id |   data |
+----+----------------------+--------+
| +I |                    1 |      4 |
| +I |                    2 |      5 |
| +I |                    3 |      6 |
+----+----------------------+--------+
3 rows in set


In [9]:
from pyflink.table import TableDescriptor, Schema, DataTypes

In [10]:
# Column layout of the generated rows.
random_source_schema = (
    Schema.new_builder()
    .column('id', DataTypes.BIGINT())
    .column('data', DataTypes.TINYINT())
    .build()
)

# datagen options, applied in this order: `id` is the sequence 1..3 and
# `data` is the sequence 4..6.
random_source_options = (
    ('fields.id.kind', 'sequence'),
    ('fields.id.start', '1'),
    ('fields.id.end', '3'),
    ('fields.data.kind', 'sequence'),
    ('fields.data.start', '4'),
    ('fields.data.end', '6'),
)

descriptor_builder = TableDescriptor.for_connector('datagen').schema(random_source_schema)
for option_key, option_value in random_source_options:
    descriptor_builder = descriptor_builder.option(option_key, option_value)

# Register the descriptor as a temporary table under the name `random_source`.
stream_table_env.create_temporary_table('random_source', descriptor_builder.build())

# Look the table up by name, then execute it and print the rows.
stream_table2 = stream_table_env.from_path("random_source")
stream_table2_result = stream_table2.execute()
stream_table2_result.print()


+----+----------------------+--------+
| op |                   id |   data |
+----+----------------------+--------+
| +I |                    1 |      4 |
| +I |                    2 |      5 |
| +I |                    3 |      6 |
+----+----------------------+--------+
3 rows in set


In [14]:
# Register an in-memory table as the temporary view `source_table`.
sample_rows = [(1, 'Hi'), (2, 'Hello')]
table = table_env.from_elements(sample_rows, ['id', 'data'])
table_env.create_temporary_view('source_table', table)

# create Table API table from catalog
new_table = table_env.from_path('source_table')
new_table_result = new_table.execute()
new_table_result.print()


+----------------------+--------------------------------+
|                   id |                           data |
+----------------------+--------------------------------+
|                    1 |                             Hi |
|                    2 |                          Hello |
+----------------------+--------------------------------+
2 rows in set


In [18]:
# Register the view this cell reads from. The registration used to be commented
# out, so the lookup below only worked when the view was left over from an
# earlier kernel session and failed after Restart & Run All.
# A dedicated variable name is used so `stream_table` from the DDL example is
# not overwritten.
stream_elements_table = stream_table_env.from_elements([(2, 'Hi'), (3, 'Hello')], ['id', 'data'])

# Drop any view left from a previous run of this cell so that re-running it
# does not fail on a duplicate view; the call returns a bool instead of raising
# when the view does not exist.
stream_table_env.drop_temporary_view('source_table_stream')
stream_table_env.create_temporary_view('source_table_stream', stream_elements_table)

# create Table API table from catalog
new_stream_table = stream_table_env.from_path('source_table_stream')
new_stream_table.execute().print()


+----+----------------------+--------------------------------+
| op |                   id |                           data |
+----+----------------------+--------------------------------+
| +I |                    2 |                             Hi |
| +I |                    3 |                          Hello |
+----+----------------------+--------------------------------+
2 rows in set


In [None]:
from pyflink.table.expressions import call, col

# using batch table environment to execute the queries
order_rows = [('Jack', 'FRANCE', 10), ('Rose', 'ENGLAND', 30), ('Jack', 'FRANCE', 20)]
orders = table_env.from_elements(order_rows, ['name', 'country', 'revenue'])

# compute revenue for all customers from France:
# first keep only the French orders ...
french_orders = (
    orders
    .select(col("name"), col("country"), col("revenue"))
    .where(col("country") == 'FRANCE')
)
# ... then sum the revenue per customer name.
revenue = (
    french_orders
    .group_by(col("name"))
    .select(col("name"), call("sum", col("revenue")).alias('rev_sum'))
)

revenue_result = revenue.execute()
revenue_result.print()


+--------------------------------+----------------------+
|                           name |              rev_sum |
+--------------------------------+----------------------+
|                           Jack |                   30 |
+--------------------------------+----------------------+
1 row in set


In [28]:
# Count the orders per customer name across all countries.
orders_by_name = orders.group_by(col("name"))
order_count = call("count", col("revenue")).alias('count')
revenue2 = orders_by_name.select(col("name"), order_count)

revenue2_result = revenue2.execute()
revenue2_result.print()

+--------------------------------+----------------------+
|                           name |                count |
+--------------------------------+----------------------+
|                           Jack |                    2 |
|                           Rose |                    1 |
+--------------------------------+----------------------+
2 rows in set


In [31]:
# Make this cell safe under Restart & Run All and on re-runs.
# Earlier cells register `random_source` in this same environment, once as a
# catalog table via DDL and once as a temporary table via TableDescriptor.
# Without cleanup the CREATE TABLE below fails because the name already exists,
# and a leftover temporary table of the same name would shadow the new
# definition in the query.
# The temporary table is dropped first: Flink refuses to drop a catalog table
# while a temporary table with the same identifier exists.
stream_table_env.drop_temporary_table('random_source')
stream_table_env.execute_sql("DROP TABLE IF EXISTS random_source")
stream_table_env.execute_sql("DROP TABLE IF EXISTS print_sink")

# Source: `id` is generated as the sequence 1..8 and `data` as the sequence 4..11.
stream_table_env.execute_sql("""
    CREATE TABLE random_source (
        id BIGINT, 
        data TINYINT
    ) WITH (
        'connector' = 'datagen',
        'fields.id.kind'='sequence',
        'fields.id.start'='1',
        'fields.id.end'='8',
        'fields.data.kind'='sequence',
        'fields.data.start'='4',
        'fields.data.end'='11'
    )
""")

# Sink: the 'print' connector writes every received row to standard output.
stream_table_env.execute_sql("""
    CREATE TABLE print_sink (
        id BIGINT, 
        data_sum TINYINT 
    ) WITH (
        'connector' = 'print'
    )
""")

# Bucket rows by `id / 2`, keep buckets greater than 1, and sum `data` per
# bucket. `.wait()` blocks until the submitted job has finished, so the printed
# rows appear under this cell.
stream_table_env.execute_sql("""
    INSERT INTO print_sink
        SELECT id, sum(data) as data_sum FROM 
            (SELECT id / 2 as id, data FROM random_source)
        WHERE id > 1
        GROUP BY id
""").wait()


18> +I[3, 9]
15> +I[2, 7]
15> -U[2, 7]
18> -U[3, 9]
15> +U[2, 15]
18> +U[3, 19]
4> +I[4, 11]


In [64]:
# Try to drop a temporary table named 'orders2'. The call returns a bool, which
# is shown as the cell output.
# NOTE(review): the recorded result here differs from the drop_temporary_view
# call on the same name — presumably 'orders2' was registered as a view, not a
# table; confirm.
table_env.drop_temporary_table('orders2')

False

In [72]:
# Try to drop a temporary view named 'orders2'. The call returns a bool, which
# is shown as the cell output.
# NOTE(review): no visible cell registers 'orders2'; it was presumably created
# in a cell that has since been removed — confirm.
table_env.drop_temporary_view('orders2') 

True

In [71]:
# Liệt kê tất cả table/view khả dụng
print(table_env.list_tables())

# Liệt kê tất cả temporary table/view
print(table_env.list_temporary_tables())

['UnnamedTable$0', 'orders', 'orders2', 'print_sink', 'source_table', 'table_api_table', 'table_sink']
['UnnamedTable$0', 'orders', 'orders2', 'source_table', 'table_api_table']


In [43]:
# Remove any existing `print_sink` so the CREATE below cannot collide with it.
table_env.execute_sql("DROP TABLE IF EXISTS print_sink")

# Sink definition: the 'print' connector writes received rows to standard output.
print_sink_ddl = """
    CREATE TABLE print_sink (
        name STRING, 
        count_abc BIGINT 
    ) WITH (
        'connector' = 'print'
    )
"""
# Kept as the final expression so the returned TableResult is shown as the cell output.
table_env.execute_sql(print_sink_ddl)


<pyflink.table.table_result.TableResult at 0x736820167550>

In [49]:
# SELECT query, không INSERT
table = table_env.sql_query("""
    SELECT name, count(1) as count_abc 
    FROM orders2
    GROUP BY name
""")

# Chuyển kết quả sang Pandas
df = table.to_pandas()
print(df)


   name  count_abc
0  Jack          2
1  Rose          1


In [52]:
# Sink definition: the 'print' connector writes received rows to standard output.
table_sink_ddl = """
    CREATE TABLE table_sink (
        id BIGINT, 
        data VARCHAR 
    ) WITH (
        'connector' = 'print'
    )
"""
table_env.execute_sql(table_sink_ddl)

# convert the Table API table to a SQL view
sample_rows = [(1, 'Hi'), (2, 'Hello')]
table = table_env.from_elements(sample_rows, ['id', 'data'])
table_env.create_temporary_view('table_api_table', table)

# emit the Table API table; wait() blocks until the insert job has finished
insert_job = table_env.execute_sql("INSERT INTO table_sink SELECT * FROM table_api_table")
insert_job.wait()


1> +I[1, Hi]
1> +I[2, Hello]
