In [None]:
!pip install pyspark
!ls

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
drive  sample_data


In [None]:
#from google.colab import drive
#drive.mount('/content/drive')

In [None]:
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import desc, asc
from pyspark.sql.functions import countDistinct, isnan, when
from pyspark.sql.functions import col, count
from pyspark.sql.functions import hour, month, year, to_timestamp
from pyspark.sql.functions import sum as Fsum
from pyspark.sql.functions import avg, max, min


from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, MinMaxScaler
from pyspark.ml import Pipeline
from pyspark.sql.functions import *
from pyspark.ml.regression import GBTRegressor, RandomForestRegressor, LinearRegression 
from pyspark.ml.evaluation import RegressionEvaluator


import numpy as np
import pandas as pd
%matplotlib inline
import matplotlib.pyplot as plt
pd.set_option('display.max_columns', None)

In [None]:
#create a Spark Session with Google Cloud connector
spark = SparkSession \
    .builder \
    .appName("GCS_NASA") \
    .config("spark.jars", "drive/Shareddrives/Big_Data_and_BI/connector/gcs-connector-hadoop3-latest.jar") \
    .getOrCreate()


# Setup hadoop fs configuration for schema gs://
conf = spark.sparkContext._jsc.hadoopConfiguration()
conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
conf.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")

# Google Cloud Authentication
conf.set("fs.gs.auth.type", "SERVICE_ACCOUNT_JSON_KEYFILE")
conf.set("fs.gs.auth.service.account.json.keyfile","drive/Shareddrives/Big_Data_and_BI/key/secrets.json")

for item in SparkConf().getAll(): print(item)
print(conf)
df=spark.read.csv("drive/Shareddrives/Big_Data_and_BI/key/secrets.json", sep=',', inferSchema=True, header=True)



('spark.app.name', 'GCS_NASA')
('spark.master', 'local[*]')
('spark.submit.pyFiles', '')
('spark.repl.local.jars', 'file:///content/drive/Shareddrives/Big_Data_and_BI/connector/gcs-connector-hadoop3-latest.jar')
('spark.submit.deployMode', 'client')
('spark.jars', 'file:///content/drive/Shareddrives/Big_Data_and_BI/connector/gcs-connector-hadoop3-latest.jar')
('spark.ui.showConsoleProgress', 'true')
('spark.app.submitTime', '1670397717009')
Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-rbf-default.xml, hdfs-site.xml, hdfs-rbf-site.xml, __spark_hadoop_conf__.xml


# Meta-data

In [None]:
bucket_name="nasa_bucket"
path=f"gs://{bucket_name}/asteroid/Asteroid_Updated.csv"
#path= "drive/Shareddrives/Big_Data_and_BI/data/NASA/Asteroid_Updated.csv"

df=spark.read.csv(path, sep=',', inferSchema=True, header=True)

In [None]:
print(f"Total records: {df.count()}")
df.show(20)

Total records: 1260411
+--------------------+------+-----+-----+-----+------+------+------+----+------+------+-----+-----+-----+--------+----------+--------------+---+---+----+----+--------+--------------------+------+----------+---------+-----+-----+----+------+------+
|           full_name|     e|    a|    q|    i|    om|     w|    ma|  ad|     n|   per|per_y| moid|class|data_arc|n_obs_used|condition_code|neo|pha|   H|   G|diameter|              extent|albedo|   rot_per|       GM|   BV|   UB|  IR|spec_B|spec_T|
+--------------------+------+-----+-----+-----+------+------+------+----+------+------+-----+-----+-----+--------+----------+--------------+---+---+----+----+--------+--------------------+------+----------+---------+-----+-----+----+------+------+
|     1 Ceres (A80...|0.0788|2.767|2.549|10.59| 80.26| 73.47| 17.22|2.99|0.2141|1680.0|  4.6| 1.58|  MBA|    9520|      1075|             0|  N|  N|3.33|0.12|   939.4|964.4 x 964.2 x 8...|  0.09|   9.07417|  62.6284|0.713|0.426|null|

In [None]:
print (f"{len(df.columns)} fields") 
df.printSchema()

31 fields
root
 |-- full_name: string (nullable = true)
 |-- e: double (nullable = true)
 |-- a: double (nullable = true)
 |-- q: double (nullable = true)
 |-- i: double (nullable = true)
 |-- om: double (nullable = true)
 |-- w: double (nullable = true)
 |-- ma: double (nullable = true)
 |-- ad: double (nullable = true)
 |-- n: double (nullable = true)
 |-- per: double (nullable = true)
 |-- per_y: double (nullable = true)
 |-- moid: double (nullable = true)
 |-- class: string (nullable = true)
 |-- data_arc: integer (nullable = true)
 |-- n_obs_used: integer (nullable = true)
 |-- condition_code: string (nullable = true)
 |-- neo: string (nullable = true)
 |-- pha: string (nullable = true)
 |-- H: double (nullable = true)
 |-- G: double (nullable = true)
 |-- diameter: double (nullable = true)
 |-- extent: string (nullable = true)
 |-- albedo: double (nullable = true)
 |-- rot_per: double (nullable = true)
 |-- GM: double (nullable = true)
 |-- BV: double (nullable = true)
 |-- UB: d

In [None]:
#user_order.select([count(when((isnan(c) | col(c).isNull()), c)).alias(c) for c in user_order.columns]).show()
#[count(when(isnan(c) | col(c).isNull(), c)) for c in user_order.columns]

# Data Processing

In [None]:
astro_df = df.dropna(subset=['diameter'])
drop_list = ["extent", 
           "rot_per", 
           "GM", "BV", "UB", "IR", 
           "spec_B",
           "spec_T", 
           "G", 'data_arc', 'H', 'albedo']
astro_df = astro_df.drop(*drop_list)

In [None]:
astro_df = astro_df.withColumn('neo', regexp_replace('neo', 'Y', 'True'))
astro_df = astro_df.withColumn('neo', regexp_replace('neo', 'N', 'False'))
astro_df = astro_df.withColumn('pha', regexp_replace('pha', 'Y', 'True'))
astro_df = astro_df.withColumn('pha', regexp_replace('pha', 'N', 'False'))
astro_df = astro_df.withColumn('n_obs_used', astro_df['n_obs_used'].cast('double'))
astro_df = astro_df.withColumn('diameter', astro_df['diameter'].cast('double'))

for column in ['neo', 'pha']:
    astro_df = astro_df.withColumn(column, astro_df[column].cast('boolean').cast('int'))
    
astro_df = astro_df.dropna(subset=['diameter'])


In [None]:
astro_df.show(5)

+--------------------+------+-----+-----+-----+------+------+------+----+------+------+-----+----+-----+----------+--------------+---+---+--------+
|           full_name|     e|    a|    q|    i|    om|     w|    ma|  ad|     n|   per|per_y|moid|class|n_obs_used|condition_code|neo|pha|diameter|
+--------------------+------+-----+-----+-----+------+------+------+----+------+------+-----+----+-----+----------+--------------+---+---+--------+
|     1 Ceres (A80...|0.0788|2.767|2.549|10.59| 80.26| 73.47| 17.22|2.99|0.2141|1680.0|  4.6|1.58|  MBA|    1075.0|             0|  0|  0|   939.4|
|     2 Pallas (A8...|0.2301| 2.77|2.132|34.93|172.92|310.86|357.85|3.41|0.2138|1680.0| 4.61|1.23|  MBA|    9046.0|             0|  0|  0|   513.0|
|     3 Juno (A804...|0.2565| 2.67|1.985|12.99|169.84|247.74|351.82|3.35|0.2259|1590.0| 4.36|1.04|  MBA|    7410.0|             0|  0|  0| 246.596|
|     4 Vesta (A80...|0.0888|2.363|2.153| 7.14|103.76| 151.6|115.13|2.57|0.2713|1330.0| 3.63|1.14|  MBA|    9451

In [None]:
condition_code_indexer = StringIndexer(inputCol="condition_code", outputCol="condition_codeIndex").setHandleInvalid("skip")
class_indexer = StringIndexer(inputCol="class", outputCol="classIndex").setHandleInvalid("skip")
onehotencoder_condition_code_vector = OneHotEncoder(inputCol="condition_codeIndex", outputCol="condition_code_vec")
onehotencoder_class_vector = OneHotEncoder(inputCol="classIndex", outputCol="class_vec")


encoding_pipeline = Pipeline(stages=[condition_code_indexer,
                            class_indexer,
                            onehotencoder_condition_code_vector,
                            onehotencoder_class_vector
                    ])

astro_df = encoding_pipeline.fit(astro_df).transform(astro_df)

In [None]:
astro_df = astro_df.drop('condition_code', 'class', 'condition_codeIndex', 'classIndex')

astro_df = astro_df.select('full_name', "a","e","i",'om','w','q','ad', 'per_y', 'n_obs_used', 'neo', 'pha', 'moid', 'n', 'per', 'ma', 'condition_code_vec', 'class_vec', 'diameter')

features = astro_df.schema.names[1:-1]

## Split our data into training, validation, and testing sets

In [None]:
train, val, test = astro_df.randomSplit([0.6, 0.2, 0.2], seed=42)

train_df = train.drop('name')
val_df = val.drop('name')
test_df = test.drop('name')


## Pack all of the feature columns into a single vector 

In [None]:
assembler = VectorAssembler(inputCols=features, outputCol='features').setHandleInvalid("skip")

test_pack = assembler.transform(test_df)
train_pack = assembler.transform(train_df)
val_pack = assembler.transform(val_df)

for field in features:
    test_pack = test_pack.drop(field)
    train_pack = train_pack.drop(field)
    val_pack = val_pack.drop(field)


    

## Scale Features

In [None]:
scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")
scaler_model = scaler.fit(train_pack)

train_pack = scaler_model.transform(train_pack)
val_pack = scaler_model.transform(val_pack)
test_pack = scaler_model.transform(test_pack)

# Spark ML

Now that we’re all ready for modelling, let’s instantiate, fit, and make predictions with a few different regression methods available in SparkML:

In [None]:
gbt = GBTRegressor(featuresCol='features', labelCol='diameter', maxIter=100, maxDepth=5, seed=42, lossType='squared', stepSize=.1)
gbt_model = gbt.fit(train_pack)
gbt_pred = gbt_model.transform(val_pack)

In [None]:
rf = RandomForestRegressor(featuresCol='features', labelCol='diameter', maxDepth=5, seed=42, bootstrap=True, numTrees=100)
rf_model = rf.fit(train_pack)
rf_pred = rf_model.transform(val_pack)

In [None]:
lr = LinearRegression(featuresCol='features', labelCol='diameter', maxIter=100, loss='squaredError', elasticNetParam=0.5, regParam=0.1, fitIntercept=True, standardization=True, solver='auto', tol=.1)
lr_model = lr.fit(train_pack)
lr_pred = lr_model.transform(val_pack)

Now let’s evaluate the performance of these three models:

In [None]:
rmse = RegressionEvaluator(
    labelCol="diameter", predictionCol="prediction", metricName="rmse")

r2 = RegressionEvaluator(
    labelCol="diameter", predictionCol="prediction", metricName="r2")
    
metrics = [rmse, r2]
metric_labels = ['rmse', 'r2']

predictions = [lr_pred, rf_pred, gbt_pred]
predict_labels = ['LR', 'RF', 'GBT']

eval_list = list()

for pred in zip(predict_labels, predictions):
    name = pred[0]
    predict = pred[1]
    
    metric_vals = pd.Series(dict([(x[0], x[1].evaluate(predict)) 
                                 for x in zip(metric_labels, metrics)]),
                            name=name)
    eval_list.append(metric_vals)
    
eval_df = pd.concat(eval_list, axis=1).T
eval_df = eval_df[metric_labels]
eval_df

Unnamed: 0,rmse,r2
LR,13.708287,-1.376717
RF,6.683033,0.435117
GBT,6.590959,0.450575


Based on the results, Gradient-Boosted Regression performed best with R2 score of .45, and a Root Mean Squared Error of ~6.6 km, so let’s try to optimize the model with GridSearch and cross-validation:

In [None]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# the GradientBoostedTree model
gbt2 = GBTRegressor(featuresCol='features',
                     labelCol='diameter', 
                     predictionCol='prediction',
                     seed=13,
                     lossType='squared',
                     maxIter=100,
                     )
 
pipeline = Pipeline(stages=[gbt2])

paramgrid = (ParamGridBuilder().addGrid(gbt2.maxDepth, [2,4,6])
                               .addGrid(gbt2.stepSize, [0.0001, 0.001, 0.01, 0.1, 1.0]).build())

 
evaluator = RegressionEvaluator(labelCol='diameter', 
                                              predictionCol='prediction', 
                                              metricName='r2')
                                              
crossval = CrossValidator(estimator=pipeline, 
                          estimatorParamMaps=paramgrid, 
                          evaluator=evaluator, 
                          numFolds=3) 

gbt2_model = crossval.fit(train_pack) 

Now we need to extract our best model from the cross-validator. We can also see the feature importance of our model with the code below:

In [None]:
best_pipeline = gbt2_model.bestModel
best_gbt_model = best_pipeline.stages[0]

feature_importances = best_gbt_model.featureImportances.toArray()


feature_names = astro_df.columns[:-1]

feature_series = (pd.Series(dict(zip(features, feature_importances)))
                  .sort_values(ascending=True))

feature_series

class_vec             0.000000
condition_code_vec    0.000000
pha                   0.000085
w                     0.001119
neo                   0.001324
e                     0.002504
om                    0.005929
ma                    0.005932
moid                  0.007812
q                     0.030492
n                     0.031186
ad                    0.051679
per                   0.058555
per_y                 0.069070
a                     0.074710
i                     0.089795
n_obs_used            0.481063
dtype: float64

Finally, let’s make predictions on our test set using the optimized GBT model and see how it compares with our initial model results:

In [None]:
gbt_pred_test = best_pipeline.transform(test_pack)

predictions = [lr_pred, rf_pred, gbt_pred, gbt_pred_test]
predict_labels = ['LR', 'RF', 'GBT', 'GBT_GridSearch']

eval_list = []
for pred in zip(predict_labels, predictions):
    name = pred[0]
    predict = pred[1]
    
    metric_vals = pd.Series(dict([(x[0], x[1].evaluate(predict)) 
                                 for x in zip(metric_labels, metrics)]),
                            name=name)
    eval_list.append(metric_vals)
    

eval_df = pd.concat(eval_list, axis=1).T
eval_df = eval_df[metric_labels]
eval_df

Unnamed: 0,rmse,r2
LR,13.708287,-1.376717
RF,6.683033,0.435117
GBT,6.590959,0.450575
GBT_GridSearch,5.170815,0.48298
