# Pyspark


## <font color ='green'>Initializing Spark: </font>

In [2]:
from pyspark import SparkContext
# create spark context
sc = SparkContext.getOrCreate()
# reduce logs in the shell mode
#sc.setLogLevel("WARN")

In [4]:
import pyspark as ps
import warnings
from pyspark.sql import SQLContext

In [None]:
# Build the SQL entry point before using it: in top-to-bottom order sqlContext is otherwise
# only created in a later cell (HiveContext), so this line raised NameError.
sqlContext = SQLContext(sc)
df = (sqlContext.read.format('com.databricks.spark.csv')
      .options(header='true', inferschema='true')
      .load('project-capstone/Twitter_sentiment_analysis/clean_tweet.csv'))
type(df)

In [2]:
from pyspark.sql import HiveContext
sqlContext = HiveContext(sc)

## <font color ='green'>Actions and transformations: </font>

- Transformations: create a new RDD from the existing one
- Actions: return a value to the driver program after running a computation on the RDD

<font color='red'> NOTICE: </font> Transformations are __lazy__, they __do not compute their results RIGHT AWAY__, they are only computed when an __action__ is done. Hence, an error in a transformation may go unnoticed until an action triggers the computation.





The following color code is used to tell the difference between <font color ='blue' size =4px> transformations </font> and <font color ='#E2AC00' size =4px> actions </font>

## <font color ='green'> Creating RDD from python collection: </font>

"RDDs are the building blocks of Spark, it is the original API that Spark exposed and pretty much all the higher level APIs decompose to RDDs.
From a developer perspective, an RDD is simply a set of Java or Scala objects representing data."

In [3]:
# Five rows of four integers; `rows` rather than `l`, which PEP 8 flags as ambiguous
# and which is later rebound to an RDD in the repartition section.
rows = [(15151, 25463, 35000, 45000),
        (61612, 68645, 61600, 55000),
        (1615, 5463, 15151, 25463),
        (25000, 15000, 10000, 5000),
        (151, 263, 342, 512)]
# 5 partitions for 5 rows: one row per partition
rdd = sc.parallelize(rows, 5)

In [4]:
rdd

ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:195

In [5]:
print(rdd.collect())

[(15151, 25463, 35000, 45000), (61612, 68645, 61600, 55000), (1615, 5463, 15151, 25463), (25000, 15000, 10000, 5000), (151, 263, 342, 512)]


## <font color ='green'>Applying Functions to each RDD element: </font>

### <font color ='blue'> map: </font>

In [6]:
from pyspark.mllib.linalg import Vectors, DenseVector

# apply the transformation (the lambda function) to each item of the rdd:
# every 4-tuple row becomes one 4-element DenseVector.
# Pass `row` itself: `Vectors.dense([row])` wraps it in an extra list and stores a
# 1x4 two-dimensional array inside the vector (len(vector) == 1 on the workers);
# it only looks flat after collect() because pickling round-trips through raw bytes.
elements = rdd.map(lambda row: Vectors.dense(row))

In [7]:
rdd.collect()

[(15151, 25463, 35000, 45000),
 (61612, 68645, 61600, 55000),
 (1615, 5463, 15151, 25463),
 (25000, 15000, 10000, 5000),
 (151, 263, 342, 512)]

In [8]:
elements.collect()

[DenseVector([15151.0, 25463.0, 35000.0, 45000.0]),
 DenseVector([61612.0, 68645.0, 61600.0, 55000.0]),
 DenseVector([1615.0, 5463.0, 15151.0, 25463.0]),
 DenseVector([25000.0, 15000.0, 10000.0, 5000.0]),
 DenseVector([151.0, 263.0, 342.0, 512.0])]

### <font color ='blue'> flatMap:</font>
Return a new RDD by first applying a function to all elements of this RDD, and then flattening the results

In [9]:
# map: one output per input row -> a 2-element list [first field, second field]
matrix = rdd.map(lambda fields: [fields[0], fields[1]])

In [10]:
# the result is a list of lists
print(matrix.collect())

[[15151, 25463], [61612, 68645], [1615, 5463], [25000, 15000], [151, 263]]


In [11]:
# flatMap: the same 2-element list per row, but the lists are flattened into one sequence
vector = rdd.flatMap(lambda fields: [fields[0], fields[1]])

In [12]:
# the result is a list
print(vector.collect())

[15151, 25463, 61612, 68645, 1615, 5463, 25000, 15000, 151, 263]


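<font color='red'> NOTICE: </font> passing a function that uses a class member to map. In the first cell below, the lambda reads `self.field`, so Spark has to serialize the whole `MyClass` object and ship it to the workers. The second cell copies the field into a local variable first, so only that value is sent.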


In [13]:
class MyClass(object):
    """Anti-pattern demo: ``doStuff`` maps with a function that reads ``self.field``.

    Because the mapped function references ``self``, Spark must pickle the whole
    instance and ship it with every task, not just the string it actually needs.
    """

    def __init__(self):
        self.field = "Hello"

    def doStuff(self, rdd):
        """Return ``rdd`` with ``self.field`` prepended to every element."""
        def add_prefix(text):
            # reads self.field, so the entire object travels with this function
            return self.field + text

        return rdd.map(add_prefix)

In [14]:
def doStuff(self, rdd):
    """Prepend ``self.field`` to every element of ``rdd`` without capturing ``self``.

    The field is copied into a local first, so the function handed to ``map``
    closes over just that value instead of the whole object.
    """
    prefix = self.field

    def add_prefix(text):
        return prefix + text

    return rdd.map(add_prefix)

## <font color ='green'>Filtering: </font>

### <font color ='blue'> filter:</font>

In [15]:
centroids=sc.parallelize([(0, DenseVector([13434.0, 13575.3333, 15114.0, 16837.3333])),
                          (1, DenseVector([31613.5, 37054.0, 38375.5, 40231.5]))])

In [16]:
# keep only the (key, vector) pairs whose key is 1
c1 = centroids.filter(lambda pair: pair[0] == 1)

In [17]:
print(c1.collect())

[(1, DenseVector([31613.5, 37054.0, 38375.5, 40231.5]))]


## <font color ='green'>Getting: </font>

### <font color ='#E2AC00'> collect:</font>

returns all RDD elements

In [18]:
elements.collect()

[DenseVector([15151.0, 25463.0, 35000.0, 45000.0]),
 DenseVector([61612.0, 68645.0, 61600.0, 55000.0]),
 DenseVector([1615.0, 5463.0, 15151.0, 25463.0]),
 DenseVector([25000.0, 15000.0, 10000.0, 5000.0]),
 DenseVector([151.0, 263.0, 342.0, 512.0])]

<font color='red'> NOTICE: </font> avoid collect for __large RDDs__, avoid __print( RDD.collect() )__ assign it to a variable or save it into a file instead !!


<font color='red'> NOTICE: </font> if the entire RDD is not needed, use take or sample


### <font color ='#E2AC00'> take:</font>

RDD.take(n) returns first n RDD elements

In [19]:
elements.take(2)

[DenseVector([15151.0, 25463.0, 35000.0, 45000.0]),
 DenseVector([61612.0, 68645.0, 61600.0, 55000.0])]

### <font color ='blue'> sample:</font>

RDD.sample(withReplacement, fraction, seed=None) Return a new RDD containing a statistical sample of the original RDD

In [20]:
elements.sample(False,0.5).collect()

[DenseVector([61612.0, 68645.0, 61600.0, 55000.0]),
 DenseVector([25000.0, 15000.0, 10000.0, 5000.0]),
 DenseVector([151.0, 263.0, 342.0, 512.0])]

## <font color ='green'>Reshaping data: </font>

### <font color ='blue'>  groupBy:</font>

In [21]:
path=sc.parallelize([(0,[(0, 1), (1, 1), (2, 2), (3, 3)]), 
                     (1, [(0, 2), (3, 1), (2, 2), (3, 3)]), 
                     (1, [(0, 2), (2, 1), (1, 2), (2, 3), (3, 0)]), 
                     (0, [(1, 0), (0, 1), (2, 2), (3, 3)]), 
                     (0, [(0, 1), (1, 0), (2, 0), (3, 1), (3, 2), (3, 3)])],3)

In [22]:
# Group elements by the first tuple of their list (item[1][0]);
# each result is (common_key, ResultIterable).
path.groupBy(lambda item: item[1][0]).collect()

[((1, 0), <pyspark.resultiterable.ResultIterable at 0x9a4c400>),
 ((0, 2), <pyspark.resultiterable.ResultIterable at 0x9a4ceb8>),
 ((0, 1), <pyspark.resultiterable.ResultIterable at 0x9a4c710>)]

To show the ResultIterable, convert it into collection (list,tuple,dict...)

In [23]:
path.groupBy(lambda x:x[1][0]).map(lambda x:(x[0],list(x[1]))).collect()

[((1, 0), [(0, [(1, 0), (0, 1), (2, 2), (3, 3)])]),
 ((0, 2),
  [(1, [(0, 2), (3, 1), (2, 2), (3, 3)]),
   (1, [(0, 2), (2, 1), (1, 2), (2, 3), (3, 0)])]),
 ((0, 1),
  [(0, [(0, 1), (1, 1), (2, 2), (3, 3)]),
   (0, [(0, 1), (1, 0), (2, 0), (3, 1), (3, 2), (3, 3)])])]

We can apply functions directly to the iterable (len, max, min,...) :

In [24]:
path.groupBy(lambda x:x[1][0]).map(lambda x:(x[0],len(x[1]))).collect()

[((1, 0), 1), ((0, 2), 2), ((0, 1), 2)]

<font color='red'> NOTICE: </font> groupBy does not ALWAYS keep keys order and elements order for the same key !!

In [25]:
attempt1=path.groupBy(lambda x:x[1][0]).map(lambda x:(x[0],list(x[1]))).collect()

In [26]:
attempt10=path.groupBy(lambda x:x[1][0]).map(lambda x:(x[0],list(x[1]))).collect()

In [27]:
attempt1

[((1, 0), [(0, [(1, 0), (0, 1), (2, 2), (3, 3)])]),
 ((0, 2),
  [(1, [(0, 2), (3, 1), (2, 2), (3, 3)]),
   (1, [(0, 2), (2, 1), (1, 2), (2, 3), (3, 0)])]),
 ((0, 1),
  [(0, [(0, 1), (1, 1), (2, 2), (3, 3)]),
   (0, [(0, 1), (1, 0), (2, 0), (3, 1), (3, 2), (3, 3)])])]

In [28]:
attempt10

[((1, 0), [(0, [(1, 0), (0, 1), (2, 2), (3, 3)])]),
 ((0, 2),
  [(1, [(0, 2), (3, 1), (2, 2), (3, 3)]),
   (1, [(0, 2), (2, 1), (1, 2), (2, 3), (3, 0)])]),
 ((0, 1),
  [(0, [(0, 1), (1, 1), (2, 2), (3, 3)]),
   (0, [(0, 1), (1, 0), (2, 0), (3, 1), (3, 2), (3, 3)])])]

In [29]:
attempt1==attempt10

True

### <font color ='blue'> groupByKey:</font>

Groups the elements having the same key together

In [30]:
path.groupByKey().map(lambda x:(x[0],tuple(x[1]))).collect()

[(0,
  ([(0, 1), (1, 1), (2, 2), (3, 3)],
   [(1, 0), (0, 1), (2, 2), (3, 3)],
   [(0, 1), (1, 0), (2, 0), (3, 1), (3, 2), (3, 3)])),
 (1,
  ([(0, 2), (3, 1), (2, 2), (3, 3)],
   [(0, 2), (2, 1), (1, 2), (2, 3), (3, 0)]))]

<font color='red'> NOTICE: </font> groupByKey does not ALWAYS keep keys order and elements order for the same key !!

In [31]:
lengths=path.map(lambda x:(x[0],len(x[1])))

In [32]:
lengths.collect()

[(0, 4), (1, 4), (1, 5), (0, 4), (0, 6)]

### <font color ='#E2AC00'> reduce:</font>

In [33]:
# Keep the smallest key and sum the lengths. Index into the pairs instead of unpacking
# them in the lambda signature: tuple parameters are a SyntaxError in Python 3 (PEP 3113).
lengths.reduce(lambda a, b: (min(a[0], b[0]), a[1] + b[1]))

(0, 23)

### <font color ='blue'> reduceByKey: </font>

applies a function to elements which have the same key

In [34]:
lengths.reduceByKey(lambda a,b:a+b).collect()

[(0, 14), (1, 9)]

<font color='red'> NOTICE: </font> reduceByKey is a __TRANSFORMATION__, while reduce is an __ACTION__ 

### <font color ='blue'> combineByKey: </font>

In [35]:
#the last step to get the centroids is to compute the mean of the elements having the same key in the following RDD:
for_centroids=sc.parallelize([(0, DenseVector([15151.0, 25463.0, 35000.0, 45000.0])),
                              (1, DenseVector([61612.0, 68645.0, 61600.0, 55000.0])),
                              (1, DenseVector([1615.0, 5463.0, 15151.0, 25463.0])),
                              (0, DenseVector([25000.0, 15000.0, 10000.0, 5000.0])),
                              (0, DenseVector([151.0, 263.0, 342.0, 512.0]))])

In [36]:
#step1: to compute the average, we compute the sum and the number of elements having the same key:
combined = for_centroids.combineByKey(
    # createCombiner - first value met for a key inside a partition: value ==> (value, 1)
    lambda value: (value, 1),
    # mergeValue - fold another value of that key within the same partition:
    #   (running_sum, running_count), value ==> (running_sum + value, running_count + 1)
    lambda acc, value: (acc[0] + value, acc[1] + 1),
    # mergeCombiners - merge the partial results computed on different partitions:
    #   (sum_1, count_1), (sum_2, count_2) ==> (sum_1 + sum_2, count_1 + count_2)
    lambda left, right: (left[0] + right[0], left[1] + right[1]),
)


In [37]:
combined.collect()

[(0, (DenseVector([40302.0, 40726.0, 45342.0, 50512.0]), 3)),
 (1, (DenseVector([63227.0, 74108.0, 76751.0, 80463.0]), 2))]

In [38]:
#step2: avg = sum(elements) / count(elements)
# Each record of `combined` is (key, (value_sum, count)). Index into the pair instead of
# unpacking it in the lambda signature: tuple parameters are a SyntaxError in Python 3.
# mapValues leaves the keys untouched (and keeps any partitioner).
computed_centroids = combined.mapValues(lambda sum_and_count: sum_and_count[0] / sum_and_count[1])

In [39]:
# we finally get the centroids
computed_centroids.collect()

[(0, DenseVector([13434.0, 13575.3333, 15114.0, 16837.3333])),
 (1, DenseVector([31613.5, 37054.0, 38375.5, 40231.5]))]

In [40]:
centroids.collect()

[(0, DenseVector([13434.0, 13575.3333, 15114.0, 16837.3333])),
 (1, DenseVector([31613.5, 37054.0, 38375.5, 40231.5]))]

<font color='red'> NOTICE: </font> __avoid__ groupByKey followed by map (all the key-value pairs are shuffled across the network). Use reduceByKey or combineByKey instead: they combine the values sharing a key on each partition before shuffling the data.

In [41]:
print(lengths.collect())

[(0, 4), (1, 4), (1, 5), (0, 4), (0, 6)]


In [42]:
lengths.groupByKey().map(lambda x:(x[0],sum(x[1]))).collect()

[(0, 14), (1, 9)]

In [43]:
# we can get the same result using reduceByKey 
lengths.reduceByKey(lambda a,b:a+b).collect()

[(0, 14), (1, 9)]

### <font color ='#E2AC00'> aggregate:</font>

In [44]:
def seqOp(acc, pair):
    # fold one (key, length) pair into a partition's (keys, total_length) accumulator
    keys, total = acc
    return keys + [pair[0]], total + pair[1]


def combOp(left, right):
    # merge the accumulators produced by two partitions
    return left[0] + right[0], left[1] + right[1]


y = lengths.aggregate(([], 0), seqOp, combOp)

In [45]:
print(y)

([0, 1, 1, 0, 0], 23)


### <font color ='blue'> zipWithIndex: </font>

In [46]:
# element ==> (element,index_of_element)
indexed_elements = elements.zipWithIndex()

In [47]:
indexed_elements.collect()

[(DenseVector([15151.0, 25463.0, 35000.0, 45000.0]), 0),
 (DenseVector([61612.0, 68645.0, 61600.0, 55000.0]), 1),
 (DenseVector([1615.0, 5463.0, 15151.0, 25463.0]), 2),
 (DenseVector([25000.0, 15000.0, 10000.0, 5000.0]), 3),
 (DenseVector([151.0, 263.0, 342.0, 512.0]), 4)]

In [48]:
# swap every (element, index) pair so the index comes first: (index, element)
indexed_elements = indexed_elements.map(lambda pair: (pair[1], pair[0]))

In [49]:
indexed_elements.collect()

[(0, DenseVector([15151.0, 25463.0, 35000.0, 45000.0])),
 (1, DenseVector([61612.0, 68645.0, 61600.0, 55000.0])),
 (2, DenseVector([1615.0, 5463.0, 15151.0, 25463.0])),
 (3, DenseVector([25000.0, 15000.0, 10000.0, 5000.0])),
 (4, DenseVector([151.0, 263.0, 342.0, 512.0]))]

## <font color ='green'>Functions for 2+ RDDs: </font>

### <font color ='blue'> zip:</font>

In [50]:
# zip pairs two RDDs WITH THE SAME NUMBER OF PARTITIONS AND THE SAME NUMBER OF ELEMENTS IN EACH PARTITION
indexes = sc.parallelize(list(range(5)), 5)
indexes.zip(elements).collect()

[(0, DenseVector([15151.0, 25463.0, 35000.0, 45000.0])),
 (1, DenseVector([61612.0, 68645.0, 61600.0, 55000.0])),
 (2, DenseVector([1615.0, 5463.0, 15151.0, 25463.0])),
 (3, DenseVector([25000.0, 15000.0, 10000.0, 5000.0])),
 (4, DenseVector([151.0, 263.0, 342.0, 512.0]))]

<font color='red'> NOTICE: </font> zip can only zip RDDs which have the __same number of partitions__

In [51]:
# same 5 values as `indexes`, but spread over 4 partitions, so zip with `elements` will fail
indexes_1 = sc.parallelize(list(range(5)), 4)


In [52]:
# return the number of partitions in RDD
elements.getNumPartitions()

5

In [53]:
indexes_1.getNumPartitions()

4

In [54]:
indexes_1.zip(elements).collect()

ValueError: Can only zip with RDD which has the same number of partitions

### <font color ='blue'> cartesian:</font>

returns the RDD of all pairs (i, j) with i in RDD I and j in RDD J, i.e. [(i, j) for i in I for j in J]

In [55]:
centroids.collect()

[(0, DenseVector([13434.0, 13575.3333, 15114.0, 16837.3333])),
 (1, DenseVector([31613.5, 37054.0, 38375.5, 40231.5]))]

In [56]:
path.collect()

[(0, [(0, 1), (1, 1), (2, 2), (3, 3)]),
 (1, [(0, 2), (3, 1), (2, 2), (3, 3)]),
 (1, [(0, 2), (2, 1), (1, 2), (2, 3), (3, 0)]),
 (0, [(1, 0), (0, 1), (2, 2), (3, 3)]),
 (0, [(0, 1), (1, 0), (2, 0), (3, 1), (3, 2), (3, 3)])]

In [57]:
# every pair (i, j) with i from `centroids` and j from `path`: [(i, j) for i in I for j in J]
centroids.cartesian(path).collect()

[((0, DenseVector([13434.0, 13575.3333, 15114.0, 16837.3333])),
  (0, [(0, 1), (1, 1), (2, 2), (3, 3)])),
 ((0, DenseVector([13434.0, 13575.3333, 15114.0, 16837.3333])),
  (1, [(0, 2), (3, 1), (2, 2), (3, 3)])),
 ((0, DenseVector([13434.0, 13575.3333, 15114.0, 16837.3333])),
  (1, [(0, 2), (2, 1), (1, 2), (2, 3), (3, 0)])),
 ((0, DenseVector([13434.0, 13575.3333, 15114.0, 16837.3333])),
  (0, [(1, 0), (0, 1), (2, 2), (3, 3)])),
 ((0, DenseVector([13434.0, 13575.3333, 15114.0, 16837.3333])),
  (0, [(0, 1), (1, 0), (2, 0), (3, 1), (3, 2), (3, 3)])),
 ((1, DenseVector([31613.5, 37054.0, 38375.5, 40231.5])),
  (0, [(0, 1), (1, 1), (2, 2), (3, 3)])),
 ((1, DenseVector([31613.5, 37054.0, 38375.5, 40231.5])),
  (1, [(0, 2), (3, 1), (2, 2), (3, 3)])),
 ((1, DenseVector([31613.5, 37054.0, 38375.5, 40231.5])),
  (1, [(0, 2), (2, 1), (1, 2), (2, 3), (3, 0)])),
 ((1, DenseVector([31613.5, 37054.0, 38375.5, 40231.5])),
  (0, [(1, 0), (0, 1), (2, 2), (3, 3)])),
 ((1, DenseVector([31613.5, 37054.0, 38375.5, 40231.5])),
  (0, [(0, 1), (1, 0), (2, 0), (3, 1), (3, 2), (3, 3)]))]

### <font color ='blue'>join:</font>

join two RDDs 


In [58]:
#join two RDDs 
J=path.join(c1)

In [59]:
J.collect()

[(1,
  ([(0, 2), (3, 1), (2, 2), (3, 3)],
   DenseVector([31613.5, 37054.0, 38375.5, 40231.5]))),
 (1,
  ([(0, 2), (2, 1), (1, 2), (2, 3), (3, 0)],
   DenseVector([31613.5, 37054.0, 38375.5, 40231.5])))]

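<font color='red'> NOTICE: </font> the joined RDD has a __different number of partitions__ from its inputs: here it is their sum (4 + 3 = 7). PySpark implements join as a union of both RDDs (whose partition count is the sum of its inputs') followed by a groupByKey, which keeps that count when neither numPartitions nor spark.default.parallelism is set. Pass numPartitions to join to control it.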

In [60]:
c1.getNumPartitions()

4

In [61]:
path.getNumPartitions()

3

In [62]:
J.getNumPartitions()

7

<font color ='blue'>glom()</font> gathers the elements of each partition into one list (one list per partition), which shows how the data is distributed

In [63]:
J.glom().collect()

[[],
 [(1,
   ([(0, 2), (3, 1), (2, 2), (3, 3)],
    DenseVector([31613.5, 37054.0, 38375.5, 40231.5]))),
  (1,
   ([(0, 2), (2, 1), (1, 2), (2, 3), (3, 0)],
    DenseVector([31613.5, 37054.0, 38375.5, 40231.5])))],
 [],
 [],
 [],
 [],
 []]

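If Spark iteration time keeps growing when join is applied repeatedly (the partition count of a join result can grow with every join), fix the number of partitions, e.g.: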

spark2-submit --conf spark.default.parallelism=20 

<font color='red'> NOTICE: </font> we can apply <font color ='blue'>leftOuterJoin</font>, <font color ='blue'>rightOuterJoin</font>, <font color ='blue'>fullOuterJoin</font>,<font color ='blue'>cogroup</font> to two RDDs.

difference between <font color ='blue'>cogroup</font> and <font color ='blue'>fullOuterJoin</font>:

In [64]:
x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2)])

In [65]:
x.cogroup(y).map(lambda x:(x[0],list(x[1][0]),list(x[1][1]))).collect()

[('a', [1], [2]), ('b', [4], [])]

In [66]:
x.fullOuterJoin(y).collect()

[('a', (1, 2)), ('b', (4, None))]

### <font color ='blue'>union:</font>

Appends one RDD to another

In [67]:
print(c1.collect())

[(1, DenseVector([31613.5, 37054.0, 38375.5, 40231.5]))]


In [68]:
indexed_elements.collect()

[(0, DenseVector([15151.0, 25463.0, 35000.0, 45000.0])),
 (1, DenseVector([61612.0, 68645.0, 61600.0, 55000.0])),
 (2, DenseVector([1615.0, 5463.0, 15151.0, 25463.0])),
 (3, DenseVector([25000.0, 15000.0, 10000.0, 5000.0])),
 (4, DenseVector([151.0, 263.0, 342.0, 512.0]))]

In [69]:
c1.union(indexed_elements).collect()

[(1, DenseVector([31613.5, 37054.0, 38375.5, 40231.5])),
 (0, DenseVector([15151.0, 25463.0, 35000.0, 45000.0])),
 (1, DenseVector([61612.0, 68645.0, 61600.0, 55000.0])),
 (2, DenseVector([1615.0, 5463.0, 15151.0, 25463.0])),
 (3, DenseVector([25000.0, 15000.0, 10000.0, 5000.0])),
 (4, DenseVector([151.0, 263.0, 342.0, 512.0]))]

<font color ='#E2AC00'> count()</font> returns the number of elements in RDD

In [70]:
# count returns the number of elements in RDD
c1.union(indexed_elements).count()

6

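<font color='red'> NOTICE: </font> Sometimes we need to combine RDD1 and RDD2 into an RDD whose elements pair an element of RDD1 (first) with an element of RDD2 (second). In this case union won't help, since it only appends the elements of one RDD after the other. Use zip instead (see the zip section above), which pairs the elements position by position.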

## <font color ='green'>Repartitioning: </font>

###  <font color ='blue'> repartition:</font>

In [71]:
# 10 integers, default number of partitions
l = sc.parallelize(list(range(10)))

In [72]:
l.getNumPartitions()

4

In [73]:
b=l.repartition(7)

In [74]:
b.getNumPartitions()

7

###  <font color ='blue'> coalesce:</font>

Returns a new RDD which is reduced to a smaller number of partitions

In [75]:
# coalesce merges existing partitions down to 3 without a full shuffle
# (this section demonstrates coalesce; the original called repartition)
s = l.coalesce(3)

In [76]:
s.getNumPartitions()

3

<font color='red'> NOTICE: </font> <br>
    if you are __increasing__ the number of partitions use __repartition()__(performing full shuffle) <br>
    if you are __decreasing__ the number of partitions use __coalesce()__ (minimizes shuffles)

###  <font color ='blue'> partitionBy:</font>

In [77]:
p=l.map(lambda x:(x,x))

In [78]:
p=p.partitionBy(2, lambda x:x%2)

In [79]:
p.glom().collect()

[[(0, 0), (2, 2), (4, 4), (6, 6), (8, 8)],
 [(1, 1), (3, 3), (5, 5), (7, 7), (9, 9)]]

<font color='red'> NOTICE: </font> partitionBy is applied to data with a (key,value) format

## <font color ='green'>Maths: </font>

In [80]:
lengths.collect()

[(0, 4), (1, 4), (1, 5), (0, 4), (0, 6)]

### <font color ='#E2AC00'> max:</font>

Returns the max of RDD elements

In [81]:
lengths.max()

(1, 5)

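<font color='red'> NOTICE: </font> If the elements of the RDD are tuples, max compares them as Python tuples: by the first element, then by the second to break ties (hence (1, 5) above). To compare on another field, pass a key function as below: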

In [82]:
lengths.max(key=lambda x:x[1])

(0, 6)

### <font color ='#E2AC00'> sum:</font>

Returns the sum of RDD elements

In [83]:
lengths.map(lambda x:x[1]).sum()

23

### <font color ='#E2AC00'> mean:</font>

Returns the mean of RDD elements

In [84]:
lengths.map(lambda x:x[1]).mean()

4.6

### <font color ='#E2AC00'> countByKey:</font>

Returns a dictionary mapping each key to the number of elements with that key

In [85]:
lengths.countByKey()

defaultdict(int, {0: 3, 1: 2})

### <font color ='#E2AC00'> countByValue:</font>

Returns a dictionary mapping each distinct element (value) to the number of times it occurs

In [86]:
lengths.countByValue()

defaultdict(int, {(0, 4): 2, (0, 6): 1, (1, 4): 1, (1, 5): 1})

## <font color ='green'>Cache RDD: </font>

If several actions will be run on the same RDD, cache it in memory with cache() or persist() so it is not recomputed for each action; to remove it from memory, use unpersist()

cache() stores data in memory, with persist you can choose the level of storage (memory or disk)

<font color='red'> NOTICE: </font> Don’t spill to disk unless functions that computed the datasets are very expensive or they filter a large amount of data. (recomputing may be as fast as reading from disk)

## <font color ='green'>When RDD is shuffled? </font>

How do you know if a shuffle will be called on a Transformation? <br>
       
       -repartition, join, cogroup, and any of the *By or *ByKey transformations can result in shuffles
       -If you declare a numPartitions parameter, it’ll probably shuffle
       -If a transformation constructs a ShuffledRDD, it’ll probably shuffle
       -combineByKey calls a shuffle (so do other transformations like groupByKey, which actually end up calling combineByKey)

## <font color ='green'>Check if variable is RDD: </font>

In [87]:
from pyspark.rdd import RDD

In [88]:
isinstance(elements,RDD)

True

## <font color ='green'>Save RDD: </font>

### <font color ='#E2AC00'> saveAsPickleFile:</font>

In [89]:
indexed_elements.saveAsPickleFile('elements')

<font color='red'> NOTICE: </font> we can use <font color ='#E2AC00'> saveAsTextFile</font>, <font color ='#E2AC00'> saveAsSequenceFile</font>, <font color ='#E2AC00'> saveAsHadoopFile</font>, ...

## <font color ='green'>Load RDD: </font>

In [90]:
loaded_elements=sc.pickleFile('elements')

In [91]:
loaded_elements.collect()

[(0, DenseVector([15151.0, 25463.0, 35000.0, 45000.0])),
 (1, DenseVector([61612.0, 68645.0, 61600.0, 55000.0])),
 (2, DenseVector([1615.0, 5463.0, 15151.0, 25463.0])),
 (3, DenseVector([25000.0, 15000.0, 10000.0, 5000.0])),
 (4, DenseVector([151.0, 263.0, 342.0, 512.0]))]

In [92]:
indexed_elements.collect()

[(0, DenseVector([15151.0, 25463.0, 35000.0, 45000.0])),
 (1, DenseVector([61612.0, 68645.0, 61600.0, 55000.0])),
 (2, DenseVector([1615.0, 5463.0, 15151.0, 25463.0])),
 (3, DenseVector([25000.0, 15000.0, 10000.0, 5000.0])),
 (4, DenseVector([151.0, 263.0, 342.0, 512.0]))]

## <font color ='green'>Creating DataFrames: </font>

In [4]:
from pyspark.sql import HiveContext, Row, DataFrame, Column, GroupedData, DataFrameNaFunctions, DataFrameStatFunctions
from pyspark.sql.types import *
from pyspark.sql.functions import to_timestamp
from pyspark.sql.functions import split, explode
import pyspark.sql.functions as F

### <font color ='purple'> RDD to DF</font>

In [107]:
centroids.collect()

[(0, DenseVector([13434.0, 13575.3333, 15114.0, 16837.3333])),
 (1, DenseVector([31613.5, 37054.0, 38375.5, 40231.5]))]

In [108]:
df =centroids.toDF(["id_centroid","value"])
df.show()


+-----------+--------------------+
|id_centroid|               value|
+-----------+--------------------+
|          0|[13434.0,13575.33...|
|          1|[31613.5,37054.0,...|
+-----------+--------------------+



In [10]:
import numpy as np

### <font color ='purple'> read DF from csv:</font>

In [8]:
# Read the file centroids.csv
filepath="centroids.csv"
# define the schema of the dataframe
schema = StructType([StructField("pdl", StringType(), True), 
                     StructField("timestamp", StringType(), True), 
                     StructField("value", FloatType(), True)])

df2 = sqlContext.read.csv(filepath, header=True, schema=schema, sep=';')
df2.show()

+---+--------------------+---------+
|pdl|           timestamp|    value|
+---+--------------------+---------+
|  0|2016-01-01T00:00:...|47208.477|
|  0|2016-01-01T00:30:...| 44341.35|
|  0|2016-01-01T01:00:...| 46310.42|
|  0|2016-01-01T01:30:...|46844.215|
|  0|2016-01-01T02:00:...|48449.027|
|  0|2016-01-01T02:30:...|47331.043|
|  0|2016-01-01T03:00:...|47770.906|
|  0|2016-01-01T03:30:...| 47255.44|
|  0|2016-01-01T04:00:...| 47726.23|
|  0|2016-01-01T04:30:...|44565.863|
|  0|2016-01-01T05:00:...|24933.562|
|  0|2016-01-01T05:30:...|25545.246|
|  0|2016-01-01T06:00:...| 26215.35|
|  0|2016-01-01T06:30:...|26145.475|
|  0|2016-01-01T07:00:...|25813.287|
|  0|2016-01-01T07:30:...|24585.338|
|  0|2016-01-01T08:00:...|21934.707|
|  0|2016-01-01T08:30:...|21747.996|
|  0|2016-01-01T09:00:...|21209.621|
|  0|2016-01-01T09:30:...| 20941.58|
+---+--------------------+---------+
only showing top 20 rows



### <font color ='purple'> read DF from Hive:</font>

In [None]:
# query="select * from hive table "
# df = sqlContext.sql(query);

# df.registerTempTable("temp_table");
# df = sqlContext.sql("select * from temp_table");


## <font color ='green'> DataFrame to RDD:</font>

In [110]:
df.rdd.collect()

[Row(id_centroid=0, value=DenseVector([13434.0, 13575.3333, 15114.0, 16837.3333])),
 Row(id_centroid=1, value=DenseVector([31613.5, 37054.0, 38375.5, 40231.5]))]

## <font color ='green'> Save DataFrame:</font>

### <font color ='purple'> save DF to csv:</font>

In [106]:
# Output directory for the CSV export. Named save_path (not `path`) so the `path` RDD
# built earlier in the notebook is not clobbered.
save_path = "saved_data_frame"
# while debugging, overwrite mode lets the cell be re-run:
# df2.write.options(header="true", sep=';').mode("overwrite").csv(save_path)
# Write df2 (string/float columns only): the CSV writer rejects vector columns such as
# the DenseVector `value` column of the `df` built from `centroids`.
df2.write.options(header="true", sep=';').csv(save_path)


<font color='red'> NOTICE: </font> writing CSV produces a __directory__ containing n CSV files, n = numPartitions(dataframe). To save the dataframe as a single CSV file, call coalesce(1) before saving.

### <font color ='purple'> save RDD/DF to hive table:</font>

In [None]:
# #Define SCHEMA TO OUR TABLE
# schema = StructType([StructField("id_centroid", StringType(), True), 
#                      StructField("timestamp", StringType(), True), 
#                      StructField("value", FloatType(), True)])
# # RDD to DF
# df_hive = sqlContext.createDataFrame(rdd_hive, schema)
# df_hive = df_hive.select(df_hive.id_centroid, to_timestamp(df_hive.timestamp).alias('timestamp'),df_hive.value)
# # Coalesce DF to one partition so that we save it into one table        
# df_hive = df_hive.coalesce(1)
# # Save DF to hive table
# df_hive.registerTempTable("tablename")
 
# # df_hive = sqlContext.sql("insert into common.table select * from df_hive")
# df_hive.write.mode("append").insertInto("common.table")

## <font color ='green'> Operations on DataFrame:</font>

### <font color ='purple'> withColumn:</font>

In [104]:
# # Transform colum timestamp
# regex = r"^(\d{4})-(\d{2})-(\d{2}) (\d{2}):(\d{2}):(\d{2})$"
# sub = r"$1$2$3$4$5$6"
# df3 = df2.withColumn('timestamp', F.regexp_replace('timestamp', regex, sub))


## <font color ='green'>Stopping SparkContext: </font>

In [None]:
#stop SparkContext
sc.stop()

## <font color ='green'>Execution of python file from a shell : </font>

spark2-submit --master yarn --driver-cores 6 --driver-memory 30G --num-executors 10 --executor-cores 4 --executor-memory 4G --queue root.projet2 /directory/code.py

## <font color ='green'>Execution of python file from a shell on remote server: </font>

nohup spark2-submit --master yarn --driver-cores 6 --driver-memory 30G --num-executors 10 --executor-cores 4 --executor-memory 4G --queue root.projet2 /directory/code.py &

## <font color ='green'>Run spark from spyder: </font>

spark-submit C:\ProgramData\Anaconda2\Scripts\spyder-script.py &

Spark applications in Python can either be run with the bin/spark-submit script which includes Spark at runtime, or by including it in your setup.py as:

        install_requires=[
        'pyspark=={site.SPARK_VERSION}'
    ]

Useful links: <br>
https://medium.com/@GalarnykMichael/install-spark-on-windows-pyspark-4498a5d8d66c <br>
https://spark-packages.org/ <br>
http://spark.apache.org/docs/latest/tuning.html#level-of-parallelism <br>
https://www.youtube.com/watch?v=7ooZ4S7Ay6Y&feature=youtu.be <br>
