In [13]:
#pyspark setup

!apt-get update
# Install Java 8 (required by Spark)
!apt-get install openjdk-11-jdk-headless -qq > /dev/null

# Install Spark
!pip install pyspark

# setup environment variables
import os
# os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
# os.environ["SPARK_HOME"] = "/usr/local/lib/python3.10/dist-packages/pyspark"

0% [Working]            Hit:1 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease
0% [Connecting to archive.ubuntu.com] [Connecting to security.ubuntu.com (91.189.91.83)] [Connected                                                                                                     Hit:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
0% [Connecting to archive.ubuntu.com] [Waiting for headers] [Connected to r2u.stat.illinois.edu (192                                                                                                    Hit:3 http://security.ubuntu.com/ubuntu jammy-security InRelease
0% [Connecting to archive.ubuntu.com] [Connected to r2u.stat.illinois.edu (192.17.190.167)] [Connect0% [Connecting to archive.ubuntu.com (91.189.91.82)] [Waiting for headers] [Connected to ppa.launchp0% [Waiting for headers] [Waiting for headers] [Connected to ppa.launchpadcontent.net (185.125.190.8                               

In [17]:
# Step 1: Import Required Libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_timestamp, count, when, regexp_extract

# Step 2: Initialize PySpark Session
spark = SparkSession.builder \
    .appName("Retail and Sentiment Analytics") \
    .getOrCreate()

# Step 3: Load Datasets
sentiment_df = spark.read.csv("reviews.csv", header=True, inferSchema=True)
retail_df = spark.read.csv("Copy of Online Retail.csv", header=True, inferSchema=True)

# Step 4: Preview the Data
print("Initial Data Preview - Reviews DataFrame:")
sentiment_df.show(5, truncate=False)

print("Initial Data Preview - Online Retail DataFrame:")
retail_df.show(5, truncate=False)

# Step 5: Confirm Schema
print("Schema for Reviews DataFrame:")
sentiment_df.printSchema()

print("Schema for Online Retail DataFrame:")
retail_df.printSchema()

# Step 6: Handle Missing Values
print("Missing values in Reviews DataFrame:")
sentiment_df.select([count(when(col(c).isNull(), c)).alias(c) for c in sentiment_df.columns]).show()

print("Missing values in Online Retail DataFrame:")
retail_df.select([count(when(col(c).isNull(), c)).alias(c) for c in retail_df.columns]).show()

# Drop rows with missing values
sentiment_df = sentiment_df.dropna(how='any')
retail_df = retail_df.dropna(how='any')

# Step 7: Remove Duplicates
sentiment_df = sentiment_df.dropDuplicates()
retail_df = retail_df.dropDuplicates()

# Step 8: Validate and Convert Dates in Online Retail DataFrame
# Add a column to check if InvoiceDate is in the expected format
retail_df = retail_df.withColumn(
    "ValidDate", regexp_extract(col("InvoiceDate"), r"\d{2}-\d{2}-\d{4} \d{2}:\d{2}", 0)
)

# Filter out rows with invalid InvoiceDate
invalid_dates = retail_df.filter(col("ValidDate") == "").select("InvoiceDate").distinct()
if invalid_dates.count() > 0:
    print("Rows with invalid InvoiceDate format:")
    invalid_dates.show()

retail_df = retail_df.filter(col("ValidDate") != "").drop("ValidDate")

# Convert InvoiceDate to Timestamp
retail_df = retail_df.withColumn("InvoiceDate", to_timestamp(col("InvoiceDate"), "dd-MM-yyyy HH:mm"))

# Step 9: Post-Cleansing Preview
print("Data Preview after Cleansing - Reviews DataFrame:")
sentiment_df.show(5, truncate=False)

print("Data Preview after Cleansing - Online Retail DataFrame:")
retail_df.show(5, truncate=False)

# Step 10: Post-Cleansing Row Count
print("Post-cleansing number of rows in Reviews DataFrame:", sentiment_df.count())
print("Post-cleansing number of rows in Online Retail DataFrame:", retail_df.count())

# Step 11: Summary Statistics for Validation
print("Summary Statistics for Reviews DataFrame:")
sentiment_df.describe().show()

print("Summary Statistics for Online Retail DataFrame:")
retail_df.describe().show()

# Step 12: Additional Checks
# Check distinct values in the Sentiment column of Reviews DataFrame
print("Distinct Sentiments in Reviews DataFrame:")
sentiment_df.select("Sentiment").distinct().show()

# Check distinct countries in the Online Retail DataFrame
print("Distinct Countries in Online Retail DataFrame:")
retail_df.select("Country").distinct().show()

Initial Data Preview - Reviews DataFrame:
+-------------------------------------------------------------------------------------------+---------+
|Review                                                                                     |Sentiment|
+-------------------------------------------------------------------------------------------+---------+
|This product exceeded my expectations! It's high-quality and performs exceptionally well.  |Positive |
|The product was decent. It worked fine, but it wasn't anything special.                    |Neutral  |
|I had a terrible experience with this company. The customer service was rude and unhelpful.|Negative |
|It's an okay product. Nothing to write home about.                                         |Neutral  |
|Disappointed with the product. It didn't meet my expectations.                             |Negative |
+-------------------------------------------------------------------------------------------+---------+
only showing top 5 row

In [22]:
from pyspark.sql.functions import col, lag, sum as F_sum, when
from pyspark.sql.window import Window
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Step 1: Aggregate data by StockCode and InvoiceDate
daily_sales = retail_df.groupBy("StockCode", "InvoiceDate") \
    .agg(F_sum("Quantity").alias("DailySales")) \
    .orderBy("StockCode", "InvoiceDate")

# Step 2: Create lag features
window_spec = Window.partitionBy("StockCode").orderBy("InvoiceDate")
for lag_value in range(1, 4):  # Creating lag1, lag2, lag3
    daily_sales = daily_sales.withColumn(f"lag_{lag_value}", lag("DailySales", lag_value).over(window_spec))

# Step 3: Filter out rows with null lag values
daily_sales = daily_sales.dropna()

# Step 4: Assemble features and label
feature_columns = [f"lag_{i}" for i in range(1, 4)]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
daily_sales = assembler.transform(daily_sales).withColumnRenamed("DailySales", "label")

# Step 5: Scale features
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", withStd=True, withMean=True)
scaler_model = scaler.fit(daily_sales)
daily_sales = scaler_model.transform(daily_sales)

# Step 6: Split data into training and testing sets
train_data, test_data = daily_sales.randomSplit([0.8, 0.2], seed=42)

# Step 7: Build a regression model
lr = LinearRegression(featuresCol="scaledFeatures", labelCol="label", predictionCol="prediction")

# Step 8: Hyperparameter tuning with CrossValidator
param_grid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.01, 0.1, 0.5]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
    .build()

evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")

crossval = CrossValidator(estimator=lr,
                          estimatorParamMaps=param_grid,
                          evaluator=evaluator,
                          numFolds=3)

# Step 9: Train the model
if train_data.count() > 0:
    cv_model = crossval.fit(train_data)
else:
    raise ValueError("Training data is empty. Check your preprocessing steps!")

# Step 10: Evaluate the model
best_model = cv_model.bestModel
test_predictions = best_model.transform(test_data)

# Step 10: Evaluate the model
best_model = cv_model.bestModel

# Print the best model parameters
print("Best Model Details:")
print(f" - Intercept: {best_model.intercept}")
print(f" - Coefficients: {best_model.coefficients}")
print(f" - Regularization Parameter (regParam): {best_model._java_obj.getRegParam()}")
print(f" - ElasticNet Parameter (elasticNetParam): {best_model._java_obj.getElasticNetParam()}")

# Evaluate the model on test data
test_predictions = best_model.transform(test_data)

rmse = evaluator.evaluate(test_predictions)
mae = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="mae").evaluate(test_predictions)

print("Root Mean Squared Error (RMSE):", rmse)
print("Mean Absolute Error (MAE):", mae)

# Display predictions
print("Test Predictions (first 10 rows):")
test_predictions.select("StockCode", "InvoiceDate", "label", "prediction").show(10, truncate=False)




Best Model Details:
 - Intercept: 12.775064112647463
 - Coefficients: [1.0216104462881062,0.633935504452857,0.4032026125489518]
 - Regularization Parameter (regParam): 0.5
 - ElasticNet Parameter (elasticNetParam): 1.0
Root Mean Squared Error (RMSE): 44.1066292260956
Mean Absolute Error (MAE): 13.017012658261402
Test Predictions (first 10 rows):
+---------+-------------------+-----+------------------+
|StockCode|InvoiceDate        |label|prediction        |
+---------+-------------------+-----+------------------+
|10002    |2010-12-08 12:24:00|12   |12.936339325047348|
|10002    |2010-12-09 18:58:00|12   |12.736913715746129|
|10002    |2010-12-10 12:33:00|12   |13.257888679312948|
|10002    |2011-01-05 14:48:00|12   |12.551646606139398|
|10002    |2011-01-16 15:50:00|6    |12.518028516462392|
|10002    |2011-01-20 10:43:00|6    |12.7833205605585  |
|10002    |2011-01-31 09:57:00|120  |12.732488197441771|
|10002    |2011-02-25 09:09:00|24   |12.731580068067478|
|10002    |2011-04-18 12:

In [None]:
from google.colab import drive
drive.mount('/content/drive')