### PySpark - RDD Concepts, GroupBy and Aggregate Functions

Jay Urbain, PhD   
3/18/2024


PySpark documentation:   
https://spark.apache.org/docs/latest/api/python/reference/index.html  

Answer the TODO items.



![](spark-cluster-overview.webp)

In [1]:
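# Option 1: set JAVA_HOME so PySpark can find the Java runtime
# (path is specific to the author's machine; adjust for yours)

import os

JAVA_HOME = "/home/jay/anaconda3/envs/pyspark"
os.environ["JAVA_HOME"] = JAVA_HOME

# Option 2: append the environment directory to Python's module search path
# (path is specific to the author's machine; adjust for yours)

import sys
sys.path.append("/Users/jayurbain/opt/miniconda3/envs/pyspark")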

In [2]:
from pyspark.sql import SparkSession

In order to create an RDD, you first need to create a SparkSession, which is the entry point to a PySpark application. A SparkSession is created with SparkSession.builder (ending in getOrCreate()), or derived from an existing session with newSession().

A SparkSession internally creates a SparkContext, available as its sparkContext attribute. You can create multiple SparkSession objects, but only one SparkContext per JVM. If you want to create a new SparkContext, you must first stop the existing one (using stop()).

In [3]:
spark=SparkSession.builder.appName('Agg').getOrCreate()

24/03/20 18:57:24 WARN Utils: Your hostname, tensorbook resolves to a loopback address: 127.0.1.1; using 192.168.86.36 instead (on interface wlp0s20f3)
24/03/20 18:57:24 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/03/20 18:57:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/03/20 18:57:25 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
24/03/20 18:57:25 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


In [4]:
spark

#### PySpark RDD – Resilient Distributed Dataset

PySpark RDD (Resilient Distributed Dataset) is a fundamental data structure of PySpark: a fault-tolerant, immutable, distributed collection of objects. Immutable means that once you create an RDD, you cannot change it. Each RDD is divided into logical partitions, which can be computed on different nodes of the cluster.


If you have tabular data, you should just use a DataFrame. Dropping down to the RDD layer is mainly useful for unstructured data, such as text, images, and signal data.


In [5]:
# Create RDD from list using parallelize

dataList = [("Java", 2000), ("Python", 10000), ("Scala", 3000)]
rdd=spark.sparkContext.parallelize(dataList)

In [6]:
# Create an RDD from an external data source (each line of the file becomes one string element)

rdd2=spark.sparkContext.textFile('test3.csv')

#### RDD Operations

On a PySpark RDD, you can perform two kinds of operations.

RDD transformations – lazy operations that return a new RDD instead of modifying the current one. For example, map() produces a new RDD and leaves the original unchanged.

RDD actions – operations that trigger the computation and return results to the driver (for example, collect(), count(), and sum()).


#### RDD Transformations

Transformations on a Spark RDD return another RDD, and they are lazy, meaning they don't execute until you call an action on the RDD. Common transformations include flatMap(), map(), and filter(), plus reduceByKey() and sortByKey() on key-value (pair) RDDs.

#### PySpark DataFrame

DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources such as structured data files, tables in Hive, external databases, or existing RDDs.


#### TODO: Is PySpark faster than pandas? Explain your thinking.
    
    

    
    

Create a dataframe

In [None]:
df_pyspark=spark.read.csv('test3.csv',header=True,inferSchema=True)

In [None]:
df_pyspark.show()

In [None]:
df_pyspark.printSchema()

PySpark SQL is one of the most widely used PySpark modules; it is used for processing structured, columnar data. Once you have created a DataFrame, you can query it using SQL syntax.

In other words, Spark SQL lets you run traditional ANSI SQL queries directly on Spark DataFrames, using SELECT, WHERE, GROUP BY, JOIN, UNION, etc.

To use SQL, first register the DataFrame as a temporary view using the createOrReplaceTempView() method. Once created, the view can be queried with sql() from the SparkSession that created it, and it is dropped when that SparkSession terminates.

Use the sql() method of the SparkSession object to run a query; it returns a new DataFrame.

Reference:

https://spark.apache.org/docs/3.1.2/api/python/reference/pyspark.sql.html

In [None]:
df_pyspark.createOrReplaceTempView("PEOPLE")
df1=spark.sql("select Name, Departments, Salary as salary from PEOPLE")
df1.show()

In [None]:
## Groupby

# Group by Name and sum every numeric column (total salary per person)
df_pyspark.groupBy('Name').sum().show()

In [None]:
#### TODO: Perform the operation above using PySpark SQL




In [None]:
df_pyspark.groupBy('Name').avg().show()

In [None]:
#### TODO: Use PySpark dataframe to group by departments and compute the sum for each department




In [None]:
#### TODO: Use PySpark SQL to group by departments and compute the sum for each department




In [None]:
#### TODO: Use PySpark dataframe to group by department and compute the count for each department




Another method: aggregate over the entire DataFrame with agg()

In [None]:
df_pyspark.agg({'Salary':'sum'}).show()