estudo-kafka-connect


Kafka Connect study projects

Getting started

Twelve Days of SMT

Twelve Days of SMT 🎄 - Day 10: ReplaceField

# creates the source-voluble-datagen-day10-00 connector, which generates random data in the day10-transactions topic
curl -i -X PUT -H  "Content-Type:application/json" \
    http://localhost:8083/connectors/source-voluble-datagen-day10-00/config \
    -d '{
        "connector.class"                              : "io.mdrogalis.voluble.VolubleSourceConnector",
        "genkp.day10-transactions.with"                : "#{Internet.uuid}",
        "genv.day10-transactions.cost.with"            : "#{Commerce.price}",
        "genv.day10-transactions.units.with"           : "#{number.number_between '\''1'\'','\''99'\''}",
        "genv.day10-transactions.txn_date.with"        : "#{date.past '\''10'\'','\''DAYS'\''}",
        "genv.day10-transactions.cc_num.with"          : "#{Business.creditCardNumber}",
        "genv.day10-transactions.cc_exp.with"          : "#{Business.creditCardExpiry}",
        "genv.day10-transactions.card_type.with"       : "#{Business.creditCardType}",
        "genv.day10-transactions.customer_remarks.with": "#{BackToTheFuture.quote}",
        "genv.day10-transactions.item.with"            : "#{Beer.name}",
        "topic.day10-transactions.throttle.ms"         : 1000
    }'
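
Before consuming anything, it can help to confirm that the connector's task is running; this uses the same status endpoint shown later in the Kafka Connect 101 section.

# checks the status of the datagen connector
curl -s http://localhost:8083/connectors/source-voluble-datagen-day10-00/status | jq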

# consumes the messages, deserializing them with Schema Registry and Avro
docker-compose exec kafkacat sh -c "kafkacat -b broker:29092 -t day10-transactions \
-s value=avro -r http://schema-registry:8081 -C -J -e | jq '[.payload]'"

# creates a JDBC sink connector that writes to MySQL, dropping the blacklisted fields
curl -i -X PUT -H "Accept:application/json" \
  -H  "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day10-01/config \
  -d '{
      "connector.class"            : "io.confluent.connect.jdbc.JdbcSinkConnector",
      "connection.url"             : "jdbc:mysql://mysql:3306/demo",
      "connection.user"            : "mysqluser",
      "connection.password"        : "mysqlpw",
      "topics"                     : "day10-transactions",
      "tasks.max"                  : "4",
      "auto.create"                : "true",
      "auto.evolve"                : "true",
      "transforms"                 : "dropCC",
      "transforms.dropCC.type"     : "org.apache.kafka.connect.transforms.ReplaceField$Value",
      "transforms.dropCC.blacklist": "cc_num,cc_exp,card_type"
      }'

# connects to mysql and shows the table structure without the blacklisted fields
docker exec -it mysql bash -c 'mysql -u root -p$MYSQL_ROOT_PASSWORD demo'
desc `day10-transactions`;

# creates a JDBC source connector that reads from MySQL, keeping only the whitelisted fields
curl -X PUT http://localhost:8083/connectors/source-jdbc-mysql-day10-00/config \
  -H "Content-Type: application/json" -d '{
    "connector.class"                  : "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url"                   : "jdbc:mysql://mysql:3306/demo",
    "connection.user"                  : "mysqluser",
    "connection.password"              : "mysqlpw",
    "topic.prefix"                     : "day10-",
    "poll.interval.ms"                 : 10000,
    "tasks.max"                        : 1,
    "table.whitelist"                  : "production_data",
    "mode"                             : "bulk",
    "transforms"                       : "selectFields",
    "transforms.selectFields.type"     : "org.apache.kafka.connect.transforms.ReplaceField$Value",
    "transforms.selectFields.whitelist": "item,cost,units,txn_date"
  }'
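
To check which fields survive the whitelist, the topic written by this source connector can be consumed the same way as above. The topic name follows the JDBC source convention topic.prefix + table name (day10-production_data); this sketch assumes the production_data table already exists in the demo database and that the worker uses the Avro converter, as in the consume command above.

# consumes the messages from the topic produced by the whitelist source connector
docker-compose exec kafkacat sh -c "kafkacat -b broker:29092 -t day10-production_data \
-s value=avro -r http://schema-registry:8081 -C -J -e | jq '.payload'"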

# creates a JDBC sink connector that writes to MySQL, renaming fields
curl -i -X PUT -H "Accept:application/json" \
  -H  "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day10-02/config \
  -d '{
      "connector.class"            : "io.confluent.connect.jdbc.JdbcSinkConnector",
      "connection.url"             : "jdbc:mysql://mysql:3306/demo",
      "connection.user"            : "mysqluser",
      "connection.password"        : "mysqlpw",
      "topics"                     : "day10-transactions",
      "tasks.max"                  : "4",
      "auto.create"                : "true",
      "auto.evolve"                : "true",
      "transforms"                 : "renameTS",
      "transforms.renameTS.type"   : "org.apache.kafka.connect.transforms.ReplaceField$Value",
      "transforms.renameTS.renames": "txn_date:transaction_timestamp"
      }'
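
As in the blacklist example above, the result can be checked in MySQL; the sink writes to a table named after the topic, so the txn_date field should now show up there as transaction_timestamp.

# connects to mysql and describes the table, now containing the renamed column
docker exec -it mysql bash -c 'mysql -u root -p$MYSQL_ROOT_PASSWORD demo'
desc `day10-transactions`;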

To install connectors manually, just download the connector and create a Dockerfile that copies the lib folder and the manifest.json file into the /usr/share/confluent-hub-components/<CONNECTOR-NAME> directory inside the container.
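
A minimal sketch of such a Dockerfile, assuming the connector was downloaded and extracted to ./my-connector next to the Dockerfile (the base image tag and the my-connector name are illustrative):

FROM confluentinc/cp-kafka-connect-base:6.2.0
# copies the connector's lib folder and manifest.json into the plugin path scanned by Connect
COPY my-connector/lib/ /usr/share/confluent-hub-components/my-connector/lib/
COPY my-connector/manifest.json /usr/share/confluent-hub-components/my-connector/manifest.json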

Twelve Days of SMT 🎄 - Day 1: InsertField (timestamp)

# starts the containers
docker-compose up -d

# creates a connector that generates random data in the customers and transactions topics
curl -i -X PUT -H  "Content-Type:application/json" \
    http://localhost:8083/connectors/source-voluble-datagen-00/config \
    -d '{
        "connector.class"                      : "io.mdrogalis.voluble.VolubleSourceConnector",
        "genkp.customers.with"                 : "#{Internet.uuid}",
        "genv.customers.name.with"             : "#{HitchhikersGuideToTheGalaxy.character}",
        "genv.customers.email.with"            : "#{Internet.emailAddress}",
        "genv.customers.location->city.with"   : "#{HitchhikersGuideToTheGalaxy.location}",
        "genv.customers.location->planet.with" : "#{HitchhikersGuideToTheGalaxy.planet}",
        "topic.customers.records.exactly"      : 10,

        "genkp.transactions.with"                : "#{Internet.uuid}",
        "genv.transactions.customer_id.matching" : "customers.key",
        "genv.transactions.cost.with"            : "#{Commerce.price}",
        "genv.transactions.card_type.with"       : "#{Business.creditCardType}",
        "genv.transactions.item.with"            : "#{Beer.name}",
        "topic.transactions.throttle.ms"         : 500
    }'

# lists the topics
docker-compose exec kafkacat sh -c "kafkacat -b broker:29092 -L | grep topic"

# consumes the messages from the transactions topic created by the source-voluble-datagen-00 connector
# note: the payload values will still be serialized
docker-compose exec kafkacat sh -c "kafkacat -b broker:29092 -t transactions -C -J | jq"

# consumes the messages, deserializing them with Schema Registry and Avro
docker-compose exec kafkacat sh -c "kafkacat -b broker:29092 -t transactions \
-s value=avro -r http://schema-registry:8081 -C -J | jq"

# creates a JDBC sink connector to feed MySQL
curl -i -X PUT -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-00/config \
    -d '{
          "connector.class"     : "io.confluent.connect.jdbc.JdbcSinkConnector",
          "connection.url"      : "jdbc:mysql://mysql:3306/demo",
          "connection.user"     : "mysqluser",
          "connection.password" : "mysqlpw",
          "topics"              : "transactions",
          "tasks.max"           : "4",
          "auto.create"         : "true"
        }'

# accesses the mysql container as the root user
# describes the table structure
docker exec -it mysql bash -c 'mysql -u root -p$MYSQL_ROOT_PASSWORD demo'
desc transactions;

// response
+-------------+------+------+-----+---------+-------+
| Field       | Type | Null | Key | Default | Extra |
+-------------+------+------+-----+---------+-------+
| customer_id | text | YES  |     | NULL    |       |
| cost        | text | YES  |     | NULL    |       |
| item        | text | YES  |     | NULL    |       |
| card_type   | text | YES  |     | NULL    |       |
+-------------+------+------+-----+---------+-------+

# updates the connector, adding a messageTS column
curl -i -X PUT -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-00/config \
    -d '{
          "connector.class"     : "io.confluent.connect.jdbc.JdbcSinkConnector",
          "connection.url"      : "jdbc:mysql://mysql:3306/demo",
          "connection.user"     : "mysqluser",
          "connection.password" : "mysqlpw",
          "topics"              : "transactions",
          "tasks.max"           : "4",
          "auto.create"         : "true",
          "auto.evolve"         : "true",
          "transforms"                         : "insertTS",
          "transforms.insertTS.type"           : "org.apache.kafka.connect.transforms.InsertField$Value",
          "transforms.insertTS.timestamp.field": "messageTS"
        }'

# accesses the mysql container as the root user
# describes the table structure
docker exec -it mysql bash -c 'mysql -u root -p$MYSQL_ROOT_PASSWORD demo'
desc transactions;

// response
+-------------+-------------+------+-----+---------+-------+
| Field       | Type        | Null | Key | Default | Extra |
+-------------+-------------+------+-----+---------+-------+
| customer_id | text        | YES  |     | NULL    |       |
| cost        | text        | YES  |     | NULL    |       |
| item        | text        | YES  |     | NULL    |       |
| card_type   | text        | YES  |     | NULL    |       |
| messageTS   | datetime(3) | YES  |     | NULL    |       |
+-------------+-------------+------+-----+---------+-------+

# creates the bucket
aws --endpoint-url=http://localhost:4566 s3 mb s3://rmoff-smt-demo-01

# lists the created buckets
aws s3api list-buckets --query "Buckets[].Name" \
--endpoint-url=http://localhost:4566

# creates the S3 sink connector
curl -i -X PUT -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/sink-s3-00/config \
    -d '{
          "connector.class"        : "io.confluent.connect.s3.S3SinkConnector",
          "storage.class"          : "io.confluent.connect.s3.storage.S3Storage",
          "store.url" : "http://localstack:4566",
          "s3.region"              : "us-west-2",
          "s3.bucket.name"         : "rmoff-smt-demo-01",
          "topics"                 : "customers,transactions",
          "tasks.max"              : "4",
          "flush.size"             : "16",
          "format.class"           : "io.confluent.connect.s3.format.json.JsonFormat",
          "schema.generator.class" : "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
          "schema.compatibility"   : "NONE",
          "partitioner.class"      : "io.confluent.connect.storage.partitioner.DefaultPartitioner"
        }'

# lists all the files in S3
aws --endpoint-url=http://localhost:4566 s3 ls s3://rmoff-smt-demo-01 \
--recursive --human-readable --summarize

# lists all the files in an S3 bucket via s3api
aws --endpoint-url=http://localhost:4566 s3api list-objects --bucket rmoff-smt-demo-01 \
--output text --query "Contents[].{Key: Key}"
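
To peek at one of the JSON files written by the sink, an object can be copied to stdout. The key below is illustrative: with the DefaultPartitioner the S3 sink lays files out as topics/<topic>/partition=<n>/<topic>+<n>+<startOffset>.json, so adjust it to one of the keys returned by the listing above.

# prints the contents of one object written by the S3 sink connector
aws --endpoint-url=http://localhost:4566 s3 cp \
  s3://rmoff-smt-demo-01/topics/transactions/partition=0/transactions+0+0000000000.json -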

To see the messages created by the source-voluble-datagen-00 connector (io.mdrogalis.voluble.VolubleSourceConnector), open http://localhost:9021/ > side menu Topics > click the transactions topic and then click Messages.

Course: Kafka Connect 101 (Confluent)

# starts the containers
docker-compose up -d

# creates the datagen connector
curl -i -X PUT http://localhost:8083/connectors/datagen_local_01/config \
     -H "Content-Type: application/json" \
     -d '{
            "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
            "key.converter": "org.apache.kafka.connect.storage.StringConverter",
            "kafka.topic": "pageviews",
            "quickstart": "pageviews",
            "max.interval": 1000,
            "iterations": 10000000,
            "tasks.max": "1"
        }'

# checks whether the connector is running
curl -s http://localhost:8083/connectors/datagen_local_01/status

# reads the messages from the pageviews topic
docker-compose exec connect kafka-avro-console-consumer \
 --bootstrap-server broker:9092 \
 --property schema.registry.url=http://schema-registry:8081 \
 --topic pageviews \
 --property print.key=true \
 --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
 --property key.separator=" : " \
 --max-messages 10

# checks the task stack trace using curl and jq
curl -s "http://localhost:8083/connectors/source-debezium-orders-00/status" \
| jq '.tasks[0].trace'

# connect service logs via docker-compose
docker-compose logs -f connect

# example of changing the log level without a restart
curl -s -X PUT -H "Content-Type:application/json" \
    http://localhost:8083/admin/loggers/io.debezium \
    -d '{"level": "TRACE"}'
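
To confirm the change, the worker's logger levels can be read back from the same admin endpoint.

# lists the current log levels configured on the worker
curl -s http://localhost:8083/admin/loggers | jq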

# checks a connector's status and filters the output,
# showing the name and the task states using jq
curl -v http://localhost:8083/connectors/transform2/status | \
jq -c '[.name, .tasks[].state]'

# example of a connector with a dead letter queue,
# recording the error reason in the message headers
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
        "name": "file_sink_03",
        "config": {
                "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
                "topics":"test_topic_json",
                "value.converter":"org.apache.kafka.connect.json.JsonConverter",
                "value.converter.schemas.enable": false,
                "key.converter":"org.apache.kafka.connect.json.JsonConverter",
                "key.converter.schemas.enable": false,
                "file": "/data/file_sink_03.txt",
                "errors.tolerance": "all",
                "errors.deadletterqueue.topic.name":"dlq_file_sink_03",
                "errors.deadletterqueue.topic.replication.factor": 1,
                "errors.deadletterqueue.context.headers.enable":true
                }
        }'
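
A sketch for inspecting the dead letter queue, assuming a kafkacat service like the one used in the earlier sections (adjust the broker address to the compose file's internal listener); the %h format token prints the record headers, which carry the error context enabled above.

# consumes the DLQ topic and prints the error headers of each record
docker-compose exec kafkacat sh -c \
  "kafkacat -b broker:9092 -t dlq_file_sink_03 -C -f 'Headers: %h\nValue: %s\n'"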

# example of a connector that reports the error reason in the worker log
# "errors.log.enable": true -> enables error logging
# "errors.log.include.messages": true -> includes the message metadata in the log
#
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
        "name": "file_sink_04",
        "config": {
                "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
                "topics":"test_topic_json",
                "value.converter":"org.apache.kafka.connect.json.JsonConverter",
                "value.converter.schemas.enable": false,
                "key.converter":"org.apache.kafka.connect.json.JsonConverter",
                "key.converter.schemas.enable": false,
                "file": "/data/file_sink_04.txt",
                "errors.tolerance": "all",
                "errors.log.enable":true,
                "errors.log.include.messages":true
                }
        }'

# checks the cluster version and id
curl -v http://localhost:8083

# lists the installed connector plugins
curl -v http://localhost:8083/connector-plugins

# lists the connectors
curl -v http://localhost:8083/connectors/

# lists a connector's configuration
curl -v http://localhost:8083/connectors/file_sink_04/config 
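
On recent Connect versions the connector list can also be expanded to return each connector's status in a single call.

# lists all connectors together with their current status
curl -s "http://localhost:8083/connectors?expand=status" | jq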

Source: Kafka Connect 101

Introduction to Kafka Connectors

# enters the directory
cd baeldung

# starts the containers
docker-compose up -d

# opens a shell in the kafka-connect container
docker-compose run --rm kafka-connect bash

# creates an input file for the source connector
echo -e "foo\nbar\n" > ./test.txt

# starts the standalone worker
connect-standalone \
  ./connect-standalone.properties \
  ./connect-file-source.properties \
  ./connect-file-sink.properties

# opens another bash terminal in the kafka-connect container
docker-compose run --rm kafka-connect bash

# consumes the messages produced by the worker
kafka-console-consumer --bootstrap-server kafka:9092 \
--topic connect-test --from-beginning

# expected output
# {"schema":{"type":"string","optional":false},"payload":"foo"}
# {"schema":{"type":"string","optional":false},"payload":"bar"}

# starts kafka-connect in distributed mode
connect-distributed connect-distributed.properties

# adds a source connector
curl -d @connect-file-source.json \
  -H "Content-Type: application/json" \
  -X POST http://localhost:8083/connectors -v

# adds a sink connector via the api
curl -d @connect-file-sink.json \
  -H "Content-Type: application/json" \
  -X POST http://localhost:8083/connectors

# consumes the messages from the connect-distributed topic
kafka-console-consumer --bootstrap-server kafka:9092 \
--topic connect-distributed --from-beginning

# deletes the connectors via the api
curl -X DELETE http://localhost:8083/connectors/local-file-source
curl -X DELETE http://localhost:8083/connectors/local-file-sink

# starts kafka connect with a configuration that transforms data
connect-distributed connect-distributed-transformer.properties

# adds a source connector with a data transformation
curl -v -d @data/connect-file-source-transform.json \
  -H "Content-Type: application/json" \
  -X POST http://localhost:8083/connectors

# creates a consumer for the connect-transformation topic
kafka-console-consumer --bootstrap-server kafka:9092 \
--topic connect-transformation --from-beginning

# opens a shell in the kafka-connect container
# installs a connector from confluent-hub
docker-compose run --rm kafka-connect bash
confluent-hub install confluentinc/kafka-connect-mqtt:1.0.0-preview

connect-file-source.json:

{
    "name": "local-file-source",
    "config": {
        "connector.class": "FileStreamSource",
        "tasks.max": 1,
        "file": "test-distributed.txt",
        "topic": "connect-distributed"
    }
}

connect-file-sink.json:

{
    "name": "local-file-sink",
    "config": {
        "connector.class": "FileStreamSink",
        "tasks.max": 1,
        "file": "test-distributed.sink.txt",
        "topics": "connect-distributed"
    }
}

Source: Introduction to Kafka Connectors

Kafka Connect project: integration between systems (MySQL / Elasticsearch)

# enters the directory
cd full-cycle

# starts the containers
docker-compose up -d

# In case of an Elasticsearch error:
# [1]: max virtual memory areas vm.max_map_count [65530] is too low,
# increase to at least [262144]
echo "vm.max_map_count=262144" | sudo tee -a /etc/sysctl.conf
sudo sysctl -w vm.max_map_count=262144 # vm.max_map_count=262144

# accesses the products database in mysql
mysql -u root -p products -h 127.0.0.1 -P 33600

# creates the products table
create table products(id int, name varchar(255));

# inserts a record into the products table
insert into products values(1, "carro");

# selects the records from the products table,
# displaying each row as a vertical list of fields
select * from products order by id desc limit 10\G
*************************** 1. row ***************************
  id: 2
name: bicicleta
*************************** 2. row ***************************
  id: 1
name: carro
2 rows in set (0.00 sec)

To use Control Center, open http://localhost:9021/ in your browser

To use Kibana, open http://localhost:5601/
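
A quick way to confirm that data is reaching Elasticsearch is to list its indices over the REST API; this sketch assumes Elasticsearch is exposed on the default port 9200, and the index name itself depends on the sink connector configuration.

# lists the Elasticsearch indices and their document counts
curl -s "http://localhost:9200/_cat/indices?v"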

Source: Kafka Connect: Integração entre sistemas (MySQL / Elasticsearch)

API

curl -v http://localhost:8083/connector-plugins

Manual installation of Kafka and a connector

Kafka

# downloads kafka
wget https://archive.apache.org/dist/kafka/2.8.1/kafka_2.13-2.8.1.tgz

# extracts the archive
tar -xzf kafka_2.13-2.8.1.tgz

# deletes the tgz file
rm kafka_2.13-2.8.1.tgz

# enters the directory
cd kafka_2.13-2.8.1

# adds the advertised listener to the kafka config
echo 'advertised.listeners=PLAINTEXT://localhost:9092' >> config/server.properties

# starts Zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties

# starts Kafka
bin/kafka-server-start.sh config/server.properties

# uses nc to check connectivity on port 9092
nc -vz localhost 9092

# creates a topic (replication factor 1, since only one broker is running)
bin/kafka-topics.sh --create --topic teste1 --bootstrap-server \
localhost:9092 --partitions 3 --replication-factor 1
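
To confirm the topic was created with the expected partition layout, it can be described:

# describes the teste1 topic
bin/kafka-topics.sh --describe --topic teste1 --bootstrap-server localhost:9092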

# starts a console producer
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic teste1

# starts a console consumer
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic teste1 --from-beginning

Kafka connector

To install a connector, Kafka must already be up and running.

For a manual installation you need the connector's jar file and all of its dependencies. One way to obtain the plugins is from Confluent Hub or from the community, via GitHub. The example below shows the faker-events-connector connector and its directory structure.

/opt/connectors
└── faker-events-connector
    ├── doc
    │   ├── LICENSE
    │   └── README.md
    ├── lib
    │   ├── automaton-1.11-8.jar
    │   ├── commons-lang3-3.5.jar
    │   ├── faker-events-connector-0.1.0.jar
    │   ├── generex-1.0.2.jar
    │   ├── gson-2.9.0.jar
    │   ├── javafaker-1.0.2.jar
    │   ├── logback-classic-1.2.10.jar
    │   ├── logback-core-1.2.10.jar
    │   ├── slf4j-api-1.7.32.jar
    │   └── snakeyaml-1.23-android.jar
    └── manifest.json

3 directories, 13 files

In the example above the plugin sits in the /opt/connectors folder, but it could be in any other folder. To set the plugin directory, add the line plugin.path=/opt/connectors to the connect-distributed.properties file, which can be done with: echo "plugin.path=/opt/connectors" >> connect-distributed.properties.
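
For example (the sketch below assumes the command is run from the directory that holds connect-distributed.properties; in the default Kafka layout that is the config/ folder):

# points Kafka Connect at the folder that holds the connector plugins
echo "plugin.path=/opt/connectors" >> connect-distributed.properties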

Now we create the plugin's configuration file (./exemplos/basic-example.json):

{
    "name": "basic-example",
    "config": {
        "connector.class": "br.com.fec.FecConnector",
        "topic.name": "topico2",
        "message.value": "#{Name.full_name}",
        "tasks.max": "1",
        "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
        "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
        "key.converter.schemas.enable": false,
        "value.converter.schemas.enable":false
    }
}

With Kafka running, the Kafka Connect configuration file updated, and the plugin configuration in place, we can run the following commands:

# starts the Kafka Connect worker in distributed mode
bin/connect-distributed.sh config/connect-distributed.properties

# creates the basic-example connector using the faker-events-connector plugin
curl -X POST -H "Content-Type:application/json" -d @exemplos/basic-example.json http://localhost:8083/connectors

# lists the connectors
curl -v http://localhost:8083/connectors

# lists the installed plugins
curl -v http://localhost:8083/connector-plugins

# consumes the events produced by the connector
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic topico2 --from-beginning \
--property print.key=true

# deletes the basic-example connector
curl -X DELETE http://localhost:8083/connectors/basic-example

Sources:

Links
