In [1]:
#Import necessary modules
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import col, when, lag, round, sqrt, pow, sum as sum_func, max as max_func, lit, min as min_func, abs, avg, count
from pyspark.sql.types import StructType, StructField, LongType, StringType, BooleanType, DoubleType, ByteType, ShortType
from pyspark.sql.functions import monotonically_increasing_id
import pandas as pd 
import matplotlib.pyplot as plt
import seaborn as sns
import os

In [None]:
#Create spark session
spark = SparkSession.builder \
    .appName("Steve") \
    .config("spark.driver.memory", "16g") \
    .getOrCreate()

In [None]:
#Select only desired columns and apply schema
schema = StructType([
    StructField("gameId", LongType(), True),
    StructField("playId", LongType(), True),
    StructField("nflId", DoubleType(), True),
    StructField("frameId", LongType(), True),
    StructField("x", DoubleType(), True),
    StructField("y", DoubleType(), True),
    StructField("frameType", StringType(), True)
])

#Read all the tracking data (which should have same schema) into one dataframe
folder = "data/tracking/"
files = [os.path.join(folder, f) for f in os.listdir(folder) if f.endswith(".parquet")]
trackingDf = spark.read.schema(schema).parquet(*files)

#Filter and select tracking data just before the snap - to determine if there is any pre-snap motion
snapEventsDf = trackingDf.filter(col("frameType") == "SNAP").groupby("gameId", "playId").agg(min_func("frameId").alias("snapEvent"))

#Join with full data to get last second before snap, using the last 10 frames (update rate of 0.1 seconds)
presnapDf = trackingDf.join(snapEventsDf, ["gameId", "playId"]).filter(col("frameId") < col("snapEvent")).filter(col("frameId") >= (col("snapEvent") - 10))

In [4]:
schema = StructType([
    StructField("gameId", LongType(), True),
    StructField("playId", LongType(), True),
    StructField("yardlineNumber", LongType(), True),
    StructField("expectedPointsAdded", DoubleType(), True),
    StructField("offenseFormation", StringType(), True), 
    StructField("receiverAlignment", StringType(), True), 
    StructField("pff_passCoverage", StringType(), True),
    StructField("pff_manZone", StringType(), True),
    StructField("pff_runConceptPrimary", StringType(), True)
])

playsDf = spark.read.schema(schema).parquet("data/plays.parquet")

In [5]:
#playsDf.describe().show()

+-------+--------------------+------------------+------------------+--------------------+----------------+-----------------+----------------+-----------+---------------------+
|summary|              gameId|            playId|    yardlineNumber| expectedPointsAdded|offenseFormation|receiverAlignment|pff_passCoverage|pff_manZone|pff_runConceptPrimary|
+-------+--------------------+------------------+------------------+--------------------+----------------+-----------------+----------------+-----------+---------------------+
|  count|               16124|             16124|             16124|               16124|           15936|            15936|           15932|      15932|                 9071|
|   mean|2.0220989036912057E9|2023.8305631356984|29.226184569585712|-0.02938121470672...|            NULL|             NULL|            NULL|       NULL|                 NULL|
| stddev|   5979.251755132589|1182.0907707451506|12.662827976390345|   1.397405113935573|            NULL|             NULL|            NULL|       NULL|                 NULL|
|    min|          2022090800|                54|                 1|   -13.0236002178863|           EMPTY|              1x0|           2-Man|        Man|              COUNTER|
|    max|          2022110700|              5120|                50|     8.6989859752357|         WILDCAT|              4x2|        Red Zone|       Zone|            UNDEFINED|
+-------+--------------------+------------------+------------------+--------------------+----------------+-----------------+----------------+-----------+---------------------+

In [6]:
#playsDf.select("expectedPointsAdded").describe().show()

+-------+--------------------+
|summary| expectedPointsAdded|
+-------+--------------------+
|  count|               16124|
|   mean|-0.02938121470672...|
| stddev|   1.397405113935573|
|    min|   -13.0236002178863|
|    max|     8.6989859752357|
+-------+--------------------+

In [7]:
#THIS IS JUST FOR ONE TIME VIEWING - not included in full workflow
def analyse_numerical(df):
    df = playsDf.toPandas()
    df = df.drop(['gameId', 'playId'], axis=1)
    df.hist(bins=20, figsize=(7, 5))
    plt.tight_layout()
    plt.show()


In [8]:
#THIS IS JUST FOR ONE TIME VIEWING - not included in full workflow
def analyse_text(df):
    textCols = df.select_dtypes(include = ['object', 'category']).columns

    for columnName in textCols:
        column = df[columnName].fillna('missing')
        plt.figure(figsize = (10,10))
        sns.countplot(data=df, x=columnName, order=df[columnName].value_counts().index[:15])
        plt.xticks(rotation = 90)
        plt.title(f"{columnName} Distribution")
        plt.tight_layout()
        plt.grid(True)
        plt.show()


In [9]:
#THIS IS JUST FOR ONE TIME VIEWING - not included in full workflow
def correlation(df):
    encodedDf = pd.get_dummies(df, drop_first=True)
    correlations = encodedDf.corr()
    plt.figure(figsize = (10, 10))
    sns.heatmap(correlations, cmap='coolwarm')
    plt.title('Correlation Matrix - One Hot Encoded')
    plt.show()

In [10]:
#Read in columns of interest from players data source 
schema = StructType([
    StructField("nflId", LongType(), True), 
    StructField("position", StringType(), True)
])

#Read in players datasource and apply schema
playersDf = spark.read.schema(schema).parquet("data/players.parquet")

In [11]:
#Add positions column, through a join
presnapDf = presnapDf.join(playersDf.select("nflId", "position"), on="nflId", how="left")

#Get rid of playersDf from cache as it is obsolete
playersDf.unpersist()

#Position classifications for later usage
offensivePositions = ["QB", "RB", "FB", "HB", "WR", "TE", "LT", "LG", "C", "RG", "RT"]
defensivePositions = ["CB", "S", "FS", "SS", "MLB", "OLB", "ILB", "LB", "DT", "DE", "NT", "DB"]

#Create a classifier column for if the player is on offence or defence
presnapDf = presnapDf.withColumn("isOffence",when(col("position").isin(*offensivePositions), True).when(col("position").isin(*defensivePositions), False).otherwise(lit(None)).cast(BooleanType()))

#--------------------------------------------------Safety feature engineering---------------------------------------------------
#Select only the safeties
safetyDf = presnapDf.filter(col("position").isin(["SS", "FS"]))

safetyDf.drop("frameId", "frameType", "snapEvent", "isOffence")

#Get the yardline of the play
safetyDf = safetyDf.join(playsDf.select("gameId", "playId", "yardlineNumber"), on = ["gameId", "playId"], how = "left")

#Determine which direction the offence is facing
safetyDf = safetyDf.withColumn("play_direction", when(col("x") < col("yardlineNumber"), "right").otherwise("left"))

#Fix in the line of scrimmage, using the play direction, this is so that "line_of_scrimmage" is in the same units as "x"
safetyDf = safetyDf.withColumn("line_of_scrimage",when(col("play_direction") == "left", 100 - col("yardlineNumber")).otherwise(col("yardlineNumber")))

#Determine safety depth
safetyDf = safetyDf.withColumn("distance", abs(col("x") - col("line_of_scrimage")))

#Select only the last frame, this will have the distance a moment before the snap
frameGroupByDf = safetyDf.groupBy("nflId", "gameId", "playId").agg(max_func("frameId").alias("frameId"))

#Add the new calculated columns
safetyDf = safetyDf.join(frameGroupByDf, on = ["nflId","gameId", "playId", "frameId"], how = "inner")

#Create the two features by averaging the safety depth and creating a count of safeties, (the new calculated columns)
safetyDf = safetyDf.groupBy("gameId", "playId").agg(avg("distance").alias("avgSafetyDistance"), count("distance").alias("numSafeties"))


In [12]:
#--------------------------------------------------------Pre snap motion engineering---------------------------------------------------

#Drop redundant columns
presnapDf = presnapDf.drop("frameId", "frameType", "snapEvent", "position")

#Formalise the sequence of the dataframe through an id to be used in a partition windw
presnapDf = presnapDf.withColumn("sequenceId", monotonically_increasing_id())
sparkWindow = Window.partitionBy("gameId", "playId", "nflId").orderBy("sequenceId")

#Over the window (per player per play) determine their position compared to the last data point
presnapDf = presnapDf.withColumn("prevX", round(lag("x").over(sparkWindow), 2))
presnapDf = presnapDf.withColumn("prevY", round(lag("y").over(sparkWindow), 2))

#Calculate cumulative distance
presnapDf = presnapDf.withColumn("cumulativeDistance", round(when(col("prevX").isNull() | col("prevY").isNull(), 0.0).otherwise(sqrt(pow(col("x") - col("prevX"), 2) + pow(col("y") - col("prevY"), 2))),5))

#Sum all the movement values - this gives total distance moved in a second
playerMotion = presnapDf.groupBy("gameId", "playId", "nflId", "isOffence").agg(sum_func("cumulativeDistance").alias("distanceMoved"))

presnapDf.unpersist()

#Using a threshold of 2 yards in the 1 second time frame, determine if the player was in motion
playerMotion = playerMotion.withColumn("motion", col("distanceMoved") > 2)

#Determine if any player on each side was in motion
playMotion = playerMotion.groupBy("gameId", "playId", "isOffence").agg(max_func("motion").alias("isMotion"))

#Create final result
playMotion = playMotion.select("gameId", "playId", "isOffence", "isMotion")

#Move each play onto one row for an easier join with the rest of the data
playMotion = playMotion.withColumn("isOffenceMoving", when(col("isOffence") == True, col("isMotion")))\
                   .withColumn("isDefenceMoving", when(col("isOffence") == False, col("isMotion")))\
                   .groupBy("gameId", "playId")\
                   .agg(max_func("isOffenceMoving").alias("isOffenceMoving"),
                        max_func("isDefenceMoving").alias("isDefenceMoving"))

In [13]:
#Entire Dataframe ready for ML workflow, join all feature tables together
mergedDf = playsDf.join(playMotion, ["gameId", "playId"], "inner").join(safetyDf, ["gameId", "playId"], "inner")
mergedDf = mergedDf.withColumn("numSafeties", mergedDf["numSafeties"].cast(ByteType()))
mergedDf = mergedDf.withColumn("yardlineNumber", mergedDf["yardlineNumber"].cast(ShortType()))
mergedDf.cache()
playMotion.unpersist()
trackingDf.unpersist()

DataFrame[gameId: bigint, playId: bigint, nflId: double, frameId: bigint, x: double, y: double, frameType: string]

In [14]:
def down_sample(mergedDf, fraction):
    '''
    This function is used to down sample the majoirty class when needed
    inputs: 
        mergedDf (pySpark DataFrame) - This is the processed data ready to be split 
        fraction (float) - The fraction that the user wants to keep (0.8 = keeps 80%)
    outputs:
        mergedDf (pySpark DataFrame) - Now down sampled Dataframe
    '''
    #Grab all the data points within the majority range (-1 to 1)
    majorityDf = mergedDf.filter((col("expectedPointsAdded") >= -1) & (col("expectedPointsAdded") <=1))

    #Select a fraction of them
    majorityDf = majorityDf.sample(fraction = fraction, seed = 1)

    #Select everything that ISNT in the majority range
    mergedDf = mergedDf.filter((col("expectedPointsAdded") < -1) | (col("expectedPointsAdded") > 1))

    #Union join to bring the two together
    mergedDf = majorityDf.union(mergedDf)

    return mergedDf

In [15]:
#Split the data 
trainDf, testDf = mergedDf.randomSplit([0.8, 0.2], seed = 1)
trainDf.cache()
testDf.cache()

DataFrame[gameId: bigint, playId: bigint, yardlineNumber: smallint, expectedPointsAdded: double, offenseFormation: string, receiverAlignment: string, pff_passCoverage: string, pff_manZone: string, pff_runConceptPrimary: string, isOffenceMoving: boolean, isDefenceMoving: boolean, avgSafetyDistance: double, numSafeties: tinyint]

In [16]:
from pyspark.ml import Pipeline
#Import pipeline features
from pyspark.ml.feature import OneHotEncoder, StandardScaler, StringIndexer, VectorAssembler
#Import evaluator object
from pyspark.ml.evaluation import RegressionEvaluator
#Import potential models
from pyspark.ml.regression import LinearRegression, GBTRegressor, RandomForestRegressor


target = "expectedPointsAdded"
numFeatures = ["gameId", "playId", "yardlineNumber", "avgSafetyDistance", "numSafeties"]
boolFeatures = ["isOffenceMoving", "isDefenceMoving"]
textFeatures = ["offenseFormation", "receiverAlignment", "pff_passCoverage", "pff_manZone", "pff_runConceptPrimary"]

#Text handler, pySpark's onehotencoder only accepts numerical input (hence StringIndexer)
#Indexer maps strings to integers
indexer = [StringIndexer(inputCol=columnName, outputCol=columnName + "idx", handleInvalid="keep") for columnName in textFeatures]
#Encoder maps integer to vector
encoder = [OneHotEncoder(inputCol=columnName + "idx", outputCol=columnName + "vector") for columnName in textFeatures]

assembler = VectorAssembler(inputCols= numFeatures + boolFeatures + [columnName + "vector" for columnName in textFeatures], outputCol="features")
scaler = StandardScaler(inputCol= "features", outputCol="features-Scaled")

potentialModels = {"LinearRegression": LinearRegression(featuresCol="features-Scaled", labelCol=target),
                   "GBT Regressor": GBTRegressor(featuresCol="features-Scaled", labelCol=target),
                   "Random Forest" : RandomForestRegressor(featuresCol="features-Scaled", labelCol=target)
                   }

evalObj = RegressionEvaluator(metricName="rmse", labelCol=target, predictionCol="prediction")

colsToDrop = numFeatures + boolFeatures + textFeatures + [columnName + "idx" for columnName in textFeatures] + [columnName + "vector" for columnName in textFeatures]


#for modelName, model in potentialModels.items():
#   print(f"Commencing evaluation on {modelName} !")
#   pipeline = Pipeline(stages = indexer + encoder + [assembler, scaler, model])
#   fullPipelineObj = pipeline.fit(trainDf)
#
#   predictions = fullPipelineObj.transform(testDf)
    
#   predictions.drop(*colsToDrop)
    
#   rmse = evalObj.evaluate(predictions)

#   print(f"{modelName} = {rmse}")


Commencing evaluation on LinearRegression !
LinearRegression = 1.4334905085709757
Commencing evaluation on GBT Regressor !
GBT Regressor = 1.4419406537003188
Commencing evaluation on Random Forest !
Random Forest = 1.4300720209132907

for 90% sampling 
Commencing evaluation on LinearRegression !
LinearRegression = 1.433266075915844
Commencing evaluation on GBT Regressor !
GBT Regressor = 1.442627337619911
Commencing evaluation on Random Forest !
Random Forest = 1.431955721144724

for 70% sampling 
Commencing evaluation on LinearRegression !
LinearRegression = 1.5311121164404962
Commencing evaluation on GBT Regressor !
GBT Regressor = 1.5562678111012818
Commencing evaluation on Random Forest !
Random Forest = 1.5313389438322291

for 50% Sampling 
Commencing evaluation on LinearRegression !
LinearRegression = 1.6648558482495757
Commencing evaluation on GBT Regressor !
GBT Regressor = 1.685530584445583
Commencing evaluation on Random Forest !
Random Forest = 1.6646815092344738

In [None]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

#Selected Model
RFR = RandomForestRegressor(featuresCol="features-Scaled", labelCol=target)

#Define pipeline
pipeline = Pipeline(stages = indexer + encoder + [assembler, scaler, RFR])

paraGrid = ParamGridBuilder() \
    .addGrid(RFR.numTrees, [20, 50, 100]) \
    .addGrid(RFR.maxDepth, [5, 10]) \
    .addGrid(RFR.minInstancesPerNode, [1, 5]) \
    .build()

#2 folds of cross validation, evalobj = rmse
CrossVal = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paraGrid,
    evaluator=evalObj,
    numFolds=2,
)

CrossValModel = CrossVal.fit(trainDf) 

predictions = CrossValModel.transform(testDf)
rmse = evalObj.evaluate(predictions)
print(f"Optimised RMSE: {rmse}")

#To save memory
trainDf.unpersist()
del trainDf
testDf.unpersist()
del testDf
spark.catalog.clearCache()

bestModel = CrossValModel.bestModel

RFR_model = bestModel.stages[-1]

bestPara = {
    'numTrees': RFR_model.getNumTrees,
    'maxDepth': RFR_model.getMaxDepth(),
    'minInstancesPerNode': RFR_model.getMinInstancesPerNode(),
    'featureSubsetStrategy': RFR_model.getFeatureSubsetStrategy()
}

for hyperPara, value in bestPara.items():
    print(f"  {hyperPara}: {value}") 


Py4JJavaError: An error occurred while calling o726.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 9 in stage 0.0 failed 1 times, most recent failure: Lost task 9.0 in stage 0.0 (TID 9) (DESKTOP-DEBTK35 executor driver): java.io.IOException: There is not enough space on the disk
	at java.base/java.io.FileOutputStream.writeBytes(Native Method)
	at java.base/java.io.FileOutputStream.write(FileOutputStream.java:349)
	at org.apache.spark.storage.TimeTrackingOutputStream.write(TimeTrackingOutputStream.java:59)
	at org.apache.spark.io.MutableCheckedOutputStream.write(MutableCheckedOutputStream.scala:43)
	at java.base/java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:81)
	at java.base/java.io.BufferedOutputStream.write(BufferedOutputStream.java:127)
	at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:225)
	at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:178)
	at java.base/java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:81)
	at java.base/java.io.BufferedOutputStream.write(BufferedOutputStream.java:127)
	at java.base/java.io.DataOutputStream.write(DataOutputStream.java:112)
	at org.apache.spark.sql.catalyst.expressions.UnsafeRow.writeToStream(UnsafeRow.java:540)
	at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$1.writeValue(UnsafeRowSerializer.scala:70)
	at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:337)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:174)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:57)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:111)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:171)
	at org.apache.spark.scheduler.Task.run(Task.scala:147)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$5(Executor.scala:647)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:80)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:77)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:650)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$3(DAGScheduler.scala:2935)
	at scala.Option.getOrElse(Option.scala:201)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2935)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2927)
	at scala.collection.immutable.List.foreach(List.scala:334)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2927)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1295)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1295)
	at scala.Option.foreach(Option.scala:437)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1295)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3207)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3141)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3130)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:50)
	at org.apache.spark.util.Utils$.getTryWithCallerStacktrace(Utils.scala:1439)
	at org.apache.spark.util.LazyTry.get(LazyTry.scala:58)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:201)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:260)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:257)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:197)
	at org.apache.spark.sql.execution.columnar.CachedRDDBuilder.buildBuffers(InMemoryRelation.scala:284)
	at org.apache.spark.sql.execution.columnar.CachedRDDBuilder.cachedColumnBuffers(InMemoryRelation.scala:232)
	at org.apache.spark.sql.execution.columnar.InMemoryTableScanExec.baseCacheRDD(InMemoryTableScanExec.scala:202)
	at org.apache.spark.sql.execution.adaptive.TableCacheQueryStageExec.future$lzycompute(QueryStageExec.scala:296)
	at org.apache.spark.sql.execution.adaptive.TableCacheQueryStageExec.future(QueryStageExec.scala:292)
	at org.apache.spark.sql.execution.adaptive.TableCacheQueryStageExec.doMaterialize(QueryStageExec.scala:307)
	at org.apache.spark.sql.execution.adaptive.QueryStageExec.materialize(QueryStageExec.scala:71)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$withFinalPlanUpdate$5(AdaptiveSparkPlanExec.scala:309)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$withFinalPlanUpdate$5$adapted(AdaptiveSparkPlanExec.scala:307)
	at scala.collection.immutable.Vector.foreach(Vector.scala:2125)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$withFinalPlanUpdate$1(AdaptiveSparkPlanExec.scala:307)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:279)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:402)
	at org.apache.spark.sql.classic.Dataset.collectFromPlan(Dataset.scala:2244)
	at org.apache.spark.sql.classic.Dataset.$anonfun$collect$1(Dataset.scala:1482)
	at org.apache.spark.sql.classic.Dataset.$anonfun$withAction$2(Dataset.scala:2234)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:654)
	at org.apache.spark.sql.classic.Dataset.$anonfun$withAction$1(Dataset.scala:2232)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$8(SQLExecution.scala:162)
	at org.apache.spark.sql.execution.SQLExecution$.withSessionTagsApplied(SQLExecution.scala:268)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$7(SQLExecution.scala:124)
	at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
	at org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:112)
	at org.apache.spark.sql.artifact.ArtifactManager.withClassLoaderIfNeeded(ArtifactManager.scala:106)
	at org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:111)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$6(SQLExecution.scala:124)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:291)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$1(SQLExecution.scala:123)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId0(SQLExecution.scala:77)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:233)
	at org.apache.spark.sql.classic.Dataset.withAction(Dataset.scala:2232)
	at org.apache.spark.sql.classic.Dataset.collect(Dataset.scala:1482)
	at org.apache.spark.ml.feature.StringIndexer.sortByFreq(StringIndexer.scala:213)
	at org.apache.spark.ml.feature.StringIndexer.fit(StringIndexer.scala:241)
	at org.apache.spark.ml.feature.StringIndexer.fit(StringIndexer.scala:152)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:184)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:108)
	at java.base/java.lang.Thread.run(Thread.java:833)
	Suppressed: org.apache.spark.util.Utils$OriginalTryStackTraceException: Full stacktrace of original doTryWithCallerStacktrace caller
		at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$3(DAGScheduler.scala:2935)
		at scala.Option.getOrElse(Option.scala:201)
		at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2935)
		at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2927)
		at scala.collection.immutable.List.foreach(List.scala:334)
		at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2927)
		at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1295)
		at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1295)
		at scala.Option.foreach(Option.scala:437)
		at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1295)
		at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3207)
		at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3141)
		at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3130)
		at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:50)
Caused by: java.io.IOException: There is not enough space on the disk
	at java.base/java.io.FileOutputStream.writeBytes(Native Method)
	at java.base/java.io.FileOutputStream.write(FileOutputStream.java:349)
	at org.apache.spark.storage.TimeTrackingOutputStream.write(TimeTrackingOutputStream.java:59)
	at org.apache.spark.io.MutableCheckedOutputStream.write(MutableCheckedOutputStream.scala:43)
	at java.base/java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:81)
	at java.base/java.io.BufferedOutputStream.write(BufferedOutputStream.java:127)
	at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:225)
	at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:178)
	at java.base/java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:81)
	at java.base/java.io.BufferedOutputStream.write(BufferedOutputStream.java:127)
	at java.base/java.io.DataOutputStream.write(DataOutputStream.java:112)
	at org.apache.spark.sql.catalyst.expressions.UnsafeRow.writeToStream(UnsafeRow.java:540)
	at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$1.writeValue(UnsafeRowSerializer.scala:70)
	at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:337)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:174)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:57)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:111)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:171)
	at org.apache.spark.scheduler.Task.run(Task.scala:147)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$5(Executor.scala:647)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:80)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:77)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:650)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	... 1 more
