In [1]:
from pymongo import MongoClient

import duckdb
# import pandas as pd
from datetime import datetime

import pprint

import time

import pyarrow as pa
import pyarrow.compute as pc

import cProfile
import pstats



In [2]:
def create_duckdb_file(filename):
    """
    Connect to the file-based DuckDB database stored at ``filename``.

    Args:
      filename: The name of the DuckDB database file, passed to
        ``duckdb.connect`` as its ``database`` argument.

    Returns:
      duckdb.DuckDBPyConnection: The connection object returned by
      ``duckdb.connect``.
    """
    return duckdb.connect(database=filename)


In [3]:
# Example usage: connect to the on-disk DuckDB database. The resulting
# connection is the one later handed to the extraction/profiling call.
filename = "biosamples.duckdb"
duckdb_conn = create_duckdb_file(filename)

In [4]:
# MongoDB connection details: a local server, with the database and the
# collection both named "biosamples".
connection_string = "mongodb://localhost:27017/"
db_name = "biosamples"
collection_name = "biosamples"

# Connect to MongoDB and grab a handle on the collection that the
# extraction function iterates over.
client = MongoClient(connection_string)
db = client[db_name]
collection = db[collection_name]

In [5]:
# Retrieve the first document, for manual inspection of the document layout.
# It is not referenced by the later cells shown in this notebook.
first_document = collection.find_one()


In [6]:
# # Print the document
# pprint.pprint(first_document)

In [7]:
# df['content'].value_counts()

In [8]:
def _walk_path(doc, parts):
    """Follow ``parts`` through the nested dicts of ``doc``; return None if the path cannot be followed."""
    node = doc
    for part in parts:
        # A list or scalar in the middle of the path has no ``.get``; treat
        # the path as absent for this document instead of raising.
        if not isinstance(node, dict):
            return None
        node = node.get(part)
        if node is None:
            return None
    return node


def _duckdb_column_type(arrow_type):
    """Choose the DuckDB type for a new column: INTEGER, DOUBLE, or TEXT for anything else."""
    if pa.types.is_integer(arrow_type):
        return "INTEGER"
    if pa.types.is_floating(arrow_type):
        return "DOUBLE"
    return "TEXT"


def extract_biosample_data_arrow(collection, conn, paths, max_docs=None, batch_size=5000):
    """
    Extracts data from multiple BioSample paths, dynamically identifies columns,
    and loads the rows into DuckDB via PyArrow tables.

    For every path, a table named after the lower-cased last path component is
    (re)created with a single ``id TEXT`` column. Each document is then walked
    along every path:

    * a dict contributes one row built from its scalar (str/int/float/bool) values;
    * a list of dicts contributes one row per item, with one column per key
      seen in any item (values are taken as-is, not filtered to scalars);
    * anything else is reported with a "Skipping unexpected data structure" message.

    Columns that a table does not have yet are added with ``ALTER TABLE``;
    columns the current row lacks are filled with NULLs. Every row carries the
    document's ``id`` value.

    Args:
        collection: pymongo collection object
        conn: duckdb connection object
        paths: list of dotted paths to extract data from. The first component
            is treated as the document root and dropped before walking.
        max_docs: Maximum number of documents to process (None or 0 = no limit).
        batch_size: Cursor batch size used when reading from MongoDB.

    Raises:
        KeyError: if a document that yields data has no ``id`` field.

    NOTE(review): a column's type is fixed by the first value seen for it;
    later documents whose values do not fit that type may make the INSERT
    fail -- confirm against the real data.
    """
    total_docs = max_docs or collection.estimated_document_count()
    processed_docs = 0

    # Resolve every path once, instead of re-splitting it for each document:
    # (original path, keys to walk below the document root, table name).
    targets = []
    for path in paths:
        parts = path.split(".")
        targets.append((path, parts[1:], parts[-1].lower()))

    # Create empty tables for each path in DuckDB and remember their columns.
    # This function owns the schema of these tables, so it is tracked locally
    # rather than re-queried with PRAGMA table_info for every row batch.
    table_columns = {}
    for _, _, table_name in targets:
        conn.execute(f"CREATE OR REPLACE TABLE {table_name} (id TEXT)")
        table_columns[table_name] = {"id"}

    cursor = collection.find({}).batch_size(batch_size)

    start_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print(f"[{start_time}] Starting extraction for paths: {paths}")
    for doc in cursor:
        if max_docs and processed_docs >= max_docs:
            break

        for path, path_parts, table_name in targets:
            current_data = _walk_path(doc, path_parts)
            if not current_data:
                continue

            if isinstance(current_data, dict):
                scalar_data = {
                    k: [v] for k, v in current_data.items() if isinstance(v, (str, int, float, bool))
                }
                scalar_data["id"] = [doc["id"]]
            elif isinstance(current_data, list) and all(isinstance(item, dict) for item in current_data):
                keys = {key for item in current_data for key in item}
                scalar_data = {
                    k: [item.get(k) for item in current_data] for k in keys
                }
                scalar_data["id"] = [doc["id"]] * len(current_data)
            else:
                print(f"Skipping unexpected data structure for path '{path}': {current_data}")
                continue

            arrow_table = pa.Table.from_pydict(scalar_data)
            known_columns = table_columns[table_name]
            incoming_columns = arrow_table.schema.names

            # Add columns that DuckDB does not have yet.
            for col in incoming_columns:
                if col not in known_columns:
                    col_type = _duckdb_column_type(arrow_table.schema.field(col).type)
                    conn.execute(f'ALTER TABLE {table_name} ADD COLUMN "{col}" {col_type}')
                    known_columns.add(col)

            # Fill columns this row batch lacks with NULLs so the Arrow table
            # covers every column of the DuckDB table.
            for col in known_columns.difference(incoming_columns):
                arrow_table = arrow_table.append_column(col, pa.nulls(len(arrow_table)))

            # Insert using the Arrow table's full column list. It includes the
            # columns added just above; building the list from the pre-ALTER
            # schema would silently drop their values for this row batch.
            conn.register("arrow_table", arrow_table)
            columns = ", ".join(f'"{col}"' for col in arrow_table.schema.names)
            conn.execute(f"INSERT INTO {table_name} ({columns}) SELECT {columns} FROM arrow_table")

        processed_docs += 1
        if processed_docs % 1000 == 0:
            current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            print(f"[{current_time}] Processed {processed_docs}/{total_docs} documents.")

    end_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print(f"[{end_time}] Completed processing {processed_docs} documents.")

In [9]:
# Dotted paths handed to the extraction function. As written in
# extract_biosample_data_arrow, the last component (lower-cased) of each path
# is used as the DuckDB table name.
paths = [
    "BioSample",
    "BioSample.Attributes.Attribute",
    "BioSample.Curation",
    "BioSample.Description.Comment.Paragraph",
    "BioSample.Description.Organism",
    "BioSample.Description.Organism.OrganismName",
    "BioSample.Description.Synonym",
    "BioSample.Description.Title",
    "BioSample.Ids.Id",
    "BioSample.Links.Link",
    "BioSample.Models.Model",
    "BioSample.Owner.Name",
    "BioSample.Package",
    "BioSample.Status"
]

# Run limits: cap on the number of documents processed, and the MongoDB
# cursor batch size.
max_docs =  10_000
batch_size = 10_000

# Disabled alternative calls, kept for reference. extract_all_paths_data is
# not defined in the cells visible in this notebook.
# extract_all_paths_data(collection, duckdb_conn, paths, max_docs=max_docs, client=client, batch_size=batch_size)

# extract_biosample_data_arrow(collection, duckdb_conn, paths, max_docs=max_docs, batch_size=batch_size)


In [10]:
def profile_extraction(collection, conn, paths, max_docs=None, batch_size=5000):
    """
    Profiles the extract_biosample_data_arrow function.

    Runs the extraction under cProfile and prints the 10 entries with the
    highest cumulative time. All arguments are forwarded unchanged to
    extract_biosample_data_arrow.

    Args:
        collection: pymongo collection object
        conn: duckdb connection object
        paths: list of paths to extract data from.
        max_docs: Maximum number of documents to process.
        batch_size: Number of documents to process per batch.

    Raises:
        Whatever extract_biosample_data_arrow raises; no statistics are
        printed in that case.
    """
    profiler = cProfile.Profile()
    profiler.enable()
    try:
        extract_biosample_data_arrow(collection, conn, paths, max_docs=max_docs, batch_size=batch_size)
    finally:
        # Stop profiling even when the extraction fails, so a failed run does
        # not leave the profiler active for the rest of the kernel session.
        profiler.disable()
    stats = pstats.Stats(profiler).sort_stats('cumulative')
    stats.print_stats(10)  # Show the top 10 time-consuming calls


In [11]:
# Run the profiled extraction with the paths and limits configured in the cell above.
profile_extraction(collection, duckdb_conn, paths, max_docs=max_docs, batch_size=batch_size)

[2024-12-09 10:12:17] Starting extraction for paths: ['BioSample', 'BioSample.Attributes.Attribute', 'BioSample.Curation', 'BioSample.Description.Comment.Paragraph', 'BioSample.Description.Organism', 'BioSample.Description.Organism.OrganismName', 'BioSample.Description.Synonym', 'BioSample.Description.Title', 'BioSample.Ids.Id', 'BioSample.Links.Link', 'BioSample.Models.Model', 'BioSample.Owner.Name', 'BioSample.Package', 'BioSample.Status']
[2024-12-09 10:12:36] Processed 1000/10000 documents.
[2024-12-09 10:12:56] Processed 2000/10000 documents.
[2024-12-09 10:13:14] Processed 3000/10000 documents.
[2024-12-09 10:13:30] Processed 4000/10000 documents.
[2024-12-09 10:13:48] Processed 5000/10000 documents.
[2024-12-09 10:14:06] Processed 6000/10000 documents.
[2024-12-09 10:14:24] Processed 7000/10000 documents.
[2024-12-09 10:14:43] Processed 8000/10000 documents.
[2024-12-09 10:15:02] Processed 9000/10000 documents.
[2024-12-09 10:15:20] Processed 10000/10000 documents.
[2024-12-09 1

In [12]:
# Close the DuckDB connection when you're finished
duckdb_conn.close()

# Close the pymongo client connection
client.close()

Throughput: about 3370.4 docs/min, which is much slower than the pandas approach.