Add a new Queries task to run multiple SQL statements with multiple outputs, parameter binding and transactions #368

Open
anna-geller opened this issue Aug 28, 2024 · 1 comment
Labels
area/plugin Plugin-related issue or feature request enhancement New feature or request

anna-geller commented Aug 28, 2024

Context

The current Query task is limited to executing a single SQL statement and handling its output. Some automation tasks require executing multiple SQL statements, potentially wrapped in a transaction, while handling outputs of multiple SELECT statements.

To avoid breaking changes in the Query tasks, here is a proposal for a new Queries task.

This task will enable:

  • execution of multiple SQL statements
  • managing outputs of multiple SELECT queries
  • handling parameters to prevent SQL injection
  • executing multiple queries as a single transaction to ensure atomicity and consistency, e.g. when fetching a credit card balance right after inserting or updating data.
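
To make the atomicity point concrete, here is a minimal Python sketch that uses the standard-library sqlite3 module as a stand-in for Postgres. The helper name run_atomically is hypothetical and is not part of Kestra or any JDBC plugin. It only illustrates the intended behavior: statements run in order, rows are collected for each statement that returns a result set, and everything is rolled back if any statement fails.

```python
import sqlite3


def run_atomically(conn, statements):
    """Run (sql, params) pairs in order; commit only if every statement succeeds.

    Returns one list of rows per statement that produced a result set.
    Hypothetical helper for illustration; SQLite stands in for Postgres.
    """
    results = []
    try:
        for sql, params in statements:
            cur = conn.execute(sql, params)
            # description is None for statements that return no rows (INSERT, UPDATE)
            if cur.description is not None:
                results.append(cur.fetchall())
        conn.commit()
    except Exception:
        # Undo every change made since the transaction started
        conn.rollback()
        raise
    return results
```

With this shape, a SELECT placed after an INSERT or UPDATE in the same batch sees the new data, and a failure in any later statement leaves the tables untouched.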

Use case for testing

Start Postgres in a container:

docker run --name queries -e POSTGRES_PASSWORD=mysecretpassword -e POSTGRES_USER=postgres -e POSTGRES_DB=postgres -p 5432:5432 -d postgres

Create the tables (alternatively, create them directly from the new Queries task):

CREATE TABLE myusers (
    id SERIAL PRIMARY KEY,
    name VARCHAR(255) NOT NULL,
    email VARCHAR(255) UNIQUE NOT NULL,
    last_login TIMESTAMP
);

CREATE TABLE mylogs (
    log_id SERIAL PRIMARY KEY,
    user_email VARCHAR(255) NOT NULL,
    action VARCHAR(255) NOT NULL,
    timestamp TIMESTAMP NOT NULL,
    FOREIGN KEY (user_email) REFERENCES myusers(email)
);

Implementation

Task Type: io.kestra.plugin.jdbc.postgresql.Queries (equivalent tasks will be needed for all JDBC plugin subgroups, not just Postgres)

Description: The Queries task allows executing multiple SQL statements within a single task, with support for parameter binding and transaction management. The task can handle multiple SELECT statements and their outputs, allowing you to fetch the results directly or store them as ION files in internal storage.

Note on parsing SQL statements from a single string

Important note: the sql property should accept a single string containing multiple SQL statements separated by semicolons (;). We want to support both:

  1. parameters as a map
  2. sql as a single string with multiple SQL statements separated by semicolons (;)

If supporting both at the same time is not feasible, e.g. because of the performance or cost of parsing the SQL string, we can support only sql as a single string without parameters. Supporting multiple semicolon-separated statements in a single sql string has higher priority than supporting parameters.

We should investigate how other tools in the Java ecosystem, e.g. Flyway, parse queries for execution (see https://www.baeldung.com/liquibase-vs-flyway).

TL;DR: sql as a string without parameters > sql as an array with parameters
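
To get a feel for the parsing cost, here is a rough Python sketch of splitting a script on semicolons while ignoring semicolons inside quoted literals and line comments. The function name split_statements is hypothetical, and this is not how Flyway or any JDBC driver does it. It deliberately ignores block comments and Postgres dollar-quoted strings, which a real implementation would also need to handle.

```python
def split_statements(sql: str) -> list[str]:
    """Split a SQL script on semicolons that sit outside quotes and -- comments."""
    statements, current = [], []
    i, n = 0, len(sql)
    while i < n:
        ch = sql[i]
        if ch in ("'", '"'):
            # Copy a quoted literal or identifier verbatim; a doubled quote is an escape
            j = i + 1
            while j < n:
                if sql[j] == ch:
                    if j + 1 < n and sql[j + 1] == ch:
                        j += 2
                        continue
                    break
                j += 1
            current.append(sql[i:j + 1])
            i = j + 1
        elif sql.startswith("--", i):
            # Drop a line comment, keeping the newline that ends it
            j = sql.find("\n", i)
            i = n if j == -1 else j
        elif ch == ";":
            stmt = "".join(current).strip()
            if stmt:
                statements.append(stmt)
            current = []
            i += 1
        else:
            current.append(ch)
            i += 1
    tail = "".join(current).strip()
    if tail:
        statements.append(tail)
    return statements
```

The single pass over the string is linear in the script length, so the cost concern is more about correctness across SQL dialects than about raw performance.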

Properties

  • sql:
    • Type: string
    • Required: ✔️
    • Description: A single string containing multiple SQL statements separated by semicolons (;). The statements will be executed sequentially within a single transaction.
  • parameters:
    • Type: map
    • Required: ❌
    • Description: A map of parameters to bind to the SQL queries. The keys should match the parameter placeholders in the SQL string, e.g., :parameterName.
  • fetchType:
    • Type: string
    • Required: ❌
    • Default: STORE
    • Description: Specifies how the data is fetched. The available options are:
      • STORE: Store all rows to a file.
      • FETCH: Output all rows as an output variable.
      • FETCH_ONE: Output only the first row.
      • NONE: Do not fetch or store any rows.
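
As an illustration of how the parameters map could be bound, here is a small Python sketch that rewrites :parameterName placeholders into positional ? markers, the form a JDBC PreparedStatement expects, and collects the values in order. The name bind_named and the regular expression are assumptions made for this example, not an existing Kestra or JDBC API. The sketch skips Postgres :: casts but does not skip placeholders that happen to appear inside string literals.

```python
import re

# A colon followed by an identifier, not preceded by another colon or a word
# character, so that Postgres casts such as value::text are left alone.
_PLACEHOLDER = re.compile(r"(?<![:\w]):([A-Za-z_]\w*)")


def bind_named(sql: str, params: dict) -> tuple[str, list]:
    """Return (sql with ? markers, values in placeholder order)."""
    values = []

    def replace(match):
        name = match.group(1)
        if name not in params:
            raise KeyError(f"no value supplied for parameter :{name}")
        values.append(params[name])
        return "?"

    return _PLACEHOLDER.sub(replace, sql), values
```

Because the values travel separately from the SQL text, the driver never interprets them as SQL, which is what prevents injection. A parameter used twice in one statement simply appears twice in the value list, and unused keys in the map are ignored.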

Outputs

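The outputs will be an array, with each element corresponding to the output of a single SELECT query in the sql string, in the order the queries appear (statements that return no rows, such as INSERT or UPDATE, add no element). Each output element may contain the following fields based on the fetchType: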

  • row (if fetchType: FETCH_ONE):

    • Type: object
    • Description: A map containing the first row of fetched data. This field is populated only if fetchType is set to FETCH_ONE.
  • rows (if fetchType: FETCH):

    • Type: array
    • SubType: object
    • Description: A list of maps, each containing a row of fetched data. This field is populated only if fetchType is set to FETCH.
  • size (if fetchType: FETCH or STORE):

    • Type: integer
    • Description: The number of rows fetched. This field is populated if fetchType is set to either FETCH or STORE.
  • uri (if fetchType: STORE):

    • Type: string
    • Format: uri
    • Description: The URI of the result file stored on Kestra's internal storage. This field is populated only if fetchType is set to STORE.
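
The mapping from fetchType to output fields described above can be sketched in Python as follows. The names build_output and store are hypothetical: store stands for whatever callback would write the rows to internal storage and return a URI. Returning a null row for an empty FETCH_ONE result is an assumption, since the proposal does not specify that case.

```python
def build_output(rows: list[dict], fetch_type: str, store=None) -> dict:
    """Shape the output element for one query according to fetchType."""
    if fetch_type == "FETCH":
        return {"rows": rows, "size": len(rows)}
    if fetch_type == "FETCH_ONE":
        # Assumption: an empty result yields a null row
        return {"row": rows[0] if rows else None}
    if fetch_type == "STORE":
        if store is None:
            raise ValueError("STORE requires a callback that persists rows and returns a URI")
        return {"uri": store(rows), "size": len(rows)}
    if fetch_type == "NONE":
        return {}
    raise ValueError(f"unknown fetchType: {fetch_type}")
```

The task output would then be the list of these elements, one per query that returns rows.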

Example flow

id: run_queries
namespace: company.team

tasks:
  - id: setup
    type: io.kestra.plugin.jdbc.postgresql.Queries
    url: jdbc:postgresql://host.docker.internal:5432/postgres
    username: postgres
    password: xxx
    sql: |
      DROP TABLE IF EXISTS myusers CASCADE;
      CREATE TABLE myusers (
        id SERIAL PRIMARY KEY,
        name VARCHAR(255) NOT NULL,
        email VARCHAR(255) UNIQUE NOT NULL,
        last_login TIMESTAMP
      );
      DROP TABLE IF EXISTS mylogs;
      CREATE TABLE mylogs (
        log_id SERIAL PRIMARY KEY,
        user_email VARCHAR(255) NOT NULL,
        action VARCHAR(255) NOT NULL,
        timestamp TIMESTAMP NOT NULL,
        FOREIGN KEY (user_email) REFERENCES myusers(email)
      );

  - id: queries
    type: io.kestra.plugin.jdbc.postgresql.Queries
    url: jdbc:postgresql://host.docker.internal:5432/postgres
    username: postgres
    password: xxx
    sql: |
      INSERT INTO myusers (name, email) VALUES (:name, :email) ON CONFLICT (email) DO NOTHING;
      UPDATE myusers SET last_login = :login WHERE email = :email;
      INSERT INTO mylogs (user_email, action, timestamp) VALUES (:email, 'login', :login);
      SELECT * FROM myusers WHERE email = :email;
      SELECT * FROM mylogs WHERE user_email = :email;
    parameters:
      name: Rick
      email: "rick@kestra.io"
      login: "{{ execution.startDate }}"
    fetchType: FETCH

Example Output

For a query with fetchType: FETCH:

[
  {
    "rows": [
      {
        "id": 1,
        "name": "Rick",
        "email": "rick@kestra.io",
        "last_login": "2024-08-29T10:00:00Z"
      }
    ],
    "size": 1
  },
  {
    "rows": [
      {
        "log_id": 1,
        "user_email": "rick@kestra.io",
        "action": "login",
        "timestamp": "2024-08-29T10:00:00Z"
      }
    ],
    "size": 1
  }
]

For a query with fetchType: STORE:

[
  {
    "uri": "kestra:///internal/storage/queries-result-3.ion",
    "size": 1
  },
  {
    "uri": "kestra:///internal/storage/queries-result-4.ion",
    "size": 1
  }
]

For a query with fetchType: FETCH_ONE:

[
  {
    "row": {
      "id": 1,
      "name": "Rick",
      "email": "rick@kestra.io",
      "last_login": "2024-08-29T10:00:00Z"
    }
  },
  {
    "row": {
      "log_id": 1,
      "user_email": "rick@kestra.io",
      "action": "login",
      "timestamp": "2024-08-29T10:00:00Z"
    }
  }
]

anna-geller commented

@loicmathieu can you perhaps check the proposed design, evaluate the feasibility and let Matt tackle this with your guidance and review?

Projects
Status: Backlog