Skip to content

Commit

Permalink
Add Beam example (#134)
Browse files Browse the repository at this point in the history
  • Loading branch information
tweise committed Jan 9, 2020
1 parent 7d7379b commit bb8834d
Show file tree
Hide file tree
Showing 7 changed files with 164 additions and 0 deletions.
39 changes: 39 additions & 0 deletions examples/beam-python/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# Multi-stage build: take the Flink distribution from the official Flink
# image and layer it onto the Beam Python SDK image, so the Python SDK
# worker and the Flink runtime coexist in one container.
FROM flink:1.8.3-scala_2.12 AS flink
FROM apachebeam/python3.6_sdk:2.17.0

# Install dependencies:
#  - gettext-base provides envsubst, used by docker-entrypoint.sh to expand
#    placeholders in flink-conf.yaml
#  - the JDK/JRE are needed to run Flink itself
# --no-install-recommends keeps the image small; the apt list cleanup happens
# in the same layer so the cache never reaches the image.
RUN set -ex \
    && apt-get update \
    && apt-get -y install --no-install-recommends \
       gettext-base \
       openjdk-8-jdk-headless \
       openjdk-8-jre-headless \
    && rm -rf /var/lib/apt/lists/*

# add Flink from the official Flink image
ENV FLINK_HOME=/opt/flink
ENV PATH=$PATH:$FLINK_HOME/bin
COPY --from=flink $FLINK_HOME $FLINK_HOME

# Install the Beam job server jar, this will be the Flink entry point.
# curl -f makes the build fail on an HTTP error instead of saving an error
# page as the jar; the version-independent symlink is what the operator's
# jarName setting references.
RUN \
    mkdir -p /opt/flink/flink-web-upload \
    && ( \
      cd /opt/flink/flink-web-upload \
      && curl -f -O https://repository.apache.org/content/groups/public/org/apache/beam/beam-runners-flink-1.8-job-server/2.17.0/beam-runners-flink-1.8-job-server-2.17.0.jar \
      && ln -s beam-runners-flink-1.8-job-server*.jar beam-runner.jar \
    ) \
    && echo 'jobmanager.web.upload.dir: /opt/flink' >> $FLINK_HOME/conf/flink-conf.yaml

# Application code - this can be moved to an s2i assemble script
COPY . /code
WORKDIR /code/src
# --no-cache-dir: pip's download cache would only bloat the layer
RUN \
    pip install --no-cache-dir -r /code/src/requirements.txt

# entry point for FlinkK8sOperator Flink config
COPY docker-entrypoint.sh /

ENTRYPOINT ["/docker-entrypoint.sh"]
# 6123: Flink RPC, 8081: Flink web UI (documentation only; not published)
EXPOSE 6123 8081
CMD ["local"]
9 changes: 9 additions & 0 deletions examples/beam-python/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# Beam Python Application example

This example shows how to build a Docker image for a Beam Python application that is compatible with the Flink operator, from Flink and Beam base images.

The Python SDK workers run within the task manager container and the pipeline is submitted through the native Flink entry point (no Beam job server required). For more information about the Beam deployment see this [document](https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI/edit#heading=h.fh2f571kms4d).

To deploy the example locally: `kubectl create -f flink-operator-custom-resource.yaml`

Flink UI (after running `kubectl proxy`): `http://localhost:8001/api/v1/namespaces/flink-operator/services/beam-python-flinkk8soperator-example:8081/proxy/#/overview`
48 changes: 48 additions & 0 deletions examples/beam-python/docker-entrypoint.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
#!/bin/sh

# Entrypoint for the FlinkK8sOperator Beam example image.
# Usage: docker-entrypoint.sh (jobmanager|taskmanager|local|help)
# Any other arguments are exec'd verbatim.

# Print a privilege-dropping wrapper command, or nothing when one is not
# needed/available. Output is deliberately unquoted at the call sites so an
# empty result expands to zero words.
drop_privs_cmd() {
    if [ "$(id -u)" != 0 ]; then
        # Don't need to drop privs if EUID != 0
        return
    elif [ -x /sbin/su-exec ]; then
        # Alpine
        echo su-exec
    else
        # Others
        #echo gosu flink
        echo ""
    fi
}

# Add in extra configs set by the operator
if [ -n "$FLINK_PROPERTIES" ]; then
    echo "$FLINK_PROPERTIES" >> "$FLINK_HOME/conf/flink-conf.yaml"
fi

# Expand environment-variable placeholders in the Flink config. envsubst
# cannot write its own input file safely, hence the temp-file shuffle.
envsubst < "$FLINK_HOME/conf/flink-conf.yaml" > "$FLINK_HOME/conf/flink-conf.yaml.tmp"
mv "$FLINK_HOME/conf/flink-conf.yaml.tmp" "$FLINK_HOME/conf/flink-conf.yaml"

# Join all arguments into one string; defaults to "local" when none given.
COMMAND="$*"

if [ $# -lt 1 ]; then
    COMMAND="local"
fi
echo "COMMAND: $COMMAND"

if [ "$COMMAND" = "help" ]; then
    echo "Usage: $(basename "$0") (jobmanager|taskmanager|local|help)"
    exit 0
elif [ "$COMMAND" = "jobmanager" ]; then
    echo "Starting Job Manager"
    echo "config file: " && grep '^[^\n#]' "$FLINK_HOME/conf/flink-conf.yaml"
    exec $(drop_privs_cmd) "$FLINK_HOME/bin/jobmanager.sh" start-foreground
elif [ "$COMMAND" = "taskmanager" ]; then
    echo "Starting Task Manager"
    echo "config file: " && grep '^[^\n#]' "$FLINK_HOME/conf/flink-conf.yaml"
    exec $(drop_privs_cmd) "$FLINK_HOME/bin/taskmanager.sh" start-foreground
elif [ "$COMMAND" = "local" ]; then
    echo "Starting local cluster"
    exec $(drop_privs_cmd) "$FLINK_HOME/bin/jobmanager.sh" start-foreground local
fi

# Unrecognized command: run it verbatim as PID 1.
exec "$@"
35 changes: 35 additions & 0 deletions examples/beam-python/flink-operator-custom-resource.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# FlinkApplication custom resource for the Beam Python example.
# Deploy with: kubectl create -f flink-operator-custom-resource.yaml
apiVersion: flink.k8s.io/v1beta1
kind: FlinkApplication
metadata:
  name: beam-python-flinkk8soperator-example
  namespace: flink-operator
  annotations:
  labels:
    environment: development
spec:
  # Image built from the Dockerfile in this directory.
  #image: docker.io/lyft/flinkk8soperator-example-beam:{sha}
  image: flinkk8soperator-example-beam
  # Extra entries appended to flink-conf.yaml by the entrypoint.
  flinkConfig:
    taskmanager.network.memory.fraction: 0.1
    taskmanager.network.memory.min: 10m
    state.backend.fs.checkpointdir: file:///checkpoints/flink/checkpoints
    state.checkpoints.dir: file:///checkpoints/flink/externalized-checkpoints
    state.savepoints.dir: file:///checkpoints/flink/savepoints
  jobManagerConfig:
    resources:
      requests:
        memory: "200Mi"
        cpu: "0.1"
    replicas: 1
  taskManagerConfig:
    taskSlots: 2
    resources:
      requests:
        memory: "200Mi"
        cpu: "0.1"
  flinkVersion: "1.8"
  # Symlink created in the Dockerfile pointing at the Beam job-server jar.
  jarName: "beam-runner.jar"
  parallelism: 1
  # Beam entry point that launches the driver command below.
  entryClass: "org.apache.beam.runners.flink.FlinkPortableClientEntryPoint"
  programArgs: "--driver-cmd \"cd /code/src; exec python -m beam_example.pipeline --job_name=beam-flinkk8soperator-example\""
  deleteMode: None
1 change: 1 addition & 0 deletions examples/beam-python/src/beam_example/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from __future__ import absolute_import
28 changes: 28 additions & 0 deletions examples/beam-python/src/beam_example/pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from __future__ import print_function

from __future__ import absolute_import

import sys
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

if __name__ == "__main__":
# --job_endpoint argument supplied by the Flink entry point
args = [
"--runner=PortableRunner",
"--streaming",
"--sdk_worker_parallelism=2",
"--job_name=beam-on-flinkk8soperator",
"--environment_type=PROCESS",
"--environment_config={\"command\": \"/opt/apache/beam/boot\"}",
]
# command line options override defaults
args.extend(sys.argv[1:])
print("args: " + str(args))
pipeline = beam.Pipeline(options=PipelineOptions(args))
pcoll = (pipeline
| beam.Create([0, 1, 2])
| beam.Map(lambda x: x))
result = pipeline.run()
# streaming job does not finish
#result.wait_until_finish()
4 changes: 4 additions & 0 deletions examples/beam-python/src/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Add your dependencies here
#
numpy==1.16.4 # via pyarrow
apache-beam==2.17.0

0 comments on commit bb8834d

Please sign in to comment.