# Parallelizing a Collection in Spark

Parallelizing a collection is a simple way to create an **RDD (Resilient Distributed Dataset)** in Spark.  
This method distributes a collection that already exists in the driver program, such as a Python `list`, `tuple`, or `range`, across the cluster  
for parallel processing.


In [1]:
from pyspark import SparkContext
from pyspark.rdd import RDD
from pyspark.sql import SparkSession

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("ParallelizingExample") \
    .getOrCreate()

In [2]:
spark

In [3]:
# Create an RDD by parallelizing a range of numbers
numberRDD = spark.sparkContext.parallelize(range(1, 10))

# Collect the data from the RDD
result = numberRDD.collect()

# Output the result
print(result)

[1, 2, 3, 4, 5, 6, 7, 8, 9]


# How Do We Create RDDs?

# 1. Creating an RDD from External Datasets

While parallelizing collections is simple, it is not suitable for large datasets, because the whole collection must first exist in the driver's memory before Spark can distribute it. Large datasets are usually stored in distributed file systems such as HDFS, and Spark provides APIs like `textFile()` to read such data in parallel.  

The `textFile()` method reads a file and creates an RDD where each element represents a line from the file.

### Example
Use the file path as an argument to the `textFile()` method to create the RDD.


In [60]:
# Define the file path
file_path = "data/textfiles/sample_log.txt"

# Create an RDD by reading the text file
logRDD = spark.sparkContext.textFile(file_path)

# Collect the data from the RDD
result = logRDD.collect()

# Print the result
print(result)

['', '127.0.0.1 - - [16/Nov/2024:10:00:00 +0000] "GET /index.html HTTP/1.1" 200 1024', '127.0.0.1 - - [16/Nov/2024:10:05:00 +0000] "POST /login HTTP/1.1" 302 512', '192.168.1.1 - - [16/Nov/2024:10:10:00 +0000] "GET /dashboard HTTP/1.1" 200 2048', '10.0.0.1 - - [16/Nov/2024:10:15:00 +0000] "GET /profile HTTP/1.1" 404 128']


# 2. Transforming an RDD in Spark

RDDs (Resilient Distributed Datasets) are immutable, meaning they cannot be modified directly.  
However, you can create a new RDD by applying transformations to an existing RDD using methods provided by Spark.

For example, you can use the `filter()` transformation to create a new RDD containing only the even or odd numbers from an existing RDD.


In [7]:
# Transform numberRDD to contain only even numbers
evenNumberRDD = numberRDD.filter(lambda num: num % 2 == 0)

# Collect and display the results
result = evenNumberRDD.collect()
print(result)

[2, 4, 6, 8]


# 3. Creating an RDD from a DataFrame

Though DataFrames are preferred for performance, converting to RDDs is useful in cases like handling unstructured data, custom partitioning after heavy computations, or integrating with legacy RDD code. Use DataFrames when possible for better efficiency.

Let's create a DataFrame and convert it into an RDD:


In [8]:
rangeDf = spark.range(1,5)
rangeRDD = rangeDf.rdd
rangeRDD.collect()

[Row(id=1), Row(id=2), Row(id=3), Row(id=4)]

# Counting ERROR and INFO Log Messages in a Log File

Now that we have a basic understanding of how to create RDDs, let's write a simple program to read a log file and count the number of messages with log levels of `ERROR` and `INFO`.

The following code reads a log file, filters the messages with the log levels `ERROR` and `INFO`, and counts the occurrences:

In [17]:
# Define file path
filePath = "data/textfiles/sample_log.log"


# Read the log file as an RDD
logRDD = spark.sparkContext.textFile(filePath)

# Collect the data from the RDD
result1 = logRDD.collect()
print(result1)



In [18]:
# Parse each line once, keep only INFO and ERROR entries, and count them per level
resultRDD = logRDD.map(lambda line: line.split(" - ")) \
                  .filter(lambda parts: len(parts) >= 4 and parts[2] in ['INFO', 'ERROR']) \
                  .map(lambda parts: (parts[2], 1)) \
                  .reduceByKey(lambda x, y: x + y)

# Collect and print the results
resultRDD.collect()

[('INFO', 2), ('ERROR', 2)]

# Transformations in Spark with Python Code Examples

## Transformations Overview
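Transformations in Spark create new RDDs from existing ones. They are lazy: Spark records them in the RDD's lineage and computes nothing until an action is called. They fall into two categories:
1. **Narrow Transformations**: Each input partition contributes to at most one output partition, so no shuffle is needed (e.g., `map`, `filter`).
2. **Wide Transformations**: An output partition can depend on data from many input partitions, so data is shuffled across the cluster (e.g., `groupByKey`, `reduceByKey`).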


## Narrow Transformations

### `map()`
Applies a function to each element of an RDD, producing a new RDD with the same number of elements.


In [23]:
spark.sparkContext.parallelize(range(1, 11)).map(lambda x: x * 2).collect()


[2, 4, 6, 8, 10, 12, 14, 16, 18, 20]

### `flatMap()`

The `flatMap()` transformation applies a function that returns an iterable (for example, a list) to each element of an RDD and flattens the results into a single new RDD. Each input element can produce zero, one, or many output elements, which makes `flatMap()` useful for splitting one element into several: for example, an RDD of lines can be transformed into an RDD of individual words.


In [26]:
spark.sparkContext.parallelize(["It's fun to learn Spark", "This is a flatMap example using Python"])\
.flatMap(lambda x : x.split(" ")).collect()

["It's",
 'fun',
 'to',
 'learn',
 'Spark',
 'This',
 'is',
 'a',
 'flatMap',
 'example',
 'using',
 'Python']

### `filter()`
The `filter()` transformation applies a function to filter out elements that do not meet the specified condition. It creates a new RDD containing only the elements that satisfy the condition.

Example:
Filter numbers greater than 5 from an RDD containing numbers 1 to 10.

In [28]:
# Python Example of `filter()`

rdd = spark.sparkContext.parallelize(range(1, 11))
filtered_rdd = rdd.filter(lambda x: x > 5).collect()

print(filtered_rdd)

[6, 7, 8, 9, 10]


### `union()`
The `union()` transformation combines the elements of two RDDs into a new RDD. It does not remove duplicates, so it behaves like `UNION ALL` in SQL. Because it simply concatenates the partitions of both RDDs, it does not shuffle data.

Example:
Combining two RDDs containing numbers 1-5 and 5-10.

In [29]:
# Python Example of `union()`
firstRDD = spark.sparkContext.parallelize(range(1, 6))
secondRDD = spark.sparkContext.parallelize(range(5, 11))
union_rdd = firstRDD.union(secondRDD).collect()

print(union_rdd)

[1, 2, 3, 4, 5, 5, 6, 7, 8, 9, 10]


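### `mapPartitions()`
The `mapPartitions()` transformation provides more control by applying a function to each partition as a whole. The function receives an iterator over the partition's elements and returns an iterable of results. This is useful when setup work, such as creating a connection or a parser, should happen once per partition rather than once per element.

Example:
Multiply each element by 2 using `mapPartitions()`.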

In [32]:
# Python Example of `mapPartitions()`
rdd = spark.sparkContext.parallelize(range(1, 11), 2)
result_rdd = rdd.mapPartitions(lambda iterOfElements: [e * 2 for e in iterOfElements]).collect()

print(result_rdd)

[2, 4, 6, 8, 10, 12, 14, 16, 18, 20]


## Wide Transformations in Spark
Wide transformations involve a shuffle of data between partitions, which requires more resources such as memory, disk, and network bandwidth. These transformations include operations like `groupByKey()`, `reduceByKey()`, `join()`, `distinct()`, and `intersection()`. Each output partition of a wide transformation is computed from data in multiple input partitions.

### `distinct()`
The `distinct()` transformation removes duplicate elements from an RDD and returns a new RDD containing only unique elements. Because it shuffles data, the order of the result is not guaranteed.

In [33]:
# Python Example of `distinct()`
rdd = spark.sparkContext.parallelize([1, 1, 2, 2, 3, 3, 4, 4])
distinct_rdd = rdd.distinct().collect()

print(distinct_rdd)

[1, 2, 3, 4]


### `sortBy()`
The `sortBy()` transformation sorts the elements of an RDD by the value that a key function returns for each element. 
In the following example, we sort the RDD in descending order of the second element of each tuple by negating that value.

In [34]:
# Python Example of `sortBy()`
rdd = spark.sparkContext.parallelize([('Rahul', 4), ('Aman', 2), ('Shrey', 6), ('Akash', 1)])
sorted_rdd = rdd.sortBy(lambda x: -x[1]).collect()

print(sorted_rdd)

[('Shrey', 6), ('Rahul', 4), ('Aman', 2), ('Akash', 1)]


### `intersection()`
The `intersection()` transformation returns the elements that two RDDs have in common.
Like `union()`, `intersection()` is a set operation between two RDDs, but it involves a shuffle and removes duplicates from its output. The following example finds the common elements of two RDDs:

In [35]:
# Python Example of `intersection()`
firstRDD = spark.sparkContext.parallelize(range(1, 6))
secondRDD = spark.sparkContext.parallelize(range(5, 11))
intersection_rdd = firstRDD.intersection(secondRDD).collect()

print(intersection_rdd)

[5]


### `subtract()`
The `subtract()` transformation returns the elements of one RDD that do not appear in another RDD.
Let's create two RDDs: the first contains the numbers 1 to 10 and the second contains the numbers 6 to 10. Calling `subtract()` returns a new RDD with the numbers 1 to 5:

In [37]:
# Python Example of `subtract()`
firstRDD = spark.sparkContext.parallelize(range(1, 11))
secondRDD = spark.sparkContext.parallelize(range(6, 11))
subtract_rdd = firstRDD.subtract(secondRDD).collect()

print(subtract_rdd)

[1, 2, 3, 4, 5]


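### `cartesian()`
The `cartesian()` transformation returns the Cartesian product of two RDDs: every pair `(a, b)` with `a` from the first RDD and `b` from the second. It does not shuffle data, but it is expensive because the output contains as many elements as the sizes of the two RDDs multiplied together.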

In [39]:
# Python Example of `cartesian()`
firstRDD = spark.sparkContext.parallelize(range(3))
secondRDD = spark.sparkContext.parallelize(['A', 'B', 'C'])
cartesian_rdd = firstRDD.cartesian(secondRDD).collect()

print(cartesian_rdd)

[(0, 'A'), (0, 'B'), (0, 'C'), (1, 'A'), (1, 'B'), (1, 'C'), (2, 'A'), (2, 'B'), (2, 'C')]


# Actions in Spark

Actions trigger the execution of transformations and return results to the driver or external storage. They don't create new RDDs but perform operations on the existing ones. Here are some common actions:

## 1. `collect()`
Returns all elements of an RDD to the driver as a list. Use it with care: the entire result must fit in the driver's memory.

Example:
```python
spark.sparkContext.parallelize(range(10)).collect()
```

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]


## 2. `count()`
Returns the number of elements in an RDD.

In [106]:
spark.sparkContext.parallelize(range(10)).count()

10

## 3. `take()`
Returns the first N elements of an RDD.

In [44]:
spark.sparkContext.parallelize(range(10)).take(2)

[0, 1]

## 4. `top()`
Returns the N largest elements of the RDD, in descending order.

In [47]:
spark.sparkContext.parallelize(range(10)).top(2)

[9, 8]

## 5. `takeOrdered()`
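Returns the first N elements in ascending order, or in the order defined by an optional `key` function. Here the key negates each value, so the largest elements come first.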

In [48]:
spark.sparkContext.parallelize(range(10)).takeOrdered(3, key=lambda x: -x)

[9, 8, 7]

## 6. `first()`
Returns the first element of an RDD.

In [50]:
spark.sparkContext.parallelize(range(10)).first()

0

## 7. `countByValue()`
Counts the occurrences of each distinct element and returns the counts to the driver as a dictionary.

In [51]:
spark.sparkContext.parallelize(["A", "A", "B"]).countByValue()

defaultdict(int, {'A': 2, 'B': 1})

## 8. `reduce()`
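Combines the elements of an RDD in parallel using a function of two arguments (e.g., summing values). The function should be commutative and associative, because partial results from different partitions are combined in no fixed order.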

In [54]:
spark.sparkContext.parallelize(range(1, 11)).reduce(lambda x, y: x + y)

55

# Pair RDDs in Spark
A Pair RDD is a special type of RDD in Apache Spark that contains key-value pairs. Pair RDDs are useful for a variety of operations like joins and aggregations. Spark provides optimized transformations for Pair RDDs, making them very powerful for data processing tasks.

## 1. `groupByKey()`
- The `groupByKey()` transformation groups the values of a Pair RDD by key. It performs no aggregation; it simply collects all values for each key, which means every value is shuffled across the network.

In [102]:
pairRDD = spark.sparkContext.parallelize([(1, 5), (1, 10), (2, 4), (3, 1), (2, 6)])
result = pairRDD.groupByKey().collect()
for pair in result:
    print('key -', pair[0], ', value -', list(pair[1]))

key - 1 , value - [5, 10]
key - 2 , value - [4, 6]
key - 3 , value - [1]


## 2. `reduceByKey()`
- The `reduceByKey()` transformation aggregates values by key using a specified reduction function. It minimizes the data shuffle by performing a local aggregation first and then a global aggregation across nodes.

In [101]:
pairRDD = spark.sparkContext.parallelize([(1, 5), (1, 10), (2, 4), (3, 1), (2, 6)])
result = pairRDD.reduceByKey(lambda x, y: x + y).collect()
print(result)

[(1, 15), (2, 10), (3, 1)]


## 3. `sortByKey()`
The `sortByKey()` transformation sorts a Pair RDD based on its keys. By default, the sorting is in ascending order, but you can specify a custom order.

In [104]:
pairRDD = spark.sparkContext.parallelize([(1, 5), (1, 10), (2, 4), (3, 1), (2, 6)])
sortedRDD = pairRDD.sortByKey().collect()
print(sortedRDD)

[(1, 5), (1, 10), (2, 4), (2, 6), (3, 1)]


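Note: Pass `ascending=False` to sort in descending order, e.g., `sortByKey(ascending=False)`. You can also supply a `keyfunc` that transforms each key before comparison, e.g., `sortByKey(keyfunc=lambda k: -k)`, which also yields descending order for numeric keys.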

## 4. `join()`
The `join()` transformation performs an inner join of two Pair RDDs on their keys. It returns an RDD of `(key, (value1, value2))` pairs; keys that appear in only one RDD, such as `"UK"` below, are dropped.

In [105]:
salesRDD = spark.sparkContext.parallelize([("US", 20), ("IND", 30), ("UK", 10)])
revenueRDD = spark.sparkContext.parallelize([("US", 200), ("IND", 300)])
joinedRDD = salesRDD.join(revenueRDD).collect()
print(joinedRDD)

[('IND', (30, 300)), ('US', (20, 200))]


## Other Pair RDD Transformations
- `aggregateByKey()`: Allows performing custom aggregations for each key.
- `cogroup()`: Groups data from two Pair RDDs based on their keys.
- `leftOuterJoin()`, `rightOuterJoin()`: Join two RDDs while also keeping keys that have no match (all keys of the left or the right RDD, respectively); missing values are `None`.
- `subtractByKey()`: Removes the keys and their values from the first RDD that are present in the second RDD.

# Caching and Checkpointing

Caching and checkpointing are important features in Spark that can significantly improve the performance of your Spark jobs.

## Caching

Caching data in memory is one of the main features of Spark. You can cache large datasets in memory or on disk, depending on your cluster hardware. Caching is useful in the following scenarios:
- When you use the same RDD multiple times.
- To avoid recomputing an RDD that involves heavy computation, such as `join()` and `groupByKey()`.

If you need to run multiple actions on an RDD, it's a good idea to cache it in memory to avoid recomputing the same RDD. 


Without caching, Spark recomputes `baseRDD` from its lineage for every action, so running both `take()` and `count()` on it computes the RDD twice.
By caching the RDD, Spark computes it only once and runs later actions on the cached data.
This matters most with large datasets, where recomputing an RDD can be expensive.

Spark doesn't immediately cache the data after calling `cache()`. Instead, it marks the RDD for caching, and when it encounters the first action, it computes the RDD and caches it at the specified storage level.

In [63]:
baseRDD = spark.sparkContext.parallelize(range(1, 11))
baseRDD.count()

10

In [64]:
baseRDD = spark.sparkContext.parallelize(range(1, 11))
baseRDD.cache()  # Caching baseRDD
baseRDD.count()

10

## Checkpointing

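A cached RDD lives only as long as the Spark application that created it, and cached partitions that are evicted or lost with an executor are recomputed from the lineage. The `checkpoint()` operation instead writes the RDD's contents to the checkpoint directory, which should be on reliable storage such as HDFS, and truncates the RDD's lineage. Later stages and failure recovery then read the saved data instead of recomputing a long chain of transformations. Checkpoint files are not a mechanism for sharing data between programs; to reuse results in another Spark application, write them out explicitly (for example, with `saveAsTextFile()`).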

### Example

Let's go through an example of how to use checkpointing in Spark:

In [70]:
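# Set the checkpoint directory before marking any RDD for checkpointing
spark.sparkContext.setCheckpointDir("data/tables/checkpointing")

baseRDD = spark.sparkContext.parallelize(['A', 'B', 'C'])
baseRDD.checkpoint()  # only marks the RDD; nothing is written yet
baseRDD.count()       # the first action writes the checkpoint files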

# Understanding Partitions

Data partitioning plays a crucial role in distributed computing, as it defines the degree of parallelism for applications. Understanding and defining partitions properly can significantly improve the performance of Spark jobs. There are two ways to control the degree of parallelism for RDD operations:

- `repartition()` and `coalesce()`
- `partitionBy()`

## `repartition()` versus `coalesce()`

Partitions of an existing RDD can be changed using `repartition()` or `coalesce()`. These operations redistribute the RDD based on the number of partitions provided.

- **`repartition()`**: 
  - Used to increase or decrease the number of partitions.
  - Involves heavy data shuffling across the cluster.
  - It can be used when there is a need to increase parallelism or adjust the number of partitions based on the workload.
  
- **`coalesce()`**:
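  - Used to **decrease** the number of partitions; with the default `shuffle=False`, asking for more partitions than the RDD has leaves the count unchanged.
  - Generally does **not trigger a shuffle**, because it merges existing partitions, which makes it more efficient when reducing partitions.
  - It is ideal after heavy filtering operations. However, a drastic reduction (for example, to a single partition) also limits the parallelism of the computation that produces the data, which then runs in fewer tasks. `coalesce(n, shuffle=True)` avoids this at the cost of a shuffle.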

### Example Use Case

- **When to Use `repartition()`**: If you need to increase the number of partitions significantly, for example, when your data is too large to fit in a small number of partitions and you want to take advantage of more parallelism.
  
- **When to Use `coalesce()`**: After filtering or when you want to optimize the performance by reducing the number of partitions (e.g., after a `filter()` operation where data is sparsely distributed across many partitions).

### Summary

- `repartition()` can both increase or decrease the number of partitions, but it incurs a shuffle.
- `coalesce()` can only decrease the number of partitions, and in most cases, it avoids a shuffle, improving performance for certain workloads.


`repartition()` is not always something to avoid. When a job is not using all the available task slots (cores), repartitioning the data into more partitions can make it run faster despite the cost of the shuffle.

## `partitionBy()` in Spark

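The `partitionBy()` operation applies to Pair RDDs. It redistributes the data so that all records with the same key land in the same partition, and it lets you choose the number of partitions. This is especially useful before key-based operations that would otherwise shuffle, such as `groupByKey()` or `join()`. In PySpark it takes `numPartitions` and an optional `partitionFunc` (by default `portable_hash`), and each record goes to partition `partitionFunc(key) % numPartitions`; in Scala, you pass a `Partitioner` such as `HashPartitioner` instead.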

#### Key Points:

- **Shuffling Operations**: Operations like `groupByKey()` or `join()` can accept an additional parameter to specify the number of partitions for the resulting RDD.
  
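- **`partitionBy()` Usage**: This operation redistributes the RDD data using the specified partitioning function. If the partitioned RDD is persisted, later key-based operations that use the same partitioner (such as repeated joins) can reuse that layout instead of shuffling the RDD again.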

## 1. Using `groupByKey()` to control the number of partitions:


In [92]:

# Create an RDD with 3 partitions
baseRDD = spark.sparkContext.parallelize([("US", 20), ("IND", 30), ("UK", 10)], 3)

# Print the number of partitions
print(baseRDD.getNumPartitions())

3


In [93]:
groupedRDD = baseRDD.groupByKey(2)  # Group data into 2 partitions
print(groupedRDD.getNumPartitions())

2


## 2. Using `partitionBy()` to control the number of partitions:

In [98]:
# Create an RDD with 3 partitions
baseRDD = spark.sparkContext.parallelize([("US", 20), ("IND", 30), ("UK", 10)], 3)

# Repartition the RDD using partitionBy with 2 partitions
partitionedRDD = baseRDD.partitionBy(2)

# Persist the partitioned RDD if it's going to be used frequently
partitionedRDD.persist()

# Check the number of partitions after partitionBy
print(partitionedRDD.getNumPartitions())


2


## Drawbacks of Using RDDs

- **Opaque Code**: RDD code can sometimes be unclear, making it difficult for developers to understand what exactly the code is computing.
  
- **Lack of Optimizations**: Spark treats the lambda functions passed to RDD operations as black boxes, so it cannot reorder or simplify them. For example, Spark will not move a `filter()` ahead of a wide transformation (e.g., `reduceByKey()` or `groupByKey()`), even when doing so would reduce the amount of data shuffled.

- **Performance on Non-JVM Languages**: RDD operations written in Python or R are slower than their Scala or Java equivalents. Spark executors run in the JVM, so user functions execute in separate Python (or R) worker processes, and data must be serialized and transferred between the JVM and those processes, which significantly increases execution time. 

- **No Built-in Optimizations**: Unlike DataFrames, RDDs do not benefit from Spark’s Catalyst optimizer, leading to potentially suboptimal execution plans.


# DataFrames in Spark

DataFrames are a higher-level abstraction built on top of RDDs and provide a more efficient way to process structured data. They are distributed collections of data organized into rows and columns, similar to tables in a relational database. 
DataFrames enable users to perform data processing on data from various sources, including RDDs, different file formats, and databases.

## Features of DataFrames

- **Support for Various Data Sources**: DataFrames can process data from a wide range of sources, such as:
  - Files in different formats (CSV, Avro, JSON, etc.)
  - Storage systems (Hive, HDFS, RDBMS)
  
- **Scalability**: DataFrames can handle data volumes ranging from kilobytes to petabytes, making them suitable for both small and large datasets.

- **Optimized Query Processing**: DataFrame queries are optimized by Catalyst, the Spark SQL query optimizer, which produces an efficient execution plan that runs distributed across multiple nodes.

- **Multi-Language API Support**: DataFrames support APIs in multiple languages, including:
  - Java
  - Scala
  - Python
  - R

# Running SQL on DataFrames in Spark

In Spark, you can run SQL queries on DataFrames by creating temporary views. These views can be either **local** or **global**.

## Temporary Views on DataFrames

Temporary views allow you to run SQL queries within a single session. After creating a temporary view, you can run SQL queries directly on the DataFrame and get the result as another DataFrame.


In [4]:
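# Create a small sample DataFrame (hypothetical data; the column names are assumptions)
sales_df = spark.createDataFrame([("US", 20), ("IND", 30), ("UK", 10)], ["country", "sales"])

# Create a temporary view on the DataFrame
sales_df.createOrReplaceTempView("sales")

# Run a SQL query against the view
sqlDF = spark.sql("SELECT * FROM sales")

# Show the results
sqlDF.show()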