In [30]:
import pandas as pd
import numpy as np
import mysql.connector
import shutil
import os

from imblearn.over_sampling import SMOTE
from sklearn.model_selection import train_test_split
from configparser import ConfigParser

RANDOM_STATE=123

FEATURES = ['est_diameter_min', 'est_diameter_max', 'relative_velocity',
            'miss_distance', 'absolute_magnitude']
LABEL = 'hazardous'
COLUMNS =  ['id', 'name'] + FEATURES + [LABEL]

MYSQL_DB = 'NeoDB'
MYSQL_DS = 'Neo'
MYSQL_TRAIN_DS = 'NeoTrain'
MYSQL_TEST_DS = 'NeoTest'
MYSQL_CATBOOST_RESULT = 'NeoCatBoostResult'
MYSQL_GBT_RESULT = 'NeoGBTResult'

parser = ConfigParser()
parser.read('pass.cfg')
mysql_login = 'root'
mysql_pass = parser.get('mysql', 'password')

Helpful functions

In [28]:
def df2sqlstring(df):
    '''Concatenates dataframe rows into a single-string, compatible to SQL-syntax'''
    tuples = list(df.itertuples(index=False, name=None))
    return ','.join(['(' + ','.join([str(w) for w in t]) + ')' for t in tuples])

def mysql2pandas_df(db, table, mysql_user, mysql_pass, columns):
    '''Loads MySQL table to pandas dataframe'''
    db_connection = mysql.connector.connect(user=mysql_login, password=mysql_pass)
    db_cursor = db_connection.cursor()
    db_cursor.execute(f'USE {db}')
    db_cursor.execute(f'SELECT {",".join(columns)} FROM {table}')
    df = pd.DataFrame(db_cursor.fetchall())
    df.columns = columns
    db_cursor.close()
    db_connection.close()
    return df    
    
def pandas_df2mysql(df, db, table, mysql_user, mysql_pass, col_types):
    '''Loads pandas dataframe to mysql table'''
    columns = ', '.join(df.columns)
    columns_and_types = ', '.join([f'{df.columns[i]} {col_types[i]}' for i in range(len(df.columns))])
    db_connection = mysql.connector.connect(user=mysql_login, password=mysql_pass)
    db_cursor = db_connection.cursor()
    db_cursor.execute(f'USE {db}')
    db_cursor.execute(f'DROP TABLE IF EXISTS {table}')
    db_cursor.execute(f'CREATE TABLE {table}({columns_and_types});');
    db_cursor.execute(f'INSERT INTO {table} ({columns}) VALUES ' + df2sqlstring(df) + ';')
    db_cursor.execute('FLUSH TABLES;')
    db_cursor.close()
    db_connection.close()

def mysql2pyspark_df(builder, db, table, mysql_user, mysql_pass):
    '''Loads MySQL table to pyspark dataframe'''
    return builder.read.format('jdbc').option('url', f'jdbc:mysql://localhost:3306/{db}') \
    .option('driver', 'com.mysql.cj.jdbc.Driver') \
    .option('dbtable', table) \
    .option('user', mysql_user).option('password', mysql_pass).load()

def pyspark_df2mysql(df, db, table, mysql_user, mysql_pass):
    '''Loads pyspark dataframe to mysql table'''
    return df.write.format('jdbc').option('url', f'jdbc:mysql://localhost:3306/{db}') \
    .option('driver', 'com.mysql.cj.jdbc.Driver') \
    .option('dbtable', table) \
    .option('user', mysql_user).option('password', mysql_pass).save()

#### Split the dataset into train/test to fit the CatBoost and GBTClassifier models in the same conditions
- downsample negative examples
- stratify
- ignore sentry_object feature, because it always equals 0
- ignore orbiting_body feature, because it always equals 'Earth'

In [29]:
# Load the dataset and split to train/test
df = mysql2pandas_df(MYSQL_DB, MYSQL_DS, mysql_login, mysql_pass, COLUMNS)

# Balance positive and negative examples
df1 = df[df.hazardous == 1]
df2 = df[df.hazardous == 0].sample(frac=1).head(df1.shape[0])
df = pd.concat([df1, df2], axis=0).sample(frac=1)

X_train, X_test, _, _ = train_test_split(df[COLUMNS], df[LABEL], test_size=0.1,
                                         stratify=df[LABEL], random_state=RANDOM_STATE)

n_train, n_train_pos  = X_train.shape[0], sum(X_train[LABEL])
print(f'Train set: {n_train}, positive: {n_train_pos}/{np.round(n_train_pos/n_train*100, 3)}%')
n_test, n_test_pos  = X_test.shape[0], sum(X_test[LABEL])
print(f'Test set: {n_test}, positive: {n_test_pos}/{np.round(n_test_pos/n_test*100, 3)}%')

# Load train/test datasets back to MySQL
column_types = ['INT', 'VARCHAR(1000)'] + ['FLOAT'] * 5 + ['BOOLEAN']
X_train.name = '"' + X_train.name + '"'
pandas_df2mysql(X_train, MYSQL_DB, MYSQL_TRAIN_DS, mysql_login, mysql_pass, column_types)
X_test.name = '"' + X_test.name + '"'
pandas_df2mysql(X_test, MYSQL_DB, MYSQL_TEST_DS, mysql_login, mysql_pass, column_types)

Train set: 15912, positive: 7956/50.0%
Test set: 1768, positive: 884/50.0%


#### Train CatBoost

In [4]:
from catboost import CatBoostClassifier
from sklearn.model_selection import GridSearchCV, cross_validate
from sklearn.metrics import precision_score, recall_score, f1_score, fbeta_score
from sklearn.metrics import make_scorer, confusion_matrix

Load train/test data from MySQL

In [5]:
X_train = mysql2pandas_df(MYSQL_DB, MYSQL_TRAIN_DS, mysql_login, mysql_pass, COLUMNS)
y_train = X_train[LABEL]
X_test = mysql2pandas_df(MYSQL_DB, MYSQL_TEST_DS, mysql_login, mysql_pass, COLUMNS)
y_test = X_test[LABEL]

Optimize CatBoost hyperparameters: depth, l2_leaf_reg (5 folds)

The evaluation metric is F-beta (beta=2, recall is more important)

In [6]:
catboost_params = {
    'iterations': [1000],
    'verbose': [False],
    'depth': [2, 3, 4, 5],
    'l2_leaf_reg': [3, 4, 5],
    'random_seed': [RANDOM_STATE]
}

catboost_clf = CatBoostClassifier()
fbeta_scorer = make_scorer(fbeta_score, beta=2)
grid_catboost = GridSearchCV(estimator=catboost_clf, param_grid=catboost_params, cv=5,
                             scoring=fbeta_scorer, n_jobs=-1)
grid_catboost.fit(X_train[FEATURES], y_train)

best_catboost_params = grid_catboost.best_params_
print(f'CatBoost best score: {grid_catboost.best_score_}')
print(f'CatBoost best parameters: {best_catboost_params}')

CatBoost best score: 0.9433070467418461
CatBoost best parameters: {'depth': 2, 'iterations': 1000, 'l2_leaf_reg': 5, 'random_seed': 123, 'verbose': False}


Calculate precision, recall, f1, f-beta to understand results better

In [12]:
scoring = {
    'precision': make_scorer(precision_score),
    'recall': make_scorer(recall_score),
    'f1': make_scorer(f1_score),
    'f-beta': make_scorer(fbeta_score, beta=2)
}

# Calculate precision, recall, f1 and f-beta on cross-validation
catboost_clf = CatBoostClassifier(depth=2, iterations=1000, l2_leaf_reg=5,
                                  verbose=False, random_seed=RANDOM_STATE)
scores = cross_validate(catboost_clf, X_train[FEATURES], y_train,
                        cv=5, n_jobs=-1, scoring=scoring)
print('CV-metrics')
for m in scoring.keys():
    test_avg_score = np.round(np.mean(scores['test_' + m][0]), 4)
    print(f'[{m}] test avg score: {test_avg_score}')

CV-metrics
[precision] test avg score: 0.8045
[recall] test avg score: 0.983
[f1] test avg score: 0.8849
[f-beta] test avg score: 0.9413


Calculate prediction for the test dataset

In [18]:
best_catboost_model = CatBoostClassifier(depth=2, iterations=1000, l2_leaf_reg=5, verbose=False,
                                         random_seed=RANDOM_STATE)
best_catboost_model.fit(X_train[FEATURES], y_train)
y_pred_catboost = best_catboost_model.predict(X_test[FEATURES])

tn, fp, fn, tp = confusion_matrix(y_test, y_pred_catboost).ravel()
print(f'Hazardous asteroids: {np.sum(y_test)}')
print(f'Found TP hazardous asteroids: {tp}')
print(f'Found FP hazardous asteroids: {fp}')
print(f'Precision: {np.round(precision_score(y_test, y_pred_catboost), 4)}')
print(f'Recall: {np.round(recall_score(y_test, y_pred_catboost), 4)}')
print(f'f1: {np.round(f1_score(y_test, y_pred_catboost), 4)}')
print(f'f-beta: {np.round(fbeta_score(y_test, y_pred_catboost, beta=2), 4)}')

Hazardous asteroids: 884
Found TP hazardous asteroids: 868
Found FP hazardous asteroids: 220
Precision: 0.7978
Recall: 0.9819
f1: 0.8803
f-beta: 0.9386


Save CatBoost results

In [13]:
# Create and archive parquet files
prediction_df = X_test.copy()
prediction_df['y_pred'] = y_pred_catboost
outdir = './data/catboost_prediction'
if not os.path.exists(outdir):
    os.mkdir(outdir)

prediction_df.to_parquet('data/catboost_prediction/result')
shutil.make_archive('data/catboost_prediction', 'zip', 'data/catboost_prediction')

# Save to MySQL table
column_types_prediction = ['INT', 'VARCHAR(1000)'] + ['FLOAT'] * 5 + ['BOOLEAN'] * 2
prediction_df.name = '"' + prediction_df.name + '"'
pandas_df2mysql(prediction_df, MYSQL_DB, MYSQL_CATBOOST_RESULT, mysql_login, mysql_pass, column_types_prediction)

# Save model
best_catboost_model.save_model('models/model_catboost.cbm')

#### Train GBTClassifier [PySpark]

In [14]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, CrossValidatorModel
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics

os.environ['JAVA_HOME'] = '/opt/homebrew/opt/openjdk/'

Load train/test data from MySQL

In [16]:
spark = SparkSession.builder.appName('Neo')\
.config('spark.jars', 'mysql-connector-java-8.0.30/mysql-connector-java-8.0.30.jar').getOrCreate()

df_train = mysql2pyspark_df(spark, MYSQL_DB, MYSQL_TRAIN_DS, mysql_login, mysql_pass)
df_train = df_train.withColumn('hazardous', df_train.hazardous.cast('float'))
df_test = mysql2pyspark_df(spark, MYSQL_DB, MYSQL_TEST_DS, mysql_login, mysql_pass)
df_test = df_test.withColumn('hazardous', df_test.hazardous.cast('float'))

assembler = VectorAssembler(inputCols=FEATURES, outputCol='features')
X_train_vec = assembler.transform(df_train)
X_test_vec = assembler.transform(df_test)

Optimize GBTClassifier hyperparameters: minInstancesPerNode, maxDepth, maxBins (5 folds)

The evaluation metric is F-beta (beta=2, recall is more important)

In [23]:
gbt_clf = GBTClassifier(labelCol='hazardous', featuresCol='features')
evaluator = MulticlassClassificationEvaluator(metricName='weightedFMeasure', beta=2.0, labelCol='hazardous')
grid = ParamGridBuilder() \
     .addGrid(gbt_clf.minInstancesPerNode, [1, 2, 3]) \
     .addGrid(gbt_clf.maxDepth, [2, 3, 4, 5]) \
     .addGrid(gbt_clf.maxBins, [16, 32, 64]) \
     .build()

crossval = CrossValidator(estimator=gbt_clf, estimatorParamMaps=grid,
                          evaluator=evaluator, numFolds=5, seed=RANDOM_STATE)
cv_gbt_model = crossval.fit(X_train_vec)

In [19]:
model_gbt = cv_gbt_model.bestModel
print(f'GBT best AVG score: {np.max(cv_gbt_model.avgMetrics)}')
print(f'GBT best parameters:')
print(f'\tminInstancesPerNode: {model_gbt.getMinInstancesPerNode()}')
print(f'\tmaxDepth: {model_gbt.getMaxDepth()}')
print(f'\tmaxBins: {model_gbt.getMaxBins()}')

GBT best AVG score: 0.8732904411279291
GBT best parameters:
	minInstancesPerNode: 1
	maxDepth: 5
	maxBins: 16


Calculate precision, recall, f1, f-beta to understand results better

In [20]:
def print_cv_metric(data, est, est_params, evaluator, metric_name, folds=5, seed=RANDOM_STATE):
    cv_model = CrossValidator(estimator=est, estimatorParamMaps=est_params,
                              evaluator=evaluator, numFolds=folds, seed=RANDOM_STATE).fit(data)
    print(f'[{metric_name}] test avg score: {np.round(cv_model.avgMetrics, 4)[0]}')

gbt_clf = GBTClassifier(labelCol='hazardous', featuresCol='features')
grid = ParamGridBuilder() \
     .addGrid(gbt_clf.minInstancesPerNode, [3]) \
     .addGrid(gbt_clf.maxDepth, [5]) \
     .addGrid(gbt_clf.maxBins, [16]) \
     .build()
eval_precision = MulticlassClassificationEvaluator(metricName='precisionByLabel', labelCol='hazardous')
eval_recall = MulticlassClassificationEvaluator(metricName='recallByLabel', labelCol='hazardous')
eval_f1 = MulticlassClassificationEvaluator(metricName='f1', labelCol='hazardous')
eval_fbeta = MulticlassClassificationEvaluator(metricName='weightedFMeasure', beta=2.0, labelCol='hazardous')

print('CV-metrics')
print_cv_metric(X_train_vec, gbt_clf, grid, eval_precision, 'precision')
print_cv_metric(X_train_vec, gbt_clf, grid, eval_recall, 'recall')
print_cv_metric(X_train_vec, gbt_clf, grid, eval_f1, 'f1')
print_cv_metric(X_train_vec, gbt_clf, grid, eval_fbeta, 'f-beta')

CV-metrics
[precision] test avg score: 0.9819
[recall] test avg score: 0.7664
[f1] test avg score: 0.8747
[f-beta] test avg score: 0.873


Calculate prediction for the test dataset

In [21]:
best_gbt_model = GBTClassifier(labelCol='hazardous', featuresCol='features',
                        minInstancesPerNode=3, maxDepth=5, maxBins=16)
best_gbt_model = best_gbt_model.fit(X_train_vec)
df_pred_gbt = best_gbt_model.transform(X_test_vec)
df_pred_gbt.select('name', 'prediction', 'hazardous').show(3)

+------------+----------+---------+
|        name|prediction|hazardous|
+------------+----------+---------+
|  (2021 RH6)|       0.0|      0.0|
|(2014 QX432)|       1.0|      1.0|
|  (2015 YA1)|       0.0|      0.0|
+------------+----------+---------+
only showing top 3 rows



In [26]:
scoreAndLabels = df_pred_gbt.rdd.map(lambda t: (t.prediction, t.hazardous))
metrics = MulticlassMetrics(scoreAndLabels)
y_true = int(df_pred_gbt.agg({'hazardous': 'sum'}).collect()[0][0])
y_pred = int(df_pred_gbt.agg({'prediction': 'sum'}).collect()[0][0])
tp = int(metrics.truePositiveRate(1.0) * y_true)
fp = int(metrics.falsePositiveRate(1.0) * y_pred)
print(f'Hazardous asteroids: {y_true}')
print(f'Found TP hazardous asteroids: {tp}')
print(f'Found FP hazardous asteroids: {fp}')
print(f'Precision: {np.round(metrics.precision(1.0), 4)}')
print(f'Recall: {np.round(metrics.recall(1.0), 4)}')
print(f'f1: {np.round(metrics.fMeasure(1.0), 5)}')
print(f'f-beta: {np.round(metrics.weightedFMeasure(2.0), 4)}')

Hazardous asteroids: 884
Found TP hazardous asteroids: 871
Found FP hazardous asteroids: 273
Precision: 0.7976
Recall: 0.9853
f1: 0.88158
f-beta: 0.864


Save GBTClassifier results

In [33]:
# Create and archive parquet files
out_dir = './data/gbt_prediction'
df_pred_gbt.write.mode('overwrite').parquet(out_dir)
shutil.make_archive(out_dir, 'zip', out_dir)

# Save to MySQL table
pyspark_df2mysql(df_pred_gbt.select(COLUMNS + ['prediction']), MYSQL_DB, MYSQL_GBT_RESULT, mysql_login, mysql_pass)

# Save model
model_gbt.save('models/model_gbt')