# MongoDB Sales Data Analysis

## Store and Query JSON Data in MongoDB

This notebook demonstrates:
- Storing order data as nested JSON documents (customer, product, region)
- CRUD operations (Create, Read, Update, Delete)
- Querying orders in date ranges
- Aggregation pipelines for grouping sales by region and category
- Comparison of NoSQL flexibility vs Relational SQL model

### Tech Stack
- **MongoDB**: NoSQL database
- **Python**: PyMongo for MongoDB operations
- **Pandas**: Data processing

### Dataset
Amazon Sales Report CSV file with order data


In [None]:
# Import required libraries
import pandas as pd  # CSV loading and missing-value checks
from pymongo import MongoClient  # MongoDB driver
from datetime import datetime  # fallback timestamp when an order date cannot be parsed
from dateutil import parser  # free-form date string parsing
import json  # pretty-printing a sample document
from typing import List, Dict, Any  # type hints for the transform function
import warnings
# NOTE(review): with no category given, this filter hides every warning for
# the rest of the session, including deprecation warnings from pandas or
# pymongo. Consider narrowing it to the specific warning being silenced.
warnings.filterwarnings('ignore')

print("Libraries imported successfully!")


## 1. Connect to MongoDB

First, ensure MongoDB is running on your system. If using MongoDB Atlas (cloud), update the connection string.


In [None]:
# Connect to MongoDB
# For local MongoDB: mongodb://localhost:27017/
# For MongoDB Atlas: mongodb+srv://<username>:<password>@<cluster-host>/
connection_string = "mongodb://localhost:27017/"
database_name = "amazon_sales_db"
collection_name = "orders"

client = MongoClient(connection_string)
# MongoClient connects lazily: constructing it does not contact the server.
# A cheap "ping" forces one round trip, so the success messages below are
# only printed when the server is actually reachable (it raises otherwise).
client.admin.command("ping")
db = client[database_name]
collection = db[collection_name]

print("✓ Connected to MongoDB")
print(f"✓ Database: {database_name}")
print(f"✓ Collection: {collection_name}")


## 2. Load and Transform CSV Data

Transform flat CSV data into nested JSON documents with:
- **Customer** information (city, state, postal code, country)
- **Product** information (style, SKU, category, size, ASIN)
- **Region** information (shipping location)
- **Sales** information (channel, fulfilment, amount, quantity)
- **Fulfillment** information (courier status, fulfilled by)

### Why Nested Documents?

In **SQL (Relational Model)**:
- Data is normalized across multiple tables
- Requires JOINs to combine related data
- Foreign key relationships maintain integrity

In **MongoDB (NoSQL)**:
- Data is denormalized in nested documents
- All related data in one document (no JOINs needed)
- Easy to add new fields without schema changes
- Natural representation of hierarchical data


In [None]:
def _as_text(value: Any) -> str:
    """Return *value* as text, mapping missing values (None/NaN) to ''."""
    if isinstance(value, str):
        return value
    if value is None or pd.isna(value):
        return ''
    return str(value)


def _as_postal_code(value: Any) -> str:
    """Return a postal code as text without a spurious trailing '.0'.

    pandas reads a numeric column that contains gaps as float, so 400081
    arrives as 400081.0; whole-number floats are rendered as integers.
    """
    if isinstance(value, float) and value.is_integer():
        return str(int(value))
    return _as_text(value)


def _parse_promotions(value: Any) -> List[str]:
    """Split a comma-separated promotion-id cell into a list of non-empty ids."""
    return [promo.strip() for promo in _as_text(value).split(',') if promo.strip()]


def transform_csv_to_json(csv_file_path: str) -> List[Dict[str, Any]]:
    """
    Transform CSV data into nested JSON documents.
    This demonstrates NoSQL flexibility - all related data in one document.

    Args:
        csv_file_path: Path of the CSV file to read with pandas.

    Returns:
        One dict per CSV row with the top-level keys ``order_id``, ``date``,
        ``status``, ``customer``, ``product``, ``region``, ``sales``,
        ``fulfillment`` and ``promotions``. Missing text cells become ``''``
        (not the string ``'nan'``), missing quantity/amount become ``0`` /
        ``0.0``, and ``promotions`` is a list of stripped ids.

    Notes:
        A date that cannot be parsed falls back to the current time, as
        before, but the number of affected rows is now reported instead of
        being hidden.
    """
    # Imported locally so this cell also works when it is run on its own.
    from dateutil import parser as date_parser

    print(f"Reading CSV file: {csv_file_path}")
    df = pd.read_csv(csv_file_path)
    print(f"Loaded {len(df)} rows from CSV")

    documents = []
    fallback_dates = 0
    # Plain dict records are cheaper than iterrows() Series, and counting
    # with enumerate keeps the progress output independent of the index.
    for row_number, row in enumerate(df.to_dict('records'), start=1):
        # Parse date
        try:
            order_date = date_parser.parse(str(row.get('Date')))
        except (ValueError, OverflowError):
            # dateutil raises ParserError (a ValueError) or OverflowError
            order_date = datetime.now()
            fallback_dates += 1

        # The shipping address serves as both customer and region info.
        location = {
            "city": _as_text(row.get('ship-city', '')),
            "state": _as_text(row.get('ship-state', '')),
            "postal_code": _as_postal_code(row.get('ship-postal-code', '')),
            "country": _as_text(row.get('ship-country', ''))
        }

        quantity = row.get('Qty')
        amount = row.get('Amount')

        # Build nested document structure
        document = {
            "order_id": _as_text(row.get('Order ID', '')),
            "date": order_date,
            "status": _as_text(row.get('Status', '')),

            # Customer information (nested)
            "customer": dict(location),

            # Product information (nested)
            "product": {
                "style": _as_text(row.get('Style', '')),
                "sku": _as_text(row.get('SKU', '')),
                "category": _as_text(row.get('Category', '')),
                "size": _as_text(row.get('Size', '')),
                "asin": _as_text(row.get('ASIN', ''))
            },

            # Region information (nested); separate copy so the two
            # sub-documents can be changed independently later
            "region": dict(location),

            # Sales information (nested)
            "sales": {
                # The trailing space in 'Sales Channel ' is intentional: it
                # is the lookup key used for this column.
                "channel": _as_text(row.get('Sales Channel ', '')),
                "fulfilment": _as_text(row.get('Fulfilment', '')),
                "service_level": _as_text(row.get('ship-service-level', '')),
                "quantity": int(quantity) if pd.notna(quantity) else 0,
                "currency": _as_text(row.get('currency', '')),
                "amount": float(amount) if pd.notna(amount) else 0.0,
                "b2b": str(row.get('B2B', 'FALSE')).upper() == 'TRUE'
            },

            # Fulfillment information (nested)
            "fulfillment": {
                "fulfilled_by": _as_text(row.get('fulfilled-by', '')),
                "courier_status": _as_text(row.get('Courier Status', ''))
            },

            # Promotions (array); empty when the cell is missing or blank
            "promotions": _parse_promotions(row.get('promotion-ids'))
        }

        documents.append(document)

        # Progress indicator for large files
        if row_number % 10000 == 0:
            print(f"  Processed {row_number:,} rows...")

    if fallback_dates:
        print(f"  Warning: {fallback_dates:,} rows had an unparseable date; "
              "the current time was used instead")
    print(f"✓ Transformed {len(documents)} rows into JSON documents")
    return documents

# Transform the CSV file
csv_file = "Amazon Sale Report(in).csv"
documents = transform_csv_to_json(csv_file)

# Show sample document structure
if documents:
    print("\n📄 Sample Document Structure:")
    # default=str renders values json cannot encode natively (the datetime
    # stored under "date") as their string form.
    print(json.dumps(documents[0], indent=2, default=str))


## 3. CRUD Operations

### 3.1 CREATE - Insert Orders into MongoDB


In [None]:
# Clear existing collection (optional - for clean start)
# Uncomment to clear existing data
# collection.delete_many({})

# Insert documents into MongoDB
print("=== CREATE OPERATION ===")
print(f"Inserting {len(documents)} documents...")

# Insert in batches for better performance
# NOTE(review): PyMongo adds an "_id" to each inserted dict in place, so
# re-running this cell without re-running the transform cell is expected to
# fail with a duplicate-key error. Confirm against the driver docs.
batch_size = 1000
inserted_count = 0

for batch_number, offset in enumerate(range(0, len(documents), batch_size), start=1):
    batch = documents[offset:offset + batch_size]
    result = collection.insert_many(batch)
    inserted_count += len(result.inserted_ids)
    # Report only every tenth batch to keep the output short
    if batch_number % 10 == 0:
        print(f"  Inserted batch {batch_number}: {inserted_count:,}/{len(documents):,} documents")

print(f"✓ Successfully inserted {inserted_count:,} orders")

# Verify insertion
total_docs = collection.count_documents({})
print(f"✓ Total documents in collection: {total_docs:,}")


### 3.2 READ - Retrieve Orders


In [None]:
# READ: fetch a few orders unfiltered, then filter on a nested field
print("=== READ OPERATION ===")

# Unfiltered read, capped at `limit` documents
limit = 5
orders = list(collection.find().limit(limit))
print(f"Retrieved {len(orders)} orders\n")

# Print a short summary per order; sub-documents default to {} so a missing
# section prints None instead of raising
for position, order in enumerate(orders, 1):
    customer = order.get('customer', {})
    product = order.get('product', {})
    sales = order.get('sales', {})
    print(f"Order {position}:")
    print(f"  Order ID: {order.get('order_id')}")
    print(f"  Date: {order.get('date')}")
    print(f"  Status: {order.get('status')}")
    print(f"  Customer: {customer.get('city')}, {customer.get('state')}")
    print(f"  Product: {product.get('category')} - {product.get('style')}")
    print(f"  Amount: {sales.get('amount')} {sales.get('currency')}")
    print()

# Filtered read: dot notation reaches into the nested "sales" sub-document
print("\n--- Orders with amount > 500 INR ---")
high_value_filter = {"sales.amount": {"$gt": 500}}
high_value_orders = list(collection.find(high_value_filter).limit(3))
for order in high_value_orders:
    amount = order.get('sales', {}).get('amount')
    print(f"  {order.get('order_id')}: {amount} INR")


## 4. Query Orders in Date Range

**SQL Equivalent:**
```sql
SELECT * FROM orders
JOIN customers ON orders.customer_id = customers.id
JOIN products ON orders.product_id = products.id
WHERE date BETWEEN '2022-04-01' AND '2022-04-30'
```

**MongoDB:**
- Simple query with date range
- All related data already in document (no JOINs needed)


In [None]:
# Query orders in a specific date range
from datetime import timedelta  # cell-local: only this cell needs it

print("=== QUERY: Orders in Date Range ===")

start_date = "2022-04-01"
end_date = "2022-04-30"

start = parser.parse(start_date)
end = parser.parse(end_date)

# A date-only string parses to midnight, so "$lte end" would drop every order
# timestamped later on the final day. Use the half-open interval
# [start, end + 1 day) to cover the whole end date.
query = {
    "date": {
        "$gte": start,
        "$lt": end + timedelta(days=1)
    }
}

orders_in_range = list(collection.find(query))
total_amount = sum(order.get('sales', {}).get('amount', 0) for order in orders_in_range)
total_quantity = sum(order.get('sales', {}).get('quantity', 0) for order in orders_in_range)

print(f"Date Range: {start_date} to {end_date}")
print(f"✓ Found {len(orders_in_range):,} orders")
print(f"✓ Total Amount: {total_amount:,.2f} INR")
print(f"✓ Total Quantity: {total_quantity:,}")

# Show sample results
if orders_in_range:
    print("\nSample orders:")
    for order in orders_in_range[:5]:
        print(f"  {order.get('order_id')}: {order.get('date')} - {order.get('sales', {}).get('amount')} INR")


## 5. Aggregation Pipeline - Sales by Region

**SQL Equivalent:**
```sql
SELECT state, 
       SUM(amount) as total_sales, 
       COUNT(*) as order_count,
       AVG(amount) as avg_order_value
FROM orders o
JOIN regions r ON o.region_id = r.id
WHERE amount > 0
GROUP BY state
ORDER BY total_sales DESC
LIMIT 10
```

**MongoDB Aggregation:**
- More flexible pipeline
- Can reshape data on the fly
- No JOINs needed


In [None]:
# Aggregation: Sales by Region (State)
print("=== AGGREGATION: Sales by Region (State) ===")

pipeline = [
    {
        "$match": {
            "sales.amount": {"$gt": 0}  # Only count orders with amount > 0
        }
    },
    {
        "$group": {
            "_id": "$region.state",
            "total_sales": {"$sum": "$sales.amount"},
            "order_count": {"$sum": 1},
            "total_quantity": {"$sum": "$sales.quantity"},
            "avg_order_value": {"$avg": "$sales.amount"}
        }
    },
    {
        "$sort": {"total_sales": -1}
    },
    {
        "$limit": 10  # Top 10 states
    }
]

results = list(collection.aggregate(pipeline))

print("\n📊 Top 10 States by Sales:")
print(f"{'State':<25} {'Total Sales':<15} {'Orders':<10} {'Avg Order Value':<15}")
print("-" * 70)

for result in results:
    # "_id" is always present in a $group result, so a .get() default never
    # applies; "or" also covers a None/empty key, which would otherwise make
    # the "<25" format spec raise TypeError.
    state = result.get('_id') or 'Unknown'
    total = result.get('total_sales', 0)
    count = result.get('order_count', 0)
    avg = result.get('avg_order_value', 0)
    print(f"{state:<25} {total:>12,.2f} INR {count:>8} {avg:>12,.2f} INR")


## 6. Aggregation Pipeline - Sales by Category

**SQL Equivalent:**
```sql
SELECT category, 
       SUM(amount) as total_sales, 
       COUNT(*) as order_count,
       COUNT(DISTINCT sku) as unique_products
FROM orders o
JOIN products p ON o.product_id = p.id
WHERE amount > 0
GROUP BY category
ORDER BY total_sales DESC
```

**MongoDB:**
- Access nested fields directly
- Use $addToSet for unique values
- Flexible $project stage to reshape output


In [None]:
# Aggregation: Sales by Category
print("=== AGGREGATION: Sales by Category ===")

pipeline = [
    {
        "$match": {
            "sales.amount": {"$gt": 0}
        }
    },
    {
        "$group": {
            "_id": "$product.category",
            "total_sales": {"$sum": "$sales.amount"},
            "order_count": {"$sum": 1},
            "total_quantity": {"$sum": "$sales.quantity"},
            "avg_order_value": {"$avg": "$sales.amount"},
            # Collect distinct SKUs; only their count is kept by $project
            "unique_products": {"$addToSet": "$product.sku"}
        }
    },
    {
        "$project": {
            "_id": 1,
            "total_sales": 1,
            "order_count": 1,
            "total_quantity": 1,
            "avg_order_value": 1,
            "unique_product_count": {"$size": "$unique_products"}
        }
    },
    {
        "$sort": {"total_sales": -1}
    }
]

results = list(collection.aggregate(pipeline))

print("\n📊 Sales by Category:")
print(f"{'Category':<20} {'Total Sales':<15} {'Orders':<10} {'Products':<10} {'Avg Value':<15}")
print("-" * 80)

for result in results:
    # "or" covers a None/empty group key, which the "<20" format spec
    # cannot render (None raises TypeError).
    category = result.get('_id') or 'Unknown'
    total = result.get('total_sales', 0)
    count = result.get('order_count', 0)
    products = result.get('unique_product_count', 0)
    avg = result.get('avg_order_value', 0)
    print(f"{category:<20} {total:>12,.2f} INR {count:>8} {products:>8} {avg:>12,.2f} INR")


## 7. Complex Aggregation - Sales by Region AND Category

This demonstrates MongoDB's flexible aggregation capabilities with multiple grouping fields.


In [None]:
# Complex aggregation: Sales by Region AND Category
print("=== AGGREGATION: Sales by Region AND Category ===")

pipeline = [
    {
        "$match": {
            "sales.amount": {"$gt": 0}
        }
    },
    {
        "$group": {
            # Compound key: one bucket per (state, category) pair
            "_id": {
                "state": "$region.state",
                "category": "$product.category"
            },
            "total_sales": {"$sum": "$sales.amount"},
            "order_count": {"$sum": 1}
        }
    },
    {
        "$sort": {"total_sales": -1}
    },
    {
        "$limit": 15
    }
]

results = list(collection.aggregate(pipeline))

print("\n📊 Top 15 State-Category Combinations:")
print(f"{'State':<20} {'Category':<20} {'Total Sales':<15} {'Orders':<10}")
print("-" * 70)

for result in results:
    # "or" guards against a None/empty key part, which the "<20" format spec
    # cannot render (None raises TypeError).
    group_key = result.get('_id') or {}
    state = group_key.get('state') or 'Unknown'
    category = group_key.get('category') or 'Unknown'
    total = result.get('total_sales', 0)
    count = result.get('order_count', 0)
    print(f"{state:<20} {category:<20} {total:>12,.2f} INR {count:>8}")


## 8. UPDATE Operation


In [None]:
# UPDATE operation
print("=== UPDATE OPERATION ===")

# Get a sample order
sample_order = collection.find_one()
if sample_order:
    order_id = sample_order['order_id']
    old_status = sample_order.get('status', 'Unknown')
    # Target the document by its unique "_id". Nothing in this notebook makes
    # "order_id" unique (one document is created per CSV row), so filtering
    # on it could update or verify a different document than the sampled one.
    document_filter = {"_id": sample_order["_id"]}

    print(f"Updating order {order_id}")
    print(f"  Old status: {old_status}")

    # Update the order status
    new_status = "Updated Status"
    result = collection.update_one(
        document_filter,
        {"$set": {"status": new_status}}
    )

    if result.matched_count > 0:
        print(f"✓ Successfully updated {result.modified_count} order(s)")
        print(f"  New status: {new_status}")

        # Verify update by re-reading the same document
        updated_order = collection.find_one(document_filter)
        print(f"  Verified status: {updated_order.get('status')}")
    else:
        print(f"✗ Order {order_id} not found")
else:
    print("No orders found in collection")


## 9. DELETE Operation


In [None]:
# DELETE operation (commented out to preserve data)
# As written, this cell only prints a notice. The delete itself is kept as a
# commented-out sample below so running the notebook removes no orders.
print("=== DELETE OPERATION ===")
print("(Skipped to preserve data - uncomment to test)")

# Uncomment below to test delete operation
# NOTE(review): delete_one removes a single matching document; confirm that is
# the intent if several documents can share one order_id.
# sample_order = collection.find_one()
# if sample_order:
#     order_id = sample_order['order_id']
#     print(f"Deleting order {order_id}")
#     
#     result = collection.delete_one({"order_id": order_id})
#     
#     if result.deleted_count > 0:
#         print(f"✓ Successfully deleted order {order_id}")
#     else:
#         print(f"✗ Order {order_id} not found")
# else:
#     print("No orders found in collection")


## 10. Collection Statistics


In [None]:
# Summary figures for the whole collection: size, status breakdown,
# date span and sales totals
print("=== COLLECTION STATISTICS ===")

total_docs = collection.count_documents({})
print(f"Total Orders: {total_docs:,}")

# Status breakdown, most frequent first
status_pipeline = [
    {"$group": {"_id": "$status", "count": {"$sum": 1}}},
    {"$sort": {"count": -1}},
]
status_counts = list(collection.aggregate(status_pipeline))
print("\nOrders by Status:")
for entry in status_counts:
    print(f"  {entry['_id']}: {entry['count']:,}")

# Earliest and latest order date; "_id": None puts every document in one group
date_pipeline = [
    {
        "$group": {
            "_id": None,
            "min_date": {"$min": "$date"},
            "max_date": {"$max": "$date"},
        }
    },
]
date_range = list(collection.aggregate(date_pipeline))
if date_range:
    span = date_range[0]
    print("\nDate Range:")
    print(f"  From: {span['min_date']}")
    print(f"  To: {span['max_date']}")

# Sales totals over orders with a positive amount
sales_pipeline = [
    {"$match": {"sales.amount": {"$gt": 0}}},
    {
        "$group": {
            "_id": None,
            "total_sales": {"$sum": "$sales.amount"},
            "total_quantity": {"$sum": "$sales.quantity"},
            "avg_order_value": {"$avg": "$sales.amount"},
        }
    },
]
sales_stats = list(collection.aggregate(sales_pipeline))
if sales_stats:
    totals = sales_stats[0]
    print("\nSales Statistics:")
    print(f"  Total Sales: {totals['total_sales']:,.2f} INR")
    print(f"  Total Quantity: {totals['total_quantity']:,}")
    print(f"  Average Order Value: {totals['avg_order_value']:,.2f} INR")


## 11. Comparison: NoSQL (MongoDB) vs Relational SQL Model


In [None]:
# Print a side-by-side comparison of the relational and document models,
# followed by the same query written for each.
banner = "=" * 80
print(banner)
print("COMPARISON: NoSQL (MongoDB) vs Relational SQL Model")
print(banner)
print()

# (heading, SQL side, MongoDB side) in display order
comparison_rows = [
    (
        "1. DATA STRUCTURE",
        "Normalized across multiple tables (orders, customers, products, regions)",
        "Denormalized nested documents (all data in one document)",
    ),
    (
        "2. SCHEMA",
        "Fixed schema, requires ALTER TABLE for changes",
        "Flexible schema, can add fields without migration",
    ),
    (
        "3. QUERIES",
        "JOINs required to combine related data",
        "No JOINs needed, all data in document",
    ),
    (
        "4. AGGREGATIONS",
        "GROUP BY with JOINs, complex for nested data",
        "Flexible aggregation pipeline, can reshape data dynamically",
    ),
    (
        "5. PERFORMANCE",
        "JOINs can be expensive, but optimized with indexes",
        "Single document read, but larger document size",
    ),
    (
        "6. USE CASES",
        "Best for structured data, complex relationships, ACID transactions",
        "Best for semi-structured data, rapid iteration, horizontal scaling",
    ),
]
comparison_points = {
    heading: {"SQL": sql_side, "MongoDB": mongo_side}
    for heading, sql_side, mongo_side in comparison_rows
}

for heading, sides in comparison_points.items():
    print(f"{heading}:")
    print(f"  SQL:      {sides['SQL']}")
    print(f"  MongoDB:  {sides['MongoDB']}")
    print()

# The same query in both dialects; an empty entry prints a blank line
example_lines = [
    "EXAMPLE - Query with date range and region:",
    "SQL:",
    "  SELECT o.*, c.*, p.* FROM orders o",
    "  JOIN customers c ON o.customer_id = c.id",
    "  JOIN products p ON o.product_id = p.id",
    "  WHERE o.date BETWEEN '...' AND '...' AND c.state = '...'",
    "",
    "MongoDB:",
    "  db.orders.find({",
    "    'date': {$gte: start, $lte: end},",
    "    'region.state': '...'",
    "  })",
    "",
]
for text in example_lines:
    print(text)
print(banner)


In [None]:
# Count the orders stored under each of a few statuses of interest
print("=== Query: Orders by Status ===")

statuses = [
    "Shipped",
    "Cancelled",
    "Shipped - Delivered to Buyer",
]
for status in statuses:
    status_filter = {"status": status}  # exact match on the top-level field
    count = collection.count_documents(status_filter)
    print(f"  {status}: {count:,} orders")


### Query: Top Products by Sales


In [None]:
# Rank SKUs by revenue and show the ten best sellers
print("=== Query: Top Products by Sales ===")

# Stage 1: ignore orders without a positive amount
positive_sales_stage = {"$match": {"sales.amount": {"$gt": 0}}}
# Stage 2: one bucket per SKU; style and category come from the first
# document seen in each bucket
per_sku_stage = {
    "$group": {
        "_id": "$product.sku",
        "product_name": {"$first": "$product.style"},
        "category": {"$first": "$product.category"},
        "total_sales": {"$sum": "$sales.amount"},
        "order_count": {"$sum": 1},
        "total_quantity": {"$sum": "$sales.quantity"},
    }
}
pipeline = [
    positive_sales_stage,
    per_sku_stage,
    {"$sort": {"total_sales": -1}},
    {"$limit": 10},
]

results = list(collection.aggregate(pipeline))

print("\nTop 10 Products by Sales:")
header_cells = [
    "SKU".ljust(20),
    "Product".ljust(20),
    "Category".ljust(15),
    "Sales".ljust(15),
    "Orders".ljust(10),
]
print(" ".join(header_cells))
print("-" * 90)

for result in results:
    sku = result.get('_id', 'Unknown')
    product = result.get('product_name', 'Unknown')
    category = result.get('category', 'Unknown')
    sales = result.get('total_sales', 0)
    orders = result.get('order_count', 0)
    # Truncate the free-text columns so they stay inside their column width
    short_product = product[:18]
    short_category = category[:13]
    print(f"{sku:<20} {short_product:<20} {short_category:<15} {sales:>12,.2f} INR {orders:>8}")


## 12. Cleanup and Close Connection


In [None]:
# Close MongoDB connection
client.close()
print("✓ MongoDB connection closed")
