# Base de datos Cassandra

## Conexion y borrado de datos

In [1]:
%load_ext cql

In [2]:
%%cql
DROP KEYSPACE practica_8anu;

'No results.'

In [3]:
%%cql
CREATE KEYSPACE practica_8anu 
WITH replication = {'class':'SimpleStrategy', 'replication_factor': 1};

'No results.'

In [4]:
%cql USE practica_8anu;

'No results.'

## Creacion de tablas

** TABLA ESCALADORES **

In [5]:
%%cql 
CREATE TABLE escaladores (
    id_escalador    int,
    nombre          text,
    sexo            text,
    pais            text,
    anio_comienzo   int,
    PRIMARY KEY (id_escalador)
);

'No results.'

** TABLA DE ASCENSOS **

%%cql 
CREATE TABLE ascensos (
    uid                  uuid,
    id_escalador         int,
    nombre_escalador     text,
    pais_escalador       text,
    anio_comienzo        int,
    nombre_via           text,
    dificultad           int,
    grado_frances        text,
    tipo_encadenamiento  text,
    risco                text,
    pais_risco           text,
    PRIMARY KEY (uid, id_escalador, tipo_encadenamiento)
);

In [6]:
%%cql 
CREATE TABLE ascensos (
    uid                  uuid,
    id_escalador         int,
    nombre_escalador     text,
    pais_escalador       text,
    anio_comienzo        int,
    nombre_via           text,
    dificultad           int,
    grado_frances        text,
    tipo_encadenamiento  text,
    risco                text,
    pais_risco           text,
    PRIMARY KEY (id_escalador, tipo_encadenamiento, dificultad, pais_risco, uid)
);

'No results.'

** TABLA DE ASCENSOS ACUMULADOS POR ESCALADOR **

**Ascensos acumulados por sexo y escalador**

In [7]:
%%cql 
CREATE TABLE ascensos_acumulados_hombres (
    id_escalador    int,
    num_ascensos    counter,
    dificultad_acum counter,
    PRIMARY KEY (id_escalador)
);

'No results.'

In [8]:
%%cql 
CREATE TABLE ascensos_acumulados_mujeres (
    id_escalador    int,
    num_ascensos    counter,
    dificultad_acum counter,
    PRIMARY KEY (id_escalador)
);

'No results.'

** Acumulados de escaladores españoles y no españoles en españa: **

In [9]:
%%cql 
CREATE TABLE ascensos_acum_esp_esp (
    id_escalador    int,
    nombre_escalador text,
    pais_escalador   text,
    num_ascensos    counter,
    PRIMARY KEY (id_escalador, nombre_escalador, pais_escalador)
);

'No results.'

In [10]:
%%cql 
CREATE TABLE ascensos_acum_noesp_esp (
    id_escalador    int,
    nombre_escalador text,
    pais_escalador   text,
    num_ascensos    counter,
    PRIMARY KEY (id_escalador, nombre_escalador, pais_escalador )
);

'No results.'

**Ascensos acumulados por risco y pais:**

In [11]:
%%cql 
CREATE TABLE ascensos_acum_pais_risco (
    pais_risco   text,
    risco        text,
    num_ascensos counter,
    PRIMARY KEY (pais_risco, risco)
);

'No results.'

** TABLA DE DIFICULTADES **

In [12]:
%%cql 
CREATE TABLE dificultades (
    dificultad    int,
    grado_frances text,
    PRIMARY KEY (dificultad)
);

'No results.'

** VISTAS MATERIALIZADAS **

In [13]:
%%cql 
CREATE MATERIALIZED VIEW ascensos_por_encadenamiento_escalador AS
    SELECT * FROM ascensos
    WHERE uid IS NOT NULL
        AND tipo_encadenamiento IS NOT NULL
        AND id_escalador IS NOT NULL
        AND nombre_escalador IS NOT NULL
        AND pais_escalador IS NOT NULL
        AND anio_comienzo IS NOT NULL
        AND nombre_via IS NOT NULL
        AND dificultad IS NOT NULL
        AND grado_frances IS NOT NULL
        AND tipo_encadenamiento IS NOT NULL
        AND risco IS NOT NULL
        AND pais_risco IS NOT NULL 
    PRIMARY KEY (tipo_encadenamiento, id_escalador, dificultad, nombre_via, pais_risco, uid) 
    WITH CLUSTERING ORDER BY (dificultad DESC, nombre_via ASC);

'No results.'

In [14]:
%%cql 
CREATE MATERIALIZED VIEW ascensos_por_encadenamiento_dificultad AS
    SELECT * FROM ascensos
    WHERE uid IS NOT NULL
        AND tipo_encadenamiento IS NOT NULL
        AND id_escalador IS NOT NULL
        AND nombre_escalador IS NOT NULL
        AND pais_escalador IS NOT NULL
        AND anio_comienzo IS NOT NULL
        AND nombre_via IS NOT NULL
        AND dificultad IS NOT NULL
        AND grado_frances IS NOT NULL
        AND tipo_encadenamiento IS NOT NULL
        AND risco IS NOT NULL
        AND pais_risco IS NOT NULL 
    PRIMARY KEY (tipo_encadenamiento, dificultad, id_escalador, nombre_via, pais_risco, uid) 
    WITH CLUSTERING ORDER BY (dificultad DESC);

'No results.'

In [15]:
%%cql 
CREATE MATERIALIZED VIEW ascensos_por_escalador_paisRisco AS
    SELECT * FROM ascensos
    WHERE uid IS NOT NULL
        AND tipo_encadenamiento IS NOT NULL
        AND id_escalador IS NOT NULL
        AND nombre_escalador IS NOT NULL
        AND pais_escalador IS NOT NULL
        AND anio_comienzo IS NOT NULL
        AND nombre_via IS NOT NULL
        AND dificultad IS NOT NULL
        AND grado_frances IS NOT NULL
        AND tipo_encadenamiento IS NOT NULL
        AND risco IS NOT NULL
        AND pais_risco IS NOT NULL 
    PRIMARY KEY (id_escalador, pais_risco, dificultad, tipo_encadenamiento, nombre_via, uid) 
    WITH CLUSTERING ORDER BY (dificultad DESC);

'No results.'

## Lectura de la informacion en Pandas

In [16]:
import pandas as pd

df_dificultades = pd.read_csv("./data/dificultades.csv",encoding='utf-8');
df_encadenamientos = pd.read_csv("./data/tipos_encadenamiento.csv", encoding='utf-8');
df_escaladores = pd.read_csv("./data/escaladores_lite_2017.csv", encoding='utf-8');
df_ascensos = pd.read_csv("./data/ascensos_lite_2017.csv", encoding='utf-8');

# 'merge' del tipo de encadenamiento en el data frame de ascensos
df_ascensos = pd.merge(df_ascensos, df_encadenamientos, on = ['id_tipo_encadenamiento'], how = 'inner')
columnas = ['id_escalador',
            'id_dificultad',
            'tipo_encadenamiento',
            'nombre_via',
            'risco',
            'sector',
            'pais']
df_ascensos = df_ascensos[columnas]

## Desnormalización del data frame de ascensos

Voy a incluir la información del usuario en cada registro de ascenso, para ir creando los buckets que van interesando según las preguntas a resolver

In [17]:
df_ascensos = pd.merge(df_ascensos, df_escaladores, on = ['id_escalador'], how = 'left');
df_ascensos = pd.merge(df_ascensos, df_dificultades, on = ['id_dificultad'], how = 'left');

# Renombrar campos por claridad
df_ascensos = df_ascensos.rename(
    columns={
        'id_dificultad' : 'dificultad',
        'nombre': 'nombre_escalador',
        'ciudad':'ciudad_escalador',
        'pais_y': 'pais_escalador',
        'comienzo': 'anio_comienzo',
        'pais_x': 'pais_risco'});


## Carga de datos en Cassandra

In [18]:
from cassandra.cluster import Cluster, BatchStatement, ConsistencyLevel
cluster = Cluster()
session = cluster.connect('practica_8anu')

**CARGA EN TABLAS DE ASCENSOS Y ACUMULADOS**

In [None]:
def insert_ascensos(df):
    cql_insert_ascenso = """
        INSERT INTO ascensos (
            uid,
            id_escalador,
            nombre_escalador,
            pais_escalador,
            anio_comienzo,
            nombre_via,
            dificultad,
            grado_frances,
            tipo_encadenamiento,
            risco,
            pais_risco
        ) VALUES (now(), %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"""
    

    
    for index in df.index:
        data = [
            df.ix[index,"id_escalador"],
            df.ix[index,"nombre_escalador"],
            df.ix[index,"pais_escalador"],
            df.ix[index,"anio_comienzo"],
            df.ix[index,"nombre_via"],
            df.ix[index,"dificultad"],
            df.ix[index,"grado_frances"],
            df.ix[index,"tipo_encadenamiento"],
            df.ix[index,"risco"],
            df.ix[index,"pais_risco"]
        ]

        session.execute(cql_insert_ascenso, data)
        
        session.execute("""UPDATE ascensos_acum_pais_risco
                            SET num_ascensos = num_ascensos + 1
                            WHERE pais_risco = %s AND risco = %s""",
                            [df.ix[index,"pais_risco"],
                             df.ix[index,"risco"]])
        
        
        if (df.ix[index,"sexo"] == 'Hombre'):
            session.execute("""UPDATE ascensos_acumulados_hombres
                               SET num_ascensos = num_ascensos + 1,
                                   dificultad_acum = dificultad_acum + %s
                               WHERE id_escalador = %s""",
                            [df.ix[index,"dificultad"], df.ix[index,"id_escalador"]])
        else:
            session.execute("""UPDATE ascensos_acumulados_mujeres
                               SET num_ascensos = num_ascensos + 1,
                                   dificultad_acum = dificultad_acum + %s
                               WHERE id_escalador = %s""",
                            [df.ix[index,"dificultad"], df.ix[index,"id_escalador"]])
            
        if (df.ix[index,"pais_risco"] == 'ESP'):
            if(df.ix[index,"pais_escalador"] == 'ESP'):
                session.execute("""UPDATE ascensos_acum_esp_esp
                                SET num_ascensos = num_ascensos + 1
                                WHERE id_escalador = %s 
                                    AND nombre_escalador = %s
                                    AND pais_escalador = %s""",
                                [df.ix[index,"id_escalador"],
                                 df.ix[index,"nombre_escalador"],
                                 df.ix[index,"pais_escalador"]])
            else:
                session.execute("""UPDATE ascensos_acum_noesp_esp
                                SET num_ascensos = num_ascensos + 1
                                WHERE id_escalador = %s 
                                    AND nombre_escalador = %s
                                    AND pais_escalador = %s""",
                                [df.ix[index,"id_escalador"],
                                 df.ix[index,"nombre_escalador"],
                                 df.ix[index,"pais_escalador"]])
        

In [None]:
insert_ascensos(df_ascensos)

** CARGA EN TABLA DE DIFICULTADES**

In [None]:
def insert_dificultades(df):
    cql_insert = """
        INSERT INTO dificultades (
            dificultad,
            grado_frances
        ) VALUES (%s, %s)"""
    
    for index in df.index:
        data = [df.ix[index, "id_dificultad"],
                df.ix[index, "grado_frances"]]
        session.execute(cql_insert, data)

In [None]:
insert_dificultades(df_dificultades)

** CARGA EN TABLA DE ESCALADORES **

In [None]:
def insert_escaladores(df):
    cql_insert = """
        INSERT INTO escaladores (
            id_escalador,
            nombre,
            sexo,
            pais,
            anio_comienzo
        ) VALUES (%s, %s, %s, %s, %s)"""
    
    for index in df.index:
        data = [
            df.ix[index, "id_escalador"],
            df.ix[index, "nombre"],
            df.ix[index, "sexo"],
            df.ix[index, "pais"],
            df.ix[index, "comienzo"]]
        session.execute(cql_insert, data)

In [None]:
insert_escaladores(df_escaladores)

# RESPUESTAS A LAS PREGUNTAS

In [None]:
def execute_query(cql, data=[]):
    rows = session.execute(cql, data)
    return pd.DataFrame(list(rows))

### 1.a) Los 10 escaladores (hombres) más activos (orden auxiliar por Id)

Esto requiere varias consultas a dos tablas diferentes:
- Una consulta a la tabla de acumulados de hombres de la que se obtienen los ids de los escaladores con mas ascensos acumulados
- 10 consultas a la tabla de escaladores, para sacar los datos del escalador.

Por desgracia las tablas de acumulación (con columnas counter) tienen las siguientes particularidades:

- exceptuando una columna que hará de partition key, estas tablas no pueden contener columnas de otro tipo diferente a counter a no ser que esas nuevas columnas se introduzcan como parte de la primary key: https://stackoverflow.com/questions/32562500/cassandra-non-counter-family


- una columna counter no puede ser clustering key por lo que no se puede utilizar como columna de ordenación. Esto hace que la ordenación haya hacerla fuera.


In [None]:
cql = """
SELECT *
FROM ascensos_acumulados_hombres
"""
df_id_acum = execute_query(cql)

df_id_acum = df_id_acum.sort_values(['num_ascensos', 'id_escalador'],ascending=[False, False]).head(10)

In [None]:
cql = """
SELECT *
FROM escaladores
WHERE id_escalador=%s
"""

df = pd.DataFrame()
for index in df_id_acum.index:
    df_escalador = execute_query(cql, [df_id_acum.ix[index,'id_escalador']])
    df_escalador["num_ascensos"] = df_id_acum.ix[index, "num_ascensos"]
    
    df = df.append(df_escalador)
    
df = df.reset_index(drop=True)
id_escalador_mas_activo = df.iloc[0]['id_escalador']

columnas = ["id_escalador", "nombre", "sexo", "pais", "num_ascensos"]
df[columnas]  
    

### 1.b) Los 10 escaladoras (mujeres) más activas (orden auxiliar por Id)

In [None]:
cql = """
SELECT *
FROM ascensos_acumulados_mujeres
"""
df_id_acum = execute_query(cql)

df_id_acum = df_id_acum.sort_values(['num_ascensos', 'id_escalador'],ascending=[False, False]).head(10)

In [None]:
cql = """
SELECT *
FROM escaladores
WHERE id_escalador=%s
"""

df = pd.DataFrame()
for index in df_id_acum.index:
    df_escalador = execute_query(cql, [df_id_acum.ix[index,'id_escalador']])
    df_escalador["num_ascensos"] = df_id_acum.ix[index, "num_ascensos"]
    
    df = df.append(df_escalador)
    
df = df.reset_index(drop=True)

id_escaladora_mas_activa = df.iloc[0]['id_escalador']

columnas = ["id_escalador", "nombre", "sexo", "pais", "num_ascensos"]
df[columnas] 



### 2. Lista de los 10 ascensos "On sight" de la escaladora más activa en orden decreciente de dificultad (y por nombre de via ascendete)

De la pregunta anterior nos hemos guardado el id de la escaladora más activa. 

Para la respuesta a esta pregunta he creado una vista materializada de la tabla de ascensos, ascensos_por_encadenamiento, cuya clave de particion es el tipo de encadenamiento y la ordenacion se hace por dificultad y por nombre de via. 

In [None]:
cql = """
SELECT *
FROM ascensos_por_encadenamiento_escalador
WHERE tipo_encadenamiento = 'Onsight' AND id_escalador = %s 
LIMIT 10
"""

df = execute_query(cql, [id_escaladora_mas_activa])
columnas = ['nombre_via', 'grado_frances', 'risco', 'pais_risco']
df[columnas]

### 3. Dificultad media de los ascensos del escalador más activo

Esto requiere dos consultas:

- A la tabla ascensos_acumulados_hombres para obtener los datos para poder calcular la dificultad media.
- A la tabla de dificultades_grado para obtener el grado frances de la media


In [None]:
# query a tabla de ascensos acumulados
cql = """
SELECT *
FROM ascensos_acumulados_hombres
WHERE id_escalador = %s
"""

df = execute_query(cql, [id_escalador_mas_activo])
dificultad_media = int(round(df["dificultad_acum"]/df["num_ascensos"]))

# query a la tabla de dificultades
cql = """
SELECT grado_frances
FROM dificultades
WHERE dificultad = %s
"""
execute_query(cql, [dificultad_media])


### 4.a) Los 10 ascensos mas dificiles

La vista materializada tiene ordenados los **ascensos_por_encadenamiento** para cada tipo de encadenamiento. Podríamos resolver esta cuestion de la manera siguiente:

- Obtener mediante 4 queries los 10 primeros ascensos para cada tipo de encadenamiento
- Unir (append) los data frames resultado
- Reordenar el data frame

Pero en este caso vamos a recurrir a un conocimiento a priori del domino de la escalada: los grados más difíciles se encadenan siempre en "Red point"; esto se debe a que los ascensos más dificiles requieren siempre del escalador un entrenamiento en la propia via, por lo que el ascenso final de la via es "Red point".

In [None]:
cql = """
SELECT *
FROM ascensos_por_encadenamiento_dificultad
WHERE tipo_encadenamiento = 'Redpoint'
LIMIT 10
"""

df = execute_query(cql)
columnas = ['id_escalador','nombre_escalador','pais_escalador','nombre_via','grado_frances','tipo_encadenamiento','risco','pais_risco']
df[columnas]

### 4.b) Los 10 ascensos mas dificiles a vista (On sight)

In [None]:
cql = """
SELECT *
FROM ascensos_por_encadenamiento_dificultad
WHERE tipo_encadenamiento = 'Onsight'
LIMIT 10
"""

df = execute_query(cql)
columnas = ['id_escalador',
            'nombre_escalador',
            'pais_escalador',
            'nombre_via',
            'grado_frances',
            'tipo_encadenamiento',
            'risco',
            'pais_risco']
df[columnas]

### 5.a) Grado medio y maximo de los ascensos en España de los 10 escaladores NO ESPAÑOLES con mas ascensos en España

La obtencion de esta información mediante accesos a cassandra requiere varios accesos. Esto se debe a que se quiere visualizar 
información de varias tablas. nombre del escalador, pais, numero de ascensos en españa, grado medio y grado maximo.

- Query a la tabla de acumulados de ascensos de escaladores no españoles en españa la obtencion de los escaladores con mas ascensos en España

- Query a la tabla de dificultades y mapear la dificultad media a su grado correspondiente

- Por cada uno de los escaladores:
    - Query a la vista materializada de ascensos por escaldor y pais del risco para la obtencion de la dificultad maxima
    
    
Aprovechando al maximo la funcionalidad de la bbdd supone entonces 12 queries!

In [None]:
cql = """
SELECT *
FROM dificultades
"""
# query para la obtencion del grado medio a partir de la dificultad media
df_dificultades = execute_query(cql)

# configuro dificultad como indice del data frame para el futuro mapping
df_dificultades = df_dificultades.sort_values('dificultad',ascending=True);
df_dificultades = df_dificultades.set_index('dificultad');

In [None]:
cql = """
SELECT *
FROM ascensos_acum_noesp_esp
"""
df_ids = execute_query(cql)
df_ids = df_ids.sort_values(['num_ascensos', 'id_escalador'],ascending=[False, True]).head(10)


cql_dif_mediaYmax = """
SELECT count(dificultad) AS num_ascensos,
       avg(dificultad) AS dificultad_media,
       max(dificultad) AS dificultad_maxima
FROM ascensos_por_escalador_paisRisco
WHERE id_escalador=%s AND pais_risco='ESP'
LIMIT 1
"""


df = pd.DataFrame()
for index,row in df_ids.iterrows():
    
    #query para obtención de grado maximo del escalador
    df_query = execute_query(cql_dif_mediaYmax, [row['id_escalador']])   
    
    # añade columnas que faltan
    df_query['nombre_escalador'] = row['nombre_escalador'];
    df_query['pais_escalador'] = row['pais_escalador'];
    
    #añade al dataframe resultado
    df = df.append(df_query, ignore_index=True)
    
df['grado_medio']=df['dificultad_media'].map(df_dificultades['grado_frances']);
df['grado_maximo']=df['dificultad_maxima'].map(df_dificultades['grado_frances']);
columnas = ["nombre_escalador", "pais_escalador", "num_ascensos", "grado_medio", "grado_maximo"]
df[columnas] 

### 5.b) Grado medio y maximo de los ascensos en España de los 10 escaladores ESPAÑOLES con mas ascensos en España

In [None]:
cql = """
SELECT *
FROM ascensos_acum_esp_esp
"""
df_ids = execute_query(cql)
df_ids = df_ids.sort_values(['num_ascensos', 'id_escalador'],ascending=[False, True]).head(10)


cql_dif_mediaYmax = """
SELECT count(dificultad) AS num_ascensos,
       avg(dificultad) AS dificultad_media,
       max(dificultad) AS dificultad_maxima
FROM ascensos_por_escalador_paisRisco
WHERE id_escalador=%s AND pais_risco='ESP'
LIMIT 1
"""


df = pd.DataFrame()
for index,row in df_ids.iterrows():
    
    #query para obtención de grado maximo del escalador
    df_query = execute_query(cql_dif_mediaYmax, [row['id_escalador']])   
    
    # añade columnas que faltan
    df_query['nombre_escalador'] = row['nombre_escalador'];
    df_query['pais_escalador'] = row['pais_escalador'];
    
    #añade al dataframe resultado
    df = df.append(df_query, ignore_index=True)
    
df['grado_medio']=df['dificultad_media'].map(df_dificultades['grado_frances']);
df['grado_maximo']=df['dificultad_maxima'].map(df_dificultades['grado_frances']);
columnas = ["nombre_escalador", "pais_escalador", "num_ascensos", "grado_medio", "grado_maximo"]
df[columnas] 

### 6.a) Dificultad media y maxima de los ascensos NO "Top Rope" de los escaladores con menos de 3 años de experiencia

### 6.b) Dificultad media y maxima de los ascensos NO "Top Rope" de los escaladores con entre 10 y 30 años de experiencia

### 7. Los 10 riscos españoles (o zonas) con mas ascensos por orden decreciente de numero de ascensos

### 8.a) Los 10 sectores españoles con mayor nivel de difcultad media de ascensos ordenadas por orden decreciente de dificultad y por numero de ascensos decreciente

### 8.b) Las 10 sectores españoles con menor nivel de dificultad medio de ascensos ordenadas por orden creciente de dificultad y por numero de ascensos decreciente