Skip to content

Latest commit

 

History

History

wordcount

Folders and files

NameName
Last commit message
Last commit date

parent directory

..
 
 
 
 
 
 

WordCount

WordCount pipeline demo.

Run

Local Run

gradle :pipelines:wordcount:run --args="--runner=DirectRunner \
--inputFile=./src/test/resources/data/input.txt \
--output=./build/output"

Cloud Run

export PROJECT_ID=<my-project-id>
export PIPELINE_NAME=wordcount
export GOOGLE_APPLICATION_CREDENTIALS=<full-path-to-your-json>

gradle :pipelines:wordcount:run --args="--runner=DataflowRunner \
--project=${PROJECT_ID} \
--workerLogLevelOverrides='{\"micro.apps\":\"TRACE\"}' \
--gcpTempLocation=gs://${PROJECT_ID}/dataflow/${PIPELINE_NAME}/temp/ \
--stagingLocation=gs://${PROJECT_ID}/dataflow/${PIPELINE_NAME}/staging/ \
--inputFile=gs://${PROJECT_ID}/dataflow/${PIPELINE_NAME}/input/shakespeare.txt \
--output=gs://${PROJECT_ID}/dataflow/${PIPELINE_NAME}/output/output.txt"

# Or with fatJar
java -jar ./pipelines/wordcount/build/libs/wordcount-0.1.0-SNAPSHOT-all.jar  \
--runner=DataflowRunner \
--windowDuration=2m \
--numShards=1 \
--project=${PROJECT_ID} \
--workerLogLevelOverrides='{\"micro.apps\":\"TRACE\"}' \
--inputTopic=projects/${PROJECT_ID}/topics/windowed-files \
--gcpTempLocation=gs://${PROJECT_ID}/dataflow/${PIPELINE_NAME}/temp/ \
--stagingLocation=gs://${PROJECT_ID}/dataflow/${PIPELINE_NAME}/staging/ \
--inputFile=gs://${PROJECT_ID}/dataflow/${PIPELINE_NAME}/input/shakespeare.txt \
--output=gs://${PROJECT_ID}/dataflow/${PIPELINE_NAME}/output/output.txt

Creating Template

gradle :pipelines:wordcount:run --args="--runner=DataflowRunner \
--project=$PROJECT_ID \
--gcpTempLocation=gs://${PROJECT_ID}/dataflow/${PIPELINE_NAME}/temp/ \
--stagingLocation=gs://${PROJECT_ID}/dataflow/${PIPELINE_NAME}/staging/ \
--templateLocation=gs://${PROJECT_ID}/dataflow/${PIPELINE_NAME}/template/${PIPELINE_NAME}"

Running template

Create Job

gcloud dataflow jobs run wordcount \
    --gcs-location gs://${PROJECT_ID}/dataflow/${PIPELINE_NAME}/template/${PIPELINE_NAME} \
    --parameters inputFile=gs://${PROJECT_ID}/dataflow/${PIPELINE_NAME}/input/shakespeare.txt,gs://${PROJECT_ID}/dataflow/${PIPELINE_NAME}/output/output.txt

Test

gradle :pipelines:wordcount:test

Build

# clean
gradle :pipelines:wordcount:clean
# make fatJar
gradle :pipelines:wordcount:build

Run the jar generated by above build command.

java -jar ./pipelines/wordcount/build/libs/wordcount-0.1.0-SNAPSHOT-all.jar  \
--runner=DirectRunner \
--inputFile=./pipelines/wordcount/src/test/resources/data/input.txt \
--output=./pipelines/wordcount/build/output

Kubernetes

Scheduled Apache Beam jobs using Kubernetes Cronjobs

kubectl apply -f config/base/beam/cronjob.yml