# IST 652 - Advanced Topic Presentation - PySpark

*Presented By - Rahul Rathod and Yash Kapadia*

# What is Spark?
- **Apache Spark** is a fast, in-memory analytics system.
- Spark has several high-level tools, including:
  - **MLlib**: Spark's machine learning library.
  - **Spark Streaming**: enables high-throughput, fault-tolerant stream processing of live data streams.
  - **Spark SQL**: runs SQL and HiveQL queries.
  - **GraphX**: an API for graphs and graph-parallel computation.
- Spark can be run in several ways, including:
  - In standalone mode, as independent processes on a cluster managed by Spark itself.
  - As a YARN application on a Hadoop cluster.
- PySpark is the Python API for Apache Spark. It is not part of the Python standard library and must be installed separately.

# What is PySpark?
PySpark is the Python API for Apache Spark. Apache Spark is a distributed framework for Big Data analysis. It is written in Scala and provides APIs for Scala, Java, Python, R, and SQL. Spark is essentially a computational engine: it processes large data sets by splitting the work into tasks that run in parallel, for both batch and streaming workloads.

# Installing PySpark

- Using Anaconda Navigator, install the `pyspark` and `py4j` packages.
- Install Java 8, because Spark runs on the Java Virtual Machine.

# Create Spark session

In [1]:
from pyspark.sql import SparkSession
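# Spark 2.0+: SparkSession is the entry point for the DataFrame and SQL APIs
spark = SparkSession.builder.getOrCreate()
# The underlying SparkContext is the entry point for the RDD API (the main entry point before Spark 2.0)
sc = spark.sparkContext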

# The SparkContext

- When Spark runs on YARN, the **SparkContext** object performs the following tasks:

  - Connects to the YARN ResourceManager and requests resources on the Hadoop cluster.

  - Starts executors on the worker nodes that the ResourceManager allocated for the Spark application.

  - Sends the application code to the executors.

  - Finally, sends tasks to the executors to run.

# Spark RDDs
- An RDD (resilient distributed dataset) is a fault-tolerant collection of elements that can be operated on in parallel.

# Creating RDDs
- An initial RDD can be created by loading data from storage, such as files on disk or on HDFS.  

- An initial RDD can also be created from an existing collection in the driver program, such as a Python list.  

- Example ways to create an RDD in Spark:
  - Use `parallelize` to distribute an existing Python collection as an RDD.
  - Use `textFile` to read a text file (on HDFS or a local path) into an RDD with one element per line.
  - Use `sequenceFile` to read a Hadoop SequenceFile (a binary key-value file format) from HDFS into an RDD.

# Creating a Simple List

In [2]:
myarray = list(range(1, 20))
myarray

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]

# Parallelizing the Array - RDD

In [3]:
dist_array = sc.parallelize(myarray)  # distribute myarray across the cluster as an RDD
dist_array  # only shows the RDD's description; no data is computed yet

ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:195

# RDD Operations
- There are two types of operations that can be done on RDDs:
  - **Transformations**: create a new RDD from an existing one. They are lazy: nothing is computed until an action needs the result.
  - **Actions**: return a value to the driver program after running a computation on the RDD.
  

# Example: Transformations  

In [4]:
sub_values = dist_array.map(lambda x: x-1)
print("subtract_values", sub_values.collect())
large_values = dist_array.filter(lambda y: y > 10)
print("large_values", large_values.collect())

subtract_values [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18]
large_values [11, 12, 13, 14, 15, 16, 17, 18, 19]


In [5]:
# checking the type after transformation
type(sub_values)

pyspark.rdd.PipelinedRDD

# Example: Actions

In [6]:
large_values = dist_array.filter(lambda y: y > 10)
large_values.collect()

[11, 12, 13, 14, 15, 16, 17, 18, 19]

# DataFrames
- The problem with RDDs is that they do not have enough structure: Spark does not know what is inside the Python objects they hold.  

- Without that information, Spark cannot optimize RDD computations, so they are often slower.  

- DataFrames aim at solving this by adding structure.  

- A DataFrame is a distributed collection of data organized into named, typed columns.  

- It is similar to a pandas DataFrame, but distributed across the cluster.

# Creating a PySpark DataFrame from Python Row Objects

In [7]:
from pyspark.sql import Row

# creating a Dataframe using python row objects

raw_data = [Row(state='NY', month='JAN', orders=3),
            Row(state='NJ', month='JAN', orders=4),
            Row(state='NY', month='FEB', orders=5)
           ]
data_df = spark.createDataFrame(raw_data)

In [8]:
# showing a dataframe
# .show() will display 20 rows by default
data_df.show()

+-----+------+-----+
|month|orders|state|
+-----+------+-----+
|  JAN|     3|   NY|
|  JAN|     4|   NJ|
|  FEB|     5|   NY|
+-----+------+-----+



In [9]:
# We can check the schema 
data_df.printSchema()

root
 |-- month: string (nullable = true)
 |-- orders: long (nullable = true)
 |-- state: string (nullable = true)



# Performing SQL-like Operations on DataFrames

In [10]:
from pyspark.sql import functions as fn

# creating a dataframe for locations
locations_df = spark.createDataFrame([
    Row(location_id = 'loc1', n_employees=3, state='NY'),
    Row(location_id = 'loc2', n_employees=8, state='NY'),
    Row(location_id = 'loc3', n_employees=3, state='PA'),
    Row(location_id = 'loc4', n_employees=1, state='FL')    
])

# creating a dataframe for transactions
transactions_df = spark.createDataFrame([
    Row(transaction_id = 1, location_id = 'loc1', n_orders=2.),
    Row(transaction_id = 2, location_id = 'loc1', n_orders=3.),
    Row(transaction_id = 3, location_id = 'loc3', n_orders=5.),
    Row(transaction_id = 4, location_id = 'loc5', n_orders=5.)
])

In [11]:
locations_df.show()

+-----------+-----------+-----+
|location_id|n_employees|state|
+-----------+-----------+-----+
|       loc1|          3|   NY|
|       loc2|          8|   NY|
|       loc3|          3|   PA|
|       loc4|          1|   FL|
+-----------+-----------+-----+



In [12]:
transactions_df.show()

+-----------+--------+--------------+
|location_id|n_orders|transaction_id|
+-----------+--------+--------------+
|       loc1|     2.0|             1|
|       loc1|     3.0|             2|
|       loc3|     5.0|             3|
|       loc5|     5.0|             4|
+-----------+--------+--------------+



# Manipulating the columns 

In [13]:
# Adding 1 to the n_employees column and checking for n_employees > 5
locations_df.\
    select('n_employees',
          'location_id',
          'state',
          (fn.col('n_employees') + 1).alias('n_employees_plus_1'),
          (fn.col('n_employees') > 5).alias('more_than_5_empl')
          ).\
    show(10)

+-----------+-----------+-----+------------------+----------------+
|n_employees|location_id|state|n_employees_plus_1|more_than_5_empl|
+-----------+-----------+-----+------------------+----------------+
|          3|       loc1|   NY|                 4|           false|
|          8|       loc2|   NY|                 9|            true|
|          3|       loc3|   PA|                 4|           false|
|          1|       loc4|   FL|                 2|           false|
+-----------+-----------+-----+------------------+----------------+



# Applying functions like square root and power

In [14]:
# Taking square root of number of employees and calculating the square of each value in n_employees column
locations_df.\
    select(fn.sqrt('n_employees'),
           'n_employees',
           fn.pow(fn.col('n_employees'), fn.lit(2))
          ).\
    show()   

+------------------+-----------+---------------------+
| SQRT(n_employees)|n_employees|POWER(n_employees, 2)|
+------------------+-----------+---------------------+
|1.7320508075688772|          3|                  9.0|
|2.8284271247461903|          8|                 64.0|
|1.7320508075688772|          3|                  9.0|
|               1.0|          1|                  1.0|
+------------------+-----------+---------------------+



# Filtering the Dataframe

In [15]:
# Filtering the Dataframe where number of employees greater than 2 and state is PA
locations_df.where((fn.col('n_employees') > 2) & \
                  (fn.col('state') == 'PA')).\
         show()

+-----------+-----------+-----+
|location_id|n_employees|state|
+-----------+-----------+-----+
|       loc3|          3|   PA|
+-----------+-----------+-----+



# Performing Joins

In [16]:
# Joining the two dataframes on location_id
# default join is inner join
new_df = locations_df.join(transactions_df, on='location_id')

In [17]:
new_df.show()

+-----------+-----------+-----+--------+--------------+
|location_id|n_employees|state|n_orders|transaction_id|
+-----------+-----------+-----+--------+--------------+
|       loc1|          3|   NY|     2.0|             1|
|       loc1|          3|   NY|     3.0|             2|
|       loc3|          3|   PA|     5.0|             3|
+-----------+-----------+-----+--------+--------------+



# Checking the execution plans

In [18]:
new_df.explain()

== Physical Plan ==
*(5) Project [location_id#16, n_employees#17L, state#18, n_orders#23, transaction_id#24L]
+- *(5) SortMergeJoin [location_id#16], [location_id#22], Inner
   :- *(2) Sort [location_id#16 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(location_id#16, 200)
   :     +- *(1) Filter isnotnull(location_id#16)
   :        +- Scan ExistingRDD[location_id#16,n_employees#17L,state#18]
   +- *(4) Sort [location_id#22 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(location_id#22, 200)
         +- *(3) Filter isnotnull(location_id#22)
            +- Scan ExistingRDD[location_id#22,n_orders#23,transaction_id#24L]


# PySpark MLlib

# Creating a Dataframe using spark.read.csv method

In [19]:
bank_df = spark.read.csv('Bank Salaries.csv', 
                             header=True, 
                             inferSchema=True)

In [20]:
# displaying the schema
bank_df.printSchema()

root
 |-- Employee: integer (nullable = true)
 |-- EducLev: integer (nullable = true)
 |-- JobGrade: integer (nullable = true)
 |-- YrsExper: integer (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Gender: string (nullable = true)
 |-- YrsPrior: integer (nullable = true)
 |-- PCJob: string (nullable = true)
 |-- Salary: integer (nullable = true)
 |-- _c9: string (nullable = true)
 |-- _c10: string (nullable = true)



In [21]:
# displaying the first 20 rows
bank_df.show()

+--------+-------+--------+--------+---+------+--------+-----+------+----+----+
|Employee|EducLev|JobGrade|YrsExper|Age|Gender|YrsPrior|PCJob|Salary| _c9|_c10|
+--------+-------+--------+--------+---+------+--------+-----+------+----+----+
|       1|      3|       1|       3| 26|  Male|       1|   No| 32000|null|null|
|       2|      1|       1|      14| 38|Female|       1|   No| 39100|null|null|
|       3|      1|       1|      12| 35|Female|       0|   No| 33200|null|null|
|       4|      2|       1|       8| 40|Female|       7|   No| 30600|null|null|
|       5|      3|       1|       3| 28|  Male|       0|   No| 29000|null|null|
|       6|      3|       1|       3| 24|Female|       0|   No| 30500|null|null|
|       7|      3|       1|       4| 27|Female|       0|   No| 30000|null|null|
|       8|      3|       1|       8| 33|  Male|       2|   No| 27000|null|null|
|       9|      1|       1|       4| 62|Female|       0|   No| 34000|null|null|
|      10|      3|       1|       9| 31|

In [22]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression


# Create a vector representation for features
assembler = VectorAssembler(inputCols=['JobGrade', 'YrsExper', 'EducLev'],outputCol="features")
train_df = assembler.transform(bank_df)


# Fit a linear regression model
lr = LinearRegression(featuresCol = 'features', labelCol='Salary')
lr_model = lr.fit(train_df)


# Output statistics 
trainingSummary = lr_model.summary
print("Coefficients: " + str(lr_model.coefficients))
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("R2: %f" % trainingSummary.r2)

Coefficients: [3759.8973095338047,628.2390899498624,1129.697425428689]
RMSE: 5999.123340
R2: 0.714577
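The summary reports two metrics. Under their standard definitions, RMSE is the square root of the mean squared residual, and R² is one minus the ratio of the residual sum of squares to the total sum of squares. Below is a plain-Python sketch of both, using made-up salaries and predictions that are not from the bank data.

```python
import math

def rmse(y_true, y_pred):
    """Root mean squared error: sqrt(mean((y - y_hat)^2))."""
    n = len(y_true)
    return math.sqrt(sum((y - p) ** 2 for y, p in zip(y_true, y_pred)) / n)

def r2(y_true, y_pred):
    """Coefficient of determination: 1 - SS_res / SS_tot."""
    mean_y = sum(y_true) / len(y_true)
    ss_res = sum((y - p) ** 2 for y, p in zip(y_true, y_pred))
    ss_tot = sum((y - mean_y) ** 2 for y in y_true)
    return 1 - ss_res / ss_tot

# Hypothetical salaries and predictions (every prediction is off by 1000)
y_true = [30000, 35000, 40000, 45000]
y_pred = [31000, 34000, 41000, 44000]
print(rmse(y_true, y_pred), r2(y_true, y_pred))  # 1000.0 0.968
```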


# Creating Data Pipelines using PySpark for Machine Learning

In [23]:
from pyspark.ml import Pipeline

pipe_model = Pipeline(stages=[
    VectorAssembler(inputCols=['JobGrade', 'YrsExper', 'EducLev'], outputCol="feature"),
    LinearRegression(featuresCol='feature', labelCol='Salary')
]).fit(bank_df)  # the pipeline builds its own feature vector, so fit it on the raw data

In [24]:
type(pipe_model)

pyspark.ml.pipeline.PipelineModel

In [25]:
# Displaying the stages of the pipeline
pipe_model.stages

[VectorAssembler_b279eca27973, LinearRegression_bebf6f420eb2]

In [26]:
# Extracting the coefficients of the Linear Regression model from the pipeline
pipe_model.stages[1].coefficients

DenseVector([3759.8973, 628.2391, 1129.6974])

# What Problems Can PySpark Solve?

PySpark can significantly accelerate analysis by making it easy to combine local and distributed data transformations while keeping computing costs under control. It also lets data scientists work with large data sets in full instead of always having to downsample them. For tasks such as building a recommendation system or training a machine learning model, PySpark is worth considering. Distributed processing also makes it easier to augment existing data sets with other types of data, for example combining share-price data with weather data.