Skip to content

Commit

Permalink
feat(udf): minimal Python UDF SDK (#7943)
Browse files Browse the repository at this point in the history
This PR designs a minimal SDK for Python UDFs.

Now you can define a function in Python like this:

```python
from risingwave.udf import udf, UdfServer

@udf(input_types=['INT', 'INT'], result_type='INT')
def gcd(x: int, y: int) -> int:
while y != 0:
(x, y) = (y, x % y)
return x

if __name__ == '__main__':
server = UdfServer()
server.add_function(gcd)
server.serve()
```

This PR also fixes the problem when functions have no input arguments.

Approved-By: xxchan
Approved-By: BugenZhao
  • Loading branch information
wangrunji0408 committed Feb 21, 2023
1 parent f11bb62 commit 7a0316b
Show file tree
Hide file tree
Showing 21 changed files with 386 additions and 148 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Expand Up @@ -23,6 +23,9 @@ cmake-build-debug/
*.app
build/

# Python
*.pyc

# Golang
go/bin/

Expand Down
50 changes: 21 additions & 29 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions Cargo.toml
Expand Up @@ -123,3 +123,10 @@ tokio-stream = { git = "https://github.com/madsim-rs/tokio.git", rev = "0c25710"
tokio-retry = { git = "https://github.com/madsim-rs/rust-tokio-retry.git", rev = "95e2fd3" }
tokio-postgres = { git = "https://github.com/madsim-rs/rust-postgres.git", rev = "87ca1dc" }
postgres-types = { git = "https://github.com/madsim-rs/rust-postgres.git", rev = "87ca1dc" }

# TODO: remove these patches when arrow releases v34
# we need this commit for handling batch with no columns
# https://github.com/apache/arrow-rs/commit/ea48b9571f88bfbced60f9790ae2a7102502870e
arrow-array = { git = "https://github.com/apache/arrow-rs.git", rev = "9a6c516" }
arrow-schema = { git = "https://github.com/apache/arrow-rs.git", rev = "9a6c516" }
arrow-flight = { git = "https://github.com/apache/arrow-rs.git", rev = "9a6c516" }
5 changes: 4 additions & 1 deletion ci/Dockerfile
Expand Up @@ -5,7 +5,7 @@ ENV LANG en_US.utf8
ARG RUST_TOOLCHAIN

RUN apt-get update -yy && \
DEBIAN_FRONTEND=noninteractive apt-get -y install make build-essential cmake protobuf-compiler curl parallel \
DEBIAN_FRONTEND=noninteractive apt-get -y install make build-essential cmake protobuf-compiler curl parallel python3 python3-pip \
openssl libssl-dev libsasl2-dev libcurl4-openssl-dev pkg-config bash openjdk-11-jdk wget unzip git tmux lld postgresql-client kafkacat netcat mysql-client \
maven -yy \
&& rm -rf /var/lib/{apt,dpkg,cache,log}/
Expand All @@ -23,6 +23,9 @@ WORKDIR /risingwave

ENV PATH /root/.cargo/bin/:$PATH

# install python dependencies
RUN pip3 install pyarrow

# add required rustup components
RUN rustup component add rustfmt llvm-tools-preview clippy

Expand Down
2 changes: 1 addition & 1 deletion ci/build-ci-image.sh
Expand Up @@ -14,7 +14,7 @@ export RUST_TOOLCHAIN=$(cat ../rust-toolchain)
# !!! CHANGE THIS WHEN YOU WANT TO BUMP CI IMAGE !!! #
# AND ALSO docker-compose.yml #
######################################################
export BUILD_ENV_VERSION=v20230220
export BUILD_ENV_VERSION=v20230221_01

export BUILD_TAG="public.ecr.aws/x5u3w5h6/rw-build-env:${BUILD_ENV_VERSION}"

Expand Down
14 changes: 9 additions & 5 deletions ci/docker-compose.yml
Expand Up @@ -24,34 +24,38 @@ services:
- MYSQL_USER=mysqluser
- MYSQL_PASSWORD=mysqlpw
healthcheck:
test: [ "CMD-SHELL", "mysqladmin ping -h 127.0.0.1 -u root -p123456" ]
test:
[
"CMD-SHELL",
"mysqladmin ping -h 127.0.0.1 -u root -p123456"
]
interval: 5s
timeout: 5s
retries: 5

source-test-env:
image: public.ecr.aws/x5u3w5h6/rw-build-env:v20230220
image: public.ecr.aws/x5u3w5h6/rw-build-env:v20230221_01
depends_on:
- mysql
- db
volumes:
- ..:/risingwave

sink-test-env:
image: public.ecr.aws/x5u3w5h6/rw-build-env:v20230220
image: public.ecr.aws/x5u3w5h6/rw-build-env:v20230221_01
depends_on:
- mysql
- db
volumes:
- ..:/risingwave

rw-build-env:
image: public.ecr.aws/x5u3w5h6/rw-build-env:v20230220
image: public.ecr.aws/x5u3w5h6/rw-build-env:v20230221_01
volumes:
- ..:/risingwave

regress-test-env:
image: public.ecr.aws/x5u3w5h6/rw-build-env:v20230220
image: public.ecr.aws/x5u3w5h6/rw-build-env:v20230221_01
depends_on:
db:
condition: service_healthy
Expand Down
6 changes: 6 additions & 0 deletions ci/scripts/run-e2e-test.sh
Expand Up @@ -55,6 +55,12 @@ sqllogictest -p 4566 -d dev './e2e_test/batch/**/*.slt' --junit "batch-${profile
sqllogictest -p 4566 -d dev './e2e_test/database/prepare.slt'
sqllogictest -p 4566 -d test './e2e_test/database/test.slt'

echo "--- e2e, ci-3cn-1fe, udf"
python3 e2e_test/udf/test.py &
sleep 2
sqllogictest -p 4566 -d dev './e2e_test/udf/python.slt'
pkill python3

echo "--- Kill cluster"
cargo make ci-kill

Expand Down
27 changes: 0 additions & 27 deletions e2e_test/ddl/function.slt

This file was deleted.

49 changes: 49 additions & 0 deletions e2e_test/udf/python.slt
@@ -0,0 +1,49 @@
# Before running this test:
# python3 e2e_test/udf/test.py

# TODO: check the service on creation
# Currently whether the function exists in backend and whether the signature matches is checked on execution. Create function will always succeed.

# Create a function.
statement ok
create function int_42() returns int as 'http://localhost:8815' language arrow_flight;

statement ok
create function gcd(int, int) returns int as 'http://localhost:8815' language arrow_flight;

# Create a function with the same name but different arguments.
statement ok
create function gcd(int, int, int) returns int as 'http://localhost:8815' language arrow_flight;

# Create a function with the same name and arguments.
statement error exists
create function gcd(int, int) returns int as 'http://localhost:8815' language arrow_flight;

query I
select int_42();
----
42

query I
select gcd(25, 15);
----
5

query I
select gcd(25, 15, 3);
----
1

# TODO: drop function without arguments

# # Drop a function but ambiguous.
# statement error is not unique
# drop function gcd;

# Drop a function
statement ok
drop function gcd(int, int);

# Drop a function
statement ok
drop function gcd(int, int, int);
29 changes: 29 additions & 0 deletions e2e_test/udf/test.py
@@ -0,0 +1,29 @@
import sys
sys.path.append('src/udf/python') # noqa

from risingwave.udf import udf, UdfServer


@udf(input_types=[], result_type='INT')
def int_42() -> int:
return 42


@udf(input_types=['INT', 'INT'], result_type='INT')
def gcd(x: int, y: int) -> int:
while y != 0:
(x, y) = (y, x % y)
return x


@udf(name='gcd', input_types=['INT', 'INT', 'INT'], result_type='INT')
def gcd3(x: int, y: int, z: int) -> int:
return gcd(gcd(x, y), z)


if __name__ == '__main__':
server = UdfServer()
server.add_function(int_42)
server.add_function(gcd)
server.add_function(gcd3)
server.serve()
4 changes: 2 additions & 2 deletions src/common/Cargo.toml
Expand Up @@ -15,8 +15,8 @@ normal = ["workspace-hack"]

[dependencies]
anyhow = "1"
arrow-array = "31"
arrow-schema = "31"
arrow-array = "33"
arrow-schema = "33"
async-trait = "0.1"
auto_enums = "0.7"
bitflags = "1.3.2"
Expand Down
4 changes: 2 additions & 2 deletions src/expr/Cargo.toml
Expand Up @@ -17,8 +17,8 @@ normal = ["workspace-hack"]
[dependencies]
aho-corasick = "0.7"
anyhow = "1"
arrow-array = "31"
arrow-schema = "31"
arrow-array = "33"
arrow-schema = "33"
chrono = { version = "0.4", default-features = false, features = ["clock", "std"] }
chrono-tz = { version = "0.7", features = ["case-insensitive"] }
dyn-clone = "1"
Expand Down

1 comment on commit 7a0316b

@udf
Copy link

@udf udf commented on 7a0316b Mar 7, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wow, this is super cool!

Please sign in to comment.