<a href="https://colab.research.google.com/github/prithvikavoori/PySparkMllib/blob/main/KmeansClustering.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

The examined group comprised kernels belonging to three different varieties of wheat: Kama, Rosa and Canadian, 70 elements each, randomly selected for the experiment. High-quality visualization of the internal kernel structure was obtained using a soft X-ray technique. The technique is non-destructive and considerably cheaper than more sophisticated imaging techniques such as scanning microscopy or laser technology. The images were recorded on 13x18 cm X-ray KODAK plates. Studies were conducted using combine-harvested wheat grain originating from experimental fields, explored at the Institute of Agrophysics of the Polish Academy of Sciences in Lublin.

The data set is from the UCI Machine Learning Repository: https://archive.ics.uci.edu/ml/datasets/seeds.

In [1]:
# install java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null


In [2]:
# download the Spark archive (change the version number if needed)
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz

In [8]:

# unpack the Spark archive into the current folder
!tar xf spark-3.0.0-bin-hadoop3.2.tgz

In [9]:
# point JAVA_HOME and SPARK_HOME at the installed Java and the unpacked Spark folder
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"

In [10]:

# install findspark using pip
!pip install -q findspark


In [11]:

# locate the Spark installation and make pyspark importable
import findspark
findspark.init()
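
If `findspark.init()` or the Spark session fails to start, a common cause is that `JAVA_HOME` or `SPARK_HOME` does not point at a real directory. The following is a minimal sketch of such a check in plain Python; the helper name `missing_dirs` and the demo paths are hypothetical and not part of the original notebook.

```python
import os
import tempfile

def missing_dirs(var_names, environ=os.environ):
    """Return the variable names that are unset or do not point to a directory."""
    return [name for name in var_names
            if not os.path.isdir(environ.get(name, ""))]

# Demonstration on a throwaway mapping rather than the real environment:
# JAVA_HOME points at an existing directory, SPARK_HOME at a path that was never created.
with tempfile.TemporaryDirectory() as real_dir:
    demo_env = {
        "JAVA_HOME": real_dir,
        "SPARK_HOME": os.path.join(real_dir, "not-unpacked"),
    }
    demo_result = missing_dirs(["JAVA_HOME", "SPARK_HOME"], demo_env)

print(demo_result)
```

In the notebook, calling `missing_dirs(["JAVA_HOME", "SPARK_HOME"])` after setting the variables should return an empty list when both folders exist.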

In [12]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('clusteringPyspark').getOrCreate()

In [13]:
from pyspark.ml.clustering import KMeans

# Load the data, reading column names from the header row and inferring column types.
dataset = spark.read.csv("/content/seeds_dataset.csv", header=True, inferSchema=True)

In [14]:
dataset.head()

Row(area=15.26, perimeter=14.84, compactness=0.871, length_of_kernel=5.763, width_of_kernel=3.312, asymmetry_coefficient=2.221, length_of_groove=5.22)

In [15]:
dataset.describe().show()

+-------+------------------+------------------+--------------------+-------------------+------------------+---------------------+-------------------+
|summary|              area|         perimeter|         compactness|   length_of_kernel|   width_of_kernel|asymmetry_coefficient|   length_of_groove|
+-------+------------------+------------------+--------------------+-------------------+------------------+---------------------+-------------------+
|  count|               210|               210|                 210|                210|               210|                  210|                210|
|   mean|14.847523809523816|14.559285714285718|  0.8709985714285714|  5.628533333333335| 3.258604761904762|   3.7001999999999997|  5.408071428571429|
| stddev|2.9096994306873647|1.3059587265640225|0.023629416583846364|0.44306347772644983|0.3777144449065867|   1.5035589702547392|0.49148049910240543|
|    min|             10.59|             12.41|              0.8081|              4.899|            

In [16]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [17]:
dataset.columns

['area',
 'perimeter',
 'compactness',
 'length_of_kernel',
 'width_of_kernel',
 'asymmetry_coefficient',
 'length_of_groove']

In [18]:
vec_assembler = VectorAssembler(inputCols = dataset.columns, outputCol='features')

In [19]:
final_data = vec_assembler.transform(dataset)

In [20]:
# Scale the data.
# k-means relies on Euclidean distance, so features with larger numeric ranges
# (e.g. area) would otherwise dominate features with small ranges (e.g. compactness).
from pyspark.ml.feature import StandardScaler


In [21]:
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", withStd=True, withMean=False)

In [22]:
# Compute summary statistics by fitting the StandardScaler
scalerModel = scaler.fit(final_data)

In [23]:
# Normalize each feature to have unit standard deviation.
final_data = scalerModel.transform(final_data)
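
To make the effect of `withStd=True, withMean=False` concrete, here is a small pure-Python sketch of the same idea: each column is divided by its sample standard deviation and is not centered. The toy rows and the function name `scale_to_unit_std` are made up for illustration, and mapping zero-variance columns to 0.0 is an assumption about how Spark handles that case.

```python
from statistics import stdev

def scale_to_unit_std(rows):
    """Divide every column by its sample standard deviation (no centering)."""
    columns = list(zip(*rows))
    stds = [stdev(col) for col in columns]
    # Assumption: a zero-variance column is mapped to 0.0 instead of dividing by zero.
    return [[value / s if s != 0 else 0.0 for value, s in zip(row, stds)]
            for row in rows]

# Hypothetical toy data: two features on very different scales.
toy_rows = [[1.0, 10.0], [2.0, 20.0], [3.0, 60.0]]
scaled = scale_to_unit_std(toy_rows)

# After scaling, each column has a sample standard deviation of (approximately) 1.
scaled_stds = [stdev(col) for col in zip(*scaled)]
print(scaled_stds)
```

Because the mean is not subtracted, the scaled values stay positive, which is why scaled features with a small spread relative to their mean come out as large numbers.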

In [24]:
# Train a k-means model with k=3, one cluster per wheat variety.
kmeans = KMeans(featuresCol='scaledFeatures', k=3)
model = kmeans.fit(final_data)

In [33]:
# Show the cluster centers (expressed in scaled-feature space, not in the original units).
centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)

Cluster Centers: 
[ 6.35645488 12.40730852 37.41990178 13.93860446  9.7892399   2.41585013
 12.29286107]
[ 4.07497225 10.14410142 35.89816849 11.80812742  7.54416916  3.15410901
 10.38031464]
[ 4.96198582 10.97871333 37.30930808 12.44647267  8.62880781  1.80061978
 10.41913733]


In [34]:
model.transform(final_data).select('prediction').show()

+----------+
|prediction|
+----------+
|         2|
|         2|
|         2|
|         2|
|         2|
|         2|
|         2|
|         2|
|         0|
|         2|
|         2|
|         2|
|         2|
|         2|
|         2|
|         2|
|         2|
|         2|
|         2|
|         1|
+----------+
only showing top 20 rows
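
Each value in the `prediction` column is the index of the cluster center closest to that row in scaled-feature space. As a rough cross-check in plain Python, the sketch below takes the first row from `dataset.head()`, divides it by the standard deviations printed by `describe()`, and finds the nearest of the three printed centers. The numbers are copied from the outputs above; the variable names are chosen for illustration only.

```python
# First row of the data set, as printed by dataset.head()
first_row = [15.26, 14.84, 0.871, 5.763, 3.312, 2.221, 5.22]

# Per-column sample standard deviations, as printed by dataset.describe()
stddevs = [2.9096994306873647, 1.3059587265640225, 0.023629416583846364,
           0.44306347772644983, 0.3777144449065867, 1.5035589702547392,
           0.49148049910240543]

# Cluster centers, as printed by model.clusterCenters()
centers = [
    [6.35645488, 12.40730852, 37.41990178, 13.93860446, 9.7892399, 2.41585013, 12.29286107],
    [4.07497225, 10.14410142, 35.89816849, 11.80812742, 7.54416916, 3.15410901, 10.38031464],
    [4.96198582, 10.97871333, 37.30930808, 12.44647267, 8.62880781, 1.80061978, 10.41913733],
]

# Scale the row the same way the StandardScaler was configured (divide by std, no centering).
scaled_row = [value / s for value, s in zip(first_row, stddevs)]

# Squared Euclidean distance from the scaled row to every center.
squared_distances = [sum((x - c) ** 2 for x, c in zip(scaled_row, center))
                     for center in centers]

nearest = squared_distances.index(min(squared_distances))
print(nearest)
```

The index found this way agrees with the first entry of the `prediction` column shown above.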

