In [1]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [2]:
import pyspark
import pandas as pd
from pyspark.sql import SparkSession

In [3]:
# Start a Spark session for this notebook, or reuse one that is already running.
spark = (
    SparkSession.builder
    .appName('Utkarsh Kuchhal Practise')
    .getOrCreate()
)

In [4]:
# Load the CSV: the first line is the header, and column types are inferred
# from the data instead of defaulting to strings.
df_pyspark = (
    spark.read
    .option('header', 'true')
    .csv('ind-ban-comment.csv', inferSchema=True)
)

In [5]:
# Print the first rows of the freshly loaded DataFrame as a sanity check.
df_pyspark.show()

+-------+-----------------+------+------------------+--------------------+------+---------+---+------+----------+--------+----+----+-------------------+
|Batsman|     Batsman_Name|Bowler|       Bowler_Name|          Commentary|Detail|Dismissed| Id|Isball|Isboundary|Iswicket|Over|Runs|          Timestamp|
+-------+-----------------+------+------------------+--------------------+------+---------+---+------+----------+--------+----+----+-------------------+
|  28994|   Mohammed Shami| 63881| Mustafizur Rahman|OUT! Bowled! 5-fe...|     W|    28994|346|  true|      null|       1|49.6|   0|2019-07-02 13:18:47|
|   5132|Bhuvneshwar Kumar| 63881| Mustafizur Rahman|WIDE AND RUN OUT!...|  W+wd|     5132|344|  true|      null|       1|49.6|   1|2019-07-02 13:17:28|
|  28994|   Mohammed Shami| 63881| Mustafizur Rahman|Back of a length ...|  null|     null|343|  true|      null|    null|49.5|   1|2019-07-02 13:16:03|
|   5132|Bhuvneshwar Kumar| 63881| Mustafizur Rahman|Just 1 run off th...|  null| 

In [6]:
# Print every column with its inferred type and nullability.
df_pyspark.printSchema()

root
 |-- Batsman: integer (nullable = true)
 |-- Batsman_Name: string (nullable = true)
 |-- Bowler: integer (nullable = true)
 |-- Bowler_Name: string (nullable = true)
 |-- Commentary: string (nullable = true)
 |-- Detail: string (nullable = true)
 |-- Dismissed: integer (nullable = true)
 |-- Id: integer (nullable = true)
 |-- Isball: boolean (nullable = true)
 |-- Isboundary: integer (nullable = true)
 |-- Iswicket: integer (nullable = true)
 |-- Over: double (nullable = true)
 |-- Runs: integer (nullable = true)
 |-- Timestamp: timestamp (nullable = true)



In [7]:
# Show summary statistics (count, mean, ...) as computed by describe().
df_pyspark.describe().show()

+-------+------------------+-----------------+------------------+-----------------+--------------------+------+------------------+-----------------+----------+--------+------------------+------------------+
|summary|           Batsman|     Batsman_Name|            Bowler|      Bowler_Name|          Commentary|Detail|         Dismissed|               Id|Isboundary|Iswicket|              Over|              Runs|
+-------+------------------+-----------------+------------------+-----------------+--------------------+------+------------------+-----------------+----------+--------+------------------+------------------+
|  count|               605|              605|               605|              605|                 605|    40|                19|              605|        67|      19|               605|               605|
|   mean|31971.652892561982|             null| 35304.43636363636|             null|                null|  null| 29728.21052631579| 169.099173553719|       1.0|     1.0| 24.

In [8]:
# Remove the numeric identifier columns; the corresponding *_Name columns stay.
df_pyspark = df_pyspark.drop('Batsman', 'Bowler', 'Id')
# Display the remaining column names.
df_pyspark.columns

['Batsman_Name',
 'Bowler_Name',
 'Commentary',
 'Detail',
 'Dismissed',
 'Isball',
 'Isboundary',
 'Iswicket',
 'Over',
 'Runs',
 'Timestamp']

# Data preparation and feature engineering

In [9]:
from pyspark.sql.functions import isnull, when, count, col

# Count the null entries in every column.
# `when(cond, name)` yields the (non-null) literal `name` where the column is null
# and null everywhere else, and `count` only counts non-null values, so each
# aggregate is the number of nulls in that column.
df_pyspark.select(
    [
        count(when(col(name).isNull(), name)).alias(name)
        for name in df_pyspark.columns
    ]
).show()

+------------+-----------+----------+------+---------+------+----------+--------+----+----+---------+
|Batsman_Name|Bowler_Name|Commentary|Detail|Dismissed|Isball|Isboundary|Iswicket|Over|Runs|Timestamp|
+------------+-----------+----------+------+---------+------+----------+--------+----+----+---------+
|           0|          0|         0|   565|      586|     0|       538|     586|   0|   0|        0|
+------------+-----------+----------+------+---------+------+----------+--------+----+----+---------+



In [10]:
# Number of rows per batsman (the Spark equivalent of pandas value_counts).
(
    df_pyspark
    .groupBy('Batsman_Name')
    .count()
    .show()
)

+------------------+-----+
|      Batsman_Name|count|
+------------------+-----+
|     Soumya Sarkar|   39|
|  Mashrafe Mortaza|    5|
|   Shakib Al Hasan|   75|
|   Mushfiqur Rahim|   23|
|Mohammad Saifuddin|   42|
|         Liton Das|   24|
|      Rishabh Pant|   43|
|    Mohammed Shami|    2|
|       Tamim Iqbal|   31|
|     Hardik Pandya|    2|
|          KL Rahul|   93|
| Bhuvneshwar Kumar|    4|
|     Rubel Hossain|   11|
|      Rohit Sharma|   94|
|    Dinesh Karthik|    9|
|       Virat Kohli|   27|
|          MS Dhoni|   33|
|     Sabbir Rahman|   40|
|  Mosaddek Hossain|    7|
| Mustafizur Rahman|    1|
+------------------+-----+



In [11]:
from pyspark.ml.feature import StringIndexer

# Map each player name to a numeric index so it can be one-hot encoded later.
SI_batsman = StringIndexer(
    inputCol='Batsman_Name',
    outputCol='Batsman_Index',
)
SI_bowler = StringIndexer(
    inputCol='Bowler_Name',
    outputCol='Bowler_Index',
)

# Fit and apply the indexers one after the other; each adds its index column.
# NOTE(review): df_pyspark is reassigned here, so re-running this cell without
# reloading the data presumably fails because the index columns already exist.
for indexer in (SI_batsman, SI_bowler):
    df_pyspark = indexer.fit(df_pyspark).transform(df_pyspark)

In [12]:
# Inspect the DataFrame now that the Batsman_Index / Bowler_Index columns exist.
df_pyspark.show()

+-----------------+------------------+--------------------+------+---------+------+----------+--------+----+----+-------------------+-------------+------------+
|     Batsman_Name|       Bowler_Name|          Commentary|Detail|Dismissed|Isball|Isboundary|Iswicket|Over|Runs|          Timestamp|Batsman_Index|Bowler_Index|
+-----------------+------------------+--------------------+------+---------+------+----------+--------+----+----+-------------------+-------------+------------+
|   Mohammed Shami| Mustafizur Rahman|OUT! Bowled! 5-fe...|     W|    28994|  true|      null|       1|49.6|   0|2019-07-02 13:18:47|         18.0|         0.0|
|Bhuvneshwar Kumar| Mustafizur Rahman|WIDE AND RUN OUT!...|  W+wd|     5132|  true|      null|       1|49.6|   1|2019-07-02 13:17:28|         16.0|         0.0|
|   Mohammed Shami| Mustafizur Rahman|Back of a length ...|  null|     null|  true|      null|    null|49.5|   1|2019-07-02 13:16:03|         18.0|         0.0|
|Bhuvneshwar Kumar| Mustafizur Rah

## One Hot Encoding

In [13]:
from pyspark.ml.feature import OneHotEncoder

In [14]:
# One-hot encode both index columns with a single estimator.
# OneHotEncoder's default dropLast=True omits the last category, so each encoded
# vector has one slot fewer than there are distinct values in its column.
OHE = OneHotEncoder(
    inputCols=['Batsman_Index', 'Bowler_Index'],
    outputCols=['Batsman_OHE', 'Bowler_OHE'],
)

# Learn the category sizes, then append the two sparse-vector columns.
df_pyspark = OHE.fit(df_pyspark).transform(df_pyspark)

# Inspect the result, including the new *_OHE columns.
df_pyspark.show()

+-----------------+------------------+--------------------+------+---------+------+----------+--------+----+----+-------------------+-------------+------------+---------------+--------------+
|     Batsman_Name|       Bowler_Name|          Commentary|Detail|Dismissed|Isball|Isboundary|Iswicket|Over|Runs|          Timestamp|Batsman_Index|Bowler_Index|    Batsman_OHE|    Bowler_OHE|
+-----------------+------------------+--------------------+------+---------+------+----------+--------+----+----+-------------------+-------------+------------+---------------+--------------+
|   Mohammed Shami| Mustafizur Rahman|OUT! Bowled! 5-fe...|     W|    28994|  true|      null|       1|49.6|   0|2019-07-02 13:18:47|         18.0|         0.0|(19,[18],[1.0])|(11,[0],[1.0])|
|Bhuvneshwar Kumar| Mustafizur Rahman|WIDE AND RUN OUT!...|  W+wd|     5132|  true|      null|       1|49.6|   1|2019-07-02 13:17:28|         16.0|         0.0|(19,[16],[1.0])|(11,[0],[1.0])|
|   Mohammed Shami| Mustafizur Rahman|Ba

In [15]:
from pyspark.ml.feature import VectorAssembler

# Numeric columns fed to the assembler. VectorAssembler rejects null inputs by
# default, so these are the only columns that need imputing.
numeric_inputs = ['Isboundary', 'Iswicket', 'Over', 'Runs']

# Combine the numeric columns and the one-hot vectors into one feature vector.
assembler = VectorAssembler(
    inputCols=numeric_inputs + ['Batsman_OHE', 'Bowler_OHE'],
    outputCol='vector',
)

# Replace nulls with 0 only in the assembler's numeric inputs. An unrestricted
# fillna(0) would also rewrite missing 'Dismissed' player ids to a fake id 0.
df_pyspark = df_pyspark.fillna(0, subset=numeric_inputs)

# Append the assembled 'vector' column.
final_data = assembler.transform(df_pyspark)

In [16]:
# Display only the assembled feature vector column.
final_data.select('vector').show()

+--------------------+
|              vector|
+--------------------+
|(34,[1,2,22,23],[...|
|(34,[1,2,3,20,23]...|
|(34,[2,3,22,23],[...|
|(34,[2,3,20,23],[...|
|(34,[1,2,11,23],[...|
|(34,[2,11,23],[49...|
|(34,[2,11,23],[49...|
|(34,[2,3,11,31],[...|
|(34,[0,2,3,11,31]...|
|(34,[2,11,31],[48...|
|(34,[2,11,31],[48...|
|(34,[0,2,3,11,31]...|
|(34,[2,3,11,31],[...|
|(34,[2,20,23],[47...|
|(34,[2,3,11,23],[...|
|(34,[2,11,23],[47...|
|(34,[2,3,20,23],[...|
|(34,[1,2,17,23],[...|
|(34,[2,3,11,23],[...|
|(34,[2,3,11,31],[...|
+--------------------+
only showing top 20 rows



# Pipeline

In [17]:
# Small hand-written training set: two numeric features (feature_1, feature_4),
# two categorical string features (feature_2, feature_3) and a binary label.
sample_data_train = spark.createDataFrame(
    [
        (2.0, 'A', 'S10', 40, 1.0),
        (1.0, 'X', 'E10', 25, 1.0),
        (4.0, 'X', 'S20', 10, 0.0),
        (3.0, 'Z', 'S10', 20, 0.0),
        (4.0, 'A', 'E10', 30, 1.0),
        (2.0, 'Z', 'S10', 40, 0.0),
        (5.0, 'X', 'D10', 10, 1.0),
    ],
    schema=['feature_1', 'feature_2', 'feature_3', 'feature_4', 'label'],
)

# Display the rows to confirm the frame was built as intended.
sample_data_train.show()

+---------+---------+---------+---------+-----+
|feature_1|feature_2|feature_3|feature_4|label|
+---------+---------+---------+---------+-----+
|      2.0|        A|      S10|       40|  1.0|
|      1.0|        X|      E10|       25|  1.0|
|      4.0|        X|      S20|       10|  0.0|
|      3.0|        Z|      S10|       20|  0.0|
|      4.0|        A|      E10|       30|  1.0|
|      2.0|        Z|      S10|       40|  0.0|
|      5.0|        X|      D10|       10|  1.0|
+---------+---------+---------+---------+-----+



In [18]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline

In [19]:
# Stages 1-2: turn the two categorical string features into numeric indices.
stage_1 = StringIndexer(inputCol='feature_2', outputCol='feature_2_index')
stage_2 = StringIndexer(inputCol='feature_3', outputCol='feature_3_index')

# Stage 3: one-hot encode both index columns produced by the indexers above.
stage_3 = OneHotEncoder(
    inputCols=[stage_1.getOutputCol(), stage_2.getOutputCol()],
    outputCols=['feature_2_encoded', 'feature_3_encoded'],
)

# Stage 4: gather the numeric and encoded features into one 'features' vector.
stage_4 = VectorAssembler(
    inputCols=['feature_1', 'feature_2_encoded', 'feature_3_encoded', 'feature_4'],
    outputCol='features',
)

# Stage 5: the classifier trained on the assembled vector.
stage_5 = LogisticRegression(featuresCol='features', labelCol='label')

# setup the pipeline
regression_pipeline = Pipeline(stages=[stage_1, stage_2, stage_3, stage_4, stage_5])

# Fit every stage in order on the raw training frame.
model = regression_pipeline.fit(sample_data_train)

# Store predictions in their own DataFrame instead of overwriting
# sample_data_train: the raw training data stays available and this cell can be
# re-run (refitting on an already-transformed frame would fail because the
# stage output columns would already exist).
sample_data_predictions = model.transform(sample_data_train)

# NOTE: these predictions are for the same rows the model was trained on.
sample_data_predictions.select(
    'features', 'label', 'rawPrediction', 'probability', 'prediction'
).show()

+--------------------+-----+--------------------+--------------------+----------+
|            features|label|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|[2.0,0.0,1.0,1.0,...|  1.0|[-18.225955524188...|[1.21497707416456...|       1.0|
|[1.0,1.0,0.0,0.0,...|  1.0|[-18.358243476371...|[1.06442767136925...|       1.0|
|(7,[0,1,6],[4.0,1...|  0.0|[18.3563077812100...|[0.99999998933509...|       0.0|
|(7,[0,3,6],[3.0,1...|  0.0|[27.4123701891423...|[0.99999999999875...|       0.0|
|[4.0,0.0,1.0,0.0,...|  1.0|[-35.975077024041...|[2.37805865550653...|       1.0|
|(7,[0,3,6],[2.0,1...|  0.0|[18.2316263544839...|[0.99999998791893...|       0.0|
|[5.0,1.0,0.0,0.0,...|  1.0|[-19.243972103991...|[4.38984416425464...|       1.0|
+--------------------+-----+--------------------+--------------------+----------+

