In [1]:
import pytest
from pyspark.context import SparkContext
from pyspark.sql.functions import col, unix_timestamp, from_unixtime, date_format, concat, concat_ws, lit, first, last, min
from pyspark.sql.window import Window
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from awsglue.dynamicframe import DynamicFrame
import sys
sys.path.insert(1, 'src')
from pyspark.conf import SparkConf
from pathlib import Path

# Manually pass job arguments. A notebook kernel has no Glue-supplied argv, so
# the flags that getResolvedOptions expects are injected here. The membership
# guard keeps sys.argv from accumulating duplicate flags when the cell is re-run.
for _flag, _value in (
    ('--JOB_NAME', 'test_iceberg'),
    ('--iceberg_job_catalog_warehouse', 's3://aws-poc-glue/boto/iceberg/'),
):
    if _flag not in sys.argv:
        sys.argv.extend([_flag, _value])

# Spark Configuration
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'iceberg_job_catalog_warehouse'])
conf = SparkConf()

## Please make sure to pass runtime argument --iceberg_job_catalog_warehouse with value as the S3 path
# Register "job_catalog" as an Iceberg SparkCatalog backed by the Glue catalog
# implementation, using S3FileIO for file access.
conf.set("spark.sql.catalog.job_catalog.warehouse", args['iceberg_job_catalog_warehouse'])
conf.set("spark.sql.catalog.job_catalog", "org.apache.iceberg.spark.SparkCatalog")
conf.set("spark.sql.catalog.job_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
conf.set("spark.sql.catalog.job_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
conf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
conf.set("spark.sql.iceberg.handle-timestamp-without-timezone","true")

# Init Job
# getOrCreate returns the kernel's live SparkContext if one already exists, so
# re-running this cell no longer fails with "Cannot run multiple SparkContexts
# at once". On a fresh kernel it builds the context from `conf` as before.
sc = SparkContext.getOrCreate(conf=conf)
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# DataFrame.checkpoint() needs a checkpoint directory; the current working
# directory is used.
sc.setCheckpointDir('./')

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/glue_user/spark/jars/log4j-slf4j-impl-2.17.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/glue_user/spark/jars/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/glue_user/aws-glue-libs/jars/log4j-slf4j-impl-2.17.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/glue_user/aws-glue-libs/jars/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/05/01 23:45:34 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
23/05/01 23:45:35 WARN 

In [2]:
"""
Load DataFrames from checkpoints if they are available
"""
# Directory where every stage of the notebook saves its parquet snapshot.
p = Path('checkpoints')

# Names of all DataFrames the notebook persists, in pipeline order. The first
# three entries are the base DataFrames that the fetch cell checks via
# checklist[:3]; 'df_response_tag' must be listed or its saved snapshot could
# never be recognised.
checklist = ['df_message', 'df_dag', 'df_dagmem', 'df_inbounds', 'df_compkey', 'df_responses', 'df_joined', 'df_response_tag', 'df_inbounds_matched']

# Names of the snapshots actually present on disk. On a first run the
# 'checkpoints' directory does not exist yet; treat that as "nothing saved"
# instead of letting iterdir() raise FileNotFoundError.
if p.is_dir():
    checkpoints = [x.name for x in p.iterdir() if x.is_dir() and x.name in checklist]
else:
    checkpoints = []

In [3]:
"""
Fetching all required data into DataFrames
"""
# Rebuild the three base DataFrames unless all of their snapshots were found on
# disk. `checkpoints` holds the snapshot names discovered under 'checkpoints/';
# comparing checklist[:3] against `checklist` itself (as before) was always
# true, so the build branch could never run.
if not set(checklist[:3]) <= set(checkpoints):
    # Fetch from Data Catalog
    df_message = glueContext.create_data_frame.from_catalog(
        database="boto3",
        table_name="message_iceberg"
    )

    df_dag = glueContext.create_data_frame.from_catalog(
        database="boto3",
        table_name="dealer_associate_group_iceberg"
    )

    df_dagmem = glueContext.create_data_frame.from_catalog(
        database="boto3",
        table_name="dealer_associate_group_member_iceberg"
    )

    # Keep manual messages with protocol E/X and messagetype I/S, project the
    # relevant columns (senton is exposed as "timestamp") and drop rows that
    # have no timestamp.
    df_message = df_message.filter(col("protocol").isin("E","X") & (col("ismanual")=="1") & col("messagetype").isin("I","S"))\
        .select("id",
                "dealerid",
                "messagetype",
                "protocol",
                "customerid",
                "dealerassociateid",
                "ismanual",
                col("senton").alias("timestamp"))\
        .dropna(subset=["timestamp"])

    # Select only the BDC groups and produce a concise list of dealerassociateids, group name and virtual id
    df_dag = df_dag.filter(col("name").isin("Service BDC", "Service Central"))\
                   .select("id", "name", "virtualdealerassociateid")\
                   .dropDuplicates()

    # Join on BDC Group Members
    df_dagmem = df_dagmem.select("dealerassociategroupid", "dealerassociateid")\
                         .join(df_dag, df_dagmem['dealerassociategroupid']==df_dag['id'], 'inner')\
                         .drop("id", "dealerassociategroupid")

    # Produce a concise list of groups and virtual id names
    df_dag = df_dag.select("name", "virtualdealerassociateid").dropDuplicates()

    # Tag Virtual IDs in the Message Table.
    # The lookup is reduced to distinct ids first: df_dag is unique per
    # (name, virtualdealerassociateid), so an id shared by two group names would
    # otherwise duplicate every matching message row in the left join.
    virtual_ids = df_dag.select("virtualdealerassociateid").distinct()\
                        .withColumn('isvirtual', lit(True))
    df_message = df_message.join(
                    virtual_ids,
                    df_message['dealerassociateid']==virtual_ids['virtualdealerassociateid'],
                    'left')\
                    .fillna(False)  # unmatched rows get isvirtual=False

    # Tag which outbound (reps) belong to BDC (virtual) groups.
    # Distinct for the same reason: a rep who is a member of more than one
    # selected group must not multiply message rows.
    virtual_reps = df_dagmem.select("dealerassociateid").distinct()\
                            .withColumn('isvirtualrep', lit(True))
    df_message = df_message.join(
                    virtual_reps,
                    'dealerassociateid',
                    'left'
                    )\
                    .fillna(False)  # unmatched rows get isvirtualrep=False

    # Creating Time Bins: epoch seconds (ut), its string form (dty), and the
    # calendar date / hour-of-day strings derived from it.
    df_message = df_message.withColumn('ut', unix_timestamp(col("timestamp"), 'yyyy-MM-dd HH:mm:ss'))\
                           .withColumn('dty', from_unixtime('ut'))\
                           .withColumn('date', date_format('dty', 'yyyy-MM-dd'))\
                           .withColumn('hour', date_format('dty', 'HH'))

    # Produce composite key column: "<dealerid>-<date>-<hour>"
    df_message = df_message.withColumn("compositekey", concat_ws('-', col("dealerid"), col("date"), col("hour")))

    # Create Checkpoints.
    # checkpoint() returns a new, checkpointed DataFrame rather than modifying
    # the receiver, so the result has to be rebound or the work is thrown away.
    df_message = df_message.checkpoint()
    df_dag = df_dag.checkpoint()
    df_dagmem = df_dagmem.checkpoint()

    # Save the DataFrames
    df_message.write.format("parquet").mode("overwrite").save("checkpoints/df_message")
    df_dag.write.format("parquet").mode("overwrite").save("checkpoints/df_dag")
    df_dagmem.write.format("parquet").mode("overwrite").save("checkpoints/df_dagmem")
else:
    # Load the DataFrames from the checkpoints
    df_message = spark.read.format("parquet").load("checkpoints/df_message")
    df_dag = spark.read.format("parquet").load("checkpoints/df_dag")
    df_dagmem = spark.read.format("parquet").load("checkpoints/df_dagmem")


                                                                                

In [4]:
"""
Creating Composite Key Table:
>>> This will be used as the common table for joins
>>> The current logic defines the composite key as the unique values between dealerid, date and hour
>>> Note: dealerassociateid is not a part of the composite key, and the table built below does not select it (it holds only compositekey, dealerid, date and hour). Recall that a customer sending an inbound to the BDC inbox sends it to the virtualdealerassociateid. It would therefore be impossible to directly link a response to an inbound, since the responder's id will not correspond with this virtual id. For that reason the dealerassociateid is omitted from the composite key definition; it is retained on the message rows, where it is used to calculate who responded to what message from which customer.
"""
# Build the composite key table unless its snapshot was found on disk.
if 'df_compkey' not in checkpoints:
    # One row per distinct (compositekey, dealerid, date, hour) combination.
    df_compkey = df_message.select("compositekey", "dealerid", "date", "hour").dropDuplicates()

    # Create checkpoint.
    # checkpoint() returns the checkpointed DataFrame instead of modifying the
    # receiver, so rebind the name to actually use it.
    df_compkey = df_compkey.checkpoint()

    # Save the DataFrame
    df_compkey.write.format("parquet").mode("overwrite").save("checkpoints/df_compkey")
else:
    # Load the DataFrame from its snapshot
    df_compkey = spark.read.format("parquet").load("checkpoints/df_compkey")


In [5]:
"""
Separating Inbound and Response Logic
>>> This groups each valid inbound based on the criteria defined under the compositekey
"""
# Rebuild all three DataFrames unless every one of their snapshots was found.
if not {'df_inbounds', 'df_responses', 'df_joined'} <= set(checkpoints):
    # Inbound Aggregations: type-I messages whose dealerassociateid is a virtual id
    df_inbounds = df_message.filter((col("isvirtual")==lit(True)) & (col("messagetype")=='I'))
    # Response Logic: type-S messages whose dealerassociateid is a BDC group member
    df_responses = df_message.filter((col("isvirtualrep")==lit(True)) & (col("messagetype")=='S'))

    # Join the two dataframes: pair every inbound with each response for the
    # same customer on the same date. Response columns are renamed so they do
    # not collide with the inbound's own columns.
    df_joined = df_inbounds.join(df_responses.select(
                                        "customerid",
                                        "date",
                                        col("dealerassociateid").alias("response_dealerassociateid"),
                                        col("ut").alias("response_ut"),
                                        col("timestamp").alias("response_timestamp")),
                                     ['customerid', 'date'],
                                      "left")

    # Filter for responses which come after an inbound message.
    # Inbounds without any response have a null response_ut; the comparison is
    # then null and those rows are dropped, so df_joined holds matched pairs only.
    df_joined = df_joined.filter(col("response_ut")>col("ut"))

    # Create checkpoints.
    # checkpoint() returns a new DataFrame; rebind so the result is not discarded.
    df_inbounds = df_inbounds.checkpoint()
    df_responses = df_responses.checkpoint()
    df_joined = df_joined.checkpoint()

    # Save the DataFrames
    df_inbounds.write.format("parquet").mode("overwrite").save("checkpoints/df_inbounds")
    df_responses.write.format("parquet").mode("overwrite").save("checkpoints/df_responses")
    df_joined.write.format("parquet").mode("overwrite").save("checkpoints/df_joined")
else:
    # Load the DataFrames from their snapshots
    df_inbounds = spark.read.format("parquet").load("checkpoints/df_inbounds")
    df_responses = spark.read.format("parquet").load("checkpoints/df_responses")
    df_joined = spark.read.format("parquet").load("checkpoints/df_joined")

In [6]:
# Number of inbound messages per (compositekey, dealerassociateid) pair;
# count() adds a "count" column next to the two grouping columns.
df_inbounds_denorm = (
    df_inbounds
    .groupBy("compositekey", "dealerassociateid")
    .count()
)

In [10]:
# Scratch (not executed): window-based alternative for finding the earliest
# response per inbound id, kept for reference.
# from pyspark.sql.functions import min
# # Create a window specification
# # w = Window.partitionBy([col("id")]).orderBy(col("response_ut"))
# w = Window.partitionBy([col("id")])

# # Add a new column holding the earliest response_ut within each id partition
# result_df = df_joined.withColumn("first_response", min(col("response_ut")).over(w)).dropDuplicates()

# result_df.show()

In [8]:
"""
Tag each inbound which has been responded to with the response timestamp
"""
# `checkpoints` only contains names that also appear in `checklist`, so a saved
# 'df_response_tag' snapshot can be missing from it even when present on disk.
# Check the snapshot directory directly as well, so an existing snapshot is
# always reused.
_response_tag_saved = ('df_response_tag' in checkpoints) or Path('checkpoints/df_response_tag').is_dir()

if not _response_tag_saved:
    # Earliest response time per inbound message id. `min` here is
    # pyspark.sql.functions.min (imported at the top), not the builtin.
    df_response_tag = df_joined.groupBy("id").agg(min("response_ut").alias("response_ut"))

    # Create Checkpoint.
    # checkpoint() returns a new DataFrame; rebind so the result is not discarded.
    df_response_tag = df_response_tag.checkpoint()

    # Save the DataFrame
    df_response_tag.write.format("parquet").mode("overwrite").save("checkpoints/df_response_tag")
else:
    # Load the DataFrame
    df_response_tag = spark.read.format("parquet").load("checkpoints/df_response_tag")

In [9]:
# Preview the response tags: print the first 20 rows, truncating long cell values.
df_response_tag.show(n=20, truncate=True)

+---------+-----------+
|       id|response_ut|
+---------+-----------+
|669446488| 1681985562|
|669212359| 1681974429|
|668550344| 1681894565|
|667979444| 1681819915|
|667906144| 1681816802|
|667762728| 1681809340|
|669996442| 1682060894|
|667879895| 1681814868|
|667730921| 1681808887|
|667885472| 1681815473|
|665697753| 1681460253|
|670045590| 1682062589|
|668737774| 1681903814|
|667898591| 1681815781|
|669996974| 1682060894|
|668174649| 1681833062|
|667585655| 1681800749|
|669974506| 1682058803|
|667706137| 1681808095|
|668771070| 1681905355|
+---------+-----------+
only showing top 20 rows



In [10]:
"""
Join the response tags to the inbound dataset
"""
# Build the matched inbound table unless its snapshot was found on disk.
if 'df_inbounds_matched' not in checkpoints:
    # Left join keeps every inbound; those without a response get a null
    # response_ut. The join previously referenced `response_df`, a name that is
    # never defined in this notebook; the response tags live in df_response_tag.
    df_inbounds_matched = df_inbounds.join(df_response_tag, 'id', 'left')
    # Human-readable form of the response time (null stays null)
    df_inbounds_matched = df_inbounds_matched.withColumn("response_timestamp", from_unixtime(col("response_ut")))

    # Create Checkpoint.
    # checkpoint() returns a new DataFrame; rebind so the result is not discarded.
    df_inbounds_matched = df_inbounds_matched.checkpoint()

    # Save the DataFrame
    df_inbounds_matched.write.format("parquet").mode("overwrite").save("checkpoints/df_inbounds_matched")
else:
    # Load the DataFrame
    df_inbounds_matched = spark.read.format("parquet").load("checkpoints/df_inbounds_matched")

In [11]:
# Preview the matched inbounds: print the first 20 rows, truncating long cell values.
df_inbounds_matched.show(n=20, truncate=True)

+---------+-----------------+--------+-----------+--------+----------+--------+-------------------+------------------------+---------+------------+----------+-------------------+----------+----+------------------+-----------+-------------------+
|       id|dealerassociateid|dealerid|messagetype|protocol|customerid|ismanual|          timestamp|virtualdealerassociateid|isvirtual|isvirtualrep|        ut|                dty|      date|hour|      compositekey|response_ut| response_timestamp|
+---------+-----------------+--------+-----------+--------+----------+--------+-------------------+------------------------+---------+------------+----------+-------------------+----------+----+------------------+-----------+-------------------+
|670727153|           104895|     455|          I|       E| 146267814|       1|2023-04-22 06:27:58|                  104895|     true|       false|1682144878|2023-04-22 06:27:58|2023-04-22|  06| 455-2023-04-22-06|       null|               null|
|670688802|     