In [192]:
from pyspark.sql import DataFrame
from pyspark.sql.functions import col
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.sql import DataFrame
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import PCA, PCAModel
import seaborn as sns
import numpy as np
from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
import time
from os.path import isdir
from pyspark.mllib.evaluation import MulticlassMetrics
import matplotlib.pyplot as plt

In [198]:
def assemble_features(dataframe: DataFrame, features_columns):
    """Add a ``features`` vector column assembled from ``features_columns``.

    Args:
        dataframe: DataFrame to transform.
        features_columns: Names of the input columns handed to
            ``VectorAssembler`` as ``inputCols``.

    Returns:
        The DataFrame produced by ``VectorAssembler.transform``, whose
        output column is named ``features``.
    """
    return VectorAssembler(
        inputCols=features_columns,
        outputCol='features',
    ).transform(dataframe)


def scale_features(dataframe: DataFrame, with_mean=True, with_std=True):
    """Fit a ``StandardScaler`` on ``dataframe`` and apply it to the same data.

    Args:
        dataframe: DataFrame holding a ``features`` vector column.
        with_mean: Forwarded to ``StandardScaler`` as ``withMean``.
        with_std: Forwarded to ``StandardScaler`` as ``withStd``.

    Returns:
        ``dataframe`` transformed by the fitted scaler; the scaled vectors
        are written to a ``scaled_features`` column.
    """
    standard_scaler = StandardScaler(
        inputCol='features',
        outputCol='scaled_features',
        withMean=with_mean,
        withStd=with_std,
    )
    # The scaler is fitted and applied on the very same DataFrame.
    fitted_scaler = standard_scaler.fit(dataframe)
    return fitted_scaler.transform(dataframe)


def corr_features(dataframe: DataFrame, method='pearson'):
    """Return the correlation matrix of the ``features`` column as an array.

    Args:
        dataframe: DataFrame holding a ``features`` vector column.
        method: Correlation method forwarded to ``Correlation.corr``.

    Returns:
        The matrix found in the first cell of the first row of the
        ``Correlation.corr`` result, converted with ``toArray()``.
    """
    correlation_df = Correlation.corr(dataframe, "features", method=method)
    first_row = correlation_df.head()
    correlation = first_row[0]
    return correlation.toArray()


def write_training_time(path:str, training_time):
    """Write ``training_time`` to ``<path>/training_time.txt``.

    The value is stored as text with two decimal places; an existing file
    is overwritten.

    Args:
        path: Directory in which ``training_time.txt`` is written.
        training_time: Number to persist.
    """
    timing_file = "{}/training_time.txt".format(path)
    with open(timing_file, 'w') as handle:
        formatted = '{:.2f}'.format(training_time)
        handle.write(formatted)


def read_training_time(path:str):
    """Read ``<path>/training_time.txt`` and return its content as a float.

    Args:
        path: Directory containing ``training_time.txt``.

    Returns:
        The whole file content parsed with ``float``.
    """
    timing_file = "{}/training_time.txt".format(path)
    with open(timing_file) as handle:
        return float(handle.read())


def show_confusion_matrix(predictions: DataFrame, model_name: str):
    """Plot, save and display the confusion matrix of ``predictions``.

    The matrix is computed by ``MulticlassMetrics`` from the ``prediction``
    and ``label`` columns, drawn as an annotated heatmap, written to
    ``<model_name>.png`` in the working directory and then shown.

    Args:
        predictions: DataFrame with ``prediction`` and ``label`` columns.
        model_name: Used for the figure title and the PNG file name.
    """
    metrics = MulticlassMetrics(predictions.select('prediction', 'label').rdd)
    confusion = metrics.confusionMatrix().toArray()

    # Draw on an explicit figure/axes so the saved file is exactly this plot.
    fig, ax = plt.subplots()
    # fmt='g' prints full counts; seaborn's default ('.2g') keeps only two
    # significant digits, so a count such as 105 would be shown as 1e+02.
    sns.heatmap(confusion, annot=True, fmt='g', ax=ax)
    # MulticlassMetrics puts predicted classes in columns (actual in rows).
    ax.set_xlabel('Predicted label')
    ax.set_ylabel('Actual label')
    ax.set_title('Confusion matrix: {}'.format(model_name))
    fig.savefig('{}.png'.format(model_name))
    plt.show()


def show_auc(predictions: DataFrame, model_name: str):
    """Print the score of a default ``BinaryClassificationEvaluator``.

    Args:
        predictions: DataFrame passed to ``evaluate``; the evaluator is
            created with its default settings.
        model_name: Label inserted into the printed message.
    """
    evaluator = BinaryClassificationEvaluator()
    auc = evaluator.evaluate(predictions)
    print("AUC for {}: {}".format(model_name, auc))

In [2]:
# Load the raw CSV with a header row and inferred column types.
# NOTE(review): `spark` is not created in this notebook; it is presumably
# injected by the PySpark kernel/session -- confirm for fresh-kernel runs.
breast_cancer_data = (
    spark.read
    .options(header=True, inferSchema='True')
    .csv("breast cancer.csv")
)

In [7]:
class Preprocessor:
    """Turn a raw DataFrame into a two-column ``label`` / ``features`` frame.

    The result is exposed through the ``dataframe`` attribute.
    """

    def __init__(self, dataframe: DataFrame, true_condition, label_column: str, features_columns, normalize=True):
        """Assemble, optionally scale, and label ``dataframe``.

        Args:
            dataframe: Raw input DataFrame.
            true_condition: Callable applied to ``col(label_column)``; its
                result becomes the ``label`` column.
            label_column: Name of the column the label is derived from.
            features_columns: Columns assembled into the feature vector.
            normalize: When truthy, the assembled vector is standardized
                (mean and std both enabled) and the scaled vector is what
                ends up in ``features``.
        """
        assembled = assemble_features(dataframe, features_columns)
        if normalize:
            assembled = scale_features(assembled, True, True)
            # Expose the scaled vector under the plain 'features' name.
            feature_column = col('scaled_features').alias('features')
        else:
            feature_column = col('features')

        labelled = assembled.withColumn("label", true_condition(col(label_column)))
        self.dataframe = labelled.select("label", feature_column)

In [8]:
def preprocess_data(dataframe: DataFrame):
    """Run the ``Preprocessor`` with this notebook's fixed settings.

    The label is 1.0 where the ``diagnosis`` column equals ``'M'`` and 0.0
    otherwise; every column from the third onward is used as a feature, and
    normalization is enabled.

    Args:
        dataframe: Raw DataFrame containing a ``diagnosis`` column.

    Returns:
        The preprocessor's ``dataframe`` attribute.
    """
    def is_positive(diagnosis):
        # Boolean comparison cast to float -> 1.0 / 0.0 label.
        return (diagnosis == 'M').cast('float')

    feature_names = dataframe.columns[2:]
    return Preprocessor(
        dataframe,
        is_positive,
        'diagnosis',
        feature_names,
        True,
    ).dataframe


def save_correlation_matrix(dataframe: DataFrame):
    """Draw the feature correlation heatmap, save it, then display it.

    The figure is written to ``correlation_matrix.png`` in the working
    directory.

    Args:
        dataframe: DataFrame passed to ``corr_features``.
    """
    sns.heatmap(corr_features(dataframe))
    plt.savefig('correlation_matrix.png')
    plt.show()

In [9]:
# The first two columns are skipped; everything after them is counted as a
# feature (same slice that preprocess_data uses).
n_features = len(breast_cancer_data.columns[2:])
print(f"n_features:{n_features}")

# Labelled, standardized frame used by every later cell.
df = preprocess_data(dataframe=breast_cancer_data)

n_features:30


In [55]:
# Fit PCA with as many components as there are features so that the full
# explained-variance vector can be inspected afterwards.
pca_model: PCAModel = PCA(
    k=n_features,
    inputCol='features',
    outputCol='pca_features',
).fit(df)

In [59]:
# Sum of the explained-variance entries of the first 15 components.
np.sum(pca_model.explainedVariance.toArray()[:15])

0.9864881227145469

In [65]:
# 80/20 train/test split. The seed is fixed so a re-run yields the same
# partition: LogisticRegressionBuilder reloads a model cached on disk, and an
# unseeded split could place rows that model was trained on into df_test.
# NOTE(review): determinism also assumes `df` is read/partitioned the same way
# on each run -- confirm if the input source changes.
(df_train, df_test) = df.randomSplit([0.8, 0.2], seed=42)

In [197]:
class LogisticRegressionBuilder:
    """Provide the best cross-validated logistic-regression model.

    On construction the model is either reloaded from the ``model_name``
    directory (when that directory exists) or selected with a 3-fold
    ``CrossValidator`` over a regParam x elasticNetParam grid, then saved
    to that directory together with the measured training time.

    Attributes set by ``__init__``:
        model_name: Display name, also used as the on-disk cache directory.
        model: The loaded or freshly selected ``LogisticRegressionModel``.
        training_time: Seconds spent in cross-validation (read back from
            ``training_time.txt`` when the model comes from the cache).
    """

    def __init__(self, training_df: DataFrame, seed: int = 42):
        """Load the cached model or train and cache a new one.

        Args:
            training_df: Training data; only used when no cached model
                directory exists -- a cached model is loaded as-is, whatever
                data is passed here.
            seed: Seed for the cross-validation fold assignment, so model
                selection is repeatable for the same input.
        """
        self.model_name = 'Logistic Regression'
        if isdir(self.model_name):
            self.model = LogisticRegressionModel.load(self.model_name)
            self.training_time = read_training_time(self.model_name)
        else:
            # regParam / elasticNetParam are not set here: every grid entry
            # below overrides both, so constructor values would never be used.
            lr = LogisticRegression(maxIter=100)
            # Grid values are written as floats to match the float-typed params.
            param_grid = (
                ParamGridBuilder()
                .addGrid(lr.regParam, [0.1, 0.01, 0.001])
                .addGrid(lr.elasticNetParam, [0.0, 0.25, 0.5, 0.75, 1.0])
                .build()
            )
            crossval = CrossValidator(
                estimator=lr,
                estimatorParamMaps=param_grid,
                evaluator=BinaryClassificationEvaluator(),
                numFolds=3,
                seed=seed,
            )
            # perf_counter is monotonic, so the elapsed time cannot be skewed
            # by wall-clock adjustments during the fit.
            start = time.perf_counter()
            self.model = crossval.fit(training_df).bestModel
            self.training_time = time.perf_counter() - start
            # save() creates the model directory that the timing file lives in.
            self.model.save(self.model_name)
            write_training_time(self.model_name, self.training_time)
        
        

In [196]:
# Builds the model on first run; later runs reload it from the cache directory.
lrb = LogisticRegressionBuilder(training_df=df_train)
print(
    "Got best LR model in {} seconds".format(lrb.training_time)
)

Got best LR model in 66.7 seconds


In [199]:
# Score the held-out split with the selected model.
lr_predictions = lrb.model.transform(dataset=df_test)

In [157]:
# Report the evaluator score, then plot and save the confusion matrix.
show_auc(predictions=lr_predictions, model_name=lrb.model_name)
show_confusion_matrix(predictions=lr_predictions, model_name=lrb.model_name)

AUC for RL_Model: 0.9979583503470805
