# RDD basics

Obteniendo los datos y creando el RDD

Usaremos el conjunto de datos reducido (10 por ciento) proporcionado para la Copa KDD 1999, que contiene casi medio millón de interacciones de red. El archivo se proporciona como un archivo Gzip que descargaremos localmente.

In [None]:
import urllib
f = urllib.urlretrieve ("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz", "kddcup.data_10_percent.gz")

Ahora podemos usar este archivo para crear nuestro RDD.

In [1]:
data_file = "./kddcup.data_10_percent.gz"
raw_data = sc.textFile(data_file)

La transformación del filtro


Esta transformación se puede aplicar a los RDD para mantener elementos justos que satisfagan una determinada condición. Más concretamente, se evalúa una función en cada elemento en el RDD original. El nuevo RDD resultante contendrá solo aquellos elementos que hacen que la función devuelva True.

Por ejemplo, imaginar que queremos contar cuántas interacciones normales que tenemos en nuestro conjunto de datos. Podemos filtrar nuestro RDD raw_data de la siguiente manera.  

In [2]:
normal_raw_data = raw_data.filter(lambda x: 'normal.' in x)

 Ahora podemos contar cuántos elementos tenemos en el nuevo RDD.

In [3]:
from time import time
t0 = time()
normal_count = normal_raw_data.count()
tt = time() - t0
print "There are {} 'normal' interactions".format(normal_count)
print "Count completed in {} seconds".format(round(tt,3))

There are 97278 'normal' interactions
Count completed in 5.951 seconds


Recordar de la libreta 1 que tenemos un total de 494021 en nuestro conjunto de datos del 10 por ciento. Aquí podemos ver que 97278 contiene lo normal. palabra de etiqueta  

Tener en cuenta que hemos medido el tiempo transcurrido para contar los elementos en el RDD. Hemos hecho esto porque queríamos señalar que los cálculos reales (distribuidos) en Spark tienen lugar cuando ejecutamos * acciones * y no * transformaciones *. En este caso, `count` es la acción que ejecutamos en el RDD. Podemos aplicar tantas transformaciones como queramos en un RDD y no habrá computación hasta que no llamemos a la primera acción que, en este caso, tarda unos segundos en completarse.

La transformación del mapa

Al utilizar la transformación del mapa en Spark, podemos aplicar una función a cada elemento en nuestro RDD. Las lambdas de Python son especialmente expresivas para este particular.


En este caso, queremos leer nuestro archivo de datos como formato CSV. Podemos hacer esto aplicando una función lambda a cada elemento en el RDD de la siguiente manera.

In [4]:
from pprint import pprint
csv_data = raw_data.map(lambda x: x.split(","))
t0 = time()
head_rows = csv_data.take(5)
tt = time() - t0
print "Parse completed in {} seconds".format(round(tt,3))
pprint(head_rows[0])

Parse completed in 1.715 seconds
[u'0',
 u'tcp',
 u'http',
 u'SF',
 u'181',
 u'5450',
 u'0',
 u'0',
 u'0',
 u'0',
 u'0',
 u'1',
 u'0',
 u'0',
 u'0',
 u'0',
 u'0',
 u'0',
 u'0',
 u'0',
 u'0',
 u'0',
 u'8',
 u'8',
 u'0.00',
 u'0.00',
 u'0.00',
 u'0.00',
 u'1.00',
 u'0.00',
 u'0.00',
 u'9',
 u'9',
 u'1.00',
 u'0.00',
 u'0.11',
 u'0.00',
 u'0.00',
 u'0.00',
 u'0.00',
 u'0.00',
 u'normal.']


Nuevamente, toda acción ocurre una vez que llamamos a la primera acción Spark (es decir, tome en este caso). ¿Qué pasa si tomamos muchos elementos en lugar de solo los primeros?  

In [5]:
t0 = time()
head_rows = csv_data.take(100000)
tt = time() - t0
print "Parse completed in {} seconds".format(round(tt,3))

Parse completed in 8.629 seconds


Podemos ver que lleva más tiempo. La función `map` se aplica ahora de forma distribuida a muchos elementos en el RDD, de ahí el tiempo de ejecución más largo.

Usar mapa y funciones predefinidas

Por supuesto, podemos usar funciones predefinidas con el mapa. Imagine que queremos tener cada elemento en el RDD como un par clave-valor donde la clave es la etiqueta (por ejemplo, normal) y el valor es toda la lista de elementos que representa la fila en el archivo con formato CSV. Podríamos proceder de la siguiente manera.    

In [6]:
def parse_interaction(line):
    elems = line.split(",")
    tag = elems[41]
    return (tag, elems)

key_csv_data = raw_data.map(parse_interaction)
head_rows = key_csv_data.take(5)
pprint(head_rows[0])

(u'normal.',
 [u'0',
  u'tcp',
  u'http',
  u'SF',
  u'181',
  u'5450',
  u'0',
  u'0',
  u'0',
  u'0',
  u'0',
  u'1',
  u'0',
  u'0',
  u'0',
  u'0',
  u'0',
  u'0',
  u'0',
  u'0',
  u'0',
  u'0',
  u'8',
  u'8',
  u'0.00',
  u'0.00',
  u'0.00',
  u'0.00',
  u'1.00',
  u'0.00',
  u'0.00',
  u'9',
  u'9',
  u'1.00',
  u'0.00',
  u'0.11',
  u'0.00',
  u'0.00',
  u'0.00',
  u'0.00',
  u'0.00',
  u'normal.'])


La acción `collect`

Hasta ahora hemos utilizado las acciones de contar y tomar. Otra acción básica que debemos aprender es recopilar. Básicamente, tendrá todos los elementos en el RDD en la memoria para que podamos trabajar con ellos. Por esta razón, debe usarse con cuidado, especialmente cuando se trabaja con RDD grandes.  

Un ejemplo usando nuestros datos sin procesar.    

In [9]:
t0 = time()
all_raw_data = raw_data.collect()
tt = time() - t0
print "Data collected in {} seconds".format(round(tt,3))

Data collected in 17.927 seconds


Eso tomó más tiempo que cualquier otra acción que usamos antes, por supuesto. Cada nodo del trabajador Spark que tiene un fragmento del RDD tiene que coordinarse para recuperar su parte y luego reducir todo junto.    


Como último ejemplo que combina todo lo anterior, queremos recopilar todas las interacciones normales como pares clave-valor.   

In [13]:
# get data from file
data_file = "./kddcup.data_10_percent.gz"
raw_data = sc.textFile(data_file)

# parse into key-value pairs
key_csv_data = raw_data.map(parse_interaction)

# filter normal key interactions
normal_key_interactions = key_csv_data.filter(lambda x: x[0] == "normal.")

# collect all
t0 = time()
all_normal = normal_key_interactions.collect()
tt = time() - t0
normal_count = len(all_normal)
print "Data collected in {} seconds".format(round(tt,3))
print "There are {} 'normal' interactions".format(normal_count)

Data collected in 12.485 seconds
There are 97278 normal interactions


Este recuento coincide con el recuento anterior de las interacciones normales. El nuevo procedimiento requiere más tiempo. Esto se debe a que recuperamos todos los datos con collect y luego usamos Python's len en la lista resultante. Antes solo contábamos la cantidad total de elementos en el RDD usando count.  