<img src="uva_seal.png">  

## Key Value Pairs

### University of Virginia
### DS 7200: Distributed Computing
### Last Updated: August 20, 2023

---  

### SOURCES 

1. Learning Spark 1st Ed., Chapter 4: Working with Key/Value Pairs

### OBJECTIVES
1. Learn about properties and methods for pair RDDs


### CONCEPTS AND FUNCTIONS
- Pair RDDs  
- Partition  
- `reduceByKey()`, `groupByKey()`, `combineByKey()`, `sortByKey()`  
- `mapValues()`, `flatMapValues()`  
- `keys()`, `values()`  
- `join()`, `subtractByKey()`, `rightOuterJoin()`, `leftOuterJoin()`, `cogroup()`  
- `countByKey()`  
- `collectAsMap()`  
- `lookup()`  
- `groupWith()`  

---  

### PAIR RDD BASICS

A *Pair RDD* is an RDD whose elements are key/value pairs, represented in Python as 2-tuples of the form `(key, value)`. Unlike a Python dictionary, a Pair RDD may contain the same key more than once.  
Data scientists often talk about the *key* of a record, meaning the field on which it will be aggregated.  For example, if the records contain employee titles and we wish to compute salary statistics by title, then it makes sense for the key to be the title and the values to be the salaries.

In broader terms, Pair RDDs are useful for merging and aggregating data.  

Applying `map()` to an RDD with a function that returns a `(key, value)` tuple will produce a Pair RDD.

In [1]:
import os

from pyspark.sql import SparkSession

spark = SparkSession.builder \
        .master("local") \
        .appName("pair_rdd") \
        .getOrCreate()

sc = spark.sparkContext


In [2]:
lines = sc.parallelize(['french fries','chicken burrito','Apache Spark', 'OpenAI ChatGPT'])

In [3]:
# Key each line by its first word; collect() returns the pairs to the driver as a Python list
p = lines.map(lambda x: (x.split(" ")[0], x)).collect()

In [4]:
print(p)

[('french', 'french fries'), ('chicken', 'chicken burrito'), ('Apache', 'Apache Spark'), ('OpenAI', 'OpenAI ChatGPT')]


In [5]:
type(p)

list

### Basic Transformations

Some examples:

`reduceByKey()`  
Runs several parallel reduce operations, one for each key.  
Combining is done locally on each machine for each key before computing a global combine for the key.  
This reduces shuffling of the data across nodes, which is expensive.
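
The local-then-global combining described above can be imitated in plain Python, without Spark. This is only a sketch of the idea, not Spark's implementation: the helper `combine_by_key` and the two-partition layout are assumptions made up for illustration.

```python
def combine_by_key(pairs, func):
    """Merge the values of each key with func and return a dict (hypothetical helper)."""
    merged = {}
    for key, value in pairs:
        merged[key] = func(merged[key], value) if key in merged else value
    return merged

# Assumed layout: the pairs happen to be split across two partitions
partitions = [
    [(1, 2), (3, 4), (3, 6)],
    [(-1, 10), (-1, 22), (3, 1)],
]

add = lambda x, y: x + y

# Step 1: combine locally within each partition (no data movement yet)
local_results = [combine_by_key(part, add) for part in partitions]

# Step 2: only the per-partition partial results are moved and merged
shuffled = [pair for local in local_results for pair in local.items()]
global_result = combine_by_key(shuffled, add)

print(local_results)                  # [{1: 2, 3: 10}, {-1: 32, 3: 1}]
print(sorted(global_result.items()))  # [(-1, 32), (1, 2), (3, 11)]
```

Six input pairs shrink to four partial results before the merge step. On a real dataset with many repeated keys, the reduction in shuffled data is much larger.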

`fold()`  
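`fold()` is similar to `reduce()`, but it also takes a “zero value” as a starting point. The zero value should be the identity for the operation (for example, 0 for addition or 1 for multiplication). The per-key counterpart for Pair RDDs is `foldByKey()`.  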
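
To see why the zero value should be the identity, here is a plain-Python sketch of how a fold behaves when the data is split into partitions. The function `simulated_fold` and the partition layout are assumptions for illustration. The sketch relies on the documented behavior that the zero value is used once per partition and once more when the partial results are merged.

```python
from functools import reduce
from operator import add, mul

def simulated_fold(partitions, zero, op):
    """Fold each partition starting from zero, then fold the partial results (hypothetical helper)."""
    partials = [reduce(op, part, zero) for part in partitions]
    return reduce(op, partials, zero)

# Assumed layout: the values 2, 4, 6 split across two partitions
partitions = [[2, 4], [6]]

print(simulated_fold(partitions, 0, add))   # 12, since 0 is the identity for +
print(simulated_fold(partitions, 1, mul))   # 48, since 1 is the identity for *
print(simulated_fold(partitions, 10, add))  # 42, the zero value 10 is added three times
```

With a zero value that is not the identity, the result depends on how many partitions the data has, which is rarely what you want.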

In [None]:
# Create an RDD
rdd = sc.parallelize([(1,2),(3,4),(3,6),(-1,10),(-1,22)])

In [None]:
# Extract the keys
rdd.keys().collect()

**NOTICE: Does the `keys()` function dedupe the keys (remove duplicates)?**

In [None]:
# Reduce (sum) by keys
rdd.reduceByKey(lambda x,y: x+y).collect()

**TRY FOR YOURSELF (UNGRADED EXERCISE)**

The cell below uses a different operator in the `reducer`.  What do you expect the output to look like?  Now run it to verify.

In [None]:
rdd.reduceByKey(lambda x,y: x*y).collect()

**Revisiting Word Count**

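Now that you have an understanding of Pair RDDs, review the word count program below to see how they are used.  The first `map()` after `flatMap()` creates a Pair RDD of $(word, 1)$ pairs, and `reduceByKey()` is a `reducer` that sums the counts for each word.  The final `map()` swaps each pair to $(count, word)$ so that `sortByKey(False)` sorts by count in descending order.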

In [None]:
lines = sc.textFile("README.txt")

In [None]:
wordcounts = lines.map(lambda x: x.replace(',',' ') \
                        .replace('.','   ').replace('-',' ').lower()) \
                        .flatMap(lambda x: x.split()) \
                        .map(lambda x: (x, 1)) \
                        .reduceByKey(lambda x,y:x+y) \
                        .map(lambda x:(x[1],x[0])) \
                        .sortByKey(False) 

In [None]:
wordcounts.take(10)

**TRY FOR YOURSELF (UNGRADED EXERCISE)**

`wordcounts` contains $(count, word)$ pairs sorted by count in descending order, so `take(10)` returns the 10 most frequent words with their counts.  Write code to save only those 10 words in a list.  Test that it works properly.

**Finding Frequent Word Bigrams**

A frequent task in natural language processing (NLP) is generating bigrams from text, which are pairs of adjacent words.

Let's demonstrate bigrams by parsing the sentence above into its bigrams.  The code below will apply a `mapper` to generate $((word_1, word_2), 1)$ key/value pairs, where the key is the bigram itself.

In [None]:
text = sc.parallelize(['A frequent task in natural language processing (NLP) is generating bigrams from text, which are pairs of adjacent words.'])

bigrams = text \
            .map(lambda x: x.split()) \
            .flatMap(lambda x: [((x[i],x[i+1]),1) for i in range(0,len(x)-1)])

In [None]:
bigrams.collect()

In [None]:
# output from the bigram code

[(('A', 'frequent'), 1), 
 (('frequent', 'task'), 1), 
 (('task', 'in'), 1), 
 (('in', 'natural'), 1), 
 (('natural', 'language'), 1), 
 (('language', 'processing'), 1), 
 (('processing', '(NLP)'), 1), (('(NLP)', 'is'), 1), 
 (('is', 'generating'), 1), (('generating', 'bigrams'), 1), 
 (('bigrams', 'from'), 1), 
 (('from', 'text,'), 1), 
 (('text,', 'which'), 1), 
 (('which', 'are'), 1), 
 (('are', 'pairs'), 1), 
 (('pairs', 'of'), 1), 
 (('of', 'adjacent'), 1), 
 (('adjacent', 'words.'), 1)]

Next, we modify the code snippet to do a word count on the bigrams.
Specifically, we include a `reduceByKey()` to count the frequency of each bigram, then swap each pair to $(count, bigram)$ and apply `sortByKey(False)` to sort by descending count.  This won't be super fascinating, as each bigram appears only once.

In [None]:
# Bigram Word Count

bigrams = text \
          .map(lambda x: x.split()) \
          .flatMap(lambda x: [((x[i],x[i+1]),1) for i in range(0,len(x)-1)])\
          .reduceByKey(lambda x,y: x+y) \
          .map(lambda x: (x[1],x[0])) \
          .sortByKey(False)

In [None]:
bigrams.collect()

**TRY FOR YOURSELF (UNGRADED EXERCISE)**

Let's make the bigram word count a little more interesting, shall we?  

The cell below defines `text_two` to be the same as `text`.  

Modify the text by inserting some duplicate bigrams.  

Next, copy the Bigram Word Count code from above into the cell below, and run it on `text_two`.  

Notice the difference in output.  Does it make sense?

In [None]:
text_two = sc.parallelize(['A frequent task in natural language processing (NLP) is generating bigrams from text, which are pairs of adjacent words.'])

In [None]:
# copy Bigram Word Count code here, running on text_two

# collect the bigrams to review results. for a massive dataset,
# collecting all the bigrams could crash your driver (out of memory exception)

bigrams.collect()

**Partition**  

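An RDD is split into partitions, and the number of partitions determines how much parallelism is available when operating on the RDD.  
Most operators in this chapter take an optional parameter for the number of partitions in the result.  

Example: `rdd.reduceByKey(lambda x, y: x + y, 10)` produces a result with 10 partitions.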
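
As a rough illustration of how key/value pairs could be spread over partitions, the sketch below assigns each pair to a partition by hashing its key. The helper `assign_partitions` is hypothetical, and using Python's built-in `hash()` on integer keys is an assumption. It shows the general idea of hash partitioning, not Spark's exact code path.

```python
def assign_partitions(pairs, num_partitions):
    """Place each (key, value) pair in the partition chosen by hash(key) (hypothetical helper)."""
    partitions = [[] for _ in range(num_partitions)]
    for key, value in pairs:
        index = hash(key) % num_partitions
        partitions[index].append((key, value))
    return partitions

pairs = [(1, 2), (3, 4), (3, 6), (5, 1), (5, 10), (5, 100)]
parts = assign_partitions(pairs, 3)

for i, part in enumerate(parts):
    print(i, part)
# 0 [(3, 4), (3, 6)]
# 1 [(1, 2)]
# 2 [(5, 1), (5, 10), (5, 100)]
```

All pairs with the same key land in the same partition, which is what lets a per-key operation finish its work within a single partition.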

**Join**  
We can build up a dataset by joining it with other datasets on one or more keys.  Here are some common join operations:  

`join()`: inner join; keep only keys present in both datasets  
`leftOuterJoin()`: keep all records from the left dataset, match on the right dataset  
`rightOuterJoin()`: keep all records from the right dataset, match on the left dataset
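
The difference between the three joins is easiest to see on a tiny example. Below is a plain-Python sketch of the join semantics on lists of `(key, value)` tuples. The helper functions and the `titles` and `salaries` data are made up for illustration, and the order of results from a real Spark join is not guaranteed to match.

```python
from collections import defaultdict

def group_values(pairs):
    """Collect the values of each key into a list (hypothetical helper)."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return grouped

def inner_join(left, right):
    r = group_values(right)
    return [(k, (v, w)) for k, v in left for w in r.get(k, [])]

def left_outer_join(left, right):
    r = group_values(right)
    # Unmatched left keys are kept, with None on the right side
    return [(k, (v, w)) for k, v in left for w in r.get(k, [None])]

def right_outer_join(left, right):
    l = group_values(left)
    # Unmatched right keys are kept, with None on the left side
    return [(k, (v, w)) for k, w in right for v in l.get(k, [None])]

titles = [("ann", "analyst"), ("bob", "engineer"), ("cy", "manager")]
salaries = [("ann", 70), ("bob", 90), ("dee", 85)]

print(inner_join(titles, salaries))
# [('ann', ('analyst', 70)), ('bob', ('engineer', 90))]
print(left_outer_join(titles, salaries))
# [('ann', ('analyst', 70)), ('bob', ('engineer', 90)), ('cy', ('manager', None))]
print(right_outer_join(titles, salaries))
# [('ann', ('analyst', 70)), ('bob', ('engineer', 90)), ('dee', (None, 85))]
```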

**Sorting**  
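Sort functions take a parameter for sort direction (`ascending`).  
It is also possible to provide a key function (`keyfunc`), which is applied to each key before comparison, for custom sorting.  

Here is an example that converts integer keys to strings, so the keys are sorted in string (lexicographic) order rather than numeric order:  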

```
rdd.sortByKey(ascending=True, numPartitions=None, keyfunc = lambda x: str(x))  
```
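
To see what a string key function does to the ordering, the sketch below uses Python's built-in `sorted()` on a small list of pairs as a stand-in for `sortByKey()`. The sample data is made up for illustration.

```python
pairs = [(10, 'a'), (9, 'b'), (100, 'c'), (2, 'd')]

# Default behavior: compare the integer keys numerically
numeric = sorted(pairs, key=lambda kv: kv[0])

# Like keyfunc=lambda x: str(x): compare the keys as strings
as_strings = sorted(pairs, key=lambda kv: str(kv[0]))

# Like ascending=False: reverse the sort direction
descending = sorted(pairs, key=lambda kv: kv[0], reverse=True)

print(numeric)     # [(2, 'd'), (9, 'b'), (10, 'a'), (100, 'c')]
print(as_strings)  # [(10, 'a'), (100, 'c'), (2, 'd'), (9, 'b')]
print(descending)  # [(100, 'c'), (10, 'a'), (9, 'b'), (2, 'd')]
```

As strings, `'10'` and `'100'` sort before `'2'` because comparison proceeds character by character.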

**Actions on Pair RDDs**  
All actions available on RDDs are also available on Pair RDDs, plus additional ones such as:  
`countByKey()`  
`collectAsMap()`  
`lookup()`  

In [None]:
# define a Pair RDD
rdd = sc.parallelize([(1,2),(3,4),(3,6),(5,1),(5,10),(5,100)])

In [None]:
# return a dictionary where values are counts on each key
rdd.countByKey()

In [None]:
# return a list of values for key=3
rdd.lookup(3)
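
For reference, here is a plain-Python sketch of what the three actions compute for the same pairs, without Spark. It mirrors the semantics only. In particular, it assumes `collectAsMap()` behaves like building a dictionary from the collected pairs, so when a key repeats, only one of its values survives.

```python
from collections import Counter

pairs = [(1, 2), (3, 4), (3, 6), (5, 1), (5, 10), (5, 100)]

# Like countByKey(): number of records for each key
count_by_key = dict(Counter(key for key, _ in pairs))

# Like lookup(3): all values stored under key 3
lookup_3 = [value for key, value in pairs if key == 3]

# Like collectAsMap(): a dictionary, so repeated keys keep a single value
collect_as_map = dict(pairs)

print(count_by_key)    # {1: 1, 3: 2, 5: 3}
print(lookup_3)        # [4, 6]
print(collect_as_map)  # {1: 2, 3: 6, 5: 100}
```

Because a dictionary holds one value per key, `collectAsMap()` is best suited to Pair RDDs whose keys are already unique, for example after a `reduceByKey()`.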

**Summary**  
This should give you a good understanding of what Pair RDDs are, how you can create them, and some common transformations.