In [0]:
%run "/Workspace/Users/ankur/00_setup_configuration"

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, current_date, datediff, to_date, when
from delta.tables import DeltaTable

# Read staged data
customer_df = spark.read \
    .option("fs.s3a.access.key", dbutils.secrets.get(scope="aws-keys", key="aws-access-key")) \
    .option("fs.s3a.secret.key", dbutils.secrets.get(scope="aws-keys", key="aws-secret-key")) \
    .parquet("s3://jobbertech/assignment/stage/customer/")
sales_df = spark.read \
    .option("fs.s3a.access.key", dbutils.secrets.get(scope="aws-keys", key="aws-access-key")) \
    .option("fs.s3a.secret.key", dbutils.secrets.get(scope="aws-keys", key="aws-secret-key")) \
    .parquet("s3://jobbertech/assignment/stage/sales/")

# Data transformations and enrichments
processed_df = sales_df.join(customer_df, "customer_id")
processed_df = processed_df.withColumn("age_group", (col("age") / 10).cast("integer") * 10)
processed_df = processed_df.withColumn("days_since_purchase", datediff(current_date(), to_date(col("invoice_date"), "yyyy-MM-dd")))
processed_df = processed_df.withColumn("total_sales", col("quantity") * col("price"))
processed_df = processed_df.withColumn("price_range", 
                               when(col("price") <= 500, "Budget")
                               .when((col("price") > 500) & (col("price") <= 1500), "Mid-range")
                               .otherwise("Premium"))

# Write processed data to Delta Lake
processed_df.write \
    .option("fs.s3a.access.key", dbutils.secrets.get(scope="aws-keys", key="aws-access-key")) \
    .option("fs.s3a.secret.key", dbutils.secrets.get(scope="aws-keys", key="aws-secret-key")) \
    .format("delta").mode("overwrite").option("mergeSchema", "true").save("s3://jobbertech/assignment/processed_data/customer_sales")

# Assuming we are storing updates in another bucket for some customers and we will make dataFrame ready for merge operations
updates_df = spark.read.format("delta").load("s3://jobbertech/assignment/updates/")
delta_table = DeltaTable.forPath(spark, "s3://jobbertech/assignment/processed_data/customer_sales")

# Perform upserts using MERGE
(delta_table.alias("original")
 .merge(
     updates_df.alias("updates"),
     "original.customer_id = updates.customer_id")
 .whenMatchedUpdateAll()
 .whenNotMatchedInsertAll()
 .execute()
)


[0;31m---------------------------------------------------------------------------[0m
[0;31mSparkConnectGrpcException[0m                 Traceback (most recent call last)
File [0;32m<command-4683163562982533>, line 23[0m
[1;32m     17[0m processed_df [38;5;241m=[39m processed_df[38;5;241m.[39mwithColumn([38;5;124m"[39m[38;5;124mprice_range[39m[38;5;124m"[39m, 
[1;32m     18[0m                                when(col([38;5;124m"[39m[38;5;124mprice[39m[38;5;124m"[39m) [38;5;241m<[39m[38;5;241m=[39m [38;5;241m500[39m, [38;5;124m"[39m[38;5;124mBudget[39m[38;5;124m"[39m)
[1;32m     19[0m                                [38;5;241m.[39mwhen((col([38;5;124m"[39m[38;5;124mprice[39m[38;5;124m"[39m) [38;5;241m>[39m [38;5;241m500[39m) [38;5;241m&[39m (col([38;5;124m"[39m[38;5;124mprice[39m[38;5;124m"[39m) [38;5;241m<[39m[38;5;241m=[39m [38;5;241m1500[39m), [38;5;124m"[39m[38;5;124mMid-range[39m[38;5;124m"[39m)
[1;32m     20[0m

In [0]:
df=spark.read \
    .option("fs.s3a.access.key", dbutils.secrets.get(scope="aws-keys", key="aws-access-key")) \
    .option("fs.s3a.secret.key", dbutils.secrets.get(scope="aws-keys", key="aws-secret-key")) \
    .format("delta").load("s3://jobbertech/assignment/processed_data/customer_sales")

df.show(10,False)

+-----------+----------+---------+--------+-------+------------+----------------+------+---+--------------+---------+-------------------+------------------+-----------+
|customer_id|invoice_no|category |quantity|price  |invoice_date|shopping_mall   |gender|age|payment_method|age_group|days_since_purchase|total_sales       |price_range|
+-----------+----------+---------+--------+-------+------------+----------------+------+---+--------------+---------+-------------------+------------------+-----------+
|C241288    |I138884   |Clothing |5       |1500.4 |2022-08-05  |Kanyon          |Female|28 |Credit Card   |20       |916                |7502.0            |Premium    |
|C111565    |I317333   |Shoes    |3       |1800.51|2021-12-12  |Forum Istanbul  |Male  |21 |Debit Card    |20       |1152               |5401.53           |Premium    |
|C266599    |I127801   |Clothing |1       |300.08 |2021-11-09  |Metrocity       |Male  |20 |Cash          |20       |1185               |300.08            