## Import the Libraries 

In [None]:
from numpy import array
from math import sqrt
from pyspark.mllib.linalg.distributed import RowMatrix
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.sql import SparkSession
from pyspark.mllib.linalg import Vectors
# Reuse the active Spark session if one exists; otherwise start one named "PCA".
spark = (
    SparkSession.builder
    .appName("PCA")
    .getOrCreate()
)

## Download the Dataset 

In [2]:
!wget https://raw.githubusercontent.com/mananparasher/Spark-Datasets/master/breast-cancer-wisconsin.csv

--2020-06-22 21:19:25--  https://raw.githubusercontent.com/mananparasher/Spark-Datasets/master/breast-cancer-wisconsin.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 151.101.124.133
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|151.101.124.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 24063 (23K) [text/plain]
Saving to: ‘breast-cancer-wisconsin.csv.2’


2020-06-22 21:19:25 (359 KB/s) - ‘breast-cancer-wisconsin.csv.2’ saved [24063/24063]



## Load the Data into a Spark DataFrame

In [2]:
# Read the downloaded CSV: the first line provides the column names and the
# column types are inferred from the data rather than defaulting to strings.
df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('breast-cancer-wisconsin.csv')
)
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- id: integer (nullable = true)
 |-- Clump_Thickness: integer (nullable = true)
 |-- Cell_Size: integer (nullable = true)
 |-- Cell_Shape: integer (nullable = true)
 |-- Marginal_Adhesion: integer (nullable = true)
 |-- Epithelial_Cell_Size: integer (nullable = true)
 |-- Bare_Nuclei: double (nullable = true)
 |-- Bland_Chromatin: integer (nullable = true)
 |-- Normal_Nucleoli: integer (nullable = true)
 |-- Mitoses: integer (nullable = true)
 |-- Class: integer (nullable = true)



## Data Processing for the Machine Learning Model

In [170]:
# Columns packed, in this order, into the single 'features' vector column.
# The '_c0' and 'id' columns are intentionally left out.
# NOTE(review): 'Class' looks like the label column and is included as a
# feature here - confirm this is intended.
feature_columns = [
    'Clump_Thickness', 'Cell_Size', 'Cell_Shape',
    'Marginal_Adhesion', 'Epithelial_Cell_Size', 'Normal_Nucleoli',
    'Bland_Chromatin', 'Bare_Nuclei', 'Mitoses', 'Class',
]
vectorAssembler = VectorAssembler(inputCols=feature_columns, outputCol='features')

# Assemble the vector column and keep only that column.
transformed_df = vectorAssembler.transform(df).select("features")
transformed_df.show()

+--------------------+
|            features|
+--------------------+
|[5.0,1.0,1.0,1.0,...|
|[5.0,4.0,4.0,5.0,...|
|[3.0,1.0,1.0,1.0,...|
|[6.0,8.0,8.0,1.0,...|
|[4.0,1.0,1.0,3.0,...|
|[8.0,10.0,10.0,8....|
|[1.0,1.0,1.0,1.0,...|
|[2.0,1.0,2.0,1.0,...|
|[2.0,1.0,1.0,1.0,...|
|[4.0,2.0,1.0,1.0,...|
|[1.0,1.0,1.0,1.0,...|
|[2.0,1.0,1.0,1.0,...|
|[5.0,3.0,3.0,3.0,...|
|[1.0,1.0,1.0,1.0,...|
|[8.0,7.0,5.0,10.0...|
|[7.0,4.0,6.0,4.0,...|
|[4.0,1.0,1.0,1.0,...|
|[4.0,1.0,1.0,1.0,...|
|[10.0,7.0,7.0,6.0...|
|[6.0,1.0,1.0,1.0,...|
+--------------------+
only showing top 20 rows



In [172]:
# Pull the first 20 assembled rows to the driver and rebuild each one as a
# dense vector using the `Vectors` factory imported from pyspark.mllib.linalg.
# `toArray()` always yields the full-length array, whereas `.values` on a
# sparse vector holds only the non-zero entries and would produce vectors of
# the wrong length.
# NOTE(review): only 20 rows are used, not the whole dataset - confirm intended.
DenseVector_rows = [
    Vectors.dense(row["features"].toArray())
    for row in transformed_df.take(20)
]

DenseVector_rows

[DenseVector([5.0, 1.0, 1.0, 1.0, 2.0, 1.0, 3.0, 1.0, 1.0, 2.0]),
 DenseVector([5.0, 4.0, 4.0, 5.0, 7.0, 2.0, 3.0, 10.0, 1.0, 2.0]),
 DenseVector([3.0, 1.0, 1.0, 1.0, 2.0, 1.0, 3.0, 2.0, 1.0, 2.0]),
 DenseVector([6.0, 8.0, 8.0, 1.0, 3.0, 7.0, 3.0, 4.0, 1.0, 2.0]),
 DenseVector([4.0, 1.0, 1.0, 3.0, 2.0, 1.0, 3.0, 1.0, 1.0, 2.0]),
 DenseVector([8.0, 10.0, 10.0, 8.0, 7.0, 7.0, 9.0, 10.0, 1.0, 4.0]),
 DenseVector([1.0, 1.0, 1.0, 1.0, 2.0, 1.0, 3.0, 10.0, 1.0, 2.0]),
 DenseVector([2.0, 1.0, 2.0, 1.0, 2.0, 1.0, 3.0, 1.0, 1.0, 2.0]),
 DenseVector([2.0, 1.0, 1.0, 1.0, 2.0, 1.0, 1.0, 1.0, 5.0, 2.0]),
 DenseVector([4.0, 2.0, 1.0, 1.0, 2.0, 1.0, 2.0, 1.0, 1.0, 2.0]),
 DenseVector([1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 3.0, 1.0, 1.0, 2.0]),
 DenseVector([2.0, 1.0, 1.0, 1.0, 2.0, 1.0, 2.0, 1.0, 1.0, 2.0]),
 DenseVector([5.0, 3.0, 3.0, 3.0, 2.0, 4.0, 4.0, 3.0, 1.0, 4.0]),
 DenseVector([1.0, 1.0, 1.0, 1.0, 2.0, 1.0, 3.0, 3.0, 1.0, 2.0]),
 DenseVector([8.0, 7.0, 5.0, 10.0, 7.0, 5.0, 5.0, 9.0, 4.0, 4.0]),
 Den

In [173]:
# Distribute the driver-side list of vectors as an RDD. The SparkContext is
# taken from the `spark` session because this notebook never defines `sc`;
# relying on a pre-existing `sc` fails with NameError on a fresh kernel.
rows = spark.sparkContext.parallelize(DenseVector_rows)
rows.take(5)

[DenseVector([5.0, 1.0, 1.0, 1.0, 2.0, 1.0, 3.0, 1.0, 1.0, 2.0]),
 DenseVector([5.0, 4.0, 4.0, 5.0, 7.0, 2.0, 3.0, 10.0, 1.0, 2.0]),
 DenseVector([3.0, 1.0, 1.0, 1.0, 2.0, 1.0, 3.0, 2.0, 1.0, 2.0]),
 DenseVector([6.0, 8.0, 8.0, 1.0, 3.0, 7.0, 3.0, 4.0, 1.0, 2.0]),
 DenseVector([4.0, 1.0, 1.0, 3.0, 2.0, 1.0, 3.0, 1.0, 1.0, 2.0])]

## Model Implementation  

In [174]:
# Wrap the RDD of vectors in a distributed row matrix, then ask it for the
# top 3 principal components; the last expression displays the result.
rowmatrix = RowMatrix(rows=rows)
pca = rowmatrix.computePrincipalComponents(k=3)
pca

DenseMatrix(10, 3, [-0.3191, -0.427, -0.405, -0.3734, -0.2689, -0.2401, -0.1983, -0.4808, ..., -0.3121, 0.5289, 0.236, -0.4011, -0.0538, -0.2046, 0.2123, 0.1377], 0)