# Dask computing

In [1]:
import pymongo
from pymongo import MongoClient
from distributed import client

### 1. Conexión a la base de datos MongoDB en local.

In [2]:
# Reemplaza 'localhost' con la dirección de tu servidor MongoDB si es diferente
# y '27017' con el puerto correspondiente si has configurado uno distinto.
client = MongoClient('mongodb://localhost:27017/')

# Reemplaza 'nombre_de_tu_base_de_datos' con el nombre de tu base de datos
db = client['Contrataciones']

# Intenta obtener una colección para verificar la conexión
try:
    # Reemplaza 'nombre_de_tu_colección' con el nombre de una colección existente en tu base de datos
    colecciones = db.list_collection_names()  # Lista todas las colecciones para verificar la conexión
    if 'Contratos_EDCA' in colecciones:
        print("Conexión exitosa a la base de datos y la colección encontrada.")
    else:
        print("Conexión exitosa a la base de datos, pero la colección no fue encontrada.")
except Exception as e:
    print(f"Error al conectar a la base de datos: {e}")


# Listar todas las bases de datos
databases = client.list_database_names()

# Imprimir la lista de bases de datos
print("Bases de datos disponibles son:", databases)

Conexión exitosa a la base de datos y la colección encontrada.
Bases de datos disponibles son: ['Contrataciones', 'Test', 'admin', 'config', 'local']


### 2. Revisar el tamaño y la última actualización de los datos.

In [3]:
# Imprimir el tamaño de la base de datos, número de colecciones y última actualización

# Obteniendo el tamaño de la base de datos en GB
print(f"Obteniendo el tamaño de la base de datos.")
db_stats = db.command("dbstats")
db_size_gb = db_stats['storageSize'] / (1024**3)  # Convertir bytes a GB
print(f"Tamaño de la base de datos: {db_size_gb} GB")

# Obteniendo el número de documentos en la colección 'Contratos_EDCA'
print(f"Obteniendo el número de documentos. Este proceso puede tardar hasta 3 minutos...")
num_documents = db['Contratos_EDCA'].count_documents({})
print(f"Número de documentos en la colección 'Contratos_EDCA': {num_documents}")

# Obteniendo la última actualización de la base de datos
# Esto asume que hay una colección que registra las actualizaciones
# Reemplaza 'nombre_de_tu_colección_de_registro' con el nombre real de tu colección de registro

#  try: [HAY QUE CAMBIAR ESTO POR LA FECHA DEL ÚLTIMO CONTRATO]
#     last_update_collection = db['Contratos_EDCA']
#     last_update = last_update_collection.find().sort("fecha", -1).limit(1)
#     for update in last_update:
#         print(f"Última actualización: {update['fecha']}")
# except Exception as e:
#     print(f"No se pudo obtener la última actualización: {e}")


Obteniendo el tamaño de la base de datos.
Tamaño de la base de datos: 7.33831787109375 GB
Obteniendo el número de documentos. Este proceso puede tardar hasta 3 minutos...
Número de documentos en la colección 'Contratos_EDCA': 2742266


In [13]:
from pymongo import MongoClient
from datetime import datetime

# Conexión a MongoDB
cliente_mongo = MongoClient('mongodb://localhost:27017/')
db = cliente_mongo['Contrataciones']  # Reemplaza con el nombre de tu base de datos si es necesario

# Contar el número de documentos con fecha del año 2024 en la colección Sample_Contratos_EDCA
inicio_2024 = datetime(2024, 1, 1)
fin_2024 = datetime(2024, 12, 31)
numero_documentos_2024 = db['Contratos_EDCA'].count_documents({'fecha': {'$gte': inicio_2024, '$lte': fin_2024}})
print(f"Número de documentos en la base de datos 'Contrataciones' para el año 2024: {numero_documentos_2024}")



Número de documentos en la colección 'Sample_Contratos_EDCA' para el año 2024: 0


In [9]:
from dask.distributed import Client, as_completed
from pymongo import MongoClient
import dask

# Inicializar el cliente Dask Distributed de manera que evite el conflicto de puertos
client = Client(processes=False)

# Conexión a MongoDB
mongo_client = MongoClient('mongodb://localhost:27017/')
db = mongo_client['Contrataciones']  # Reemplaza con el nombre de tu base de datos

def obtener_tamano_db():
    db_stats = db.command("dbstats")
    db_size_gb = db_stats['storageSize'] / (1024**3)  # Convertir bytes a GB
    return db_size_gb

def contar_documentos():
    num_documents = db['Contratos_EDCA'].count_documents({})
    return num_documents

# Crear tareas Dask
tarea_tamano_db = dask.delayed(obtener_tamano_db)()
tarea_contar_docs = dask.delayed(contar_documentos)()

# Ejecutar tareas en paralelo y manejar los resultados a medida que se completan
futuros = dask.persist(tarea_tamano_db, tarea_contar_docs)
resultados = []
for futuro in as_completed(futuros):
    resultados.append(futuro.result())

print(f"Tamaño de la base de datos: {resultados[0]} GB")
print(f"Número de documentos en la colección 'Contratos_EDCA': {resultados[1]}")

Perhaps you already have a cluster running?
Hosting the HTTP server on port 52288 instead
2024-03-26 14:08:13,168 - distributed.protocol.pickle - ERROR - Failed to serialize <ToPickle: HighLevelGraph with 2 layers.
<dask.highlevelgraph.HighLevelGraph object at 0x11603ec30>
 0. contar_documentos-4f598d2a-1158-468e-a096-40be417fb194
 1. obtener_tamano_db-7dfc818a-e043-4d5c-bb54-804b0ae8e861
>.
Traceback (most recent call last):
  File "/Users/jldelda/Documents/4 WORK/7 Mottum.io/2 CONSULTORÍAS/2024_01 UNDP_SESNA ANTICORRUPCIÓN_MEXICO/3 Desarrollo/Limpieza_Datos_Contrataciones/pymongo_env/lib/python3.12/site-packages/distributed/protocol/pickle.py", line 77, in dumps
    result = cloudpickle.dumps(x, **dump_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/jldelda/Documents/4 WORK/7 Mottum.io/2 CONSULTORÍAS/2024_01 UNDP_SESNA ANTICORRUPCIÓN_MEXICO/3 Desarrollo/Limpieza_Datos_Contrataciones/pymongo_env/lib/python3.12/site-packages/cloudpickle/cloudpickle.py", line

TypeError: ('Could not serialize object of type HighLevelGraph', '<ToPickle: HighLevelGraph with 2 layers.\n<dask.highlevelgraph.HighLevelGraph object at 0x11603ec30>\n 0. contar_documentos-4f598d2a-1158-468e-a096-40be417fb194\n 1. obtener_tamano_db-7dfc818a-e043-4d5c-bb54-804b0ae8e861\n>')

In [11]:
from dask.distributed import Client, as_completed
import dask

# Inicializar el cliente Dask Distributed especificando un puerto para evitar conflictos
client = Client(processes=False, dashboard_address=':0')

def obtener_tamano_db():
    from pymongo import MongoClient
    # Conexión a MongoDB dentro de la función
    mongo_client = MongoClient('mongodb://localhost:27017/')
    db = mongo_client['Contrataciones']  # Reemplaza con el nombre de tu base de datos
    db_stats = db.command("dbstats")
    db_size_gb = db_stats['storageSize'] / (1024**3)  # Convertir bytes a GB
    return db_size_gb

def contar_documentos():
    from pymongo import MongoClient
    # Conexión a MongoDB dentro de la función
    mongo_client = MongoClient('mongodb://localhost:27017/')
    db = mongo_client['Contrataciones']
    num_documents = db['Contratos_EDCA'].count_documents({})
    return num_documents

# Crear tareas Dask
tarea_tamano_db = dask.delayed(obtener_tamano_db)()
tarea_contar_docs = dask.delayed(contar_documentos)()

# Ejecutar tareas en paralelo y manejar los resultados a medida que se completan
futuros = dask.persist(tarea_tamano_db, tarea_contar_docs)
resultados = []
for futuro in as_completed(futuros):
    resultados.append(futuro.result())

print(f"Tamaño de la base de datos: {resultados[0]} GB")
print(f"Número de documentos en la colección 'Contratos_EDCA': {resultados[1]}")

TypeError: Input must be a future, got Delayed('obtener_tamano_db-43a1b43d-fa36-49e5-9835-e1c0bd099d19')

### 3. Crear una muestra pequeña de datos para confirmar que los comandos funcionan.

Guardaremos la nueva muestra _Sample_Contratos_EDCA_ en una nueva colección.

In [10]:
# Generando la muestra aleatoria y guardándola en una nueva colección
sample_size = int(num_documents * 0.05)  # Calcula el 5% del total de documentos
print(f"Número de documentos en la muestra aleatoria es: {sample_size}")

# Realiza la agregación para obtener la muestra aleatoria
sample = db['Contratos_EDCA'].aggregate([
    { '$sample': { 'size': sample_size } }
])
sample_documents = list(sample)
print(f"Sample creado")

# Guarda la muestra en una nueva colección
db['Sample_Contratos_EDCA'].insert_many(sample_documents)
print(f"Muestra guardada en la colección 'Sample_Contratos_EDCA'!")



Número de documentos en la muestra aleatoria es: 137113
Sample creado
Muestra guardada en la colección 'Sample_Contratos_EDCA'!


In [19]:
# Realizando la consulta con sort y limit
resultados = Sample_Contratos_EDCA.find().sort('publishedDate', -1).limit(5)

# Iterando sobre los resultados y haciendo algo con ellos
for documento in resultados:
    print(documento)

AttributeError: 'CommandCursor' object has no attribute 'find'