In [2]:
from pyspark.sql import SparkSession
import pandas as pd
from datetime import date, datetime
# Reuse the active Spark session if one exists; otherwise start a new one.
spark = SparkSession.builder.getOrCreate()
from pyspark.sql import functions
# Load the transactions CSV: first row is the header, column types are inferred.
# Forward slashes are used so the relative path resolves on Windows as well as
# Linux/macOS (the previous backslash-separated path only worked on Windows).
df = spark.read.csv('data/joins/user_transactions.csv', header=True, inferSchema=True)

In [3]:
# Preview the loaded transactions (first 20 rows, long cell values truncated).
df.show(n=20, truncate=True)

+--------------+-------+------+---------+----------------+
|transaction_id|user_id|amount| location|transaction_time|
+--------------+-------+------+---------+----------------+
|           201|      1|   500|Hyderabad|10-03-2025 12:00|
|           202|      1|   700|Hyderabad|10-03-2025 12:04|
|           203|      2|   200|Hyderabad|10-03-2025 14:00|
|           204|      2|   250|  Chennai|10-03-2025 14:01|
|           205|      3|  1000| Banglore|10-03-2025 15:30|
|           206|      3|  1500|Hyderabad|10-03-2025 15:34|
+--------------+-------+------+---------+----------------+



In [4]:
from pyspark.sql import functions as f
# Grand total of `amount` across every row: an aggregation over an empty
# grouping collapses the whole DataFrame into a single row.
df.groupBy().agg(f.sum('amount')).show()

+-----------+
|sum(amount)|
+-----------+
|       4150|
+-----------+



In [5]:
# Grand total of `amount` over the whole table (single-row result).
df.groupBy().agg(f.sum('amount')).show()
# Per-user total: one output row per distinct user_id.
df.groupBy('user_id').sum('amount').show()

+-----------+
|sum(amount)|
+-----------+
|       4150|
+-----------+

+-------+-----------+
|user_id|sum(amount)|
+-------+-----------+
|      1|       1200|
|      3|       2500|
|      2|        450|
+-------+-----------+



In [10]:
from pyspark.sql.window import Window
# One column expression reused for all three views of the total.
amount_total = f.sum('amount')

# 1) Grand total: the whole table collapses to one row.
df.groupBy().agg(amount_total).show()

# 2) Per-user total: one row per user_id.
df.groupBy('user_id').agg(amount_total).show()

# 3) Per-user total as a window function: every original row is kept and the
#    total of its user_id partition is attached as an extra column.
w_spec = Window.partitionBy('user_id')
df.withColumn('uer_sum_amount', amount_total.over(w_spec)).show()

+-----------+
|sum(amount)|
+-----------+
|       4150|
+-----------+

+-------+-----------+
|user_id|sum(amount)|
+-------+-----------+
|      1|       1200|
|      3|       2500|
|      2|        450|
+-------+-----------+

+--------------+-------+------+---------+----------------+--------------+
|transaction_id|user_id|amount| location|transaction_time|uer_sum_amount|
+--------------+-------+------+---------+----------------+--------------+
|           201|      1|   500|Hyderabad|10-03-2025 12:00|          1200|
|           202|      1|   700|Hyderabad|10-03-2025 12:04|          1200|
|           203|      2|   200|Hyderabad|10-03-2025 14:00|           450|
|           204|      2|   250|  Chennai|10-03-2025 14:01|           450|
|           205|      3|  1000| Banglore|10-03-2025 15:30|          2500|
|           206|      3|  1500|Hyderabad|10-03-2025 15:34|          2500|
+--------------+-------+------+---------+----------------+--------------+



In [11]:
from pyspark.sql.window import Window
# List the attributes exposed by the Window class (partitionBy, orderBy,
# rowsBetween, rangeBetween and the frame-boundary constants among them).
_window_attrs = dir(Window)
print(_window_attrs)

['_FOLLOWING_THRESHOLD', '_JAVA_MAX_LONG', '_JAVA_MIN_LONG', '_PRECEDING_THRESHOLD', '__annotations__', '__class__', '__delattr__', '__dict__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__getstate__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', 'currentRow', 'orderBy', 'partitionBy', 'rangeBetween', 'rowsBetween', 'unboundedFollowing', 'unboundedPreceding']


In [12]:
from pyspark.sql.window import Window
# Running (cumulative) aggregates over the whole table.
#
# A ROWS frame that ends at the current row is only well defined when the rows
# have an explicit order; without orderBy the result depends on whatever
# physical order Spark happens to read the rows in. Ordering by transaction_id
# (which looks unique in the data shown -- confirm against the full file)
# makes the running values reproducible.
w_spec = (
    Window
    .orderBy('transaction_id')
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)
# Running total of amount up to and including the current row.
df.withColumn('uer_sum_amount', f.sum('amount').over(w_spec)).show()
# NOTE(review): the next two columns keep the name 'uer_sum_amount' but hold
# the running max and running min respectively -- consider renaming.
df.withColumn('uer_sum_amount', f.max('amount').over(w_spec)).show()
df.withColumn('uer_sum_amount', f.min('amount').over(w_spec)).show()



+--------------+-------+------+---------+----------------+--------------+
|transaction_id|user_id|amount| location|transaction_time|uer_sum_amount|
+--------------+-------+------+---------+----------------+--------------+
|           201|      1|   500|Hyderabad|10-03-2025 12:00|           500|
|           202|      1|   700|Hyderabad|10-03-2025 12:04|          1200|
|           203|      2|   200|Hyderabad|10-03-2025 14:00|          1400|
|           204|      2|   250|  Chennai|10-03-2025 14:01|          1650|
|           205|      3|  1000| Banglore|10-03-2025 15:30|          2650|
|           206|      3|  1500|Hyderabad|10-03-2025 15:34|          4150|
+--------------+-------+------+---------+----------------+--------------+

+--------------+-------+------+---------+----------------+--------------+
|transaction_id|user_id|amount| location|transaction_time|uer_sum_amount|
+--------------+-------+------+---------+----------------+--------------+
|           201|      1|   500|Hydera

In [13]:
from pyspark.sql.window import Window
# Frame covering every row of the (unpartitioned) table, so each row receives
# the same table-wide aggregate value.
w_spec = Window.rowsBetween(
    Window.unboundedPreceding,
    Window.unboundedFollowing,
)
# Show the table-wide sum, max and min in turn; the output column keeps the
# same name for all three aggregates.
for agg_fn in (f.sum, f.max, f.min):
    df.withColumn('uer_sum_amount', agg_fn('amount').over(w_spec)).show()



+--------------+-------+------+---------+----------------+--------------+
|transaction_id|user_id|amount| location|transaction_time|uer_sum_amount|
+--------------+-------+------+---------+----------------+--------------+
|           201|      1|   500|Hyderabad|10-03-2025 12:00|          4150|
|           202|      1|   700|Hyderabad|10-03-2025 12:04|          4150|
|           203|      2|   200|Hyderabad|10-03-2025 14:00|          4150|
|           204|      2|   250|  Chennai|10-03-2025 14:01|          4150|
|           205|      3|  1000| Banglore|10-03-2025 15:30|          4150|
|           206|      3|  1500|Hyderabad|10-03-2025 15:34|          4150|
+--------------+-------+------+---------+----------------+--------------+

+--------------+-------+------+---------+----------------+--------------+
|transaction_id|user_id|amount| location|transaction_time|uer_sum_amount|
+--------------+-------+------+---------+----------------+--------------+
|           201|      1|   500|Hydera

In [19]:
from pyspark.sql.window import Window
# Reload the CSV from disk; `df` is rebound below with an extra rank column.
# Forward slashes keep the relative path usable on Windows and on Linux/macOS
# (the previous backslash-separated path only worked on Windows).
df = spark.read.csv('data/joins/user_transactions.csv', header=True, inferSchema=True)
# Global ranking window: no partitioning, largest amount first.
w_spec = Window.orderBy(f.col('amount').desc())
# rank() leaves gaps after ties (1, 1, 3, ...); dense_rank() does not (1, 1, 2, ...).
df = df.withColumn('amount_rank', f.rank().over(w_spec))
df.withColumn('amount_denserank', f.dense_rank().over(w_spec)).show()

+--------------+-------+------+---------+----------------+-----------+----------------+
|transaction_id|user_id|amount| location|transaction_time|amount_rank|amount_denserank|
+--------------+-------+------+---------+----------------+-----------+----------------+
|           206|      3|  1500|Hyderabad|10-03-2025 15:34|          1|               1|
|           208|      3|  1500|Hyderabad|10-03-2025 15:34|          1|               1|
|           205|      3|  1000| Banglore|10-03-2025 15:30|          3|               2|
|           207|      3|  1000| Banglore|10-03-2025 15:30|          3|               2|
|           202|      1|   700|Hyderabad|10-03-2025 12:04|          5|               3|
|           210|      1|   700|Hyderabad|10-03-2025 12:04|          5|               3|
|           201|      1|   500|Hyderabad|10-03-2025 12:00|          7|               4|
|           209|      1|   500|Hyderabad|10-03-2025 12:00|          7|               4|
|           204|      2|   250| 