# 📘 Use Case — Day 2  
**HDFS → Spark → Kafka → SQL Server**

This guide walks through a **hands-on pipeline** that moves data from **HDFS** to **Kafka** with **Spark (batch/cron)**, then loads it into **SQL Server** via the **Kafka Connect JDBC Sink**.

All commands are run in the **Terminal**; this notebook serves only as **documentation**.

---


## 🔷 Topology
The data flow architecture we will build:

![](imgage/hdfs_spark_kafka_sqlserver.jpg)


## 1) Prerequisites & Environment

- **Cluster**: CDP production-like, with Kerberos & TLS enabled.  
- **Access**: You have already run `kinit` (you hold a Kerberos ticket).  
- **Tools**: HDFS, Spark 3, Kafka, and access to SMM.  
- **Security**: JAAS config & truststore are available (same as UC1).  
- **SQL Server**: Access to the `sinkdb` database.  

📌 Replace every `<username>` with your own trainee username.


## 2) Client Security Setup

Open a terminal: click the + icon at the top left, then choose Terminal. Then run the following script:

```bash
# === JAAS for the Kafka client ===
export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf"
echo "KAFKA_OPTS=$KAFKA_OPTS"
```

```bash
# === Kafka client config file (used by the Kafka CLI; the Spark job reuses the copied truststore) ===
U="<username>"
TMP_DIR="/tmp/$U"

# create a per-user tmp folder (accessible only by that user)
install -d -m 700 "$TMP_DIR"

# copy the truststore so users do not contend for the global file
cp -f /tmp/kafka-truststore.jks "$TMP_DIR/kafka-truststore.jks"
chmod 0644 "$TMP_DIR/kafka-truststore.jks"

# write the config file for the Kafka CLI
cat > "$TMP_DIR/producer.properties" <<EOF
security.protocol=SASL_SSL
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka
ssl.truststore.location=$TMP_DIR/kafka-truststore.jks
ssl.truststore.password=changeit
ssl.endpoint.identification.algorithm=
acks=all
linger.ms=50
batch.size=32768
max.in.flight.requests.per.connection=1
retries=5
delivery.timeout.ms=120000
EOF

# make it the default config for the Kafka CLI commands
export KAFKA_CFG="$TMP_DIR/producer.properties"
```
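
As a quick sanity check on the generated file, the sketch below parses `key=value` lines and reports which security settings are missing. It is only an illustration: the helper names `parse_properties` and `missing_keys`, and the choice of which keys count as required, are assumptions made for this example and are not part of any Kafka tooling.

```python
# Hypothetical helper: check that a Kafka client properties file
# contains the security settings this lab relies on.
REQUIRED_KEYS = {
    "security.protocol",
    "sasl.mechanism",
    "sasl.kerberos.service.name",
    "ssl.truststore.location",
}


def parse_properties(text):
    """Parse simple key=value lines, skipping blank lines and # comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def missing_keys(props):
    """Return the required keys that are absent, in sorted order."""
    return sorted(REQUIRED_KEYS - props.keys())
```

To use it, read the file that `$KAFKA_CFG` points to and pass its text to `parse_properties`; an empty list from `missing_keys` means all four settings are present. An empty value, such as `ssl.endpoint.identification.algorithm=`, is kept as an empty string.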

## 3) Prepare Source Data in HDFS

Sample per-user JSON data:

```bash
SRC_DIR="/uc2/<username>/uc2_src"
hdfs dfs -mkdir -p "$SRC_DIR"

cat > /tmp/uc2_sample.json <<'JSON'
{"id": 1, "name": "budi"}
{"id": 2, "name": "eko"}
{"id": 3, "name": "dina"}
JSON

hdfs dfs -put -f /tmp/uc2_sample.json "$SRC_DIR/"
hdfs dfs -ls -h "$SRC_DIR" || true
```
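
The source file uses one JSON object per line, which is the layout Spark's JSON reader expects by default. The sketch below checks a file's text for that layout before upload. The function name `validate_json_lines` and the required field list are assumptions for illustration.

```python
import json


def validate_json_lines(text, required=("id", "name")):
    """Return (line_number, problem) pairs for lines that do not fit the expected layout."""
    problems = []
    for n, line in enumerate(text.splitlines(), start=1):
        if not line.strip():
            continue  # ignore blank lines
        try:
            rec = json.loads(line)
        except json.JSONDecodeError as exc:
            problems.append((n, f"invalid JSON: {exc.msg}"))
            continue
        if not isinstance(rec, dict):
            problems.append((n, "not a JSON object"))
            continue
        for field in required:
            if field not in rec:
                problems.append((n, f"missing field '{field}'"))
    return problems
```

An empty result means every non-blank line is a JSON object with both `id` and `name`.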


## 4) Create Topic (Per-User)

Use a per-user topic to avoid collisions: `usecase2.<username>`

```bash
TOPIC="usecase2.<username>"

kafka-topics --bootstrap-server djp-training-sbox.dla-dataplatform.internal:9093 \
  --command-config "$KAFKA_CFG" \
  --create --if-not-exists --topic "$TOPIC" --partitions 1 --replication-factor 1

# Verify
kafka-topics --bootstrap-server djp-training-sbox.dla-dataplatform.internal:9093 \
  --command-config "$KAFKA_CFG" --list | grep -F "$TOPIC" || true
```


## 5) Spark Batch Producer Script: HDFS → Kafka

Script used: **`spark_batch_to_kafka.py`**. First replace every `<username>` in the script below with your own username, then run the block in the terminal.
    
```bash
install -d -m 755 /home/<username>/labs/02_hdfs_spark_kafka_sqlserver

cat >/home/<username>/labs/02_hdfs_spark_kafka_sqlserver/spark_batch_to_kafka.py <<'PY'
#!/usr/bin/env python3
from pyspark.sql import SparkSession, functions as F, types as T

USERNAME = "<username>"
SRC_DIR  = f"/uc2/{USERNAME}/uc2_src"
TOPIC    = f"usecase2.{USERNAME}"
BROKERS  = "djp-training-sbox.dla-dataplatform.internal:9093"

spark = (SparkSession.builder
         .appName(f"uc2-batch-schemaful-{USERNAME}")
         .getOrCreate())

schema = T.StructType([
    T.StructField("id",   T.IntegerType(), True),
    T.StructField("name", T.StringType(),  True),
])

# READ & simple TRANSFORM
df = (spark.read.schema(schema).json(SRC_DIR)
        .withColumn("name", F.upper(F.trim("name")))
        .dropna(subset=["id"])
        .coalesce(1))

# ==== Connect envelope: {"schema": {...}, "payload": {...}} ====
connect_schema = F.struct(
    F.lit("struct").alias("type"),
    F.array(
        F.struct(F.lit("id").alias("field"),   F.lit("int32").alias("type"),  F.lit(False).alias("optional")),
        F.struct(F.lit("name").alias("field"), F.lit("string").alias("type"), F.lit(True).alias("optional"))
    ).alias("fields"),
    F.lit(False).alias("optional")
)

payload = F.struct(
    F.col("id").cast("int").alias("id"),
    F.col("name").alias("name")
)

value_connect_json = F.to_json(F.struct(
    connect_schema.alias("schema"),
    payload.alias("payload")
))

out = df.select(
    F.col("id").cast("string").alias("key"),  # key is arbitrary; the sink takes the PK from the VALUE
    value_connect_json.alias("value")
)
# ===============================================================

# WRITE to Kafka
(out.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
   .write
   .format("kafka")
   .option("kafka.bootstrap.servers", BROKERS)
   .option("kafka.security.protocol", "SASL_SSL")
   .option("kafka.sasl.mechanism", "GSSAPI")
   .option("kafka.sasl.kerberos.service.name", "kafka")
   .option("kafka.ssl.truststore.location", f"/tmp/{USERNAME}/kafka-truststore.jks")
   .option("kafka.ssl.truststore.password", "changeit")
   .option("kafka.ssl.endpoint.identification.algorithm", "")
   .option("topic", TOPIC)
   .save())

spark.stop()
PY

chmod +x /home/<username>/labs/02_hdfs_spark_kafka_sqlserver/spark_batch_to_kafka.py
```
    
    
Test it in the terminal:
```bash
mkdir -p /tmp/<username>/spark
nice -n 10 ionice -c2 -n7 \
spark3-submit \
  --master local[1] \
  --conf spark.driver.memory=1g \
  --conf spark.sql.shuffle.partitions=2 \
  --conf spark.default.parallelism=1 \
  --conf spark.local.dir=/tmp/<username>/spark \
  --conf spark.driver.extraJavaOptions="-Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf" \
  --conf spark.executor.extraJavaOptions="-Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf" \
  /home/<username>/labs/02_hdfs_spark_kafka_sqlserver/spark_batch_to_kafka.py
  
# check the messages on the topic (exits after 3 messages)
kafka-console-consumer \
  --bootstrap-server djp-training-sbox.dla-dataplatform.internal:9093 \
  --consumer.config "$KAFKA_CFG" \
  --from-beginning --topic "usecase2.<username>" --max-messages 3
```
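
To make the consumer output easier to read, here is a minimal plain-Python sketch of the same schema-plus-payload envelope that the Spark script builds for each record. The function name `connect_envelope` is made up for this example. The Spark output can differ in details: for instance, Spark's `to_json` omits null fields by default.

```python
import json


def connect_envelope(record):
    """Build a {"schema": ..., "payload": ...} envelope for one {"id", "name"} record."""
    schema = {
        "type": "struct",
        "fields": [
            {"field": "id", "type": "int32", "optional": False},
            {"field": "name", "type": "string", "optional": True},
        ],
        "optional": False,
    }
    name = record.get("name")
    payload = {
        "id": int(record["id"]),
        # same simple transform as the Spark job: trim, then upper-case
        "name": name.strip().upper() if name is not None else None,
    }
    return json.dumps({"schema": schema, "payload": payload}, separators=(",", ":"))


print(connect_envelope({"id": 1, "name": " budi "}))
```

Each message value on the topic should have this overall shape: a `schema` object describing the fields and a `payload` object carrying the row. This is what `JsonConverter` with `schemas.enable=true` expects on the sink side.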

## 6) Cron for the Spark Batch Job
    
```bash
# REPLACE <username>, then run this whole block at once.
# Remove the current user's existing crontab, if any (this deletes ALL of that user's entries)
crontab -r 2>/dev/null || true

# Create the log folder and set ownership (so the job can write to it)
install -d -m 755 /home/<username>/uc2_logs
chown -R <username>:<username> /home/<username>/uc2_logs

# Install the crontab for <username>
(
  echo 'SHELL=/bin/bash'
  echo 'PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin'
  # simple heartbeat: /home/<username>/uc2_logs/cron_heartbeat.log should appear within 1-2 minutes
  echo '* * * * * date -Is >> /home/<username>/uc2_logs/cron_heartbeat.log'
  # Spark job every minute, no overlap thanks to flock, logging to cron.out
  echo '* * * * * KRB5CCNAME=FILE:/tmp/krb5cc_hdfs /usr/bin/flock -n /tmp/uc2.<username>.lock sh -c '"'"'mkdir -p /tmp/uc2.<username>.spark /home/<username>/uc2_logs && /usr/bin/spark3-submit --master local[1] --conf spark.driver.memory=1g --conf spark.sql.shuffle.partitions=2 --conf spark.default.parallelism=1 --conf spark.local.dir=/tmp/uc2.<username>.spark --conf spark.ui.enabled=false --conf spark.driver.extraJavaOptions=-Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf --conf spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf /home/<username>/labs/02_hdfs_spark_kafka_sqlserver/spark_batch_to_kafka.py >>/home/<username>/uc2_logs/cron.out 2>&1'"'" 
) | crontab -
```
    
    
Verify the entries:
```bash
crontab -l

# optional: check the logs
tail -n 5 /home/<username>/uc2_logs/cron_heartbeat.log || true
tail -n 50 /home/<username>/uc2_logs/cron.out || true
```
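
The cron entry relies on `flock -n` so that a run is skipped while the previous run still holds the lock. The sketch below illustrates the same non-blocking exclusive-lock idea with Python's standard `fcntl` module on Linux. The helper name `try_lock` and the temporary lock path are assumptions for this example, not part of the lab setup.

```python
import fcntl
import os
import tempfile


def try_lock(path):
    """Try to take an exclusive, non-blocking lock; return the open file, or None if busy."""
    fh = open(path, "w")
    try:
        fcntl.flock(fh, fcntl.LOCK_EX | fcntl.LOCK_NB)
    except BlockingIOError:
        fh.close()
        return None
    return fh


lock_path = os.path.join(tempfile.mkdtemp(), "uc2.demo.lock")  # hypothetical path
first = try_lock(lock_path)    # the first "run" gets the lock
second = try_lock(lock_path)   # an overlapping "run" is turned away
print(first is not None, second is None)
first.close()                  # closing the file releases the lock
```

With one-minute scheduling, this is what keeps a slow Spark run from piling up behind itself: the next invocation simply exits instead of waiting.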

## 7) MSSQL Sink Connector (Per-User)

Open **SMM → Connectors → Add Connector → JDBC Sink Connector** and fill it in as follows.

```json
{
  "name": "MSSQL_SINK_<username>",
  "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
  "tasks.max": "1",

  "topics": "usecase2.<username>",

  "connection.url": "jdbc:sqlserver://djp-training-sbox.dla-dataplatform.internal:1433;databaseName=sinkdb;encrypt=true;trustServerCertificate=true;",
  "connection.user": "sink_user",
  "connection.password": "S1nk!User#2025",
  "dialect.name": "SqlServerDatabaseDialect",

  "table.name.format": "usecase2_<username>",

  "insert.mode": "upsert",
  "pk.mode": "record_value",
  "pk.fields": "id",

  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": "true",

  "consumer.override.group.id": "connect-MSSQL_SINK_<username>.v2",
  "consumer.override.auto.offset.reset": "earliest",
  "consumer.override.security.protocol": "SASL_SSL",
  "consumer.override.sasl.mechanism": "GSSAPI",
  "consumer.override.sasl.kerberos.service.name": "kafka",
  "consumer.override.ssl.truststore.location": "/tmp/kafka-truststore.jks",
  "consumer.override.ssl.truststore.password": "changeit",
  "consumer.override.ssl.endpoint.identification.algorithm": "",

  "errors.tolerance": "all",
  "errors.log.enable": "true",
  "errors.log.include.messages": "true",
  "errors.deadletterqueue.context.headers.enable": "true",
  "errors.deadletterqueue.topic.name": "usecase2.<username>.DLQ",
  "errors.deadletterqueue.topic.partitions": "1",
  "errors.deadletterqueue.topic.replication.factor": "1",

  "auto.create": "true",
  "auto.evolve": "true",
  "secret.properties": "connection.password"
}
```
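
Because the Spark job re-reads the whole source directory on every cron run, the same records are produced to the topic repeatedly. With `insert.mode=upsert` and `pk.fields=id`, repeated deliveries overwrite the existing row instead of adding duplicates. The sketch below models that behavior with a plain dictionary; it is a simplified illustration, not the connector's implementation.

```python
def upsert(table, rows, pk="id"):
    """Insert each row, or overwrite the existing row that has the same primary key."""
    for row in rows:
        table[row[pk]] = dict(row)
    return table


batch = [{"id": 1, "name": "BUDI"}, {"id": 2, "name": "EKO"}]
table = {}
upsert(table, batch)
upsert(table, batch)  # the same batch delivered again by the next cron run
print(len(table))
```

The table still holds one row per `id` after the second delivery, which is why the SQL Server row count does not grow each minute.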


## 8) Validasi di SQL Server

Open the **Terminal** and use **mssql-tools** (in a container) to check your per-user table in SQL Server.  
📌 Replace `<username>` with your own username.

### SQL Server Validation Commands

```bash
docker run --rm --network host mcr.microsoft.com/mssql-tools \
  /opt/mssql-tools/bin/sqlcmd \
  -S djp-training-sbox.dla-dataplatform.internal,1433 -d sinkdb \
  -U sink_user -P 'S1nk!User#2025' -C -W -s '|' -h -1 \
  -Q "SELECT TOP(10) id, name FROM dbo.usecase2_<username> ORDER BY TRY_CAST(id AS INT) DESC;"
```

## 9) Test: Add a New File
```bash
# 1) Add new data (use a unique file name)
cat >/tmp/uc2_more.json <<'JSON'
{"id": 4, "name": "tono"}
{"id": 5, "name": "dori"}
JSON

ts=$(date +%s)
hdfs dfs -put -f /tmp/uc2_more.json "/uc2/<username>/uc2_src/uc2_more_${ts}.json"

# 2) SQL watcher (check that the new rows appear)
watch -n 2 "
docker run --rm --network host mcr.microsoft.com/mssql-tools \
  /opt/mssql-tools/bin/sqlcmd \
  -S djp-training-sbox.dla-dataplatform.internal,1433 -d sinkdb \
  -U sink_user -P 'S1nk!User#2025' -C -W -s '|' -h -1 \
  -Q \"SELECT TOP(10) id, name FROM dbo.usecase2_<username> ORDER BY TRY_CAST(id AS INT) DESC;\""
```

## 10) Clean up

```bash
set -euo pipefail

U="<username>"                        # <<< make sure this is correct
USER_NAME="<username>"          # <<< if needed

# 1) Remove the cron entries installed in section 6 (current user)
crontab -l 2>/dev/null | sed -e '/spark_batch_to_kafka\.py/d' -e '/cron_heartbeat\.log/d' | crontab - || true

# 2) Kill only the matching Spark job (restricted to this path & user)
pgrep -fu "$USER_NAME" "/home/$USER_NAME/labs/02_hdfs_spark_kafka_sqlserver/spark_batch_to_kafka.py" >/dev/null 2>&1 && \
  pkill -fu "$USER_NAME" "/home/$USER_NAME/labs/02_hdfs_spark_kafka_sqlserver/spark_batch_to_kafka.py" || true

# 3) Clean up local artifacts (make sure $U is not empty and the paths are correct)
[[ -n "${U}" ]] || { echo "Env U is empty, aborting"; exit 1; }
rm -rf "/tmp/${U}/spark" "/tmp/${U}/locks" "/tmp/uc2.${U}.spark" "/tmp/uc2.${U}.lock" || true

# If your logs are in the home directory of <username>:
rm -rf "/home/${USER_NAME}/uc2_logs" || true
# (or, if the logs are per-user: /home/$U/uc2_logs)
# rm -rf "/home/${U}/uc2_logs" || true

# 4) Clean up the HDFS source (optional)
hdfs dfs -test -d "/uc2/${U}/uc2_src" 2>/dev/null && \
  hdfs dfs -rm -r -f "/uc2/${U}/uc2_src/*" || true

# 5) Reset the SQL Server table (make sure the table name is correct)
docker run --rm --network host mcr.microsoft.com/mssql-tools \
  /opt/mssql-tools/bin/sqlcmd \
  -S djp-training-sbox.dla-dataplatform.internal,1433 -d sinkdb \
  -U sink_user -P 'S1nk!User#2025' -C -Q "IF OBJECT_ID('dbo.usecase2_${U}','U') IS NOT NULL TRUNCATE TABLE dbo.usecase2_${U};"
  
```

### Important for Kafka & Connector

For the connector to consume from the beginning again, do one of the following:

- Easiest: change `consumer.override.group.id` in the connector (e.g. append a suffix such as `_v2`), then Save/Restart the connector.
- Or use a new topic (e.g. `usecase2.<username>.v2`) and update:
  - `TOPIC` in `spark_batch_to_kafka.py`
  - `topics` in the connector
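
Changing the group id helps because Kafka stores committed offsets per consumer group. A group with no committed offset falls back to `auto.offset.reset`, which the connector config sets to `earliest`. The sketch below is a deliberately simplified model of that rule for a single-partition topic. The function name `start_offset` and the numbers are assumptions for illustration.

```python
def start_offset(committed, group_id, auto_offset_reset="earliest", log_end=5):
    """Simplified model: where a consumer group starts reading a single-partition topic."""
    if group_id in committed:
        return committed[group_id]  # resume from the committed position
    # no committed offset: fall back to the reset policy
    return 0 if auto_offset_reset == "earliest" else log_end


# hypothetical state: the old group has already consumed all 5 messages
committed = {"connect-MSSQL_SINK_demo.v2": 5}
print(start_offset(committed, "connect-MSSQL_SINK_demo.v2"))  # old group: nothing new to read
print(start_offset(committed, "connect-MSSQL_SINK_demo.v3"))  # new group: starts from the beginning
```

In this model the old group resumes at the end of the log, while the renamed group starts from offset 0 and therefore replays every message into the truncated table.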
