### **Task 1:** Explore RDDs in Spark

**Introduction:**
Apache Spark is a powerful distributed computing framework known for its speed and ease of use. At the heart of Spark lies the Resilient Distributed Dataset (RDD), a fundamental abstraction that represents distributed data collections across the Spark cluster. RDDs provide fault-tolerant, parallelized operations on data, enabling efficient processing of large-scale datasets.

**RDD (Resilient Distributed Dataset):**

**Definition and Characteristics:** RDD is a fundamental abstraction in Apache Spark, representing a distributed collection of elements that can be operated on in parallel.

Its key characteristics include immutability, fault tolerance, and lazy evaluation.

**Immutability:** Once created, RDDs cannot be changed. However, you can apply transformations to derive new RDDs.

**Resilience to Failures and Lineage Graph:** RDDs are resilient to failures due to their lineage graph, which tracks the sequence of transformations applied to the base dataset. In case of data loss or failure, Spark can recompute lost partitions by tracing back the lineage graph and reapplying transformations from the original dataset.

**Lazy Evaluation:** Transformations on RDDs are lazily evaluated. They are not executed immediately. Instead, Spark records them in the lineage and runs them only when an action needs a result. Deferring execution lets Spark plan the whole computation. For example, it can pipeline consecutive narrow transformations such as map and filter so that each partition is processed in a single pass.

**RDD Operations:**

**Transformation Operations:** Transformation operations create a new RDD from one or more existing RDDs, for example by applying a function to each element.

Examples include:

*   map(func): Applies a function to each element and returns a new RDD.
*   filter(func): Filters elements based on a predicate function.
*   flatMap(func): Similar to map, but each input item can be mapped to zero or more output items.

These operations are lazily evaluated and do not trigger execution until an action operation is called.

**Action Operations:** Action operations are operations that trigger the execution of transformations and return results to the driver program or write data to external storage systems.

Examples include:
*   reduce(func): Aggregates the elements of the RDD using a specified function, which should be commutative and associative.
*   collect(): Returns all elements of the RDD to the driver program.
*   count(): Returns the number of elements in the RDD.
*   take(n): Returns the first n elements of the RDD.

Action operations are eagerly evaluated and cause Spark to execute the previously defined transformations.

**Narrow vs. Wide Transformations:**

**Narrow Transformations:** Transformations where each input partition contributes to only one output partition. Examples include map, filter, etc. These transformations can be computed in parallel without shuffling data across partitions.

**Wide Transformations:** Transformations where each input partition contributes to multiple output partitions. Examples include groupByKey, reduceByKey, sortByKey, etc. These transformations generally require a shuffle, which redistributes data across partitions and usually across the network. The extra disk and network I/O makes them considerably more expensive than narrow transformations.

**RDD Persistence:**

**Caching and Persistence Levels:**

RDD persistence refers to storing the contents of an RDD in memory, on disk, or both, so that it can be reused across actions.
Spark supports different persistence levels, including MEMORY_ONLY, MEMORY_AND_DISK, MEMORY_ONLY_SER, etc.

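**Caching:** Caching an RDD keeps its computed partitions in memory. This can significantly improve performance by avoiding recomputation of costly transformations. Calling cache() is shorthand for persist() with the default MEMORY_ONLY level.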

**Benefits of Caching RDDs:** Caching is beneficial when the same RDD is used multiple times, the RDD is costly to compute, or iterative algorithms are applied to the RDD.

**Partitioning in RDD:**

**Concept of Partitions:**

Partitions are the basic units of parallelism in Spark, representing subsets of the RDD's data distributed across the cluster.
RDDs are divided into partitions to enable parallel processing.
Spark operations are performed independently on each partition, allowing for efficient distributed processing.

**Custom Partitioning Strategies:**

Spark allows custom partitioning strategies to control how data is distributed across partitions.
Users can define custom partitioners based on specific criteria, such as key-based partitioning for groupByKey or reduceByKey operations.
Custom partitioning strategies can optimize data locality and improve performance for certain types of operations.

**RDD Lineage:**

**Understanding RDD Lineage:**

RDD lineage is the chain of parent RDDs and transformations from which an RDD was derived.
Spark maintains the lineage graph, which records the dependencies between RDDs and the sequence of transformations applied.
Lineage information allows Spark to recover from failures by reapplying transformations from the original dataset.

**Importance in Fault Tolerance and Recovery:**

RDD lineage is crucial for fault tolerance and recovery in Spark.
Because each RDD records how it was derived, Spark does not need to replicate intermediate results. If a partition is lost (for example, when an executor fails), Spark recomputes only that partition by replaying the transformations in its lineage.
As a result, Spark applications can recover from failures without data loss, provided the original input data is still available.
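The recovery idea can be illustrated without a cluster. The code below is a toy pure-Python model, not Spark's implementation, and `ToyRDD` and its methods are hypothetical. Each dataset stores only its parent and a per-partition function. One partition of a persisted result is then deleted to simulate a failure, and the next action rebuilds just that partition from the lineage.

```python
class ToyRDD:
    """Toy model of RDD lineage (hypothetical; NOT how Spark is implemented).

    Each dataset stores only how to build its partitions: a parent plus a
    per-partition function. The root holds the base data, standing in for
    stable storage.
    """

    def __init__(self, source=None, parent=None, func=None):
        self.source = source      # root only: list of partitions (base data)
        self.parent = parent      # lineage edge to the parent dataset
        self.func = func          # rebuilds one partition from the parent's partition
        self.persisted = False
        self.stored = {}          # partition index -> materialized data (if persisted)
        self.compute_count = {}   # partition index -> how many times it was computed

    @property
    def num_partitions(self):
        return len(self.source) if self.parent is None else self.parent.num_partitions

    def map(self, f):
        return ToyRDD(parent=self, func=lambda part: [f(x) for x in part])

    def filter(self, pred):
        return ToyRDD(parent=self, func=lambda part: [x for x in part if pred(x)])

    def persist(self):
        self.persisted = True
        return self

    def partition(self, i):
        if i in self.stored:
            return self.stored[i]
        # Not available: rebuild it by replaying the lineage back to the base data
        if self.parent is None:
            data = list(self.source[i])
        else:
            data = self.func(self.parent.partition(i))
        self.compute_count[i] = self.compute_count.get(i, 0) + 1
        if self.persisted:
            self.stored[i] = data
        return data

    def collect(self):
        return [x for i in range(self.num_partitions) for x in self.partition(i)]


base = ToyRDD(source=[[1, 2, 3], [4, 5, 6]])
result_rdd = base.map(lambda x: x * x).filter(lambda x: x > 10).persist()

first = result_rdd.collect()   # computes both partitions and stores them
del result_rdd.stored[1]       # simulate losing partition 1 (e.g. its executor failed)
second = result_rdd.collect()  # partition 1 is rebuilt from lineage; partition 0 is reused

print(first, second)             # [16, 25, 36] [16, 25, 36]
print(result_rdd.compute_count)  # {0: 1, 1: 2}
```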

A simple implementation demonstrating RDD concepts in Apache Spark using Python and PySpark:

```python
from pyspark import SparkContext

# Initialize SparkContext (local mode, application name "RDDExample")
sc = SparkContext("local", "RDDExample")

# Create an RDD from a list
data = [1, 2, 3, 4, 5]
rdd = sc.parallelize(data)

# Perform transformation operations (lazy: nothing is computed yet)
squared_rdd = rdd.map(lambda x: x*x)
filtered_rdd = squared_rdd.filter(lambda x: x > 10)

# Perform action operations (each action triggers a job that runs the lineage above)
result = filtered_rdd.collect()
count = filtered_rdd.count()

# Output the result
print("Filtered elements:", result)
print("Count of filtered elements:", count)

# Stop SparkContext
sc.stop()
```


```
Filtered elements: [16, 25]
Count of filtered elements: 2
```


A PySpark program that uses RDD operations to compute the sum of squares of the numbers greater than 10 in a given list:

```python
from pyspark import SparkContext

# Initialize SparkContext
sc = SparkContext("local", "RDDExample")

# Create an RDD from a list
data = [1, 2, 3, 4, 5, 11, 12, 13]
rdd = sc.parallelize(data)

# Filter numbers greater than 10 and calculate sum of squares
filtered_rdd = rdd.filter(lambda x: x > 10)
squared_rdd = filtered_rdd.map(lambda x: x*x)
sum_of_squares = squared_rdd.reduce(lambda x, y: x + y)

# Output the result
print("Sum of squares of numbers greater than 10:", sum_of_squares)

# Stop SparkContext
sc.stop()
```


```
Sum of squares of numbers greater than 10: 434
```


### **Task 2:** In PySpark, create a program that reads a CSV file containing sales data, cleans it by handling missing values and removing duplicates, calculates the total sales amount for each product, and writes the results to a new CSV file. Make sure to use both transformations and actions in your PySpark script.

```python
!pip install pyspark
```

```
Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=cd7507e18bb9572da97e3fce1f37d4ffdbaaf1004e302d5ed695c6e8144fdcc1
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1
```


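```python
# Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql import functions as F  # namespaced import avoids shadowing Python's built-in sum()

# Create a SparkSession
spark = SparkSession.builder.appName("SalesAnalysis").getOrCreate()

# Read the CSV file into a DataFrame (without inferSchema, every column is read as a string)
sales_data = spark.read.option("header", "true").csv("/sales_data.csv")

# Print the schema of the DataFrame
sales_data.printSchema()

# Handle missing values: drop every row that contains a null (transformation)
cleaned_data = sales_data.na.drop()

# Remove exact duplicate rows (transformation)
distinct_data = cleaned_data.dropDuplicates()

# Calculate the total sales amount per product. The dataset has no individual
# product column, so totals are computed per "Product Category".
# "Total Amount" was read as a string, so cast it to double before summing.
result = (
    distinct_data
    .withColumn("Total Amount", F.col("Total Amount").cast("double"))
    .groupBy("Product Category")
    .agg(F.sum("Total Amount").alias("Total Sales"))
    .sort(F.desc("Total Sales"))
)

# Show the result (action)
result.show()

# Write the result to a new CSV file (action). Spark writes a directory named
# output.csv; coalesce(1) makes it contain a single part-*.csv file, and
# mode("overwrite") lets the cell be re-run without a "path already exists" error.
result.coalesce(1).write.mode("overwrite").option("header", "true").csv("/output.csv")

# Stop the SparkSession
spark.stop()
```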

```
root
 |-- Transaction ID: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Customer ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Product Category: string (nullable = true)
 |-- Quantity: string (nullable = true)
 |-- Price per Unit: string (nullable = true)
 |-- Total Amount: string (nullable = true)

+----------------+-----------+
|Product Category|Total Sales|
+----------------+-----------+
|     Electronics|   156905.0|
|        Clothing|   155580.0|
|          Beauty|   143515.0|
+----------------+-----------+
```

