In [1]:
# Base name (no extension) of the dataset under data/; the Parquet and CSV
# sections below each append their own extension to this value.
test_file = "1_cols_1_kb"

In [2]:
from pyspark.sql import SparkSession

# Reuse an already-running session if there is one, otherwise start a new one
# under the application name "LargeRowBenchMark".
spark = SparkSession.builder.appName("LargeRowBenchMark").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/09/26 23:22:22 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# Parquet

In [3]:
# Load the Parquet copy of the dataset; the schema (including the array
# column) comes from the Parquet file itself.
df = spark.read.parquet(f"data/{test_file}.parquet")
# Expose the frame to Spark SQL as view "t" so it can be queried with spark.sql.
df.createOrReplaceTempView("t")
# Bare expression: the notebook displays the DataFrame's column names and types.
df

                                                                                

DataFrame[uid: bigint, name: string, col_0: array<double>]

In [4]:
# Materialize every row of the Parquet-backed frame as a list of Row objects
# on the driver (full read of the wide array column).
df.collect()

[Row(uid=1, name='Alice', col_0=[0.28056457144557134, 0.22554365961658462, 0.11637891638498843, 0.702036635851732, 0.49840683542495756, 0.7210834144692044, 0.23532594567500076, 0.9936512245288306, 0.6854757676499107, 0.44010117712883146, 0.3766069023686488, 0.8842291681051566, 0.9821549039375922, 0.46884603714284456, 0.6603236438912433, 0.7750178607559738, 0.8623362097726647, 0.0615432689648866, 0.1322368247522746, 0.2630430395117067, 0.6122765664327072, 0.5508071555275719, 0.5922901960975323, 0.102441795624441, 0.018195757672059387, 0.7928628433443801, 0.3651587342503795, 0.7159430375195351, 0.8783717994546104, 0.7224962556950038, 0.4879294156911467, 0.7989088796034195, 0.3701765005454104, 0.8426026547427485, 0.5744292770752667, 0.16738673744628518, 0.40085925598303274, 0.28035441643099135, 0.4184745834806023, 0.9093972029599521, 0.6039381298131273, 0.060620600316838025, 0.3647035629652451, 0.47992289819706335, 0.5927191812741408, 0.4951931377988701, 0.5228435369050134, 0.134148032101

In [5]:
from pyspark.sql.functions import array_max

# Built-in array function: largest element of col_0 for each row.
df.select(array_max("col_0")).collect()

[Row(array_max(col_0)=0.9990944368573513)]

In [6]:
from pyspark.sql.functions import array_sort

# Built-in array function: col_0 sorted in ascending order, returned under
# the column name "sorted_col_0".
df.select(array_sort("col_0").alias('sorted_col_0')).collect()

[Row(sorted_col_0=[0.0036917736821856506, 0.01577494585998651, 0.018195757672059387, 0.03158961725710696, 0.03426128586905797, 0.03814310322929615, 0.048518012005965505, 0.060620600316838025, 0.0615432689648866, 0.07151878537687384, 0.0736609603036551, 0.08253997328300844, 0.102441795624441, 0.11401238652398116, 0.11586692346138128, 0.11637891638498843, 0.12047489481439255, 0.1322368247522746, 0.13414803210124715, 0.14180584430873244, 0.14518923525674265, 0.16105484327808262, 0.16604697578332817, 0.16738673744628518, 0.17736031822134213, 0.17788786745267837, 0.17987050502990165, 0.18028788045885302, 0.19437598350565088, 0.22554365961658462, 0.2305138588938913, 0.23433644915574248, 0.23532594567500076, 0.24291822481644587, 0.24374857806635963, 0.2630430395117067, 0.26317289396888166, 0.26843815159014495, 0.2719257923483087, 0.28035441643099135, 0.28056457144557134, 0.28444588060550635, 0.311149008121275, 0.3647035629652451, 0.3651587342503795, 0.3701765005454104, 0.3766069023686488, 0.3

In [7]:
import pyspark.sql.functions as F

# Higher-order function: fold col_0 starting from 0.0 and adding each element,
# i.e. the sum of the array, returned as "total".
df.select(F.aggregate("col_0", F.lit(0.0), lambda acc, x: acc + x).alias("total")).collect()

[Row(total=63.71964782105779)]

In [8]:
from pyspark.sql.types import DoubleType

def get_second_max(arr):
    """Return the second-largest element of ``arr``.

    Duplicates count as separate elements, so ``[3.0, 3.0, 1.0]`` yields
    ``3.0``. Null (``None``) elements are ignored, matching how Spark's
    built-in ``array_max`` treats nulls.

    Args:
        arr: A list of numbers (a Spark array column value), or ``None``.

    Returns:
        The second-largest non-null element, or ``None`` when ``arr`` is
        ``None`` or has fewer than two non-null elements.
    """
    # Imported locally so the function stays self-contained when it is
    # shipped to Spark workers as a UDF.
    import heapq

    if arr is None:
        return None
    # Spark delivers null array elements as None; comparing None with floats
    # would raise TypeError, so drop them before ranking.
    values = [v for v in arr if v is not None]
    if len(values) < 2:
        return None
    # nlargest(2, ...) equals sorted(values, reverse=True)[:2] without sorting
    # the whole array.
    return heapq.nlargest(2, values)[1]

# Make the Python function callable from Spark SQL under the name
# "get_second_max", declaring its result type as double.
spark.udf.register("get_second_max", get_second_max, DoubleType())
# Run the Python UDF over col_0 of the temp view "t".
spark.sql("select get_second_max(col_0) from t").collect()

                                                                                

[Row(get_second_max(col_0)=0.9936512245288306)]

# CSV

In [9]:
# Load the CSV copy of the dataset, taking column names from the first line.
# No schema is supplied, so every column (including col_0) is read as a string.
# NOTE: this rebinds `df`, replacing the Parquet-backed frame from the section above.
df = spark.read.csv(f"data/{test_file}.csv", header=True)
# Bare expression: the notebook displays the DataFrame's column names and types.
df

DataFrame[uid: string, name: string, col_0: string]

In [10]:
# convert col_0 to an array of doubles
# (presumably the CSV stores the array as JSON text such as "[0.1, 0.2]" — the
# string is parsed with from_json against an array<double> schema)
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import ArrayType, DoubleType

df = df.withColumn("col_0", from_json(col("col_0"), ArrayType(DoubleType())))
# Re-point the temp view "t" at the CSV-backed frame so SQL queries on "t"
# now read from the CSV data instead of the Parquet data.
df.createOrReplaceTempView("t")
# Bare expression: the notebook displays the updated column types.
df

DataFrame[uid: string, name: string, col_0: array<double>]

In [11]:
# Materialize every row of the CSV-backed frame (after the col_0 conversion)
# as a list of Row objects on the driver.
df.collect()

[Row(uid='1', name='Alice', col_0=[0.28056457144557134, 0.22554365961658462, 0.11637891638498843, 0.702036635851732, 0.49840683542495756, 0.7210834144692044, 0.23532594567500076, 0.9936512245288306, 0.6854757676499107, 0.44010117712883146, 0.3766069023686488, 0.8842291681051566, 0.9821549039375922, 0.46884603714284456, 0.6603236438912433, 0.7750178607559738, 0.8623362097726647, 0.0615432689648866, 0.1322368247522746, 0.2630430395117067, 0.6122765664327072, 0.5508071555275719, 0.5922901960975323, 0.102441795624441, 0.018195757672059387, 0.7928628433443801, 0.3651587342503795, 0.7159430375195351, 0.8783717994546104, 0.7224962556950038, 0.4879294156911467, 0.7989088796034195, 0.3701765005454104, 0.8426026547427485, 0.5744292770752667, 0.16738673744628518, 0.40085925598303274, 0.28035441643099135, 0.4184745834806023, 0.9093972029599521, 0.6039381298131273, 0.060620600316838025, 0.3647035629652451, 0.47992289819706335, 0.5927191812741408, 0.4951931377988701, 0.5228435369050134, 0.1341480321

In [12]:
# Same array_max query as in the Parquet section, now on the CSV-backed frame
# (array_max was imported in an earlier cell).
df.select(array_max("col_0")).collect()

[Row(array_max(col_0)=0.9990944368573513)]

In [13]:
# Same array_sort query as in the Parquet section, now on the CSV-backed frame
# (array_sort was imported in an earlier cell).
df.select(array_sort("col_0").alias('sorted_col_0')).collect()

[Row(sorted_col_0=[0.0036917736821856506, 0.01577494585998651, 0.018195757672059387, 0.03158961725710696, 0.03426128586905797, 0.03814310322929615, 0.048518012005965505, 0.060620600316838025, 0.0615432689648866, 0.07151878537687384, 0.0736609603036551, 0.08253997328300844, 0.102441795624441, 0.11401238652398116, 0.11586692346138128, 0.11637891638498843, 0.12047489481439255, 0.1322368247522746, 0.13414803210124715, 0.14180584430873244, 0.14518923525674265, 0.16105484327808262, 0.16604697578332817, 0.16738673744628518, 0.17736031822134213, 0.17788786745267837, 0.17987050502990165, 0.18028788045885302, 0.19437598350565088, 0.22554365961658462, 0.2305138588938913, 0.23433644915574248, 0.23532594567500076, 0.24291822481644587, 0.24374857806635963, 0.2630430395117067, 0.26317289396888166, 0.26843815159014495, 0.2719257923483087, 0.28035441643099135, 0.28056457144557134, 0.28444588060550635, 0.311149008121275, 0.3647035629652451, 0.3651587342503795, 0.3701765005454104, 0.3766069023686488, 0.3

In [14]:
# Same sum-by-fold query as in the Parquet section, now on the CSV-backed frame
# (F was imported in an earlier cell).
df.select(F.aggregate("col_0", F.lit(0.0), lambda acc, x: acc + x).alias("total")).collect()

[Row(total=63.71964782105779)]

In [15]:
# Run the get_second_max UDF (registered in an earlier cell) over the temp
# view "t", which at this point refers to the CSV-backed frame.
spark.sql("select get_second_max(col_0) from t").collect()

[Row(get_second_max(col_0)=0.9936512245288306)]

# Stop Spark

In [16]:
# Shut down the Spark session created at the top of the notebook.
spark.stop()