# Spark - Exploratory Data Analysis

In this courselet, we will explore the use of Spark as a Data Science tool when working with Big Data. By the end of this courselet, you will be able to:

- Perform a descriptive analysis
- Compare the performance of Spark under different data formats (csv vs. parquet)
- Identify the advantages of using Spark in a cluster, by relying on different optimization strategies.

## What is Spark?

[Apache Spark](https://spark.apache.org/) is an open-source engine used for the processing and analysis of large-scale data. Today, Spark is one of the most popular frameworks for the development of data engineering, data science and machine learning tasks.

## Advantages of Spark

- Multi-language: It can be used from multiple languages (Python, R, Java, Scala), making it flexible for diverse users
- Speed: Spark leverages RAM to perform its computational tasks, which translates into faster data processing
- Parallelism and fault tolerance: Spark is designed to be used in the context of a multi-node cluster. Therefore, its design aims for parallel data processing across nodes, as well as fault tolerance in case a node fails
- Wide community of users: Given its popularity, there is a huge open-source community, which allows Spark to be constantly developed and improved

## Disadvantages of Spark

- Memory consumption: Given that Spark uses RAM to speed up processing, it can run into *OutOfMemory* errors if memory is not managed properly
- Manual fine-tuning and optimization: To optimize the overall performance of Spark, the user must manually [tune the settings](https://www.databricks.com/glossary/spark-tuning) of the cluster in terms of hardware. This process is not straightforward, and an inadequate setup might result in suboptimal performance
- Inefficient management of small files: For small data files, Spark is not an optimal framework, given the high overhead of distributing and collecting data across nodes. In those cases, using Pandas or a more conventional approach is more efficient

## Importing libraries and starting our Spark session

To start working, we are first going to import our main library and initialize both our [SparkSession](https://spark.apache.org/docs/latest/sql-getting-started.html#starting-point-sparksession) and our [SparkContext](https://spark.apache.org/docs/3.2.0/api/java/org/apache/spark/SparkContext.html).

In [None]:
import pyspark
from pyspark.sql import SparkSession
# Starting our Spark Session
spark = SparkSession.builder \
    .appName("EDA Spark") \
    .master("spark://spark-master:7077") \
    .getOrCreate()

# Starting our Spark Context
sc = spark.sparkContext

## RDD

The [**Resilient Distributed Dataset (RDD)**](https://spark.apache.org/docs/latest/rdd-programming-guide.html) is the main data structure in Spark. It consists of a collection of objects distributed across the nodes of the cluster. This allows processes to be parallelized across the nodes and guarantees fault tolerance. RDDs support two fundamental types of operations: [**transformations**](https://spark.apache.org/docs/latest/rdd-programming-guide.html#transformations), which are operations that convert RDDs into new RDDs, and [**actions**](https://spark.apache.org/docs/latest/rdd-programming-guide.html#actions), which return non-RDD values/objects.

A fundamental piece of Spark's design is the concept of [**lazy evaluation**](https://towardsdatascience.com/3-reasons-why-sparks-lazy-evaluation-is-useful-ed06e27360c4). This concept refers to an execution model in which Spark does not perform a computational task until explicitly instructed to do so. When we perform a transformation and create a new RDD, Spark does not perform any computation at that point. Instead, it creates an execution plan. Only when we perform an action does Spark actually execute the task and return a new object. In this way, the user can chain as many transformations as desired, but none of them will be executed until an action is performed, which lets Spark use resources more efficiently.
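As a Spark-free illustration of the same idea, the following minimal sketch uses Python's built-in lazy `map`. It is only an analogy, not how Spark is implemented: the names `square` and `calls` are hypothetical helpers introduced here to show that building the pipeline computes nothing, while consuming it (the "action") does.

```python
# Spark-free analogy of lazy evaluation using Python's lazy map().
calls = []  # records every time the "transformation" function actually runs

def square(x):
    calls.append(x)
    return x ** 2

numbers = range(5)

# "Transformation": this only builds a lazy pipeline, nothing is computed yet.
lazy_squares = map(square, numbers)
calls_before_action = len(calls)

# "Action": consuming the pipeline forces the computation.
result = list(lazy_squares)
calls_after_action = len(calls)

print(calls_before_action, calls_after_action, result)
```

The function is never called until the pipeline is consumed, which mirrors the way an RDD transformation only records a plan until an action is invoked.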

Let's now work with an example RDD.

In [None]:
# We start by creating a list of values
lst = [i for i in range(1000000)]
# Executing the function "parallelize", converts our list into an RDD
rdd = sc.parallelize(lst)
rdd # As we will see, the representation of the RDD is not a list anymore

In [None]:
# Performing a transformation. 
squared_rdd = rdd.map(lambda x: x**2) # Transforming our RDD into a new one with the values squared
squared_rdd # As we stated previously, the previous transformation created a new RDD, but no computation has been performed

In [None]:
squared_rdd = squared_rdd.collect() # Performing an action to collect our RDD into a list again
squared_rdd[:16] # Now we have a list of squared values. We only show the first 16 values

In [None]:
# Another transformation
filtered_rdd = rdd.filter(lambda x: x % 2 == 0) # Filtering our RDD into a new one with even numbers. This is a transformation
filtered_rdd.count() # Returning the count of even numbers. This is our action

The previous example showed a series of tasks over a list that was converted into an RDD. However, the creation of RDDs is not limited to lists. Multiple data sources can be turned into RDDs to take advantage of distributed processing. In the next code cells, we are going to perform a very common task: counting the frequencies of words in a given text. For that, we are going to do the following:

1. Define a stopwords list
2. Load our shakespeare.txt file, which is a compilation of all the works of William Shakespeare (a big text file)
3. Perform a series of transformations before calling an action that returns a list of tuples with the 10 most frequent words in Shakespeare's works and their frequencies

In [None]:
# This is the list of stopwords used by the nltk library. We defined it here to save time installing the library.
stop_words = ['i', 'me', 'my', 'myself', 'we', 'our', 'ours', 
'ourselves', 'you', "you're", "you've", "you'll", "you'd", 'your', 'yours', 'yourself', 
'yourselves', 'he', 'him', 'his', 'himself', 'she', "she's", 'her', 'hers', 'herself', 
'it', "it's", 'its', 'itself', 'they', 'them', 'their', 'theirs', 'themselves',
 'what', 'which', 'who', 'whom', 'this', 'that', "that'll", 'these', 'those', 'am', 'is', 'are',
 'was', 'were', 'be', 'been', 'being', 'have', 'has', 'had', 'having', 'do', 'does', 'did', 'doing',
 'a', 'an', 'the', 'and', 'but', 'if', 'or', 'because', 'as', 'until', 'while', 'of', 'at', 'by', 'for', 
'with', 'about', 'against', 'between', 'into', 'through', 'during', 'before', 'after', 'above', 'below', 'to', 
'from', 'up', 'down', 'in', 'out', 'on', 'off', 'over', 'under', 'again', 'further', 'then', 'once', 
'here', 'there', 'when', 'where', 'why', 'how', 'all', 'any', 'both', 'each', 'few', 'more', 'most', 
'other', 'some', 'such', 'no', 'nor', 'not', 'only', 'own', 'same', 'so', 'than', 'too', 'very', 
's', 't', 'can', 'will', 'just', 'don', "don't", 'should', "should've", 'now', 'd', 'll', 'm', 'o', 're', 've', 'y',
 'ain', 'aren', "aren't", 'couldn', "couldn't", 'didn', "didn't", 'doesn', "doesn't", 
'hadn', "hadn't", 'hasn', "hasn't", 'haven', "haven't", 'isn', "isn't", 'ma', 'mightn',
 "mightn't", 'mustn', "mustn't", 'needn', "needn't", 'shan', "shan't", 'shouldn', "shouldn't", 
'wasn', "wasn't", 'weren', "weren't", 'won', "won't", 'wouldn', "wouldn't"]

In [None]:
shakespeare = spark.read.text("data/shakespeare.txt") 
shakespeare # By using the read.text function, our txt file is read as a DataFrame. We will explore further on DataFrames after this example.

In [None]:
top_words = (
    shakespeare
    .rdd.flatMap(lambda line: line[0].split(" ")) # First, we convert the DataFrame into an RDD, we "flatten" the elements of the RDD and split them by space. Basically, we make a list of all the words
    .filter(lambda word: word.lower() not in stop_words and word != "") # Now we filter each word lowered, removing stop words and empty strings 
    .map(lambda word: (word, 1)) # For each word, we create a tuple with the word (which represents the key) and a value of 1 as a count. This tuple represents a Key-Value pair (K,V)
    .reduceByKey(lambda x, y: x + y) # For each key, we apply the given function to its values. In this case, we sum the values that share the same key (x + y)
    .sortBy(lambda x: x[1], ascending=False) # We sort the tuples by the count, in descending order
    .take(10) # Finally, we perform a "take" action and return the first 10 tuples
)

top_words
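To make the key-value step above more concrete, here is a minimal plain-Python sketch of what `reduceByKey` computes. The helper `reduce_by_key` is a hypothetical name introduced for illustration; it reproduces only the result, not Spark's distributed execution (Spark also combines values within each partition before shuffling).

```python
from collections import defaultdict
from functools import reduce

def reduce_by_key(pairs, func):
    """Group values by key, then fold each group with func (single machine only)."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return {key: reduce(func, values) for key, values in grouped.items()}

words = ["to", "be", "or", "not", "to", "be"]
pairs = [(w, 1) for w in words]                     # same (word, 1) pairs as in the map step
counts = reduce_by_key(pairs, lambda x, y: x + y)   # sum the values that share a key

# Equivalent of sortBy(..., ascending=False) followed by take(2)
top = sorted(counts.items(), key=lambda kv: kv[1], reverse=True)[:2]
print(top)
```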

## DataFrames

Another essential data structure in Spark, particularly for data analysis, is the DataFrame, or distributed DataFrame. This type of DataFrame is built on top of RDDs, but it has a better-defined structure: it provides a schema and makes it easier to interact with the data and run queries. It also follows Spark's principle of distributed data processing across the different nodes.

For this courselet, we are exploring the taxi trips reported to the City of Chicago in 2020. This data is publicly available through the [Chicago Data Portal](https://data.cityofchicago.org/en/Transportation/Taxi-Trips-2020/r2u4-wwk3/about_data). 

We will proceed to load our data.

In [None]:
df = spark.read.csv("data/chicago-taxi-2020.csv",header=True, inferSchema=True)

As our first actions, we can print the schema of the data and count the number of rows.

In [None]:
df.printSchema()

In [None]:
df.count()

## DataFrames Operations

In the next code cells, we are going to explore some basic ways to manipulate our DataFrames.

In [None]:
# We can take a subset of the data
df.select('Trip ID', 'Trip Total')

As with RDDs, DataFrames also follow the principle of lazy evaluation. In this case, when we asked for a subset of the DataFrame, Spark only returned a logical representation of it. If we want to actually display values, we must perform an action.

In [None]:
df.select('Trip ID', 'Trip Total').show(10) # We are going to show the first 10 rows

In [None]:
# We can apply filters to our df's columns
df.filter(df["Tips"] > 0).show(5)

The results don't look good, right? They are not readable at all. Maybe we should use a different action.

The *.toPandas()* method allows us to convert a Spark DataFrame into a Pandas DataFrame. Because it collects all the rows into the driver's memory, it is recommended only when we expect the output of our query to be small. Let's use it to show the first 5 observations of our filtered df:

In [None]:
df.filter(df["Tips"] > 0).limit(5).toPandas()

In [None]:
# Remove duplicated rows
df.distinct()

In [None]:
# Sort df by a column
df.orderBy(df["Fare"].desc()).limit(5).select('Trip ID', 'Fare').toPandas()

In [None]:
# Transform and create new columns - Using \ allows us to continue our transformations sequentially while ordering our code better

df.withColumn("Fare in Euros", df["Fare"] * 0.92) \
  .orderBy(df["Fare"].desc()) \
  .limit(5) \
  .select('Trip ID', 'Fare in Euros') \
  .toPandas()

## Descriptive Statistics

To support our operations, we are going to use the collection of functions provided by `pyspark.sql.functions`. You can consult the full list in the [source documentation.](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.max.html)

In [None]:
import pyspark.sql.functions as F

In [None]:
# Mean of a column
df.select(F.mean("Trip Total"))

As you can observe, our previous line of code has not returned the mean yet. Instead, it returned a DataFrame object. We must perform an action to display the mean. The following line of code extends the previous one by adding the *.show()* action.

In [None]:
df.select(F.mean("Trip Total")).show()

In [None]:
# Mean of multiple columns
df.select(F.mean("Trip Total"), F.mean("Trip Seconds")).show()

In [None]:
# Max value of a column
df.select(F.max("Trip Total")).show()

In [None]:
# Calculating the average trip total by community area.
df.groupBy("Pickup Community Area") \
    .agg(F.avg("Trip Total")
    .alias("AVG Trip Total")) \
    .na.drop(subset=["Pickup Community Area"]) \
    .orderBy(F.col("Pickup Community Area")).show()

In [None]:
# Evaluating the missingness rate per column
missingness = df.agg(*[(1 - (F.count(F.col(c)) / F.count('*')))
         .alias(c + '_missing_rate') for c in df.columns])
missingness.toPandas().transpose()
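The missingness expression above relies on the fact that `F.count(column)` counts only non-null values, while `F.count('*')` counts all rows. The following plain-Python sketch applies the same formula, `1 - non_null / total`, to a tiny hand-made sample. The rows and the helper `missing_rates` are hypothetical and only serve as illustration.

```python
def missing_rates(rows, columns):
    """Return the share of missing (None) values per column; assumes rows is non-empty."""
    total = len(rows)
    rates = {}
    for c in columns:
        non_null = sum(1 for row in rows if row.get(c) is not None)
        rates[c + "_missing_rate"] = 1 - non_null / total
    return rates

# Hypothetical sample, not taken from the taxi dataset
rows = [
    {"Fare": 10.0, "Tips": None},
    {"Fare": 7.5, "Tips": 2.0},
    {"Fare": None, "Tips": None},
    {"Fare": 12.0, "Tips": 1.0},
]

rates = missing_rates(rows, ["Fare", "Tips"])
print(rates)
```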

## Spark SQL

One of the main features of Spark is its SQL module, which allows us to analyze structured data through SQL queries. Let's take a look at some examples.

In [None]:
df.createOrReplaceTempView("trips") # We create this temporary view so Spark recognizes our DataFrame as a table when writing queries.

In [None]:
# Select the Taxi ID that charged the most expensive trip
spark.sql("""
    SELECT `Taxi ID`
    FROM trips
    WHERE `Trip Total` = (SELECT MAX(`Trip Total`) FROM trips)""").show()

In [None]:
# Select the 5 companies with the highest avg fares
spark.sql("""
    SELECT company, AVG(Fare) as avg_fare
    FROM trips
    GROUP BY Company
    ORDER BY avg_fare DESC
    LIMIT 5
""").show()

## Visualizing Data

Spark does not include a dedicated library or API for data visualization. However, we can generate non-RDD objects and plot them with conventional Python libraries such as Matplotlib or Seaborn.

In [None]:
import seaborn as sns
import matplotlib.pyplot as plt
import pandas as pd

# We create a list of values for each column we want to visualize.
trip_seconds = df.select('Trip Seconds').rdd.flatMap(lambda x: x).collect() 
trip_total = df.select('Trip Total').rdd.flatMap(lambda x: x).collect()

sns.scatterplot(x=trip_seconds, y=trip_total)
plt.title('Scatter Plot of Trip Seconds vs Trip Total')
plt.xlabel('Trip Seconds')
plt.ylabel('Trip Total')
plt.show()

In [None]:
pandas_df = spark.sql("""
    SELECT company, AVG(`Trip Miles`) as avg_trip
    FROM trips
    GROUP BY Company
    ORDER BY avg_trip DESC
    LIMIT 5
""").toPandas() # Another approach is to query our table, and create a simple PandasDF from it to plot it.

sns.barplot(x='company', y='avg_trip', data=pandas_df)
plt.title('Top 5 Companies with the longest AVG Trips')
plt.xticks(rotation=45)
plt.show()

## CSV vs Parquet

So far, we have returned some simple results using the data in csv format. However, in some cases, data might also be available in the [Parquet file format](https://parquet.apache.org/#td-block-1). Given its features and design, this open-source format allows Spark to read the data faster, and it tends to produce smaller files than csv. As we also have our taxi data in Parquet format, we are going to load a new DataFrame from it and compare the execution time between the csv DataFrame and the Parquet DataFrame.

In [None]:
df_parquet = spark.read.parquet("data/chicago-taxi-2020.parquet") # Parquet files store their own schema, so the header and inferSchema options are not needed
df_parquet.printSchema() # We confirm the schema is the same
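One reason Parquet reads tend to be faster is that Parquet is a columnar format: a query that needs a single column does not have to scan every field of every record. The sketch below is a simplified, in-memory analogy with hypothetical data; it ignores compression, encoding, and file metadata, which also play a role in practice.

```python
# Hypothetical records: (trip_id, trip_seconds, trip_total)
rows = [("t1", 600, 12.5), ("t2", 300, 7.0), ("t3", 900, 20.5)]

# Row-oriented layout (CSV-like): averaging one field visits every value of every record.
values_read_row = 0
total = 0.0
for record in rows:
    values_read_row += len(record)
    total += record[2]
avg_row = total / len(rows)

# Column-oriented layout (Parquet-like): each column is stored contiguously,
# so only the requested column has to be read.
columns = {
    "trip_id": [r[0] for r in rows],
    "trip_seconds": [r[1] for r in rows],
    "trip_total": [r[2] for r in rows],
}
trip_total = columns["trip_total"]
values_read_col = len(trip_total)
avg_col = sum(trip_total) / len(trip_total)

print(values_read_row, values_read_col, avg_row == avg_col)
```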

In [None]:
import time

# For this comparison, we are executing our previous query of calculating the avg fare grouped by Pickup Community Area

# CSV Performance
start_csv = time.time()
df.groupBy("Pickup Community Area") \
    .agg(F.avg("Trip Total") \
    .alias("AVG Trip Total")) \
    .na.drop(subset=["Pickup Community Area"])\
    .orderBy(F.col('Pickup Community Area')).show()
end_csv = time.time()
total_csv = end_csv - start_csv
print(f"Execution Time (CSV): {total_csv}\n")

# Parquet Performance
start_parquet = time.time()
df_parquet.groupBy("Pickup Community Area") \
    .agg(F.avg("Trip Total") \
    .alias("AVG Trip Total")) \
    .na.drop(subset=["Pickup Community Area"])\
    .orderBy(F.col('Pickup Community Area')).show()
end_parquet = time.time()
total_parquet = end_parquet - start_parquet
print(f"Execution Time (Parquet): {total_parquet}")

As we see, in this first example, using a parquet format resulted in a faster execution, providing the same results. Let's look at a second example.

In [None]:
# A simpler example, returning the avg trip total and the avg trip duration

# CSV Performance
start_csv = time.time()
df.select(F.mean("Trip Total"), F.mean("Trip Seconds")).show()
end_csv = time.time()
total_csv = end_csv - start_csv
print(f"Execution Time (CSV): {total_csv}\n")

# Parquet Performance
start_parquet = time.time()
df_parquet.select(F.mean("Trip Total"), F.mean("Trip Seconds")).show()
end_parquet = time.time()
total_parquet = end_parquet - start_parquet
print(f"Execution Time (Parquet): {total_parquet}")
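The timing pattern used in the comparisons above (record the start, run an action, record the end) can be wrapped in a small helper to avoid repetition. This is a minimal sketch; `timed` is a hypothetical helper name, and the demo uses a plain Python computation so that it runs without a Spark session.

```python
import time

def timed(label, action):
    """Run a zero-argument callable and return (result, elapsed seconds)."""
    start = time.perf_counter()
    result = action()
    elapsed = time.perf_counter() - start
    print(f"Execution Time ({label}): {elapsed:.3f} s")
    return result, elapsed

# Demo with a plain Python computation standing in for a Spark action
result, elapsed = timed("demo", lambda: sum(range(1_000_000)))
```

With Spark, the callable would wrap an action, for example `timed("CSV", lambda: df.select(F.mean("Trip Total")).collect())`. A single measurement can be noisy, so repeating each run a few times gives a fairer comparison.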

## Fine Tuning and Improving Performance

To take the most advantage of Spark and improve the overall performance of our cluster, it is important to understand and determine the right parameter configuration in our environment.

The following are a series of features and techniques that can enhance Spark's overall performance.

**Right number of partitions**

Identifying the right number of partitions is key to achieving solid performance in our computational tasks. A partition is a portion of the data that is distributed among the cores and processed as a unit.

To change the number of partitions in our RDD or DataFrame, Spark provides [two different functions](https://sparkbyexamples.com/spark/spark-repartition-vs-coalesce/):

- [*.repartition()*](https://spark.apache.org/docs/3.1.3/api/python/reference/api/pyspark.sql.DataFrame.repartition.html): Can be used to either increase or decrease the number of partitions by shuffling the data and redistributing it in *n* partitions.
- [*.coalesce()*](https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.DataFrame.coalesce.html): Can only be used to decrease the number of partitions without shuffling the data. For some contexts, this is a more efficient way to decrease the number of partitions.

It is important to take into consideration that repartitioning is an expensive task, and therefore, increasing the number of partitions in our data does not necessarily represent an improvement in performance.
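The difference between the two functions can be sketched with plain Python lists standing in for partitions. This is a simplified model under stated assumptions: the functions `repartition` and `coalesce` below are hypothetical stand-ins, the round-robin assignment is chosen for illustration, and Spark's actual placement of rows and partitions may differ.

```python
def repartition(partitions, n):
    """Full shuffle: every row may move to a different partition."""
    rows = [row for part in partitions for row in part]
    new_parts = [[] for _ in range(n)]
    for i, row in enumerate(rows):
        new_parts[i % n].append(row)
    return new_parts

def coalesce(partitions, n):
    """No shuffle: whole existing partitions are merged; the count cannot increase."""
    if n >= len(partitions):
        return [list(p) for p in partitions]
    new_parts = [[] for _ in range(n)]
    for i, part in enumerate(partitions):
        new_parts[i % n].extend(part)
    return new_parts

parts = [[1, 2], [3, 4], [5, 6], [7, 8]]

merged = coalesce(parts, 2)        # partitions are merged as whole units
shuffled = repartition(parts, 3)   # individual rows are redistributed
unchanged = coalesce(parts, 8)     # asking for more partitions has no effect

print(merged, shuffled, len(unchanged))
```

In the merged result, rows that started in the same partition stay together, which is why decreasing the partition count this way avoids the cost of a shuffle.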

To start our fine-tuning, let's get the current number of partitions in our Parquet DataFrame:

In [None]:
df_parquet.rdd.getNumPartitions() 

We can repartition our DataFrame and see if we achieve better performance. In the following code cell, we will iterate over different numbers of partitions, repeat our previous grouping task, and identify which number is optimal given our current resources and data.

In [None]:
def task_time(ddf, partitions):
    ddf_repartitioned = ddf.repartition(partitions) # Repartition the DataFrame (lazy: the shuffle runs as part of the timed action)
    start_time = time.time()
    ddf_repartitioned.groupBy("Pickup Community Area") \
    .agg(F.avg("Trip Total") \
    .alias("AVG Trip Total")) \
    .na.drop(subset=["Pickup Community Area"])\
    .orderBy(F.col('Pickup Community Area')) \
    .collect() # An action is required here; without it, Spark only builds the plan and nothing is measured
    end_time = time.time()
    return end_time - start_time

partitions = [5, 10, 15, 20,
              25, 30, 35, 40, 
              45, 50]

times = [task_time(df_parquet, p) for p in partitions]

optimal_partitions = partitions[times.index(min(times))] # We identify the partition size with the lowest time

print(f"The optimal number of partitions is {optimal_partitions} with a time of {min(times)} seconds\n")

# Plotting
plt.plot(partitions, times)
plt.xlabel('Number of Partitions')
plt.ylabel('Time taken (seconds)')
plt.title('Effect of Partitioning on Performance')
plt.show()

As we can observe in the graph, an initial increase in the number of partitions improves the computation time. However, as the number of partitions keeps growing, repartitioning no longer offers an improvement in performance.

**Data Persistence**

Another way to improve our performance is by persisting the RDDs or DataFrames that we use repeatedly. By persisting our data, we avoid recomputing it from the source every time an action is called. There are different [persistence levels](https://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence) available, and the choice mostly depends on the size and type of data we are working with. To persist our data, we have two different methods:

- [*.persist()*](https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/api/pyspark.pandas.DataFrame.spark.persist.html): With this method, the user defines the storage level to persist the data.

- [*.cache()*](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.cache.html): With this method, data is cached with the default level *MEMORY_AND_DISK*.
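The effect of persistence can be illustrated with a small Spark-free model. The class `LazyDataset` below is a hypothetical stand-in, not a Spark API: it recomputes its data on every action unless it has been marked for caching, in which case the first action after `cache()` stores the result and later actions reuse it.

```python
class LazyDataset:
    """Toy model of a dataset that is recomputed on every action unless cached."""

    def __init__(self, compute):
        self._compute = compute
        self._cached = None
        self._use_cache = False
        self.compute_calls = 0  # how many times the source was recomputed

    def cache(self):
        self._use_cache = True  # lazy: nothing is stored until the next action
        return self

    def unpersist(self):
        self._use_cache = False
        self._cached = None
        return self

    def collect(self):
        if self._use_cache and self._cached is not None:
            return self._cached
        self.compute_calls += 1
        data = self._compute()
        if self._use_cache:
            self._cached = data
        return data

ds = LazyDataset(lambda: [x * 2 for x in range(5)])

ds.collect()
ds.collect()
calls_without_cache = ds.compute_calls  # recomputed on each action

ds.cache()
ds.collect()
ds.collect()
ds.collect()
calls_with_cache = ds.compute_calls - calls_without_cache  # computed once, then reused

print(calls_without_cache, calls_with_cache)
```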

Let's make a comparison of the performance of a query in our parquet DataFrame before and after caching.

In [None]:
# Pre-caching
start_pre = time.time()
df_parquet.groupBy("Pickup Community Area") \
    .agg(F.avg("Trip Total") \
    .alias("AVG Trip Total")) \
    .na.drop(subset=["Pickup Community Area"])\
    .orderBy(F.col('Pickup Community Area')).show()
end_pre = time.time()
total_pre = end_pre - start_pre
print(f"Is the dataframe cached: {df_parquet.is_cached}")
print(f"Execution Time (Pre-cached): {total_pre}\n")

In [None]:
df_parquet.cache()
df_parquet.count() # We need to perform a first action to actually cache the data. count() scans every partition, whereas take(1) may only materialize (and cache) part of the data.

In [None]:
# Cached
start_post = time.time()
df_parquet.groupBy("Pickup Community Area") \
    .agg(F.avg("Trip Total") \
    .alias("AVG Trip Total")) \
    .na.drop(subset=["Pickup Community Area"])\
    .orderBy(F.col('Pickup Community Area')).show()
end_post = time.time()
total_post = end_post - start_post
print(f"Is the dataframe cached: {df_parquet.is_cached}")
print(f"Execution Time (Cached): {total_post}\n")

In [None]:
# Unpersisting our data
df_parquet.unpersist()

**Bucketing**

[Splitting the data into *buckets*](https://medium.com/@diehardankush/what-all-about-bucketing-and-partitioning-in-spark-bc669441db63) is another way to partition our data. In this approach, instead of splitting the data into an arbitrary number *n* of partitions, *buckets* are created by partitioning the data by a specific column or set of columns. This technique is particularly useful when we frequently perform aggregations or joins on those columns. Let's take a look at an example.

In [None]:
# Non-bucketed querying
start = time.time()
df_parquet.groupBy("Pickup Community Area").agg({"Trip ID": "count", "Fare": "avg"}).show()
end = time.time()
print(f"Time: {end-start} seconds")

In [None]:
df_parquet.write.bucketBy(10, "Pickup Community Area").sortBy("Pickup Community Area").saveAsTable("bucket_taxi")
bucketed_df = spark.table("bucket_taxi")

# Bucketed querying
start = time.time()
bucketed_df.groupBy("Pickup Community Area").agg({"Trip ID": "count", "Fare": "avg"}).show()
end = time.time()
print(f"Time: {end-start} seconds")
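The idea behind bucketing can be sketched in plain Python: rows are assigned to a fixed number of buckets based on a function of the bucketing column, so all rows with the same key end up in the same bucket and can be aggregated without looking at other buckets. This sketch assumes integer keys and uses `key % num_buckets` as a simplified stand-in for the hash function Spark applies; the rows and the helper `bucket_rows` are hypothetical.

```python
from collections import defaultdict

def bucket_rows(rows, key, num_buckets):
    """Assign each row to a bucket based on its key (simplified hash: key % num_buckets)."""
    buckets = defaultdict(list)
    for row in rows:
        buckets[row[key] % num_buckets].append(row)
    return dict(buckets)

# Hypothetical sample rows, not taken from the taxi dataset
rows = [
    {"area": 8, "fare": 10.0},
    {"area": 32, "fare": 20.0},
    {"area": 8, "fare": 30.0},
    {"area": 76, "fare": 40.0},
    {"area": 32, "fare": 40.0},
]

buckets = bucket_rows(rows, "area", 10)

# Each bucket can be aggregated independently, because a key never spans two buckets.
avg_fare = {}
for bucket in buckets.values():
    totals = defaultdict(lambda: [0.0, 0])  # area -> [sum of fares, number of rows]
    for row in bucket:
        totals[row["area"]][0] += row["fare"]
        totals[row["area"]][1] += 1
    for area, (fare_sum, n) in totals.items():
        avg_fare[area] = fare_sum / n

print(sorted(buckets), avg_fare)
```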

As we stated at the beginning, finding the proper configuration is a task that highly depends on the type of data you're working with, as well as your available resources. 

In this courselet we have explored some techniques, mostly to illustrate their implementation, although some of them might not have led to an actual improvement in the performance of our cluster. You are invited to learn more about fine-tuning by reading the relevant documentation.