In [1]:
# !pip install pyspark
from pyspark.sql import SparkSession
from pyspark import SparkContext
from csv import reader
from pyspark.sql.types import DoubleType,StringType,IntegerType,DateType
from pyspark.sql.functions import col,when,count,desc
from pyspark.sql.functions import format_string
import sys
from pyspark.sql import functions as F
from pyspark.sql.types import *
import pandas as pd
# Reuse (or start) the SparkContext and wrap it in a SparkSession.
sc = SparkContext.getOrCreate()
spark = SparkSession(sc)

# Load the "ratings" collection of the "IPL" MongoDB database through the
# MongoDB Spark connector. NOTE(review): the connection URI is presumably
# supplied via Spark configuration outside this notebook — confirm.
ratings = (
    spark.read
    .format("com.mongodb.spark.sql.DefaultSource")
    .option("database", "IPL")
    .option("collection", "ratings")
    .load()
)


In [2]:
# Drop Mongo's "_id" and free up the name "label": the target StringIndexer
# further down writes its output to a column called "label", so the source
# column of that name is kept under the name "type".
ratings = (
    ratings
    .drop("_id")
    .withColumnRenamed("label", "type")
)
ratings.dtypes

In [3]:
import pyspark
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler

# Spark 3.0 renamed OneHotEncoderEstimator to OneHotEncoder. Decide once, from
# the major version number, which class to use. This deliberately avoids
# distutils' LooseVersion: distutils was removed from the standard library in
# Python 3.12, so importing it breaks the notebook there.
SPARK_MAJOR_VERSION = int(pyspark.__version__.split(".")[0])
if SPARK_MAJOR_VERSION < 3:
    from pyspark.ml.feature import OneHotEncoderEstimator
    one_hot_encoder_cls = OneHotEncoderEstimator
else:
    from pyspark.ml.feature import OneHotEncoder
    one_hot_encoder_cls = OneHotEncoder

# NOTE(review): one-hot encoding Player_Name lets the model key on individual
# players, which likely will not generalise to players absent from training —
# confirm this feature is intended.
categoricalColumns = ["Batting_Hand", "Bowling_Skill", "Player_Name", "type"]
stages = []  # Pipeline stages; nothing runs until Pipeline.fit is called.
for categoricalCol in categoricalColumns:
    # Map each category string to a numeric index ...
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
    # ... then expand that index into a one-hot SparseVector "<col>classVec".
    encoder = one_hot_encoder_cls(
        inputCols=[stringIndexer.getOutputCol()],
        outputCols=[categoricalCol + "classVec"],
    )
    stages += [stringIndexer, encoder]

# Index the target Buy_Type into the "label" column used by the classifiers below.
label_stringIdx = StringIndexer(inputCol="Buy_Type", outputCol="label")
stages += [label_stringIdx]

In [4]:
# Concatenate the one-hot vectors from the categorical stages and the raw
# numeric columns into a single "features" vector column.
numericCols = ["age", "EconomyRate", "RatingNorm", "StrikeRate", "runs", "wickets"]
encodedCols = [name + "classVec" for name in categoricalColumns]
assemblerInputs = encodedCols + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages.append(assembler)

In [5]:
# Fit all feature stages (indexers, encoders, assembler) and materialise the
# "features" and "label" columns for every row.
# NOTE(review): this fits on the full dataset before the train/test split, so
# the indexers also see test-set categories — confirm that is intended.
partialPipeline = Pipeline(stages=stages)
pipelineModel = partialPipeline.fit(ratings)
preppedDataDF = pipelineModel.transform(ratings)


In [6]:
# Show how each Buy_Type string was mapped to its numeric label index.
preppedDataDF.select("Buy_Type", "label").show()

In [7]:
# Reproducible 70/30 train/test split (fixed seed); print the row count of each part.
splits = preppedDataDF.randomSplit([0.7, 0.3], seed=100)
trainingData, testData = splits
for part in (trainingData, testData):
    print(part.count())

In [8]:
from pyspark.ml.classification import RandomForestClassifier

# RandomForest on the assembled "features" vector, predicting the indexed Buy_Type.
# The seed is pinned to the same value as the train/test split. PySpark's
# default seed is derived from hash() of the class name, and Python randomises
# str hashing per process unless PYTHONHASHSEED is set. The default would
# therefore give a different forest after every kernel restart.
rf = RandomForestClassifier(labelCol="label", featuresCol="features", seed=100)

# Train on the 70% split only; the 30% test split stays held out for evaluation.
rfModel = rf.fit(trainingData)

In [9]:
# Score the held-out rows with the fitted RandomForest model and inspect the
# columns it adds (e.g. "probability" and "prediction", used below).
predictions = rfModel.transform(testData)
predictions.printSchema()

In [10]:
# True vs predicted class, with class probabilities, for each test player.
selected = predictions.select(
    "label",
    "prediction",
    "probability",
    "Player_Name",
    "Buy_Type",
)
display(selected)

label,prediction,probability,Player_Name,Buy_Type
0.0,0.0,"List(1, 4, List(), List(0.7799185243054638, 0.15632476450814703, 0.04737766328288053, 0.016379047903508675))",WPUJC Vaas,Avoid
2.0,2.0,"List(1, 4, List(), List(0.1848933440203622, 0.24904381185338637, 0.364139117914291, 0.20192372621196036))",MJ McClenaghan,Good Buy
2.0,2.0,"List(1, 4, List(), List(0.11519023399114783, 0.23931458351279292, 0.461955953878161, 0.18353922861789826))",Mustafizur Rahman,Good Buy
2.0,2.0,"List(1, 4, List(), List(0.1809347859227832, 0.2879938063854808, 0.3762644014590123, 0.15480700623272378))",BE Hendricks,Good Buy
1.0,0.0,"List(1, 4, List(), List(0.5639539573088351, 0.38389202493395497, 0.04257107492077839, 0.009582942836431418))",BB Sran,Maybe
0.0,0.0,"List(1, 4, List(), List(0.6658635117219838, 0.2469881602195348, 0.055053010007463245, 0.032095318051018215))",S Aravind,Avoid
0.0,0.0,"List(1, 4, List(), List(0.8242512748882145, 0.13104650110488364, 0.032954586359803605, 0.011747637647098422))",SB Wagh,Avoid
2.0,2.0,"List(1, 4, List(), List(0.254673071932518, 0.24800453802262518, 0.3297195851431966, 0.16760280490166013))",Sohail Tanvir,Good Buy
0.0,0.0,"List(1, 4, List(), List(0.7706378225510779, 0.17610546626253304, 0.04021099661621387, 0.013045714570175346))",S Chanderpaul,Avoid
2.0,1.0,"List(1, 4, List(), List(0.22478539406660528, 0.3173053595178476, 0.31240298956457646, 0.14550625685097063))",PP Chawla,Good Buy


In [12]:
# Which class indices does the model actually predict on the test split?
predictions.select("prediction").dropDuplicates().show()

In [13]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Score the RandomForest predictions on the test split. labelCol and metricName
# are spelled out only for readability: "label" and "f1" (weighted F-measure)
# are the evaluator's defaults, so the printed number is weighted F1, not accuracy.
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="f1",
)
evaluator.evaluate(predictions)

In [14]:
import matplotlib.pyplot as plt


def one_vs_rest_roc(y_true, scores):
    """Compute a binary ROC curve by sweeping a threshold over ``scores``.

    Parameters
    ----------
    y_true : sequence of bool/int
        Truthy for the positive class, falsy otherwise.
    scores : sequence of float
        Score for each example; higher means "more likely positive".

    Returns
    -------
    (fpr, tpr) : tuple of two lists of float
        Curve points, starting at (0.0, 0.0) and ending at (1.0, 1.0).

    Raises
    ------
    ValueError
        If there are no positives or no negatives (the ROC is undefined).
    """
    pairs = sorted(zip(scores, y_true), key=lambda p: p[0], reverse=True)
    n_pos = sum(1 for _, y in pairs if y)
    n_neg = len(pairs) - n_pos
    if n_pos == 0 or n_neg == 0:
        raise ValueError("ROC needs at least one positive and one negative example")
    fpr, tpr = [0.0], [0.0]
    tp = fp = 0
    for i, (score, y) in enumerate(pairs):
        if y:
            tp += 1
        else:
            fp += 1
        # Emit a point only after the last of a run of tied scores, so ties
        # produce one diagonal step instead of an order-dependent staircase.
        if i == len(pairs) - 1 or pairs[i + 1][0] != score:
            fpr.append(fp / n_neg)
            tpr.append(tp / n_pos)
    return fpr, tpr


# The RandomForest is multiclass, so there is no single ROC curve (and no
# binary `summary.roc`). Draw one one-vs-rest curve per class from the
# probability vectors. Rescore testData here so this cell does not depend on
# whichever model last wrote `predictions`.
rf_predictions = rfModel.transform(testData)
rows = rf_predictions.select("label", "Buy_Type", "probability").collect()
class_names = {int(r["label"]): r["Buy_Type"] for r in rows}

fig, ax = plt.subplots(figsize=(5, 5))
ax.plot([0, 1], [0, 1], "r--", label="chance")
for k in range(rfModel.numClasses):
    y_true = [int(r["label"]) == k for r in rows]
    scores = [float(r["probability"][k]) for r in rows]
    try:
        fpr, tpr = one_vs_rest_roc(y_true, scores)
    except ValueError:
        continue  # class k absent from (or the only class in) the test split
    ax.plot(fpr, tpr, label=class_names.get(k, f"class {k}"))
ax.set_xlabel("FPR")
ax.set_ylabel("TPR")
ax.set_title("One-vs-rest ROC (RandomForest, test split)")
ax.legend(loc="lower right")
plt.show()

In [15]:
from pyspark.ml.classification import NaiveBayes

# Baseline: Naive Bayes with Laplace smoothing on the same features and split.
# NOTE(review): Spark's default (multinomial) Naive Bayes rejects negative
# feature values — assumed to hold for the numeric columns; confirm.
nb = NaiveBayes(smoothing=1)
model = nb.fit(trainingData)
# Rebinds `predictions` (previously the RandomForest output); cells that read
# `predictions` after this one see the Naive Bayes scores.
predictions = model.transform(testData)

# Ten rows predicted as class 0. Each column is selected once, because a
# repeated "label" would yield two identically named columns. Buy_Type is shown
# so the numeric label is readable.
(
    predictions
    .filter(predictions["prediction"] == 0)
    .select("label", "Buy_Type", "Player_Name", "probability", "prediction")
    # NOTE(review): ordering by a Vector column relies on Spark's internal
    # vector encoding; sorting on an explicit class-0 probability would be clearer.
    .orderBy("probability", ascending=False)
    .show(n=10, truncate=30)
)

In [16]:
# Score `predictions` (the Naive Bayes output when the notebook runs in order)
# with the evaluator's default metric, weighted F1; the defaults are spelled out.
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="f1",
)
evaluator.evaluate(predictions)