This repo is a bare minimum sample of running a python beam pipeline on flink deployed in a kubernetes cluster.
This assumes you're starting in an environment with python, a recent version of beam, and minikube installed.
Start up a simple minikube k8s cluster
minikube start --cpus 4 --memory=6144 --disk-size='20gb'
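As an optional sanity check, confirm the cluster is up before continuing:
minikube status
kubectl get nodes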
Build a slightly modified flink image that has docker installed. Building it directly against minikube's docker daemon simplifies things (no registry push needed).
eval $(minikube docker-env)
docker build ./flink -t docker-flink:1.10
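If the build ran against minikube's docker daemon as intended, the image should show up in its local image list:
docker images docker-flink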
You can deploy flink with or without a job server. If you deploy without one, you must run your pipeline using the FlinkRunner along with the option --flink_submit_uber_jar. The FlinkRunner starts a local job service via java and expects the client to have java installed. Using --flink_submit_uber_jar packs all artifacts into the jar file that gets uploaded to flink; this is required because the local job service and the flink taskmanager do not share the staging volume.
Alternatively, you can deploy a job service into the k8s cluster and run your pipeline using the PortableRunner. This requires a more complicated k8s deployment because a volume must be shared between the job service and the flink taskmanager.
Deploy the flink components to kubernetes.
kubectl apply -f ./k8s/without_job_server/flink.yaml
Wait a minute for the pods to spin up.
kubectl get pods
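Instead of polling, you can block until the taskmanager is ready (this assumes the pods carry the component labels used further down in this README):
kubectl wait --for=condition=Ready pod --selector=component=taskmanager --timeout=120s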
Submit a beam pipeline.
FLINK_MASTER_URL=$(minikube service flink-jobmanager-rest --url)
python -m pipeline --runner=FlinkRunner --flink_version 1.10 --flink_master=${FLINK_MASTER_URL} --environment_type=DOCKER --save_main_session --flink_submit_uber_jar
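The pipeline module referenced above lives in this repo and isn't reproduced here. For orientation only, a minimal module along these lines would be submittable the same way; the wordcount-style transforms below are illustrative, not the repo's actual code:

# pipeline.py -- hypothetical minimal example, runnable as `python -m pipeline`
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

def run():
    # With no explicit flags, PipelineOptions parses sys.argv, so the
    # --runner/--flink_master/... options passed above are picked up automatically.
    options = PipelineOptions()
    with beam.Pipeline(options=options) as p:
        (
            p
            | "Create" >> beam.Create(["hello", "world", "hello"])
            | "PairWithOne" >> beam.Map(lambda word: (word, 1))
            | "CountPerWord" >> beam.CombinePerKey(sum)
            | "Print" >> beam.Map(print)
        )

if __name__ == "__main__":
    run()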
It's also possible to use the PortableRunner by submitting the pipeline to a job server running in k8s.
The job server and the flink taskmanagers need to share the same artifact staging volume so that the sdk harness can find the staged artifacts.
First deploy the persistent volumes and claims.
kubectl apply -f ./k8s/with_job_server/shared.yaml
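Check that the claims bound before deploying flink:
kubectl get pv,pvc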
Then deploy flink.
kubectl apply -f ./k8s/with_job_server/flink.yaml
Last deploy the job server.
kubectl apply -f ./k8s/with_job_server/beam_flink_job_server.yaml
Submit a beam pipeline.
JOB_ENDPOINT=$(minikube service flink-beam-jobserver --url | sed '1q;d')
ARTIFACT_ENDPOINT=$(minikube service flink-beam-jobserver --url | sed '2q;d')
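The sed lines assume the job endpoint is listed first and the artifact endpoint second; if your service exposes the ports in a different order they will be swapped. The stock beam job server listens on 8099 for jobs and 8098 for artifacts (your manifests may differ), so a quick echo makes it easy to verify:
echo "job: ${JOB_ENDPOINT}  artifact: ${ARTIFACT_ENDPOINT}"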
PYTHON_VERSION=$(python --version | cut -d" " -f2 | cut -d"." -f1-2)
BEAM_VERSION=$(python -c "import apache_beam;print(apache_beam.__version__)")
python -m pipeline --runner=PortableRunner --job_endpoint=${JOB_ENDPOINT} --artifact_endpoint=${ARTIFACT_ENDPOINT} --save_main_session --environment_type=DOCKER --environment_config=apache/beam_python${PYTHON_VERSION}_sdk:${BEAM_VERSION}
[TIP] Pre-pull the sdk container in the docker-in-docker service on the taskmanager. If you don't do this, the taskmanager may time out waiting for the sdk container to spin up.
PYTHON_VERSION=$(python --version | cut -d" " -f2 | cut -d"." -f1-2)
BEAM_VERSION=$(python -c "import apache_beam;print(apache_beam.__version__)")
TASKMANAGER_POD=$(kubectl get pods --selector=component=taskmanager --no-headers -o name)
kubectl exec ${TASKMANAGER_POD} -c taskmanager -- docker pull apache/beam_python${PYTHON_VERSION}_sdk:${BEAM_VERSION}
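You can confirm the image landed in the docker-in-docker daemon:
kubectl exec ${TASKMANAGER_POD} -c taskmanager -- docker images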
Open the flink web UI (this should open a tab in your browser).
minikube service flink-jobmanager-rest
Tail the taskmanager logs.
TASKMANAGER_POD=$(kubectl get pods --selector=component=taskmanager --no-headers -o name)
kubectl logs -f ${TASKMANAGER_POD} -c taskmanager
Tail SDK Harness logs.
TASKMANAGER_POD=$(kubectl get pods --selector=component=taskmanager --no-headers -o name)
kubectl exec -it ${TASKMANAGER_POD} -c taskmanager -- bash
docker logs -f $(docker ps -lq)