In [1]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.3.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m3.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.3-py2.py3-none-any.whl size=317840625 sha256=4462a157805dd4f3f6c4f0c86e478d6146af266cd0099abab4b7a080bfc744ce
  Stored in directory: /root/.cache/pip/wheels/1b/3a/92/28b93e2fbfdbb07509ca4d6f50c5e407f48dce4ddbda69a4ab
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.3


In [2]:
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.sql.functions import col, when
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql import Window
from pyspark.sql.functions import rank
from pyspark.sql import SparkSession

In [67]:
spark = SparkSession.builder.appName("Formula1").getOrCreate()

#Carrengando datasets
results_df = spark.read.csv("/content/results.csv", header=True, inferSchema=True)
races_df = spark.read.csv("/content/races.csv", header=True, inferSchema=True)
constructors_df = spark.read.csv("/content/constructors.csv", header=True, inferSchema=True)

In [68]:
#Renomeando colunas existentes em mais de um datasets
results_df = results_df.withColumnRenamed("time", "race_time") \
                       .withColumnRenamed("name", "constructor_name") \
                       .withColumnRenamed("url", "result_url")
races_df = races_df.withColumnRenamed("time", "race_start_time") \
                   .withColumnRenamed("name", "race_name") \
                   .withColumnRenamed("url", "race_url")

In [69]:
#JOIN dos Datasets
df = results_df.join(races_df, on="raceId", how="inner") \
               .join(constructors_df, on="constructorId", how="inner")

#Classificação por raceId e positionOrder
window = Window.partitionBy("raceId").orderBy("positionOrder")

df = df.withColumn("rank", rank().over(window))

#Iniciando tratamento dos dados
#Tratando coluna de milliseconds para um tipo numerico no caso utilizei o float
df = df.withColumn("milliseconds", col("milliseconds").cast("float"))
#Removendo linhas com valores nulos e para zero
df = df.na.drop()
df = df.fillna(0)

In [70]:
#Criando colunas de target para predição onde irei tentar prever qual o numero do piloto estará no podium de acordo com o circuito
df = df.withColumn("target_1st", when(col("rank") == 1, col("number")).otherwise(0)) \
       .withColumn("target_2nd", when(col("rank") == 2, col("number")).otherwise(0)) \
       .withColumn("target_3rd", when(col("rank") == 3, col("number")).otherwise(0))

In [71]:
#Filtrando os casos nulos para as colunas targets
df = df.filter((col("target_1st").isNotNull()) | (col("target_2nd").isNotNull()) | (col("target_3rd").isNotNull()))

In [72]:
#Transformando os valores nulos para zero
df = df.fillna(0)

In [73]:
#Tipando os campos targets para numericos, no caso utilizei tipo inteiro
df = df.withColumn("target_1st", col("target_1st").cast("int"))
df = df.withColumn("target_2nd", col("target_2nd").cast("int"))
df = df.withColumn("target_3rd", col("target_3rd").cast("int"))

In [74]:
#Foi necessario incluir o filtro para <= 100 pois estava recebendo erro pois os valores de target precisava ser de 0 a 100
df = df.filter((col("target_1st") <= 100))
df = df.filter((col("target_2nd") <= 100))
df = df.filter((col("target_3rd") <= 100))

In [75]:
#Filtrando apenas os dados do pódio, as 3 primeiras posicoes
df = df.filter(col("rank") <= 3)

In [76]:
#Filtrando periodo dos ultimos 5 anos
df = df.filter((col("year") >= 2019))

In [77]:
df.show()

+-------------+------+--------+--------+------+----+--------+------------+-------------+------+----+-----------+------------+----------+----+--------------+---------------+--------+----+-----+---------+--------------------+----------+---------------+--------------------+--------+--------+--------+--------+--------+--------+----------+----------+-----------+-----------+--------------+--------+-----------+--------------------+----------+----------+----------+
|constructorId|raceId|resultId|driverId|number|grid|position|positionText|positionOrder|points|laps|  race_time|milliseconds|fastestLap|rank|fastestLapTime|fastestLapSpeed|statusId|year|round|circuitId|           race_name|      date|race_start_time|            race_url|fp1_date|fp1_time|fp2_date|fp2_time|fp3_date|fp3_time|quali_date|quali_time|sprint_date|sprint_time|constructorRef|    name|nationality|                 url|target_1st|target_2nd|target_3rd|
+-------------+------+--------+--------+------+----+--------+------------+--

In [78]:
#Indexar colunas categóricas (ex: circuitId, number)
indexer_constructor = StringIndexer(inputCol="number", outputCol="number_index")
indexer_circuit = StringIndexer(inputCol="circuitId", outputCol="circuit_index")

In [79]:
#VectorAssembler para combinar features
assembler = VectorAssembler(
    inputCols=["grid", "laps", "milliseconds", "circuit_index"],
    outputCol="features"
)

In [80]:
#Modelo RandomForest para prever o 1º lugar, precisei incluir o parametro maxBins=100 pois estava recebendo erro onde ultrapassava maxBins de 32
rf_1st = RandomForestClassifier(featuresCol="features", labelCol="target_1st", predictionCol="prediction_1st", maxBins=100)

In [81]:
#Criando o Pipeline
pipeline = Pipeline(stages=[indexer_constructor, indexer_circuit, assembler, rf_1st])

In [82]:
#Dividindo os dados em treino e teste 80% e 20%
train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)

In [83]:
#Treinando o modelo
model_1st = pipeline.fit(train_data)

In [84]:
#Realizando previsões baseado no treinamento do modelo
predictions_1st = model_1st.transform(test_data)

In [85]:
#Avaliando o modelo usando precisão para 1º lugar
evaluator = MulticlassClassificationEvaluator(labelCol="target_1st", predictionCol="prediction_1st", metricName="accuracy")
accuracy_1st = evaluator.evaluate(predictions_1st)
print(f"Precisao do modelo para previsao do 1st lugar: {accuracy_1st}")

Precisao do modelo para previsao do 1st lugar: 0.7222222222222222


# Previsao para 2nd lugar

In [86]:
#Modelo RandomForest para prever o 2º lugar
rf_2nd = RandomForestClassifier(featuresCol="features", labelCol="target_2nd", predictionCol="prediction_2nd", maxBins=100)

In [87]:
#Criando o Pipeline para prever o 2º lugar
pipeline_2nd = Pipeline(stages=[indexer_constructor, indexer_circuit, assembler, rf_2nd])

In [88]:
#Dividindo os dados em treino e teste 80% e 20%
train_data_2nd, test_data_2nd = df.randomSplit([0.8, 0.2], seed=42)

In [89]:
#Treinando o modelo
model_2nd = pipeline_2nd.fit(train_data_2nd)

In [90]:
#Realizando previsões baseado no treinamento do modelo
predictions_2nd = model_2nd.transform(test_data_2nd)

In [91]:
#Avaliando o modelo usando precisão para 2º lugar
evaluator_2nd = MulticlassClassificationEvaluator(labelCol="target_2nd", predictionCol="prediction_2nd", metricName="accuracy")
accuracy_2nd = evaluator_2nd.evaluate(predictions_2nd)
print(f"Precisao do modelo para previsao do 2nd lugar: {accuracy_2nd}")


Precisao do modelo para previsao do 2nd lugar: 0.7037037037037037


# Previsao para 3rd lugar

In [92]:
#Modelo RandomForest para prever o 3º lugar
rf_3rd = RandomForestClassifier(featuresCol="features", labelCol="target_3rd", predictionCol="prediction_3rd", maxBins=100)

In [93]:
#Criando o Pipeline
pipeline_3rd = Pipeline(stages=[indexer_constructor, indexer_circuit, assembler, rf_3rd])

In [94]:
#Dividindo os dados em treino e teste 80% e 20%
train_data_3rd, test_data_3rd = df.randomSplit([0.8, 0.2], seed=42)

In [95]:
#Treinando o modelo
model_3rd = pipeline_3rd.fit(train_data_3rd)

In [96]:
#Realizando previsões baseado no treinamento do modelo
predictions_3rd = model_3rd.transform(test_data_3rd)

In [97]:
#Avaliando o modelo usando precisão para 3º lugar
evaluator_3rd = MulticlassClassificationEvaluator(labelCol="target_3rd", predictionCol="prediction_3rd", metricName="accuracy")
accuracy_3rd = evaluator_3rd.evaluate(predictions_3rd)
print(f"Precisao do modelo para previsao do 3rd lugar: {accuracy_3rd}")


Precisao do modelo para previsao do 3rd lugar: 0.6111111111111112
