# This Notebook is designed to implement various Big Data algorithms using Apache Spark

### 1. imports

In [17]:
from pyspark.sql import SparkSession
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier, RandomForestClassifier, LinearSVC,NaiveBayes, GBTClassifier
from pyspark.ml.feature import VectorAssembler

In [2]:
# Module-level store of evaluation metrics, keyed by "<model name> - <split>";
# filled in by evaluate_model and dumped to disk at the end of the notebook.
Results = dict()

def transform_data(data, input_cols, output_col):
    """Assemble the feature columns into a single vector column.

    Args:
        data: DataFrame containing ``input_cols`` and ``output_col``.
        input_cols: Names of the columns to pack into the ``features`` vector.
        output_col: Name of the column to keep next to ``features``
            (the callers in this file pass the label column). Note that it is
            NOT the assembler's output column, which is always ``features``.

    Returns:
        A DataFrame with exactly two columns: ``features`` and ``output_col``.
    """
    vectorizer = VectorAssembler(inputCols=input_cols, outputCol="features")
    return vectorizer.transform(data).select(['features', output_col])

def evaluate_model(model, data, model_name , date_type, label_col='loanStatus'):
    """Score a fitted model on ``data``, print the metrics and store them in ``Results``.

    Args:
        model: Fitted model whose ``transform`` adds a ``prediction`` column.
        data: DataFrame to score; must contain the model's input columns and
            the ``label_col`` column.
        model_name: Display name of the model (used in the printout and in
            the ``Results`` key).
        date_type: Name of the evaluated split; the callers in this file pass
            'train' or 'test'.
        label_col: Name of the ground-truth column. Defaults to 'loanStatus',
            which was previously hard-coded, so existing callers are unaffected.

    Side effects:
        Prints the four metrics and writes them to
        ``Results[model_name + " - " + date_type]``.
    """
    # Predict on data
    predictions = model.transform(data)

    # Display label -> MulticlassClassificationEvaluator metric name.
    # Insertion order defines both the evaluation and the print order.
    metric_names = {
        "Accuracy": "accuracy",
        "Weighted Precision": "weightedPrecision",
        "Weighted Recall": "weightedRecall",
        "F1": "f1",
    }

    # Calculate evaluation metrics
    scores = {}
    for label, metric in metric_names.items():
        evaluator = MulticlassClassificationEvaluator(
            predictionCol='prediction', labelCol=label_col, metricName=metric
        )
        scores[label] = evaluator.evaluate(predictions)

    # Print results
    print(f"Model: {model_name} - {date_type}")
    for label, value in scores.items():
        print(f"{label}: {value}")
    print("\n")

    Results[model_name + " - " + date_type] = scores
    

In [3]:
def logistic_regression(train_data, test_data, output_col):
    """Fit a logistic regression on ``train_data`` and evaluate it on both splits.

    Args:
        train_data: DataFrame with a ``features`` column and the label column.
        test_data: DataFrame with the same columns, used for evaluation only.
        output_col: Name of the label column.
    """
    # Build the estimator and fit it on the training split in one go
    fitted = LogisticRegression(featuresCol='features', labelCol=output_col).fit(train_data)

    # Report train metrics first, then test metrics
    for split_name, split in (('train', train_data), ('test', test_data)):
        evaluate_model(fitted, split, 'Logistic Regression', split_name)

In [4]:
def decision_tree(train_data, test_data, output_col):
    """Fit a decision tree classifier on ``train_data`` and evaluate it on both splits.

    Args:
        train_data: DataFrame with a ``features`` column and the label column.
        test_data: DataFrame with the same columns, used for evaluation only.
        output_col: Name of the label column.
    """
    # Build the estimator and fit it on the training split in one go
    fitted = DecisionTreeClassifier(featuresCol='features', labelCol=output_col).fit(train_data)

    # Report train metrics first, then test metrics
    for split_name, split in (('train', train_data), ('test', test_data)):
        evaluate_model(fitted, split, 'Decision Tree', split_name)

In [5]:
def random_forest(train_data, test_data, output_col):
    """Fit a random forest classifier on ``train_data`` and evaluate it on both splits.

    Args:
        train_data: DataFrame with a ``features`` column and the label column.
        test_data: DataFrame with the same columns, used for evaluation only.
        output_col: Name of the label column.
    """
    # Build the estimator and fit it on the training split in one go
    fitted = RandomForestClassifier(featuresCol='features', labelCol=output_col).fit(train_data)

    # Report train metrics first, then test metrics
    for split_name, split in (('train', train_data), ('test', test_data)):
        evaluate_model(fitted, split, 'Random Forest', split_name)

In [6]:
def linear_svc(train_data, test_data, output_col):
    """Fit a linear support vector classifier on ``train_data`` and evaluate it on both splits.

    Args:
        train_data: DataFrame with a ``features`` column and the label column.
        test_data: DataFrame with the same columns, used for evaluation only.
        output_col: Name of the label column.
    """
    # Build the estimator and fit it on the training split in one go
    fitted = LinearSVC(featuresCol='features', labelCol=output_col).fit(train_data)

    # Report train metrics first, then test metrics
    for split_name, split in (('train', train_data), ('test', test_data)):
        evaluate_model(fitted, split, 'Linear SVC', split_name)

In [7]:
def gradient_boosted_tree(train_data, test_data, output_col):
    """Fit a gradient-boosted tree classifier on ``train_data`` and evaluate it on both splits.

    Args:
        train_data: DataFrame with a ``features`` column and the label column.
        test_data: DataFrame with the same columns, used for evaluation only.
        output_col: Name of the label column.
    """
    # Build the estimator and fit it on the training split in one go
    fitted = GBTClassifier(featuresCol='features', labelCol=output_col).fit(train_data)

    # Report train metrics first, then test metrics
    for split_name, split in (('train', train_data), ('test', test_data)):
        evaluate_model(fitted, split, 'Gradient Boosted Tree', split_name)

In [18]:
def pipeline(data, input_cols, output_col, seed=None):
    """Split ``data`` 80/20, assemble features, then train and evaluate every classifier.

    Args:
        data: Raw DataFrame containing ``input_cols`` and ``output_col``.
        input_cols: Names of the feature columns to assemble.
        output_col: Name of the label column.
        seed: Optional seed forwarded to ``randomSplit`` so the train/test
            split (and therefore the reported metrics) can be reproduced.
            The default ``None`` keeps the original unseeded behaviour.

    Side effects:
        Each trainer prints its metrics and records them through
        ``evaluate_model``.
    """
    # Split data into training and test sets
    train_data, test_data = data.randomSplit([0.8, 0.2], seed=seed)

    # Transform data
    train_data = transform_data(train_data, input_cols, output_col)
    test_data = transform_data(test_data, input_cols, output_col)

    # Train and evaluate each classifier in turn, in the original order
    trainers = (
        logistic_regression,
        decision_tree,
        random_forest,
        linear_svc,
        gradient_boosted_tree,
    )
    for train_and_evaluate in trainers:
        train_and_evaluate(train_data, test_data, output_col)

In [None]:
spark = SparkSession.builder.appName('classification').getOrCreate()

# Run everything inside try/finally so the Spark session is always released,
# even when loading or training raises.
try:
    # Load data
    train_data = spark.read.csv('dataset/preprocessed_train.csv', header=True, inferSchema=True)
    # NOTE(review): test_data is loaded but never used below; pipeline() makes
    # its own 80/20 split of train_data. Confirm whether this file should be
    # the evaluation set instead.
    test_data = spark.read.csv('dataset/preprocessed_test.csv', header=True, inferSchema=True)

    # Define input columns and output column.
    # Features are every column except the first and the last one
    # (presumably an id/index column and the label; TODO confirm against the CSV).
    input_cols = train_data.columns[1:-1]
    output_col = 'loanStatus'

    pipeline(train_data, input_cols, output_col)
finally:
    spark.stop()

In [10]:
# Persist the collected metrics: one "<model> - <split>,<metrics dict>" line per entry
with open('results_new.txt', 'w') as results_file:
    results_file.writelines(
        f"{name},{metrics}\n" for name, metrics in Results.items()
    )