# S3コネクタの登録

[Kafka Connect S3](https://www.confluent.io/connector/kafka-connect-s3/)を Kafka Connectに登録します。

# 事前チェック

「00-111-Kafka Connectのセットアップ.ipynb」でKafka Connectの構築を行ったことを前提としています。

1. 対象となるノードを Ansible で操作できること
1. Kafka Connect を ポート番号 8083 で実行していること

In [None]:
target = 'connector'

In [None]:
!ansible {target} -m ping

In [None]:
!ansible {target} -b -a 'whoami'

Kafka ConnectのURLを指定してください。

In [None]:
url = 'http://172.30.2.20:8083/connectors'

Kafka ConnectのREST APIにアクセスしてみます。

正しくアクセスできた場合は、`<Response [200]>` と表示されます。

In [None]:
import requests
r = requests.get(url)
print(r)
print(r.json())

# 認証情報の設定

AWS S3にアクセスするための認証情報を設定します。

S3コネクタが AWS の認証情報を取得する方法は[AWS 認証情報の使用](https://docs.aws.amazon.com/ja_jp/sdk-for-java/v1/developer-guide/credentials.html)に記述されています。以下の方法があります。

* 環境変数
  – AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY
* Java のシステムプロパティ
  - aws.accessKeyId, aws.secretKey
* デフォルトの認証情報プロファイルファイル
  - 通常は `~/.aws/credentials`
* Amazon ECS コンテナの認証情報
* インスタンスプロファイル認証情報

このNotebookではKafka Connectサービスの環境変数にアクセスキー、シークレットキーの値を設定します。

## 認証情報の入力

AWSのアクセスキーを入力してください。

In [None]:
from getpass import getpass
aws_access_key = getpass()

シークレットキーを入力してください。


In [None]:
aws_secret_key = getpass()

## 設定ファイルの更新

Kafka Connectサービスの設定ファイルを更新して、アクセスキー、シークレットキーをサービスの環境変数に設定するようにします。

環境変数を書き込む設定ファイルを作成します。

In [None]:
from tempfile import TemporaryDirectory
from pathlib import Path

In [None]:
with TemporaryDirectory() as work_dir:
    dot_env = Path(work_dir) / '.env'
    with dot_env.open(mode='w') as f:
        f.write(f'''
AWS_ACCESS_KEY_ID={aws_access_key}
AWS_SECRET_ACCESS_KEY={aws_secret_key}
''')
    !ansible {target} -b -m copy -a 'src={dot_env} dest=kafka-connect/'

In [None]:
!ansible {target} -a 'chdir=kafka-connect docker-compose up -d'

# S3 コネクタを登録する

Kafka ConnectにS3コネクタを登録します。登録は Kafka Connectの[REST API](https://docs.confluent.io/current/connect/references/restapi.html)で行います。

## パラメータの設定

Kafkaのトピックを指定してください。

In [None]:
# topics = 'distributed-video1,distributed-video2,distributed-video3'
topics = 'distributed-video1'

S3のリージョンを指定してください。

In [None]:
s3_region = 'ap-northeast-1'

S3のバケット名を指定してください。

In [None]:
s3_bucket = 'nii-dp-test-20190220'

ここまで、入力されたパラメータから、コネクタの登録を行う REST APIの JSON オブジェクトに対応する Python オブジェクトを作成します。

In [None]:
s3_sink = {
    "name": "s3-sink",
    "config": {
        "connector.class": "io.confluent.connect.s3.S3SinkConnector",
        "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
        "format.class": "io.confluent.connect.s3.format.bytearray.ByteArrayFormat",
        "storage.class": "io.confluent.connect.s3.storage.S3Storage",
        "tasks.max": 3,
        "topics": topics,
        "flush.size": "1",
        "schema.compatibility": "NONE",
        "s3.region": s3_region,
        "s3.bucket.name": s3_bucket,
        "partitioner.class": "io.confluent.connect.storage.partitioner.DailyPartitioner",
        "timestamp.extractor": "Record",
        "partition.duration.ms": 3600000,
        "locale": "C",
        "timezone": "Asia/Tokyo",
    },
}

ここでは Kafka で送られてくる JPEGデータを S3 に格納することを想定して[Write raw message values into S3](https://docs.confluent.io/current/connect/kafka-connect-s3/index.html#write-raw-message-values-into-s3)の設定手順を参考にしました。

> AWS以外の S3互換ストレージを指定する場合は `store.url` に URLを指定すればよいはずです。
> 詳細は[S3 Connector Configuration Options](https://docs.confluent.io/current/connect/kafka-connect-s3/configuration_options.html#storage)を参照してください。

## コネクタの登録

REST APIを実行してS3 コネクタを登録します。

Kafka Connect のURLを確認します。

In [None]:
print(url)

コネクタの登録を行います。

> 登録が成功すると、次のセルの出力に `<Response [201]>`と表示されます。

In [None]:
r = requests.post(url, json=s3_sink)
r

S3コネクタが登録されたことを確認するためにコネクタの一覧を表示します。

In [None]:
r = requests.get(url)
print(r.json())

登録したコネクタを削除する場合は、以下のコードを実行してください。

```
r = requests.delete(f'{url}/{s3_sink["name"]}')
r
```