### Horse Racing Machine Learning Model using Pyspark and Databricks

A dashboard created with databricks was used for the exploratory analysis, the dashboard can be found using the following link: https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/2076258431253745/4211955669654020/8071911021850060/latest.html


### Objective
The objetive of this project was to develop a supervised learning classification algorithm to predict the likelihood of winning a derby
race, given a set of observations.

In [3]:
!pip install pyspark 

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 42 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 57.0 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845512 sha256=2a23c8c8ee32c93caa58b19ed131e7654dda162982928664b599a9202d78050f
  Stored in directory: /root/.cache/pip/wheels/43/dc/11/ec201cd671da62fa9c5cc77078235e40722170ceba231d7598
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.1


In [4]:
from pyspark.sql import SparkSession #initiallizing spark session
spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

In [5]:
spark #confirming installation and initialization

### Dataset Attributes :


This competition has provided 4 csv files with some common features :

#### nyra_start_table.csv
Horse/jockey race data

- track_id - 3 character id for the track the race took place at. AQU: Aqueduct, BEL: Belmont, SAR: Saratoga.
- race_date - date the race took place. YYYY-MM-DD.
- race_number - Number of the race. Passed as 3 characters but can be cast or converted to int for this data set.
- program_number - Program number of the horse in the race passed as 3 characters. Should remain 3 characters as it isn't limited to just numbers. Is essentially the unique identifier of the horse in the race.
- weight_carried - An integer of the weight carried by the horse in the race.
- jockey - Name of the jockey on the horse in the race. 50 character max.
- odds - Odds to win the race passed as an integer. Divide by 100 to derive the odds to 1. Example - 1280 would be 12.8-1.
- position_at_finish - An integer of the horse's finishing position. (added to the dataset 9/8/22)

#### nyra_race_table.csv
Racetrack race data

- track_id - 3 character id for the track the race took place at. AQU: Aqueduct, BEL: Belmont, SAR: Saratoga.
- race_date - date the race took place. YYYY-MM-DD.
- race_number - Number of the race. Passed as 3 characters but can be cast or converted to int for this data set.
- distance_id - Distance of the race in furlongs passed as an integer. Example - 600 would be 6 furlongs.
- course_type - The course the race was run over passed as one character. M - Hurdle, D - Dirt, O - Outer turf, I - Inner turf, T - turf.
- track_condition - The condition of the course the race was run on passed as three characters. YL - Yielding, FM - Firm, SY - Sloppy, GD - Good, FT - Fast, MY - Muddy, SF - Soft.
- run_up_distance - Distance in feet of the gate to the start of the race passed as an integer.
- race_type - The classification of the race passed as as five characters. STK - Stakes, WCL - Waiver Claiming, WMC - Waiver Maiden Claiming, SST - Starter Stakes, SHP - Starter Handicap, CLM - Claiming, STR - Starter Allowance, AOC - Allowance Optionl Claimer, SOC - Starter Optional Claimer, MCL - Maiden Claiming, ALW - Allowance, MSW - Maiden Special Weight.
- purse - Purse in US dollars of the race passed as an money with two decimal places.
- post_time - Time of day the race began passed as 5 character. Example - 01220 would be 12:20.

##### nyra_tracking_table.csv

- track_id - 3 character id for the track the race took place at. AQU: Aqueduct, BEL: Belmont, SAR: Saratoga.
- race_date - date the race took place. YYYY-MM-DD.
- race_number - Number of the race. Passed as 3 characters but can be cast or converted to int for this data set.
- program_number - Program number of the horse in the race passed as 3 characters. Should remain 3 characters as it isn't limited to just numbers. Is essentially the unique identifier of the horse in the race.
- trakus_index - The common collection of point of the lat / long of the horse in the race passed as an integer. From what we can tell, it's collected every 0.25 seconds.
- latitude - The latitude of the horse in the race passed as a float.
- longitude - The longitude of the horse in the race passed as a float.

##### nyra_2019_complete.csv 
This file is the combined 3 files into one table. The keys to join them trakus with race - track_id, race_date, race_number. To join trakus with start - track_id, race_date, race_number, program_number.


In [6]:
df_race = spark.read.csv('/content/drive/My Drive/Big_Data/nyra_race_table.csv', header=True)
df_start = spark.read.csv('/content/drive/My Drive/Big_Data/nyra_start_table.csv', header=True)
df_tracking = spark.read.csv('/content/drive/My Drive/Big_Data/nyra_tracking_table.csv', header=True)
df_complete = spark.read.csv('/content/drive/My Drive/Big_Data/nyra_2019_complete.csv', header=True)

In [7]:
df_race.printSchema()

root
 |-- track_id: string (nullable = true)
 |-- race_date: string (nullable = true)
 |-- race_number: string (nullable = true)
 |-- distance_id: string (nullable = true)
 |-- course_type: string (nullable = true)
 |-- track_condition: string (nullable = true)
 |-- run_up_distance: string (nullable = true)
 |-- race_type: string (nullable = true)
 |-- purse: string (nullable = true)
 |-- post_time: string (nullable = true)



In [8]:
df_race.show(5)

+--------+----------+-----------+-----------+-----------+---------------+---------------+---------+-----+---------+
|track_id| race_date|race_number|distance_id|course_type|track_condition|run_up_distance|race_type|purse|post_time|
+--------+----------+-----------+-----------+-----------+---------------+---------------+---------+-----+---------+
|     AQU|2019-01-01|          1|        650|          D|            MY |             36|      AOC|80000|    01220|
|     AQU|2019-01-01|          2|        600|          D|            MY |             48|      MCL|41000|    01250|
|     AQU|2019-01-01|          3|        550|          D|            MY |             54|      MCL|35000|    00121|
|     AQU|2019-01-01|          4|        900|          D|            MY |            101|      AOC|80000|    00150|
|     AQU|2019-01-01|          5|        700|          D|            MY |             60|      ALW|64000|    00220|
+--------+----------+-----------+-----------+-----------+---------------

In [9]:
df_start.printSchema()

root
 |-- AQU: string (nullable = true)
 |-- 2019-01-01: string (nullable = true)
 |-- 1: string (nullable = true)
 |-- 1  : string (nullable = true)
 |-- 123: string (nullable = true)
 |-- Dylan Davis: string (nullable = true)
 |-- 130: string (nullable = true)
 |-- 2: string (nullable = true)



In [10]:
df_start.show(5)

+---+----------+---+---+---+---------------+----+---+
|AQU|2019-01-01|  1|1  |123|    Dylan Davis| 130|  2|
+---+----------+---+---+---+---------------+----+---+
|AQU|2019-01-01|  1|2  |120|Junior Alvarado| 295|  3|
|AQU|2019-01-01|  1|3  |118|   Jose Lezcano| 180|  4|
|AQU|2019-01-01|  1|4  |123|   Jomar Garcia|1280|  5|
|AQU|2019-01-01|  1|5  |118|  Manuel Franco|1150|  1|
|AQU|2019-01-01|  2|1  |121|   Jose Lezcano| 220|  2|
+---+----------+---+---+---+---------------+----+---+
only showing top 5 rows



In [11]:
df_tracking.printSchema()

root
 |-- track_id: string (nullable = true)
 |-- race_date: string (nullable = true)
 |-- race_number: string (nullable = true)
 |-- program_number: string (nullable = true)
 |-- trakus_index: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)



In [12]:
df_tracking.show(5)

+--------+----------+-----------+--------------+------------+------------------+-------------------+
|track_id| race_date|race_number|program_number|trakus_index|          latitude|          longitude|
+--------+----------+-----------+--------------+------------+------------------+-------------------+
|     AQU|2019-01-01|          9|           6  |          72|  40.6729017197787|  -73.8276065972899|
|     AQU|2019-01-01|          9|           6  |          73|40.672945987033899|-73.827587266669497|
|     AQU|2019-01-01|          9|           6  |          74|  40.6729903067774|-73.827568024587293|
|     AQU|2019-01-01|          9|           6  |          63|40.672509778989301|-73.827781021601794|
|     AQU|2019-01-01|          9|           6  |          64|40.672552699664003|-73.827761550864395|
+--------+----------+-----------+--------------+------------+------------------+-------------------+
only showing top 5 rows



In [13]:
df_complete.printSchema()

root
 |-- AQU: string (nullable = true)
 |-- 2019-01-01: string (nullable = true)
 |-- 9: string (nullable = true)
 |-- 6  : string (nullable = true)
 |-- 72: string (nullable = true)
 |-- 40.6729017197787: string (nullable = true)
 |-- -73.8276065972899: string (nullable = true)
 |-- 600: string (nullable = true)
 |-- D: string (nullable = true)
 |-- GD : string (nullable = true)
 |-- 48: string (nullable = true)
 |-- CLM: string (nullable = true)
 |-- 25000.00: string (nullable = true)
 |-- 00420: string (nullable = true)
 |-- 120: string (nullable = true)
 |-- Andre Shivnarine Worrie: string (nullable = true)
 |-- 2090: string (nullable = true)
 |-- 8: string (nullable = true)



In [14]:
df_complete.show(5)

+---+----------+---+---+---+----------------+-----------------+---+---+---+---+---+--------+-----+---+-----------------------+----+---+
|AQU|2019-01-01|  9|6  | 72|40.6729017197787|-73.8276065972899|600|  D|GD | 48|CLM|25000.00|00420|120|Andre Shivnarine Worrie|2090|  8|
+---+----------+---+---+---+----------------+-----------------+---+---+---+---+---+--------+-----+---+-----------------------+----+---+
|AQU|2019-01-01|  9|6  | 73|40.6729459870339|-73.8275872666695|600|  D|GD | 48|CLM|25000.00|00420|120|   Andre Shivnarine ...|2090|  8|
|AQU|2019-01-01|  9|6  | 74|40.6729903067774|-73.8275680245873|600|  D|GD | 48|CLM|25000.00|00420|120|   Andre Shivnarine ...|2090|  8|
|AQU|2019-01-01|  9|6  | 63|40.6725097789893|-73.8277810216018|600|  D|GD | 48|CLM|25000.00|00420|120|   Andre Shivnarine ...|2090|  8|
|AQU|2019-01-01|  9|6  | 64| 40.672552699664|-73.8277615508644|600|  D|GD | 48|CLM|25000.00|00420|120|   Andre Shivnarine ...|2090|  8|
|AQU|2019-01-01|  9|6  | 65|40.6725958007316|-73

### Dataset cleaning and preparation

From a first look to the data files, we observe:

- Tables start, race and tracking share features: track_id, race_date, and race_number. Also, table start and trakcing share feature program_number.

- The table start and complete, do not contain the first row with the name of the columns. 

- All values are of type string, we need to reassign the correct type for each feature. Also, feature program_number seems to have values with a space at the end, we need to correct this. 




In [15]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType 
from pyspark.sql import functions as sf

#Creating schema for table with columns name and type
schema_start = StructType([
    StructField('track_id', StringType(),  True),
    StructField('race_date',StringType(),   True),
    StructField('race_number',IntegerType(),   True),
    StructField('program_number',StringType(),  True),
    StructField('weight_carried',IntegerType(),  True),
    StructField('jockey',StringType(),  True),
    StructField('odds',IntegerType(),  True),
     StructField('position_at_finish',IntegerType(),  True)
    ])

df_start = spark.read.csv('/content/drive/My Drive/Big_Data/nyra_start_table.csv',header=False,schema=schema_start)

df_start = df_start.withColumn('program_number', sf.trim(df_start.program_number)) #eliminating space at the end of feature program_number
df_start = df_start.withColumn('program_number', df_start.program_number.cast(IntegerType()))

df_start.show(5)

+--------+----------+-----------+--------------+--------------+---------------+----+------------------+
|track_id| race_date|race_number|program_number|weight_carried|         jockey|odds|position_at_finish|
+--------+----------+-----------+--------------+--------------+---------------+----+------------------+
|     AQU|2019-01-01|          1|             1|           123|    Dylan Davis| 130|                 2|
|     AQU|2019-01-01|          1|             2|           120|Junior Alvarado| 295|                 3|
|     AQU|2019-01-01|          1|             3|           118|   Jose Lezcano| 180|                 4|
|     AQU|2019-01-01|          1|             4|           123|   Jomar Garcia|1280|                 5|
|     AQU|2019-01-01|          1|             5|           118|  Manuel Franco|1150|                 1|
+--------+----------+-----------+--------------+--------------+---------------+----+------------------+
only showing top 5 rows



In [16]:
df_start.printSchema()

root
 |-- track_id: string (nullable = true)
 |-- race_date: string (nullable = true)
 |-- race_number: integer (nullable = true)
 |-- program_number: integer (nullable = true)
 |-- weight_carried: integer (nullable = true)
 |-- jockey: string (nullable = true)
 |-- odds: integer (nullable = true)
 |-- position_at_finish: integer (nullable = true)



In [17]:
schema_complete = StructType([
    StructField('track_id', StringType(),  True),
    StructField('race_date',StringType(),   True),
    StructField('race_number',IntegerType(),   True),
    StructField('program_number',StringType(),  True),
    StructField('trakus_index',IntegerType(),  True),
    StructField('latitude',DoubleType(),  True),
    StructField('longitude',DoubleType(),  True),
    StructField('distance_id',IntegerType(),  True),
    StructField('course_type',StringType(),  True),
    StructField('track_condition',StringType(),  True),
    StructField('run_up_distance',IntegerType(),  True),
    StructField('race_type',StringType(),  True),
    StructField('purse',DoubleType(),  True),
    StructField('post_time',IntegerType(),  True),
    StructField('weight_carried',IntegerType(),  True),
    StructField('jockey',StringType(),  True),
    StructField('odds',IntegerType(),  True),
     StructField('position_at_finish',IntegerType(),  True)
    ])

df_complete = spark.read.csv('/content/drive/My Drive/Big_Data/nyra_2019_complete.csv',header=False,schema=schema_complete)

df_complete = df_complete.withColumn('program_number', sf.trim(df_complete.program_number))
df_complete = df_complete.withColumn('program_number', df_complete.program_number.cast(IntegerType()))

#We will also create a new column that we will use later to join the different dataframes.

df_complete = df_complete.withColumn('ID_horse', 
                    sf.concat(sf.col('track_id'),sf.lit('_'), sf.col('race_date'),sf.lit('_'), sf.col('race_number'),sf.lit('_'), sf.col('program_number')))

df_complete.show(5)

+--------+----------+-----------+--------------+------------+----------------+-----------------+-----------+-----------+---------------+---------------+---------+-------+---------+--------------+--------------------+----+------------------+------------------+
|track_id| race_date|race_number|program_number|trakus_index|        latitude|        longitude|distance_id|course_type|track_condition|run_up_distance|race_type|  purse|post_time|weight_carried|              jockey|odds|position_at_finish|          ID_horse|
+--------+----------+-----------+--------------+------------+----------------+-----------------+-----------+-----------+---------------+---------------+---------+-------+---------+--------------+--------------------+----+------------------+------------------+
|     AQU|2019-01-01|          9|             6|          72|40.6729017197787|-73.8276065972899|        600|          D|            GD |             48|      CLM|25000.0|      420|           120|Andre Shivnarine ...|2090

In [18]:
df_complete.groupBy('track_id').count().show()

+--------+-------+
|track_id|  count|
+--------+-------+
|     SAR|1122927|
|     AQU|2158369|
|     BEL|1947134|
+--------+-------+



In [19]:
df_race = df_race.withColumn('race_number', df_race['race_number'].cast(IntegerType())) \
.withColumn('distance_id', df_race['distance_id'].cast(IntegerType())) \
      .withColumn('run_up_distance', df_race['run_up_distance'].cast(IntegerType())) \
      .withColumn('purse', df_race['purse'].cast(DoubleType())) \
      .withColumn('post_time', df_race['post_time'].cast(IntegerType()))
df_race.printSchema()

root
 |-- track_id: string (nullable = true)
 |-- race_date: string (nullable = true)
 |-- race_number: integer (nullable = true)
 |-- distance_id: integer (nullable = true)
 |-- course_type: string (nullable = true)
 |-- track_condition: string (nullable = true)
 |-- run_up_distance: integer (nullable = true)
 |-- race_type: string (nullable = true)
 |-- purse: double (nullable = true)
 |-- post_time: integer (nullable = true)



In [20]:
df_tracking = df_tracking.withColumn('program_number', sf.trim(df_tracking.program_number))

df_tracking = df_tracking.withColumn('race_number', df_tracking['race_number'].cast(IntegerType())) \
      .withColumn('program_number', df_tracking['program_number'].cast(IntegerType())) \
      .withColumn('trakus_index', df_tracking['trakus_index'].cast(IntegerType())) \
      .withColumn('latitude', df_tracking['latitude'].cast(DoubleType())) \
      .withColumn('longitude', df_tracking['longitude'].cast(DoubleType()))
df_tracking.printSchema()

root
 |-- track_id: string (nullable = true)
 |-- race_date: string (nullable = true)
 |-- race_number: integer (nullable = true)
 |-- program_number: integer (nullable = true)
 |-- trakus_index: integer (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)



In [21]:
#Checking for missing and null values in the dataframes
df_race.select([sf.count(sf.when(sf.isnan(c), c)).alias(c) for c in df_race.columns]).show()
df_start.select([sf.count(sf.when(sf.isnan(c), c)).alias(c) for c in df_start.columns]).show()
df_tracking.select([sf.count(sf.when(sf.isnan(c), c)).alias(c) for c in df_tracking.columns]).show()
df_complete.select([sf.count(sf.when(sf.isnan(c), c)).alias(c) for c in df_complete.columns]).show()

+--------+---------+-----------+-----------+-----------+---------------+---------------+---------+-----+---------+
|track_id|race_date|race_number|distance_id|course_type|track_condition|run_up_distance|race_type|purse|post_time|
+--------+---------+-----------+-----------+-----------+---------------+---------------+---------+-----+---------+
|       0|        0|          0|          0|          0|              0|              0|        0|    0|        0|
+--------+---------+-----------+-----------+-----------+---------------+---------------+---------+-----+---------+

+--------+---------+-----------+--------------+--------------+------+----+------------------+
|track_id|race_date|race_number|program_number|weight_carried|jockey|odds|position_at_finish|
+--------+---------+-----------+--------------+--------------+------+----+------------------+
|       0|        0|          0|             0|             0|     0|   0|                 0|
+--------+---------+-----------+--------------+-

### Extra Datasets: Injuries and Horses

We will add two extra datasets that complements the Big Data Derby 2022 database. These datasets cointain information about the horses(name and ID) and information about the horses that broken down, were injured or  died at New York State race tracks. 

More information about these datasets can be found in:
-  https://www.kaggle.com/datasets/mpwolke/cusersmarildownloadsequinecsv
- https://www.kaggle.com/datasets/themarkgreen/big-data-derby-2022-global-horse-ids-and-places?select=horse_ids.csv

#### Dataset: Injuries

In [22]:
schema_injuries = StructType([
    StructField('year', IntegerType(),  True),
    StructField('incident_date',StringType(),   True),
    StructField('incident_type',StringType(),   True),
    StructField('track',StringType(),  True),
    StructField('inv_location',StringType(),  True),
    StructField('racing_type_description',StringType(),  True),
    StructField('division',StringType(),  True),
    StructField('weather_conditions',StringType(),  True),
    StructField('horse',StringType(),  True),
    StructField('trainer',StringType(),  True),
    StructField('jockey_driver',StringType(),  True),
    StructField('incident_description',StringType(),  True),
    StructField('death_or_injury',StringType(),  True)
    ])

df_injuries = spark.read.csv('/content/drive/My Drive/Big_Data/equine.csv', sep=';', header=True, schema=schema_injuries)
df_injuries.show(5)


+----+-------------+-----------------+--------------------+------------+-----------------------+------------+------------------+---------------+--------------------+-------------+--------------------+---------------+
|year|incident_date|    incident_type|               track|inv_location|racing_type_description|    division|weather_conditions|          horse|             trainer|jockey_driver|incident_description|death_or_injury|
+----+-------------+-----------------+--------------------+------------+-----------------------+------------+------------------+---------------+--------------------+-------------+--------------------+---------------+
|2009|   03/04/2009|     EQUINE DEATH|Aqueduct Racetrac...|    Aqueduct|                 Racing|Thoroughbred|              null|Private Details|JOHN P. TERRANOVA II|         null|Private Details-T...|     Euthanasia|
|2009|   03/04/2009|ON-TRACK ACCIDENT|Aqueduct Racetrac...|        null|                 Racing|Thoroughbred|              null|Priv

In [23]:
df_injuries.printSchema()

root
 |-- year: integer (nullable = true)
 |-- incident_date: string (nullable = true)
 |-- incident_type: string (nullable = true)
 |-- track: string (nullable = true)
 |-- inv_location: string (nullable = true)
 |-- racing_type_description: string (nullable = true)
 |-- division: string (nullable = true)
 |-- weather_conditions: string (nullable = true)
 |-- horse: string (nullable = true)
 |-- trainer: string (nullable = true)
 |-- jockey_driver: string (nullable = true)
 |-- incident_description: string (nullable = true)
 |-- death_or_injury: string (nullable = true)



In [24]:
#Filtering the data that is relevant to our data set
df_injuries = df_injuries.filter((sf.col('year') == 2019) & 
                   (sf.col('track').contains('NYRA')) & 
                   (sf.col('racing_type_description').contains('Racing')) &
                   (sf.col('jockey_driver').isNotNull()) &
                   (sf.col('horse').isNotNull())    
                   )


In [25]:
df_injuries.show(5)

+----+-------------+---------------+--------------------+--------------------+-----------------------+------------+------------------+--------------+--------------------+--------------------+--------------------+---------------+
|year|incident_date|  incident_type|               track|        inv_location|racing_type_description|    division|weather_conditions|         horse|             trainer|       jockey_driver|incident_description|death_or_injury|
+----+-------------+---------------+--------------------+--------------------+-----------------------+------------+------------------+--------------+--------------------+--------------------+--------------------+---------------+
|2019|   01/04/2019|RACING INCIDENT|Aqueduct Racetrac...|         Before wire|                 Racing|Thoroughbred|              null|         Fifty|   DAVID A. CANNIZZO|        JOSE LEZCANO|Fifty-T:  David C...|           null|
|2019|   01/06/2019|RACING INCIDENT|Aqueduct Racetrac...|                null|      

In [26]:
df_injuries = df_injuries.drop('year', 'trainer', 'incident_description', 'inv_location', 'racing_type_description', 'division', 'weather_conditions','death_or_injury')
df_injuries.printSchema()

root
 |-- incident_date: string (nullable = true)
 |-- incident_type: string (nullable = true)
 |-- track: string (nullable = true)
 |-- horse: string (nullable = true)
 |-- jockey_driver: string (nullable = true)



Matching jockey name with the original dataset names

In [27]:
df_injuries = df_injuries.withColumn('jockey_driver',sf.initcap(sf.col('jockey_driver'))) #Changing format of names from Uppercase to Sentence Case
df_injuries.select(df_injuries.jockey_driver).distinct().show(df_injuries.count(),False)

+-------------------------+
|jockey_driver            |
+-------------------------+
|Christina Bonilla        |
|Julio A. Correa Alamo    |
|Carlos J. Hernandez **   |
|Luis A. Rodriguez  Castro|
|Christopher P. Decarlo   |
|Tyler S. Gaffalione      |
|Andrea C. Rodriguez-caez |
|Luis R. Reyes            |
|Gerard J. Galligan       |
|Javier J. Castellano     |
|Ricardo Santana Jr       |
|Manuel Franco            |
|Reyluis A. Gutierrez     |
|Samuel Camacho           |
|Kendrick Carmouche       |
|Irad Ortiz Jr            |
|John Bisono              |
|Talbert Howell           |
|Junior R. Alvarado       |
|Dylan Davis              |
|Michael D. Mitchell      |
|Eric Cancel              |
|Rajiv C. Maragh          |
|Pascasio(paco) Lopez     |
|Jose Lezcano             |
|Hector R. Diaz Jr        |
|Michael J. Luzzi         |
|Jose A. Baez             |
|Dyn O. Panell            |
|Pablo Fragoso            |
|Kirk Johnson             |
|Luis Saez                |
|Jose L. Ortiz      

In [28]:
df_injuries.select(df_injuries.jockey_driver).distinct().count()

37

In [29]:
df_start.select(df_start.jockey).distinct().show(df_start.count(),False)

+--------------------------+
|jockey                    |
+--------------------------+
|Shannon Uske              |
|Carlos J. Hernandez       |
|Jomar Torres              |
|Leonel Reyes              |
|Joel Sone                 |
|Sam Twiston-Davies        |
|Benjamin Hernandez        |
|Wayne Lordan              |
|Kieran Norris             |
|Virginia Tormey           |
|Junior Alvarado           |
|Declan Carroll            |
|Dana G. Whitney           |
|Flavien Prat              |
|Suguru Hamanaka           |
|Emanuel De Diego          |
|Channing Hill             |
|Andrea Atzeni             |
|Jose F. Rojas             |
|Reylu Gutierrez           |
|Rosario Montanez          |
|Mario Gutierrez           |
|Hector Rafael Diaz Jr.    |
|Rafael Manuel Hernandez   |
|Joseph Talamo             |
|Jomar Garcia              |
|Jose Antonio Gallego      |
|Wilmer A. Garcia          |
|Mike E. Smith             |
|Brian Joseph Hernandez Jr.|
|Jeremy Rose               |
|Victor Espino

In [30]:
jockey_dic = {'Reyluis A. Gutierrez' : 'Reylu Gutierrez',
               'Christina Bonilla' : 'Cristina Bonilla',
               'Dyn O. Panell' : 'Dyn Panell',
               'Benjamin Hernandez  Soto' : 'Benjamin Hernandez',
               'Junior R. Alvarado' : 'Junior Alvarado',
               'Carlos J. Hernandez **': 'Carlos J. Hernandez' , 
               'Christopher P. Decarlo' : 'Christopher P. DeCarlo',
               'Rajiv C. Maragh' : 'Rajiv Maragh',
               'Samuel Camacho' : 'Samuel Camacho Jr.',
               'Irad Ortiz Jr' : 'Irad Ortiz Jr.',
               'Javier J. Castellano' : 'Javier Castellano',
               'Andrea C. Rodriguez-Caez' : 'Andrea C. Rodriguez',
               'Jose A. Baez': 'Jose Baez', 
               'Chris M. Landeros' : 'Chris Landeros',
               'Ricardo Santana Jr' :'Ricardo Santana Jr.',
               'Tyler S. Gaffalione' : 'Tyler Gaffalione',
               'Pascasio(Paco) Lopez' :'Paco Lopez',
               'Luis A. Rodriguez  Castro' :  'Luis A. Rodriguez Castro',
               'Hector R. Diaz Jr' :  'Hector Rafael Diaz Jr.',
               'Julio A. Correa Alamo' : 'Julio Correa',}


df_injuries.replace(jockey_dic,1,'jockey_driver').show(5)



+-------------+---------------+--------------------+--------------+----------------+
|incident_date|  incident_type|               track|         horse|   jockey_driver|
+-------------+---------------+--------------------+--------------+----------------+
|   01/04/2019|RACING INCIDENT|Aqueduct Racetrac...|         Fifty|    Jose Lezcano|
|   01/06/2019|RACING INCIDENT|Aqueduct Racetrac...|    Letzgometz|     Dylan Davis|
|   01/06/2019|  RACING INJURY|Aqueduct Racetrac...|My Best Friend| Reylu Gutierrez|
|   01/11/2019|  FALL OF RIDER|Aqueduct Racetrac...|   LIA SHIVANI|Cristina Bonilla|
|   01/12/2019|   EQUINE DEATH|Aqueduct Racetrac...|    Westerdale|     Dylan Davis|
+-------------+---------------+--------------------+--------------+----------------+
only showing top 5 rows



In [31]:
df_injuries.select(df_injuries.jockey_driver).distinct().count() #Verifying no unique values were created or erased after replacing the values of the dataset with the diccionary

37

In [32]:
#Matching track name with the original dataset track names

track_dic = {'Aqueduct Racetrack (NYRA)' : 'AQU',
               'Belmont Park (NYRA)' : 'BEL',
               'Saratoga Racecourse (NYRA)' : 'SAR',}


df_injuries = df_injuries.replace(track_dic,1,'track')

df_injuries = df_injuries.withColumnRenamed('track',
                                            'track_id').withColumnRenamed('horse','horse_name').withColumnRenamed('jockey_driver','jockey')
df_injuries.show(5)

+-------------+---------------+--------+--------------+--------------------+
|incident_date|  incident_type|track_id|    horse_name|              jockey|
+-------------+---------------+--------+--------------+--------------------+
|   01/04/2019|RACING INCIDENT|     AQU|         Fifty|        Jose Lezcano|
|   01/06/2019|RACING INCIDENT|     AQU|    Letzgometz|         Dylan Davis|
|   01/06/2019|  RACING INJURY|     AQU|My Best Friend|Reyluis A. Gutierrez|
|   01/11/2019|  FALL OF RIDER|     AQU|   LIA SHIVANI|   Christina Bonilla|
|   01/12/2019|   EQUINE DEATH|     AQU|    Westerdale|         Dylan Davis|
+-------------+---------------+--------+--------------+--------------------+
only showing top 5 rows



#### Dataset: Horses

The dataset Horses contains two files: horse_ID which contains information about the horse's races (track_id, program_number, race_date, race_number), and horse_name, which contains the ID of the horse and its name. We will use the column ID_horse created before in table df_complete to merge all datasets.

In [33]:
schema_horses_name = StructType([
    StructField('_c0', IntegerType(),  True),
    StructField('horse_id', IntegerType(),  True),
    StructField('horse_name',StringType(),   True)
    ])

df_horses_name = spark.read.csv('/content/drive/My Drive/Big_Data/horse_names.csv', header=True, schema=schema_horses_name)
df_horses_name = df_horses_name.drop('_c0')
df_horses_name.show(5)

+--------+------------------+
|horse_id|        horse_name|
+--------+------------------+
|       0|Jc's Shooting Star|
|       1|  Sounds Delicious|
|       2|     Crimson Frost|
|       3| Friend of Liberty|
|       4|      Bobby's Song|
+--------+------------------+
only showing top 5 rows



In [34]:
df_horses_name.printSchema()

root
 |-- horse_id: integer (nullable = true)
 |-- horse_name: string (nullable = true)



In [35]:
schema_horses_id = StructType([
    StructField('_c0', IntegerType(),  True),
    StructField('track_id', StringType(),  True),
    StructField('race_date',StringType(),   True),
    StructField('race',IntegerType(),   True),
    StructField('program_number',StringType(),   True),
    StructField('horse_id',IntegerType(),   True),
    StructField('finishing_place',IntegerType(),   True)
    ])

df_horses_id = spark.read.csv('/content/drive/My Drive/Big_Data/horse_ids.csv', header=True, schema=schema_horses_id)

df_horses_id = df_horses_id.withColumn('program_number', sf.trim(df_horses_id.program_number))
df_horses_id = df_horses_id.withColumn('program_number', df_horses_id.program_number.cast(IntegerType()))

df_horses_id = df_horses_id.drop('_c0')
df_horses_id.show(5)

+--------+----------+----+--------------+--------+---------------+
|track_id| race_date|race|program_number|horse_id|finishing_place|
+--------+----------+----+--------------+--------+---------------+
|     AQU|2019-01-01|   1|             5|       0|              1|
|     AQU|2019-01-01|   1|             1|       1|              2|
|     AQU|2019-01-01|   1|             2|       2|              3|
|     AQU|2019-01-01|   1|             3|       3|              4|
|     AQU|2019-01-01|   1|             4|       4|              5|
+--------+----------+----+--------------+--------+---------------+
only showing top 5 rows



In [36]:
df_horses_id.printSchema()

root
 |-- track_id: string (nullable = true)
 |-- race_date: string (nullable = true)
 |-- race: integer (nullable = true)
 |-- program_number: integer (nullable = true)
 |-- horse_id: integer (nullable = true)
 |-- finishing_place: integer (nullable = true)



In [37]:
df_horses = df_horses_id.join(df_horses_name, df_horses_id.horse_id == df_horses_name.horse_id, 'outer')

In [38]:
df_horses.show(5)

+--------+----------+----+--------------+--------+---------------+--------+------------------+
|track_id| race_date|race|program_number|horse_id|finishing_place|horse_id|        horse_name|
+--------+----------+----+--------------+--------+---------------+--------+------------------+
|     AQU|2019-01-01|   1|             5|       0|              1|       0|Jc's Shooting Star|
|     AQU|2019-12-07|   4|             5|       0|              5|       0|Jc's Shooting Star|
|     AQU|2019-12-20|   7|             5|       0|              6|       0|Jc's Shooting Star|
|     BEL|2019-05-27|   5|             4|       0|             10|       0|Jc's Shooting Star|
|     BEL|2019-06-15|   9|             1|       0|              2|       0|Jc's Shooting Star|
+--------+----------+----+--------------+--------+---------------+--------+------------------+
only showing top 5 rows



In [39]:
#Matching columnd ID_horse to join horse's information with original dataset
df_horses = df_horses.withColumn('ID_horse', 
                    sf.concat(sf.col('track_id'),sf.lit('_'), sf.col('race_date'),sf.lit('_'), sf.col('race'),sf.lit('_'),sf.col('program_number')))
df_horses = df_horses.withColumnRenamed('race',
                                        'race_number').withColumnRenamed('finishing_place','position_at_finish')

df_horses = df_horses.drop('horse_id')
df_horses.show(5)


+--------+----------+-----------+--------------+------------------+------------------+------------------+
|track_id| race_date|race_number|program_number|position_at_finish|        horse_name|          ID_horse|
+--------+----------+-----------+--------------+------------------+------------------+------------------+
|     AQU|2019-01-01|          1|             5|                 1|Jc's Shooting Star|AQU_2019-01-01_1_5|
|     AQU|2019-12-07|          4|             5|                 5|Jc's Shooting Star|AQU_2019-12-07_4_5|
|     AQU|2019-12-20|          7|             5|                 6|Jc's Shooting Star|AQU_2019-12-20_7_5|
|     BEL|2019-05-27|          5|             4|                10|Jc's Shooting Star|BEL_2019-05-27_5_4|
|     BEL|2019-06-15|          9|             1|                 2|Jc's Shooting Star|BEL_2019-06-15_9_1|
+--------+----------+-----------+--------------+------------------+------------------+------------------+
only showing top 5 rows



####Dataset preparation 

Merging table: Complete, injuries and horses.


In [40]:
df_injuries.count()

98

In [41]:
df_horses.count()

14916

In [42]:
df_inj_horses = df_injuries.join(df_horses, ['horse_name','track_id'] ,'left')
df_inj_horses.show(5)

+-----------+--------+-------------+---------------+-----------------+----------+-----------+--------------+------------------+------------------+
| horse_name|track_id|incident_date|  incident_type|           jockey| race_date|race_number|program_number|position_at_finish|          ID_horse|
+-----------+--------+-------------+---------------+-----------------+----------+-----------+--------------+------------------+------------------+
|      Fifty|     AQU|   01/04/2019|RACING INCIDENT|     Jose Lezcano|2019-01-04|          6|             1|                 7|AQU_2019-01-04_6_1|
|      Fifty|     AQU|   01/04/2019|RACING INCIDENT|     Jose Lezcano|2019-02-07|          8|             6|                 4|AQU_2019-02-07_8_6|
|      Fifty|     AQU|   01/04/2019|RACING INCIDENT|     Jose Lezcano|2019-02-28|          6|             2|                 6|AQU_2019-02-28_6_2|
|LIA SHIVANI|     AQU|   01/11/2019|  FALL OF RIDER|Christina Bonilla|      null|       null|          null|          

In [43]:
df_inj_horses.count()

194

In [44]:
df_injuries.select(sf.countDistinct('track_id')).show()

+------------------------+
|count(DISTINCT track_id)|
+------------------------+
|                       3|
+------------------------+



In [45]:
df_complete.count()

5228430

In [46]:
df_prepped = df_complete.join(df_inj_horses, ['ID_horse','track_id','race_date','race_number','program_number','jockey','position_at_finish'],how='left')
df_prepped.show(10)

+------------------+--------+----------+-----------+--------------+--------------------+------------------+------------+----------------+-----------------+-----------+-----------+---------------+---------------+---------+-------+---------+--------------+----+----------+-------------+-------------+
|          ID_horse|track_id| race_date|race_number|program_number|              jockey|position_at_finish|trakus_index|        latitude|        longitude|distance_id|course_type|track_condition|run_up_distance|race_type|  purse|post_time|weight_carried|odds|horse_name|incident_date|incident_type|
+------------------+--------+----------+-----------+--------------+--------------------+------------------+------------+----------------+-----------------+-----------+-----------+---------------+---------------+---------+-------+---------+--------------+----+----------+-------------+-------------+
|AQU_2019-01-01_9_6|     AQU|2019-01-01|          9|             6|Andre Shivnarine ...|               

In [47]:
df_prepped.count()

5228430

**Number of Days of rest after an Injury**

In [48]:
#Create new column regarding the number of days a horse rest after an injury
# "race date" vs "incident_date"
from pyspark.sql.functions import *
# Switch "incident_date" column to Date format
df_prepped =  df_prepped.withColumn( "incident_date_t", to_date(col("incident_date"),"MM/dd/yyyy") ) 
#Calculate the # of days a horse wait to race after having an injury
df_prepped = df_prepped.withColumn("days_from_injury", datediff(col("race_date"), col ("incident_date_t")) )
df_prepped.show()

+------------------+--------+----------+-----------+--------------+-----------+------------------+------------+----------------+-----------------+-----------+-----------+---------------+---------------+---------+-------+---------+--------------+----+----------+-------------+-------------+---------------+----------------+
|          ID_horse|track_id| race_date|race_number|program_number|     jockey|position_at_finish|trakus_index|        latitude|        longitude|distance_id|course_type|track_condition|run_up_distance|race_type|  purse|post_time|weight_carried|odds|horse_name|incident_date|incident_type|incident_date_t|days_from_injury|
+------------------+--------+----------+-----------+--------------+-----------+------------------+------------+----------------+-----------------+-----------+-----------+---------------+---------------+---------+-------+---------+--------------+----+----------+-------------+-------------+---------------+----------------+
|AQU_2019-04-14_7_9|     AQU|20

In [49]:
#Display unique values regarding variable ("days_from_injury")
df_prepped.select("days_from_injury").distinct().show()
# If number of days from injury is negative that means that the  injury happened after the race
# We might expect that negative values and large # of days of rest be associated with good performance
# while 0 and small values for the variable be associated with poor performance

+----------------+
|days_from_injury|
+----------------+
|            -145|
|            null|
|            -126|
|             -77|
|            -104|
|            -266|
|             -22|
|            -118|
|             -29|
|               8|
|             -91|
|            -299|
|             -34|
|             -56|
|            -112|
|            -310|
|             -28|
|               0|
+----------------+



**Selection of Potential Predictors**

In [50]:
# Initially we will include most variables. we will only exclude variables which info is better represented by other variables
# Later on, we will perform feature selection to find opportunities of reducing the # of predictors
non_predictors = ["race_date", "race_number", "program_number","trakus_index", "latitude", "longitude", "post_time" , "horse_name", "incident_date","incident_date_t"]
predictors = [item for item in df_prepped.columns if item not in non_predictors]

In [51]:
display(predictors)

['ID_horse',
 'track_id',
 'jockey',
 'position_at_finish',
 'distance_id',
 'course_type',
 'track_condition',
 'run_up_distance',
 'race_type',
 'purse',
 'weight_carried',
 'odds',
 'incident_type',
 'days_from_injury']

In [52]:
df_predictors = df_prepped.select(predictors)
#remove duplicates
df_predictors = df_predictors.dropDuplicates()

In [53]:
df_predictors.show(10)

+-------------------+--------+--------------------+------------------+-----------+-----------+---------------+---------------+---------+--------+--------------+----+-------------+----------------+
|           ID_horse|track_id|              jockey|position_at_finish|distance_id|course_type|track_condition|run_up_distance|race_type|   purse|weight_carried|odds|incident_type|days_from_injury|
+-------------------+--------+--------------------+------------------+-----------+-----------+---------------+---------------+---------+--------+--------------+----+-------------+----------------+
| AQU_2019-11-10_8_7|     AQU|        Joel Rosario|                 6|        700|          D|            FT |             48|      STK|150000.0|           120| 900|         null|            null|
| BEL_2019-06-07_6_1|     BEL|        Joel Rosario|                 7|        800|          T|            FM |             74|      AOC| 97000.0|           118|1550|         null|            null|
| BEL_2019-06-0

**Replace Null values with Median (var: "Days from Injury")**

In [54]:
#Convert days from injury column to pandas for calculating median
import pyspark.pandas as ps
variable = df_predictors.select("days_from_injury")
pandasDF = variable.to_pandas_on_spark()



In [55]:
# Median Calculation
import numpy as np

median = pandasDF.median().to_numpy()
median = np.float64(median)
display (median)



0.0

In [56]:
#Replace null values with median
df_predictors = df_predictors.na.fill({"days_from_injury" : median})

In [57]:
df_predictors.show()

+-------------------+--------+--------------------+------------------+-----------+-----------+---------------+---------------+---------+--------+--------------+----+-------------+----------------+
|           ID_horse|track_id|              jockey|position_at_finish|distance_id|course_type|track_condition|run_up_distance|race_type|   purse|weight_carried|odds|incident_type|days_from_injury|
+-------------------+--------+--------------------+------------------+-----------+-----------+---------------+---------------+---------+--------+--------------+----+-------------+----------------+
| AQU_2019-11-10_8_7|     AQU|        Joel Rosario|                 6|        700|          D|            FT |             48|      STK|150000.0|           120| 900|         null|               0|
| BEL_2019-06-07_6_1|     BEL|        Joel Rosario|                 7|        800|          T|            FM |             74|      AOC| 97000.0|           118|1550|         null|               0|
| BEL_2019-06-0

In [66]:
df_predictors.write.option("header",True).mode("overwrite")\
 .csv("/content/drive/My Drive/Big_Data/data.csv")

**Preparing string Variables for Feature Selection**

In [None]:
# Handling Categorical Variables
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline

# conversion of string to 1, 2 3 ... categorical format
indexers = [StringIndexer(inputCol=column, outputCol=column+"_c").setHandleInvalid("keep").fit(df_predictors) for column in ["track_id","jockey","course_type","track_condition", "race_type", "incident_type"]]

pipeline = Pipeline(stages=indexers)
df_predictors = pipeline.fit(df_predictors).transform(df_predictors)

In [None]:
# Drop variables that has been replaced by categorical counterpart
df_predictors = df_predictors.drop("track_id","jockey","course_type","track_condition", "race_type", "incident_type")

In [None]:
# Create feature vector for feature selection
# Note that ChiSquare Selector does not work on One Hot Encoded Variables
from pyspark.ml.feature import VectorAssembler
# Features start from index 2 and on as index 0 and 1 represent ID and target variable respectively
assembler = (VectorAssembler()
             .setInputCols(df_predictors.columns[2:])
             .setOutputCol("features")
  )

feature_vector = assembler.transform(df_predictors)

In [None]:
feature_vector.show()

+-------------------+------------------+-----------+---------------+--------+--------------+----+----------------+----------+--------+-------------+-----------------+-----------+---------------+--------------------+
|           ID_horse|position_at_finish|distance_id|run_up_distance|   purse|weight_carried|odds|days_from_injury|track_id_c|jockey_c|course_type_c|track_condition_c|race_type_c|incident_type_c|            features|
+-------------------+------------------+-----------+---------------+--------+--------------+----+----------------+----------+--------+-------------+-----------------+-----------+---------------+--------------------+
| AQU_2019-11-10_8_7|                 6|        700|             48|150000.0|           120| 900|               0|       0.0|     9.0|          0.0|              0.0|        3.0|            4.0|[700.0,48.0,15000...|
| BEL_2019-06-07_6_1|                 7|        800|             74| 97000.0|           118|1550|               0|       1.0|     9.0|  

**Feature Selection**

In [None]:
from pyspark.ml.feature import ChiSqSelector
from pyspark.ml.linalg import Vectors
# Set fpr = 0.05. There is a 95% chance that a predictor provides value towards inferance of the target variable
selector = ChiSqSelector(fpr = 0.05, featuresCol= "features", outputCol= "selectedFeatures", labelCol = "position_at_finish")
model=selector.fit(feature_vector)
result = model.transform(feature_vector)

In [None]:
#Feature index start at 2 as the first 2 indexes are for ID and Target Variable respectively
index_selected =  [x+2 for x in model.selectedFeatures]
display (index_selected)

[2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13]

In [None]:
# Display features selected
selected_vars= np.array (df_predictors.columns)[index_selected]
display(selected_vars)

array(['distance_id', 'run_up_distance', 'purse', 'weight_carried',
       'odds', 'days_from_injury', 'track_id_c', 'jockey_c',
       'course_type_c', 'track_condition_c', 'race_type_c',
       'incident_type_c'], dtype='<U18')

**Preparing Variables for Machine Learning Algorithms**


In [None]:
#onehotencoder 
onehotencoder_cat_vector = [OneHotEncoder(inputCol= column, outputCol= column+"e") for column in ["track_id_c", "jockey_c", "course_type_c","track_condition_c", "race_type_c", "incident_type_c" ]]
pipeline_e = Pipeline(stages=onehotencoder_cat_vector)
df_predictors = pipeline_e.fit(df_predictors).transform(df_predictors)

In [None]:
# Drop Variables that were Encoded
df_predictors = df_predictors.drop("track_id_c", "jockey_c", "course_type_c","track_condition_c", "race_type_c", "incident_type_c")

In [None]:
# Create feature Matrix for ML Algorithns
# Features are included according to ChiSquareSelector
assembler = (VectorAssembler()
             .setInputCols(selected_vars)
             .setOutputCol("SelectedFeatures")
  )

feature_matrix = assembler.transform(df_predictors)

In [None]:
# Split Dataset
train, test = feature_matrix.randomSplit([0.7, 0.3])

#  Going to cache the data to make sure things stay snappy!
train.cache()
test.cache()

DataFrame[ID_horse: string, position_at_finish: int, distance_id: int, run_up_distance: int, purse: double, weight_carried: int, odds: int, days_from_injury: int, track_id_ce: vector, jockey_ce: vector, course_type_ce: vector, track_condition_ce: vector, race_type_ce: vector, incident_type_ce: vector, SelectedFeatures: vector]

In [None]:
train.show()

+-------------------+------------------+-----------+---------------+--------+--------------+----+----------------+-------------+----------------+--------------+------------------+--------------+----------------+--------------------+
|           ID_horse|position_at_finish|distance_id|run_up_distance|   purse|weight_carried|odds|days_from_injury|  track_id_ce|       jockey_ce|course_type_ce|track_condition_ce|  race_type_ce|incident_type_ce|    SelectedFeatures|
+-------------------+------------------+-----------+---------------+--------+--------------+----+----------------+-------------+----------------+--------------+------------------+--------------+----------------+--------------------+
| AQU_2019-01-25_7_6|                 4|        800|             52| 67000.0|           123|1020|               0|(3,[0],[1.0])| (178,[2],[1.0])| (5,[0],[1.0])|     (7,[2],[1.0])|(12,[4],[1.0])|       (4,[],[])|(215,[0,1,2,3,4,6...|
| AQU_2019-02-02_7_6|                 5|        600|             42|

**Scale Variables**

In [None]:
from pyspark.ml.feature import StandardScaler
# Let us create an object of StandardScaler class
Scaler=StandardScaler().setInputCol("SelectedFeatures").setOutputCol("ScaledFeatures")
train = Scaler.fit(train).transform(train)
test = Scaler.fit(test).transform(test)

In [None]:
test.show()

+-------------------+------------------+-----------+---------------+--------+--------------+----+----------------+-------------+----------------+--------------+------------------+--------------+----------------+--------------------+--------------------+
|           ID_horse|position_at_finish|distance_id|run_up_distance|   purse|weight_carried|odds|days_from_injury|  track_id_ce|       jockey_ce|course_type_ce|track_condition_ce|  race_type_ce|incident_type_ce|    SelectedFeatures|      ScaledFeatures|
+-------------------+------------------+-----------+---------------+--------+--------------+----+----------------+-------------+----------------+--------------+------------------+--------------+----------------+--------------------+--------------------+
|AQU_2019-04-06_11_5|                11|        800|             54| 75000.0|           126| 800|               0|(3,[0],[1.0])| (178,[2],[1.0])| (5,[0],[1.0])|     (7,[0],[1.0])|(12,[5],[1.0])|       (4,[],[])|(215,[0,1,2,3,4,6...|(215,[

**Random Forest Classifier**

In [None]:
from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(labelCol="position_at_finish", featuresCol="ScaledFeatures")
rf_model = rf.fit(train)
rf_prediction = rf_model.transform(test)
rf_prediction.select("prediction", "position_at_finish").show()

+----------+------------------+
|prediction|position_at_finish|
+----------+------------------+
|       1.0|                11|
|       5.0|                 4|
|       4.0|                 5|
|       4.0|                 3|
|       4.0|                 1|
|       1.0|                 1|
|       4.0|                 1|
|       4.0|                 5|
|       1.0|                 7|
|       1.0|                 2|
|       1.0|                 6|
|       1.0|                 2|
|       2.0|                 3|
|       1.0|                 1|
|       6.0|                 7|
|       4.0|                 5|
|       2.0|                 5|
|       1.0|                 7|
|       5.0|                 1|
|       4.0|                 5|
+----------+------------------+
only showing top 20 rows



In [None]:
rf_model.featureImportances

SparseVector(215, {0: 0.0215, 1: 0.0384, 2: 0.0233, 3: 0.0794, 4: 0.2934, 5: 0.0006, 6: 0.0106, 7: 0.0078, 8: 0.0178, 9: 0.0247, 10: 0.004, 11: 0.017, 13: 0.0195, 15: 0.0061, 16: 0.0205, 17: 0.0021, 18: 0.009, 19: 0.0086, 20: 0.0078, 21: 0.0021, 22: 0.0006, 23: 0.0014, 24: 0.0066, 25: 0.015, 26: 0.0108, 27: 0.0074, 28: 0.0097, 29: 0.0159, 30: 0.0015, 31: 0.0009, 33: 0.0028, 35: 0.0017, 36: 0.0019, 39: 0.001, 40: 0.0074, 41: 0.0037, 42: 0.004, 44: 0.0043, 47: 0.0012, 48: 0.0012, 49: 0.0029, 51: 0.0019, 53: 0.0031, 55: 0.0056, 60: 0.0049, 68: 0.0024, 71: 0.0016, 76: 0.0013, 79: 0.0006, 80: 0.0036, 84: 0.0047, 87: 0.0011, 88: 0.0025, 97: 0.003, 99: 0.006, 102: 0.0015, 118: 0.0024, 130: 0.002, 134: 0.0018, 142: 0.0008, 143: 0.0015, 156: 0.0007, 182: 0.0046, 187: 0.0553, 188: 0.0084, 189: 0.0095, 190: 0.0093, 192: 0.023, 193: 0.029, 194: 0.0121, 195: 0.0046, 196: 0.0063, 197: 0.0066, 198: 0.0008, 199: 0.005, 200: 0.0081, 201: 0.0212, 202: 0.0086, 203: 0.0049, 204: 0.0033, 205: 0.0023, 206: 