In [1]:
#solution https://www.youtube.com/watch?v=oWljEzWGF6I
from pyspark import SparkConf, SparkContext
import collections

In [2]:
# Build a Spark configuration with master 'local' and application name
# 'ParallelizedCollection', then create the SparkContext (`sc`) that the
# following cells use to build RDDs.
conf = SparkConf().setMaster('local').setAppName('ParallelizedCollection')
sc = SparkContext(conf = conf)
# Bare last expression: the notebook displays the SparkContext.
sc

In [3]:
# RDD of 5 integers, built from an in-memory Python list
r = sc.parallelize([1,2,3,4,5])
# Display the Python type of the object returned by parallelize
type(r)

pyspark.rdd.RDD

In [4]:
# RDD of text strings
r = sc.parallelize(["hola", "hi", "ciao"])

In [5]:
# A larger RDD: the squares of the integers 1 through 100
r = sc.parallelize([i*i for i in range(1, 101)])

In [6]:
# Load a file with textFile, using a path that has no URI scheme
r = sc.textFile("Data/CH06/file.txt")

In [7]:
# Same call with an explicit file:// URI. Note that this form names the
# absolute path /Data/CH06/file.txt, unlike the scheme-less path used before.
r = sc.textFile("file:///Data/CH06/file.txt")

In [8]:
# Files on HDFS and Amazon S3 (host, port and bucket presumably placeholders).
# Each assignment overwrites `r`, so only the last RDD stays bound.
# NOTE(review): s3n:// is a legacy connector; newer Hadoop releases use
# s3a:// instead — confirm against the Hadoop version in use.
r = sc.textFile("hdfs://node:port/data/file.txt")
r = sc.textFile("s3n://bucket/file.txt")

In [9]:
# Load several files at once: first with a glob pattern, then by passing
# a directory path. The second assignment overwrites `r`.
r = sc.textFile("Data/CH06/*.txt")
r = sc.textFile("Data/CH06/")

In [10]:
# Read the matching files with wholeTextFiles (per the PySpark API docs this
# yields one (path, content) pair per file rather than one element per line).
r = sc.wholeTextFiles("Data/CH06/*.txt")

<h3>Collect, Take, Count</h3>

In [11]:
# collect() returns all elements of the RDD as a Python list
r = sc.parallelize([1,2,3,4,5,6])
r.collect()

[1, 2, 3, 4, 5, 6]

In [12]:
# RDD with the integers 0..999
r = sc.parallelize(range(1000))
r.take(10) # Return the first 10 elements

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

In [13]:
r.count() # Compute the number of elements in the RDD

1000

<h3>Reduce y Aggregate</h3>

In [14]:
def add(x, y):
    """Return the sum of ``x`` and ``y``.

    Intended as the binary function passed to ``RDD.reduce``.
    """
    total = x + y
    return total

In [15]:
# Sum the integers 1..5 by reducing the RDD with the named function `add`
r = sc.parallelize(range(1,6))
r.reduce(add)

15

In [16]:
# Same reduction, written with an inline lambda instead of a named function
r.reduce(lambda x,y : x+y)

15

In [17]:
#Multiplicar los numeros positivos
def multiply_positive(x, y):
    """Combine two numbers, multiplying only the strictly positive ones.

    Returns ``x * y`` when both arguments are positive, the positive argument
    when only one of them is, and 1 (the neutral element of multiplication)
    when neither is.
    """
    if x > 0:
        # x counts; y only contributes when it is positive as well
        return x * y if y > 0 else x
    # x is not positive: fall back to y, or to the neutral element
    return y if y > 0 else 1

In [18]:
# Only 2, 1 and 8 are positive, so the reduction yields 2*1*8 = 16
r = sc.parallelize([-1,2,1,-5,8])
r.reduce(multiply_positive)

16

In [19]:
# reduce raises an exception if the RDD is empty.
# This cell is expected to fail with ValueError; the error is the demonstration.
r = sc.parallelize([])
r.reduce(lambda x,y : x+y)

ValueError: Can not reduce() empty RDD

In [4]:
# Subtraction is neither associative nor commutative, so the result of reduce
# depends on how the data is partitioned.
# With a single partition the elements 0, 1, 2 give (0-1)-2 = -3.
r= sc.parallelize(range(3),1)
r.reduce(lambda x,y: x-y)

-3

In [5]:
# Same data and function, now split into 2 partitions: the result changes to 1
# (consistent with partitions [0] and [1, 2] combined as 0-(1-2)).
r= sc.parallelize(range(3), 2)
r.reduce(lambda x,y: x-y)

1

In [6]:
# Count the total number of 'h' characters across all strings with aggregate:
# start from 0, add each string's 'h' count to the running total, then sum
# the partial totals.
r = sc.parallelize(["hola", "hi", "ciao"])
r.aggregate(0, lambda c,s : c+s.count('h'), lambda c1, c2: c1+c2)

2

<h3>Salvar RDDs en ficheros</h3>

In [3]:
# RDD with the integers 0..999, explicitly split into 2 partitions
r = sc.parallelize(range(1000), 2)

In [4]:
# Save the RDD as text under file:///data/nums (an absolute local path)
r.saveAsTextFile("file:///data/nums")

In [5]:
# The path passed as a parameter must not already exist in the filesystem,
# because Spark creates a new directory with that name.
# This second save to the same path is expected to fail with
# FileAlreadyExistsException; the error is the demonstration.
r.saveAsTextFile("file:///data/nums")

Py4JJavaError: An error occurred while calling o41.saveAsTextFile.
: org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory file:/data/nums already exists
	at org.apache.hadoop.mapred.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:131)
	at org.apache.spark.internal.io.HadoopMapRedWriteConfigUtil.assertConf(SparkHadoopWriter.scala:289)
	at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:71)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1096)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1094)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1094)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1094)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply$mcV$sp(PairRDDFunctions.scala:1067)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1032)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1032)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1032)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply$mcV$sp(PairRDDFunctions.scala:958)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:958)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:958)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:957)
	at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply$mcV$sp(RDD.scala:1499)
	at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1478)
	at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1478)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1478)
	at org.apache.spark.api.java.JavaRDDLike$class.saveAsTextFile(JavaRDDLike.scala:550)
	at org.apache.spark.api.java.AbstractJavaRDDLike.saveAsTextFile(JavaRDDLike.scala:45)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Unknown Source)


<h2>Transformaciones</h2>
<h3>map y flatMap</h3>

In [6]:
# map applies the function to every element and produces a new RDD;
# here each number is incremented by one.
r = sc.parallelize([1,2,3,4])
r2 = r.map(lambda x: x+1)
r2.collect()

[2, 3, 4, 5]

In [7]:
def increment(x):
    """Return ``x`` plus one."""
    successor = x + 1
    return successor

In [8]:
# Same transformation, passing the named function `increment` instead of a lambda
r = sc.parallelize([1,2,3,4])
r2 = r.map(increment)
r2.collect()

[2, 3, 4, 5]

In [9]:
# Map each string to its length
r = sc.parallelize(["hola", "hi", "ciao"])
r2 = r.map(lambda x: len(x))
r2.collect()

[4, 2, 4]

In [10]:
import csv

In [11]:
# Parse each comma-separated string into a list of fields.
# csv.reader expects an iterable of lines, so the single string is wrapped in
# a list and the first (only) parsed row is taken.
r = sc.parallelize(["1,5,7", "8,2,4"])
r2 = r.map(lambda x: list(csv.reader([x]))[0])
r2.collect()

[['1', '5', '7'], ['8', '2', '4']]

In [12]:
# Convert every field of each parsed row from str to int
r3 = r2.map(lambda l: [int(e) for e in l])
r3.collect()

[[1, 5, 7], [8, 2, 4]]

In [13]:
# flatMap with the same parsing function: the per-string field lists are
# flattened into a single RDD of individual fields.
r = sc.parallelize(["1,5,7", "8,2,4"])
r2 = r.flatMap(lambda s: list(csv.reader([s]))[0])
r2.collect()

['1', '5', '7', '8', '2', '4']