# MongoDB Sinkコネクタのセットアップ

対象ノードでKafkaコネクタコンテナを起動して、そこにMongoDB Sinkコネクタの登録を行います。

## 前提条件

セットアップ対象となるノードは以下の条件を満たしていることを前提とします。

* Ansible で操作可能なこと
* Dockerコンテナが起動できること
* `docker-compose`コマンドが実行可能なこと

前提条件をみたしていることを確認します。

操作対象となるAnsibleのグループ名を指定してください。

In [None]:
group_name=kafka_connect

Ansibleで接続可能なことを確認します。

In [None]:
ansible ${group_name} -m ping

Dockerが利用可能なことを確認します。

In [None]:
ansible ${group_name} -a 'docker info'

`docker-compose`コマンドが利用可能なことを確認します。

In [None]:
ansible ${group_name} -a 'docker-compose version'

## Kafkaコネクタコンテナの起動

ここでは[Confluent Base Image for Kafka Connect](https://hub.docker.com/r/confluentinc/cp-kafka-connect-base)を利用して、MongoDB コネクタを実行する環境を構築します。起動するコンテナではKafkaコネクタに関する多くのパラメータを設定する必要があるため、コンテナの起動には`docker run`ではなく`docker-compose`コマンドを利用します。



### 資材の配置

コンテナを起動するために必要となる資材を対象ノードに配置します。

配置するファイルを以下に示します。

* docker-compose.yml
* image/Dockerfile

対象となるファイルを、まずローカル環境に作成します。そのための作業ディレクトリを作成します。

In [None]:
work_dir=$(mktemp -d)

`docker-compose.yml`を作成します。主な設定項目を以下に示します。

* Kafkaブローカーのアドレス
* Kafkaコネクタのホスト名、ポート番号
* Kafkaコネクタのグループ名
* Kafkaコネクタの設定内容を記録するトピックに関するパラメータ
    * パラメータの種類
        * コンフィグレーション
        * オフセット
        * ステータス
    * レプリケーションに関するパラメータ
* コンバータ

`docker-compose.yml`の例を以下に示します。

```yaml
version: '3.7'
services:
  kafka-connect:
    build: ./image
    hostname: kafka-connect                                             # Kafkaコネクタのホスト名
    ports:
    - "8083:8083"                                                       # Kafkaコネクタのポート番号
    environment:
      CONNECT_BOOTSTRAP_SERVERS: kafka1:9092,kafka2:9092,kafka3:9092    # Kafkaブローカーのアドレス
      CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect                  # Kafkaコネクタのホスト名
      CONNECT_REST_PORT: 8083                                           # Kafkaコネクタのポート番号
      CONNECT_GROUP_ID: kafka-group-id                                  # Kafkaコネクタのグループ名
      CONNECT_CONFIG_STORAGE_TOPIC: kafka-connect-config                # 保存先となるトピック名(コンフィグレーション)
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1                      # レプリケーション数(コンフィグレーション)
      CONNECT_OFFSET_STORAGE_TOPIC: kafka-connect-offset                # 保存先となるトピック名(オフセット)
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1                      # レプリケーション数(オフセット)
      CONNECT_STATUS_STORAGE_TOPIC: kafka-connect-status                # 保存先となるトピック名(ステータス)
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1                      # レプリケーション数(ステータス)
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.converters.ByteArrayConverter     # コンバータ
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.converters.ByteArrayConverter   # コンバータ
```

In [None]:
cat > ${work_dir}/docker-compose.yml <<EOF
version: '3.7'
services:
  kafka-connect:
    build: ./image
    hostname: kafka-connect                                             # Kafkaコネクタのホスト名
    ports:
    - "8083:8083"                                                       # Kafkaコネクタのポート番号
    environment:
      CONNECT_BOOTSTRAP_SERVERS: kafka1:9092,kafka2:9092,kafka3:9092    # Kafkaブローカーのアドレス
      CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect                  # Kafkaコネクタのホスト名
      CONNECT_REST_PORT: 8083                                           # Kafkaコネクタのポート番号
      CONNECT_GROUP_ID: kafka-group-id                                  # Kafkaコネクタのグループ名
      CONNECT_CONFIG_STORAGE_TOPIC: kafka-connect-config                # 保存先となるトピック名(コンフィグレーション)
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1                      # レプリケーション数(コンフィグレーション)
      CONNECT_OFFSET_STORAGE_TOPIC: kafka-connect-offset                # 保存先となるトピック名(オフセット)
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1                      # レプリケーション数(オフセット)
      CONNECT_STATUS_STORAGE_TOPIC: kafka-connect-status                # 保存先となるトピック名(ステータス)
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1                      # レプリケーション数(ステータス)
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.converters.ByteArrayConverter     # コンバータ
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.converters.ByteArrayConverter   # コンバータ
EOF

Kafkaコネクタコンテナの `Dockerfile`を作成します。

* `confluentinc/cp-kafka-connect-base:5.4.0` をベースとする
* `mongodb/kafka-connect-mongodb:1.0.0` をインストールする

In [None]:
cat > ${work_dir}/Dockerfile <<EOF
FROM confluentinc/cp-kafka-connect-base:5.4.0

RUN confluent-hub install mongodb/kafka-connect-mongodb:1.0.0 --no-prompt
EOF

対象ノードに配置します。まず配置先のディレクトリを作成します。

In [None]:
ansible ${group_name} -m file -a 'path=connect-mongo state=directory'
ansible ${group_name} -m file -a 'path=connect-mongo/image state=directory'

コンテナを起動するための資材となるファイルを配置します。

In [None]:
ansible ${group_name} -m copy -a "src=${work_dir}/docker-compose.yml dest=connect-mongo/"
ansible ${group_name} -m copy -a "src=${work_dir}/Dockerfile dest=connect-mongo/image/"

ファイルが配置されたことを確認します。

In [None]:
ansible ${group_name} -a 'ls -lR connect-mongo/'

### コンテナの起動

Kafkaコネクタコンテナを起動します。

まず、コンテナイメージをビルドします。

In [None]:
ansible ${group_name} -a 'chdir=connect-mongo/ docker-compose build'

コンテナを起動します。

In [None]:
ansible ${group_name} -a 'chdir=connect-mongo/ docker-compose up -d'

コンテナが正常に起動できたことを確認します。出力結果の`State`が`Up`となっていれば起動が成功したことが確認できます。

```
node001 | CHANGED | rc=0 >>
         Name                        Command                       State                        Ports              
-------------------------------------------------------------------------------------------------------------------
connect_kafka-connect_1   /etc/confluent/docker/run        Up (health: starting)   0.0.0.0:8083->8083/tcp, 9092/tcp
```

In [None]:
ansible ${group_name} -a 'chdir=connect-mongo/ docker-compose ps'

## MongoDB Sinkコネクタの登録

KafkaコネクタのREST APIを用いてMongoDB Sinkコネクタを登録します。

コネクタを登録するには、以下のパラメータを指定する必要があります。

* Kafkaコネクタ名
* コネクタの対象となるKafkaのトピック名
* MongoDBのアドレス
* MongoDBのデータベース名
* MongoDBのコレクション名

パラメータはJSON形式で指定します。パラメータの例を以下に示します。

```json
{
  "name": "mongodb-sink",
  "config": {
    "name": "mongodb-sink",
    "topics": "topic-name",
    "connection.uri": "mongodb://mongodb:27017",
    "database": "database_name",
    "collection": "collection_name",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": false,
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector"
  }
}
```

KafkaコネクタのREST APIのアドレスを指定してください。

In [None]:
rest_url=http://node001:8083

REST APIを用いてMongoDB Sinkコネクタを登録します。

In [None]:
curl -X POST ${rest_url}/connectors -H 'Content-Type:application/json' -d @- <<EOF
{
  "name": "mongodb-sink",
  "config": {
    "name": "mongodb-sink",
    "topics": "topic-name",
    "connection.uri": "mongodb://mongodb:27017",
    "database": "database_name",
    "collection": "collection_name",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": false,
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector"
  }
}
EOF

登録されたことを確認します。

In [None]:
curl -s ${rest_url}/connectors/mongodb-sink | jq .

## 後始末

作業ディレクトリの削除します。

In [None]:
rm -rf ${work_dir}