## PySpark
 
 * Python supports Object-oreinted, Array based, Asychonous based and Function based programming.
 * To deal with Big Data concepts in Python, We use pySpark library.
 * PySpark uses functional programing which allows to distribute functions in clustering esily. 
 * No need to mainttain extrnal state like Global as functional always returns result.
 * Supports anonymous functions-> Lamda fuunctions.



In [1]:
X=['Python','is','awesome','programming']

print(sorted(X))

['Python', 'awesome', 'is', 'programming']


 * lambda functions in Python are defined inline and are limited to a single expression.
 * The key parameter to sorted is called for each item in the iterable. This makes the sorting case-insensitive by changing all the strings to lowercase before the sorting takes place.

* This is a common use-case for lambda functions, small anonymous functions that maintain no external state.

* Other common functional programming functions exist in Python as well, such as filter(), map(), and reduce(). All these functions can make use of lambda functions or standard functions defined with def in a similar manner.

In [2]:
print(sorted(X,key= lambda ar:ar.lower())) # lambda functions 

['awesome', 'is', 'programming', 'Python']


# filter(), map(), and reduce()
* The built-in filter(), map(), and reduce() functions are all common in functional programming. You’ll soon see that these concepts can make up a significant portion of the functionality of a PySpark program.

* It’s important to understand these functions in a core Python context. Then, you’ll be able to translate that knowledge into PySpark programs and the Spark API.

* filter() filters items out of an iterable based on a condition, typically expressed as a lambda function:
* filter() takes an iterable, calls the lambda function on each item, and returns the items where the lambda returned True.
* Calling list() is required because filter() is also an iterable. filter() only gives you the values as you loop over them. list() forces all the items into memory at once instead of having to use a loop.

In [3]:
seq = [0, 1, 2, 3, 5, 8, 13] 
  
# result contains odd numbers of the list 
result = filter(lambda x: x % 2, seq) 

print(list(result))

[1, 3, 5, 13]


* map() is similar to filter() in that it applies a function to each item in an iterable, but it always produces a 1-to-1 mapping of the original items. The new iterable that map() returns will always have the same number of elements as the original iterable, which was not the case with filter():

In [4]:
print(list(map(lambda ar: ar.upper(),X)))

['PYTHON', 'IS', 'AWESOME', 'PROGRAMMING']


In [5]:
my_strings = ['a', 'b', 'c', 'd', 'e']
my_numbers = [1,2,3,4,5]

results = list(map(lambda x, y: (x, y), my_strings, my_numbers))

print(results)

[('a', 1), ('b', 2), ('c', 3), ('d', 4), ('e', 5)]


* the function being applied can be a standard Python function created with the def keyword or a lambda function.

* However, reduce() doesn’t return a new iterable. Instead, reduce() uses the function called to reduce the iterable to a single value:

In [6]:
from functools import reduce
l=[1,2,3,4,5,6,7,8,9,0]
print(reduce(lambda x,y: x+y,l))

45


* Sets are another common piece of functionality that exist in standard Python and is widely useful in Big Data processing. Sets are very similar to lists except they do not have any ordering and cannot contain duplicate values. You can think of a set as similar to the keys in a Python dict.

## Starting with PySpark

* Install pyspark : pip install pyspark

* PySpark program isn’t that much different from a regular Python program, but the execution model can be very different from a regular Python program, especially if you’re running on a cluster.

* There can be a lot of things happening behind the scenes that distribute the processing across multiple nodes if you’re on a cluster. However, for now, think of the program as a Python program that uses the PySpark library.

* We will take one sample problem statement . Read a file and print words count

* Now that you’ve seen some common functional concepts that exist in Python as well as a simple PySpark program, it’s time to dive deeper into Spark and PySpark.

In [7]:
import pyspark

#Create a SparkContext using every core of the local machine
sc= pyspark.SparkContext("local[*]")

# loda each line of the data file into RDD
rdd = sc.textFile("E:\SparkScala\pyspark.txt")

print("Total words count :: ", rdd.count())


print(rdd.filter(lambda line: "python" in line.lower()))



Total words count ::  8
PythonRDD[3] at RDD at PythonRDD.scala:53


In [10]:

python_lines = rdd.filter(lambda line: "python" in line.lower())
print(python_lines.count())


2


* import os
* os.environ
* Check environment variables are set properly. SPARK_HOME,PYHTON_PATH

* Spark is a generic engine for processing large amounts of data.

* Spark is written in Scala and runs on the JVM. Spark has built-in components for processing streaming data, machine learning, graph processing, and even interacting with data via SQL.

* Spark is implemented in Scala, a language that runs on the JVM, so how can you access all that functionality via Python?

* PySpark is the answer.

* The current version of PySpark is 2.4.3 and works with Python 2.7, 3.3, and above.

* You can think of PySpark as a Python-based wrapper on top of the Scala API. This means you have two sets of documentation to refer to:

    * PySpark API documentation
    * Spark Scala API documentation
    
* The PySpark API docs have examples, but often you’ll want to refer to the Scala documentation and translate the code into Python syntax for your PySpark programs. Luckily, Scala is a very readable function-based programming language.

* PySpark communicates with the Spark Scala-based API via the Py4J library. Py4J isn’t specific to PySpark or Spark. Py4J allows any Python program to talk to JVM-based code.

* There are two reasons that PySpark is based on the functional paradigm:

    * Spark’s native language, Scala, is functional-based.
    * Functional code is much easier to parallelize.
* Another way to think of PySpark is a library that allows processing large amounts of data on a single machine or a cluster of machines.

* In a Python context, think of PySpark has a way to handle parallel processing without the need for the threading or multiprocessing modules. All of the complicated communication and synchronization between threads, processes, and even different CPUs is handled by Spark.

## PySpark API and Data Structures

* To interact with PySpark, you create specialized data structures called Resilient Distributed Datasets (RDDs).

* RDDs hide all the complexity of transforming and distributing your data automatically across multiple nodes by a scheduler if you’re running on a cluster.

* To better understand PySpark’s API and data structures, recall the Hello World program mentioned previously:

In [None]:
import pyspark

#Create a SparkContext using every core of the local machine
sc= pyspark.SparkContext("local[*]")

# loda each line of the data file into RDD
rdd = sc.textFile("E:\SparkScala\pyspark.txt")

print("Total words count :: ", rdd.count())
print(rdd.filter(lambda line: "python" in line.lower()))

* The entry-point of any PySpark program is a SparkContext object.  This object allows you to connect to a Spark cluster and create RDDs.
* The local[*] string is a special string denoting that you’re using a local cluster, which is another way of saying you’re running in single-machine mode. The * tells Spark to create as many worker threads as logical cores on your machine.

* Creating a SparkContext can be more involved when you’re using a cluster. To connect to a Spark cluster, you might need to handle authentication and a few other pieces of information specific to your cluster. You can set up those details similarly to the following: using SparkConf() object.

        conf = pyspark.SparkConf()
        conf.setMaster('spark://head_node:56887')
        conf.set('spark.authenticate', True)
        conf.set('spark.authenticate.secret', 'secret-key')
        sc = SparkContext(conf=conf)

## RDD Creation 

## parallelize():

* You can create RDDs in a number of ways, but one common way is the PySpark parallelize() function. parallelize() can transform some Python data structures like lists and tuples into RDDs, which gives you functionality that makes them fault-tolerant and distributed.

* To better understand RDDs, consider another example. The following code creates an iterator of 10,000 elements and then uses parallelize() to distribute that data into 2 partitions:



In [11]:
big_list = range(10000)
rdd = sc.parallelize(big_list, 2)
odds = rdd.filter(lambda x: x % 2 != 0)
odds.take(5)

[1, 3, 5, 7, 9]

* In the above example , Created RDD using sc.parallelize method. Used, 10000 items with clusters as 2. 

* parallelize() turns that iterator into a distributed set of numbers and gives you all the capability of Spark’s infrastructure.

* Notice that this code uses the RDD’s filter() method instead of Python’s built-in filter(), which you saw earlier. The result is the same, but what’s happening behind the scenes is drastically different. By using the RDD filter() method, that operation occurs in a distributed manner across several CPUs or computers.

* Again, imagine this as Spark doing the multiprocessing work for you, all encapsulated in the RDD data structure.

* take() is a way to see the contents of your RDD, but only a small subset. take() pulls that subset of data from the distributed system onto a single machine

* take() is important for debugging because inspecting your entire dataset on a single machine may not be possible. RDDs are optimized to be used on Big Data so in a real world scenario a single machine may not have enough RAM to hold your entire dataset.

* Note: Spark temporarily prints information to stdout when running examples like this in the shell, which you’ll see how to do soon. Your stdout might temporarily show something like [Stage 0:> (0 + 1) / 1].

* The stdout text demonstrates how Spark is splitting up the RDDs and processing your data into multiple stages across different CPUs and machines.

## Using TextFile:

* Another way to create RDDs is to read in a file with textFile(), which you’ve seen in previous examples. RDDs are one of the foundational data structures for using PySpark so many of the functions in the API return RDDs.


* One of the key distinctions between RDDs and other data structures is that processing is delayed until the result is requested. This is similar to a Python generator. Developers in the Python ecosystem typically use the term lazy evaluation to explain this behavior.


* You can stack up multiple transformations on the same RDD without any processing happening. This functionality is possible because Spark maintains a directed acyclic graph of the transformations. The underlying graph is only activated when the final results are requested. In the previous example, no computation took place until you requested the results by calling take().


* There are multiple ways to request the results from an RDD. You can explicitly request results to be evaluated and collected to a single cluster node by using collect() on a RDD. You can also implicitly request the results in various ways, one of which was using count() as you saw earlier.


* Note: Be careful when using these methods because they pull the entire dataset into memory, which will not work if the dataset is too big to fit into the RAM of a single machine.

## Cluster
* You can use the spark-submit command installed along with Spark to submit PySpark code to a cluster using the command line. This command takes a PySpark or Scala program and executes it on a cluster. This is likely how you’ll execute your real Big Data processing jobs.

* Note: The path to these commands depends on where Spark was installed and will likely only work when using the referenced Docker container.

* To run the Hello World example (or any PySpark program) with the running Docker container, first access the shell as described above. Once you’re in the container’s shell environment you can create files using the nano text editor.

* To create the file in your current folder, simply launch nano with the name of the file you want to create:

* $ nano hello_world.py

* Finally, you can run the code through Spark with the pyspark-submit command:

* $ /usr/local/spark/bin/spark-submit hello_world.py

* This command results in a lot of output by default so it may be difficult to see your program’s output. You can control the log verbosity somewhat inside your PySpark program by changing the level on your SparkContext variable. To do that, put this line near the top of your script:

* sc.setLogLevel('WARN')
* This will omit some of the output of spark-submit so you can more clearly see the output of your program. However, in a real-world scenario, you’ll want to put any output into a file, database, or some other storage mechanism for easier debugging later.

* Luckily, a PySpark program still has access to all of Python’s standard library, so saving your results to a file is not an issue:



In [1]:
import pyspark
sc = pyspark.SparkContext('local[*]')

txt = sc.textFile("E:\SparkScala\pyspark.txt")
python_lines = txt.filter(lambda line: 'python' in line.lower())

with open('results.txt', 'w') as file_obj:
    file_obj.write(f'Number of lines: {txt.count()}\n')
    file_obj.write(f'Number of lines with python: {python_lines.count()}\n')

## Inspect  SparkContext


In [23]:
print(sc.version) # Retrieve SparkContext version
print(sc.pythonVer) # Retrieve Python version
print(sc.master) # Master URL to connect to
print(str(sc.sparkHome)) # Path where Spark is installed on worker nodes
print(str(sc.sparkUser()))# Retrieve name of the Spark User running SparkContext
print(sc.appName) # Return application name
print(sc.applicationId) # Retrieve application ID
print(sc.defaultParallelism)# Return default level of parallelism
print(sc.defaultMinPartitions) # Default minimum number of partitions for

3.0.0-preview
3.7
local[*]
None
Lucky
pyspark-shell
local-1586862692744
4
2


## Configurations SparkConf

In [None]:

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
.setMaster("local")
.setAppName("My app")
.set("spark.executor.memory", "1g"))

sconfig = SparkContext(conf = conf)

print(sc.appName)

## Loading Data :: parallelize colletions

In [11]:
rdd = sc.parallelize([('a',7),('a',2),('b',2)])
print("rdd", rdd.count())
rdd2 = sc.parallelize([('a',2),('d',1),('b',1)])
print("rdd2", rdd2.count())
rdd3 = sc.parallelize(range(100))
print("rdd3", rdd3.count())
rdd4 = sc.parallelize([("a",["x","y","z"]),
("b",["p", "r"])])
print("rdd4", rdd4.count())

rdd 3
rdd2 3
rdd3 100
rdd4 2


## Loading data :: External

* Read either one text file from HDFS, a local file system or or any Hadoop-supported file system URI with textFile(),  or read in a directory of text files with wholeTextFiles().

        sc.textFile("path")
        sc.wholeTextFiles("directory")

## RDD functions

In [12]:
print(rdd.getNumPartitions()) # List the number of partitions
print(rdd.count())   # Count RDD instances
print(rdd.countByKey())  # Count RDD instances by key

print(rdd.countByValue()) # Count RDD instances by value

print(rdd.collectAsMap()) # Return (key,value) pairs as a

print(rdd3.sum()) # Sum of RDD elements

print(sc.parallelize([]).isEmpty())  #Check whether RDD is empty


4
3
defaultdict(<class 'int'>, {'a': 2, 'b': 1})
defaultdict(<class 'int'>, {('a', 7): 1, ('a', 2): 1, ('b', 2): 1})
{'a': 2, 'b': 2}
4950
True


In [13]:
print(rdd3.max()) # Maximum value of RDD elements
print(rdd3.min()) # Minimum value of RDD elements

print(rdd3.mean()) # Mean value of RDD elements

print(rdd3.stdev()) # Standard deviation of RDD elements

print(rdd3.variance())  # Compute variance of RDD elements
print(rdd3.histogram(3) ) #Compute histogram by bins

print(rdd3.stats()) # Summary statistics (count, mean, stdev, max &

99
0
49.5
28.86607004772212
833.25
([0, 33, 66, 99], [33, 33, 34])
(count: 100, mean: 49.5, stdev: 28.86607004772212, max: 99.0, min: 0.0)


In [14]:
## Applying Functions

rdd.map(lambda x: x+(x[1],x[0])).collect() #Apply a function to each RDD element
rdd5 = rdd.flatMap(lambda x: x+(x[1],x[0])) #Apply a function to each RDD element and flatten the result
rdd5.collect()
rdd4.flatMapValues(lambda x: x).collect() #Apply a flatMap function to each (key,value) pair of rdd4 without changing the keys

[('a', 'x'), ('a', 'y'), ('a', 'z'), ('b', 'p'), ('b', 'r')]

In [15]:
# Getting
print(rdd.collect()) # Return a list with all RDD elements

print(rdd.take(2)) # Take first 2 RDD elements

print(rdd.first()) # Take first RDD element

print(rdd.top(2)) # Take top 2 RDD elements

## Sampling
print(rdd3.sample(False, 0.15, 81).collect()) # Return sampled subset of rdd3

## Filtering
print(rdd.filter(lambda x: "a" in x).collect()) # Filter the RDD


print(rdd5.distinct().collect()) # Return distinct RDD values

print(rdd.keys().collect()) # Return (key,value) RDD's keys

[('a', 7), ('a', 2), ('b', 2)]
[('a', 7), ('a', 2)]
('a', 7)
[('b', 2), ('a', 7)]
[3, 4, 26, 30, 39, 40, 41, 42, 52, 63, 76, 79, 80, 86, 97]
[('a', 7), ('a', 2)]
['b', 'a', 2, 7]
['a', 'a', 'b']


In [16]:
## Iterating

def g(x): print(x)

rdd.foreach(g)


In [None]:
## Reducing
rdd.reduceByKey(lambda x,y : x+y).collect() # Merge the rdd values for each key

rdd.reduce(lambda a, b: a + b)  # Merge the rdd values

## Grouping by
rdd3.groupBy(lambda x: x % 2).mapValues(list).collect()      # Return RDD of grouped values

 
rdd.groupByKey().mapValues(list).collect()           # Group rdd by key

##Aggregating

seqOp = (lambda x,y: (x[0]+y,x[1]+1))
combOp = (lambda x,y:(x[0]+y[0],x[1]+y[1]))
rdd3.aggregate((0,0),seqOp,combOp)            #Aggregate RDD elements of each

rdd.aggregateByKey((0,0),seqOp,combOp).collect()        # Aggregate values of each RDD key

rdd3.fold(0,'add').collect()                              #Aggregate the elements of each 4950 partition, and then the results

rdd.foldByKey(0, 'add').collect()                         # Merge the values for each key

rdd3.keyBy(lambda x: x+x).collect()                        #Create tuples of RDD elements by applying a function


In [None]:
rdd2.sortBy(lambda x: x[1]).collect()    # Sort RDD by given function
rdd2.sortByKey().collect()               # Sort (key, value) RDD by key


In [None]:
rdd.repartition(4)                #New RDD with 4 partitions
rdd.coalesce(1)               # descreses partiion to 1

In [None]:
# Save RDD
rdd.saveAsTextFile("rdd.txt")

rdd.saveAsHadoopFile("hdfs://namenodehost/parent/child",'org.apache.hadoop.mapred.TextOutputFormat


In [None]:
## Stop SparkContext
sc.stop()
