In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, when, mean, lower, stddev, udf
from pyspark.sql.types import StringType

# Initialize Spark session with adjusted configuration
spark = SparkSession.builder \
    .appName("DataCleaning") \
    .config("spark.sql.debug.maxToStringFields", 1000) \
    .getOrCreate()

# Set log level to WARN
spark.sparkContext.setLogLevel("WARN")

24/05/18 12:02:31 WARN Utils: Your hostname, shivus-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 10.13.228.27 instead (on interface en0)
24/05/18 12:02:31 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/05/18 12:02:31 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/05/18 12:02:32 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [11]:
# Load datasets

bigbasket_df = spark.read.csv("BigBasket_Products.csv", header=True, inferSchema=True)

In [12]:
# Schema information for BigBasket products
print("Schema information for BigBasket products:")
bigbasket_df.printSchema()

Schema information for BigBasket products:
root
 |-- ProductID: string (nullable = true)
 |-- Product: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Sub_category: string (nullable = true)
 |-- ProductBrand: string (nullable = true)
 |-- Price (INR): string (nullable = true)
 |-- Market_price: string (nullable = true)
 |-- Type: string (nullable = true)
 |-- Rating: string (nullable = true)
 |-- Description: string (nullable = true)



In [13]:
# Display the count of missing values in each column

print("Missing values in BigBasket products:")
bigbasket_df.select([count(when(col(c).isNull(), c)).alias(c) for c in bigbasket_df.columns]).show()


Missing values in BigBasket products:
+---------+-------+--------+------------+------------+-----------+------------+----+------+-----------+
|ProductID|Product|Category|Sub_category|ProductBrand|Price (INR)|Market_price|Type|Rating|Description|
+---------+-------+--------+------------+------------+-----------+------------+----+------+-----------+
|        0|   5379|    6972|        7921|        8470|       8867|        9091|9255| 18016|       9589|
+---------+-------+--------+------------+------------+-----------+------------+----+------+-----------+



In [14]:
# Identify and count duplicates in BigBasket dataset
bigbasket_duplicates = bigbasket_df.groupBy(bigbasket_df.columns).count().filter("count > 1")
num_bigbasket_duplicates = bigbasket_duplicates.count()
print(f"Number of duplicates in BigBasket dataset: {num_bigbasket_duplicates}")


# Remove duplicates
bigbasket_df = bigbasket_df.dropDuplicates()


Number of duplicates in BigBasket dataset: 1510


In [15]:
# Standardize text data to lower case
bigbasket_df = bigbasket_df.withColumn('category', lower(col('category')))


In [16]:
# Function to filter outliers based on Z-score
def filter_outliers(df, column):
    stats = df.select(mean(col(column)), stddev(col(column))).first()
    mean_val, stddev_val = stats[0], stats[1]
    return df.filter((col(column) > (mean_val - 3 * stddev_val)) & (col(column) < (mean_val + 3 * stddev_val)))
    bigbasket_df = bigbasket_df.withColumn('sale_price', col('sale_price').cast('float'))



In [18]:
# Display cleaned data
print("Cleaned Myntra catalog data:")
bigbasket_df.show(10)

Cleaned Myntra catalog data:
+---------+--------------------+--------------------+--------------------+---------------+-----------+------------+--------------------+------+--------------------+
|ProductID|             Product|            category|        Sub_category|   ProductBrand|Price (INR)|Market_price|                Type|Rating|         Description|
+---------+--------------------+--------------------+--------------------+---------------+-----------+------------+--------------------+------+--------------------+
|      145|Turmeric Powder/A...|foodgrains, oil &...|    Masalas & Spices|     Aashirvaad|         32|          64|     Powdered Spices|   4.2|Aashirvaad Turmer...|
|      661|       Under Eye Gel|    beauty & hygiene|           Skin Care|Organic Harvest|     505.75|         595|            Eye Care|     2|Wake up to eyes t...|
|      908|Face Wash - Blush...|    beauty & hygiene|           Skin Care|          Lakme|     153.75|         205|           Face Care|   4.3|Use