<a href="https://colab.research.google.com/github/yazicibrahim/credit-risk-pyspark/blob/main/Untitled51.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install pyspark




In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("LoanRiskPrediction").getOrCreate()


In [3]:
from google.colab import files
uploaded = files.upload()  # CSV dosyanı yükle

df = spark.read.csv("/content/german_credit_data.csv", header=True, inferSchema=True)
df.show(5)
df.printSchema()


Saving german_credit_data.csv to german_credit_data (1).csv
+---+---+------+---+-------+---------------+----------------+-------------+--------+-------------------+
|_c0|Age|   Sex|Job|Housing|Saving accounts|Checking account|Credit amount|Duration|            Purpose|
+---+---+------+---+-------+---------------+----------------+-------------+--------+-------------------+
|  0| 67|  male|  2|    own|             NA|          little|         1169|       6|           radio/TV|
|  1| 22|female|  2|    own|         little|        moderate|         5951|      48|           radio/TV|
|  2| 49|  male|  1|    own|         little|              NA|         2096|      12|          education|
|  3| 45|  male|  2|   free|         little|          little|         7882|      42|furniture/equipment|
|  4| 53|  male|  2|   free|         little|          little|         4870|      24|                car|
+---+---+------+---+-------+---------------+----------------+-------------+--------+----------------

In [5]:
from pyspark.sql.functions import col

# Eksik değer kontrolü
df.select([col(c).isNull().alias(c) for c in df.columns]).show()




+-----+-----+-----+-----+-------+---------------+----------------+-------------+--------+-------+
|  _c0|  Age|  Sex|  Job|Housing|Saving accounts|Checking account|Credit amount|Duration|Purpose|
+-----+-----+-----+-----+-------+---------------+----------------+-------------+--------+-------+
|false|false|false|false|  false|          false|           false|        false|   false|  false|
|false|false|false|false|  false|          false|           false|        false|   false|  false|
|false|false|false|false|  false|          false|           false|        false|   false|  false|
|false|false|false|false|  false|          false|           false|        false|   false|  false|
|false|false|false|false|  false|          false|           false|        false|   false|  false|
|false|false|false|false|  false|          false|           false|        false|   false|  false|
|false|false|false|false|  false|          false|           false|        false|   false|  false|
|false|false|false|f

In [7]:
from pyspark.sql.functions import expr

df = df.withColumn("credit_per_age", expr("`Credit amount` / Age"))
df = df.withColumn("installment_rate", expr("`Credit amount` / Duration"))
df.show(5)


+---+---+------+---+-------+---------------+----------------+-------------+--------+-------------------+------------------+------------------+
|_c0|Age|   Sex|Job|Housing|Saving accounts|Checking account|Credit amount|Duration|            Purpose|    credit_per_age|  installment_rate|
+---+---+------+---+-------+---------------+----------------+-------------+--------+-------------------+------------------+------------------+
|  0| 67|  male|  2|    own|             NA|          little|         1169|       6|           radio/TV| 17.44776119402985|194.83333333333334|
|  1| 22|female|  2|    own|         little|        moderate|         5951|      48|           radio/TV|             270.5|123.97916666666667|
|  2| 49|  male|  1|    own|         little|              NA|         2096|      12|          education|42.775510204081634|174.66666666666666|
|  3| 45|  male|  2|   free|         little|          little|         7882|      42|furniture/equipment|175.15555555555557|187.66666666666666|

In [8]:
from pyspark.ml.feature import StringIndexer

categorical_cols = ["Sex","Housing","Saving accounts","Checking account","Purpose"]

for col_name in categorical_cols:
    indexer = StringIndexer(inputCol=col_name, outputCol=col_name+"_index")
    df = indexer.fit(df).transform(df)

df.show(5)


+---+---+------+---+-------+---------------+----------------+-------------+--------+-------------------+------------------+------------------+---------+-------------+---------------------+----------------------+-------------+
|_c0|Age|   Sex|Job|Housing|Saving accounts|Checking account|Credit amount|Duration|            Purpose|    credit_per_age|  installment_rate|Sex_index|Housing_index|Saving accounts_index|Checking account_index|Purpose_index|
+---+---+------+---+-------+---------------+----------------+-------------+--------+-------------------+------------------+------------------+---------+-------------+---------------------+----------------------+-------------+
|  0| 67|  male|  2|    own|             NA|          little|         1169|       6|           radio/TV| 17.44776119402985|194.83333333333334|      0.0|          0.0|                  1.0|                   1.0|          1.0|
|  1| 22|female|  2|    own|         little|        moderate|         5951|      48|           r

In [10]:
# Sütun isimlerini listele
print(df.columns)


['_c0', 'Age', 'Sex', 'Job', 'Housing', 'Saving accounts', 'Checking account', 'Credit amount', 'Duration', 'Purpose', 'credit_per_age', 'installment_rate', 'Sex_index', 'Housing_index', 'Saving accounts_index', 'Checking account_index', 'Purpose_index']


In [11]:
from pyspark.sql.functions import when

final_df = assembler.transform(df)

# Yeni label kolonu ekle
final_df = final_df.withColumn("label", when(df["Credit amount"] > 5000, 1).otherwise(0))

final_df.select("features", "label").show(5, truncate=False)


+--------------------------------------------------------------------------------+-----+
|features                                                                        |label|
+--------------------------------------------------------------------------------+-----+
|[67.0,2.0,1169.0,6.0,17.44776119402985,194.83333333333334,0.0,0.0,1.0,1.0,1.0]  |0    |
|[22.0,2.0,5951.0,48.0,270.5,123.97916666666667,1.0,0.0,0.0,2.0,1.0]             |1    |
|[49.0,1.0,2096.0,12.0,42.775510204081634,174.66666666666666,0.0,0.0,0.0,0.0,4.0]|0    |
|[45.0,2.0,7882.0,42.0,175.15555555555557,187.66666666666666,0.0,2.0,0.0,1.0,2.0]|1    |
|[53.0,2.0,4870.0,24.0,91.88679245283019,202.91666666666666,0.0,2.0,0.0,1.0,0.0] |0    |
+--------------------------------------------------------------------------------+-----+
only showing top 5 rows



In [12]:
train_df, test_df = final_df.randomSplit([0.7, 0.3], seed=42)


In [13]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(featuresCol="features", labelCol="label")
lr_model = lr.fit(train_df)

preds = lr_model.transform(test_df)
preds.select("label", "prediction", "probability").show(10, truncate=False)


+-----+----------+---------------------------+
|label|prediction|probability                |
+-----+----------+---------------------------+
|0    |0.0       |[1.0,0.0]                  |
|0    |0.0       |[1.0,0.0]                  |
|0    |0.0       |[1.0,0.0]                  |
|1    |1.0       |[1.818900765513856E-30,1.0]|
|0    |0.0       |[1.0,0.0]                  |
|0    |0.0       |[1.0,0.0]                  |
|0    |0.0       |[1.0,0.0]                  |
|0    |0.0       |[1.0,0.0]                  |
|0    |0.0       |[1.0,0.0]                  |
|0    |0.0       |[1.0,0.0]                  |
+-----+----------+---------------------------+
only showing top 10 rows



In [14]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluator = BinaryClassificationEvaluator(labelCol="label", metricName="areaUnderROC")
auc = evaluator.evaluate(preds)
print("Logistic Regression AUC:", auc)


Logistic Regression AUC: 0.998881528004818


In [15]:
from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(featuresCol="features", labelCol="label", numTrees=50)
rf_model = rf.fit(train_df)

rf_preds = rf_model.transform(test_df)
auc_rf = evaluator.evaluate(rf_preds)
print("Random Forest AUC:", auc_rf)


Random Forest AUC: 0.9993117095414265


In [16]:
rf_model.write().overwrite().save("/content/rf_model")
print("Model kaydedildi!")


Model kaydedildi!
