
resink-ai/flink-sql-gateway-api


Flink SQL Gateway Python Client

A client library for accessing the Flink SQL Gateway REST API (mostly generated by OpenAPI Generator)

Compatibility Overview

| API Version | Compatible Flink Versions | Comment |
|-------------|---------------------------|---------|
| API V1 | flink-1.16.2 -> flink-2.1 | Allows users to submit statements to the gateway and execute them. |
| API V2 | flink-1.17.2 -> flink-2.1 | Supports SQL Client connecting to the gateway. Default starting from flink-1.17. |
| API V3 | flink-1.20 -> flink-2.1 | Supports the Materialized Table refresh operation. Default starting from flink-1.20. |
| API V4 | flink-2.0 -> flink-2.1 | Supports deploying scripts in application mode. |

Which version should I use?

  • pip install flink-sql-gateway-api==1.19.0 if 1.17 <= flink version < 1.20
  • pip install flink-sql-gateway-api==2.1.0 if 1.20 <= flink version
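
If you pin the client from a deployment script, the rule above is easy to encode. This is a minimal sketch; `recommended_client_version` and `parse_flink_version` are hypothetical helpers, and they only cover the two cases listed (they return None for Flink versions older than 1.17, for which no recommendation is given):

```python
import re
from typing import Optional, Tuple


def parse_flink_version(version: str) -> Tuple[int, ...]:
    """Turn "1.18.1" or "1.20-SNAPSHOT" into a comparable tuple like (1, 18, 1)."""
    match = re.match(r"\d+(?:\.\d+)*", version.strip())
    if match is None:
        raise ValueError(f"not a Flink version: {version!r}")
    return tuple(int(part) for part in match.group(0).split("."))


def recommended_client_version(flink_version: str) -> Optional[str]:
    """Hypothetical helper encoding the two install rules listed above.

    Returns None for Flink versions older than 1.17, which the rules do not cover.
    """
    version = parse_flink_version(flink_version)
    if version >= (1, 20):
        return "2.1.0"
    if version >= (1, 17):
        return "1.19.0"
    return None
```

Tuple comparison takes care of patch levels: (1, 19, 2) < (1, 20), while (1, 20, 0) >= (1, 20).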

Usage

First, create a client:

from flink_gateway_api import Client

client = Client(base_url="http://localhost:8083")

If the endpoints you're going to hit require authentication, use AuthenticatedClient instead:

from flink_gateway_api import AuthenticatedClient

client = AuthenticatedClient(base_url="http://localhost:8083", token="SuperSecretToken")

Now call your endpoint and use your models:

import json
import time

from flink_gateway_api import Client
from flink_gateway_api.api.default import (
    open_session,
    close_session,
    execute_statement,
    fetch_results,
)
from flink_gateway_api.models import (
    OpenSessionRequestBody,
    ExecuteStatementRequestBody,
    RowFormat,
)

with Client("http://localhost:8083") as client:
    # Open a session; the response carries the session handle
    responses = open_session.sync(client=client, body=OpenSessionRequestBody.from_dict({
        "properties": {
            "idle-timeout": "10s"
        },
        "sessionName": "test_session"
    }))
    print(f"Open session response: {responses}")

    # Submit a statement; the response carries an operation handle
    select_result = execute_statement.sync(responses.session_handle, client=client,
                                           body=ExecuteStatementRequestBody.from_dict({
                                               "statement": "SELECT 23 as age, 'Alice Liddel' as name;",
                                           }))
    print(f"Select result: {select_result}")

    # Crude wait: a fetch issued too early may still report resultType NOT_READY
    time.sleep(1)
    # Fetch the first page of results (token 0) with rows encoded as JSON
    fetch_return = fetch_results.sync(
        responses.session_handle,
        select_result.operation_handle,
        0,
        client=client,
        row_format=RowFormat.JSON,
    )
    print(f"Fetch return: {json.dumps(fetch_return.to_dict())}")

    close_session.sync(responses.session_handle, client=client)
    print("Session closed")

Or do the same thing with an async version:

import asyncio
import json

from flink_gateway_api import Client
from flink_gateway_api.api.default import (
    open_session,
    close_session,
    execute_statement,
    fetch_results,
)
from flink_gateway_api.models import (
    OpenSessionRequestBody,
    ExecuteStatementRequestBody,
    RowFormat,
)


async def main() -> None:
    # "async with" is only valid inside a coroutine, so the example runs in main()
    async with Client("http://localhost:8083") as client:
        responses = await open_session.asyncio(client=client, body=OpenSessionRequestBody.from_dict({
            "properties": {
                "idle-timeout": "10s"
            },
            "sessionName": "test_session"
        }))
        print(f"Open session response: {responses}")

        select_result = await execute_statement.asyncio(responses.session_handle, client=client,
                                                        body=ExecuteStatementRequestBody.from_dict({
                                                            "statement": "SELECT 23 as age, 'Alice Liddel' as name;",
                                                        }))
        print(f"Select result: {select_result}")

        # Non-blocking wait, so other tasks on the event loop keep running
        await asyncio.sleep(1)
        fetch_return = await fetch_results.asyncio(
            responses.session_handle,
            select_result.operation_handle,
            0,
            client=client,
            row_format=RowFormat.JSON,
        )
        print(f"Fetch return: {json.dumps(fetch_return.to_dict())}")

        await close_session.asyncio(responses.session_handle, client=client)
        print("Session closed")


asyncio.run(main())

The fetched result looks like this:

{
  "isQueryResult": true,
  "jobID": "a0ad286b7259d4755327ce4969a8ec97",
  "nextResultUri": "/v2/sessions/7625ad82-b23b-4118-9683-4a46b7c5022a/operations/353994b9-532d-4c0e-9258-688ec777f948/result/1?rowFormat=JSON",
  "resultKind": "SUCCESS_WITH_CONTENT",
  "resultType": "PAYLOAD",
  "results": {
    "columns": [
      {
        "name": "age",
        "logicalType": {
          "type": "INTEGER",
          "nullable": false
        },
        "comment": null
      },
      {
        "name": "name",
        "logicalType": {
          "type": "CHAR",
          "nullable": false,
          "length": 12
        },
        "comment": null
      }
    ],
    "columnInfos": [],
    "data": [
      {
        "kind": "INSERT",
        "fields": [23, "Alice Liddel"]
      }
    ],
    "fieldGetters": [],
    "rowFormat": "JSON"
  }
}
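
This response is a single page of results; nextResultUri points at the next one. Below is a minimal sketch of reading every page, assuming Flink's result semantics: resultType is NOT_READY while results are still being computed, PAYLOAD when a page carries rows, and EOS at the end, at which point no nextResultUri is returned. `iter_rows`, `next_token` and the `fetch_page` callback are hypothetical names; `fetch_page` takes a page token and returns the dict produced by `to_dict()`, with the same keys as the JSON above. Rows are yielded together with their `kind` so that changelog rows from streaming queries are not silently merged.

```python
import re
import time
from typing import Any, Callable, Dict, Iterator, Optional, Tuple

# Matches the page token in a nextResultUri such as ".../result/1?rowFormat=JSON"
_TOKEN_RE = re.compile(r"/result/(\d+)(?:\?|$)")


def next_token(page: Dict[str, Any]) -> Optional[int]:
    """Extract the next page token from nextResultUri, or None if there is none."""
    uri = page.get("nextResultUri")
    if not uri:
        return None
    match = _TOKEN_RE.search(uri)
    return int(match.group(1)) if match else None


def iter_rows(
    fetch_page: Callable[[int], Dict[str, Any]],
    poll_interval: float = 0.5,
) -> Iterator[Tuple[str, Dict[str, Any]]]:
    """Yield (kind, {column_name: value}) for every row, following nextResultUri."""
    token: Optional[int] = 0
    while token is not None:
        page = fetch_page(token)
        following = next_token(page)
        if page.get("resultType") == "NOT_READY":
            # Nothing to read yet: wait, then retry (same token if no URI was given)
            time.sleep(poll_interval)
            token = token if following is None else following
            continue
        results = page.get("results") or {}
        names = [column["name"] for column in results.get("columns", [])]
        for row in results.get("data", []):
            yield row["kind"], dict(zip(names, row["fields"]))
        # None after the last page (EOS), which ends the loop
        token = following
```

With the generated client, `fetch_page` could be `lambda token: fetch_results.sync(responses.session_handle, select_result.operation_handle, token, client=client, row_format=RowFormat.JSON).to_dict()`, which replaces the fixed `sleep(1)` with polling.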

By default, when you call an HTTPS API, the client verifies the server's SSL certificate. Keeping certificate verification enabled is highly recommended, but sometimes you may need to connect to a server (especially an internal one) whose certificate is signed by a custom certificate bundle:

client = AuthenticatedClient(
    base_url="https://internal_api.example.com",
    token="SuperSecretToken",
    verify_ssl="/path/to/certificate_bundle.pem",
)

You can also disable certificate validation altogether, but beware that this is a security risk.

client = AuthenticatedClient(
    base_url="https://internal_api.example.com",
    token="SuperSecretToken",
    verify_ssl=False
)

Things to know:

  1. Every path/method combo becomes a Python module with four functions:

    1. sync: Blocking request that returns parsed data (if successful) or None
    2. sync_detailed: Blocking request that always returns a Response, with parsed set if the request was successful.
    3. asyncio: Like sync but async instead of blocking
    4. asyncio_detailed: Like sync_detailed but async instead of blocking
  2. All path parameters, query parameters, and request bodies become function arguments.

  3. If an endpoint has any tags, the first tag is used as the name of the module that holds its functions.

  4. Any endpoint which did not have a tag will be in flink_gateway_api.api.default
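
Because sync returns None on failure, the HTTP status and error body are lost. A minimal sketch of a helper for sync_detailed results, assuming the returned Response exposes `status_code`, `content` and `parsed` attributes (check the generated `types.py`); `GatewayError` and `unwrap` are hypothetical names:

```python
from typing import Any


class GatewayError(RuntimeError):
    """Hypothetical exception for non-2xx responses from the SQL Gateway."""

    def __init__(self, status_code: int, content: bytes) -> None:
        # Keep the raw body, which may contain the gateway's error details
        super().__init__(f"SQL Gateway returned HTTP {status_code}: {content[:200]!r}")
        self.status_code = status_code
        self.content = content


def unwrap(response: Any) -> Any:
    """Return response.parsed for 2xx responses, otherwise raise GatewayError.

    Assumes the object returned by a *_detailed function has
    status_code, content and parsed attributes.
    """
    status = int(response.status_code)  # works for both int and HTTPStatus
    if not 200 <= status < 300:
        raise GatewayError(status, response.content)
    return response.parsed
```

Usage would look like `session = unwrap(open_session.sync_detailed(client=client, body=...))`, which fails loudly instead of handing back None.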

Advanced customizations

The generated Client class has more settings that control runtime behavior; check the docstring on that class for details. You can also customize the underlying httpx.Client or httpx.AsyncClient (depending on your use case):

from flink_gateway_api import Client

def log_request(request):
    print(f"Request event hook: {request.method} {request.url} - Waiting for response")

def log_response(response):
    request = response.request
    print(f"Response event hook: {request.method} {request.url} - Status {response.status_code}")

client = Client(
    base_url="http://localhost:8083",
    httpx_args={"event_hooks": {"request": [log_request], "response": [log_response]}},
)

# Or get the underlying httpx client to modify directly with client.get_httpx_client() or client.get_async_httpx_client()

You can even set the httpx client directly, but beware that this will override any existing settings (e.g., base_url):

import httpx
from flink_gateway_api import Client

client = Client(
    base_url="http://localhost:8083",
)
# Note that base_url needs to be re-set, as would any shared cookies, headers, etc.
client.set_httpx_client(httpx.Client(base_url="http://localhost:8083"))

Developer

  1. Quick start
# code gen
make py_118 # or
make py_119 # or
make py_120 # or
make py_21_v3 # or
make py_21_v4

# release
make release
  2. Manual release
# 1. Run the tests; stop if they fail
cd "$(git rev-parse --show-toplevel)/flink-sql-gateway-client" && pytest tests || exit 1

# 2. Check the current version
cd "$(git rev-parse --show-toplevel)"
grep -m1 '^version' flink-sql-gateway-client/pyproject.toml
version=$(grep -m1 '^version' flink-sql-gateway-client/pyproject.toml | cut -d '"' -f2)

# 3. Tag & release
git add -u
git commit -m "<commit message>"
git tag -d "release-$version" || true
git tag "release-$version"
git push origin "release-$version"
  3. Release non-alpha versions
make py_119

# Manually edit flink-sql-gateway-client/pyproject.toml with the desired version
version=$(grep -m1 '^version' flink-sql-gateway-client/pyproject.toml | cut -d '"' -f2)
git tag "release-$version"
git push origin "release-$version"
