In [None]:
import os
import sys
# Point PySpark at the cluster's Anaconda interpreter, the JDK and the Spark 2
# parcel before pyspark is imported.
# NOTE(review): these absolute paths are specific to one Cloudera CDH 5.13 /
# Spark 2.3 installation; adjust them when running anywhere else.
os.environ.update({
    "PYSPARK_PYTHON": "/opt/cloudera/parcels/Anaconda/bin/python",
    "JAVA_HOME": "/usr/java/jdk1.8.0_161/jre",
    "SPARK_HOME": "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/",
})
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"

# Make the bundled py4j and pyspark archives importable. Each insert goes to
# the front of sys.path, so pyspark.zip ends up ahead of py4j.
for archive in ("/py4j-0.10.6-src.zip", "/pyspark.zip"):
    sys.path.insert(0, os.environ["PYLIB"] + archive)

In [None]:
from pyspark.sql import SparkSession

In [None]:
# Start (or reuse) a local-mode SparkSession named 'demo' and display it.
spark = (
    SparkSession.builder
    .master("local")
    .appName('demo')
    .getOrCreate()
)
spark

In [None]:
# The SparkContext behind the session; all RDD operations below are called on it.
sc = spark.sparkContext

In [None]:
# Display the SparkContext's notebook representation.
sc

In [None]:
# Creating RDDs
# There are three ways to create an RDD in Spark:
#   1. parallelize a local Python collection (shown here)
#   2. read an external file (sc.textFile, below)
#   3. transform an existing RDD (map, filter, ..., below)

# Distribute the list [1..5] across 3 partitions.
rdd1 = sc.parallelize(
    [1, 2, 3, 4, 5],
    numSlices=3,
)

In [None]:
# collect(): return a list that contains all of the elements in this RDD.
# NOTE: only use this when the result is expected to be small, because all of
# the data is loaded into the driver's memory.
rdd1.collect()

In [None]:
# count(): return the number of elements in the RDD.
rdd1.count()

In [None]:
# Find the number of partitions of rdd1 (it was created with 3 slices).
rdd1.getNumPartitions()

In [None]:
# glom(): coalesce all elements within each partition into a list, so the
# collected result has one inner list per partition.
rdd1.glom().collect()

In [None]:
# Save the RDD above as text (using each element's string representation)
# into the 'upgrad_folder' output directory.
#
# saveAsTextFile raises an error if the output directory already exists, so
# this cell used to fail on every "Run All" after the first. Check for the
# directory first and skip the write, so the notebook stays re-runnable without
# deleting any existing data.
# NOTE: sc._jvm / sc._jsc are PySpark internals (there is no public Python API
# for the Hadoop FileSystem). They work in Spark 2.x but are not guaranteed.
_hadoop_fs = sc._jvm.org.apache.hadoop.fs
_output_path = _hadoop_fs.Path('upgrad_folder')
if _output_path.getFileSystem(sc._jsc.hadoopConfiguration()).exists(_output_path):
    print("'upgrad_folder' already exists; skipping saveAsTextFile")
else:
    rdd1.saveAsTextFile('upgrad_folder')

In [None]:
# Create an RDD by reading an external text file: each line becomes one element.
# NOTE(review): a relative path is resolved against Spark's default filesystem
# (presumably the user's HDFS home directory on this cluster), so upgrad.txt
# must exist there. Confirm the location before running.
rddnew = sc.textFile(name="upgrad.txt")
rddnew.collect()


In [None]:
# Count the number of records (lines) read from the file.
rddnew.count()

In [None]:
# map(): return a new RDD by applying a function to every element.
# Here the function is an anonymous (lambda) function.
rdd = sc.parallelize(["b", "a", "c"])
rdd_upper = rdd.map(lambda letter: letter.upper())
rdd_upper.collect()
    

In [None]:
# map() with a regular, named function instead of a lambda.
def upper_case(v):
    """Return ``v`` upper-cased (``v`` is expected to be a string)."""
    return v.upper()


rdd = sc.parallelize(["b", "a", "c"])
rdd.map(upper_case).collect()

In [None]:
# filter(): return a new RDD containing only the elements that satisfy a
# predicate. Here the predicate keeps the even numbers.
rdd = sc.parallelize([1, 2, 3, 4, 5])
rdd2 = rdd.filter(lambda n: not n % 2)
rdd2.collect()

In [None]:
# distinct(): return a new RDD containing the distinct elements of this RDD.
# The order of the collected result is not guaranteed (see the next cell).
sc.parallelize([1, 1, 2, 3]).distinct().collect()

In [None]:
# The output of distinct() comes back in an arbitrary order, not a sorted one.
# To get sorted values, sort the collected list in the driver with Python's
# sorted().
distinct_values = sc.parallelize([1, 4, 2, 3, 2]).distinct().collect()
sorted(distinct_values)

In [None]:
# union(): return the union of this RDD and another one.
# Unlike intersection() in the next cell, union() does not remove duplicates.
rdd = sc.parallelize([1, 1, 2, 3])
rdd_union = rdd.union(rdd)
rdd_union.collect()

In [None]:
# intersection()
# Return the intersection of this RDD and another one.
# The output will not contain any duplicate elements, even if the input RDDs did.
# NOTE: this method performs a shuffle internally.
#
# Separate names are used instead of rdd1/rdd2. rdd1 is the 3-partition RDD
# that the collect/glom/saveAsTextFile cells operate on; overwriting it here
# would make those cells silently use different data if they are re-run.
left_rdd = sc.parallelize([1, 10, 2, 3, 4, 5])
right_rdd = sc.parallelize([1, 6, 2, 3, 7, 8])
sorted(left_rdd.intersection(right_rdd).collect())

In [None]:
# subtract(): return each value in this RDD that is not contained in the other.
x = sc.parallelize([1, 2, 3, 4, 5])
y = sc.parallelize([2, 3, 4])
remaining = x.subtract(y)
sorted(remaining.collect())

In [None]:
# cartesian(): return the Cartesian product of this RDD and another one, i.e.
# the RDD of all pairs (a, b) where a is in this RDD and b is in the other.
rdd = sc.parallelize([1, 2])
rdd2 = sc.parallelize([3, 4])
pairs = rdd.cartesian(rdd2)
sorted(pairs.collect())

In [None]:
###### Action functions ###########

In [None]:
# collect(): return a list that contains all of the elements in this RDD.
# NOTE: only use this when the result is expected to be small, because all of
# the data is loaded into the driver's memory.
rdd = sc.parallelize(list(range(1, 5)))
rdd.collect()

In [None]:
# count(): return the number of elements in this RDD.
sc.parallelize([2, 3, 4]).count()

In [None]:
# countByValue(): return the count of each unique value in this RDD as a
# dictionary mapping value -> count.
# Do not rely on the dictionary's iteration order. Sort its items (as the
# countByKey cell does) for a stable, readable result.
sorted(sc.parallelize([1, 2, 1, 2, 2], 2).countByValue().items())

In [None]:
# take(num): return the first `num` elements of the RDD.
# Spark scans one partition first and uses that result to estimate how many
# more partitions it must read to satisfy the limit.
# NOTE: only use this when `num` is small, because the returned elements are
# loaded into the driver's memory.
sc.parallelize([2, 3, 4, 5, 6]).take(num=4)


In [None]:
# top(num): get the N largest elements of an RDD, returned as a list sorted in
# descending order.
# NOTE: only use this when N is small, because the result is loaded into the
# driver's memory.
sc.parallelize([2, 3, 4, 5, 6], 2).top(num=2)

In [None]:
# reduce(function)
# Reduce the elements of this RDD using the specified commutative and
# associative binary operator. Each partition is reduced locally first, and
# the partial results are then combined. An operator that is not associative
# and commutative (e.g. division) therefore gives results that depend on how
# the data is partitioned.
from operator import add

# Only the last expression of a cell is displayed, so both results are
# returned together as a tuple. Previously the first result was discarded.
(
    sc.parallelize([1, 2, 3, 4, 5]).reduce(add),
    # Any custom function can be passed in too. "Max of two" is commutative
    # and associative, so it is safe to use with reduce.
    sc.parallelize([9, 3, 1]).reduce(lambda x, y: x if x > y else y),
)

In [None]:
# fold(zeroValue, op)
# Aggregate the elements of each partition, and then the results for all the
# partitions, using the given associative function and a neutral "zero value".
# The zero value may be applied more than once (per partition and when
# merging), so it must be the identity of `op`: 0 for +, 1 for *.
# op(t1, t2) may modify t1 and return it to avoid object allocation, but it
# must not modify t2.
from operator import add  # imported here so this cell also runs on its own

# Only the last expression of a cell is displayed, so both results are
# returned together. Previously the first result was discarded.
(
    sc.parallelize([1, 2, 3, 4, 5]).fold(0, add),
    sc.parallelize([1, 2, 3, 4, 5]).fold(1, lambda x, y: x * y),
)

In [None]:
# aggregate(zeroValue, seqOp, combOp)
# Aggregate the elements of each partition, and then the results for all the
# partitions, using the given combine functions and a neutral "zero value".
# The functions may modify their first argument and return it to avoid object
# allocation, but they must not modify the second argument.
#
# seqOp may return a result type U that differs from the RDD's element type T.
# Two operations are therefore needed: seqOp merges a T into a U, and combOp
# merges two Us. Here U is a (running_sum, running_count) tuple.
rdd = sc.parallelize([1, 2, 3, 4])
# Fold one element into a partition's accumulator. Starting from (0, 0), a
# partition holding 1, 2, 3, 4 goes through (1, 1), (3, 2), (6, 3), (10, 4).
seqOp = (lambda x, y: (x[0] + y, x[1] + 1))
# Merge two partitions' accumulators by adding their sums and their counts.
combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1]))
rdd.aggregate((0, 0), seqOp, combOp)  # (sum, count) of the elements



In [None]:
# foreach(f): apply a function to every element for its side effects only;
# nothing is returned to the driver.
# NOTE(review): the function runs in the Spark worker processes, so its print()
# output presumably goes to the workers' stdout (the console/log where Jupyter
# or the executor runs) rather than to this notebook cell.
def print_element(x):
    """Print a single RDD element (executed on a worker, not the driver)."""
    print(x)


rdd = sc.parallelize([1, 2, 3, 4, 5])
rdd.foreach(print_element)




In [None]:
#####   operations on paired rdd

In [None]:
# Creating paired RDDs
# Pair-RDD operations expect every element to be a (key, value) tuple, so map
# each element to one. Here the tuple is (letter, 1).
rdd = sc.parallelize(["b", "a", "c"])
pairedrdd = rdd.map(lambda key: (key, 1))
pairedrdd.collect()


In [None]:
####### Transformation functions on a single paired RDD  #######

In [None]:
# reduceByKey(func): merge the values for each key using an associative and
# commutative reduce function.
# Values are also merged locally on each mapper before results are sent to a
# reducer, similar to a "combiner" in MapReduce.
# The output is hash-partitioned into numPartitions partitions, or into the
# default parallelism level when numPartitions is not specified.
from operator import add

rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
counts = rdd.reduceByKey(add)
sorted(counts.collect())

In [None]:
# groupByKey(): group the values for each key in the RDD into a single
# sequence, hash-partitioning the result into numPartitions partitions.
# NOTE: if you are grouping to aggregate per key (e.g. a sum or an average),
# reduceByKey or aggregateByKey performs much better.
#
# The grouped values are ResultIterable objects whose repr only shows a memory
# address, so convert each one to a list to make the output readable.
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
sorted(rdd.groupByKey().mapValues(list).collect())

In [None]:
# mapValues(f): pass each value of a key-value pair RDD through a map function
# without changing the keys. This also retains the original RDD's partitioning.
# The built-in len is passed directly, so no top-level helper (previously a
# generic `f`) is needed.
x = sc.parallelize([("a", ["Vishwa", "Mohan", "Rishavv"]), ("b", ["Abhinav"])])
x.mapValues(len).collect()

In [None]:
# flatMapValues(f): pass each value of a key-value pair RDD through a flatMap
# function without changing the keys. This also retains the original RDD's
# partitioning. Each value here is already a list, so an identity function
# flattens it into one (key, name) pair per name.
x = sc.parallelize([("a", ["Vishwa", "Mohan", "Rishavv"]), ("b", ["Abhinav", "Amit"])])
x.flatMapValues(lambda names: names).collect()


In [None]:
# keys(): return an RDD with the key (first item) of each tuple.
m = sc.parallelize([(1, 2), (3, 4)]).keys()
m.collect()


In [None]:
# values(): return an RDD with the value (second item) of each tuple.
m = sc.parallelize([(1, 2), (3, 4)]).values()
m.collect()

In [None]:
# sortByKey()
# Sort this RDD, which is assumed to consist of (key, value) pairs, by key.
# Keyword arguments spell out what the positional (True, 1) meant: ascending
# order, with the result in a single partition.
produce_counts = [('Apple', 11), ('Banana', 12), ('Mango', 13), ('Carrot', 14), ('Orange', 15)]
sc.parallelize(produce_counts).sortByKey(ascending=True, numPartitions=1).collect()


In [None]:
########    Transformation functions on two paired RDD    ##########


In [None]:
# subtractByKey(other): return each (key, value) pair of this RDD whose key
# has no matching pair in the other RDD.
x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 2)])
y = sc.parallelize([("a", 3), ("c", None)])
without_matches = x.subtractByKey(y)
sorted(without_matches.collect())

In [None]:
# join()
# Return an RDD containing all pairs of elements with matching keys in self
# and other.
# Each pair of elements is returned as a (k, (v1, v2)) tuple, where (k, v1)
# is in self and (k, v2) is in other.
# Performs a hash join across the cluster.
# Here key "a" matches twice in y, so it appears twice; key "b" has no match
# in y and is dropped.
x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2), ("a", 3)])
sorted(x.join(y).collect())

In [None]:
# rightOuterJoin()
# Perform a right outer join of self and other.
# For each element (k, w) in other, the resulting RDD will either contain all
# pairs (k, (v, w)) for v in self, or the pair (k, (None, w)) if no elements
# in self have key k.
# Hash-partitions the resulting RDD into the given number of partitions.
# Here self is rdd2, so key "b" (only in rdd1) shows the (k, (None, w)) case.
rdd1 = sc.parallelize([("a", True), ("b", True)])
rdd2 = sc.parallelize([("a", False)])
sorted(rdd2.rightOuterJoin(rdd1).collect())


In [None]:
# leftOuterJoin()
# Perform a left outer join of self and other.
# For each element (k, v) in self, the resulting RDD will either contain all
# pairs (k, (v, w)) for w in other, or the pair (k, (v, None)) if no elements
# in other have key k.
# Hash-partitions the resulting RDD into the given number of partitions.
#
# rdd1 is used as self so that key "b" (absent from rdd2) shows the
# (k, (v, None)) case. With rdd2 as self every key matches, and the None case
# never appears.
rdd1 = sc.parallelize([("a", True), ("b", True)])
rdd2 = sc.parallelize([("a", False)])
sorted(rdd1.leftOuterJoin(rdd2).collect())

In [None]:
# cogroup()
# For each key k in self or other, return a resulting RDD that contains a
# tuple with the list of values for that key in self as well as other.
rdd1 = sc.parallelize([("a", True), ("b", True)])
rdd2 = sc.parallelize([("a", False)])

# rdd1.cogroup(rdd2).collect() returns the grouped values as ResultIterable
# objects, which only display as memory addresses:
# [('a',
#   (<pyspark.resultiterable.ResultIterable at 0x7f5c5c0dfed0>,
#    <pyspark.resultiterable.ResultIterable at 0x7f5c5c1c1450>)),
#  ('b',
#   (<pyspark.resultiterable.ResultIterable at 0x7f5c5c07bbd0>,
#    <pyspark.resultiterable.ResultIterable at 0x7f5c5c07bc50>))]
#
# Convert each ResultIterable to a list to see the actual values. The loop
# variables are named key/groups instead of reusing rdd1/rdd2: in Python 2,
# list-comprehension variables leak into the enclosing scope and would
# overwrite the RDDs defined above.
[(key, tuple(map(list, groups))) for key, groups in sorted(rdd1.cogroup(rdd2).collect())]




In [None]:
#############      Action on paired RDD   ################

In [None]:
# countByKey()
# Count the number of elements for each key and return the result to the
# driver as a dictionary. Its items are sorted for a stable, readable output.
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
sorted(rdd.countByKey().items())

In [None]:
# lookup(key): return the list of values in the RDD for the given key.
# This is done efficiently when the RDD has a known partitioner, by searching
# only the partition that the key maps to.
# NOTE(review): an RDD built with parallelize has no partitioner, so this
# lookup presumably scans every partition.
identity_pairs = [(i, i) for i in range(1000)]
rdd = sc.parallelize(identity_pairs, 10)
rdd.lookup(42)