In [None]:
import pandas as pd
import numpy as np

# Load the raw BlinkIT workbook (the data sheet shares the workbook's name).
df = pd.read_excel("BlinkIT Grocery Data.xlsx", sheet_name="BlinkIT Grocery Data")

# Display basic info: column dtypes and non-null counts.
# DataFrame.info() writes its report to stdout itself and returns None, so it is
# called directly -- wrapping it in print() appended a stray "None" line.
print("Initial dataset info:")
df.info()

Initial dataset info:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 8523 entries, 0 to 8522
Data columns (total 12 columns):
 #   Column                     Non-Null Count  Dtype  
---  ------                     --------------  -----  
 0   Item Fat Content           8523 non-null   object 
 1   Item Identifier            8523 non-null   object 
 2   Item Type                  8523 non-null   object 
 3   Outlet Establishment Year  8523 non-null   int64  
 4   Outlet Identifier          8523 non-null   object 
 5   Outlet Location Type       8523 non-null   object 
 6   Outlet Size                8523 non-null   object 
 7   Outlet Type                8523 non-null   object 
 8   Item Visibility            8523 non-null   float64
 9   Item Weight                7060 non-null   float64
 10  Sales                      8523 non-null   float64
 11  Rating                     8523 non-null   float64
dtypes: float64(4), int64(1), object(7)
memory usage: 799.2+ KB
None


In [3]:
import pandas as pd

# Re-read the Excel workbook and persist it as CSV so the Spark cells, which read
# "blinkit_data.csv", can use Spark's built-in CSV reader.
# index=False keeps pandas' RangeIndex from becoming an extra unnamed column.
df = pd.read_excel(io="BlinkIT Grocery Data.xlsx", sheet_name="BlinkIT Grocery Data")
df.to_csv(path_or_buf="blinkit_data.csv", index=False)

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, avg

# Initialize Spark session.
# spark.jars.packages is a launch-time setting: whichever cell first creates a
# SparkSession in this kernel fixes the JVM classpath, and later getOrCreate() calls
# return that same session (only runtime SQL configs are applied to it). Requesting the
# MongoDB connector here keeps the MongoDB export cell working under Restart & Run All.
spark = (
    SparkSession.builder
    .appName("BlinkitDataCleaning")
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:10.2.0")
    .getOrCreate()
)

# Load CSV (first row is the header; Spark infers column types).
df_spark = spark.read.option("header", "true").option("inferSchema", "true").csv("blinkit_data.csv")

# Fill missing Item Weight with the column mean. In the pandas info() report,
# Item Weight is the only column with nulls (7060 of 8523 non-null).
mean_weight = df_spark.select(avg("Item Weight")).first()[0]
# avg() yields null when the column has no non-null values; skip imputation in that
# case rather than passing None to fillna().
if mean_weight is not None:
    df_spark = df_spark.fillna({"Item Weight": mean_weight})

# Show preview
df_spark.show(5)


+----------------+---------------+--------------------+-------------------------+-----------------+--------------------+-----------+-----------------+---------------+-----------+--------+------+----------+
|Item Fat Content|Item Identifier|           Item Type|Outlet Establishment Year|Outlet Identifier|Outlet Location Type|Outlet Size|      Outlet Type|Item Visibility|Item Weight|   Sales|Rating|Outlet Age|
+----------------+---------------+--------------------+-------------------------+-----------------+--------------------+-----------+-----------------+---------------+-----------+--------+------+----------+
|         Regular|          FDX32|Fruits and Vegeta...|                     2012|           OUT049|              Tier 1|     Medium|Supermarket Type1|      0.1000135|       15.1|145.4786|   5.0|        13|
|         Low Fat|          NCB42|  Health and Hygiene|                     2022|           OUT018|              Tier 3|     Medium|Supermarket Type2|    0.008596051|       11.

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, avg

# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
# MongoDB Spark connector (Scala 2.12 build); must be on the classpath at JVM launch.
MONGO_CONNECTOR_PACKAGE = "org.mongodb.spark:mongo-spark-connector_2.12:10.2.0"
# Target database and collection are encoded in the URI path as <database>.<collection>.
MONGO_URI = "mongodb://127.0.0.1/BlinkitSalesDB.sales_data"
# Year used to derive "Outlet Age"; pinned (not "today") so results are reproducible.
REFERENCE_YEAR = 2025
# "append" inserts a fresh copy of every row each time this cell runs, so re-running
# duplicates documents in the collection. Use "overwrite" for an idempotent load.
WRITE_MODE = "append"

# STEP 1: Create SparkSession with MongoDB Connector.
# spark.jars.packages only takes effect when the JVM is launched. If a SparkSession
# already exists in this kernel (e.g. from an earlier Spark cell), getOrCreate() returns
# it and the connector is NOT added -- restart the kernel before running this cell.
# NOTE(review): the recorded NoClassDefFoundError for com.mongodb.ConnectionString was
# raised after the connector's own classes had loaded, via the JVM's application class
# loader -- i.e. the MongoDB Java driver was missing. That pattern suggests a connector
# jar copied onto the system classpath (e.g. $SPARK_HOME/jars) without its driver jars;
# confirm, and remove it so the package resolver supplies connector + driver together.
spark = (
    SparkSession.builder
    .appName("BlinkitDataToMongo")
    .config("spark.jars.packages", MONGO_CONNECTOR_PACKAGE)
    .config("spark.mongodb.write.connection.uri", MONGO_URI)
    .getOrCreate()
)

# try/finally so the session is stopped even when a step (e.g. the MongoDB write)
# raises; previously a failure skipped spark.stop() and left the session running.
try:
    # STEP 2: Load the CSV File (header row; Spark infers column types)
    df = spark.read.option("header", "true").option("inferSchema", "true").csv("blinkit_data.csv")

    # STEP 3: Clean 'Item Fat Content' -- fold abbreviated / lower-case spellings into
    # the canonical labels; any other value is kept unchanged.
    df = df.withColumn(
        "Item Fat Content",
        when(col("Item Fat Content").isin("LF", "low fat"), "Low Fat")
        .when(col("Item Fat Content") == "reg", "Regular")
        .otherwise(col("Item Fat Content")),
    )

    # STEP 4: Fill Missing 'Item Weight' with the column mean.
    # avg() yields null when the column has no non-null values; skip imputation then
    # rather than passing None to fillna().
    mean_weight = df.select(avg("Item Weight")).first()[0]
    if mean_weight is not None:
        df = df.fillna({"Item Weight": mean_weight})

    # STEP 5: Create 'Outlet Age' (years since establishment, relative to REFERENCE_YEAR)
    df = df.withColumn("Outlet Age", REFERENCE_YEAR - col("Outlet Establishment Year"))

    # Optional: Preview
    df.show(5)

    # STEP 6: Write to MongoDB. The URI is also passed on the writer so the target does
    # not depend on which SparkSession getOrCreate() returned.
    df.write.format("mongodb").mode(WRITE_MODE).option("connection.uri", MONGO_URI).save()

    print("✅ Data successfully written to MongoDB!")
finally:
    # Stop Spark session
    spark.stop()

+----------------+---------------+--------------------+-------------------------+-----------------+--------------------+-----------+-----------------+---------------+-----------+--------+------+----------+
|Item Fat Content|Item Identifier|           Item Type|Outlet Establishment Year|Outlet Identifier|Outlet Location Type|Outlet Size|      Outlet Type|Item Visibility|Item Weight|   Sales|Rating|Outlet Age|
+----------------+---------------+--------------------+-------------------------+-----------------+--------------------+-----------+-----------------+---------------+-----------+--------+------+----------+
|         Regular|          FDX32|Fruits and Vegeta...|                     2012|           OUT049|              Tier 1|     Medium|Supermarket Type1|      0.1000135|       15.1|145.4786|   5.0|        13|
|         Low Fat|          NCB42|  Health and Hygiene|                     2022|           OUT018|              Tier 3|     Medium|Supermarket Type2|    0.008596051|       11.

Py4JJavaError: An error occurred while calling o67.save.
: java.lang.BootstrapMethodError: java.lang.NoClassDefFoundError: com/mongodb/ConnectionString
	at com.mongodb.spark.sql.connector.config.AbstractMongoConfig.addConnectionStringDatabaseAndCollection(AbstractMongoConfig.java:312)
	at com.mongodb.spark.sql.connector.config.AbstractMongoConfig.createUsageOptions(AbstractMongoConfig.java:290)
	at com.mongodb.spark.sql.connector.config.AbstractMongoConfig.lambda$new$1(AbstractMongoConfig.java:82)
	at scala.Option.map(Option.scala:230)
	at com.mongodb.spark.sql.connector.config.AbstractMongoConfig.<init>(AbstractMongoConfig.java:82)
	at com.mongodb.spark.sql.connector.config.WriteConfig.<init>(WriteConfig.java:242)
	at com.mongodb.spark.sql.connector.config.MongoConfig.writeConfig(MongoConfig.java:79)
	at com.mongodb.spark.sql.connector.config.MongoConfig.toWriteConfig(MongoConfig.java:239)
	at com.mongodb.spark.sql.connector.MongoTable.newWriteBuilder(MongoTable.java:111)
	at org.apache.spark.sql.execution.datasources.v2.V2Writes$.org$apache$spark$sql$execution$datasources$v2$V2Writes$$newWriteBuilder(V2Writes.scala:145)
	at org.apache.spark.sql.execution.datasources.v2.V2Writes$$anonfun$apply$1.applyOrElse(V2Writes.scala:46)
	at org.apache.spark.sql.execution.datasources.v2.V2Writes$$anonfun$apply$1.applyOrElse(V2Writes.scala:44)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.datasources.v2.V2Writes$.apply(V2Writes.scala:44)
	at org.apache.spark.sql.execution.datasources.v2.V2Writes$.apply(V2Writes.scala:40)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:222)
	at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
	at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
	at scala.collection.immutable.List.foldLeft(List.scala:91)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:219)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:211)
	at scala.collection.immutable.List.foreach(List.scala:431)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:211)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:182)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:89)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:182)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$optimizedPlan$1(QueryExecution.scala:152)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:138)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:219)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:219)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:218)
	at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:148)
	at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:144)
	at org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:162)
	at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:182)
	at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:179)
	at org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:238)
	at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:284)
	at org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:252)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:117)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:869)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:315)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:251)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.NoClassDefFoundError: com/mongodb/ConnectionString
	... 84 more
Caused by: java.lang.ClassNotFoundException: com.mongodb.ConnectionString
	at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
	... 84 more
