<a href="https://colab.research.google.com/github/nachiiiket/OptiexIOT_FinalScripts/blob/main/Optiex_final_AnomalyDetection.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Install the requirements to run the script: the MongoDB database tools and the `pymongo` driver.

In [None]:
!apt-get install -y wget gnupg
# apt-key is deprecated (and absent on newer Ubuntu releases): store MongoDB's signing key
# in its own keyring and reference it with signed-by. --yes lets the cell be re-run.
!wget -qO - https://www.mongodb.org/static/pgp/server-6.0.asc | gpg --yes -o /usr/share/keyrings/mongodb-server-6.0.gpg --dearmor
!echo "deb [ arch=amd64 signed-by=/usr/share/keyrings/mongodb-server-6.0.gpg ] https://repo.mongodb.org/apt/ubuntu focal/mongodb-org/6.0 multiverse" | tee /etc/apt/sources.list.d/mongodb-org-6.0.list
!apt-get update
!apt-get install -y mongodb-database-tools
# %pip (unlike !pip) installs into the environment of the running kernel.
%pip install -q pymongo

Connect the MongoDB client: provide your connection URI and choose the database and collection to process.

In [None]:
import os
from collections import deque
from getpass import getpass

from pymongo import MongoClient, UpdateOne

# Never paste credentials into the notebook: read the connection string from the
# MONGODB_URI environment variable, or prompt for it (input is not echoed or saved).
MONGODB_URI = os.environ.get("MONGODB_URI") or getpass("MongoDB connection URI: ")
client = MongoClient(MONGODB_URI)

# Choose your database and collection
db = client["testdb"]          # replace with your DB name
collection = db["sensors"]     # replace with your collection name

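Main script: reads every sensor document ordered by `Server Timestamp` (newest first), flags anomalies (resets, faulty zero readings, negative differences), and writes the flags and corrected values back to MongoDB. Keep `CHUNK_FETCH` and `BULK_LIMIT` at their default values when processing large datasets.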

In [None]:
# --- Config ---
CHUNK_FETCH = 5000    # classify after this many new docs have been read from the cursor
BULK_LIMIT  = 1000    # send queued updates to MongoDB once this many are pending
LOG_EVERY   = 50000   # print progress each time another LOG_EVERY rows are classified

# Values written to the "Anomaly Flag" field.
FLAG_RESET = "Yes (Reset)"
FLAG_FAULTY = "Yes (Faulty)"
FLAG_NEGATIVE_DIFFERENCE = "Yes (Negative Difference)"


def detect_anomalies(docs, start=1, is_final=True):
    """Classify ``docs[start:]`` and return the field updates they need.

    ``docs`` are dicts with "_id" and "Value", in the order the cursor below
    yields them (descending "Server Timestamp"). Each docs[i] is compared with
    its neighbours ``prev = docs[i - 1]`` and ``nxt = docs[i + 1]``:

    * Reset: curr < prev and nxt > prev.
    * Faulty sensor: a run of zero values. If the first non-zero value after
      the run ("after") is below prev, every zero in the run is flagged and its
      Value replaced by (prev + after) / 2; otherwise only the first zero is
      flagged as a reset. A run reaching the end of the dataset gets prev.
    * Negative difference: nxt > curr.

    Corrections are only queued; ``docs`` itself is not modified, so later
    comparisons use the raw readings.

    Args:
        docs: documents to classify; docs[start - 1] supplies the first prev.
        start: index of the first document to classify (must be >= 1).
        is_final: True when no documents follow docs[-1]. When False, a zero
            run that reaches the end of ``docs`` is left unclassified because
            its "after" value has not been fetched yet.

    Returns:
        ``(updates, resume, processed)``: ``updates`` is a list of
        ``(_id, fields)`` pairs to apply with ``$set``; ``resume`` is the index
        of the first document that was not classified; ``processed`` is the
        number of documents classified. docs[-1] has no nxt, so it is only
        classified as part of a trailing zero run.
    """
    updates = []
    n = len(docs)
    i = start
    processed = 0

    while i < n - 1:
        prev = docs[i - 1]["Value"]
        curr = docs[i]["Value"]
        nxt = docs[i + 1]["Value"]

        # ----------- RESET -----------
        if curr < prev and nxt > prev:
            updates.append((docs[i]["_id"], {"Anomaly Flag": FLAG_RESET}))

        # ----------- FAULTY SENSOR -----------
        elif curr == 0:
            end = i
            while end < n and docs[end]["Value"] == 0:
                end += 1
            # docs[i:end] is the zero run; docs[end], if present, is the "after" value.
            if end == n and not is_final:
                # The run may continue into documents not fetched yet: leave it
                # (and everything after it) for the next call.
                break
            run_ids = [doc["_id"] for doc in docs[i:end]]
            if end < n:
                after = docs[end]["Value"]
                if after < prev:
                    new_value = (prev + after) / 2
                    updates.extend(
                        (_id, {"Anomaly Flag": FLAG_FAULTY, "Value": new_value}) for _id in run_ids
                    )
                else:
                    updates.append((run_ids[0], {"Anomaly Flag": FLAG_RESET}))
            else:
                # The run reaches the real end of the dataset: fall back to prev.
                updates.extend(
                    (_id, {"Anomaly Flag": FLAG_FAULTY, "Value": prev}) for _id in run_ids
                )
            processed += end - i
            i = end
            continue  # i already points past the run

        # ----------- NEGATIVE DIFFERENCE -----------
        elif nxt > curr:
            updates.append((docs[i]["_id"], {"Anomaly Flag": FLAG_NEGATIVE_DIFFERENCE}))

        processed += 1
        i += 1

    return updates, i, processed


def run_anomaly_detection(coll, chunk_fetch=CHUNK_FETCH, bulk_limit=BULK_LIMIT, log_every=LOG_EVERY):
    """Flag anomalies in every document of ``coll`` and write them back in bulk.

    Documents are read ordered by "Server Timestamp" descending and classified
    with ``detect_anomalies`` each time ``chunk_fetch`` new documents arrive.
    The part of each chunk that cannot be classified yet (its last document, or
    a zero run reaching its end) is carried into the next chunk. Every document
    is therefore classified exactly once, wherever chunk boundaries fall, and
    the final partial chunk is processed as well.

    Args:
        coll: pymongo collection to read from and update.
        chunk_fetch: number of new documents to read before classifying.
        bulk_limit: number of pending ``UpdateOne`` operations that triggers a
            ``bulk_write``.
        log_every: progress is printed each time another ``log_every``
            documents have been classified.

    Returns:
        The number of documents classified.
    """
    pending = []
    processed = 0
    next_log = log_every

    def flush():
        # Unordered is fine: each document is classified once, so every queued
        # update targets a different _id.
        if pending:
            coll.bulk_write(pending, ordered=False)
            pending.clear()

    def classify(chunk, is_final):
        # Queue the updates for `chunk`; return the tail that must be re-examined.
        nonlocal processed, next_log
        updates, resume, done = detect_anomalies(chunk, start=1, is_final=is_final)
        for _id, fields in updates:
            pending.append(UpdateOne({"_id": _id}, {"$set": fields}))
            if len(pending) >= bulk_limit:
                flush()
        processed += done
        if processed >= next_log:
            print(f"Processed {processed} rows...")
            next_log = (processed // log_every + 1) * log_every
        # Keep chunk[resume - 1] as the prev of the first unclassified document.
        return chunk[resume - 1:]

    cursor = coll.find({}, {"Server Timestamp": 1, "Value": 1}).sort("Server Timestamp", -1)
    carry, batch = [], []
    try:
        for doc in cursor:
            batch.append(doc)
            if len(batch) == chunk_fetch:
                carry = classify(carry + batch, is_final=False)
                batch = []
        # Cursor exhausted: classify what is left, including a final partial batch.
        classify(carry + batch, is_final=True)
    finally:
        cursor.close()

    flush()
    return processed


# IPython runs notebook cells with __name__ == "__main__", so this executes in the
# notebook; it is skipped only when this code is imported as a module.
if __name__ == "__main__":
    count = run_anomaly_detection(collection)
    print(f"Finished processing {count} rows.")
