In [0]:
silver_df = spark.table("workspace.default.silver_telco_churn")

In [0]:
silver_df.count()
silver_df.printSchema()

root
 |-- customerID: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- SeniorCitizen: string (nullable = true)
 |-- Partner: string (nullable = true)
 |-- Dependents: string (nullable = true)
 |-- tenure: integer (nullable = true)
 |-- PhoneService: string (nullable = true)
 |-- MultipleLines: string (nullable = true)
 |-- InternetService: string (nullable = true)
 |-- OnlineSecurity: string (nullable = true)
 |-- OnlineBackup: string (nullable = true)
 |-- DeviceProtection: string (nullable = true)
 |-- TechSupport: string (nullable = true)
 |-- StreamingTV: string (nullable = true)
 |-- StreamingMovies: string (nullable = true)
 |-- Contract: string (nullable = true)
 |-- PaperlessBilling: string (nullable = true)
 |-- PaymentMethod: string (nullable = true)
 |-- MonthlyCharges: double (nullable = true)
 |-- TotalCharges: double (nullable = true)
 |-- Churn: string (nullable = true)



In [0]:
from pyspark.sql.functions import when, col

features_df = (
    silver_df
    .withColumn(
        "SeniorCitizen",
        when(col("SeniorCitizen") == "Yes", 1)
        .when(col("SeniorCitizen") == "No", 0)
        .otherwise(None)
    )
    .select(
        "gender",
        "SeniorCitizen",
        "Partner",
        "Dependents",
        "tenure",
        "PhoneService",
        "MultipleLines",
        "InternetService",
        "OnlineSecurity",
        "OnlineBackup",
        "DeviceProtection",
        "TechSupport",
        "StreamingTV",
        "StreamingMovies",
        "Contract",
        "PaperlessBilling",
        "PaymentMethod",
        "MonthlyCharges",
        "TotalCharges",
        "Churn"
    )
)


In [0]:
features_df.write.mode("overwrite").saveAsTable(
    "workspace.default.gold_telco_churn_features"
)

In [0]:
df = spark.table("workspace.default.gold_telco_churn_features")

In [0]:
df.printSchema()
df.select("Churn").groupBy("Churn").count().show()

root
 |-- gender: string (nullable = true)
 |-- SeniorCitizen: integer (nullable = true)
 |-- Partner: string (nullable = true)
 |-- Dependents: string (nullable = true)
 |-- tenure: integer (nullable = true)
 |-- PhoneService: string (nullable = true)
 |-- MultipleLines: string (nullable = true)
 |-- InternetService: string (nullable = true)
 |-- OnlineSecurity: string (nullable = true)
 |-- OnlineBackup: string (nullable = true)
 |-- DeviceProtection: string (nullable = true)
 |-- TechSupport: string (nullable = true)
 |-- StreamingTV: string (nullable = true)
 |-- StreamingMovies: string (nullable = true)
 |-- Contract: string (nullable = true)
 |-- PaperlessBilling: string (nullable = true)
 |-- PaymentMethod: string (nullable = true)
 |-- MonthlyCharges: double (nullable = true)
 |-- TotalCharges: double (nullable = true)
 |-- Churn: string (nullable = true)

+-----+-----+
|Churn|count|
+-----+-----+
|   No| 5174|
|  Yes| 1869|
+-----+-----+



In [0]:
from pyspark.sql.functions import when

df = df.withColumn(
    "label",
    when(df.Churn == "Yes", 1).otherwise(0)
)

In [0]:
categorical_cols = [
    "gender", "Partner", "Dependents", "PhoneService",
    "MultipleLines", "InternetService", "OnlineSecurity",
    "OnlineBackup", "DeviceProtection", "TechSupport",
    "StreamingTV", "StreamingMovies", "Contract",
    "PaperlessBilling", "PaymentMethod"
]

numerical_cols = [
    "SeniorCitizen", "tenure", "MonthlyCharges", "TotalCharges"
]

In [0]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder

indexers = [
    StringIndexer(inputCol=col, outputCol=f"{col}_idx", handleInvalid="keep")
    for col in categorical_cols
]

encoders = [
    OneHotEncoder(inputCol=f"{col}_idx", outputCol=f"{col}_ohe")
    for col in categorical_cols
]


In [0]:
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
    inputCols=[f"{col}_ohe" for col in categorical_cols] + numerical_cols,
    outputCol="features"
)

In [0]:
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)

In [0]:
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(featuresCol="features", labelCol="label")


In [0]:
from pyspark.ml import Pipeline

pipeline = Pipeline(
    stages=indexers + encoders + [assembler, lr]
)

In [0]:
from pyspark.sql.functions import col, sum

df.select(
    sum(col("SeniorCitizen").isNull().cast("int")).alias("SeniorCitizen_nulls"),
    sum(col("tenure").isNull().cast("int")).alias("tenure_nulls"),
    sum(col("MonthlyCharges").isNull().cast("int")).alias("MonthlyCharges_nulls"),
    sum(col("TotalCharges").isNull().cast("int")).alias("TotalCharges_nulls")
).show()

+-------------------+------------+--------------------+------------------+
|SeniorCitizen_nulls|tenure_nulls|MonthlyCharges_nulls|TotalCharges_nulls|
+-------------------+------------+--------------------+------------------+
|                  0|           0|                   0|                11|
+-------------------+------------+--------------------+------------------+



In [0]:
df = df.fillna({
    "TotalCharges": 0.0,
    "MonthlyCharges": 0.0,
    "tenure": 0,
    "SeniorCitizen": 0
})

In [0]:
df.select(
    sum(col("TotalCharges").isNull().cast("int")).alias("TotalCharges_nulls")
).show()

+------------------+
|TotalCharges_nulls|
+------------------+
|                 0|
+------------------+



In [0]:
model = pipeline.fit(train_df)

[0;31m---------------------------------------------------------------------------[0m
[0;31mSparkException[0m                            Traceback (most recent call last)
File [0;32m<command-6999136843369320>, line 1[0m
[0;32m----> 1[0m model [38;5;241m=[39m pipeline[38;5;241m.[39mfit(train_df)

File [0;32m/databricks/python_shell/lib/dbruntime/MLWorkloadsInstrumentation/_pyspark.py:30[0m, in [0;36m_create_patch_function.<locals>.patched_method[0;34m(self, *args, **kwargs)[0m
[1;32m     28[0m call_succeeded [38;5;241m=[39m [38;5;28;01mFalse[39;00m
[1;32m     29[0m [38;5;28;01mtry[39;00m:
[0;32m---> 30[0m     result [38;5;241m=[39m original_method([38;5;28mself[39m, [38;5;241m*[39margs, [38;5;241m*[39m[38;5;241m*[39mkwargs)
[1;32m     31[0m     call_succeeded [38;5;241m=[39m [38;5;28;01mTrue[39;00m
[1;32m     32[0m     [38;5;28;01mreturn[39;00m result

File [0;32m/databricks/python/lib/python3.12/site-packages/pyspark/ml/base.py:203[0m, 