- For GKE only, run: `kubectl create clusterrolebinding <user>-cluster-admin-binding --clusterrole=cluster-admin --user=<user>@<domain>`
- Install metacontroller, or run `make install-metacontroller`
Note: To install metacontroller, you can usually just run these:
- `kubectl create namespace metacontroller`
- `kubectl apply -f https://raw.githubusercontent.com/GoogleCloudPlatform/metacontroller/master/manifests/metacontroller-rbac.yaml`
- `kubectl apply -f https://raw.githubusercontent.com/GoogleCloudPlatform/metacontroller/master/manifests/metacontroller.yaml`
Optionally, you can use the following to reduce the log flood coming from Metacontroller:
`kubectl apply -f https://raw.githubusercontent.com/srfrnk/metacontroller/master/manifests/metacontroller.yaml`
To install flink-controller, just run the following:
`kubectl apply -f https://raw.githubusercontent.com/srfrnk/k8s-flink-operator/master/dist/flink-controller.yaml`
To use flink-controller, you need to have:
- A JAR containing code that creates a valid Flink job
- A Docker image that contains the JAR
- A K8S configuration file defining the job
Please see the example Apache Beam pipeline that can run on Flink.
- The main file creates the job. `gradle build` creates the JAR.
- The Dockerfile defines the image.
  - To build the image inside minikube: `eval $(minikube docker-env) && docker build . -t flink-test:v1`
  - You can also build locally and push to any repository accessible to your K8S cluster.
- The JSON manifest defines the job.
  - It can be YAML or JSON.
The spec must include:
- `jobManagerUrl`: Cluster URL of the Flink Job Manager (`"host:port"`)
- `jarImage`: Full image identifier (`"repo/image:tag"`)
- `jarPath`: Absolute path to the JAR inside the image
- `mainClass`: Full class name for the job (e.g. `"org.example.MyClass"`)
- Either `streaming` or `cron`: for a streaming job or a batch job, respectively
  - `streaming` should include:
    - `replicas`: Number of jobs to submit simultaneously
  - `cron` should include:
    - `schedule`: The schedule in Cron format (see here)
    - `concurrencyPolicy`: Specifies how to treat concurrent executions of a Job (see here). Valid values are:
      - `"Allow"` (default): allows CronJobs to run concurrently
      - `"Forbid"`: forbids concurrent runs, skipping the next run if the previous run hasn't finished yet
      - `"Replace"`: cancels the currently running job and replaces it with a new one
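For illustration, here is a minimal sketch of two manifests that use only the required fields: a streaming job and a cron (batch) job. The `apiVersion` and `kind` values are assumptions, so check the CustomResourceDefinition installed by `flink-controller.yaml` for the real ones. The resource names, job manager address, JAR path and schedule are hypothetical placeholders; the image and main class reuse the examples above.

```yaml
# Streaming job: submits `replicas` copies of the job.
# ASSUMPTION: apiVersion and kind are placeholders; use the values from the
# CRD installed by flink-controller.yaml.
apiVersion: flink.example.com/v1
kind: FlinkJob
metadata:
  name: my-streaming-job                  # hypothetical name
spec:
  jobManagerUrl: "flink-jobmanager:8081"  # hypothetical "host:port"
  jarImage: "flink-test:v1"               # image built in the minikube step above
  jarPath: "/app/job.jar"                 # hypothetical absolute path inside the image
  mainClass: "org.example.MyClass"
  streaming:
    replicas: 1
---
# Batch job: runs on a Cron schedule.
apiVersion: flink.example.com/v1          # ASSUMPTION, as above
kind: FlinkJob                            # ASSUMPTION, as above
metadata:
  name: my-batch-job                      # hypothetical name
spec:
  jobManagerUrl: "flink-jobmanager:8081"
  jarImage: "flink-test:v1"
  jarPath: "/app/job.jar"
  mainClass: "org.example.MyClass"
  cron:
    schedule: "*/10 * * * *"              # every 10 minutes
    concurrencyPolicy: "Forbid"           # skip a run if the previous one is still running
```

Both documents can live in one file separated by `---`, since `kubectl apply -f` accepts multi-document YAML.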
The following are optional:
- `version`: A string with the version label to be added to all K8S resources. If no `version` is specified, a label with `NoVersion` is added.
- `props`: An array of `{key,value}` props to pass to the job (e.g. read them with `ParameterTool parameters = ParameterTool.fromArgs(args);`).
- `volumeMounts`: An array of volume specs (see below).
- `env`: An array of EnvVars.
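The optional fields could be merged into either job's `spec` roughly as follows. This is a sketch: the prop keys, values and environment variable are hypothetical. `props` entries follow the `{key,value}` shape described above, and `env` entries use the standard Kubernetes EnvVar shape (`name` plus `value` or `valueFrom`).

```yaml
# Fragment: optional fields added to a job's spec.
spec:
  version: "1.0.0"          # label added to all generated K8S resources
  props:                    # passed to the job's main(String[] args)
    - key: "input"          # hypothetical prop
      value: "/data/in"     # hypothetical value
    - key: "parallelism"    # hypothetical prop
      value: "2"
  env:                      # standard Kubernetes EnvVar objects
    - name: "LOG_LEVEL"     # hypothetical variable
      value: "INFO"
```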
Volume specs have the following parameters:
- `volume`: a Volume spec.
- `mount`: a VolumeMount spec.

Note: neither `volume` nor `mount` needs a `name`. Any given `name` is overwritten.
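A single `volumeMounts` entry might look like the sketch below, assuming a ConfigMap named `job-config` (a hypothetical name) already exists in the namespace. The `volume` part is an ordinary Kubernetes Volume source and `mount` an ordinary VolumeMount; both omit `name`, since the controller overwrites it.

```yaml
# Fragment: one volume spec inside a job's spec.
spec:
  volumeMounts:
    - volume:                   # a Kubernetes Volume spec, without `name`
        configMap:
          name: job-config      # hypothetical ConfigMap; this is the ConfigMap's name, not the volume's
      mount:                    # a Kubernetes VolumeMount spec, without `name`
        mountPath: /etc/job-config
        readOnly: true
```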
The `mainClass` must contain a `public static void main(String[] args)` method that runs a Flink job.
- Make sure the JAR image is accessible to the K8S cluster.
- Apply the configuration manifest: `kubectl apply -f <CONFIGURATION_MANIFEST>`
To delete the job: `kubectl delete -f <CONFIGURATION_MANIFEST>`
To uninstall flink-controller and metacontroller:
- `kubectl delete -f https://raw.githubusercontent.com/srfrnk/k8s-flink-operator/master/dist/flink-controller.yaml`
- `kubectl delete -f https://raw.githubusercontent.com/GoogleCloudPlatform/metacontroller/master/manifests/metacontroller-rbac.yaml`
- `kubectl delete -f https://raw.githubusercontent.com/GoogleCloudPlatform/metacontroller/master/manifests/metacontroller.yaml`
- `kubectl delete namespace metacontroller`
- I recently published an article describing how and why I built this. You can find it here.