In [6]:
# Display the active SparkSession. No session is created in the visible cells,
# so `spark` is presumably injected by the notebook environment — TODO confirm.
spark

In [7]:
from pyspark.sql.functions import *
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, MinMaxScaler
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.regression import LinearRegression, GeneralizedLinearRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator, RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
import numpy as np

In [8]:
# Location of the cleaned match data (Parquet) in the project bucket.
file_path = 'gs://my-project-bucket-px/cleaned/cleaned_matchdata.parquet'
parquet_df = spark.read.parquet(file_path)

# Use the DataFrame returned by the reader directly. Rebuilding it with
# spark.createDataFrame(parquet_df.rdd, parquet_df.schema) produces the same
# rows and schema, but pushes every row through the Python RDD API and hides
# the Parquet scan from the query optimizer, which is needlessly expensive.
sdf = parquet_df

# Preview the first rows and confirm the column types.
sdf.show()
sdf.printSchema()



+-----+---+-----+--------------+---------+---------------+--------------+-------------------+-----------------+----------------+------+
|index| hp|armor|equipmentValue|hasHelmet|num_enemy_alive|num_team_alive|enemy_in_range_2000|team_in_range_500|hp_closest_enemy|isDead|
+-----+---+-----+--------------+---------+---------------+--------------+-------------------+-----------------+----------------+------+
|    0|100|    0|           400|    false|              5|             5|                  0|                5|             100|   0.0|
|    1|100|    0|           400|    false|              5|             5|                  0|                4|             100|   0.0|
|    2|100|    0|           400|    false|              5|             5|                  0|                2|             100|   0.0|
|    3|100|    0|           400|    false|              5|             5|                  0|                2|             100|   0.0|
|    4|100|    0|           400|    false|      

                                                                                

In [9]:
# NOTE(review): the sampled DataFrame is only displayed, never assigned, so the
# later cells that read `sdf` still operate on the full data set. No seed is
# passed either, so each run would draw a different ~20% sample (without
# replacement). Assign the result (with a seed) if down-sampling was intended.
sdf.sample(False, 0.2)

DataFrame[index: bigint, hp: bigint, armor: bigint, equipmentValue: bigint, hasHelmet: boolean, num_enemy_alive: bigint, num_team_alive: bigint, enemy_in_range_2000: bigint, team_in_range_500: bigint, hp_closest_enemy: bigint, isDead: double]

In [10]:
# Seeded random 70/30 split into training and test rows
trainingData, testData = sdf.randomSplit([0.7, 0.3], seed=42)

# Input columns that are combined into the model's feature vector
feature_cols = [
    "hp", "armor", "equipmentValue", "hasHelmet",
    "num_enemy_alive", "num_team_alive",
    "enemy_in_range_2000", "team_in_range_500", "hp_closest_enemy",
]

# Stage 0: pack the individual input columns into a single "features" vector
assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="features",
)

# Stage 1: min-max rescale the assembled vector into "featuresScaled"
scaler = MinMaxScaler(
    inputCol="features",
    outputCol="featuresScaled",
)

# Stage 2: linear regression on the scaled features, with isDead as the label
linear_reg = LinearRegression(
    featuresCol="featuresScaled",
    labelCol="isDead",
)

# Regression evaluator for the isDead label. No metricName is set here, so the
# evaluator's default metric applies unless one is passed at evaluate() time.
evaluator = RegressionEvaluator(labelCol="isDead")

# Three-stage pipeline: assembler -> scaler -> linear regression
regression_pipe = Pipeline(
    stages=[assembler, scaler, linear_reg],
)

# Hyperparameter grid. No parameters are added, so build() yields a single
# (empty) parameter map: cross-validation fits the pipeline with its default
# settings and only serves to estimate out-of-fold performance.
# NOTE(review): the fit log warns that regParam is zero; add e.g.
# .addGrid(linear_reg.regParam, [...]) here if regularisation should be tuned.
grid = ParamGridBuilder().build()

# 3-fold cross-validation over the grid. The explicit seed pins the random
# fold assignment so avgMetrics and the selected model are repeatable across
# kernel sessions instead of depending on the estimator's default seed.
cv = CrossValidator(estimator=regression_pipe,
                    estimatorParamMaps=grid,
                    evaluator=evaluator,
                    numFolds=3,
                    seed=42)

# Train the models (one fit per fold and parameter map, plus a final refit)
all_models = cv.fit(trainingData)

# Cross-validated score per parameter map, averaged over the folds
print(f"Average metric {all_models.avgMetrics}")

# The fitted pipeline that scored best during cross-validation
bestModel = all_models.bestModel

# Score the held-out test rows with that pipeline
test_results = bestModel.transform(testData)

# Show the input columns next to the true isDead label and the model's prediction
test_results.select(*feature_cols, 'isDead', 'prediction').show(truncate=False)

# Evaluate RMSE and then R2 on the test predictions, overriding the
# evaluator's metric per call rather than mutating the evaluator itself
rmse, r2 = (
    evaluator.evaluate(test_results, {evaluator.metricName: metric})
    for metric in ('rmse', 'r2')
)
print(f"RMSE: {rmse}  R-squared:{r2}")

24/04/20 22:01:33 WARN Instrumentation: [aa3ac30b] regParam is zero, which might cause numerical instability and overfitting.
24/04/20 22:03:58 WARN Instrumentation: [7424640a] regParam is zero, which might cause numerical instability and overfitting.
24/04/20 22:11:23 WARN Instrumentation: [b80b1ff6] regParam is zero, which might cause numerical instability and overfitting.
24/04/20 22:18:01 WARN Instrumentation: [a90f2b43] regParam is zero, which might cause numerical instability and overfitting.
24/04/20 22:24:28 WARN Instrumentation: [265df4ab] regParam is zero, which might cause numerical instability and overfitting.
                                                                                

Average metric [0.11376746050628334]


                                                                                

+---+-----+--------------+---------+---------------+--------------+-------------------+-----------------+----------------+------+-------------------+
|hp |armor|equipmentValue|hasHelmet|num_enemy_alive|num_team_alive|enemy_in_range_2000|team_in_range_500|hp_closest_enemy|isDead|prediction         |
+---+-----+--------------+---------+---------------+--------------+-------------------+-----------------+----------------+------+-------------------+
|100|0    |400           |false    |5              |5             |0                  |2                |100             |0.0   |0.18538702604823803|
|100|0    |400           |false    |5              |5             |0                  |2                |100             |0.0   |0.18538702604823803|
|100|0    |400           |false    |5              |5             |0                  |2                |100             |0.0   |0.18538702604823803|
|100|0    |400           |false    |5              |5             |2                  |2            



RMSE: 0.1138158899921759  R-squared:0.6410652558910722


                                                                                

In [14]:
# Persist the fitted pipeline model to the bucket, replacing any earlier copy
model_path = "gs://my-project-bucket-px/models/csgo_linear_regression_model"
(
    bestModel
    .write()
    .overwrite()
    .save(model_path)
)

                                                                                

In [18]:
# Write the scored test split (the output of bestModel.transform(testData))
# to the trusted zone as Parquet.
trust_path = "gs://my-project-bucket-px/trusted/matchdataFeatures.parquet"

# The default save mode raises an error when the target path already exists,
# so re-running the notebook would fail here. Overwrite instead, matching how
# the model is saved with .overwrite().
test_results.write.mode("overwrite").parquet(trust_path)

                                                                                