# PySpark Recipes: Chapter 1: Introduction



Hi, welcome to the training series on PySpark. This notebook series provides recipes that you can use out of the box for your Spark implementation using the Python API for Spark, PySpark. We will continue to update the notebooks with more recipes, and we encourage others to share their implementations or solutions to business problems.

In this notebook, we introduce Spark concepts and a first look at importing data. We cover importing data here because it helps in understanding "partitioning".

In [None]:
# The code in these cells is runnable.
# Click on this cell, then press Shift+Enter to run it, 
# or click the Run button in the toolbar.

import pyspark

SparkContext <b>was</b> the main entry point for Spark functionality and represents the connection to a Spark cluster. It is used to create RDDs, accumulators and broadcast variables on that cluster.
 
<span style="color:green;"><i>Extra Gyaan: An "entry point" is defined as a point where control is transferred from operating system to the provided program.</i></span>

You can have only one SparkContext active per JVM. To create a new one, you must first call the stop() method of the active SparkContext. Older Spark releases let you relax this check by setting the <i>spark.driver.allowMultipleContexts</i> configuration flag to true (the flag was removed in Spark 3.0). However, running multiple SparkContexts is not considered good practice: multiple active contexts are hard to debug because their workflows are not "completely isolated", and the failure of one context can impact another and could break the whole JVM.

Cleaner support for multiple SparkContexts has been under consideration and might become part of a future Spark release.

As of now, REMEMBER:
- <b>Only one active SparkContext per JVM</b>.
- <b>You do not need to create it yourself if you are using Spark 2.0 or above, where SparkSession is the entry point</b>.

In [None]:
# Executing these lines is completely optional - the SparkContext would
# be created anyway via SparkSession

sparkContext = pyspark.SparkContext(appName="Pyspark Recipes")
print(sparkContext.version)

The drawback of SparkContext was that each kind of processing needed its own context. Previously we had StreamingContext for streaming data, SQLContext for SQL, and HiveContext for Hive.

If you are using Spark 2.0 or later, the Dataset API and its subset, the DataFrame API, have joined RDD as the standard APIs and basic units of data abstraction in Spark. SparkSession was introduced to handle these new APIs, and it has thus evolved into a common entry point for all the different pipelines. An instance of SparkSession is constructed with a builder that is common to all processing types (streaming, SQL); the exception is Hive, which requires a call to the enableHiveSupport() method on the builder.

While only one SparkContext can exist per JVM, one can have multiple SparkSessions. This is possible because SparkSession acts as a wrapper around SparkContext, which the builder creates implicitly without any extra configuration options.

A word about Spark Configuration
While we have .config("<configuration>","<value>") for SparkSession, we can also set the configuration directly via SparkContext.

With the traditional RDD approach, the onus of selecting the optimum optimization strategy was left to the developer. With DataFrames, Spark abstracts away the low-level methods and decides the best optimization strategy for the developer.

It is recommended that we use DataFrame in our Spark applications.

In [None]:
from pyspark.sql import SparkSession

sparkSession = SparkSession \
                .builder \
                .master("local") \
                .appName("Pyspark Recipes") \
                .getOrCreate()

# The local here means - run spark locally with one worker thread. 
# For more details - We have touched the concept of having worker threads in "Optimizing Spark"

# to get the configuration value you have to go via the sparkContext object provided by SparkSession
sparkSession.sparkContext.getConf().getAll()


Note - To see the complete list of configuration options available, visit https://spark.apache.org/docs/latest/configuration.html

On execution of the above code, you get a list of configuration properties. sparkSession.sparkContext.getConf().getAll() shows only a limited set: the properties that have been specified explicitly, plus a few that Spark fills in itself. For example, if you set "spark.driver.cores" and "spark.executor.memory" with .config(...) on the builder, they appear in the output; if you do not set them, they are not listed, even though Spark still uses their default values. Try adding lines beginning with ".config", then commenting them out, and check for yourselves.

Spark properties can mainly be classified into two types:
- Deploy-related properties, like “spark.driver.memory” and “spark.executor.instances”. These may not take effect when set programmatically through SparkConf at runtime, or their behavior may depend on the cluster manager and deploy mode you choose, so it is suggested to set them through a configuration file or spark-submit command-line options.
- Runtime-control properties, like “spark.task.maxFailures”. These can be set either way.

Do visit the link for the list of configurations provided by Spark, so that you use only appropriate configuration values when specifying them via the SparkSession object. We discuss how to use configuration optimally in <b>"Optimizing Spark"</b>, where we also include some of the most important properties that are platform (local, YARN, Mesos, Kubernetes) agnostic.

In [None]:
from pyspark import SparkConf
SparkConf().getAll()

<span style="color:green;"><i>Extra Gyaan</i></span>

While introducing SparkSession, we said that you can have multiple SparkSessions.

Does that mean each "SparkSession configuration" would be different...?
<b>NO</b>. Remember, the core configurations are set at the SparkContext level, and even though you use a SparkSession object as the entry point instead, these configurations are the same for all SparkSession objects because they share one SparkContext. Only the runtime SQL settings (along with temporary views and UDFs) are kept separately for each session.

The factory method getOrCreate() provided by Apache Spark does exactly this job: it prevents the creation of multiple SparkContexts.

In [None]:
# creating another SparkSession object

sparkSessionForWhateverPurpose = SparkSession \
                .builder \
                .master("local") \
                .appName("This would override your old AppName") \
                .getOrCreate()

sparkSession == sparkSessionForWhateverPurpose

<span style="color:green;"><i>Extra Gyaan (continued)</i></span>

Hold on: when we said that we can have multiple SparkSessions, how come the two different objects sparkSession and sparkSessionForWhateverPurpose are the same?

Turns out this is a property of the "builder": if we use it, we get back the "same" SparkSession instance.

So how do I create a new SparkSession?

In [None]:
sparkSessionTake2 = sparkSession.newSession()
sparkSession == sparkSessionTake2

<span style="color:green;"><i>More Extra Gyaan (continued)</i></span>

Why would we require multiple SparkSessions?
For example, when data is coming from two different sources. Say you want to compare two tables, a source and a destination, that live on two separate Hive servers.

However, such use cases are outliers, and you might never ever want to have multiple SparkSessions in your project. :)
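
The relationship between the builder, getOrCreate() and newSession() can be pictured with a toy model in plain Python. This is only a sketch: `ToyContext`, `ToySession` and the `app.name` key are hypothetical names invented for illustration, not Spark classes or properties, and the model ignores everything Spark does beyond sharing one context.

```python
class ToyContext:
    """Stand-in for SparkContext: holds the configuration shared by every session."""

    def __init__(self, conf):
        self.conf = dict(conf)


class ToySession:
    """Stand-in for SparkSession: a thin wrapper around one shared ToyContext."""

    _active = None  # the session handed back by get_or_create()

    def __init__(self, context):
        self.context = context

    @classmethod
    def get_or_create(cls, conf=None):
        # Reuse the existing session if there is one; otherwise build
        # a context and a session on first use.
        if cls._active is None:
            cls._active = cls(ToyContext(conf or {}))
        return cls._active

    def new_session(self):
        # A distinct session object that still shares the same context.
        return ToySession(self.context)


first = ToySession.get_or_create({"app.name": "Pyspark Recipes"})
second = ToySession.get_or_create({"app.name": "ignored in this toy model"})
third = first.new_session()

print(first is second)                 # True: the builder returned the same session
print(first is third)                  # False: new_session() made a new object
print(first.context is third.context)  # True: both sessions share one context
```

The point of the sketch is the last line: a new session is a new object, but the context (and therefore the context-level configuration) behind it is the same one.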



# Partition Configuration in Spark

There is a property that shows the default minimum number of partitions, which is determined when the SparkContext starts. Remember that we created the SparkContext via SparkSession with default parameters and specified the local master without any parameters, which means just one worker thread.

<b>Remember</b> 
- local : Run Spark locally with one worker thread (i.e. no parallelism at all).
- <b>local[*] : Run Spark locally with as many worker threads as logical cores on your machine.</b>
- local[n] : Run Spark locally with "n" worker threads (ideally, set this to the number of cores on your machine).
- local[n,f] : Run Spark locally with "n" worker threads and "f" maxFailures (see spark.task.maxFailures for an explanation of this variable)
- local[*,f] : Run Spark locally with as many worker threads as logical cores on your machine and "f" maxFailures.
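
To make the master strings listed above concrete, here is a small plain-Python sketch that reads them the way the list describes. The function `parse_local_master` is a hypothetical helper written for this notebook, not part of PySpark, and it only covers the `local...` forms shown above.

```python
import os
import re

# Pattern for "local", "local[n]", "local[*]", "local[n,f]" and "local[*,f]".
_LOCAL_MASTER = re.compile(r"^local(?:\[(\*|\d+)(?:\s*,\s*(\d+))?\])?$")


def parse_local_master(master):
    """Return (worker_threads, max_failures) for a local master string.

    max_failures is None when the string does not specify it.
    """
    match = _LOCAL_MASTER.match(master)
    if match is None:
        raise ValueError(f"not a local master string: {master!r}")
    threads, failures = match.groups()
    if threads is None:          # plain "local": one worker thread
        n_threads = 1
    elif threads == "*":         # as many threads as logical cores
        n_threads = os.cpu_count() or 1
    else:
        n_threads = int(threads)
    return n_threads, (int(failures) if failures is not None else None)


for master in ["local", "local[4]", "local[2,3]", "local[*]"]:
    print(master, "->", parse_local_master(master))
```

The value printed for `local[*]` depends on the machine the cell runs on.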

The SparkContext exposes this value as defaultMinPartitions. It can never exceed 2, and it drops to 1 when only one worker thread is available. So let us execute the code and check the output.

In [None]:
sparkSession.sparkContext.defaultMinPartitions

In this section we look at how partitioning works in Spark. For more details on partitioning, refer to the document "Optimizing Spark".

The reason we are discussing partitioning here is to demonstrate that while you can control partitioning using configuration options, it does not work the same way when you read from text files. So let's dive into partitioning (did we tell you there would be a separate notebook on partitioning? :) OK, now we are telling you: there is).

As a first step, let us create some data. You might ask - why not read? Patience, we would be there in a little while. 

In [None]:
# Let us create some data first. Don't worry about the schema; we cover it in detail elsewhere
from datetime import date, timedelta
from pyspark.sql.types import IntegerType, DateType, StringType, StructType, StructField

start_date = date(2019, 1, 1)
data = []
for i in range(0, 50):
    data.append({"Country": "CN", "Date": start_date +
                 timedelta(days=i), "Amount": 10+i})
    data.append({"Country": "AU", "Date": start_date +
                 timedelta(days=i), "Amount": 10+i})

schema = StructType([StructField('Country', StringType(), nullable=False),
                     StructField('Date', DateType(), nullable=False),
                     StructField('Amount', IntegerType(), nullable=False)])

# Uncomment the next line to display the data - 100 records (50 days x 2 countries)
#data

Now, using our default configuration of "local", let us create a dataframe and see how many partitions it would create.

In [None]:
df = sparkSession.createDataFrame(data, schema=schema)
print(df.rdd.getNumPartitions())

It created only "1" partition. What if you change the above range to 5 million and try again? Would Spark still create "1" partition, or more? Try that, be patient, and let us know the results.

Now we are going to recreate the sparkSession with different configuration values and see if the number of partitions changes. To begin with, we are going to use "local[4]", which means four worker threads.

In [None]:
# Stopping the existing spark session
sparkSession.stop()

# The only change from the earlier SparkSession initialization is the use of local[4],
# which means we are running Spark locally with four worker threads
sparkSession = SparkSession \
                .builder \
                .master("local[4]") \
                .appName("Pyspark Recipes") \
                .getOrCreate()




Let us check what impact it has on the property "defaultMinPartitions".


In [None]:
sparkSession.sparkContext.defaultMinPartitions


It shows the minimum number of partitions to be "2". Spark computes defaultMinPartitions as the smaller of the default parallelism and 2, so it is 1 when you use only one core and 2 otherwise, irrespective of how many more cores (and hence partitions) are available.
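
The rule for defaultMinPartitions is small enough to write down. The following is a plain-Python sketch of it; the function name `default_min_partitions` is ours, chosen for illustration, and it only mirrors the "smaller of the default parallelism and 2" behavior.

```python
def default_min_partitions(default_parallelism):
    # Never more than 2, and 1 when only a single worker thread is available.
    return min(default_parallelism, 2)


for threads in (1, 2, 4, 16):
    print(threads, "worker thread(s) ->", default_min_partitions(threads))
```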

Now, previously we have seen that when we used the .createDataFrame() method, it had created just one partition. Let us see the impact of using 4 cores for number of partitions. Would we get 4 partitions, 2 partitions or 1 partition?


In [None]:
df = sparkSession.createDataFrame(data, schema=schema)
print(df.rdd.getNumPartitions())

As you can see, with no other configuration Spark creates one partition per core (worker thread) when building a DataFrame from local data. If you have 16 cores, you would have 16 partitions.

Now, let us try to limit the number of driver cores using Spark configuration. Remember our local[4] is still there, and we are going to set spark.driver.cores to 2. Would we now get 4 partitions as above, or 2 partitions?

In [None]:
# Stopping the existing spark session
sparkSession.stop()


# The only change from the previous SparkSession initialization is the added
# spark.driver.cores setting; the master is still local[4]
sparkSession = SparkSession \
                .builder \
                .master("local[4]") \
                .config("spark.driver.cores", '2') \
                .appName("Pyspark Recipes") \
                .getOrCreate()

df = sparkSession.createDataFrame(data, schema=schema)
print(df.rdd.getNumPartitions())


So changing "spark.driver.cores" did not help. We still have 4 partitions: the number of worker threads specified in the local master overrules "spark.driver.cores", which applies only in cluster deploy mode.

Let us look at another configuration, "spark.default.parallelism". We set it to 3. Would this limit the number of partitions?


In [None]:
# Stopping the existing spark session
sparkSession.stop()


# The only change from the previous initialization is the added
# spark.default.parallelism setting (3); the master is still local[4]
sparkSession = SparkSession \
                .builder \
                .master("local[4]") \
                .config("spark.driver.cores", '2') \
                .config("spark.default.parallelism", '3') \
                .appName("Pyspark Recipes") \
                .getOrCreate()

df = sparkSession.createDataFrame(data, schema=schema)
print(df.rdd.getNumPartitions())

It used three partitions. So even though we asked for local[4], that is 4 worker threads, we limited the number of partitions to 3 via the configuration parameter "spark.default.parallelism".
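
The runs in this section suggest a simple rule for DataFrames created from local data: "spark.default.parallelism" decides the number of partitions when it is set, otherwise the number of local worker threads does, and "spark.driver.cores" plays no part. A minimal plain-Python sketch of that rule follows; `default_parallelism` is a hypothetical helper that takes the configuration as an ordinary dict, not a Spark API.

```python
def default_parallelism(worker_threads, conf=None):
    """Partition count used when a DataFrame is created from local data."""
    conf = conf or {}
    # spark.default.parallelism wins when set; otherwise fall back to the
    # number of local worker threads. spark.driver.cores is never consulted.
    return int(conf.get("spark.default.parallelism", worker_threads))


print(default_parallelism(1))                               # local
print(default_parallelism(4))                               # local[4]
print(default_parallelism(4, {"spark.driver.cores": "2"}))  # local[4] + driver cores
print(default_parallelism(4, {"spark.driver.cores": "2",
                              "spark.default.parallelism": "3"}))
```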

Let us see what happens if we specify just 2 worker threads and set the default parallelism to 5. Would we get 2 partitions or 5?

In [None]:
# Stopping the existing spark session
sparkSession.stop()

# The changes from the previous initialization are local[2] (two worker threads)
# and spark.default.parallelism set to 5
sparkSession = SparkSession \
                .builder \
                .master("local[2]") \
                .config("spark.driver.cores", '2') \
                .config("spark.default.parallelism", '5') \
                .appName("Pyspark Recipes") \
                .getOrCreate()

df = sparkSession.createDataFrame(data, schema=schema)
print(df.rdd.getNumPartitions())

In [None]:
# Stopping the existing spark session
sparkSession.stop()


# Back to local[4] with spark.default.parallelism set to 3,
# this time to read a CSV file
sparkSession = SparkSession \
                .builder \
                .master("local[4]") \
                .config("spark.driver.cores", '2') \
                .config("spark.default.parallelism", '3') \
                .appName("Pyspark Recipes") \
                .getOrCreate()


# Don't worry about this - we will get comfortable with these concepts in the next notebook
dfCensus = sparkSession.read.format('csv') \
            .options(header = True, inferSchema = True, sep = ",", enforceSchema = True,
                    ignoreLeadingWhiteSpace = True, ignoreTrailingWhiteSpace = True) \
            .load('../datasets/charityml/censusdata.csv')

dfCensus.count()

In [None]:
# A small helper that prints the minimum, maximum and average partition size
# (in rows) and the total number of partitions of a DataFrame
def getParitions(dfTargetDataFrame):
    sizes = dfTargetDataFrame.rdd.glom().map(len).collect()
    print('Min Parition Size: ',min(sizes),'. Max Parition Size: ', max(sizes),'. Avg Parition Size: ',
           sum(sizes)/len(sizes),'. Total Partitions: ', len(sizes))

getParitions(dfCensus)

Provided you have executed it on a local computer, you might see the output for the above lines as follows:<br/>
<b>Min Parition Size:  9831 . Max Parition Size:  35391 . Avg Parition Size:  22611.0 . Total Partitions:  2</b>
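
To see what the helper computes without running Spark, here is a plain-Python sketch of the same summary. glom() turns each partition into a list of rows and map(len) counts them; the sketch starts from ready-made partitions instead. `partition_summary` is a hypothetical name, and the two partitions are stand-ins with the row counts from the sample output above.

```python
def partition_summary(partitions):
    """Return (min, max, average, count) of the partition sizes."""
    sizes = [len(rows) for rows in partitions]   # what glom().map(len) yields
    return min(sizes), max(sizes), sum(sizes) / len(sizes), len(sizes)


# Two stand-in partitions with the same row counts as the sample output.
partitions = [range(9831), range(35391)]
print(partition_summary(partitions))   # (9831, 35391, 22611.0, 2)
```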

But in our previous attempt we got three partitions by using 4 cores and a default parallelism of 3. Even if you remove the default parallelism config and use 4 cores, you still would not end up with one partition per core. One would expect 3 or 4 partitions depending on the configuration, so why is Spark not using all the partitions it could create?

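It turns out that while importing data, Spark decides the number of partitions from the size of the file rather than from the number of cores alone. The DataFrame reader cuts files into splits of at most 128 MB ("spark.sql.files.maxPartitionBytes", the same as the default Hadoop block size), and for small inputs the split size shrinks to the larger of 4 MB ("spark.sql.files.openCostInBytes") and the total bytes (file sizes plus a 4 MB open cost per file) divided by the default parallelism. Since our data is much smaller than 128 MB (around 6 MB), it is cut into one 4 MB split plus a smaller remainder, and hence we get 2 partitions of unequal size. The RDD API (sparkContext.textFile) instead works with Hadoop block-sized splits and uses the default minimum partitions (2) as a lower bound.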
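
For the DataFrame reader specifically, the split size is, as far as we understand the Spark source, derived from "spark.sql.files.maxPartitionBytes" (128 MB by default), "spark.sql.files.openCostInBytes" (4 MB by default) and the default parallelism. The sketch below is a simplified plain-Python estimate for a single splittable file such as an uncompressed CSV. The function name `estimate_file_partitions` and the 6 MB file size are assumptions for illustration, and this is not Spark's actual code.

```python
import math

MB = 1024 * 1024


def estimate_file_partitions(file_bytes, default_parallelism,
                             max_partition_bytes=128 * MB,
                             open_cost_bytes=4 * MB):
    """Estimate the partition count for reading ONE splittable file."""
    # The file is charged its own length plus a fixed "open cost".
    bytes_per_core = (file_bytes + open_cost_bytes) // default_parallelism
    # Split size: capped at 128 MB, but never smaller than the open cost.
    max_split_bytes = min(max_partition_bytes,
                          max(open_cost_bytes, bytes_per_core))
    return math.ceil(file_bytes / max_split_bytes)


census_bytes = 6 * MB  # assumed size, taken from the "around 6 MB" above
for parallelism in (1, 3, 4):
    print(parallelism, "->", estimate_file_partitions(census_bytes, parallelism))
```

Under these assumptions the estimate is 2 partitions for a default parallelism of 3 or 4, and 1 partition for a default parallelism of 1, because the split size bottoms out at 4 MB.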

This is the default partitioning of Spark. If you want to increase the number of partitions, you can repartition the DataFrame as follows:

In [None]:
dfCensus = dfCensus.repartition(10)
getParitions(dfCensus)

When you ask Spark to repartition the data into 10 partitions, it takes the total of 45,222 records and spreads them evenly, about 4,522 records per partition, and you would see an output for the above lines as follows:<br/>
<b>Min Parition Size:  4522 . Max Parition Size:  4523 . Avg Parition Size:  4522.2 . Total Partitions:  10</b>
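
The numbers in that output follow from simple arithmetic: 45,222 rows do not divide evenly by 10, so some partitions carry one extra row. A plain-Python sketch of an ideal even split is shown below. `even_partition_sizes` is a hypothetical helper; Spark's own round-robin repartitioning may place the extra rows differently or be off by a row or two.

```python
def even_partition_sizes(total_rows, num_partitions):
    """Split total_rows as evenly as possible across num_partitions."""
    base, extra = divmod(total_rows, num_partitions)
    # The first `extra` partitions take one additional row each.
    return [base + 1 if i < extra else base for i in range(num_partitions)]


sizes = even_partition_sizes(45222, 10)
print(min(sizes), max(sizes), sum(sizes) / len(sizes), len(sizes))
# 4522 4523 4522.2 10
```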

In [None]:
dfCensus.rdd.getNumPartitions() 

A final word about Spark partitions: we have already seen how to use .config in method chaining to specify configurations. The next cell also shows an alternate way of setting them. Feel free to play around with the configurations and check the results for yourselves - "Learning by Doing".

In [None]:
from pyspark.sql import SparkSession

sparkSession = SparkSession \
                .builder \
                .master("local") \
                .config("spark.driver.cores", '2') \
                .config("spark.executor.memory", '1g') \
                .config("spark.default.parallelism", '4') \
                .appName("Pyspark Recipes") \
                .getOrCreate()

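# One can also try setting Spark configuration on an existing session using the following syntax
# (deploy-related properties such as these may not take effect, or may be rejected, once the session is running):
#sparkSession.conf.set("spark.driver.cores", '1')
#sparkSession.conf.set("spark.executor.memory", '1g')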

# to get the configuration value you have to go via the sparkContext object provided by SparkSession
sparkSession.sparkContext.getConf().getAll()

Just out of curiosity, we would do one more exercise. We would specify the cores to be utilized as 4 and the configuration parameter - "spark.default.parallelism" as "1". 

Then we would check the defaultMinPartitions property. Remember, whenever "spark.default.parallelism" was more than one, defaultMinPartitions was always two. We are trying to see if it becomes one, which is possible, as we saw when we utilized only one core.

We would also read the CSV file, and check for the number of partitions.


In [None]:
# Stopping the existing spark session
sparkSession.stop()

# This time we use local[4] and set spark.default.parallelism to 1
sparkSession = SparkSession \
                .builder \
                .master("local[4]") \
                .config("spark.default.parallelism", '1') \
                .appName("Pyspark Recipes") \
                .getOrCreate()

#Checking for defaultMinPartitions
sparkSession.sparkContext.defaultMinPartitions

In [None]:
# Don't worry about this - we will get comfortable with these concepts in the next notebook
dfCensus = sparkSession.read.format('csv') \
            .options(header = True, inferSchema = True, sep = ",", enforceSchema = True,
                    ignoreLeadingWhiteSpace = True, ignoreTrailingWhiteSpace = True) \
            .load('../datasets/charityml/censusdata.csv')

getParitions(dfCensus)



With this we conclude our introduction to Spark. In the next section, we look at importing data, so let us move to our next notebook.

Don't forget to stop your SparkSession when you are done, as it is good practice to free cluster resources for other applications. SparkSession "stop()" eventually calls SparkContext "stop()". However, be mindful that you might have multiple SparkSessions, and since they share one SparkContext you have to ensure that all of them have finished their work before stopping.



In [None]:
sparkSession.stop()