In [5]:
import time
from typing import TYPE_CHECKING
from resinkit.flink_operation import ResultsFetchOpts
from flink_gateway_api import Client
import logging

from resinkit.flink_session import FlinkSession

# Configure the root logger
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[logging.StreamHandler()]
)
logging.getLogger("httpcore.http11").setLevel(logging.INFO)
logging.getLogger('httpcore.connection').setLevel(logging.INFO)

import nest_asyncio
nest_asyncio.apply()  # Allow nested event loops
fg_client = Client(base_url="http://localhost:8083", raise_on_unexpected_status=True)

if TYPE_CHECKING:
    from resinkit.flink_operation import FlinkOperation

_ts = int(time.time() * 1000)

sql_text = f'''
-- Create Paimon catalog

DROP CATALOG IF EXISTS my_catalog;

CREATE CATALOG my_catalog WITH (
    'type'='paimon',
    'warehouse'='file:/tmp/paimon_{_ts}'
);

USE CATALOG my_catalog;

DROP TABLE IF EXISTS MyTable;

-- Create table with Paimon-specific options
CREATE TABLE MyTable (
    user_id BIGINT,
    item_id BIGINT,
    behavior STRING,
    dt STRING,
    PRIMARY KEY (dt, user_id) NOT ENFORCED
) PARTITIONED BY (dt) WITH (
    'bucket' = '4',
    'changelog-producer' = 'input'
);

INSERT INTO MyTable (user_id, item_id, behavior, dt) VALUES
    (1001, 5001, 'view', '2024-01-01'),
    (1001, 5002, 'like', '2024-01-01'),
    (1002, 5001, 'cart', '2024-01-01'),
    (1003, 5003, 'purchase', '2024-01-01'),
    (1001, 5004, 'view', '2024-01-02'),
    (1002, 5002, 'view', '2024-01-02'),
    (1002, 5002, 'like', '2024-01-02'),
    (1003, 5001, 'cart', '2024-01-02'),
    (1004, 5005, 'view', '2024-01-02'),
    (1001, 5003, 'purchase', '2024-01-03'),
    (1002, 5004, 'view', '2024-01-03'),
    (1003, 5002, 'like', '2024-01-03'),
    (1004, 5001, 'cart', '2024-01-03'),
    (1005, 5005, 'purchase', '2024-01-03'),
    (1001, 5001, 'view', '2024-01-04');

-- For batch query
SET 'execution.runtime-mode' = 'batch';
-- Remove the invalid orders table query
-- Instead query the created table
SELECT * FROM MyTable;
'''
dfs = []
with FlinkSession(fg_client) as session:
    for s in sql_text.split(';'):
        s = s.strip()
        if not s:
            continue
        print(f"Executing: {s}")
        with session.execute(s).sync() as operation:  # type: FlinkOperation
            dfs.append(operation.fetch(ResultsFetchOpts()).sync())
dfs

2025-04-20 08:23:39,123 - httpx - INFO - HTTP Request: POST http://localhost:8083/sessions "HTTP/1.1 200 OK"
2025-04-20 08:23:39,127 - httpx - INFO - HTTP Request: POST http://localhost:8083/sessions/7a242b06-276f-4044-8595-d9470ecbca2f/statements "HTTP/1.1 200 OK"
2025-04-20 08:23:39,132 - httpx - INFO - HTTP Request: GET http://localhost:8083/sessions/7a242b06-276f-4044-8595-d9470ecbca2f/operations/8396b2e5-5389-47a1-98aa-2db6aa798454/result/0?rowFormat=JSON "HTTP/1.1 200 OK"
2025-04-20 08:23:39,133 - resinkit.session_utils - DEBUG - Fetch result: {'nextResultUri': '/v2/sessions/7a242b06-276f-4044-8595-d9470ecbca2f/operations/8396b2e5-5389-47a1-98aa-2db6aa798454/result/0?rowFormat=JSON', 'resultType': 'NOT_READY'}
2025-04-20 08:23:39,242 - httpx - INFO - HTTP Request: GET http://localhost:8083/v2/sessions/7a242b06-276f-4044-8595-d9470ecbca2f/operations/8396b2e5-5389-47a1-98aa-2db6aa798454/result/0?rowFormat=JSON "HTTP/1.1 200 OK"
2025-04-20 08:23:39,243 - resinkit.session_utils - DEB

Executing: -- Create Paimon catalog

DROP CATALOG IF EXISTS my_catalog


2025-04-20 08:23:39,348 - httpx - INFO - HTTP Request: GET http://localhost:8083/v2/sessions/7a242b06-276f-4044-8595-d9470ecbca2f/operations/8396b2e5-5389-47a1-98aa-2db6aa798454/result/1?rowFormat=JSON "HTTP/1.1 200 OK"
2025-04-20 08:23:39,349 - resinkit.session_utils - DEBUG - Fetch result: {'isQueryResult': False, 'resultKind': 'SUCCESS', 'resultType': 'EOS', 'results': {'columns': [{'name': 'result', 'logicalType': {'type': 'VARCHAR', 'nullable': True, 'length': 2147483647}, 'comment': None}], 'columnInfos': [], 'data': [], 'fieldGetters': [], 'rowFormat': 'JSON'}}
2025-04-20 08:23:39,356 - httpx - INFO - HTTP Request: DELETE http://localhost:8083/sessions/7a242b06-276f-4044-8595-d9470ecbca2f/operations/8396b2e5-5389-47a1-98aa-2db6aa798454/close "HTTP/1.1 200 OK"
2025-04-20 08:23:39,358 - httpx - INFO - HTTP Request: POST http://localhost:8083/sessions/7a242b06-276f-4044-8595-d9470ecbca2f/statements "HTTP/1.1 200 OK"
2025-04-20 08:23:39,362 - httpx - INFO - HTTP Request: GET http://

Executing: CREATE CATALOG my_catalog WITH (
    'type'='paimon',
    'warehouse'='file:/tmp/paimon_1745162619014'
)


2025-04-20 08:23:39,575 - httpx - INFO - HTTP Request: GET http://localhost:8083/v2/sessions/7a242b06-276f-4044-8595-d9470ecbca2f/operations/23074588-17a2-411d-870b-965e3174aa62/result/1?rowFormat=JSON "HTTP/1.1 200 OK"
2025-04-20 08:23:39,576 - resinkit.session_utils - DEBUG - Fetch result: {'isQueryResult': False, 'resultKind': 'SUCCESS', 'resultType': 'EOS', 'results': {'columns': [{'name': 'result', 'logicalType': {'type': 'VARCHAR', 'nullable': True, 'length': 2147483647}, 'comment': None}], 'columnInfos': [], 'data': [], 'fieldGetters': [], 'rowFormat': 'JSON'}}
2025-04-20 08:23:39,580 - httpx - INFO - HTTP Request: DELETE http://localhost:8083/sessions/7a242b06-276f-4044-8595-d9470ecbca2f/operations/23074588-17a2-411d-870b-965e3174aa62/close "HTTP/1.1 200 OK"
2025-04-20 08:23:39,583 - httpx - INFO - HTTP Request: POST http://localhost:8083/sessions/7a242b06-276f-4044-8595-d9470ecbca2f/statements "HTTP/1.1 200 OK"
2025-04-20 08:23:39,585 - httpx - INFO - HTTP Request: GET http://

Executing: USE CATALOG my_catalog


2025-04-20 08:23:39,809 - httpx - INFO - HTTP Request: GET http://localhost:8083/v2/sessions/7a242b06-276f-4044-8595-d9470ecbca2f/operations/e8cd9315-b1ba-4e04-8238-3f5123424b88/result/1?rowFormat=JSON "HTTP/1.1 200 OK"
2025-04-20 08:23:39,810 - resinkit.session_utils - DEBUG - Fetch result: {'isQueryResult': False, 'resultKind': 'SUCCESS', 'resultType': 'EOS', 'results': {'columns': [{'name': 'result', 'logicalType': {'type': 'VARCHAR', 'nullable': True, 'length': 2147483647}, 'comment': None}], 'columnInfos': [], 'data': [], 'fieldGetters': [], 'rowFormat': 'JSON'}}
2025-04-20 08:23:39,813 - httpx - INFO - HTTP Request: DELETE http://localhost:8083/sessions/7a242b06-276f-4044-8595-d9470ecbca2f/operations/e8cd9315-b1ba-4e04-8238-3f5123424b88/close "HTTP/1.1 200 OK"
2025-04-20 08:23:39,848 - httpx - INFO - HTTP Request: POST http://localhost:8083/sessions/7a242b06-276f-4044-8595-d9470ecbca2f/statements "HTTP/1.1 200 OK"
2025-04-20 08:23:39,853 - httpx - INFO - HTTP Request: GET http://

Executing: DROP TABLE IF EXISTS MyTable


2025-04-20 08:23:40,068 - httpx - INFO - HTTP Request: GET http://localhost:8083/v2/sessions/7a242b06-276f-4044-8595-d9470ecbca2f/operations/4fa7cd76-e2a9-4a75-ae2b-be5640cd0cb2/result/1?rowFormat=JSON "HTTP/1.1 200 OK"
2025-04-20 08:23:40,068 - resinkit.session_utils - DEBUG - Fetch result: {'isQueryResult': False, 'resultKind': 'SUCCESS', 'resultType': 'EOS', 'results': {'columns': [{'name': 'result', 'logicalType': {'type': 'VARCHAR', 'nullable': True, 'length': 2147483647}, 'comment': None}], 'columnInfos': [], 'data': [], 'fieldGetters': [], 'rowFormat': 'JSON'}}
2025-04-20 08:23:40,071 - httpx - INFO - HTTP Request: DELETE http://localhost:8083/sessions/7a242b06-276f-4044-8595-d9470ecbca2f/operations/4fa7cd76-e2a9-4a75-ae2b-be5640cd0cb2/close "HTTP/1.1 200 OK"
2025-04-20 08:23:40,074 - httpx - INFO - HTTP Request: POST http://localhost:8083/sessions/7a242b06-276f-4044-8595-d9470ecbca2f/statements "HTTP/1.1 200 OK"
2025-04-20 08:23:40,077 - httpx - INFO - HTTP Request: GET http://

Executing: -- Create table with Paimon-specific options
CREATE TABLE MyTable (
    user_id BIGINT,
    item_id BIGINT,
    behavior STRING,
    dt STRING,
    PRIMARY KEY (dt, user_id) NOT ENFORCED
) PARTITIONED BY (dt) WITH (
    'bucket' = '4',
    'changelog-producer' = 'input'
)


2025-04-20 08:23:40,294 - httpx - INFO - HTTP Request: GET http://localhost:8083/v2/sessions/7a242b06-276f-4044-8595-d9470ecbca2f/operations/1e52ac38-600e-4128-b20c-dca1898b87bc/result/1?rowFormat=JSON "HTTP/1.1 200 OK"
2025-04-20 08:23:40,295 - resinkit.session_utils - DEBUG - Fetch result: {'isQueryResult': False, 'resultKind': 'SUCCESS', 'resultType': 'EOS', 'results': {'columns': [{'name': 'result', 'logicalType': {'type': 'VARCHAR', 'nullable': True, 'length': 2147483647}, 'comment': None}], 'columnInfos': [], 'data': [], 'fieldGetters': [], 'rowFormat': 'JSON'}}
2025-04-20 08:23:40,298 - httpx - INFO - HTTP Request: DELETE http://localhost:8083/sessions/7a242b06-276f-4044-8595-d9470ecbca2f/operations/1e52ac38-600e-4128-b20c-dca1898b87bc/close "HTTP/1.1 200 OK"
2025-04-20 08:23:40,302 - httpx - INFO - HTTP Request: POST http://localhost:8083/sessions/7a242b06-276f-4044-8595-d9470ecbca2f/statements "HTTP/1.1 200 OK"
2025-04-20 08:23:40,304 - httpx - INFO - HTTP Request: GET http://

Executing: INSERT INTO MyTable (user_id, item_id, behavior, dt) VALUES
    (1001, 5001, 'view', '2024-01-01'),
    (1001, 5002, 'like', '2024-01-01'),
    (1002, 5001, 'cart', '2024-01-01'),
    (1003, 5003, 'purchase', '2024-01-01'),
    (1001, 5004, 'view', '2024-01-02'),
    (1002, 5002, 'view', '2024-01-02'),
    (1002, 5002, 'like', '2024-01-02'),
    (1003, 5001, 'cart', '2024-01-02'),
    (1004, 5005, 'view', '2024-01-02'),
    (1001, 5003, 'purchase', '2024-01-03'),
    (1002, 5004, 'view', '2024-01-03'),
    (1003, 5002, 'like', '2024-01-03'),
    (1004, 5001, 'cart', '2024-01-03'),
    (1005, 5005, 'purchase', '2024-01-03'),
    (1001, 5001, 'view', '2024-01-04')


2025-04-20 08:23:40,522 - httpx - INFO - HTTP Request: GET http://localhost:8083/v2/sessions/7a242b06-276f-4044-8595-d9470ecbca2f/operations/2771dc5e-bdac-4cf3-8e42-a1e702720580/result/0?rowFormat=JSON "HTTP/1.1 200 OK"
2025-04-20 08:23:40,522 - resinkit.session_utils - DEBUG - Fetch result: {'nextResultUri': '/v2/sessions/7a242b06-276f-4044-8595-d9470ecbca2f/operations/2771dc5e-bdac-4cf3-8e42-a1e702720580/result/0?rowFormat=JSON', 'resultType': 'NOT_READY'}
2025-04-20 08:23:40,629 - httpx - INFO - HTTP Request: GET http://localhost:8083/v2/sessions/7a242b06-276f-4044-8595-d9470ecbca2f/operations/2771dc5e-bdac-4cf3-8e42-a1e702720580/result/0?rowFormat=JSON "HTTP/1.1 200 OK"
2025-04-20 08:23:40,630 - resinkit.session_utils - DEBUG - Fetch result: {'isQueryResult': False, 'jobID': '2763f11f45f0f2ad1bd088868093b4a1', 'nextResultUri': '/v2/sessions/7a242b06-276f-4044-8595-d9470ecbca2f/operations/2771dc5e-bdac-4cf3-8e42-a1e702720580/result/1?rowFormat=JSON', 'resultKind': 'SUCCESS_WITH_CONT

Executing: -- For batch query
SET 'execution.runtime-mode' = 'batch'


2025-04-20 08:23:41,002 - httpx - INFO - HTTP Request: GET http://localhost:8083/v2/sessions/7a242b06-276f-4044-8595-d9470ecbca2f/operations/fc0f4e9b-4486-4901-8b5d-252f2e372dfe/result/1?rowFormat=JSON "HTTP/1.1 200 OK"
2025-04-20 08:23:41,003 - resinkit.session_utils - DEBUG - Fetch result: {'isQueryResult': False, 'resultKind': 'SUCCESS', 'resultType': 'EOS', 'results': {'columns': [{'name': 'result', 'logicalType': {'type': 'VARCHAR', 'nullable': True, 'length': 2147483647}, 'comment': None}], 'columnInfos': [], 'data': [], 'fieldGetters': [], 'rowFormat': 'JSON'}}
2025-04-20 08:23:41,006 - httpx - INFO - HTTP Request: DELETE http://localhost:8083/sessions/7a242b06-276f-4044-8595-d9470ecbca2f/operations/fc0f4e9b-4486-4901-8b5d-252f2e372dfe/close "HTTP/1.1 200 OK"
2025-04-20 08:23:41,013 - httpx - INFO - HTTP Request: POST http://localhost:8083/sessions/7a242b06-276f-4044-8595-d9470ecbca2f/statements "HTTP/1.1 200 OK"
2025-04-20 08:23:41,016 - httpx - INFO - HTTP Request: GET http://

Executing: -- Remove the invalid orders table query
-- Instead query the created table
SELECT * FROM MyTable


2025-04-20 08:23:41,230 - httpx - INFO - HTTP Request: GET http://localhost:8083/v2/sessions/7a242b06-276f-4044-8595-d9470ecbca2f/operations/d90eafe2-fe7d-4799-baf1-236ba1532cac/result/0?rowFormat=JSON "HTTP/1.1 200 OK"
2025-04-20 08:23:41,230 - resinkit.session_utils - DEBUG - Fetch result: {'isQueryResult': True, 'jobID': 'bc31e66ca904ad96972a59b532f52ad7', 'nextResultUri': '/v2/sessions/7a242b06-276f-4044-8595-d9470ecbca2f/operations/d90eafe2-fe7d-4799-baf1-236ba1532cac/result/1?rowFormat=JSON', 'resultKind': 'SUCCESS_WITH_CONTENT', 'resultType': 'PAYLOAD', 'results': {'columns': [{'name': 'user_id', 'logicalType': {'type': 'BIGINT', 'nullable': False}, 'comment': None}, {'name': 'item_id', 'logicalType': {'type': 'BIGINT', 'nullable': True}, 'comment': None}, {'name': 'behavior', 'logicalType': {'type': 'VARCHAR', 'nullable': True, 'length': 2147483647}, 'comment': None}, {'name': 'dt', 'logicalType': {'type': 'VARCHAR', 'nullable': False, 'length': 2147483647}, 'comment': None}], 

[  result
 0     OK,
   result
 0     OK,
   result
 0     OK,
   result
 0     OK,
   result
 0     OK,
                              job id
 0  2763f11f45f0f2ad1bd088868093b4a1,
   result
 0     OK,
     user_id  item_id  behavior          dt
 0      1005     5005  purchase  2024-01-03
 1      1004     5001      cart  2024-01-03
 2      1002     5004      view  2024-01-03
 3      1001     5001      view  2024-01-04
 4      1002     5001      cart  2024-01-01
 5      1004     5005      view  2024-01-02
 6      1002     5002      like  2024-01-02
 7      1001     5003  purchase  2024-01-03
 8      1003     5002      like  2024-01-03
 9      1001     5002      like  2024-01-01
 10     1003     5003  purchase  2024-01-01
 11     1001     5004      view  2024-01-02
 12     1003     5001      cart  2024-01-02]

In [6]:
[x.shape for x in dfs]

[(1, 1), (1, 1), (1, 1), (1, 1), (1, 1), (1, 1), (1, 1), (13, 4)]

In [7]:
dfs[-1].head()

Unnamed: 0,user_id,item_id,behavior,dt
0,1005,5005,purchase,2024-01-03
1,1004,5001,cart,2024-01-03
2,1002,5004,view,2024-01-03
3,1001,5001,view,2024-01-04
4,1002,5001,cart,2024-01-01
