# 🔍 Tasks/Questions to Solve (Project Style)
## Net Position Summary

For each customer and security, calculate total quantity bought and sold, and their net position.

Identify whether the customer still holds that stock or has exited.
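As a plain-Python cross-check of the net-position logic (not the PySpark implementation used in the cells further down), the sketch below sums BUY and SELL quantities per customer and security and labels each position. The rows are the first six complete rows printed by `transaction_raw_df.show()`; the `HOLDING` and `EXITED` labels and the function name are illustrative assumptions.

```python
from collections import defaultdict

# First six complete rows shown by transaction_raw_df.show();
# tuple layout: (customer_id, security_name, transaction_type, quantity)
rows = [
    ("C001", "INFY", "BUY", 10),
    ("C001", "INFY", "SELL", 10),
    ("C001", "TCS", "BUY", 5),
    ("C001", "TCS", "SELL", 5),
    ("C002", "HDFC", "BUY", 8),
    ("C002", "HDFC", "SELL", 4),
]


def net_positions(rows):
    """Return {(customer, security): (bought, sold, net, status)}."""
    totals = defaultdict(lambda: {"BUY": 0, "SELL": 0})
    for customer, security, side, qty in rows:
        totals[(customer, security)][side] += qty

    result = {}
    for key, t in totals.items():
        net = t["BUY"] - t["SELL"]
        # Assumed labels: a positive net quantity means the stock is still held
        status = "HOLDING" if net > 0 else "EXITED"
        result[key] = (t["BUY"], t["SELL"], net, status)
    return result


positions = net_positions(rows)
for key, value in sorted(positions.items()):
    print(key, value)
```

Because only six rows are used here, C002 appears to still hold HDFC; the aggregated output later in the notebook shows that position fully sold once all rows are included.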

## Top Performing Securities by Profit

Calculate realized profit per transaction (only for SELL).

Identify top 3 most profitable securities per customer using row_number or dense_rank.
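Realized profit depends on a cost-basis convention that the task does not specify. The plain-Python sketch below assumes an average-cost basis: each SELL is valued against the customer's average BUY price for that security, computed over all BUY rows regardless of date order. The function names are hypothetical, and the rows are the first six complete rows from the `show()` output.

```python
from collections import defaultdict

# (customer_id, security_name, transaction_type, quantity, price)
rows = [
    ("C001", "INFY", "BUY", 10, 1500),
    ("C001", "INFY", "SELL", 10, 1600),
    ("C001", "TCS", "BUY", 5, 3200),
    ("C001", "TCS", "SELL", 5, 3100),
    ("C002", "HDFC", "BUY", 8, 2400),
    ("C002", "HDFC", "SELL", 4, 2500),
]


def realized_profit(rows):
    """Realized profit per (customer, security), assuming average-cost basis."""
    cost = defaultdict(int)  # total amount paid on BUY rows
    qty = defaultdict(int)   # total quantity bought
    for customer, security, side, q, price in rows:
        if side == "BUY":
            cost[(customer, security)] += q * price
            qty[(customer, security)] += q

    profit = defaultdict(float)
    for customer, security, side, q, price in rows:
        key = (customer, security)
        if side == "SELL" and qty[key] > 0:
            avg_cost = cost[key] / qty[key]
            profit[key] += q * (price - avg_cost)
    return dict(profit)


def top_n_per_customer(profit, n=3):
    """Keep the n most profitable securities for each customer."""
    by_customer = defaultdict(list)
    for (customer, security), value in profit.items():
        by_customer[customer].append((security, value))
    return {
        customer: sorted(items, key=lambda item: item[1], reverse=True)[:n]
        for customer, items in by_customer.items()
    }


profit = realized_profit(rows)
top3 = top_n_per_customer(profit)
print(top3)
```

Multiplying by quantity is the step that matters here: a difference of unit prices alone would rank a 1-share trade and a 100-share trade identically.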

## Broker Usage Analysis

Count how many different brokers each customer used.

Find the most frequently used broker per customer.
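Both broker questions can be cross-checked with a small plain-Python sketch. It counts transactions per customer and broker, then reports the number of distinct brokers and the most used one. Ties are broken alphabetically by broker name, which is an assumption made here so the result is deterministic; the function name is hypothetical.

```python
from collections import Counter, defaultdict

# (customer_id, broker) for the first six complete rows of the show() output
rows = [
    ("C001", "Zerodha"),
    ("C001", "Zerodha"),
    ("C001", "Groww"),
    ("C001", "Groww"),
    ("C002", "Upstox"),
    ("C002", "Zerodha"),
]


def broker_usage(rows):
    """Return {customer: (distinct_brokers, top_broker, top_broker_txn_count)}."""
    counts = defaultdict(Counter)
    for customer, broker in rows:
        counts[customer][broker] += 1

    usage = {}
    for customer, per_broker in counts.items():
        # Highest count first; alphabetical broker name breaks ties (assumption)
        top_broker, top_count = min(
            per_broker.items(), key=lambda kv: (-kv[1], kv[0])
        )
        usage[customer] = (len(per_broker), top_broker, top_count)
    return usage


usage = broker_usage(rows)
print(usage)
```

In these six rows both customers have tied brokers, so the tie-break rule decides the answer. A ranking function such as row_number behaves the same way in Spark: without a secondary sort key, the winner among tied brokers is arbitrary.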

## Investment Timeline

Find the first and last transaction date per customer.

Calculate the average time between successive transactions.

## High Frequency Traders

Find customers with more than 3 transactions in any 30-day window using window + datediff.
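One way to reason about the 30-day condition is a sliding window over each customer's sorted transaction dates. The plain-Python sketch below is a cross-check under stated assumptions, not the PySpark window + datediff solution the task asks for: a window is taken to cover dates less than 30 days apart, "more than 3" is expressed as `min_txns=4`, and customer `CX` is a hypothetical example that is not in transactions.csv.

```python
from collections import defaultdict
from datetime import date


def high_frequency_customers(rows, min_txns=4, window_days=30):
    """Return customers with at least min_txns transactions inside any
    span of fewer than window_days days."""
    dates_by_customer = defaultdict(list)
    for customer, txn_date in rows:
        dates_by_customer[customer].append(txn_date)

    flagged = set()
    for customer, dates in dates_by_customer.items():
        dates.sort()
        start = 0
        for end in range(len(dates)):
            # Advance the left edge until the window is shorter than window_days
            while (dates[end] - dates[start]).days >= window_days:
                start += 1
            if end - start + 1 >= min_txns:
                flagged.add(customer)
                break
    return flagged


# (customer_id, transaction date) for the first six complete rows of show()
sample_rows = [
    ("C001", date(2023, 1, 15)),
    ("C001", date(2023, 3, 10)),
    ("C001", date(2023, 1, 20)),
    ("C001", date(2023, 5, 1)),
    ("C002", date(2023, 1, 1)),
    ("C002", date(2023, 2, 15)),
]

# Hypothetical customer, not in transactions.csv, to show a positive case
hypothetical_rows = [
    ("CX", date(2023, 1, 1)),
    ("CX", date(2023, 1, 5)),
    ("CX", date(2023, 1, 10)),
    ("CX", date(2023, 1, 20)),
]

print(high_frequency_customers(sample_rows))
print(high_frequency_customers(sample_rows + hypothetical_rows))
```

None of the customers in the six sample rows qualifies; only the hypothetical `CX` is flagged.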

## Running Total of Investment Amount

Compute the running total of BUY amount (quantity × price) per customer over time using sum over Window.partitionBy().orderBy().
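The expected running totals can be worked out by hand with a short plain-Python sketch, which is useful for checking the Spark result. It assumes the BUY amount of a row is quantity × price; the function name is hypothetical, and the rows are the BUY rows among the first six complete rows of the `show()` output.

```python
from collections import defaultdict
from datetime import date
from itertools import accumulate

# BUY rows only: (customer_id, transaction date, quantity, price)
buys = [
    ("C001", date(2023, 1, 15), 10, 1500),
    ("C001", date(2023, 1, 20), 5, 3200),
    ("C002", date(2023, 1, 1), 8, 2400),
]


def running_buy_totals(buys):
    """Return {customer: [(date, cumulative BUY amount), ...]} in date order."""
    by_customer = defaultdict(list)
    for customer, txn_date, qty, price in buys:
        by_customer[customer].append((txn_date, qty * price))

    result = {}
    for customer, items in by_customer.items():
        items.sort()  # order by date, like Window.orderBy on the date column
        totals = accumulate(amount for _, amount in items)
        result[customer] = [(d, total) for (d, _), total in zip(items, totals)]
    return result


running = running_buy_totals(buys)
print(running)
```

For C001 the totals are 15000 after the INFY purchase and 31000 after the TCS purchase.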

## Price Trend per Security

Calculate the average and max price of each security over time.

## Security Holding Duration

For each complete BUY → SELL pair (same quantity, security, customer), compute holding period in days.
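The pairing rule can be sketched in plain Python as a first-in, first-out match. Each SELL is paired with the earliest unmatched BUY that has the same customer, security, and quantity. FIFO matching and the function name are assumptions made for illustration; the rows are the first six complete rows of the `show()` output.

```python
from collections import defaultdict, deque
from datetime import date

# (customer_id, security_name, transaction_type, quantity, transaction date)
rows = [
    ("C001", "INFY", "BUY", 10, date(2023, 1, 15)),
    ("C001", "INFY", "SELL", 10, date(2023, 3, 10)),
    ("C001", "TCS", "BUY", 5, date(2023, 1, 20)),
    ("C001", "TCS", "SELL", 5, date(2023, 5, 1)),
    ("C002", "HDFC", "BUY", 8, date(2023, 1, 1)),
    ("C002", "HDFC", "SELL", 4, date(2023, 2, 15)),
]


def holding_periods(rows):
    """Return [(customer, security, quantity, holding_days)] for matched pairs."""
    open_buys = defaultdict(deque)  # unmatched BUY dates per (customer, security, qty)
    pairs = []
    for customer, security, side, qty, txn_date in sorted(rows, key=lambda r: r[4]):
        key = (customer, security, qty)
        if side == "BUY":
            open_buys[key].append(txn_date)
        elif side == "SELL" and open_buys[key]:
            buy_date = open_buys[key].popleft()  # earliest unmatched BUY
            pairs.append((customer, security, qty, (txn_date - buy_date).days))
    return pairs


pairs = holding_periods(rows)
print(pairs)
```

The HDFC rows produce no pair because a BUY of 8 and a SELL of 4 do not have the same quantity; handling partial sells would need a different rule, such as splitting lots.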

In [1]:
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.window import *
import os

In [3]:
# Manually set SPARK_HOME (fixing the incorrect path issue)
os.environ["SPARK_HOME"] = "C:\\spark\\spark-3.3.2-bin-hadoop2"

In [4]:
spark = SparkSession.builder\
        .appName('project_1')\
        .getOrCreate()

In [5]:
transaction_schema = StructType([
    StructField("transaction_id",StringType(),True),
    StructField("customer_id",StringType(),True),
    StructField("security_name",StringType(),True),
    StructField("transaction_type",StringType(),True),
    StructField("quantity",IntegerType(),True),
    StructField("price",IntegerType(),True),
    StructField("broker",StringType(),True),
    StructField("transacstion_date",DateType(),True)
])

In [6]:
transaction_raw_df = spark.read.csv("transactions.csv",header=True,schema=transaction_schema)

In [7]:
transaction_raw_df.show()

+--------------+-----------+-------------+----------------+--------+-----+--------+-----------------+
|transaction_id|customer_id|security_name|transaction_type|quantity|price|  broker|transacstion_date|
+--------------+-----------+-------------+----------------+--------+-----+--------+-----------------+
|            T1|       C001|         INFY|             BUY|      10| 1500| Zerodha|       2023-01-15|
|            T2|       C001|         INFY|            SELL|      10| 1600| Zerodha|       2023-03-10|
|            T3|       C001|          TCS|             BUY|       5| 3200|   Groww|       2023-01-20|
|            T4|       C001|          TCS|            SELL|       5| 3100|   Groww|       2023-05-01|
|            T5|       C002|         HDFC|             BUY|       8| 2400|  Upstox|       2023-01-01|
|            T6|       C002|         HDFC|            SELL|       4| 2500| Zerodha|       2023-02-15|
|            T7|       C002|         HDFC|            SELL|       4| 2550|  Upstox

In [None]:
### Net Position Summary
### For each customer and security, calculate total quantity bought and sold, and their net position.

In [8]:
buy_transaction_df = transaction_raw_df.filter(col("transaction_type")=='BUY')

In [9]:
buy_transaction_df = buy_transaction_df.withColumnRenamed("price","buy_price")
buy_transaction_df = buy_transaction_df.withColumnRenamed("quantity","buy_quantity")

In [10]:
sell_transaction_df = transaction_raw_df.filter(col("transaction_type")=='SELL')

In [11]:
sell_transaction_df = sell_transaction_df.withColumnRenamed("price","sell_price")
sell_transaction_df = sell_transaction_df.withColumnRenamed("quantity","sell_quantity")

In [19]:
agg_buy_transaction_df = buy_transaction_df.groupBy('customer_id','transaction_type','security_name').agg(
    sum(col("buy_quantity")).alias('buy_quantity'),
    sum("buy_price").alias('buy_price')
)

In [21]:
agg_sell_transaction_df = sell_transaction_df.groupBy('customer_id','transaction_type','security_name').agg(
    sum(col("sell_quantity")).alias('sell_quantity'),
    sum('sell_price').alias('sell_price')
)

In [23]:
buy_sell_txn_df = agg_buy_transaction_df.join(agg_sell_transaction_df,how='left',on=['customer_id','security_name'])

In [25]:
buy_sell_fillna = buy_sell_txn_df.fillna(0)

In [119]:
net_quantity = buy_sell_fillna.withColumn("net_quantity",(col("buy_quantity") - col("sell_quantity")))

In [27]:
## Top Performing Securities by Profit

#Calculate realized profit per transaction (only for SELL).

#Identify top 3 most profitable securities per customer using row_number or dense_rank.

In [31]:
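# Caveat: buy_price and sell_price are per-group sums of unit prices, so this difference ignores quantity;
# realized profit would need sum(quantity * price) on each side.
net_profit_df = buy_sell_txn_df.withColumn("net_profit",(col("sell_price")-col("buy_price")))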

In [33]:
window = Window.partitionBy('customer_id').orderBy(col('net_profit').desc())

In [35]:
net_profit_rnk_df = net_profit_df.withColumn('rnk',row_number().over(window))

In [37]:
top_3_security_by_cust = net_profit_rnk_df.filter(col("rnk")<=3)

In [38]:
## Broker Usage Analysis
## Count how many different brokers each customer used.
## Most frequently used broker per customer.

In [44]:
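# Transactions per customer and broker; the alias gives the count column a usable name
cust_brk_cnt = transaction_raw_df.groupBy("customer_id","broker").agg(count('broker').alias('txn_count'))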

In [47]:
cust_broker_df = cust_brk_cnt.groupby('customer_id').agg(count('broker').alias('no_of_brokers'))

In [48]:
# Investment Timeline
# Find the first and last transaction date per customer.

# Calculate the average time between successive transactions.

In [53]:
cust_first_last_txn_df = transaction_raw_df.groupBy('customer_id').agg(min(col('transacstion_date')).alias('first_transaction_date'),max(col('transacstion_date')).alias('last_transaction_date'))

In [54]:
window1 = Window.partitionBy(col('customer_id')).orderBy(col('transacstion_date').asc())

In [59]:
prev_txn_date_df = transaction_raw_df.withColumn('previous_transaction_date',lag("transacstion_date").over(window1))

In [62]:
date_diff_df = prev_txn_date_df.withColumn('date_difference',datediff(col("transacstion_date"),col("previous_transaction_date")))

In [65]:
drop_null_date_diff_df = date_diff_df.filter(col("date_difference").isNotNull())

In [70]:
avg_diff_df = drop_null_date_diff_df.groupBy('customer_id').agg(round(avg(col("date_difference")),2).alias("average_time_between_successive_transaction"))

In [None]:
## Price Trend per Security

# Calculate the average and max price of each security over time.

In [77]:
avg_price_df = transaction_raw_df.groupBy('security_name').agg(max(col("price")).alias('max_price'),round(avg(col("price")),2).alias("average_price"))

In [79]:
## Security Holding Duration

# For each complete BUY → SELL pair (same quantity, security, customer), compute holding period in days.

In [81]:
buy_transaction_df_1 = buy_transaction_df.withColumnRenamed("transacstion_date","buy_date")
sell_transaction_df_1 = sell_transaction_df.withColumnRenamed("transacstion_date","sell_date")

In [89]:
buy_transactions = buy_transaction_df_1.groupBy('customer_id', 'security_name', 'transaction_type').agg(
    sum(col('buy_quantity')).alias('buy_quantity'),
    sum(col('buy_price')).alias('buy_price'),
    max(col('buy_date')).alias('buy_date')
)

In [95]:
sell_transactions = sell_transaction_df_1.groupBy('customer_id', 'security_name', 'transaction_type').agg(
    sum(col('sell_quantity')).alias('sell_quantity'),
    sum(col('sell_price')).alias('sell_price'),
    max(col('sell_date')).alias('sell_date')
)

In [98]:
merge_buy_sell_df = buy_transactions.join(sell_transactions,how='inner',on=['customer_id','security_name'])

In [102]:
txn_holding_days_df = merge_buy_sell_df.withColumn("holding_period_days",(datediff(col("sell_date"),col("buy_date"))))

In [None]:
## High Frequency Traders

# Find customers with more than 3 transactions in any 30-day window using window + datediff.

# Running Total of Investment Amount

# Running total of BUY amount per customer over time using sum over Window.partitionBy().orderBy().

In [115]:
window2 = Window.partitionBy('customer_id').orderBy('transacstion_date')

In [118]:
# BUY amount is quantity * price, accumulated per customer in date order
running_total_amount_df = buy_transaction_df.withColumn("running_total",sum(col('buy_quantity')*col('buy_price')).over(window2))

In [122]:
net_quantity.show()

+-----------+-------------+----------------+------------+---------+----------------+-------------+----------+------------+
|customer_id|security_name|transaction_type|buy_quantity|buy_price|transaction_type|sell_quantity|sell_price|net_quantity|
+-----------+-------------+----------------+------------+---------+----------------+-------------+----------+------------+
|       C002|         HDFC|             BUY|           8|     2400|            SELL|            8|      5050|           0|
|       C003|          TCS|             BUY|          10|     3100|            SELL|           10|      3050|           0|
|       C001|         INFY|             BUY|          10|     1500|            SELL|           10|      1600|           0|
|       C001|          TCS|             BUY|           5|     3200|            SELL|            5|      3100|           0|
|       C002|         INFY|             BUY|           8|     2980|            SELL|            8|      1550|           0|
|       C003|   

In [121]:
merge_agg_bu_sell_df = agg_buy_transaction_df.join(agg_sell_transaction_df,how='left',on=['customer_id','security_name'])

+-----------+-------------+----------------+------------+---------+----------------+-------------+----------+
|customer_id|security_name|transaction_type|buy_quantity|buy_price|transaction_type|sell_quantity|sell_price|
+-----------+-------------+----------------+------------+---------+----------------+-------------+----------+
|       C002|         HDFC|             BUY|           8|     2400|            SELL|            8|      5050|
|       C003|          TCS|             BUY|          10|     3100|            SELL|           10|      3050|
|       C001|         INFY|             BUY|          10|     1500|            SELL|           10|      1600|
|       C001|          TCS|             BUY|           5|     3200|            SELL|            5|      3100|
|       C002|         INFY|             BUY|           8|     2980|            SELL|            8|      1550|
|       C003|     RELIANCE|             BUY|          15|     4250|            SELL|           15|      2300|
+---------

In [None]:
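# Positions still held; filter on col("net_quantity") == 0 instead to list exited positions
net_quantity.filter(col("net_quantity") > 0)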