In [1]:
# import spark 
from pyspark import SparkContext

In [3]:
# Reuse the active SparkContext if one already exists, so re-running this cell
# does not raise "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate()

In [3]:
# creating ejemplot.txt file
%%writefile ejemplo.txt
primera linea
segunda linea
tercera linea

Writing ejemplo.txt


In [4]:
# Read the text file into an RDD; each line of the file becomes one element
textFile = sc.textFile('ejemplo.txt')

## Note:
When you read a text file in PySpark, every line is taken as one element of a list.  

Example:  
ejemplo.txt becomes a list with three elements: ["primera linea", "segunda linea", "tercera linea"]

In [11]:
# collect() is an action that returns every element of the RDD (or DataFrame) as a list
print(textFile.collect())
# count() returns the number of elements in the RDD
print(textFile.count())
# first() returns the first element of the RDD
print(textFile.first())

['primera linea', 'segunda linea', 'tercera linea']
3
primera linea


In [13]:
# Transformation methods: filter() keeps only the lines that contain "segunda"
second = textFile.filter(lambda linea: "segunda" in linea)
# An action (here collect()) is needed to see the result of the transformations
second.collect()

['segunda linea']

# PySpark Transformations in Depth

In [15]:
list_numbers = [1, 2, 3, 4, 5]
# Create an RDD from the Python list
rdd = sc.parallelize(list_numbers)

## filter: filter elements in RDD using lambda expression

In [None]:
# Keep only the elements greater than 1
print(rdd.filter(lambda n : n > 1).collect())

## map: apply function to elements in Rdd

In [19]:
def sum_one(element):
    """Return the given number incremented by one.

    Keyword arguments:
    element -> number

    Returns the original number + 1.
    """
    incremented = element + 1
    return incremented
    
    
# Apply sum_one to every element of the RDD
print(rdd.map(sum_one).collect())

[2, 3, 4, 5, 6]


In [25]:
# Also you can chain map methods
def square_number(element):
    """Return the square of the given number.

    Keyword arguments:
    element -> number

    Returns the original number multiplied by itself.
    """
    squared = element * element
    return squared

# Chain two map() calls: add one to each element, then square the result
print(
    rdd.map(sum_one)
       .map(square_number)
       .collect()
)

# Note: lambda functions can also be passed to map()
tuple_rdd = (
    rdd.map(sum_one)
       .map(lambda x: (x, x**2))
)
tuple_rdd.collect()


[4, 9, 16, 25, 36]


[(2, 4), (3, 9), (4, 16), (5, 25), (6, 36)]

## flatMap: works like map, but flattens the output into a single flat list

In [29]:
# Each element yields an (x, x**2) tuple; flatMap() flattens them into one list
simple_list = (
    rdd.map(sum_one)
       .flatMap(lambda x: (x, x**2))
)
simple_list.collect()

[2, 4, 3, 9, 4, 16, 5, 25, 6, 36]

## sample(withReplacement, fraction, seed): take a random sample from the RDD

In [38]:
# Sample without replacement: each element is kept with probability 0.5
print(rdd.sample(withReplacement = False, fraction = 0.5).collect())
# Sample with replacement: here fraction is the expected number of times each
# element is drawn (5, not "100 %"), so the elements can be repeated
# NOTE(review): no seed is passed, so the output changes from run to run
sample_with_replacement = rdd.sample(withReplacement = True, fraction = 5)
sample_with_replacement.collect()

[2, 3, 4]


[1, 1, 1, 3, 3, 3, 3, 4, 4, 4, 5, 5, 5, 5, 5, 5]

## Distinct: return a new RDD without duplicates

In [39]:
# Remove the repeated values produced by sampling with replacement
sample_with_replacement.distinct().collect()

[1, 3, 4, 5]

# groupBy: return an RDD with the data grouped in (key, values) format
 - It is important to mention that this can cause a shuffle

In [42]:
# Group the sampled elements by whether they are greater than 1 (keys are False / True)
group_elements = sample_with_replacement.groupBy(lambda x: x > 1)
group_elements.collect()

[(False, <pyspark.resultiterable.ResultIterable at 0x7f1a98198d30>),
 (True, <pyspark.resultiterable.ResultIterable at 0x7f1a980fd600>)]

In [48]:
# groupBy() yields an iterable per key; iterate over it (sorted() here) to see the values
print([(x,sorted(y)) for (x, y) in group_elements.collect()])

[(False, [1, 1, 1]), (True, [3, 3, 3, 3, 4, 4, 4, 5, 5, 5, 5, 5, 5])]


## Transformation over two RDDs

In [51]:
# Two small RDDs with no values in common
rdda = sc.parallelize([1,2,3])
rddb = sc.parallelize([4,5,6])

In [53]:
# union between two rdds
rddu = rdda.union(rddb)
rddu.collect()

[1, 2, 3, 4, 5, 6]

In [56]:
# Redefine rdda and rddb so that they share the values 3 and 4
rdda = sc.parallelize([1,2,3,4])
rddb = sc.parallelize([3,4,5,6])

In [57]:
# intersection between two rdds
rddi = rdda.intersection(rddb)
rddi.collect()

[3, 4]

In [59]:
# subtraction between two rdd, returns elements in rdda that are not in rddb
rdds = rdda.subtract(rddb)
rdds.collect()

[1, 2]

In [62]:
# rdda cartesian cartisian gen cartesian point for each element in rdds
rddc = rdda.cartesian(rddb)
rddc.collect()

[(1, 3),
 (1, 4),
 (1, 5),
 (1, 6),
 (2, 3),
 (2, 4),
 (2, 5),
 (2, 6),
 (3, 3),
 (3, 4),
 (3, 5),
 (3, 6),
 (4, 3),
 (4, 4),
 (4, 5),
 (4, 6)]

## Actions methods, alternatives to collect

In [63]:
# RDD with the numbers 1 to 5, used by the action examples
rdd = sc.parallelize(range(1,6))
rdd.collect()

[1, 2, 3, 4, 5]

### collect() can saturate memory, so here are some alternatives
- take(n)
- takeSample(withRep, n , seed)
- top(n)
- takeOrdered

In [64]:
# take(n) returns the first n elements
rdd.take(5)

[1, 2, 3, 4, 5]

In [68]:
rdd.takeSample(True, 10) # sample of 10 elements with replacement (True); pass False to sample without replacement

[4, 4, 5, 5, 2, 5, 4, 1, 5, 3]

In [66]:
rdd.top(10) # descending order; asking for 10 returns all 5 elements

[5, 4, 3, 2, 1]

In [67]:
rdd.takeOrdered(5) # ascending order by default; a key function can be used to change the ordering

[1, 2, 3, 4, 5]

## reduce(operation): combine elements using an operator

In [71]:
# reduce() accepts lambdas as well as operator functions
rdd.reduce(lambda x, y: x + y)

15

In [72]:
from operator import add
# Same sum as the lambda version, using operator.add
rdd.reduce(add)

15

In [74]:
# glom() groups the elements of each partition into a list, which lets us
# visualize how the RDD is split (3 partitions requested here)
rdd = sc.parallelize(range(1,6), 3)
rdd.glom().collect()

[[1], [2, 3], [4, 5]]

## Aggregate (zeroValue, seqOp, combOp)
- seqOp: this operator is used to accumulate the results within each partition
- combOp: this operator is used to combine the results of all partitions

In [79]:
# aggregate(zeroValue, seqOp, combOp): operator.add already takes two arguments,
# so it is passed directly instead of being wrapped in a lambda
rdd.aggregate(0, add, add)
# How it works:
# rdd has 3 partitions -> (1) (2, 3) (4, 5)
# seqOp first works over each partition, starting from zeroValue: (0 + 1) (0 + 2 + 3) (0 + 4 + 5)
# combOp then works over the partition results, again starting from zeroValue: 0 + 1 + 5 + 9
# The result is 15

15

## Actions to count elements
- count() -> count all the elements in RDD
- countApprox(timeout, confidence = 0.95) -> approximate version of count 
- countApproxDistinct(relativeSD = 0.05) ->  returns an estimate of the number of unique values

In [10]:
# RDD with the integers 0 to 9999, used by the counting examples
rdd_counters = sc.parallelize(range(10000))

In [11]:
# Exact number of elements
rdd_counters.count()

10000

In [12]:
# Approximate count with timeout = 1 and confidence = 0.95
rdd_counters.countApprox(1, 0.95)

10000

In [13]:
# Estimated number of distinct values with relativeSD = 0.05 (an estimate, not an exact count)
rdd_counters.countApproxDistinct(0.05)

9760

In [14]:
# countByValue() returns how many times each element appears, as a dictionary;
# build an RDD of single characters to try it on
rdd = sc.parallelize(list("hola mundo"))
rdd.collect()

['h', 'o', 'l', 'a', ' ', 'm', 'u', 'n', 'd', 'o']

In [15]:
# Mapping of each character to its number of occurrences
rdd.countByValue()

defaultdict(int,
            {'h': 1,
             'o': 2,
             'l': 1,
             'a': 1,
             ' ': 1,
             'm': 1,
             'u': 1,
             'n': 1,
             'd': 1})

In [16]:
# The same counts as (character, count) pairs
rdd.countByValue().items()

dict_items([('h', 1), ('o', 2), ('l', 1), ('a', 1), (' ', 1), ('m', 1), ('u', 1), ('n', 1), ('d', 1)])