From 772298b4e1213c1e4736b877212f595a1cf1ead4 Mon Sep 17 00:00:00 2001 From: zhisheng <1041218129@qq.com> Date: Sat, 20 Jun 2020 16:23:31 +0800 Subject: [PATCH] add StreamWindowSQLExample --- .../example/StreamWindowSQLExample.java | 87 +++++++++++++++++++ 1 file changed, 87 insertions(+) create mode 100644 flink-learning-sql/flink-learning-sql-blink/src/main/java/com/zhisheng/sql/blink/stream/example/StreamWindowSQLExample.java diff --git a/flink-learning-sql/flink-learning-sql-blink/src/main/java/com/zhisheng/sql/blink/stream/example/StreamWindowSQLExample.java b/flink-learning-sql/flink-learning-sql-blink/src/main/java/com/zhisheng/sql/blink/stream/example/StreamWindowSQLExample.java new file mode 100644 index 00000000..3a4d0e3f --- /dev/null +++ b/flink-learning-sql/flink-learning-sql-blink/src/main/java/com/zhisheng/sql/blink/stream/example/StreamWindowSQLExample.java @@ -0,0 +1,87 @@ +package com.zhisheng.sql.blink.stream.example; + +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.java.StreamTableEnvironment; +import org.apache.flink.types.Row; +import org.apache.flink.util.FileUtils; + +import java.io.File; +import java.io.IOException; + +/** + * Simple example for demonstrating the use of SQL in Java. + * + *

Usage: {@code ./bin/flink run ./examples/table/StreamWindowSQLExample.jar} + * + *

This example shows how to: + * - Register a table via DDL + * - Declare an event time attribute in the DDL + * - Run a streaming window aggregate on the registered table + */ +public class StreamWindowSQLExample { + + public static void main(String[] args) throws Exception { + // set up execution environment + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + EnvironmentSettings blinkStreamSettings = EnvironmentSettings.newInstance() + .useBlinkPlanner() + .inStreamingMode() + .build(); + StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, blinkStreamSettings); + + // write source data into temporary file and get the absolute path + String contents = "1,beer,3,2019-12-12 00:00:01\n" + + "1,diaper,4,2019-12-12 00:00:02\n" + + "2,pen,3,2019-12-12 00:00:04\n" + + "2,rubber,3,2019-12-12 00:00:06\n" + + "3,rubber,2,2019-12-12 00:00:05\n" + + "4,beer,1,2019-12-12 00:00:08"; + String path = createTempFile(contents); + + // register table via DDL with watermark, + // the events are out of order, hence, we use 3 seconds to wait the late events + String ddl = "CREATE TABLE orders (\n" + + " user_id INT,\n" + + " product STRING,\n" + + " amount INT,\n" + + " ts TIMESTAMP(3),\n" + + " WATERMARK FOR ts AS ts - INTERVAL '3' SECOND\n" + + ") WITH (\n" + + " 'connector.type' = 'filesystem',\n" + + " 'connector.path' = '" + path + "',\n" + + " 'format.type' = 'csv'\n" + + ")"; + tEnv.sqlUpdate(ddl); + + // run a SQL query on the table and retrieve the result as a new Table + String query = "SELECT\n" + + " CAST(TUMBLE_START(ts, INTERVAL '5' SECOND) AS STRING) window_start,\n" + + " COUNT(*) order_num,\n" + + " SUM(amount) total_amount,\n" + + " COUNT(DISTINCT product) unique_products\n" + + "FROM orders\n" + + "GROUP BY TUMBLE(ts, INTERVAL '5' SECOND)"; + Table result = tEnv.sqlQuery(query); + tEnv.toAppendStream(result, Row.class).print(); + + // after the table program is converted to DataStream program, + // we must use `env.execute()` to submit the job. + env.execute("Streaming Window SQL Job"); + + // should output: + // 2019-12-12 00:00:00.000,3,10,3 + // 2019-12-12 00:00:05.000,3,6,2 + } + + /** + * Creates a temporary file with the contents and returns the absolute path. + */ + private static String createTempFile(String contents) throws IOException { + File tempFile = File.createTempFile("orders", ".csv"); + tempFile.deleteOnExit(); + FileUtils.writeFileUtf8(tempFile, contents); + return tempFile.toURI().toString(); + } +}