In [111]:
import findspark
findspark.init()

In [112]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.window import Window

In [113]:
spark = SparkSession.builder.appName('pr').getOrCreate()
spark

In [6]:
# Identify the customers who brought the same product more than once but on different days
# ( if same product is purchased multiple times but on same date shouldn't be counted)

data = [(333, 1122, 9, '2022-02-06T01:00:00.000+00:00'),
        (333,1122,10,'2022-02-06T02:00:00.000+00:00'), 
        (536,1435, 10,'2022-03-02T08:40:00.000+00:00'),
        (536,3223,5,'2022-03-02T09:33:28.000+00:00'),
        (536, 3223, 6,'2022-01-11T12:33:44.000+00:00'),
        (827, 2452, 45,'2022-03-02T00:00:00.000+00:00'), 
        (827, 3585, 35,'2022-02-20T14:05:26.000+00:00')]
df = spark.createDataFrame(data = data, schema=['uid', 'pid', 'qunt', 'pur_dt'])
df.show()

+---+----+----+--------------------+
|uid| pid|qunt|              pur_dt|
+---+----+----+--------------------+
|333|1122|   9|2022-02-06T01:00:...|
|333|1122|  10|2022-02-06T02:00:...|
|536|1435|  10|2022-03-02T08:40:...|
|536|3223|   5|2022-03-02T09:33:...|
|536|3223|   6|2022-01-11T12:33:...|
|827|2452|  45|2022-03-02T00:00:...|
|827|3585|  35|2022-02-20T14:05:...|
+---+----+----+--------------------+



In [7]:
df1 = df.withColumn('dt', to_date('pur_dt')).groupBy('uid', 'pid', 'dt').count()
df1.show()

+---+----+----------+-----+
|uid| pid|        dt|count|
+---+----+----------+-----+
|333|1122|2022-02-06|    2|
|536|1435|2022-03-02|    1|
|536|3223|2022-03-02|    1|
|536|3223|2022-01-11|    1|
|827|2452|2022-03-02|    1|
|827|3585|2022-02-20|    1|
+---+----+----------+-----+



In [8]:
df1.groupBy('uid', 'pid').count().filter(col('count') >= 2).show()

+---+----+-----+
|uid| pid|count|
+---+----+-----+
|536|3223|    2|
+---+----+-----+



In [26]:
# In the given dataset, names contain for some names and space for some names, extract the first name and last name 

data = [(1, 'sagar-prajapati'), (2, 'alex-john'), (3, 'john cena'), (4, 'kim joe')]
schema = ['Id','name']
df = spark.createDataFrame(data=data, schema=schema)
df.show(truncate=True)

+---+---------------+
| Id|           name|
+---+---------------+
|  1|sagar-prajapati|
|  2|      alex-john|
|  3|      john cena|
|  4|        kim joe|
+---+---------------+



In [27]:
replace_regexp = "((,)?\s|[-])"
df1 = df.withColumn("A", regexp_replace(col("name"), replace_regexp, " "))
df1.show()

+---+---------------+---------------+
| Id|           name|              A|
+---+---------------+---------------+
|  1|sagar-prajapati|sagar prajapati|
|  2|      alex-john|      alex john|
|  3|      john cena|      john cena|
|  4|        kim joe|        kim joe|
+---+---------------+---------------+



In [28]:
df2 = df1.withColumn('First_name', split(df1['A'], ' ').getItem(0)).withColumn('Last_Name', split(df1['A'], ' ').getItem(1))
df2.drop('A').show()

+---+---------------+----------+---------+
| Id|           name|First_name|Last_Name|
+---+---------------+----------+---------+
|  1|sagar-prajapati|     sagar|prajapati|
|  2|      alex-john|      alex|     john|
|  3|      john cena|      john|     cena|
|  4|        kim joe|       kim|      joe|
+---+---------------+----------+---------+



In [29]:
split_regex = "((,)?\s|[-])"
df1 = df.withColumn("A", split(col("name"), split_regex))
df1.show()

df1.select('*', df1.A.getItem(0).alias('fname'), df1.A.getItem(1).alias('lname')).show()

+---+---------------+------------------+
| Id|           name|                 A|
+---+---------------+------------------+
|  1|sagar-prajapati|[sagar, prajapati]|
|  2|      alex-john|      [alex, john]|
|  3|      john cena|      [john, cena]|
|  4|        kim joe|        [kim, joe]|
+---+---------------+------------------+

+---+---------------+------------------+-----+---------+
| Id|           name|                 A|fname|    lname|
+---+---------------+------------------+-----+---------+
|  1|sagar-prajapati|[sagar, prajapati]|sagar|prajapati|
|  2|      alex-john|      [alex, john]| alex|     john|
|  3|      john cena|      [john, cena]| john|     cena|
|  4|        kim joe|        [kim, joe]|  kim|      joe|
+---+---------------+------------------+-----+---------+



In [30]:
df1.select('A',array_contains('A',"alex").alias("array_contains")).show()
df1.filter(array_contains(df1.A, 'john')).show()
df1.select("Name", array_sort(df1.A).alias("Sorted_Numbers")).show()

+------------------+--------------+
|                 A|array_contains|
+------------------+--------------+
|[sagar, prajapati]|         false|
|      [alex, john]|          true|
|      [john, cena]|         false|
|        [kim, joe]|         false|
+------------------+--------------+

+---+---------+------------+
| Id|     name|           A|
+---+---------+------------+
|  2|alex-john|[alex, john]|
|  3|john cena|[john, cena]|
+---+---------+------------+

+---------------+------------------+
|           Name|    Sorted_Numbers|
+---------------+------------------+
|sagar-prajapati|[prajapati, sagar]|
|      alex-john|      [alex, john]|
|      john cena|      [cena, john]|
|        kim joe|        [joe, kim]|
+---------------+------------------+



In [31]:
df1.withColumn("Size", size(df1.A)).show()

+---+---------------+------------------+----+
| Id|           name|                 A|Size|
+---+---------------+------------------+----+
|  1|sagar-prajapati|[sagar, prajapati]|   2|
|  2|      alex-john|      [alex, john]|   2|
|  3|      john cena|      [john, cena]|   2|
|  4|        kim joe|        [kim, joe]|   2|
+---+---------------+------------------+----+



In [32]:
# call_duration 

data = [(10, 20, 58), (20,10,12), (10,30, 20),(30,40,100),(30, 40, 200), (30, 40, 200), (40, 30, 500)]
df = spark.createDataFrame(data = data, schema=['person1', 'person2', 'call_duration'])
df.show()

+-------+-------+-------------+
|person1|person2|call_duration|
+-------+-------+-------------+
|     10|     20|           58|
|     20|     10|           12|
|     10|     30|           20|
|     30|     40|          100|
|     30|     40|          200|
|     30|     40|          200|
|     40|     30|          500|
+-------+-------+-------------+



In [33]:
df1 = df.alias('t1').unionAll(df.alias('t2')).filter(col('person1') < col('person2'))
df1.show()

+-------+-------+-------------+
|person1|person2|call_duration|
+-------+-------+-------------+
|     10|     20|           58|
|     10|     30|           20|
|     30|     40|          100|
|     30|     40|          200|
|     30|     40|          200|
|     10|     20|           58|
|     10|     30|           20|
|     30|     40|          100|
|     30|     40|          200|
|     30|     40|          200|
+-------+-------+-------------+



In [34]:
df2 = df1.groupBy('person1','person2').agg(count(col('call_duration')).alias('call_count'), sum(col('call_duration')).alias('total_duration'))
df2.show()

+-------+-------+----------+--------------+
|person1|person2|call_count|total_duration|
+-------+-------+----------+--------------+
|     10|     20|         2|           116|
|     10|     30|         2|            40|
|     30|     40|         6|          1000|
+-------+-------+----------+--------------+



In [35]:
# Select the teachers who teaches only math and not any other subject 

data = [(1, "MATH"), (2,'MATH'), (4, 'CHEM'),(5, 'MATH'),(2, 'ENG'), (3, 'PHY')]
df = spark.createDataFrame(data = data, schema=['id', 'sub'])
df.show()

+---+----+
| id| sub|
+---+----+
|  1|MATH|
|  2|MATH|
|  4|CHEM|
|  5|MATH|
|  2| ENG|
|  3| PHY|
+---+----+



In [36]:
df1 = df.groupBy('id').count().filter(col('count') == 1)
df1.show()

+---+-----+
| id|count|
+---+-----+
|  1|    1|
|  4|    1|
|  5|    1|
|  3|    1|
+---+-----+



In [37]:
df.join(df1, df.id == df1.id, how = 'inner').filter(df['sub'] == 'MATH').select(df['*']).show()

+---+----+
| id| sub|
+---+----+
|  1|MATH|
|  5|MATH|
+---+----+



In [38]:
# Find out the companies where revenue has only increased over the years and there was no decrease at all for any point.

data = [('ABC', 2000, 100),
('ABC', 2001, 110),
('ABC', 2002, 120),
('XYZ', 2000, 100),
('XYZ', 2001, 90),
('XYZ', 2002, 120),
('RXC', 2000, 500),
('RXC', 2001, 400),
('RXC', 2002, 600),
('RXC', 2003, 800)]
schema = StructType([StructField('COMPANY', StringType(), True),
                     StructField('YEAR', IntegerType(), True),
                     StructField('REVENUE', IntegerType(), True)]) 
df = spark.createDataFrame(data=data, schema=schema)
df.show()

+-------+----+-------+
|COMPANY|YEAR|REVENUE|
+-------+----+-------+
|    ABC|2000|    100|
|    ABC|2001|    110|
|    ABC|2002|    120|
|    XYZ|2000|    100|
|    XYZ|2001|     90|
|    XYZ|2002|    120|
|    RXC|2000|    500|
|    RXC|2001|    400|
|    RXC|2002|    600|
|    RXC|2003|    800|
+-------+----+-------+



In [39]:
from pyspark.sql.window import Window
window = Window.partitionBy('COMPANY').orderBy('YEAR')
df1 = df.withColumn('lag', col('REVENUE')-lag(col('REVENUE'), 1, 0).over(window))
df1.show()

+-------+----+-------+----+
|COMPANY|YEAR|REVENUE| lag|
+-------+----+-------+----+
|    ABC|2000|    100| 100|
|    ABC|2001|    110|  10|
|    ABC|2002|    120|  10|
|    RXC|2000|    500| 500|
|    RXC|2001|    400|-100|
|    RXC|2002|    600| 200|
|    RXC|2003|    800| 200|
|    XYZ|2000|    100| 100|
|    XYZ|2001|     90| -10|
|    XYZ|2002|    120|  30|
+-------+----+-------+----+



In [41]:
df2 = df1.groupBy('COMPANY').agg(min(col('lag')).alias('diff')).filter(col('diff') > 0)
df2.show()

+-------+----+
|COMPANY|diff|
+-------+----+
|    ABC|  10|
+-------+----+



In [42]:
# lIst down the movies with an odd ID and which is not boring and order by id desc 

data = [(1, 'war', 'great ed',8.9),
    (2,'science','fiction',8.5),
    (3,'irish', 'boring', 6.2),
    (4, 'Ice song', 'fantacy', 8.6),
    (5, "house card", 'interesting', 9.1)]
sch = ['ID', 'Movie', 'Type', 'Rating']
df = spark.createDataFrame(data=data, schema=sch)
df.show()

+---+----------+-----------+------+
| ID|     Movie|       Type|Rating|
+---+----------+-----------+------+
|  1|       war|   great ed|   8.9|
|  2|   science|    fiction|   8.5|
|  3|     irish|     boring|   6.2|
|  4|  Ice song|    fantacy|   8.6|
|  5|house card|interesting|   9.1|
+---+----------+-----------+------+



In [43]:
df1 = df.filter(((df['ID'] %2 ) != 0 ) & (col('Type') != 'boring')).orderBy(col('ID').desc())
df1.show()

+---+----------+-----------+------+
| ID|     Movie|       Type|Rating|
+---+----------+-----------+------+
|  5|house card|interesting|   9.1|
|  1|       war|   great ed|   8.9|
+---+----------+-----------+------+



In [44]:
# Find the employees earning more than managers 

data = [(1, "John", 6000, 4), (2,'Kevin',11000,4), (3, 'Bob',8000, 5),(4, 'Laura',9000,None),(5, 'Sarah',10000, None)]
df = spark.createDataFrame(data = data, schema=['id', 'name', 'salary','mid'])
df.show()

+---+-----+------+----+
| id| name|salary| mid|
+---+-----+------+----+
|  1| John|  6000|   4|
|  2|Kevin| 11000|   4|
|  3|  Bob|  8000|   5|
|  4|Laura|  9000|NULL|
|  5|Sarah| 10000|NULL|
+---+-----+------+----+



In [45]:
df.alias('emp').join(df.alias('mgr'), col('emp.mid')  == col('mgr.id'), 'inner').filter(col('emp.salary') > col('mgr.salary'))\
.select(col('emp.id'), col('emp.name'), col('emp.salary'), col('emp.mid')).show()

+---+-----+------+---+
| id| name|salary|mid|
+---+-----+------+---+
|  2|Kevin| 11000|  4|
+---+-----+------+---+



In [28]:
# Remove special charactors

data = [['Mavs^', 18], 
        ['Ne%ts', 33], 
        ['Hawk**s', 12], 
        ['Mavs@', 15], 
        ['Hawks!', 19],
        ['(Cavs)', 24],
        ['Magic', 28]] 
columns = ['team', 'points'] 
df = spark.createDataFrame(data, columns) 
df.show()

+-------+------+
|   team|points|
+-------+------+
|  Mavs^|    18|
|  Ne%ts|    33|
|Hawk**s|    12|
|  Mavs@|    15|
| Hawks!|    19|
| (Cavs)|    24|
|  Magic|    28|
+-------+------+



In [30]:
df1 = df.withColumn('team2', regexp_replace('team', '[^a-zA-Z0-9]', ''))
df1.show()

+-------+------+-----+
|   team|points|team2|
+-------+------+-----+
|  Mavs^|    18| Mavs|
|  Ne%ts|    33| Nets|
|Hawk**s|    12|Hawks|
|  Mavs@|    15| Mavs|
| Hawks!|    19|Hawks|
| (Cavs)|    24| Cavs|
|  Magic|    28|Magic|
+-------+------+-----+



In [31]:
df1.withColumn('count_of_spe_char',length(col('team')) - length(col('team2'))).show()

+-------+------+-----+-----------------+
|   team|points|team2|count_of_spe_char|
+-------+------+-----+-----------------+
|  Mavs^|    18| Mavs|                1|
|  Ne%ts|    33| Nets|                1|
|Hawk**s|    12|Hawks|                2|
|  Mavs@|    15| Mavs|                1|
| Hawks!|    19|Hawks|                1|
| (Cavs)|    24| Cavs|                2|
|  Magic|    28|Magic|                0|
+-------+------+-----+-----------------+



https://www.linkedin.com/company/seekho-bigdata-institute/posts/?feedView=all

### 9
The dataset has duplicate entries, and some entries are missing values. You are required to:
    
-- Deduplicate the dataset.
- Handle any missing values appropriately.
- Determine the top 3 most frequent activity_type for each user_id.
- Calculate the time spent by each user on each activity_type

In [159]:
data = [ ("U1", "2024-12-30 10:00:00", "LOGIN"), 
        ("U1", "2024-12-30 10:05:00", "BROWSE"),
        ("U1", "2024-12-30 10:20:00", "LOGOUT"),
        ("U2", "2024-12-30 11:00:00", "LOGIN"),
        ("U2", "2024-12-30 11:15:00", "BROWSE"),
        ("U2", "2024-12-30 11:30:00", "LOGOUT"),
        ("U1", "2024-12-30 10:20:00", "LOGOUT"),  
        (None, "2024-12-30 12:00:00", "LOGIN"),   
        ("U3", None, "LOGOUT")           ]

schema = StructType([ StructField("user_id", StringType(), True),
                     StructField("timestamp", StringType(), True),
                     StructField("activity_type", StringType(), True) ])
df = spark.createDataFrame(data, schema)
df.show()

+-------+-------------------+-------------+
|user_id|          timestamp|activity_type|
+-------+-------------------+-------------+
|     U1|2024-12-30 10:00:00|        LOGIN|
|     U1|2024-12-30 10:05:00|       BROWSE|
|     U1|2024-12-30 10:20:00|       LOGOUT|
|     U2|2024-12-30 11:00:00|        LOGIN|
|     U2|2024-12-30 11:15:00|       BROWSE|
|     U2|2024-12-30 11:30:00|       LOGOUT|
|     U1|2024-12-30 10:20:00|       LOGOUT|
|   NULL|2024-12-30 12:00:00|        LOGIN|
|     U3|               NULL|       LOGOUT|
+-------+-------------------+-------------+



In [160]:
#convert timestamp to TimesstampType
df = df.withColumn('timestamp', col('timestamp').cast(TimestampType()))
df.show()

+-------+-------------------+-------------+
|user_id|          timestamp|activity_type|
+-------+-------------------+-------------+
|     U1|2024-12-30 10:00:00|        LOGIN|
|     U1|2024-12-30 10:05:00|       BROWSE|
|     U1|2024-12-30 10:20:00|       LOGOUT|
|     U2|2024-12-30 11:00:00|        LOGIN|
|     U2|2024-12-30 11:15:00|       BROWSE|
|     U2|2024-12-30 11:30:00|       LOGOUT|
|     U1|2024-12-30 10:20:00|       LOGOUT|
|   NULL|2024-12-30 12:00:00|        LOGIN|
|     U3|               NULL|       LOGOUT|
+-------+-------------------+-------------+



In [166]:
# Deduplicate the dataset 
df_deduplicated = df.dropDuplicates()
df_deduplicated.show()

+-------+-------------------+-------------+
|user_id|          timestamp|activity_type|
+-------+-------------------+-------------+
|     U1|2024-12-30 10:00:00|        LOGIN|
|     U1|2024-12-30 10:05:00|       BROWSE|
|     U1|2024-12-30 10:20:00|       LOGOUT|
|     U2|2024-12-30 11:00:00|        LOGIN|
|     U2|2024-12-30 11:15:00|       BROWSE|
|     U2|2024-12-30 11:30:00|       LOGOUT|
|     U3|               NULL|       LOGOUT|
|   NULL|2024-12-30 12:00:00|        LOGIN|
+-------+-------------------+-------------+



In [167]:
#2 Handle missing values 
df_cleaned = df_deduplicated.dropna(subset=['user_id', 'timestamp', 'activity_type'])
df_cleaned.show()

+-------+-------------------+-------------+
|user_id|          timestamp|activity_type|
+-------+-------------------+-------------+
|     U1|2024-12-30 10:00:00|        LOGIN|
|     U1|2024-12-30 10:05:00|       BROWSE|
|     U1|2024-12-30 10:20:00|       LOGOUT|
|     U2|2024-12-30 11:00:00|        LOGIN|
|     U2|2024-12-30 11:15:00|       BROWSE|
|     U2|2024-12-30 11:30:00|       LOGOUT|
+-------+-------------------+-------------+



In [171]:
#Determine the top 3 most frequent activity_type for each user_id.

activity_count = df_cleaned.groupBy('user_id', 'activity_type').count()
activity_count.show()

+-------+-------------+-----+
|user_id|activity_type|count|
+-------+-------------+-----+
|     U1|       BROWSE|    1|
|     U2|       BROWSE|    1|
|     U1|       LOGOUT|    1|
|     U2|        LOGIN|    1|
|     U2|       LOGOUT|    1|
|     U1|        LOGIN|    1|
+-------+-------------+-----+



In [172]:
activity_count= activity_count.withColumn('rank', row_number().over(Window.partitionBy('user_id').orderBy(desc('count'))))
activity_count.show()

+-------+-------------+-----+----+
|user_id|activity_type|count|rank|
+-------+-------------+-----+----+
|     U1|       BROWSE|    1|   1|
|     U1|       LOGOUT|    1|   2|
|     U1|        LOGIN|    1|   3|
|     U2|       BROWSE|    1|   1|
|     U2|        LOGIN|    1|   2|
|     U2|       LOGOUT|    1|   3|
+-------+-------------+-----+----+



In [174]:
top_activities = activity_count.filter(col('rank') <= 3).select('user_id', 'activity_type', 'count')
top_activities.show()

+-------+-------------+-----+
|user_id|activity_type|count|
+-------+-------------+-----+
|     U1|       BROWSE|    1|
|     U1|       LOGOUT|    1|
|     U1|        LOGIN|    1|
|     U2|       BROWSE|    1|
|     U2|        LOGIN|    1|
|     U2|       LOGOUT|    1|
+-------+-------------+-----+



In [175]:
#Calculate the time spent by each user on each activity_type

win_spec = Window.partitionBy('user_id').orderBy('timestamp')
df_with_lag = df_cleaned.withColumn('next_timestamp', lag('timestamp').over(win_spec))
df_with_lag.show()

+-------+-------------------+-------------+-------------------+
|user_id|          timestamp|activity_type|     next_timestamp|
+-------+-------------------+-------------+-------------------+
|     U1|2024-12-30 10:00:00|        LOGIN|               NULL|
|     U1|2024-12-30 10:05:00|       BROWSE|2024-12-30 10:00:00|
|     U1|2024-12-30 10:20:00|       LOGOUT|2024-12-30 10:05:00|
|     U2|2024-12-30 11:00:00|        LOGIN|               NULL|
|     U2|2024-12-30 11:15:00|       BROWSE|2024-12-30 11:00:00|
|     U2|2024-12-30 11:30:00|       LOGOUT|2024-12-30 11:15:00|
+-------+-------------------+-------------+-------------------+



In [184]:
df_with_time_spent = df_with_lag.withColumn('time_spent', (col('next_timestamp').cast('long') - col('timestamp').cast('long')).cast('double') / 60)
df_with_time_spent.show()

+-------+-------------------+-------------+-------------------+----------+
|user_id|          timestamp|activity_type|     next_timestamp|time_spent|
+-------+-------------------+-------------+-------------------+----------+
|     U1|2024-12-30 10:00:00|        LOGIN|               NULL|      NULL|
|     U1|2024-12-30 10:05:00|       BROWSE|2024-12-30 10:00:00|      -5.0|
|     U1|2024-12-30 10:20:00|       LOGOUT|2024-12-30 10:05:00|     -15.0|
|     U2|2024-12-30 11:00:00|        LOGIN|               NULL|      NULL|
|     U2|2024-12-30 11:15:00|       BROWSE|2024-12-30 11:00:00|     -15.0|
|     U2|2024-12-30 11:30:00|       LOGOUT|2024-12-30 11:15:00|     -15.0|
+-------+-------------------+-------------+-------------------+----------+



In [185]:
time_spent_per_activity = df_with_time_spent.groupBy('user_id', 'activity_type').sum('time_spent')
time_spent_per_activity.show()

+-------+-------------+---------------+
|user_id|activity_type|sum(time_spent)|
+-------+-------------+---------------+
|     U1|        LOGIN|           NULL|
|     U1|       BROWSE|           -5.0|
|     U1|       LOGOUT|          -15.0|
|     U2|        LOGIN|           NULL|
|     U2|       BROWSE|          -15.0|
|     U2|       LOGOUT|          -15.0|
+-------+-------------+---------------+



In [189]:
# 13 Identify users who have made transactions on at least 3 different dates.
#For these users, calculate their average transaction amount.

data = [
 (1, 101, 500.0, "2024-01-01"), 
 (2, 102, 200.0, "2024-01-02"), 
 (3, 101, 300.0, "2024-01-03"), 
 (4, 103, 100.0, "2024-01-04"), 
 (5, 102, 400.0, "2024-01-05"), 
 (6, 103, 600.0, "2024-01-06"), 
 (7, 101, 200.0, "2024-01-07"),
]
columns = ["transaction_id", "user_id", "transaction_amount", "transaction_date"]
df = spark.createDataFrame(data, columns)
df.show()

+--------------+-------+------------------+----------------+
|transaction_id|user_id|transaction_amount|transaction_date|
+--------------+-------+------------------+----------------+
|             1|    101|             500.0|      2024-01-01|
|             2|    102|             200.0|      2024-01-02|
|             3|    101|             300.0|      2024-01-03|
|             4|    103|             100.0|      2024-01-04|
|             5|    102|             400.0|      2024-01-05|
|             6|    103|             600.0|      2024-01-06|
|             7|    101|             200.0|      2024-01-07|
+--------------+-------+------------------+----------------+



In [190]:
# count distinct transactons dates per uesr 
user_date_count = df.groupBy('user_id').agg(countDistinct('transaction_date').alias('distinct_date_count'))
user_date_count.show()

+-------+-------------------+
|user_id|distinct_date_count|
+-------+-------------------+
|    103|                  2|
|    101|                  3|
|    102|                  2|
+-------+-------------------+



In [191]:
# filter users with at least 3 transactions dates 
filtered_users = user_date_count.filter(col('distinct_date_count') >= 3)
filtered_users.show()

+-------+-------------------+
|user_id|distinct_date_count|
+-------+-------------------+
|    101|                  3|
+-------+-------------------+



In [192]:
# join with original data to filter transactions of these users 
filtered_transactions = df.join(filtered_users, on ='user_id', how='inner')
filtered_transactions.show()

+-------+--------------+------------------+----------------+-------------------+
|user_id|transaction_id|transaction_amount|transaction_date|distinct_date_count|
+-------+--------------+------------------+----------------+-------------------+
|    101|             1|             500.0|      2024-01-01|                  3|
|    101|             3|             300.0|      2024-01-03|                  3|
|    101|             7|             200.0|      2024-01-07|                  3|
+-------+--------------+------------------+----------------+-------------------+



In [193]:
result = filtered_transactions.groupBy('user_id').agg(avg('transaction_amount').alias('avg_transaction_amount'))
result.show()

+-------+----------------------+
|user_id|avg_transaction_amount|
+-------+----------------------+
|    101|     333.3333333333333|
+-------+----------------------+



In [180]:
#15 calculate the average gap in days between consecutive transactions.
# Identify the user with the largest average gap.

data = [
 (1, 101, 500.0, "2024-01-01"), 
 (2, 102, 200.0, "2024-01-02"), 
 (3, 101, 300.0, "2024-01-03"), 
 (4, 103, 100.0, "2024-01-04"), 
 (5, 102, 400.0, "2024-01-05"), 
 (6, 103, 600.0, "2024-01-06"), 
 (7, 101, 200.0, "2024-01-07"),
]
columns = ["transaction_id", "user_id", "transaction_amount", "transaction_date"]
df = spark.createDataFrame(data, columns)
df.show()

+--------------+-------+------------------+----------------+
|transaction_id|user_id|transaction_amount|transaction_date|
+--------------+-------+------------------+----------------+
|             1|    101|             500.0|      2024-01-01|
|             2|    102|             200.0|      2024-01-02|
|             3|    101|             300.0|      2024-01-03|
|             4|    103|             100.0|      2024-01-04|
|             5|    102|             400.0|      2024-01-05|
|             6|    103|             600.0|      2024-01-06|
|             7|    101|             200.0|      2024-01-07|
+--------------+-------+------------------+----------------+



In [181]:
df = df.withColumn('transaction_date', to_date(col('transaction_date'), 'yyyy-MM-dd'))
df.show()

+--------------+-------+------------------+----------------+
|transaction_id|user_id|transaction_amount|transaction_date|
+--------------+-------+------------------+----------------+
|             1|    101|             500.0|      2024-01-01|
|             2|    102|             200.0|      2024-01-02|
|             3|    101|             300.0|      2024-01-03|
|             4|    103|             100.0|      2024-01-04|
|             5|    102|             400.0|      2024-01-05|
|             6|    103|             600.0|      2024-01-06|
|             7|    101|             200.0|      2024-01-07|
+--------------+-------+------------------+----------------+



In [182]:
# Calculate the difference in days between consecutive transactions

win = Window.partitionBy('user_id').orderBy('transaction_date')

df = df.withColumn('prev_date', lag('transaction_date').over(win))
df.show()

df = df.withColumn('day_gap', datediff(col('transaction_date'), col('prev_date')))
df.show()

+--------------+-------+------------------+----------------+----------+
|transaction_id|user_id|transaction_amount|transaction_date| prev_date|
+--------------+-------+------------------+----------------+----------+
|             1|    101|             500.0|      2024-01-01|      NULL|
|             3|    101|             300.0|      2024-01-03|2024-01-01|
|             7|    101|             200.0|      2024-01-07|2024-01-03|
|             2|    102|             200.0|      2024-01-02|      NULL|
|             5|    102|             400.0|      2024-01-05|2024-01-02|
|             4|    103|             100.0|      2024-01-04|      NULL|
|             6|    103|             600.0|      2024-01-06|2024-01-04|
+--------------+-------+------------------+----------------+----------+

+--------------+-------+------------------+----------------+----------+-------+
|transaction_id|user_id|transaction_amount|transaction_date| prev_date|day_gap|
+--------------+-------+------------------+----

In [184]:
# Calculate the average gap for each user

avg_gap_df = df.groupBy('user_id').agg(avg('day_gap').alias('avg_day_gap'))
avg_gap_df.show()

+-------+-----------+
|user_id|avg_day_gap|
+-------+-----------+
|    101|        3.0|
|    102|        3.0|
|    103|        2.0|
+-------+-----------+



In [186]:
print('Average gap for each user:')
avg_gap_df.show()

Average gap for each user:
+-------+-----------+
|user_id|avg_day_gap|
+-------+-----------+
|    101|        3.0|
|    102|        3.0|
|    103|        2.0|
+-------+-----------+



In [188]:
# Identify the user with the largest average gap
largest_avg_gap = avg_gap_df.orderBy(desc('avg_day_gap')).limit(1)

print('User with the largest average gap:')
largest_avg_gap.show()

User with the largest average gap:
+-------+-----------+
|user_id|avg_day_gap|
+-------+-----------+
|    101|        3.0|
+-------+-----------+



### 16

- Calculate the average grade for each student across all semesters.
- Filter out students with an average grade lower than 75.
- Find the top 3 students with the highest grades in each subject for the latest semester.
- Sort the final results by subject and then by grade in descending order.

In [144]:
data = [
 ("Alice", "Math", "Semester1", 85),
 ("Alice", "Math", "Semester2", 90),
 ("Alice", "English", "Semester1", 78),
 ("Alice", "English", "Semester2", 82),
 ("Bob", "Math", "Semester1", 65),
 ("Bob", "Math", "Semester2", 70),
 ("Bob", "English", "Semester1", 60),
 ("Bob", "English", "Semester2", 65),
 ("Charlie", "Math", "Semester1", 95),
 ("Charlie", "Math", "Semester2", 98),
 ("Charlie", "English", "Semester1", 88),
 ("Charlie", "English", "Semester2", 90),
 ("David", "Math", "Semester1", 78),
 ("David", "Math", "Semester2", 80),
 ("David", "English", "Semester1", 75),
 ("David", "English", "Semester2", 72),
 ("Eve", "Math", "Semester1", 88),
 ("Eve", "Math", "Semester2", 85),
 ("Eve", "English", "Semester1", 80),
 ("Eve", "English", "Semester2", 83)
]

# Create DataFrame
df = spark.createDataFrame(data, ["Student", "Subject", "Semester", "Grade"])
df.show()

+-------+-------+---------+-----+
|Student|Subject| Semester|Grade|
+-------+-------+---------+-----+
|  Alice|   Math|Semester1|   85|
|  Alice|   Math|Semester2|   90|
|  Alice|English|Semester1|   78|
|  Alice|English|Semester2|   82|
|    Bob|   Math|Semester1|   65|
|    Bob|   Math|Semester2|   70|
|    Bob|English|Semester1|   60|
|    Bob|English|Semester2|   65|
|Charlie|   Math|Semester1|   95|
|Charlie|   Math|Semester2|   98|
|Charlie|English|Semester1|   88|
|Charlie|English|Semester2|   90|
|  David|   Math|Semester1|   78|
|  David|   Math|Semester2|   80|
|  David|English|Semester1|   75|
|  David|English|Semester2|   72|
|    Eve|   Math|Semester1|   88|
|    Eve|   Math|Semester2|   85|
|    Eve|English|Semester1|   80|
|    Eve|English|Semester2|   83|
+-------+-------+---------+-----+



In [150]:
# Calculate the average grade for each student across all semesters.
std_avg_grd = df.groupBy('Semester').agg(avg(col('Grade')))
std_avg_grd.show()

+---------+----------+
| Semester|avg(Grade)|
+---------+----------+
|Semester1|      79.2|
|Semester2|      81.5|
+---------+----------+



In [151]:
#Filter out students with an average grade lower than 75. 

df.filter(col('Grade') > 75).show()

+-------+-------+---------+-----+
|Student|Subject| Semester|Grade|
+-------+-------+---------+-----+
|  Alice|   Math|Semester1|   85|
|  Alice|   Math|Semester2|   90|
|  Alice|English|Semester1|   78|
|  Alice|English|Semester2|   82|
|Charlie|   Math|Semester1|   95|
|Charlie|   Math|Semester2|   98|
|Charlie|English|Semester1|   88|
|Charlie|English|Semester2|   90|
|  David|   Math|Semester1|   78|
|  David|   Math|Semester2|   80|
|    Eve|   Math|Semester1|   88|
|    Eve|   Math|Semester2|   85|
|    Eve|English|Semester1|   80|
|    Eve|English|Semester2|   83|
+-------+-------+---------+-----+



In [153]:
#Find the top 3 students with the highest grades in each subject for the latest semester. 
# Step 1: Determine the latest semester for each subject.

latest_semester_df = df.groupBy('Subject').agg({'Semester':'max'}).withColumnRenamed('max(Semester)', 'Latest_Semester')
latest_semester_df.show()


+-------+---------------+
|Subject|Latest_Semester|
+-------+---------------+
|English|      Semester2|
|   Math|      Semester2|
+-------+---------------+



In [156]:
# Sort the final results by subject and then by grade in descending order. 

df1 = df.alias('main')
df2 = latest_semester_df.alias('latest')

latest_grades_df = df1.join(df2, (df1.Subject == df2.Subject) & (df1.Semester == df2.Latest_Semester), how='inner')\
                   .select(df1.Student, df1.Subject, df1.Grade)
latest_grades_df.show()

+-------+-------+-----+
|Student|Subject|Grade|
+-------+-------+-----+
|  Alice|   Math|   90|
|  Alice|English|   82|
|    Bob|   Math|   70|
|    Bob|English|   65|
|Charlie|   Math|   98|
|Charlie|English|   90|
|  David|   Math|   80|
|  David|English|   72|
|    Eve|English|   83|
|    Eve|   Math|   85|
+-------+-------+-----+



19
- Display the schema of the created DataFrame.
- Filter the DataFrame to show only transactions where the amount is greater than 100.
- Add a new column discounted_amount that applies a 10% discount to all transactions.

In [141]:
transaction_data = [
 {"transaction_id": 1, "customer_id": 101, "amount": 150, "date": "2025-01-01"},
 {"transaction_id": 2, "customer_id": 102, "amount": 90, "date": "2025-01-02"},
 {"transaction_id": 3, "customer_id": 103, "amount": 200, "date": "2025-01-03"},
 {"transaction_id": 4, "customer_id": 104, "amount": 50, "date": "2025-01-04"},
 {"transaction_id": 5, "customer_id": 105, "amount": 120, "date": "2025-01-05"}
]
df = spark.createDataFrame(transaction_data)
df.show()
df.printSchema()

+------+-----------+----------+--------------+
|amount|customer_id|      date|transaction_id|
+------+-----------+----------+--------------+
|   150|        101|2025-01-01|             1|
|    90|        102|2025-01-02|             2|
|   200|        103|2025-01-03|             3|
|    50|        104|2025-01-04|             4|
|   120|        105|2025-01-05|             5|
+------+-----------+----------+--------------+

root
 |-- amount: long (nullable = true)
 |-- customer_id: long (nullable = true)
 |-- date: string (nullable = true)
 |-- transaction_id: long (nullable = true)



In [142]:
df.filter(df.amount > 100).show()

+------+-----------+----------+--------------+
|amount|customer_id|      date|transaction_id|
+------+-----------+----------+--------------+
|   150|        101|2025-01-01|             1|
|   200|        103|2025-01-03|             3|
|   120|        105|2025-01-05|             5|
+------+-----------+----------+--------------+



In [143]:
# Add a new column discounted_amount that applies a 10% discount to all transactions.

df.withColumn('discounted_amount', col('amount') * 0.9).show()

+------+-----------+----------+--------------+-----------------+
|amount|customer_id|      date|transaction_id|discounted_amount|
+------+-----------+----------+--------------+-----------------+
|   150|        101|2025-01-01|             1|            135.0|
|    90|        102|2025-01-02|             2|             81.0|
|   200|        103|2025-01-03|             3|            180.0|
|    50|        104|2025-01-04|             4|             45.0|
|   120|        105|2025-01-05|             5|            108.0|
+------+-----------+----------+--------------+-----------------+



### 20 

The sales data is collected daily, and the company wants to analyze the performance of each product across different regions. The data is stored in a PySpark DataFrame with the following schema:
The management has requested a report where each region becomes a column and the values represent the total sales for each product in that region. Your task is to write PySpark code to generate this pivot table.

Task:

1. Load the sample data into a PySpark DataFrame.
2. Use PySpark's pivot functionality to create a table where:
Each region is a column.
- The rows represent the products.
- The values are the total sales for each product in each region.
3. Provide the output DataFrame in a user-friendly format for the stakeholders.

In [38]:
data =([ ("A", "North", 100), 
        ("B", "East", 200), 
        ("A", "East", 150),
        ("C", "North", 300), 
        ("B", "South", 400), 
        ("C", "East", 250) ] )
columns = ["Product", "Region", "Sales"]
df = spark.createDataFrame(data, columns)
df.show()

+-------+------+-----+
|Product|Region|Sales|
+-------+------+-----+
|      A| North|  100|
|      B|  East|  200|
|      A|  East|  150|
|      C| North|  300|
|      B| South|  400|
|      C|  East|  250|
+-------+------+-----+



In [40]:
df.groupBy('Product').pivot('Region').agg(sum('Sales')).show()

+-------+----+-----+-----+
|Product|East|North|South|
+-------+----+-----+-----+
|      B| 200| NULL|  400|
|      C| 250|  300| NULL|
|      A| 150|  100| NULL|
+-------+----+-----+-----+



### 21 

calculate the ROW_NUMBER() partitioned by department and ordered by salary in descending order. Additionally, the employees should be ranked within each department based on their hiring date if their salaries are the same. Add a new column called rank to the DataFrame that contains the calculated row numbers.

In [33]:
data = [ ("John", "HR", 5000, "2021-05-01"), 
("Jane", "HR", 6000, "2022-03-15"), 
("Sam", "Engineering", 7000, "2021-06-01"), 
("Anna", "Engineering", 8000, "2020-07-01"), 
("Paul", "HR", 4500, "2021-05-01"), 
("Sara", "Engineering", 7000, "2020-08-01"), 
("Tom", "Engineering", 7500, "2021-07-01") ]

columns = ["name", "department", "salary", "hire_date"]
df = spark.createDataFrame(data, columns)
df.show()

+----+-----------+------+----------+
|name| department|salary| hire_date|
+----+-----------+------+----------+
|John|         HR|  5000|2021-05-01|
|Jane|         HR|  6000|2022-03-15|
| Sam|Engineering|  7000|2021-06-01|
|Anna|Engineering|  8000|2020-07-01|
|Paul|         HR|  4500|2021-05-01|
|Sara|Engineering|  7000|2020-08-01|
| Tom|Engineering|  7500|2021-07-01|
+----+-----------+------+----------+



In [35]:
win = Window.partitionBy('department').orderBy(col('salary').desc())
df.withColumn('rn', row_number().over(win)).show()

+----+-----------+------+----------+---+
|name| department|salary| hire_date| rn|
+----+-----------+------+----------+---+
|Anna|Engineering|  8000|2020-07-01|  1|
| Tom|Engineering|  7500|2021-07-01|  2|
| Sam|Engineering|  7000|2021-06-01|  3|
|Sara|Engineering|  7000|2020-08-01|  4|
|Jane|         HR|  6000|2022-03-15|  1|
|John|         HR|  5000|2021-05-01|  2|
|Paul|         HR|  4500|2021-05-01|  3|
+----+-----------+------+----------+---+



## 22 𝐐𝐮𝐞𝐬𝐭𝐢𝐨𝐧:
    
You have the following dataset containing sales information for different products and regions. Reshape the data using PySpark's pivot() method to calculate the total sales for each product across regions, and then optimize it further by applying specific transformations.

Task 1: Use pivot() to create a table showing the total sales for each product by region.

Task 2: Add a column calculating the percentage contribution of each region to the total sales for that product.

Task 3: Sort the data in descending order by total sales for each product.

In [25]:
data = [ ("North", "Laptop", 2000, "Q1"),
        ("South", "Laptop", 3000, "Q1"), 
        ("East", "Laptop", 2500, "Q1"), 
        ("North", "Phone", 1500, "Q1"),
        ("South", "Phone", 1000, "Q1"), 
        ("East", "Phone", 2000, "Q1"),
        ("North", "Laptop", 3000, "Q2"), 
        ("South", "Laptop", 4000, "Q2"),
        ("East", "Laptop", 3500, "Q2"),
        ("North", "Phone", 2500, "Q2"),
        ("South", "Phone", 1500, "Q2"), 
        ("East", "Phone", 3000, "Q2") ]

columns = ["Region", "Product", "Sales", "Quarter"] 
df = spark.createDataFrame(data, columns)
df.show()

+------+-------+-----+-------+
|Region|Product|Sales|Quarter|
+------+-------+-----+-------+
| North| Laptop| 2000|     Q1|
| South| Laptop| 3000|     Q1|
|  East| Laptop| 2500|     Q1|
| North|  Phone| 1500|     Q1|
| South|  Phone| 1000|     Q1|
|  East|  Phone| 2000|     Q1|
| North| Laptop| 3000|     Q2|
| South| Laptop| 4000|     Q2|
|  East| Laptop| 3500|     Q2|
| North|  Phone| 2500|     Q2|
| South|  Phone| 1500|     Q2|
|  East|  Phone| 3000|     Q2|
+------+-------+-----+-------+



In [26]:
# Pivot to calculate total sales by region
pivot_df = df.groupBy('Product').pivot('Region').sum('Sales')
pivot_df.show()

+-------+----+-----+-----+
|Product|East|North|South|
+-------+----+-----+-----+
|  Phone|5000| 4000| 2500|
| Laptop|6000| 5000| 7000|
+-------+----+-----+-----+



In [28]:
# Add a column calculating the percentage contribution of each region to the total sales for that product.

total_seles_df = pivot_df\
    .withColumn('Total_Sales', col('North') + col('South') + col('East'))\
    .withColumn('North_%', round(col('North') / col('Total_Sales') * 100, 2))\
    .withColumn('South_%', round(col('South') / col('Total_Sales') * 100, 2))\
    .withColumn('East_%', round(col('East') / col('Total_Sales') * 100, 2))

total_seles_df.show()  

+-------+----+-----+-----+-----------+-------+-------+------+
|Product|East|North|South|Total_Sales|North_%|South_%|East_%|
+-------+----+-----+-----+-----------+-------+-------+------+
|  Phone|5000| 4000| 2500|      11500|  34.78|  21.74| 43.48|
| Laptop|6000| 5000| 7000|      18000|  27.78|  38.89| 33.33|
+-------+----+-----+-----+-----------+-------+-------+------+



In [32]:
# Sort the data in descending order by total sales for each product.

total_seles_df.sort(col('Total_Sales').desc()).show()
# OR 
total_seles_df.orderBy(col('Total_Sales').desc()).show()

+-------+----+-----+-----+-----------+-------+-------+------+
|Product|East|North|South|Total_Sales|North_%|South_%|East_%|
+-------+----+-----+-----+-----------+-------+-------+------+
| Laptop|6000| 5000| 7000|      18000|  27.78|  38.89| 33.33|
|  Phone|5000| 4000| 2500|      11500|  34.78|  21.74| 43.48|
+-------+----+-----+-----+-----------+-------+-------+------+

+-------+----+-----+-----+-----------+-------+-------+------+
|Product|East|North|South|Total_Sales|North_%|South_%|East_%|
+-------+----+-----+-----+-----------+-------+-------+------+
| Laptop|6000| 5000| 7000|      18000|  27.78|  38.89| 33.33|
|  Phone|5000| 4000| 2500|      11500|  34.78|  21.74| 43.48|
+-------+----+-----+-----+-----------+-------+-------+------+



### 24 𝐐𝐮𝐞𝐬𝐭𝐢𝐨𝐧 

You are given a nested JSON file named sample_data.json stored in an S3 bucket at s3://your-bucket/sample_data.json. The JSON file contains details about employees, including their names, departments, and address details (nested fields).

Write a PySpark program to:
- Load the JSON file into a DataFrame.
- Flatten the nested structure to create a tabular format.
- Write the resulting DataFrame as a Parquet file to the output path s3://your-bucket/output/.

In [82]:
"""from google.cloud import storage
storage_client = storage.client(project = project_id)
bucket = storage_client.bucket(bucket_name)
blobs = bucket.list_blobs(prefix = subdirectory_name)
for obj in blobs:
    blob1 = obj.name
    blob = bucket.blob(blob1)
    blob.download_to_filename(blob1)
    
#upload file 
path = ("{0}/output/{1}".format(subdirectory_name, f_name))
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(path)
blob.upload_from_filename(f_name)"""

#df = spark.read.json('sample_data.json')

f_df = spark.read.format('json')\
           .option('inferschema', True)\
           .option('multiline', True)\
           .load('sample_data.json')
f_df.show()     
f_df.printSchema()

+-------------------+----------+---+-------+
|            address|department| id|   name|
+-------------------+----------+---+-------+
|     {New York, NY}|        HR|  1|  Alice|
|{San Francisco, CA}|        IT|  2|    Bob|
|      {Chicago, IL}|   Finance|  3|Charlie|
+-------------------+----------+---+-------+

root
 |-- address: struct (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- state: string (nullable = true)
 |-- department: string (nullable = true)
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)



In [89]:
df = f_df.select('id','name','department', 'address.city','address.state')\
                                           .withColumnRenamed('city','address_city')\
                                           .withColumnRenamed('state','address_state')
df.show()

+---+-------+----------+-------------+-------------+
| id|   name|department| address_city|address_state|
+---+-------+----------+-------------+-------------+
|  1|  Alice|        HR|     New York|           NY|
|  2|    Bob|        IT|San Francisco|           CA|
|  3|Charlie|   Finance|      Chicago|           IL|
+---+-------+----------+-------------+-------------+



In [92]:
#df.write.mode('overwrite').parquet('path')
"""df.write.format('json')\
        .option('header', True)\
        .option('mode', 'overwrite')\
        .option('path','newJson/')\
        .save()"""

## 26 

split the values in this column into multiple rows to make the dataset suitable for downstream processing.


In [60]:
data = [ (1, "apple,banana,orange"), 
 (2, "mango,grapes"), (3, "pineapple") ] 

columns = ["ID", "Tags"]
df = spark.createDataFrame(data, columns)
df.show()

+---+-------------------+
| ID|               Tags|
+---+-------------------+
|  1|apple,banana,orange|
|  2|       mango,grapes|
|  3|          pineapple|
+---+-------------------+



In [65]:
df.withColumn('Tag', explode(split(df['Tags'], ','))).drop('Tags').show()

+---+---------+
| ID|      Tag|
+---+---------+
|  1|    apple|
|  1|   banana|
|  1|   orange|
|  2|    mango|
|  2|   grapes|
|  3|pineapple|
+---+---------+



𝐐𝐮𝐞𝐬𝐭𝐢𝐨𝐧 27 

Imagine you're analyzing the monthly sales performance of a company across different regions. You want to calculate:

- The cumulative sales for each region over months.
- The rank of each month based on sales within the same region.

In [57]:
data = [ ("East", "Jan", 200), ("East", "Feb", 300), 
("East", "Mar", 250), ("West", "Jan", 400), 
("West", "Feb", 350), ("West", "Mar", 450) ]

columns = ["Region", "Month", "Sales"]
df = spark.createDataFrame(data, columns)
df.show()

+------+-----+-----+
|Region|Month|Sales|
+------+-----+-----+
|  East|  Jan|  200|
|  East|  Feb|  300|
|  East|  Mar|  250|
|  West|  Jan|  400|
|  West|  Feb|  350|
|  West|  Mar|  450|
+------+-----+-----+



In [59]:
# Define a window partitionBy by Region and ordered by Sales
window_spec = Window.partitionBy('Region').orderBy('Sales')

# Add cumulative sum and rank columns:
res_df = df.withColumn('Cumulative_Sales', sum('Sales').over(window_spec))
res_df.show()
res_df = res_df.withColumn('Rank', rank().over(window_spec))
res_df.show()

+------+-----+-----+----------------+
|Region|Month|Sales|Cumulative_Sales|
+------+-----+-----+----------------+
|  East|  Jan|  200|             200|
|  East|  Mar|  250|             450|
|  East|  Feb|  300|             750|
|  West|  Feb|  350|             350|
|  West|  Jan|  400|             750|
|  West|  Mar|  450|            1200|
+------+-----+-----+----------------+

+------+-----+-----+----------------+----+
|Region|Month|Sales|Cumulative_Sales|Rank|
+------+-----+-----+----------------+----+
|  East|  Jan|  200|             200|   1|
|  East|  Mar|  250|             450|   2|
|  East|  Feb|  300|             750|   3|
|  West|  Feb|  350|             350|   1|
|  West|  Jan|  400|             750|   2|
|  West|  Mar|  450|            1200|   3|
+------+-----+-----+----------------+----+



𝐐𝐮𝐞𝐬𝐭𝐢𝐨𝐧 28

You are working as a Data Engineer and need to clean up a dataset that contains customer order information. The dataset includes details such as the customer ID, order ID, order date, and the total amount. Due to a data processing issue, some rows are duplicated, and you need to remove duplicates based on a composite key of customer_id and order_id, keeping only the latest order (based on the order_date).

You need to remove the duplicate rows based on the composite key (customer_id, order_id) and retain only the row with the latest order_date for each combination of customer_id and order_id.

In [34]:
data = [ (101, 1001, "2025-01-15", 500.00), (102, 1002, "2025-01-14", 300.00), (101, 1001, "2025-01-17", 550.00), (103, 1003, "2025-01-16", 450.00),
(102, 1002, "2025-01-18", 320.00), (103, 1003, "2025-01-19", 460.00) ]

schema = ["customer_id", "order_id", "order_date", "total_amount"] 
df = spark.createDataFrame(data, schema)
df.show()

+-----------+--------+----------+------------+
|customer_id|order_id|order_date|total_amount|
+-----------+--------+----------+------------+
|        101|    1001|2025-01-15|       500.0|
|        102|    1002|2025-01-14|       300.0|
|        101|    1001|2025-01-17|       550.0|
|        103|    1003|2025-01-16|       450.0|
|        102|    1002|2025-01-18|       320.0|
|        103|    1003|2025-01-19|       460.0|
+-----------+--------+----------+------------+



In [53]:
#df.drop_duplicates(['customer_id','order_id']).show()

# Cast the order_date to DateType for proper compasion
df = df.withColumn('order_date', to_date(df['order_date'], 'yyyy-MM-dd'))
df.show()

+-----------+--------+----------+------------+
|customer_id|order_id|order_date|total_amount|
+-----------+--------+----------+------------+
|        101|    1001|2025-01-15|       500.0|
|        102|    1002|2025-01-14|       300.0|
|        101|    1001|2025-01-17|       550.0|
|        103|    1003|2025-01-16|       450.0|
|        102|    1002|2025-01-18|       320.0|
|        103|    1003|2025-01-19|       460.0|
+-----------+--------+----------+------------+



In [54]:
# Window specification to get the latest order per customer_id and order_id

window_spec = Window.partitionBy('customer_id', 'order_id').orderBy(col('order_date').desc())

# Add row_number to identify the latest record
df_with_row_number = df.withColumn('row_num', row_number().over(window_spec))
df_with_row_number.show()

+-----------+--------+----------+------------+-------+
|customer_id|order_id|order_date|total_amount|row_num|
+-----------+--------+----------+------------+-------+
|        101|    1001|2025-01-17|       550.0|      1|
|        101|    1001|2025-01-15|       500.0|      2|
|        102|    1002|2025-01-18|       320.0|      1|
|        102|    1002|2025-01-14|       300.0|      2|
|        103|    1003|2025-01-19|       460.0|      1|
|        103|    1003|2025-01-16|       450.0|      2|
+-----------+--------+----------+------------+-------+



In [55]:
# Filter to get onnly the latest record 

res_df = df_with_row_number.filter(col('row_num') == 1).drop('row_num')
res_df.show()

+-----------+--------+----------+------------+
|customer_id|order_id|order_date|total_amount|
+-----------+--------+----------+------------+
|        101|    1001|2025-01-17|       550.0|
|        102|    1002|2025-01-18|       320.0|
|        103|    1003|2025-01-19|       460.0|
+-----------+--------+----------+------------+



29 The columns contain different types of data, including numeric, categorical, and string values. Your objective is to:

1. Fill numeric columns with the median value.
2. Fill categorical columns with the most frequent value.
3. Fill string columns with "Unknown".

In [48]:
data = [ (1, 25, 'North', 'M', '2025-01-01', 150),
(2, None, 'East', None, '2025-01-02', None),
(3, 30, 'South', 'F', None, 200),
(4, 22, None, 'M', '2025-01-03', 180),
(5, 28, 'West', 'F', None, None), ]
schema = StructType([StructField('Customer_ID', IntegerType
                                 (), True),
                    StructField('Age', IntegerType(), True),
                    StructField('Region', StringType(), True),
                    StructField('Gender', StringType(), True),
                     StructField('Last_Visit', StringType(), True),
                     StructField('Purchase_Amount', IntegerType(), True),
                    ])

#columns = ['Customer_ID', 'Age', 'Region', 'Gender', 'Last_Visit', 'Purchase_Amount'] 
df = spark.createDataFrame(data, schema)
print(df.printSchema())
df.show()

root
 |-- Customer_ID: integer (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Region: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Last_Visit: string (nullable = true)
 |-- Purchase_Amount: integer (nullable = true)

None
+-----------+----+------+------+----------+---------------+
|Customer_ID| Age|Region|Gender|Last_Visit|Purchase_Amount|
+-----------+----+------+------+----------+---------------+
|          1|  25| North|     M|2025-01-01|            150|
|          2|NULL|  East|  NULL|2025-01-02|           NULL|
|          3|  30| South|     F|      NULL|            200|
|          4|  22|  NULL|     M|2025-01-03|            180|
|          5|  28|  West|     F|      NULL|           NULL|
+-----------+----+------+------+----------+---------------+



In [50]:
# Define a function to fill missing values dynamically
def fill_missing_values(df):
    
    column_types = df.dtypes
    print('column_types: ', column_types)
    
    # loop through each column based on type
    for column, dtype in column_types:
        
        if dtype == 'int' or dtype == 'double' or dtype == 'long':
            median_value = df.approxQuantile(column, [0.5], 0)[0]
            df = df.fillna({column: median_value})
            
        elif dtype == 'string':
            df = df.fillna({column: 'Unknown'})
            
        else:
            df = df.fillna({column: 'Unknown'})
        
    return df
filled_df = fill_missing_values(df)
filled_df.show()

column_types:  [('Customer_ID', 'int'), ('Age', 'int'), ('Region', 'string'), ('Gender', 'string'), ('Last_Visit', 'string'), ('Purchase_Amount', 'int')]
+-----------+---+-------+-------+----------+---------------+
|Customer_ID|Age| Region| Gender|Last_Visit|Purchase_Amount|
+-----------+---+-------+-------+----------+---------------+
|          1| 25|  North|      M|2025-01-01|            150|
|          2| 25|   East|Unknown|2025-01-02|            180|
|          3| 30|  South|      F|   Unknown|            200|
|          4| 22|Unknown|      M|2025-01-03|            180|
|          5| 28|   West|      F|   Unknown|            180|
+-----------+---+-------+-------+----------+---------------+



30 Validate the date format and filter rows where input_date matches the format "yyyy-MM-dd".

- Transform valid dates into the format "MM/dd/yyyy".
- For invalid dates, replace them with the string "Invalid Date".
- Output the transformed DataFrame with a new column named validated_date."""

In [32]:
data = [ ("2023-12-31",), ("31-12-2023",),
("2023/12/31",), ("2024-01-01",),
("13-01-2023",), ("invalid",), ("2022-02-28",) ]
columns = ["input_date"]
df = spark.createDataFrame(data, columns)
df.show() 

+----------+
|input_date|
+----------+
|2023-12-31|
|31-12-2023|
|2023/12/31|
|2024-01-01|
|13-01-2023|
|   invalid|
|2022-02-28|
+----------+



In [33]:
result_df = df.withColumn('validated_date', when(to_date(col('input_date'), 'yyyy-MM-dd').isNotNull(), date_format(to_date(col('input_date'), 'yyyy-MM-dd'), 'MM/dd/yyyy')).otherwise('Invalid_date'))
result_df.show() 

+----------+--------------+
|input_date|validated_date|
+----------+--------------+
|2023-12-31|    12/31/2023|
|31-12-2023|  Invalid_date|
|2023/12/31|  Invalid_date|
|2024-01-01|    01/01/2024|
|13-01-2023|  Invalid_date|
|   invalid|  Invalid_date|
|2022-02-28|    02/28/2022|
+----------+--------------+



𝐐𝐮𝐞𝐬𝐭𝐢𝐨𝐧 34

You are given a dataset of sales transactions for multiple stores and products.
- Calculate the percentage contribution of each product's sales to the total sales of its store.

In [4]:
data = [ ("S1", "P1", 100), ("S1", "P2", 200),
("S1", "P3", 300), ("S2", "P1", 400),
("S2", "P2", 100), ("S2", "P3", 500) ]
columns = ["StoreID", "Product", "Sales"] 
df = spark.createDataFrame(data, columns)
df.show()

+-------+-------+-----+
|StoreID|Product|Sales|
+-------+-------+-----+
|     S1|     P1|  100|
|     S1|     P2|  200|
|     S1|     P3|  300|
|     S2|     P1|  400|
|     S2|     P2|  100|
|     S2|     P3|  500|
+-------+-------+-----+



In [5]:
df1 = df.groupBy('StoreID').agg(sum('Sales').alias('Total_Sales'))
df1.show()

+-------+-----------+
|StoreID|Total_Sales|
+-------+-----------+
|     S1|        600|
|     S2|       1000|
+-------+-----------+



In [7]:
join_df = df.join(df1, on='StoreID', how= 'inner')
join_df.show()

+-------+-------+-----+-----------+
|StoreID|Product|Sales|Total_Sales|
+-------+-------+-----+-----------+
|     S1|     P1|  100|        600|
|     S1|     P2|  200|        600|
|     S1|     P3|  300|        600|
|     S2|     P1|  400|       1000|
|     S2|     P2|  100|       1000|
|     S2|     P3|  500|       1000|
+-------+-------+-----+-----------+



In [8]:
join_df.withColumn('percnt', round((col('Sales') / col('Total_sales'))*100,2)).show()

+-------+-------+-----+-----------+------+
|StoreID|Product|Sales|Total_Sales|percnt|
+-------+-------+-----+-----------+------+
|     S1|     P1|  100|        600| 16.67|
|     S1|     P2|  200|        600| 33.33|
|     S1|     P3|  300|        600|  50.0|
|     S2|     P1|  400|       1000|  40.0|
|     S2|     P2|  100|       1000|  10.0|
|     S2|     P3|  500|       1000|  50.0|
+-------+-------+-----+-----------+------+



𝐐𝐮𝐞𝐬𝐭𝐢𝐨𝐧 35

You are working as a Data Engineer at a retail company. The marketing team has provided a dataset of customer purchases to analyze the relationship between the amount spent on advertisements and the revenue generated. 
- Using PySpark, compute the correlation between the "Ad_Spend" and "Revenue" columns to determine if there's a linear relationship.

In [9]:
schema = StructType([ StructField("Customer_ID", StringType(), True), StructField("Ad_Spend", IntegerType(), True), StructField("Revenue", IntegerType(), True) ])
data = [ ("C001", 2000, 25000), ("C002", 1500, 23000),
("C003", 3000, 40000), ("C004", 1200, 18000),
("C005", 2500, 30000) ] 
df = spark.createDataFrame(data, schema)
df.show() 

+-----------+--------+-------+
|Customer_ID|Ad_Spend|Revenue|
+-----------+--------+-------+
|       C001|    2000|  25000|
|       C002|    1500|  23000|
|       C003|    3000|  40000|
|       C004|    1200|  18000|
|       C005|    2500|  30000|
+-----------+--------+-------+



In [11]:
correlation = df.stat.corr('Ad_Spend', 'Revenue')
print('The correlation between ad_spend and revenue is:' ,correlation)

The correlation between ad_spend and revenue is: 0.9704535552410213


𝐐𝐮𝐞𝐬𝐭𝐢𝐨𝐧 36

You are given a large e-commerce transaction dataset stored in a partitioned format based on country. 
- Count the distinct number of products purchased (product_id) for each customer_id in every country. The result should include the country, customer ID, and the distinct product count.

In [12]:
data = [ ("USA", 101, "P001"), 
("USA", 101, "P002"), ("USA", 101, "P001"), 
("USA", 102, "P003"), ("USA", 102, "P003"), 
("UK", 201, "P004"), ("UK", 201, "P005"), 
("UK", 202, "P004"), ("UK", 202, "P005"), ("UK", 202, "P004") ]

columns = ["country", "customer_id", "product_id"]
df = spark.createDataFrame(data, columns)
df.show() 

+-------+-----------+----------+
|country|customer_id|product_id|
+-------+-----------+----------+
|    USA|        101|      P001|
|    USA|        101|      P002|
|    USA|        101|      P001|
|    USA|        102|      P003|
|    USA|        102|      P003|
|     UK|        201|      P004|
|     UK|        201|      P005|
|     UK|        202|      P004|
|     UK|        202|      P005|
|     UK|        202|      P004|
+-------+-----------+----------+



In [13]:
df.groupBy('country', 'customer_id').agg(countDistinct(col('product_id'))).show()

+-------+-----------+--------------------------+
|country|customer_id|count(DISTINCT product_id)|
+-------+-----------+--------------------------+
|    USA|        101|                         2|
|     UK|        202|                         2|
|     UK|        201|                         2|
|    USA|        102|                         1|
+-------+-----------+--------------------------+



$Broadcast$ the smaller DataFrame (product_data). 37

In [37]:
sales_data = [ (1, 101, 5, '2025-01-01'), (2, 102, 3, '2025-01-02'),
(3, 103, 2, '2025-01-03'), (4, 101, 1, '2025-01-04'),
(5, 104, 4, '2025-01-05'), (6, 105, 6, '2025-01-06'), ]

# product_data (Small DataFrame)

product_data = [ (101, 'Laptop', 'Electronics', 1000),
(102, 'Phone', 'Electronics', 500),
(103, 'Headphones', 'Accessories', 150),
(104, 'Tablet', 'Electronics', 600),
(105, 'Smartwatch', 'Accessories', 200), ] 

sales_df = spark.createDataFrame(sales_data, ['sale_id', 'product_id','quantity', 'sale_date'])
product_df= spark.createDataFrame(product_data, ['product_id', 'product_name', 'category', 'price'])
sales_df.show()
product_df.show()

+-------+----------+--------+----------+
|sale_id|product_id|quantity| sale_date|
+-------+----------+--------+----------+
|      1|       101|       5|2025-01-01|
|      2|       102|       3|2025-01-02|
|      3|       103|       2|2025-01-03|
|      4|       101|       1|2025-01-04|
|      5|       104|       4|2025-01-05|
|      6|       105|       6|2025-01-06|
+-------+----------+--------+----------+

+----------+------------+-----------+-----+
|product_id|product_name|   category|price|
+----------+------------+-----------+-----+
|       101|      Laptop|Electronics| 1000|
|       102|       Phone|Electronics|  500|
|       103|  Headphones|Accessories|  150|
|       104|      Tablet|Electronics|  600|
|       105|  Smartwatch|Accessories|  200|
+----------+------------+-----------+-----+



In [39]:
from pyspark.sql.functions import broadcast

prod_df_broadcast = broadcast(product_df)

joined_df = sales_df.join(prod_df_broadcast, on='product_id', how='inner')
joined_df.show(truncate=False)


+----------+-------+--------+----------+------------+-----------+-----+
|product_id|sale_id|quantity|sale_date |product_name|category   |price|
+----------+-------+--------+----------+------------+-----------+-----+
|101       |1      |5       |2025-01-01|Laptop      |Electronics|1000 |
|102       |2      |3       |2025-01-02|Phone       |Electronics|500  |
|103       |3      |2       |2025-01-03|Headphones  |Accessories|150  |
|101       |4      |1       |2025-01-04|Laptop      |Electronics|1000 |
|104       |5      |4       |2025-01-05|Tablet      |Electronics|600  |
|105       |6      |6       |2025-01-06|Smartwatch  |Accessories|200  |
+----------+-------+--------+----------+------------+-----------+-----+



𝐐𝐮𝐞𝐬𝐭𝐢𝐨𝐧 39

You are working with large datasets in PySpark and need to join two DataFrames. However, one of the tables has highly skewed data, causing performance issues due to data shuffling. How would you optimize this join using salting techniques?
You are given the following sample datasets:

sales_df (Fact Table - Large Dataset, Highly Skewed on store_id)
Your task is to perform an optimized join between sales_df and store_df on store_id, ensuring that the skewness does not degrade performance.

In [32]:
sales_data = [ (101, "P001", 100), (101, "P002", 200), (101, "P003", 150), (102, "P004", 300), 
              (103, "P005", 400), (101, "P006", 500), (104, "P007", 250) ] 

sales_df = spark.createDataFrame(sales_data, ["store_id", "product_id", "amount"]) 
sales_df.show()

store_data = [(101, "Walmart"), (102, "Target"), (103, "Costco"), (104, "BestBuy")] 
store_df = spark.createDataFrame(store_data, ["store_id", "store_name"]) 
store_df.show()

+--------+----------+------+
|store_id|product_id|amount|
+--------+----------+------+
|     101|      P001|   100|
|     101|      P002|   200|
|     101|      P003|   150|
|     102|      P004|   300|
|     103|      P005|   400|
|     101|      P006|   500|
|     104|      P007|   250|
+--------+----------+------+

+--------+----------+
|store_id|store_name|
+--------+----------+
|     101|   Walmart|
|     102|    Target|
|     103|    Costco|
|     104|   BestBuy|
+--------+----------+



In [33]:
# Step 1: Adding Salt to skewed 'sales_df'

num_salt_keys = 3  # Define the range of salt keys 

sales_df_salted = sales_df.withColumn('salt', floor(rand() * num_salt_keys))\
                          .withColumn('salted_store_id', concat_ws("_", col('store_id'), col('salt')))
sales_df_salted.show()

+--------+----------+------+----+---------------+
|store_id|product_id|amount|salt|salted_store_id|
+--------+----------+------+----+---------------+
|     101|      P001|   100|   1|          101_1|
|     101|      P002|   200|   0|          101_0|
|     101|      P003|   150|   0|          101_0|
|     102|      P004|   300|   0|          102_0|
|     103|      P005|   400|   0|          103_0|
|     101|      P006|   500|   2|          101_2|
|     104|      P007|   250|   0|          104_0|
+--------+----------+------+----+---------------+



In [34]:
# Step 2: Expanding 'store_df' for Join compatibility 

expanded_store_df = store_df.crossJoin(spark.range(0, num_salt_keys).toDF('salt'))\
                    .withColumn('salted_store_id', concat_ws('_', col('store_id'), col('salt')))
expanded_store_df.show()

+--------+----------+----+---------------+
|store_id|store_name|salt|salted_store_id|
+--------+----------+----+---------------+
|     101|   Walmart|   0|          101_0|
|     101|   Walmart|   1|          101_1|
|     101|   Walmart|   2|          101_2|
|     102|    Target|   0|          102_0|
|     102|    Target|   1|          102_1|
|     102|    Target|   2|          102_2|
|     103|    Costco|   0|          103_0|
|     103|    Costco|   1|          103_1|
|     103|    Costco|   2|          103_2|
|     104|   BestBuy|   0|          104_0|
|     104|   BestBuy|   1|          104_1|
|     104|   BestBuy|   2|          104_2|
+--------+----------+----+---------------+



In [36]:
# Step 3: Performing the Optimized Join on Salted Keys

joined_df = sales_df_salted.join(expanded_store_df,'salted_store_id', 'inner').drop('salted_store_id', 'salt')
joined_df.show()

+--------+----------+------+--------+----------+
|store_id|product_id|amount|store_id|store_name|
+--------+----------+------+--------+----------+
|     101|      P002|   200|     101|   Walmart|
|     101|      P003|   150|     101|   Walmart|
|     101|      P001|   100|     101|   Walmart|
|     101|      P006|   500|     101|   Walmart|
|     102|      P004|   300|     102|    Target|
|     103|      P005|   400|     103|    Costco|
|     104|      P007|   250|     104|   BestBuy|
+--------+----------+------+--------+----------+



"""𝐐𝐮𝐞𝐬𝐭𝐢𝐨𝐧 40

You are working as a Data Engineer at a fintech company. Your team is working on integrating two datasets:

1. Customer Transactions Data (transactions_df) - Contains customer transactions with columns: customer_id, txn_id, amount, and txn_date. 

2. Customer Profile Data (profile_df) - Contains customer information with columns: customer_id, name, age, and txn_id (latest transaction ID for reference).

 The requirement is to merge these two DataFrames on customer_id while keeping track of:

Conflicting column names (txn_id) should be renamed properly.

If a customer exists in profile_df but not in transactions_df, the row should still be present with NULL values for transaction-related columns.

Your task is to write an optimized PySpark code to achieve this."""

In [4]:
transactions_data = [ (101, "T001", 500, "2024-08-10"), (102, "T002", 1200, "2024-08-09"), 
                     (103, "T003", 300, "2024-08-08"), (104, "T004", 450, "2024-08-07"), ] 

profile_data = [ (101, "John", 30, "T001"), (102, "Emma", 27, "T005"), 
                (103, "Alex", 35, "T003"), (105, "Sam", 40, "T006"), ]

transactions_df = spark.createDataFrame(transactions_data, ["customer_id", "txn_id", "amount", "txn_date"])
transactions_df.show()

profile_df = spark.createDataFrame(profile_data, ["customer_id", "name", "age", "txn_id"])
profile_df.show()

+-----------+------+------+----------+
|customer_id|txn_id|amount|  txn_date|
+-----------+------+------+----------+
|        101|  T001|   500|2024-08-10|
|        102|  T002|  1200|2024-08-09|
|        103|  T003|   300|2024-08-08|
|        104|  T004|   450|2024-08-07|
+-----------+------+------+----------+

+-----------+----+---+------+
|customer_id|name|age|txn_id|
+-----------+----+---+------+
|        101|John| 30|  T001|
|        102|Emma| 27|  T005|
|        103|Alex| 35|  T003|
|        105| Sam| 40|  T006|
+-----------+----+---+------+



In [7]:
profile_df = profile_df.withColumnRenamed('txn_id', 'last_txn_id')
profile_df.show()

+-----------+----+---+-----------+
|customer_id|name|age|last_txn_id|
+-----------+----+---+-----------+
|        101|John| 30|       T001|
|        102|Emma| 27|       T005|
|        103|Alex| 35|       T003|
|        105| Sam| 40|       T006|
+-----------+----+---+-----------+



In [8]:
profile_df.join(transactions_df, on = "customer_id", how = "full_outer").show()

+-----------+----+----+-----------+------+------+----------+
|customer_id|name| age|last_txn_id|txn_id|amount|  txn_date|
+-----------+----+----+-----------+------+------+----------+
|        101|John|  30|       T001|  T001|   500|2024-08-10|
|        102|Emma|  27|       T005|  T002|  1200|2024-08-09|
|        103|Alex|  35|       T003|  T003|   300|2024-08-08|
|        104|NULL|NULL|       NULL|  T004|   450|2024-08-07|
|        105| Sam|  40|       T006|  NULL|  NULL|      NULL|
+-----------+----+----+-----------+------+------+----------+



𝐐𝐮𝐞𝐬𝐭𝐢𝐨𝐧 42

You are given an employee dataset containing information about employees and their managers. Each employee has a manager_id that refers to another employee in the same table. Your task is to use self-join to find hierarchical relationships between employees, such as finding all employees under a specific manager or the reporting hierarchy of an employee.

Interview Task
- Write a PySpark self-join query to find the direct reports of each manager. Additionally, extend the logic to find all hierarchical relationships up to any level.


In [9]:
data = [ (1, "Alice", None), (2, "Bob", 1),
(3, "Charlie", 1), (4, "David", 2),
(5, "Eva", 2), (6, "Frank", 3), (7, "Grace", 3) ]

columns = ["employee_id", "employee_name", "manager_id"]
df = spark.createDataFrame(data, columns)
df.show()

+-----------+-------------+----------+
|employee_id|employee_name|manager_id|
+-----------+-------------+----------+
|          1|        Alice|      NULL|
|          2|          Bob|         1|
|          3|      Charlie|         1|
|          4|        David|         2|
|          5|          Eva|         2|
|          6|        Frank|         3|
|          7|        Grace|         3|
+-----------+-------------+----------+



In [17]:
df.alias('emp').join(df.alias('mgr'), col('mgr.employee_id') == col('emp.manager_id'), 'left')\
           .select(col('mgr.employee_name'), col('emp.employee_name')).show()

+-------------+-------------+
|employee_name|employee_name|
+-------------+-------------+
|         NULL|        Alice|
|        Alice|          Bob|
|        Alice|      Charlie|
|          Bob|        David|
|          Bob|          Eva|
|      Charlie|        Frank|
|      Charlie|        Grace|
+-------------+-------------+



𝐐𝐮𝐞𝐬𝐭𝐢𝐨𝐧 43

You are working as a Data Engineer, and the company has a log system where timestamps are recorded for every user action (e.g., when the user logs in and logs out). Your manager wants to know how much time each user spends between log in and log out.

calculate the difference between the logout_timestamp and login_timestamp in hours, minutes, and seconds. The result should be formatted like "HH:mm:ss".


In [14]:
data = [ (1, "2025-01-31 08:00:00", "2025-01-31 10:30:45"),
(2, "2025-01-31 09:00:30", "2025-01-31 12:15:10"),
(3, "2025-01-31 07:45:00", "2025-01-31 09:00:15") ]

columns = ["user_id", "login_timestamp", "logout_timestamp"] 
df = spark.createDataFrame(data, columns)
df.show()

+-------+-------------------+-------------------+
|user_id|    login_timestamp|   logout_timestamp|
+-------+-------------------+-------------------+
|      1|2025-01-31 08:00:00|2025-01-31 10:30:45|
|      2|2025-01-31 09:00:30|2025-01-31 12:15:10|
|      3|2025-01-31 07:45:00|2025-01-31 09:00:15|
+-------+-------------------+-------------------+



In [15]:
df = df.withColumn('login_time', unix_timestamp('login_timestamp'))
df = df.withColumn('logout_time', unix_timestamp('logout_timestamp'))
df.show()

+-------+-------------------+-------------------+----------+-----------+
|user_id|    login_timestamp|   logout_timestamp|login_time|logout_time|
+-------+-------------------+-------------------+----------+-----------+
|      1|2025-01-31 08:00:00|2025-01-31 10:30:45|1738290600| 1738299645|
|      2|2025-01-31 09:00:30|2025-01-31 12:15:10|1738294230| 1738305910|
|      3|2025-01-31 07:45:00|2025-01-31 09:00:15|1738289700| 1738294215|
+-------+-------------------+-------------------+----------+-----------+



In [16]:
# Calculate difference 

df = df.withColumn('duration_seconds', col('logout_time')-col('login_time'))
df.show()

+-------+-------------------+-------------------+----------+-----------+----------------+
|user_id|    login_timestamp|   logout_timestamp|login_time|logout_time|duration_seconds|
+-------+-------------------+-------------------+----------+-----------+----------------+
|      1|2025-01-31 08:00:00|2025-01-31 10:30:45|1738290600| 1738299645|            9045|
|      2|2025-01-31 09:00:30|2025-01-31 12:15:10|1738294230| 1738305910|           11680|
|      3|2025-01-31 07:45:00|2025-01-31 09:00:15|1738289700| 1738294215|            4515|
+-------+-------------------+-------------------+----------+-----------+----------------+



In [17]:
# Calculate hours, minutes, and seconds

df = df.withColumn('hours', (col('duration_seconds') / 3600).cast('int'))
df = df.withColumn('minutes', ((col('duration_seconds') % 3600) / 60).cast('int'))
df = df.withColumn('seconds', (col('duration_seconds') % 60).cast('int'))
df.show()

+-------+-------------------+-------------------+----------+-----------+----------------+-----+-------+-------+
|user_id|    login_timestamp|   logout_timestamp|login_time|logout_time|duration_seconds|hours|minutes|seconds|
+-------+-------------------+-------------------+----------+-----------+----------------+-----+-------+-------+
|      1|2025-01-31 08:00:00|2025-01-31 10:30:45|1738290600| 1738299645|            9045|    2|     30|     45|
|      2|2025-01-31 09:00:30|2025-01-31 12:15:10|1738294230| 1738305910|           11680|    3|     14|     40|
|      3|2025-01-31 07:45:00|2025-01-31 09:00:15|1738289700| 1738294215|            4515|    1|     15|     15|
+-------+-------------------+-------------------+----------+-----------+----------------+-----+-------+-------+



In [18]:
df = df.withColumn('formatted_duration', expr("lpad(hours, 2,'0') ||':' || lpad(minutes, 2,'0') ||':' || lpad(seconds, 2,'0')"))
df.show()

+-------+-------------------+-------------------+----------+-----------+----------------+-----+-------+-------+------------------+
|user_id|    login_timestamp|   logout_timestamp|login_time|logout_time|duration_seconds|hours|minutes|seconds|formatted_duration|
+-------+-------------------+-------------------+----------+-----------+----------------+-----+-------+-------+------------------+
|      1|2025-01-31 08:00:00|2025-01-31 10:30:45|1738290600| 1738299645|            9045|    2|     30|     45|          02:30:45|
|      2|2025-01-31 09:00:30|2025-01-31 12:15:10|1738294230| 1738305910|           11680|    3|     14|     40|          03:14:40|
|      3|2025-01-31 07:45:00|2025-01-31 09:00:15|1738289700| 1738294215|            4515|    1|     15|     15|          01:15:15|
+-------+-------------------+-------------------+----------+-----------+----------------+-----+-------+-------+------------------+



In [19]:
df.select('user_id', 'formatted_duration').show(truncate=False)

+-------+------------------+
|user_id|formatted_duration|
+-------+------------------+
|1      |02:30:45          |
|2      |03:14:40          |
|3      |01:15:15          |
+-------+------------------+



𝐐𝐮𝐞𝐬𝐭𝐢𝐨𝐧 44 

You have a dataset of user activities in an e-commerce application, where each row represents an activity performed by a user. The dataset contains duplicate activity entries (based on user and activity type) and you need to remove the duplicates. Furthermore, you want to keep only the most recent record for each user, based on a timestamp column.

Problem
- Remove duplicates based on user_id and activity_type.
- Keep only the most recent activity_timestamp for each user and activity type combination.

In [9]:
data = [ (1, 'login', '2025-02-01 10:00:00'), (1, 'view_product', '2025-02-01 10:05:00'), 
        (1, 'login', '2025-02-01 10:30:00'), (2, 'purchase', '2025-02-01 11:00:00'), (2, 'login', '2025-02-01 11:15:00'), 
(2, 'view_product', '2025-02-01 11:30:00'), (3, 'login', '2025-02-01 12:00:00'), (3, 'login', '2025-02-01 12:05:00') ]
 
df = spark.createDataFrame(data, ["user_id", "activity_type", "activity_timestamp"])
df.show()
df.printSchema()

+-------+-------------+-------------------+
|user_id|activity_type| activity_timestamp|
+-------+-------------+-------------------+
|      1|        login|2025-02-01 10:00:00|
|      1| view_product|2025-02-01 10:05:00|
|      1|        login|2025-02-01 10:30:00|
|      2|     purchase|2025-02-01 11:00:00|
|      2|        login|2025-02-01 11:15:00|
|      2| view_product|2025-02-01 11:30:00|
|      3|        login|2025-02-01 12:00:00|
|      3|        login|2025-02-01 12:05:00|
+-------+-------------+-------------------+

root
 |-- user_id: long (nullable = true)
 |-- activity_type: string (nullable = true)
 |-- activity_timestamp: string (nullable = true)



In [11]:
df = df.withColumn('activity_timestamp', col('activity_timestamp').cast('timestamp'))
df.show()

+-------+-------------+-------------------+
|user_id|activity_type| activity_timestamp|
+-------+-------------+-------------------+
|      1|        login|2025-02-01 10:00:00|
|      1| view_product|2025-02-01 10:05:00|
|      1|        login|2025-02-01 10:30:00|
|      2|     purchase|2025-02-01 11:00:00|
|      2|        login|2025-02-01 11:15:00|
|      2| view_product|2025-02-01 11:30:00|
|      3|        login|2025-02-01 12:00:00|
|      3|        login|2025-02-01 12:05:00|
+-------+-------------+-------------------+



In [21]:
window_spec = Window.partitionBy('user_id', 'activity_type').orderBy(col('activity_timestamp').desc())
# Add a row number to each partition
df_with_row_num = df.withColumn('row_num', row_number().over(window_spec))
df_with_row_num.show()

+-------+-------------+-------------------+-------+
|user_id|activity_type| activity_timestamp|row_num|
+-------+-------------+-------------------+-------+
|      1|        login|2025-02-01 10:30:00|      1|
|      1|        login|2025-02-01 10:00:00|      2|
|      1| view_product|2025-02-01 10:05:00|      1|
|      2|        login|2025-02-01 11:15:00|      1|
|      2|     purchase|2025-02-01 11:00:00|      1|
|      2| view_product|2025-02-01 11:30:00|      1|
|      3|        login|2025-02-01 12:05:00|      1|
|      3|        login|2025-02-01 12:00:00|      2|
+-------+-------------+-------------------+-------+



In [22]:
df_filtered = df_with_row_num.filter(col('row_num') == 1).drop('row_num')
df_filtered.show()

+-------+-------------+-------------------+
|user_id|activity_type| activity_timestamp|
+-------+-------------+-------------------+
|      1|        login|2025-02-01 10:30:00|
|      1| view_product|2025-02-01 10:05:00|
|      2|        login|2025-02-01 11:15:00|
|      2|     purchase|2025-02-01 11:00:00|
|      2| view_product|2025-02-01 11:30:00|
|      3|        login|2025-02-01 12:05:00|
+-------+-------------+-------------------+



𝐐𝐮𝐞𝐬𝐭𝐢𝐨𝐧 46

You have been given a large dataset containing employee salary details. Your goal is to optimize a PySpark job that performs a groupBy operation while minimizing the shuffle.

Task:
Write a PySpark job to calculate the total salary per department.
Optimize the job to reduce shuffle while performing the groupBy operation.
Explain why your optimization reduces shuffle and improves performance.

Approach:
To minimize shuffle during a groupBy operation, we should:
Use repartition() efficiently to avoid unnecessary partitions.
Use reduceByKey() instead of groupByKey(), as it performs local aggregation before shuffling.
If working with a DataFrame, use partitionBy() while writing output.

In [4]:
data = [ (101, "Rahul", "IT", 90000), (102, "Sita", "HR", 75000), 
(103, "Vikram", "IT", 85000), (104, "Priya", "HR", 72000), 
(105, "Anjali", "IT", 88000), (106, "Manish", "Sales", 67000), 
(107, "Neha", "Sales", 70000) ] 

columns = ["emp_id", "name", "dept", "salary"]
df = spark.createDataFrame(data, columns)
df.show()

+------+------+-----+------+
|emp_id|  name| dept|salary|
+------+------+-----+------+
|   101| Rahul|   IT| 90000|
|   102|  Sita|   HR| 75000|
|   103|Vikram|   IT| 85000|
|   104| Priya|   HR| 72000|
|   105|Anjali|   IT| 88000|
|   106|Manish|Sales| 67000|
|   107|  Neha|Sales| 70000|
+------+------+-----+------+



In [6]:
#df.groupBy('dept').sum('salary').show()

#step 1: Convert to RDD and Use reduceByKey (optimized Shuffle)
rdd = df.rdd.map(lambda x:(x[2], x[3]))   # (dept, salary)
rdd.collect()

[('IT', 90000),
 ('HR', 75000),
 ('IT', 85000),
 ('HR', 72000),
 ('IT', 88000),
 ('Sales', 67000),
 ('Sales', 70000)]

In [9]:
optimized_result = rdd.reduceByKey(lambda x, y : x+y)  # Aggregation before shuffle
optimized_result.collect()

[('HR', 147000), ('IT', 263000), ('Sales', 137000)]

In [11]:
# convert back to dataframe

optimized_df = optimized_result.toDF(['dept', 'total_salary'])
optimized_df.show()

+-----+------------+
| dept|total_salary|
+-----+------------+
|   HR|      147000|
|   IT|      263000|
|Sales|      137000|
+-----+------------+



In [15]:
# Step 2:
df_optimized =  df.repartition('dept').groupBy('dept').agg(sum('salary').alias('total_salary'))
df_optimized.show()

+-----+------------+
| dept|total_salary|
+-----+------------+
|   IT|      263000|
|   HR|      147000|
|Sales|      137000|
+-----+------------+



In [19]:
# write a PySpark program to identify products that have never been sold.

products = spark.createDataFrame([ (1, "Laptop"), (2, "Tablet"), 
(3, "Smartphone"), (4, "Monitor"), 
(5, "Keyboard") ], ["product_id", "product_name"]) 

products.show()

sales = spark.createDataFrame([ (101, 1, "2025-01-01"), (102, 3, "2025-01-02"), (103, 5, "2025-01-03") ], ["sale_id", "product_id", "sale_date"]) 
sales.show()

+----------+------------+
|product_id|product_name|
+----------+------------+
|         1|      Laptop|
|         2|      Tablet|
|         3|  Smartphone|
|         4|     Monitor|
|         5|    Keyboard|
+----------+------------+

+-------+----------+----------+
|sale_id|product_id| sale_date|
+-------+----------+----------+
|    101|         1|2025-01-01|
|    102|         3|2025-01-02|
|    103|         5|2025-01-03|
+-------+----------+----------+



In [25]:
products.join(sales, products.product_id == sales.product_id, how='left').filter(sales.product_id.isNull()).select(products.product_id).show()

+----------+
|product_id|
+----------+
|         2|
|         4|
+----------+



- Replace missing values in the price column with the median price instead of the mean.
- Drop rows where the product column is null, but if the price column is above 300, replace the null product with "Unknown".
- Fill missing values in the quantity column with the average quantity rounded to the nearest integer.
- Add a new column, total_value, which is the product of price and quantity.
- Remove the product_id column from the DataFrame.

In [137]:
data = [
 (1, "Laptop", 1000, 5),
 (2, "Mouse", None, None),
 (3, "Keyboard", 50, 2),
 (4, "Monitor", 200, None),
 (5, None, 500, None),
]
columns = ["product_id", "product", "price", "quantity"]
df = spark.createDataFrame(data, columns)
df.show()

+----------+--------+-----+--------+
|product_id| product|price|quantity|
+----------+--------+-----+--------+
|         1|  Laptop| 1000|       5|
|         2|   Mouse| NULL|    NULL|
|         3|Keyboard|   50|       2|
|         4| Monitor|  200|    NULL|
|         5|    NULL|  500|    NULL|
+----------+--------+-----+--------+



In [138]:
# Replace missing values in the price column with the median price instead of the mean.

from pyspark.ml.feature import Imputer
imputer = Imputer(inputCols = ['price'], outputCols = ['price']).setStrategy('median')
df =imputer.fit(df).transform(df)
df.show()

# OR 
median_price = df.approxQuantile("price", [0.5], 0.01)[0]
df = df.fillna({"price": median_price})
df.show()

+----------+--------+-----+--------+
|product_id| product|price|quantity|
+----------+--------+-----+--------+
|         1|  Laptop| 1000|       5|
|         2|   Mouse|  200|    NULL|
|         3|Keyboard|   50|       2|
|         4| Monitor|  200|    NULL|
|         5|    NULL|  500|    NULL|
+----------+--------+-----+--------+

+----------+--------+-----+--------+
|product_id| product|price|quantity|
+----------+--------+-----+--------+
|         1|  Laptop| 1000|       5|
|         2|   Mouse|  200|    NULL|
|         3|Keyboard|   50|       2|
|         4| Monitor|  200|    NULL|
|         5|    NULL|  500|    NULL|
+----------+--------+-----+--------+



In [115]:
# Drop rows where the product column is null, but if the price column is above 300, replace the null product with "Unknown". 

df1 = df.withColumn('product', when((col('product').isNull()) & (col('price') > 300), 'unknown').otherwise(col('product'))).select('*')
df1.show()


+----------+--------+-----+--------+
|product_id| product|price|quantity|
+----------+--------+-----+--------+
|         1|  Laptop| 1000|       5|
|         2|   Mouse|  200|    NULL|
|         3|Keyboard|   50|       2|
|         4| Monitor|  200|    NULL|
|         5| unknown|  500|    NULL|
+----------+--------+-----+--------+



In [116]:
df2 = df1.filter(col('product').isNotNull())
df2.show()

+----------+--------+-----+--------+
|product_id| product|price|quantity|
+----------+--------+-----+--------+
|         1|  Laptop| 1000|       5|
|         2|   Mouse|  200|    NULL|
|         3|Keyboard|   50|       2|
|         4| Monitor|  200|    NULL|
|         5| unknown|  500|    NULL|
+----------+--------+-----+--------+



In [None]:
# Fill missing values in the quantity column with the average quantity rounded to the nearest integer.

In [128]:
avg_quantity = df2.select(round(avg(col('quantity'))).alias('quantity')).collect()[0]['quantity']
avg_quantity

4.0

In [129]:
df3 = df2.fillna({'quantity': avg_quantity})
df3.show()

+----------+--------+-----+--------+
|product_id| product|price|quantity|
+----------+--------+-----+--------+
|         1|  Laptop| 1000|       5|
|         2|   Mouse|  200|       4|
|         3|Keyboard|   50|       2|
|         4| Monitor|  200|       4|
|         5| unknown|  500|       4|
+----------+--------+-----+--------+



In [131]:
#Add a new column, total_value, which is the product of price and quantity.

df4 = df3.withColumn('total_value', col('price') * col('quantity')).select('*')
df4.show()

+----------+--------+-----+--------+-----------+
|product_id| product|price|quantity|total_value|
+----------+--------+-----+--------+-----------+
|         1|  Laptop| 1000|       5|       5000|
|         2|   Mouse|  200|       4|        800|
|         3|Keyboard|   50|       2|        100|
|         4| Monitor|  200|       4|        800|
|         5| unknown|  500|       4|       2000|
+----------+--------+-----+--------+-----------+



In [134]:
# Remove the product_id column from the DataFrame.
df4.drop('product_id').show()

+--------+-----+--------+-----------+
| product|price|quantity|total_value|
+--------+-----+--------+-----------+
|  Laptop| 1000|       5|       5000|
|   Mouse|  200|       4|        800|
|Keyboard|   50|       2|        100|
| Monitor|  200|       4|        800|
| unknown|  500|       4|       2000|
+--------+-----+--------+-----------+



47

- Define an explicit schema for this dataset using StructType and StructField.
- Load this data into a PySpark DataFrame using the defined schema.
- Extract the employees who belong to the "IT" department and have a salary greater than 70000.
- Split the Address column into two separate columns: City and State.
- Save the transformed data into a Parquet file.


In [7]:
schema = StructType([ StructField("Emp_ID", IntegerType(), True), 
                     StructField("Name", StringType(), True), 
                     StructField("Age", IntegerType(), True), 
                     StructField("Salary", IntegerType(), True), 
                     StructField("Department", StringType(), True),
                     StructField("Address", StringType(), True) ]) 

data = [ (101, "Rajesh", 30, 60000, "IT", "Mumbai, Maharashtra"), 
         (102, "Priya", 28, 75000, "HR", "Bengaluru, Karnataka"), 
         (103, "Suresh", 35, 50000, "Finance", "Chennai, Tamil Nadu"), 
         (104, "Anjali", 25, 80000, "IT", "Pune, Maharashtra"), 
         (105, "Arjun", 40, 90000, "Management", "Hyderabad, Telangana") ]

df = spark.createDataFrame(data, schema)
df.show()

+------+------+---+------+----------+--------------------+
|Emp_ID|  Name|Age|Salary|Department|             Address|
+------+------+---+------+----------+--------------------+
|   101|Rajesh| 30| 60000|        IT| Mumbai, Maharashtra|
|   102| Priya| 28| 75000|        HR|Bengaluru, Karnataka|
|   103|Suresh| 35| 50000|   Finance| Chennai, Tamil Nadu|
|   104|Anjali| 25| 80000|        IT|   Pune, Maharashtra|
|   105| Arjun| 40| 90000|Management|Hyderabad, Telangana|
+------+------+---+------+----------+--------------------+



In [10]:
# Extract the employees who belong to the "IT" department and have a salary greater than 70000.

df.filter((col('Department') == 'IT') & (col('Salary') > 70000)).show()

+------+------+---+------+----------+-----------------+
|Emp_ID|  Name|Age|Salary|Department|          Address|
+------+------+---+------+----------+-----------------+
|   104|Anjali| 25| 80000|        IT|Pune, Maharashtra|
+------+------+---+------+----------+-----------------+



In [14]:
# Split the Address column into two separate columns: City and State.
trn_df = df.withColumn('City', split(col('Address'), ', ').getItem(0)).withColumn('State', split(col('Address'), ', ').getItem(1))
# OR 
#trn_df = df.withColumn('City', split(col('Address'), ', ')[0]).withColumn('State', split(col('Address'), ', ')[1])

trn_df.show()

+------+------+---+------+----------+--------------------+---------+-----------+
|Emp_ID|  Name|Age|Salary|Department|             Address|     City|      State|
+------+------+---+------+----------+--------------------+---------+-----------+
|   101|Rajesh| 30| 60000|        IT| Mumbai, Maharashtra|   Mumbai|Maharashtra|
|   102| Priya| 28| 75000|        HR|Bengaluru, Karnataka|Bengaluru|  Karnataka|
|   103|Suresh| 35| 50000|   Finance| Chennai, Tamil Nadu|  Chennai| Tamil Nadu|
|   104|Anjali| 25| 80000|        IT|   Pune, Maharashtra|     Pune|Maharashtra|
|   105| Arjun| 40| 90000|Management|Hyderabad, Telangana|Hyderabad|  Telangana|
+------+------+---+------+----------+--------------------+---------+-----------+



In [16]:
# Save the transformed data into a Parquet file.

trn_df.write.mode('overwrite').parquet('trn_df.parquet')

In [114]:
# 48 calculate the month with the highest sales for each region using window functions and the max() function. Ensure that the result includes the region name, month, and sales value. Consider sales fluctuations, and the dataset should contain multiple records for each region to test windowing correctly.

data = [ ("Amit", "North", "Jan", 12000), ("Rajesh", "North", "Feb", 15000), ("Sunita", "North", "Mar", 11000), ("Meena", "South", "Jan", 17000), 
("Ravi", "South", "Feb", 20000), ("Priya", "South", "Mar", 18000), 
("Suresh", "East", "Jan", 10000), ("Vishal", "East", "Feb", 22000), 
("Akash", "East", "Mar", 21000), ("Anjali", "West", "Jan", 15000), 
("Deepak", "West", "Feb", 13000), ("Nidhi", "West", "Mar", 17000), ] 
columns = ["Salesperson", "Region", "Month", "Sales"]
df = spark.createDataFrame(data, columns)
df.show()

+-----------+------+-----+-----+
|Salesperson|Region|Month|Sales|
+-----------+------+-----+-----+
|       Amit| North|  Jan|12000|
|     Rajesh| North|  Feb|15000|
|     Sunita| North|  Mar|11000|
|      Meena| South|  Jan|17000|
|       Ravi| South|  Feb|20000|
|      Priya| South|  Mar|18000|
|     Suresh|  East|  Jan|10000|
|     Vishal|  East|  Feb|22000|
|      Akash|  East|  Mar|21000|
|     Anjali|  West|  Jan|15000|
|     Deepak|  West|  Feb|13000|
|      Nidhi|  West|  Mar|17000|
+-----------+------+-----+-----+



In [116]:
win = Window.partitionBy('Region').orderBy(col('Sales').desc())
df1 = df.withColumn('rank', row_number().over(win))
df1.show()

+-----------+------+-----+-----+----+
|Salesperson|Region|Month|Sales|rank|
+-----------+------+-----+-----+----+
|     Vishal|  East|  Feb|22000|   1|
|      Akash|  East|  Mar|21000|   2|
|     Suresh|  East|  Jan|10000|   3|
|     Rajesh| North|  Feb|15000|   1|
|       Amit| North|  Jan|12000|   2|
|     Sunita| North|  Mar|11000|   3|
|       Ravi| South|  Feb|20000|   1|
|      Priya| South|  Mar|18000|   2|
|      Meena| South|  Jan|17000|   3|
|      Nidhi|  West|  Mar|17000|   1|
|     Anjali|  West|  Jan|15000|   2|
|     Deepak|  West|  Feb|13000|   3|
+-----------+------+-----+-----+----+



In [117]:
df1.filter(col('rank') == 1).select('Region', 'Month', 'Sales').show()

+------+-----+-----+
|Region|Month|Sales|
+------+-----+-----+
|  East|  Feb|22000|
| North|  Feb|15000|
| South|  Feb|20000|
|  West|  Mar|17000|
+------+-----+-----+



In [170]:
#How do you create a new column in a DataFrame?

#Task: Given a DataFrame with columns name and salary, create a new column bonus which is 10% of the salary.

data = [("John", 50000),
("Jane", 60000),
("Doe", 55000)]
columns = ["name", "salary"]
df = spark.createDataFrame(data, columns)
df.show()

+----+------+
|name|salary|
+----+------+
|John| 50000|
|Jane| 60000|
| Doe| 55000|
+----+------+



In [178]:
df.withColumn('bonus', (col('salary')*10)/100).show()

+----+------+------+
|name|salary| bonus|
+----+------+------+
|John| 50000|5000.0|
|Jane| 60000|6000.0|
| Doe| 55000|5500.0|
+----+------+------+



##     GeekCoders

https://www.youtube.com/watch?v=Rp5LWT4or-o&list=PLxy0DxWEupiODTF_xM5Lw1ghc0XtLCUhC&index=1

### 13 Write a Pyspark query to report the movies with an odd-numbered ID and a description that is not 'boring'. Return the result table in descending order by rating.

In [118]:
data=[(1, 'War','great 3D',8.9) 
,(2, 'Science','fiction',8.5) 
,(3, 'irish','boring',6.2)
,(4, 'Ice song','Fantacy',8.6)  
,(5, 'House card','Interesting',9.1)]   
schema="ID int,movie string,description string,rating double"
df=spark.createDataFrame(data,schema) 
df.show() 

+---+----------+-----------+------+
| ID|     movie|description|rating|
+---+----------+-----------+------+
|  1|       War|   great 3D|   8.9|
|  2|   Science|    fiction|   8.5|
|  3|     irish|     boring|   6.2|
|  4|  Ice song|    Fantacy|   8.6|
|  5|House card|Interesting|   9.1|
+---+----------+-----------+------+



In [119]:
df.where((col('ID') % 2 != 0 ) & (trim(lower(col('description'))) != 'boring')).orderBy(col('rating').desc()).show()

+---+----------+-----------+------+
| ID|     movie|description|rating|
+---+----------+-----------+------+
|  5|House card|Interesting|   9.1|
|  1|       War|   great 3D|   8.9|
+---+----------+-----------+------+



In [120]:
# OR 
df.select('*').filter((col('ID')%2!=0) & (trim(lower(col('description'))) != 'boring')).orderBy(col('rating').desc()).show()

+---+----------+-----------+------+
| ID|     movie|description|rating|
+---+----------+-----------+------+
|  5|House card|Interesting|   9.1|
|  1|       War|   great 3D|   8.9|
+---+----------+-----------+------+



### 14 Collect_list and Aggregation

In [122]:
data = [
    ("john", "tomato", 2),
    ("bill", "apple", 2),
    ("john", "banana", 2),
    ("john", "tomato", 3),
    ("bill", "taco", 2),
    ("bill", "apple", 2),
]
schema = "name string,item string,weight int"
df = spark.createDataFrame(data, schema)
df.show()

+----+------+------+
|name|  item|weight|
+----+------+------+
|john|tomato|     2|
|bill| apple|     2|
|john|banana|     2|
|john|tomato|     3|
|bill|  taco|     2|
|bill| apple|     2|
+----+------+------+



In [123]:
df_final =df.groupBy('name', 'item').agg(sum(col('weight')).alias('sum_weight'))
df_final.show()

+----+------+----------+
|name|  item|sum_weight|
+----+------+----------+
|john|tomato|         5|
|bill| apple|         4|
|john|banana|         2|
|bill|  taco|         2|
+----+------+----------+



In [124]:
df_fi = df_final.groupBy('name').agg(collect_list(struct('item', 'sum_weight')).alias('items')).orderBy('name')
df_fi.show(truncate= False)

+----+--------------------------+
|name|items                     |
+----+--------------------------+
|bill|[{apple, 4}, {taco, 2}]   |
|john|[{tomato, 5}, {banana, 2}]|
+----+--------------------------+



### 15 find all duplicate emails
Write a pyspark dataframe query to find all duplicate emails in a table named Person.

In [126]:
data = [(1, "abc@gmail.com"), (2, "bcd@gmail.com"), (3, "abc@gmail.com")]
schema = "ID int,email string"
df = spark.createDataFrame(data, schema)
df.show()

+---+-------------+
| ID|        email|
+---+-------------+
|  1|abc@gmail.com|
|  2|bcd@gmail.com|
|  3|abc@gmail.com|
+---+-------------+



In [127]:
df.groupBy('email').count().filter(col('count')>1).select('email').show()

+-------------+
|        email|
+-------------+
|abc@gmail.com|
+-------------+



In [128]:
df.createOrReplaceTempView('test')
spark.sql("""select email from test group by email having count(email)>1""").show()

+-------------+
|        email|
+-------------+
|abc@gmail.com|
+-------------+



### 18 Write a pyspark query for a report that provides the customer ids from the customer table that bought all the products in the product table.

In [129]:
data=[(1,5),(2,6),(3,5),(3,6),(1,6)]
schema="customer_id int,product_key int"
customer_df=spark.createDataFrame(data,schema)

data=[(5,),(6,)]
schema="product_key int"
product_df=spark.createDataFrame(data,schema)

customer_df.show()
product_df.show()

+-----------+-----------+
|customer_id|product_key|
+-----------+-----------+
|          1|          5|
|          2|          6|
|          3|          5|
|          3|          6|
|          1|          6|
+-----------+-----------+

+-----------+
|product_key|
+-----------+
|          5|
|          6|
+-----------+



In [130]:
# take unique key from product table 
df_product = product_df.agg(countDistinct(col('product_key')).alias('cnt_products'))
df_product.show()

+------------+
|cnt_products|
+------------+
|           2|
+------------+



In [131]:
df_customer = customer_df.groupBy(col('customer_id')).agg(countDistinct(col('product_key')).alias('cnt_products'))
df_customer.show()

+-----------+------------+
|customer_id|cnt_products|
+-----------+------------+
|          1|           2|
|          3|           2|
|          2|           1|
+-----------+------------+



In [132]:
df = df_product.join(df_customer, df_product.cnt_products == df_customer.cnt_products, 'inner').select('customer_id')
df.show()

+-----------+
|customer_id|
+-----------+
|          1|
|          3|
+-----------+



### 19 . Get the employees dept id with maximun and minimum salary in each department

In [133]:
data=[('Genece' , 2 , 75000),
('Jaimin' , 2 , 80000 ),
('Pankaj' , 2 , 80000 ),
('Tarvares' , 2 , 70000),
('Marlania' , 4 , 70000),
('Briana' , 4 , 85000),
('Kimberi' , 4 , 55000),
('Gabriella' , 4 , 55000),  
('Lakken', 5, 60000),
('Latoynia' , 5 , 65000) ]
schema="emp_name string,dept_id int,salary int"
df=spark.createDataFrame(data,schema)
df.show()

+---------+-------+------+
| emp_name|dept_id|salary|
+---------+-------+------+
|   Genece|      2| 75000|
|   Jaimin|      2| 80000|
|   Pankaj|      2| 80000|
| Tarvares|      2| 70000|
| Marlania|      4| 70000|
|   Briana|      4| 85000|
|  Kimberi|      4| 55000|
|Gabriella|      4| 55000|
|   Lakken|      5| 60000|
| Latoynia|      5| 65000|
+---------+-------+------+



In [134]:
df_1 = df.groupBy('dept_id').agg(max('salary').alias('max_salary'), min('salary').alias('min_salary')).withColumnRenamed('dept_id', 'dept_id_1')
df_1.show()

+---------+----------+----------+
|dept_id_1|max_salary|min_salary|
+---------+----------+----------+
|        2|     80000|     70000|
|        4|     85000|     55000|
|        5|     65000|     60000|
+---------+----------+----------+



In [135]:
df_2 = df.join(df_1, df.dept_id == df_1.dept_id_1, 'inner')
df_2.show()

+---------+-------+------+---------+----------+----------+
| emp_name|dept_id|salary|dept_id_1|max_salary|min_salary|
+---------+-------+------+---------+----------+----------+
|   Genece|      2| 75000|        2|     80000|     70000|
|   Jaimin|      2| 80000|        2|     80000|     70000|
|   Pankaj|      2| 80000|        2|     80000|     70000|
| Marlania|      4| 70000|        4|     85000|     55000|
| Tarvares|      2| 70000|        2|     80000|     70000|
|   Briana|      4| 85000|        4|     85000|     55000|
|  Kimberi|      4| 55000|        4|     85000|     55000|
|Gabriella|      4| 55000|        4|     85000|     55000|
|   Lakken|      5| 60000|        5|     65000|     60000|
| Latoynia|      5| 65000|        5|     65000|     60000|
+---------+-------+------+---------+----------+----------+



In [136]:
df_3 = df_2.filter((col('max_salary') == col('salary')) | (col('min_salary') == col('salary')))
df_3.show()

+---------+-------+------+---------+----------+----------+
| emp_name|dept_id|salary|dept_id_1|max_salary|min_salary|
+---------+-------+------+---------+----------+----------+
|   Jaimin|      2| 80000|        2|     80000|     70000|
|   Pankaj|      2| 80000|        2|     80000|     70000|
| Tarvares|      2| 70000|        2|     80000|     70000|
|   Briana|      4| 85000|        4|     85000|     55000|
|  Kimberi|      4| 55000|        4|     85000|     55000|
|Gabriella|      4| 55000|        4|     85000|     55000|
|   Lakken|      5| 60000|        5|     65000|     60000|
| Latoynia|      5| 65000|        5|     65000|     60000|
+---------+-------+------+---------+----------+----------+



In [137]:
df_f = df_3.groupBy('dept_id', 'salary').agg(collect_list('emp_name').alias('emp_names'))
df_f.show()

+-------+------+--------------------+
|dept_id|salary|           emp_names|
+-------+------+--------------------+
|      2| 80000|    [Jaimin, Pankaj]|
|      2| 70000|          [Tarvares]|
|      4| 85000|            [Briana]|
|      4| 55000|[Kimberi, Gabriella]|
|      5| 60000|            [Lakken]|
|      5| 65000|          [Latoynia]|
+-------+------+--------------------+



### 20 . Cache and Persist DataFrame

In [138]:
data = [("Raj","Doe",None),
 (None,"Samuel","VIZAG"),
 ("David","Smith", None),
 ("Samson",None, "HYD"),
 ("Immi", "Steve", "BNG"),
 (None, None, None)]

columns = ["Firstname", "Lastname", "City"]

df = spark.createDataFrame(data,columns)
df.cache()
df.show()

+---------+--------+-----+
|Firstname|Lastname| City|
+---------+--------+-----+
|      Raj|     Doe| NULL|
|     NULL|  Samuel|VIZAG|
|    David|   Smith| NULL|
|   Samson|    NULL|  HYD|
|     Immi|   Steve|  BNG|
|     NULL|    NULL| NULL|
+---------+--------+-----+



In [139]:
for i in df.columns:
    total_count = df.select(col(i)).count()
    null_values = df.filter(col(i).isNull()).count()
    percentage = (null_values/total_count)*100
    print(i, percentage)

Firstname 33.33333333333333
Lastname 33.33333333333333
City 50.0


### 25

In [140]:
data = [('Alice' ,['Badminton', 'Tennis']),
        ('Bob', ['Tennis', 'Cricket']),
        ('Julie', ['Cricket','Carroms'])]
schema = ['name', 'Hobbies']
df = spark.createDataFrame(data=data, schema= schema)
df.show()
df.printSchema()

+-----+-------------------+
| name|            Hobbies|
+-----+-------------------+
|Alice|[Badminton, Tennis]|
|  Bob|  [Tennis, Cricket]|
|Julie| [Cricket, Carroms]|
+-----+-------------------+

root
 |-- name: string (nullable = true)
 |-- Hobbies: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [141]:
df.select(col('name'), explode(col('Hobbies'))).show()

+-----+---------+
| name|      col|
+-----+---------+
|Alice|Badminton|
|Alice|   Tennis|
|  Bob|   Tennis|
|  Bob|  Cricket|
|Julie|  Cricket|
|Julie|  Carroms|
+-----+---------+



In [142]:
data = [('Goa','','AP'), ('','AP',None),(None, '','Blr')]
schema = 'City1 string, City2 string, City3 string'
df2 = spark.createDataFrame(data, schema)
df2.show()

+-----+-----+-----+
|City1|City2|City3|
+-----+-----+-----+
|  Goa|     |   AP|
|     |   AP| NULL|
| NULL|     |  Blr|
+-----+-----+-----+



In [143]:
df_2 = df2.withColumn('City1', when(col('City1')=='',None).otherwise(col('City1')))\
        .withColumn('City2', when(col('City2')=='',None).otherwise(col('City2')))\
        .withColumn('City3', when(col('City3')=='',None).otherwise(col('City3')))
df_2.show()

+-----+-----+-----+
|City1|City2|City3|
+-----+-----+-----+
|  Goa| NULL|   AP|
| NULL|   AP| NULL|
| NULL| NULL|  Blr|
+-----+-----+-----+



In [144]:
dff= df_2.withColumn('Result', coalesce(col('City1'), col('City2'), col('City3'))).select('Result')
dff.show()

+------+
|Result|
+------+
|   Goa|
|    AP|
|   Blr|
+------+



In [145]:
# 26

data=[('John Doe','{"street": "123 Main St", "city": "Anytown"}'),('Jane Smith','{"street": "456 Elm St", "city": "Othertown"}')]
df=spark.createDataFrame(data,schema="name string,address string")
df.show(truncate=False)

+----------+---------------------------------------------+
|name      |address                                      |
+----------+---------------------------------------------+
|John Doe  |{"street": "123 Main St", "city": "Anytown"} |
|Jane Smith|{"street": "456 Elm St", "city": "Othertown"}|
+----------+---------------------------------------------+



In [146]:
df1 = df.withColumn('parsed_json', from_json(col('address'), 'street string,city string' ))
df1.show(truncate=False)

+----------+---------------------------------------------+-----------------------+
|name      |address                                      |parsed_json            |
+----------+---------------------------------------------+-----------------------+
|John Doe  |{"street": "123 Main St", "city": "Anytown"} |{123 Main St, Anytown} |
|Jane Smith|{"street": "456 Elm St", "city": "Othertown"}|{456 Elm St, Othertown}|
+----------+---------------------------------------------+-----------------------+



In [151]:
df2 = df1.withColumn(col('name'), col('parsed_json').street.alias('street'),  col('parsed_json').city.alias('city'))
df2.show()

TypeError: DataFrame.withColumn() takes 3 positional arguments but 4 were given

### 28 Find missing Numbers in the DataFrame

In [152]:
data = [ (1, ), (2,), (3,), (6,), (7,), (8,)]
schema="Id int"
df = spark.createDataFrame(data,schema=schema)
df.show()

+---+
| Id|
+---+
|  1|
|  2|
|  3|
|  6|
|  7|
|  8|
+---+



In [153]:
df_list = df.select(min(col('Id')), max(col('Id')))
df_list.show()

+-------+-------+
|min(Id)|max(Id)|
+-------+-------+
|      1|      8|
+-------+-------+



In [162]:
print(df_list.first())
print(df_list.first()[0])
print(df_list.first()[1])

Row(min(Id)=1, max(Id)=8)
1
8


In [158]:
df_new = spark.range(df_list.first()[0], df_list.first()[1]+1)
df_new.show()

+---+
| id|
+---+
|  1|
|  2|
|  3|
|  4|
|  5|
|  6|
|  7|
|  8|
+---+



In [159]:
df_new.subtract(df).show()

+---+
| id|
+---+
|  5|
|  4|
+---+



In [163]:
data=[
('Rudra','math',79),
('Rudra','eng',60),
('Shivu','math', 68),
('Shivu','eng', 59),
('Anu','math', 65),
('Anu','eng',80)
]
schema="Name string,Sub string,Marks int"
df=spark.createDataFrame(data,schema)
df.show()

+-----+----+-----+
| Name| Sub|Marks|
+-----+----+-----+
|Rudra|math|   79|
|Rudra| eng|   60|
|Shivu|math|   68|
|Shivu| eng|   59|
|  Anu|math|   65|
|  Anu| eng|   80|
+-----+----+-----+



In [164]:
df1 = df.groupBy('Name').pivot('Sub').sum('Marks')
df1.show()

+-----+---+----+
| Name|eng|math|
+-----+---+----+
|Shivu| 59|  68|
|Rudra| 60|  79|
|  Anu| 80|  65|
+-----+---+----+



In [165]:
# OR 
df2 = df.groupBy(col('Name')).agg(collect_list(col("Marks")).alias("Marks_New"))
df2.show()
df3 = df2.select(col('Name'), col('Marks_New')[0].alias('math'),col('Marks_New')[1].alias('eng'))
df3.show()

+-----+---------+
| Name|Marks_New|
+-----+---------+
|Rudra| [79, 60]|
|Shivu| [68, 59]|
|  Anu| [65, 80]|
+-----+---------+

+-----+----+---+
| Name|math|eng|
+-----+----+---+
|Rudra|  79| 60|
|Shivu|  68| 59|
|  Anu|  65| 80|
+-----+----+---+



### 43 Write a Pyspark code to get all the customer ID along with their final balance amount after calculating their transactions based on transaction type.
If any customer has not any transactions then his final balance should remain same as current amount.

In [166]:
transactions_data = [
    (1, "credit", 30.0),
    (1, "debit", 90.0),
    (2, "credit", 50.0),
    (3, "debit", 57.0),
    (2, "debit", 90.0)]
transaction_schema = StructType([
    StructField('customer_id', IntegerType(), True),
    StructField("transaction_type", StringType(), True),
    StructField("transaction_amount", FloatType(), True)
])

trans_df = spark.createDataFrame(transactions_data, schema=transaction_schema)
trans_df.show()

amounts_data = [
    (1, 1000.0),
    (2, 2000.0),
    (3, 3000.0),
    (4, 4000.0)]
amount_schema = StructType([
    StructField("customer_id", IntegerType(), True),
    StructField("current_amount", FloatType(), True)])

amt_df = spark.createDataFrame(amounts_data, schema=amount_schema)
amt_df.show()

+-----------+----------------+------------------+
|customer_id|transaction_type|transaction_amount|
+-----------+----------------+------------------+
|          1|          credit|              30.0|
|          1|           debit|              90.0|
|          2|          credit|              50.0|
|          3|           debit|              57.0|
|          2|           debit|              90.0|
+-----------+----------------+------------------+

+-----------+--------------+
|customer_id|current_amount|
+-----------+--------------+
|          1|        1000.0|
|          2|        2000.0|
|          3|        3000.0|
|          4|        4000.0|
+-----------+--------------+



In [167]:
df_1 = trans_df.groupBy('customer_id').agg(sum(when(col('transaction_type') == 'credit', col('transaction_amount'))\
                                               .otherwise(0)).alias('total_credit'),\
                                           sum(when(col('transaction_type') == 'debit', col('transaction_amount'))\
                                               .otherwise(0)).alias('total_debit'))\
                                      .withColumn('total_remaining_amnt', col('total_debit')-col('total_credit'))
df_1.show()

+-----------+------------+-----------+--------------------+
|customer_id|total_credit|total_debit|total_remaining_amnt|
+-----------+------------+-----------+--------------------+
|          1|        30.0|       90.0|                60.0|
|          2|        50.0|       90.0|                40.0|
|          3|         0.0|       57.0|                57.0|
+-----------+------------+-----------+--------------------+



In [168]:
df_2 = amt_df.join(df_1, amt_df.customer_id == df_1.customer_id, how='left')\
                          .withColumn('total_remaining_amnt', coalesce(col('total_remaining_amnt'), lit(0)))\
                          .withColumn('current_amount', col('current_amount') - col('total_remaining_amnt'))\
                          .drop(df_1.customer_id)\
                          .select('customer_id', 'current_amount')
df_2.show()

+-----------+--------------+
|customer_id|current_amount|
+-----------+--------------+
|          1|         940.0|
|          2|        1960.0|
|          3|        2943.0|
|          4|        4000.0|
+-----------+--------------+

