
# 📘 Enterprise Use Case — Day 1  
**Oracle → Debezium → Kafka → HDFS**

This guide walks through the hands-on steps for building an **enterprise CDC pipeline**.  
Run every command in a **Terminal**, not directly in the notebook.  
This notebook serves only as **reference documentation**.

---

## 🔷 Topology
The diagram below shows the data flow architecture you will build:

![](img/ora_debezium_hdfs.svg)



## 1) Prerequisites & Environment

- **Cluster**: A production-like CDP/HDP cluster with Kerberos and TLS enabled.  
- **Access**: Make sure you have obtained a Kerberos ticket (`kinit`).  
- **Tools**: Kafka CLI, HDFS CLI, Spark 3, and access to SMM.  
- **Security**: The JAAS configuration and truststore are already available on the node.

📌 Replace every placeholder such as `<username>` with your own trainee username.
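
Before running any Kafka or HDFS command, it can help to confirm that a valid Kerberos ticket is present. The sketch below is one possible check from Python, assuming the MIT Kerberos `klist` client is on the `PATH`; `klist -s` prints nothing and reports ticket validity through its exit status. The function name `has_valid_ticket` is hypothetical and not part of the lab material.

```python
import subprocess


def has_valid_ticket(klist_cmd: str = "klist") -> bool:
    """Return True if `klist -s` reports a readable, unexpired ticket cache."""
    try:
        # -s: silent mode; exit status 0 means a valid ticket exists.
        result = subprocess.run([klist_cmd, "-s"], check=False)
    except OSError:
        # The klist binary is missing or not executable on this machine
        # (assumption: the node uses the MIT Kerberos client tools).
        return False
    return result.returncode == 0


if __name__ == "__main__":
    if has_valid_ticket():
        print("Kerberos ticket found.")
    else:
        print("No valid ticket: run kinit first.")
```

If the check fails, run `kinit` and try again before continuing with the next section.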



## 2) Client Security Setup

Open a Terminal and run the following command to set the JAAS configuration:

```bash
export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf"
```
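
The Kafka CLI commands later in this guide pass a client properties file (`/tmp/producer_non_jaas.properties` or `/tmp/consumer_non_jaas.properties`) whose contents are not shown here. As a rough sketch, assuming those files carry the same Kerberos and TLS settings that the connector configurations in this guide use, such a file could be rendered as follows. The helper name `render_client_properties` is hypothetical, and the real files on your node may differ; the JAAS login configuration itself is supplied separately through `KAFKA_OPTS`.

```python
def render_client_properties(
    truststore_path: str = "/tmp/kafka-truststore.jks",
    truststore_password: str = "changeit",
) -> str:
    """Render Kafka client settings for a Kerberos + TLS (SASL_SSL) cluster."""
    # Assumption: these mirror the override values used by the connectors.
    settings = {
        "security.protocol": "SASL_SSL",
        "sasl.mechanism": "GSSAPI",
        "sasl.kerberos.service.name": "kafka",
        "ssl.truststore.location": truststore_path,
        "ssl.truststore.password": truststore_password,
    }
    return "\n".join(f"{key}={value}" for key, value in settings.items()) + "\n"


if __name__ == "__main__":
    print(render_client_properties(), end="")
```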



## 3) Create Topic (Enterprise Standard)

Use the **naming convention**: `<env>.<domain>.<usecase>.demo.<username>`

Example command (the lab's test topic uses the shorter form `dev.djp.test-topic.<username>`):

```bash
kafka-topics --bootstrap-server djp-training-sbox.dla-dataplatform.internal:9093 \
  --command-config /tmp/producer_non_jaas.properties \
  --create --if-not-exists \
  --topic dev.djp.test-topic.<username> \
  --partitions 3 \
  --replication-factor 1
```

List the topics to confirm that yours was created:

```bash
kafka-topics --bootstrap-server djp-training-sbox.dla-dataplatform.internal:9093 \
  --command-config /tmp/producer_non_jaas.properties \
  --list
```
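
The naming convention at the top of this section can be sketched as a small helper that assembles the name and rejects characters Kafka does not accept. This is only an illustration: the function name `build_topic_name` and the sample value `cdc` for `<usecase>` are hypothetical. Kafka topic names may contain ASCII letters, digits, `.`, `_`, and `-`, up to 249 characters.

```python
import re

# Kafka accepts only ASCII letters, digits, '.', '_' and '-' in topic names,
# with a maximum length of 249 characters.
_LEGAL_TOPIC = re.compile(r"[a-zA-Z0-9._-]+")
_MAX_TOPIC_LENGTH = 249


def build_topic_name(env: str, domain: str, usecase: str, username: str) -> str:
    """Build `<env>.<domain>.<usecase>.demo.<username>` and validate it."""
    parts = [env, domain, usecase, "demo", username]
    if any(not part for part in parts):
        raise ValueError("every part of the topic name must be non-empty")
    name = ".".join(parts)
    if not _LEGAL_TOPIC.fullmatch(name) or len(name) > _MAX_TOPIC_LENGTH:
        raise ValueError(f"illegal Kafka topic name: {name!r}")
    return name


if __name__ == "__main__":
    # "cdc" and "trainee01" are sample values for illustration only.
    print(build_topic_name("dev", "djp", "cdc", "trainee01"))
```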



## 4) Test Producer & Consumer

Open **two terminals**.  
In the first terminal, start the producer:

```bash
kafka-console-producer --bootstrap-server djp-training-sbox.dla-dataplatform.internal:9093 \
  --producer.config /tmp/producer_non_jaas.properties \
  --topic dev.djp.test-topic.<username>
```
Type any text you like; each line you enter is sent as one message.

In the second terminal, start the consumer:

```bash
kafka-console-consumer --bootstrap-server djp-training-sbox.dla-dataplatform.internal:9093 \
  --consumer.config /tmp/consumer_non_jaas.properties \
  --topic dev.djp.test-topic.<username> \
  --from-beginning
```



## 5) Create Debezium Oracle CDC Connector

In SMM, go to **Connectors → Add Connector → Oracle (Debezium)**.  
Fill in the following parameters (replace `<username>` with your own):


### Debezium Oracle CDC Connector Config

```properties
name=oracle-cdc-<username>
topic.prefix=<username>

# Connector class
connector.class=io.debezium.connector.oracle.OracleConnector

# Consumer overrides
consumer.override.sasl.kerberos.service.name=kafka
consumer.override.sasl.mechanism=GSSAPI
consumer.override.security.protocol=SASL_SSL
consumer.override.ssl.truststore.location=/tmp/kafka-truststore.jks
consumer.override.ssl.truststore.password=changeit

# Database connection
database.connection.adapter=logminer
database.dbname=XE
database.hostname=djp-training-sbox.dla-dataplatform.internal
database.port=1521
database.pdb.name=XEPDB1
database.server.name=ora1
database.user=C##CDC
database.password=Cdc123!
database.include.list=
include.schema.changes=false
schema.include.list=CDC
table.include.list=CDC.DEMO

# Database history (consumer)
database.history.consumer.sasl.kerberos.service.name=kafka
database.history.consumer.sasl.mechanism=GSSAPI
database.history.consumer.security.protocol=SASL_SSL
database.history.consumer.ssl.truststore.location=/tmp/kafka-truststore.jks
database.history.consumer.ssl.truststore.password=changeit

# Database history (producer)
database.history.producer.sasl.kerberos.service.name=kafka
database.history.producer.sasl.mechanism=GSSAPI
database.history.producer.security.protocol=SASL_SSL
database.history.producer.ssl.truststore.location=/tmp/kafka-truststore.jks
database.history.producer.ssl.truststore.password=changeit

# Database history Kafka
database.history.kafka.bootstrap.servers=djp-training-sbox.dla-dataplatform.internal:9093
database.history.kafka.topic=ora1.schema.history

# Producer overrides
producer.override.sasl.kerberos.service.name=kafka
producer.override.sasl.mechanism=GSSAPI
producer.override.security.protocol=SASL_SSL
producer.override.ssl.truststore.location=/tmp/kafka-truststore.jks
producer.override.ssl.truststore.password=changeit

# General
log.mining.strategy=online_catalog
snapshot.mode=initial
security.protocol=SASL_SSL
tasks.max=1
zookeeper.connect=djp-training-sbox.dla-dataplatform.internal:2181/kafka
secret.properties=database.password
```

✅ Once the connector is running, check that data is flowing into the topic:  
`<username>.ora1.CDC.DEMO`



## 6) Create HDFS Sink Connector

In SMM, go to **Connectors → Add Connector → HDFSSinkConnector**. Example configuration in JSON form:

```json
{
  "connector.class": "com.cloudera.dim.kafka.connect.hdfs.HdfsSinkConnector",
  "consumer.override.sasl.kerberos.service.name": "kafka",
  "consumer.override.sasl.mechanism": "GSSAPI",
  "consumer.override.security.protocol": "SASL_SSL",
  "consumer.override.ssl.truststore.location": "/tmp/kafka-truststore.jks",
  "consumer.override.ssl.truststore.password": "changeit",
  "hadoop.conf.path": "file:///etc/hadoop/conf",
  "hdfs.kerberos.authentication": "true",
  "hdfs.kerberos.keytab.path": "${cm-agent:keytab}",
  "hdfs.kerberos.namenode.principal": "hdfs/djp-training-sbox.dla-dataplatform.internal@DLA-TRAINING.LOCAL",
  "hdfs.kerberos.user.principal": "${cm-agent:ENV:kafka_connect_service_principal}",
  "hdfs.output": "/cdc/<USERNAME>",
  "hdfs.uri": "hdfs://djp-training-sbox.dla-dataplatform.internal:8020",
  "hive.metastore.uris": "thrift://djp-training-sbox.dla-dataplatform.internal:9083",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "output.avro.passthrough.enabled": "true",
  "output.storage": "com.cloudera.dim.kafka.connect.hdfs.HdfsPartitionStorage",
  "output.writer": "com.cloudera.dim.kafka.connect.partition.writers.json.JsonPartitionWriter",
  "tasks.max": "1",
  "topics": "ora1.CDC.DEMO",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.passthrough.enabled": "true",
  "value.converter.schema.registry.url": "https://djp-training-sbox.dla-dataplatform.internal:7790",
  "value.converter.schemas.enable": "true",
  "secret.properties": "",
  "name": "HDFS_SINK-<USERNAME>"
}
```

Fill in the following parameters (replace `<username>` with your own):

### HDFS Sink Connector Config

```properties
name=hdfs-sink-<username>
connector.class=com.cloudera.dim.kafka.connect.hdfs.HdfsSinkConnector
tasks.max=1
topics=<username>.ora1.CDC.DEMO

# HDFS target
hdfs.uri=hdfs://djp-training-sbox.dla-dataplatform.internal:8020
hdfs.output=/cdc/<username>/
hadoop.conf.path=/etc/hadoop/conf
hdfs.kerberos.authentication=true

# Converter
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
value.converter.schema.registry.url=https://djp-training-sbox.dla-dataplatform.internal:7790

# Hive Metastore
hive.metastore.uris=thrift://djp-training-sbox.dla-dataplatform.internal:9083
```

Check the result in HDFS:

```bash
hdfs dfs -ls /cdc/<username>/
```
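
This section lists the sink settings both as a JSON object and as `key=value` properties. If you keep the properties form and need the JSON form, the conversion is mechanical. The sketch below shows one way to do it, under the assumption that every setting sits on its own line and that `#` starts a comment; the function name `properties_to_json` is hypothetical.

```python
import json


def properties_to_json(text: str) -> str:
    """Convert key=value connector properties into a JSON object string."""
    config = {}
    for raw_line in text.splitlines():
        line = raw_line.strip()
        # Skip blank lines and '#' comments.
        if not line or line.startswith("#"):
            continue
        # Split on the first '=' only, so values may themselves contain '='.
        key, _, value = line.partition("=")
        config[key.strip()] = value.strip()
    return json.dumps(config, indent=2)


if __name__ == "__main__":
    sample = """\
name=hdfs-sink-<username>
tasks.max=1
# HDFS target
hdfs.output=/cdc/<username>/
"""
    print(properties_to_json(sample))
```

All values are emitted as strings, which matches the quoting used in the JSON example in this section.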



## 7) (Optional) Spark Structured Streaming

Edit `kafka_spark_consumer.py` and replace `<username>` in the configuration section:
```python
# Spark consumer configuration
KAFKA_BS  = "djp-training-sbox.dla-dataplatform.internal:9093"
TRUST_JKS = "/tmp/kafka-truststore.jks"
TOPIC     = "<username>.ora1.CDC.DEMO"
OUT_DIR   = "hdfs://djp-training-sbox.dla-dataplatform.internal:8020/cdc/spark-<username>/"
CHK_DIR   = "hdfs://djp-training-sbox.dla-dataplatform.internal:8020/checkpoints/cdc_kafka_to_hdfs-<username>"
CONSUMER_GROUP = "spark-consumer-<username>"  
```
Save the file (Ctrl+S).

To read from Kafka and write to HDFS with Spark, run the command below, replacing `<username>` with your own:
```bash
spark3-submit --master "local[*]" \
  --conf spark.driver.extraJavaOptions="-Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf" \
  --conf spark.executor.extraJavaOptions="-Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf" \
  /home/<username>/labs/01_oracle_debezium_kafka_hdfs/kafka_spark_consumer.py
```
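
The body of `kafka_spark_consumer.py` is not reproduced in this guide. For orientation, here is a minimal sketch of what a script using the configuration values above might look like. The helper names `kafka_options` and `start_stream` are hypothetical, the truststore password is assumed to match the connector configurations, and the Spark Kafka source package is assumed to be available to `spark3-submit`.

```python
KAFKA_BS = "djp-training-sbox.dla-dataplatform.internal:9093"
TRUST_JKS = "/tmp/kafka-truststore.jks"
TOPIC = "<username>.ora1.CDC.DEMO"
OUT_DIR = "hdfs://djp-training-sbox.dla-dataplatform.internal:8020/cdc/spark-<username>/"
CHK_DIR = "hdfs://djp-training-sbox.dla-dataplatform.internal:8020/checkpoints/cdc_kafka_to_hdfs-<username>"
CONSUMER_GROUP = "spark-consumer-<username>"


def kafka_options() -> dict:
    """Options for Spark's Kafka source on a Kerberos + TLS cluster."""
    return {
        "kafka.bootstrap.servers": KAFKA_BS,
        "subscribe": TOPIC,
        "startingOffsets": "earliest",
        "kafka.group.id": CONSUMER_GROUP,
        "kafka.security.protocol": "SASL_SSL",
        "kafka.sasl.mechanism": "GSSAPI",
        "kafka.sasl.kerberos.service.name": "kafka",
        "kafka.ssl.truststore.location": TRUST_JKS,
        # Assumption: same truststore password as the connector configs.
        "kafka.ssl.truststore.password": "changeit",
    }


def start_stream(spark):
    """Read the CDC topic and append each message value to HDFS as JSON."""
    stream = spark.readStream.format("kafka").options(**kafka_options()).load()
    # Kafka values arrive as bytes; cast them to strings before writing.
    values = stream.selectExpr("CAST(value AS STRING) AS value")
    return (
        values.writeStream.format("json")
        .option("path", OUT_DIR)
        .option("checkpointLocation", CHK_DIR)
        .outputMode("append")
        .start()
    )
```

In a script submitted with `spark3-submit`, you would obtain a session with `SparkSession.builder.getOrCreate()`, call `start_stream(spark)`, and then call `awaitTermination()` on the returned query. The checkpoint directory lets a restarted job resume from the last committed Kafka offsets.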



## 8) Validation Checklist

- ✅ The CDC connector shows status **RUNNING** in SMM.  
- ✅ Topic `<username>.ora1.CDC.DEMO` contains change data from Oracle.  
- ✅ The HDFS Sink produces files under `/cdc/<username>/`.  
- ✅ (Optional) The Spark job writes JSON files to HDFS with checkpointing.  
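
When checking that the topic contains change data, it helps to know the shape of a Debezium change event: each record carries an `op` code (`c`, `u`, `d`, or `r` for snapshot reads) together with `before` and `after` row images. The sketch below summarizes one JSON message, assuming the JSON converter wraps the data in a `payload` field when schemas are enabled. The function name `summarize_change_event` and the sample columns `ID` and `NAME` are hypothetical.

```python
import json

# Debezium operation codes: create, update, delete, read (snapshot).
OPERATIONS = {"c": "insert", "u": "update", "d": "delete", "r": "snapshot read"}


def summarize_change_event(message: str) -> dict:
    """Extract the operation and row images from a Debezium JSON message."""
    event = json.loads(message)
    # With schemas enabled, JsonConverter wraps the data in a "payload" field.
    payload = event.get("payload", event)
    return {
        "operation": OPERATIONS.get(payload.get("op"), "unknown"),
        "before": payload.get("before"),
        "after": payload.get("after"),
    }


if __name__ == "__main__":
    # Hypothetical event: the ID and NAME columns are made up for illustration.
    sample = json.dumps(
        {
            "payload": {
                "op": "u",
                "before": {"ID": 1, "NAME": "old"},
                "after": {"ID": 1, "NAME": "new"},
            }
        }
    )
    print(summarize_change_event(sample))
```

With `snapshot.mode=initial`, the first records on the topic are typically snapshot reads (`r`), followed by inserts, updates, and deletes as rows change in Oracle.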
