In [1]:
import pyspark
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T

In [2]:
spark = SparkSession.builder.getOrCreate()
spark

## Utilizing the diabetes dataset from https://archive.ics.uci.edu/ml/datasets/breast+cancer+wisconsin+%28original%29 in the attempt to predict malignant or benign tumors

## Target variable has two possible outcomes
### 1.Malignant Tumor (reflected as 4 in original dataset)
### 2. Benign Tumor (reflected as 2 in original dataset)


# For context, malignant tumors are seen as more dangerous than benign.

In [67]:
df = spark.read.csv(r'C:\Users\jackf\Desktop\Python\Test Data\breast-cancer-wisconsin.data.csv',
                    header=True)

In [68]:
#data set size
print(f"The size of the data set is: {df.count()} rows x {len(df.columns)} columns")

The size of the data set is: 740 rows x 11 columns


In [69]:
df.printSchema()

root
 |-- ID: string (nullable = true)
 |-- ClumpThickness: string (nullable = true)
 |-- UnifromityCellSize: string (nullable = true)
 |-- UniformityCellShape: string (nullable = true)
 |-- Adhesion: string (nullable = true)
 |-- EpithelialCellSize: string (nullable = true)
 |-- BareNuclei: string (nullable = true)
 |-- BlandChromatin: string (nullable = true)
 |-- NormalNuclei: string (nullable = true)
 |-- Mitoses: string (nullable = true)
 |-- Target: string (nullable = true)



## Pre-processing

In [79]:
#We can remove the ID column as it's not relevant
df = df.drop("ID")

In [77]:
df = df.na.fill(0)

In [78]:
#Update the target for clarity, change Malignant to 1 and Benign to 0
df = df.withColumn("Target", F.when(F.col("Target") == 4,1).otherwise(0))

In [80]:
#investigate the distribution of data
count_pos = df.where(F.col("Target") == 1).count()
count_neg = df.where(F.col("Target") == 0).count()

In [85]:
#Display the counts and percentage
print(f"The number of malignant cases are {count_pos} or {round((count_pos/df.count())*100,2)}% \nThe number of benign cases are {count_neg} or {round((count_neg)/(df.count())*100,2)}%") 

The number of malignant cases are 253 or 34.19% 
The number of benign cases are 487 or 65.81%


In [86]:
df.printSchema()

root
 |-- ClumpThickness: string (nullable = true)
 |-- UnifromityCellSize: string (nullable = true)
 |-- UniformityCellShape: string (nullable = true)
 |-- Adhesion: string (nullable = true)
 |-- EpithelialCellSize: string (nullable = true)
 |-- BareNuclei: string (nullable = true)
 |-- BlandChromatin: string (nullable = true)
 |-- NormalNuclei: string (nullable = true)
 |-- Mitoses: string (nullable = true)
 |-- Target: integer (nullable = false)



In [87]:
for col in df.columns:
    df = df.withColumn(col, F.col(col).cast("double"))
df.printSchema()

root
 |-- ClumpThickness: double (nullable = true)
 |-- UnifromityCellSize: double (nullable = true)
 |-- UniformityCellShape: double (nullable = true)
 |-- Adhesion: double (nullable = true)
 |-- EpithelialCellSize: double (nullable = true)
 |-- BareNuclei: double (nullable = true)
 |-- BlandChromatin: double (nullable = true)
 |-- NormalNuclei: double (nullable = true)
 |-- Mitoses: double (nullable = true)
 |-- Target: double (nullable = false)



## Dive into the relationship of the features with the target

In [90]:
for i in df.columns:
    t = df.corr("Target", i)
    
    if t > .6:
        print(f"{i} has a pearson score of {t}")

ClumpThickness has a pearson score of 0.7063781135590114
UnifromityCellSize has a pearson score of 0.8167100574055357
UniformityCellShape has a pearson score of 0.8155992074898362
Adhesion has a pearson score of 0.6910229647315073
EpithelialCellSize has a pearson score of 0.6796918453202054
BareNuclei has a pearson score of 0.8166268113918972
BlandChromatin has a pearson score of 0.7647049652976591
NormalNuclei has a pearson score of 0.6958434231476123
Target has a pearson score of 1.0


In [101]:
selected_features = df.select("ClumpThickness",
                              "UnifromityCellSize",
                              "UniformityCellShape",
                              "Adhesion",
                              "EpithelialCellSize",
                              "BareNuclei",
                               "BlandChromatin",
                              "NormalNuclei")

In [108]:
selected_features.show(2)

+--------------+------------------+-------------------+--------+------------------+----------+--------------+------------+
|ClumpThickness|UnifromityCellSize|UniformityCellShape|Adhesion|EpithelialCellSize|BareNuclei|BlandChromatin|NormalNuclei|
+--------------+------------------+-------------------+--------+------------------+----------+--------------+------------+
|           5.0|               1.0|                1.0|     1.0|               2.0|       1.0|           3.0|         1.0|
|           5.0|               4.0|                4.0|     5.0|               7.0|      10.0|           3.0|         2.0|
+--------------+------------------+-------------------+--------+------------------+----------+--------------+------------+
only showing top 2 rows



## Utilize VectorAssembler to place features into a single vector

In [109]:
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=selected_features.columns,
                           outputCol='features',
                           handleInvalid='skip')
df = assembler.transform(df)

In [110]:
df.printSchema()

root
 |-- ClumpThickness: double (nullable = true)
 |-- UnifromityCellSize: double (nullable = true)
 |-- UniformityCellShape: double (nullable = true)
 |-- Adhesion: double (nullable = true)
 |-- EpithelialCellSize: double (nullable = true)
 |-- BareNuclei: double (nullable = true)
 |-- BlandChromatin: double (nullable = true)
 |-- NormalNuclei: double (nullable = true)
 |-- Mitoses: double (nullable = true)
 |-- Target: double (nullable = false)
 |-- features: vector (nullable = true)



In [111]:
df.select("features").show(3,truncate=False)

+----------------------------------+
|features                          |
+----------------------------------+
|[5.0,1.0,1.0,1.0,2.0,1.0,3.0,1.0] |
|[5.0,4.0,4.0,5.0,7.0,10.0,3.0,2.0]|
|[3.0,1.0,1.0,1.0,2.0,2.0,3.0,1.0] |
+----------------------------------+
only showing top 3 rows



## Standardize 

In [113]:
from pyspark.ml.feature import StandardScaler
standardScaler = StandardScaler().setInputCol("features").setOutputCol("scaled_features")
df = standardScaler.fit(df).transform(df)


In [114]:
df.select("scaled_features").show(3,truncate=False)

+----------------------------------------------------------------------------------------------------------------------------------------------------------+
|scaled_features                                                                                                                                           |
+----------------------------------------------------------------------------------------------------------------------------------------------------------+
|[1.7772572343497226,0.3259878010202712,0.33426212675591777,0.3485101727570922,0.9012476431727784,0.2751797395931149,1.2230607806221265,0.3284538054668826]|
|[1.7772572343497226,1.3039512040810848,1.337048507023671,1.742550863785461,3.154366751104724,2.751797395931149,1.2230607806221265,0.6569076109337652]     |
|[1.0663543406098335,0.3259878010202712,0.33426212675591777,0.3485101727570922,0.9012476431727784,0.5503594791862298,1.2230607806221265,0.3284538054668826]|
+---------------------------------------------------------

## Train/Test Split

In [115]:
train_df, test_df = df.randomSplit([0.7,0.3], seed=111)

In [116]:
data_set = float(train_df.select("Target").count())
numPositives = train_df.select("Target").where("Target == 1").count()
per_one =(float(numPositives)/float(data_set))*100
numNegatives = float(data_set - numPositives)
print(f"The number of 1's are {numPositives} and the number of 0's are {numNegatives}")
print(f"Percentage of 1's are {per_one}")

The number of 1's are 173 and the number of 0's are 318.0
Percentage of 1's are 35.234215885947044


In [117]:
#Imbalance of 1's and 0's in Train df
BalanceRatio = numNegatives/data_set
print(f"The balancing ration is {BalanceRatio}")

The balancing ration is 0.6476578411405295


In [119]:
train_df = train_df.withColumn("classWeights",
                               F.when(train_df.Target == 1, BalanceRatio).otherwise(1 - BalanceRatio))
train_df.select("classWeights").show(3)

+------------------+
|      classWeights|
+------------------+
|0.3523421588594705|
|0.3523421588594705|
|0.3523421588594705|
+------------------+
only showing top 3 rows



In [120]:
from pyspark.ml.feature import ChiSqSelector
css = ChiSqSelector(featuresCol='scaled_features',
                   outputCol='aspect',
                   labelCol="Target",
                   fpr=0.05)

In [121]:
train_df = css.fit(train_df).transform(train_df)
test_df = css.fit(test_df).transform(test_df)
test_df.select("aspect").show(5,truncate=False)

+----------------------------------------------------------------------------------------------------------------------------------------------------------+
|aspect                                                                                                                                                    |
+----------------------------------------------------------------------------------------------------------------------------------------------------------+
|[0.3554514468699445,0.3259878010202712,0.33426212675591777,0.3485101727570922,0.4506238215863892,0.2751797395931149,0.4076869268740422,0.3284538054668826]|
|[0.3554514468699445,0.3259878010202712,0.33426212675591777,0.3485101727570922,0.4506238215863892,0.2751797395931149,0.4076869268740422,0.3284538054668826]|
|[0.3554514468699445,0.3259878010202712,0.33426212675591777,0.3485101727570922,0.4506238215863892,0.2751797395931149,0.8153738537480844,0.3284538054668826]|
|[0.3554514468699445,0.3259878010202712,0.3342621267559177

## Begin structuring the model

In [122]:
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(labelCol='Target',
                        featuresCol="aspect",
                        weightCol="classWeights", 
                        maxIter=10)

In [123]:
model = lr.fit(train_df)
predict_train = model.transform(train_df)
predict_test = model.transform(test_df)
predict_test.select("Target","prediction").show()

+------+----------+
|Target|prediction|
+------+----------+
|   0.0|       0.0|
|   0.0|       0.0|
|   0.0|       0.0|
|   0.0|       0.0|
|   0.0|       0.0|
|   0.0|       0.0|
|   0.0|       0.0|
|   0.0|       0.0|
|   0.0|       0.0|
|   0.0|       0.0|
|   0.0|       0.0|
|   0.0|       0.0|
|   0.0|       0.0|
|   0.0|       0.0|
|   0.0|       0.0|
|   0.0|       0.0|
|   0.0|       0.0|
|   0.0|       0.0|
|   0.0|       0.0|
|   0.0|       0.0|
+------+----------+
only showing top 20 rows



## Evaluate the prediction

In [124]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction',
                                          labelCol='Target')
predict_test.select("Target","rawPrediction","prediction","probability").show(truncate=False)

+------+--------------------------------------+----------+------------------------------------------+
|Target|rawPrediction                         |prediction|probability                               |
+------+--------------------------------------+----------+------------------------------------------+
|0.0   |[5.734945227861855,-5.734945227861855]|0.0       |[0.9967793467861066,0.0032206532138935413]|
|0.0   |[5.734945227861855,-5.734945227861855]|0.0       |[0.9967793467861066,0.0032206532138935413]|
|0.0   |[5.685754838632626,-5.685754838632626]|0.0       |[0.9966175096894179,0.003382490310582163] |
|0.0   |[5.631926395898063,-5.631926395898063]|0.0       |[0.9964311136047697,0.003568886395230487] |
|0.0   |[5.631926395898063,-5.631926395898063]|0.0       |[0.9964311136047697,0.003568886395230487] |
|0.0   |[5.631926395898063,-5.631926395898063]|0.0       |[0.9964311136047697,0.003568886395230487] |
|0.0   |[5.631926395898063,-5.631926395898063]|0.0       |[0.9964311136047697,0.00

In [125]:
print(f"Area under ROC for train is {evaluator.evaluate(predict_train)}")

Area under ROC for train is 0.9951285127422111


In [126]:
print(f"Area under ROC for train is {evaluator.evaluate(predict_test)}")

Area under ROC for train is 0.9971516463484107


In [128]:
predict_test.select("Target","prediction","probability").where(F.col("Target") != F.col("prediction")).show(truncate=False)

+------+----------+----------------------------------------+
|Target|prediction|probability                             |
+------+----------+----------------------------------------+
|1.0   |0.0       |[0.5134524967072315,0.48654750329276863]|
|1.0   |0.0       |[0.6822217381697119,0.3177782618302882] |
|1.0   |0.0       |[0.8918504063304283,0.10814959366957175]|
|0.0   |1.0       |[0.2092372937599848,0.7907627062400152] |
|0.0   |1.0       |[0.1102100020861575,0.8897899979138425] |
|0.0   |1.0       |[0.33775277443517393,0.6622472255648261]|
+------+----------+----------------------------------------+

