# End to End PySpark Clustering: Part I Using Colab for PySpark and Collecting Data

A notebook for collecting the data that will drive a planetary clustering project. The data comes from: https://api.le-systeme-solaire.net/en/ 

The notebook focuses on using the requests package to get data from the API, passing request parameters, and storing the data in a DataFrame.

In [1]:
import requests

check = requests.get('https://api.le-systeme-solaire.net/rest')
check  # check API is available

<Response [200]>

The request above only checks availability. The full request is more specific, because the project needs only a small part of the dataset available through the API. According to the API documentation, a request can include an `exclude` parameter listing the fields to leave out, so the next request passes a parameters dictionary.
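As a minimal sketch of how the `exclude` value can be assembled, the snippet below joins a short list of field names into the comma-separated string the parameter expects and previews the resulting query string with the standard library. The list is shortened for illustration, and `excluded_fields` is a hypothetical variable name.

```python
from urllib.parse import urlencode

# Shortened list of fields to drop; the names are taken from this notebook.
excluded_fields = ['mass', 'vol', 'moons', 'discoveredBy']

# The parameter is a single comma-separated string under the 'exclude' key.
params = {'exclude': ','.join(excluded_fields)}

# Preview a URL-encoded query string built from the parameters.
query = urlencode(params)
print(query)
```

Keeping the fields in a list makes it easier to add or remove one without editing a long string by hand.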

In [2]:
url = 'https://api.le-systeme-solaire.net/rest/bodies'
params = {'exclude' :'mass,vol,moons,discoveredBy,discoveryDate,alternativeName,axialTilt,avgTemp,mainAnomaly,argPeriapsis,longAscNode,rel,aroundPlanet,sideralOrbit,sideralRotation,dimension,flattening,polarRadius'}

all_data = requests.get(url, params).json()
all_data.get('bodies')

[{'aphelion': 405500,
  'bodyType': 'Moon',
  'density': 3.344,
  'eccentricity': 0.0549,
  'englishName': 'Moon',
  'equaRadius': 1738.1,
  'escape': 2380.0,
  'gravity': 1.62,
  'id': 'lune',
  'inclination': 5.145,
  'isPlanet': False,
  'meanRadius': 1737.0,
  'name': 'La Lune',
  'perihelion': 363300,
  'semimajorAxis': 384400},
 {'aphelion': 9518,
  'bodyType': 'Moon',
  'density': 1.9,
  'eccentricity': 0.0151,
  'englishName': 'Phobos',
  'equaRadius': 13.0,
  'escape': 11.39,
  'gravity': 0.0057,
  'id': 'phobos',
  'inclination': 1.075,
  'isPlanet': False,
  'meanRadius': 11.1,
  'name': 'Phobos',
  'perihelion': 9234,
  'semimajorAxis': 9378},
 {'aphelion': 23471,
  'bodyType': 'Moon',
  'density': 1.75,
  'eccentricity': 0.0002,
  'englishName': 'Deimos',
  'equaRadius': 7.8,
  'escape': 5.556,
  'gravity': 0.003,
  'id': 'deimos',
  'inclination': 1.075,
  'isPlanet': False,
  'meanRadius': 6.2,
  'name': 'Deïmos',
  'perihelion': 23456,
  'semimajorAxis': 23459},
 {'aphe

## Setting up a Spark environment in Google Colab

Parts of this code will need updating over time: as Spark and Hadoop versions change, the download links change too (e.g. 3.2.0 may become 3.2.1).
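One way to limit that maintenance, sketched here under the assumption that the download URL keeps its current pattern, is to hold the versions in two variables and derive the archive name, URL and install path from them. The variable names are hypothetical.

```python
# Versions kept in one place so only these two lines change on an upgrade.
spark_version = '3.2.0'   # value used in this notebook
hadoop_version = '3.2'    # value used in this notebook

# The pattern mirrors the wget call in this notebook; it may change upstream.
archive = f'spark-{spark_version}-bin-hadoop{hadoop_version}'
download_url = f'https://downloads.apache.org/spark/spark-{spark_version}/{archive}.tgz'
spark_home = f'/content/{archive}'

print(download_url)
print(spark_home)
```

In a notebook cell the values can then be passed to the shell commands, for example `!wget {download_url}`.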

In [3]:
# download JDK and Spark/Hadoop
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget https://downloads.apache.org/spark/spark-3.2.0/spark-3.2.0-bin-hadoop3.2.tgz

--2022-01-16 11:34:29--  https://downloads.apache.org/spark/spark-3.2.0/spark-3.2.0-bin-hadoop3.2.tgz
Resolving downloads.apache.org (downloads.apache.org)... 135.181.214.104, 88.99.95.219, 2a01:4f9:3a:2c57::2, ...
Connecting to downloads.apache.org (downloads.apache.org)|135.181.214.104|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 300965906 (287M) [application/x-gzip]
Saving to: ‘spark-3.2.0-bin-hadoop3.2.tgz’


2022-01-16 11:34:40 (26.6 MB/s) - ‘spark-3.2.0-bin-hadoop3.2.tgz’ saved [300965906/300965906]



In [4]:
# unpack Spark and Hadoop
!tar xf spark-3.2.0-bin-hadoop3.2.tgz

Set the `JAVA_HOME` and `SPARK_HOME` environment variables so that Spark can be found and run.

In [5]:
# set home paths 
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.0-bin-hadoop3.2"

Use the findspark package to locate the Spark installation and make PySpark importable.

In [6]:
# install, import and initialise findspark
!pip install -q findspark
import findspark
findspark.init()

In [7]:
# build spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

In [8]:
# extract the list of bodies from the response and write it to a JSON file
import json
data = all_data.get('bodies')

with open('data.json', 'w') as f:
    json.dump(data, f)

In [9]:
# set up schema 
from pyspark.sql.types import StructType, StructField, BooleanType, FloatType, StringType

schema = StructType([
  StructField("id", StringType(), False),
  StructField("englishName", StringType(), True),
  StructField("isPlanet", BooleanType(), True),
  StructField("density", FloatType(), True),
  StructField("gravity", FloatType()),
  StructField("escape", FloatType())
])


In [10]:
# read in data using the schema and show the first five rows
df = spark.read.json('data.json', schema)
df.show(5)

+------+-----------+--------+-------+-------+------+
|    id|englishName|isPlanet|density|gravity|escape|
+------+-----------+--------+-------+-------+------+
|  lune|       Moon|   false|  3.344|   1.62|2380.0|
|phobos|     Phobos|   false|    1.9| 0.0057| 11.39|
|deimos|     Deimos|   false|   1.75|  0.003| 5.556|
|    io|         Io|   false|   3.53|   1.79|   0.0|
|europe|     Europa|   false|   3.01|   1.31|   0.0|
+------+-----------+--------+-------+-------+------+
only showing top 5 rows



## Part II: Cleaning and Model Building

### Cleaning

The data cleaning step involves checking the data structure and finding null values. Further preprocessing is done as part of the model building step.
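The idea behind the null check can be sketched in plain Python on a few raw records before turning to Spark. The records below are toy data shaped like the API output; the `'example'` row and its missing value are invented for illustration.

```python
# Toy records; the None value is invented to show a non-zero count.
records = [
    {'id': 'lune', 'density': 3.344, 'gravity': 1.62},
    {'id': 'phobos', 'density': 1.9, 'gravity': 0.0057},
    {'id': 'example', 'density': None, 'gravity': 0.003},
]

# Count missing values per field, treating an absent key the same as None.
fields = ['id', 'density', 'gravity']
null_counts = {f: sum(1 for r in records if r.get(f) is None) for f in fields}
print(null_counts)
```

The Spark cells in this section apply the same per-column counting to the full DataFrame.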

In [11]:
df.printSchema()  # check schema 

root
 |-- id: string (nullable = true)
 |-- englishName: string (nullable = true)
 |-- isPlanet: boolean (nullable = true)
 |-- density: float (nullable = true)
 |-- gravity: float (nullable = true)
 |-- escape: float (nullable = true)



In [12]:
from pyspark.sql.functions import when, count, isnull

# collect all column names
cols = df.columns

# show count of when the column is null for each of those columns
df.select([count(when(isnull(c), c)).alias(c) for c in cols]).show()

+---+-----------+--------+-------+-------+------+
| id|englishName|isPlanet|density|gravity|escape|
+---+-----------+--------+-------+-------+------+
|  0|          0|       0|      0|      0|     0|
+---+-----------+--------+-------+-------+------+



In [13]:
from pyspark.sql.functions import isnan

num_cols = ['density', 'gravity', 'escape']

df.select([count(when(isnan(c), c)).alias(c) for c in num_cols]).show()

+-------+-------+------+
|density|gravity|escape|
+-------+-------+------+
|      0|      0|     0|
+-------+-------+------+



Finding outliers

In [34]:
import pandas as pd
import plotly.express as px

# convert the data frame to pandas for visualisation
outliers = df.toPandas()

# plot a scatter plot of all the points to find obvious outliers
scatter = px.scatter(outliers, 'gravity', 'escape', hover_name='englishName', template='ggplot2')
scatter.show()

Jupiter is far and away an outlier. It would skew the clustering algorithm and could end up as a cluster of its own even after feature scaling, so it is removed from the dataset.
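The notebook identifies the outlier visually. As a complementary sketch, and not the method used here, an interquartile range (IQR) rule can flag the same point numerically. The gravity values for the moons come from the rows shown in this notebook; the Jupiter value is an approximate figure added for illustration.

```python
from statistics import quantiles

# Gravity per body; the Jupiter value is approximate and for illustration only.
gravity = {
    'lune': 1.62, 'phobos': 0.0057, 'deimos': 0.003,
    'io': 1.79, 'europe': 1.31, 'ganymede': 1.428,
    'jupiter': 24.79,
}

# Quartiles of the values, then the conventional 1.5 * IQR fences.
q1, _, q3 = quantiles(gravity.values(), n=4)
iqr = q3 - q1
lower, upper = q1 - 1.5 * iqr, q3 + 1.5 * iqr

# Bodies whose gravity falls outside the fences.
flagged = [name for name, g in gravity.items() if g < lower or g > upper]
print(flagged)
```

The 1.5 multiplier is a common convention rather than a fixed rule, and a check on a single feature is only a starting point.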

In [36]:
# filter out Jupiter from PySpark DF as it was a clear outlier
df=df.filter(df.id!='jupiter')

# repeat above process to ensure the removal has worked properly
outliers = df.toPandas()

scatter = px.scatter(outliers, 'gravity', 'escape', hover_name='englishName', template='ggplot2')
scatter.show()

### Model Building 

As part of model building, the pipeline assembles the numeric columns into a single feature vector and scales it, so that all features sit in a comparable range when similarity is assessed.

Finally the KMeans clustering model is built.
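To make the scaling step concrete, here is a plain-Python sketch for one column. It assumes `StandardScaler` is left at its default settings, which are understood to divide each feature by its sample standard deviation without subtracting the mean. The density values are taken from the rows shown in this notebook.

```python
from statistics import stdev

# Density values for the first five bodies shown in this notebook.
density = [3.344, 1.9, 1.75, 3.53, 3.01]

# Divide by the sample standard deviation; no mean is subtracted,
# which is assumed to match the scaler's default behaviour.
scale = stdev(density)
standardized = [x / scale for x in density]
print(standardized)
```

After scaling, the column has unit standard deviation but keeps its sign and relative ordering. This is consistent with the `standardized` column printed later in this notebook, where each density is multiplied by the same positive factor.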

In [37]:
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.clustering import KMeans
from pyspark.ml import Pipeline

# set up each processing step with the correct input columns and output
assemble=VectorAssembler(inputCols=num_cols, outputCol='features')
scale=StandardScaler(inputCol='features',outputCol='standardized')
km = KMeans(featuresCol = 'standardized')

# assemble the pipeline 
pipe = Pipeline(stages=[assemble, scale, km])

A silhouette score is generated for each value of K that is tested. The aim is to get the highest score possible, since the score indicates how dense and well separated the clusters are.
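A small sketch of the silhouette calculation may help show what the score measures. This version uses plain Euclidean distance on toy points; Spark's `ClusteringEvaluator` is understood to default to squared Euclidean distance, so its numbers would differ. The function and the points are illustrative, not part of the notebook.

```python
from math import dist

def silhouette(points, labels):
    """Mean silhouette coefficient using Euclidean distance."""
    scores = []
    for i, p in enumerate(points):
        # Distances from p to every other point, grouped by cluster label.
        by_cluster = {}
        for j, q in enumerate(points):
            if i != j:
                by_cluster.setdefault(labels[j], []).append(dist(p, q))
        own = by_cluster.pop(labels[i], None)
        if not own or not by_cluster:
            scores.append(0.0)  # singleton cluster or only one cluster
            continue
        a = sum(own) / len(own)                                # cohesion
        b = min(sum(d) / len(d) for d in by_cluster.values())  # separation
        scores.append((b - a) / max(a, b))
    return sum(scores) / len(scores)

# Two tight, well separated groups of toy points.
points = [(0.0, 0.0), (0.0, 1.0), (10.0, 10.0), (10.0, 11.0)]
good = silhouette(points, [0, 0, 1, 1])  # labels follow the groups
bad = silhouette(points, [0, 1, 0, 1])   # labels cut across the groups
print(good, bad)
```

Labels that follow the natural groups score close to 1, while labels that cut across them score below 0.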

In [38]:
from pyspark.ml.evaluation import ClusteringEvaluator

# set up evaluator 
evaluator = ClusteringEvaluator()

# test k from 2 to 9 (range excludes the upper bound)
for k in range(2,10):
  # set K and a fixed random seed (1) on the KMeans stage, then fit the pipeline to the data
  kmeans = pipe.getStages()[-1].setK(k).setSeed(1)  
  model = pipe.fit(df)
  
  # build a preds dataset of each k value
  preds = model.transform(df)

  # silhouette score each prediction set and print formatted output 
  silhouette = evaluator.evaluate(preds)
  print(f'Tested: {k} clusters: {silhouette}')

Tested: 2 clusters: 0.985906445228239
Tested: 3 clusters: 0.9853183536337629
Tested: 4 clusters: 0.9288686572576516
Tested: 5 clusters: 0.9063524257467865
Tested: 6 clusters: 0.8596777095050427
Tested: 7 clusters: 0.8360492437185751
Tested: 8 clusters: -0.7386038523165039
Tested: 9 clusters: 0.8514223592590151


The highest scoring value for K was 2. However, there is a tradeoff to consider: 2 clusters would not tell us much about the underlying structure of the data, while the silhouette score at k=3 is almost as high. Three clusters should separate our data in a more meaningful way.
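That tradeoff can be written down as a simple rule: take the largest k whose score is within a small tolerance of the best score. The sketch below applies it to the first four scores printed above. The rule and the tolerance value are assumptions for illustration, not part of the original analysis.

```python
# Silhouette scores copied from the output above (k -> score).
scores = {
    2: 0.985906445228239,
    3: 0.9853183536337629,
    4: 0.9288686572576516,
    5: 0.9063524257467865,
}

# Hypothetical rule: accept the largest k whose score is within
# `tolerance` of the best score. The tolerance is an assumed value.
tolerance = 0.001
best = max(scores.values())
chosen_k = max(k for k, s in scores.items() if best - s <= tolerance)
print(chosen_k)
```

With this tolerance the rule selects k=3, matching the choice made in the next cell.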

In [43]:
pipe.getStages()[-1].setK(3).setSeed(1)  # set the random seed for the algorithm and the value for k

# fit model and transform the data
model = pipe.fit(df)
clusters = model.transform(df)
clusters.show()

+----------+-----------+--------+-------+-------+------+--------------------+--------------------+----------+
|        id|englishName|isPlanet|density|gravity|escape|            features|        standardized|prediction|
+----------+-----------+--------+-------+-------+------+--------------------+--------------------+----------+
|      lune|       Moon|   false|  3.344|   1.62|2380.0|[3.34400010108947...|[5.04482713061263...|         0|
|    phobos|     Phobos|   false|    1.9| 0.0057| 11.39|[1.89999997615814...|[2.86637892886519...|         0|
|    deimos|     Deimos|   false|   1.75|  0.003| 5.556|[1.75,0.003000000...|[2.64008588866244...|         0|
|        io|         Io|   false|   3.53|   1.79|   0.0|[3.52999997138977...|[5.32543034939712...|         0|
|    europe|     Europa|   false|   3.01|   1.31|   0.0|[3.00999999046325...|[4.54094771411208...|         0|
|  ganymede|   Ganymede|   false|   1.94|  1.428|   0.0|[1.94000005722045...|[2.92672387146975...|         0|
|  callist

In [45]:
vis_df = clusters.toPandas()

# build figure with 3D numeric dimensions and categorical isPlanet and prediction dimensions
fig = px.scatter_3d(vis_df, x='gravity', y='escape', z='density', color='prediction', symbol='isPlanet', template='ggplot2', hover_name='englishName')
fig.show()

The final clusters are shown in the 3D chart above. This chart indicates how well the model has performed. Layered on top are a few extra dimensions with the diamond shape indicating whether or not the body is a planet, the color indicating the cluster and the three spatial dimensions showing the density, escape velocity and gravity.

The model has grouped the bodies into large planets, small planets and other bodies. Further work could be done to split those other bodies into their own classes, but for now this model works as a clear example of unsupervised machine learning finding patterns in the data.

As we find more planets beyond our solar system and take their measurements, models like these could help to classify them against what we already know.