# Assignment

Please answer each question in the corresponding cell below. Your final notebook must include the code as well as its output. You can use Saint Peter's [Azure Databricks](https://adb-7130196131129306.6.azuredatabricks.net/?o=7130196131129306#) to do this assignment.

## Questions

### Q1

What types of deployment configurations are available in Spark? Evaluate each mode in terms of interactivity, driver specs, cluster manager, and production use.

- Local mode: Spark runs in a single JVM on one machine, with the driver and the executor threads in the same process. The master URL sets the number of worker threads (`local` for one thread, `local[N]` for N threads, `local[*]` for one thread per core). No cluster manager is involved. This mode is highly interactive and well suited to development and testing, but it is limited to one machine's resources and is not meant for production.
- Standalone mode: Spark runs on a cluster of machines managed by Spark's built-in cluster manager, which consists of a master process and worker processes. It scales across machines and supports dynamic allocation of resources, which lets Spark adjust the executors allocated to an application based on its needs. The master and workers must be started and configured by hand, which can be challenging for larger deployments.
- Apache Mesos: Mesos is a general-purpose cluster manager that can share one cluster between Spark and other frameworks. Mesos support has been deprecated since Spark 3.2, so it is not a good choice for new deployments.
- Hadoop YARN: YARN is the resource manager of the Hadoop ecosystem and can also manage Spark applications. It integrates well with HDFS and other Hadoop components and supports security features like Kerberos authentication. It is common in production, but it can be complex to set up and manage, and it is mainly attractive when a Hadoop cluster already exists.
- Kubernetes: Kubernetes is an open-source container orchestration platform that can run Spark applications. The driver and executors run in containers, which provides workload isolation and fits environments that already run on Kubernetes. It requires container images and more cluster configuration than the other options.

With any of these cluster managers, the driver runs either on the machine that submitted the application (client deploy mode, which is what interactive shells and notebooks use) or on a node inside the cluster (cluster deploy mode, which is the usual choice for production jobs).
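
As a rough illustration, the cluster manager is selected through the master URL passed to `spark-submit --master` or to `SparkSession.builder.master(...)`. The sketch below maps each option discussed above to its URL form. The host names and ports are placeholders rather than real endpoints, and `describe_master` is a hypothetical helper written only for this example.

```python
# Master URL forms for each deployment option. Hosts and ports are placeholders.
MASTER_URLS = {
    "local": "local[*]",                       # single JVM, one worker thread per core
    "standalone": "spark://master-host:7077",  # Spark's built-in cluster manager
    "mesos": "mesos://mesos-host:5050",
    "yarn": "yarn",                            # cluster location comes from the Hadoop config
    "kubernetes": "k8s://https://k8s-host:6443",
}


def describe_master(url):
    """Return which deployment option a master URL selects."""
    if url.startswith("local"):
        return "local"
    if url.startswith("spark://"):
        return "standalone"
    if url.startswith("mesos://"):
        return "mesos"
    if url == "yarn":
        return "yarn"
    if url.startswith("k8s://"):
        return "kubernetes"
    raise ValueError(f"unrecognized master URL: {url}")


for name, url in MASTER_URLS.items():
    print(f"{name:<11} --master {url}")
```

On Databricks the master is configured by the platform, so this choice mostly matters when running Spark elsewhere.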

### Q2

What is a driver program in Spark, what are the responsibilities of this program, and how many driver processes can exist in a Spark cluster?

In Spark, the driver program is the process that runs the application's main program and creates the SparkSession. It defines the computation, turns it into a DAG (Directed Acyclic Graph) of transformations and actions on RDDs (Resilient Distributed Datasets) or DataFrames, and schedules the resulting tasks on the cluster.

The driver program has several key responsibilities:

- Defining the computation: The driver program defines the computation that needs to be performed on the data using the Spark API. It creates the RDDs or DataFrames and applies transformations and actions to the data.
- Creating the DAG: The driver program creates a directed acyclic graph (DAG) of the computation, which specifies the dependencies between the RDDs and the transformations and actions that need to be applied to them.
- Scheduling tasks: The driver program splits the DAG into stages and tasks and sends the tasks to executors. It also monitors the progress of the tasks and reschedules tasks that fail.
- Coordinating with external systems: The driver program may need to interact with external systems, such as Hadoop or a database, to read or write data.

Each Spark application has exactly one driver process. A cluster can run several applications at the same time, and each of them has its own driver. Where the driver runs depends on the deploy mode: in client mode it runs on the machine that submitted the application, and in cluster mode it runs on a node inside the cluster. The driver requests executors from the cluster manager and then schedules tasks on those executors directly. Because it manages the entire application, the driver needs enough memory and CPU for the size of the plan, the number of tasks it tracks, and any results collected back to it. An undersized driver becomes a bottleneck for the whole application.

### Q3

What are executors, tasks, and slots? Define each of them and write down its responsibilities.

In Spark, an executor is a process that runs on a worker node in the cluster and is responsible for executing the tasks assigned by the driver program. Each executor runs in a separate JVM (Java Virtual Machine) instance and is allocated a fixed amount of memory and CPU cores.

A task is a unit of work that the driver program assigns to an executor. Tasks are created from the transformations and actions defined in the Spark application and are executed in parallel across the worker nodes in the cluster. Each task performs a specific operation, such as filtering or aggregating, on one partition of the data. Its result is either written out as shuffle or output data, or returned to the driver program.

A slot is the capacity of an executor to run one task at a time. The number of slots in an executor is normally the number of CPU cores allocated to it, so an executor with four cores can run four tasks concurrently. The tasks running in an executor's slots share that executor's memory. The total number of slots in the cluster therefore depends on the number of executors and the cores configured for each.
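
The relationship between executors, cores, and concurrently running tasks can be sketched with simple arithmetic. This is a simplified model that assumes each slot is one core running one task at a time; the parameters loosely correspond to the settings `spark.executor.instances`, `spark.executor.cores`, and `spark.task.cpus`, and the function names are made up for this example.

```python
def total_slots(num_executors, cores_per_executor, cpus_per_task=1):
    """Number of tasks that can run at the same time across the application."""
    slots_per_executor = cores_per_executor // cpus_per_task
    return num_executors * slots_per_executor


def waves_needed(num_tasks, slots):
    """How many rounds of tasks a stage needs when tasks outnumber slots."""
    return -(-num_tasks // slots)  # ceiling division


slots = total_slots(num_executors=4, cores_per_executor=2)
print(slots)                    # 8
print(waves_needed(20, slots))  # 3
```

With 8 slots, a stage of 20 tasks runs in three rounds: 8 tasks, 8 tasks, then the remaining 4.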

The responsibilities of an executor include:

- Running tasks: The executor executes the tasks assigned by the driver program. It reads the input data, performs the specified transformations and actions, and produces the output data.
- Managing memory: The executor manages the memory allocated to it. It must ensure that there is enough memory available for the tasks assigned to it and must release that memory when the tasks are completed.
- Reporting failures: The executor reports the status of each task, including any errors or exceptions, back to the driver program. Recovering from a failed task or a lost executor is the driver's job, which reschedules the work on another executor.
- Interacting with external systems: The executor may need to interact with external systems, such as Hadoop or a database, to read or write data.

The responsibilities of a task include:

- Performing a specific operation: The task performs a specific operation, such as filtering or aggregating, on one partition of the input data.
- Reading and writing data: The task reads its input from the storage system, from cached data, or from the shuffle output of an earlier stage, and writes its output to storage, to shuffle files, or back to the driver.
- Exchanging data with other tasks: Tasks do not call each other directly. When an operation needs data from several partitions, tasks in a later stage read the shuffle output written by tasks in an earlier stage. For example, a reduce-side task combines the output of multiple map-side tasks.

Overall, the driver program, executors, tasks, and slots work together to parallelize the computation across the cluster and to use its resources efficiently.

### Q4

What is a Spark DataFrame, and how is it different from a table in a SQL database?

A Spark DataFrame is a distributed collection of data organized into named columns. It is similar to a table in a SQL database in that it represents a structured, tabular dataset with rows and columns. However, there are several key differences between a Spark DataFrame and a table in a SQL database.

Firstly, a Spark DataFrame is designed to handle large-scale, distributed datasets that cannot be processed on a single machine. It is built on top of Spark's distributed computing framework, which allows it to parallelize the processing of data across a cluster of machines. This makes it well-suited for big data processing tasks, such as data cleaning, feature engineering, and statistical analysis.

Secondly, a Spark DataFrame provides a rich set of APIs for manipulating data, including filtering, aggregating, and joining operations. These APIs are similar to the SQL queries used to manipulate tables in a database, but they are more expressive and flexible. For example, Spark DataFrames support complex data types, such as arrays and maps, and can handle nested structures, which can be challenging to represent in a relational database.

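Thirdly, a Spark DataFrame gets its schema when the data is read rather than when it is stored. The schema can be supplied explicitly or inferred from the source. For CSV files, every column is read as a string unless a schema is given or the `inferSchema` option is enabled. This allows for more flexibility in the data processing pipeline, as the schema can be adjusted without changing the underlying files. In contrast, a SQL database typically requires a schema to be defined before data can be loaded into a table. A DataFrame is also immutable and lazily evaluated: transformations return a new DataFrame, and nothing is computed until an action is called, whereas a database table is stored data that can be updated in place.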

Overall, Spark DataFrames offer a powerful and flexible way to process large-scale, distributed datasets, with APIs that are well-suited for complex data transformations and analysis. While they share some similarities with tables in a SQL database, they are designed to handle a different set of challenges and requirements, and offer unique advantages in the context of big data processing.

### Q5

What is the difference between wide and narrow transformations, and what is the role of partitions in this difference? Give example functions for each type of transformation.

In Spark, transformations are operations that create a new RDD (Resilient Distributed Dataset) from an existing one. Transformations can be categorized into two types: wide transformations and narrow transformations.

Narrow transformations are transformations where each input partition contributes to at most one output partition, so no data has to be shuffled across the network. Examples of narrow transformations include map(), filter(), and union().

Wide transformations are transformations where each input partition may contribute to multiple output partitions, so each output partition depends on many input partitions. They require shuffling data across the network, which can be a costly operation. Examples of wide transformations include groupByKey(), reduceByKey(), and join(). A join can avoid the shuffle when both inputs are already partitioned by the join key in the same way.

Partitions play an important role in the difference between wide and narrow transformations. In narrow transformations, each partition is processed independently of the other partitions, and the output of each partition is combined to produce the final RDD. In wide transformations, data must be shuffled across the network to ensure that data with the same key is processed by the same worker node. This requires data to be redistributed across partitions, which can be a costly operation.
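
The role of partitions described above can be modelled in plain Python, with lists standing in for partitions. This is not Spark code: the sample records are made up, `zlib.crc32` is used only as a stand-in for a hash partitioner, and the function names are hypothetical.

```python
import zlib
from collections import defaultdict

# Three input partitions of (carrier, delay) records; the values are made up.
partitions = [[("AA", 3), ("UA", 5)], [("AA", 1), ("DL", 2)], [("UA", 4)]]


def narrow_map(parts, fn):
    """Narrow: each output partition is built from exactly one input partition."""
    return [[fn(record) for record in part] for part in parts]


def shuffle_by_key(parts, num_out):
    """Wide: every input partition may send records to every output partition."""
    out = [[] for _ in range(num_out)]
    for part in parts:
        for key, value in part:
            target = zlib.crc32(key.encode()) % num_out  # stand-in hash partitioner
            out[target].append((key, value))
    return out


def reduce_by_key(parts, num_out):
    """Shuffle so equal keys share a partition, then sum within each partition."""
    result = []
    for part in shuffle_by_key(parts, num_out):
        totals = defaultdict(int)
        for key, value in part:
            totals[key] += value
        result.append(sorted(totals.items()))
    return result


doubled = narrow_map(partitions, lambda kv: (kv[0], kv[1] * 2))
totals = reduce_by_key(partitions, num_out=2)
print(doubled)  # same partition layout as the input
print(totals)   # each key now lives in exactly one partition
```

The narrow step never looks outside its own partition, while the wide step has to move records between partitions before it can combine values for the same key.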

Here are some examples of narrow and wide transformations in Spark:

Narrow transformations:

- `map()`: Applies a function to each element in the RDD and returns a new RDD with the transformed values.
- `filter()`: Filters the elements in the RDD based on a condition and returns a new RDD with the filtered values.
- `union()`: Concatenates two RDDs and returns a new RDD with the combined values.

Wide transformations:

- `groupByKey()`: Groups the elements in the RDD by key and returns a new RDD with the groups.
- `reduceByKey()`: Aggregates the values for each key in the RDD and returns a new RDD with the aggregated values.
- `join()`: Joins two RDDs based on a common key and returns a new RDD with the joined values.

Overall, understanding the difference between wide and narrow transformations is important for optimizing Spark jobs and improving their performance. By using narrow transformations where possible and minimizing the use of wide transformations, it is possible to reduce the amount of data shuffling required and improve the overall efficiency of the Spark job.

### Q6

What is the cost model in The Catalyst Optimizer?

The Catalyst Optimizer is the query optimizer that Apache Spark uses for SQL, DataFrame, and Dataset queries. Most of its work is rule-based: it analyzes the query, applies logical optimization rules such as predicate pushdown and constant folding, and then moves on to physical planning. The cost model comes into play during physical planning, where Catalyst can generate more than one physical plan for the same logical plan and uses estimated costs to choose among them.

The cost estimates are based on statistics about the data, such as table size in bytes and row counts, and, when column statistics have been collected, per-column figures such as the number of distinct values, null counts, and minimum and maximum values. These statistics drive choices such as which join strategy to use (for example, broadcasting a small table instead of shuffling both sides) and, when cost-based optimization is enabled through `spark.sql.cbo.enabled`, the order in which multiple joins are performed.

The quality of the chosen plan therefore depends on the quality of the statistics. Table and column statistics are collected with the `ANALYZE TABLE` command. Without them, Spark falls back on rough size estimates.

Catalyst itself is designed to be extensible: developers can add their own optimization rules and planning strategies.

Overall, the cost model lets Catalyst compare candidate plans instead of executing a query exactly as written. The chosen plan is the cheapest according to the estimates, which usually, but not always, reduces query execution time.
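
To make the idea of a cost-based choice concrete, the sketch below models one such decision: picking a join strategy from estimated table sizes. It is a deliberately simplified illustration, not Spark's planner, which also considers join type, hints, and other factors. The 10 MB default of `spark.sql.autoBroadcastJoinThreshold` is a real setting; the function name and the returned labels are assumptions made for this example.

```python
# Default value of spark.sql.autoBroadcastJoinThreshold (10 MB).
DEFAULT_BROADCAST_THRESHOLD = 10 * 1024 * 1024


def choose_join_strategy(left_bytes, right_bytes,
                         threshold=DEFAULT_BROADCAST_THRESHOLD):
    """Pick a join strategy from estimated input sizes (simplified model)."""
    smaller = min(left_bytes, right_bytes)
    # A negative threshold disables broadcasting, as in Spark's setting.
    if threshold >= 0 and smaller <= threshold:
        return "broadcast hash join"  # ship the small side to every executor
    return "sort-merge join"          # shuffle and sort both sides


MB = 1024 * 1024
print(choose_join_strategy(500 * MB, 2 * MB))    # broadcast hash join
print(choose_join_strategy(500 * MB, 400 * MB))  # sort-merge join
```

The decision is only as good as the size estimates, which is why stale or missing statistics can lead to a poor plan.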

### Q7

Explain what happens when a user submits a Spark Application that initiates a SparkSession and calls an action. Elaborate on the roles of the driver, Jobs, Stages, and Tasks.

When a user submits a Spark application, it first creates a SparkSession, which is the entry point for all Spark functionality. The SparkSession is responsible for managing the connection to the Spark cluster and provides a unified interface to interact with Spark.

Once the SparkSession is created, the user can perform various transformations and actions on data using Spark APIs. Transformations are operations that create a new RDD (Resilient Distributed Dataset) from an existing one, while actions are operations that return a result to the driver program or write data to external storage.

When an action is called, the driver turns the chain of transformations that lead up to it into a job. The job is divided into stages. Each stage is a sequence of transformations that can be executed together without shuffling data, and a new stage begins wherever an operation requires a shuffle, which is costly because it moves data between nodes in the cluster.

Each stage is then divided into tasks, which are the smallest unit of work in Spark. There is one task per partition of the data in that stage, and every task in a stage runs the same code on a different partition. Tasks are assigned to executors on the worker nodes. The driver program coordinates the execution of tasks and collects the results returned by each task.

The driver program is responsible for managing the overall execution of the Spark application. It submits jobs to the cluster, collects results from tasks, and handles any errors or failures that occur during the execution.

In summary, when a user submits a Spark application that initiates a SparkSession and calls an action, the driver creates a job, which is divided into stages at the operations that require shuffling. Each stage is divided into tasks, one per partition, which are assigned to executors on the worker nodes. The driver program coordinates the execution of tasks and collects the results returned by each task. By breaking the job down into stages and tasks, Spark can execute computations in a distributed and parallel manner, improving the efficiency and scalability of the system.
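
The job, stage, and task breakdown can be sketched in plain Python. This is a simplified model rather than Spark's scheduler: it starts a new stage at every wide operation and counts one task per partition. The operation list and the partition counts are made-up inputs (200 is the default of `spark.sql.shuffle.partitions`), and `split_into_stages` is a hypothetical helper.

```python
def split_into_stages(transformations):
    """Group (name, is_wide) pairs into stages, starting a new one at each shuffle."""
    stages = [[]]
    for name, is_wide in transformations:
        if is_wide and stages[-1]:
            stages.append([])  # shuffle boundary: close the current stage
        stages[-1].append(name)
    return stages


# One job: the chain of operations triggered by a single action.
job = [
    ("read", False),
    ("filter", False),
    ("groupBy", True),   # wide: needs a shuffle
    ("select", False),
    ("orderBy", True),   # wide: needs another shuffle
]
partitions_per_stage = [8, 200, 200]  # assumed partition counts

stages = split_into_stages(job)
for ops, num_tasks in zip(stages, partitions_per_stage):
    print(f"stage {ops}: {num_tasks} tasks")

total_tasks = sum(partitions_per_stage)
print(total_tasks)  # 408
```

In a real application the Spark UI shows the same hierarchy: each action appears as a job, each job lists its stages, and each stage lists its tasks.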

Answer the following questions using the dataset below.

In [0]:
df = spark \
    .read \
    .format("csv")\
    .option("header", "true") \
    .load("/databricks-datasets/airlines/part-00000")

### Q8

Show the number of columns, and the number of rows in a tuple, ex: Pandas has `df.shape`.

In [0]:
# Print the schema to get the number of columns
df.printSchema()
num_cols = len(df.columns)

# Count the number of rows
num_rows = df.count()

# Print the number of columns and rows as a tuple
print("Number of columns and rows:", (num_cols, num_rows))


root
 |-- Year: string (nullable = true)
 |-- Month: string (nullable = true)
 |-- DayofMonth: string (nullable = true)
 |-- DayOfWeek: string (nullable = true)
 |-- DepTime: string (nullable = true)
 |-- CRSDepTime: string (nullable = true)
 |-- ArrTime: string (nullable = true)
 |-- CRSArrTime: string (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: string (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- ActualElapsedTime: string (nullable = true)
 |-- CRSElapsedTime: string (nullable = true)
 |-- AirTime: string (nullable = true)
 |-- ArrDelay: string (nullable = true)
 |-- DepDelay: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: string (nullable = true)
 |-- TaxiIn: string (nullable = true)
 |-- TaxiOut: string (nullable = true)
 |-- Cancelled: string (nullable = true)
 |-- CancellationCode: string (nullable = true)
 |-- Diverted: string (nullable = true)
 |-- CarrierDelay:
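
For reuse, the same two calls can be wrapped in a small helper. This is only a sketch: `shape` is a hypothetical function name, and it returns `(rows, columns)` to mirror the order of pandas' `df.shape`, which is the reverse of the tuple printed by the cell above.

```python
def shape(df):
    """Return (rows, columns) for a Spark DataFrame, mirroring pandas' df.shape.

    df.count() triggers a Spark job, so this is more expensive than in pandas.
    """
    return (df.count(), len(df.columns))


# In the notebook, with the DataFrame loaded above: print(shape(df))
```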

### Q9

Show all the unique `UniqueCarrier` values in this data.

In [0]:
# Get all unique UniqueCarrier values
unique_carriers = df.select("UniqueCarrier").distinct()

# Display the results
unique_carriers.show()


+-------------+
|UniqueCarrier|
+-------------+
|           UA|
|           PS|
|           TW|
|           EA|
|           WN|
|           NW|
|           HP|
|           PI|
|           CO|
|       PA (1)|
|           DL|
|           AA|
|           US|
|           AS|
+-------------+



### Q10

Show the total number of delays for each `UniqueCarrier`.

In [0]:
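from pyspark.sql import functions as F  # avoids shadowing Python's built-in sum

# ArrDelay is read as a string; with Spark's default (non-ANSI) casting,
# non-numeric values become null. A flight counts as delayed when ArrDelay > 0.
df.withColumn("ArrDelay", F.col("ArrDelay").cast("int")) \
  .filter(F.col("ArrDelay") > 0) \
  .groupBy("UniqueCarrier") \
  .agg(F.count("*").alias("total_delays")) \
  .show()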



### Q11 - Bonus

Show the total number of delays for each `UniqueCarrier` for all the files shown below.

In [0]:
dbutils.fs.ls('/databricks-datasets/airlines/')

Out[8]: [FileInfo(path='dbfs:/databricks-datasets/airlines/README.md', name='README.md', size=1089, modificationTime=1455505834000),
 FileInfo(path='dbfs:/databricks-datasets/airlines/_SUCCESS', name='_SUCCESS', size=0, modificationTime=1437224869000),
 FileInfo(path='dbfs:/databricks-datasets/airlines/part-00000', name='part-00000', size=67108879, modificationTime=1437224869000),
 FileInfo(path='dbfs:/databricks-datasets/airlines/part-00001', name='part-00001', size=67108862, modificationTime=1437224870000),
 FileInfo(path='dbfs:/databricks-datasets/airlines/part-00002', name='part-00002', size=67108930, modificationTime=1437224878000),
 FileInfo(path='dbfs:/databricks-datasets/airlines/part-00003', name='part-00003', size=67108804, modificationTime=1437224887000),
 FileInfo(path='dbfs:/databricks-datasets/airlines/part-00004', name='part-00004', size=67108908, modificationTime=1437224895000),
 FileInfo(path='dbfs:/databricks-datasets/airlines/part-00005', name='part-00005', size=6710
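
One possible approach, sketched below, is to load every part file with a glob pattern and count delayed flights per carrier. Several things here are assumptions: `delays_per_carrier` is a hypothetical helper, `spark` is the session that Databricks provides in a notebook, a flight counts as delayed when `ArrDelay` is positive, and missing values are assumed to be written as `NA`. It also assumes every part file has the same layout as `part-00000`; it is worth checking whether the other files carry a header row, because the `header` option applies to each file that is read.

```python
def delays_per_carrier(spark, path="/databricks-datasets/airlines/part-*"):
    """Count flights with a positive arrival delay per carrier across all part files."""
    flights = (
        spark.read.format("csv")
        .option("header", "true")
        .option("nullValue", "NA")  # assumed marker for missing values
        .load(path)
    )
    return (
        flights.filter("CAST(ArrDelay AS INT) > 0")
        .groupBy("UniqueCarrier")
        .count()
        .withColumnRenamed("count", "total_delays")
    )


# In the notebook: delays_per_carrier(spark).show()
```

Because this reads the whole dataset, `show()` is preferable to collecting the result, in line with the notes below.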

## Notes

After you are done with the assignment, make sure:

- The code runs and its outputs are visible alongside it
- Don't print entire dataframes (use `display`, `df.show(n)`, or `df.take(n)` wherever possible)
- Export as IPython Notebook
- Export as HTML 
- Upload your work to [Blackboard](https://saintpeters.blackboard.com/)

For more information on exporting notebooks, see [Exporting a notebook from Databricks](https://learn.microsoft.com/en-us/azure/databricks/notebooks/notebook-export-import).

Best of luck!