## NoSQL (MongoDB) (sesión 2)


## Introducción

![MongoDB](https://webassets.mongodb.com/_com_assets/cms/MongoDB_Logo_FullColorBlack_RGB-4td3yuxzjs.png)

Esta hoja muestra cómo acceder a bases de datos MongoDB y también a conectar la salida con Jupyter. Se puede utilizar el *shell* propio de MongoDB en el contenedor usando el programa `mongo`. La diferencia es que ese programa espera código Javascript y aquí trabajaremos con Python.

In [None]:
RunningInCOLAB: bool = 'google.colab' in str(get_ipython()) if hasattr(__builtins__,'__IPYTHON__') else False

In [None]:
if not RunningInCOLAB:
    !sudo apt-get update -qq
    !sudo apt-get install -y -qq gpg p7zip

## Instalación inicial de MongoDB (sólo necesaria en Google Colab)

In [None]:
if RunningInCOLAB:
  !wget -q https://raw.githubusercontent.com/dsevilla/bdge/refs/heads/24-25/addendum/mongo-utils/run-mongo-local.sh
  !bash run-mongo-local.sh

## Descarga de los datos en formato CSV

 - Formato: 7zipped
 - Ficheros:
   - **Comments**.csv
       - Id
       - PostId
       - Score
       - Text, e.g.: "@Stu Thompson: Seems possible to me - why not try it?"
       - CreationDate, e.g.:"2008-09-06T08:07:10.730"
       - UserId
   - **Posts**.csv
       - Id
       - PostTypeId
          - 1: Question
          - 2: Answer
       - ParentID (only present if PostTypeId is 2)
       - AcceptedAnswerId (only present if PostTypeId is 1)
       - CreationDate
       - Score
       - ViewCount
       - Body
       - OwnerUserId
       - LastEditorUserId
       - LastEditorDisplayName="Jeff Atwood"
       - LastEditDate="2009-03-05T22:28:34.823"
       - LastActivityDate="2009-03-11T12:51:01.480"
       - CommunityOwnedDate="2009-03-11T12:51:01.480"
       - ClosedDate="2009-03-11T12:51:01.480"
       - Title=
       - Tags=
       - AnswerCount
       - CommentCount
       - FavoriteCount
   - **Tags**.csv
    - Id
    - Count
    - ExcerptPostId
    - TagName
    - WikiPostId
   - **Users**.csv
     - Id
     - Reputation
     - CreationDate
     - DisplayName
     - EmailHash
     - LastAccessDate
     - WebsiteUrl
     - Location
     - Age
     - AboutMe
     - Views
     - UpVotes
     - DownVotes
   - **Votes**.csv
     - Id
     - PostId
     - VoteTypeId
        - ` 1`: AcceptedByOriginator
        - ` 2`: UpMod
        - ` 3`: DownMod
        - ` 4`: Offensive
        - ` 5`: Favorite - if VoteTypeId = 5 UserId will be populated
        - ` 6`: Close
        - ` 7`: Reopen
        - ` 8`: BountyStart
        - ` 9`: BountyClose
        - `10`: Deletion
        - `11`: Undeletion
        - `12`: Spam
        - `13`: InformModerator
     - CreationDate
     - UserId (only for VoteTypeId 5)
     - BountyAmount (only for VoteTypeId 9)

In [None]:
!wget https://github.com/dsevilla/bd2-data/raw/main/es.stackoverflow/es.stackoverflow.csv.7z.001 -O - > es.stackoverflow.csv.7z
!wget https://github.com/dsevilla/bd2-data/raw/main/es.stackoverflow/es.stackoverflow.csv.7z.002 -O - >> es.stackoverflow.csv.7z

In [None]:
!7zr x -aoa es.stackoverflow.csv.7z
!rm es.stackoverflow.csv.7z

In [None]:
!head Users.csv

## Instalación de la librería `pymongo`

In [None]:
%pip install pymongo tqdm[notebook] pandas matplotlib

In [None]:
from pprint import pprint as pp
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib

%matplotlib inline
matplotlib.style.use('ggplot')

Usaremos la librería `pymongo` para python. La cargamos a continuación.

In [None]:
from pymongo import MongoClient

## Conexión a la base de datos

La conexión se inicia con `MongoClient` en el `host` descrito en el fichero `docker-compose.yml` (`mongo`), o bien a `localhost` si lo estamos haciendo en Colab.

In [None]:
import os
db_hostname: str = "localhost" if RunningInCOLAB else os.getenv('DB_HOSTNAME', "mongo")

In [None]:
from typing import Any

client: MongoClient[dict[str, Any]] = MongoClient(db_hostname,27017)
client

In [None]:
client.list_database_names()

Las bases de datos se crean conforme se nombran. Se puede utilizar la notación punto o la de diccionario. Las colecciones también.

In [None]:
from pymongo.database import Database

db: Database = client.stackoverflow
# db: Database = client['stackoverflow'] # (equivalente)
db

## Importación de los ficheros CSV

Por ahora creamos una colección diferente para cada uno. Después estudiaremos cómo poder optimizar el acceso usando agregación.

In [None]:
import csv
import sys
from datetime import datetime
from pymongo.collection import Collection
from typing import Any
from collections.abc import Callable, Iterable, Iterator
from tqdm.notebook import tqdm

def batched(iterable: Iterable, n) -> Iterable:
    from itertools import islice
    if n < 1:
        raise ValueError('n must be at least one')
    it: Iterator = iter(iterable)
    while batch := tuple(islice(it, n)):
        yield batch

def csv_to_mongo(file: str, coll: Collection):
    """
    Carga un fichero CSV en Mongo. file especifica el fichero, coll la colección
    dentro de la base de datos, y date_cols las columnas que serán interpretadas
    como fechas.
    """
    # Convertir todos los elementos que se puedan a números
    def to_numeric(d: str) -> str| int | float:
        if len(d) == 0:
            return ''
        if not ((d[0] >= '0' and d[0] <= '9') or d[0] == '-' or d[0] == '+' or d[0]=='.'):
            return d
        try:
            return int(d) if abs(int(d)) <= sys.maxsize else d # Ensure number is inside MongoDB max number range
        except ValueError:
            try:
                return float(d)
            except ValueError:
                return d

    def to_date(d: str) -> datetime | None:
        """To ISO Date. If this cannot be converted, return NULL (None)"""
        try:
            return datetime.strptime(d, "%Y-%m-%dT%H:%M:%S.%f")
        except ValueError:
            return None

    coll.drop()

    with open(file, encoding='utf-8') as f:
        # La llamada csv.reader() crea un iterador sobre un fichero CSV
        reader = csv.reader(f, dialect='excel')

        # Se leen las columnas. Sus nombres se usarán para crear las diferentes columnas en la familia
        columns: list[str] = next(reader)

        # Las columnas que contienen 'Date' se interpretan como fechas
        func_to_cols: list[Callable[[str], Any]] = list(map(lambda c: to_date if 'date' in c.lower() else to_numeric, columns))

        for batch in batched(tqdm(reader, desc='Leyendo e insertando filas...'), 10000):
            docs: list[dict[str, str | datetime | float | int | None]] = []
            for row in batch:
                built_row: list[str | datetime | float | int | None] = [func(e) for (func,e) in zip(func_to_cols, row)]
                docs.append(dict(zip(columns, built_row)))
            coll.insert_many(docs)

        print("¡Hecho!")

In [None]:
csv_to_mongo('Posts.csv',db.posts)

In [None]:
csv_to_mongo('Users.csv',db.users)

In [None]:
csv_to_mongo('Votes.csv',db.votes)

In [None]:
csv_to_mongo('Comments.csv',db.comments)

In [None]:
csv_to_mongo('Tags.csv',db.tags)

In [None]:
posts: Collection[dict[str, Any]] = db.posts
posts

In [None]:
posts.count_documents({})

## Framework de Agregación

Framework de agregación:
- Aquí está la referencia de las diferentes etapas por las que puede pasar un pipeline: https://www.mongodb.com/docs/manual/reference/operator/aggregation-pipeline/.
- Y aquí los distintos operadores que se permiten dentro de las etapas: https://docs.mongodb.com/manual/reference/operator/aggregation/.
- Y aquí incluso un libro completo con usos prácticos de ejecutar agregación: https://www.practical-mongodb-aggregations.com/.

A continuación un vídeo interesante:

In [None]:
from IPython.display import YouTubeVideo
YouTubeVideo('VSX4a3h4SmQ',width=600)

Aqui tenemos un esquema con un ejemplo básico de agregación.

![](https://miro.medium.com/max/1060/1*2lDBxvZ8Cr3JYkoODTa0lQ.png)

## Algunos operadores de agregación



### `$match`

Este operador permite filtrar los documentos que queremos que pasen a la siguiente fase del pipeline definiendo una serie de condiciones sobre los campos de los mismos.

Vamos a filtrar aquellos documentos que tengan un `Score` igual o superior (`$gte`) a 150.

In [None]:
from pymongo.command_cursor import CommandCursor

respuestas: CommandCursor[dict[str, Any]] = posts.aggregate( [
        {'$match': { 'Score' : {'$gte': 150}}}
])
list(respuestas)

### `$project`

El operador `$project`permite filtar qué campos de los documentos queremos usar en la siguientes fases de agregación.

Generalmente este operador suele combinarse con otros como `$find`, `$match` o `$lookup` en fases más avanzadas de la agregación.

En el siguiente ejemplo, realizamos un filtrado en donde solo nos quedamos con el campo `Id` de los posts.

In [None]:
respuestas: CommandCursor[dict[str, Any]] = db['posts'].aggregate( [
    {'$project' : { 'Id' : True }},
    {'$limit': 20} ])
list(respuestas)

### `$group`

La función `$group` se usa dentro del *pipeline* de agregación de documentos seguido por Mongo (`.aggregate`). Esta función admite dos parámetros diferentes:
* `_id`: El identificador por el que queremos agrupar los documentos.
* `campo`: Expresión mediante la cual queremos aggregar los documentos (*opcional*).

Vamos a contar el número de posts por `OwnerUserId` en la colección `posts`.

In [None]:
from pymongo.command_cursor import CommandCursor

users_count_scores: CommandCursor[dict[str, Any]] = db.posts.aggregate(
    [
        {
            "$group":{
                "_id": "$OwnerUserId",
                "count": {"$sum": 1}
            }
        },
        {
            "$limit": 10
        }
    ]
)
list(users_count_scores)

Ahora vamos computar el `score` medio por usuario en base a todos sus posts.

In [None]:
users_avg_scores: CommandCursor[dict[str, Any]] = db.posts.aggregate(
    [{
        "$group":{
            "_id": "$OwnerUserId",
            "avg_score": {"$avg":'$Score'}
        }
        },
        { "$limit": 10 }
    ]
)
list (users_avg_scores)

### `$lookup`

El operador `$lookup` permite realizar búsquedas en otras colecciones. Podrían interpretarse como un *join* en el modelo relacional. Esta función admite cuatro parámetros diferentes:

*   `from`: especifica la colección en la misma base de datos con la que hacer la búsqueda (o el *join*).
*   `localField`: Especifica el campo de los documentos de entrada a usar para realizar una comparación de igualdad con el campo `foreignField` de los documentos de la colección `from`.  
*   `foreignField`: Especifica el campo de los documentos de la colección `from` a usar en la comparación de igualdad con el `localField`.
*   `as`: Especifica el nombre del nuevo campo array que se añadirá a los documentos de entrada. Este nuevo campo contiene los documentos coincidentes de la colección `from`.

En el ejemplo siguiente, en primer lugar seleccionamos los posts con un `Score` igual o mayor a 150 con `$match` para a continuación seleccionar los usuarios que han publicado dichos posts incluyéndolo en un nuevo campo llamado `owner`.

In [None]:
respuestas: CommandCursor[dict[str, Any]] = posts.aggregate( [
    {'$match': { 'Score' : {'$gte': 150}}},
    {'$lookup': {
        'from': "users",
        'localField': "OwnerUserId",
        'foreignField': "Id",
        'as': "owner"}
    }
])
list(respuestas)

### `$arrayElemAt`

El `$lookup` genera un _array_ con todos los resultados. El operador `$arrayElementAt` accede al primer elemento.

In [None]:
respuestas: CommandCursor[dict[str, Any]] = db.posts.aggregate( [
    {'$match': { 'Score' : {'$gte': 150}}},
    {'$lookup': {
        'from': "users",
        'localField': "OwnerUserId",
        'foreignField': "Id",
        'as': "owner"}
    },
    { '$project' : {
        'Id' : True,
        'Score' : True,
        'username' : {'$arrayElemAt' : ['$owner.DisplayName', 0]},
        'owner.DisplayName' : True
      }}
    ])
list(respuestas)

### `$unwind`

Este operador *desdobla* cada fila por cada elemento del array.

Ej: El siguiente código:

```python
db.inventario.insert_one({ "_id" : 1, "item" : "ABC1", "tallas": [ "S", "M", "L"] })
db.inventario.aggregate( [ { "$unwind" : "$tallas" } ] )
```

Devolverá:

```json
{ "_id" : 1, "item" : "ABC1", "tallas" : "S" }
{ "_id" : 1, "item" : "ABC1", "tallas" : "M" }
{ "_id" : 1, "item" : "ABC1", "tallas" : "L" }
```

En el ejemplo de procesamiento de posts que estamos llevando a cabo, como sabemos que el array `$owner` sólo contiene un elemento, sólo habrá una fila por fila original, pero sin el _array_. Finalmente se puede proyectar el campo que se quiera. En este caso el `ownerDisplayName` que es proyectado (renombrado) como `username`.

In [None]:
respuestas: CommandCursor[dict[str, Any]] = db.posts.aggregate( [
    { '$match': { 'Score' : {'$gte': 150}}},
    { '$lookup': {
        'from': "users",
        'localField': "OwnerUserId",
        'foreignField': "Id",
        'as': "owner"
      }
    },
    { '$unwind': '$owner'},
    { '$project' : {
          'username': '$owner.DisplayName'
      }
    }
    ])
list(respuestas)

### `$push`

Devuelve un array con *todos* los valores que resultan de aplicar una determinada expresión a los documentos que forman parte del pipeline.

Obtener el listado con el identificador de `Posts` (`Id`) asociados a cada valor de `Score`.

In [None]:
posts_by_score: CommandCursor[dict[str, Any]] = db.posts.aggregate([
    { '$limit': 2000 },
    {
        '$group':{
            '_id': '$Score',
            'posts':{
                '$push': {
                    'post': '$Id'
                }
            }
        }
    }

])
list(posts_by_score)

### `$addToSet`

 Devuelve un array de todos los valores únicos que resultan de aplicar una expresión a cada documento de un grupo.


Vamos a obtener el listado de `Tags` asociadas a cada tipo de licencia  `ContentLicense`.

In [None]:
license_and_tags: CommandCursor[dict[str, Any]] = db.posts.aggregate([
    { "$match" : {"PostTypeId": 1}},
    {"$limit": 1000},

    {'$group':{
            '_id':'$ContentLicense',
            'AllTags': { '$addToSet': "$Tags" }
        }
    }
])
list(license_and_tags)

### `$out`

Este comando permite volcar el resultado de un pipeline de agregación en una nueva colección en la base de datos.

In [None]:
respuestas: CommandCursor[dict[str, Any]] = db.posts.aggregate( [
        { '$match': { 'Score' : {'$gte': 40}}},
        { '$lookup': {
            'from': "users",
            'localField': "OwnerUserId",
            'foreignField': "Id",
            'as': "owner"
          }
        },
        { '$unwind': '$owner'},
        { '$project' : {
             'username': '$owner.DisplayName'
          }
        },
        {'$out': "stackoverflow_users"}
      ])

Un documento de ejemplo de la colección generada `stackoverflow_users`:

In [None]:
pp(db.stackoverflow_users.find_one())

In [None]:
db.stackoverflow_users.count_documents({})

## Ejemplos básicos de agregación

### Ejemplo 1: Asociación usuarios con Tags

Con Agregación, vamos a construir una colección que asocia un usuario con los tags que ha usado en todas sus preguntas.

*Por cuestiones de rendimiento vamos a limitar la agregación a 500 elementos.*

In [None]:
user_tags: CommandCursor[dict[str, Any]] = db.posts.aggregate([
        { "$match" : {"PostTypeId": 1}},

        {"$limit": 500},

        { '$lookup': {
            'from': "users",
            'localField': "OwnerUserId",
            'foreignField': "Id",
            'as': "owner"
          }
        },

        {'$project':{
            'Tags': True,
            'userid' : {'$arrayElemAt' : ['$owner.Id', 0]},
        }},

        {'$group':{
            '_id':'$userid',
            'AllTags': { '$addToSet': "$Tags" }
        }}
]
)

user_tags_lst = list(user_tags)
user_tags_lst

### Ejemplo 2: Asociación Tags con usuarios

Ahora, dado un ID de tag, qué usuarios han hecho preguntas de ese tag.

*De nuevo limitamos la búsqueda a 500 documentos*

In [None]:
tags_users: CommandCursor[dict[str, Any]] = db.posts.aggregate( [
        { "$match" : {"PostTypeId": 1}},

        {"$limit": 500},

        { '$lookup': {
            'from': "users",
            'localField': "OwnerUserId",
            'foreignField': "Id",
            'as': "owner"
          }
        },

        {'$project':{
            'Tags': True,
            'userid' : {'$arrayElemAt' : ['$owner.Id', 0]},
        }},

        {'$group':{
            '_id':'$Tags',
            'AllUsers': { '$addToSet': "$userid" }
        }}
    ])

tags_users_lst = list(tags_users)
tags_users_lst

## Ejercicios

### EJ1: Obtener el resultado de documentos *verbosos* en donde se indique el texto `Body` de una pregunta así como el nombre del usuario que la formuló (`DisplayName`)

In [None]:
verbose_posts: CommandCursor[dict[str, Any]] = db.posts.aggregate([
    { "$match" : {"PostTypeId": 1}},
    { "$limit": 1000},
    { '$lookup': {
        'from': "users",
        'localField': "OwnerUserId",
        'foreignField': "Id",
        'as': "owner"
      }
    },
    { '$project': {
        '_id': False,
        'Body': True,
        'username': {'$arrayElemAt' : ['$owner.DisplayName', 0]}
      }
    },
])

list(verbose_posts)

### EJ2: Obtener las respuestas escritas en Enero de 2022 con un `Score` superior a 3

In [None]:
start = datetime(2022, 1, 1)
end = datetime(2022, 1, 31)

respuestas_Enero_2022: CommandCursor[dict[str, Any]] = db.posts.aggregate( [
        { '$match': {
            'PostTypeId' : 2,
            'Score' : {'$gte': 3},
            'CreationDate': {'$gte': start, '$lte': end}
        }},
        { '$limit': 5}
      ])

list(respuestas_Enero_2022)

In [None]:
print("Eso es todo amigos!")