In [0]:

import pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql.window import Window


In [0]:

# Schema: every column is declared nullable; names and Spark types are listed
# as (name, type) pairs and expanded into StructFields.
schema = StructType([
    StructField(col_name, col_type, True)
    for col_name, col_type in (
        ("person_id", IntegerType()),
        ("person_name", StringType()),
        ("weight", IntegerType()),
        ("turn", IntegerType()),
    )
])

# Sample rows, one tuple per person: (person_id, person_name, weight, turn)
data = [
    (5, "Alice", 250, 1),
    (4, "Bob", 175, 5),
    (3, "Alex", 350, 2),
    (6, "John Cena", 400, 3),
    (1, "Winston", 500, 6),
    (2, "Marie", 200, 4),
]

# Build the DataFrame from the rows above using the explicit schema
df = spark.createDataFrame(data=data, schema=schema)

# Print the DataFrame contents
df.show()

+---------+-----------+------+----+
|person_id|person_name|weight|turn|
+---------+-----------+------+----+
|        5|      Alice|   250|   1|
|        4|        Bob|   175|   5|
|        3|       Alex|   350|   2|
|        6|  John Cena|   400|   3|
|        1|    Winston|   500|   6|
|        2|      Marie|   200|   4|
+---------+-----------+------+----+



In [0]:

# Register df as the temporary view `queue` so it can be queried from SQL cells.
df.createOrReplaceTempView(name="queue")


#### PySpark Approach

In [0]:

# Window for the running total: rows are accumulated in ascending `turn` order.
windowSpec1 = Window.orderBy(F.col('turn'))
# Window for ranking rows by their running total.
windowSpec2 = Window.orderBy(F.col('cumm_weight'))

# Steps:
#   1. cumm_weight = running sum of `weight` ordered by `turn`
#   2. rnk         = dense rank of each row by cumm_weight
#   3. keep rows whose running total is at most 1000
#   4. take the person_name of the highest-ranked remaining row
# The sort on `rnk` is done before the projection so the column is still
# present when it is referenced.
df_result = (
    df.withColumn('cumm_weight', F.sum(F.col('weight')).over(windowSpec1))
        .withColumn('rnk', F.dense_rank().over(windowSpec2))
        .filter(F.col('cumm_weight') <= 1000)
        .orderBy(F.col('rnk').desc())
        .select('person_name')
        .limit(1)
)

# show() returns None, so it is called separately to keep df_result a DataFrame.
df_result.show()



+-----------+
|person_name|
+-----------+
|  John Cena|
+-----------+



#### SQL Approach

In [0]:

%sql

-- running_total: every row of `queue` plus the running sum of weight,
--                accumulated in ascending turn order.
-- ranked:        the same rows with a dense rank over that running sum.
-- Final query:   person_name of the highest-ranked row whose running sum
--                is at most 1000.
WITH running_total AS (
    SELECT
        q.*,
        SUM(q.weight) OVER (ORDER BY q.turn) AS cumm_weight
    FROM queue AS q
),
ranked AS (
    SELECT
        rt.*,
        DENSE_RANK() OVER (ORDER BY rt.cumm_weight) AS rnk
    FROM running_total AS rt
)
SELECT ranked.person_name
FROM ranked
WHERE ranked.cumm_weight <= 1000
ORDER BY ranked.rnk DESC
LIMIT 1


person_name
John Cena
