In [1]:
import pyspark

try: 
    type(sc)
except NameError:
    sc = pyspark.SparkContext('local[*]')    
    
type(sc)

pyspark.context.SparkContext

In [2]:
help(sc)

Help on SparkContext in module pyspark.context object:

class SparkContext(__builtin__.object)
 |  Main entry point for Spark functionality. A SparkContext represents the
 |  connection to a Spark cluster, and can be used to create L{RDD} and
 |  broadcast variables on that cluster.
 |  
 |  Methods defined here:
 |  
 |  __enter__(self)
 |      Enable 'with SparkContext(...) as sc: app(sc)' syntax.
 |  
 |  __exit__(self, type, value, trace)
 |      Enable 'with SparkContext(...) as sc: app' syntax.
 |      
 |      Specifically stop the context on exit of the with block.
 |  
 |  __getnewargs__(self)
 |  
 |  __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None, environment=None, batchSize=0, serializer=PickleSerializer(), conf=None, gateway=None, jsc=None, profiler_cls=<class 'pyspark.profiler.BasicProfiler'>)
 |      Create a new SparkContext. At least the master and app name should be set,
 |      either through the named parameters here or through C{conf}.
 |   

In [3]:
## creamos 1000 enteros en una lista
integersList = range(1,1001)
#integersList
len(integersList)

1000

In [4]:
## Paralelizamos la coleccion utilizando 8 particiones o slices
## Esta operacion es una transformacion de datos en un RDD
## Dado que Spark usa lazy evaluation, no corren jobs de Spark
## hasta el momento
integersListRDD = sc.parallelize(integersList, 8)
type(integersListRDD)

pyspark.rdd.RDD

In [5]:
## podemos ver tambien otra informacion interesante del RDD
## el numero de particiones
integersListRDD.getNumPartitions()

8

In [6]:
## el conjunto de transformaciones que se aplica
integersListRDD.toDebugString()

'(8) ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:175 []'

In [7]:
## para ver mas metodos disponibles del RDD
help(integersListRDD)

Help on RDD in module pyspark.rdd object:

class RDD(__builtin__.object)
 |  A Resilient Distributed Dataset (RDD), the basic abstraction in Spark.
 |  Represents an immutable, partitioned collection of elements that can be
 |  operated on in parallel.
 |  
 |  Methods defined here:
 |  
 |  __add__(self, other)
 |      Return the union of this RDD and another one.
 |      
 |      >>> rdd = sc.parallelize([1, 1, 2, 3])
 |      >>> (rdd + rdd).collect()
 |      [1, 1, 2, 3, 1, 1, 2, 3]
 |  
 |  __getnewargs__(self)
 |  
 |  __init__(self, jrdd, ctx, jrdd_deserializer=AutoBatchedSerializer(PickleSerializer()))
 |  
 |  __repr__(self)
 |  
 |  aggregate(self, zeroValue, seqOp, combOp)
 |      Aggregate the elements of each partition, and then the results for all
 |      the partitions, using a given combine functions and a neutral "zero
 |      value."
 |      
 |      The functions C{op(t1, t2)} is allowed to modify C{t1} and return it
 |      as its result value to avoid object allocat

In [8]:
## creamos el RDD a partir de un archivo de texto
shakespeareRDD = sc.textFile('shakespeare.txt',8)
## aplicamos una accion para tomar los 15 items del RDD
#type(shakespeareRDD)
shakespeareRDD.take(5)

[u'1609', u'', u'THE SONNETS', u'', u'by William Shakespeare']

In [9]:
integersListRDD.take(10)

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

In [10]:
def por_dos(x): return x*2

## aplicamos una transformacion map para restar a todos 1
## aplicamos la accion take para mostrar 10 resultados
## ver generacion de tuplas y uso de funciones de python en pyspark.
integersListRDD.map(por_dos).take(10)

[2, 4, 6, 8, 10, 12, 14, 16, 18, 20]

In [11]:
## aplicamos una función lambda, y generamos tuplas de a 3 elementos 
## por cada uno existente previamente.
integersListRDD.map(lambda x: (x**2, x, x-1)).take(5)

[(1, 1, 0), (4, 2, 1), (9, 3, 2), (16, 4, 3), (25, 5, 4)]

In [12]:
## generamos una tupla que tenga el valor original, el anterior y siguiente
## aplicamos una transformacion map para restar a todos 1
## aplicamos la accion take para mostrar 10 resultados

integersListRDD.map(lambda a: (a, 1, 1+a, a-1)).take(10)

[(1, 1, 2, 0),
 (2, 1, 3, 1),
 (3, 1, 4, 2),
 (4, 1, 5, 3),
 (5, 1, 6, 4),
 (6, 1, 7, 5),
 (7, 1, 8, 6),
 (8, 1, 9, 7),
 (9, 1, 10, 8),
 (10, 1, 11, 9)]

In [13]:
## aplicamos una transformacion map para restar a todos 1
## aplicamos la accion take para mostrar 10 resultados

subRDD = integersListRDD.map(lambda a: a-1)
print(subRDD.collect())

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127, 128, 129, 130, 131, 132, 133, 134, 135, 136, 137, 138, 139, 140, 141, 142, 143, 144, 145, 146, 147, 148, 149, 150, 151, 152, 153, 154, 155, 156, 157, 158, 159, 160, 161, 162, 163, 164, 165, 166, 167, 168, 169, 170, 171, 172, 173, 174, 175, 176, 177, 178, 179, 180, 181, 182, 183, 184, 185, 186, 187, 188, 189, 190, 191, 192, 193, 194, 195, 196, 197, 198, 199, 200, 201, 202, 203, 204, 205, 206, 207, 208, 209, 210, 211, 212, 213, 214, 215, 216, 217, 218, 219, 220, 221,

In [14]:
## por ejemplo para los RDD con los que estuvimos trabajando

integersListRDD.count()

1000

In [15]:
# por ejemplo para los RDD con los que estuvimos trabajando

print(integersListRDD.count())
print(subRDD.count())
print(shakespeareRDD.count())

1000
1000
124614


In [16]:
## obtener los numeros pares
print(integersListRDD.filter(lambda a: a % 2 == 0).take(10))
print(integersListRDD.filter(lambda a: a == 2).take(10))
print(integersListRDD.filter(lambda a: a % 2 == 0).count())

[2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
[2]
500


In [17]:
# eliminar lineas vacias de the complete works of shakespeare
#shakespeareRDD.take(15)
shakespeareRDD.filter(lambda a: a != "").take(15)

[u'1609',
 u'THE SONNETS',
 u'by William Shakespeare',
 u'                     1',
 u'  From fairest creatures we desire increase,',
 u"  That thereby beauty's rose might never die,",
 u'  But as the riper should by time decease,',
 u'  His tender heir might bear his memory:',
 u'  But thou contracted to thine own bright eyes,',
 u"  Feed'st thy light's flame with self-substantial fuel,",
 u'  Making a famine where abundance lies,',
 u'  Thy self thy foe, to thy sweet self too cruel:',
 u"  Thou that art now the world's fresh ornament,",
 u'  And only herald to the gaudy spring,',
 u'  Within thine own bud buriest thy content,']

In [18]:
print(shakespeareRDD.filter(lambda a: a != "").count())
print(shakespeareRDD.count())


114976
124614


In [19]:
## la suma es asociativa y conmutativa
print(integersListRDD.reduce(lambda a, b: a + b))
print(integersListRDD.repartition(8).reduce(lambda a, b: a + b))

500500
500500


In [20]:
## la resta no no asociativa y conmutativa
print(integersListRDD.reduce(lambda a, b: a - b))
print(integersListRDD.repartition(8).reduce(lambda a, b: a - b))

477738
388388


In [21]:
## first() devuelve el primer elemento del RDD
print(shakespeareRDD.first())

1609


In [22]:
## first() devuelve el primer elemento del RDD
print(shakespeareRDD.first())

1609


In [23]:
## traemos los 3 elementos mas pequeños
print(integersListRDD.takeOrdered(3))
## traemos los 3 elementos mas grandes
print(integersListRDD.top(3))

[1, 2, 3]
[1000, 999, 998]


In [24]:
## se puede utilizar una funcion para fijar el orden en takeOrdered
## por ejemplo para revertirlo
integersListRDD.takeOrdered(3, lambda a: -a)

[1000, 999, 998]

In [26]:
## Similar a map, pero permite que cada item de entrada se mapee a cero o mas elementos de salida.
simpleRDD = sc.parallelize([2, 3, 4])
print(simpleRDD.map(lambda x: [1, x]).collect())
print(simpleRDD.flatMap(lambda x: [1, x]).collect())

[[1, 2], [1, 3], [1, 4]]
[1, 2, 1, 3, 1, 4]


In [27]:
listadelistas = [[1,2],[2,3],[5]]
rdd = sc.parallelize(listadelistas)
print(rdd.flatMap(lambda x: x).collect())

[1, 2, 2, 3, 5]


In [28]:
from operator import add

#pairRDD = sc.parallelize([('a', 1), ('a', 2), ('b', 1)])

# usamos mapValues para mejorar el formato de impresion
#print(pairRDD.reduceByKey(lambda a,b: a+b).collect())

# Diferentes formas de sumar por clave
# problemas de notacion python3
# print(pairRDD.groupByKey().map(lambda x: (x[1],sum(x[2]))).collect())

# Using mapValues, which is recommended when they key doesn't change
#print(pairRDD.groupByKey().mapValues(lambda x: sum(x)).collect())

# reduceByKey is more efficient / scalable
#print(pairRDD.reduceByKey(add).collect())

print(shakespeareRDD.flatMap(lambda a: a.split()).map(lambda a: (a, 1)).reduceByKey(lambda a,b: a+b).take(100))

[(u'fawn', 11), (u'voluble', 2), (u'annoy!', 2), (u'mustachio', 1), (u'four', 114), (u'reproach-', 1), (u'drollery.', 1), (u'conjuring', 1), (u'slew.', 1), (u'increase', 14), (u'Drabbing.', 1), (u'Sergeant.', 1), (u'fig.', 1), (u'out-night', 1), (u'sinking', 2), (u'goats.', 2), (u'self-comparisons,', 1), (u'impertinent', 1), (u'spider.', 1), (u"suck'd.", 1), (u'Sennet.', 7), (u'advice;', 2), (u'Lest', 65), (u'Brakenbury,', 3), (u'wilfull', 1), (u'accommodo.', 1), (u'ignomy.', 1), (u'solid', 4), (u"hack'd", 7), (u'spoken-', 1), (u'fire!', 5), (u'undertaker.', 1), (u'Provost?', 4), (u'tires', 6), (u"Broach'd", 1), (u'rusts', 1), (u'understood:', 1), (u'admire', 4), (u'nature,', 61), (u'Jarteer', 1), (u"'You", 5), (u'additions;', 1), (u'sights!', 1), (u'conceive?', 2), (u'cull', 5), (u'up.', 76), (u'Whenever', 1), (u'chimneys', 2), (u'chine', 1), (u'fret,', 1), (u'hers', 13), (u'disseat', 1), (u'quart', 3), (u'Ceres,', 2), (u'finger.', 6), (u"'veal'", 1), (u'NOT', 220), (u'bring:', 1), (u

In [29]:
# generando tuplas para obtener la linea 
# de maxima longitud de todo el texto

result = shakespeareRDD.flatMap(lambda a: a.split()).reduce(lambda a,b: a if len(a) > len(b) else b)
print(result)

http://www.ibiblio.org/gutenberg/etext06


# ========================================================================================
# A PARTIR DE AQUÍ SE EJECUTAN LAS INSTRUCCIONES DEL SEGUNDO EJEMPLO.
# ========================================================================================

In [30]:
shakespeareRDD = sc.textFile('shakespeare.txt',8)

In [31]:
shakespeareRDD.take(20)

[u'The Project Gutenberg EBook of The Complete Works of William Shakespeare, by',
 u'William Shakespeare',
 u'',
 u'This eBook is for the use of anyone anywhere at no cost and with',
 u'almost no restrictions whatsoever.  You may copy it, give it away or',
 u're-use it under the terms of the Project Gutenberg License included',
 u'with this eBook or online at www.gutenberg.org',
 u'',
 u'** This is a COPYRIGHTED Project Gutenberg eBook, Details Below **',
 u'**     Please follow the copyright guidelines in this file.     **',
 u'',
 u'Title: The Complete Works of William Shakespeare',
 u'',
 u'Author: William Shakespeare',
 u'',
 u'Posting Date: September 1, 2011 [EBook #100]',
 u'Release Date: January, 1994',
 u'',
 u'Language: English',
 u'']

In [32]:
wordsRDD = shakespeareRDD.flatMap(lambda line: line.split())
wordsRDD.take(10)

[u'The',
 u'Project',
 u'Gutenberg',
 u'EBook',
 u'of',
 u'The',
 u'Complete',
 u'Works',
 u'of',
 u'William']

In [33]:
wordsRDD = shakespeareRDD.map(lambda line: line.split())
wordsRDD.take(10)

[[u'The',
  u'Project',
  u'Gutenberg',
  u'EBook',
  u'of',
  u'The',
  u'Complete',
  u'Works',
  u'of',
  u'William',
  u'Shakespeare,',
  u'by'],
 [u'William', u'Shakespeare'],
 [],
 [u'This',
  u'eBook',
  u'is',
  u'for',
  u'the',
  u'use',
  u'of',
  u'anyone',
  u'anywhere',
  u'at',
  u'no',
  u'cost',
  u'and',
  u'with'],
 [u'almost',
  u'no',
  u'restrictions',
  u'whatsoever.',
  u'You',
  u'may',
  u'copy',
  u'it,',
  u'give',
  u'it',
  u'away',
  u'or'],
 [u're-use',
  u'it',
  u'under',
  u'the',
  u'terms',
  u'of',
  u'the',
  u'Project',
  u'Gutenberg',
  u'License',
  u'included'],
 [u'with', u'this', u'eBook', u'or', u'online', u'at', u'www.gutenberg.org'],
 [],
 [u'**',
  u'This',
  u'is',
  u'a',
  u'COPYRIGHTED',
  u'Project',
  u'Gutenberg',
  u'eBook,',
  u'Details',
  u'Below',
  u'**'],
 [u'**',
  u'Please',
  u'follow',
  u'the',
  u'copyright',
  u'guidelines',
  u'in',
  u'this',
  u'file.',
  u'**']]

In [34]:
wordsRDD = shakespeareRDD.flatMap(lambda line: line.split())
wordsRDD.take(10)

[u'The',
 u'Project',
 u'Gutenberg',
 u'EBook',
 u'of',
 u'The',
 u'Complete',
 u'Works',
 u'of',
 u'William']

In [35]:
wordsCountRDD = wordsRDD.map(lambda word: (word,1))
wordsCountRDD.take(10)

[(u'The', 1),
 (u'Project', 1),
 (u'Gutenberg', 1),
 (u'EBook', 1),
 (u'of', 1),
 (u'The', 1),
 (u'Complete', 1),
 (u'Works', 1),
 (u'of', 1),
 (u'William', 1)]

In [36]:
wordsCountRDD.reduceByKey(lambda a,b: a+b).sortBy(ascending=False,keyfunc=lambda x:x[1]).take(10)

[(u'the', 23407),
 (u'I', 19540),
 (u'and', 18358),
 (u'to', 15682),
 (u'of', 15649),
 (u'a', 12586),
 (u'my', 10825),
 (u'in', 9633),
 (u'you', 9129),
 (u'is', 7874)]

In [38]:
wordsCountRDD.reduceByKey(lambda a,b: a+b).sortBy(ascending=False,keyfunc=lambda x:x[1]).take(10)

[(u'the', 23407),
 (u'I', 19540),
 (u'and', 18358),
 (u'to', 15682),
 (u'of', 15649),
 (u'a', 12586),
 (u'my', 10825),
 (u'in', 9633),
 (u'you', 9129),
 (u'is', 7874)]

In [39]:
def trigrams(t):
    t=t.lower()
    return [t[i:i+3] for i in range(0, len(t) - 2)]

In [40]:
trigrams("hola datos")

['hol', 'ola', 'la ', 'a d', ' da', 'dat', 'ato', 'tos']

In [41]:
anotherShakespeareRDD = sc.textFile('shakespeare.txt',8)

In [42]:
trigramsRDD = anotherShakespeareRDD.flatMap(trigrams).filter(lambda a : a != '   ')

In [43]:
trigramsRDD.take(10)

[u'the',
 u'he ',
 u'e p',
 u' pr',
 u'pro',
 u'roj',
 u'oje',
 u'jec',
 u'ect',
 u'ct ']

In [56]:
trigramsCount = trigramsRDD.map(lambda x: (x, 1)).reduceByKey(lambda x,y: x+y)
print(trigramsCount.take(5))

[(u'osy', 15), (u'aln', 4), (u'? t', 262), (u'f; ', 116), (u' 54', 1)]


In [45]:
trigramsCountSorted = trigramsCount.sortBy(ascending=False,keyfunc=lambda x:x[1])
print(trigramsCountSorted.take(20))

[(u' th', 83584), (u'the', 52064), (u'he ', 35146), (u'and', 32713), (u' an', 32680), (u'nd ', 31193), (u' to', 23631), (u'is ', 23165), (u' yo', 22910), (u'you', 22280), (u' he', 20994), (u' of', 19854), (u'to ', 19842), (u' no', 19327), (u' i ', 19146), (u'her', 18983), (u'hat', 18804), (u'll ', 18616), (u'at ', 18108), (u' wi', 17954)]


In [46]:
trigramsCountSorted.take(10)

[(u' th', 83584),
 (u'the', 52064),
 (u'he ', 35146),
 (u'and', 32713),
 (u' an', 32680),
 (u'nd ', 31193),
 (u' to', 23631),
 (u'is ', 23165),
 (u' yo', 22910),
 (u'you', 22280)]

In [47]:
totalFrec2 = trigramsCountSorted.map(lambda x: x[1]).reduce(lambda x,y: x+y)
totalFrec1 = trigramsRDD.count()
print(totalFrec2)
print(totalFrec1)

4835997
4835997


In [49]:
assert totalFrec2 == totalFrec1

In [50]:
totalFrec1

4835997

In [51]:
totalFrec2

4835997

In [52]:
print(totalFrec1)

4835997


In [53]:
print(trigramsRDD.count())

4835997


In [54]:
## Calculando la probabilidad de cada trigrama
## print(trigramsCountSorted.take(5))
trigramsProb = trigramsCountSorted.map(lambda x: (x[0],round(float(x[1])/totalFrec1,3)))

In [55]:
trigramsProb.take(10)

[(u' th', 0.017),
 (u'the', 0.011),
 (u'he ', 0.007),
 (u'and', 0.007),
 (u' an', 0.007),
 (u'nd ', 0.006),
 (u' to', 0.005),
 (u'is ', 0.005),
 (u' yo', 0.005),
 (u'you', 0.005)]

# ==============================================
# Joins en Apache Spark
# ==============================================

In [57]:
people_data = [
    (1,'People A'),
    (2,'People B'),
    (3,'People C'),
    (4,'People D'),
    (5,'People E')
]

a = sc.parallelize(people_data)

In [58]:
a.collect()

[(1, 'People A'),
 (2, 'People B'),
 (3, 'People C'),
 (4, 'People D'),
 (5, 'People E')]

In [59]:
subject_data = [(1, 'Subject 1'),
               (2, 'Subject 2'),
               (200, 'Subject 1500'),
               (2, 'Subject 2 Repetido')]

b = sc.parallelize(subject_data)

In [60]:
b.collect()

[(1, 'Subject 1'),
 (2, 'Subject 2'),
 (200, 'Subject 1500'),
 (2, 'Subject 2 Repetido')]

In [61]:
## Inner Join (Join)
## Cuando se llama para sets de datos del tipo (K,V) y (K,W) devuelve un set de datos del tipo (K, (V,W)) 
## con todos los pares de elementos para cada key. (especificamente los que hay en comun 
## por esa clave en ambos sets de datos)

a.join(b).collect()

[(1, ('People A', 'Subject 1')),
 (2, ('People B', 'Subject 2')),
 (2, ('People B', 'Subject 2 Repetido'))]

In [62]:
## Right Outer Join
## Cuando se llama para sets de datos del tipo (K,V) y (K,W) devuelve un set de datos del tipo (K, (V,W)) 
## asegurandonos que todos los datos del set de datos derecho estaran en el resultado del join.

a.rightOuterJoin(b).collect()

[(200, (None, 'Subject 1500')),
 (1, ('People A', 'Subject 1')),
 (2, ('People B', 'Subject 2')),
 (2, ('People B', 'Subject 2 Repetido'))]

In [63]:
## Left Outer Join
## Cuando se llama para sets de datos del tipo (K,V) y (K,W) devuelve un set de datos del tipo (K, (V,W)) 
## asegurandonos que todos los datos del set de datos izquierdo estaran en el resultado del join.

a.leftOuterJoin(b).collect()

[(1, ('People A', 'Subject 1')),
 (2, ('People B', 'Subject 2')),
 (2, ('People B', 'Subject 2 Repetido')),
 (3, ('People C', None)),
 (4, ('People D', None)),
 (5, ('People E', None))]

In [64]:
## Outer/Full Join
## Cuando se llama para sets de datos del tipo (K,V) y (K,W) devuelve un set de datos del tipo (K, (V,W)) 
## asegurandonos que todos los datos de ambos set de datos estaran aunque no haya match de keys.

a.fullOuterJoin(b).collect()

[(200, (None, 'Subject 1500')),
 (1, ('People A', 'Subject 1')),
 (2, ('People B', 'Subject 2')),
 (2, ('People B', 'Subject 2 Repetido')),
 (3, ('People C', None)),
 (4, ('People D', None)),
 (5, ('People E', None))]