In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql import Window

In [2]:
# Local Spark session running 4 worker threads; the shuffle partition count
# is set to the same value (4) so each shuffle stage has one task per thread.
spark = (
    SparkSession.builder
    .master("local[4]")
    .config("spark.sql.shuffle.partitions", 4)
    .getOrCreate()
)

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/05/05 09:32:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# raceId -> season year lookup; joined onto the lap-time, standings and
# results tables so every metric can be grouped by season.
races = (
    spark.read.parquet("../../../data/parquetnopart/races.parquet")
    .select("raceId", "year")
)

In [4]:
# Window specifications shared by the cells below.
# NOTE(review): seasonWindow and driverRaceWindow are not referenced by any
# cell visible in this notebook; confirm before removing.
seasonWindow = Window.partitionBy("year")
driverRaceWindow = Window.partitionBy("driverId", "raceId")
# Same partitioning as driverRaceWindow (one partition per driver per race),
# ordered by lap so lead()/lag() move row by row through the laps.
# WindowSpec.orderBy returns a new spec; driverRaceWindow itself is unchanged.
raceDriverLapWindow = driverRaceWindow.orderBy("lap")

In [5]:
# Total positions gained lap-over-lap by all drivers in each season, with
# seasons ranked so the one with the most gained positions is rank 1.
overtakes = (
    spark.read.parquet("../../../data/parquetnopart/lap_times.parquet")
    .withColumn("position", F.col("position").cast(T.IntegerType()))
    .withColumn("lap", F.col("lap").cast(T.IntegerType()))
    .join(races, "raceId")
    # Position the same driver holds on the next lap of the same race;
    # null on the driver's last recorded lap.
    .withColumn("positionNextLap", F.lead("position", 1).over(raceDriverLapWindow))
    # A smaller position number on the next lap means places were gained.
    # The guard guarantees the difference is positive; no gain, or no next
    # lap (null comparison), counts as 0.
    .withColumn(
        "positionsGainedLap",
        F.when(
            F.col("positionNextLap") < F.col("position"),
            F.col("position") - F.col("positionNextLap"),
        ).otherwise(0),
    )
    .groupBy("year")
    .agg(F.sum("positionsGainedLap").alias("positionsGainedSeason"))
    .withColumn(
        "rankPositionsGained",
        F.rank().over(Window.orderBy(F.desc("positionsGainedSeason"))),
    )
)

In [6]:
# Number of distinct drivers with position 1 in driver_standings during each
# season, ranked so the season with the most distinct leaders gets rank 1.
# countDistinct gives an exact count. approx_count_distinct is a
# HyperLogLog++ estimate, and its error could swap seasons whose true counts
# are close, which corrupts the rank below. countDistinct already ignores
# repeated driverIds, so no separate dropDuplicates step is needed.
leadersTroughoutSeason = (
    spark.read.parquet("../../../data/parquetnopart/driver_standings.parquet")
    .join(races, "raceId")
    .where(F.col("position") == 1)
    .groupBy("year")
    .agg(F.countDistinct("driverId").alias("distinctLeaders"))
    .withColumn(
        "rankDistinctLeaders",
        F.rank().over(Window.orderBy(F.col("distinctLeaders").desc())),
    )
)

In [7]:
# Number of distinct drivers with position 1 in results (race winners) per
# season, ranked so the season with the most distinct winners gets rank 1.
# countDistinct gives an exact count. approx_count_distinct is a
# HyperLogLog++ estimate, and its error could swap seasons whose true counts
# are close, which corrupts the rank below. countDistinct already ignores
# repeated driverIds, so no separate dropDuplicates step is needed.
winnersTroughoutSeason = (
    spark.read.parquet("../../../data/parquetnopart/results.parquet")
    .join(races, "raceId")
    .where(F.col("position") == 1)
    .groupBy("year")
    .agg(F.countDistinct("driverId").alias("distinctWinners"))
    .withColumn(
        "rankDistinctWinners",
        F.rank().over(Window.orderBy(F.col("distinctWinners").desc())),
    )
)

In [8]:
def averageRank(cols):
    """Return the arithmetic mean of the non-null values in ``cols``.

    Meant to be wrapped as a Spark UDF over an array of rank columns. Null
    elements are skipped (like SQL ``AVG``). ``None`` is returned when
    ``cols`` is null, empty, or holds no non-null values. Spark turns that
    into a SQL null, so the task does not fail with ZeroDivisionError or
    TypeError.

    :param cols: iterable of numeric ranks, or ``None``.
    :returns: the mean as a ``float``, or ``None``.
    """
    if cols is None:
        return None
    values = [value for value in cols if value is not None]
    if not values:
        return None
    # True division: the result is always a float, so a UDF wrapping this
    # function needs a floating-point return type.
    return sum(values) / len(values)

In [9]:
# Wrap the plain-Python averageRank as a Spark UDF.
# The function uses true division, so it returns a Python float. The declared
# return type must match: PySpark yields null, rather than converting, when
# the Python value does not match the declared type. A float declared as
# IntegerType would make avgRank null for every row.
# getattr(..., "func", ...) unwraps an already-created UDF, so re-running
# this cell wraps the original Python function again instead of nesting UDFs.
averageRank = F.udf(getattr(averageRank, "func", averageRank), T.DoubleType())

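Usar la UDF definida arriba con un tipo de salida incorrecto da resultados erróneos. Una alternativa es sumar las columnas y dividir el resultado entre 3, aunque es menos flexible: si quisiera hacer la media del contenido de 6 columnas tendría que escribir la expresión a mano. Otra opción es castear la columna `avgRank` a un tipo numérico (por ejemplo `double`). El origen del problema es el tipo de salida de la UDF. Por defecto, `F.udf` devuelve `StringType`, de modo que `avgRank` se ordena como texto (por ejemplo, `"10.0"` queda antes que `"2.0"`). Por eso es importante establecer el tipo del output de la función, y ese tipo debe coincidir con el que devuelve realmente la función Python. `sum(cols) / len(cols)` devuelve un `float`, así que el tipo adecuado es `T.DoubleType()`. Si se declara `T.IntegerType()`, Spark devuelve `null` en lugar de convertir el valor.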

In [10]:
# Combine the three per-season rankings (positions gained, distinct leaders,
# distinct winners) and rank seasons by the mean of those ranks. Seasons that
# rank well on all three metrics get the lowest avgRank and so overallRank 1.
results = (
    overtakes
    .join(leadersTroughoutSeason, "year", "inner")
    .join(winnersTroughoutSeason, "year", "inner")
    .withColumn(
        "avgRank",
        averageRank(
            F.array(*[
                F.col(rankName)
                for rankName in (
                    "rankDistinctWinners",
                    "rankDistinctLeaders",
                    "rankPositionsGained",
                )
            ])
        ),
    )
    .withColumn("overallRank", F.rank().over(Window.orderBy("avgRank")))
    .sort("overallRank")
)

In [11]:
import time
def current_milli_time():
    """Return the current wall-clock time in whole milliseconds since the epoch."""
    return round(time.time() * 1000)


def run():
    """Execute the ``results`` query once and return its duration in whole milliseconds.

    ``collect()`` forces Spark to evaluate the whole lazy plan. The interval
    is measured with ``time.perf_counter`` (monotonic, high resolution)
    rather than wall-clock time, which can jump if the system clock is
    adjusted during a run.
    """
    start = time.perf_counter()
    results.collect()
    return round((time.perf_counter() - start) * 1000)


def average(l):
    """Return the arithmetic mean of the numbers in ``l``.

    :raises ZeroDivisionError: if ``l`` is empty.
    """
    return sum(l) / len(l)


def time_test(runs=25, warmup=0):
    """Return the mean duration in ms of ``runs`` timed executions of ``run()``.

    :param runs: number of timed executions (default 25).
    :param warmup: untimed executions performed first, e.g. to exclude
        first-run start-up costs from the average (default 0, i.e. every
        execution is timed).
    :raises ValueError: if ``runs`` is less than 1.
    """
    if runs < 1:
        raise ValueError("runs must be at least 1")
    for _ in range(warmup):
        results.collect()
    return average([run() for _ in range(runs)])


res = time_test()


22/05/05 09:33:01 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/05/05 09:33:01 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/05/05 09:33:01 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/05/05 09:33:01 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/05/05 09:33:01 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
22/05/05 09:33:02 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradat