In [None]:

import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.window import Window
spark = SparkSession.builder.master("local[*]").appName("Manna").getOrCreate()
print('PySpark Version :'+spark.version)
tradebook=spark.read.csv("data/tradebook.csv",header=True)
tradebook = tradebook.withColumn('symbol', F.regexp_replace('symbol', '-BE', ''))
tradebook.show()

PySpark Version :3.5.6
+----------+------------+----------+--------+-------+------+----------+-------+----------+-----------+--------+----------------+--------------------+
|    symbol|        isin|trade_date|exchange|segment|series|trade_type|auction|  quantity|      price|trade_id|        order_id|order_execution_time|
+----------+------------+----------+--------+-------+------+----------+-------+----------+-----------+--------+----------------+--------------------+
|JUNIORBEES|INF732E01045|2024-04-01|     NSE|     EQ|    EQ|      sell|  false|  2.000000| 652.290000|41795167|1200000009438445| 2024-04-01T09:57:47|
|LIQUIDBEES|INF732E01037|2024-04-01|     NSE|     EQ|    EQ|      sell|  false| 33.000000| 999.990000|41795164|1200000009438440| 2024-04-01T09:57:47|
| NIFTYBEES|INF204KB14I2|2024-04-01|     NSE|     EQ|    EQ|       buy|  false| 43.000000| 249.050000|41796155|1200000009443221| 2024-04-01T09:57:49|
|LIQUIDCASE|INF0R8F01034|2024-04-01|     NSE|     EQ|    EQ|       buy|  fals

In [41]:
taxpnl_shortterm = spark.read.csv('data/taxPnL-shortTerm.csv',header=True,sep='\t')
taxpnl_shortterm.show()

+----------+--------+----------+----------+------------+
|    Symbol|Quantity| Buy Value|Sell Value|Realized P&L|
+----------+--------+----------+----------+------------+
|GODFRYPHLP|       1|    5640.1|   4914.85|     -725.25|
|JUNIORBEES|       1|    411.64|    652.29|      240.65|
| INDIAMART|      12|   29984.7|     28195|     -1789.7|
|     BSOFT|      52|   36914.8|     36543|      -371.8|
|       OIL|       3|   1854.75|    1735.5|     -119.25|
|       BSE|       2|   7379.35|    8838.4|     1459.05|
|SUDARSCHEM|      28|   28355.5|  26940.65|    -1414.85|
| SCHNEIDER|      38|   31278.9|   31047.2|      -231.7|
|       ACE|       9|   14435.1|   11890.5|     -2544.6|
|    GESHIP|      20|  22278.95|   22998.6|      719.65|
| BAJAJHIND|    1205|  45215.81|   40463.9|    -4751.91|
|      STAR|      24|29937.5498|   26219.6|  -3717.9498|
|KARURVYSYA|      34|    6913.9|   7657.14|      743.24|
|      PGEL|      41|  26299.45|  25169.05|     -1130.4|
|      NTPC|     112|  36068.45

In [42]:
taxpnl_longterm = spark.read.csv('data/taxPnL-longTerm.csv', sep='\t',header=True,)
taxpnl_longterm.show()

+----------+--------+---------+----------+------------+
|    Symbol|Quantity|Buy Value|Sell Value|Realized P&L|
+----------+--------+---------+----------+------------+
|JUNIORBEES|       3|   1269.2|   2292.29|     1023.09|
|       BSE|      10|  24650.5|  45744.75|    21094.25|
|LIQUIDBEES|   8.391|8023.0202| 8390.9161|    367.8959|
| TATASTEEL|      42|  5067.42|    7635.6|     2568.18|
|  INDHOTEL|      15|     5142|  10773.25|     5631.25|
|TATACONSUM|       3|   2428.5|    3357.3|       928.8|
|  TATACHEM|       7|     6947|    6552.5|      -394.5|
|     TITAN|       1|  2536.95|   3022.75|       485.8|
|       TCS|       1|  3807.95|   4480.85|       672.9|
| NIFTYBEES|      16|  3267.46|    4606.4|     1338.94|
| TATAPOWER|      20|  5125.85|      9000|     3874.15|
|TATAMOTORS|      14|  8715.45|   13736.8|     5021.35|
+----------+--------+---------+----------+------------+



In [43]:
tradebook_grp=tradebook.where(F.col('trade_type') == 'sell')\
    .withColumn(
        'date_group',
        F.when(F.col('trade_date') <= '2024-07-23', "Before 23-July-2024")
         .otherwise("After 23-July-2024")
    )\
    .withColumn('trade_value', F.col('price') * F.col('quantity'))\
    .groupBy('symbol','date_group')\
    .agg(
        F.count('symbol').alias('count'),
        F.max(F.col('trade_date')).alias('last_trade_date'),
        F.sum(F.col('quantity')).alias('total_quantity'),
        F.sum(F.round(F.col('trade_value'), 2)).alias('total_trade_value')
    )\
    .orderBy('symbol', 'date_group')
tradebook_grp.show(n=100, truncate=False)

+----------+-------------------+-----+---------------+--------------+------------------+
|symbol    |date_group         |count|last_trade_date|total_quantity|total_trade_value |
+----------+-------------------+-----+---------------+--------------+------------------+
|ACE       |After 23-July-2024 |2    |2024-12-02     |9.0           |11890.5           |
|AUROPHARMA|Before 23-July-2024|4    |2024-05-02     |10.0          |11542.5           |
|BAJAJHIND |After 23-July-2024 |3    |2024-11-04     |1205.0        |40463.899999999994|
|BANKBARODA|After 23-July-2024 |1    |2025-03-03     |135.0         |26291.25          |
|BASF      |After 23-July-2024 |1    |2024-12-02     |11.0          |62509.15          |
|BSE       |After 23-July-2024 |3    |2025-03-03     |12.0          |54583.15          |
|BSOFT     |Before 23-July-2024|6    |2024-07-01     |52.0          |36543.0           |
|CHOLAHLDNG|After 23-July-2024 |4    |2025-01-28     |12.0          |16479.35          |
|COCHINSHIP|After 23-

In [44]:
short_term_dated = taxpnl_shortterm.join(tradebook_grp, \
        on=(taxpnl_shortterm['symbol']==tradebook_grp['symbol']) \
            &(taxpnl_shortterm['Sell Value']==F.round(tradebook_grp['total_trade_value'], 2)) \
        ,how='left') \
    .select(taxpnl_shortterm['*'], tradebook_grp['last_trade_date'].alias('trade_date'), tradebook_grp['date_group'].alias('date_group'))\
    .filter(F.col('trade_date').isNotNull())

short_term_non_dated = taxpnl_shortterm.join(tradebook_grp, \
        on=(taxpnl_shortterm['symbol']==tradebook_grp['symbol']) \
            &(taxpnl_shortterm['Sell Value']==F.round(tradebook_grp['total_trade_value'], 2)) \
        ,how='leftanti')

In [45]:
short_term_dated.show(n=100, truncate=False)

+----------+--------+----------+----------+------------+----------+-------------------+
|Symbol    |Quantity|Buy Value |Sell Value|Realized P&L|trade_date|date_group         |
+----------+--------+----------+----------+------------+----------+-------------------+
|ACE       |9       |14435.1   |11890.5   |-2544.6     |2024-12-02|After 23-July-2024 |
|AUROPHARMA|10      |9886.95   |11542.5   |1655.55     |2024-05-02|Before 23-July-2024|
|BAJAJHIND |1205    |45215.81  |40463.9   |-4751.91    |2024-11-04|After 23-July-2024 |
|BANKBARODA|135     |28003.05  |26291.25  |-1711.8     |2025-03-03|After 23-July-2024 |
|BASF      |11      |88410.5   |62509.15  |-25901.35   |2024-12-02|After 23-July-2024 |
|BSOFT     |52      |36914.8   |36543     |-371.8      |2024-07-01|Before 23-July-2024|
|CHOLAHLDNG|12      |21399.6   |16479.35  |-4920.25    |2025-01-28|After 23-July-2024 |
|CREDITACC |11      |17551.9   |13354     |-4197.9     |2024-08-27|After 23-July-2024 |
|DEEPAKFERT|24      |24141.6   |

In [46]:
# short_term_dated.show(n=100, truncate=False)
short_term_non_dated.show(n=100, truncate=False)

+----------+--------+---------+----------+------------+
|Symbol    |Quantity|Buy Value|Sell Value|Realized P&L|
+----------+--------+---------+----------+------------+
|JUNIORBEES|1       |411.64   |652.29    |240.65      |
|BSE       |2       |7379.35  |8838.4    |1459.05     |
|LIQUIDBEES|24.609  |23693.01 |24608.7539|915.7439    |
|MOTILALOFS|35      |24183.25 |22613.35  |-1569.9     |
|TATACHEM  |2       |2050     |1620      |-430        |
|NIFTYBEES |55      |13469.86 |15338.68  |1868.82     |
|LIQUIDCASE|1367    |139160   |141582.91 |2422.91     |
|TATAMOTORS|28      |20779.35 |27485.15  |6705.8      |
|COCHINSHIP|59      |53452.65 |107575.5  |54122.85    |
|GET&D     |19      |26335.9  |31603.85  |5267.95     |
+----------+--------+---------+----------+------------+



In [47]:
tradebook_grouped = tradebook_grp.groupBy('symbol')\
    .agg(
        F.max(F.col('last_trade_date')).alias('last_trade_date'),
        F.min(F.col('last_trade_date')).alias('past_trade_date'),
        F.sum(F.col('total_quantity')).alias('total_quantity'),
        F.sum(F.round(F.col('total_trade_value'), 2)).alias('total_trade_value')
    )\
    .orderBy('symbol')
tradebook_grouped.show(n=100, truncate=False)

+----------+---------------+---------------+--------------+------------------+
|symbol    |last_trade_date|past_trade_date|total_quantity|total_trade_value |
+----------+---------------+---------------+--------------+------------------+
|ACE       |2024-12-02     |2024-12-02     |9.0           |11890.5           |
|AUROPHARMA|2024-05-02     |2024-05-02     |10.0          |11542.5           |
|BAJAJHIND |2024-11-04     |2024-11-04     |1205.0        |40463.9           |
|BANKBARODA|2025-03-03     |2025-03-03     |135.0         |26291.25          |
|BASF      |2024-12-02     |2024-12-02     |11.0          |62509.15          |
|BSE       |2025-03-03     |2025-03-03     |12.0          |54583.15          |
|BSOFT     |2024-07-01     |2024-07-01     |52.0          |36543.0           |
|CHOLAHLDNG|2025-01-28     |2025-01-28     |12.0          |16479.35          |
|COCHINSHIP|2024-09-13     |2024-07-01     |59.0          |107575.5          |
|CREDITACC |2024-08-27     |2024-08-27     |11.0    

In [48]:
short_term_non_dated_df = short_term_non_dated.join(tradebook_grouped, \
        on=(short_term_non_dated['symbol']==tradebook_grouped['symbol']) \
            # &(short_term_non_dated['Sell Value']==F.round(tradebook_grouped['total_trade_value'], 2)) \
        ,how='left')\
        .withColumn('trade_date', F.coalesce('past_trade_date','last_trade_date'))\
        .withColumn('date_group', F.when(F.col('trade_date') <= '2024-07-23', "Before 23-July-2024")\
            .otherwise("After 23-July-2024"))\
        .select(short_term_non_dated['*'],'trade_date','date_group')

In [49]:
short_term_non_dated_df.show(n=100, truncate=False)

+----------+--------+---------+----------+------------+----------+-------------------+
|Symbol    |Quantity|Buy Value|Sell Value|Realized P&L|trade_date|date_group         |
+----------+--------+---------+----------+------------+----------+-------------------+
|JUNIORBEES|1       |411.64   |652.29    |240.65      |2024-04-01|Before 23-July-2024|
|BSE       |2       |7379.35  |8838.4    |1459.05     |2025-03-03|After 23-July-2024 |
|LIQUIDBEES|24.609  |23693.01 |24608.7539|915.7439    |2024-04-01|Before 23-July-2024|
|MOTILALOFS|35      |24183.25 |22613.35  |-1569.9     |2024-07-01|Before 23-July-2024|
|TATACHEM  |2       |2050     |1620      |-430        |2024-06-14|Before 23-July-2024|
|NIFTYBEES |55      |13469.86 |15338.68  |1868.82     |2025-01-03|After 23-July-2024 |
|LIQUIDCASE|1367    |139160   |141582.91 |2422.91     |2024-07-02|Before 23-July-2024|
|TATAMOTORS|28      |20779.35 |27485.15  |6705.8      |2024-06-14|Before 23-July-2024|
|COCHINSHIP|59      |53452.65 |107575.5  |5

In [51]:
tradebook_grouped_updated = tradebook_grouped.join(short_term_non_dated_df, \
        on=(short_term_non_dated['symbol']==tradebook_grouped['symbol']) \
        ,how='left')\
        .withColumn('updated_quantity',F.coalesce((tradebook_grouped['total_quantity'] - short_term_non_dated['Quantity']),tradebook_grouped['total_quantity']))\
        .withColumn('updated_value', F.coalesce((tradebook_grouped['total_trade_value'] - short_term_non_dated['Sell Value']),tradebook_grouped['total_trade_value']))\
        .select(tradebook_grouped['symbol'],tradebook_grouped['last_trade_date'],tradebook_grouped['past_trade_date'], F.col('updated_quantity').alias('total_quantity'), F.col('updated_value').alias('total_trade_value'))
tradebook_grouped_updated.show(n=100, truncate=False)

+----------+---------------+---------------+-----------------+------------------+
|symbol    |last_trade_date|past_trade_date|total_quantity   |total_trade_value |
+----------+---------------+---------------+-----------------+------------------+
|ACE       |2024-12-02     |2024-12-02     |9.0              |11890.5           |
|AUROPHARMA|2024-05-02     |2024-05-02     |10.0             |11542.5           |
|BAJAJHIND |2024-11-04     |2024-11-04     |1205.0           |40463.9           |
|BANKBARODA|2025-03-03     |2025-03-03     |135.0            |26291.25          |
|BASF      |2024-12-02     |2024-12-02     |11.0             |62509.15          |
|BSE       |2025-03-03     |2025-03-03     |10.0             |45744.75          |
|BSOFT     |2024-07-01     |2024-07-01     |52.0             |36543.0           |
|CHOLAHLDNG|2025-01-28     |2025-01-28     |12.0             |16479.35          |
|COCHINSHIP|2024-09-13     |2024-07-01     |0.0              |0.0               |
|CREDITACC |2024

In [None]:
# Here we need to subtract the short-term trades from tradebook_grp to get the long-term trades
taxpnl_longterm_df  = taxpnl_longterm.join(tradebook_grouped_updated, \
        on=(taxpnl_longterm['symbol']==tradebook_grouped_updated['symbol']) \
            &(F.round(taxpnl_longterm['Sell Value'], 2)==F.round(tradebook_grouped_updated['total_trade_value'], 2)) \
        ,how='left') \
    .withColumn('date_group', F.when(F.col('last_trade_date') <= '2024-07-23', "Before 23-July-2024")\
            .otherwise("After 23-July-2024"))\
    .withColumn('trade_date', F.coalesce('last_trade_date','past_trade_date'))\
    .select(taxpnl_longterm['*'], F.col('trade_date'), F.col('date_group'))