In [0]:
from pyspark.sql.functions import *
df = spark.read.csv('/Volumes/my_catalog/source/pyspark_vol/us_customer_data_2.csv', header = True)
df = df.withColumn('email', trim(regexp_replace(col('email'), r'\s+', ' ')))
df = df.withColumn('registration_date', to_date(col('registration_date')))
df = df.withColumn('first_part', split(col('phone'), 'x')[0])\
    .withColumn('ext', 
                when(col('phone').contains('x'), 
                split(col('phone'), 'x')[1])
    .otherwise(None))
df = df.withColumn('first_part', regexp_replace(col('first_part'), r'\D', ''))
df = df.withColumn('country_code',
                   expr("CASE WHEN length(first_part)>10 THEN substring(first_part, 1, length(first_part)-10) ELSE NULL END"))

df = df.withColumn('phone_no', 
                   expr("substring(first_part, length(first_part)-9, 10)"))

df = df.drop('first_part', 'phone')

df = df.select('customer_id', 'name', 'email', 'country_code', 'phone_no', 'ext', 'address', 'registration_date', 'loyalty_status')
df.show(1)

+-----------+-------------+------------------+------------+--------+----+--------------------+-----------------+--------------+
|customer_id|         name|             email|country_code|phone_no| ext|             address|registration_date|loyalty_status|
+-----------+-------------+------------------+------------+--------+----+--------------------+-----------------+--------------+
|          1|Michelle Kidd|vayala@example.net|        NULL|    NULL|NULL|USNS Santiago, FP...|       2025-01-25|          Gold|
+-----------+-------------+------------------+------------+--------+----+--------------------+-----------------+--------------+
only showing top 1 row


In [0]:
df2 = spark.read.csv('/Volumes/my_catalog/source/pyspark_vol/transaction_data_1.csv', header = True)
df2 = df2.withColumn("amount", col('amount').cast('double'))
df2 = df2.withColumn("transaction_date", to_date("transaction_date"))
df2.show(1)

+--------------+-----------+-------+----------------+----------------+--------------+--------------+
|transaction_id|customer_id| amount|transaction_date|product_category|payment_method|store_location|
+--------------+-----------+-------+----------------+----------------+--------------+--------------+
|             1|        565|2992.47|      2025-03-10|          Sports|    Debit Card|      New York|
+--------------+-----------+-------+----------------+----------------+--------------+--------------+
only showing top 1 row


In [0]:
df3 = df.join(df2, on = 'customer_id', how = 'inner')
df3.show(5)

+-----------+----------------+--------------------+------------+----------+-----+--------------------+-----------------+--------------+--------------+-------+----------------+----------------+--------------+--------------+
|customer_id|            name|               email|country_code|  phone_no|  ext|             address|registration_date|loyalty_status|transaction_id| amount|transaction_date|product_category|payment_method|store_location|
+-----------+----------------+--------------------+------------+----------+-----+--------------------+-----------------+--------------+--------------+-------+----------------+----------------+--------------+--------------+
|          1|   Michelle Kidd|  vayala@example.net|        NULL|      NULL| NULL|USNS Santiago, FP...|       2025-01-25|          Gold|             8|2652.57|      2025-04-19|            Home|   Credit Card|      New York|
|          4|  Kimberly Price|jessicaknight@exa...|         001|9476334224|07930|1631 Alexis Meado...|      

In [0]:
df3.createOrReplaceTempView('customer')

In [0]:
%sql
select 
  payment_method, 
  round(sum(amount), 2) as revenue 
from customer
group by payment_method
order by revenue desc;

payment_method,revenue
Debit Card,635175.45
Credit Card,595437.25
Cash,593727.99
PayPal,581581.93
