In [None]:
from pyspark.sql.functions import col,lead,broadcast,collect_list,size,row_number,count,datediff,sum
from pyspark.sql import Window

In [None]:
from pyspark.sql import SparkSession
# Create (or reuse) the Spark session first, then reach the Hadoop configuration
# through it. The original referenced `sc` before anything in this notebook
# defined it, which only works when the runtime pre-injects an `sc` variable.
spark = SparkSession.builder.appName('anlysis_app').getOrCreate()

# S3 (s3n) credentials, presumably needed by the CSV read from S3 further down.
# NOTE(review): the values are placeholders; supply real keys from a secret
# store or environment variables instead of pasting them into the notebook.
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3n.awsAccessKeyId", "<--Your keys-->")
hadoop_conf.set("fs.s3n.awsSecretAccessKey", "<--Your keys-->")




In [None]:
# Which customers bought AirPods as the purchase right after an iPhone?

transactions_df1 = spark.read.csv("<--S3 path-->", inferSchema=True, header=True)
customers_df = spark.read.table('customers_data')

# Enrich each transaction with the customer's name and location.
# customers_df is wrapped in broadcast() for the join.
df1_a = (
    transactions_df1
    .join(broadcast(customers_df), 'customer_id', 'inner')
    .select(
        transactions_df1['transaction_id'],
        transactions_df1['customer_id'],
        customers_df['customer_name'],
        col('product_name').alias('first_purchase'),
        transactions_df1['transaction_date'],
        customers_df['location'],
    )
)

# Per-customer window, ordered by transaction date.
w = Window.partitionBy("customer_id").orderBy("transaction_date")

# Pair every purchase with the same customer's next purchase, then keep only
# the rows where an iPhone is directly followed by AirPods.
result_df1 = (
    df1_a
    .withColumn('next_purchase', lead('first_purchase').over(w))
    .filter(col('first_purchase') == 'iPhone')
    .filter(col('next_purchase') == 'AirPods')
    .select('customer_id', 'customer_name', 'location')
)

result_df1.show()

# Persist the result as Parquet, partitioned by location.
dbfs_path = 'dbfs:/FileStore/output/first_transform'
(
    result_df1.write
    .format('parquet')
    .partitionBy('location')
    .mode('overwrite')
    .save(dbfs_path)
)
print('The Partitioned data has been written in DBFS')


+-----------+-------------+--------+
|customer_id|customer_name|location|
+-----------+-------------+--------+
|        105|          Eva|    Ohio|
|        108|        Henry|    Utah|
+-----------+-------------+--------+

The Partitioned data has been written in DBFS


In [None]:
# Customers whose transaction history contains exactly one product.

# Collect each customer's products into a list and keep the lists of length 1.
df2a = (
    transactions_df1
    .groupBy('customer_id')
    .agg(collect_list('product_name').alias('products_purchased'))
    .where(size(col('products_purchased')) == 1)
)

# Attach the customer details; df2a is wrapped in broadcast() for the join.
result_df2 = (
    customers_df
    .join(broadcast(df2a), 'customer_id', 'inner')
    .select('customer_id', 'customer_name', 'products_purchased', 'location')
)

result_df2.show()

# Persist the result as Parquet, partitioned by location.
dbfs_path = 'dbfs:/FileStore/output/second_transform'
(
    result_df2.write
    .format('parquet')
    .partitionBy('location')
    .mode('overwrite')
    .save(dbfs_path)
)

print('The Partitioned data has been written in DBFS')


+-----------+-------------+------------------+-------------+
|customer_id|customer_name|products_purchased|     location|
+-----------+-------------+------------------+-------------+
|        139|        Amber|          [iPhone]|     Nebraska|
|        140|      Michael|         [AirPods]|     Virginia|
|        141|      Barbara|         [MacBook]|      Alabama|
|        142|  Christopher|          [iPhone]|   Washington|
|        143|      Barbara|         [AirPods]|     Kentucky|
|        144|      Theresa|         [MacBook]|  Mississippi|
|        145|      Kenneth|          [iPhone]|   Washington|
|        146|       Martin|         [AirPods]|         Utah|
|        147|        Molly|         [MacBook]|Massachusetts|
|        148|         Ryan|          [iPhone]|     Colorado|
|        149|       George|         [AirPods]| North Dakota|
|        150|  Christopher|         [MacBook]| Rhode Island|
|        151|      Stephen|          [iPhone]|         Ohio|
|        152|       Robe

In [None]:
# Customers who bought multiple items together on their first purchase.

# One window per (customer, transaction date): every product in it counts as
# "bought together".
w1 = Window.partitionBy("customer_id", "transaction_date").orderBy("transaction_date")

# Attach the list of products sharing the same customer and date to each row.
intermediate_df = (
    transactions_df1
    .withColumn('items_bought_together', collect_list('product_name').over(w1))
    .select('customer_id', 'transaction_date', 'items_bought_together')
)

# Per-customer window ordered by transaction date (ascending).
w2 = Window.partitionBy("customer_id").orderBy("transaction_date")

# Number each customer's rows by date; row 1 is the earliest purchase despite
# the column being called "desc_order". Keep it only when more than one item
# was bought on that date.
result_df3 = (
    intermediate_df
    .withColumn("desc_order", row_number().over(w2))
    .filter(col("desc_order") == 1)
    .filter(size(col('items_bought_together')) > 1)
)

# Display the matching rows and report how many there are.
result_df3.show()
print(f'Number of people who have purchased multiple items on their first purchase: {result_df3.count()}')

+-----------+----------------+---------------------+----------+
|customer_id|transaction_date|items_bought_together|desc_order|
+-----------+----------------+---------------------+----------+
+-----------+----------------+---------------------+----------+

Number of people who have purchased multiple items on their first purchase: 0


In [None]:
# Average number of days between a customer's consecutive transactions.

# Window specifications: per customer ordered by date (for lead), and per
# customer unordered (for the transaction count).
w1 = Window.partitionBy("customer_id").orderBy("transaction_date")
w2 = Window.partitionBy("customer_id")

# Join transactions to customers, pair every transaction with the next one of
# the same customer, keep rows that have a successor (and at least two
# transactions overall), and compute the gap in days.
df4_a = (
    transactions_df1
    .join(customers_df, 'customer_id')
    .select('customer_id', 'customer_name', 'product_name', 'transaction_date')
    .withColumn('next_transaction', lead('transaction_date').over(w1))
    .withColumn('transaction_count', count('customer_id').over(w2))
    .filter(col('next_transaction').isNotNull())
    .filter(col('transaction_count') >= 2)
    .withColumn('delay_days', datediff(col('next_transaction'), col('transaction_date')))
)

# Mean gap per customer; the dict form yields a column named avg(delay_days).
result_df4 = (
    df4_a
    .groupBy('customer_id', 'customer_name')
    .agg({'delay_days': 'avg'})
)

result_df4.show()

# Persist the result as Parquet.
dbfs_path = 'dbfs:/FileStore/output/fourth_transform'
(
    result_df4.write
    .format('parquet')
    .mode('overwrite')
    .save(dbfs_path)
)
print('The data has been written in DBFS')

+-----------+-------------+------------------+
|customer_id|customer_name|   avg(delay_days)|
+-----------+-------------+------------------+
|        105|          Eva|18.666666666666668|
|        106|        Frank|18.666666666666668|
|        107|        Grace|              28.0|
|        108|        Henry|              27.5|
|        109|       Summer|              50.0|
|        110|      Michael|              50.0|
|        111|        James|              50.0|
|        112|     Kimberly|              50.0|
|        113|      Lindsay|              50.0|
|        114|          Amy|              50.0|
|        115|        Brian|              50.0|
|        116|      Vanessa|              50.0|
|        117|        Steve|              50.0|
|        118|         Tina|              50.0|
|        119|       Parker|              50.0|
|        120|       Thomas|              50.0|
|        121|         Jane|              50.0|
|        122|      Melanie|              50.0|
|        123|

In [None]:
# Total revenue generated by each product.
products_df = spark.read.table('products_csv')

# Build the revenue query once and reuse it for both the display and the write;
# the original spelled out the identical pipeline twice, so the shown and the
# stored result could silently drift apart on a later edit.
# `sum` here is pyspark.sql.functions.sum (imported at the top of the notebook),
# not the Python builtin.
result_df5 = (
    products_df
    .join(transactions_df1, 'product_name')
    .groupBy('category', 'product_name')
    .agg(sum('price').alias('revenue'))
    .sort(col('revenue').desc())
)
result_df5.show()

# Persist the result as Parquet.
dbfs_path = 'dbfs:/FileStore/output/fifth_transform'
(
    result_df5.write
    .mode('overwrite')
    .format('parquet')
    .save(dbfs_path)
)
print('The data has been written in DBFS')

+----------+------------+-------+
|  category|product_name|revenue|
+----------+------------+-------+
|    Laptop|     MacBook|42000.0|
|Smartphone|      iPhone|21700.0|
| Accessory|     AirPods| 7750.0|
+----------+------------+-------+

The data has been written in DBFS


In [None]:
# Products each customer bought after their initial purchase.

# Per-customer window ordered by transaction date.
w = Window.partitionBy("customer_id").orderBy("transaction_date")

# Number each customer's transactions by date; row_num == 1 is the first purchase.
transactions_with_row_number = transactions_df1.withColumn(
    'row_num', row_number().over(w)
)

# Drop the first purchase and keep only the columns needed downstream.
products_after_initial_purchase = (
    transactions_with_row_number
    .where(col('row_num') > 1)
    .select('customer_id', 'product_name')
)

# Collapse the remaining products into one list per customer.
# NOTE(review): collect_list after groupBy does not guarantee that the list
# follows transaction_date order; confirm whether the order matters here.
products_aggregated = (
    products_after_initial_purchase
    .groupBy('customer_id')
    .agg(collect_list('product_name').alias('products_after_initial_purchase'))
)

# Attach customer name and location.
result_df6 = (
    products_aggregated
    .join(customers_df, 'customer_id', 'inner')
    .select('customer_id', 'customer_name', 'location', 'products_after_initial_purchase')
)

result_df6.show(truncate=False)

# Persist the result as Parquet.
dbfs_path = 'dbfs:/FileStore/output/sixth_transform'
(
    result_df6.write
    .format('parquet')
    .mode('overwrite')
    .save(dbfs_path)
)
print('The data has been written in DBFS')

+-----------+-------------+--------------+-------------------------------+
|customer_id|customer_name|location      |products_after_initial_purchase|
+-----------+-------------+--------------+-------------------------------+
|105        |Eva          |Ohio          |[AirPods, MacBook, AirPods]    |
|106        |Frank        |Nevada        |[MacBook, AirPods, MacBook]    |
|107        |Grace        |Colorado      |[iPhone, iPhone]               |
|108        |Henry        |Utah          |[AirPods, AirPods]             |
|109        |Summer       |Minnesota     |[MacBook]                      |
|110        |Michael      |Tennessee     |[iPhone]                       |
|111        |James        |Michigan      |[AirPods]                      |
|112        |Kimberly     |Vermont       |[MacBook]                      |
|113        |Lindsay      |New Jersey    |[iPhone]                       |
|114        |Amy          |Connecticut   |[AirPods]                      |
|115        |Brian       