In [24]:
import pymongo
import os
import time
import functools
import json
import threading

def env_int(*names, default):
    """Return the first set, non-empty environment variable in ``names`` as an int.

    Args:
        *names: Environment variable names, checked in the given order.
        default: Value used (converted with ``int``) when none of the names
            is set to a non-empty value.

    Returns:
        The parsed integer.

    Raises:
        ValueError: If the selected value is not a valid integer literal.
    """
    for name in names:
        raw = os.getenv(name)
        if raw is not None and raw.strip():
            return int(raw)
    return int(default)


# MongoDB connection string; None when the variable is not set.
CONNECTION_CHAIN = os.getenv("CONNECTION_CHAIN")
# Base multiplier for the number of documents inserted in each benchmark iteration.
base_size = env_int("INSERT_SIZE", default=1)
# "INTERATIONS" is the historical (misspelled) variable name. It is still honoured
# so existing deployments keep working, but the correctly spelled name wins.
iterations = env_int("ITERATIONS", "INTERATIONS", default=3)

def calcular_tiempo(func):
    """Decorator that measures how long ``func`` takes to run.

    The decorated callable no longer returns the bare result: it returns a
    ``(result, elapsed_seconds)`` tuple, where ``elapsed_seconds`` is a float.

    Args:
        func: The callable to time.

    Returns:
        A wrapper with the same name/docstring as ``func`` (via
        ``functools.wraps``) that returns ``(func(*args, **kwargs), elapsed)``.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # perf_counter() is monotonic and high-resolution. time.time() follows
        # the wall clock, which can be adjusted mid-run and may yield zero or
        # negative durations (callers divide by the elapsed time).
        inicio = time.perf_counter()
        resultado = func(*args, **kwargs)
        tiempo = time.perf_counter() - inicio
        return resultado, tiempo
    return wrapper


def testear_conexion():
    """Smoke-test the MongoDB connection and print the available databases.

    Connects using ``CONNECTION_CHAIN``, lists the database names and prints
    them. Any failure is reported on stdout instead of being raised, so the
    notebook keeps running. The client is always closed, including when the
    listing fails.
    """
    cliente = None
    try:
        cliente = pymongo.MongoClient(CONNECTION_CHAIN)
        lista_bases_de_datos = cliente.list_database_names()
        print("Conexión exitosa. Bases de datos disponibles:")
        for base_de_datos in lista_bases_de_datos:
            print(base_de_datos)
    except Exception as e:
        # Deliberately broad: this is a best-effort diagnostic, not a hard check.
        print("Error al conectar a MongoDB:", e)
    finally:
        # cliente stays None when the constructor itself raised.
        if cliente is not None:
            cliente.close()
# Run the connectivity check once, as soon as this cell executes.
testear_conexion()

# Despite the name, these are used as collection names inside the "test"
# database (see the `cliente.test[...]` lookups in the benchmark functions).
DATABASES = [
    "compras",
    "ventas",
]
# Values stored in the "element" field of every benchmark document.
ELEMENT_TYPE: list[str] = [
    "1_papel",
    "2_boligrafo",
    "3_lapiz",
    "4_goma",
    "5_regla",
]

Conexión exitosa. Bases de datos disponibles:
admin
config
local
sells
test


In [25]:
@calcular_tiempo
def insert_documents(size):
    """Insert documents one at a time with ``insert_one``.

    For every collection name in ``DATABASES`` and every value in
    ``ELEMENT_TYPE``, inserts ``size`` documents of the form
    ``{"element": <element>, "amount": <0..size-1>}`` into the ``test``
    database.

    Args:
        size: Number of documents inserted per (collection, element) pair.

    Returns:
        Because of ``@calcular_tiempo``: ``(None, elapsed_seconds)``. The
        elapsed time covers opening and closing the client.
    """
    cliente = pymongo.MongoClient(CONNECTION_CHAIN)
    try:
        for col_name in DATABASES:
            col = cliente.test[col_name]
            for element in ELEMENT_TYPE:
                for i in range(size):
                    col.insert_one({"element": element, "amount": i})
    finally:
        # Release the client's sockets even if an insert fails. The benchmark
        # calls this repeatedly and would otherwise leak one client per call.
        cliente.close()

@calcular_tiempo
def batch_insert_documents(size):
    """Insert documents with one ``insert_many`` call per collection.

    For every collection name in ``DATABASES``, builds ``size`` documents per
    value in ``ELEMENT_TYPE`` (``{"element": <element>, "amount": <0..size-1>}``)
    and sends them to the ``test`` database in a single batch.

    Args:
        size: Number of documents built per (collection, element) pair.

    Returns:
        Because of ``@calcular_tiempo``: ``(None, elapsed_seconds)``. The
        elapsed time covers opening and closing the client.
    """
    cliente = pymongo.MongoClient(CONNECTION_CHAIN)
    try:
        for col_name in DATABASES:
            col = cliente.test[col_name]
            documents = [
                {"element": element, "amount": i}
                for element in ELEMENT_TYPE
                for i in range(size)
            ]
            # insert_many rejects an empty list, so skip the call when there
            # is nothing to insert (size == 0 or no element types). This
            # mirrors insert_documents, which is a no-op in that case.
            if documents:
                col.insert_many(documents)
    finally:
        # Release the client's sockets even if the batch insert fails.
        cliente.close()


def insert_documents_thread(size, col_name, element):
    """Thread worker: insert ``size`` documents for one (collection, element) pair.

    Opens its own client, inserts ``{"element": element, "amount": i}`` for
    ``i`` in ``range(size)`` into ``test[col_name]`` one document at a time,
    and closes the client when done.

    Args:
        size: Number of documents to insert.
        col_name: Name of the target collection in the ``test`` database.
        element: Value stored in the ``element`` field of every document.
    """
    cliente = pymongo.MongoClient(CONNECTION_CHAIN)
    try:
        col = cliente.test[col_name]
        for i in range(size):
            col.insert_one({"element": element, "amount": i})
    finally:
        # One client is created per worker thread; without an explicit close
        # every parallel run would leave all of them open.
        cliente.close()


@calcular_tiempo
def parallel_insert_documents(size):
    """Insert documents using one thread per (collection, element) pair.

    Each thread runs ``insert_documents_thread(size, collection, element)``.
    The call returns only after every thread has finished.

    Args:
        size: Number of documents each thread inserts.

    Returns:
        Because of ``@calcular_tiempo``: ``(None, elapsed_seconds)``.
    """
    pairs = [
        (collection, item)
        for collection in DATABASES
        for item in ELEMENT_TYPE
    ]

    workers = []
    for collection, item in pairs:
        worker = threading.Thread(
            target=insert_documents_thread,
            args=(size, collection, item),
        )
        workers.append(worker)
        worker.start()

    # Wait for all threads to finish
    for worker in workers:
        worker.join()

In [26]:
@calcular_tiempo
def get_documents(database, element):
    """Fetch every document whose ``element`` field equals ``element``.

    Args:
        database: Name of the collection inside the ``test`` database
            (the parameter name is historical).
        element: Value to match on the ``element`` field.

    Returns:
        Because of ``@calcular_tiempo``: ``(matching_document_count,
        elapsed_seconds)``. The elapsed time covers opening and closing the
        client.
    """
    cliente = pymongo.MongoClient(CONNECTION_CHAIN)
    try:
        col = cliente.test[database]
        # The whole result set is materialised so the timing includes fetching
        # the documents (presumably intentional for this benchmark).
        result = list(col.find({"element": element}))
        return len(result)
    finally:
        # Release the client's sockets even if the query fails.
        cliente.close()



@calcular_tiempo
def drop_elements(database, element):
    """Delete every document whose ``element`` field equals ``element``.

    Args:
        database: Name of the collection inside the ``test`` database
            (the parameter name is historical).
        element: Value to match on the ``element`` field.

    Returns:
        Because of ``@calcular_tiempo``: ``(deleted_count, elapsed_seconds)``.
        The elapsed time covers opening and closing the client.
    """
    cliente = pymongo.MongoClient(CONNECTION_CHAIN)
    try:
        col = cliente.test[database]
        result = col.delete_many({"element": element})
        return result.deleted_count
    finally:
        # Release the client's sockets even if the delete fails.
        cliente.close()

In [27]:

def insert_data(DATABASES, ELEMENT_TYPE, base_size, i, iteration_result):
    """Run the three insert benchmarks and record size, time and rate for each.

    The insert helpers take the number of documents per (collection, element)
    pair, so they are called with ``(2**i) * base_size``. The recorded
    ``size`` is the total across all pairs, which is the number of documents
    actually written. Previously the total was passed as the per-pair count,
    so the real volume was ``len(DATABASES) * len(ELEMENT_TYPE)`` times larger
    than the recorded size and the rate was understated by the same factor.

    Note: the insert helpers iterate the module-level ``DATABASES`` and
    ``ELEMENT_TYPE``; the parameters here are only used to compute the total.

    Args:
        DATABASES: Collection names targeted by the benchmark.
        ELEMENT_TYPE: Element values inserted into each collection.
        base_size: Base number of documents per (collection, element) pair.
        i: Zero-based iteration index; the per-pair count doubles each iteration.
        iteration_result: Dict updated in place with the keys
            ``insert.<strategy>.size|time|rate``.

    Raises:
        ZeroDivisionError: If a benchmark reports an elapsed time of exactly 0.
    """
    per_pair_size = (2**i) * base_size
    insert_size = per_pair_size * len(DATABASES) * len(ELEMENT_TYPE)

    # The labels are part of the output schema ("secuential" included), so
    # they are kept exactly as they were.
    strategies = (
        ("secuential", insert_documents),
        ("batch", batch_insert_documents),
        ("parallel", parallel_insert_documents),
    )
    for label, insert_fn in strategies:
        _, insert_spent_time = insert_fn(per_pair_size)
        iteration_result[f"insert.{label}.size"] = insert_size
        iteration_result[f"insert.{label}.time"] = insert_spent_time
        iteration_result[f"insert.{label}.rate"] = insert_size / insert_spent_time


def query_data(DATABASES, ELEMENT_TYPE, iteration_result):
    """Time a ``find`` for the first element type in every collection.

    Args:
        DATABASES: Collection names to query.
        ELEMENT_TYPE: Element values; only the first one is queried.
        iteration_result: Dict updated in place with the keys
            ``find.<collection>.time|size|rate``.
    """
    for collection in DATABASES:
        found, elapsed = get_documents(collection, ELEMENT_TYPE[0])
        prefix = f"find.{collection}"
        iteration_result[prefix + ".time"] = elapsed
        iteration_result[prefix + ".size"] = found
        iteration_result[prefix + ".rate"] = found / elapsed


def drop_data(iteration_result):
    """Delete the benchmark documents and record how many and how long.

    Iterates the module-level ``DATABASES`` and ``ELEMENT_TYPE`` and calls
    ``drop_elements`` for every combination.

    Args:
        iteration_result: Dict updated in place with the keys
            ``drop.<collection>.<element>.size|time``.
    """
    for collection in DATABASES:
        for item in ELEMENT_TYPE:
            removed, elapsed = drop_elements(collection, item)
            prefix = f"drop.{collection}.{item}"
            iteration_result[f"{prefix}.size"] = removed
            iteration_result[f"{prefix}.time"] = elapsed

In [35]:
# Benchmark driver: run `iterations` rounds of insert -> find -> delete and
# collect the per-round metrics under "Iter<i>".
result = {}
for i in range(iterations):
    iteration_result = {}

    # Read the current document count with a short-lived client that is
    # always closed, instead of leaking one client per iteration.
    stats_client = pymongo.MongoClient(CONNECTION_CHAIN)
    try:
        docs = stats_client.test.command("dbstats")["objects"]
    finally:
        stats_client.close()

    # Thousands are separated with ".". The text is built outside the f-string
    # because reusing double quotes inside a replacement field is a
    # SyntaxError before Python 3.12.
    docs_text = f"{docs:,}".replace(",", ".")
    print(f"Starting iteration. Documents in db: {docs_text}")

    insert_data(DATABASES, ELEMENT_TYPE, base_size, i, iteration_result)
    query_data(DATABASES, ELEMENT_TYPE, iteration_result)
    drop_data(iteration_result)
    print(iteration_result)
    result[f"Iter{i}"] = iteration_result

filename = os.getenv("OUTPUT_FILENAME", "/output/output.json")
# Create the target directory if needed, so a long benchmark run is not lost
# to a missing folder at the very end.
output_dir = os.path.dirname(filename)
if output_dir:
    os.makedirs(output_dir, exist_ok=True)
with open(filename, "w", encoding="utf-8") as archivo:
    json.dump(result, archivo, indent=2)


print(f"\n\nTest complete. Output {filename} wrote")

Starting iteration. Documents in db: 584.613
{}
Starting iteration. Documents in db: 584.617
{}
Starting iteration. Documents in db: 584.621
{}
