In [45]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, MinMaxScaler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import col, when

In [2]:
spark = SparkSession.builder.appName("YourAppName").getOrCreate()

In [3]:
application_df = spark.read.csv("application_record.csv", header=True, inferSchema=True)
credit_df = spark.read.csv("credit_record.csv", header=True, inferSchema=True)

In [6]:
begin_month = credit_df.groupBy("ID").agg({"MONTHS_BALANCE": "min"}).withColumnRenamed("min(MONTHS_BALANCE)", "begin_month").select("ID", (col("begin_month") * -1).alias("begin_month"))
new_data = application_df.join(begin_month, "ID", "left")

In [8]:
credit_df = credit_df.withColumn("dep_value", when((col("STATUS") == "2") | (col("STATUS") == "3") | (col("STATUS") == "4") | (col("STATUS") == "5"), "Yes").otherwise(None))

In [27]:
new_data2 = new_data.join(credit_df.select("ID", "dep_value"), on='ID', how='left')
new_data2 = new_data2.dropDuplicates()
new_data2 = new_data2.orderBy("ID")

In [28]:
new_data2 = new_data2.withColumn("target", when(col("dep_value") == "Yes", 1).otherwise(0))

In [29]:
features = ['AMT_INCOME_TOTAL', 'CODE_GENDER', 'FLAG_OWN_CAR', 'FLAG_OWN_REALTY', 'CNT_CHILDREN']
dataset = new_data2.select(features + ['target'])

In [41]:
# Определение категориальных и численных признаков
categorical_features = ['CODE_GENDER', 'FLAG_OWN_CAR', 'FLAG_OWN_REALTY']
numeric_features = ['AMT_INCOME_TOTAL', 'CNT_CHILDREN']

# Определение размера тестовой выборки
test_size = 0.3

# Создание пайплайна для преобразования категориальных признаков
categorical_stages = []
for feature in categorical_features:
    string_indexer = StringIndexer(inputCol=feature, outputCol=f"{feature}_index")
    encoder = OneHotEncoder(inputCol=f"{feature}_index", outputCol=f"{feature}_encoded")
    categorical_stages += [string_indexer, encoder]

# Создание пайплайна для масштабирования численных признаков
numeric_stages = []
for feature in numeric_features:
    assembler = VectorAssembler(inputCols=[feature], outputCol=f"{feature}_vector")
    scaler = MinMaxScaler(inputCol=f"{feature}_vector", outputCol=f"{feature}_scaled")
    numeric_stages += [assembler, scaler]

# Векторный ассемблер для объединения признаков в один вектор
assembler = VectorAssembler(inputCols=[f"{feature}_scaled" for feature in numeric_features] +
                                      [f"{feature}_encoded" for feature in categorical_features],
                            outputCol="features")

# Создание пайплайна
pipeline = Pipeline(stages=categorical_stages + numeric_stages + [assembler])

# Применение пайплайна к данным
transformed_data = pipeline.fit(dataset).transform(dataset)

# Разделение на тренировочный и тестовый наборы
train_data, test_data = transformed_data.randomSplit([1.0 - test_size, test_size], seed=42)

# Выделение X_train, y_train, X_test, y_test
X_train = train_data.select("features")
y_train = train_data.select("target")

X_test = test_data.select("features")
y_test = test_data.select("target")

In [42]:
log_reg = LogisticRegression(featuresCol="features", labelCol="target")

In [43]:
pipeline = Pipeline(stages=[log_reg])

model = pipeline.fit(train_data)

predictions = model.transform(test_data)

predictions.select("prediction", "target", "features").show()

+----------+------+--------------------+
|prediction|target|            features|
+----------+------+--------------------+
|       0.0|     0|[1.33850890108419...|
|       0.0|     0|[1.33850890108419...|
|       0.0|     0|[1.33850890108419...|
|       0.0|     0|[2.67701780216838...|
|       0.0|     0|[4.68478115379467...|
|       0.0|     0|[4.68478115379467...|
|       0.0|     0|[4.68478115379467...|
|       0.0|     0|[6.02329005487886...|
|       0.0|     0|[8.03105340650515...|
|       0.0|     0|[8.03105340650515...|
|       0.0|     0|[8.03105340650515...|
|       0.0|     0|(5,[0,2],[8.03105...|
|       0.0|     0|(5,[0,2],[8.03105...|
|       0.0|     0|(5,[0,2],[8.03105...|
|       0.0|     0|(5,[0,2],[8.03105...|
|       0.0|     0|(5,[0,2],[8.03105...|
|       0.0|     0|(5,[0,2],[8.03105...|
|       0.0|     0|[8.03105340650515...|
|       0.0|     0|[8.03105340650515...|
|       0.0|     0|[8.03105340650515...|
+----------+------+--------------------+
only showing top

In [46]:
evaluator = MulticlassClassificationEvaluator(labelCol="target", predictionCol="prediction", metricName="accuracy")

train_accuracy = evaluator.evaluate(model.transform(train_data))
print(f"Точность модели на трейне: {train_accuracy:.2%}")

test_accuracy = evaluator.evaluate(model.transform(test_data))
print(f"Точность модели на тесте: {test_accuracy:.2%}")

Точность модели на трейне: 99.86%
Точность модели на тесте: 99.86%
