# Data preparation

In [None]:
! pip install pyspark

In [2]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('Star classification').getOrCreate()
spark

In [4]:
path = "/content/drive/MyDrive/Big data/star_classification.csv"

data = spark.read.csv(path, header=True, inferSchema=True)
data.show()

data.groupBy("class").count().show()

+--------------------+----------------+------------------+--------+--------+--------+--------+--------+------+--------+-------+--------+--------------------+------+------------+-----+-----+--------+
|              obj_ID|           alpha|             delta|       u|       g|       r|       i|       z|run_ID|rerun_ID|cam_col|field_ID|         spec_obj_ID| class|    redshift|plate|  MJD|fiber_ID|
+--------------------+----------------+------------------+--------+--------+--------+--------+--------+------+--------+-------+--------+--------------------+------+------------+-----+-----+--------+
|1.237660961327743...|  135.6891066036|  32.4946318397087|23.87882| 22.2753|20.39501|19.16573|18.79371|  3606|     301|      2|      79|6.543777369295181...|GALAXY|   0.6347936| 5812|56354|     171|
|1.237664879951151...|144.826100550256|  31.2741848944939|24.77759|22.83188|22.58444|21.16812|21.61427|  4518|     301|      5|     119|1.176014203670733...|GALAXY|    0.779136|10445|58158|     427|
|1.23

In [5]:
type(data)

# Data analysis

# Data balancing

In [14]:
max_count = max(59445, 18961, 21594)

# Define fractions for oversampling
data_fractions = {
    'GALAXY': max_count / 59445,
    'QSO': max_count / 18961,
    'STAR': max_count / 21594
}

total_factor = sum(data_fractions.values())
data_fractions = {key: value / total_factor for key, value in data_fractions.items()}

sampled = data.sampleBy("class", fractions=data_fractions, seed=0)
sampled.groupBy("class").count().orderBy("class").show()

+------+-----+
| class|count|
+------+-----+
|GALAXY| 8566|
|   QSO| 8752|
|  STAR| 8667|
+------+-----+



In [None]:
from imblearn.over_sampling import SMOTE
from collections import Counter

In [None]:
pd_data = data.toPandas()

In [None]:
x = pd_data.drop(['class'], axis = 1)
y = pd_data.loc[:,'class'].values

sm = SMOTE(random_state=42)
print('Original dataset shape %s' % Counter(y))
x, y = sm.fit_resample(x, y)
print('Resampled dataset shape %s' % Counter(y))

Original dataset shape Counter({'GALAXY': 59445, 'STAR': 21594, 'QSO': 18961})
Resampled dataset shape Counter({'GALAXY': 59445, 'QSO': 59445, 'STAR': 59445})


# Data analysis (balanced dataset)


# Data preparation

In [None]:
import pandas as pd

x_pd = pd.DataFrame(x)
y_pd = pd.DataFrame(y)

combined_pd = pd.concat([x_pd, y_pd], axis=1)

combined_pd = combined_pd.rename(columns={'0': 'class'})

from pyspark.sql import Row

combined_df = spark.createDataFrame(combined_pd)

train_data, test_data = combined_df.randomSplit([0.8, 0.2], seed=42)
train_data.show()

from pyspark.ml.classification import LinearSVC
from pyspark.ml.feature import VectorAssembler

dataset_features = ['obj_ID',
                    'alpha',
                    'delta',
                    'u',
                    'g',
                    'r',
                    'i',
                    'z',
                    'run_ID',
                    'rerun_ID',
                    'cam_col',
                    'field_ID',
                    'spec_obj_ID',
                    'redshift',
                    'plate',
                    'MJD',
                    'fiber_ID']


assembler = VectorAssembler(inputCols=dataset_features,
                            outputCol="features")

train_data_assembled = assembler.transform(train_data)

new_df = train_data_assembled.select(['features','0'])

new_df.show()

new_df.printSchema()

new_df = new_df.withColumnRenamed('0', 'label')

from pyspark.ml.feature import StringIndexer

# Lista delle colonne di etichetta
label_columns = ['STAR', 'GALAXY ', 'QSO']

# Lista delle colonne di etichetta convertite
indexed_label_columns = [1,2,3]

# Lista per memorizzare gli oggetti StringIndexer
indexers = []

# Applica StringIndexer a ciascuna colonna di etichetta
for label_col, indexed_label_col in zip(label_columns, indexed_label_columns):
    indexer = StringIndexer(inputCol=label_col, outputCol=indexed_label_col)
    new_df = indexer.fit(new_df).transform(new_df)
    indexers.append(indexer)

# Training phase

In [None]:
from pyspark.ml.classification import LogisticRegression


lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)

# Fit the model

lr.setLabelCol('0')
lrModel = lr.fit(new_df)

# Print the coefficients and intercept for logistic regression
print("Coefficients: " + str(lrModel.coefficients))
print("Intercept: " + str(lrModel.intercept))