In [None]:
# Install PySpark
!pip install pyspark findspark -q

# Let findspark locate the Spark installation before any pyspark import.
import findspark
findspark.init()

# Spark SQL entry point, column helpers and the UDF return type used by later cells.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, udf, when
from pyspark.sql.types import IntegerType

# Single session shared by every cell; all "Hive-style" queries below run through it.
spark = (
    SparkSession.builder
    .appName("TelecomHiveEmulation")
    .config("spark.driver.memory", "4g")
    .getOrCreate()
)

print("PySpark Session Initialized.")

PySpark Session Initialized.


In [None]:
# Source file: the Telco customer churn dataset, expected in the working directory.
file_path = "WA_Fn-UseC_-Telco-Customer-Churn.csv"

# Load the CSV with a header row and let Spark infer the column types.
raw_df = spark.read.csv(file_path, header=True, inferSchema=True)

# Data cleaning (missing values and type correction).
# Blank TotalCharges entries (a single space) are replaced with 0, then the
# column is cast to a numeric type. The cast targets double rather than
# integer: TotalCharges is a monetary amount and an integer cast silently
# drops the cents (e.g. 29.85 became 29).
total_charges = when(col('TotalCharges') == ' ', lit(0)).otherwise(col('TotalCharges'))
df = raw_df.withColumn('TotalCharges', total_charges.cast('double'))

# Drop any rows that still contain nulls after the fix above.
df = df.dropna()

# Print the schema to confirm the resulting column types.
df.printSchema()

root
 |-- customerID: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- SeniorCitizen: integer (nullable = true)
 |-- Partner: string (nullable = true)
 |-- Dependents: string (nullable = true)
 |-- tenure: integer (nullable = true)
 |-- PhoneService: string (nullable = true)
 |-- MultipleLines: string (nullable = true)
 |-- InternetService: string (nullable = true)
 |-- OnlineSecurity: string (nullable = true)
 |-- OnlineBackup: string (nullable = true)
 |-- DeviceProtection: string (nullable = true)
 |-- TechSupport: string (nullable = true)
 |-- StreamingTV: string (nullable = true)
 |-- StreamingMovies: string (nullable = true)
 |-- Contract: string (nullable = true)
 |-- PaperlessBilling: string (nullable = true)
 |-- PaymentMethod: string (nullable = true)
 |-- MonthlyCharges: double (nullable = true)
 |-- TotalCharges: integer (nullable = true)
 |-- Churn: string (nullable = true)



In [None]:
# Expose the cleaned DataFrame to Spark SQL as the temporary view 'telecom_data'
# (the notebook's stand-in for a Hive table).
df.createOrReplaceTempView("telecom_data")
print("Spark SQL Table (Hive Emulation) 'telecom_data' created successfully.")

# Sanity checks through Spark SQL: total row count first, then a five-row sample.
for preview_sql in (
    "SELECT COUNT(*) AS total_records FROM telecom_data",
    "SELECT customerID, tenure, MonthlyCharges, TotalCharges, Churn FROM telecom_data LIMIT 5",
):
    spark.sql(preview_sql).show()

Spark SQL Table (Hive Emulation) 'telecom_data' created successfully.
+-------------+
|total_records|
+-------------+
|         7043|
+-------------+

+----------+------+--------------+------------+-----+
|customerID|tenure|MonthlyCharges|TotalCharges|Churn|
+----------+------+--------------+------------+-----+
|7590-VHVEG|     1|         29.85|          29|   No|
|5575-GNVDE|    34|         56.95|        1889|   No|
|3668-QPYBK|     2|         53.85|         108|  Yes|
|7795-CFOCW|    45|          42.3|        1840|   No|
|9237-HQITU|     2|          70.7|         151|  Yes|
+----------+------+--------------+------------+-----+



In [None]:
# Top 10 early churners: customers who left within their first 12 months,
# ranked by highest monthly charge and then by shortest tenure.
early_churn_sql = """SELECT
    customerID,
    tenure,
    Contract,
    MonthlyCharges,
    TotalCharges
FROM
    telecom_data
WHERE
    Churn = 'Yes' AND tenure <= 12 -- Customers who churned early (within the first year)
ORDER BY
    MonthlyCharges DESC, -- Prioritize high monthly cost
    tenure ASC -- Prioritize shortest tenure
LIMIT 10"""

spark.sql(early_churn_sql).show()

+----------+------+--------------+--------------+------------+
|customerID|tenure|      Contract|MonthlyCharges|TotalCharges|
+----------+------+--------------+--------------+------------+
|1583-IHQZE|    12|Month-to-month|        112.95|        1384|
|9851-KIELU|    10|Month-to-month|         110.1|        1043|
|3992-YWPKO|     6|Month-to-month|         109.9|         669|
|2027-FECZV|    12|Month-to-month|         106.7|        1253|
|1400-MMYXY|     3|Month-to-month|         105.9|         334|
|3932-CMDTD|     4|      One year|        105.65|         443|
|3389-YGYAI|     8|Month-to-month|         105.5|         829|
|5052-PNLOS|     3|Month-to-month|        105.35|         323|
|4587-VVTOX|     6|Month-to-month|         105.3|         545|
|6496-SLWHQ|     3|Month-to-month|         105.0|         294|
+----------+------+--------------+--------------+------------+



In [None]:
# Subscriber count and average monthly charge for 'Fiber optic' internet customers.
fiber_optic_sql = """SELECT
    InternetService,
    COUNT(customerID) AS total_customers,
    ROUND(AVG(MonthlyCharges), 2) AS avg_monthly_charges
FROM
    telecom_data
WHERE
    InternetService = 'Fiber optic' -- High-bandwidth service
GROUP BY
    InternetService"""

spark.sql(fiber_optic_sql).show()

+---------------+---------------+-------------------+
|InternetService|total_customers|avg_monthly_charges|
+---------------+---------------+-------------------+
|    Fiber optic|           3096|               91.5|
+---------------+---------------+-------------------+



In [None]:
# Churn rate per contract type: churned customers divided by all customers in
# the group, as a percentage rounded to two decimals, highest rate first.
churn_by_contract_sql = """SELECT
    Contract,
    COUNT(CASE WHEN Churn = 'Yes' THEN 1 END) AS churn_count_proxy,
    COUNT(customerID) AS total_customers,
    ROUND((COUNT(CASE WHEN Churn = 'Yes' THEN 1 END) / COUNT(customerID)) * 100, 2) AS churn_rate_percent
FROM
    telecom_data
GROUP BY
    Contract
ORDER BY
    churn_rate_percent DESC"""

spark.sql(churn_by_contract_sql).show()

+--------------+-----------------+---------------+------------------+
|      Contract|churn_count_proxy|total_customers|churn_rate_percent|
+--------------+-----------------+---------------+------------------+
|Month-to-month|             1655|           3875|             42.71|
|      One year|              166|           1473|             11.27|
|      Two year|               48|           1695|              2.83|
+--------------+-----------------+---------------+------------------+



In [None]:
# Top 10 early churners (left within the first 12 months), ranked by highest
# monthly charge and then by shortest tenure.
# NOTE(review): this cell repeats, verbatim, the early-churn query already run
# earlier in the notebook; consider dropping one of the two copies.
early_churn_sql = """SELECT
    customerID,
    tenure,
    Contract,
    MonthlyCharges,
    TotalCharges
FROM
    telecom_data
WHERE
    Churn = 'Yes' AND tenure <= 12 -- Customers who churned early (within the first year)
ORDER BY
    MonthlyCharges DESC, -- Prioritize high monthly cost
    tenure ASC -- Prioritize shortest tenure
LIMIT 10"""

spark.sql(early_churn_sql).show()

+----------+------+--------------+--------------+------------+
|customerID|tenure|      Contract|MonthlyCharges|TotalCharges|
+----------+------+--------------+--------------+------------+
|1583-IHQZE|    12|Month-to-month|        112.95|        1384|
|9851-KIELU|    10|Month-to-month|         110.1|        1043|
|3992-YWPKO|     6|Month-to-month|         109.9|         669|
|2027-FECZV|    12|Month-to-month|         106.7|        1253|
|1400-MMYXY|     3|Month-to-month|         105.9|         334|
|3932-CMDTD|     4|      One year|        105.65|         443|
|3389-YGYAI|     8|Month-to-month|         105.5|         829|
|5052-PNLOS|     3|Month-to-month|        105.35|         323|
|4587-VVTOX|     6|Month-to-month|         105.3|         545|
|6496-SLWHQ|     3|Month-to-month|         105.0|         294|
+----------+------+--------------+--------------+------------+



In [None]:
# Subscriber count and average monthly charge for 'Fiber optic' internet customers.
# NOTE(review): this cell repeats, verbatim, the fiber-optic query already run
# earlier in the notebook; consider dropping one of the two copies.
fiber_optic_sql = """SELECT
    InternetService,
    COUNT(customerID) AS total_customers,
    ROUND(AVG(MonthlyCharges), 2) AS avg_monthly_charges
FROM
    telecom_data
WHERE
    InternetService = 'Fiber optic' -- High-bandwidth service
GROUP BY
    InternetService"""

spark.sql(fiber_optic_sql).show()

+---------------+---------------+-------------------+
|InternetService|total_customers|avg_monthly_charges|
+---------------+---------------+-------------------+
|    Fiber optic|           3096|               91.5|
+---------------+---------------+-------------------+



In [None]:
# Churn rate per contract type, as a percentage rounded to two decimals,
# highest rate first.
# NOTE(review): this cell repeats, verbatim, the churn-by-contract query already
# run earlier in the notebook; consider dropping one of the two copies.
churn_by_contract_sql = """SELECT
    Contract,
    COUNT(CASE WHEN Churn = 'Yes' THEN 1 END) AS churn_count_proxy,
    COUNT(customerID) AS total_customers,
    ROUND((COUNT(CASE WHEN Churn = 'Yes' THEN 1 END) / COUNT(customerID)) * 100, 2) AS churn_rate_percent
FROM
    telecom_data
GROUP BY
    Contract
ORDER BY
    churn_rate_percent DESC"""

spark.sql(churn_by_contract_sql).show()

+--------------+-----------------+---------------+------------------+
|      Contract|churn_count_proxy|total_customers|churn_rate_percent|
+--------------+-----------------+---------------+------------------+
|Month-to-month|             1655|           3875|             42.71|
|      One year|              166|           1473|             11.27|
|      Two year|               48|           1695|              2.83|
+--------------+-----------------+---------------+------------------+



In [None]:
# Revenue profile per contract type: subscriber count plus the average of
# MonthlyCharges and of TotalCharges (both rounded to two decimals), ordered by
# the average monthly figure, highest first.
revenue_by_contract_sql = """SELECT
    Contract,
    COUNT(customerID) AS number_of_subscribers,
    ROUND(AVG(MonthlyCharges), 2) AS avg_monthly_revenue,
    ROUND(AVG(TotalCharges), 2) AS avg_total_revenue
FROM
    telecom_data
GROUP BY
    Contract
ORDER BY
    avg_monthly_revenue DESC"""

spark.sql(revenue_by_contract_sql).show()

+--------------+---------------------+-------------------+-----------------+
|      Contract|number_of_subscribers|avg_monthly_revenue|avg_total_revenue|
+--------------+---------------------+-------------------+-----------------+
|Month-to-month|                 3875|               66.4|          1368.79|
|      One year|                 1473|              65.05|          3032.14|
|      Two year|                 1695|              60.77|          3706.47|
+--------------+---------------------+-------------------+-----------------+



In [None]:
# 1. Define the Python function
def churn_risk_score(tenure, monthly_charges):
    """Return a simple integer churn-risk score for one customer.

    Higher monthly charges raise the score and longer tenure lowers it:
    ``int((monthly_charges / 10) / tenure)``.

    Args:
        tenure: Number of months the customer has stayed, or None.
        monthly_charges: The customer's monthly charge, or None.

    Returns:
        100 when ``tenure`` is 0 (maximum risk for brand new customers);
        None when a value needed for the computation is missing;
        otherwise the truncated integer score.
    """
    # A SQL NULL reaches a Python UDF as None. Return None (NULL) for it
    # instead of raising TypeError and failing the whole query.
    if tenure is None:
        return None
    if tenure == 0:
        return 100  # Max risk for brand new customers
    if monthly_charges is None:
        return None
    # Risk Score = (MonthlyCharges / 10) / tenure, truncated to an integer
    return int((monthly_charges / 10) / tenure)

# 2. Make the Python scorer callable from Spark SQL as `churn_score_udf`.
# The declared return type is IntegerType().
spark.udf.register("churn_score_udf", churn_risk_score, IntegerType())

# 3. Score the customers who have not churned and list the ten highest scores.
print("\n--- Custom UDF (churn_score_udf) Execution ---")

risk_ranking_sql = """
SELECT
    customerID,
    tenure,
    MonthlyCharges,
    Churn,
    churn_score_udf(tenure, MonthlyCharges) AS RiskScore
FROM
    telecom_data
WHERE
    Churn = 'No' -- Focus on non-churned customers to identify future risk
ORDER BY
    RiskScore DESC
LIMIT 10
"""

spark.sql(risk_ranking_sql).show()

# Stop the Spark session when finished
# spark.stop()


--- Custom UDF (churn_score_udf) Execution ---
+----------+------+--------------+-----+---------+
|customerID|tenure|MonthlyCharges|Churn|RiskScore|
+----------+------+--------------+-----+---------+
|4472-LVYGI|     0|         52.55|   No|      100|
|3115-CZMZD|     0|         20.25|   No|      100|
|5709-LVOEQ|     0|         80.85|   No|      100|
|4367-NUYAO|     0|         25.75|   No|      100|
|1371-DWPAZ|     0|         56.05|   No|      100|
|7644-OMVMY|     0|         19.85|   No|      100|
|3213-VVOLG|     0|         25.35|   No|      100|
|2520-SGTTA|     0|          20.0|   No|      100|
|2923-ARZLG|     0|          19.7|   No|      100|
|4075-WKNIU|     0|         73.35|   No|      100|
+----------+------+--------------+-----+---------+

