# HBase

Con HBase vamos a simular un _clúster_ de varias máquinas con varios contenedores conectados. En el directorio `hbase` del repositorio git hay un script para ejecutar la instalación con `docker-compose`.

Para conectarse al _clúster_ con un _shell_ de hbase, hay que ejecutar, desde una terminal el siguiente comando de docker:

```bash
$ docker exec -ti hbase-regionserver hbase shell
Base Shell; enter 'help<RETURN>' for list of supported commands.
Type "exit<RETURN>" to leave the HBase Shell
Version 1.2.7, rac57c51f7ad25e312b4275665d62b34a5945422f, Fri Sep  7 16:11:05 CDT 2018

hbase(main):001:0> 
```

Comenzamos instalando e importando la librería `happybase` y creado la conexión con el Thrift Server para poder manejar la base de datos desde python.

In [1]:
!pip install happybase



In [3]:
import happybase
from contextlib import contextmanager

HBASEHOST = 'hbase-thriftserver'

class Connection():
    
    def __init__(self, host):
        self.host = host
        self._genpool()
        
    def _genpool(self):
        self.pool = happybase.ConnectionPool(size=5, host=self.host)
    
    @contextmanager
    def connection(self):
        for _ in range(5): # Probar 5 veces a regenerar el pool
            for _ in range(5): # Probar 5 veces a conectar
                with self.pool.connection() as connection:
                    try:
                        connection.tables()
                        yield connection
                        return
                    except Exception as e:
                        pass
            self._genpool()
        raise Exception("HBase Connection Error")

In [4]:
hbasecon = Connection(HBASEHOST)

Para la carga inicial, vamos a crear todas las tablas con una única familia de columnas, `rawdata`, donde meteremos toda la información _raw_ comprimida. Después podremos hacer reorganizaciones de los datos para hacer el acceso más eficiente. Es una de las muchas ventajas de no tener un esquema.

In [21]:
# Create tables
tables = ['posts', 'votes', 'users', 'tags', 'comments']
for t in tables:
    try:
        with hbasecon.connection() as connection:
            connection.create_table(
                t,
                {
                    'rawdata': dict(max_versions=1,compression='GZ')
                })
    except Exception as e:
        print("Database already exists: {0}. {1}".format(t, e))
        pass
with hbasecon.connection() as connection:
    print(connection.tables())

[b'comments', b'posts', b'tags', b'users', b'votes']


El código de importación es siempre el mismo, ya que se coge la primera fila del CSV que contiene el nombre de las columnas y se utiliza para generar nombres de columnas dentro de la familia de columnas dada como parámetro. La función `csv_to_hbase()` acepta un fichero CSV a abrir, un nombre de tabla y una familia de columnas donde agregar las columnas del fichero CSV. En nuestro caso siempre va a ser `rawdata`.

In [22]:
import csv

def csv_to_hbase(file, tablename, cf):
    with hbasecon.connection() as connection, open(file) as f:
        table = connection.table(tablename)

        # La llamada csv.reader() crea un iterador sobre un fichero CSV
        reader = csv.reader(f, dialect='excel')
        
        # Se leen las columnas. Sus nombres se usarán para crear las diferentes columnas en la familia
        columns = next(reader)
        columns = [cf + ':' + c for c in columns]
        
        with table.batch(batch_size=500) as b:
            for row in reader:
                # La primera columna se usará como Row Key
                b.put(row[0], dict(zip(columns[1:], row[1:])))

In [23]:
for t in tables:
    print("Importando tabla {0}...".format(t))
    %time csv_to_hbase('../'+t.capitalize() + '.csv', t, 'rawdata')

Importando tabla posts...
CPU times: user 10.8 s, sys: 281 ms, total: 11.1 s
Wall time: 22 s
Importando tabla votes...
CPU times: user 5.34 s, sys: 99.3 ms, total: 5.44 s
Wall time: 11.4 s
Importando tabla users...
CPU times: user 3.81 s, sys: 108 ms, total: 3.91 s
Wall time: 8.85 s
Importando tabla tags...
CPU times: user 28.7 ms, sys: 24 µs, total: 28.7 ms
Wall time: 60.7 ms
Importando tabla comments...
CPU times: user 4.38 s, sys: 72.6 ms, total: 4.45 s
Wall time: 8.74 s


### Construcción de estructuras anidadas

Al igual que pasaba con MongoDB, las bases de datos NoSQL como en este caso HBase permiten almacenar estructuras de datos complejas. En nuestro caso vamos a agregar los comentarios de cada pregunta o respuesta (post) en columnas del mismo. Para ello, creamos una nueva familia de columnas `comments`.

HBase es bueno para añadir columnas sencillas, por ejemplo que contengan un valor. Sin embargo, si queremos añadir objetos complejos, tenemos que jugar con la codificación de la familia de columnas y columna.

Usaremos el shell porque `happybase` no permite alterar tablas ya creadas.

En el `shell` de HBase pondremos lo siguiente:

```
disable 'posts'
alter 'posts', {NAME => 'comments', VERSIONS => 1}
enable 'posts'
```

Cada comentario que añadimos contiene, al menos:

- un id único
- un texto
- un autor
- etc.

¿Cómo se consigue meterlo en una única familia de columnas?

Hay varias formas. La que usaremos aquí, añadiremos el **id** de cada comentario como parte del nombre de la columna. Por ejemplo, el comentario con Id 2000, generará las columnas:

- `Id_2000` (valor 2000)
- `UserId_2000`
- `PostId_2000`
- `Text_2000`

con sus correspondientes valores. Así, todos los datos relativos al comentario con Id original 2000, estarán almacenados en todas las columnas que terminen en "`_2000`". La base de datos permite implementar filtros que nos permiten buscar esto de forma muy sencilla. Los veremos después.

In [26]:
with hbasecon.connection() as connection:
    comments = connection.table('comments')
    posts = connection.table('posts')

    with posts.batch(batch_size=500) as bp:
        # Hacer un scan de la tabla
        for key, data in comments.scan():
            comment = {'comments:' +
                       d.decode('utf-8').split(':')[1] + "_" +
                       key.decode('utf-8') : 
                       data[d].decode('utf-8') for d in data.keys()}
            bp.put(data[b'rawdata:PostId'], comment)

## EJERCICIO: Mostrar las filas de la tabla `users` (sólo la columna `rawdata:Location`) de usuarios de España (se supondrá que su localización (columna `rawdata:Location`) contiene `España` o `ES`, obviando mayúsculas y minúsculas).

Para este ejercicio volvemos a aplicar la función `scan` junto con el parámetro `columns`, para indicar que queremos extraer la columna `rawdata:Location`, y además añadimos un filtro sobre esa columna para que nos muestre únicamente las de usuarios de España. Para obviar las minúsculas y las mayúsculas, el filtro aplicado es de tipo `substring`, que nos permite realizar una comparación *case insensitive*. Para que incluya las localizaciones que contienen tanto España como ES, hemos realizado un `OR` que une los dos filtros `substring`.

In [27]:
with hbasecon.connection() as connection:
    users = connection.table('users')
    for key,data in users.scan(columns=['rawdata:Location'], 
                               filter="SingleColumnValueFilter('rawdata','Location', =, 'substring:españa') OR SingleColumnValueFilter('rawdata','Location', =, 'substring:ES')"):
        print (key,'->',data[b'rawdata:Location'].decode('utf-8'))

b'-1' -> en la granja de servidores
b'10039' -> Posadas, Misiónes, Argentina
b'10136' -> Madrid, España
b'10137' -> Barcelona, Espanya
b'1022' -> Santo Domingo Este, Santo Domingo, República Dominicana
b'10276' -> Barcelona, Espanya
b'10308' -> Buenos Aires, Argentina
b'10335' -> Mataró, España
b'10429' -> Valencia, España
b'10442' -> Buenos Aires, Argentina
b'10455' -> Ciudad del Este, Paraguay
b'1048' -> Raleigh, NC, United States
b'10497' -> Barcelona, España
b'10510' -> Tultitlán, Estado de Mexico
b'10531' -> Buenos Aires, Ciudad Autónoma de Buenos Aires, Argentina
b'10576' -> Valencia, España
b'10629' -> Indore, Madhya Pradesh, India
b'10642' -> Tortuguitas, Buenos Aires, Argentina
b'10645' -> Budapest, Hungría
b'10692' -> Sevilla, España
b'10699' -> Miami, FL, United States
b'10744' -> Capital Federal, Ciudad Autónoma de Buenos Aires, Argentina
b'10756' -> Buenos Aires, Argentina
b'10796' -> Zaragoza, España
b'10801' -> Buenos Aires, Ciudad Autónoma de Buenos Aires, Argentina
b'1

## EJERCICIO: Crear una nueva tabla `poststags` que, de forma eficiente, para cada _tag_, liste los `Id` de los posts que utilizan ese _tag_.


Para la realización de este ejercicio, hemos usado la estructura mostrada en la sesión 5 a la hora de crear la tabla `Links`. En este caso, se ha añadido una única familia de columnas que contiene todos los ids de los posts que utilizan un determinado tag. Para poder incluir todos los ids en esa misma familia, las diferentes columnas para cada id se han creado de la forma `Post_ID`, siendo `ID` el id del post. Además, se ha creado un filtro de Bloom de tipo `ROW`, para poder acelerar las consultas sobre las filas, estableciendo rápidamente si una fila no existe.

Para rellenar la tabla se ha hecho un `scan` sobre la tabla `posts`, extrayendo únicamente la columna `rawdata:Tags`. A continuación, de igual manera que se hizo en la sesión 1, se extraen todos los tags pasándole a la función `findall` de la librería `re` una expresión regular que indica que queremos todos los elementos que estén entre los símbolos '<' y '>'. Por cada tag obtenido, se añade una columna con el id del post (la `key` que devuelve `scan`) en el que estamos.

In [28]:
import sys
import re

class BuildPostsTags():
    def __init__(self,connection):
        
        try:
            connection.create_table(
                "poststags",
                {
                    'posts': dict(bloom_filter_type='ROW',max_versions=1),
                })
        except:
            print ("Database poststags already exists.")
            pass

        self.table = connection.table('poststags')
        self.posts = connection.table('posts')

    def run(self):
        
        with self.table.batch(batch_size=500) as pt:
            for key, data in self.posts.scan(columns=["rawdata:Tags"]):
                postid = key.strip().decode('utf-8')
                #print("\n{0}:".format(postid))
                for tag in re.findall('<(.*?)>', data[b'rawdata:Tags'].decode('utf-8')):
                    tag = tag.strip()
                    #sys.stdout.write(".")
                    pt.put(tag, {'posts:Post_' + postid : postid})


Cargamos la tabla:

In [29]:
with hbasecon.connection() as connection:
    BuildPostsTags(connection).run()

A modo de ejemplo, extraemos las primeras 20 filas de la tabla aplicando un `scan` sobre la misma:

In [30]:
with hbasecon.connection() as connection:
    poststags = connection.table('poststags')
    
    for key, data in poststags.scan(limit=20):
        print(key, '->', data, '\n')

b'.asmx' -> {b'posts:Post_27967': b'27967', b'posts:Post_35663': b'35663', b'posts:Post_37980': b'37980', b'posts:Post_51037': b'51037', b'posts:Post_54439': b'54439', b'posts:Post_71488': b'71488', b'posts:Post_80504': b'80504'} 

b'.htaccess' -> {b'posts:Post_10428': b'10428', b'posts:Post_12944': b'12944', b'posts:Post_14104': b'14104', b'posts:Post_14444': b'14444', b'posts:Post_14611': b'14611', b'posts:Post_14740': b'14740', b'posts:Post_15220': b'15220', b'posts:Post_15280': b'15280', b'posts:Post_15352': b'15352', b'posts:Post_15586': b'15586', b'posts:Post_16168': b'16168', b'posts:Post_16562': b'16562', b'posts:Post_17113': b'17113', b'posts:Post_18294': b'18294', b'posts:Post_18463': b'18463', b'posts:Post_19880': b'19880', b'posts:Post_20212': b'20212', b'posts:Post_20910': b'20910', b'posts:Post_22102': b'22102', b'posts:Post_23062': b'23062', b'posts:Post_23359': b'23359', b'posts:Post_2578': b'2578', b'posts:Post_25931': b'25931', b'posts:Post_26869': b'26869', b'posts:P

## EJERCICIO: Construya una tabla (de la forma más eficiente) que sirva para, dados un ID de usuario que pregunta y un ID de otro usuario que responde, se pueda decir (en orden constante) si forman una pareja al estilo de la RQ4. Con esa tabla, responder a la RQ4.

Para la realización del último ejercicio, creamos una tabla `users_qa` que sigue de nuevo la misma estructura que en el caso anterior. Las filas de esta tabla tendrán como clave un usuario que ha hecho como mínimo una pregunta, y como columnas todos los usuarios que le han respondido. Para incluir todos esos usuarios que responden en una misma familia de columnas `answerers`, se ha seguido la misma estrategia que en el caso anterior, llamando a las columnas de la forma `answerers:User_ID`, siendo `ID` el id del usuario que responde. Además, se ha incluido también un filtro de Bloom de tipo `ROW`.

Para rellenar la tabla, se ha hecho un `scan` sobre la tabla `posts`, añadiendo un filtro sobre las columnas para que solo nos devuelva aquellos `posts` que son respuestas. Esto se podría haber realizado sin añadir ningún filtro al `scan` y poniendo una sentencia `if()` que hiciera que solo se ejecutara el código del bucle para las respuestas, pero nos ha parecido más eficiente la primera opción, tras realizar varias ejecuciones.

El hecho de seleccionar únicamente las respuestas se debe a que permiten obtener la pregunta a la que se refieren (usando el `ParentId`), y una vez se tiene se puede extraer en orden constante cuál es el usuario que realizó la pregunta, usando una consulta `row`. Una vez obtenidos tanto el usuario que pregunta como el que responde, si ambos son distintos, y ninguno de ellos es vacío, se añade una nueva columna con el usuario que responde a la fila correspondiente al usuario que pregunta. La comprobación de que ambos usuarios sean no vacíos se debe a que hay muchos valores de la columna `rawdata:OwnerUserId` que lo son, y que si no se eliminan causan un error en la creación de la tabla.

In [32]:
class BuildQA():
    def __init__(self,connection):
        
        try:
            connection.create_table(
                "users_qa",
                {
                    'answerers': dict(bloom_filter_type='ROW',max_versions=1),
                })
        except:
            print ("Database users_qa already exists.")
            pass

        self.table = connection.table('users_qa')
        self.posts = connection.table('posts')

    def run(self):
        
        with self.table.batch(batch_size=500) as qa:

            for key, data in self.posts.scan(filter="SingleColumnValueFilter('rawdata','PostTypeId', =, 'binary:2')"):
                questioner = self.posts.row(data[b'rawdata:ParentId'], columns=[b'rawdata:OwnerUserId'])
                questioner = questioner[b'rawdata:OwnerUserId'].strip().decode('utf-8')
                answerer = data[b'rawdata:OwnerUserId'].strip().decode('utf-8')
                if questioner != answerer and questioner != "" and answerer != "":
                    qa.put(questioner, {'answerers:User_' + answerer : answerer})
                

Cargamos la tabla:

In [33]:
with hbasecon.connection() as connection:
    BuildQA(connection).run()

A continuación mostramos un ejemplo de consulta en la que, dados un usuario que pregunta y otro que responde, se obtiene si son una pareja al estilo de la RQ4. Dado que una consulta `row` se ejecuta en orden constante, hacer dos de ellas también lo es, por lo que se puede determinar si ambos son recíprocos en tiempo constante.

In [34]:
with hbasecon.connection() as connection:
    users_qa = connection.table('users_qa')
    
    user1 = b'24'
    user2 = b'301'
    if(users_qa.row(user1, columns=['answerers:User_'+user2.decode('utf-8')]) and 
       users_qa.row(user2, columns=['answerers:User_'+user1.decode('utf-8')])):
        print('Los usuarios', user1, 'y', user2, 'son recíprocos.')

Los usuarios b'24' y b'301' son recíprocos.


Por último, vamos a responder a la RQ4 usando la tabla `users_qa`. Para ello, realizamos un `scan` sobre la misma y se comprueba (usando un `row`), para cada columna de la fila, si existe una entrada en la tabla con el id del usuario que responde como clave y el el id del usuario que pregunta como columna. En ese caso, se añaden los ids de ambos usuarios a una lista de tuplas de `python`.

Además, para que no nos salgan valores repetidos, se ha añadido también la condición de que el id de uno de los usuarios sea menor que el otro.

In [35]:
with hbasecon.connection() as connection:
    users_qa = connection.table('users_qa')
    RQ4 = []
    for key, data in users_qa.scan():
        for elem in data:
            if (int(data[elem].decode('utf-8')) < int(key.decode('utf-8')) and 
                users_qa.row(data[elem], columns=['answerers:User_'+key.decode('utf-8')])):
                RQ4.append((data[elem],key))

A continuación vemos los 116 resultados devueltos por la consulta RQ4, ordenados por id del primer usuario, y después por id del segundo:

In [36]:
print(len(RQ4))
sorted(RQ4, key=lambda tup: (int(tup[0].decode('utf-8')), int(tup[1].decode('utf-8'))))

116


[(b'21', b'1184'),
 (b'22', b'288'),
 (b'22', b'342'),
 (b'22', b'729'),
 (b'23', b'25'),
 (b'24', b'25'),
 (b'24', b'95'),
 (b'24', b'301'),
 (b'24', b'463'),
 (b'24', b'2230'),
 (b'24', b'19610'),
 (b'25', b'225'),
 (b'25', b'288'),
 (b'34', b'250'),
 (b'65', b'78'),
 (b'65', b'1184'),
 (b'65', b'7661'),
 (b'65', b'13558'),
 (b'73', b'250'),
 (b'73', b'2224'),
 (b'73', b'6497'),
 (b'73', b'9897'),
 (b'73', b'29967'),
 (b'78', b'250'),
 (b'83', b'100'),
 (b'83', b'301'),
 (b'83', b'29967'),
 (b'95', b'29967'),
 (b'100', b'187'),
 (b'100', b'353'),
 (b'100', b'399'),
 (b'100', b'529'),
 (b'100', b'638'),
 (b'100', b'729'),
 (b'100', b'2004'),
 (b'100', b'2838'),
 (b'120', b'531'),
 (b'154', b'13938'),
 (b'191', b'2230'),
 (b'227', b'342'),
 (b'227', b'400'),
 (b'227', b'2004'),
 (b'250', b'399'),
 (b'250', b'729'),
 (b'250', b'731'),
 (b'250', b'2163'),
 (b'250', b'2230'),
 (b'250', b'2429'),
 (b'250', b'6491'),
 (b'250', b'9897'),
 (b'250', b'10576'),
 (b'250', b'12625'),
 (b'250', b'