In [1]:
import pandas as pd
# Load the dataset
file_path = 'WA_Fn-UseC_-Telco-Customer-Churn.csv'
df = pd.read_csv(file_path)

# Display the first few rows of the dataset
print(df.head())

   customerID  gender  SeniorCitizen Partner Dependents  tenure PhoneService  \
0  7590-VHVEG  Female              0     Yes         No       1           No   
1  5575-GNVDE    Male              0      No         No      34          Yes   
2  3668-QPYBK    Male              0      No         No       2          Yes   
3  7795-CFOCW    Male              0      No         No      45           No   
4  9237-HQITU  Female              0      No         No       2          Yes   

      MultipleLines InternetService OnlineSecurity  ... DeviceProtection  \
0  No phone service             DSL             No  ...               No   
1                No             DSL            Yes  ...              Yes   
2                No             DSL            Yes  ...               No   
3  No phone service             DSL            Yes  ...              Yes   
4                No     Fiber optic             No  ...               No   

  TechSupport StreamingTV StreamingMovies        Contract Pape

In [2]:
import sqlite3

# Create a connection to SQLite database
conn = sqlite3.connect(':memory:')  # Use ':memory:' for a temporary database

# Load the DataFrame into the SQLite database
df.to_sql('telco_churn', conn, index=False, if_exists='replace')

# Verify the table is created
query = "SELECT name FROM sqlite_master WHERE type='table';"
tables = pd.read_sql(query, conn)
print("Tables in database:", tables)


Tables in database:           name
0  telco_churn


In [3]:
# Example: Count the number of customers who churned
query = """
SELECT COUNT(*) AS churned_customers
FROM telco_churn
WHERE Churn = 'Yes';
"""
result = pd.read_sql(query, conn)
print(result)


   churned_customers
0               1869


In [4]:
query = """
SELECT *, tenure * MonthlyCharges AS TotalRevenue
FROM telco_churn;
"""
result = pd.read_sql(query, conn)
print(result)

      customerID  gender  SeniorCitizen Partner Dependents  tenure  \
0     7590-VHVEG  Female              0     Yes         No       1   
1     5575-GNVDE    Male              0      No         No      34   
2     3668-QPYBK    Male              0      No         No       2   
3     7795-CFOCW    Male              0      No         No      45   
4     9237-HQITU  Female              0      No         No       2   
...          ...     ...            ...     ...        ...     ...   
7038  6840-RESVB    Male              0     Yes        Yes      24   
7039  2234-XADUH  Female              0     Yes        Yes      72   
7040  4801-JZAZL  Female              0     Yes        Yes      11   
7041  8361-LTMKD    Male              1     Yes         No       4   
7042  3186-AJIEK    Male              0      No         No      66   

     PhoneService     MultipleLines InternetService OnlineSecurity  ...  \
0              No  No phone service             DSL             No  ...   
1        

#Customer Segmentation

In [5]:
query = """
SELECT customerID, tenure, MonthlyCharges
FROM telco_churn
WHERE tenure < 12 AND Churn = 'Yes';

"""
result = pd.read_sql(query, conn)
print(result)

     customerID  tenure  MonthlyCharges
0    3668-QPYBK       2           53.85
1    9237-HQITU       2           70.70
2    9305-CDSKC       8           99.65
3    4190-MFLUW      10           55.20
4    8779-QRDMV       1           39.65
..          ...     ...             ...
994  5482-NUPNA       4           60.40
995  1122-JWTJW       1           70.65
996  8775-CEBBJ       9           44.20
997  6894-LFHLY       1           75.75
998  8361-LTMKD       4           74.40

[999 rows x 3 columns]


#EDA

In [7]:
query = """
SELECT Contract, AVG(MonthlyCharges) AS AvgMonthlyCharges
FROM telco_churn
GROUP BY Contract;

"""
result = pd.read_sql(query, conn)
print(result)

         Contract  AvgMonthlyCharges
0  Month-to-month          66.398490
1        One year          65.048608
2        Two year          60.770413


#Count total records

In [8]:
query = "SELECT COUNT(*) AS total_customers FROM telco_churn;"
total_customers = pd.read_sql(query, conn)
print(total_customers)


   total_customers
0             7043


Count Churned Customers:

In [9]:
query = "SELECT COUNT(*) AS churned_customers FROM telco_churn WHERE Churn = 'Yes';"
churned_customers = pd.read_sql(query, conn)
print(churned_customers)


   churned_customers
0               1869


Average Monthly Charges by Contract Type:

In [10]:
query = """
SELECT Contract, AVG(MonthlyCharges) AS AvgMonthlyCharges
FROM telco_churn
GROUP BY Contract;
"""
avg_charges = pd.read_sql(query, conn)
print(avg_charges)


         Contract  AvgMonthlyCharges
0  Month-to-month          66.398490
1        One year          65.048608
2        Two year          60.770413


#Feature Engineering


Adding Total Revenue (Tenure * MonthlyCharges):

In [11]:
query = """
SELECT *, tenure * MonthlyCharges AS TotalRevenue
FROM telco_churn;
"""
df_with_features = pd.read_sql(query, conn)
print(df_with_features.head())


   customerID  gender  SeniorCitizen Partner Dependents  tenure PhoneService  \
0  7590-VHVEG  Female              0     Yes         No       1           No   
1  5575-GNVDE    Male              0      No         No      34          Yes   
2  3668-QPYBK    Male              0      No         No       2          Yes   
3  7795-CFOCW    Male              0      No         No      45           No   
4  9237-HQITU  Female              0      No         No       2          Yes   

      MultipleLines InternetService OnlineSecurity  ... TechSupport  \
0  No phone service             DSL             No  ...          No   
1                No             DSL            Yes  ...          No   
2                No             DSL            Yes  ...          No   
3  No phone service             DSL            Yes  ...         Yes   
4                No     Fiber optic             No  ...          No   

  StreamingTV StreamingMovies        Contract PaperlessBilling  \
0          No             

Create Churn Probability Segments

In [12]:
query = """
SELECT customerID, tenure, MonthlyCharges, Churn,
       CASE
           WHEN tenure < 12 AND MonthlyCharges > 70 THEN 'High Risk'
           ELSE 'Low Risk'
       END AS RiskCategory
FROM telco_churn;
"""
risk_segments = pd.read_sql(query, conn)
print(risk_segments.head())


   customerID  tenure  MonthlyCharges Churn RiskCategory
0  7590-VHVEG       1           29.85    No     Low Risk
1  5575-GNVDE      34           56.95    No     Low Risk
2  3668-QPYBK       2           53.85   Yes     Low Risk
3  7795-CFOCW      45           42.30    No     Low Risk
4  9237-HQITU       2           70.70   Yes    High Risk


#Automate SQL Reports

In [13]:
query = """
SELECT customerID, tenure, MonthlyCharges, TotalCharges, Churn
FROM telco_churn
WHERE Churn = 'Yes' OR (tenure < 12 AND MonthlyCharges > 70);
"""
high_risk_report = pd.read_sql(query, conn)
print(high_risk_report.head())


   customerID  tenure  MonthlyCharges TotalCharges Churn
0  3668-QPYBK       2           53.85       108.15   Yes
1  9237-HQITU       2           70.70       151.65   Yes
2  9305-CDSKC       8           99.65        820.5   Yes
3  7892-POOKP      28          104.80      3046.05   Yes
4  0280-XJGEX      49          103.70       5036.3   Yes


#Saving

In [15]:
# Save the updated DataFrame with features to a new CSV
output_path = 'churn_with_features.csv'
df_with_features.to_csv(output_path, index=False)
print(f"Preprocessed dataset saved to {output_path}")


Preprocessed dataset saved to churn_with_features.csv


#Entry of Pyspark OR PySpark in the house

In [17]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Initialize Spark session
spark = SparkSession.builder \
    .appName("Customer Churn Prediction") \
    .getOrCreate()

In [18]:
file_path = '/content/churn_with_features.csv'
data = spark.read.csv(file_path, header=True, inferSchema=True)

# Display schema
data.printSchema()

# Show a sample of the dataset
data.show(5)


root
 |-- customerID: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- SeniorCitizen: integer (nullable = true)
 |-- Partner: string (nullable = true)
 |-- Dependents: string (nullable = true)
 |-- tenure: integer (nullable = true)
 |-- PhoneService: string (nullable = true)
 |-- MultipleLines: string (nullable = true)
 |-- InternetService: string (nullable = true)
 |-- OnlineSecurity: string (nullable = true)
 |-- OnlineBackup: string (nullable = true)
 |-- DeviceProtection: string (nullable = true)
 |-- TechSupport: string (nullable = true)
 |-- StreamingTV: string (nullable = true)
 |-- StreamingMovies: string (nullable = true)
 |-- Contract: string (nullable = true)
 |-- PaperlessBilling: string (nullable = true)
 |-- PaymentMethod: string (nullable = true)
 |-- MonthlyCharges: double (nullable = true)
 |-- TotalCharges: string (nullable = true)
 |-- Churn: string (nullable = true)
 |-- TotalRevenue: double (nullable = true)

+----------+------+-------------+----

#Clean and Transform Data

In [19]:
# Fill null values
data = data.na.fill({'TotalRevenue': 0, 'tenure': 0, 'MonthlyCharges': 0})

# Cast numerical columns to appropriate types
data = data.withColumn("TotalRevenue", col("TotalRevenue").cast("float")) \
           .withColumn("MonthlyCharges", col("MonthlyCharges").cast("float")) \
           .withColumn("tenure", col("tenure").cast("int"))

#Feature Selection

In [20]:
from pyspark.ml.feature import StringIndexer

# Index categorical columns
indexer = StringIndexer(inputCols=["gender", "Partner", "Dependents", "PhoneService", "Churn"],
                        outputCols=["gender_index", "Partner_index", "Dependents_index", "PhoneService_index", "Churn_index"])
indexed_data = indexer.fit(data).transform(data)

# Drop original columns if needed
indexed_data = indexed_data.drop("gender", "Partner", "Dependents", "PhoneService", "Churn")
indexed_data.show(5)

+----------+-------------+------+----------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------------+----------------+--------------------+--------------+------------+------------+------------+-------------+----------------+------------------+-----------+
|customerID|SeniorCitizen|tenure|   MultipleLines|InternetService|OnlineSecurity|OnlineBackup|DeviceProtection|TechSupport|StreamingTV|StreamingMovies|      Contract|PaperlessBilling|       PaymentMethod|MonthlyCharges|TotalCharges|TotalRevenue|gender_index|Partner_index|Dependents_index|PhoneService_index|Churn_index|
+----------+-------------+------+----------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------------+----------------+--------------------+--------------+------------+------------+------------+-------------+----------------+------------------+-----------+
|7590-VHVEG|            0|     1|No p

Assemble Features into a Vector

In [21]:
from pyspark.ml.feature import VectorAssembler

# Define feature columns
feature_cols = ["tenure", "MonthlyCharges", "TotalRevenue", "gender_index", "Partner_index", "Dependents_index", "PhoneService_index"]

# Assemble features
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
final_data = assembler.transform(indexed_data)

# Show final dataset with features
final_data.select("features", "Churn_index").show(5)


+--------------------+-----------+
|            features|Churn_index|
+--------------------+-----------+
|[1.0,29.850000381...|        0.0|
|(7,[0,1,2],[34.0,...|        0.0|
|(7,[0,1,2],[2.0,5...|        1.0|
|[45.0,42.29999923...|        0.0|
|[2.0,70.699996948...|        1.0|
+--------------------+-----------+
only showing top 5 rows



Train a Predictive Model

In [22]:
train_data, test_data = final_data.randomSplit([0.8, 0.2], seed=42)

Logistic Regression

In [23]:
from pyspark.ml.classification import LogisticRegression

# Initialize the model
lr = LogisticRegression(featuresCol="features", labelCol="Churn_index")

# Train the model
lr_model = lr.fit(train_data)

# Evaluate the model
predictions = lr_model.transform(test_data)
predictions.select("features", "Churn_index", "prediction").show(5)


+--------------------+-----------+----------+
|            features|Churn_index|prediction|
+--------------------+-----------+----------+
|(7,[0,1,2],[4.0,7...|        1.0|       1.0|
|[71.0,109.6999969...|        0.0|       0.0|
|[7.0,48.200000762...|        0.0|       0.0|
|[72.0,101.3000030...|        0.0|       0.0|
|[1.0,25.100000381...|        1.0|       0.0|
+--------------------+-----------+----------+
only showing top 5 rows



Evaluation

In [24]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluate using AUC (Area Under ROC)
evaluator = BinaryClassificationEvaluator(labelCol="Churn_index")
auc = evaluator.evaluate(predictions)
print(f"Model AUC: {auc}")


Model AUC: 0.8124125349860056


Save Model and Predictions

In [None]:
# Save model
lr_model.save("churn_model")

In [29]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Define UDF to convert features vector to a string
vector_to_string_udf = udf(lambda vector: str(vector), StringType())

# Add a new column with features as strings
predictions_with_string_features = predictions.withColumn("features_string", vector_to_string_udf(predictions["features"]))

# Select columns to save
columns_to_save = ["features_string", "Churn_index", "prediction"]

# Save the predictions as a CSV
predictions_with_string_features.select(*columns_to_save).write.csv("churn_predictions", header=True)


In [30]:
!ls churn_predictions


part-00000-81f0cbeb-7172-42a1-bb62-917b5e19d542-c000.csv  _SUCCESS


In [31]:
from google.colab import files
!zip -r churn_predictions.zip churn_predictions/
files.download("churn_predictions.zip")

  adding: churn_predictions/ (stored 0%)
  adding: churn_predictions/part-00000-81f0cbeb-7172-42a1-bb62-917b5e19d542-c000.csv (deflated 82%)
  adding: churn_predictions/.part-00000-81f0cbeb-7172-42a1-bb62-917b5e19d542-c000.csv.crc (stored 0%)
  adding: churn_predictions/._SUCCESS.crc (stored 0%)
  adding: churn_predictions/_SUCCESS (stored 0%)


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>