In [1]:
import os 
# Every input and output path in the table definitions below is built from this
# directory, so the notebook must be started from the project root.
base_path = os.getcwd()
# Echo the directory so a wrong working directory is visible before any table is declared.
print(base_path)


/home/tim/Documents/work/apache_flink


In [2]:
from pyflink.common import Configuration
import matplotlib.pyplot as plt
from pyflink.table.expressions import lit, col
from pyflink.table.window import Slide
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.table import StreamTableEnvironment, DataTypes, EnvironmentSettings, TableEnvironment, TableDescriptor, \
    FormatDescriptor, Schema
from pyflink.table.udf import udf

# Planner configuration: set the default operator parallelism to 1.
configuration = Configuration()
configuration.set_integer("table.exec.resource.default-parallelism", 1)

# Streaming-mode table environment carrying the configuration above.
t_env = TableEnvironment.create(
    EnvironmentSettings.new_instance()
    .in_streaming_mode()
    .with_configuration(configuration)
    .build()
)
# Handle on the environment's table configuration (not used further in the cells shown here).
table_config = t_env.get_config()

# Source table `bid`, read from a CSV file under `base_path`.
# Record layout noted for the large data set:
#   UINT64 timestamp, INT32 auction, INT32 bidder, INT64 datetime, FLOAT64 price
# NOTE(review): the layout above lists `datetime` as INT64 but the column below is INT.
# With 'csv.ignore-parse-errors' enabled a value that does not fit would not raise an
# error — confirm the intended width before relying on this column.
t_env.create_temporary_table(
    'bid',
    TableDescriptor.for_connector('filesystem')
    .schema(Schema.new_builder()
            .column('ts', DataTypes.BIGINT())
            .column('auctionId', DataTypes.INT())
            .column('bidder', DataTypes.INT())
            .column('datetime', DataTypes.INT())
            .column('price', DataTypes.DOUBLE())
            # Event-time attribute derived from the `ts` column (precision argument 3);
            # note that `dateTime` and the raw `datetime` column differ only by case.
            .column_by_expression('dateTime', "TO_TIMESTAMP_LTZ(ts, 3)")
            # Watermark equals the event time itself (no out-of-orderness allowance).
            .watermark('dateTime', "dateTime")
            .build())
    .option('path', f'{base_path}/data/nexmark/small/bid_small.csv')
    .format('csv')
    .option('csv.ignore-parse-errors', 'true')
    .build())

# create_temporary_table() only registers the table and returns None, so the handle
# is obtained by looking the registered table up again.
bid_table = t_env.from_path('bid')

# Source table `auction`, read from a CSV file under `base_path`.
# Record layout noted for the large data set:
#   UINT64 timestamp, INT32 id, INT32 initialbid, INT32 reserve, INT64 expires,
#   INT32 seller, INT32 category
# NOTE(review): the layout above lists `expires` as INT64 but the column below is INT —
# confirm the intended width.
t_env.create_temporary_table(
    'auction',
    TableDescriptor.for_connector('filesystem')
    .schema(Schema.new_builder()
            .column('ts', DataTypes.BIGINT())
            .column('id', DataTypes.INT())
            .column('initialbid', DataTypes.INT())
            .column('reserve', DataTypes.INT())
            .column('expires', DataTypes.INT())
            .column('seller', DataTypes.INT())
            .column('category', DataTypes.INT())
            # Event-time attribute derived from the `ts` column (precision argument 3).
            .column_by_expression('dateTime', "TO_TIMESTAMP_LTZ(ts, 3)")
            # Watermark equals the event time itself (no out-of-orderness allowance).
            .watermark('dateTime', "dateTime")
            .build())
    .option('path', f'{base_path}/data/nexmark/small/auction_modified_small.csv')
    .format('csv')
    .build())

# create_temporary_table() only registers the table and returns None, so the handle
# is obtained by looking the registered table up again.
auction_table = t_env.from_path('auction')

# Sink for Query 0: same columns as `bid`, but `dateTime` is declared here as a
# physical TIMESTAMP(3) column rather than a computed one.
t_env.create_temporary_table(
    'sink_q0',
    TableDescriptor.for_connector('filesystem')
    .schema(Schema.new_builder()
            .column('ts', DataTypes.BIGINT())
            .column('auctionId', DataTypes.INT())
            .column('bidder', DataTypes.INT())
            .column('datetime', DataTypes.INT())
            .column('price', DataTypes.DOUBLE())
            .column('dateTime', DataTypes.TIMESTAMP(3))
            .watermark('dateTime', "dateTime")
            .build())
    .option('path', f'{base_path}/data/nexmark/small/outputs/sink_q0.csv')
    .format('csv')
    .build()
)

# create_temporary_table() only registers the table and returns None, so the handle
# is obtained by looking the registered table up again.
test_table = t_env.from_path('sink_q0')

# Sink for Query 1: a single DOUBLE `price` column written as CSV.
_sink_q1_schema = (
    Schema.new_builder()
    .column('price', DataTypes.DOUBLE())
    .build()
)
_sink_q1_descriptor = (
    TableDescriptor.for_connector('filesystem')
    .schema(_sink_q1_schema)
    .option('path', f'{base_path}/data/nexmark/small/outputs/sink_q1.csv')
    .format('csv')
    .build()
)
t_env.create_temporary_table('sink_q1', _sink_q1_descriptor)

# Sink for Query 2: the `bid` columns, with `dateTime` stored as a physical
# TIMESTAMP(3) column that also carries the watermark.
_sink_q2_schema = (
    Schema.new_builder()
    .column('ts', DataTypes.BIGINT())
    .column('auctionId', DataTypes.INT())
    .column('bidder', DataTypes.INT())
    .column('datetime', DataTypes.INT())
    .column('price', DataTypes.DOUBLE())
    .column('dateTime', DataTypes.TIMESTAMP(3))
    .watermark('dateTime', "dateTime")
    .build()
)
_sink_q2_descriptor = (
    TableDescriptor.for_connector('filesystem')
    .schema(_sink_q2_schema)
    .option('path', f'{base_path}/data/nexmark/small/outputs/sink_q2.csv')
    .format('csv')
    .build()
)
t_env.create_temporary_table('sink_q2', _sink_q2_descriptor)

# Sink for Query 5: one (auction, num) pair per output row, written as CSV.
_sink_q5_schema = (
    Schema.new_builder()
    .column('auction', DataTypes.INT())
    .column('num', DataTypes.BIGINT())
    .build()
)
_sink_q5_format = FormatDescriptor.for_format('csv').build()
_sink_q5_descriptor = (
    TableDescriptor.for_connector('filesystem')
    .schema(_sink_q5_schema)
    .option('path', f'{base_path}/data/nexmark/small/outputs/sink_q5.csv')
    .format(_sink_q5_format)
    .build()
)
t_env.create_temporary_table('sink_q5', _sink_q5_descriptor)

# Sink for Query 7. Columns are listed in the order the INSERT produces them:
# window bounds, the bid fields, then the joined auction fields.
_sink_q7_columns = [
    ('start', DataTypes.BIGINT()),
    ('end', DataTypes.BIGINT()),
    ('timestamp', DataTypes.BIGINT()),
    ('auctionId', DataTypes.BIGINT()),
    ('bidder', DataTypes.BIGINT()),
    ('dateTime', DataTypes.TIMESTAMP(3)),
    ('price', DataTypes.DECIMAL(10,2)),
    ('timestamp1', DataTypes.BIGINT()),
    ('auction1', DataTypes.BIGINT()),
    ('seller', DataTypes.BIGINT()),
    ('initialbid', DataTypes.DECIMAL(10,2)),
    ('reserve', DataTypes.DECIMAL(10,2)),
    ('expires', DataTypes.BIGINT()),
    ('category', DataTypes.BIGINT()),
]

# The schema builder is mutated in place by column(), so a plain loop is enough.
_sink_q7_builder = Schema.new_builder()
for _q7_name, _q7_type in _sink_q7_columns:
    _sink_q7_builder.column(_q7_name, _q7_type)

_sink_q7_descriptor = (
    TableDescriptor.for_connector('filesystem')
    .schema(_sink_q7_builder.build())
    .option('path', f'{base_path}/data/nexmark/small/outputs/sink_q7.csv')
    .format(FormatDescriptor.for_format('csv').build())
    .build()
)
t_env.create_temporary_table('sink_q7', _sink_q7_descriptor)

# Sink for Query 8. Columns are listed in the order the INSERT produces them:
# window bounds, the bid fields, then the joined auction fields.
_sink_q8_columns = [
    ('start', DataTypes.BIGINT()),
    ('end', DataTypes.BIGINT()),
    ('timestamp', DataTypes.BIGINT()),
    ('auction', DataTypes.INT()),
    ('bidder', DataTypes.INT()),
    ('dateTime', DataTypes.INT()),
    ('price', DataTypes.DOUBLE()),
    ('timestamp1', DataTypes.BIGINT()),
    ('auction1', DataTypes.INT()),
    ('initialbid', DataTypes.INT()),
    ('reserve', DataTypes.INT()),
    ('expires', DataTypes.INT()),
    ('seller', DataTypes.INT()),
    ('category', DataTypes.INT()),
]

# The schema builder is mutated in place by column(), so a plain loop is enough.
_sink_q8_builder = Schema.new_builder()
for _q8_name, _q8_type in _sink_q8_columns:
    _sink_q8_builder.column(_q8_name, _q8_type)

_sink_q8_descriptor = (
    TableDescriptor.for_connector('filesystem')
    .schema(_sink_q8_builder.build())
    .option('path', f'{base_path}/data/nexmark/small/outputs/sink_q8.csv')
    .format(FormatDescriptor.for_format('csv').build())
    .build()
)
t_env.create_temporary_table('sink_q8', _sink_q8_descriptor)


## Query 0

In [3]:
# Query 0: pass-through — copy every row of `bid` into `sink_q0`.
q0_sql = """
INSERT INTO sink_q0
SELECT * FROM bid;
"""
q0_job = t_env.execute_sql(q0_sql)
# Block until the insert job has finished so later cells see complete output.
q0_job.wait()

## Query 1

In [4]:
# Query 1: write each bid's price scaled by 89/100 into `sink_q1`.
q1_sql = """
INSERT INTO sink_q1
SELECT price * 89 / 100 AS price FROM bid;
"""
q1_job = t_env.execute_sql(q1_sql)
# Block until the insert job has finished.
q1_job.wait()

## Query 2

In [5]:
# Query 2: keep only the bids whose auctionId is a multiple of 123.
q2_sql = """
INSERT INTO sink_q2
SELECT * FROM bid WHERE auctionId % 123 = 0;
"""
q2_job = t_env.execute_sql(q2_sql)
# Block until the insert job has finished.
q2_job.wait()

## Query 5

In [6]:
# Query 5: for each hopping window over `bid` (HOP arguments '2' SECOND and
# '10' SECOND), count the bids per auction, compute the largest such count in
# the window, and emit the auctions whose count reaches that maximum.
q5_sql = """
    INSERT INTO sink_q5
    SELECT AuctionBids.auctionId, AuctionBids.num
     FROM (
       SELECT
         auctionId,
         count(*) AS num,
         window_start AS starttime,
         window_end AS endtime
         FROM TABLE(
                 HOP(TABLE bid, DESCRIPTOR(dateTime), INTERVAL '2' SECOND, INTERVAL '10' SECOND))
         GROUP BY auctionId, window_start, window_end
     ) AS AuctionBids
     JOIN (
       SELECT
         max(CountBids.num) AS maxn,
         CountBids.starttime,
         CountBids.endtime
       FROM (
         SELECT
           count(*) AS num,
           window_start AS starttime,
           window_end AS endtime
         FROM TABLE(
                    HOP(TABLE bid, DESCRIPTOR(dateTime), INTERVAL '2' SECOND, INTERVAL '10' SECOND))
         GROUP BY auctionId, window_start, window_end
         ) AS CountBids
       GROUP BY CountBids.starttime, CountBids.endtime
     ) AS MaxBids
     ON AuctionBids.starttime = MaxBids.starttime AND
        AuctionBids.endtime = MaxBids.endtime AND
        AuctionBids.num >= MaxBids.maxn;
"""
q5_job = t_env.execute_sql(q5_sql)
# Block until the insert job has finished.
q5_job.wait()

## Query 7

In [7]:
# Query 7: bucket `bid` and `auction` into 12-hour tumbling windows and join a
# bid to an auction when the ids match and both fall into the same window.
# Window bounds and event times are written as
# 1000 * UNIX_TIMESTAMP(...) + EXTRACT(MILLISECOND ...).
q7_sql = """
INSERT INTO sink_q7                  
SELECT 
    1000 * UNIX_TIMESTAMP(CAST(B.starttime AS STRING)) + EXTRACT(MILLISECOND FROM B.starttime) AS `start`,
    1000 * UNIX_TIMESTAMP(CAST(B.endtime AS STRING)) + EXTRACT(MILLISECOND FROM B.endtime) AS `end`,
    1000 * UNIX_TIMESTAMP(CAST(B.dateTime AS STRING)) + EXTRACT(MILLISECOND FROM B.dateTime) AS `timestamp`,
    B.auctionId, B.bidder, B.dateTime, B.price, 
    1000 * UNIX_TIMESTAMP(CAST(A.dateTime AS STRING)) + EXTRACT(MILLISECOND FROM A.dateTime) AS `timestamp1`,
    A.id AS auction1, A.seller, A.initialbid, A.reserve, A.expires, A.category
FROM (
    SELECT auctionId, bidder, dateTime, price, 
           window_start AS starttime,
           window_end AS endtime
    FROM TABLE(
        TUMBLE(TABLE bid, DESCRIPTOR(dateTime), INTERVAL '12' HOUR)
    )
    GROUP BY auctionId, bidder, dateTime, price, window_start, window_end
) B
JOIN (
    SELECT id, seller, initialbid, reserve, expires, category, dateTime,
           window_start AS starttime,
           window_end AS endtime
    FROM TABLE(
        TUMBLE(TABLE auction, DESCRIPTOR(dateTime), INTERVAL '12' HOUR)
    )
    GROUP BY id, initialbid, reserve, expires, dateTime, seller, category, window_start, window_end
) A
ON B.auctionId = A.id AND B.starttime = A.starttime AND B.endtime = A.endtime;
"""
q7_job = t_env.execute_sql(q7_sql)
# Block until the insert job has finished.
q7_job.wait()

## Query 8

In [8]:
# Query 8: bucket `bid` and `auction` into 100-second tumbling windows and join a
# bid to an auction when the ids match and both fall into the same window.
# Window bounds and event times are written as
# 1000 * UNIX_TIMESTAMP(...) + EXTRACT(MILLISECOND ...).
#
# The price and initialbid columns are passed through unchanged, as in Query 7.
# The previous text selected `B.price / 0` (division by a literal zero) and
# `A.initialbid / 1 - B.bidder` (which stored initialbid minus the bidder id in
# the `initialbid` sink column).
t_env.execute_sql("""
INSERT INTO sink_q8
SELECT
         1000 * UNIX_TIMESTAMP(CAST(B.starttime AS STRING)) + EXTRACT(MILLISECOND FROM B.starttime) as `start`,
         1000 * UNIX_TIMESTAMP(CAST(B.endtime AS STRING)) + EXTRACT(MILLISECOND FROM B.endtime) as `end`,
         1000 * UNIX_TIMESTAMP(CAST(B.dateTime AS STRING)) + EXTRACT(MILLISECOND FROM B.dateTime) as `timestamp`,
         B.auctionId, B.bidder, B.`datetime` as dateTime, B.price,
         1000 * UNIX_TIMESTAMP(CAST(A.dateTime AS STRING)) + EXTRACT(MILLISECOND FROM A.dateTime) as `timestamp1`,
         A.id as auction1, A.initialbid, A.reserve, A.expires, A.seller, A.category
FROM (
  SELECT auctionId, bidder, dateTime, price, `datetime`,
        window_start AS starttime,
        window_end AS endtime
  FROM TABLE(
            TUMBLE(TABLE bid, DESCRIPTOR(dateTime), INTERVAL '100' SECOND(3)))
  GROUP BY auctionId, bidder, dateTime, price, window_start, window_end, datetime
) B
JOIN (
  SELECT id, seller, initialbid, reserve, expires, category, dateTime,
        window_start AS starttime,
        window_end AS endtime
  FROM TABLE(
        TUMBLE(TABLE auction, DESCRIPTOR(dateTime), INTERVAL '100' SECOND(3)))
  GROUP BY id, initialbid, reserve, expires, dateTime, seller, category, window_start, window_end
) A
ON B.auctionId = A.id AND B.starttime = A.starttime AND B.endtime = A.endtime;
""").wait()