In [1]:
sc

<pyspark.context.SparkContext at 0x7f77c41cfbd0>

In [2]:
# vamos a hacer la media de 3 personas por sexo.
ps = [('Pepe', 'M', 32), 
     ('Veronica', 'F',37),
     ('Juan', 'M', 28)]

In [3]:
# Para crear un objeto para guardar los datos.
rdd = sc.parallelize(ps)

In [4]:
rdd
# No saca nada porque aún no se hace nada.

ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:423

In [5]:
rdd.collect()
# Par verla se usa collect

[('Pepe', 'M', 32), ('Veronica', 'F', 37), ('Juan', 'M', 28)]

In [11]:
# En python la media de todos se haría. Hacemos la suma para no importar el modulo de math.
sum([x[2] for x in ps])

97

In [12]:
# en el rdd no se puede iterar.
rdd.map(lambda x: x[2]).sum()

97

In [13]:
# o la media:
rdd.map(lambda x: x[2]).mean()

32.333333333333336

In [14]:
### Vamos ahora a hacerlo por el genero.
kv = rdd.map(lambda x: (x[1], x[2]))

In [16]:
kv.collect()

[('M', 32), ('F', 37), ('M', 28)]

In [17]:
# Una clave con varios valores. Ej:
kv2 = rdd.map(lambda x: (x[1], (x[0],x[2])))

In [18]:
kv2.collect()

[('M', ('Pepe', 32)), ('F', ('Veronica', 37)), ('M', ('Juan', 28))]

In [21]:
# seguimos con el 14. Se agrupa por clave. M y F.
kv.groupByKey().collect()

[('F', <pyspark.resultiterable.ResultIterable at 0x7f77b055a210>),
 ('M', <pyspark.resultiterable.ResultIterable at 0x7f77b055aad0>)]

In [22]:
# Calculamos la suma por claves o grupos de valores M y F. Transformamos 2 eltos a 2 eltos que son las medias.
kv.groupByKey().map(lambda vs: (x[0], sum(x[1]))).collect()

[('F', 37), ('M', 60)]

In [27]:
# Otra forma de hacerlo es usando mapvalues. Se usa cuando no tenemos que usar para nada la clave.
kv.groupByKey().mapValues(lambda vs: sum(vs)).collect()

[('F', 37), ('M', 60)]

In [24]:
# Ahora la media
kv.groupByKey().mapValues(lambda vs: sum(vs)/len(vs)).collect()

[('F', 37), ('M', 30)]

## Uso de datos reales

In [41]:
# Leemos los datos de cupones
ls = sc.textFile('data/coupon150720.csv') # el segundo argumento es el núm. de particiones, no necesario.

In [29]:
# cogemos cinco lineas para verlos son aleatorias. Cada linea es un cupon. Es un pasajero.
ls.take(5)

[u'79062005698500,1,MAA,AUH,9W,9W,56.79,USD,1,H,H,0526,150904,OK,IAF0',
 u'79062005698500,2,AUH,CDG,9W,9W,84.34,USD,1,H,H,6120,150905,OK,IAF0',
 u'79062005924069,1,CJB,MAA,9W,9W,60.0,USD,1,H,H,2768,150721,OK,IAA0',
 u'79065668570385,1,DEL,DXB,9W,9W,160.63,USD,2,S,S,0546,150804,OK,INA0',
 u'79065668737021,1,AUH,IXE,9W,9W,152.46,USD,1,V,V,0501,150803,OK,INA0']

In [30]:
ls.first() # sí que devuelve la primera línea.

u'79062005698500,1,MAA,AUH,9W,9W,56.79,USD,1,H,H,0526,150904,OK,IAF0'

In [31]:
ls.count() # numero de líneas.

1232662

In [32]:
ls.cache() # para guardar en cache y que no recalcule.

MapPartitionsRDD[41] at textFile at NativeMethodAccessorImpl.java:-2

In [33]:
ls.persist() #para decirle como va a hacerse la cache.

MapPartitionsRDD[41] at textFile at NativeMethodAccessorImpl.java:-2

In [34]:
ls.getNumPartitions() # num. de particiones actualmente. Lo selecciona Spark.

3

In [42]:
# en todos los pasajeros sumar cúantos han volado con Iberia.
# contamos el contador de iberia IB
ls.map(lambda x: x.split(',')).filter(lambda x: x[5]=='IB').count()

26158

In [43]:
# Solución. Mas eficiente que la anterior.
aerolineas = ls.map(lambda x: x.split(',')[5])
aerolineas.filter(lambda x: x == 'IB').count()

# otra forma
ls.map(lambda x: x.split(',')[5]).filter(lambda x: x == 'IB').count()

26158

In [46]:
## Cuanto dinero son esos 26.158 pasajeros??
# funcion para acelerar la selección. Funcion pequeña usamos lambda, función grande la creamos para legibilidad.
def getAerolineaDinero(l):
    elems = l.split(',')
    aerolinea = elems[5]
    dinero = float(elems[6])
    return(aerolinea, dinero)

In [47]:
# lo aplicamos. Se hace a partes aunque se puede hacer seguido.
dineros = ls.map(getAerolineaDinero)
dinerosIB = dineros.filter(lambda x: x[0] == 'IB')
cantidades = dinerosIB.map(lambda x: x[1])

In [48]:
# hacemos la suma final.
cantidades.sum()

2828044.4199999194

In [49]:
# hacemos por ejemplo un stats
cantidades.stats()

(count: 26158, mean: 108.113939139, stdev: 168.011485542, max: 5239.93, min: 0.0)

## ¿cuál es la ruta que tiene el billete más caro?

In [50]:
ls.first()

u'79062005698500,1,MAA,AUH,9W,9W,56.79,USD,1,H,H,0526,150904,OK,IAF0'

In [53]:
# Cogemos origen, destino y dinero.

def getRutaDinero(l):
    elems = l.split(',')
    origen = elems[2]
    destino = elems[3]
    dinero = float(elems[6])
    ruta = origen + '-' + destino
    return(ruta, dinero)

In [54]:
getRutaDinero(ls.first())

(u'MAA-AUH', 56.79)

In [56]:
rutas.take(3) #miramos.

[(u'MAA-AUH', 56.79), (u'AUH-CDG', 84.34), (u'CJB-MAA', 60.0)]

In [58]:
# ordenamos
rutas.sortBy(lambda x: x[1], False)

PythonRDD[75] at RDD at PythonRDD.scala:43

In [59]:
# para ver lo que sale top five
rutas.sortBy(lambda x: x[1], False).take(5)

[(u'SYD-CGK', 6355194.0),
 (u'SYD-CGK', 6355194.0),
 (u'SYD-CGK', 6355194.0),
 (u'CDG-IAD', 637313.52),
 (u'IAD-CDG', 637313.52)]

In [60]:
# No sale bien, pues no hay billetes o tickets de 6 millones de dolares. filtramos por valores menores de 7500 
rutas.filter(lambda x: x[1] < 7500).sortBy(lambda x: x[1], False).take(5)

[(u'BLA-MIA', 7490.33),
 (u'MIA-CCS', 7415.0),
 (u'MIA-CCS', 7415.0),
 (u'CCS-MIA', 7415.0),
 (u'MIA-CCS', 7415.0)]

In [61]:
# Hay algunas aún que salen repetidas. Debemos afinar más.
filtrado = rutas.filter(lambda x: x[1] < 7500)

In [63]:
# agrupamos por clave.
filtrado.groupByKey().mapValues(lambda vs: max(vs)).take(5)

[(u'SZF-IST', 289.83),
 (u'DME-RHO', 168.95),
 (u'KRK-OSL', 98.0),
 (u'OMS-AAQ', 184.14),
 (u'KBP-IST', 382.18)]

In [64]:
maximo = filtrado.groupByKey().mapValues(lambda vs: max(vs))

In [65]:
maximo.sortBy(lambda x: x[1], False).take(5)

[(u'BLA-MIA', 7490.33),
 (u'MIA-CCS', 7415.0),
 (u'CCS-MIA', 7415.0),
 (u'RUH-LAX', 7401.94),
 (u'DXB-MAA', 7343.27)]

## Ruta con el medio por cupón más alto

In [68]:
filtrado.groupByKey().mapValues(lambda vs: sum(vs)/len(vs)) \
.sortBy(lambda x: x[1], False) \
.take(5)

[(u'BLA-MEC', 5164.355),
 (u'MEC-BLA', 5164.355),
 (u'CCS-MIA', 4350.516296296297),
 (u'MIA-CCS', 4186.95),
 (u'NRT-PPT', 3751.5499999999997)]

## Agrupamos por Ruta, Aerolinea, Booking Class. Para eliminar los outlayers. Haciendo un límite para esas categorías.

In [69]:
ls.first()

u'79062005698500,1,MAA,AUH,9W,9W,56.79,USD,1,H,H,0526,150904,OK,IAF0'

In [70]:
# campos 2, 3, 6 y 8
def getRutaAlDineroClase(l):
    elems = l.split(',')
    ruta = elems[2] + '-' + elems[3]
    al = elems[4]
    dinero = float(elems[6])
    clase = elems[9]
    return((ruta, al, clase), dinero)

In [71]:
getRutaAlDineroClase(ls.first())

((u'MAA-AUH', u'9W', u'H'), 56.79)

In [72]:
import math

In [89]:
# función que me diga cual es el valor límite de cada grupo. con Log.
# como entrada la lista de valores y devuelva el valor límite para cada elto de la lista.
# Outlayers menores o iguales a la media más 2 veces la desv standard. En Normales se usaria tb rango intercuartilico
def valorLimite(vs):
    logs = [math.log(v+1) for v in vs] # sumamos 1 al logaritmo para evitar el log de valores con 0.
    media = sum(logs)/len(logs)
    desv = [(x-media)**2 for x in logs]
    var = sum(desv)/len(desv)
    desvStd = math.sqrt(var)
    return math.exp(media + 2*desvStd) - 1  # para quitar el 1 del logaritmo aunque se podría haber dejado.

In [90]:
valorLimite([12,3,4,1,2,4])

13.060852649090572

In [91]:
valorLimite([12,3,4,145])

240.70561554377625

In [92]:
rutas = ls.map(getRutaAlDineroClase)

In [93]:
rutas.take(5)

[((u'MAA-AUH', u'9W', u'H'), 56.79),
 ((u'AUH-CDG', u'9W', u'H'), 84.34),
 ((u'CJB-MAA', u'9W', u'H'), 60.0),
 ((u'DEL-DXB', u'9W', u'S'), 160.63),
 ((u'AUH-IXE', u'9W', u'V'), 152.46)]

In [94]:
rutas.groupByKey().take(5)

[((u'TRD-BOO', u'SK', u'V'),
  <pyspark.resultiterable.ResultIterable at 0x7f77b191d250>),
 ((u'DPS-TMC', u'GA', u'N'),
  <pyspark.resultiterable.ResultIterable at 0x7f77b191d950>),
 ((u'BEG-CDG', u'JU', u'K'),
  <pyspark.resultiterable.ResultIterable at 0x7f77b191d8d0>),
 ((u'SAV-CLT', u'US', u'N'),
  <pyspark.resultiterable.ResultIterable at 0x7f77b0622b10>),
 ((u'DUS-AMS', u'KL', u'C'),
  <pyspark.resultiterable.ResultIterable at 0x7f77b0622a50>)]

In [95]:
# los que cumplen el valor límite
# los complications funcionan tanto en python estandar como en spark.
def filtraCuponesExtremos(vs):
    vl = valorLimite(vs)
    f = [v for v in vs if v < vl]
    return f

In [96]:
rutas.groupByKey().mapValues(filtraCuponesExtremos).take(2)

[((u'TRD-BOO', u'SK', u'V'),
  [42.76,
   48.67,
   42.76,
   42.76,
   42.76,
   48.67,
   42.76,
   48.67,
   42.76,
   42.76,
   42.76,
   42.76,
   42.76,
   29.55,
   42.76,
   42.76,
   29.55,
   42.76,
   42.76]),
 ((u'DPS-TMC', u'GA', u'N'), [59.15, 59.26, 62.38, 62.38])]

In [97]:
grupos = rutas.groupByKey().mapValues(filtraCuponesExtremos)

In [99]:
def mediaGrupo(vs):
    if (len(vs) == 0):
        return 0.0
    else:
        return sum(vs)/len(vs)

In [101]:
grupos.mapValues(mediaGrupo).take(4)

[((u'TRD-BOO', u'SK', u'V'), 42.30263157894736),
 ((u'DPS-TMC', u'GA', u'N'), 60.7925),
 ((u'BEG-CDG', u'JU', u'K'), 0.0),
 ((u'SAV-CLT', u'US', u'N'), 0.0)]

In [102]:
# Hallamos el top five

grupos.mapValues(mediaGrupo).sortBy(lambda x: x[1], False).take(5)

[((u'SYD-CGK', u'GA', u'G'), 6355194.0),
 ((u'COO-CDG', u'AF', u'J'), 538599.2),
 ((u'CDG-COO', u'AF', u'J'), 179628.3533333333),
 ((u'DKR-NBO', u'KQ', u'T'), 58253.593333333345),
 ((u'TSN-ICN', u'KE', u'Z'), 38655.45)]

In [103]:
# solo para mayores de 10 cupones, aplicamos el filtro para evitar que salga otra vez los 6 millones.

In [107]:
gruposFiltrados = grupos.filter(lambda x: len(x[1]) > 10)

In [108]:
r = gruposFiltrados.mapValues(filtraCuponesExtremos).mapValues(mediaGrupo)

In [109]:
r.sortBy(lambda x: x[1], False).take(5)

[((u'MIA-CCS', u'S3', u'S'), 7826.25),
 ((u'CCS-MIA', u'S3', u'B'), 5550.0),
 ((u'CCS-MAD', u'V0', u'Y'), 5418.098666666667),
 ((u'RUH-FRA', u'LH', u'F'), 4792.983),
 ((u'LHR-DOH', u'QR', u'F'), 4275.98923076923)]

## Aerolíneas diferentes en los datos.

In [111]:
## con disctint
ls.first()

u'79062005698500,1,MAA,AUH,9W,9W,56.79,USD,1,H,H,0526,150904,OK,IAF0'

In [112]:
ls.map(lambda x: x.split(",")[4]).distinct().take(5)

[u'', u'BE', u'WN', u'JV', u'WK']

In [113]:
ls.map(lambda x: x.split(",")[4]).distinct().count()

359

In [115]:
# son todos dolares
ls.map(lambda x: x.split(",")[7]).distinct().collect()

[u'USD']

In [None]:
## sacar datos para trabajar en R
ls.map(lambda x: x.split(",")[7]).distinct().saveAsTextFile("result.csv")