# CBDE Lab 4 (part 1) - The Hadoop ecosystem (MapReduce)

# Tutorial

In this tutorial, we will get familiar with the MapReduce programming model. Since setting up a MapReduce cluster is quite cumbersome, we will simulate the execution of MapReduce programs using Apache Spark (https://spark.apache.org/). Apache Spark subsumes the functionality of the original MapReduce framework (i.e., the one available in the Apache Hadoop ecosystem), and it improves both the ease of writing programs and their runtime performance.
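To make the model concrete before moving to Spark, here is a minimal pure-Python sketch (no Spark, no Hadoop) of the three phases every MapReduce job goes through: the map function emits key-value pairs, the shuffle groups all emitted values by key, and the reduce function turns each group into results. The helper `run_mapreduce` and the word-count functions are illustrative assumptions, not part of the lab code.

```python
from collections import defaultdict

def run_mapreduce(records, map_fn, reduce_fn):
    """Hypothetical helper: simulate a MapReduce job on a list of (key, value) records."""
    # Map phase: each input record may emit zero or more (key, value) pairs
    intermediate = []
    for k, v in records:
        intermediate.extend(map_fn(k, v))
    # Shuffle phase: group all emitted values by their key
    groups = defaultdict(list)
    for k, v in intermediate:
        groups[k].append(v)
    # Reduce phase: each key and its list of values may emit zero or more results
    output = []
    for k, values in groups.items():
        output.extend(reduce_fn(k, values))
    return output

def wordcount_map(k, line):
    # Emit (word, 1) for every word in the line; the input key is ignored
    return [(word, 1) for word in line.split()]

def wordcount_reduce(word, counts):
    # Sum the partial counts gathered for this word
    return [(word, sum(counts))]

lines = [(1, "map reduce map"), (2, "reduce spark")]
print(sorted(run_mapreduce(lines, wordcount_map, wordcount_reduce)))
# [('map', 2), ('reduce', 2), ('spark', 1)]
```

In the lab, `executeMapReduce` plays the role of `run_mapreduce`: Spark's `flatMap` runs the map phase and `groupByKey` performs the shuffle.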

# Setup environment

Install dependencies

```python
# Install the packages (notebook shell escapes)
!pip3 install pyspark
!pip3 install shortuuid
import pyspark
from pyspark.sql import SparkSession
from pprint import pprint
import random
import shortuuid
```

```
Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425344 sha256=46c4b77715185b53d735cb4098a39cb010441cdf694187d409799c08a4a7bcb2
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0
Collecting shortuuid
  Downloading shortuuid-1.0.11-py3-none-any.whl (10 kB)
Installing collected packages: shortuuid
Successfully installed shortuuid-1.0.11
```


# Dataset description

We will use the same dataset as in the HBase session (the UCI Adult census income dataset). Please refer to the HBase assignment for details on its structure.

```python
!wget https://archive.ics.uci.edu/static/public/2/adult.zip
!unzip adult.zip
```

```
--2023-11-07 11:36:23--  https://archive.ics.uci.edu/static/public/2/adult.zip
Resolving archive.ics.uci.edu (archive.ics.uci.edu)... 128.195.10.252
Connecting to archive.ics.uci.edu (archive.ics.uci.edu)|128.195.10.252|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: unspecified
Saving to: ‘adult.zip’

adult.zip               [ <=>                ] 605.70K  --.-KB/s    in 0.08s

2023-11-07 11:36:23 (7.54 MB/s) - ‘adult.zip’ saved [620237]

Archive:  adult.zip
  inflating: Index
  inflating: adult.data
  inflating: adult.names
  inflating: adult.test
  inflating: old.adult.names
```


```python
schema = ['id','age','workclass','fnlwgt','education','education-num','marital-status','occupation','relationship','race','sex','capital-gain','capital-loss','hours-per-week','native-country','income']

def getAttribute(row, att):
    # Return the value of attribute `att` in `row` (key first), or None if `att` is not in the schema
    for i, v in enumerate(schema):
        if v == att:
            return row[i]
    return None

def executeMapReduce(mapFunction, reduceFunction):
    sc = pyspark.SparkContext.getOrCreate()
    result = (sc.textFile("adult.data")
        .map(lambda t: (str(shortuuid.uuid()), t.replace(' ', '')))      # unique key, spaces removed
        .filter(lambda t: len((t[0] + "," + t[1]).split(",")) == 16)     # key + 15 attributes
        .flatMap(lambda t: mapFunction(t[0], t[1]))                       # map phase
        .groupByKey()                                                     # shuffle: group values by key
        .flatMap(lambda t: reduceFunction(t[0], t[1]))                    # reduce phase
        .take(25))                                                        # first 25 results only
    pprint(result)
```

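Here, we provide you with the function `getAttribute(row, att)`, which returns the value of attribute `att` in `row`, an array of field values (or `None` if the attribute is not in the schema). Note that the array must also contain the key as its first value (see the provided examples). The function `executeMapReduce(mapFunction, reduceFunction)` reads `adult.data`, assigns each line a unique key (generated with `shortuuid`) and removes its spaces, discards malformed lines (those that do not have 16 fields once the key is prepended), applies `mapFunction(key, value)` to every line, groups the emitted pairs by key, applies `reduceFunction(key, values)` to every group, and prints the first 25 results.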
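As an illustration, the sketch below applies the same preprocessing as `executeMapReduce` (spaces removed, key prepended) to one line in the `adult.data` format (the file's first record) and reads attributes with `getAttribute`. The key `"k1"` is a hypothetical stand-in for the random `shortuuid` key, and `schema`/`getAttribute` are copied so the block runs on its own. The last line shows the off-by-one error you get if the key is not prepended.

```python
schema = ['id','age','workclass','fnlwgt','education','education-num','marital-status','occupation','relationship','race','sex','capital-gain','capital-loss','hours-per-week','native-country','income']

def getAttribute(row, att):
    # Same lookup as in the lab: position of `att` in the schema
    for i, v in enumerate(schema):
        if v == att:
            return row[i]
    return None

# A line in the adult.data format (spaces after commas, as in the file)
line = "39, State-gov, 77516, Bachelors, 13, Never-married, Adm-clerical, Not-in-family, White, Male, 2174, 0, 40, United-States, <=50K"
key = "k1"                      # hypothetical key; executeMapReduce uses shortuuid.uuid()
value = line.replace(' ', '')   # same cleaning as executeMapReduce

row = (key + "," + value).split(",")
print(len(row))                              # 16: key + 15 attributes
print(getAttribute(row, "age"))              # 39
print(getAttribute(row, "native-country"))   # United-States
print(getAttribute(row, "salary"))           # None: not in the schema

# Forgetting the key shifts every field by one position
print(getAttribute(value.split(","), "age"))  # State-gov
```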

# Example 1) Projection

We provide you with the implementation of the projection operator. In SQL it would correspond to the following query:
```sql
SELECT DISTINCT age, relationship, native_country FROM adult;
```

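```python
def projection_map(k, v):
    # Rebuild the full row (key first) so that positions match the schema
    row = (k + "," + v).split(",")
    # Emit the projected attributes as the intermediate key; the value (1) is irrelevant
    return [(getAttribute(row, "age") + "," +
             getAttribute(row, "relationship") + "," +
             getAttribute(row, "native-country"), 1)]

def projection_reduce(k, lv):
    # Identical projections share one key, so emitting it once removes duplicates (DISTINCT)
    return [k]

executeMapReduce(projection_map, projection_reduce)
```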

```
['39,Not-in-family,United-States',
 '50,Husband,United-States',
 '53,Husband,United-States',
 '28,Wife,Cuba',
 '37,Wife,United-States',
 '52,Husband,United-States',
 '31,Not-in-family,United-States',
 '38,Husband,United-States',
 '40,Husband,United-States',
 '54,Unmarried,United-States',
 '43,Husband,United-States',
 '19,Own-child,United-States',
 '49,Husband,United-States',
 '23,Not-in-family,United-States',
 '45,Own-child,United-States',
 '30,Own-child,United-States',
 '19,Wife,United-States',
 '24,Husband,United-States',
 '49,Unmarried,United-States',
 '25,Not-in-family,United-States',
 '25,Wife,United-States',
 '47,Not-in-family,United-States',
 '32,Not-in-family,?',
 '28,Not-in-family,United-States',
 '53,Wife,United-States']
```
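Why does returning only the key implement `DISTINCT`? Rows with the same projected values produce the same intermediate key, and the shuffle gathers them into a single group, so the reducer emits each distinct combination exactly once. The sketch below replays this locally in plain Python on three made-up rows in the `adult.data` format (the rows, the keys `k1`–`k3`, and the `simulate` helper are hypothetical; it copies `schema`, `getAttribute`, and the projection functions so it runs without Spark).

```python
from collections import defaultdict

schema = ['id','age','workclass','fnlwgt','education','education-num','marital-status','occupation','relationship','race','sex','capital-gain','capital-loss','hours-per-week','native-country','income']

def getAttribute(row, att):
    for i, v in enumerate(schema):
        if v == att:
            return row[i]
    return None

def projection_map(k, v):
    row = (k + "," + v).split(",")
    return [(getAttribute(row, "age") + "," + getAttribute(row, "relationship") + "," +
             getAttribute(row, "native-country"), 1)]

def projection_reduce(k, lv):
    return [k]

def simulate(records, map_fn, reduce_fn):
    # Hypothetical local stand-in for executeMapReduce: map, group by key, reduce
    groups = defaultdict(list)
    for k, v in records:
        for ik, iv in map_fn(k, v):
            groups[ik].append(iv)
    return [out for ik, ivs in groups.items() for out in reduce_fn(ik, ivs)]

# Made-up rows: k1 and k2 differ only in attributes that the projection drops
records = [
    ("k1", "39,State-gov,77516,Bachelors,13,Never-married,Adm-clerical,Not-in-family,White,Male,2174,0,40,United-States,<=50K"),
    ("k2", "39,Private,100000,HS-grad,9,Never-married,Sales,Not-in-family,White,Female,0,0,38,United-States,<=50K"),
    ("k3", "28,Private,338409,Bachelors,13,Married-civ-spouse,Prof-specialty,Wife,Black,Female,0,0,40,Cuba,<=50K"),
]
print(simulate(records, projection_map, projection_reduce))
# ['39,Not-in-family,United-States', '28,Wife,Cuba']
```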


# Example 2) Cross product

We provide you with the implementation of the cross product operator. In SQL it would correspond to the following query:
```sql
SELECT external.*, internal.*
FROM adult as internal, adult as external
WHERE external.native_country = 'Italy' AND internal.native_country = 'Ecuador'
```
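The MapReduce implementation of this query relies on a replication strategy: every Italian tuple is sent to a single reducer chosen at random among `N`, whereas every Ecuadorian tuple is replicated to all `N` reducers. Each (Italian, Ecuadorian) pair therefore meets in exactly one reducer, which combines the tuples it received. The sketch below checks this property in plain Python on made-up tuples (`italy`, `ecuador`, and `N = 4` are illustrative assumptions). It draws reducer ids with `random.randrange(N)`, i.e. from 0 to N-1, the same ids the replicated side is sent to.

```python
import random
from collections import defaultdict

N = 4                           # number of reducers (illustrative)
italy = ["it1", "it2", "it3"]   # hypothetical Italian tuples
ecuador = ["ec1", "ec2"]        # hypothetical Ecuadorian tuples

# Map phase: route tuples to reducers
reducers = defaultdict(lambda: {"italy": [], "ecuador": []})
for t in italy:
    # Each Italian tuple goes to ONE random reducer in 0..N-1
    reducers[random.randrange(N)]["italy"].append(t)
for t in ecuador:
    # Each Ecuadorian tuple is replicated to ALL reducers 0..N-1
    for i in range(N):
        reducers[i]["ecuador"].append(t)

# Reduce phase: every reducer pairs the tuples it received
pairs = [(x, y) for r in reducers.values() for x in r["italy"] for y in r["ecuador"]]
print(len(pairs))  # 6, i.e. len(italy) * len(ecuador), whatever the random choices
```

The price of this strategy is that the replicated side is shuffled `N` times, so it is generally preferable to replicate the smaller input; `N` bounds the number of reducers that share the work.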

```python
N = 100  # number of reducers (intermediate keys) used to spread the work

def crossproduct_map(k, v):
    tupl = k + "," + v
    country = getAttribute(tupl.split(","), "native-country")
    if "Italy" in country:
        # Send each Italian tuple to ONE random reducer in 0..N-1
        # (random.randint(0, N) could also return N, a key no Ecuadorian tuple reaches)
        return [(random.randrange(N), tupl)]
    elif "Ecuador" in country:
        # Replicate each Ecuadorian tuple to ALL N reducers
        return [(i, tupl) for i in range(N)]
    return []

def crossproduct_reduce(k, lv):
    # Split the tuples received by this reducer by country...
    italy = []
    ecuador = []
    for v in lv:
        country = getAttribute(v.split(","), "native-country")
        if "Italy" in country:
            italy.append(v)
        elif "Ecuador" in country:
            ecuador.append(v)
    # ...and pair every Italian tuple with every Ecuadorian one
    return [(None, x + "<->" + y) for x in italy for y in ecuador]

executeMapReduce(crossproduct_map, crossproduct_reduce)
```

[(None,
  'eEEzFhPdffCSkrZvfHmjkv,44,Private,120277,Bachelors,13,Married-civ-spouse,Prof-specialty,Husband,White,Male,15024,0,50,Italy,>50K<->dAbYXx4NMgqFXFj5rnQ6f8,50,Private,193884,7th-8th,4,Married-civ-spouse,Craft-repair,Husband,White,Male,0,0,40,Ecuador,<=50K'),
 (None,
  'eEEzFhPdffCSkrZvfHmjkv,44,Private,120277,Bachelors,13,Married-civ-spouse,Prof-specialty,Husband,White,Male,15024,0,50,Italy,>50K<->aqVcgJ2TmwS7CjDGmij9N9,29,Private,159768,HS-grad,9,Never-married,Machine-op-inspct,Not-in-family,White,Male,3325,0,40,Ecuador,<=50K'),
 (None,
  'eEEzFhPdffCSkrZvfHmjkv,44,Private,120277,Bachelors,13,Married-civ-spouse,Prof-specialty,Husband,White,Male,15024,0,50,Italy,>50K<->QN5UJkfUPWKJh4pFkipNed,46,Private,190487,HS-grad,9,Divorced,Priv-house-serv,Unmarried,White,Female,0,0,28,Ecuador,<=50K'),
 (None,
  'eEEzFhPdffCSkrZvfHmjkv,44,Private,120277,Bachelors,13,Married-civ-spouse,Prof-specialty,Husband,White,Male,15024,0,50,Italy,>50K<->VUBpupBxCbXHgbKHy64DvF,21,Private,221480,Some-co