
# Task 1
Create and Transform an RDD:
* Create an RDD from a list of integers.

Perform the following:
* Multiply each number by 3 using map.
* Filter the numbers greater than 10 using filter.
* Collect and display the final result.

In [None]:
from pyspark.sql import SparkSession
spark= SparkSession.builder.appName("RDD_Transformations").getOrCreate()

In [None]:
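data = [10, 20, 30, 40, 50, 60, 70, 80, 90]
rdd = spark.sparkContext.parallelize(data)
# Avoid naming variables `map` or `filter`: that would shadow Python's built-ins
tripled = rdd.map(lambda x: x * 3)
filtered = tripled.filter(lambda x: x > 10)
rdd1_zafor = filtered.collect()
print(rdd1_zafor)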


[30, 60, 90, 120, 150, 180, 210, 240, 270]
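
Every input above is at least 10, so after tripling the filter keeps all nine values. The plain-Python sketch below uses a smaller, hypothetical input (the list `small` is not part of the task) so that the filter actually drops something. It mirrors only the element-wise logic of the two steps, not how Spark distributes them.

```python
# Hypothetical input chosen so that some tripled values are 10 or less.
small = [1, 2, 3, 4, 5]

tripled = map(lambda x: x * 3, small)            # yields 3, 6, 9, 12, 15
kept = list(filter(lambda x: x > 10, tripled))   # keep values strictly greater than 10

print(kept)
```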


# Task 2
Read a Text File:
* Use textFile to read a file containing sentences (e.g., "hello world", "spark is great").

Perform the following:
* Split each line into words using flatMap.
* Count the total number of words using count.

In [None]:
file_content = """hello world
spark is great
Python is amazing
Data is powerful
learning is continuous"""

filename = "lines.txt"
with open(filename, "w") as file:
    file.write(file_content)

print(f"File '{filename}' created successfully!")


File 'lines.txt' created successfully!


In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("ReadTextFile").getOrCreate()

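# Read the file written in the previous cell; a relative path avoids depending on Colab's /content directory
text_file = spark.sparkContext.textFile("lines.txt")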

words = text_file.flatMap(lambda line: line.split(" "))

print(words.collect())

word_count = words.count()

print("Total number of words:", word_count)

spark.stop()

['hello', 'world', 'spark', 'is', 'great', 'Python', 'is', 'amazing', 'Data', 'is', 'powerful', 'learning', 'is', 'continuous']
Total number of words: 14
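
The flatMap above splits each line on a single space with `split(" ")`. That works for this file, but the two forms of `split` behave differently when a line contains repeated or trailing spaces. The sketch below shows the difference on a hypothetical messy line that is not part of the task data.

```python
# Hypothetical line with a double space and a trailing space.
line = "spark  is great "

by_single_space = line.split(" ")   # splits at every single space, so empty strings appear
by_whitespace = line.split()        # splits on runs of whitespace and drops empty strings

print(by_single_space)
print(by_whitespace)
```

With `split(" ")`, the empty strings would be counted as words by `count`.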


# Task 3
GroupByKey and ReduceByKey:
* Create an RDD with pairs of student names and their scores, e.g., [("Alice", 85), ("Bob", 90), ("Alice", 95)].

Use:
* groupByKey to group scores by student.
* reduceByKey to calculate the total score for each student.

In [None]:
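from pyspark.sql import SparkSession
# The previous cell stopped its session, so get (or create) one before using it again
spark = SparkSession.builder.appName("GroupByKey_ReduceByKey").getOrCreate()
pairs = [("Alice", 85), ("Bob", 90), ("Alice", 95), ("Bob", 54)]
rdd = spark.sparkContext.parallelize(pairs)
# groupByKey: collect all scores for each student
rdd3_zafor = rdd.groupByKey().mapValues(list)
print(rdd3_zafor.collect())
# reduceByKey: total score for each student
rdd3_zafor = rdd.reduceByKey(lambda x, y: x + y)
print(rdd3_zafor.collect())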

[('Alice', [85, 95]), ('Bob', [90, 54])]
[('Alice', 180), ('Bob', 144)]
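
Both calls above return per-student results, but they get there differently: groupByKey gathers every score for a key before anything is combined, while reduceByKey folds values pairwise with the supplied function. The plain-Python sketch below mimics only that logical difference on the same pairs. The helper names `group_by_key` and `reduce_by_key` are hypothetical, and this is not how Spark distributes the work.

```python
from collections import defaultdict

pairs = [("Alice", 85), ("Bob", 90), ("Alice", 95), ("Bob", 54)]

def group_by_key(items):
    """Gather every value for a key into a list (all values are kept)."""
    grouped = defaultdict(list)
    for key, value in items:
        grouped[key].append(value)
    return dict(grouped)

def reduce_by_key(items, func):
    """Fold values for a key pairwise, keeping one running result per key."""
    reduced = {}
    for key, value in items:
        reduced[key] = func(reduced[key], value) if key in reduced else value
    return reduced

print(group_by_key(pairs))
print(reduce_by_key(pairs, lambda x, y: x + y))
```

When only an aggregate such as a total is needed, the running-result form holds one value per key instead of the full list.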


# Task 4
RDD Persistence:
* Create an RDD from a large list (simulate it by generating numbers from 1 to 1,000,000).
* Perform multiple actions (e.g., count, sum) without caching and measure execution time.
* Repeat with cache or persist and compare the performance.

In [None]:
import time
# Without caching (note: this timing also includes building the list and calling parallelize)
start_time = time.time()
data = list(range(1, 1000001))
rdd = spark.sparkContext.parallelize(data)
count = rdd.count()
sum_val = rdd.sum()
end_time = time.time()
print(f"Without caching - Count: {count}, Sum: {sum_val}")
print(f"Time taken: {end_time - start_time:.4f} seconds")

# With caching
start_time = time.time()
rdd.cache()  # or rdd.persist(); lazy, so the first action below also pays the cost of filling the cache
count = rdd.count()
sum_val = rdd.sum()
end_time = time.time()
print(f"With caching - Count: {count}, Sum: {sum_val}")
print(f"Time taken: {end_time - start_time:.4f} seconds")

# Further actions (these can reuse the partitions cached by the previous block)
start_time = time.time()
count = rdd.count()
sum_val = rdd.sum()
end_time = time.time()
print(f"With caching (repeated actions) - Count: {count}, Sum: {sum_val}")
print(f"Time taken: {end_time - start_time:.4f} seconds")

Without caching - Count: 1000000, Sum: 500000500000
Time taken: 1.7239 seconds
With caching - Count: 1000000, Sum: 500000500000
Time taken: 1.2471 seconds
With caching (repeated actions) - Count: 1000000, Sum: 500000500000
Time taken: 1.0951 seconds
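
The three timings above each cover two actions at once, and the first one also includes building the list. A small helper makes it easier to time one action at a time. This is a sketch using only the standard library: `timed` is a hypothetical helper, and the stand-in workload is plain Python so the block runs without Spark. With an RDD, the same helper could wrap a call such as `lambda: rdd.count()`.

```python
import time

def timed(action):
    """Run a zero-argument callable and return (result, elapsed_seconds)."""
    start = time.perf_counter()   # monotonic clock suited to measuring intervals
    result = action()
    return result, time.perf_counter() - start

# Stand-in workload (assumption: replaces an RDD action for illustration).
numbers = list(range(1, 1_000_001))

total, sum_seconds = timed(lambda: sum(numbers))
size, count_seconds = timed(lambda: len(numbers))

print(f"sum={total} in {sum_seconds:.4f}s, count={size} in {count_seconds:.4f}s")
```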


# Task 5
Custom Transformations:
* Create an RDD of numbers.
* Write a custom transformation to identify and filter out prime numbers.

In [None]:
def is_prime(n):
    if n <= 1:
        return False
    for i in range(2, int(n**0.5) + 1):
        if n % i == 0:
            return False
    return True

numbers = [2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13]
rdd = spark.sparkContext.parallelize(numbers)

prime_rdd = rdd.filter(is_prime)

prime_numbers = prime_rdd.collect()
print(f"Prime numbers: {prime_numbers}")

Prime numbers: [2, 3, 5, 7, 11, 13]
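
One way to gain confidence in a predicate like `is_prime` is to compare it against an independent method. The sketch below repeats the trial-division check, using `math.isqrt` rather than `n**0.5` (a substitution made here to stay in integer arithmetic), and compares it with a Sieve of Eratosthenes. The `sieve` function and the limit of 1000 are illustrative choices, not part of the task.

```python
import math

def is_prime(n):
    """Trial division up to the integer square root of n."""
    if n <= 1:
        return False
    for i in range(2, math.isqrt(n) + 1):
        if n % i == 0:
            return False
    return True

def sieve(limit):
    """Return all primes <= limit using the Sieve of Eratosthenes."""
    flags = [True] * (limit + 1)
    flags[0:2] = [False, False]          # 0 and 1 are not prime
    for p in range(2, math.isqrt(limit) + 1):
        if flags[p]:
            for multiple in range(p * p, limit + 1, p):
                flags[multiple] = False
    return [n for n, flag in enumerate(flags) if flag]

limit = 1000
by_trial_division = [n for n in range(limit + 1) if is_prime(n)]
print(by_trial_division == sieve(limit))
```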


# Task 6
Transformation and Action Workflow:
* Load a dataset (e.g., CSV or text file) containing the following:
```
Product,Category,Price
Laptop,Electronics,800
Shoes,Clothing,50
Phone,Electronics,500
```
Perform the following:
* Filter products with a price greater than 100.
* Map to get only the product names.
* Count the number of products in each category using map and reduceByKey.

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("RDD_Transformations").getOrCreate()
data = [
    ("Laptop", "Electronics", 800),
    ("Shoes", "Clothing", 50),
    ("Phone", "Electronics", 500),
    ("Tablet", "Electronics", 300),
    ("Shirt", "Clothing", 25),
    ("Headphones", "Electronics", 100),
    ("Watch", "Accessories", 150),
    ("Sunglasses", "Accessories", 80),
    ("Jacket", "Clothing", 120),
    ("Backpack", "Accessories", 60),
    ("Television", "Electronics", 1000),
    ("Microwave", "Home Appliances", 200),
    ("Refrigerator", "Home Appliances", 1200),
    ("Jeans", "Clothing", 40),
    ("Sweater", "Clothing", 35),
    ("Blender", "Home Appliances", 70),
    ("Gaming Console", "Electronics", 400),
    ("Desk Lamp", "Furniture", 45),
    ("Office Chair", "Furniture", 150),
    ("Table", "Furniture", 300)
]

rdd = spark.sparkContext.parallelize(data)
# Filter products with a price greater than 100
filtered_rdd = rdd.filter(lambda x: x[2] > 100)

# Map to (product name, category) pairs; the category is kept for the count below
product_category_rdd = filtered_rdd.map(lambda x: (x[0], x[1]))

# Count the filtered products (price > 100) in each category
product_counts = product_category_rdd.map(lambda x: (x[1], 1)).reduceByKey(lambda x, y: x + y)

# Collect and print the results
print(f"Products with price greater than 100: {filtered_rdd.collect()}\n")
print(f"Product and categories: {product_category_rdd.collect()}\n")
print("Number of products in each category:")
for category, count in product_counts.collect():
    print(f"{category}: {count}")


spark.stop()

Products with price greater than 100: [('Laptop', 'Electronics', 800), ('Phone', 'Electronics', 500), ('Tablet', 'Electronics', 300), ('Watch', 'Accessories', 150), ('Jacket', 'Clothing', 120), ('Television', 'Electronics', 1000), ('Microwave', 'Home Appliances', 200), ('Refrigerator', 'Home Appliances', 1200), ('Gaming Console', 'Electronics', 400), ('Office Chair', 'Furniture', 150), ('Table', 'Furniture', 300)]

Product and categories: [('Laptop', 'Electronics'), ('Phone', 'Electronics'), ('Tablet', 'Electronics'), ('Watch', 'Accessories'), ('Jacket', 'Clothing'), ('Television', 'Electronics'), ('Microwave', 'Home Appliances'), ('Refrigerator', 'Home Appliances'), ('Gaming Console', 'Electronics'), ('Office Chair', 'Furniture'), ('Table', 'Furniture')]

Number of products in each category:
Electronics: 5
Accessories: 1
Furniture: 2
Clothing: 1
Home Appliances: 2
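
The cell above builds the records in memory. If the data came from a CSV text file instead, each line would first have to be turned into a `(product, category, price)` tuple. The sketch below shows one way to write that parsing step in plain Python on the three sample rows from the task. `parse_line` is a hypothetical helper; in Spark it could be passed to `map` after reading the file with `textFile` and dropping the header line.

```python
import csv

# The sample rows from the task statement, as they would appear in a text file.
lines = [
    "Product,Category,Price",
    "Laptop,Electronics,800",
    "Shoes,Clothing,50",
    "Phone,Electronics,500",
]

def parse_line(line):
    """Split one CSV line and convert the price field to an integer."""
    product, category, price = next(csv.reader([line]))
    return (product, category, int(price))

header = lines[0]
records = [parse_line(line) for line in lines if line != header]  # skip the header row

expensive = [name for name, _category, price in records if price > 100]
print(records)
print(expensive)
```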


# Task 7
Integration with Spark SQL:

* Load a JSON dataset containing information about students:
```
[{"name": "Alice", "age": 20, "grade": "A"},
{"name": "Bob", "age": 22, "grade": "B"}]
```


Perform the following:
* Register the data as a temporary SQL table.
* Query students who have a grade "A" using Spark SQL.
* Save the result to a new JSON file.


In [None]:
# Initialize SparkSession
spark = SparkSession.builder.appName("SparkSQLIntegration").getOrCreate()

# JSON dataset
data = '''[
    {"name": "Alice", "age": 20, "grade": "A"},
    {"name": "Bob", "age": 22, "grade": "B"},
    {"name": "Moe", "age": 21, "grade": "A"},
    {"name": "Jane", "age": 19, "grade": "C"},
    {"name": "Tom", "age": 23, "grade": "B"},
    {"name": "Sara", "age": 20, "grade": "A"},
    {"name": "Liam", "age": 24, "grade": "D"},
    {"name": "Emma", "age": 22, "grade": "B"},
    {"name": "Noah", "age": 21, "grade": "A"},
    {"name": "Olivia", "age": 20, "grade": "C"}
]'''

json_file = "students.json"

# Save the data to a JSON file
with open(json_file, "w") as f:
    f.write(data)

# Load the JSON file into a Spark DataFrame.
# The file holds one JSON array spread over several lines, so multiLine is required;
# by default Spark expects one JSON object per line and flags other lines in _corrupt_record.
df = spark.read.option("multiLine", True).json(json_file)

# Register the DataFrame as a temporary SQL table
df.createOrReplaceTempView("students")

# Query students with grade "A" using Spark SQL
result_df = spark.sql("SELECT * FROM students WHERE grade = 'A'")

# Save the result as JSON, overwriting earlier output (Spark writes a directory of part files)
output_file = "filtered_students.json"
result_df.write.mode("overwrite").json(output_file)

# Display the result
result_df.show()

# Stop the Spark session
spark.stop()

+---+-----+-----+
|age|grade| name|
+---+-----+-----+
| 20|    A|Alice|
| 21|    A|  Moe|
| 20|    A| Sara|
| 21|    A| Noah|
+---+-----+-----+
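
Spark's JSON reader expects one JSON object per line (JSON Lines) unless the `multiLine` option is enabled. Another way to make an array-style file readable is to rewrite it in JSON Lines form first. The sketch below does this with the standard `json` module on the two sample records from the task; it only illustrates the format and does not involve Spark.

```python
import json

# The two sample records from the task, stored as a single JSON array.
array_text = '''[{"name": "Alice", "age": 20, "grade": "A"},
{"name": "Bob", "age": 22, "grade": "B"}]'''

records = json.loads(array_text)

# JSON Lines: one compact JSON object per line, no enclosing brackets or commas.
json_lines = "\n".join(json.dumps(record) for record in records)
print(json_lines)

# Each line now parses on its own, which is what a line-oriented reader needs.
parsed = [json.loads(line) for line in json_lines.splitlines()]
grade_a = [student["name"] for student in parsed if student["grade"] == "A"]
print(grade_a)
```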



# Task 8
Advanced Word Count with Sorting:

Extend the word count program to:
* Sort words by their frequency in descending order.
* Display the top 5 most frequent words.

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("AdvancedWordCount").getOrCreate()

# Sample text data
text_data = ["hello world hello", "spark is great", "hello spark", "world is big", "hello world"]
rdd = spark.sparkContext.parallelize(text_data)

# Split into words and count frequencies
word_counts = rdd.flatMap(lambda line: line.split()) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a + b)


# Sort by frequency in descending order
sorted_word_counts = word_counts.sortBy(lambda x: x[1], ascending=False)

# Get the top 5 most frequent words
top_5_words = sorted_word_counts.take(5)

# Display the results
print("Top 5 most frequent words:")
for word, count in top_5_words:
    print(f"{word}: {count}")

spark.stop()

Top 5 most frequent words:
hello: 4
world: 3
is: 2
spark: 2
great: 1
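
Sorting on frequency alone does not fix the relative order of tied words (for example `is` and `spark`, both with count 2), so it may vary between runs. A composite sort key of descending count followed by the word itself makes the ranking reproducible. The plain-Python sketch below shows that key on the same sample text; the assumption is that the same key function, `lambda x: (-x[1], x[0])`, could be passed to `sortBy` with its default ascending order.

```python
from collections import Counter

text_data = ["hello world hello", "spark is great", "hello spark", "world is big", "hello world"]

# Count word frequencies across all lines.
counts = Counter(word for line in text_data for word in line.split())

# Sort by descending count, then alphabetically so ties always come out in the same order.
ranked = sorted(counts.items(), key=lambda x: (-x[1], x[0]))
top_5 = ranked[:5]

for word, count in top_5:
    print(f"{word}: {count}")
```

With this key the fifth entry is `big` rather than `great`: both occur once, and `big` sorts first alphabetically.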


# Task 9
Custom Aggregations with aggregateByKey:
* Create an RDD with pairs of cities and temperatures, e.g., [("NY", 32), ("LA", 75), ("NY", 28)].

* Use aggregateByKey to calculate the average temperature for each city.

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("CustomAggregations").getOrCreate()

# Create an RDD with city-temperature pairs
city_temps = [("NY", 32), ("LA", 75), ("NY", 28), ("LA", 80), ("Chicago", 45), ("NY", 35), ("Chicago", 50), ("LA", 70)]
rdd = spark.sparkContext.parallelize(city_temps)

# Use aggregateByKey to calculate the average temperature for each city
def seq_op(acc, temp):
    # Accumulator: (sum of temperatures, count of temperatures)
    return (acc[0] + temp, acc[1] + 1)

def comb_op(acc1, acc2):
    return (acc1[0] + acc2[0], acc1[1] + acc2[1])

zero_value = (0, 0) # Initial accumulator value

avg_temps = rdd.aggregateByKey(zero_value, seq_op, comb_op) \
    .mapValues(lambda x: x[0] / x[1])

# Collect and print the results
for city, avg_temp in avg_temps.collect():
    print(f"Average temperature in {city}: {avg_temp}")

spark.stop()

Average temperature in NY: 31.666666666666668
Average temperature in LA: 75.0
Average temperature in Chicago: 47.5
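
To see what the two functions passed to aggregateByKey are responsible for, the sketch below replays the same computation in plain Python. The split of the data into two partitions is an arbitrary assumption made for illustration (Spark decides the real partitioning), and `aggregate_partition` is a hypothetical helper. `seq_op` folds values into a per-key accumulator within a partition, and `comb_op` merges accumulators for the same key across partitions.

```python
city_temps = [("NY", 32), ("LA", 75), ("NY", 28), ("LA", 80),
              ("Chicago", 45), ("NY", 35), ("Chicago", 50), ("LA", 70)]

def seq_op(acc, temp):
    # Accumulator: (sum of temperatures, count of temperatures)
    return (acc[0] + temp, acc[1] + 1)

def comb_op(acc1, acc2):
    return (acc1[0] + acc2[0], acc1[1] + acc2[1])

zero_value = (0, 0)

def aggregate_partition(partition):
    """Apply seq_op to every value in one partition, starting each key from zero_value."""
    accs = {}
    for city, temp in partition:
        accs[city] = seq_op(accs.get(city, zero_value), temp)
    return accs

# Assumed partitioning: first four records in one partition, the rest in another.
partitions = [city_temps[:4], city_temps[4:]]

merged = {}
for partial in map(aggregate_partition, partitions):
    for city, acc in partial.items():
        merged[city] = comb_op(merged[city], acc) if city in merged else acc

averages = {city: total / count for city, (total, count) in merged.items()}
print(averages)
```

Keeping a (sum, count) pair rather than a running average is what lets partial results from different partitions be merged correctly.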
