# Método del "elemento pivote"

## 1. Préambulo

Spark context y lectura del archivo. 

In [1]:
from pyspark import SparkContext
import pyspark.sql
from pyspark.sql import Row
sc=SparkContext()
ss=pyspark.sql.SparkSession(sc)

In [88]:
rdd=sc.textFile('cociente_clean_uniform_od.csv').map(lambda x: x.split(';'))
print('Numero de registros: ',rdd.count())
rdd.take(10)

Numero de registros:  32152


[['', 'ASENJO HERRANZ', 'REBECA', '20/02/1980', '{2}'],
 ['', 'BES SEVIL', 'J VICENTE', '05/04/1956', '{8, 175}'],
 ['', 'AMARILLA DIAZ', 'RANULFO', '22/11/1967', '{9}'],
 ['', 'AMBRONA DE LA FUE', 'M CARM', '21/06/1945', '{161, 11, 163}'],
 ['', 'CANTERO OLMEDO', 'C', '30/04/1954', '{15}'],
 ['', 'CASTILLO HERNANDE', 'PABLO', '25/03/1953', '{17}'],
 ['', 'CALLE CALLE', 'M ANUNCIACION', '25/03/1947', '{22}'],
 ['', 'DE FRUTOS PEREZ', 'M PILAR', '12/12/1954', '{34}'],
 ['', 'GUTIERREZ LORENZO', 'J M', '31/08/1969', '{608, 1328, 46}'],
 ['', 'GARCIA RUIZ BLAS', '299724', '04/12/1940', '{47}']]

In [89]:
def blank_to_none(x):
    if x[0]!='':
        return x
    else:
        x[0]=None
        return x
    
def date_to_int(x):
    x[3]=int(x[3].replace("/",""))
    return x

def string_to_set(x):
    x[4]=list(eval(x[4]))
    return x

In [90]:
rdd=rdd.map(string_to_set).map(blank_to_none).map(date_to_int)

In [2]:
from pyspark.sql.types import *
from datetime import datetime

In [75]:
schema = StructType([StructField('num', dataType=IntegerType(), nullable=True),
                     StructField('apellidos', StringType(), True),
                     StructField('nombre', StringType(), True),
                     StructField('fecha', IntegerType(), True),
                     StructField('index', ArrayType(IntegerType()), True)])

In [91]:
df=ss.createDataFrame(rdd, schema)
df = df.withColumn("num", df["num"].cast(IntegerType()))

In [92]:
df.printSchema()

root
 |-- num: integer (nullable = true)
 |-- apellidos: string (nullable = true)
 |-- nombre: string (nullable = true)
 |-- fecha: integer (nullable = true)
 |-- index: array (nullable = true)
 |    |-- element: integer (containsNull = true)



In [94]:
df.show()

+----+-----------------+--------------+--------+---------------+
| num|        apellidos|        nombre|   fecha|          index|
+----+-----------------+--------------+--------+---------------+
|null|   ASENJO HERRANZ|        REBECA|20021980|            [2]|
|null|        BES SEVIL|     J VICENTE| 5041956|       [8, 175]|
|null|    AMARILLA DIAZ|       RANULFO|22111967|            [9]|
|null|AMBRONA DE LA FUE|        M CARM|21061945| [161, 11, 163]|
|null|   CANTERO OLMEDO|             C|30041954|           [15]|
|null|CASTILLO HERNANDE|         PABLO|25031953|           [17]|
|null|      CALLE CALLE| M ANUNCIACION|25031947|           [22]|
|null|  DE FRUTOS PEREZ|       M PILAR|12121954|           [34]|
|null|GUTIERREZ LORENZO|           J M|31081969|[608, 1328, 46]|
|null| GARCIA RUIZ BLAS|        299724| 4121940|           [47]|
|null|HORCAJADA JIMENEZ|       M PILAR| 9011942|           [50]|
|null|     LOPEZ VILLAR|           M J|23011962|           [63]|
|null|    LOPEZM ORTEGA| 

In [None]:
def merge(x):
        if len(x[1])==1:                          
        return (x[0],list(x[1]))
    else:                                      
        lista=list(x[1])
        for i in reversed(range(len(lista))):            
            for j in range(0,i):
                if Matching(lista[i],lista[j]):
                    lista[j][3]+=lista[i][3]
                    if lista[j][0]=='': lista[j][0]=lista[i][0]  #Línea 'ad hoc' para no perder el número de historia.
                    del lista[i]
                    break
        return (x[0],lista) 

In [96]:
df.groupBy("fecha").show

AttributeError: 'GroupedData' object has no attribute 'show'

## 2. Rutinas necesarias

### 2.1 Función de Matching

Adaptada a las necesidades del elemento pivote, esto es, sólo hay que comparar 3 campos dado que el 4o es sobre el que se esta pivotando.

In [None]:
def Matching(x,y):
    if x[0]==y[0]:
        if x[1]==y[1]:
            return True
        elif x[2]==y[2]:
            return True
    elif x[1]==y[1] and x[2]==y[2]:
        return True
    else:
        return False

### 2.2 Función de Pivotaje

Función que pivota sobre el elemento clave _(por defecto, el primero)_, y se deja como valor el resto. El valor esta en forma de lista y no de tupla para poder editar sus valores al fusionar dos registros.

In [None]:
def Pivotaje(x):
    return [x[0],[x[1],x[2],x[3],x[4]]]

def PivotajeInv(x):
    return [x[0],x[1][0],x[1][1],x[1][2],x[1][3]]
        

###  2.3 Compare and Merge

Función que procede de la siguiente manera:

1. Si la entrada solo tiene un valor, lo devuelve y lo deja intacto.

2. En caso contrario, se transforma el conjunto de registros con la misma fecha a una lista `lista` para recorrerla de la siguiente manera: Se toma un elemento `i` de la lista (empezando **desde el final**), y se va comparando con el elemento primero, luego con el segundo, etc. 

3. En cuanto coincide con algun elemento `j`, se realizan las siguientes instrucciones:
    Primero, se añaden los índices asociados al paciente `i` a la lista de índices del paciente `j`.
    Segundo, si el numero de historia en la entrada `j` esta vacío (cosa bastante frecuente), se actualiza al de la entrada `i`.
    Por último, se borra la entrada `i` de la lista.

4. Se devuelve `(x[0],lista)`

In [None]:
def Merge(x):
    if len(x[1])==1:                          
        return (x[0],list(x[1]))
    else:                                      
        lista=list(x[1])
        for i in reversed(range(len(lista))):            
            for j in range(0,i):
                if Matching(lista[i],lista[j]):
                    lista[j][3]+=lista[i][3]
                    if lista[j][0]=='': lista[j][0]=lista[i][0]  #Línea 'ad hoc' para no perder el número de historia.
                    del lista[i]
                    break
        return (x[0],lista) 

## 3. Ejecución y pruebas

## 3.1 Pruebas con una muestra de 20 fechas

In [None]:
rdd2=rdd.map(Pivotaje).groupByKey()


In [None]:
a=rdd2.take(20)
b=sc.parallelize(a)

Vemos los bloques dentro de los cuales vamos a comparar:

In [None]:
for (a,d) in b.collect():
    print("Bloque de ",a)
    for i in d:
        print(i)
    print("\n")

Aplicamos la función de Merge y vemos el resultado

In [None]:
c=b.map(Merge)
c.collect()

In [None]:
c.flatMapValues(lambda x: x).collect()

## 3.2 Ejecución con todos los registros

In [None]:
import time

### 3.2.1 Fecha

In [None]:
rdd.

In [None]:
t=time.time()
rdd3=rdd2.map(Merge).flatMapValues(lambda x: x).map(PivotajeInv)
print("Tiempo: ", time.time()-t)
print("Numero de registros: ", rdd3.count())
print("Errores: ", N-rdd3.count())