Bi-directional cluster link

Start the clusters

    docker-compose up -d

Two CP clusters are now running: the left cluster (broker leftKafka on port 19092, Schema Registry on port 8085) and the right cluster (broker rightKafka on port 29092, Schema Registry on port 8086).
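
To confirm both stacks are up before continuing, you can run a quick sanity check against the containers and both brokers (the service names come from the docker-compose.yml of this demo):

    docker-compose ps
    # each broker should answer on its listener
    docker-compose exec leftKafka kafka-broker-api-versions --bootstrap-server leftKafka:19092 | head -1
    docker-compose exec rightKafka kafka-broker-api-versions --bootstrap-server rightKafka:29092 | head -1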

Create the topic product and the schema product-value in both clusters

Create the schema product-value

    curl -v -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data @data/product.avsc http://localhost:8085/subjects/product-value/versions
    curl -v -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data @data/product.avsc http://localhost:8086/subjects/product-value/versions     
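
If you want to double-check the registration, the standard Schema Registry REST endpoints can be used to list the subjects and inspect the latest version on each side (optional sanity check):

    # list subjects on the left and right Schema Registry
    curl -s http://localhost:8085/subjects | jq
    curl -s http://localhost:8086/subjects | jq
    # latest registered version of product-value on the left
    curl -s http://localhost:8085/subjects/product-value/versions/latest | jq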

Create the topic product

    docker-compose exec leftKafka kafka-topics --bootstrap-server leftKafka:19092 --topic product --create --partitions 1 --replication-factor 1
    docker-compose exec rightKafka kafka-topics --bootstrap-server rightKafka:29092 --topic product --create --partitions 1 --replication-factor 1
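
You can verify the topic exists on both brokers with kafka-topics --describe (optional check):

    docker-compose exec leftKafka kafka-topics --bootstrap-server leftKafka:19092 --topic product --describe
    docker-compose exec rightKafka kafka-topics --bootstrap-server rightKafka:29092 --topic product --describe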

Create the schema linking

Create schema linking from left to right

  1. Create config file
    docker-compose exec leftSchemaregistry bash -c '\
    echo "schema.registry.url=http://rightSchemaregistry:8086" > /home/appuser/config.txt'
  2. Create the schema exporter
    docker-compose exec leftSchemaregistry bash -c '\
    schema-exporter --create --name left-to-right-sl --subjects "product-value" \
    --config-file ~/config.txt \
    --schema.registry.url http://leftSchemaregistry:8085 \
    --subject-format "left.product-value" \
    --context-type NONE'
  3. List the exporters to check the exporter was created
    docker-compose exec leftSchemaregistry bash -c '\
    schema-exporter --list \
    --schema.registry.url http://leftSchemaregistry:8085'
  4. Check the exporter status (the same information is available over the Schema Registry REST API; see the sketch after this list)
    docker-compose exec leftSchemaregistry bash -c '\
    schema-exporter --get-status --name left-to-right-sl --schema.registry.url http://leftSchemaregistry:8085' | jq
  5. Check the schema is the same in the left and right cluster (the right cluster stores it under the prefixed subject left.product-value)
    curl http://localhost:8085/subjects/product-value/versions/1 | jq
    curl http://localhost:8086/subjects/left.product-value/versions/1 | jq
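
As noted in step 4, the exporter can also be inspected over the Schema Registry REST API instead of the schema-exporter CLI; the following is a sketch using the exporter endpoints documented for Confluent Schema Linking:

    # exporters registered on the left Schema Registry
    curl -s http://localhost:8085/exporters | jq
    # status and config of the exporter created above
    curl -s http://localhost:8085/exporters/left-to-right-sl/status | jq
    curl -s http://localhost:8085/exporters/left-to-right-sl/config | jq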

Create schema linking from right to left

  1. Create config file
    docker-compose exec rightSchemaregistry bash -c '\
    echo "schema.registry.url=http://leftSchemaregistry:8085" > /home/appuser/config.txt'
  2. Create the schema exporter
    docker-compose exec rightSchemaregistry bash -c '\
    schema-exporter --create --name right-to-left-sl --subjects "product-value" \
    --config-file ~/config.txt \
    --schema.registry.url http://rightSchemaregistry:8086 \
    --subject-format "right.product-value" \
    --context-type NONE'
  3. List the exporters to check the exporter was created
    docker-compose exec rightSchemaregistry bash -c '\
    schema-exporter --list \
    --schema.registry.url http://rightSchemaregistry:8086'
  4. Check the exporter status
    docker-compose exec rightSchemaregistry bash -c '\
    schema-exporter --get-status --name right-to-left-sl --schema.registry.url http://rightSchemaregistry:8086' | jq
  5. Check the schema is the same in the left and right cluster (this time the left cluster stores it under the prefixed subject right.product-value; a diff-based comparison is sketched after this list)
    curl http://localhost:8086/subjects/product-value/versions/1 | jq
    curl http://localhost:8085/subjects/right.product-value/versions/1 | jq
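
To go beyond eyeballing the two responses in step 5, you can diff just the schema field of each subject version; the content should be identical even though the subject names differ (a small sketch reusing the endpoints above, run from the host shell):

    diff \
      <(curl -s http://localhost:8086/subjects/product-value/versions/1 | jq -r .schema) \
      <(curl -s http://localhost:8085/subjects/right.product-value/versions/1 | jq -r .schema) \
      && echo "schemas match"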

Create the cluster linking

Create cluster linking from left to right

  1. Create config file to configure the cluster linking
docker-compose exec rightKafka bash -c '\
echo "\
bootstrap.servers=leftKafka:19092
link.mode=BIDIRECTIONAL
cluster.link.prefix=left.
consumer.offset.sync.enable=true
" > /home/appuser/cl.properties'

docker-compose exec rightKafka bash -c '\
echo "{\"groupFilters\": [{\"name\": \"*\",\"patternType\": \"LITERAL\",\"filterType\": \"INCLUDE\"}]}" > /home/appuser/cl-offset-groups.json'
  2. Create the cluster link on the destination cluster, using the configuration files created in the previous step
    docker-compose exec rightKafka \
    kafka-cluster-links --bootstrap-server rightKafka:29092 \
    --create --link bidirectional-link \
    --config-file /home/appuser/cl.properties \
    --consumer-group-filters-json-file /home/appuser/cl-offset-groups.json
  3. Create the mirror topic
    docker-compose exec rightKafka \
    kafka-mirrors --create \
    --source-topic product \
    --mirror-topic left.product \
    --link bidirectional-link \
    --bootstrap-server rightKafka:29092        
  4. Verify the cluster link is up (a mirror-level check is sketched below)
    docker-compose exec rightKafka \
    kafka-cluster-links --bootstrap-server rightKafka:29092 --link  bidirectional-link --list

The output should be similar to:

    Link name: 'bidirectional-link', link ID: 'AMw2VoNJRya3CgMHGRnwIQ', remote cluster ID: 'JZOncuJyRQqaQ_qt8Mi_UA', local cluster ID: '_-6HdK0dS_S6Z9xZO8usOg', remote cluster available: 'true', link state: 'ACTIVE'
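
Beyond the link state, you can also check the mirror topic itself; the kafka-mirrors tool has a --describe action that reports the mirror state (a sketch, using the same CLI that created the mirror):

    docker-compose exec rightKafka \
    kafka-mirrors --describe \
    --topics left.product \
    --bootstrap-server rightKafka:29092

Once the second link below is created, the analogous check on the left cluster uses --topics right.product.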

Create cluster linking from right to left

  1. Create config file to configure the cluster linking
docker-compose exec leftKafka bash -c '\
echo "\
bootstrap.servers=rightKafka:29092
link.mode=BIDIRECTIONAL
cluster.link.prefix=right.
consumer.offset.sync.enable=true
" > /home/appuser/cl2.properties'

docker-compose exec leftKafka bash -c '\
echo "{\"groupFilters\": [{\"name\": \"*\",\"patternType\": \"LITERAL\",\"filterType\": \"INCLUDE\"}]}" > /home/appuser/cl2-offset-groups.json'
  2. Create the cluster link on the destination cluster, using the configuration files created in the previous step
    docker-compose exec leftKafka \
    kafka-cluster-links --bootstrap-server leftKafka:19092 \
    --create --link bidirectional-link \
    --config-file /home/appuser/cl2.properties \
    --consumer-group-filters-json-file /home/appuser/cl2-offset-groups.json
  3. Create the mirror topic
    docker-compose exec leftKafka \
    kafka-mirrors --create \
    --source-topic product \
    --mirror-topic right.product \
    --link bidirectional-link \
    --bootstrap-server leftKafka:19092
  4. Verify the cluster link is up
    docker-compose exec leftKafka \
    kafka-cluster-links --bootstrap-server leftKafka:19092 --link bidirectional-link --list

The output should be similar to:

    Link name: 'bidirectional-link', link ID: 'AMw2VoNJRya3CgMHGRnwIQ', remote cluster ID: '_-6HdK0dS_S6Z9xZO8usOg', local cluster ID: 'JZOncuJyRQqaQ_qt8Mi_UA', remote cluster available: 'true', link state: 'ACTIVE'

Check the link is the same on both sides

List the created link on each cluster again:

    docker-compose exec leftKafka \
    kafka-cluster-links --bootstrap-server leftKafka:19092 --link bidirectional-link --list

    docker-compose exec rightKafka \
    kafka-cluster-links --bootstrap-server rightKafka:29092 --link bidirectional-link --list

Verifying the results:

  • Link name: 'bidirectional-link' -> the same name in both outputs, because both links were created with the same name
  • link ID: 'AMw2VoNJRya3CgMHGRnwIQ' -> the same ID for both links
  • remote cluster ID: '_-6HdK0dS_S6Z9xZO8usOg' and 'JZOncuJyRQqaQ_qt8Mi_UA' -> swapped between the two outputs
  • local cluster ID: 'JZOncuJyRQqaQ_qt8Mi_UA' and '_-6HdK0dS_S6Z9xZO8usOg' -> swapped between the two outputs; each side's local cluster ID is the other side's remote cluster ID
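
Another way to see the bidirectional setup is to list the topics on each broker: the left cluster should show product plus the mirror right.product, and the right cluster should show product plus left.product:

    docker-compose exec leftKafka kafka-topics --bootstrap-server leftKafka:19092 --list
    docker-compose exec rightKafka kafka-topics --bootstrap-server rightKafka:29092 --list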

Test time!

Active-active

  1. Producer produces to the left cluster
   docker-compose exec leftSchemaregistry kafka-avro-console-producer \
    --bootstrap-server leftKafka:19092 \
    --topic product \
    --property value.schema.id=1 \
    --property schema.registry.url=http://leftSchemaregistry:8085 \
    --property auto.register=false \
    --property use.latest.version=true

    { "product_id": 1, "product_name" : "riceLeft"} 
    { "product_id": 2, "product_name" : "beansLeft"} 
  2. Consumer consumes from the left cluster
    docker-compose exec leftSchemaregistry \
        kafka-avro-console-consumer --bootstrap-server leftKafka:19092 \
        --property schema.registry.url=http://leftSchemaregistry:8085 \
        --group left-group \
        --from-beginning \
        --include ".*product" \
        --property print.timestamp=true \
        --property print.offset=true \
        --property print.partition=true \
        --property print.headers=true \
        --property print.key=true \
        --property print.value=true
  3. Producer produces to the right cluster
   docker-compose exec rightSchemaregistry kafka-avro-console-producer \
    --bootstrap-server rightKafka:29092 \
    --topic product \
    --property value.schema.id=1 \
    --property schema.registry.url=http://rightSchemaregistry:8086 \
    --property auto.register=false \
    --property use.latest.version=true

    { "product_id": 1, "product_name" : "riceRight"} 
    { "product_id": 2, "product_name" : "beansRight"} 
  4. Consumer consumes from the right cluster (see the note on mirrored topics after this list)
    docker-compose exec rightSchemaregistry \
        kafka-avro-console-consumer --bootstrap-server rightKafka:29092 \
        --property schema.registry.url=http://rightSchemaregistry:8086 \
        --group right-group \
        --from-beginning \
        --include ".*product" \
        --property print.timestamp=true \
        --property print.offset=true \
        --property print.partition=true \
        --property print.headers=true \
        --property print.key=true \
        --property print.value=true
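
Because the consumers above use --include ".*product", they read both the local product topic and the mirrored copy (left.product or right.product). To look at only the mirrored data, point a consumer at the mirror topic explicitly, for example on the right cluster:

    docker-compose exec rightSchemaregistry \
        kafka-avro-console-consumer --bootstrap-server rightKafka:29092 \
        --property schema.registry.url=http://rightSchemaregistry:8086 \
        --topic left.product \
        --from-beginning \
        --property print.offset=true \
        --property print.value=true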

Disaster mode

Consume from each cluster with the same consumer group (disaster-group):

    docker-compose exec leftSchemaregistry \
        kafka-avro-console-consumer --bootstrap-server leftKafka:19092 \
        --property schema.registry.url=http://leftSchemaregistry:8085 \
        --group disaster-group \
        --from-beginning \
        --include ".*product" \
        --property print.timestamp=true \
        --property print.offset=true \
        --property print.partition=true \
        --property print.headers=true \
        --property print.key=true \
        --property print.value=true

    docker-compose exec rightSchemaregistry \
        kafka-avro-console-consumer --bootstrap-server rightKafka:29092 \
        --property schema.registry.url=http://rightSchemaregistry:8086 \
        --group disaster-group \
        --from-beginning \
        --include ".*product" \
        --property print.timestamp=true \
        --property print.offset=true \
        --property print.partition=true \
        --property print.headers=true \
        --property print.key=true \
        --property print.value=true

Dummy mode (same consumer group on both sides)

  • data is synced
  • consumer offsets are not synced fast enough (see the offset check below)
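
To observe how quickly the offsets travel across the link, you can compare what the same group has committed on each cluster (standard kafka-consumer-groups tooling):

    # committed offsets for the shared group, seen from each side
    docker-compose exec leftKafka kafka-consumer-groups --bootstrap-server leftKafka:19092 --describe --group disaster-group
    docker-compose exec rightKafka kafka-consumer-groups --bootstrap-server rightKafka:29092 --describe --group disaster-group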