In [0]:
customers_data = [
    (4001, 'mahendra', 45, 'Bangalore'),
    (4002, 'mahi',34, 'Nellore'),
    (4003, 'teja', 56, 'Chennai'),
    (4004, 'hari', 28, 'Bangalore'),
    (4005, 'ganesh', 43, 'Chennai'),
    (4006, 'yogi', 30, 'Hyderabad'),
    (4007, 'balu', 48, 'Chennai'),
    (4008, 'rafi', 36, 'Hyderabad'),
    (4009, 'kumari', 28, 'Bangalore')
]

transactions_data = [
    (200, 4001, 'debit', 1500.00, '2024-03-15'),
    (201, 4003, 'debit', 4500.00, '2024-03-12'),
    (202, 4005, 'credit', 2300.00, '2024-03-09'),
    (203, 4007, 'debit', 5400.00, '2024-03-15'),
    (204, 4002, 'credit', 9800.00, '2024-03-17'),
    (205, 4007, 'debit', 7400.00, '2024-03-21'),
    (206, 4006, 'credit', 5800.00, '2024-03-22'),
    (207, 4010, 'debit', 4560.00, '2024-03-28'),
    (208, 4006, 'credit', 4400.00, '2024-03-31')
]

In [0]:
spark

In [0]:
from pyspark.sql.types import *
sch_cust = StructType([
    StructField('cust_id', IntegerType()),
    StructField('name', StringType()),
    StructField('age', IntegerType()),
    StructField('location', StringType())
    ])

sch_transactions = StructType([
    StructField('id', IntegerType()),
    StructField('customer', IntegerType()),
    StructField('Transaction_type', StringType()),
    StructField('amount', FloatType()),
    StructField('transaction_date', StringType())
])


In [0]:
cust_df = spark.createDataFrame(customers_data, schema=sch_cust)
cust_df.show(truncate=False)

+-------+--------+---+---------+
|cust_id|name    |age|location |
+-------+--------+---+---------+
|4001   |mahendra|45 |Bangalore|
|4002   |mahi    |34 |Nellore  |
|4003   |teja    |56 |Chennai  |
|4004   |hari    |28 |Bangalore|
|4005   |ganesh  |43 |Chennai  |
|4006   |yogi    |30 |Hyderabad|
|4007   |balu    |48 |Chennai  |
|4008   |rafi    |36 |Hyderabad|
|4009   |kumari  |28 |Bangalore|
+-------+--------+---+---------+



In [0]:
transactions_df = spark.createDataFrame(transactions_data, schema=sch_transactions)
transactions_df.show()

+---+--------+----------------+------+----------------+
| id|customer|Transaction_type|amount|transaction_date|
+---+--------+----------------+------+----------------+
|200|    4001|           debit|1500.0|      2024-03-15|
|201|    4003|           debit|4500.0|      2024-03-12|
|202|    4005|          credit|2300.0|      2024-03-09|
|203|    4007|           debit|5400.0|      2024-03-15|
|204|    4002|          credit|9800.0|      2024-03-17|
|205|    4007|           debit|7400.0|      2024-03-21|
|206|    4006|          credit|5800.0|      2024-03-22|
|207|    4010|           debit|4560.0|      2024-03-28|
|208|    4006|          credit|4400.0|      2024-03-31|
+---+--------+----------------+------+----------------+



In [0]:
cust_transactions_df = cust_df.join(transactions_df, cust_df['cust_id']==transactions_df['customer'], how='inner')
cust_transactions_df.show()

+-------+--------+---+---------+---+--------+----------------+------+----------------+
|cust_id|    name|age| location| id|customer|Transaction_type|amount|transaction_date|
+-------+--------+---+---------+---+--------+----------------+------+----------------+
|   4001|mahendra| 45|Bangalore|200|    4001|           debit|1500.0|      2024-03-15|
|   4002|    mahi| 34|  Nellore|204|    4002|          credit|9800.0|      2024-03-17|
|   4003|    teja| 56|  Chennai|201|    4003|           debit|4500.0|      2024-03-12|
|   4005|  ganesh| 43|  Chennai|202|    4005|          credit|2300.0|      2024-03-09|
|   4006|    yogi| 30|Hyderabad|206|    4006|          credit|5800.0|      2024-03-22|
|   4006|    yogi| 30|Hyderabad|208|    4006|          credit|4400.0|      2024-03-31|
|   4007|    balu| 48|  Chennai|203|    4007|           debit|5400.0|      2024-03-15|
|   4007|    balu| 48|  Chennai|205|    4007|           debit|7400.0|      2024-03-21|
+-------+--------+---+---------+---+-------

In [0]:
from pyspark.sql.functions import *
res = cust_transactions_df.select('cust_id', 'location',col('id').alias('transaction_id'), 'transaction_type', 'amount')
res.show()

+-------+---------+--------------+----------------+------+
|cust_id| location|transaction_id|transaction_type|amount|
+-------+---------+--------------+----------------+------+
|   4001|Bangalore|           200|           debit|1500.0|
|   4002|  Nellore|           204|          credit|9800.0|
|   4003|  Chennai|           201|           debit|4500.0|
|   4005|  Chennai|           202|          credit|2300.0|
|   4006|Hyderabad|           206|          credit|5800.0|
|   4006|Hyderabad|           208|          credit|4400.0|
|   4007|  Chennai|           203|           debit|5400.0|
|   4007|  Chennai|           205|           debit|7400.0|
+-------+---------+--------------+----------------+------+



In [0]:
# Q1. Return the location where the lowest and highest number of transactions takes place. 

In [0]:

grouped_result = res.groupBy('location').agg(
    count('transaction_id').alias('total_number_of_transactions'))
grouped_result.show()

+---------+----------------------------+
| location|total_number_of_transactions|
+---------+----------------------------+
|Bangalore|                           1|
|  Chennai|                           4|
|  Nellore|                           1|
|Hyderabad|                           2|
+---------+----------------------------+



In [0]:
from pyspark.sql.window import Window
trans_ord1 = Window.orderBy(col('total_number_of_transactions'))
trans_ord2 = Window.orderBy(col('total_number_of_transactions').desc())
final = grouped_result.withColumn('drnk_lowest', dense_rank().over(trans_ord1))\
    .withColumn('drnk_highest', dense_rank().over(trans_ord2))
final.show()

+---------+----------------------------+-----------+------------+
| location|total_number_of_transactions|drnk_lowest|drnk_highest|
+---------+----------------------------+-----------+------------+
|  Chennai|                           4|          3|           1|
|Hyderabad|                           2|          2|           2|
|Bangalore|                           1|          1|           3|
|  Nellore|                           1|          1|           3|
+---------+----------------------------+-----------+------------+



In [0]:
final.select(col('location').alias('lowest_num_transactions')).where(col('drnk_lowest')==1).show()
final.select(col('location').alias('highest_num_transactions')).where(col('drnk_highest')==1).show()

+-----------------------+
|lowest_num_transactions|
+-----------------------+
|              Bangalore|
|                Nellore|
+-----------------------+

+------------------------+
|highest_num_transactions|
+------------------------+
|                 Chennai|
+------------------------+



In [0]:

-- Q2. Return the location where the highest difference between credit and debit transaction amount.
-- Q3. Return the customer who spent the highest amount.



In [0]:
cust_transactions_df.show()

+-------+--------+---+---------+---+--------+----------------+------+----------------+
|cust_id|    name|age| location| id|customer|Transaction_type|amount|transaction_date|
+-------+--------+---+---------+---+--------+----------------+------+----------------+
|   4001|mahendra| 45|Bangalore|200|    4001|           debit|1500.0|      2024-03-15|
|   4002|    mahi| 34|  Nellore|204|    4002|          credit|9800.0|      2024-03-17|
|   4003|    teja| 56|  Chennai|201|    4003|           debit|4500.0|      2024-03-12|
|   4005|  ganesh| 43|  Chennai|202|    4005|          credit|2300.0|      2024-03-09|
|   4006|    yogi| 30|Hyderabad|206|    4006|          credit|5800.0|      2024-03-22|
|   4006|    yogi| 30|Hyderabad|208|    4006|          credit|4400.0|      2024-03-31|
|   4007|    balu| 48|  Chennai|203|    4007|           debit|5400.0|      2024-03-15|
|   4007|    balu| 48|  Chennai|205|    4007|           debit|7400.0|      2024-03-21|
+-------+--------+---+---------+---+-------

In [0]:
result2 = cust_transactions_df.withColumn('credit', when(col('transaction_type') == 'credit', col('amount'))\
    .otherwise(0))\
    .withColumn('debit', when(col('transaction_type') == 'debit', col('amount'))\
        .otherwise(0))
result2.show()

+-------+--------+---+---------+---+--------+----------------+------+----------------+------+------+
|cust_id|    name|age| location| id|customer|Transaction_type|amount|transaction_date|credit| debit|
+-------+--------+---+---------+---+--------+----------------+------+----------------+------+------+
|   4001|mahendra| 45|Bangalore|200|    4001|           debit|1500.0|      2024-03-15|   0.0|1500.0|
|   4002|    mahi| 34|  Nellore|204|    4002|          credit|9800.0|      2024-03-17|9800.0|   0.0|
|   4003|    teja| 56|  Chennai|201|    4003|           debit|4500.0|      2024-03-12|   0.0|4500.0|
|   4005|  ganesh| 43|  Chennai|202|    4005|          credit|2300.0|      2024-03-09|2300.0|   0.0|
|   4006|    yogi| 30|Hyderabad|206|    4006|          credit|5800.0|      2024-03-22|5800.0|   0.0|
|   4006|    yogi| 30|Hyderabad|208|    4006|          credit|4400.0|      2024-03-31|4400.0|   0.0|
|   4007|    balu| 48|  Chennai|203|    4007|           debit|5400.0|      2024-03-15|   0.

In [0]:
credit_debit_diff = result2.groupBy('location').agg(
    sum('credit').alias('credit_amount'), 
    sum('debit').alias('debit_amount'))
credit_debit_diff.show()

+---------+-------------+------------+
| location|credit_amount|debit_amount|
+---------+-------------+------------+
|Bangalore|          0.0|      1500.0|
|  Chennai|       2300.0|     17300.0|
|  Nellore|       9800.0|         0.0|
|Hyderabad|      10200.0|         0.0|
+---------+-------------+------------+



In [0]:
diff = credit_debit_diff.select('location', (abs(col('credit_amount') - col('debit_amount')).alias('transaction_amount_difference')))
diff.show()

+---------+-----------------------------+
| location|transaction_amount_difference|
+---------+-----------------------------+
|Bangalore|                       1500.0|
|  Chennai|                      15000.0|
|  Nellore|                       9800.0|
|Hyderabad|                      10200.0|
+---------+-----------------------------+



In [0]:
diff.select('location').orderBy(col('transaction_amount_difference').desc()).show(n=1)

+--------+
|location|
+--------+
| Chennai|
+--------+
only showing top 1 row



In [0]:
result3 = result2.groupBy('cust_id', 'name')\
    .agg(sum('debit').alias('spending_amount'))
result3.show()

+-------+--------+---------------+
|cust_id|    name|spending_amount|
+-------+--------+---------------+
|   4001|mahendra|         1500.0|
|   4002|    mahi|            0.0|
|   4003|    teja|         4500.0|
|   4005|  ganesh|            0.0|
|   4006|    yogi|            0.0|
|   4007|    balu|        12800.0|
+-------+--------+---------------+



In [0]:
result3.orderBy(col('spending_amount').desc()).show(n=1)

+-------+----+---------------+
|cust_id|name|spending_amount|
+-------+----+---------------+
|   4007|balu|        12800.0|
+-------+----+---------------+
only showing top 1 row

