<a href="https://colab.research.google.com/github/pratiksha806/dataAnaylis/blob/main/big_data_analysis.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
# 📦 Step 1: Import Required Libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as _sum, count, avg, to_date

# 🔥 Step 2: Initialize Spark Session
# getOrCreate() returns the existing session if one is already active in this kernel.
spark = (
    SparkSession.builder
    .appName("Amazon Sales Data Analysis 2025")
    .getOrCreate()
)

# 📁 Step 3: Load Dataset
# If using locally, update this path.
file_path = "sample_data/amazon_sales_data 2025.csv"
# First row supplies the column names; Spark guesses each column's type.
df = spark.read.options(header=True, inferSchema=True).csv(file_path)

# 🔍 Step 4: Preview Data — a few rows, then the inferred schema
df.show(5)
df.printSchema()
# Convert Date string to proper date format.
# Raw values look like "14-03-25" (day-month-two-digit-year). Any row whose Date is
# null after conversion is removed by the dropna below.
# NOTE(review): presumably non-matching strings convert to null rather than raising — verify for your Spark/ANSI settings.
df = df.withColumn("Date", to_date(col("Date"), "dd-MM-yy"))

# Cast numerical columns (applied in this order: Price, Quantity, Total Sales).
numeric_casts = {"Price": "double", "Quantity": "int", "Total Sales": "double"}
for column_name, spark_type in numeric_casts.items():
    df = df.withColumn(column_name, col(column_name).cast(spark_type))

# Drop rows with a null in any field the analysis depends on.
required_fields = ["Date", "Product", "Category", "Price", "Quantity", "Total Sales"]
df_clean = df.dropna(subset=required_fields)

df_clean.show(5)
def _rank_by_total_sales(frame, key, total_alias):
    """Sum "Total Sales" per distinct value of `key`, largest total first.

    Args:
        frame: Spark DataFrame containing a "Total Sales" column and `key`.
        key: name of the column to group by.
        total_alias: name given to the summed column in the result.

    Returns:
        DataFrame with columns (`key`, `total_alias`), ordered by total descending.
    """
    totals = frame.groupBy(key).agg(_sum("Total Sales").alias(total_alias))
    return totals.orderBy(col(total_alias).desc())


# NOTE(review): no Status filter is applied, so Cancelled and Pending orders are
# counted toward revenue here — confirm that is intended.
top_products = _rank_by_total_sales(df_clean, "Product", "TotalRevenue")
top_products.show(5)

top_customers = _rank_by_total_sales(df_clean, "Customer Name", "TotalSpent")
top_customers.show(5)
# Save the rankings. Spark writes each path as a directory of part-*.csv files,
# not a single CSV file.
# mode("overwrite") replaces output left by a previous run. The default save mode
# (errorifexists) makes this cell fail on re-run, which breaks Restart & Run All.
top_products.write.mode("overwrite").csv("output/top_products.csv", header=True)
top_customers.write.mode("overwrite").csv("output/top_customers.csv", header=True)


+--------+--------+-------------+-----------+-----+--------+-----------+-------------+-----------------+--------------+---------+
|Order ID|    Date|      Product|   Category|Price|Quantity|Total Sales|Customer Name|Customer Location|Payment Method|   Status|
+--------+--------+-------------+-----------+-----+--------+-----------+-------------+-----------------+--------------+---------+
| ORD0001|14-03-25|Running Shoes|   Footwear|   60|       3|        180|   Emma Clark|         New York|    Debit Card|Cancelled|
| ORD0002|20-03-25|   Headphones|Electronics|  100|       4|        400|Emily Johnson|    San Francisco|    Debit Card|  Pending|
| ORD0003|15-02-25|Running Shoes|   Footwear|   60|       2|        120|     John Doe|           Denver|    Amazon Pay|Cancelled|
| ORD0004|19-02-25|Running Shoes|   Footwear|   60|       3|        180|Olivia Wilson|           Dallas|   Credit Card|  Pending|
| ORD0005|10-03-25|   Smartwatch|Electronics|  150|       3|        450|   Emma Clark|    

In [3]:
# Import ML & Spark tools
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import when

# Step 1: Filter & Prepare Data
df_ml = df_clean.select("Product", "Category", "Price", "Quantity", "Customer Location", "Payment Method", "Status")

# Step 2: Encode Categorical Labels
# StringIndexer (default ordering) numbers values by descending frequency, so an
# index only says how common a value is, not which value it is.
# The loop variable is `col_name`, not `col`, to avoid shadowing
# pyspark.sql.functions.col, which is used below.
indexers = [
    StringIndexer(inputCol=col_name, outputCol=col_name + "_index").fit(df_ml)
    for col_name in ["Product", "Category", "Customer Location", "Payment Method", "Status"]
]
for indexer in indexers:
    df_ml = indexer.transform(df_ml)

# Step 3: Binary classification: Cancelled = 1, Others = 0
# Compare against the raw Status string. The old check (Status_index == 0.0) marked
# the *most frequent* status as positive, which is only "Cancelled" by coincidence.
# Status_index is still produced above but is not used to build the label.
df_ml = df_ml.withColumn("label", when(col("Status") == "Cancelled", 1).otherwise(0))

# Step 4: Feature Vector
# Pack the indexed categorical columns and the two numeric columns into one
# "features" vector column for the classifier.
# NOTE(review): the *_index columns enter LogisticRegression as plain numbers,
# which imposes an arbitrary order on nominal categories — consider OneHotEncoder.
assembler_input_cols = [
    "Product_index",
    "Category_index",
    "Price",
    "Quantity",
    "Customer Location_index",
    "Payment Method_index",
]
assembler = VectorAssembler(inputCols=assembler_input_cols, outputCol="features")
df_ready = assembler.transform(df_ml)

# Step 5: Split Train/Test — 70% / 30%, fixed seed so the split is reproducible.
train_data, test_data = df_ready.randomSplit(weights=[0.7, 0.3], seed=42)

# Step 6: Train Classifier (Logistic Regression) with default hyperparameters.
lr = LogisticRegression(labelCol="label", featuresCol="features")
model = lr.fit(train_data)

# Step 7: Predictions on the held-out split; preview label vs. predicted class.
predictions = model.transform(test_data)
predictions.select(*["label", "prediction", "probability"]).show(n=5)

# Step 8: Evaluate Model — fraction of test rows whose prediction equals the label.
# NOTE(review): the previewed predictions are all 0.0. Compare this accuracy with
# the share of label == 0 rows in test_data before trusting it.
evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy",
    labelCol="label",
    predictionCol="prediction",
)
accuracy = evaluator.evaluate(predictions)
print(f"Test Accuracy: {accuracy:.2f}")


+-----+----------+--------------------+
|label|prediction|         probability|
+-----+----------+--------------------+
|    1|       0.0|[0.61073771798179...|
|    0|       0.0|[0.58934166687224...|
|    0|       0.0|[0.66872100772732...|
|    0|       0.0|[0.64424985424017...|
|    0|       0.0|[0.61338408223597...|
+-----+----------+--------------------+
only showing top 5 rows

Test Accuracy: 0.74
