In [1]:
pip install pymongo

Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 24.3.1 -> 25.3
[notice] To update, run: python.exe -m pip install --upgrade pip


In [5]:
import json
from pymongo import MongoClient
from pymongo.errors import PyMongoError, BulkWriteError

# ===== Settings for the single-value JSON import in this cell =====
MONGODB_URI = "mongodb://localhost:27017/"   # local server on MongoDB's default port
DATABASE_NAME = "MyLocalDB"                  # database to write into
COLLECTION_NAME = "Products"                 # collection to write into
JSON_FILE_PATH = "openlineage.json"          # input file, read as ONE JSON value
# ===== End settings =====

def import_json_to_mongodb(uri, db_name, collection_name, file_path):
    """
    Connect to MongoDB, load one JSON value from a file, and insert it.

    The whole of ``file_path`` must be a single JSON value:
    - a top-level object is inserted with ``insert_one``;
    - a non-empty top-level array is inserted with ``insert_many``;
    - an empty array, or any other type, is reported and nothing is inserted.

    Line-delimited JSON (one object per line) is NOT a single JSON value and
    fails here with a "Extra data" decode error. Read such files line by line
    instead.

    All errors are reported via ``print`` rather than raised. The client is
    always closed.

    Args:
        uri: MongoDB connection string.
        db_name: Target database name.
        collection_name: Target collection name.
        file_path: Path to a UTF-8 encoded JSON file.

    Returns:
        None.
    """
    client = None
    try:
        # 1. Connect to MongoDB. MongoClient() connects lazily, so the 'ping'
        #    command is what actually proves the server is reachable. The short
        #    server-selection timeout makes an unreachable server fail fast
        #    instead of waiting out the driver's default.
        client = MongoClient(uri, serverSelectionTimeoutMS=5000)
        try:
            client.admin.command('ping')
        except PyMongoError as e:
            # pymongo signals an unreachable server with PyMongoError subclasses
            # (e.g. ServerSelectionTimeoutError), not the builtin ConnectionError.
            print(f"‚ùå CONNECTION ERROR: Could not connect to MongoDB. Ensure your MongoDB server is running at {uri}.")
            print(f"Details: {e}")
            return
        print(f"‚úÖ Successfully connected to MongoDB at {uri}")

        # 2. Get Database and Collection
        db = client[db_name]
        collection = db[collection_name]
        print(f"‚û°Ô∏è Targeting database: {db_name}, collection: {collection_name}")

        # 3. Load JSON data from file (the entire file is one JSON value)
        with open(file_path, 'r', encoding='utf-8') as file:
            file_data = json.load(file)
        print(f"‚úÖ Successfully loaded data from {file_path}")

        # 4. Insert data based on its structure
        if isinstance(file_data, list):
            if not file_data:
                # insert_many() rejects an empty list, so report it instead of erroring.
                print("‚ö†Ô∏è JSON array is empty. No documents inserted.")
                return
            result = collection.insert_many(file_data)
            print(f"‚ú® Inserted {len(result.inserted_ids)} documents using insert_many().")
            print("--- First 3 IDs: ", result.inserted_ids[:3])
        elif isinstance(file_data, dict):
            result = collection.insert_one(file_data)
            print(f"‚ú® Inserted 1 document using insert_one(). ID: {result.inserted_id}")
        else:
            print(f"‚ùå Error: JSON file content is neither a list nor a dictionary. Type: {type(file_data)}")
            return

    except FileNotFoundError:
        print(f"‚ùå FILE ERROR: The file '{file_path}' was not found.")
    except json.JSONDecodeError as e:
        print(f"‚ùå JSON ERROR: Failed to decode JSON from file. Check file syntax. Error: {e}")
    except BulkWriteError as e:
        # Listed before PyMongoError because BulkWriteError is a subclass of it.
        # Typically raised for duplicate keys when documents carry their own _id.
        print(f"‚ö†Ô∏è BULK WRITE ERROR: Some documents failed to insert. {e.details}")
    except PyMongoError as e:
        # Any other driver/server error raised after the connection check.
        print(f"‚ùå MONGODB ERROR: {e}")
    except Exception as e:
        print(f"‚ùå An unexpected error occurred: {e}")
    finally:
        # 5. Close the connection
        if client is not None:
            client.close()
            print("‚û°Ô∏è MongoDB connection closed.")

# Execute the import using the settings defined at the top of this cell.
import_json_to_mongodb(
    uri=MONGODB_URI,
    db_name=DATABASE_NAME,
    collection_name=COLLECTION_NAME,
    file_path=JSON_FILE_PATH,
)

‚úÖ Successfully connected to MongoDB at mongodb://localhost:27017/
‚û°Ô∏è Targeting database: MyLocalDB, collection: Products
‚ùå JSON ERROR: Failed to decode JSON from file. Check file syntax. Error: Extra data: line 2 column 1 (char 711)
‚û°Ô∏è MongoDB connection closed.


In [1]:
import json
from pymongo import MongoClient
from pymongo.errors import PyMongoError, BulkWriteError

# ===== Settings for the JSON Lines import in this cell =====
MONGODB_URI = "mongodb://localhost:27017/"   # local server on MongoDB's default port
DATABASE_NAME = "OpenLineageDB"              # database to write into
COLLECTION_NAME = "lineage_events"           # collection to write into
JSON_FILE_PATH = "openlineage.json"          # input file, one JSON object per line
# ===== End settings =====

def import_jsonl_to_mongodb(uri, db_name, collection_name, file_path):
    """
    Connect to MongoDB, read a JSON Lines file, and insert its records.

    Each non-blank line of ``file_path`` must hold one JSON object. Every such
    object becomes one document in ``db_name.collection_name`` via a single
    ``insert_many`` call.

    Lines are reported with their 1-based line number and skipped when they:
    - are not valid JSON, or
    - decode to something other than an object (e.g. a bare number or array).

    All errors are reported via ``print`` rather than raised. The client is
    always closed.

    Args:
        uri: MongoDB connection string.
        db_name: Target database name.
        collection_name: Target collection name.
        file_path: Path to a UTF-8 encoded JSON Lines file.

    Returns:
        None.
    """
    client = None
    documents = []      # parsed documents, in file order
    skipped_lines = 0   # non-blank lines that could not be used as documents

    try:
        # 1. Connect to MongoDB. MongoClient() is lazy; 'ping' forces a round
        #    trip so an unreachable server is detected here, within the
        #    5-second server-selection timeout, rather than at insert time.
        client = MongoClient(uri, serverSelectionTimeoutMS=5000)
        try:
            client.admin.command('ping')
        except PyMongoError as e:
            print(f"‚ùå MONGODB ERROR: Connection failed. Ensure your MongoDB server is running at {uri}.")
            print(f"Details: {e}")
            return
        print(f"‚úÖ Successfully connected to MongoDB at {uri}")

        # 2. Get Database and Collection
        db = client[db_name]
        collection = db[collection_name]
        print(f"‚û°Ô∏è Targeting database: {db_name}, collection: {collection_name}")

        # 3. Parse the file one line at a time (JSON Lines format).
        with open(file_path, 'r', encoding='utf-8') as file:
            for line_number, line in enumerate(file, 1):
                if not line.strip():
                    continue  # blank lines are allowed and ignored
                try:
                    document = json.loads(line)
                except json.JSONDecodeError as e:
                    skipped_lines += 1
                    print(f"‚ùå JSONL PARSE ERROR on line {line_number}: {e}")
                    continue
                if not isinstance(document, dict):
                    # insert_many() only accepts mapping documents; a bare value
                    # would make the whole insert_many() call fail.
                    skipped_lines += 1
                    print(f"‚ùå JSONL PARSE ERROR on line {line_number}: expected a JSON object, got {type(document).__name__}")
                    continue
                documents.append(document)

        print(f"‚úÖ Successfully loaded {len(documents)} documents from {file_path}")
        if skipped_lines:
            print(f"‚ö†Ô∏è Skipped {skipped_lines} invalid line(s).")

        # 4. Insert everything in one batch.
        if documents:
            result = collection.insert_many(documents)
            print(f"‚ú® Inserted {len(result.inserted_ids)} documents using insert_many().")
            print("--- First 3 IDs: ", result.inserted_ids[:3])
        else:
            print("‚ö†Ô∏è File was empty or contained no valid documents. No documents inserted.")

    except FileNotFoundError:
        print(f"‚ùå FILE ERROR: The file '{file_path}' was not found. Check the path.")
    except BulkWriteError as e:
        # Must precede the PyMongoError handler: BulkWriteError is a subclass of
        # PyMongoError, so listing it second would make it unreachable.
        print(f"‚ö†Ô∏è BULK WRITE ERROR: Some documents failed to insert. {e.details}")
    except PyMongoError as e:
        # Any other driver/server error raised after the connection check.
        print(f"‚ùå MONGODB ERROR: {e}")
    except Exception as e:
        print(f"‚ùå An unexpected error occurred: {e}")
    finally:
        # 5. Close the connection
        if client is not None:
            client.close()
            print("‚û°Ô∏è MongoDB connection closed.")

# Execute the JSON Lines import using the settings defined at the top of this cell.
import_jsonl_to_mongodb(
    uri=MONGODB_URI,
    db_name=DATABASE_NAME,
    collection_name=COLLECTION_NAME,
    file_path=JSON_FILE_PATH,
)

‚úÖ Successfully connected to MongoDB at mongodb://localhost:27017/
‚û°Ô∏è Targeting database: OpenLineageDB, collection: lineage_events
‚úÖ Successfully loaded 2 documents from openlineage.json
‚ú® Inserted 2 documents using insert_many().
--- First 3 IDs:  [ObjectId('6932432ba25a45a9129830fc'), ObjectId('6932432ba25a45a9129830fd')]
‚û°Ô∏è MongoDB connection closed.


# Working code

In [14]:
import json
from pymongo import MongoClient, ASCENDING, WriteConcern
from pymongo.errors import CollectionInvalid
# from pymongo import MongoClient
from pymongo.errors import PyMongoError

# Upload into MongoDB

In [9]:
# import json
# from pymongo import MongoClient
# from pymongo.errors import PyMongoError

# # --- Configuration (MUST MATCH your import script) ---
# MONGODB_URI = "mongodb://localhost:27017/"
# DATABASE_NAME = "OpenLineageDB"               # Must match DATABASE_NAME in import_mongo.txt [cite: 1]
# COLLECTION_NAME = "lineage_events"           # Must match COLLECTION_NAME in import_mongo.txt [cite: 1]
# # --- End Configuration ---

# def retrieve_data_from_mongodb(uri, db_name, collection_name):
#     """
#     Connects to MongoDB and retrieves all documents from the specified collection.
#     """
#     client = None
#     try:
#         # 1. Connect to MongoDB
#         client = MongoClient(uri, serverSelectionTimeoutMS=5000)
#         client.admin.command('ping') 
#         print(f"‚úÖ Successfully connected to MongoDB at {uri}")

#         # 2. Get Database and Collection
#         db = client[db_name]
#         collection = db[collection_name] # Using the collection from your import script [cite: 3]
#         print(f"‚û°Ô∏è Targeting database: {db_name}, collection: {collection_name}")
        
#         # 3. Define the Query (find all documents)
#         # An empty dictionary {} finds all documents in the collection
#         query_filter = {} 
        
#         # 4. Execute the Query and Retrieve Documents
#         cursor = collection.find(query_filter)
        
#         print("\nüìä Retrieved Documents:")
#         count = 0
        
#         # 5. Iterate and Print Results
#         for document in cursor:
#             # MongoDB's _id field is a BSON ObjectId, which needs to be converted
#             # to a string for clean JSON printing if you use json.dumps()
#             document['_id'] = str(document['_id'])
            
#             # Use json.dumps for clean, formatted output
#             print(json.dumps(document, indent=2))
#             count += 1
            
#             # Optional: Limit the output for large collections
#             if count >= 10:
#                 print(f"...\n(Stopping output after {count} documents for brevity.)")
#                 break
        
#         if count == 0:
#             print("‚ö†Ô∏è No documents found in the collection.")

#     except PyMongoError as e: 
#         # Handles connection issues (similar to your import code )
#         print(f"‚ùå MONGODB ERROR: Connection failed. Ensure your MongoDB server is running at {uri}.")
#         print(f"Details: {e}")
#     except Exception as e:
#         print(f"‚ùå An unexpected error occurred: {e}") # Catches other errors [cite: 8]
#     finally:
#         # 6. Close the connection
#         if client:
#             client.close()
#             print("‚û°Ô∏è MongoDB connection closed.")

# # Run the function
# retrieve_data_from_mongodb(MONGODB_URI, DATABASE_NAME, COLLECTION_NAME)

In [15]:
import json
from pymongo import MongoClient, ASCENDING, WriteConcern
from pymongo.errors import CollectionInvalid
import json
from pymongo import MongoClient, ASCENDING, WriteConcern
from pymongo.errors import CollectionInvalid
# from pymongo import MongoClient
from pymongo.errors import PyMongoError

In [17]:
# ===== Settings for the PySpark lineage upload =====
MONGODB_URI = "mongodb://localhost:27017/"   # local server on MongoDB's default port
DATABASE_NAME = "Pyspark_OpenLineageDB"      # database to write into
COLLECTION_NAME = "lineage_events"           # collection to write into
JSON_FILE_PATH = "pyspark_lineage.json"      # input file, read one JSON object per line
# ===== End settings =====

In [18]:
def upload_json_to_mongodb(file_path, mongo_uri, db_name, collection_name):
    """
    Insert every JSON value from a line-delimited JSON file into MongoDB.

    Each non-blank line of ``file_path`` is parsed with ``json.loads``. Lines
    that fail to parse are reported (with their 1-based line number) and
    skipped. All parsed values are then inserted with a single ``insert_many``
    call into ``db_name.collection_name``.

    Errors are printed rather than raised. The client is always closed.

    Args:
        file_path: Path to a UTF-8 encoded, line-delimited JSON file.
        mongo_uri: MongoDB connection string.
        db_name: Target database name.
        collection_name: Target collection name.

    Returns:
        None.
    """
    client = None
    try:
        # 1. Connect to MongoDB. MongoClient() does not contact the server by
        #    itself, so 'ping' is issued before reporting success. The short
        #    server-selection timeout makes an unreachable server fail fast.
        client = MongoClient(mongo_uri, serverSelectionTimeoutMS=5000)
        client.admin.command('ping')
        db = client[db_name]
        collection = db[collection_name]
        print(f"‚úÖ Successfully connected to MongoDB database '{db_name}'.")

        documents_to_insert = []

        # 2. Read the file line by line. The encoding is explicit because the
        #    platform default is locale-dependent (e.g. cp1252 on many Windows
        #    setups) and may not decode UTF-8 JSON.
        with open(file_path, 'r', encoding='utf-8') as f:
            for line_number, line in enumerate(f, 1):
                line = line.strip()
                if not line:
                    continue  # Skip empty lines

                try:
                    # 3. Parse the line into a Python object (normally a dict)
                    documents_to_insert.append(json.loads(line))
                except json.JSONDecodeError as e:
                    print(f"‚ö†Ô∏è Warning: Could not parse line {line_number} as JSON. Skipping. Error: {e}")

        if not documents_to_insert:
            print(f"‚ùå No valid JSON documents found in '{file_path}'. Exiting.")
            return

        # 4. Insert all documents in one round trip.
        print(f"Attempting to insert {len(documents_to_insert)} documents into '{collection_name}'...")
        result = collection.insert_many(documents_to_insert)

        print(f"‚úÖ Successfully inserted {len(result.inserted_ids)} documents.")
        print(f"Collection: {db_name}.{collection_name}")

    except FileNotFoundError:
        print(f"‚ùå Error: The file '{file_path}' was not found.")
    except Exception as e:
        print(f"‚ùå An error occurred during the MongoDB operation: {e}")
    finally:
        # 5. Close the MongoDB connection (client stays None if construction failed).
        if client is not None:
            client.close()
            print("Connection closed.")

if __name__ == "__main__":
    # Inside a notebook kernel __name__ is "__main__", so this runs with the cell.
    # Requires the 'pymongo' package to be installed in this kernel.
    upload_json_to_mongodb(
        file_path=JSON_FILE_PATH,
        mongo_uri=MONGODB_URI,
        db_name=DATABASE_NAME,
        collection_name=COLLECTION_NAME,
    )

‚úÖ Successfully connected to MongoDB database 'Pyspark_OpenLineageDB'.
Attempting to insert 6 documents into 'lineage_events'...
‚úÖ Successfully inserted 6 documents.
Collection: Pyspark_OpenLineageDB.lineage_events
Connection closed.


## Trial 2

In [34]:
# def create_unique_index(collection):
#     """Ensures a unique index exists on (run.runId, eventType) to prevent duplicates."""
#     try:
#         index_name = "run_event_unique_index"
#         collection.create_index(
#             [("run.runId", ASCENDING), ("eventType", ASCENDING)],
#             unique=True,
#             name=index_name
#         )
#         print(f"‚úÖ Ensured unique index '{index_name}' on (run.runId, eventType).")
#     except Exception as e:
#         print(f"‚ùå Error creating index: {e}")
        
# def upload_json_to_mongodb_safe(file_path, mongo_uri, db_name, collection_name):
#     """
#     Connects to MongoDB, reads line-delimited JSON, and uses upserts (update or insert) 
#     to avoid duplicate entries based on the compound key (runId, eventType).
#     """
#     try:
#         # 1. Connect to MongoDB
#         # Use WriteConcern for robust write confirmation
#         client = MongoClient(mongo_uri)
#         db = client.get_database(db_name, write_concern=WriteConcern(w="majority", wtimeout=5000))
#         collection = db[collection_name]
#         print(f"‚úÖ Successfully connected to MongoDB database '{db_name}'.")

#         # 2. Create Unique Index
#         create_unique_index(collection)

#         inserted_count = 0
#         skipped_count = 0
        
#         # 3. Read the file line by line and perform upserts
#         with open(file_path, 'r') as f:
#             for line_number, line in enumerate(f):
#                 line = line.strip()
#                 if not line:
#                     continue

#                 try:
#                     document = json.loads(line)
                    
#                     # Define the unique filter using the job's Run ID and Event Type
#                     filter_query = {
#                         "run.runId": document["run"]["runId"],
#                         "eventType": document["eventType"]
#                     }
                    
#                     # Use $set to update the document, which acts as the insertion content on upsert
#                     update_operation = {"$set": document}
                    
#                     # Perform the upsert: update if found, insert if not found
#                     result = collection.update_one(
#                         filter_query,
#                         update_operation,
#                         upsert=True
#                     )
                    
#                     # Track counts
#                     if result.upserted_id:
#                         inserted_count += 1
#                     elif result.matched_count == 1 and result.modified_count == 0:
#                          skipped_count += 1 

#                 except json.JSONDecodeError as e:
#                     print(f"‚ö†Ô∏è Warning: Could not parse line {line_number + 1} as JSON. Skipping. Error: {e}")
                
#         print("-" * 50)
#         print(f"Upload Summary for '{file_path}':")
#         print(f"‚úÖ Newly Inserted Documents (Upserts): {inserted_count}")
#         print(f"‚è≠Ô∏è Skipped (Existing and Unchanged): {skipped_count}")
#         print("-" * 50)

#     except FileNotFoundError:
#         print(f"‚ùå Error: The file '{file_path}' was not found.")
#     except Exception as e:
#         print(f"‚ùå An error occurred during the MongoDB operation: {e}")
#     finally:
#         # 4. Close the MongoDB connection
#         if 'client' in locals() and client:
#             client.close()
#             print("Connection closed.")

# if __name__ == "__main__":
#     # Ensure you have 'pymongo' installed: pip install pymongo
#     upload_json_to_mongodb_safe(JSON_FILE_PATH, MONGODB_URI, DATABASE_NAME, COLLECTION_NAME)

In [None]:
# def retrieve_and_print_json(mongo_uri, db_name, collection_name):
#     """
#     Connects to MongoDB, retrieves all documents from a collection, 
#     excludes the '_id' field, and prints the result as a list of JSON objects.
#     """
#     try:
#         # 1. Connect to MongoDB
#         client = MongoClient(mongo_uri)
#         db = client[db_name]
#         collection = db[collection_name]
#         print(f"‚úÖ Successfully connected to MongoDB database '{db_name}'.")

#         # 2. Retrieve all documents
#         # The projection {'_id': 0} excludes the MongoDB-added ObjectId field
#         cursor = collection.find({}, {'_id': 0})
        
#         retrieved_documents = list(cursor)

#         if not retrieved_documents:
#             print(f"‚ùå No documents found in collection '{collection_name}'.")
#             return

#         print(f"‚úÖ Retrieved {len(retrieved_documents)} documents. Printing documents:")
#         print("-" * 50)

#         # 3. Print each document as a formatted JSON string
#         for doc in retrieved_documents:
#             # The 'doc' is already a standard Python dictionary, 
#             # which is the direct equivalent of the original JSON object.
            
#             # Using json.dumps to print it as a single-line JSON string 
#             # to match the original file format (optional, you could use pprint for readability)
#             json_output = json.dumps(doc)
#             print(json_output)
            
#         print("-" * 50)
        
#     except Exception as e:
#         print(f"‚ùå An error occurred during the MongoDB retrieval operation: {e}")
#     finally:
#         # 4. Close the MongoDB connection
#         if 'client' in locals() and client:
#             client.close()
#             print("Connection closed.")

In [36]:
# Note: the code in this cell was generated with ChatGPT.
from pymongo import MongoClient
import json

def retrieve_and_print_json(mongo_uri, db_name, collection_name):
    """
    Print, and return, every COMPLETE event in a collection, newest first.

    Documents matching ``eventType == "COMPLETE"`` are fetched with ``_id``
    excluded and sorted by ``eventTime`` descending. Each is printed as one line
    of compact JSON.

    Values that the json module cannot encode are printed via ``str()``, so a
    BSON datetime (for example) does not abort the listing. Errors are printed
    rather than raised. The client is always closed.

    NOTE(review): ``eventTime`` is sorted using whatever type it is stored as.
    If it is an ISO-8601 string, the order is only chronological while every
    value shares the same format and UTC offset. Confirm against the data.

    Args:
        mongo_uri: MongoDB connection string.
        db_name: Database to read from.
        collection_name: Collection holding the lineage events.

    Returns:
        list[dict]: The COMPLETE documents in the order printed. An empty list
        is returned when none match or when an error occurs.
    """
    client = None
    completed_events = []
    try:
        client = MongoClient(mongo_uri)
        collection = client[db_name][collection_name]
        print(f"‚úÖ Connected to MongoDB database '{db_name}'")

        # Filter to COMPLETE events, drop _id, newest eventTime first.
        cursor = collection.find(
            {"eventType": "COMPLETE"},
            {"_id": 0},
        ).sort("eventTime", -1)
        completed_events = list(cursor)

        if not completed_events:
            print("‚ùå No COMPLETE events found.")
            return completed_events

        print(f"‚úÖ Retrieved {len(completed_events)} COMPLETE events (sorted by latest eventTime)")
        print("-" * 60)
        for doc in completed_events:
            # default=str keeps non-JSON values (e.g. datetime) from raising TypeError.
            print(json.dumps(doc, default=str))
        print("-" * 60)

    except Exception as e:
        print(f"‚ùå MongoDB Error: {e}")
        completed_events = []
    finally:
        if client is not None:
            client.close()
            print("üîå Connection closed.")
    return completed_events


In [37]:
from pymongo import MongoClient
import json

def retrieve_and_print_json_getlatest(mongo_uri, db_name, collection_name):
    """
    Print, and return, the most recent COMPLETE event for each ``job.name``.

    The aggregation pipeline:
    1. keeps only ``eventType == "COMPLETE"`` documents;
    2. sorts them by ``eventTime`` descending;
    3. keeps the first (latest) document per ``job.name``;
    4. strips ``_id``;
    5. sorts the survivors by ``eventTime`` descending again.

    Each result is printed as one line of JSON. Non-JSON values are printed
    via ``str()``. Errors are printed rather than raised. The client is always
    closed.

    NOTE(review): two caveats to confirm against the data:
    - Grouping is on ``job.name`` alone, so jobs sharing a name under different
      ``job.namespace`` values would be collapsed into one.
    - ``eventTime`` order is only chronological if every stored value uses the
      same string format and UTC offset.

    Args:
        mongo_uri: MongoDB connection string.
        db_name: Database to read from.
        collection_name: Collection holding the lineage events.

    Returns:
        list[dict]: The latest COMPLETE event per job (possibly empty). An
        empty list is also returned when an error occurs.
    """
    client = None
    # Bound before the try so the final return never hits an unassigned name,
    # even when connecting or aggregating raises.
    completed_events = []
    try:
        client = MongoClient(mongo_uri)
        collection = client[db_name][collection_name]
        print(f"‚úÖ Connected to MongoDB database '{db_name}'")

        pipeline = [
            # Filter only COMPLETE events
            {"$match": {"eventType": "COMPLETE"}},

            # Sort by eventTime descending so $first below picks the latest
            {"$sort": {"eventTime": -1}},

            # One output document per job.name: the first (latest) in sort order
            {"$group": {
                "_id": "$job.name",
                "latestEvent": {"$first": "$$ROOT"}
            }},

            # Unwrap back to the original event document
            {"$replaceRoot": {"newRoot": "$latestEvent"}},

            # Remove MongoDB's _id field
            {"$project": {"_id": 0}},

            # $group output order is unspecified, so sort again for display
            {"$sort": {"eventTime": -1}}
        ]

        completed_events = list(collection.aggregate(pipeline))

        if not completed_events:
            print("‚ùå No COMPLETE events found.")
            return completed_events

        print(f"‚úÖ Retrieved {len(completed_events)} unique jobs with latest COMPLETE events")
        print("-" * 60)
        for doc in completed_events:
            print(json.dumps(doc, default=str))  # default=str handles datetime objects
        print("-" * 60)

    except Exception as e:
        print(f"‚ùå MongoDB Error: {e}")
        completed_events = []
    finally:
        if client is not None:
            client.close()
            print("üîå Connection closed.")
    return completed_events

In [None]:
     
# ===== Settings for the lineage retrieval call that follows =====
MONGODB_URI = "mongodb://localhost:27017/"   # local server on MongoDB's default port
DATABASE_NAME = "Pyspark_OpenLineageDB"      # database to read from
COLLECTION_NAME = "lineage_events"           # collection to read from
JSON_FILE_PATH = "pyspark_lineage.json"      # source file of the uploaded events (not read here)
# ===== End settings =====
# Optional sanity check of the target collection (uncomment to run):
# client = MongoClient(MONGODB_URI)
# db = client[DATABASE_NAME]
# collection = db[COLLECTION_NAME]
# print(collection)

Collection(Database(MongoClient(host=['localhost:27017'], document_class=dict, tz_aware=False, connect=True), 'Pyspark_OpenLineageDB'), 'lineage_events')


In [39]:

# Alternative (every COMPLETE event, not just the latest per job):
# retrieve_and_print_json(MONGODB_URI, DATABASE_NAME, COLLECTION_NAME)
retrieve_and_print_json_getlatest(
    mongo_uri=MONGODB_URI,
    db_name=DATABASE_NAME,
    collection_name=COLLECTION_NAME,
)

Collection(Database(MongoClient(host=['localhost:27017'], document_class=dict, tz_aware=False, connect=True), 'Pyspark_OpenLineageDB'), 'lineage_events')
‚úÖ Connected to MongoDB database 'Pyspark_OpenLineageDB'
‚úÖ Retrieved 2 unique jobs with latest COMPLETE events
------------------------------------------------------------
{"eventTime": "2025-12-05T04:51:07.469Z", "producer": "https://github.com/OpenLineage/OpenLineage/tree/1.40.1/integration/spark", "schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent", "eventType": "COMPLETE", "run": {"runId": "019aecd9-65c4-77c5-ab1b-584ff2b7a5dd", "facets": {"spark_properties": {"_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.40.1/integration/spark", "_schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet", "properties": {"spark.master": "local[*]", "spark.app.name": "PySparkLineage"}}, "processing_engine": {"_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.40.1

[{'eventTime': '2025-12-05T04:51:07.469Z',
  'producer': 'https://github.com/OpenLineage/OpenLineage/tree/1.40.1/integration/spark',
  'schemaURL': 'https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent',
  'eventType': 'COMPLETE',
  'run': {'runId': '019aecd9-65c4-77c5-ab1b-584ff2b7a5dd',
   'facets': {'spark_properties': {'_producer': 'https://github.com/OpenLineage/OpenLineage/tree/1.40.1/integration/spark',
     '_schemaURL': 'https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet',
     'properties': {'spark.master': 'local[*]',
      'spark.app.name': 'PySparkLineage'}},
    'processing_engine': {'_producer': 'https://github.com/OpenLineage/OpenLineage/tree/1.40.1/integration/spark',
     '_schemaURL': 'https://openlineage.io/spec/facets/1-1-1/ProcessingEngineRunFacet.json#/$defs/ProcessingEngineRunFacet',
     'version': '3.5.7',
     'name': 'spark',
     'openlineageAdapterVersion': '1.40.1'},
    'environment-properties': {'_producer': 'https://github

In [42]:


# def retrieve_latest_completed_job_json(mongo_uri, db_name, collection_name, job_name):
#     """
#     Connects to MongoDB and retrieves the single document for the 
#     specified job_name where 'eventType' is 'COMPLETE' and has the 
#     latest 'eventTime'.
#     """
#     client = None # Initialize client outside try block for proper closing
#     try:
#         # 1. Connect to MongoDB
#         client = MongoClient(mongo_uri)
#         db = client[db_name]
#         collection = db[collection_name]
#         print(f"‚úÖ Successfully connected to MongoDB database '{db_name}'.")

#         # 2. Define the Query and Sort/Limit Conditions
        
#         # Condition 1 & 2: Filter for specific job name AND eventType 'COMPLETE'
#         query_filter = {
#             "job.name": job_name,
#             "eventType": "COMPLETE"
#         }
        
#         # Condition 3: Sort by 'eventTime' descending (latest first)
#         sort_criteria = [("eventTime", -1)] # -1 for descending order
        
#         # Projection to exclude '_id'
#         projection = {'_id': 0}

#         # 3. Execute the Query
#         # Use limit(1) to only retrieve the very first document (the latest one)
#         latest_document_cursor = collection.find(
#             query_filter, 
#             projection
#         ).sort(sort_criteria).limit(1)
        
#         retrieved_document = list(latest_document_cursor)

#         if not retrieved_document:
#             print(f"‚ùå No 'COMPLETE' documents found for job: '{job_name}' in collection '{collection_name}'.")
#             return

#         # The result is a list with at most one document
#         doc = retrieved_document[0]
        
#         print(f"‚úÖ Retrieved latest 'COMPLETE' document for job: '{job_name}'. Printing document:")
#         print("-" * 50)

#         # 4. Print the document as a single-line JSON string
#         json_output = json.dumps(doc)
#         print(json_output)
        
#         print("-" * 50)
        
#         return doc # Optionally return the Python dictionary object

#     except Exception as e:
#         print(f"‚ùå An error occurred during the MongoDB retrieval operation: {e}")
#     finally:
#         # 5. Close the MongoDB connection
#         if client:
#             client.close()
#             print("Connection closed.")
       

In [43]:
# # --- Example Usage (Assuming you have your connection details) ---
     
# # --- Configuration ---
# MONGODB_URI = "mongodb://localhost:27017/" 
# DATABASE_NAME = "Pyspark_OpenLineageDB"               # Changed for clarity
# COLLECTION_NAME = "lineage_events"           # Changed for clarity
# JSON_FILE_PATH = "pyspark_lineage.json"          # Confirmed file name
# # --- End Configuration ---

# retrieve_latest_completed_job_json(MONGODB_URI, DATABASE_NAME, COLLECTION_NAME, TARGET_JOB_NAME)