<div id="singlestore-header" style="display: flex; background-color: rgba(124, 195, 235, 0.25); padding: 5px;">
    <div id="icon-image" style="width: 90px; height: 90px;">
        <img width="100%" height="100%" src="https://raw.githubusercontent.com/singlestore-labs/spaces-notebooks/master/common/images/header-icons/confluent-logo.png" />
    </div>
    <div id="text" style="padding: 5px; margin-left: 10px;">
        <div id="badge" style="display: inline-block; background-color: rgba(0, 0, 0, 0.15); border-radius: 4px; padding: 4px 8px; align-items: center; margin-top: 6px; margin-bottom: -2px; font-size: 80%">SingleStore Notebooks</div>
        <h1 style="font-weight: 500; margin: 8px 0 0 4px;">Ingest data from Confluent Cloud (Kafka)</h1>
    </div>
</div>

<img src="https://raw.githubusercontent.com/singlestore-labs/spaces-notebooks/master/notebooks/confluent-cloud-integration/images/confluent-kafka-integration.png" width="100%" />

### Set Up a Kafka Cluster on Confluent Cloud

Before you start the integration, set up a Kafka cluster on Confluent Cloud. For instructions, refer to the <a href="https://docs.confluent.io/cloud/current/get-started/index.html">Quick Start for Confluent Cloud</a> guide.

Once the cluster is created, perform the following tasks:

- Create a topic, for example <b>'s2-topic'</b>. On the topic overview page, select <b>Schema &gt; Set a schema &gt; Avro</b>, and add a new Avro schema. In this guide, the default schema is used.

<img src="https://raw.githubusercontent.com/singlestore-labs/spaces-notebooks/master/notebooks/confluent-cloud-integration/images/kafka-value-schema.png" width="100%" />

- Create an API key. The API secret is displayed <b>only once</b>, so copy the API key and secret and store them securely.

<img src="https://raw.githubusercontent.com/singlestore-labs/spaces-notebooks/master/notebooks/confluent-cloud-integration/images/confluent-api-key.png" width="100%" />

- On the left navigation pane, select <b>Connectors</b> and create a sample producer named <b>'datagen'</b> using the <b>Datagen Source</b> connector. In the <b>Topic selection</b> pane, select the <b>'s2-topic'</b> created earlier. In the <b>Kafka credentials</b> pane, select the <b>Use an existing API key</b> option. Configure the producer to use the same schema as the one in the created topic. Refer to <a href="https://docs.confluent.io/cloud/current/get-started/index.html#step-3-create-a-sample-producer">Step 3: Create a sample producer</a> for more information.
- Launch the <b>'datagen'</b> producer and verify that the <b>'s2-topic'</b> has new messages.

### Set Up Variables

Set the <b>S2_DATABASE_NAME</b>, <b>S2_TABLE_NAME</b>, and <b>S2_PIPELINE_NAME</b> variables to the names of the SingleStore database, table, and pipeline that this notebook creates.

### Copy Data from Confluent Cloud

- Assign the topic name <b>'s2-topic'</b> to the <b>CONFLUENT_KAFKA_TOPIC_NAME</b> variable.
- Specify the API key and secret using the <b>CONFLUENT_API_KEY</b> and <b>CONFLUENT_API_SECRET</b> variables, respectively.

On the left navigation pane, select <b>Clients</b>. Select a language (for example, Java), and set the following variables from the displayed client configuration:
- <b>CONFLUENT_CLUSTER_BOOTSTRAP_SERVER</b> from <b>bootstrap.servers</b>
- <b>CONFLUENT_SCHEMA_REGISTRY_URL</b> from <b>schema.registry.url</b>

Select <b>Create Schema Registry API key</b> to create a schema API key and configure the following variables:
- <b>CONFLUENT_SCHEMA_REGISTRY_KEY</b>
- <b>CONFLUENT_SCHEMA_REGISTRY_SECRET</b>

In [1]:
S2_DATABASE_NAME = 'confluent_cloud_integration'
S2_TABLE_NAME = 'kafka_events'
S2_PIPELINE_NAME = 'kafka_consumer_pipeline'
CONFLUENT_KAFKA_TOPIC_NAME = 's2-topic'
CONFLUENT_KAFKA_CLIENT_ID = 'cwc|001j000000j7k7bAAA|SingleStore'
CONFLUENT_CLUSTER_BOOTSTRAP_SERVER = 'pkc-xmzwx.europe-central2.gcp.confluent.cloud:9092'
CONFLUENT_API_KEY = 'EAPEIJZDU5KY26X5'
CONFLUENT_API_SECRET = '***************************************'

CONFLUENT_SCHEMA_REGISTRY_URL = 'https://psrc-9zg5y.europe-west3.gcp.confluent.cloud'
CONFLUENT_SCHEMA_REGISTRY_KEY = '7ALNJUEMWMBIMAQL'
CONFLUENT_SCHEMA_REGISTRY_SECRET = '***************************************'
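
A typo or a secret left as a masked placeholder in these variables usually only surfaces later, when the pipeline fails to connect. The following is a minimal sketch of a sanity check that could be run before creating the pipeline. The helper name `check_settings` and its rules (a `host:port` bootstrap server, an `https` Schema Registry URL, no value made only of asterisks) are assumptions for illustration, not part of the original notebook.

```python
from urllib.parse import urlparse


def check_settings(settings):
    """Return a list of problems found in the settings; an empty list means none were found."""
    problems = []
    for name, value in settings.items():
        if not isinstance(value, str) or not value.strip():
            problems.append(f"{name} is empty")
        elif set(value) == {"*"}:
            # A value made only of asterisks is a masked secret that was never filled in.
            problems.append(f"{name} still holds a masked placeholder")

    # Assumed rule: the bootstrap server is written as host:port.
    server = str(settings.get("CONFLUENT_CLUSTER_BOOTSTRAP_SERVER", ""))
    host, _, port = server.rpartition(":")
    if not host or not port.isdigit():
        problems.append("CONFLUENT_CLUSTER_BOOTSTRAP_SERVER should look like host:port")

    # Assumed rule: the Schema Registry endpoint is an https URL.
    url = urlparse(str(settings.get("CONFLUENT_SCHEMA_REGISTRY_URL", "")))
    if url.scheme != "https" or not url.netloc:
        problems.append("CONFLUENT_SCHEMA_REGISTRY_URL should be an https URL")
    return problems


example = {
    "CONFLUENT_CLUSTER_BOOTSTRAP_SERVER": "pkc-xmzwx.europe-central2.gcp.confluent.cloud:9092",
    "CONFLUENT_SCHEMA_REGISTRY_URL": "https://psrc-9zg5y.europe-west3.gcp.confluent.cloud",
    "CONFLUENT_API_SECRET": "***************************************",
}
problems = check_settings(example)
print(problems)
```

In the notebook, the dictionary would be built from the variables defined in the cell above rather than from literal values.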

### Create a Database

In [2]:
%%sql

DROP DATABASE IF EXISTS {{S2_DATABASE_NAME}};
CREATE DATABASE {{S2_DATABASE_NAME}};

<div class="alert alert-block alert-warning">    <b class="fa fa-solid fa-exclamation-circle"></b>    <div>        <p><b>Action Required</b></p>        <p>Be sure to select the <tt>{{S2_DATABASE_NAME}}</tt> database from the drop-down list at the top of this notebook.        Selecting it updates the <tt>connection_url</tt>, which the <tt>%%sql</tt> magic command and SQLAlchemy use to connect to the selected database.</p>    </div></div>

### Create a Table Based on the Kafka Avro Schema

In [3]:
%%sql

DROP PIPELINE IF EXISTS {{S2_DATABASE_NAME}}.{{S2_PIPELINE_NAME}};
DROP TABLE IF EXISTS {{S2_DATABASE_NAME}}.{{S2_TABLE_NAME}};
CREATE TABLE IF NOT EXISTS {{S2_DATABASE_NAME}}.{{S2_TABLE_NAME}} (
`field1` int,
`field2` double,
`field3` text
);
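
The columns in the table above mirror the fields of the topic's Avro schema. For a schema with many fields, the column list could be derived rather than typed by hand. The sketch below shows one way to do that for flat records. The type mapping, the helper names `sql_type` and `table_ddl`, and the example schema are assumptions for illustration. The example schema uses the field names `my_field1` to `my_field3` from this notebook's pipeline mapping, with types inferred from the table definition.

```python
import json

# Assumed mapping from Avro primitive types to SQL column types; adjust as needed.
AVRO_TO_SQL = {
    "int": "int",
    "long": "bigint",
    "float": "float",
    "double": "double",
    "boolean": "bool",
    "string": "text",
    "bytes": "blob",
}


def sql_type(avro_type):
    """Map a primitive Avro type, or a nullable union of one, to a SQL type."""
    if isinstance(avro_type, list):
        non_null = [t for t in avro_type if t != "null"]
        if len(non_null) != 1:
            raise ValueError(f"unsupported union: {avro_type}")
        avro_type = non_null[0]
    if not isinstance(avro_type, str) or avro_type not in AVRO_TO_SQL:
        raise ValueError(f"unsupported Avro type: {avro_type}")
    return AVRO_TO_SQL[avro_type]


def table_ddl(table_name, schema_json, rename=lambda name: name):
    """Build a CREATE TABLE statement from a flat Avro record schema."""
    schema = json.loads(schema_json)
    columns = [
        f"`{rename(field['name'])}` {sql_type(field['type'])}"
        for field in schema["fields"]
    ]
    return f"CREATE TABLE IF NOT EXISTS {table_name} (\n" + ",\n".join(columns) + "\n);"


# Hypothetical schema for illustration only.
example_schema = json.dumps({
    "type": "record",
    "name": "sampleRecord",
    "fields": [
        {"name": "my_field1", "type": "int"},
        {"name": "my_field2", "type": "double"},
        {"name": "my_field3", "type": "string"},
    ],
})

ddl = table_ddl("kafka_events", example_schema,
                rename=lambda name: name.replace("my_", "", 1))
print(ddl)
```

Nested records, arrays, maps, and logical types are deliberately rejected here, since choosing column types for them is a design decision the sketch does not try to make.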

### Create a Kafka Pipeline

<div class="alert alert-block alert-info">    <b class="fa fa-solid fa-exclamation-circle"></b>    <div>        <p><b>Notes</b></p>        <ul><li><p>All Kafka configurations in the pipeline, such as <tt>'client.id'</tt>, are supported in version <tt>8.1.35</tt> and later.</p></li> <li><p>Update the schema registry mapping section to match your schema, using the <i>'table column name'</i> &lt;- <i>'schema registry field name'</i> format.</p></li>  </ul>    </div></div>

In [4]:
%%sql

DROP PIPELINE IF EXISTS {{S2_DATABASE_NAME}}.{{S2_PIPELINE_NAME}};
CREATE PIPELINE {{S2_DATABASE_NAME}}.{{S2_PIPELINE_NAME}}
AS LOAD DATA KAFKA '{{CONFLUENT_CLUSTER_BOOTSTRAP_SERVER}}/{{CONFLUENT_KAFKA_TOPIC_NAME}}'
CONFIG '{ "client.id": "{{CONFLUENT_KAFKA_CLIENT_ID}}",
         "sasl.username": "{{CONFLUENT_API_KEY}}",
         "sasl.mechanism": "PLAIN",
         "security.protocol": "SASL_SSL",
         "ssl.ca.location": "/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem",
         "schema.registry.username": "{{CONFLUENT_SCHEMA_REGISTRY_KEY}}"}'
CREDENTIALS '{"sasl.password": "{{CONFLUENT_API_SECRET}}",
              "schema.registry.password": "{{CONFLUENT_SCHEMA_REGISTRY_SECRET}}"}'
BATCH_INTERVAL 20
DISABLE OFFSETS METADATA GC
INTO TABLE {{S2_TABLE_NAME}}
FORMAT AVRO
SCHEMA REGISTRY '{{CONFLUENT_SCHEMA_REGISTRY_URL}}'
(
field1  <-  my_field1,
field2  <-  my_field2,
field3  <-  my_field3
);
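
The `CONFIG` and `CREDENTIALS` clauses in the cell above are JSON documents embedded in SQL string literals, which makes hand-written quoting easy to get wrong. As a sketch of an alternative, the JSON could be generated in Python with `json.dumps` and then quoted for SQL. The helper names `pipeline_json` and `sql_string_literal` are hypothetical, and the quoting assumes the server accepts backslash escapes inside single-quoted strings.

```python
import json


def pipeline_json(client_id, api_key, api_secret, registry_key, registry_secret):
    """Return (config_json, credentials_json) with the same keys as the pipeline cell."""
    config = {
        "client.id": client_id,
        "sasl.username": api_key,
        "sasl.mechanism": "PLAIN",
        "security.protocol": "SASL_SSL",
        "ssl.ca.location": "/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem",
        "schema.registry.username": registry_key,
    }
    credentials = {
        "sasl.password": api_secret,
        "schema.registry.password": registry_secret,
    }
    return json.dumps(config), json.dumps(credentials)


def sql_string_literal(text):
    """Quote text as a single-quoted SQL literal (assumes backslash escapes are enabled)."""
    return "'" + text.replace("\\", "\\\\").replace("'", "\\'") + "'"


# Placeholder values for illustration only.
config_json, credentials_json = pipeline_json(
    client_id="example-client",
    api_key="EXAMPLE_KEY",
    api_secret="example'secret",
    registry_key="EXAMPLE_SR_KEY",
    registry_secret="example-sr-secret",
)
config_literal = sql_string_literal(config_json)
credentials_literal = sql_string_literal(credentials_json)
print(config_literal)
```

The resulting literals already include their surrounding single quotes, so a statement that uses them would place them directly after the `CONFIG` and `CREDENTIALS` keywords without adding quotes of its own.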

### Test the Created Pipeline

In [5]:
%%sql
TEST PIPELINE {{S2_DATABASE_NAME}}.{{S2_PIPELINE_NAME}} LIMIT 1;

### Start the Pipeline

In [6]:
%%sql

START PIPELINE {{S2_DATABASE_NAME}}.{{S2_PIPELINE_NAME}};

### Stop the Pipeline

In [7]:
%%sql

STOP PIPELINE {{S2_DATABASE_NAME}}.{{S2_PIPELINE_NAME}};

### View Consumed Events

In [8]:
%%sql

SELECT * FROM {{S2_DATABASE_NAME}}.{{S2_TABLE_NAME}};

<div id="singlestore-footer" style="background-color: rgba(194, 193, 199, 0.25); height:2px; margin-bottom:10px"></div>
<div><img src="https://raw.githubusercontent.com/singlestore-labs/spaces-notebooks/master/common/images/singlestore-logo-grey.png" style="padding: 0px; margin: 0px; height: 24px"/></div>