In [6]:
# importing required libraries
from pyspark.streaming import StreamingContext
import pyspark.sql.types as tp
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.feature import StopWordsRemover, Word2Vec, RegexTokenizer
from pyspark.ml.classification import LogisticRegression
from pyspark.sql import Row, Column
import sys
import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
conf = pyspark.SparkConf().setAppName('appName').setMaster('local')
sc = pyspark.SparkContext(conf=conf)
spark = SparkSession(sc)

In [31]:
# read a csv file
df = spark.read.csv('bank.csv',header = True, inferSchema = True)

# see the default schema of the dataframe
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- balance: integer (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- campaign: integer (nullable = true)
 |-- pdays: integer (nullable = true)
 |-- previous: integer (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- deposit: string (nullable = true)



# Defining Custom Schema
Spark by default treats every data as string. Now we customize the datatypes.

In [8]:
import pyspark.sql.types as tp

# define the schema
my_schema = tp.StructType([
    tp.StructField(name= 'Batsman',      dataType= tp.IntegerType(),   nullable= True),
    tp.StructField(name= 'Batsman_Name', dataType= tp.StringType(),    nullable= True),
    tp.StructField(name= 'Bowler',       dataType= tp.IntegerType(),   nullable= True),
    tp.StructField(name= 'Bowler_Name',  dataType= tp.StringType(),    nullable= True),
    tp.StructField(name= 'Commentary',   dataType= tp.StringType(),    nullable= True),
    tp.StructField(name= 'Detail',       dataType= tp.StringType(),    nullable= True),
    tp.StructField(name= 'Dismissed',    dataType= tp.IntegerType(),   nullable= True),
    tp.StructField(name= 'Id',           dataType= tp.IntegerType(),   nullable= True),
    tp.StructField(name= 'Isball',       dataType= tp.BooleanType(),   nullable= True),
    tp.StructField(name= 'Isboundary',   dataType= tp.BinaryType(),   nullable= True),
    tp.StructField(name= 'Iswicket',     dataType= tp.BinaryType(),   nullable= True),
    tp.StructField(name= 'Over',         dataType= tp.DoubleType(),    nullable= True),
    tp.StructField(name= 'Runs',         dataType= tp.IntegerType(),   nullable= True),
    tp.StructField(name= 'Timestamp',    dataType= tp.TimestampType(), nullable= True)    
])

# read the data again with the defined schema
my_data = spark.read.csv('ind-ban-comment.csv',schema= my_schema,header= True)

# print the schema
my_data.printSchema()

root
 |-- Batsman: integer (nullable = true)
 |-- Batsman_Name: string (nullable = true)
 |-- Bowler: integer (nullable = true)
 |-- Bowler_Name: string (nullable = true)
 |-- Commentary: string (nullable = true)
 |-- Detail: string (nullable = true)
 |-- Dismissed: integer (nullable = true)
 |-- Id: integer (nullable = true)
 |-- Isball: boolean (nullable = true)
 |-- Isboundary: binary (nullable = true)
 |-- Iswicket: binary (nullable = true)
 |-- Over: double (nullable = true)
 |-- Runs: integer (nullable = true)
 |-- Timestamp: timestamp (nullable = true)



# Drop the columns that are not required

In [10]:
my_data = my_data.drop(*['Batsman', 'Bowler', 'Id'])
my_data.columns

['Batsman_Name',
 'Bowler_Name',
 'Commentary',
 'Detail',
 'Dismissed',
 'Isball',
 'Isboundary',
 'Iswicket',
 'Over',
 'Runs',
 'Timestamp']

# Get the dimensions of the data

In [11]:
(my_data.count() , len(my_data.columns))

(605, 11)

# Get the summary of the numerical columns

In [12]:
my_data.select('Isball', 'Isboundary', 'Runs').describe().show()

+-------+------------------+
|summary|              Runs|
+-------+------------------+
|  count|               605|
|   mean|0.9917355371900827|
| stddev| 1.342725481259329|
|    min|                 0|
|    max|                 6|
+-------+------------------+



# Null values in each column

In [27]:
import pyspark.sql.functions as f

data_agg = my_data.agg(*[f.count(f.when(f.isnull(c), c)).alias(c) for c in my_data.columns])
data_agg

DataFrame[Batsman_Name: bigint, Bowler_Name: bigint, Commentary: bigint, Detail: bigint, Dismissed: bigint, Isball: bigint, Isboundary: bigint, Iswicket: bigint, Over: bigint, Runs: bigint, Timestamp: bigint]

# Value counts of Batsman_Name column

In [28]:
my_data.groupBy('Batsman_Name').count().show()

+------------------+-----+
|      Batsman_Name|count|
+------------------+-----+
|     Soumya Sarkar|   39|
|  Mashrafe Mortaza|    5|
|   Shakib Al Hasan|   75|
|   Mushfiqur Rahim|   23|
|Mohammad Saifuddin|   42|
|         Liton Das|   24|
|      Rishabh Pant|   43|
|    Mohammed Shami|    2|
|       Tamim Iqbal|   31|
|     Hardik Pandya|    2|
|          KL Rahul|   93|
| Bhuvneshwar Kumar|    4|
|     Rubel Hossain|   11|
|      Rohit Sharma|   94|
|    Dinesh Karthik|    9|
|       Virat Kohli|   27|
|          MS Dhoni|   33|
|     Sabbir Rahman|   40|
|  Mosaddek Hossain|    7|
| Mustafizur Rahman|    1|
+------------------+-----+



In [29]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder

# create object of StringIndexer class and specify input and output column
SI_batsman = StringIndexer(inputCol='Batsman_Name',outputCol='Batsman_Index')
SI_bowler = StringIndexer(inputCol='Bowler_Name',outputCol='Bowler_Index')

# transform the data
my_data = SI_batsman.fit(my_data).transform(my_data)
my_data = SI_bowler.fit(my_data).transform(my_data)

# view the transformed data
my_data.select('Batsman_Name', 'Batsman_Index', 'Bowler_Name', 'Bowler_Index').show(10)

+-----------------+-------------+------------------+------------+
|     Batsman_Name|Batsman_Index|       Bowler_Name|Bowler_Index|
+-----------------+-------------+------------------+------------+
|   Mohammed Shami|         18.0| Mustafizur Rahman|         0.0|
|Bhuvneshwar Kumar|         16.0| Mustafizur Rahman|         0.0|
|   Mohammed Shami|         18.0| Mustafizur Rahman|         0.0|
|Bhuvneshwar Kumar|         16.0| Mustafizur Rahman|         0.0|
|         MS Dhoni|          7.0| Mustafizur Rahman|         0.0|
|         MS Dhoni|          7.0| Mustafizur Rahman|         0.0|
|         MS Dhoni|          7.0| Mustafizur Rahman|         0.0|
|         MS Dhoni|          7.0|Mohammad Saifuddin|         8.0|
|         MS Dhoni|          7.0|Mohammad Saifuddin|         8.0|
|         MS Dhoni|          7.0|Mohammad Saifuddin|         8.0|
+-----------------+-------------+------------------+------------+
only showing top 10 rows

