In [6]:
!pip install pyspark==3.1.2




In [8]:
# Linear regression on the e-commerce customers dataset: predict
# 'Yearly Amount Spent' from the four numeric usage columns.
import os

# Machine-specific locations of the Python interpreter, JDK and Spark
# distribution used by PySpark. Update these to match the local installation.
os.environ['PYSPARK_PYTHON'] = r'C:\Users\asus\anaconda3\python.exe'
os.environ['JAVA_HOME'] = r'C:\Program Files\Java\jdk1.8.0_321'  # Update this to your Java installation path
os.environ['SPARK_HOME'] = r'C:\Users\asus\Documents\BDT\BigData\BigData\spark-3.1.2-bin-hadoop3.2'  # Update this to your Spark installation path
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

# Fixed seed so the train/test split (and therefore the reported metrics)
# is the same on every run of the notebook.
SEED = 42

# Create (or reuse) a Spark session
spark = SparkSession.builder.appName("Linear Regression Model").getOrCreate()

# Load the dataset; the first row is the header and column types are inferred
data = spark.read.csv(r"C:\Users\asus\Downloads\Ecommerce_Customers.csv", header=True, inferSchema=True)

# Print schema and sample data
data.printSchema()
data.show(3)

# Assemble the numeric predictor columns into a single vector column
assembler = VectorAssembler(
    inputCols=['Avg Session Length', 'Time on App', 'Time on Website', 'Length of Membership'],
    outputCol='features'
)

# Prepare the features and label (target column for regression)
output = assembler.transform(data)
final_data = output.select('features', 'Yearly Amount Spent')

# Split the data into training (70%) and test (30%) sets, reproducibly
train_data, test_data = final_data.randomSplit([0.7, 0.3], seed=SEED)

# Initialize Linear Regression model
lin_reg = LinearRegression(labelCol='Yearly Amount Spent', featuresCol='features')

# Fit the model on the training split only
lin_reg_model = lin_reg.fit(train_data)

# Evaluate the model on the held-out test data.
# NOTE: despite its name, `pred_data` is the evaluation summary returned by
# `evaluate`; the per-row predictions are available as `pred_data.predictions`.
pred_data = lin_reg_model.evaluate(test_data)

# Print evaluation metrics
print("Root Mean Squared Error (RMSE):", pred_data.rootMeanSquaredError)
print("R-Squared (R2):", pred_data.r2)

# To see predictions
pred_data.predictions.show(3)

root
 |-- Email: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- Avatar: string (nullable = true)
 |-- Avg Session Length: double (nullable = true)
 |-- Time on App: double (nullable = true)
 |-- Time on Website: double (nullable = true)
 |-- Length of Membership: double (nullable = true)
 |-- Yearly Amount Spent: double (nullable = true)

+--------------------+--------------------+---------+------------------+------------------+------------------+--------------------+-------------------+
|               Email|             Address|   Avatar|Avg Session Length|       Time on App|   Time on Website|Length of Membership|Yearly Amount Spent|
+--------------------+--------------------+---------+------------------+------------------+------------------+--------------------+-------------------+
|mstephenson@ferna...|835 Frank TunnelW...|   Violet| 34.49726772511229| 12.65565114916675| 39.57766801952616|  4.0826206329529615|  587.9510539684005|
|   hduke@hotmail.com|4547 Ar

In [10]:
# Preview the first five rows of `final_data` (the 'features' vector column and
# the label column selected in the preceding model cell).
final_data.show(5)

+--------------------+-------------------+
|            features|Yearly Amount Spent|
+--------------------+-------------------+
|[34.4972677251122...|  587.9510539684005|
|[31.9262720263601...|  392.2049334443264|
|[33.0009147556426...| 487.54750486747207|
|[34.3055566297555...|  581.8523440352177|
|[33.3306725236463...|  599.4060920457634|
+--------------------+-------------------+
only showing top 5 rows



# Logistic Regression

In [7]:
# Logistic regression on the e-commerce customers dataset: classify customers
# into a binary 'Churn' label derived from 'Yearly Amount Spent'.
import os

# Machine-specific locations of the Python interpreter, JDK and Spark
# distribution used by PySpark. Update these to match the local installation.
os.environ['PYSPARK_PYTHON'] = r'C:\Users\asus\anaconda3\python.exe'
os.environ['JAVA_HOME'] = r'C:\Program Files\Java\jdk1.8.0_321'  # Update this to your Java installation path
os.environ['SPARK_HOME'] = r'C:\Users\asus\Documents\BDT\BigData\BigData\spark-3.1.2-bin-hadoop3.2'  # Update this to your Spark installation path
from pyspark.sql import SparkSession
from pyspark.sql.functions import when  # required by the 'Churn' label below
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler

# Fixed seed so the train/test split (and therefore the reported metrics)
# is the same on every run of the notebook.
SEED = 42

# Create (or reuse) a Spark session
spark = SparkSession.builder.appName("Logistic Regression Model").getOrCreate()

# Load the dataset; the first row is the header and column types are inferred
data = spark.read.csv(r"C:\Users\asus\Downloads\Ecommerce_Customers.csv", header=True, inferSchema=True)

# Print schema and sample data
data.printSchema()
data.show(3)

# Create a binary label column ('Churn') based on 'Yearly Amount Spent'
# Assuming: If 'Yearly Amount Spent' < 500, classify as 'Churn' (1), else 'Not Churned' (0)
data = data.withColumn("Churn", when(data["Yearly Amount Spent"] < 500, 1).otherwise(0))

# Assemble the numeric predictor columns into a single vector column
assembler = VectorAssembler(
    inputCols=['Avg Session Length', 'Time on App', 'Time on Website', 'Length of Membership'],
    outputCol='features'
)

# Prepare the features and label (target column for classification)
output = assembler.transform(data)
final_data = output.select('features', 'Churn')

# Split the data into training (70%) and test (30%) sets, reproducibly
train_data, test_data = final_data.randomSplit([0.7, 0.3], seed=SEED)

# Initialize Logistic Regression model
log_reg = LogisticRegression(labelCol='Churn', featuresCol='features')

# Fit the model on the training split only
log_reg_model = log_reg.fit(train_data)

# Evaluate the model on the held-out test data.
# NOTE: despite its name, `pred_data` is the evaluation summary returned by
# `evaluate`; the per-row predictions are available as `pred_data.predictions`.
pred_data = log_reg_model.evaluate(test_data)

# Print evaluation metrics (precision, recall and F-measure are per-label lists)
print("Accuracy:", pred_data.accuracy)
print("Precision:", pred_data.precisionByLabel)
print("Recall:", pred_data.recallByLabel)
print("F1 Score:", pred_data.fMeasureByLabel())

# To see predictions
pred_data.predictions.show(5)


root
 |-- Email: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- Avatar: string (nullable = true)
 |-- Avg Session Length: double (nullable = true)
 |-- Time on App: double (nullable = true)
 |-- Time on Website: double (nullable = true)
 |-- Length of Membership: double (nullable = true)
 |-- Yearly Amount Spent: double (nullable = true)

+--------------------+--------------------+---------+------------------+------------------+------------------+--------------------+-------------------+
|               Email|             Address|   Avatar|Avg Session Length|       Time on App|   Time on Website|Length of Membership|Yearly Amount Spent|
+--------------------+--------------------+---------+------------------+------------------+------------------+--------------------+-------------------+
|mstephenson@ferna...|835 Frank TunnelW...|   Violet| 34.49726772511229| 12.65565114916675| 39.57766801952616|  4.0826206329529615|  587.9510539684005|
|   hduke@hotmail.com|4547 Ar