In [267]:
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import concat_ws, col
from pyspark.ml.feature import Imputer
from pyspark.ml.feature import VectorAssembler , StringIndexer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline

In [225]:
# Point JAVA_HOME at a Java 17 installation so the Spark session created later
# in this notebook launches on that JVM.
# `/usr/libexec/java_home` is a macOS system tool; if it prints nothing (tool
# missing or no matching JDK) the path is empty and the error branch runs.
# The pipe is opened in a `with` block so it is closed deterministically.
with os.popen("/usr/libexec/java_home -v 17") as java_home_cmd:
    java17_path = java_home_cmd.read().strip()

if java17_path:
    os.environ["JAVA_HOME"] = java17_path
    print(f"Successfully set JAVA_HOME to: {java17_path}")
else:
    # Best-effort: report the problem but leave any existing JAVA_HOME untouched.
    print("Error: Java 17 not found! Please verify installation.")

Successfully set JAVA_HOME to: /Library/Java/JavaVirtualMachines/jdk-17.jdk/Contents/Home


In [226]:
# %pip install pyspark
# Uncomment the line above to install PySpark into the environment used by this notebook's kernel.

In [227]:
# Create the Spark session for this notebook, or reuse one that already exists.
# The bare name on the last line renders the session summary as the cell output.
spark = (
    SparkSession.builder
    .appName("LearnSpark")
    .getOrCreate()
)
spark

In [228]:
# Sanity check: confirm the session is set up and working by showing a tiny
# DataFrame with ids 0 through 4.
spark.range(0, 5).show()

+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
+---+



In [229]:
# Load the retail CSV: treat the first record as the header row and let Spark
# scan the data to infer each column's type.
df_pyspark = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('retail_store.csv')
)

In [230]:
# Preview the loaded data (show() prints at most the first 20 rows by default).
df_pyspark.show()

+--------------+----------+-----------+------+---+------+-----------------+-----------+----------+--------+--------+----------+
|Transaction_ID|      Date|Customer_ID|Gender|Age|Region|Membership_Status|   Category|Unit_Price|Quantity|Discount|Churn_Risk|
+--------------+----------+-----------+------+---+------+-----------------+-----------+----------+--------+--------+----------+
|          1001|2023-01-01|      C-001|     F| 28| North|             Gold|Electronics|     500.0|       1|    0.05|         0|
|          1002|2023-01-02|      C-002|     M| 45| South|           Silver|   Clothing|      35.5|       2|     0.0|         0|
|          1003|2023-01-03|      C-003|     F| 32|  East|           Bronze|       Home|     120.0|       1|     0.1|         1|
|          1004|2023-01-03|      C-004|     M| 22| North|         Standard|Electronics|    1200.0|       1|    0.15|         0|
|          1005|2023-01-04|      C-005|     F| 55|  West|             Gold|   Clothing|      45.0|      

In [231]:
# Print each column's name, inferred type and nullability as a tree.
df_pyspark.printSchema()

root
 |-- Transaction_ID: integer (nullable = true)
 |-- Date: date (nullable = true)
 |-- Customer_ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Region: string (nullable = true)
 |-- Membership_Status: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Unit_Price: double (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- Discount: double (nullable = true)
 |-- Churn_Risk: integer (nullable = true)



In [232]:
# Confirm the reader returned a Spark DataFrame (not a pandas one).
type(df_pyspark)

pyspark.sql.classic.dataframe.DataFrame

In [233]:
# head(n) returns the first n rows as a list of Row objects.
# Unlike pandas (where head() defaults to 5 rows), calling head() with no
# argument here returns only the first row.
df_pyspark.head(3)

[Row(Transaction_ID=1001, Date=datetime.date(2023, 1, 1), Customer_ID='C-001', Gender='F', Age=28, Region='North', Membership_Status='Gold', Category='Electronics', Unit_Price=500.0, Quantity=1, Discount=0.05, Churn_Risk=0),
 Row(Transaction_ID=1002, Date=datetime.date(2023, 1, 2), Customer_ID='C-002', Gender='M', Age=45, Region='South', Membership_Status='Silver', Category='Clothing', Unit_Price=35.5, Quantity=2, Discount=0.0, Churn_Risk=0),
 Row(Transaction_ID=1003, Date=datetime.date(2023, 1, 3), Customer_ID='C-003', Gender='F', Age=32, Region='East', Membership_Status='Bronze', Category='Home', Unit_Price=120.0, Quantity=1, Discount=0.1, Churn_Risk=1)]

In [234]:
# List the column names in schema order.
df_pyspark.columns

['Transaction_ID',
 'Date',
 'Customer_ID',
 'Gender',
 'Age',
 'Region',
 'Membership_Status',
 'Category',
 'Unit_Price',
 'Quantity',
 'Discount',
 'Churn_Risk']

In [235]:
# Project only the customer identifier and demographic columns.
df_pyspark.select('Customer_ID', 'Gender', 'Age').show()

+-----------+------+---+
|Customer_ID|Gender|Age|
+-----------+------+---+
|      C-001|     F| 28|
|      C-002|     M| 45|
|      C-003|     F| 32|
|      C-004|     M| 22|
|      C-005|     F| 55|
|      C-001|     F| 28|
|      C-006|     M| 60|
|      C-007|     M| 35|
|      C-008|     F| 41|
|      C-002|     M| 45|
|      C-009|     F| 25|
|      C-010|     M| 38|
|      C-011|     F| 48|
|      C-005|     F| 55|
|      C-012|     M| 29|
|      C-013|     F| 33|
|      C-014|     M| 52|
|      C-015|     F| 21|
|      C-001|     F| 28|
|      C-016|     M| 40|
+-----------+------+---+



In [236]:
# Summary statistics (count, mean, stddev, min, max) for the numeric and string
# columns; the Date column does not appear in the result.
df_pyspark.describe().show()

+-------+-----------------+-----------+------+-----------------+------+-----------------+--------+---------------+------------------+-------------------+------------------+
|summary|   Transaction_ID|Customer_ID|Gender|              Age|Region|Membership_Status|Category|     Unit_Price|          Quantity|           Discount|        Churn_Risk|
+-------+-----------------+-----------+------+-----------------+------+-----------------+--------+---------------+------------------+-------------------+------------------+
|  count|               20|         20|    20|               20|    20|               20|      20|             20|                20|                 20|                20|
|   mean|           1010.5|       NULL|  NULL|             38.0|  NULL|             NULL|    NULL|        233.275|              1.75|0.06750000000000002|              0.35|
| stddev|5.916079783099616|       NULL|  NULL|11.77419122264321|  NULL|             NULL|    NULL|301.82041867902|1.1180339887498951|0.

In [237]:
# Add a new column derived from existing columns:
#   Final_Total = Unit_Price * Quantity * (1 - Discount)
df_pyspark = df_pyspark.withColumn(
    'Final_Total',
    col('Unit_Price') * col('Quantity') * (1 - col('Discount')),
)
df_pyspark.show()

+--------------+----------+-----------+------+---+------+-----------------+-----------+----------+--------+--------+----------+-----------+
|Transaction_ID|      Date|Customer_ID|Gender|Age|Region|Membership_Status|   Category|Unit_Price|Quantity|Discount|Churn_Risk|Final_Total|
+--------------+----------+-----------+------+---+------+-----------------+-----------+----------+--------+--------+----------+-----------+
|          1001|2023-01-01|      C-001|     F| 28| North|             Gold|Electronics|     500.0|       1|    0.05|         0|      475.0|
|          1002|2023-01-02|      C-002|     M| 45| South|           Silver|   Clothing|      35.5|       2|     0.0|         0|       71.0|
|          1003|2023-01-03|      C-003|     F| 32|  East|           Bronze|       Home|     120.0|       1|     0.1|         1|      108.0|
|          1004|2023-01-03|      C-004|     M| 22| North|         Standard|Electronics|    1200.0|       1|    0.15|         0|     1020.0|
|          1005|2023

In [238]:
# Remove the derived Final_Total column again and display the result.
df_pyspark = df_pyspark.drop('Final_Total')
df_pyspark.show()

+--------------+----------+-----------+------+---+------+-----------------+-----------+----------+--------+--------+----------+
|Transaction_ID|      Date|Customer_ID|Gender|Age|Region|Membership_Status|   Category|Unit_Price|Quantity|Discount|Churn_Risk|
+--------------+----------+-----------+------+---+------+-----------------+-----------+----------+--------+--------+----------+
|          1001|2023-01-01|      C-001|     F| 28| North|             Gold|Electronics|     500.0|       1|    0.05|         0|
|          1002|2023-01-02|      C-002|     M| 45| South|           Silver|   Clothing|      35.5|       2|     0.0|         0|
|          1003|2023-01-03|      C-003|     F| 32|  East|           Bronze|       Home|     120.0|       1|     0.1|         1|
|          1004|2023-01-03|      C-004|     M| 22| North|         Standard|Electronics|    1200.0|       1|    0.15|         0|
|          1005|2023-01-04|      C-005|     F| 55|  West|             Gold|   Clothing|      45.0|      

In [239]:
# Rename Customer_ID to CustomerId; later cells (e.g. the prediction display)
# refer to the column by its new name.
df_pyspark = df_pyspark.withColumnRenamed(
    existing='Customer_ID',
    new='CustomerId',
)
df_pyspark.show()

+--------------+----------+----------+------+---+------+-----------------+-----------+----------+--------+--------+----------+
|Transaction_ID|      Date|CustomerId|Gender|Age|Region|Membership_Status|   Category|Unit_Price|Quantity|Discount|Churn_Risk|
+--------------+----------+----------+------+---+------+-----------------+-----------+----------+--------+--------+----------+
|          1001|2023-01-01|     C-001|     F| 28| North|             Gold|Electronics|     500.0|       1|    0.05|         0|
|          1002|2023-01-02|     C-002|     M| 45| South|           Silver|   Clothing|      35.5|       2|     0.0|         0|
|          1003|2023-01-03|     C-003|     F| 32|  East|           Bronze|       Home|     120.0|       1|     0.1|         1|
|          1004|2023-01-03|     C-004|     M| 22| North|         Standard|Electronics|    1200.0|       1|    0.15|         0|
|          1005|2023-01-04|     C-005|     F| 55|  West|             Gold|   Clothing|      45.0|       3|     

In [240]:
# Replace nulls with the placeholder text 'Missing Value'. Because the fill value
# is a string, only string-typed columns are affected; numeric and date columns
# keep their nulls.
# NOTE(review): the result is assigned back to df_pyspark, so the na.drop()
# examples that follow no longer see nulls in string columns.
# To remove rows that contain nulls instead, use df_pyspark.na.drop(); by default
# it drops a row when any of its columns is null.
df_pyspark=df_pyspark.na.fill('Missing Value')
df_pyspark.show()

+--------------+----------+----------+------+---+------+-----------------+-----------+----------+--------+--------+----------+
|Transaction_ID|      Date|CustomerId|Gender|Age|Region|Membership_Status|   Category|Unit_Price|Quantity|Discount|Churn_Risk|
+--------------+----------+----------+------+---+------+-----------------+-----------+----------+--------+--------+----------+
|          1001|2023-01-01|     C-001|     F| 28| North|             Gold|Electronics|     500.0|       1|    0.05|         0|
|          1002|2023-01-02|     C-002|     M| 45| South|           Silver|   Clothing|      35.5|       2|     0.0|         0|
|          1003|2023-01-03|     C-003|     F| 32|  East|           Bronze|       Home|     120.0|       1|     0.1|         1|
|          1004|2023-01-03|     C-004|     M| 22| North|         Standard|Electronics|    1200.0|       1|    0.15|         0|
|          1005|2023-01-04|     C-005|     F| 55|  West|             Gold|   Clothing|      45.0|       3|     

In [241]:
# how="any": drop a row when at least one of its columns is null
# (how="all" would drop it only when every column is null).
df_pyspark.dropna(how="any").show()

+--------------+----------+----------+------+---+------+-----------------+-----------+----------+--------+--------+----------+
|Transaction_ID|      Date|CustomerId|Gender|Age|Region|Membership_Status|   Category|Unit_Price|Quantity|Discount|Churn_Risk|
+--------------+----------+----------+------+---+------+-----------------+-----------+----------+--------+--------+----------+
|          1001|2023-01-01|     C-001|     F| 28| North|             Gold|Electronics|     500.0|       1|    0.05|         0|
|          1002|2023-01-02|     C-002|     M| 45| South|           Silver|   Clothing|      35.5|       2|     0.0|         0|
|          1003|2023-01-03|     C-003|     F| 32|  East|           Bronze|       Home|     120.0|       1|     0.1|         1|
|          1004|2023-01-03|     C-004|     M| 22| North|         Standard|Electronics|    1200.0|       1|    0.15|         0|
|          1005|2023-01-04|     C-005|     F| 55|  West|             Gold|   Clothing|      45.0|       3|     

In [242]:
# thresh=3: keep only rows that have at least 3 non-null values.
# When thresh is given it takes precedence over `how`, so `how` is left at its
# default here.
df_pyspark.dropna(thresh=3).show()

+--------------+----------+----------+------+---+------+-----------------+-----------+----------+--------+--------+----------+
|Transaction_ID|      Date|CustomerId|Gender|Age|Region|Membership_Status|   Category|Unit_Price|Quantity|Discount|Churn_Risk|
+--------------+----------+----------+------+---+------+-----------------+-----------+----------+--------+--------+----------+
|          1001|2023-01-01|     C-001|     F| 28| North|             Gold|Electronics|     500.0|       1|    0.05|         0|
|          1002|2023-01-02|     C-002|     M| 45| South|           Silver|   Clothing|      35.5|       2|     0.0|         0|
|          1003|2023-01-03|     C-003|     F| 32|  East|           Bronze|       Home|     120.0|       1|     0.1|         1|
|          1004|2023-01-03|     C-004|     M| 22| North|         Standard|Electronics|    1200.0|       1|    0.15|         0|
|          1005|2023-01-04|     C-005|     F| 55|  West|             Gold|   Clothing|      45.0|       3|     

In [243]:
# subset: only the listed columns are checked for nulls, so a row is dropped
# here only when its Age is null.
df_pyspark.dropna(how="any", subset=['Age']).show()

+--------------+----------+----------+------+---+------+-----------------+-----------+----------+--------+--------+----------+
|Transaction_ID|      Date|CustomerId|Gender|Age|Region|Membership_Status|   Category|Unit_Price|Quantity|Discount|Churn_Risk|
+--------------+----------+----------+------+---+------+-----------------+-----------+----------+--------+--------+----------+
|          1001|2023-01-01|     C-001|     F| 28| North|             Gold|Electronics|     500.0|       1|    0.05|         0|
|          1002|2023-01-02|     C-002|     M| 45| South|           Silver|   Clothing|      35.5|       2|     0.0|         0|
|          1003|2023-01-03|     C-003|     F| 32|  East|           Bronze|       Home|     120.0|       1|     0.1|         1|
|          1004|2023-01-03|     C-004|     M| 22| North|         Standard|Electronics|    1200.0|       1|    0.15|         0|
|          1005|2023-01-04|     C-005|     F| 55|  West|             Gold|   Clothing|      45.0|       3|     

In [244]:
### Filling the Missing Value
# A string fill value is only applied to string-typed columns. 'Age' is an
# integer column (see the printSchema output earlier), so it cannot receive this
# placeholder and is not listed; numeric gaps are handled by the Imputer cells
# that follow.
df_pyspark.na.fill('Missing Values', subset=['Region']).show()

+--------------+----------+----------+------+---+------+-----------------+-----------+----------+--------+--------+----------+
|Transaction_ID|      Date|CustomerId|Gender|Age|Region|Membership_Status|   Category|Unit_Price|Quantity|Discount|Churn_Risk|
+--------------+----------+----------+------+---+------+-----------------+-----------+----------+--------+--------+----------+
|          1001|2023-01-01|     C-001|     F| 28| North|             Gold|Electronics|     500.0|       1|    0.05|         0|
|          1002|2023-01-02|     C-002|     M| 45| South|           Silver|   Clothing|      35.5|       2|     0.0|         0|
|          1003|2023-01-03|     C-003|     F| 32|  East|           Bronze|       Home|     120.0|       1|     0.1|         1|
|          1004|2023-01-03|     C-004|     M| 22| North|         Standard|Electronics|    1200.0|       1|    0.15|         0|
|          1005|2023-01-04|     C-005|     F| 55|  West|             Gold|   Clothing|      45.0|       3|     

In [245]:
# Median-impute the numeric columns, writing each result to a parallel
# "<column>_imputed" column so the original values stay available.
# Imputer is already imported in the notebook's first cell, so it is not
# re-imported here.
IMPUTE_COLS = ['Age', 'Unit_Price', 'Quantity']

imputer = Imputer(
    strategy="median",
    inputCols=IMPUTE_COLS,
    outputCols=[f"{c}_imputed" for c in IMPUTE_COLS],
)

In [246]:
# Fit the imputer on the data, then append the *_imputed columns and display
# the result. df_pyspark itself is not reassigned.
(
    imputer
    .fit(df_pyspark)
    .transform(df_pyspark)
    .show()
)

+--------------+----------+----------+------+---+------+-----------------+-----------+----------+--------+--------+----------+-----------+------------------+----------------+
|Transaction_ID|      Date|CustomerId|Gender|Age|Region|Membership_Status|   Category|Unit_Price|Quantity|Discount|Churn_Risk|Age_imputed|Unit_Price_imputed|Quantity_imputed|
+--------------+----------+----------+------+---+------+-----------------+-----------+----------+--------+--------+----------+-----------+------------------+----------------+
|          1001|2023-01-01|     C-001|     F| 28| North|             Gold|Electronics|     500.0|       1|    0.05|         0|         28|             500.0|               1|
|          1002|2023-01-02|     C-002|     M| 45| South|           Silver|   Clothing|      35.5|       2|     0.0|         0|         45|              35.5|               2|
|          1003|2023-01-03|     C-003|     F| 32|  East|           Bronze|       Home|     120.0|       1|     0.1|         1

### Filter Operations

In [247]:
# Filter using a SQL expression string: keep only rows whose Membership_Status is Gold.
df_pyspark.filter('Membership_Status = "Gold"').show()

+--------------+----------+----------+------+---+------+-----------------+-----------+----------+--------+--------+----------+
|Transaction_ID|      Date|CustomerId|Gender|Age|Region|Membership_Status|   Category|Unit_Price|Quantity|Discount|Churn_Risk|
+--------------+----------+----------+------+---+------+-----------------+-----------+----------+--------+--------+----------+
|          1001|2023-01-01|     C-001|     F| 28| North|             Gold|Electronics|     500.0|       1|    0.05|         0|
|          1005|2023-01-04|     C-005|     F| 55|  West|             Gold|   Clothing|      45.0|       3|     0.0|         0|
|          1006|2023-01-05|     C-001|     F| 28| North|             Gold|       Home|      80.0|       2|    0.05|         0|
|          1012|2023-01-10|     C-010|     M| 38|  East|             Gold|       Home|     200.0|       1|    0.15|         0|
|          1014|2023-01-12|     C-005|     F| 55|  West|             Gold|   Clothing|      30.0|       1|     

In [254]:
# Filter using column expressions combined with & (each comparison needs its
# own parentheses): Gold members whose unit price is above 200.
df_pyspark.filter(
    (col('Membership_Status') == "Gold")
    & (col('Unit_Price') > 200)
).show()

+--------------+----------+----------+------+---+------+-----------------+-----------+----------+--------+--------+----------+
|Transaction_ID|      Date|CustomerId|Gender|Age|Region|Membership_Status|   Category|Unit_Price|Quantity|Discount|Churn_Risk|
+--------------+----------+----------+------+---+------+-----------------+-----------+----------+--------+--------+----------+
|          1001|2023-01-01|     C-001|     F| 28| North|             Gold|Electronics|     500.0|       1|    0.05|         0|
|          1019|2023-01-17|     C-001|     F| 28| North|             Gold|Electronics|     250.0|       1|    0.05|         0|
+--------------+----------+----------+------+---+------+-----------------+-----------+----------+--------+--------+----------+



In [None]:
# Group by Category and sum the numeric columns.
# NOTE(review): sum() with no arguments aggregates every numeric column, so the
# output also includes totals such as sum(Transaction_ID) and sum(Age) that are
# not meaningful; pass the columns of interest, e.g. .sum('Quantity'), to narrow it.
df_pyspark.groupBy('Category').sum().show()

+-----------+-------------------+--------+---------------+-------------+-------------------+---------------+
|   Category|sum(Transaction_ID)|sum(Age)|sum(Unit_Price)|sum(Quantity)|      sum(Discount)|sum(Churn_Risk)|
+-----------+-------------------+--------+---------------+-------------+-------------------+---------------+
|       Home|               6063|     189|          680.0|           11|0.44999999999999996|              3|
|Electronics|               7070|     264|         3650.0|            8| 0.6000000000000001|              2|
|   Clothing|               7077|     307|          335.5|           16|0.30000000000000004|              2|
+-----------+-------------------+--------+---------------+-------------+-------------------+---------------+



In [257]:
# Grand total of Quantity over the whole DataFrame: an empty groupBy() treats
# all rows as a single group.
df_pyspark.groupBy().agg({'Quantity': 'sum'}).show()

+-------------+
|sum(Quantity)|
+-------------+
|           35|
+-------------+



In [260]:
# Re-display the working DataFrame before building the ML pipeline.
df_pyspark.show()

+--------------+----------+----------+------+---+------+-----------------+-----------+----------+--------+--------+----------+
|Transaction_ID|      Date|CustomerId|Gender|Age|Region|Membership_Status|   Category|Unit_Price|Quantity|Discount|Churn_Risk|
+--------------+----------+----------+------+---+------+-----------------+-----------+----------+--------+--------+----------+
|          1001|2023-01-01|     C-001|     F| 28| North|             Gold|Electronics|     500.0|       1|    0.05|         0|
|          1002|2023-01-02|     C-002|     M| 45| South|           Silver|   Clothing|      35.5|       2|     0.0|         0|
|          1003|2023-01-03|     C-003|     F| 32|  East|           Bronze|       Home|     120.0|       1|     0.1|         1|
|          1004|2023-01-03|     C-004|     M| 22| North|         Standard|Electronics|    1200.0|       1|    0.15|         0|
|          1005|2023-01-04|     C-005|     F| 55|  West|             Gold|   Clothing|      45.0|       3|     

In [268]:
# 1. Handle Categorical Data: convert 'Gender' and 'Membership_Status' to
# numeric indices (the model consumes numbers, not strings).
# handleInvalid="keep" assigns an extra index to values the indexer did not see
# while fitting. Without it, transform() raises an error whenever the random
# test split contains a category that is absent from the training split, which
# is plausible on a dataset this small.
gender_indexer = StringIndexer(
    inputCol="Gender",
    outputCol="Gender_Index",
    handleInvalid="keep",
)
membership_indexer = StringIndexer(
    inputCol="Membership_Status",
    outputCol="Membership_Index",
    handleInvalid="keep",
)

In [269]:
# 2. Assemble Features: pack the numeric inputs and the two indexed categorical
# columns into a single 'features' vector column.
assembler = VectorAssembler(
    outputCol="features",
    inputCols=[
        "Age",
        "Unit_Price",
        "Quantity",
        "Gender_Index",
        "Membership_Index",
    ],
)

In [270]:
# 3. Define the Model: logistic regression that reads the assembled 'features'
# vector and predicts the 'Churn_Risk' label.
lr = LogisticRegression(
    labelCol="Churn_Risk",
    featuresCol="features",
)

In [271]:
# 4. Create a Pipeline: run the stages in order -- index both categorical
# columns, assemble the feature vector, then fit the classifier.
pipeline = Pipeline(
    stages=[
        gender_indexer,
        membership_indexer,
        assembler,
        lr,
    ]
)

In [272]:
# 5. Split Data (roughly 80% training, 20% testing)
# A fixed seed keeps the split, and therefore the fitted model and predictions,
# the same on every Restart & Run All. The weights are only approximate
# proportions, so on a small dataset the test split can hold very few rows.
train_data, test_data = df_pyspark.randomSplit([0.8, 0.2], seed=42)

In [273]:
# 6. Train and Predict
# fit() runs every pipeline stage on the training split and returns the fitted model;
# transform() then applies the fitted indexers, assembler and classifier to the test split.
model = pipeline.fit(train_data)
predictions = model.transform(test_data)

25/12/24 22:23:48 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS


In [275]:
# Show results: the actual label next to the predicted class and its class
# probabilities for each test row.
predictions.select(
    ["CustomerId", "Churn_Risk", "prediction", "probability"]
).show()

+----------+----------+----------+--------------------+
|CustomerId|Churn_Risk|prediction|         probability|
+----------+----------+----------+--------------------+
|     C-003|         1|       1.0|[0.37725034520961...|
+----------+----------+----------+--------------------+

