In [10]:
import pyspark
import pandas as pd
import numpy as np

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, isnan, when, count, isnull, sum, median, mean, first, max, min, mode
from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType

from pyspark.ml.feature import Bucketizer, QuantileDiscretizer
from pyspark.sql.window import Window

In [2]:
spark = SparkSession.builder.appName("test").getOrCreate()

# Load data

In [112]:
df=spark.read.csv('cust_data.csv',inferSchema=True,header=True)

In [5]:
df.show()

+---+------+---+-------------------+---------+------------------------------+---------+--------+------------------+-------------------+-------+-----+
| id|Gender|Age|Has_Mobile_Contract|Area_Code|Currently_Holds_Second_Product|   Tenure|App_User|Num_website_visits|Acquisition_Channel|Revenue|Label|
+---+------+---+-------------------+---------+------------------------------+---------+--------+------------------+-------------------+-------+-----+
|  1|  Male| 44|                  1|       28|                             0|> 2 Years|     Yes|              NULL|                 26|  40.45|    1|
|  2|  Male| 76|                  1|        3|                             0| 1-2 Year|      No|              NULL|                 26|  33.54|    0|
|  3|  Male| 47|                  1|       28|                             0|> 2 Years|     Yes|              NULL|                 26|  38.29|    1|
|  4|  Male| 21|                  1|       11|                             1| < 1 Year|      No|    

In [6]:
print(f"Number of rows: {df.count()} and columns: {len(df.columns)}")

Number of rows: 303469 and columns: 12


In [7]:
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Has_Mobile_Contract: integer (nullable = true)
 |-- Area_Code: integer (nullable = true)
 |-- Currently_Holds_Second_Product: integer (nullable = true)
 |-- Tenure: string (nullable = true)
 |-- App_User: string (nullable = true)
 |-- Num_website_visits: string (nullable = true)
 |-- Acquisition_Channel: integer (nullable = true)
 |-- Revenue: double (nullable = true)
 |-- Label: integer (nullable = true)



In [113]:
# Handle out of range values on the Age column
df = df.withColumn('Age', when((df['Age'] == 190) | (df['Age'] == -1), np.nan).otherwise(df.Age))

# Handle missing values

In [114]:
df = df.drop('id', 'Num_website_visits')

In [11]:
def fill_categorical_nulls_with_mode(df: pyspark.sql.DataFrame, columns: list):
    mode_values = {}
    for column in columns:
        mode_value = df.select(mode(column)).first()[0]
        mode_values[column] = mode_value
    return df.fillna(mode_values)

In [12]:
def fill_numerical_nulls_with_median(df: pyspark.sql.DataFrame, columns: list):
    median_values = {}
    for column in columns:
        median_value = df.select(median(column)).first()[0]
        median_values[column] = median_value
    return df.fillna(median_values)

In [115]:
categorical_columns = ['Has_Mobile_Contract', 'Tenure']
df = fill_categorical_nulls_with_mode(df, categorical_columns)

In [116]:
numerical_columns = ['Area_Code', 'Age']
df = fill_numerical_nulls_with_median(df, numerical_columns)

In [15]:
null_counts = [sum(when(isnull(col), 1).otherwise(0)).alias(col) for col in df.columns]
null_df = df.select(null_counts)

result = null_df.toPandas().transpose().reset_index()
result['null_percentage'] = (result[0] / df.count() * 100).round(3)

schema = StructType([
    StructField('column', StringType(), False),
    StructField('null_counts', LongType(), False),
    StructField('null_percentage', DoubleType(), False)
])

result_df = df.sparkSession.createDataFrame(
    result.rename(
        columns={
            "index": "column_name",
            0: "null_count"
        }
    ),
    schema=schema
)

result_df.orderBy(col('null_counts').desc()).show()

+--------------------+-----------+---------------+
|              column|null_counts|null_percentage|
+--------------------+-----------+---------------+
|              Tenure|          0|            0.0|
|              Gender|          0|            0.0|
|            App_User|          0|            0.0|
|                 Age|          0|            0.0|
| Acquisition_Channel|          0|            0.0|
| Has_Mobile_Contract|          0|            0.0|
|             Revenue|          0|            0.0|
|           Area_Code|          0|            0.0|
|               Label|          0|            0.0|
|Currently_Holds_S...|          0|            0.0|
+--------------------+-----------+---------------+



In [16]:
df.describe().show()

+-------+------+------------------+-------------------+------------------+------------------------------+---------+--------+-------------------+------------------+-------------------+
|summary|Gender|               Age|Has_Mobile_Contract|         Area_Code|Currently_Holds_Second_Product|   Tenure|App_User|Acquisition_Channel|           Revenue|              Label|
+-------+------+------------------+-------------------+------------------+------------------------------+---------+--------+-------------------+------------------+-------------------+
|  count|303469|            303469|             303469|            303469|                        303469|   303469|  303469|             303469|            303469|             303469|
|   mean|  NULL| 38.81678524000804| 0.9978515103684397|26.381597461355195|            0.4579314526360188|     NULL|    NULL| 112.11268696308355|30.536945223385494|0.12290546975144084|
| stddev|  NULL|15.496530282611687|0.04630205922464289|  13.2321347315552|      

# Feature Engineering

## Make the age categorical

In [117]:
bucketizer = Bucketizer(splits=[0, 25, 35, 45, 55, 100],
                        inputCol='Age',
                        outputCol='bins')
df = bucketizer.setHandleInvalid('keep').transform(df)

In [118]:
df = df.withColumn('age_bins',
    when(col('bins') == 0.0, '18-25')
    .when(col('bins') == 1.0, '26-35')
    .when(col('bins') == 2.0, '36-45')
    .when(col('bins') == 3.0, '46-55')
    .when(col('bins') == 4.0, '55+')
    .otherwise(col('bins'))
)

In [119]:
df.show()

+------+----+-------------------+---------+------------------------------+---------+--------+-------------------+-------+-----+----+--------+
|Gender| Age|Has_Mobile_Contract|Area_Code|Currently_Holds_Second_Product|   Tenure|App_User|Acquisition_Channel|Revenue|Label|bins|age_bins|
+------+----+-------------------+---------+------------------------------+---------+--------+-------------------+-------+-----+----+--------+
|  Male|44.0|                  1|       28|                             0|> 2 Years|     Yes|                 26|  40.45|    1| 2.0|   36-45|
|  Male|76.0|                  1|        3|                             0| 1-2 Year|      No|                 26|  33.54|    0| 4.0|     55+|
|  Male|47.0|                  1|       28|                             0|> 2 Years|     Yes|                 26|  38.29|    1| 3.0|   46-55|
|  Male|21.0|                  1|       11|                             1| < 1 Year|      No|                152|  28.62|    0| 0.0|   18-25|
|Femal

In [120]:
df.groupBy('age_bins').count().withColumnRenamed('count', 'cnt_per_group').withColumn('perc_of_count_total', (col('cnt_per_group') / df.count()) * 100).show()

+--------+-------------+-------------------+
|age_bins|cnt_per_group|perc_of_count_total|
+--------+-------------+-------------------+
|   18-25|        93868|  24.63022389919944|
|   26-35|        88006| 23.092081268088656|
|   46-55|        65236| 17.117412603743286|
|   36-45|        67290| 17.656366026517347|
|     55+|        66709|  17.50391620245127|
+--------+-------------+-------------------+



## Make the revenue categorical
https://stackoverflow.com/questions/54803107/what-are-alternative-methods-for-pandas-quantile-and-cut-in-pyspark-1-6

In [121]:
discretizer = QuantileDiscretizer(
    numBuckets=4,
    inputCol='Revenue',
    outputCol='Revenue_Group'
)
df_bins = discretizer.fit(df).transform(df)

In [122]:
df = df_bins.withColumn('Revenue_Group',
    when(col('Revenue_Group') == 0.0, 'Low')
    .when(col('Revenue_Group') == 1.0, 'Medium')
    .when(col('Revenue_Group') == 2.0, 'High')
    .when(col('Revenue_Group') == 3.0, 'Very High')
    .otherwise(col('Revenue_Group'))
)

In [123]:
df.groupBy('Revenue_Group').count().orderBy('count', ascending=False).show()

+-------------+-----+
|Revenue_Group|count|
+-------------+-----+
|    Very High|95420|
|         High|95345|
|       Medium|95235|
|          Low|95109|
+-------------+-----+



## Convert categorical to numerical

In [124]:
df.show()

+------+----+-------------------+---------+------------------------------+---------+--------+-------------------+-------+-----+----+--------+-------------+
|Gender| Age|Has_Mobile_Contract|Area_Code|Currently_Holds_Second_Product|   Tenure|App_User|Acquisition_Channel|Revenue|Label|bins|age_bins|Revenue_Group|
+------+----+-------------------+---------+------------------------------+---------+--------+-------------------+-------+-----+----+--------+-------------+
|  Male|44.0|                  1|       28|                             0|> 2 Years|     Yes|                 26|  40.45|    1| 2.0|   36-45|    Very High|
|  Male|76.0|                  1|        3|                             0| 1-2 Year|      No|                 26|  33.54|    0| 4.0|     55+|         High|
|  Male|47.0|                  1|       28|                             0|> 2 Years|     Yes|                 26|  38.29|    1| 3.0|   46-55|         High|
|  Male|21.0|                  1|       11|                     

In [125]:
df = df.withColumn(
    'Gender_Numeric',
    when(col('Gender') == 'Male', 0).when(col('Gender') == 'Female', 1).otherwise(None)
)

In [126]:
df = df.withColumn(
    'Tenure_Numeric',
    when(col('Tenure') == '< 1 Year', 0)
    .when(col('Tenure') == '1-2 Year', 1)
    .when(col('Tenure') == '> 2 Years', 2)
    .otherwise(None)
)

In [127]:
df = df.withColumn(
    'App_User_Numeric',
    when(col('App_User') == 'Yes', 0)
    .when(col('App_User') == 'No', 1)
    .otherwise(None)
)

In [128]:
# Create the window specification
area_code_window = Window.partitionBy('Area_Code')

# Add the new feature to your DataFrame
df = df.withColumn('Area_Code_Avg_Revenue', mean('Revenue').over(area_code_window))

In [129]:
# Create the window specification
acquisition_channel_window = Window.partitionBy('Acquisition_Channel')

# Add the new feature to your DataFrame
df = df.withColumn('Channel_Avg_Revenue', mean('Revenue').over(acquisition_channel_window))

In [130]:
df.show()

+------+----+-------------------+---------+------------------------------+--------+--------+-------------------+-------+-----+----+--------+-------------+--------------+--------------+----------------+---------------------+-------------------+
|Gender| Age|Has_Mobile_Contract|Area_Code|Currently_Holds_Second_Product|  Tenure|App_User|Acquisition_Channel|Revenue|Label|bins|age_bins|Revenue_Group|Gender_Numeric|Tenure_Numeric|App_User_Numeric|Area_Code_Avg_Revenue|Channel_Avg_Revenue|
+------+----+-------------------+---------+------------------------------+--------+--------+-------------------+-------+-----+----+--------+-------------+--------------+--------------+----------------+---------------------+-------------------+
|Female|48.0|                  1|        1|                             1|1-2 Year|      No|                 12|   2.63|    0| 3.0|   46-55|          Low|             1|             1|               1|   2.7112896825397494|  33.57289910600262|
|  Male|58.0|           