In [5]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Test").getOrCreate()

print(spark.version)


3.5.4


In [1]:
from pyspark import SparkContext

In [2]:
sc = SparkContext()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/02/17 19:33:01 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [9]:
%%writefile example.txt
first line
second line
third line
forth line

Writing example.txt


In [11]:
# Create RDD using a textFile method
textFile=sc.textFile('example.txt')

## PySpark RDD Transformations & Actions

* Transformations create a new RDD from an existing one.
* Actions trigger execution and return results to the driver.
* RDDs are immutable, meaning transformations create a new RDD.
---

# RDD Transformations

| **Transformation**    | **Description** |
|----------------------|----------------|
| `map(func)`         | Applies `func` to each element and returns a new RDD |
| `flatMap(func)`     | Similar to `map()`, but flattens nested lists |
| `filter(func)`      | Filters elements based on `func` |
| `groupByKey()`      | Groups data by key (used for key-value pairs) |
| `reduceByKey(func)` | Applies `func` to reduce values for each key |
| `sortByKey()`       | Sorts the RDD by key |
| `join(rdd2)`        | Joins two RDDs based on keys |
| `coalesce(n)`       | Reduces the number of partitions |
| `distinct()`        | Removes duplicate values |


## RDD Actions

In [14]:
textFile.count()

                                                                                

4

In [16]:
textFile.first()

'first line'

## RDD Transformation

### `filter()`: Filter specific values


In [19]:
# step 1 transformation
secfind=textFile.filter(lambda line:'line' in line)

In [21]:
# step 2 action
secfind.collect()

['first line', 'second line', 'third line', 'forth line']

In [148]:
# Filter transactions greater than 200
filtered_rdd = kv_rdd.filter(lambda x: x[1] > 200)
print("Filtered RDD:", filtered_rdd.collect())


Filtered RDD: [('129', 450.0), ('131', 300.0)]


### `groupByKey()`: Group data by key

In [151]:
# Group transactions by ServiceID
grouped_rdd = kv_rdd.groupByKey().mapValues(list)
print("Grouped RDD:", grouped_rdd.collect())

Grouped RDD: [('121', [200.0]), ('122', [150.0]), ('131', [100.0, 300.0]), ('129', [450.0])]


In [23]:
secfind.count()

4

In [1]:
%%writefile example2.txt
first 
second line
the third line
then a fourth line

Writing example2.txt


In [8]:
from pyspark import SparkContext

In [10]:
sc = SparkContext()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/02/14 12:50:17 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [11]:
# Show RDD
sc.textFile('example2.txt')

example2.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0

In [13]:
# Save a reference to this RDD
text_rdd = sc.textFile('example2.txt')

In [13]:
# Map a function (or lambda expression) to each line
# Then collect the results.
text_rdd.map(lambda line: line.split()).collect()

[['first'],
 ['second', 'line'],
 ['the', 'third', 'line'],
 ['then', 'a', 'fourth', 'line']]

In [19]:
word = text_rdd.map(lambda line: line.split())

In [21]:
# output: Splits every line into a list of the words
word.collect()

[['first'],
 ['second', 'line'],
 ['third', 'line'],
 ['the', 'a', 'forth', 'line']]

In [23]:
# Output: each string in the list
text_rdd.collect()

['first', 'second line', 'third line', 'the a forth line']

## Explore Map() vs FlatMap(): 

`flatMap()`: Flatten nested structures (Flatten nested structures). collects everything as a single flatMap Transformation.

In [26]:
# we did Transformation step, and action step in one line
text_rdd.flatMap(lambda line:line.split()).collect()

['first', 'second', 'line', 'third', 'line', 'the', 'a', 'forth', 'line']

In [140]:
# Split rows into words
words_rdd = rdd.flatMap(lambda line: line.split())
print("FlatMapped RDD:", words_rdd.take(10))


FlatMapped RDD: ['201', '10/13/2017', '100', 'NY', '131', '100.00', '204', '10/18/2017', '700', 'TX']


In [None]:
# Split rows into words
words_rdd = rdd.flatMap(lambda line: line.split())
print("FlatMapped RDD:", words_rdd.take(10))


In [15]:
# Collect everything as a single flat map
text_rdd.flatMap(lambda line: line.split()).collect()

['first',
 'second',
 'line',
 'the',
 'third',
 'line',
 'then',
 'a',
 'fourth',
 'line']

## RDDs and Key Value Pairs


In [5]:
%%writefile services.txt
#EventId    Timestamp    Customer   State    ServiceID    Amount
201       10/13/2017      100       NY       131          100.00
204       10/18/2017      700       TX       129          450.00
202       10/15/2017      203       CA       121          200.00
206       10/19/2017      202       CA       131          500.00
203       10/17/2017      101       NY       173          750.00
205       10/19/2017      202       TX       121          200.00
206       10/19/2017      200       TX       131          300.00

Overwriting services.txt


In [7]:
services = sc.textFile('services.txt')

In [9]:
# First 2 elements of RDD
services.take(2)

                                                                                

['#EventId    Timestamp    Customer   State    ServiceID    Amount',
 '201       10/13/2017      100       NY       131          100.00']

`map()`: Apply function to each row. To split up

In [12]:
services.map(lambda line: line.split())

PythonRDD[3] at RDD at PythonRDD.scala:53

In [14]:
services.map(lambda x: x.split()).take(3)

[['#EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2017', '700', 'TX', '129', '450.00']]

In [16]:
services.map(lambda line: line.split()).collect()

[['#EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2017', '700', 'TX', '129', '450.00'],
 ['202', '10/15/2017', '203', 'CA', '121', '200.00'],
 ['206', '10/19/2017', '202', 'CA', '131', '500.00'],
 ['203', '10/17/2017', '101', 'NY', '173', '750.00'],
 ['205', '10/19/2017', '202', 'TX', '121', '200.00']]

### Remove the hashtag

In [19]:
def hashtag_remove(line):
    if line[0]=='#':
        return line[1:]
    else: return line

In [21]:
services.map(hashtag_remove).collect()

['EventId    Timestamp    Customer   State    ServiceID    Amount',
 '201       10/13/2017      100       NY       131          100.00',
 '204       10/18/2017      700       TX       129          450.00',
 '202       10/15/2017      203       CA       121          200.00',
 '206       10/19/2017      202       CA       131          500.00',
 '203       10/17/2017      101       NY       173          750.00',
 '205       10/19/2017      202       TX       121          200.00']

In [23]:
# lambda expression for removing hashtag
# lambda arguments: result_if_true if condition else result_if_false
process_line = lambda line: line[1:] if line[0]=='#' else line

In [25]:
process_line('#dsdss')

'dsdss'

In [27]:
processed_rdd = services.map(process_line).collect()

In [29]:
print(processed_rdd)

['EventId    Timestamp    Customer   State    ServiceID    Amount', '201       10/13/2017      100       NY       131          100.00', '204       10/18/2017      700       TX       129          450.00', '202       10/15/2017      203       CA       121          200.00', '206       10/19/2017      202       CA       131          500.00', '203       10/17/2017      101       NY       173          750.00', '205       10/19/2017      202       TX       121          200.00']


In [11]:
# Removing hash tag. Grab everything from index 1
services.map(lambda line: line[1:] if line[0]=='#' else line).collect()

['EventId    Timestamp    Customer   State    ServiceID    Amount',
 '201       10/13/2017      100       NY       131          100.00',
 '204       10/18/2017      700       TX       129          450.00',
 '202       10/15/2017      203       CA       121          200.00',
 '206       10/19/2017      202       CA       131          500.00',
 '203       10/17/2017      101       NY       173          750.00',
 '205       10/19/2017      202       TX       121          200.00',
 '206       10/19/2017      200       TX       131          300.00']

### Now that we cleaned and removed `#` we grab the first three onj of RDD

In [13]:
clean = services.map(lambda line: line[1:] if line[0]=='#' else line)

In [15]:
clean = clean.map(lambda line: line.split())

In [17]:
clean.collect()

[['EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2017', '700', 'TX', '129', '450.00'],
 ['202', '10/15/2017', '203', 'CA', '121', '200.00'],
 ['206', '10/19/2017', '202', 'CA', '131', '500.00'],
 ['203', '10/17/2017', '101', 'NY', '173', '750.00'],
 ['205', '10/19/2017', '202', 'TX', '121', '200.00'],
 ['206', '10/19/2017', '200', 'TX', '131', '300.00']]

### Grab fields:
Get the total sells per state

In [19]:
# 1. collect the state and amount columns
pairs = clean.map(lambda lst:(lst[3],lst[-1])).collect()

In [21]:
print(pairs)

[('State', 'Amount'), ('NY', '100.00'), ('TX', '450.00'), ('CA', '200.00'), ('CA', '500.00'), ('NY', '750.00'), ('TX', '200.00'), ('TX', '300.00')]


## ReduceByKey() in PySpark

### What is `reduceByKey()`?
`reduceByKey()` is a transformation in PySpark used for **aggregating values** by key in an RDD. It groups the data by key and applies a reduction function to combine the values.

#### **Syntax**
```python
rdd.reduceByKey(lambda a, b: a + b)
```
- **`a` and `b`** are values associated with the same key.
- The function (`a + b`) is applied iteratively to combine the values.

---

### **Apply `reduceByKey()`**
```python
# Sum sales amount for each state
result_rdd = rdd.reduceByKey(lambda amt1, amt2: float(amt1) + float(amt2))

# Collect and display the output
result = result_rdd.collect()
print(result)
```

---

### **Expected Output**
```python
[("NY", 850), ("TX", 950), ("CA", 700)]
```
Here’s how the function works internally:
- NY: `100 + 750 = 850`
- TX: `450 + 200 + 300 = 950`
- CA: `200 + 500 = 700`

---

### ** Explanation of `reduceByKey()` in Detail**
1. **Grouping by Key**
   `reduceByKey()` groups values that share the same key.
    Grouping by Key (groupBy concept inside reduceByKey), Spark automatically groups elements with the same key:

- 'NY': [100.00, 750.00]
- 'TX': [450.00, 200.00, 300.00]
- 'CA': [200.00, 500.00]
  
This step is internally done by reduceByKey() (similar to groupByKey(), but more efficient because it combines values during shuffling).

---
2. **Applying the Function**


    - Pairwise Reduction (How Lambda Works) Now, reduceByKey() applies lambda :
      lambda amt1, amt2: float(amt1) + float(amt2) pairwise on the values until only one value remains per key:
  
      
    NY:
    100.00 + 750.00 = 850.00

   
    It takes 100.00 and 750.00, applies the function (amt1 + amt2), and keeps 850.00.

   
    TX:
    450.00 + 200.00 = 650.00
    650.00 + 300.00 = 950.00

   
    First, it adds 450.00 and 200.00 → gets 650.00. Then, it takes 650.00 and 300.00 → gets 950.00.

   
    CA:
    200.00 + 500.00 = 700.00
    Just one pairwise addition needed.

   - It starts with the first two values and applies `lambda amt1, amt2: amt1 + amt2`.
   - The result is used in the next step until all values are aggregated.
     
4. **Efficient Execution**
   - `reduceByKey()` is more efficient than `groupByKey()` since it reduces data **before shuffling** across nodes.

---

### **Alternative Functions**
#### **Find Maximum Sales for Each State**
```python
max_sales_rdd = rdd.reduceByKey(lambda a, b: max(a, a))
print(max_sales_rdd.collect())
```
#### **Find Minimum Sales for Each State**
```python
min_sales_rdd = rdd.reduceByKey(lambda a, b: min(a, b))
print(min_sales_rdd.collect())
```

---

### **Conclusion**
- `reduceByKey()` helps in aggregating key-value RDDs efficiently.
- It reduces data early to optimize performance.
- Use cases include **sum, count, max, min, and other aggregations** by key.
- reduceByKey() partially reduces values before shuffling. This means it:
  
      ✅ Uses less memory
      ✅ Reduces network traffic
      ✅ Is faster for large datasets


In [23]:
pairs = clean.map(lambda lst:(lst[3],lst[-1]))

In [25]:
#  Reduce by key
# When using lambda amt1, amt2: float(amt1) + float(amt2), 
# the two variables (amt1 and amt2) 
# represent two amounts that belong to the same key (state in this case).
rekey=pairs.reduceByKey(lambda amt1,amt2:amt1+amt2)

In [27]:
rekey.collect()

[('State', 'Amount'),
 ('NY', '100.00750.00'),
 ('TX', '450.00200.00300.00'),
 ('CA', '200.00500.00')]

In [31]:
rekey = pairs.reduceByKey(lambda amt1,amt2: float(amt1)+float(amt2))

In [33]:
rekey.collect()

[('State', 'Amount'), ('NY', 850.0), ('TX', 950.0), ('CA', 700.0)]

In [29]:
# 3. get rid of state, amount title
step3 = rekey.filter(lambda x:not x[0]== 'state)

SyntaxError: EOL while scanning string literal (3331101979.py, line 2)

In [None]:
# 3 4. sort by amount
s

In [None]:
step1 = clean.map()

In [59]:
# As previously it only concatinate string we convert it to float
rekey=pairs.reduceByKey(lambda amt1,amt2:float(amt1)+float(amt2))

In [61]:
rekey.collect()

[('State', 'Amount'), ('NY', 850.0), ('TX', 650.0), ('CA', 700.0)]

In [134]:
# Extract ServiceID and Amount as key-value pairs
kv_rdd = rdd.map(lambda line: (line.split()[4], float(line.split()[-1])))
print("Mapped RDD:", kv_rdd.collect())

Mapped RDD: [('131', 100.0), ('129', 450.0), ('121', 200.0), ('131', 300.0), ('122', 150.0)]


In [39]:
%%writefile data2.txt
#ID    Name    Age    City    Score
1      John    25     NY      85.5
2      Jane    30     TX      92.0
3      Alice   27     CA      88.0
4      Bob     24     FL      79.5
5      Eve     29     WA      95.0


Writing data.txt


In [37]:
services.map(lambda x: x[1:] if x[0]=='#' else x).map(lambda x: x.split()).collect()

[['EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2017', '700', 'TX', '129', '450.00'],
 ['202', '10/15/2017', '203', 'CA', '121', '200.00'],
 ['206', '10/19/2017', '202', 'CA', '131', '500.00'],
 ['203', '10/17/2017', '101', 'NY', '173', '750.00'],
 ['205', '10/19/2017', '202', 'TX', '121', '200.00']]

Here's your **Jupyter Notebook Markdown tutorial** for **RDD Actions** in PySpark. It includes a **pretext table** explaining each action and the corresponding **Python code** in proper markdown formatting.

---

### **RDD Actions in PySpark**  

#### **Action Description Table**
| Action             | Description |
|--------------------|------------|
| `count()`         | Counts total rows |
| `first()`         | Retrieves first row |
| `collect()`       | Converts RDD to list |
| `take(n)`        | Fetches first `n` rows |
| `distinct()`      | Returns distinct rows |
| `reduce()`        | Applies a function to aggregate values |
| `max()`           | Finds the maximum value |
| `min()`           | Finds the minimum value |
| `countByKey()`    | Counts occurrences of each key |
| `keys()`          | Retrieves all keys in an RDD |
| `values()`        | Retrieves all values in an RDD |
| `isEmpty()`       | Checks if RDD is empty |
| `takeSample(False, n)` | Takes `n` random samples |

---



In [82]:

%%writefile data.txt
#EventId    Timestamp    Customer   State    ServiceID    Amount
201       10/13/2017      100       NY       131          100.00
204       10/18/2017      700       TX       129          450.00
202       10/15/2017      203       CA       121          200.00
205       10/20/2017      404       FL       131          300.00
203       10/17/2017      305       TX       122          150.00

Overwriting data.txt


In [84]:
# To check the saved file, run:
import os
print(os.getcwd())  # Shows current working directory
print(os.listdir())  # Lists files in the directory


/Users/apple/Documents/GitHub/Python/BigData
['services.txt', '.DS_Store', 'example2.txt', 'lambda_expressions.ipynb', 'example.txt', '.ipynb_checkpoints', 'pyspark.ipynb', 'data.txt']


In [86]:
# Load data.txt into an RDD (excluding header)
rdd = sc.textFile("data.txt").filter(lambda line: not line.startswith("#"))

# Display first few lines
rdd.take(3)

['201       10/13/2017      100       NY       131          100.00',
 '204       10/18/2017      700       TX       129          450.00',
 '202       10/15/2017      203       CA       121          200.00']

## RDD Actions in PySpark
- Actions are operations that **trigger computation and return values to the driver**.
### **Basic Actions**

In [88]:
# Count number of records
print("Total rows:", rdd.count())



Total rows: 5


In [90]:
# Show the first row
print("First row:", rdd.first())



First row: 201       10/13/2017      100       NY       131          100.00


In [92]:
# Collect all records into a list
print("All records:", rdd.collect())



All records: ['201       10/13/2017      100       NY       131          100.00', '204       10/18/2017      700       TX       129          450.00', '202       10/15/2017      203       CA       121          200.00', '205       10/20/2017      404       FL       131          300.00', '203       10/17/2017      305       TX       122          150.00']


In [94]:
# Take the first 3 rows
print("First 3 rows:", rdd.take(3))




First 3 rows: ['201       10/13/2017      100       NY       131          100.00', '204       10/18/2017      700       TX       129          450.00', '202       10/15/2017      203       CA       121          200.00']


In [96]:
# Retrieve distinct rows
print("Distinct rows:", rdd.distinct().collect())


Distinct rows: ['204       10/18/2017      700       TX       129          450.00', '202       10/15/2017      203       CA       121          200.00', '205       10/20/2017      404       FL       131          300.00', '203       10/17/2017      305       TX       122          150.00', '201       10/13/2017      100       NY       131          100.00']


### **Numeric Operations**

In [103]:

# Extract the "Amount" column (last column) and convert to float
amount_rdd = rdd.map(lambda line: float(line.split()[-1]))

In [105]:
# Find the total sum of Amount
total_amount = amount_rdd.reduce(lambda x, y: x + y)
print("Total Amount:", total_amount)


Total Amount: 1200.0


In [107]:

# Find the maximum transaction amount
max_amount = amount_rdd.max()
print("Max Amount:", max_amount)

Max Amount: 450.0


In [109]:
# Find the minimum transaction amount
min_amount = amount_rdd.min()
print("Min Amount:", min_amount)

Min Amount: 100.0


In [111]:
# Compute average amount
avg_amount = total_amount / amount_rdd.count()
print("Average Amount:", avg_amount)

Average Amount: 240.0


### **Some other Useful Actions**

In [114]:
# Get the first 2 records as key-value pairs (ServiceID, Amount)
kv_rdd = rdd.map(lambda line: (line.split()[4], float(line.split()[-1])))
print("Key-Value pairs:", kv_rdd.take(2))

Key-Value pairs: [('131', 100.0), ('129', 450.0)]


In [116]:
# Count by key (ServiceID)
service_count = kv_rdd.countByKey()
print("Count per ServiceID:", dict(service_count))

Count per ServiceID: {'131': 2, '129': 1, '121': 1, '122': 1}


In [118]:
# Fetch all keys (ServiceIDs)
print("All ServiceIDs:", kv_rdd.keys().collect())

All ServiceIDs: ['131', '129', '121', '131', '122']


In [120]:
# Fetch all values (Amounts)
print("All Amounts:", kv_rdd.values().collect())

All Amounts: [100.0, 450.0, 200.0, 300.0, 150.0]


In [122]:
# Check if RDD is empty
print("Is RDD empty?", rdd.isEmpty())

Is RDD empty? False


In [124]:
# Take a sample (without replacement)
print("Sample Records:", rdd.takeSample(False, 2))

Sample Records: ['201       10/13/2017      100       NY       131          100.00', '204       10/18/2017      700       TX       129          450.00']


**Try using tuple unpacking for readability.**

In [49]:
x = ['ID','State','Amount']

In [53]:
def func1(lst):
    return lst[-1]

In [67]:
def func2(id_st_amt):
    # tuple unpacking
    (ids,st,amt)=id_st_amt
    return amt

In [57]:
func1(x)

'Amount'

In [69]:
func2(x)

'Amount'