In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import when, col, isnan
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler, OneHotEncoder
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Start (or reuse) the Spark session that every later cell works through.
spark = (
    SparkSession.builder
    .appName("CreditScoringSpark")
    .getOrCreate()
)

# Load the loan records: the first CSV row supplies the column names and the
# column types are inferred from the data instead of defaulting to strings.
data = spark.read.csv("loan_data.csv", inferSchema=True, header=True)

In [None]:
import pyspark

# Record the installed PySpark release and print it, so that results produced
# by this notebook can be tied to a specific library version.
pyspark_version = pyspark.__version__
print(f"PySpark Version: {pyspark_version}")

PySpark Version: 3.5.4


In [None]:
# ---------------------------------------------------------------------------
# Random-forest credit model with cross-validated hyper-parameter search.
#
# Column names follow the schema Spark reported for loan_data.csv (see the
# AnalysisException captured in this notebook): the file has no `loan_status`,
# `loan_amnt`, `annual_inc`, `grade` or `home_ownership` columns. Its
# repayment flag is `not.fully.paid` and its only string column is `purpose`.
# ---------------------------------------------------------------------------

# Spark parses a dot inside a column name as struct-field access unless the
# name is back-tick quoted, which breaks col() look-ups and ML stages. Work on
# a copy whose names use underscores; the notebook-level `data` is untouched.
model_data = data.toDF(*[name.replace(".", "_") for name in data.columns])

source_label_col = "not_fully_paid"
label_col = "loan_status_binary"
categorical_cols = ["purpose"]
numeric_cols = [
    "credit_policy",
    "int_rate",
    "installment",
    "log_annual_inc",
    "dti",
    "fico",
    "days_with_cr_line",
    "revol_bal",
    "revol_util",
    "inq_last_6mths",
    "delinq_2yrs",
    "pub_rec",
]

# Fail fast with one readable message if the file does not match this schema.
required_cols = [source_label_col] + categorical_cols + numeric_cols
missing_cols = [c for c in required_cols if c not in model_data.columns]
if missing_cols:
    raise ValueError(f"loan_data.csv is missing expected columns: {missing_cols}")

# Drop rows whose label source or numeric features are null / NaN. The cast
# lets isnan() work on integer columns and turns unparsable values into null.
for c in [source_label_col] + numeric_cols:
    as_double = col(c).cast("double")
    model_data = model_data.filter(as_double.isNotNull() & ~isnan(as_double))

# Binary target: 1 when the source flag is 1, else 0.
# NOTE(review): presumably 1 means the loan was not fully repaid -- confirm
# against the dataset's documentation.
model_data = model_data.withColumn(
    label_col,
    when(col(source_label_col) == 1, 1).otherwise(0)
)

# One StringIndexer per categorical column; rows with labels not seen during
# fitting are skipped at transform time.
indexers = [
    StringIndexer(inputCol=cat_col, outputCol=cat_col + "_indexed", handleInvalid="skip")
    for cat_col in categorical_cols
]

assembler_inputs = [c + "_indexed" for c in categorical_cols] + numeric_cols
assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="raw_features")

scaler = StandardScaler(inputCol="raw_features", outputCol="features", withMean=True, withStd=True)

rf = RandomForestClassifier(
    labelCol=label_col,
    featuresCol="features",
    seed=42
)

pipeline = Pipeline(stages=indexers + [assembler, scaler, rf])

train_data, test_data = model_data.randomSplit([0.8, 0.2], seed=42)

# maxDepth is an integer parameter; `None` is not a valid value for it, so the
# deepest setting is an explicit depth instead.
paramGrid = (ParamGridBuilder()
             .addGrid(rf.numTrees, [50, 100])
             .addGrid(rf.maxDepth, [5, 10, 15])
             .addGrid(rf.minInstancesPerNode, [1, 5])
             .build())

evaluator = BinaryClassificationEvaluator(labelCol=label_col, metricName="areaUnderROC")
# seed fixes the fold assignment so repeated runs pick the same best model.
cv = CrossValidator(estimator=pipeline,
                    estimatorParamMaps=paramGrid,
                    evaluator=evaluator,
                    numFolds=3,
                    parallelism=2,
                    seed=42)

cvModel = cv.fit(train_data)

bestModel = cvModel.bestModel
predictions = bestModel.transform(test_data)

auc = evaluator.evaluate(predictions)
print(f"Test ROC-AUC: {auc}")

# Collect only the columns needed for local inspection.
predictions_pd = predictions.select("prediction", "probability", label_col).toPandas()

# The session is deliberately NOT stopped here: later cells in this notebook
# still use `data`, which is bound to this Spark session.


AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `loan_status` cannot be resolved. Did you mean one of the following? [`int.rate`, `dti`, `fico`, `installment`, `pub.rec`].;
'Project [credit.policy#17, purpose#18, int.rate#19, installment#20, log.annual.inc#21, dti#22, fico#23, days.with.cr.line#24, revol.bal#25, revol.util#26, inq.last.6mths#27, delinq.2yrs#28, pub.rec#29, not.fully.paid#30, CASE WHEN 'loan_status IN (Charged Off,Default,Late) THEN 1 ELSE 0 END AS loan_status_binary#45]
+- Relation [credit.policy#17,purpose#18,int.rate#19,installment#20,log.annual.inc#21,dti#22,fico#23,days.with.cr.line#24,revol.bal#25,revol.util#26,inq.last.6mths#27,delinq.2yrs#28,pub.rec#29,not.fully.paid#30] csv


In [None]:

# regexp_replace / trim are used by the purpose clean-up below but are not
# imported by the notebook's import cell, so bring them in here.
from pyspark.sql.functions import regexp_replace, trim

# Display schema and first few rows
data.printSchema()
data.show(5)

# Drop irrelevant columns (as an example); drop() is a no-op for names the
# frame does not contain.
data = data.drop("id", "member_id", "url", "desc")

# Handle missing values: fill numeric gaps with 0, but only for the columns
# this dataset actually has, so a differently shaped input does not abort the
# cell.
fill_defaults = {
    "annual_inc": 0,
    "dti": 0,
    "loan_amnt": 0,
    "funded_amnt": 0,
    "funded_amnt_inv": 0,
    "total_pymnt": 0,
    "total_rec_int": 0
}
present_defaults = {
    name: value for name, value in fill_defaults.items() if name in data.columns
}
if present_defaults:
    data = data.na.fill(present_defaults)

# Convert categorical variables to numeric. Guarded on the column still being
# a string so that re-running the cell does not re-encode the already numeric
# codes (which would all fall through to 0).
if dict(data.dtypes).get("home_ownership") == "string":
    data = data.withColumn("home_ownership", when(col("home_ownership") == "RENT", 1)
                           .when(col("home_ownership") == "OWN", 2)
                           .when(col("home_ownership") == "MORTGAGE", 3)
                           .otherwise(0))

# Clean up the purpose strings: trim first, then replace inner spaces.
# Trimming after the replacement would do nothing, because leading/trailing
# spaces would already have been turned into underscores.
if "purpose" in data.columns:
    data = data.withColumn("purpose", regexp_replace(trim(col("purpose")), " ", "_"))

# Show cleaned data
data.show(5)


In [None]:
from pyspark.sql.functions import expr

# NOTE(review): `home_value`, `loan_amnt`, `annual_inc`, `home_ownership` and
# `default` are not among the columns listed in the schema that this
# notebook's recorded AnalysisException shows for loan_data.csv -- confirm the
# input file before relying on this cell.

# Example: Create Debt-to-Income (DTI) Ratio (dti expressed as a fraction)
data = data.withColumn("dti_ratio", col("dti") / 100)

# Create Loan-to-Value (LTV) ratio against 80% of the home value.
# The division is guarded: a zero (or negative) denominator yields NULL
# instead of relying on Spark's divide-by-zero behaviour, which raises an
# error when ANSI SQL mode is enabled.
ltv_denominator = col("home_value") * 0.8  # Assuming home_value is provided
data = data.withColumn(
    "ltv_ratio",
    when(ltv_denominator > 0, col("loan_amnt") / ltv_denominator)
)

# Drop rows for which no valid LTV ratio could be computed
data = data.filter(col("ltv_ratio").isNotNull())

# Select relevant features (plus the `default` label) for modeling
features = data.select(
    "loan_amnt",
    "annual_inc",
    "dti",
    "home_ownership",
    "purpose",
    "dti_ratio",
    "ltv_ratio",
    "default",
)


In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline

# Assemble the numeric inputs into the single vector column the model reads.
assembler = VectorAssembler(inputCols=["loan_amnt", "annual_inc", "dti_ratio", "home_ownership"], outputCol="features")

# Split the *un-assembled* frame into training and testing sets.
# The assembler runs as a pipeline stage below; applying it here as well would
# make pipeline.fit() fail, because VectorAssembler refuses to write an output
# column ("features") that already exists.
train_data, test_data = features.randomSplit([0.8, 0.2], seed=42)

# Initialize Logistic Regression model on the assembled vector and the
# `default` label column.
lr = LogisticRegression(featuresCol="features", labelCol="default")

# Create a pipeline: feature assembly followed by the classifier.
pipeline = Pipeline(stages=[assembler, lr])

# Fit the model
model = pipeline.fit(train_data)


In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Make predictions on the held-out split with the fitted pipeline.
predictions = model.transform(test_data)

# Evaluate the model against the `default` label column.
evaluator = BinaryClassificationEvaluator(labelCol="default")
roc_auc = evaluator.evaluate(predictions)

print(f"ROC-AUC: {roc_auc:.2f}")

# Persist the fitted pipeline with Spark's own ML writer. A Spark PipelineModel
# wraps JVM objects and cannot be pickled, so joblib.dump() does not work for
# it. The saved model is a directory; reload it with
# pyspark.ml.PipelineModel.load("credit_risk_model").
model.write().overwrite().save("credit_risk_model")
