
## Spark RDD

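An **RDD (Resilient Distributed Dataset)** is an immutable collection of records that is split into partitions and distributed across the nodes of a cluster. It is not a distributed file system: data is loaded from external storage (or from the driver) and can be cached in RAM for reuse, although partitions that do not fit in memory may be spilled to disk or recomputed. The RDD is the primary underlying data structure of Spark.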

## Features:
* **In-memory computation**: Spark can cache intermediate results in memory and process them there, rather than reading them from disk every time they are needed. This significantly reduces I/O overhead and speeds up iterative and interactive workloads, which gives Spark an edge over disk-based Hadoop MapReduce jobs.

* **Lazy Evaluation**: Spark doesn't immediately execute transformations on its data; instead, it records the sequence of transformations as a lineage graph and only executes them when an action (such as collect() or count()) is called. Deferring execution lets Spark see the whole chain before running it, so it can pipeline consecutive transformations into a single pass over the data and skip work whose result is never requested.
* **Fault tolerant through lineage and recomputation**: Data and computations can be recovered after failures such as node crashes. RDDs achieve this through lineage and recomputation.
  * **Lineage**: Spark maintains a lineage for each RDD. The lineage is a directed acyclic graph (DAG) that represents the sequence of transformations applied to create the RDD. Each RDD in the lineage knows how it was derived from its parent RDD(s). This lineage information is stored as metadata.
  * **Recomputation**: In the event of a node failure or data loss, Spark can use the lineage information to recompute only the lost partitions of an RDD. Since RDDs are immutable (their content cannot be changed), Spark can reliably reapply the sequence of transformations to reconstruct the lost data.
* **Immutable**: Access provided by RDD is read-only. Immutability means that once you create an RDD, you cannot change its content. Instead, any operation that you perform on an RDD creates a new RDD as a result.
* **Partitioning**: An RDD is logically divided into smaller, manageable chunks called partitions. Partitions are the basic units of parallelism in Spark. Each partition contains a subset of the data and is processed independently by a task running on a worker node in the cluster.
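
The lazy evaluation, immutability, and lineage ideas above can be sketched in plain Python. The `MiniRDD` class below is a hypothetical toy model written for this illustration, not Spark's implementation: transformations only record a step and return a new object, and nothing runs until an action (`collect`) is called. Replaying the recorded steps for a single partition is a rough stand-in for how a lost partition could be rebuilt.

```python
# Toy model only: MiniRDD is a hypothetical class, not part of PySpark.
class MiniRDD:
    def __init__(self, partitions, lineage=()):
        self._partitions = partitions    # source data, already split into partitions
        self._lineage = tuple(lineage)   # recorded transformations, never mutated

    def map(self, func):
        # Transformation: record the step and return a NEW MiniRDD; nothing runs yet.
        return MiniRDD(self._partitions, self._lineage + (("map", func),))

    def filter(self, pred):
        # Transformation: same idea, the predicate is only recorded.
        return MiniRDD(self._partitions, self._lineage + (("filter", pred),))

    def compute_partition(self, index):
        # Replay the lineage for one partition only.
        rows = list(self._partitions[index])
        for kind, func in self._lineage:
            if kind == "map":
                rows = [func(row) for row in rows]
            else:
                rows = [row for row in rows if func(row)]
        return rows

    def collect(self):
        # Action: triggers the computation of every partition.
        result = []
        for index in range(len(self._partitions)):
            result.extend(self.compute_partition(index))
        return result


calls = []

def square(x):
    calls.append(x)  # lets us observe when the function actually runs
    return x * x

base = MiniRDD([[1, 2, 3], [4, 5, 6]])
evens = base.map(square).filter(lambda x: x % 2 == 0)

calls_before_action = len(calls)        # still 0: only the plan was recorded
all_rows = evens.collect()              # [4, 16, 36]
recomputed = evens.compute_partition(1) # [16, 36], rebuilt from lineage alone
print(calls_before_action, all_rows, recomputed)
```

Note that `base` is untouched by the transformations: `base.collect()` still returns the original six numbers.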


In [1]:
!pip install pyspark

import pyspark

Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285387 sha256=1d3953d883b4b2580282be3d7db5cd4dde832f13497649d542b4c949d26857eb
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.1


In [12]:
# Creating Spark session

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("APP").getOrCreate()

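**map()** applies a function to every element of an RDD and returns a new RDD with exactly one output element per input element.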

In [7]:
numbers = [1, 2, 3, 4, 5, 6, 7]
numbers = spark.sparkContext.parallelize(numbers)
numbers.map(lambda x: x**2).collect()

[1, 4, 9, 16, 25, 36, 49]

The parallelize() function creates an RDD (Resilient Distributed Dataset) from an existing collection in your driver program. It takes a dataset that lives in the driver (e.g., a list, tuple, or other iterable), splits it into partitions, and distributes those partitions across the nodes of a Spark cluster for parallel processing.
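
To make the idea of splitting a driver-side collection into partitions concrete, here is a minimal plain-Python sketch. The helper `split_into_partitions` is hypothetical and the contiguous slicing it uses is an assumption chosen for simplicity; the exact partition boundaries Spark picks may differ. It also shows that a map-style operation can run on each partition independently and still give the same overall result.

```python
# Hypothetical helper for illustration; Spark's own slicing may differ.
def split_into_partitions(items, num_partitions):
    items = list(items)
    n = len(items)
    # Contiguous slices whose sizes differ by at most one element.
    return [
        items[i * n // num_partitions:(i + 1) * n // num_partitions]
        for i in range(num_partitions)
    ]

partitions = split_into_partitions([1, 2, 3, 4, 5, 6, 7], 3)
print(partitions)  # [[1, 2], [3, 4], [5, 6, 7]]

# Each partition is processed on its own, then the results are concatenated.
squared_parts = [[x ** 2 for x in part] for part in partitions]
squared = [x for part in squared_parts for x in part]
print(squared)  # [1, 4, 9, 16, 25, 36, 49]
```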

In [9]:
lists = ['alpha', 'sigma', 'beta']
lists = spark.sparkContext.parallelize(lists)
lists.map(lambda line: line.upper()).collect()

['ALPHA', 'SIGMA', 'BETA']

**flatMap()** is similar to map(), but the function passed to it returns an iterable of zero or more elements for each input element, and Spark flattens those iterables into a single RDD.

In [10]:

original_list = [1, 2, 3, 4, 5]
mapped_list = [item for sublist in map(lambda x: [x, x * 2], original_list) for item in sublist]

print(mapped_list)

[1, 2, 2, 4, 3, 6, 4, 8, 5, 10]


Python lists have no built-in flatMap method, which is why the plain-Python version above needs a nested comprehension. flatMap is provided by Apache Spark's RDD (Resilient Distributed Dataset) API and by similar functional programming libraries.

In [14]:

original_list = [1, 2, 3, 4, 5]

original_list = spark.sparkContext.parallelize(original_list)

# Using flatMap to emit each element followed by its double
result_list = original_list.flatMap(lambda x: [x, x * 2]).collect()

# Display the result
print(result_list)


[1, 2, 2, 4, 3, 6, 4, 8, 5, 10]


flatMap() transforms an RDD so that each element can be converted into multiple new elements by the provided function.

In [15]:
texts = ["Quick, Fox", "Lazy, Dog"]
texts = spark.sparkContext.parallelize(texts)
texts.flatMap(lambda line: line.split(', ')).collect()


['Quick', 'Fox', 'Lazy', 'Dog']
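
The difference between map() and flatMap() is easiest to see side by side. The following plain-Python sketch (no Spark involved, so it only mirrors the semantics) applies the same split function in both styles: the map-style version keeps one list per input line, while the flatMap-style version flattens them.

```python
from itertools import chain

texts = ["Quick, Fox", "Lazy, Dog"]

# map-style: one output (a list) per input line, so the result is nested.
mapped = [line.split(", ") for line in texts]

# flatMap-style: the per-line lists are flattened into a single sequence.
flat_mapped = list(chain.from_iterable(line.split(", ") for line in texts))

print(mapped)       # [['Quick', 'Fox'], ['Lazy', 'Dog']]
print(flat_mapped)  # ['Quick', 'Fox', 'Lazy', 'Dog']
```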

## filter()
filter() keeps only the elements for which the given function returns True, so it can be used to trim out information that you don't need.

In [17]:
texts = ["Instructor, RasAlGhul", "Student 1, Batman", "Student 2, Arrow"]
texts = spark.sparkContext.parallelize(texts)
texts.filter(lambda line: "Student" in line).collect()

['Student 1, Batman', 'Student 2, Arrow']

In [18]:
# Plain-Python equivalent, without an RDD

texts = ["Instructor, RasAlGhul", "Student 1, Batman", "Student 2, Arrow"]
list(filter(lambda line: "Student" in line, texts))

['Student 1, Batman', 'Student 2, Arrow']

## distinct()
distinct() returns a new RDD that contains the distinct elements of the source RDD. The order of the elements in the result is not guaranteed.

In [22]:

data = spark.sparkContext.parallelize([1, 1, 3, -5, 'data', 'data'])
print(data.distinct().collect())

[1, 3, -5, 'data']


## groupByKey()

The groupByKey() transformation operates on an RDD (Resilient Distributed Dataset) of (key, value) pairs. It groups together all the values that share the same key and returns one (key, iterable of values) pair per distinct key.

In [23]:

# Create an RDD with key-value pairs
data = [(1, 'A'), (2, 'B'), (1, 'C'), (2, 'D'), (3, 'E')]
rdd = spark.sparkContext.parallelize(data)

# Use groupByKey() to group the data by keys
grouped_rdd = rdd.groupByKey()

# Iterate through the grouped data
for key, values in grouped_rdd.collect():
    print(f"Key: {key}, Values: {list(values)}")



Key: 2, Values: ['B', 'D']
Key: 1, Values: ['A', 'C']
Key: 3, Values: ['E']


While groupByKey() is a useful transformation, it may not be the most efficient option for all use cases, especially when dealing with large datasets. In some cases, you might prefer to use operations like reduceByKey(), aggregateByKey(), or combineByKey() for grouped operations, as they provide better performance optimizations by reducing data shuffling.

## reduceByKey()

In Apache Spark, the reduceByKey() transformation is used to perform a reduction operation on the values associated with each key in an RDD (Resilient Distributed Dataset). It combines values with the same key using a specified function and produces a new RDD with one entry for each distinct key.

In [24]:
data = [("A", 2), ("B", 3), ("A", 4), ("B", 5), ("C", 1)]
rdd = spark.sparkContext.parallelize(data)

# Use reduceByKey() to sum the values for each key
sums = rdd.reduceByKey(lambda x, y: x + y)

# Collect and display the results
result = sums.collect()
for key, value in result:
    print(f"Key: {key}, Sum: {value}")

Key: C, Sum: 1
Key: A, Sum: 6
Key: B, Sum: 8


reduceByKey() is a powerful transformation for aggregating data based on keys and can be used for a variety of operations beyond just summing values, such as finding the maximum, minimum, or performing custom aggregations. It's particularly efficient because it performs the aggregation locally on each partition before shuffling the data across partitions, which reduces data movement and improves performance.
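
The performance difference between groupByKey() and reduceByKey() comes from how many records have to leave each partition. The plain-Python sketch below models this under simplifying assumptions: the two-partition layout is invented for the example, and the functions `group_by_key` and `reduce_by_key` are hypothetical stand-ins, not Spark APIs. It counts the records that would cross the shuffle in each approach.

```python
from collections import defaultdict

# Same data as the reduceByKey() example, with an assumed split into two partitions.
partitions = [
    [("A", 2), ("B", 3), ("A", 4)],
    [("B", 5), ("C", 1)],
]

def group_by_key(partitions):
    # Every record is shuffled as-is; grouping happens after the shuffle.
    shuffled = [pair for part in partitions for pair in part]
    groups = defaultdict(list)
    for key, value in shuffled:
        groups[key].append(value)
    return dict(groups), len(shuffled)

def reduce_by_key(partitions, func):
    # Step 1: combine values locally, so each partition emits one record per key.
    shuffled = []
    for part in partitions:
        local = {}
        for key, value in part:
            local[key] = func(local[key], value) if key in local else value
        shuffled.extend(local.items())
    # Step 2: merge the partial results after the shuffle.
    totals = {}
    for key, value in shuffled:
        totals[key] = func(totals[key], value) if key in totals else value
    return totals, len(shuffled)

groups, grouped_shuffle = group_by_key(partitions)
sums, reduced_shuffle = reduce_by_key(partitions, lambda x, y: x + y)

print(groups, grouped_shuffle)  # {'A': [2, 4], 'B': [3, 5], 'C': [1]} 5
print(sums, reduced_shuffle)    # {'A': 6, 'B': 8, 'C': 1} 4
```

On this tiny input the saving is a single record, but the gap grows with the number of repeated keys per partition.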

## sortBy()

In Apache Spark, the sortBy() transformation sorts the elements of an RDD (Resilient Distributed Dataset) by a key that you compute from each element with a function. It sorts in ascending order by default, and its ascending parameter lets you switch to descending order.

In [25]:

data = [5, 2, 8, 1, 7]
rdd = spark.sparkContext.parallelize(data)

# Use sortBy() to sort the RDD in ascending order
sorted_rdd = rdd.sortBy(lambda x: x)

# Collect and display the sorted result
sorted_data = sorted_rdd.collect()
print(sorted_data)


[1, 2, 5, 7, 8]


In [26]:
data = spark.sparkContext.parallelize([('Z', 99), ('B', 3), ('C', 4)])
## Sort RDD by values in the ascending order
print(data.sortBy(lambda pair: pair[1]).collect())
## Sort RDD by values in the descending order
print(data.sortBy(lambda pair: -pair[1]).collect())
## Sort RDD by keys
print(data.sortBy(lambda pair: pair[0]).collect())

[('B', 3), ('C', 4), ('Z', 99)]
[('Z', 99), ('C', 4), ('B', 3)]
[('B', 3), ('C', 4), ('Z', 99)]
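
Negating the key, as in the descending example above, only works for numeric keys. As a plain-Python analogy, `sorted()` with `reverse=True` handles any comparable key, including strings; on the Spark side, `sortBy()` accepts an `ascending` argument that plays the same role. The Spark call is shown only as a comment because this sketch does not start a Spark session.

```python
data = [("Z", 99), ("B", 3), ("C", 4)]

# Descending by value: reverse=True replaces the "negate the key" trick.
by_value_desc = sorted(data, key=lambda pair: pair[1], reverse=True)

# Descending by key: negation is not possible for strings, reverse=True is.
by_key_desc = sorted(data, key=lambda pair: pair[0], reverse=True)

print(by_value_desc)  # [('Z', 99), ('C', 4), ('B', 3)]
print(by_key_desc)    # [('Z', 99), ('C', 4), ('B', 3)]

# Spark counterpart (not executed here), assuming `rdd` holds the same pairs:
#   rdd.sortBy(lambda pair: pair[0], ascending=False).collect()
```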


In [27]:
data1 = spark.sparkContext.parallelize(['Arizona', 'California', 'Texas'])
data2 = spark.sparkContext.parallelize(['Arizona', 'Nevada'])

## subtract()

**subtract()** transformation is used to compute the set difference between two RDDs (Resilient Distributed Datasets). It returns an RDD containing only the elements that exist in the first RDD but not in the second RDD, effectively subtracting the elements of the second RDD from the first RDD.

In [28]:
print(data1.subtract(data2).collect())

['Texas', 'California']


## intersection()

**intersection()** returns a new RDD containing elements that are common to both of the input RDDs, effectively finding the set intersection of the two RDDs.

In [29]:
print(data1.intersection(data2).collect())

['Arizona']


## union()

**union()** transformation is used to combine two RDDs (Resilient Distributed Datasets) into a single RDD. It creates a new RDD that contains all the elements from both input RDDs without removing any duplicates.

In [30]:
print(data1.union(data2).collect())

['Arizona', 'California', 'Texas', 'Arizona', 'Nevada']
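
For comparison, the three set-style transformations above can be approximated in plain Python. This is a sketch of the semantics only, without an RDD: subtract keeps the elements of the first collection that are absent from the second, intersection yields each common element once, and union simply concatenates, so duplicates survive unless you deduplicate afterwards (the counterpart of calling distinct() on the result).

```python
data1 = ["Arizona", "California", "Texas"]
data2 = ["Arizona", "Nevada"]

other = set(data2)

# subtract-style: elements of data1 that do not appear in data2.
subtracted = [state for state in data1 if state not in other]

# intersection-style: common elements, each reported once.
intersected = sorted(set(data1) & other)

# union-style: plain concatenation, duplicates are kept.
unioned = data1 + data2

# union followed by a distinct-style deduplication.
unioned_distinct = sorted(set(unioned))

print(subtracted)        # ['California', 'Texas']
print(intersected)       # ['Arizona']
print(unioned)           # ['Arizona', 'California', 'Texas', 'Arizona', 'Nevada']
print(unioned_distinct)  # ['Arizona', 'California', 'Nevada', 'Texas']
```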


## Spark Actions

**collect()** action retrieves all the elements of an RDD or DataFrame and returns them to the driver program as an array or list. Be cautious when using collect() with large datasets because it can potentially exhaust the driver's memory.

In [31]:
data = spark.sparkContext.parallelize([1, 3, 1, 3, 3, -2])
print(data.collect())

[1, 3, 1, 3, 3, -2]


**count()** action returns the number of elements in an RDD or the number of rows in a DataFrame.

In [32]:
data = spark.sparkContext.parallelize([1, 3, 1, 3, 3, -2])
print(data.count())

6


**take(n)** action returns the first n elements of an RDD or the first n rows of a DataFrame as an array or list.

In [33]:
data = spark.sparkContext.parallelize([1, 3, 1, 3, 3, -2])
print(data.take(2))

[1, 3]


**countByValue()** action returns how many times each element occurs in the RDD, as a dictionary of (value, count) pairs sent back to the driver.

In [37]:
data = spark.sparkContext.parallelize(["A", "B", "A", "A", "C", "B"])
print(data.countByValue())

defaultdict(<class 'int'>, {'A': 3, 'B': 2, 'C': 1})
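
One way to picture countByValue() is as a count per partition followed by a merge of the partial counts. The plain-Python sketch below illustrates that idea with `collections.Counter`; the two-partition split is an assumption made for the example, and the code is an analogy rather than Spark's own implementation.

```python
from collections import Counter

# Same elements as above, with an assumed split into two partitions.
partitions = [["A", "B", "A"], ["A", "C", "B"]]

# Count within each partition independently.
partial_counts = [Counter(part) for part in partitions]

# Merge the partial counts into a single result.
total_counts = sum(partial_counts, Counter())

print(dict(total_counts))  # {'A': 3, 'B': 2, 'C': 1}
```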


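**reduce(func)** action aggregates the elements of an RDD using a specified function. It's commonly used for computing sums, products, or custom aggregations. The function should be commutative and associative, because Spark applies it within each partition first and then combines the partial results.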

In [38]:
data = [1, 2, 3, 4, 5]
rdd = spark.sparkContext.parallelize(data)

# Use the reduce() action to compute the sum of elements
def add(x, y):
    return x + y

sum_result = rdd.reduce(add)

# Display the sum result
print("Sum of elements:", sum_result)

Sum of elements: 15
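
Because a reduction runs within each partition first and the partial results are combined afterwards, the function passed to reduce() needs to give the same answer however the data is split. The plain-Python sketch below models this with a hypothetical helper, `partitioned_reduce`, and two assumed partition layouts: addition is stable across layouts, while subtraction (neither commutative nor associative) is not.

```python
from functools import reduce

# Hypothetical helper: reduce each partition, then reduce the partial results.
def partitioned_reduce(partitions, func):
    partials = [reduce(func, part) for part in partitions if part]
    return reduce(func, partials)

def add(x, y):
    return x + y

def subtract(x, y):
    return x - y

layout_a = [[1, 2, 3], [4, 5]]
layout_b = [[1, 2], [3, 4, 5]]

sum_a = partitioned_reduce(layout_a, add)
sum_b = partitioned_reduce(layout_b, add)
diff_a = partitioned_reduce(layout_a, subtract)
diff_b = partitioned_reduce(layout_b, subtract)

print(sum_a, sum_b)    # 15 15  (same result for both layouts)
print(diff_a, diff_b)  # -3 5   (result depends on the partitioning)
```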


**saveAsTextFile(path)** action saves the RDD as text files inside a directory. It creates the directory if it doesn't exist and fails if it already exists.

In [51]:
data = ["Hello", "World", "Apache", "Spark"]
rdd = spark.sparkContext.parallelize(data)

# Specify the output directory. Despite the .txt suffix, Spark creates a
# directory at this path and writes one part file per partition into it.
output_dir = "/content/drive/MyDrive/output.txt"

# Use saveAsTextFile() to save the RDD elements as text files
rdd.saveAsTextFile(output_dir)
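
The directory-based behaviour of saveAsTextFile() can be imitated in plain Python to show what ends up on disk. The helper `save_partitions_as_text` below is hypothetical and deliberately simplified: it assumes a two-partition split, writes one `part-NNNNN` file per partition into a temporary location, and refuses to overwrite an existing directory. It does not reproduce everything Spark writes (for example, any marker or checksum files).

```python
import os
import tempfile

# Hypothetical helper for illustration; not a Spark API.
def save_partitions_as_text(partitions, path):
    os.makedirs(path)  # raises FileExistsError if the directory already exists
    for index, part in enumerate(partitions):
        file_name = os.path.join(path, f"part-{index:05d}")
        with open(file_name, "w") as handle:
            for element in part:
                handle.write(f"{element}\n")  # one element per line

# Write into a fresh temporary location so the sketch is safe to rerun.
output_dir = os.path.join(tempfile.mkdtemp(), "output")
save_partitions_as_text([["Hello", "World"], ["Apache", "Spark"]], output_dir)
written_files = sorted(os.listdir(output_dir))
print(written_files)  # ['part-00000', 'part-00001']

# A second save to the same path fails instead of overwriting.
try:
    save_partitions_as_text([["again"]], output_dir)
    second_write_failed = False
except FileExistsError:
    second_write_failed = True
print(second_write_failed)  # True
```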