In [None]:
from pymongo import MongoClient
import logging


def sync_collections(source_db, source_collection, target_db):
    try:
        # Check if the collection exists in the target_db
        collection_exists = source_collection in target_db.list_collection_names()

        # Initialize the query
        query = {}

        # Update the query only if the collection exists
        if collection_exists:
            # Retrieve the timestamp of the newest document in the target_db's collection
            latest_doc = target_db[source_collection].find().sort("ts", -1).limit(1)
            latest_ts = latest_doc[0]["ts"] if latest_doc.count() > 0 else None
            if latest_ts:
                query = {"ts": {"$gt": latest_ts}}

        # Process documents in batches of 100
        while True:
            docs_to_sync = source_db[source_collection].find(query).limit(100)
            if docs_to_sync.count() == 0:
                break  # No more documents to process

            # Insert documents into the target_db's collection
            target_db[source_collection].insert_many(docs_to_sync)

            # Remove documents from source_db's collection after successful insertion
            ids_to_remove = [doc["_id"] for doc in docs_to_sync]
            source_db[source_collection].delete_many({"_id": {"$in": ids_to_remove}})

    except Exception as e:
        logging.error(f"An error occurred: {e}")

In [None]:
from google.cloud import bigquery
from pymongo import MongoClient
import logging


def sync_bigquery_to_mongodb(bq_client, bq_table, target_db, target_collection):
    try:
        # Check if the collection exists in the target_db
        collection_exists = target_collection in target_db.list_collection_names()

        # Initialize the SQL query for BigQuery
        base_query = f"SELECT * FROM `{bq_table}`"
        delete_query = f"DELETE FROM `{bq_table}` WHERE "

        # Timestamp filtering and deletion condition
        timestamp_condition = ""
        if collection_exists:
            latest_doc = target_db[target_collection].find().sort("ts", -1).limit(1)
            latest_ts = latest_doc[0]["ts"] if latest_doc.count() > 0 else None
            if latest_ts:
                timestamp_condition = f"ts > '{latest_ts}'"
                base_query += f" WHERE {timestamp_condition}"

        # Initialize BigQuery job for data retrieval
        query_job = bq_client.query(base_query)

        # Process results in batches and track IDs for deletion
        ids_to_delete = []
        for page in query_job.result(page_size=100):
            batch = [dict(row) for row in page]
            if batch:
                target_db[target_collection].insert_many(batch)
                ids_to_delete.extend(
                    [row["id"] for row in batch]
                )  # Assuming each row has a unique 'id'

        # Delete data from BigQuery after successful insertion into MongoDB
        if ids_to_delete:
            delete_query += f"id IN ({','.join(map(str, ids_to_delete))})"
            if timestamp_condition:
                delete_query += f" AND {timestamp_condition}"
            bq_client.query(delete_query)

    except Exception as e:
        logging.error(f"An error occurred: {e}")