# ETL Demo - Kafka -> Dataflow

## Initialize.

In [1]:
# Demo-wide settings, exported as environment variables so that the `!` shell
# commands ($PROJECT_ID, $LOCATION, ...) and the Python cell (os.environ) below
# all read the same values.
# Project and region used by every gcloud command in this notebook.
%set_env PROJECT_ID=peace-demo
%set_env LOCATION=us-central1
# Subnetwork passed to --subnets when the Kafka cluster is created.
%set_env SUBNET=default
# Kafka cluster and topic names.
%set_env KAFKA_ID=demo-etl-kafka-instance
%set_env TOPIC_ID=topic_1
# Local properties file from which the Python cell reads `username` / `password`.
%set_env CLIENT_PROPS=/home/client.properties
# Output directory and job name for the Dataflow job.
%set_env GCS_PATH=gs://peace-demo-temp-us-central1/kafka_topic1_to_gcs/
%set_env JOB_NAME=kafka-topic1-to-gcs

env: PROJECT_ID=peace-demo
env: LOCATION=us-central1
env: SUBNET=default
env: KAFKA_ID=demo-etl-kafka-instance
env: TOPIC_ID=topic_1
env: CLIENT_PROPS=/home/client.properties
env: GCS_PATH=gs://peace-demo-temp-us-central1/kafka_topic1_to_gcs/
env: JOB_NAME=kafka-topic1-to-gcs


In [8]:
# Reinstall kafka-python into the kernel's own environment.
# `%pip` (unlike `!pip`) targets the interpreter running this notebook, `-y`
# replaces piping "y" into the confirmation prompt, and the version is pinned
# to the release shown in the recorded output so a re-run installs the same one.
%pip uninstall -y kafka-python
%pip install kafka-python==2.0.2

Found existing installation: kafka-python 2.0.2
Uninstalling kafka-python-2.0.2:
  Would remove:
    /opt/conda/lib/python3.10/site-packages/kafka/*
    /opt/conda/lib/python3.10/site-packages/kafka_python-2.0.2.dist-info/*
Proceed (Y/n)?   Successfully uninstalled kafka-python-2.0.2
Collecting kafka-python
  Using cached kafka_python-2.0.2-py2.py3-none-any.whl (246 kB)
Installing collected packages: kafka-python
Successfully installed kafka-python-2.0.2


## 1. Create a Managed Kafka instance on GCP

In [2]:
# Create the managed Kafka cluster $KAFKA_ID in $LOCATION, attached to the
# $SUBNET subnetwork of $PROJECT_ID, with rebalancing enabled (--auto-rebalance).
# 3221225472 = 3 * 1024**3, i.e. 3 GiB if --memory is expressed in bytes
# (assumed unit -- confirm against the gcloud reference).
# NOTE(review): the recorded output only shows that the create request was
# issued; presumably the cluster must finish provisioning before the topic
# cell below can succeed -- check the printed operation first.
!gcloud beta managed-kafka clusters create $KAFKA_ID \
    --location=$LOCATION \
    --cpu=3 \
    --memory=3221225472 \
    --subnets=projects/$PROJECT_ID/regions/$LOCATION/subnetworks/$SUBNET \
    --auto-rebalance

Create request issued for: [demo-etl-kafka-instance]
Check operation [projects/peace-demo/locations/us-central1/operations/operation-1726045803209-621d45af76302-c908fb2b-a5a54a7d] for status.


## 2. Create a Topic

In [3]:
# Create topic $TOPIC_ID on cluster $KAFKA_ID with 2 partitions.
# NOTE(review): replication factor 1 presumably means a single copy of each
# partition, which is acceptable for a demo only -- confirm before reuse.
!gcloud beta managed-kafka topics create $TOPIC_ID \
    --cluster=$KAFKA_ID --location=$LOCATION \
    --partitions=2 \
    --replication-factor=1

Created topic [topic_1].


## 3. Create a Dataflow job from Kafka to GCS

In [7]:
# Launch the Kafka_to_Gcs_Flex template from the regional dataflow-templates
# bucket as job $JOB_NAME, with a single worker in $LOCATION.
# The job reads topic $TOPIC_ID of cluster $KAFKA_ID and writes files prefixed
# "output-" under $GCS_PATH.
# Every template parameter is passed through its own --parameters flag, and each
# value ends with a trailing comma before the line continuation.
# NOTE(review): this relies on gcloud merging repeated --parameters flags and
# tolerating the trailing commas; the recorded output shows the job was
# accepted, but confirm in the job details that all parameters were applied.
!gcloud dataflow flex-template run $JOB_NAME \
    --template-file-gcs-location gs://dataflow-templates-$LOCATION/latest/flex/Kafka_to_Gcs_Flex \
    --region $LOCATION \
    --num-workers 1 \
    --parameters readBootstrapServerAndTopic=projects/$PROJECT_ID/locations/$LOCATION/clusters/$KAFKA_ID/topics/$TOPIC_ID,\
    --parameters windowDuration=5m,\
    --parameters outputDirectory=$GCS_PATH,\
    --parameters outputFilenamePrefix=output-,\
    --parameters numShards=0,\
    --parameters enableCommitOffsets=false,\
    --parameters kafkaReadOffset=latest,\
    --parameters kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS,\
    --parameters messageFormat=JSON,\
    --parameters useBigQueryDLQ=false,\
    --parameters autoscalingAlgorithm=NONE

job:
  createTime: '2024-09-11T09:33:04.976600Z'
  currentStateTime: '1970-01-01T00:00:00Z'
  id: 2024-09-11_02_33_04-15064206313907812191
  location: us-central1
  name: kafka-topic1-to-gcs
  projectId: peace-demo
  startTime: '2024-09-11T09:33:04.976600Z'


## 4. Publish data from BigQuery to Kafka

In [5]:
from google.cloud import bigquery
from kafka import KafkaProducer
import json as json
import os
                  
# Pull the demo settings exported by the %set_env cell; a missing variable
# raises KeyError.
project_id, location, kafka_id, topic_id, client_file = (
    os.environ[key]
    for key in ('PROJECT_ID', 'LOCATION', 'KAFKA_ID', 'TOPIC_ID', 'CLIENT_PROPS')
)

# SASL credentials stay empty when the properties file does not define them.
username = ""
password = ""

# Parse the client properties file, made of `key=value` lines such as:
#   username=""
#   password=""
# Lines without the separator are skipped, and only the first "=" splits a
# line, so a value may itself contain "=". A later line overrides an earlier one.
separator = "="
with open(client_file) as props_file:
    for raw_line in props_file:
        key, found, raw_value = raw_line.partition(separator)
        if not found:
            continue
        key = key.strip()
        if key == "username":
            username = raw_value.strip()
        elif key == "password":
            password = raw_value.strip()

# Set up Kafka producer
def serializer(message):
    """Encode a message as UTF-8 bytes for use as a Kafka record value.

    Args:
        message: The payload to publish. A ``str`` is treated as already
            serialized text (for example the JSON produced by BigQuery's
            ``to_json_string``) and is encoded as-is; ``bytes`` are returned
            unchanged; any other object is serialized with ``json.dumps``.

    Returns:
        bytes: The UTF-8 encoded payload.

    Raises:
        TypeError: If ``message`` is not JSON serializable.
    """
    if isinstance(message, (bytes, bytearray)):
        return bytes(message)
    if isinstance(message, str):
        # Running json.dumps on JSON text would quote and escape it again, so
        # consumers would receive a JSON string instead of a JSON object.
        return message.encode('utf-8')
    return json.dumps(message).encode('utf-8')

def on_send_success(record_metadata):
    """Success callback for ``producer.send``: print where the record landed.

    Args:
        record_metadata: Object exposing the ``partition`` and ``offset``
            attributes of the record that was sent.
    """
    partition = record_metadata.partition
    offset = record_metadata.offset
    print('partition:', partition, ', noffset:', offset)

def on_send_error(excp):
    """Error callback for ``producer.send``: log the failure with its traceback.

    Args:
        excp: The exception describing why the send failed.
    """
    # Imported locally because the cell's import block does not bring in
    # `logging`. The original called an undefined `log`, so every failed send
    # raised NameError inside the callback instead of being reported.
    import logging

    logging.getLogger(__name__).error('I am an errback', exc_info=excp)

# Kafka Producer
# Bootstrap endpoint of the cluster, built from the cluster id, region and
# project, on port 9092.
bootstrap_server = f'bootstrap.{kafka_id}.{location}.managedkafka.{project_id}.cloud.goog:9092'

# Authenticate with SASL/PLAIN over TLS using the credentials read from the
# client properties file; record values go through `serializer`.
producer_settings = {
    'bootstrap_servers': [bootstrap_server],
    'security_protocol': "SASL_SSL",
    'sasl_mechanism': "PLAIN",
    'sasl_plain_username': username,
    'sasl_plain_password': password,
    'value_serializer': serializer,
}
producer = KafkaProducer(**producer_settings)
                       
# Fetch a few sample rows from BigQuery, each rendered as one JSON string
# in the `json` column.
client = bigquery.Client(project=project_id)
query_job = client.query("""
   SELECT to_json_string(t) as json FROM `peace-demo.tpc_ds.store_sales` t LIMIT 5
   """)
# result() waits for the query and returns the row iterator; iterate that
# directly instead of iterating the job a second time.
results = query_job.result()
for row in results:
    print('sending message:'+row.json)
    producer.send(topic_id, row.json).add_callback(on_send_success).add_errback(on_send_error)
# send() only queues the record. flush() blocks until every queued record has
# been delivered or has failed, so the callbacks have run before the final message.
producer.flush()
print('sended finished.')

sending message:{"ss_sold_date_sk":2452589,"ss_sold_time_sk":70807,"ss_item_sk":32695,"ss_customer_sk":688176,"ss_cdemo_sk":152464,"ss_hdemo_sk":1905,"ss_addr_sk":93546,"ss_store_sk":67,"ss_promo_sk":49,"ss_ticket_number":9835950,"ss_quantity":47,"ss_wholesale_cost":"71.23","ss_list_price":"92.59","ss_sales_price":"86.1","ss_ext_discount_amt":0,"ss_ext_sales_price":"4046.7","ss_ext_wholesale_cost":"3347.81","ss_ext_list_price":"4351.73","ss_ext_tax":"283.26","ss_coupon_amt":0,"ss_net_paid":"4046.7","ss_net_paid_inc_tax":"4329.96","ss_net_profit":"698.89"}
sending message:{"ss_sold_date_sk":2452589,"ss_sold_time_sk":null,"ss_item_sk":23125,"ss_customer_sk":null,"ss_cdemo_sk":null,"ss_hdemo_sk":1905,"ss_addr_sk":93546,"ss_store_sk":null,"ss_promo_sk":235,"ss_ticket_number":9835950,"ss_quantity":null,"ss_wholesale_cost":"13.59","ss_list_price":"25.68","ss_sales_price":null,"ss_ext_discount_amt":0,"ss_ext_sales_price":874,"ss_ext_wholesale_cost":"625.14","ss_ext_list_price":null,"ss_ext_ta

## Clean up: release all the resources.

In [43]:
# Print the ID of an active Dataflow job named $JOB_NAME in $LOCATION:
# the listing is reduced to the job ID column, `tail -n 1` keeps its last line
# and `cut` keeps that line's first space-separated field.
!gcloud dataflow jobs list --region=$LOCATION --status=active --format="value(JOB_ID)" --filter="name=$JOB_NAME" | tail -n 1 | cut -f 1 -d " "

2024-09-10_10_15_01-5122018209602625939


In [None]:
# Replace the job ID in the next cell with the ID printed above, then run it.

In [45]:
# Cancel the Dataflow job; `echo y` feeds "y" to any confirmation prompt.
# The job ID is hard-coded: replace it with the ID of the job to cancel
# before running this cell.
!echo y|gcloud dataflow jobs cancel 2024-09-10_10_15_01-5122018209602625939 --region=$LOCATION

Cancelled job [2024-09-10_10_15_01-5122018209602625939]


In [46]:
# Recursively delete all objects under $GCS_PATH, the output directory given
# to the Dataflow job.
!gcloud storage rm --recursive $GCS_PATH

Removing objects:
Skipping DeleteObjectTask for gs://peace-demo-temp-us-central1/kafka_topic1_to_gcs/#1725985195219701. This can occur if a cp command results in multiple writes to the same resource.
Removing gs://peace-demo-temp-us-central1/kafka_topic1_to_gcs/#1725985195219701...
Removing gs://peace-demo-temp-us-central1/kafka_topic1_to_gcs/output-2024-09-10T17:00:00.000Z-2024-09-10T18:00:00.000Z-pane-0-last-00-of-02.json#1725991204324708...
Removing gs://peace-demo-temp-us-central1/kafka_topic1_to_gcs/output-2024-09-10T17:00:00.000Z-2024-09-10T18:00:00.000Z-pane-0-last-01-of-02.json#1725991204324605...
  Completed 3/4                                                                


In [49]:
# Delete the Kafka cluster $KAFKA_ID; `echo y` answers the "Do you want to
# continue" prompt shown in the recorded output.
# With --async the command returns once the delete request is issued; follow
# the printed operation to see when the deletion completes.
!echo y|gcloud beta managed-kafka clusters delete $KAFKA_ID \
    --location=$LOCATION \
    --async

You are about to delete cluster [demo-etl-kafka-instance]

Do you want to continue (Y/n)?  
Delete request issued for: [demo-etl-kafka-instance]
Check operation [projects/peace-demo/locations/us-central1/operations/operation-1725991720903-621c7c368e53a-de10e1a7-bb34bb44] for status.


In [None]:
## THE END.