# Classifying stars and galaxies using machine learning

Authored by Maksim Nikiforov

NCSU ST590, Project 3

Spring, 2022

## Data ingestion

In [45]:
import os
import sys
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

import pyspark.pandas as ps
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")


In [30]:
# Read CSV into a Spark data frame
sdss_data = spark.read.options(header="True", 
                               inferSchema='True',
                               delimiter=',').csv("sdss_test_mvnikifo.csv")
sdss_data.printSchema()

root
 |-- objID: long (nullable = true)
 |-- ra: double (nullable = true)
 |-- dec: double (nullable = true)
 |-- specObjID: long (nullable = true)
 |-- psfMag_r: double (nullable = true)
 |-- modelMag_r: double (nullable = true)
 |-- petroMag_r: double (nullable = true)
 |-- fiberMag_r: double (nullable = true)
 |-- petroRad_r: double (nullable = true)
 |-- petroR50_r: double (nullable = true)
 |-- petroR90_r: double (nullable = true)
 |-- lnLStar_r: double (nullable = true)
 |-- lnLExp_r: double (nullable = true)
 |-- lnLDeV_r: double (nullable = true)
 |-- mE1_r: double (nullable = true)
 |-- mE2_r: double (nullable = true)
 |-- mRrCc_r: double (nullable = true)
 |-- type_r: integer (nullable = true)
 |-- type: integer (nullable = true)
 |-- specClass: integer (nullable = true)



In [31]:
sdss_data.count()

100000

## Exploratory data analysis

There are missing values in this data, denoted by $0$ and $-9999$. These can be indicated more clearly with the designation "None". The number of missing values can be ascertained by converting the Spark DataFrame to a pandas-on-spark DataFrame and invoking the `.isnull().sum()` sequence of functions. 

In [32]:
sdss_data = sdss_data.replace(-9999, None)
sdss_data = sdss_data.replace(0, None)

There are nearly 12,000 rows with missing data. These can be removed to prepare the data for machine learning algorithms, leaving a total of 1,019,910 rows.

In [33]:
# Count total number of missing values
# Based on example from https://sparkbyexamples.com/pyspark/pyspark-find-count-of-null-none-nan-values/
from pyspark.sql.functions import col, isnan, when, count

sdss_data.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in sdss_data.columns]).show()

+-----+---+---+---------+--------+----------+----------+----------+----------+----------+----------+---------+--------+--------+-----+-----+-------+------+----+---------+
|objID| ra|dec|specObjID|psfMag_r|modelMag_r|petroMag_r|fiberMag_r|petroRad_r|petroR50_r|petroR90_r|lnLStar_r|lnLExp_r|lnLDeV_r|mE1_r|mE2_r|mRrCc_r|type_r|type|specClass|
+-----+---+---+---------+--------+----------+----------+----------+----------+----------+----------+---------+--------+--------+-----+-----+-------+------+----+---------+
|    0|  0|  0|        0|       0|         0|         0|         1|         0|         1|         1|       11|      18|      17|   29|   29|     29|     0|   0|     1180|
+-----+---+---+---------+--------+----------+----------+----------+----------+----------+----------+---------+--------+--------+-----+-----+-------+------+----+---------+



In [34]:
# Remove rows with missing values and calculate new row count
sdss_data = sdss_data.dropna()
sdss_data.count()

98781

The data also contains a `specClass` column with values that correspond to the following classifications (https://skyserver.sdss.org/dr7/en/help/browser/enum.asp?n=SpecClass): 

| Name      | Value | Description                                                                                      |
|-----------|-------|--------------------------------------------------------------------------------------------------|
| UNKNOWN   |   0   | Spectrum not classifiable (zConf < 0.25).                                                        |
| STAR      |   1   | Spectrum of a star.                                                                              |
| GALAXY    |   2   | Spectrum of a galaxy.                                                                            |
| QSO       |   3   | Spectrum of a quasi-stellar object.                                                              |
| HIZ_QSO   |   4   | Spectrum of a high-redshift quasar (z>2.3), whose redshift is confirmed by a Ly-alpha estimator. |
| SKY       |   5   | Spectrum of blank sky.                                                                           |
| STAR_LATE |   6   | Star dominated bt molecular bands M or later.                                                    |
| GAL_EM    |   7   | Emission line galaxy (placeholder).                                                              |

The intent of this project is to classify only stars and galaxies, and all other observations should be removed.

In [35]:
sdss_data = sdss_data.filter((sdss_data.specClass == 1) | \
                              (sdss_data.specClass == 2))

At this point, the data contains observations for 81,633 stars and 802,474 galaxies.

In [36]:
sdss_data.groupBy("type").count().show()

+----+-----+
|type|count|
+----+-----+
|   6| 8589|
|   3|77299|
+----+-----+



The data set should be split into a training and a testing test before applying transformations.

In [46]:
train, test = sdss_data.randomSplit([0.8,0.2])
print(train.count(), test.count())

                                                                                

68513 17375


## Machine learning models

### Random forest classifier

#### Set up transformations

In [38]:
from pyspark.ml.feature import SQLTransformer

sqlTrans = SQLTransformer(
    statement = "SELECT psfMag_r, modelMag_r, petroMag_r, fiberMag_r, \
                        petroRad_r, petroR50_r, petroR90_r, lnLStar_r, \
                        lnLExp_r, lnLDeV_r, mE1_r, mE2_r, mRrCc_r, \
                        type as label FROM __THIS__"
)

In [39]:
# Create our list of features by dropping unused columns
features_list = sqlTrans.transform(train).drop("label").columns
features_list

['psfMag_r',
 'modelMag_r',
 'petroMag_r',
 'fiberMag_r',
 'petroRad_r',
 'petroR50_r',
 'petroR90_r',
 'lnLStar_r',
 'lnLExp_r',
 'lnLDeV_r',
 'mE1_r',
 'mE2_r',
 'mRrCc_r']

We can set up transformations for our data.

In [40]:
from pyspark.ml.feature import VectorAssembler, StandardScaler

assembler = VectorAssembler(inputCols=features_list, outputCol="features")

#scaler = StandardScaler(inputCol="unscaledFeatures", outputCol="features")

We can select an algorithm for our data and instantiate it.

In [41]:
from pyspark.ml.classification import RandomForestClassifier

rfc = RandomForestClassifier(labelCol="label", featuresCol="features")

Finally, we can set up a pipeline.

In [42]:
from pyspark.ml import Pipeline

pipeline = Pipeline(stages = [sqlTrans, assembler, rfc])

We can then set up cross-validation.

In [48]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator


rfcParamGrid = (ParamGridBuilder()
             .addGrid(rfc.maxDepth, [0, 10, 20, 30])
             .addGrid(rfc.numTrees, [5, 10, 20, 30, 40])
             .build())

crossVal = CrossValidator(estimator = pipeline,
                          estimatorParamMaps = rfcParamGrid,
                          evaluator = MulticlassClassificationEvaluator(),
                          numFolds = 3)

In [49]:
# Run cross-validation, and choose the best set of parameters

cvModel = crossVal.fit(train)

                                                                                

In [52]:
rfcPredictions = cvModel.transform(test)

In [59]:
print('Accuracy:', MulticlassClassificationEvaluator().evaluate(rfcPredictions))

Accuracy: 0.9960181108638269


In [60]:
from sklearn.metrics import confusion_matrix

y_pred=rfcPredictions.select("prediction").collect()
y_orig=rfcPredictions.select("label").collect()

cm = confusion_matrix(y_orig, y_pred)
print("Confusion Matrix:")
print(cm)

                                                                                

Confusion Matrix:
[[15603    24]
 [   45  1703]]


In [24]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression()
grid_lr = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build()
evaluator_lr = MulticlassClassificationEvaluator()

In [26]:
pipeline_lr = Pipeline(stages = [sqlTrans, assembler, lr])

In [27]:
cv = CrossValidator(estimator=pipeline_lr, estimatorParamMaps=grid_lr, evaluator=evaluator_lr,
    parallelism=2)
cvModel_lr = cv.fit(train)

22/04/25 22:02:24 WARN CacheManager: Asked to cache already cached data.
22/04/25 22:02:24 WARN CacheManager: Asked to cache already cached data.
                                                                                

In [28]:
test_error = MulticlassClassificationEvaluator().evaluate(cvModel_lr.transform(test))
print(test_error)

[Stage 461:>                                                        (0 + 8) / 8]

0.940197424404984


                                                                                

In [49]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator=MulticlassClassificationEvaluator(predictionCol="prediction")
acc = evaluator.evaluate(pred)
print("Prediction Accuracy: ", acc)

Prediction Accuracy:  0.9951260086324447


In [60]:
faint_mag = pred.filter((pred.modelMag_r >= 20.5) & (pred.modelMag_r <= 21))

In [61]:
acc_faint = evaluator.evaluate(faint_mag)
print("Prediction Accuracy: ", acc_faint)

Prediction Accuracy:  0.9223043524303716


In [9]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
 
evaluator = BinaryClassificationEvaluator(
    labelCol="label",                     
    rawPredictionCol="prediction",       
    metricName="areaUnderROC",
)
 
accuracy = evaluator.evaluate(pred)
print(f"Area under ROC = {accuracy} ")

NameError: name 'pred' is not defined

## Decision tree

In [61]:
from pyspark.ml.classification import DecisionTreeClassifier

dtClassifier = DecisionTreeClassifier()

dtPipeline = Pipeline().setStages([sqlTrans, assembler, dtClassifier])

In [62]:
dtParamGrid = (ParamGridBuilder()
             .addGrid(dtClassifier.maxDepth, [0, 5, 10, 20]) \
             .build())

dtCrossVal = CrossValidator(estimator = dtPipeline,
                          estimatorParamMaps = dtParamGrid,
                          evaluator = MulticlassClassificationEvaluator(),
                          numFolds = 3)

In [63]:
# Run cross-validation, and choose the best set of parameters

dtCVModel = dtCrossVal.fit(train)

                                                                                

In [64]:
dtPredictions = dtCVModel.transform(test)