In [1]:
pip install pyspark




## Primeros pasos con pyspark


In [2]:
from pyspark import SparkContext

# crear instancia
sc = SparkContext()


In [3]:
# creamos un archivo de texto desde jupyter
%%writefile ejemplo.txt
primera linea
segunda linea
tercera linea
cuarta linea

Writing ejemplo.txt


## crear RDD

In [4]:
textFile = sc.textFile('ejemplo.txt')

In [5]:
textFile

ejemplo.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0

## coger los datos del rdd

In [6]:
# crea una lista
textFile.collect()

['primera linea', 'segunda linea', 'tercera linea', 'cuarta linea']

In [7]:
# cuenta las lineas del rdd
textFile.count()

4

In [8]:
textFile.first()

'primera linea'

## Transformación

In [9]:
from os import setgid
# filtrar cualquier linea q tenga la palabra segunda
segunda = textFile.filter(lambda linea: 'segunda' in linea)
segunda

PythonRDD[4] at RDD at PythonRDD.scala:53

In [10]:
# para aplicar esta instrucción
segunda.collect()

['segunda linea']

## Transformaciones

In [14]:


lista = [1,2,3,4,5,6,7,8,9,10]
rdd = sc.parallelize(lista)

In [15]:
rdd = sc.parallelize(lista)

In [16]:
rdd.collect()

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

## filtra los elementos de un RDD

In [20]:
filtrado_rdd = rdd.filter(lambda x: x > 1)

In [19]:
filtrado_rdd.collect()

[2, 3, 4, 5, 6, 7, 8, 9, 10]

## map aplica una fiunción a los elementos de un RDD

In [22]:
def suma1(x):
  return x + 1

In [23]:
filtrado_sumado_rdd = filtrado_rdd.map(suma1)

In [24]:
filtrado_sumado_rdd.collect()

[3, 4, 5, 6, 7, 8, 9, 10, 11]

In [30]:
# podemos aplicar mas de un map a una función
cuadrado_rdd = (filtrado_rdd.map(suma1).map(lambda x: (x,x**2)))

In [31]:
cuadrado_rdd.collect()

[(3, 9),
 (4, 16),
 (5, 25),
 (6, 36),
 (7, 49),
 (8, 64),
 (9, 81),
 (10, 100),
 (11, 121)]

## flatMat es igual que map() pero te convierte el resultado a una lista simple

In [32]:
cuadrado_rdd = (filtrado_rdd.map(suma1).flatMap(lambda x: (x,x**2)))

In [33]:
cuadrado_rdd.collect()

[3, 9, 4, 16, 5, 25, 6, 36, 7, 49, 8, 64, 9, 81, 10, 100, 11, 121]

# Pair RDDS - Clave Valor

In [1]:
from pyspark import SparkContext
sc = SparkContext()

## crear un pair rdd a partir de una lista de tuplas

In [2]:
lista_tuplas = [('a', 1), ('b', 2), ('c', 3), ('a', 4)]
pair_rdd = sc.parallelize(lista_tuplas)

In [3]:
pair_rdd.collect()

[('a', 1), ('b', 2), ('c', 3), ('a', 4)]

## Crear un pair rdd utilizando zip()

In [4]:
pair_rdd2 = sc.parallelize(zip((['a', 'b', 'c']),range(1, 4, 1)))

In [5]:
pair_rdd2.collect()

[('a', 1), ('b', 2), ('c', 3)]

## Crearr rdd utilizando un archivo

In [6]:
rdd_celestina = sc.textFile('celestina.txt')

In [7]:
pair_rdd_celestina = rdd_celestina.map(lambda x: (x.split(' ')[0], x))


In [10]:
pair_rdd_celestina.takeSample(False, 15)

[('_prima_', '_prima_ **** la mas excelente'),
 ('_estruendo_', '_estruendo_ **** ruido grande'),
 ('', ' "sy ayuntas el anjme entrego con el vjnagre en tal'),
 ('calle,', 'calle, despierta.  llama a su muger Alisa.  Preguntan a Melibea'),
 ('obedescian.',
  'obedescian.  todas me honrrauan.  de todas era acatada.  ninguna'),
 ('', ''),
 ('fuera',
  'fuera de dia / o el dolor de mi desonrra / ay / ay / que esto es /'),
 ('_filado_', '_filado_ **** hilado, hilo'),
 ('', ''),
 ('SEMP.--callemos:',
  'SEMP.--callemos: que a la puerta estamos: e como dizen: las'),
 ('mas', 'mas o menos rojo'),
 ('', ''),
 ('_viuiere_', '_viuiere_ **** viviere'),
 ('muchos',
  'muchos dias he pugnado por lo dissimular: no he podido tanto, que'),
 ('', '')]

## Crear rdd utilizando keybu()

In [11]:
rdd = sc.parallelize(range(5))

In [12]:
rdd.collect()

[0, 1, 2, 3, 4]

In [13]:
pair_rdd = rdd.keyBy(lambda x: x + 1)

In [14]:
pair_rdd.collect()

[(1, 0), (2, 1), (3, 2), (4, 3), (5, 4)]

## Crear rdd utilizando zipWithIndex() - te da el valor q es igual al indice

In [15]:
rdd = sc.parallelize(['a', 'b', 'c', 'd', 'e'])


In [16]:
pair_rdd = rdd.zipWithIndex()

In [17]:
pair_rdd.collect()

[('a', 0), ('b', 1), ('c', 2), ('d', 3), ('e', 4)]

## Crear rdd utilizando zipWitUniqueId - te da el valor q es igual al id

In [18]:
rdd.zipWithUniqueId().collect()

[('a', 0), ('b', 2), ('c', 1), ('d', 3), ('e', 5)]

## Pair rdd con zip

In [19]:
rdd1 = sc.parallelize(range(5), 3)

In [20]:
rdd2 = sc.parallelize(range(100, 105, 1), 3)

In [21]:
rdd1.glom().collect()

[[0], [1, 2], [3, 4]]

In [22]:
rdd2.glom().collect()

[[100], [101, 102], [103, 104]]

In [23]:
pair_rdd = rdd1.zip(rdd2)

In [24]:
pair_rdd.collect()

[(0, 100), (1, 101), (2, 102), (3, 103), (4, 104)]