Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions examples/function/forward_message/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
FROM python:3.9.12-slim

ENV PYTHONFAULTHANDLER=1 \
PYTHONUNBUFFERED=1 \
PYTHONHASHSEED=random \
PIP_NO_CACHE_DIR=off \
PIP_DISABLE_PIP_VERSION_CHECK=on \
PIP_DEFAULT_TIMEOUT=100 \
POETRY_VERSION=1.2.0 \
POETRY_HOME="/opt/poetry" \
POETRY_VIRTUALENVS_IN_PROJECT=true \
POETRY_NO_INTERACTION=1 \
PYSETUP_PATH="/opt/pysetup" \
VENV_PATH="/opt/pysetup/.venv"

ENV PATH="$POETRY_HOME/bin:$VENV_PATH/bin:$PATH"

RUN apt-get update \
&& apt-get install --no-install-recommends -y \
curl \
wget \
# deps for building python deps
build-essential

# install poetry - respects $POETRY_VERSION & $POETRY_HOME
RUN curl -sSL https://install.python-poetry.org | python3 -

WORKDIR $PYSETUP_PATH
ADD . $PYSETUP_PATH
WORKDIR $PYSETUP_PATH
RUN poetry install --no-dev --no-root

ADD . /app

# install dumb-init
RUN wget -O /dumb-init https://github.com/Yelp/dumb-init/releases/download/v1.2.5/dumb-init_1.2.5_x86_64
RUN chmod +x /dumb-init

WORKDIR /app
RUN chmod +x entry.sh

ENTRYPOINT ["/dumb-init", "--"]
CMD ["/app/entry.sh"]
16 changes: 16 additions & 0 deletions examples/function/forward_message/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Example Python User Defined Function using Kafka sink

1. Install Kafka in the Kubernetes cluster
```shell
kubectl apply -f https://raw.githubusercontent.com/numaproj/numaflow/main/config/apps/kafka/kafka-minimal.yaml
```

2. Build the docker image and import into k3d
```shell
docker build -t test-python-udf:v1 . && k3d image import docker.io/library/test-python-udf:v1
```

3. Apply the pipeline
```shell
kubectl apply -f pipeline-numaflow.yaml
```
4 changes: 4 additions & 0 deletions examples/function/forward_message/entry.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
#!/bin/sh
set -eux

python example.py
33 changes: 33 additions & 0 deletions examples/function/forward_message/pipeline-numaflow.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
name: local-kafka
spec:
vertices:
- name: input
source:
kafka:
brokers:
- kafka-broker:9092
topic: input-topic
consumerGroup: test
- name: forward-message
udf:
container:
args:
- python
- example.py
image: docker.io/library/test-python-udf:v1
- name: log-output
sink:
log: {}
- name: log-kafka-output
sink:
log: {}
edges:
- from: input
to: forward-message
- from: input
to: log-kafka-output
- from: forward-message
to: log-output
15 changes: 15 additions & 0 deletions examples/function/forward_message/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
[tool.poetry]
name = "example-function"
version = "0.2.2"
description = ""
authors = ["Numaflow developers"]

[tool.poetry.dependencies]
python = "~3.9"
pynumaflow = "0.2.2"

[tool.poetry.dev-dependencies]

[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"
6 changes: 3 additions & 3 deletions examples/sink/simplesink/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ ENV PYTHONFAULTHANDLER=1 \
PIP_NO_CACHE_DIR=off \
PIP_DISABLE_PIP_VERSION_CHECK=on \
PIP_DEFAULT_TIMEOUT=100 \
POETRY_VERSION=1.1.13 \
POETRY_VERSION=1.2.0 \
POETRY_HOME="/opt/poetry" \
POETRY_VIRTUALENVS_IN_PROJECT=true \
POETRY_NO_INTERACTION=1 \
Expand All @@ -23,12 +23,12 @@ RUN apt-get update \
build-essential

# install poetry - respects $POETRY_VERSION & $POETRY_HOME
RUN curl -sSL https://raw.githubusercontent.com/sdispater/poetry/master/get-poetry.py | python
RUN curl -sSL https://install.python-poetry.org | python3 -

WORKDIR $PYSETUP_PATH
ADD . $PYSETUP_PATH
WORKDIR $PYSETUP_PATH
RUN poetry install --no-dev
RUN poetry install --no-dev --no-root

ADD . /app

Expand Down
2 changes: 1 addition & 1 deletion examples/sink/simplesink/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@

2. Apply the pipeline
```shell
kubectl apply -f pipeline-numaflow.yaml
kubectl apply -f pipeline-numaflow.yaml
```
2 changes: 1 addition & 1 deletion examples/sink/simplesink/example.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from pynumaflow.sink import Datum, Responses, Response, UserDefinedSinkServicer


def udsink_handler(datums: List[Datum], __) -> Responses:
def udsink_handler(datums: List[Datum]) -> Responses:
responses = Responses()
for msg in datums:
print("User Defined Sink", msg)
Expand Down
4 changes: 2 additions & 2 deletions examples/sink/simplesink/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
[tool.poetry]
name = "example-sink"
version = "0.1.0"
version = "0.2.2"
description = ""
authors = ["Numaflow developers"]

[tool.poetry.dependencies]
python = "~3.9"
pynumaflow = "0.1.0"
pynumaflow = "0.2.2"

[tool.poetry.dev-dependencies]

Expand Down
2 changes: 1 addition & 1 deletion pynumaflow/function/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class UserDefinedFunctionServicer(udfunction_pb2_grpc.UserDefinedFunctionService

def __init__(self, map_handler: UDFMapCallable, sock_path=FUNCTION_SOCK_PATH):
self.__map_handler: UDFMapCallable = map_handler
self.sock_path = sock_path
self.sock_path = f"unix://{sock_path}"
self._cleanup_coroutines = []

def MapFn(
Expand Down
8 changes: 3 additions & 5 deletions pynumaflow/sink/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,15 @@
import logging
from os import environ


from google.protobuf import empty_pb2 as _empty_pb2

import grpc
from typing import Callable, Any, Iterator, List
from typing import Callable, Any, List

from pynumaflow._constants import (
SINK_SOCK_PATH,
DATUM_KEY,
)
from pynumaflow.sink import Response, Responses, Datum
from pynumaflow.sink import Responses, Datum
from pynumaflow.sink.generated import udsink_pb2_grpc, udsink_pb2
from pynumaflow.types import NumaflowServicerContext

Expand Down Expand Up @@ -48,7 +46,7 @@ class UserDefinedSinkServicer(udsink_pb2_grpc.UserDefinedSinkServicer):

def __init__(self, sink_handler: UDSinkCallable, sock_path=SINK_SOCK_PATH):
self.__sink_handler: UDSinkCallable = sink_handler
self.sock_path = sock_path
self.sock_path = f"unix://{sock_path}"
self._cleanup_coroutines = []

def SinkFn(
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "pynumaflow"
version = "0.2.1"
version = "0.2.2"
description = "Provides the interfaces of writing Python User Defined Functions and Sinks for NumaFlow."
authors = ["NumaFlow Developers"]
readme = "README.md"
Expand Down