In [1]:
# Import libraries

from pyspark.sql import SparkSession
from pyspark.ml.stat import Correlation
import pyspark.sql.functions as F

In [2]:
# Entry point for all DataFrame work below: reuse an existing SparkSession or create one
Spark = SparkSession.builder.getOrCreate()

In [3]:
# Read the CSV into a DataFrame: the first line holds the column names and
# column types are inferred from the data

df = Spark.read.csv(
    'data.csv',
    header=True,
    inferSchema=True,
)

In [4]:
# Preview the loaded data as a text table
df.show()

+--------+---------+-----------+------------+--------------+---------+---------------+----------------+--------------+-------------------+-------------+----------------------+---------+----------+------------+-------+-------------+--------------+------------+-----------------+-----------+--------------------+------------+-------------+---------------+----------+----------------+-----------------+---------------+--------------------+--------------+-----------------------+
|      id|diagnosis|radius_mean|texture_mean|perimeter_mean|area_mean|smoothness_mean|compactness_mean|concavity_mean|concave points_mean|symmetry_mean|fractal_dimension_mean|radius_se|texture_se|perimeter_se|area_se|smoothness_se|compactness_se|concavity_se|concave points_se|symmetry_se|fractal_dimension_se|radius_worst|texture_worst|perimeter_worst|area_worst|smoothness_worst|compactness_worst|concavity_worst|concave points_worst|symmetry_worst|fractal_dimension_worst|
+--------+---------+-----------+------------+---

In [5]:
# number of rows

df.count()

569

In [6]:
# List of columns

df.columns

['id',
 'diagnosis',
 'radius_mean',
 'texture_mean',
 'perimeter_mean',
 'area_mean',
 'smoothness_mean',
 'compactness_mean',
 'concavity_mean',
 'concave points_mean',
 'symmetry_mean',
 'fractal_dimension_mean',
 'radius_se',
 'texture_se',
 'perimeter_se',
 'area_se',
 'smoothness_se',
 'compactness_se',
 'concavity_se',
 'concave points_se',
 'symmetry_se',
 'fractal_dimension_se',
 'radius_worst',
 'texture_worst',
 'perimeter_worst',
 'area_worst',
 'smoothness_worst',
 'compactness_worst',
 'concavity_worst',
 'concave points_worst',
 'symmetry_worst',
 'fractal_dimension_worst']

In [7]:
# number of columns

len(df.columns)

32

In [8]:
# Print out the schema in tree format

df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- diagnosis: string (nullable = true)
 |-- radius_mean: double (nullable = true)
 |-- texture_mean: double (nullable = true)
 |-- perimeter_mean: double (nullable = true)
 |-- area_mean: double (nullable = true)
 |-- smoothness_mean: double (nullable = true)
 |-- compactness_mean: double (nullable = true)
 |-- concavity_mean: double (nullable = true)
 |-- concave points_mean: double (nullable = true)
 |-- symmetry_mean: double (nullable = true)
 |-- fractal_dimension_mean: double (nullable = true)
 |-- radius_se: double (nullable = true)
 |-- texture_se: double (nullable = true)
 |-- perimeter_se: double (nullable = true)
 |-- area_se: double (nullable = true)
 |-- smoothness_se: double (nullable = true)
 |-- compactness_se: double (nullable = true)
 |-- concavity_se: double (nullable = true)
 |-- concave points_se: double (nullable = true)
 |-- symmetry_se: double (nullable = true)
 |-- fractal_dimension_se: double (nullable = true)
 |-- radi

In [9]:
# Per-column summary statistics, printed as a table
df.describe().show()

+-------+--------------------+---------+------------------+-----------------+-----------------+-----------------+--------------------+-------------------+-------------------+--------------------+--------------------+----------------------+------------------+------------------+------------------+-----------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------+------------------+------------------+-----------------+--------------------+-------------------+-------------------+--------------------+-------------------+-----------------------+
|summary|                  id|diagnosis|       radius_mean|     texture_mean|   perimeter_mean|        area_mean|     smoothness_mean|   compactness_mean|     concavity_mean| concave points_mean|       symmetry_mean|fractal_dimension_mean|         radius_se|        texture_se|      perimeter_se|          area_se|       smoothness_se|      compactness_se|  

In [10]:
# Inspect the first record as a Row object (field name -> value)
df.head()

Row(id=842302, diagnosis='M', radius_mean=17.99, texture_mean=10.38, perimeter_mean=122.8, area_mean=1001.0, smoothness_mean=0.1184, compactness_mean=0.2776, concavity_mean=0.3001, concave points_mean=0.1471, symmetry_mean=0.2419, fractal_dimension_mean=0.07871, radius_se=1.095, texture_se=0.9053, perimeter_se=8.589, area_se=153.4, smoothness_se=0.006399, compactness_se=0.04904, concavity_se=0.05373, concave points_se=0.01587, symmetry_se=0.03003, fractal_dimension_se=0.006193, radius_worst=25.38, texture_worst=17.33, perimeter_worst=184.6, area_worst=2019.0, smoothness_worst=0.1622, compactness_worst=0.6656, concavity_worst=0.7119, concave points_worst=0.2654, symmetry_worst=0.4601, fractal_dimension_worst=0.1189)

In [11]:
# convert string column 'diagnosis' to 0s and 1s

from pyspark.ml.feature import StringIndexer


In [13]:
# Create the numeric target column 'diagnosis_num' from the string column 'diagnosis'.
# StringIndexer's default ordering ('frequencyDesc') gives index 0.0 to the most
# frequent label, so the encoding would silently depend on the class balance of
# whatever data is loaded. Ordering the labels alphabetically pins the mapping.
# NOTE(review): assumes the two labels are 'B' and 'M' (so 'B' -> 0.0, 'M' -> 1.0,
# matching the encoding shown in the outputs) — confirm against the data.
indexer = StringIndexer(
    inputCol='diagnosis',
    outputCol='diagnosis_num',
    stringOrderType='alphabetAsc',
)
indexed = indexer.fit(df).transform(df)

In [15]:
indexed.show()

+--------+---------+-----------+------------+--------------+---------+---------------+----------------+--------------+-------------------+-------------+----------------------+---------+----------+------------+-------+-------------+--------------+------------+-----------------+-----------+--------------------+------------+-------------+---------------+----------+----------------+-----------------+---------------+--------------------+--------------+-----------------------+-------------+
|      id|diagnosis|radius_mean|texture_mean|perimeter_mean|area_mean|smoothness_mean|compactness_mean|concavity_mean|concave points_mean|symmetry_mean|fractal_dimension_mean|radius_se|texture_se|perimeter_se|area_se|smoothness_se|compactness_se|concavity_se|concave points_se|symmetry_se|fractal_dimension_se|radius_worst|texture_worst|perimeter_worst|area_worst|smoothness_worst|compactness_worst|concavity_worst|concave points_worst|symmetry_worst|fractal_dimension_worst|diagnosis_num|
+--------+---------+

In [17]:
from pyspark.ml.linalg import Vector
from pyspark.ml.feature import VectorAssembler

In [19]:
indexed.columns

['id',
 'diagnosis',
 'radius_mean',
 'texture_mean',
 'perimeter_mean',
 'area_mean',
 'smoothness_mean',
 'compactness_mean',
 'concavity_mean',
 'concave points_mean',
 'symmetry_mean',
 'fractal_dimension_mean',
 'radius_se',
 'texture_se',
 'perimeter_se',
 'area_se',
 'smoothness_se',
 'compactness_se',
 'concavity_se',
 'concave points_se',
 'symmetry_se',
 'fractal_dimension_se',
 'radius_worst',
 'texture_worst',
 'perimeter_worst',
 'area_worst',
 'smoothness_worst',
 'compactness_worst',
 'concavity_worst',
 'concave points_worst',
 'symmetry_worst',
 'fractal_dimension_worst',
 'diagnosis_num']

In [21]:
# Merge features into a vector column using VectorAssembler.
# Every column except the row identifier and the two label columns is a feature,
# so the list is derived from the DataFrame instead of hard-coding 30 names.
# Column order is preserved, so the layout of the assembled vector is unchanged.
non_feature_cols = {'id', 'diagnosis', 'diagnosis_num'}
feature_cols = [name for name in indexed.columns if name not in non_feature_cols]

assembler = VectorAssembler(inputCols=feature_cols, outputCol='features')

In [23]:
# Append the assembled 'features' vector column to the indexed DataFrame
output = assembler.transform(indexed)

In [24]:
output.show()

+--------+---------+-----------+------------+--------------+---------+---------------+----------------+--------------+-------------------+-------------+----------------------+---------+----------+------------+-------+-------------+--------------+------------+-----------------+-----------+--------------------+------------+-------------+---------------+----------+----------------+-----------------+---------------+--------------------+--------------+-----------------------+-------------+--------------------+
|      id|diagnosis|radius_mean|texture_mean|perimeter_mean|area_mean|smoothness_mean|compactness_mean|concavity_mean|concave points_mean|symmetry_mean|fractal_dimension_mean|radius_se|texture_se|perimeter_se|area_se|smoothness_se|compactness_se|concavity_se|concave points_se|symmetry_se|fractal_dimension_se|radius_worst|texture_worst|perimeter_worst|area_worst|smoothness_worst|compactness_worst|concavity_worst|concave points_worst|symmetry_worst|fractal_dimension_worst|diagnosis_num|

In [27]:
output.select('features','diagnosis_num').show()

+--------------------+-------------+
|            features|diagnosis_num|
+--------------------+-------------+
|[17.99,10.38,122....|          1.0|
|[20.57,17.77,132....|          1.0|
|[19.69,21.25,130....|          1.0|
|[11.42,20.38,77.5...|          1.0|
|[20.29,14.34,135....|          1.0|
|[12.45,15.7,82.57...|          1.0|
|[18.25,19.98,119....|          1.0|
|[13.71,20.83,90.2...|          1.0|
|[13.0,21.82,87.5,...|          1.0|
|[12.46,24.04,83.9...|          1.0|
|[16.02,23.24,102....|          1.0|
|[15.78,17.89,103....|          1.0|
|[19.17,24.8,132.4...|          1.0|
|[15.85,23.95,103....|          1.0|
|[13.73,22.61,93.6...|          1.0|
|[14.54,27.54,96.7...|          1.0|
|[14.68,20.13,94.7...|          1.0|
|[16.13,20.68,108....|          1.0|
|[19.81,22.15,130....|          1.0|
|[13.54,14.36,87.4...|          0.0|
+--------------------+-------------+
only showing top 20 rows



In [28]:
# Keep only the two columns the LinearRegression model is configured to read:
# the 'features' vector and the numeric label 'diagnosis_num'

final_data = output.select('features','diagnosis_num')

In [31]:
# Randomly split the data into training 75% and testing 25% sets.
# A fixed seed keeps the split (and every metric derived from it) the same across
# Restart & Run All; without it each run trains and tests on different rows.

train_data, test_data = final_data.randomSplit([0.75, 0.25], seed=42)

In [32]:
train_data.show()

+--------------------+-------------+
|            features|diagnosis_num|
+--------------------+-------------+
|[6.981,13.43,43.7...|          0.0|
|[7.729,25.49,47.9...|          0.0|
|[8.196,16.84,51.7...|          0.0|
|[8.219,20.7,53.27...|          0.0|
|[8.571,13.1,54.53...|          0.0|
|[8.597,18.6,54.09...|          0.0|
|[8.618,11.79,54.3...|          0.0|
|[8.671,14.45,54.4...|          0.0|
|[8.726,15.83,55.8...|          0.0|
|[8.734,16.84,55.2...|          0.0|
|[8.878,15.49,56.7...|          0.0|
|[9.0,14.4,56.36,2...|          0.0|
|[9.029,17.33,58.7...|          0.0|
|[9.042,18.9,60.07...|          0.0|
|[9.173,13.86,59.2...|          0.0|
|[9.268,12.87,61.4...|          0.0|
|[9.333,21.94,59.0...|          0.0|
|[9.397,21.68,59.7...|          0.0|
|[9.405,21.7,59.6,...|          0.0|
|[9.436,18.32,59.8...|          0.0|
+--------------------+-------------+
only showing top 20 rows



In [33]:
test_data.show()

+--------------------+-------------+
|            features|diagnosis_num|
+--------------------+-------------+
|[7.691,25.44,48.3...|          0.0|
|[7.76,24.54,47.92...|          0.0|
|[8.598,20.98,54.6...|          0.0|
|[8.888,14.64,58.7...|          0.0|
|[8.95,15.76,58.74...|          0.0|
|[9.295,13.9,59.96...|          0.0|
|[9.423,27.88,59.2...|          0.0|
|[9.465,21.01,60.1...|          0.0|
|[9.567,15.91,60.2...|          0.0|
|[9.731,15.34,63.7...|          0.0|
|[9.738,11.97,61.2...|          0.0|
|[9.742,19.12,61.9...|          0.0|
|[9.755,28.2,61.68...|          0.0|
|[9.777,16.99,62.5...|          0.0|
|[9.787,19.94,62.1...|          0.0|
|[9.876,19.4,63.95...|          0.0|
|[10.03,21.28,63.1...|          0.0|
|[10.08,15.11,63.7...|          0.0|
|[10.2,17.48,65.05...|          0.0|
|[10.29,27.61,65.6...|          0.0|
+--------------------+-------------+
only showing top 20 rows



In [34]:
from pyspark.ml.regression import LinearRegression

In [35]:
# Create linear regression model object
# NOTE(review): the label 'diagnosis_num' is a 0/1 class indicator, so the model
# outputs an unbounded score (the predictions further down include negative values)
# rather than a class or probability — confirm a regressor, not a classifier, is intended.

lr = LinearRegression(featuresCol = 'features', labelCol = 'diagnosis_num')

In [36]:
# Build linear regression model based on train set

trained_model = lr.fit(train_data)

In [38]:
trained_model

LinearRegressionModel: uid=LinearRegression_7363f3d88b14, numFeatures=30

In [39]:
# Evaluate the fitted model on the same data it was trained on; metrics read from
# this summary describe goodness of fit, not performance on unseen data
results = trained_model.evaluate(train_data)

In [40]:
# Training R2

print(results.r2)

0.7787078257011542


In [41]:
# Drop the label column so the test split looks like unseen, unlabeled input
unlabeled_data = test_data.select('features')

In [43]:
unlabeled_data.show()

+--------------------+
|            features|
+--------------------+
|[7.691,25.44,48.3...|
|[7.76,24.54,47.92...|
|[8.598,20.98,54.6...|
|[8.888,14.64,58.7...|
|[8.95,15.76,58.74...|
|[9.295,13.9,59.96...|
|[9.423,27.88,59.2...|
|[9.465,21.01,60.1...|
|[9.567,15.91,60.2...|
|[9.731,15.34,63.7...|
|[9.738,11.97,61.2...|
|[9.742,19.12,61.9...|
|[9.755,28.2,61.68...|
|[9.777,16.99,62.5...|
|[9.787,19.94,62.1...|
|[9.876,19.4,63.95...|
|[10.03,21.28,63.1...|
|[10.08,15.11,63.7...|
|[10.2,17.48,65.05...|
|[10.29,27.61,65.6...|
+--------------------+
only showing top 20 rows



In [44]:
# Score the unlabeled test features; the result carries an added 'prediction' column
predictions = trained_model.transform(unlabeled_data)

In [45]:
predictions.show()

+--------------------+--------------------+
|            features|          prediction|
+--------------------+--------------------+
|[7.691,25.44,48.3...|-0.32867226741752154|
|[7.76,24.54,47.92...|-0.15924104561550778|
|[8.598,20.98,54.6...|-0.19804406760160753|
|[8.888,14.64,58.7...| -0.5546113276696061|
|[8.95,15.76,58.74...| -0.5137607963811313|
|[9.295,13.9,59.96...| 0.05058228107317997|
|[9.423,27.88,59.2...|-0.09304848603479643|
|[9.465,21.01,60.1...|0.001459290115363...|
|[9.567,15.91,60.2...|0.062328931002573196|
|[9.731,15.34,63.7...|-0.15970881668925752|
|[9.738,11.97,61.2...|-0.24039455229519557|
|[9.742,19.12,61.9...|0.006992464920107189|
|[9.755,28.2,61.68...|-0.05782607476353885|
|[9.777,16.99,62.5...|0.021563050357197078|
|[9.787,19.94,62.1...|-0.05162592951521...|
|[9.876,19.4,63.95...|-0.01566222227303...|
|[10.03,21.28,63.1...|-0.06445891875117438|
|[10.08,15.11,63.7...| 0.07289462057386431|
|[10.2,17.48,65.05...| -0.0845609126054272|
|[10.29,27.61,65.6...| 0.0824869