# **Lab Session: Algorithms and Programming with Spark RDDs using PySpark**  

## Introduction
This lab session introduces you to foundational concepts of distributed data processing using Spark's Resilient Distributed Datasets (RDDs). You'll leverage Python and the PySpark library within the Colab environment to build, execute, and analyze various Spark programs. The session covers:

- **Setting up PySpark in Colab**: Learn to configure and initialize the SparkContext.
- **Practical Exercises**:
  1. **Word Count Problem**: Implement a Spark algorithm to analyze word frequencies in a text file.
  2. **Data Aggregation**: Compute the average quantities from a sample dataset while minimizing shuffle operations.
  3. **Join Operations**: Explore algorithms to perform equi-joins and right-outer joins on RDDs without the direct `join()` transformation.
  4. **SQL Query Encoding in Spark**: Encode and test SQL-like queries using Python MapReduce transformations.

Each exercise is designed to deepen your understanding of Spark's capabilities, RDD transformations, and actions. You'll also develop skills in optimizing performance and implementing complex operations.

**Tools Required**:
- **Python**: For Spark programming.
- **Colab Notebook**: To run your Spark scripts.

By the end of this session, you'll have hands-on experience in using Spark for solving real-world problems effectively.

## **Exercise 1: Word Count Problem**
**Objective**: Design and implement a Spark algorithm to compute word frequencies in a text file.

1. **Setup**:  
   - Ensure your Colab environment is ready with Spark installed. Run the following commands:

In [1]:
!pip install pyspark
from pyspark import SparkConf, SparkContext
sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))




- Download or use an existing text file (e.g., `shake.txt`).

In [2]:
!wget https://www.dropbox.com/s/7ae58iydjloajvt/shake.txt

--2025-01-20 11:20:04--  https://www.dropbox.com/s/7ae58iydjloajvt/shake.txt
Resolving www.dropbox.com (www.dropbox.com)... 162.125.1.18, 2620:100:6016:18::a27d:112
Connecting to www.dropbox.com (www.dropbox.com)|162.125.1.18|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://www.dropbox.com/scl/fi/ao0ozilexrndwvfnce3kp/shake.txt?rlkey=udk3zb5n8ur7rj3xmqosgb7pz [following]
--2025-01-20 11:20:05--  https://www.dropbox.com/scl/fi/ao0ozilexrndwvfnce3kp/shake.txt?rlkey=udk3zb5n8ur7rj3xmqosgb7pz
Reusing existing connection to www.dropbox.com:443.
HTTP request sent, awaiting response... 302 Found
Location: https://uc02b526b4516ca33a6319ead1f4.dl.dropboxusercontent.com/cd/0/inline/CigyjsDGUkJlmb_GDYgkKLDBzMQmt6zk4kuTZOjQ7WhQV0qD2b2MGUq7Mw5zoDVrVvIwn5ngu5eP8jANyYUL6YmShan9ZTT7E8on4JQcNbAcYADpyEyetSJcQUN1hq9HWDE/file# [following]
--2025-01-20 11:20:05--  https://uc02b526b4516ca33a6319ead1f4.dl.dropboxusercontent.com/cd/0/inline/CigyjsDGUkJlmb_GDYgkKLDBzMQmt6z

2. **Create the RDD**:
   - Load the text file into an RDD:

In [3]:
document = sc.textFile("shake.txt")

3. **Transform and Process**:
   - Tokenize the lines into words:

In [5]:
words = document.flatMap(lambda line: line.split())

- Map each word to a key-value pair:

In [6]:
word_pairs = words.map(lambda word: (word, 1))

 - Reduce by key to count occurrences:

In [7]:
word_counts = word_pairs.reduceByKey(lambda x, y: x + y)

4. **View Results**:
   - Display the word counts:

In [8]:
print(word_counts.collect())



## **Exercise 2: Data Aggregation**
**Objective**: Compute the average quantity of each pet from a dataset and analyze shuffle operations.

1. **Setup**:  
   - Create an RDD for the dataset. For example:

In [9]:
data = [("dog", 3), ("cat", 4), ("dog", 5), ("cat", 6)]
pets = sc.parallelize(data)

2. **Aggregate Data**:
   - Calculate the total quantity and count for each pet:

In [10]:
totals = pets.mapValues(lambda qty: (qty, 1)).reduceByKey(
    lambda x, y: (x[0] + y[0], x[1] + y[1])
)

- Compute the average:

In [11]:
averages = totals.mapValues(lambda x: x[0] / x[1])


3. **Optimize Shuffle**:
   - Discuss with the class how to reduce shuffle operations, e.g., using `combineByKey`.

4. **View Results**:
   - Print the averages:

In [12]:
print(averages.collect())

[('dog', 4.0), ('cat', 5.0)]



## **Exercise 3: Join Operations**
**Objective**: Perform equi-joins and right-outer joins without the `join()` transformation.

1. **Setup**:  
   - Use two RDDs representing key-value datasets:

In [13]:
rdd1 = sc.parallelize([("A", 1), ("B", 2), ("C", 3)])
rdd2 = sc.parallelize([("A", 4), ("B", 5), ("D", 6)])

2. **Equi-Join Implementation**:
   - Perform a cartesian product and filter:

In [14]:
equi_join = rdd1.cartesian(rdd2).filter(lambda x: x[0][0] == x[1][0]).map(
    lambda x: (x[0][0], (x[0][1], x[1][1]))
)

3. **Right-Outer Join**:
   - Extend the equi-join logic to include keys exclusive to `rdd2`.

4. **Discuss Results**:
   - Compare performance with the standard `join()` transformation.


## **Exercise 4: Encoding SQL Queries in Spark**
**Objective**: Encode SQL-like queries using Python MapReduce and test them.

1. **Setup**:  
   - Download the files:

In [15]:
!wget https://www.dropbox.com/s/tmt6u80mkrwfjkv/Customer.txt
!wget https://www.dropbox.com/s/8n5cbmufqhzs4r3/Order.txt

--2025-01-20 11:26:34--  https://www.dropbox.com/s/tmt6u80mkrwfjkv/Customer.txt
Resolving www.dropbox.com (www.dropbox.com)... 162.125.1.18, 2620:100:6016:18::a27d:112
Connecting to www.dropbox.com (www.dropbox.com)|162.125.1.18|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://www.dropbox.com/scl/fi/xa82wow8dh9modao7msn4/Customer.txt?rlkey=rlbumsfpo0ctooaxcydyz6jbv [following]
--2025-01-20 11:26:34--  https://www.dropbox.com/scl/fi/xa82wow8dh9modao7msn4/Customer.txt?rlkey=rlbumsfpo0ctooaxcydyz6jbv
Reusing existing connection to www.dropbox.com:443.
HTTP request sent, awaiting response... 302 Found
Location: https://uc2b13da6d84e90b4f887bba2bed.dl.dropboxusercontent.com/cd/0/inline/CijuuTawzEnS6gWmCQMH1Iac7U5xWziRYIdW6sm44DrvhBxBKcGYipdmHlEVLeP3We-aWw_o7zQecjNbbH1lOzFxgmfPNNcaOmOCF9Hu0O6_ZI1ovBZ01tW2AyAnENXG-o4/file# [following]
--2025-01-20 11:26:35--  https://uc2b13da6d84e90b4f887bba2bed.dl.dropboxusercontent.com/cd/0/inline/CijuuTawzEnS6gWmCQMH1I

- Load data into RDDs:

In [16]:
customer_rdd = sc.textFile("Customer.txt").map(lambda line: line.split(","))
order_rdd = sc.textFile("Order.txt").map(lambda line: line.split(","))

2. **Query 1: Customers with Orders in July**:
   - Filter customers by month:   
   ``SELECT name FROM Customer WHERE month(startDate)=7``

In [18]:
from datetime import datetime
july_customers = customer_rdd.filter(
    lambda x: datetime.strptime(x[1], "%d/%m/%Y").month == 7
).map(lambda x: x[2])
print(july_customers.collect())

['SAWYER NEWTON', 'DAVIDSON WADE', 'FULLER OWEN', 'ROBERTSON BENNETT', 'SHERMAN KELLY', 'BUSH PEARSON', 'BAILEY GARRETT', 'JACKSON SCOTT', 'KNIGHT STAFFORD', 'PECK COLE', 'CAMPBELL THOMAS', 'OSBORNE SULLIVAN', 'RHODES HARVEY', 'GRIFFIN BOYD', 'WOOD DAVIS']


3. **Query 2: Distinct Names**:
   - Use `distinct()`:  
   `SELECT DISTINCT name FROM Customer WHERE month(startDate)=7`

In [19]:
distinct_names = july_customers.distinct()
print(distinct_names.collect())

['DAVIDSON WADE', 'FULLER OWEN', 'ROBERTSON BENNETT', 'SHERMAN KELLY', 'JACKSON SCOTT', 'KNIGHT STAFFORD', 'PECK COLE', 'OSBORNE SULLIVAN', 'GRIFFIN BOYD', 'SAWYER NEWTON', 'BUSH PEARSON', 'BAILEY GARRETT', 'CAMPBELL THOMAS', 'RHODES HARVEY', 'WOOD DAVIS']


4. **Query 3: Aggregated Orders**:
   - Perform grouping and aggregation:  
`SELECT  O.cid, SUM(total), COUNT(DISTINCT total)  FROM Order O GROUP BY O.cid`

In [20]:
grouped_orders = order_rdd.map(lambda x: (x[0], float(x[1]))).groupByKey()
aggregated = grouped_orders.mapValues(
    lambda x: (sum(x), len(set(x)))
)
print(aggregated.collect())

[('513846', (1189.0, 1)), ('737120', (1423.0, 1)), ('1893664', (1057.0, 1)), ('1028371', (1376.0, 1)), ('349164', (1443.0, 1)), ('3217724', (1202.0, 1)), ('3379534', (1191.0, 1)), ('119', (3775.0, 3)), ('2351951', (1248.0, 1)), ('1839695', (1205.0, 1)), ('2732261', (1435.0, 1)), ('2962803', (1435.0, 1)), ('934469', (1198.0, 1)), ('912851', (1396.0, 1)), ('3374155', (1392.0, 1)), ('2574018', (1057.0, 1)), ('1385313', (1395.0, 1)), ('2210863', (1119.0, 1)), ('812380', (1340.0, 1)), ('3179824', (1387.0, 1)), ('994935', (1127.0, 1)), ('304949', (1292.0, 1)), ('2356312', (1477.0, 1)), ('2097298', (1290.0, 1)), ('1636076', (1243.0, 1)), ('3099347', (1083.0, 1)), ('1547444', (1093.0, 1)), ('2514211', (1033.0, 1)), ('2549436', (1387.0, 1)), ('3151238', (1442.0, 1)), ('1050335', (1441.0, 1)), ('551992', (1228.0, 1)), ('2888121', (1222.0, 1)), ('245860', (1212.0, 1)), ('1886411', (1216.0, 1)), ('1927028', (1340.0, 1)), ('1237286', (1137.0, 1)), ('1225388', (1210.0, 1)), ('284446', (1197.0, 1)), 

5. **Query 4: Join Customers and Orders**:
   - Use a key-based join:  
   `•	SELECT C.cid, O.total FROM Customer C, Order O WHERE  C.cid=O.ci`

In [22]:
join_result = customer_rdd.map(lambda x: (x[0], x[2])).join(order_rdd.map(lambda x: (x[0], x[1])))
print(join_result.collect())

[('119', ('CAMPBELL THOMAS', '1482')), ('119', ('CAMPBELL THOMAS', '1217')), ('119', ('CAMPBELL THOMAS', '1076')), ('120', ('OSBORNE SULLIVAN', '1385')), ('120', ('OSBORNE SULLIVAN', '1162')), ('120', ('OSBORNE SULLIVAN', '1117')), ('120', ('OSBORNE SULLIVAN', '1247')), ('120', ('OSBORNE SULLIVAN', '1347')), ('120', ('OSBORNE SULLIVAN', '1117'))]
