In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
# Obtain the Spark session for this notebook: getOrCreate() hands back the
# already-running session when there is one, otherwise it starts a new one
# registered under the application name "scenarios".
spark = (
    SparkSession.builder
    .appName("scenarios")
    .getOrCreate()
)

In [4]:
# ASSIGNMENT 1 - Primary department per employee
#
# Sample input, one tuple per (employee, department) membership:
#   (employee_id, department_id, primary_flag)
# In these rows primary_flag is either 'Y' or 'N'.
data1 = [
    (1, 1, 'N'),
    (2, 1, 'Y'),
    (2, 2, 'N'),
    (3, 3, 'N'),
    (4, 2, 'N'),
    (4, 3, 'Y'),
    (4, 4, 'N'),
]

# Column names are given explicitly; column types are inferred from the tuples.
df1 = spark.createDataFrame(
    data1,
    schema=["employee_id", "department_id", "primary_flag"],
)
df1.show()


+-----------+-------------+------------+
|employee_id|department_id|primary_flag|
+-----------+-------------+------------+
|          1|            1|           N|
|          2|            1|           Y|
|          2|            2|           N|
|          3|            3|           N|
|          4|            2|           N|
|          4|            3|           Y|
|          4|            4|           N|
+-----------+-------------+------------+



In [10]:
# Resolve exactly one department per employee in a single aggregation:
#   1. if the employee has a row flagged 'Y', use that department;
#   2. otherwise fall back to a department from the rows flagged 'N'.
#
# The earlier version picked the row with dropDuplicates(["employee_id"]),
# which keeps an arbitrary row per key, so an employee with several 'N' rows
# and no 'Y' row could get a different department from run to run. Taking the
# smallest department_id makes the choice deterministic, and the single
# groupBy replaces three intermediate frames and two left joins.
#
# `min`, `when`, `coalesce` and `col` are the Spark column functions brought
# in by the wildcard import at the top of the notebook (so `min` is NOT the
# Python builtin here). when() without otherwise() yields NULL, and min()
# ignores NULLs, so each min() only sees the rows matching its flag.
is_primary = col("primary_flag") == "Y"
is_non_primary = col("primary_flag") == "N"

result = (
    df1.groupBy("employee_id")
    .agg(
        coalesce(
            min(when(is_primary, col("department_id"))),
            min(when(is_non_primary, col("department_id"))),
        ).alias("department_id")
    )
    .orderBy("employee_id")
)

result.show()

+-----------+-------------+
|employee_id|department_id|
+-----------+-------------+
|          1|            1|
|          2|            1|
|          3|            3|
|          4|            3|
+-----------+-------------+



In [11]:
# ASSIGNMENT 2 - Split customer_name into first / middle / last name
#
# Sample input: single-column rows (note the trailing comma that makes each
# entry a one-element tuple) holding names with one, two or three words.
data2 = [
    ("kasireddy naidu",),
    ("konidela ram charan",),
    ("Nandamuri tarak ramarao",),
    ("charan",),
]

df2 = spark.createDataFrame(data2, schema=["customer_name"])

# truncate=False prints the full name instead of cutting long values short.
df2.show(truncate=False)


+-----------------------+
|customer_name          |
+-----------------------+
|kasireddy naidu        |
|konidela ram charan    |
|Nandamuri tarak ramarao|
|charan                 |
+-----------------------+



In [13]:
# Tokenise the name on runs of whitespace after trimming both ends.
# Splitting on a single literal space produced empty tokens for names with
# leading/trailing or repeated spaces, which then leaked into the name parts.
split_col = split(trim(col("customer_name")), r"\s+")
n_parts = size(split_col)

# Number of tokens strictly between the first and the last one. Clamped at 0
# so slice() is never handed a negative length, even for one-word names.
n_middle = greatest(n_parts - 2, lit(0))

df2_out = (
    df2
    # First token is always the first name.
    .withColumn("first_name", split_col[0])
    # Everything between first and last token is the middle name, so names
    # with four or more words keep all their interior words instead of being
    # reported as NULL. slice() is 1-based, hence the start position 2.
    # Names with fewer than three tokens get NULL (when() without otherwise()).
    .withColumn(
        "middle_name",
        when(n_parts >= 3, array_join(slice(split_col, lit(2), n_middle), " ")),
    )
    # Last token is the last name, but only when it is distinct from the
    # first token, i.e. the name has at least two tokens; otherwise NULL.
    .withColumn(
        "last_name",
        when(n_parts >= 2, element_at(split_col, -1)),
    )
)

df2_out.show(truncate=False)

+-----------------------+----------+-----------+---------+
|customer_name          |first_name|middle_name|last_name|
+-----------------------+----------+-----------+---------+
|kasireddy naidu        |kasireddy |NULL       |naidu    |
|konidela ram charan    |konidela  |ram        |charan   |
|Nandamuri tarak ramarao|Nandamuri |tarak      |ramarao  |
|charan                 |charan    |NULL       |NULL     |
+-----------------------+----------+-----------+---------+



In [15]:
# ASSIGNMENT 3 - Total amount spent per user
#
# Sample transactions as (t_id, user_id, amount, t_date).
# t_date is supplied as a plain 'YYYY-MM-DD' string in these rows.
data3 = [
    (1, 101, 500.0, '2024-01-01'),
    (1, 101, 600.0, '2024-01-01'),
    (2, 102, 200.0, '2024-01-02'),
    (3, 101, 300.0, '2024-01-03'),
    (4, 103, 100.0, '2024-01-04'),
    (5, 102, 400.0, '2024-01-05'),
    (6, 103, 600.0, '2024-01-06'),
    (7, 101, 200.0, '2024-01-07'),
]

df3 = spark.createDataFrame(
    data3,
    schema=["t_id", "user_id", "amount", "t_date"],
)
df3.show()


+----+-------+------+----------+
|t_id|user_id|amount|    t_date|
+----+-------+------+----------+
|   1|    101| 500.0|2024-01-01|
|   1|    101| 600.0|2024-01-01|
|   2|    102| 200.0|2024-01-02|
|   3|    101| 300.0|2024-01-03|
|   4|    103| 100.0|2024-01-04|
|   5|    102| 400.0|2024-01-05|
|   6|    103| 600.0|2024-01-06|
|   7|    101| 200.0|2024-01-07|
+----+-------+------+----------+



In [16]:
# One output row per user_id with that user's amounts added together.
# `sum` is the Spark aggregate made available by the wildcard import of
# pyspark.sql.functions at the top of the notebook, not the Python builtin.
df3_out = (
    df3
    .groupBy("user_id")
    .agg(sum(col("amount")).alias("total_spend"))
)

df3_out.show()


+-------+-----------+
|user_id|total_spend|
+-------+-----------+
|    101|     1600.0|
|    102|      600.0|
|    103|      700.0|
+-------+-----------+



In [19]:
# Per user: largest single transaction amount minus the smallest one.
# `max` / `min` are the Spark aggregates from the wildcard import at the top
# of the notebook, not the Python builtins.
# NOTE(review): the column is labelled "total_spend" although it holds
# max - min (a spread, not a total); the label is kept so the output schema
# stays the same - confirm whether a different name is wanted.
range_df = df3.groupBy("user_id").agg(
    (max("amount") - min("amount")).alias("total_spend")
)

# Keep only user 101.
# range_df already holds exactly one row per user_id found in df3, so the
# previous inner join against the distinct user_id list of that same df3
# could neither add nor remove rows; it only cost an extra join. Filtering
# range_df directly returns the same columns (user_id, total_spend) and rows.
result = range_df.filter(col("user_id") == 101)

result.show()

+-------+-----------+
|user_id|total_spend|
+-------+-----------+
|    101|      400.0|
+-------+-----------+



In [30]:
# ASSIGNMENT 4 - All team pair combinations using JOIN
#
# Sample input: one single-column row per team (the trailing comma makes each
# entry a one-element tuple).
data = [
    ("RCB",),
    ("CSK",),
    ("MI",),
    ("PBKS",),
]
columns = ["team"]

df = spark.createDataFrame(data, schema=columns)
df.show()


+----+
|team|
+----+
| RCB|
| CSK|
|  MI|
|PBKS|
+----+



In [31]:

# Rank of each team in the desired listing order (lower rank is listed first).
team_order = {"RCB": 1, "CSK": 2, "MI": 3, "PBKS": 4}

# Build the rank column from team_order so the dictionary is the single source
# of truth (previously the same ranks were hard-coded a second time in a
# when() chain and the dictionary was never read). Teams missing from the
# dictionary get a NULL rank, exactly as with the hand-written chain.
rank_expr = None
for team_name, rank in team_order.items():
    is_team = col("team") == team_name
    rank_expr = when(is_team, rank) if rank_expr is None else rank_expr.when(is_team, rank)

df_ordered = df.withColumn("order", rank_expr)

# Two aliased views of the same frame for the self cross join.
# They are deliberately NOT bound to the names df1 / df2: those names hold the
# assignment 1 and assignment 2 inputs, and rebinding them here made the
# earlier cells fail when re-run after this one. The alias strings are
# unchanged, so the "df1.*" / "df2.*" column references below still resolve.
left_teams = df_ordered.alias("df1")
right_teams = df_ordered.alias("df2")
cross_df = left_teams.crossJoin(right_teams)

# Keep each unordered pair once, with the lower-ranked team first.
# The strict '<' already excludes a team paired with itself (equal rank), so a
# separate team != team filter is not needed. Sorting by both ranks before the
# projection pins the row order of the output instead of leaving it to the
# join's physical execution.
result_df = (
    cross_df
    .filter(col("df1.order") < col("df2.order"))
    .orderBy(col("df1.order"), col("df2.order"))
    .select(
        col("df1.team").alias("team1"),
        col("df2.team").alias("team2"),
    )
)

# Show result
result_df.show()


+-----+-----+
|team1|team2|
+-----+-----+
|  RCB|  CSK|
|  RCB|   MI|
|  RCB| PBKS|
|  CSK|   MI|
|  CSK| PBKS|
|   MI| PBKS|
+-----+-----+

