# Elastic sink connector

sources:
- https://docs.confluent.io/platform/current/connect/references/restapi.html

This notebook explains how to connect an Elasticsearch **index** to a Kafka **topic**.

Pre-requisites:
- Elasticsearch, Kafka and Kafka Connect up and running
- elastic-sink-connector.jar downloaded inside the **project/connectors** directory.


## Setup
We want to send JSON to the kafka-connect container via its REST API. We need the **requests** and **json** modules.

In [1]:
import requests
import json

connect_url = "http://kafka-connect:8083"  # URL to your Kafka Connect REST API
# headers for the http requests
headers = {"Content-Type": "application/json; charset=utf-8",
           "Accept": "application/json"
          }

## List connectors

In [2]:
# request
response = requests.get(f'{connect_url}/connectors', headers=headers)

# Check the response
if response.status_code == 200:
    print(f"List of the connectors: {response.text}.")
else:
    print(f"Failed to list connectors: {response.status_code} - {response.text}")

List of the connectors: [].
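
The cell above prints the raw response text. As a minimal sketch, the body can also be decoded into a Python list, since `GET /connectors` returns a JSON array of connector names. The helper name `parse_connector_names` is hypothetical and not part of the original notebook.

```python
import json


def parse_connector_names(body: str) -> list:
    """Decode the JSON array returned by GET /connectors into a sorted list."""
    names = json.loads(body)
    if not isinstance(names, list):
        raise ValueError(f"expected a JSON array, got {type(names).__name__}")
    return sorted(names)


# Sample body written by hand for illustration (not a real server response).
print(parse_connector_names('["meteo", "logs"]'))  # ['logs', 'meteo']
```

In the notebook this could be called as `parse_connector_names(response.text)` after checking the status code.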


## Connector configuration

We need to write the specifications of the connection between the **topic** and the **index**.

You can link several topics to the same connector by passing their names as a comma-separated list.

In [3]:
connector_name = input('Enter a name for the connector: ')
topic_name = input('Enter the topic names (separated by a comma): ')
index_name = input('Enter the index name: ')

connector_config = {
  "name": connector_name,  # name of the connector
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",  # connector implementation to use
    "tasks.max": "1",  # maximum number of tasks running in parallel
    "topics": topic_name,  # comma-separated topic names: "topic-1,topic-2,..."
    "key.ignore": "true",  # ignore the record key when building the document id
    "schema.ignore": "true",  # ignore the record schema and rely on the index mapping
    "connection.url": "http://elasticsearch:9200",  # URL of the Elasticsearch service
    "type.name": "_doc",  # document type used in paths such as /my-index/_doc/id
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",  # read record values as JSON
    "value.converter.schemas.enable": "false",  # the JSON values carry no embedded schema
    "index.name": index_name
  }
}

Enter a name for the connector:  meteo
Enter the topic names (separated by a comma):  meteo
Enter the index name:  meteo
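
Because `topics` is a comma-separated string, stray spaces or empty entries typed at the prompt end up in the configuration unchanged. The sketch below shows one way to clean the input before building `connector_config`. The helper `normalize_topics` is hypothetical and not part of the original notebook.

```python
def normalize_topics(raw: str) -> str:
    """Turn user input such as ' a, b ,,a ' into 'a,b'."""
    names = [part.strip() for part in raw.split(",")]
    # Drop empty entries left by stray commas; keep order, remove duplicates.
    unique = list(dict.fromkeys(name for name in names if name))
    if not unique:
        raise ValueError("at least one topic name is required")
    return ",".join(unique)


# Topic names below are examples only.
print(normalize_topics(" meteo, meteo-archive ,, meteo "))  # meteo,meteo-archive
```

It could be applied as `topic_name = normalize_topics(input(...))` in the cell above.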


## Create an index
It is important to define a mapping in Elasticsearch so that the timestamp and the geo-point are indexed with the right types in our index.

In [7]:
import requests

# Replace with your Elasticsearch host and port
es_host = 'http://elasticsearch:9200'

# Replace with your desired index name
index_name = input('index name: ')

# Define the index mapping
index_mapping = {
    "mappings": {
        "properties": {
            "timestamp": {
                "type": "date",
                "format": "yyyy-MM-dd HH:mm:ss.SSSSSS"
            },
            "temperature": {
                "type": "float"
            },
            "humidity": {
                "type": "float"
            },
            "pressure": {
                "type": "float"
            },
            "sensor_id": {
                "type": "integer"
            },
            "location": {
                "type": "geo_point"
            }
        }
    }
}

# Create the index with mapping
index_url = f"{es_host}/{index_name}"
response = requests.put(index_url, json=index_mapping)

# Check the response status
if response.status_code == 200:
    print(f"Index '{index_name}' created successfully.")
else:
    print(f"Failed to create index. Status code: {response.status_code}, Response: {response.text}")


index name:  meteo


Failed to create index. Status code: 400, Response: {"error":{"root_cause":[{"type":"mapper_parsing_exception","reason":"Root mapping definition has unsupported parameters:  [mappings : {properties={sensor_id={type=integer}, temperature={type=float}, humidity={type=float}, location={type=geo_point}, pressure={type=float}, timestamp={format=yyyy-MM-dd HH:mm:ss.SSSSSS, type=date}}}]"}],"type":"mapper_parsing_exception","reason":"Failed to parse mapping: Root mapping definition has unsupported parameters:  [mappings : {properties={sensor_id={type=integer}, temperature={type=float}, humidity={type=float}, location={type=geo_point}, pressure={type=float}, timestamp={format=yyyy-MM-dd HH:mm:ss.SSSSSS, type=date}}}]","caused_by":{"type":"mapper_parsing_exception","reason":"Root mapping definition has unsupported parameters:  [mappings : {properties={sensor_id={type=integer}, temperature={type=float}, humidity={type=float}, location={type=geo_point}, pressure={type=float}, timestamp={format=yy
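
To illustrate what the mapping defined in this section expects, here is a minimal sketch of a record whose fields match it: a timestamp with six fractional digits (`yyyy-MM-dd HH:mm:ss.SSSSSS`) and a `location` given as a `lat`/`lon` object, which is one of the forms accepted for `geo_point`. The helper `build_document` and all sample values are assumptions made for illustration.

```python
from datetime import datetime


def build_document(sensor_id, temperature, humidity, pressure, lat, lon, when):
    """Build a dict whose fields line up with the index mapping."""
    return {
        # %f yields six zero-padded microsecond digits, matching SSSSSS.
        "timestamp": when.strftime("%Y-%m-%d %H:%M:%S.%f"),
        "temperature": float(temperature),
        "humidity": float(humidity),
        "pressure": float(pressure),
        "sensor_id": int(sensor_id),
        # geo_point expressed as an object with lat and lon keys.
        "location": {"lat": float(lat), "lon": float(lon)},
    }


# Invented sample reading.
doc = build_document(1, 21.5, 40, 1013.2, 48.85, 2.35,
                     datetime(2024, 1, 15, 8, 30, 0, 123456))
print(doc["timestamp"])  # 2024-01-15 08:30:00.123456
```

A producer would serialise such a dict with `json.dumps(doc)` before sending it to the topic.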

## Create the connector
You should first create an index with a correct mapping.
Let's send the configuration dict to kafka-connect with an HTTP **POST** request to the **/connectors/** endpoint to connect the **topic** and the **index**.

In [4]:
# Create the connector
response = requests.post(f"{connect_url}/connectors/",
                         headers=headers,
                         data=json.dumps(connector_config))

# Check the response
if response.status_code == 201:
    print(f"Connector '{connector_name}' created successfully.")
else:
    print(f"Failed to create connector: {response.status_code} - {response.text}")


Connector 'meteo' created successfully.
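
The creation cell only distinguishes 201 from everything else. As a sketch, the most common statuses of `POST /connectors` can be turned into more specific messages: 409 is returned when a connector with that name already exists or when the worker is rebalancing, and 400 typically signals an invalid configuration. The helper `describe_create_status` is hypothetical, and the list of codes is not exhaustive.

```python
def describe_create_status(status_code: int) -> str:
    """Map the status code of POST /connectors to a short explanation."""
    if status_code == 201:
        return "created"
    if status_code == 409:
        # Name already taken, or a rebalance is in progress: retrying may help.
        return "conflict: connector already exists or worker is rebalancing"
    if status_code == 400:
        return "bad request: check the connector configuration"
    return f"unexpected status {status_code}"


print(describe_create_status(409))
```

In the notebook this could replace the `else` branch message with `describe_create_status(response.status_code)`.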


## Describe a connector

In [5]:
# asking for a connector name
connector_name = input('Enter the name of the connector: ')

# request
response = requests.get(f'{connect_url}/connectors/{connector_name}', headers=headers)

# Check the response
if response.status_code == 200:
    print(f"Description {connector_name}: {response.text}.")
else:
    print(f"Failed to describe connector {connector_name}: {response.status_code} - {response.text}")

Enter the name of the connector:  meteo


Description meteo: {"name":"meteo","config":{"connector.class":"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector","type.name":"_doc","index.name":"meteo","tasks.max":"1","topics":"meteo","value.converter.schemas.enable":"false","name":"meteo","key.ignore":"true","connection.url":"http://elasticsearch:9200","value.converter":"org.apache.kafka.connect.json.JsonConverter","schema.ignore":"true"},"tasks":[{"connector":"meteo","task":0}],"type":"sink"}.
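
The description shows the configuration but not whether the connector is actually running. Kafka Connect also exposes `GET /connectors/{name}/status`; the sketch below condenses such a status payload into a small summary. The helper `summarize_status` is hypothetical, and the sample payload is written by hand to mimic the shape of a status response rather than copied from a real run.

```python
def summarize_status(status: dict) -> dict:
    """Condense a /connectors/{name}/status payload into a short summary."""
    tasks = status.get("tasks", [])
    failed = [task["id"] for task in tasks if task.get("state") == "FAILED"]
    return {
        "connector_state": status.get("connector", {}).get("state", "UNKNOWN"),
        "task_count": len(tasks),
        "failed_tasks": failed,
    }


# Hand-written sample payload (assumption, for illustration only).
sample = {
    "name": "meteo",
    "connector": {"state": "RUNNING", "worker_id": "kafka-connect:8083"},
    "tasks": [{"id": 0, "state": "FAILED", "worker_id": "kafka-connect:8083"}],
    "type": "sink",
}
print(summarize_status(sample))
```

With the variables from the setup cell, the payload could be fetched with `requests.get(f"{connect_url}/connectors/{connector_name}/status", headers=headers).json()`.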


## Delete a connector

In [18]:
# asking for a connector name
connector_name = input('Enter the name of the connector: ')

# request
response = requests.delete(f'{connect_url}/connectors/{connector_name}', headers=headers)

# Check the response
if response.status_code == 204:
    print(f"Connector {connector_name} deleted.")
else:
    print(f"Failed to delete connector: {response.status_code} - {response.text}")

Enter the name of the connector:  meteo


Connector meteo deleted.
