In [60]:
from pyspark.sql import SparkSession
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from pyspark.sql import SQLContext

In [5]:
sc = SparkSession \
     .builder \
     .appName('classification with pyspark') \
     .config("spark.some.config.option", "some-value") \
     .getOrCreate()

In [39]:
dt = sc.read.csv('D:\Data Sets\penguins_size.csv',  header=True)
dt.head()

Row(species='Adelie', island='Torgersen', culmen_length_mm='39.1', culmen_depth_mm='18.7', flipper_length_mm='181', body_mass_g='3750', sex='MALE')

In [42]:
dt.show(5)

+-------+---------+----------------+---------------+-----------------+-----------+------+
|species|   island|culmen_length_mm|culmen_depth_mm|flipper_length_mm|body_mass_g|   sex|
+-------+---------+----------------+---------------+-----------------+-----------+------+
| Adelie|Torgersen|            39.1|           18.7|              181|       3750|  MALE|
| Adelie|Torgersen|            39.5|           17.4|              186|       3800|FEMALE|
| Adelie|Torgersen|            40.3|             18|              195|       3250|FEMALE|
| Adelie|Torgersen|              NA|             NA|               NA|         NA|    NA|
| Adelie|Torgersen|            36.7|           19.3|              193|       3450|FEMALE|
+-------+---------+----------------+---------------+-----------------+-----------+------+
only showing top 5 rows



In [79]:
dt.printSchema()

root
 |-- species: string (nullable = true)
 |-- island: string (nullable = true)
 |-- culmen_length_mm: string (nullable = true)
 |-- culmen_depth_mm: string (nullable = true)
 |-- flipper_length_mm: string (nullable = true)
 |-- body_mass_g: string (nullable = true)
 |-- sex: string (nullable = true)



Changing the data type of columns culmen_length, culmen depth, flipper length and body mass index to float and integer

In [147]:
from pyspark.sql.types import IntegerType, FloatType

#dt.withColumn("age",df.age.cast(IntegerType()))

df = dt.withColumn("culmen_depth_mm",dt.culmen_depth_mm.cast(FloatType()))\
                   .withColumn("culmen_length_mm",dt.culmen_length_mm.cast(FloatType()))\
                    .withColumn("flipper_length_mm",dt.flipper_length_mm.cast('float'))\
                     .withColumn("body_mass_g",dt.body_mass_g.cast('int'))

In [148]:
df.printSchema()

root
 |-- species: string (nullable = true)
 |-- island: string (nullable = true)
 |-- culmen_length_mm: float (nullable = true)
 |-- culmen_depth_mm: float (nullable = true)
 |-- flipper_length_mm: float (nullable = true)
 |-- body_mass_g: integer (nullable = true)
 |-- sex: string (nullable = true)



finding null values in columns

In [110]:
df.filter(df.sex.contains('NA')).show()

+-------+---------+----------------+---------------+-----------------+-----------+---+-----------------+
|species|   island|culmen_length_mm|culmen_depth_mm|flipper_length_mm|body_mass_g|sex|body_mass_index_g|
+-------+---------+----------------+---------------+-----------------+-----------+---+-----------------+
| Adelie|Torgersen|            null|           null|             null|         NA| NA|             null|
| Adelie|Torgersen|            34.1|           18.1|            193.0|       3475| NA|             3475|
| Adelie|Torgersen|            42.0|           20.2|            190.0|       4250| NA|             4250|
| Adelie|Torgersen|            37.8|           17.1|            186.0|       3300| NA|             3300|
| Adelie|Torgersen|            37.8|           17.3|            180.0|       3700| NA|             3700|
| Adelie|    Dream|            37.5|           18.9|            179.0|       2975| NA|             2975|
| Gentoo|   Biscoe|            44.5|           14.3|   

In [106]:
from pyspark.sql.functions import col,isnan, when, count
df.select([count(when(isnan(c) | col(c).isNull() | col(c).contains('NA'), c)).alias(c) for c in df.columns]).show()

+-------+------+----------------+---------------+-----------------+-----------+---+-----------------+
|species|island|culmen_length_mm|culmen_depth_mm|flipper_length_mm|body_mass_g|sex|body_mass_index_g|
+-------+------+----------------+---------------+-----------------+-----------+---+-----------------+
|      0|     0|               2|              2|                2|          2| 10|                2|
+-------+------+----------------+---------------+-----------------+-----------+---+-----------------+



In [118]:
df.show()

+-------+---------+----------------+---------------+-----------------+-----------+------+-----------------+
|species|   island|culmen_length_mm|culmen_depth_mm|flipper_length_mm|body_mass_g|   sex|body_mass_index_g|
+-------+---------+----------------+---------------+-----------------+-----------+------+-----------------+
| Adelie|Torgersen|            39.1|           18.7|            181.0|       3750|  MALE|             3750|
| Adelie|Torgersen|            39.5|           17.4|            186.0|       3800|FEMALE|             3800|
| Adelie|Torgersen|            40.3|           18.0|            195.0|       3250|FEMALE|             3250|
| Adelie|Torgersen|            null|           null|             null|         NA|    NA|             null|
| Adelie|Torgersen|            36.7|           19.3|            193.0|       3450|FEMALE|             3450|
| Adelie|Torgersen|            39.3|           20.6|            190.0|       3650|  MALE|             3650|
| Adelie|Torgersen|         

In [121]:
df.dropna().show()

DataFrame[species: string, island: string, culmen_length_mm: float, culmen_depth_mm: float, flipper_length_mm: float, body_mass_g: string, sex: string, body_mass_index_g: int]

In [124]:
df.where(col('sex').contains('NA')).show()

+-------+---------+----------------+---------------+-----------------+-----------+---+-----------------+
|species|   island|culmen_length_mm|culmen_depth_mm|flipper_length_mm|body_mass_g|sex|body_mass_index_g|
+-------+---------+----------------+---------------+-----------------+-----------+---+-----------------+
| Adelie|Torgersen|            null|           null|             null|         NA| NA|             null|
| Adelie|Torgersen|            34.1|           18.1|            193.0|       3475| NA|             3475|
| Adelie|Torgersen|            42.0|           20.2|            190.0|       4250| NA|             4250|
| Adelie|Torgersen|            37.8|           17.1|            186.0|       3300| NA|             3300|
| Adelie|Torgersen|            37.8|           17.3|            180.0|       3700| NA|             3700|
| Adelie|    Dream|            37.5|           18.9|            179.0|       2975| NA|             2975|
| Gentoo|   Biscoe|            44.5|           14.3|   

In [149]:
df_new = df.where(df.sex != 'NA')
df_new.show(10)

+-------+---------+----------------+---------------+-----------------+-----------+------+
|species|   island|culmen_length_mm|culmen_depth_mm|flipper_length_mm|body_mass_g|   sex|
+-------+---------+----------------+---------------+-----------------+-----------+------+
| Adelie|Torgersen|            39.1|           18.7|            181.0|       3750|  MALE|
| Adelie|Torgersen|            39.5|           17.4|            186.0|       3800|FEMALE|
| Adelie|Torgersen|            40.3|           18.0|            195.0|       3250|FEMALE|
| Adelie|Torgersen|            36.7|           19.3|            193.0|       3450|FEMALE|
| Adelie|Torgersen|            39.3|           20.6|            190.0|       3650|  MALE|
| Adelie|Torgersen|            38.9|           17.8|            181.0|       3625|FEMALE|
| Adelie|Torgersen|            39.2|           19.6|            195.0|       4675|  MALE|
| Adelie|Torgersen|            41.1|           17.6|            182.0|       3200|FEMALE|
| Adelie|T

Encode the categorical variables

In [164]:
from collections import defaultdict
data_types = defaultdict(list)
for entry in df.schema.fields:
  data_types[str(entry.dataType)].append(entry.name)

In [165]:
data_types

defaultdict(list,
            {'StringType': ['species', 'island', 'sex'],
             'FloatType': ['culmen_length_mm',
              'culmen_depth_mm',
              'flipper_length_mm'],
             'IntegerType': ['body_mass_g']})

In [170]:
cat_cols = [var for var in data_types["StringType"]]
cat_cols

['species', 'island', 'sex']

In [199]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder,StandardScaler
stage_string_index = [StringIndexer(inputCol=col, outputCol=col+' string_indexed') for col in cat_cols]
stage_onehot_enc =   [OneHotEncoder(inputCol=col+' string_indexed', outputCol=col+' onehot_enc') for col in cat_cols]


In [200]:
from pyspark.ml import Pipeline
ppl = Pipeline(stages= stage_string_index + stage_onehot_enc)
df_trans = ppl.fit(df_new).transform(df_new)
df_trans.show(10)

+-------+---------+----------------+---------------+-----------------+-----------+------+----------------------+---------------------+------------------+------------------+-----------------+--------------+
|species|   island|culmen_length_mm|culmen_depth_mm|flipper_length_mm|body_mass_g|   sex|species string_indexed|island string_indexed|sex string_indexed|species onehot_enc|island onehot_enc|sex onehot_enc|
+-------+---------+----------------+---------------+-----------------+-----------+------+----------------------+---------------------+------------------+------------------+-----------------+--------------+
| Adelie|Torgersen|            39.1|           18.7|            181.0|       3750|  MALE|                   0.0|                  2.0|               0.0|     (2,[0],[1.0])|        (2,[],[])| (2,[0],[1.0])|
| Adelie|Torgersen|            39.5|           17.4|            186.0|       3800|FEMALE|                   0.0|                  2.0|               1.0|     (2,[0],[1.0])|    

Scaling numerical data 

In [None]:
from pyspark.ml.feature import StandardScaler, VectorAssembler
assembler = [VectorAssembler(inputCols=[col], outputCol=col+'_vec') for col in ['culmen_length_mm','culmen_depth_mm','flipper_length_mm','body_mass_g']]

scale = [StandardScaler(inputCol=col+'_vec', outputCol=col+'_scaled') for col in ['culmen_length_mm','culmen_depth_mm','flipper_length_mm','body_mass_g']]

pipe = Pipeline(stages = assembler + scale)

df_scale = pipe.fit(df_trans).transform(df_trans)

#df_scale.show(5)

In [223]:
df_scale.toPandas()['species string_indexed']

0.0    146
1.0    120
2.0     68
Name: species string_indexed, dtype: int64

In [224]:
train_set, test_set =df_scale.randomSplit([0.80,0.20])

In [271]:
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier



features = VectorAssembler(inputCols=[ 'island onehot_enc', 'sex onehot_enc',
                                        'culmen_length_mm_scaled','culmen_depth_mm_scaled','flipper_length_mm_scaled',
                                         'body_mass_g_scaled'], outputCol='features')

model = LogisticRegression(featuresCol='features', labelCol= 'species string_indexed')

pipe_lr = Pipeline(stages = [features, model])
model_fit = pipe_lr.fit(train_set)

In [272]:
features.transform(df_scale).toPandas()['features'][0]

DenseVector([0.0, 0.0, 1.0, 0.0, 7.1605, 9.5025, 12.9081, 4.6593])

In [269]:
test = test_set.select([ 'island onehot_enc', 'sex onehot_enc',
                                        'culmen_length_mm_scaled','culmen_depth_mm_scaled','flipper_length_mm_scaled',
                                         'body_mass_g_scaled'])

In [None]:
from pyspark.mllib.evaluation import MulticlassMetrics

predict = model_fit.transform(test)


In [255]:
pred_after = predict.toPandas()['prediction']
pred_before = test_set.toPandas()['species string_indexed']

In [256]:
count = 0
for i,j in zip(pred_before, pred_after):
    if i==j:
        count+=1
accuracy = (count/len(pred_after))*100
accuracy

100.0

In [243]:
predict.toPandas()['prediction'].value_counts()

0.0    34
1.0    19
2.0    15
Name: prediction, dtype: int64

In [244]:
test.toPandas()

Unnamed: 0,island onehot_enc,sex onehot_enc,culmen_length_mm_scaled,culmen_depth_mm_scaled,flipper_length_mm_scaled,body_mass_g_scaled
0,"(1.0, 0.0)","(0.0, 1.0)",[6.684343208491507],[8.43534758882998],[12.908125711278174],[3.5410935175610545]
1,"(1.0, 0.0)","(1.0, 0.0)",[6.8857888887611605],[9.705731232652996],[13.835228662916938],[4.65933357573823]
2,"(1.0, 0.0)","(1.0, 0.0)",[6.995668922304787],[10.163069150584121],[13.549966216258857],[4.845706918767759]
3,"(1.0, 0.0)","(0.0, 1.0)",[7.142174935100514],[8.892685506761106],[13.264703769600777],[4.410835785032191]
4,"(1.0, 0.0)","(0.0, 1.0)",[7.2703679582701914],[8.994316585957268],[13.763913051252418],[3.9759646512966227]
...,...,...,...,...,...,...
63,"(1.0, 0.0)","(1.0, 0.0)",[9.120007854332272],[8.079639780869215],[16.33127507117515],[7.392809273504658]
64,"(1.0, 0.0)","(1.0, 0.0)",[9.120007854332272],[8.53697769880034],[16.40259068283967],[7.082187035122109]
65,"(1.0, 0.0)","(1.0, 0.0)",[9.138321542554095],[8.181270860065379],[15.190225284542825],[6.709440349063051]
66,"(1.0, 0.0)","(0.0, 1.0)",[9.248200877501949],[7.723932457521352],[15.404172119536385],[6.212444767650973]


In [None]:
evaluator= BCE(labelCol = "", rawPredictionCol="probability", metricName= "areaUnderROC")
accuracy = evaluator.evaluate(predict)