In [0]:
file_location = "/FileStore/BRCA.csv"
file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

display(df)

Patient_ID,Age,Gender,Protein1,Protein2,Protein3,Protein4,Tumour_Stage,Histology,ER status,PR status,HER2 status,Surgery_type,Date_of_Surgery,Date_of_Last_Visit,Patient_Status
TCGA-D8-A1XD,36.0,FEMALE,0.080353,0.42638,0.54715,0.27368,III,Infiltrating Ductal Carcinoma,Positive,Positive,Negative,Modified Radical Mastectomy,15-Jan-17,19-Jun-17,Alive
TCGA-EW-A1OX,43.0,FEMALE,-0.42032,0.57807,0.61447,-0.031505,II,Mucinous Carcinoma,Positive,Positive,Negative,Lumpectomy,26-Apr-17,09-Nov-18,Dead
TCGA-A8-A079,69.0,FEMALE,0.21398,1.3114,-0.32747,-0.23426,III,Infiltrating Ductal Carcinoma,Positive,Positive,Negative,Other,08-Sep-17,09-Jun-18,Alive
TCGA-D8-A1XR,56.0,FEMALE,0.34509,-0.21147,-0.19304,0.12427,II,Infiltrating Ductal Carcinoma,Positive,Positive,Negative,Modified Radical Mastectomy,25-Jan-17,12-Jul-17,Alive
TCGA-BH-A0BF,56.0,FEMALE,0.22155,1.9068,0.52045,-0.31199,II,Infiltrating Ductal Carcinoma,Positive,Positive,Negative,Other,06-May-17,27-Jun-19,Dead
TCGA-AO-A1KQ,84.0,MALE,-0.081872,1.7241,-0.057335,0.043025,III,Infiltrating Ductal Carcinoma,Positive,Positive,Negative,Modified Radical Mastectomy,18-Sep-17,15-Nov-21,Alive
TCGA-D8-A73X,53.0,FEMALE,-0.069535,1.4183,-0.36105,0.39158,II,Infiltrating Ductal Carcinoma,Positive,Positive,Negative,Simple Mastectomy,04-Feb-17,07-Feb-18,Alive
TCGA-A7-A426,50.0,FEMALE,0.67249,1.279,-0.32107,-0.11239,III,Infiltrating Ductal Carcinoma,Positive,Positive,Negative,Simple Mastectomy,16-May-17,,
TCGA-EW-A1P5,77.0,FEMALE,-0.15175,-0.66332,1.1894,0.21718,II,Infiltrating Ductal Carcinoma,Positive,Positive,Negative,Modified Radical Mastectomy,28-Sep-17,28-Sep-18,Alive
TCGA-A8-A09A,40.0,FEMALE,-0.5657,1.2668,-0.29346,0.19395,II,Infiltrating Lobular Carcinoma,Positive,Positive,Positive,Other,14-Feb-17,15-Dec-17,Alive


In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [0]:
#To get the shape of the dataset
print((df.count(), len(df.columns)))

(341, 16)


In [0]:
# Spark does in-memory comoutation it stores the data along clusters in RAM so as to faster processing.
df.cache()

Out[443]: DataFrame[Patient_ID: string, Age: int, Gender: string, Protein1: double, Protein2: double, Protein3: double, Protein4: double, Tumour_Stage: string, Histology: string, ER status: string, PR status: string, HER2 status: string, Surgery_type: string, Date_of_Surgery: string, Date_of_Last_Visit: string, Patient_Status: string]

In [0]:
#2% data of the database was null
df.filter("Tumour_Stage is null").count() / df.count() *100

Out[444]: 2.0527859237536656

In [0]:
df.printSchema()

root
 |-- Patient_ID: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Protein1: double (nullable = true)
 |-- Protein2: double (nullable = true)
 |-- Protein3: double (nullable = true)
 |-- Protein4: double (nullable = true)
 |-- Tumour_Stage: string (nullable = true)
 |-- Histology: string (nullable = true)
 |-- ER status: string (nullable = true)
 |-- PR status: string (nullable = true)
 |-- HER2 status: string (nullable = true)
 |-- Surgery_type: string (nullable = true)
 |-- Date_of_Surgery: string (nullable = true)
 |-- Date_of_Last_Visit: string (nullable = true)
 |-- Patient_Status: string (nullable = true)



In [0]:
#To work over columns i.e. to add or drop the columns


In [0]:
df.show()

+------------+---+------+---------+--------+---------+---------+------------+--------------------+---------+---------+-----------+--------------------+---------------+------------------+--------------+
|  Patient_ID|Age|Gender| Protein1|Protein2| Protein3| Protein4|Tumour_Stage|           Histology|ER status|PR status|HER2 status|        Surgery_type|Date_of_Surgery|Date_of_Last_Visit|Patient_Status|
+------------+---+------+---------+--------+---------+---------+------------+--------------------+---------+---------+-----------+--------------------+---------------+------------------+--------------+
|TCGA-D8-A1XD| 36|FEMALE| 0.080353| 0.42638|  0.54715|  0.27368|         III|Infiltrating Duct...| Positive| Positive|   Negative|Modified Radical ...|      15-Jan-17|         19-Jun-17|         Alive|
|TCGA-EW-A1OX| 43|FEMALE| -0.42032| 0.57807|  0.61447|-0.031505|          II|  Mucinous Carcinoma| Positive| Positive|   Negative|          Lumpectomy|      26-Apr-17|         09-Nov-18|      

In [0]:
# Handled missing values from Numeric column
from pyspark.ml.feature import Imputer
imputer = Imputer(inputCol='Age',outputCol='Imputed_Age').setStrategy('mean')
imputer.fit(df).transform(df).show()
df.select('Age').show()

+------------+---+------+---------+--------+---------+---------+------------+--------------------+---------+---------+-----------+--------------------+---------------+------------------+--------------+-----------+
|  Patient_ID|Age|Gender| Protein1|Protein2| Protein3| Protein4|Tumour_Stage|           Histology|ER status|PR status|HER2 status|        Surgery_type|Date_of_Surgery|Date_of_Last_Visit|Patient_Status|Imputed_Age|
+------------+---+------+---------+--------+---------+---------+------------+--------------------+---------+---------+-----------+--------------------+---------------+------------------+--------------+-----------+
|TCGA-D8-A1XD| 36|FEMALE| 0.080353| 0.42638|  0.54715|  0.27368|         III|Infiltrating Duct...| Positive| Positive|   Negative|Modified Radical ...|      15-Jan-17|         19-Jun-17|         Alive|         36|
|TCGA-EW-A1OX| 43|FEMALE| -0.42032| 0.57807|  0.61447|-0.031505|          II|  Mucinous Carcinoma| Positive| Positive|   Negative|          Lump

In [0]:
df = df.withColumnRenamed('ER status','ER_status').withColumnRenamed('PR status','PR_status').withColumnRenamed('HER2 status','HER2_status')

In [0]:

# Handle missing values from Categorical dataset
'''
1. Gender null values should be replaced with Female
2. Histology null values should be replacd with Infiltrating Ductal Carcinoma
3. ER and PR status are all Positive. every person who is suffering from cancer has thses tests positive during initial stage doctors used to perform these tests.
4. HER2 test is highly important and it's conducted to check whether drugs will lower down size of tumor (Negative): drugs can't help to lower the breast cancer (Positive):drugs can effect. Hence categorical values from this column are important to handle.
5. In HER2 2% values are null and have to be filled carefully.
6. Mostly tumor stage 2 and 3 patient found this HER2 test negative. Tumor~~HER2 status.
As HER2 column is hard to fill we well use KNN Imputer
'''

df = df.na.fill('FEMALE',subset=['Gender'])
df = df.na.fill('Infiltrating Ductal Carcinoma',subset=['Histology'])
df = df.na.fill('Positive',subset=['ER_status'])
df = df.na.fill('Positive',subset=['PR_status'])
df = df.na.fill('Negative',subset=['HER2_status'])
df = df.na.fill('Modified Radical Mastectomy',subset=['Surgery_type'])

In [0]:
df.groupby('Gender').count().show()
df.groupby('Tumour_Stage').count().show()
df.groupby('Histology').count().show()
df.groupby('ER_status').count().show()
df.groupby('PR_status').count().show()
df.groupby('HER2_status').count().show()
df.groupby('Surgery_type').count().show()

+------+-----+
|Gender|count|
+------+-----+
|  MALE|    4|
|FEMALE|  337|
+------+-----+

+------------+-----+
|Tumour_Stage|count|
+------------+-----+
|        null|    7|
|         III|   81|
|          II|  189|
|           I|   64|
+------------+-----+

+--------------------+-----+
|           Histology|count|
+--------------------+-----+
|Infiltrating Duct...|  240|
|Infiltrating Lobu...|   89|
|  Mucinous Carcinoma|   12|
+--------------------+-----+

+---------+-----+
|ER_status|count|
+---------+-----+
| Positive|  341|
+---------+-----+

+---------+-----+
|PR_status|count|
+---------+-----+
| Positive|  341|
+---------+-----+

+-----------+-----+
|HER2_status|count|
+-----------+-----+
|   Positive|   29|
|   Negative|  312|
+-----------+-----+

+--------------------+-----+
|        Surgery_type|count|
+--------------------+-----+
|          Lumpectomy|   66|
|               Other|  105|
|   Simple Mastectomy|   67|
|Modified Radical ...|  103|
+--------------------+-----+



In [0]:
df= df.na.fill('Alive',subset=['Patient_Status'])

In [0]:
from pyspark.sql.functions import regexp_replace

In [0]:
df = df.na.fill('III',subset=['Tumour_Stage'])

In [0]:
# Categorical column values handling
"""
1, Gendeer column has just two nominal categories hence used StringIndexer for label encoding.
2. Except Tumor_stage others are of nominal categories hence they can be handle out using StringIndexer()
"""
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer(inputCols=['Tumour_Stage','Gender','Histology','ER_status','PR_status','HER2_status','Surgery_type','Patient_Status'],outputCols=['impt_Tumour_stage','impt_Gender','impt_Histology','impt_ER_status','impt_PR_status','impt_HER2_status','impt_Surgery_typetype','imptPatient_Status'])

In [0]:
n_indexer = indexer.fit(df).transform(df)

In [0]:
n_indexer = n_indexer.drop("Patient_Status")

In [0]:
from pyspark.ml.feature import OneHotEncoder
encoder = OneHotEncoder(inputCols=['impt_Tumour_stage','impt_Gender','impt_Histology','impt_HER2_status','impt_Surgery_typetype'], outputCols=['encoded_Tumour_stage','encoded_Gender','encoded_Histology','encoded_HER2_status','encoded_Surgery_type'])
df_onehot = encoder.fit(n_indexer).transform(n_indexer)

In [0]:
df_onehot.printSchema()

root
 |-- Patient_ID: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Gender: string (nullable = false)
 |-- Protein1: double (nullable = true)
 |-- Protein2: double (nullable = true)
 |-- Protein3: double (nullable = true)
 |-- Protein4: double (nullable = true)
 |-- Tumour_Stage: string (nullable = false)
 |-- Histology: string (nullable = false)
 |-- ER_status: string (nullable = false)
 |-- PR_status: string (nullable = false)
 |-- HER2_status: string (nullable = false)
 |-- Surgery_type: string (nullable = false)
 |-- Date_of_Surgery: string (nullable = true)
 |-- Date_of_Last_Visit: string (nullable = true)
 |-- impt_Tumour_stage: double (nullable = false)
 |-- impt_Gender: double (nullable = false)
 |-- impt_Histology: double (nullable = false)
 |-- impt_ER_status: double (nullable = false)
 |-- impt_PR_status: double (nullable = false)
 |-- impt_HER2_status: double (nullable = false)
 |-- impt_Surgery_typetype: double (nullable = false)
 |-- imptPatient_Status

In [0]:
df_onehot = df_onehot.drop('impt_Tumour_stage','impt_Gender','impt_Histology','impt_HER2_status','impt_Surgery_typetype')

In [0]:
df_onehot.select("imptPatient_Status").show()

+------------------+
|imptPatient_Status|
+------------------+
|               0.0|
|               1.0|
|               0.0|
|               0.0|
|               1.0|
|               0.0|
|               0.0|
|               0.0|
|               0.0|
|               0.0|
|               0.0|
|               1.0|
|               0.0|
|               0.0|
|               0.0|
|               0.0|
|               0.0|
|               0.0|
|               1.0|
|               0.0|
+------------------+
only showing top 20 rows



In [0]:
df_onehot = df_onehot.withColumn("Label",col('imptPatient_Status'))

In [0]:
df_onehot = df_onehot.na.fill(1.0,subset=['imptPatient_Status'])

In [0]:
df_onehot = df_onehot.drop('ER_status','PR_status','HER2_status','Surgery_type')

In [0]:
df_onehot = df_onehot.drop('Tumour_Stage')

In [0]:
df_onehot = df_onehot.drop('Patient_ID','Gender','Histology','Date_of_Surgery','Date_of_Last_Visit')

In [0]:
'''
Before model building we used to perform train_test_split
In Spark as Data get distributed over clusters so to deal with RDD dataframe
we combine vectors of independent and dependent features
'''

from pyspark.ml.feature import VectorAssembler
feature_ass = VectorAssembler(inputCols= ['Age','Protein1','Protein2','Protein3','Protein4','impt_ER_status','impt_PR_status','encoded_Tumour_stage','encoded_Gender','encoded_Histology','encoded_HER2_status','encoded_Surgery_type',"Label"],outputCol ='vector_of_features')
final_dataset = feature_ass.transform(df_onehot)
final_dataset.show()

+---+---------+--------+---------+---------+--------------+--------------+------------------+--------------------+--------------+-----------------+-------------------+--------------------+-----+--------------------+
|Age| Protein1|Protein2| Protein3| Protein4|impt_ER_status|impt_PR_status|imptPatient_Status|encoded_Tumour_stage|encoded_Gender|encoded_Histology|encoded_HER2_status|encoded_Surgery_type|Label|  vector_of_features|
+---+---------+--------+---------+---------+--------------+--------------+------------------+--------------------+--------------+-----------------+-------------------+--------------------+-----+--------------------+
| 36| 0.080353| 0.42638|  0.54715|  0.27368|           0.0|           0.0|               0.0|       (2,[1],[1.0])| (1,[0],[1.0])|    (2,[0],[1.0])|      (1,[0],[1.0])|       (3,[1],[1.0])|  0.0|(17,[0,1,2,3,4,8,...|
| 43| -0.42032| 0.57807|  0.61447|-0.031505|           0.0|           0.0|               1.0|       (2,[0],[1.0])| (1,[0],[1.0])|       

In [0]:
df

Out[468]: DataFrame[Patient_ID: string, Age: int, Gender: string, Protein1: double, Protein2: double, Protein3: double, Protein4: double, Tumour_Stage: string, Histology: string, ER_status: string, PR_status: string, HER2_status: string, Surgery_type: string, Date_of_Surgery: string, Date_of_Last_Visit: string, Patient_Status: string]

In [0]:
#Now Let's just combine independent and dependent features
final_output = final_dataset.select("vector_of_features","Label")

In [0]:
#split the dataset into training and testing
(train_data,test_data) = final_output.randomSplit([0.80,0.20],seed=13)

In [0]:

# As we get the finalized output Now it's time to apply ML algorithm
"""
1. Logistic Regression From pyspark.classification import LogisticRegression()
"""

Out[471]: '\n1. Logistic Regression From pyspark.classification import LogisticRegression()\n2. NaiveBayes from pyspark.ml.classificaion import NaiveBayes\n3. GBTClassifier from pyspark.ml.classification import GBTClassifier\n4. RandomForestCLassifier from pyspark.ml.clasification import RandomForestClassifier\n'

In [0]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.classification import RandomForestClassifier

In [0]:
lr = LogisticRegression(labelCol = "Label",featuresCol="vector_of_features")

In [0]:
#Sometimes failed to excecute user defined function error occured: can be resolved by removing NULL values from the table.

In [0]:
lr_model = lr.fit(train_data)

[0;31m---------------------------------------------------------------------------[0m
[0;31mPy4JJavaError[0m                             Traceback (most recent call last)
[0;32m<command-1434464167246943>[0m in [0;36m<module>[0;34m[0m
[0;32m----> 1[0;31m [0mlr_model[0m [0;34m=[0m [0mlr[0m[0;34m.[0m[0mfit[0m[0;34m([0m[0mtrain_data[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m
[0;32m/databricks/python_shell/dbruntime/MLWorkloadsInstrumentation/_pyspark.py[0m in [0;36mpatched_method[0;34m(self, *args, **kwargs)[0m
[1;32m     28[0m             [0mcall_succeeded[0m [0;34m=[0m [0;32mFalse[0m[0;34m[0m[0;34m[0m[0m
[1;32m     29[0m             [0;32mtry[0m[0;34m:[0m[0;34m[0m[0;34m[0m[0m
[0;32m---> 30[0;31m                 [0mresult[0m [0;34m=[0m [0moriginal_method[0m[0;34m([0m[0mself[0m[0;34m,[0m [0;34m*[0m[0margs[0m[0;34m,[0m [0;34m**[0m[0mkwargs[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m[1;32m     31[0m          