## Problem Statement
We'll be working with a real data set of wheat seed measurements from the UCI Machine Learning Repository: https://archive.ics.uci.edu/ml/datasets/seeds.

The examined group comprised kernels belonging to three different varieties of wheat: Kama, Rosa and Canadian, 70 elements each, randomly selected for the experiment. High-quality visualization of the internal kernel structure was obtained using a soft X-ray technique, which is non-destructive and considerably cheaper than more sophisticated imaging techniques such as scanning microscopy or laser technology. The images were recorded on 13x18 cm X-ray KODAK plates. Studies were conducted using combine-harvested wheat grain originating from experimental fields explored at the Institute of Agrophysics of the Polish Academy of Sciences in Lublin.

The data set can be used for the tasks of classification and cluster analysis.


Attribute Information:

To construct the data, seven geometric parameters of wheat kernels were measured:

1. area A
2. perimeter P
3. compactness C = 4*pi*A/P^2
4. length of kernel
5. width of kernel
6. asymmetry coefficient
7. length of kernel groove

All of these parameters are real-valued and continuous.
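
As a quick sanity check on the compactness formula, here is a minimal sketch in plain Python. The function name `compactness` is our own and is not part of the data set or of PySpark; the area and perimeter values are those of the first kernel, which `df.show(1)` prints further down in this notebook.

```python
import math

def compactness(area: float, perimeter: float) -> float:
    """Compactness C = 4*pi*A / P^2; equals 1.0 for a perfect circle."""
    return 4 * math.pi * area / perimeter ** 2

# Area and perimeter of the first kernel in the data set.
c = compactness(15.26, 14.84)
print(round(c, 3))  # 0.871, matching the compactness column of that row
```

Because compactness is derived from area and perimeter, it carries no independent measurement, which is worth keeping in mind when all seven columns are fed to the clustering model.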

Let's see if we can cluster them into three groups with K-means!

### Importing PySpark

In [1]:
import findspark
findspark.init('/home/ubuntu/spark-3.1.1-bin-hadoop3.2')

In [2]:
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder.appName('clustering').getOrCreate()

### Importing Data

In [5]:
df = spark.read.csv('seeds_dataset.csv',inferSchema=True,header=True)

## Exploratory Data Analysis

In [6]:
df.printSchema()

root
 |-- area: double (nullable = true)
 |-- perimeter: double (nullable = true)
 |-- compactness: double (nullable = true)
 |-- length_of_kernel: double (nullable = true)
 |-- width_of_kernel: double (nullable = true)
 |-- asymmetry_coefficient: double (nullable = true)
 |-- length_of_groove: double (nullable = true)



### Shape of the dataset


In [10]:
print((df.count(),len(df.columns)))

(210, 7)


In [12]:
df.columns

['area',
 'perimeter',
 'compactness',
 'length_of_kernel',
 'width_of_kernel',
 'asymmetry_coefficient',
 'length_of_groove']

### Missing Values

In [13]:
from pyspark.sql.functions import isnan, when, count, col

In [15]:
df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]).show()

+----+---------+-----------+----------------+---------------+---------------------+----------------+
|area|perimeter|compactness|length_of_kernel|width_of_kernel|asymmetry_coefficient|length_of_groove|
+----+---------+-----------+----------------+---------------+---------------------+----------------+
|   0|        0|          0|               0|              0|                    0|               0|
+----+---------+-----------+----------------+---------------+---------------------+----------------+



In [16]:
df.show(1)

+-----+---------+-----------+----------------+---------------+---------------------+----------------+
| area|perimeter|compactness|length_of_kernel|width_of_kernel|asymmetry_coefficient|length_of_groove|
+-----+---------+-----------+----------------+---------------+---------------------+----------------+
|15.26|    14.84|      0.871|           5.763|          3.312|                2.221|            5.22|
+-----+---------+-----------+----------------+---------------+---------------------+----------------+
only showing top 1 row



## Data Preprocessing

### Transforming data for the model

In [18]:
from pyspark.ml.feature import VectorAssembler

In [19]:
assembler = VectorAssembler(inputCols=df.columns,outputCol='features')

In [20]:
final_data = assembler.transform(df)

In [21]:
final_data.printSchema()

root
 |-- area: double (nullable = true)
 |-- perimeter: double (nullable = true)
 |-- compactness: double (nullable = true)
 |-- length_of_kernel: double (nullable = true)
 |-- width_of_kernel: double (nullable = true)
 |-- asymmetry_coefficient: double (nullable = true)
 |-- length_of_groove: double (nullable = true)
 |-- features: vector (nullable = true)



### Scaling the data

In [22]:
from pyspark.ml.feature import StandardScaler

In [23]:
scaler = StandardScaler(inputCol='features',outputCol='scaledFeatures')

In [24]:
scaler_model = scaler.fit(final_data)

In [25]:
# Overwrite final_data so that it also carries the scaledFeatures column
final_data = scaler_model.transform(final_data)
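
To make the scaling step less of a black box, here is a rough plain-Python sketch of what `StandardScaler` does with its default settings (`withStd=True`, `withMean=False`): each column is divided by its sample standard deviation and is not centered. The helper `scale_to_unit_std` and the toy rows are hypothetical illustrations, not Spark API, and the sketch assumes no column is constant.

```python
import statistics

def scale_to_unit_std(rows):
    """Divide each column by its sample standard deviation (no centering)."""
    columns = list(zip(*rows))
    # statistics.stdev uses the n-1 denominator (corrected sample std).
    stds = [statistics.stdev(col) for col in columns]
    return [[value / std for value, std in zip(row, stds)] for row in rows]

# Toy rows (made-up values, not taken from the seeds data).
rows = [[1.0, 10.0], [2.0, 20.0], [3.0, 60.0]]
scaled = scale_to_unit_std(rows)

# Every scaled column now has unit standard deviation but a non-zero mean.
for col in zip(*scaled):
    print(round(statistics.stdev(col), 6), round(statistics.mean(col), 3))
```

Since the means are left untouched, the scaled values stay positive. This is why the cluster centers reported later are fairly large positive numbers rather than values near zero.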

In [27]:
final_data.columns

['area',
 'perimeter',
 'compactness',
 'length_of_kernel',
 'width_of_kernel',
 'asymmetry_coefficient',
 'length_of_groove',
 'features',
 'scaledFeatures']

## Model Building

In [28]:
from pyspark.ml.clustering import KMeans

In [29]:
kmeans = KMeans(featuresCol='scaledFeatures',k=3)

In [30]:
#Fitting the model
model = kmeans.fit(final_data)
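
For intuition about what `fit` is doing, here is a minimal sketch of the classic K-means loop (Lloyd's algorithm) in plain Python. It is not Spark's implementation: Spark runs distributed and by default initializes with `k-means||`, whereas this sketch picks random starting points. The names `nearest` and `kmeans` and the toy points are our own.

```python
import math
import random

def nearest(point, centers):
    """Index of the center closest to point (Euclidean distance)."""
    return min(range(len(centers)), key=lambda j: math.dist(point, centers[j]))

def kmeans(points, k, iterations=20, seed=0):
    """Plain Lloyd's algorithm: alternate assignment and centroid update."""
    rng = random.Random(seed)
    # Simplification: start from k randomly chosen data points.
    centers = [list(p) for p in rng.sample(points, k)]
    for _ in range(iterations):
        # Assignment step: attach every point to its nearest center.
        labels = [nearest(p, centers) for p in points]
        # Update step: move each center to the mean of its members.
        for j in range(k):
            members = [p for p, label in zip(points, labels) if label == j]
            if members:  # keep the old center if a cluster ends up empty
                centers[j] = [sum(dim) / len(members) for dim in zip(*members)]
    labels = [nearest(p, centers) for p in points]
    return centers, labels

# Two well-separated toy blobs (made-up values).
points = [[0.0, 0.0], [0.0, 1.0], [1.0, 0.0],
          [10.0, 10.0], [10.0, 11.0], [11.0, 10.0]]
centers, labels = kmeans(points, k=2)
print(labels)
```

The cluster numbers themselves are arbitrary: which blob ends up as cluster 0 depends on the starting points, so only the grouping is meaningful.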

## Evaluation

In [35]:
print('Cluster Center')
print(model.clusterCenters())
# One array per cluster (k=3): each is a centroid in the scaled feature space, with one value per input feature

Cluster Center
[array([ 4.07497225, 10.14410142, 35.89816849, 11.80812742,  7.54416916,
        3.15410901, 10.38031464]), array([ 6.35645488, 12.40730852, 37.41990178, 13.93860446,  9.7892399 ,
        2.41585013, 12.29286107]), array([ 4.96198582, 10.97871333, 37.30930808, 12.44647267,  8.62880781,
        1.80061978, 10.41913733])]


In [39]:
# Cluster index (0, 1 or 2) assigned to each of the first five rows
model.transform(final_data).select('prediction').show(5)

+----------+
|prediction|
+----------+
|         2|
|         2|
|         2|
|         2|
|         2|
+----------+
only showing top 5 rows
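
Printing centers and predictions does not tell us how tight the clusters are. One common measure is the within-cluster sum of squared distances, sketched below in plain Python. The function name `within_cluster_sse` and the toy values are our own assumptions; Spark also ships a `ClusteringEvaluator` in `pyspark.ml.evaluation` that reports a silhouette score, which this sketch does not use.

```python
import math

def within_cluster_sse(points, centers, labels):
    """Sum of squared Euclidean distances from each point to its assigned center."""
    return sum(math.dist(p, centers[label]) ** 2
               for p, label in zip(points, labels))

# Toy example (made-up values): two clusters of two points each.
points = [[0.0, 0.0], [0.0, 2.0], [10.0, 0.0], [10.0, 2.0]]
centers = [[0.0, 1.0], [10.0, 1.0]]
labels = [0, 0, 1, 1]

print(within_cluster_sse(points, centers, labels))  # 4.0
```

Lower values mean tighter clusters, but the number always shrinks as k grows, so it is mainly useful for comparing runs at the same k or for an elbow-style comparison across several values of k.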

