# Setup MongoDB


In [8]:
import pymongo
from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017/")
db = client.healthcare_supply_chain
products = db.products
events = db.events

# Ingest CSV and Json to MongoDB database

In [12]:
import csv

def process_batch(batch, header, collection):
    """
    Process a batch of rows.
    convert a row in dicitonary to put it in Mongodb
    """
    for row in batch:
        # Example: Print each row
        doc = dict(map(lambda i: (header[i], row[i]), range(len(header)-1)))
        try:
            db[collection].insert_one(doc)
        except pymongo.errors.DuplicateKeyError:
            print("Key :" + doc["_id"] + " alredy in collection")

def ingest_csv(filename, batch_size=1000):
    """
    Ingest a CSV file in batches. So we can't run out of memory
    """
    with open(filename, 'r') as file:
        csv_reader = csv.reader(file)
        header = next(csv_reader)  # Read the header row
        header[0] = '_id'
        batch = []
        for row in csv_reader:
            batch.append(row)
            if len(batch) >= batch_size:
                process_batch(batch, header, "products")
                batch = []
        # Process the remaining rows (if any)
        
        if batch:
            process_batch(batch, header, "products")

if __name__ == "__main__":
    filename = "Healthcare_Supply_Chain_Products.csv" 
    ingest_csv(filename)

In [13]:
products.count_documents({})

100

In [104]:
products.find_one()

{'_id': 'P10000',
 'Product Name': 'Product 0',
 'Category': 'Personal Protective Equipment',
 'Supplier ID': 'S400'}

In [14]:
import json

def process_batch(batch, collection):
    """
    Process a batch of rows.
    convert a row in dicitonary to put it in Mongodb
    """
    for item in batch:
        item["_id"] = item.pop("Event ID")
        try:
            db[collection].insert_one(item)
        except pymongo.errors.DuplicateKeyError:
            print("Key :" + item["_id"] + " alredy in collection")

def ingest_json(filename, batch_size=1000):
    """
    Ingest a json file in batches. So we can't run out of memory
    """
    with open(filename, 'r') as file:
        batch = []
        for line in file:
            # Parse the JSON line one by one
            batch.append(json.loads(line))
            if len(batch) >= batch_size:
                process_batch(batch, "events")
                batch = []
        if batch:  # pour gérer le cas où le dernier lot est inférieur à batch_size
            process_batch(batch, "events")
            
if __name__ == "__main__":
    filename = "Healthcare_Supply_Chain_Events.json"
    ingest_json(filename)

In [15]:
events.count_documents({})

1000

In [103]:
events.find_one()

{'_id': 'E50000',
 'Product ID': 'P10018',
 'Timestamp': 1713235746125,
 'Location': 'L102',
 'Quantity': 44,
 'Status': 'Shipped'}

# Clean and Validate documents in database
Data are already indexed over there ID

We can add index to improve aggregates/sorting functions

New indexes : 
- Products
    - Category
    - Supplier ID
- Events
    - timestamp & location
    - status

In [16]:
db.products.create_index(["Category"], unique=False)
db.products.create_index(["Supplier ID"], unique=False)
db.events.create_index([("timestamp", pymongo.ASCENDING),
                       ("location")], unique=False)
db.events.create_index(["status"], unique=False)

'status_1'

# Creating a table

Amount of product ordered in a day

In [46]:
from datetime import datetime

daily_totals_collection = db['daily_totals']

pipeline = [
        {
            "$match": {"Status": "Ordered"}
        },
        {
            "$lookup": {
                "from": "products",
                "localField": "Product ID",
                "foreignField": "_id",
                "as": "product"
            }
        },
        {
            "$unwind": "$product"
        },
        {
            "$project": {
                "_id": 0,
                "date": {"$dateToString": {"format": "%Y-%m-%d", "date": {"$toDate": {"$multiply": ["$Timestamp", 1]}}}},
                "Product Name": "$product.Product Name",
                "Quantity": "$Quantity"
            }
        },
        {
            "$group": {
                "_id": {"date": "$date", "Product Name": "$Product Name"},
                "total_amount": {"$sum": "$Quantity"}
            }
        }
    ]

results = events.aggregate(pipeline)

# Insert daily totals into the new collection
for result in results:
    daily_totals_collection.insert_one({
        "date": result['_id']['date'],
        "Product Name": result['_id']['Product Name'],
        "total_amount": result['total_amount']
    })

In [47]:
daily_totals_collection.find_one()

{'_id': ObjectId('66270c39434585a54a35b2d7'),
 'date': '2024-04-16',
 'Product Name': 'Product 41',
 'total_amount': 117}

So there is 117 product 41 ordered on the 2024-04-16