In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, DecisionTreeClassifier, GBTClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.sql.functions import col
import pickle
from pyspark.mllib.evaluation import *

In [None]:
# Create (or reuse) the Spark session for this notebook.
# Fix: the imported class is `SparkSession` (the original referenced an undefined
# `sparksSession`), and the CSV-loading cell reads through `spark`, so bind that name.
spark = SparkSession.builder.appName("LoanProcessing").getOrCreate()
sparks = spark  # backward-compatible alias for the name this cell originally assigned

In [None]:
# Read the merged loan dataset from CSV: the first row supplies the column names
# and Spark infers each column's type from the data.
loan_process_df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("merged.csv")
)

In [None]:
# Drop every row that contains a null in any column (`na.drop()` is the same operation as `dropna()`).
loan_process_df = loan_process_df.na.drop()

In [None]:
# Preview the cleaned data: first 20 rows, long cell values truncated (the `show()` defaults, spelled out).
loan_process_df.show(n=20, truncate=True)

+------------------------+-----------------+---------------------+----------------+---------------------+----------------------------+--------------------------+-----------------------+------------------------+----------------------+-------------------------+--------------------+-----------------------+--------------------+----------------------+--------------------------+-----------------+----------------------+-------------------------+-------------------------+----------+----------------------+---------------+----------------+-------------------+----------------+---------------+-------------------+-------------------+--------------------+-----------------------+----------------------+---------------------+------------------------------+---------------------+-------------------+-------------------+-------------------+------------------------+-------------------------------+------------------------------+---------------------------+------------------------------+----------------------

In [None]:
# Print the column names and inferred types; the string-typed columns listed here
# are the ones picked up as categorical features further down.
loan_process_df.printSchema()

root
 |-- NAME_CONTRACT_TYPE_prevd: string (nullable = true)
 |-- AMT_ANNUITY_prevd: double (nullable = true)
 |-- AMT_APPLICATION_prevd: double (nullable = true)
 |-- AMT_CREDIT_prevd: double (nullable = true)
 |-- AMT_GOODS_PRICE_prevd: double (nullable = true)
 |-- NAME_CASH_LOAN_PURPOSE_prevd: string (nullable = true)
 |-- NAME_CONTRACT_STATUS_prevd: string (nullable = true)
 |-- NAME_PAYMENT_TYPE_prevd: string (nullable = true)
 |-- CODE_REJECT_REASON_prevd: string (nullable = true)
 |-- NAME_CLIENT_TYPE_prevd: string (nullable = true)
 |-- NAME_GOODS_CATEGORY_prevd: string (nullable = true)
 |-- NAME_PORTFOLIO_prevd: string (nullable = true)
 |-- NAME_PRODUCT_TYPE_prevd: string (nullable = true)
 |-- CHANNEL_TYPE_prevd: string (nullable = true)
 |-- SELLERPLACE_AREA_prevd: integer (nullable = true)
 |-- NAME_SELLER_INDUSTRY_prevd: string (nullable = true)
 |-- CNT_PAYMENT_prevd: double (nullable = true)
 |-- NAME_YIELD_GROUP_prevd: string (nullable = true)
 |-- PRODUCT_COMBINATIO

In [None]:
# Collect the name of every string-typed column; these are treated as the categorical features.
cat_cols = [
    name
    for name, dtype in loan_process_df.dtypes
    if dtype == "string"
]

In [None]:
# Display the list of categorical column names.
cat_cols

['NAME_CONTRACT_TYPE_prevd',
 'NAME_CASH_LOAN_PURPOSE_prevd',
 'NAME_CONTRACT_STATUS_prevd',
 'NAME_PAYMENT_TYPE_prevd',
 'CODE_REJECT_REASON_prevd',
 'NAME_CLIENT_TYPE_prevd',
 'NAME_GOODS_CATEGORY_prevd',
 'NAME_PORTFOLIO_prevd',
 'NAME_PRODUCT_TYPE_prevd',
 'CHANNEL_TYPE_prevd',
 'NAME_SELLER_INDUSTRY_prevd',
 'NAME_YIELD_GROUP_prevd',
 'PRODUCT_COMBINATION_prevd',
 'DAYS_DECISION_GROUP_prevd',
 'NAME_CONTRACT_TYPE_apd',
 'CODE_GENDER_apd',
 'FLAG_OWN_CAR_apd',
 'FLAG_OWN_REALTY_apd',
 'NAME_TYPE_SUITE_apd',
 'NAME_INCOME_TYPE_apd',
 'NAME_EDUCATION_TYPE_apd',
 'NAME_FAMILY_STATUS_apd',
 'NAME_HOUSING_TYPE_apd',
 'OCCUPATION_TYPE_apd',
 'WEEKDAY_APPR_PROCESS_START_apd',
 'ORGANIZATION_TYPE_apd',
 'AGE_GROUP_apd',
 'EMPLOYMENT_YEAR_apd']

In [None]:
# Per categorical column: a StringIndexer writing "<column>_index" and a OneHotEncoder
# turning that index column into "<column>_encoded".
# The loop variable is `name` so the comprehension does not shadow the imported `col` function.
indexers = [
    StringIndexer(inputCol=name, outputCol=f"{name}_index")
    for name in cat_cols
]
encoders = [
    OneHotEncoder(inputCol=f"{name}_index", outputCol=f"{name}_encoded")
    for name in cat_cols
]

In [None]:
# Pipeline stage order: all indexers first, then all encoders (which consume the indexers' output columns).
stages = [*indexers, *encoders]

In [None]:
# Concatenate the one-hot vectors into a single "features" vector column.
# Only the encoded categorical columns are assembled here.
assembler = VectorAssembler(
    inputCols=[f"{name}_encoded" for name in cat_cols],
    outputCol="features",
)

In [None]:
# Build the complete stage list (indexers -> encoders -> assembler) and wrap it in a Pipeline.
# The list is rebuilt from its parts instead of being extended in place
# (`stages += [assembler]`), so re-running this cell cannot append a second
# assembler stage to `stages`.
stages = indexers + encoders + [assembler]
pipeline = Pipeline(stages=stages)

In [None]:
# Fit the preprocessing pipeline and apply it to the same DataFrame.
# Note: the pipeline is fitted on the full dataset, i.e. before the train/test
# split that happens further down.
pipeline_model = pipeline.fit(loan_process_df)
# `transformed_df` carries the assembled "features" column selected in the next step.
transformed_df = pipeline_model.transform(loan_process_df)

In [None]:
# Keep only what the classifiers need: the assembled feature vector and the label column.
selected_cols = ["features", "TARGET_apd"]
selected_df = transformed_df.select(selected_cols)

In [None]:
# 80/20 random train/test split; the fixed seed is passed so the split uses the same seed on every run.
train_df, test_df = selected_df.randomSplit(weights=[0.80, 0.20], seed=42)

In [None]:
# Show the training DataFrame's repr (its column names and types).
train_df

DataFrame[features: vector, TARGET_apd: int]

In [None]:
# Fit a 100-tree random forest on the training split.
rf = RandomForestClassifier(
    labelCol="TARGET_apd",
    featuresCol="features",
    numTrees=100,
)
rf_model = rf.fit(train_df)

In [None]:
# Score the held-out test split with the fitted random forest. The evaluation cells
# below read the `rawPrediction` and `prediction` columns of this result.
rf_pred = rf_model.transform(test_df)

In [None]:
# Area under the ROC curve for the random forest, computed from its raw prediction scores.
# `areaUnderROC` is the evaluator's default metric; it is spelled out here for clarity.
evaluator = BinaryClassificationEvaluator(
    labelCol="TARGET_apd",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC",
)
rf_auc = evaluator.evaluate(rf_pred)

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics

# Accuracy, weighted recall and weighted precision of the random-forest predictions.
# A single evaluator is re-targeted with setMetricName() (which returns the evaluator
# itself) instead of constructing a new evaluator per metric.
# NOTE(review): the "weighted" metrics average per-class values by class frequency, so
# weightedRecall is expected to coincide with accuracy — confirm these are the figures
# intended to be reported as "Recall" / "Precision".
evaluator = MulticlassClassificationEvaluator(
    predictionCol="prediction",
    labelCol="TARGET_apd",
    metricName="accuracy",
)
accuracy = evaluator.evaluate(rf_pred)
recall = evaluator.setMetricName("weightedRecall").evaluate(rf_pred)
precision = evaluator.setMetricName("weightedPrecision").evaluate(rf_pred)

# Report the three metrics.
print(f"Accuracy: {accuracy}")
print(f"Recall: {recall}")
print(f"Precision: {precision}")

Accuracy: 0.9129844899292866
Recall: 0.9129844899292866
Precision: 0.8335406788514397


In [None]:
# Display the random forest's area under the ROC curve.
rf_auc

0.6261960022518456

In [None]:
# Fit a logistic regression on the training split; only the feature and label
# columns are specified, every other hyperparameter keeps its default.
lr = LogisticRegression(
    labelCol="TARGET_apd",
    featuresCol="features",
)
lr_model = lr.fit(train_df)

In [None]:
# Score the held-out test split with the fitted logistic regression. The evaluation
# cells below read the `rawPrediction` and `prediction` columns of this result.
lr_predictions = lr_model.transform(test_df)

In [None]:
# Area under the ROC curve for the logistic regression, computed from its raw prediction scores.
# `areaUnderROC` is the evaluator's default metric; it is spelled out here for clarity.
evaluator = BinaryClassificationEvaluator(
    labelCol="TARGET_apd",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC",
)
lr_auc = evaluator.evaluate(lr_predictions)

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics

# Accuracy, weighted recall and weighted precision of the logistic-regression predictions.
# A single evaluator is re-targeted with setMetricName() (which returns the evaluator
# itself) instead of constructing a new evaluator per metric.
# NOTE(review): the "weighted" metrics average per-class values by class frequency, so
# weightedRecall is expected to coincide with accuracy — confirm these are the figures
# intended to be reported as "Recall" / "Precision".
evaluator = MulticlassClassificationEvaluator(
    predictionCol="prediction",
    labelCol="TARGET_apd",
    metricName="accuracy",
)
accuracy = evaluator.evaluate(lr_predictions)
recall = evaluator.setMetricName("weightedRecall").evaluate(lr_predictions)
precision = evaluator.setMetricName("weightedPrecision").evaluate(lr_predictions)

# Report the three metrics.
print(f"Accuracy: {accuracy}")
print(f"Recall: {recall}")
print(f"Precision: {precision}")


Accuracy: 0.9129986965431757
Recall: 0.9129986965431757
Precision: 0.8843383543835006


In [None]:
# Display the logistic regression's area under the ROC curve.
lr_auc

0.6562738114358088

In [None]:
# Recompute accuracy for the logistic-regression predictions (`lr_predictions`) and
# report it as a percentage together with the complementary test error.
evaluator = MulticlassClassificationEvaluator(
    labelCol="TARGET_apd",
    predictionCol="prediction",
    metricName="accuracy",
)
accuracy = evaluator.evaluate(lr_predictions)

print("Accuracy = {:.2%}".format(accuracy))
print("Test Error = {:.2%}".format(1.0 - accuracy))

Accuracy = 91.30%
Test Error = 8.70%


In [None]:
# Train DecisionTreeClassifier
# Only the feature and label columns are specified; every other hyperparameter keeps its default.
dt = DecisionTreeClassifier(
    labelCol="TARGET_apd",
    featuresCol="features",
)
dt_model = dt.fit(train_df)

In [None]:
# Score the held-out test split with the fitted decision tree. The evaluation cells
# below read the `rawPrediction` and `prediction` columns of this result.
dt_predictions = dt_model.transform(test_df)

In [None]:
# Accuracy, weighted recall and weighted precision of the decision-tree predictions.
# A single evaluator is re-targeted with setMetricName() (which returns the evaluator
# itself) instead of constructing a new evaluator per metric.
# NOTE(review): the "weighted" metrics average per-class values by class frequency, so
# weightedRecall is expected to coincide with accuracy — confirm these are the figures
# intended to be reported as "Recall" / "Precision".
evaluator = MulticlassClassificationEvaluator(
    predictionCol="prediction",
    labelCol="TARGET_apd",
    metricName="accuracy",
)
accuracy = evaluator.evaluate(dt_predictions)
recall = evaluator.setMetricName("weightedRecall").evaluate(dt_predictions)
precision = evaluator.setMetricName("weightedPrecision").evaluate(dt_predictions)

# Report the three metrics.
print(f"Accuracy: {accuracy}")
print(f"Recall: {recall}")
print(f"Precision: {precision}")


Accuracy: 0.9130057998501202
Recall: 0.9130057998501202
Precision: 0.9096996917207614


In [None]:
# Recompute accuracy for the decision-tree predictions (`dt_predictions`) and
# report it as a percentage together with the complementary test error.
evaluator = MulticlassClassificationEvaluator(
    labelCol="TARGET_apd",
    predictionCol="prediction",
    metricName="accuracy",
)
accuracy = evaluator.evaluate(dt_predictions)

print("Accuracy = {:.2%}".format(accuracy))
print("Test Error = {:.2%}".format(1.0 - accuracy))

Accuracy = 91.30%
Test Error = 8.70%


In [None]:
# Area under the ROC curve for the decision tree, computed from its raw prediction scores.
# `areaUnderROC` is the evaluator's default metric; it is spelled out here for clarity.
evaluator = BinaryClassificationEvaluator(
    labelCol="TARGET_apd",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC",
)
dt_auc = evaluator.evaluate(dt_predictions)

In [None]:
# Display the decision tree's area under the ROC curve.
dt_auc

0.5591824048016929

In [None]:
from pyspark.ml.classification import LinearSVC
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Linear SVM fitted on the training split, limited to 10 iterations with
# regularization parameter 0.1. The fitted model is bound to `model`.
lsvc = LinearSVC(
    labelCol="TARGET_apd",
    featuresCol="features",
    regParam=0.1,
    maxIter=10,
)
model = lsvc.fit(train_df)




Accuracy: 0.9129844899292866


In [None]:
# Score the held-out test split with the fitted linear SVM (`model`).
svm_pred = model.transform(test_df)

# Accuracy, weighted recall and weighted precision of the SVM predictions.
# A single evaluator is re-targeted with setMetricName() (which returns the evaluator
# itself) instead of constructing a new evaluator per metric.
# NOTE(review): the "weighted" metrics average per-class values by class frequency, so
# weightedRecall is expected to coincide with accuracy — confirm these are the figures
# intended to be reported as "Recall" / "Precision".
evaluator = MulticlassClassificationEvaluator(
    predictionCol="prediction",
    labelCol="TARGET_apd",
    metricName="accuracy",
)
accuracy = evaluator.evaluate(svm_pred)
recall = evaluator.setMetricName("weightedRecall").evaluate(svm_pred)
precision = evaluator.setMetricName("weightedPrecision").evaluate(svm_pred)

# Report the three metrics.
print(f"Accuracy: {accuracy}")
print(f"Recall: {recall}")
print(f"Precision: {precision}")

Accuracy: 0.9129844899292866
Recall: 0.9129844899292866
Precision: 0.8335406788514397


In [None]:
# Area under the ROC curve for the linear SVM, computed from its raw prediction scores;
# the value is displayed as the cell's output.
# `areaUnderROC` is the evaluator's default metric; it is spelled out here for clarity.
evaluator = BinaryClassificationEvaluator(
    labelCol="TARGET_apd",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC",
)
svm_auc = evaluator.evaluate(svm_pred)
svm_auc

0.6147778077787431

In [None]:
# Row count per value of the label column, used to inspect class balance.
a = loan_process_df.groupBy("TARGET_apd").count()

In [None]:
# Print the per-class row counts of `TARGET_apd`.
a.show()

+----------+-------+
|TARGET_apd|  count|
+----------+-------+
|         1| 122025|
|         0|1286917|
+----------+-------+

