Kafka Backup and Restore Blueprint (#840)
* Kafka Backup Blueprint

* added newline

* echo on single line

* fixed typo

* moved to examples folder

* Adding kafka Restore blueprint using confluent source connector

* properties file for s3Source

* changes to restore blueprint

* addition to restore blueprint

* resolved issues

* add error exit command

* files renamed

* added prehook

* restore all topics, added a python script

* merged blueprints

* added phase to restore action

* successcode

* pod success

* backup Process completed with successful exit code

* fixed bugs

* restore with input artifact

* fixed bug

* customized connector

* backup and restore done

* adobe s3 connector

* fixed newline at the end

* changes to README, multiple Run commands in Dockerfile

* required changes in goreleaser

* bug fixes

* removed latest tags

* adding comments

* source connector name as pod name

* removed jar file, added delete action, removed cleanup process, change kafka cluster setup

* added sink docker image to delete action

* removed clusterRole dependency, changed kafdrop broker address

* typo correction

* changes to add delete action

* latest changes

* updated with integrationtest

* update adobe README

* update confluent

* grammar fixed

* grammar fixed

* Update documentation to force TravisCI build

Co-authored-by: Pavan Navarathna <pavan@kasten.io>
Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
3 people committed Mar 18, 2021
1 parent 033d5e2 commit b9b654c
Showing 25 changed files with 1,164 additions and 1 deletion.
16 changes: 15 additions & 1 deletion .goreleaser.yml
@@ -114,7 +114,21 @@ dockers:
  image_templates:
  - 'ghcr.io/kanisterio/foundationdb:6.2.20'
  dockerfile: 'docker/foundationdb/Dockerfile'

- image_templates:
  - 'ghcr.io/kanisterio/kafka-adobe-s3-source-connector:{{ .Tag }}'
  dockerfile: 'docker/kafka-adobes3Connector/image/adobeSource.Dockerfile'
  extra_files:
  - 'docker/kafka-adobes3Connector/image/adobe-monitorsource.sh'
  - 'docker/kafka-adobes3Connector/image/cleans3.py'
- binaries:
  - kando
  image_templates:
  - 'ghcr.io/kanisterio/kafka-adobe-s3-sink-connector:{{ .Tag }}'
  dockerfile: 'docker/kafka-adobes3Connector/image/adobeSink.Dockerfile'
  extra_files:
  - 'docker/kafka-adobes3Connector/image/adobe-monitorsink.sh'
archives:
- allow_different_binary_count: true
snapshot:
  name_template: '{{ .Tag }}'
checksum:
53 changes: 53 additions & 0 deletions docker/kafka-adobes3Connector/image/adobe-monitorsink.sh
@@ -0,0 +1,53 @@
#!/bin/sh
echo "===========================Monitoring started==================================="
sleep 60
export MAXLAG=0
export GROUP="connect-${CONNECTORNAME}"
#export pid="$(lsof -t -i:8083)"
export ELAPSEDTIME="$(ps -e -o "pid,etimes,command" | awk -v processid="$PID" '{if($1==processid) print $2}')"
export LAG="$(/bin/kafka-consumer-groups --bootstrap-server "$BOOTSTRAPSERVER" --describe --group "$GROUP" | awk 'BEGIN{maxLag=0}{if ($6>0+maxLag) maxLag=$6} END{print maxLag}')"
echo "==========================GROUP=$GROUP, MaxTime=$TIMEINSECONDS, MAXLAG=$MAXLAG, pid=$PID, ELAPSEDTIME=$ELAPSEDTIME, LAG=$LAG ==============================="
# when the group has no lag rows yet, awk prints the header value "LAG"; treat that as zero
if [ "$LAG" = "LAG" ]
then
export LAG=0
fi
# keep polling until the consumer group has caught up or the time budget is exhausted
while [ "$LAG" -gt "$MAXLAG" ] && [ "$ELAPSEDTIME" -lt "$TIMEINSECONDS" ]
do
LAG="$(/bin/kafka-consumer-groups --bootstrap-server "$BOOTSTRAPSERVER" --describe --group "$GROUP"| awk 'BEGIN{maxLag= 0}{if ($6>0+maxLag) maxLag=$6} END{print maxLag}')"
ELAPSEDTIME="$(ps -e -o "pid,etimes,command" |awk -v processid=$PID '{if($1==processid) print $2}')"
echo "========================== LAG = $LAG , ELAPSEDTIME = $ELAPSEDTIME ======================================="
sleep 2
done
if [ -z "$ELAPSEDTIME" ]
then
echo "================Connector failed======================"
curl -X DELETE http://localhost:8083/connectors/$CONNECTORNAME
echo "================Connector Deleted======================"
exit 1
fi
echo "========================== Connector Job done successfully Killing the process ==================="

cat /tmp/config/s3config.properties | grep "topics=" | awk -F "=" '{print $2}' | tr , "\n" > /tmp/config/topics.txt

while IFS= read -r TOPIC
do
# getting topic name from configuration file
echo "=====================getting number of message to $TOPIC =============================="
# getting retention period as set for the topic
export MESSAGECOUNT="$(/bin/kafka-run-class kafka.tools.GetOffsetShell --broker-list "$BOOTSTRAPSERVER" --topic "$TOPIC" --time -1 --offsets 1 | awk -F ":" '{sum += $3} END {print sum}')"

if [ -z "$TOPICDESC" ]
then
export TOPICDESC="$TOPIC:$MESSAGECOUNT"
else
export TOPICDESC="$TOPICDESC,$TOPIC:$MESSAGECOUNT"
fi
# log the accumulating topic:count summary
echo "=============== topic description $TOPICDESC ===================================="
done < "/tmp/config/topics.txt"
# export the per-topic message counts and the S3 path as Kanister output artifacts
kando output backupDetail "${TOPICDESC}"
kando output s3path "${S3_PATH}"
curl -X DELETE http://localhost:8083/connectors/$CONNECTORNAME
echo "================Connector Deleted======================"
kill -INT $PID
exit 0
46 changes: 46 additions & 0 deletions docker/kafka-adobes3Connector/image/adobe-monitorsource.sh
@@ -0,0 +1,46 @@
#!/bin/sh
echo "===========================Monitoring started==================================="
sleep 60
export ELAPSEDTIME="$(ps -e -o "pid,etimes,command" |awk -v processid=$PID '{if($1==processid) print $2}')"
if [ -z "$ELAPSEDTIME" ]
then
echo "================Connector failed======================"
curl -X DELETE http://localhost:8083/connectors/$CONNECTORNAME
echo "================Connector Deleted======================"
exit 1
fi
for i in $(echo $TOPIC_DETAIL | sed "s/,/ /g")
do

export TOPIC="$(echo $i | awk -F ":" '{print $1}')"
export FINAL_MESSAGE_COUNT="$(echo $i | awk -F ":" '{print $2}')"
if echo ",$TOPIC_LIST," | grep -q ",$TOPIC,"
then

echo "======================Start Restore process for topic $TOPIC with messagecount = $FINAL_MESSAGE_COUNT ============================="
# sum of latest offsets across partitions
export START_OFFSET="$(/bin/kafka-run-class kafka.tools.GetOffsetShell --broker-list "$BOOTSTRAPSERVER" --topic "$TOPIC" --time -1 | grep -e ':[[:digit:]]*:' | awk -F ":" '{sum += $3} END {print sum}')"
# sum of earliest offsets across partitions
export END_OFFSET="$(/bin/kafka-run-class kafka.tools.GetOffsetShell --broker-list "$BOOTSTRAPSERVER" --topic "$TOPIC" --time -2 | grep -e ':[[:digit:]]*:' | awk -F ":" '{sum += $3} END {print sum}')"
# current message count = latest offsets - earliest offsets
export CURRENT_MESSAGE_COUNT=$((START_OFFSET - END_OFFSET))
echo "======================Start offset = $START_OFFSET , endoffset = $END_OFFSET , message count = $CURRENT_MESSAGE_COUNT ============================="

until [ "$CURRENT_MESSAGE_COUNT" = "$FINAL_MESSAGE_COUNT" ]
do
echo "======================Restore in process for $TOPIC ============================="
START_OFFSET="$(/bin/kafka-run-class kafka.tools.GetOffsetShell --broker-list "$BOOTSTRAPSERVER" --topic "$TOPIC" --time -1 | grep -e ':[[:digit:]]*:' | awk -F ":" '{sum += $3} END {print sum}')"
END_OFFSET="$(/bin/kafka-run-class kafka.tools.GetOffsetShell --broker-list "$BOOTSTRAPSERVER" --topic "$TOPIC" --time -2 | grep -e ':[[:digit:]]*:' | awk -F ":" '{sum += $3} END {print sum}')"
CURRENT_MESSAGE_COUNT=$((START_OFFSET - END_OFFSET))
echo "======================Start offset = $START_OFFSET , endoffset = $END_OFFSET , message count = $CURRENT_MESSAGE_COUNT ============================="
sleep 3
done

echo "=======================restore complete for $TOPIC =================================="
else
echo "=================$TOPIC not listed in the $TOPIC_LIST, skipping restore====================="
fi
done

echo "=========================== All topic restored as per backup details ==================================="
curl -X DELETE http://localhost:8083/connectors/$CONNECTORNAME
echo "================Connector Deleted======================"
kill -INT $PID
exit 0
25 changes: 25 additions & 0 deletions docker/kafka-adobes3Connector/image/adobeSink.Dockerfile
@@ -0,0 +1,25 @@
FROM confluentinc/cp-kafka-connect:6.1.0

USER root

RUN yum install -y lsof

# install Java (required to build the connector jar)

RUN yum install -y \
    java-1.8.0-openjdk \
    java-1.8.0-openjdk-devel

ENV JAVA_HOME /usr/lib/jvm/java-1.8.0-openjdk/
RUN yum install git -y
RUN java -version
RUN git clone https://github.com/adobe/kafka-connect-s3.git
RUN cd kafka-connect-s3 && ./gradlew shadowJar
# copy the jar files
RUN cp ./kafka-connect-s3/build/libs/kafka-connect-s3-chart/kafka-connect/0.0.4-2a8a4aa-all.jar /opt/

# Install kando
ADD kando /usr/local/bin/

# adding script to monitor sink connector
COPY docker/kafka-adobes3Connector/image/adobe-monitorsink.sh monitorconnect.sh
21 changes: 21 additions & 0 deletions docker/kafka-adobes3Connector/image/adobeSource.Dockerfile
@@ -0,0 +1,21 @@
FROM confluentinc/cp-kafka-connect:6.1.0

USER root
# install Java (required to build the connector jar)

RUN yum install -y \
    java-1.8.0-openjdk \
    java-1.8.0-openjdk-devel

ENV JAVA_HOME /usr/lib/jvm/java-1.8.0-openjdk/
RUN yum install git -y
RUN java -version
RUN git clone https://github.com/adobe/kafka-connect-s3.git
RUN cd kafka-connect-s3 && ./gradlew shadowJar

RUN cp ./kafka-connect-s3/build/libs/kafka-connect-s3-chart/kafka-connect/0.0.4-2a8a4aa-all.jar /opt/

# adding script to monitor source connector
COPY docker/kafka-adobes3Connector/image/adobe-monitorsource.sh monitorconnect.sh

COPY docker/kafka-adobes3Connector/image/cleans3.py cleanup.py
14 changes: 14 additions & 0 deletions docker/kafka-adobes3Connector/image/cleans3.py
@@ -0,0 +1,14 @@
import os

import boto3

# Create an S3 client from the credentials passed in via environment variables
s3 = boto3.client(
    's3',
    aws_access_key_id=os.environ.get('AWS_ACCESS_KEY'),
    aws_secret_access_key=os.environ.get('AWS_SECRET_KEY'),
    region_name=os.environ.get('REGION'),
)
bucket = os.environ.get('BUCKET')
s3path = os.environ.get('S3PATH')

# the backup prefix is the first key component after the bucket in an s3://bucket/<prefix>/... path
prefix = s3path.split('/')[3] + '/'

# delete every object under the prefix; paginate so more than 1000 keys are handled
paginator = s3.get_paginator('list_objects_v2')
for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
    for obj in page.get('Contents', []):
        print('Deleting', obj['Key'])
        s3.delete_object(Bucket=bucket, Key=obj['Key'])
183 changes: 183 additions & 0 deletions examples/kafka/adobe-s3-connector/README.md
@@ -0,0 +1,183 @@
# Kafka topic backup and restore
To back up and restore Kafka topic data, this example uses the Adobe S3 Kafka connector, which periodically polls data from Kafka and uploads it to S3. Each chunk of data is represented as an S3 object. If no partitioner is specified in the configuration, the default partitioner, which preserves Kafka partitioning, is used.

During restore, the existing messages in the topics are purged before the backed-up data is written back.
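
For illustration, the sketch below shows roughly how an equivalent sink connector could be registered by hand against a Kafka Connect worker's REST API. The blueprint drives the connector for you; the connector name, bucket, and topic list here are placeholders, and the config keys mirror `adobe-s3-sink.properties`.

```bash
# Register an Adobe S3 sink connector with a Kafka Connect worker (REST API on port 8083).
# Connector name, bucket and topics below are placeholders.
curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors -d '{
  "name": "my-s3-sink",
  "config": {
    "connector.class": "com.spredfast.kafka.connect.s3.sink.S3SinkConnector",
    "tasks.max": "4",
    "topics": "blogs,feeds",
    "s3.bucket": "my-backup-bucket",
    "s3.prefix": "topics",
    "s3.path_style": "true",
    "format": "binary",
    "key.converter": "com.spredfast.kafka.connect.s3.AlreadyBytesConverter",
    "value.converter": "com.spredfast.kafka.connect.s3.AlreadyBytesConverter"
  }
}'
```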

## Prerequisites

* Kubernetes 1.9+
* Kanister controller version 0.50.0 installed in the cluster, in a namespace `<kanister-operator-namespace>`. This example uses the `kasten-io` namespace
* `kanctl` CLI installed (https://docs.kanister.io/tooling.html#kanctl)

## Assumption

* No consumer is consuming the topic while it is being restored.

## Installing the Chart

Install the Strimzi Kafka operator with the Helm release name `kafka-release` using the following commands:

```bash
# Add the strimzi repository to your local chart repositories
$ helm repo add strimzi https://strimzi.io/charts/

# Update your local chart repository
$ helm repo update

# Install the Kafka Operator (Helm Version 3)
$ kubectl create namespace kafka-test
$ helm install kafka-release strimzi/strimzi-kafka-operator --namespace kafka-test

```
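
Before moving on, you can confirm the operator is running; a quick check (the `strimzi-cluster-operator` deployment name is the chart's default):

```bash
# The Strimzi chart installs a cluster-operator deployment; wait until it is available
$ kubectl -n kafka-test get pods
$ kubectl -n kafka-test wait deployment strimzi-cluster-operator --for=condition=Available --timeout=300s
```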
## Setup Kafka
```bash
# Provision Apache Kafka and ZooKeeper
$ kubectl create -f ./kafka-cluster.yaml -n kafka-test

# Wait for the Kafka cluster to be ready
$ kubectl wait kafka/my-cluster --for=condition=Ready --timeout=300s -n kafka-test

# Set up Kafdrop to monitor the Kafka cluster (optional; not required by the backup and restore blueprint)
$ kubectl create -f kafdrop.yaml -n kafka-test

# Kafdrop listens on port 9000 by default; forward it locally to view the UI
$ kubectl port-forward kafdrop 7000:9000 -n kafka-test
```

## Validate producer and consumer
Create a producer and a consumer using the Kafka image provided by Strimzi.
```bash
# Create a producer and push data to the blogpost topic
$ kubectl -n kafka-test run kafka-producer -ti --image=strimzi/kafka:0.20.0-kafka-2.6.0 --rm=true --restart=Never -- bin/kafka-console-producer.sh --broker-list my-cluster-kafka-external-bootstrap:9094 --topic blogpost
> event1
> event2
> event3

# In a different terminal, create a consumer for the same topic
$ kubectl -n kafka-test run kafka-consumer -ti --image=strimzi/kafka:0.20.0-kafka-2.6.0 --rm=true --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-external-bootstrap:9094 --topic blogpost --from-beginning
```

**NOTE:**
* Kafka is now running, with the broker reachable at the service `my-cluster-kafka-external-bootstrap:9094`
* `adobe-s3-sink.properties` contains the properties for the S3 sink connector
* `adobe-s3-source.properties` contains the properties for the S3 source connector
* `adobe-kafkaConfiguration.properties` contains the Kafka Connect worker properties pointing to the Kafka cluster

## Configuration

The following configuration applies to both the source and sink connectors.

| Config Key | Notes |
| ---------- | ----- |
| name | Name of the connector |
| s3.bucket | The name of the S3 bucket to write to (sink) or read from (source) |
| s3.prefix | Prefix added to all object keys stored in the bucket, to "namespace" them |
| s3.path_style | Force path-style access to the bucket |
| topics | Comma-separated list of topics to be processed |
| tasks.max | Maximum number of tasks to run inside the connector |
| format | S3 file format |
| compressed_block_size | Size of _uncompressed_ data to write to the file before rolling to a new block/chunk |

These additional configs apply to the Kafka Connect worker:

| Config Key | Notes |
| ---------- | ----- |
| bootstrap.servers | Kafka broker address in the cluster |
| plugin.path | Connector jar location |

## Setup Blueprint, ConfigMap and S3 Location profile
Before setting up the Blueprint, create a Kanister Profile with the S3 location details and a ConfigMap with the connector configuration. `timeinSeconds` denotes the maximum time the sink connector is allowed to run before it is stopped.
```bash
# Create a ConfigMap with the sink, source and Kafka Connect worker properties files, plus the timeinSeconds limit
$ kubectl create configmap s3config --from-file=adobe-s3-sink.properties=./adobe-s3-sink.properties --from-file=adobe-kafkaConfiguration.properties=./adobe-kafkaConfiguration.properties --from-file=adobe-s3-source.properties=./adobe-s3-source.properties --from-literal=timeinSeconds=1800 -n kafka-test

# Create Profile pointing to S3 bucket
$ kanctl create profile s3compliant --access-key <aws-access-key> \
--secret-key <aws-secret-key> \
--bucket <aws-bucket-name> --region <aws-region-name> \
--namespace kafka-test

# Create the Blueprint
$ kubectl create -f ./kafka-blueprint.yaml -n kasten-io
```

## Perform Backup
To back up the topic data to S3, an ActionSet is created that runs the Kafka Connect sink connector.
```bash
# Create the backup ActionSet
$ kanctl create actionset --action backup --namespace kasten-io --blueprint kafka-blueprint --profile kafka-test/s3-profile-fn64h --objects v1/configmaps/kafka-test/s3config
```
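
The ActionSet name is printed by `kanctl`; you can watch it until the backup action completes (the name below is a placeholder):

```bash
# Check the status of the backup ActionSet
$ kubectl get actionsets.cr.kanister.io -n kasten-io
$ kubectl describe actionsets.cr.kanister.io <backup-actionset-name> -n kasten-io
```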
## Verify the backup
We can verify the backup by producing some data to the topic configured earlier and then checking the S3 bucket.

* List all topics in the Kafka cluster
```bash
$ kubectl -n kafka-test run kafka-producer -ti --image=strimzi/kafka:0.20.0-kafka-2.6.0 --rm=true --restart=Never -- bin/kafka-topics.sh --bootstrap-server=my-cluster-kafka-external-bootstrap:9094 --list
```
* Create a topic on the Kafka cluster
```bash
$ kubectl -n kafka-test run kafka-producer -ti --image=strimzi/kafka:0.20.0-kafka-2.6.0 --rm=true --restart=Never -- bin/kafka-topics.sh --create --topic blogpost --bootstrap-server my-cluster-kafka-external-bootstrap:9094
```
* Create a producer to push data to the blogpost topic
```bash
$ kubectl -n kafka-test run kafka-producer -ti --image=strimzi/kafka:0.20.0-kafka-2.6.0 --rm=true --restart=Never -- bin/kafka-console-producer.sh --broker-list my-cluster-kafka-external-bootstrap:9094 --topic blogpost

>{"title":"The Matrix","year":1999,"cast":["Keanu Reeves","Laurence Fishburne","Carrie-Anne Moss","Hugo Weaving","Joe Pantoliano"],"genres":["Science Fiction"]}
>{"title":"ABCD3","year":2000,"cast":["Keanu Reeves","Laurence Fishburne","Carrie-Anne Moss","Hugo Weaving","Joe Pantoliano"],"genres":["Science Fiction"]}
>{"title":"Student of the year","year":2001,"cast":["Keanu Reeves","Laurence Fishburne","Carrie-Anne Moss","Hugo Weaving","Joe Pantoliano"],"genres":["Science Fiction"]}
>{"title":"ABCD","year":2002,"cast":["Keanu Reeves","Laurence Fishburne","Carrie-Anne Moss","Hugo Weaving","Joe Pantoliano"],"genres":["Science Fiction"]}
```
* Check the S3 bucket for the topic data, for example using the AWS CLI as shown below
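
A minimal sketch of that check, assuming the AWS CLI is configured with access to the bucket and the default `topics` prefix from `adobe-s3-sink.properties` (bucket name is a placeholder):

```bash
# List the objects the sink connector wrote under the configured prefix
$ aws s3 ls s3://<aws-bucket-name>/topics/ --recursive
```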

## Perform Restore
To perform a restore, a pre-hook operation first purges all events from the previously backed-up topics in the Kafka cluster.
```bash
# Create the restore ActionSet
$ kanctl create actionset --action restore --namespace kasten-io --blueprint kafka-blueprint --profile kafka-test/s3-profile-fn64h --objects v1/configmaps/kafka-test/s3config
```
**NOTE:**
* The topics being restored must already exist in the Kafka cluster
* Before the pre-hook operation runs, confirm that no other consumer is consuming data from those topics
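
For reference, a common way to purge a Kafka topic, and roughly what the pre-hook does conceptually, is to drop its retention temporarily so the broker deletes the existing segments. This is an illustrative sketch, not necessarily the blueprint's exact commands; the topic name is the example's `blogpost`:

```bash
# Temporarily set retention to 1 second so existing messages are removed
$ kubectl -n kafka-test run kafka-admin -i --image=strimzi/kafka:0.20.0-kafka-2.6.0 --rm=true --restart=Never -- \
    bin/kafka-configs.sh --bootstrap-server my-cluster-kafka-external-bootstrap:9094 \
    --alter --entity-type topics --entity-name blogpost --add-config retention.ms=1000

# Once the messages have been purged, remove the override to restore the default retention
$ kubectl -n kafka-test run kafka-admin -i --image=strimzi/kafka:0.20.0-kafka-2.6.0 --rm=true --restart=Never -- \
    bin/kafka-configs.sh --bootstrap-server my-cluster-kafka-external-bootstrap:9094 \
    --alter --entity-type topics --entity-name blogpost --delete-config retention.ms
```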

## Verify restore
Create a consumer for the restored topic
```bash
# Creating a consumer on a different terminal
$ kubectl -n kafka-test run kafka-consumer -ti --image=strimzi/kafka:0.20.0-kafka-2.6.0 --rm=true --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-external-bootstrap:9094 --topic blogpost --from-beginning
```
All of the restored messages should be visible. To compare message counts against the backup, see the sketch below.
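
A minimal sketch for counting the messages in the restored topic, using the same `GetOffsetShell` technique as the blueprint's monitoring scripts (topic and broker address are the example's values):

```bash
# Sum the latest offsets across all partitions to get the topic's total message count
$ kubectl -n kafka-test run kafka-offsets -i --image=strimzi/kafka:0.20.0-kafka-2.6.0 --rm=true --restart=Never -- \
    bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
    --broker-list my-cluster-kafka-external-bootstrap:9094 --topic blogpost --time -1 \
    | awk -F ":" '{sum += $3} END {print sum}'
```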

## Delete Blueprint and Profile CR

```bash
# Delete the blueprint
$ kubectl delete blueprints.cr.kanister.io <blueprint-name> -n kasten-io
# Get the profile
$ kubectl get profiles.cr.kanister.io -n kafka-test
NAME AGE
s3-profile-fn64h 2h
# Delete the profile
$ kubectl delete profiles.cr.kanister.io s3-profile-fn64h -n kafka-test
```

### Troubleshooting

The following debug commands can be used to troubleshoot issues during the backup and restore processes:

Check Kanister controller logs:
```bash
$ kubectl --namespace kasten-io logs -l run=kanister-svc -f
```
Check events of the ActionSet:
```bash
$ kubectl describe actionset <actionset-name> -n kasten-io
```
Check the logs of the Kanister job:
```bash
# Get the Kanister job pod name
$ kubectl get pod -n kafka-test

# Check the logs
$ kubectl logs <name-of-pod-running-the-job> -n kafka-test
```
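
If a connector pod is running but the action makes no progress, you can also query the Kafka Connect REST API inside that pod (pod name, namespace and connector name below are placeholders):

```bash
# List registered connectors and check the status of a specific one via the Connect REST API
$ kubectl exec -n <namespace-of-connector-pod> <connector-pod-name> -- curl -s http://localhost:8083/connectors
$ kubectl exec -n <namespace-of-connector-pod> <connector-pod-name> -- curl -s http://localhost:8083/connectors/<connector-name>/status
```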
8 changes: 8 additions & 0 deletions examples/kafka/adobe-s3-connector/adobe-kafkaConfiguration.properties
@@ -0,0 +1,8 @@
# bootstrap.servers points to the Kafka broker bootstrap address, which is static for this cluster
bootstrap.servers=PLAINTEXT://my-cluster-kafka-external-bootstrap:9094
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=/opt/
16 changes: 16 additions & 0 deletions examples/kafka/adobe-s3-connector/adobe-s3-sink.properties
@@ -0,0 +1,16 @@
connector.class=com.spredfast.kafka.connect.s3.sink.S3SinkConnector
tasks.max=4
format=binary
format.include.keys=true
topics=blogs,feeds
# too many records can overwhelm the poll loop on large topics and will result in
# Connect continuously rebalancing without making progress
consumer.max.poll.records=500
# Flushing to S3 can take some time, so allow for more than the default 5 seconds when shutting down.
task.shutdown.graceful.timeout.ms=30000
# The converters specify the format of data in Kafka and how to translate it into Connect data
key.converter=com.spredfast.kafka.connect.s3.AlreadyBytesConverter
value.converter=com.spredfast.kafka.connect.s3.AlreadyBytesConverter
s3.prefix=topics
s3.path_style=true
local.buffer.dir=/tmp/kafka-connect-s3.buffer
