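# Training the model

Train a logistic-regression classifier for `loan_status` on the cleaned Feature Store features and log the fitted pipeline and its metrics to MLflow.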

In [None]:
# Read the cleaned feature table from the Databricks Feature Store as a Spark DataFrame
from databricks.feature_store import FeatureStoreClient

FEATURE_TABLE_NAME = 'main.default.user_financial_features'

fs = FeatureStoreClient()
df = fs.read_table(name=FEATURE_TABLE_NAME)

In [None]:
# Split data 80/20 into train and test sets.
# A fixed seed makes the split reproducible: without it every re-run draws a new split and
# overwrites the saved test table below with rows that earlier runs may have trained on.
SPLIT_SEED = 42
train_df, test_df = df.randomSplit([0.8, 0.2], seed=SPLIT_SEED)

print(f"Train count: {train_df.count()}")
print(f"Test count: {test_df.count()}")

# Store the held-out test set for later use (replaces any previous version of the table)
test_df.write.format("delta").mode("overwrite").saveAsTable("main.default.lending_club_test")

Train count: 300327
Test count: 75061


In [None]:
# Identifier columns are not model features; remove them from the training frame
ID_COLUMNS = ["user_id"]
train_df = train_df.drop(*ID_COLUMNS)

In [None]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

# Target column: it is the model's label and must never be assembled into "features"
label_col = "loan_status"

# Get columns of type string (categorical features), excluding the label
categorical_cols = [field for (field, dataType) in train_df.dtypes if dataType == "string" and field != label_col]

# Create names for derived categorical columns - one for the index, one for the OHE vector
index_output_cols = [x + "Index" for x in categorical_cols]
ohe_output_cols = [x + "OHE" for x in categorical_cols]

# Assign an index to each category. handleInvalid="keep" sends unseen categories (and nulls)
# to an extra bucket instead of silently dropping those rows, which with "skip" would shrink
# the evaluated test set and drop rows at inference time.
string_indexer = StringIndexer(inputCols=categorical_cols, outputCols=index_output_cols, handleInvalid="keep")

# Convert category index to binary vector (dummy variables)
ohe_encoder = OneHotEncoder(inputCols=index_output_cols, outputCols=ohe_output_cols)

# Numeric features: int/double columns EXCEPT the label. The label is numeric (LogisticRegression
# requires it), so without this filter the target would leak into the feature vector.
# NOTE(review): other numeric Spark types (bigint, float, decimal(...)) are not picked up - confirm intended
numeric_cols = [field for (field, dataType) in train_df.dtypes if dataType in ["int", "double"] and field != label_col]

# Create vector of features (categorical OHE vectors + numeric columns)
assembler_inputs = ohe_output_cols + numeric_cols
vec_assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="features")

In [None]:
import mlflow
import mlflow.spark
from mlflow.models import infer_signature
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics

experiment_name = "/Shared/lending_club/"
mlflow.set_experiment(experiment_name)
# Metrics and the model are logged explicitly below, so autologging is turned off
mlflow.autolog(disable=True)

# Start a new MLflow run; everything logged inside the block is attached to it
with mlflow.start_run(run_name="Logistic Regression") as run:

    # Define model
    lr = LogisticRegression(featuresCol="features", labelCol="loan_status")

    # Chain the preprocessing stages with the classifier so the fitted PipelineModel
    # applies identical feature engineering at prediction time
    pipeline = Pipeline(stages=[string_indexer, ohe_encoder, vec_assembler, lr])

    # Train model
    model = pipeline.fit(train_df)

    # Apply the fitted pipeline model to the held-out test dataset
    pred_df = model.transform(test_df)

    # Evaluate model: Spark's "f1" metric is the label-weighted F1 score
    multiclass_evaluator = MulticlassClassificationEvaluator(labelCol="loan_status", predictionCol="prediction", metricName="f1")
    f1_score = multiclass_evaluator.evaluate(pred_df)

    multiclass_evaluator.setMetricName("accuracy")
    accuracy = multiclass_evaluator.evaluate(pred_df)

    # Log model metrics
    mlflow.log_metric("F1 Score", f1_score)
    mlflow.log_metric("Accuracy", accuracy)

    # Accuracy/F1 only look at hard predictions; for a binary label also log ROC AUC from the
    # raw scores. Guarded on the fitted classifier's class count so a multiclass label is skipped.
    if model.stages[-1].numClasses == 2:
        binary_evaluator = BinaryClassificationEvaluator(labelCol="loan_status", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
        auc = binary_evaluator.evaluate(pred_df)
        mlflow.log_metric("AUC", auc)

    # Create model signature (input: test columns without the label; output: predicted label) and log model
    # NOTE(review): test_df still contains user_id, so the signature lists it as an input column - confirm intended
    signature = infer_signature(test_df.drop("loan_status"), pred_df.select("prediction"))
    mlflow.spark.log_model(model, "model", signature=signature)

    print(f"Current run ID: {run.info.run_id}")

2023/10/14 18:31:52 INFO mlflow.spark: Inferring pip requirements by reloading the logged model from the databricks artifact repository, which can be time-consuming. To speed up, explicitly specify the conda_env or pip_requirements when calling log_model().


Downloading artifacts:   0%|          | 0/25 [00:00<?, ?it/s]

2023/10/14 18:31:54 INFO mlflow.store.artifact.artifact_repo: The progress bar can be disabled by setting the environment variable MLFLOW_ENABLE_ARTIFACTS_PROGRESS_BAR to false


Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

Current run ID: 65a6460acd1c41cfbaec59d03b808e15
