In [1]:
from pyspark import SparkContext
""" El número de cores que vamos a utilizar la indicamos al crear el SparkContext mediante el argumento 
local[n] siendo n el número de cores.

Para cambiar este número deberemos parar el SparkContext mediante sc.stop y volver a lanzarlo cambiando n"""
sc = SparkContext("local[6]")

# Definción de las funciones que usaremos

## Función de procesado de los RDDs al formato deseado

In [2]:
def procesado(rdd):
    """ 1º Mapeamos para dividir cada línea en los distintos elementos usando como separador la coma.
    
    2º Mapeamos de manera que tenemos (usuario, modelo, actividad) como clave y luego tenemos las 3
    posiciones en X,Y,Z. Las mapeo 3 veces de manera normal para el cálculo de la media, el máximo y el
    mínimo y los mapeo una vez al cuadrado para sacar la std. El últimmo elemento del mape será el uno 
    que ya hemos usado varias veces para hacer un conteo de los elementos y poder calcular la media.
    
    3º Reducimos mediante la llave que hemos definido antes y hacemos las siguientes operaciones 2 a 2:
        - Sumamos los elementos 0, 1, 2 para calcular luego la media
        - Sumamos los elementos 3, 4, 5 para calcular luego las std
        - Con los valores 6, 7, 8 nos vamos quedando con el valor máximo a cada par de datos
        - Con los valores 9, 10, 11 nos vamos quedando con el valor mínimo a cada par de datos
    
    4º Mapeamos los valores (usamos mapValues por "aplanar" la salida) de manera que:
        - Dividimos los valores 0, 1, 2 entre el 12 que tiene el número total de datos y tenemos la media
        - Hacemos lo mismo con los valores cuadráticos 3, 4, 5 siendo un paso intermedio para la std
        - Los valores 6 - 11 representan ya los máximos y mínimos en x,y,x respectivamente así que los
        pasamos directamente sin cambiar
        
    5º Por último tenemos un map que nos da:
        - Los valores 0 - 2 (media), 6 - 8 (máximo), 9 - 11 (mínimo) ya son resultados finales así que los
        pasamos tal cual
        - Los valores 3, 4, 5 (cuadrados entre el número total de datos) los restamos al valor de la media
        al cuadrado y luego hacemos la raiz cuadrada como hicimos en los ejercicios básicos
        """
    rdd_fin = (rdd
                   .map(lambda l: l.split(','))
                   .map(lambda p: [(p[6], p[7], p[9])
                                   ,(float(p[3]), float(p[4]), float(p[5])
                                     ,float(p[3])**2, float(p[4])**2, float(p[5])**2
                                     ,float(p[3]), float(p[4]), float(p[5])
                                     ,float(p[3]), float(p[4]), float(p[5]),1)])
                   .reduceByKey(lambda v1,v2: (v1[0]+v2[0], v1[1]+v2[1], v1[2]+v2[2]
                                              ,v1[3]+v2[3],v1[4]+v2[4], v1[5]+v2[5]
                                              ,v1[6] if v1[6]>v2[6] else v2[6], v1[7] if v1[7]>v2[7] else v2[7], v1[8] if v1[8]>v2[8] else v2[8]
                                              ,v1[9] if v1[9]<v2[9] else v2[9], v1[10] if v1[10]<v2[10] else v2[10], v1[11] if v1[11]<v2[11] else v2[11]
                                              ,v1[12]+v2[12]))
                   .mapValues(lambda v: (v[0]/v[12],v[1]/v[12],v[2]/v[12]
                                         ,v[3]/v[12],v[4]/v[12],v[5]/v[12]
                                         ,v[6],v[7],v[8]
                                         ,v[9],v[10],v[11]))
                   .mapValues (lambda v: (v[0],v[1],v[2]
                                          ,(v[3]-v[0]**2)**0.5,(v[4]-v[1]**2)**0.5,(v[5]-v[2]**2)**0.5
                                          ,v[6],v[7],v[8]
                                          ,v[9],v[10],v[11])))
    return rdd_fin

## Función de unión de los 4 RDDs a uno solo

In [3]:
def JoinAll(rdd1, rdd2, rdd3, rdd4):
    """ 
    Los 4 argumentos que le pasamos son los 4 RDDs que estamos creando a partir de los ficheros.
    
    Unimos por un lado los RDDs de los teléfonos y por otro los de los relojes mediante un join 
    aprovechando que tienen las mismas claves 2 a 2.
    
    Finalmente unimos los 2 RDDs resultantes en otro. Estos no comparten claves así que tenemos que 
    usar otro tipo de join. Podríamos usar Left o Right indiferentemente. También podríamos usar Full,
    que nos daría lo mismo pero con más Nones por las no coincidencias de las llaves.
    
    Otra opción sería usar union directamente como se dice en el enunciado debido a que tienen la misma
    estructura los 4 RDDs y por lo tanto no nos dará ningún problema"""
    
    joined_1 = rdd1.join(rdd2) 
    joined_2 = rdd3.join(rdd4)
    joined = joined_2.union(joined_1)
    return joined

# Lectura de los ficheros para crear los RDD

In [4]:
lines_ph_ac = sc.textFile("Phones_accelerometer.csv")
lines_ph_gy = sc.textFile("Phones_gyroscope.csv")
lines_wa_ac = sc.textFile("Watch_accelerometer.csv")
lines_wa_gy = sc.textFile("Watch_gyroscope.csv")

In [6]:
rdd_ph_ac = procesado(lines_ph_ac)
rdd_ph_gy = procesado(lines_ph_gy)
rdd_wa_ac = procesado(lines_wa_ac)
rdd_wa_gy = procesado(lines_wa_gy)

In [8]:
rdd_all = JoinAll(rdd_ph_ac, rdd_ph_gy, rdd_wa_ac, rdd_wa_gy)

In [7]:
rdd_all.take(3)

In [9]:
%time rdd_all.collect()
print("ya")

Wall time: 4min 23s
ya


In [9]:
# sc.stop()

# Tabla de tiempos de ejecución para el collect()

| Número de cores | Tiempo de ejecución |
| :- | :- 
| 2 | 7min 12s
| 4 | 4min 24s
| 6 | 3min 59s 