# Creating spark context

* Master node
* Driver program --> user application
* User application --> Spark context
* Spark context --> Spark config

In [1]:
# Import the display helpers from the public IPython.display module
# (HTML is kept in scope because other cells may rely on it).
from IPython.display import HTML, Image

# Show the architecture diagram; the PNG must sit next to the notebook,
# otherwise Image raises FileNotFoundError.
Image("./architecture.png", width=400, height=100)

FileNotFoundError: No such file or directory: './architecture.png'

FileNotFoundError: No such file or directory: './architecture.png'

<IPython.core.display.Image object>

In [2]:
from pyspark import SparkConf, SparkContext

# Describe the application: run locally with a single executor and a single core.
conf = (
    SparkConf()
    .setMaster("local")
    .setAppName("My applicaton")
    .set("spark.executor.instances", "1")
    .set("spark.executor.cores", "1")
    # NOTE(review): this key looks like a YARN NodeManager setting rather than a
    # Spark property; presumably it has no effect with a "local" master - confirm.
    .set("yarn.nodemanager.resource.memory-mb", "871859200")
)

# getOrCreate returns the already-running context if there is one, so this cell
# can be re-run without "Cannot run multiple SparkContexts at once". When a
# context already exists, `conf` is not re-applied to it.
sc = SparkContext.getOrCreate(conf=conf)

In [3]:
# Read the settings back through the public accessor instead of the private
# `_conf` attribute; getConf() returns the context's SparkConf.
active_conf = sc.getConf()
print(active_conf.get("spark.executor.instances"))
print(active_conf.get("spark.executor.cores"))

1
1


# RDD

* RDD is Resilient distributed dataset
* It is the data structure of spark
* Collection of immutable objects
* RDD is fault tolerant

In [4]:
# Data for an RDD can come from different kinds of sources.
# Source 1: a collection built by the user in the driver program
nums = range(10)
print(nums)
# Source 2: a file (for example, a CSV file)

range(0, 10)


In [5]:
# Distribute the local collection `nums` across 5 partitions
rdd = sc.parallelize(nums, numSlices=5)
partition_count = rdd.getNumPartitions()
print("Number of partitions:", partition_count)
# glom() turns every partition into a list, so collect() shows the per-partition contents
partition_contents = rdd.glom().collect()
print("What data is present in individual partitions ", partition_contents)

Number of partitions: 5
What data is present in individual partitions  [[0, 1], [2, 3], [4, 5], [6, 7], [8, 9]]


# You should not create more partitions than necessary

In [6]:
# Spread the same collection over 15 partitions, i.e. more partitions than elements
rdd_big = sc.parallelize(nums, numSlices=15)
big_partition_count = rdd_big.getNumPartitions()
print("Number of partitions:", big_partition_count)
# glom() groups the elements by partition, making the empty partitions visible
big_partition_contents = rdd_big.glom().collect()
print("What data is present in individual partitions ", big_partition_contents)

Number of partitions: 15
What data is present in individual partitions  [[], [0], [1], [], [2], [3], [], [4], [5], [], [6], [7], [], [8], [9]]


In [7]:
# Shrink the over-partitioned RDD (15 partitions) down to 5 without a shuffle.
# Coalescing `rdd`, which already has 5 partitions, to 5 would be a no-op and
# would not demonstrate anything. The exact grouping of elements per partition
# may differ from a freshly parallelized 5-partition RDD.
rdd_small = rdd_big.coalesce(5, shuffle=False)
print("Number of partitions:", rdd_small.getNumPartitions())
print("What data is present in individual partitions ", rdd_small.glom().collect())

Number of partitions: 5
What data is present in individual partitions  [[0, 1], [2, 3], [4, 5], [6, 7], [8, 9]]


# PERSIST

Caching and persistence help store intermediate results in memory or on disk so they can be reused in subsequent stages.

Most algorithms in MLlib are iterative, going over the data multiple times. Thus, it is important to cache() your input datasets before passing them to MLlib. Even if your data does not fit in memory, try persist(StorageLevel.DISK_ONLY).

In [8]:
import pyspark

# Storage level to request: memory and disk, replicated twice
storage_level = pyspark.StorageLevel.MEMORY_AND_DISK_2

rdd1 = sc.parallelize([1, 2])
rdd1.persist(storage_level)

# Confirm which storage level the RDD now reports
print(rdd1.getStorageLevel())


Disk Memory Serialized 2x Replicated


In [9]:
# Understanding Storage levels
#
# Reference table only (nothing is executed in this cell). Each predefined level
# is a StorageLevel built from five flags, in the order shown in the signature
# below. The "_2" variants differ from their base level only in replication (2 vs 1).
# NOTE(review): the *_SER levels are listed with the same flags as their non-SER
# counterparts - presumably because PySpark always stores serialized data; confirm
# against the Spark version in use.

# pyspark.StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication = 1)

# DISK_ONLY = StorageLevel(True, False, False, False, 1)

# DISK_ONLY_2 = StorageLevel(True, False, False, False, 2)

# MEMORY_AND_DISK = StorageLevel(True, True, False, False, 1)

# MEMORY_AND_DISK_2 = StorageLevel(True, True, False, False, 2)

# MEMORY_AND_DISK_SER = StorageLevel(True, True, False, False, 1)

# MEMORY_AND_DISK_SER_2 = StorageLevel(True, True, False, False, 2)

# MEMORY_ONLY = StorageLevel(False, True, False, False, 1)

# MEMORY_ONLY_2 = StorageLevel(False, True, False, False, 2)

# MEMORY_ONLY_SER = StorageLevel(False, True, False, False, 1)

# MEMORY_ONLY_SER_2 = StorageLevel(False, True, False, False, 2)

# OFF_HEAP = StorageLevel(True, True, True, False, 1)


# Various data sources

RDD
Users create RDDs in two ways:

1) by loading an external dataset

2) by parallelizing a collection of objects (e.g., a list or set) in their driver program.

In [10]:
# External dataset: textFile() gives an RDD of strings read from the file
rdd1 = sc.textFile("sample.txt")
# In-memory collections: parallelize() turns Python objects into RDDs
rdd2, rdd3 = sc.parallelize([1, 2]), sc.parallelize([3, 4])

# RDDs offer two types of operations: 
* Transformations
* Actions

# Transformations

Many transformations are element-wise, that is, they work on one element at a time; however, this is not true for all transformations.
* map
* filter
* flatMap
* distinct
* intersection
* group by key
* reduce by key

### pandas

In [17]:
import pandas as pd

# Small expenses table: one row per person
df = pd.DataFrame(
    {
        "name": ["marshal", "ted", "barney"],
        "expenses": [2000, 3000, 4000],
    }
)
df

Unnamed: 0,name,expenses
0,marshal,2000
1,ted,3000
2,barney,4000


In [18]:
# Apply a function to every value of `expenses` (the pandas analogue of Spark's map)
df['new column'] = df['expenses'].apply(lambda expense: expense + 100)

In [19]:
# Display the frame again, now including the derived 'new column'
df

Unnamed: 0,name,expenses,new column
0,marshal,2000,2100
1,ted,3000,3100
2,barney,4000,4100


### map in spark

In [20]:
# map: apply a function to every element of the RDD
nums = sc.parallelize([1, 2, 3, 4])
# NOTE: despite its name, `squared` holds each value plus 100, not its square
squared = nums.map(lambda value: value + 100)
# Transformations are lazy: printing shows only the RDD object, not computed data
print(squared)

# An action such as collect() is needed to actually see the results
collected = squared.collect()
for num in collected:
    print("%i " % num)

# mapPartitions is like map, except that it runs separately on each partition (block) of the RDD.

PythonRDD[15] at RDD at PythonRDD.scala:53
101 
102 
103 
104 


In [22]:
# Why over-partitioning hurts: only 4 elements, but 1000 partitions to schedule
nums = sc.parallelize([1, 2, 3, 4], numSlices=1000)
squared = nums.map(lambda value: value + 100)
collected = squared.collect()  # --> takes lot of time
print(collected)

[101, 102, 103, 104]


### filter

The filter() operation does not mutate the existing inputRDD.

Instead, it returns a pointer to an entirely new RDD. inputRDD can still be reused later in the program—for instance, to search for other words.

In [23]:
# filter: keep only the elements for which the predicate returns True
nums = sc.parallelize([1, 2, 3, 4, 5, 6])
not_one = nums.filter(lambda value: value != 1)
# NOTE: `squared` is a plain list of the kept values; nothing is squared here
squared = not_one.collect()
for num in squared:
    print("%i " % num)

2 
3 
4 
5 
6 


### flatmap

Sometimes we want to produce multiple output elements for each input element. The operation to do this is called flatMap().

In [24]:
# With map, every input line yields exactly one output element: its list of words
lines = sc.parallelize(["hello world", "hi"])
words = lines.map(lambda text: text.split(" "))
words.collect()

[['hello', 'world'], ['hi']]

In [25]:
# With flatMap, the per-line word lists are flattened into a single RDD of words
lines = sc.parallelize(["hello world", "hi"])
words = lines.flatMap(lambda text: text.split(" "))
words.collect()

['hello', 'world', 'hi']

### distinct

In [26]:
# distinct: drop duplicate elements
numbers = sc.parallelize([1, 1, 1, 2, 2, 3, 3, 7])
distinct_numbers = numbers.distinct()
distinct_numbers.collect()

[1, 2, 3, 7]

### intersection

In [27]:

# intersection: keep only the elements present in both RDDs
num1 = sc.parallelize([1, 2, 3, 45, 90, 135])
num2 = sc.parallelize([3, 6, 9, 12, 15, 30, 45])
intersection = num1.intersection(num2)
intersection.collect()

[3, 45]

### group by key

In [28]:

# Key/value pairs to group. A list is used instead of a set literal so that the
# input order is the same on every run and no duplicate pair could be dropped.
pairs = [('k', 5), ('s', 3), ('s', 4), ('p', 7), ('p', 5), ('t', 8), ('k', 6)]
data = sc.parallelize(pairs, 3)

# groupByKey gathers all values of a key into one iterable per key
group = data.groupByKey().collect()
print(group)

## Print Output
# The grouped values print as ResultIterable objects above, so materialize them as lists
for key, values in group:
    print(key, list(values))

[('s', <pyspark.resultiterable.ResultIterable object at 0x7fb9b8fca4e0>), ('t', <pyspark.resultiterable.ResultIterable object at 0x7fb9b8fcaa58>), ('k', <pyspark.resultiterable.ResultIterable object at 0x7fb9b8fcac88>), ('p', <pyspark.resultiterable.ResultIterable object at 0x7fb9b8fca240>)]
s [4, 3]
t [8]
k [5, 6]
p [5, 7]


### reduce by key

In [29]:
#reduce by key
# (letter, 1) pairs spread over 3 partitions
letter_pairs = [("a", 1), ("b", 1), ("a", 1), ("a", 1),
                ("b", 1), ("b", 1), ("b", 1), ("b", 1)]
x = sc.parallelize(letter_pairs, 3)

# reduceByKey merges the values of each key with the given function (here: a sum)
y = x.reduceByKey(lambda total, value: total + value)
print(y.collect())

[('b', 5), ('a', 3)]


# What is lazy evaluation?

In [30]:

# Render an illustration fetched from a remote URL (Image comes from IPython.display, imported in the first cell)
Image(url="http://static1.tothenew.com/blog/wp-content/uploads/2015/02/image.png")

In [34]:
# Transformations only describe the computation; no log lines are processed here.
inputRDD = sc.textFile("log.txt")

# One filtered RDD per kind of line.
# NOTE(review): the three patterns are inconsistent ("INFO", "ERRORS", "warning") -
# confirm they match the casing and wording actually used in log.txt.
InfoRDD = inputRDD.filter(lambda line: "INFO" in line)
errorsRDD = inputRDD.filter(lambda line: "ERRORS" in line)
warningsRDD = inputRDD.filter(lambda line: "warning" in line)

# union() needs the partitions of its inputs, so a missing log.txt already fails at this line
badLinesRDD = errorsRDD.union(warningsRDD)

Py4JJavaError: An error occurred while calling o326.union.
: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/home/alinemati/Yandex.Disk/Nirvana/DATA/18- Hadoop & Spark Introduction OPEN/log.txt
	at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:287)
	at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:229)
	at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:315)
	at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:204)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
	at org.apache.spark.api.python.PythonRDD.getPartitions(PythonRDD.scala:55)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
	at org.apache.spark.SparkContext$$anonfun$union$1$$anonfun$31.apply(SparkContext.scala:1314)
	at org.apache.spark.SparkContext$$anonfun$union$1$$anonfun$31.apply(SparkContext.scala:1314)
	at scala.collection.TraversableLike$$anonfun$filterImpl$1.apply(TraversableLike.scala:248)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at scala.collection.TraversableLike$class.filterImpl(TraversableLike.scala:247)
	at scala.collection.TraversableLike$class.filter(TraversableLike.scala:259)
	at scala.collection.AbstractTraversable.filter(Traversable.scala:104)
	at org.apache.spark.SparkContext$$anonfun$union$1.apply(SparkContext.scala:1314)
	at org.apache.spark.SparkContext$$anonfun$union$1.apply(SparkContext.scala:1313)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.SparkContext.withScope(SparkContext.scala:699)
	at org.apache.spark.SparkContext.union(SparkContext.scala:1313)
	at org.apache.spark.SparkContext$$anonfun$union$2.apply(SparkContext.scala:1325)
	at org.apache.spark.SparkContext$$anonfun$union$2.apply(SparkContext.scala:1325)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.SparkContext.withScope(SparkContext.scala:699)
	at org.apache.spark.SparkContext.union(SparkContext.scala:1324)
	at org.apache.spark.rdd.RDD$$anonfun$union$1.apply(RDD.scala:603)
	at org.apache.spark.rdd.RDD$$anonfun$union$1.apply(RDD.scala:603)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.union(RDD.scala:602)
	at org.apache.spark.api.java.JavaRDD.union(JavaRDD.scala:159)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:745)


In [None]:
# Lazy evaluation: these are only RDD objects; their data becomes visible only once an action is run
(InfoRDD, warningsRDD, badLinesRDD)

In [None]:
# collect() is an action: it forces the filter/union pipeline to run and returns the matching lines
badLinesRDD.collect()

# Actions
We’ve seen how to create RDDs from each other with transformations, but at some point, we’ll want to actually do something with our dataset. Actions are the second type of RDD operation. They are the operations that return a final value to the driver program or write data to an external storage system. Actions force the evaluation of the transformations required for the RDD they were called on, since they need to actually produce output.

* count
* first



In [None]:
# Show a local illustration; the PNG must be present next to the notebook
Image("./main-qimg-8150cb46c3eae0445fce8785b7c7b024.png",height=150,width=150)

In [None]:
lines = sc.textFile("sample.txt")

# count(): number of lines in the file
count = lines.count()
print("Number of lines   ::", count)

# first(): the first line
first_line = lines.first()
print("first line:       ::", first_line)

# take(2): the first two lines, in file order.
# (top(2) would instead return the two largest lines in descending sort order.)
first_2 = lines.take(2)
print("Top 2 lines       ::", first_2)

# takeSample(): three random lines without replacement; differs from run to run
sample = lines.takeSample(False, 3)
print("Sample            ::", sample)

# takeSample() with a fixed seed: the same three lines on every run
sample = lines.takeSample(False, 3, seed=0)
print("Sample with seed  ::", sample)

In [None]:
# collect() is an action: it brings every element of the RDD back to the driver as a list
data = sc.parallelize([1, 2, 3, 1, 1, 1])
data.collect()

# Extra few things

In [None]:
# flatMap: every input element is emitted three times, flattened into one list
tripled = sc.parallelize([1, 2, 3]).flatMap(lambda value: [value] * 3)
data2 = tripled.collect()
data2

In [None]:
# count(): number of elements in `nums` (the RDD most recently assigned to that name in an earlier cell)
nums.count()

In [None]:
# countByValue(): how many times each distinct value occurs in `nums`
nums.countByValue()

In [None]:
# cache() marks the RDD for reuse; take(2) is the action that returns its first two elements
take_ = sc.parallelize([2, 3, 4, 5, 6]).cache().take(2)
# Display the result - the assignment alone produces no cell output
take_

In [None]:
# takeOrdered(4): the four smallest elements of `nums`, in ascending order
nums.takeOrdered(4)

In [None]:
# Three random elements without replacement; the fixed seed makes the draw repeatable
nums.takeSample(withReplacement=False, num=3, seed=0)

The reduce() function takes two elements of the RDD as input and produces an output of the same type as the input elements. The simplest example of such a function is addition: we can add up the elements of an RDD or count the number of words. The function passed to reduce() must be commutative and associative.

In [None]:
# reduce(): combine all elements pairwise with the given function - here, their sum
nums.reduce(lambda total, value: total + value)

fold() accepts an additional parameter, the zero (initial) value, which is used as the starting accumulator for each partition and once more when the per-partition results are combined.

The signature of the fold() is like reduce(). Besides, it takes “zero value” as input, which is used for the initial call on each partition. But, the condition with zero value is that it should be the identity element of that operation. The key difference between fold() and reduce() is that, reduce() throws an exception for empty collection, but fold() is defined for empty collection.

In [None]:
from operator import add

values = sc.parallelize([1, 2, 3, 4, 5])

# 0 is the identity for addition, so this is simply the sum of the elements
fold_identity = values.fold(0, add)
# 1 is NOT the identity: it is added once per partition and once more when the
# partition results are merged, so the result depends on the number of partitions
fold_non_identity = values.fold(1, add)

# Show both results; previously the first one was computed and silently discarded
fold_identity, fold_non_identity

In [None]:
# JOIN: pair up the values of matching keys from two key/value RDDs
left_pairs = [("spark", 1), ("hadoop", 4)]
right_pairs = [("spark", 2), ("hadoop", 5)]
x = sc.parallelize(left_pairs)
y = sc.parallelize(right_pairs)
joined = x.join(y)
joined.collect()