In [1]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import HiveContext, SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from delta import *
import boto3
import json
import math
#import os
#import pandas as pd
#import numpy as np
#import logging

In [2]:
## Variables
BOOTSTRAP_SERVERS = "boot-XXXXXXXX.c1.kafka-serverless.us-east-1.amazonaws.com:9098"
REGION = "us-east-1"
TOPIC = "amznbookreviews"

In [3]:
builder = SparkSession.builder.appName("amzn-reviews") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.driver.cores", "2") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory","28g") \
    .config("spark.executor.cores","5") \
    .config("spark.executor.instances","2") \
    .config("spark.sql.execution.arrow.pyspark.enabled",True) \
    .config("spark.sql.execution.arrow.pyspark.fallback.enabled", True) \
    .config("spark.sql.parquet.mergeSchema", False) \
    .config("spark.hadoop.parquet.enable.summary-metadata", False) \
    .enableHiveSupport()

spark = configure_spark_with_delta_pip(builder).getOrCreate()

sc = spark.sparkContext

:: loading settings :: url = jar:file:/opt/spark-3.5.0-bin-hadoop3/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/ubuntu/.ivy2/cache
The jars for the packages stored in: /home/ubuntu/.ivy2/jars
io.delta#delta-spark_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-de2bc315-6eb2-4232-ad99-3d46d34dbe8a;1.0
	confs: [default]
	found io.delta#delta-spark_2.12;3.1.0 in local-m2-cache
	found io.delta#delta-storage;3.1.0 in local-m2-cache
	found org.antlr#antlr4-runtime;4.9.3 in local-m2-cache
:: resolution report :: resolve 141ms :: artifacts dl 7ms
	:: modules in use:
	io.delta#delta-spark_2.12;3.1.0 from local-m2-cache in [default]
	io.delta#delta-storage;3.1.0 from local-m2-cache in [default]
	org.antlr#antlr4-runtime;4.9.3 from local-m2-cache in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	--------------------------------------------------------------

In [4]:
# Get secrets credential for S3a
client = boto3.client('secretsmanager',region_name=REGION)
response = client.get_secret_value(
    SecretId='s3all'
)
accessJson = json.loads(response['SecretString'])
accessKeyId = accessJson['accessKey']
secretAccessKey = accessJson['secretAccess']

# Configure S3a
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", accessKeyId)
sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", secretAccessKey)
sc._jsc.hadoopConfiguration().set("fs.s3a.path.style.access", "true")
sc._jsc.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "s3.us-east-1.amazonaws.com")

# ForEachBatch definitions

In [5]:
def preBatchRecords(microBatchDf, batchSize):
    batch_count = math.ceil(microBatchDf.count() / batchSize)
    # % sign is modulus -- remainder after division
    microBatchDf = microBatchDf.withColumn("batch_id", col("row_number") % batch_count)
    microBatchDf = microBatchDf.withColumn("bartUpdated", lit("N"))
    microBatchDf = microBatchDf.withColumn("bartSummary", lit(""))

    return microBatchDf

In [6]:
def callUdfBatch(df, batchId):
    path = "s3a://amzn-customer-reviews-XXXXXXXXXXXX/sink/test/test-streaming-foreach-pandas/"
    submitted_df = preBatchRecords(df, 5)

    submitted_df.write.format("delta").mode("append").save(path)

# Reading from Kafka

In [7]:
## Setting the Kafka options
options_read = {
    "kafka.bootstrap.servers": BOOTSTRAP_SERVERS,
    "subscribe": TOPIC,
    "startingOffsets": "latest",
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.mechanism": "AWS_MSK_IAM",
    "kafka.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;",
    "kafka.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
}

In [8]:
## Read from Kafka
df = spark \
    .readStream \
    .format("kafka") \
    .options(**options_read) \
    .load()

# Writing to S3

In [9]:
## Setting the Checkpoint location
checkpoint_path = "s3a://amzn-customer-reviews-XXXXXXXXXXX/checkpoint/"

In [10]:
## Setting schema of Kafka message topic
json_schema = StructType([
    StructField('asin', StringType(), True),
    StructField('overall', DoubleType(), True),
    StructField('reviewText', StringType(), True),
    StructField('reviewTimeTS', TimestampType(), True),
    StructField('reviewerID', StringType(), True),
    StructField('reviewerName', StringType(), True),
    StructField('summary', StringType(), True),
    StructField('verified', BooleanType(), True),
    StructField('row_number', IntegerType(), True),
    StructField('asin_key', IntegerType(), True)
])

In [11]:
streamHandle = (df
                .selectExpr("CAST(key as STRING)","CAST(value as STRING)")
                .select(from_json("value",json_schema).alias("data")).select("data.*")
                .writeStream
                .foreachBatch(callUdfBatch)
                .trigger(processingTime='20 seconds')
                .start()
               )

streamHandle.awaitTermination()

24/03/12 15:18:52 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-c8bfaad4-5597-4919-9d73-60f75d2c86de. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/03/12 15:18:52 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
24/03/12 15:18:55 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
24/03/12 15:18:57 WARN ParquetOutputFormat: Setting parquet.enable.summary-metadata is deprecated, please use parquet.summary.metadata.level
24/03/12 15:19:00 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'

KeyboardInterrupt: 