
*Analytical Information Systems*

# Worksheet 6 - Big Data and Streaming

Matthias Griebel
Lehrstuhl für Wirtschaftsinformatik und Business Analytics

SS 2022

***

# Table of Contents

* [1. MapReduce](#map_reduce)
    * [1.1. R and MapReduce](#map_reduce_r)
* [2. Stream Processing](#stream)
    * [2.1. Basic Stream Operators](#basic_stream)
    * [2.2. The challenge of stream processing for big data](#stream_challenge)
* [3. Exercises](#exercises)
    * [3.1. MapReduce - Sales analysis](#sales)
    * [3.2. Exam Questions](#exam)    
*** 

## 1. MapReduce<a id="map_reduce"></a>

__Recap__ from [Wikipedia](https://en.wikipedia.org/wiki/MapReduce):

[MapReduce](https://en.wikipedia.org/wiki/MapReduce) is a programming model and an associated implementation for processing and generating big data sets with a parallel, distributed algorithm on a cluster. 

Let's have a look at the word count example from the lecture again:

<img src="https://www.todaysoftmag.com/images/articles/tsm33/large/a11.png" style="width:50%">

**The MapReduce steps:**
1. __Input__
1. __Splitting__: Prepare the Map() input
1. __Mapping__: Run the user-provided Map() code. Each worker node applies the map function to its local data and writes the output to temporary storage.
1. __Shuffling__: "Shuffle" the Map output to the Reduce processors, so that all key/value pairs with the same key end up on the same worker.
1. __Reduce__: Run the user-provided Reduce() code. The Reduce processors process each group of output data, per key, in parallel.
1. __Final result__: Produce the final output. The MapReduce system collects and sorts all the Reduce output.
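The six steps above can be traced in a small, sequential base-R sketch. It is only an illustration under simplifying assumptions: everything runs in one R process, so nothing is actually distributed, and `map_reduce()`, `split_lines()` and `emit_ones()` are hypothetical helper names rather than part of any MapReduce library.

```r
# A minimal, sequential sketch of the MapReduce steps (nothing runs in parallel).
# map_reduce(), split_lines() and emit_ones() are hypothetical helper names.
map_reduce <- function(input, split_fun, mapper, reducer) {
    chunks  <- split_fun(input)                          # Splitting
    mapped  <- do.call(rbind, lapply(chunks, mapper))    # Mapping: key/value pairs
    groups  <- split(mapped$value, mapped$key)           # Shuffling: group values by key
    reduced <- vapply(groups, reducer, numeric(1))       # Reduce: one value per key
    sort(reduced, decreasing = TRUE)                     # Final result: collect and sort
}

# Splitting: one chunk per line of text
split_lines <- function(text) strsplit(text, "\n", fixed = TRUE)[[1]]

# Map: emit the pair (word, 1) for every word in a line
emit_ones <- function(line) {
    words <- strsplit(line, " ", fixed = TRUE)[[1]]
    data.frame(key = words, value = 1, stringsAsFactors = FALSE)
}

counts <- map_reduce("Deer Bear River\nCar Car River\nDeer Car Bear",
                     split_lines, emit_ones, sum)
print(counts)
```

In this sketch, base R's `split()` plays the role of the shuffle: it groups all values that share a key, which a real cluster achieves by sending those pairs to the same Reduce worker.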


### 1.1. R and MapReduce<a id="map_reduce_r"></a>

**MapReduce Libraries**

MapReduce libraries have been written in many programming languages, with different levels of optimization. 
- A popular open-source implementation that has support for distributed shuffles is part of Apache Hadoop.
- [RHadoop](https://github.com/RevolutionAnalytics/RHadoop/wiki) is a collection of five R packages that allow users to manage and analyze data with Apache Hadoop. 
    - using RHadoop requires a Java and Hadoop installation, the Hadoop Distributed File System, etc.

For simplicity, we will only illustrate the MapReduce algorithm using base R and `tidyverse` packages:

**R MapReduce Word Count Implementation**

__Defining the map function__

The map function breaks the line into words and outputs a key/value pair for each word.

In [1]:
library(tidyverse)

── Attaching packages ─────────────────────────────────────── tidyverse 1.3.0 ──

✔ ggplot2 3.3.3     ✔ purrr   0.3.4
✔ tibble  3.1.1     ✔ dplyr   1.0.5
✔ tidyr   1.1.3     ✔ stringr 1.4.0
✔ readr   1.4.0     ✔ forcats 0.5.0

── Conflicts ────────────────────────────────────────── tidyverse_conflicts() ──
✖ dplyr::filter() masks stats::filter()
✖ dplyr::lag()    masks stats::lag()



In [2]:
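count_words <- function(line){
    line %>%
            str_split(" ", simplify = FALSE) %>%  # split the line into words
            unlist() %>%                          # list of one character vector -> character vector
            tibble(key = ., value = 1)            # emit one (word, 1) pair per word
}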

__Defining the reduce function__

In the word count example, the Reduce function sums the word counts and generates a single output of the word and the final sum.

In [3]:
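reduce_count <- function(df){
    # df holds all key/value pairs for one key (one group after shuffling)
    df %>%
        summarise(key = key[1],          # the shared key
                  count = sum(value))    # total count for that key
}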

__Going through the MapReduce steps__

__1. Input__

In [4]:
Input <- "Deer Bear River\nCar Car River\nDeer Car Bear"
Input

__2. Splitting__

We will split the input by line ('\n' indicates a new line)

In [5]:
Input %>%
    str_split("\n",simplify=FALSE) %>% unlist

**3. Mapping**

In [6]:
Input %>%
    str_split("\n",simplify=FALSE) %>% unlist %>%
    map(count_words)

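| key (chr) | value (dbl) |
|---|---|
| Deer | 1 |
| Bear | 1 |
| River | 1 |

| key (chr) | value (dbl) |
|---|---|
| Car | 1 |
| Car | 1 |
| River | 1 |

| key (chr) | value (dbl) |
|---|---|
| Deer | 1 |
| Car | 1 |
| Bear | 1 |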


**4. Shuffling**

In [7]:
Input %>%
    str_split("\n",simplify=FALSE) %>% unlist %>%
    map(count_words) %>% 
    map_df(rbind) %>% group_split(key)

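| key (chr) | value (dbl) |
|---|---|
| Bear | 1 |
| Bear | 1 |

| key (chr) | value (dbl) |
|---|---|
| Car | 1 |
| Car | 1 |
| Car | 1 |

| key (chr) | value (dbl) |
|---|---|
| Deer | 1 |
| Deer | 1 |

| key (chr) | value (dbl) |
|---|---|
| River | 1 |
| River | 1 |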


**5. Reduce and 6. Final result (merge and sort)**

In [8]:
Input %>%                                             # Input
    str_split("\n", simplify = FALSE) %>% unlist %>%  # Splitting
    map(count_words) %>%                              # Mapping
    map_df(rbind) %>% group_split(key) %>%            # Shuffling
    map(reduce_count) %>%                             # Reduce
    map_df(cbind) %>% arrange(desc(count))            # Merge and sort

| key (chr) | count (dbl) |
|---|---|
| Car | 3 |
| Bear | 2 |
| Deer | 2 |
| River | 2 |


__Doing it the (undistributed) tidyverse way__

In [9]:
Input %>%
    str_replace_all("\n", " ") %>%
    str_split(" ",simplify=FALSE) %>% unlist %>% 
    tibble(key=.) %>%
    group_by(key) %>%
    summarize(count=n()) %>%  arrange(desc(count))

| key (chr) | count (int) |
|---|---|
| Car | 3 |
| Bear | 2 |
| Deer | 2 |
| River | 2 |


***
## 2. Stream Processing<a id="stream"></a>

_Credits_

- Jure Leskovec, Stanford University, http://web.stanford.edu/class/cs246/slides/15-streams1.pdf
- Michael Freedman, Princeton University, https://www.cs.princeton.edu/courses/archive/fall16/cos418/docs/L22-stream-processing.pdf

**Data Streams**

- In many data mining situations, we do not know the entire data set in advance
- We can think of the data as infinite and non-stationary (the distribution changes over time)
- Stream management is important when the input rate is controlled externally, e.g.:
    - Google queries
    - Twitter or Facebook status updates

__The Stream Model__

- Input elements enter at a rapid rate, at one or more input ports (i.e., streams)
    - We call elements of the stream tuples
- The system cannot store the entire stream accessibly

How do you make critical calculations about the stream using a limited amount of (secondary) memory?

### 2.1. Basic Stream Operators<a id="basic_stream"></a>

__Stateless conversion__

<img src="http://raw.githubusercontent.com/wi3jmu/AIS2020/master/notebooks/figures/05/sc_ctoF.png" width="30%">

- Convert Celsius temperature to Fahrenheit: __emit__ (input * 9 / 5) + 32
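A stateless operator can be sketched as a plain R function applied to each element independently. Here the stream is simulated by looping over a short vector of hypothetical Celsius readings, `print()` stands in for __emit__, and `c_to_f()` is an illustrative name.

```r
# Stateless conversion: the output depends only on the current input element.
c_to_f <- function(celsius) celsius * 9 / 5 + 32

readings <- c(-40, 0, 100)          # hypothetical stream of Celsius readings
for (reading in readings) {
    print(c_to_f(reading))          # "emit": prints -40, then 32, then 212
}
```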

__Stateless filtering__

<img src="http://raw.githubusercontent.com/wi3jmu/AIS2020/master/notebooks/figures/05/sc_sf.png" width="30%">

A function can filter inputs: if (input > threshold) { __emit__ input }
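The filter can be sketched the same way. The threshold of 30 and the readings are made-up values, and appending to the vector `emitted` stands in for __emit__.

```r
# Stateless filtering: emit an input only if it exceeds the threshold.
threshold <- 30                          # hypothetical threshold
readings  <- c(12, 35, 28, 41, 30)       # hypothetical stream
emitted   <- numeric(0)
for (reading in readings) {
    if (reading > threshold) {
        emitted <- c(emitted, reading)   # "emit" = append to the output stream
    }
}
print(emitted)                           # 35 41
```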

__Stateful conversion__

<img src="http://raw.githubusercontent.com/wi3jmu/AIS2020/master/notebooks/figures/05/sc_ewa.png" width="30%">

Compute the exponentially weighted moving average (EWMA) of the Fahrenheit temperature:
- new_temp = α * CtoF(input) + (1 - α) * last_temp
- last_temp = new_temp
- __emit__ new_temp
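Because the EWMA needs `last_temp` from the previous input, the operator has to keep state. One way to sketch this in R is a closure that stores `last_temp` in its enclosing environment. The description above does not say how `last_temp` is initialized; the sketch assumes that the first reading seeds the average, and `make_ewma()` and α = 0.5 are illustrative choices.

```r
# Stateful conversion: an EWMA operator whose state (last_temp) survives between inputs.
c_to_f <- function(celsius) celsius * 9 / 5 + 32

make_ewma <- function(alpha) {
    last_temp <- NULL                          # state; empty until the first input arrives
    function(input) {
        temp_f <- c_to_f(input)
        new_temp <- if (is.null(last_temp)) {
            temp_f                             # assumption: the first reading seeds the average
        } else {
            alpha * temp_f + (1 - alpha) * last_temp
        }
        last_temp <<- new_temp                 # update the operator's state
        new_temp                               # emit
    }
}

ewma <- make_ewma(alpha = 0.5)
emitted <- vapply(c(0, 100, 100), ewma, numeric(1))
print(emitted)                                 # 32 122 167
```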

__Aggregation (stateful)__

<img src="http://raw.githubusercontent.com/wi3jmu/AIS2020/master/notebooks/figures/05/sc_agg.png" width="30%">

E.g., average value per window
- A window can be defined by a number of elements (e.g., 10) or by time (e.g., 1s)
- Windows can be disjoint, also called "tumbling" (a new 5s window every 5s)
- Windows can overlap, also called "sliding" (a 5s window every 1s)
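A minimal sketch of count-based window averages over a finite, simulated stream follows. A real operator sees elements one at a time and has to buffer them, and time-based windows would group by timestamps instead; `window_avg()` is a hypothetical helper, and incomplete windows at the end are simply dropped.

```r
# Windowed aggregation over a finite, simulated stream (count-based windows).
# window_avg() is a hypothetical helper: step == size gives tumbling windows,
# step < size gives overlapping (sliding) windows. Incomplete windows are dropped.
window_avg <- function(x, size, step) {
    if (length(x) < size) return(numeric(0))
    starts <- seq(1, length(x) - size + 1, by = step)
    vapply(starts, function(s) mean(x[s:(s + size - 1)]), numeric(1))
}

readings <- c(1, 2, 3, 4, 5, 6, 7, 8)                  # hypothetical readings, one per time step
tumbling <- window_avg(readings, size = 4, step = 4)   # windows 1-4, 5-8
sliding  <- window_avg(readings, size = 4, step = 2)   # windows 1-4, 3-6, 5-8
print(tumbling)                                        # 2.5 6.5
print(sliding)                                         # 2.5 4.5 6.5
```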

__Stream processing as chain__

<img src="http://raw.githubusercontent.com/wi3jmu/AIS2020/master/notebooks/figures/05/sc_chain.png" width="30%">
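A chain can be sketched as a list of operator functions that every element passes through in order. In this sketch, `run_chain()` and `above()` are hypothetical helpers, and an operator returning `NULL` means the element was filtered out and nothing is emitted.

```r
# A chain of operators: every element passes through the operators in order.
# run_chain() is a hypothetical helper; NULL signals "filtered out, emit nothing".
c_to_f <- function(x) x * 9 / 5 + 32                         # stateless conversion
above  <- function(threshold) {
    function(x) if (x > threshold) x else NULL                # stateless filter
}

run_chain <- function(stream, operators) {
    out <- numeric(0)
    for (element in stream) {
        value <- element
        for (op in operators) {
            value <- op(value)
            if (is.null(value)) break                         # dropped by a filter
        }
        if (!is.null(value)) out <- c(out, value)             # emit at the end of the chain
    }
    out
}

result <- run_chain(c(0, 30, 40), list(c_to_f, above(90)))
print(result)                                                 # 104
```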

__Stream processing as directed graph__

<img src="http://raw.githubusercontent.com/wi3jmu/AIS2020/master/notebooks/figures/05/sc_chain.png" width="30%">

### 2.2. The challenge of stream processing for big data<a id="stream_challenge"></a>

Large amounts of data have to be processed in real time

__Examples__:
- Social network trends (#trending)
- Intrusion detection systems (networks, datacenters)
- Sensors: Detect earthquakes by correlating vibrations of millions of smartphones
- Fraud detection
    - Visa: 2000 txn / sec on average, peak ~47,000 / sec

__Stateless operations: trivially parallelized__

<img src="http://raw.githubusercontent.com/wi3jmu/AIS2020/master/notebooks/figures/05/scale_out.png" width="30%">
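Stateless operators parallelize trivially because each element can be processed without looking at any other element. The sketch below only simulates three workers with `split()` and `lapply()` (nothing runs in parallel) and checks that recombining the partial outputs gives the same result as processing the whole stream sequentially; the round-robin assignment is an assumption for illustration.

```r
# Stateless operators parallelize trivially: each partition can be processed alone.
# Workers are only simulated here with lapply(); nothing actually runs in parallel.
c_to_f <- function(x) x * 9 / 5 + 32

readings   <- c(10, 20, 30, 40, 50, 60)          # hypothetical stream
worker     <- rep_len(1:3, length(readings))     # round-robin assignment to 3 workers
partitions <- split(readings, worker)            # what each worker receives
partial    <- lapply(partitions, c_to_f)         # each worker needs only its own data
combined   <- unsplit(partial, worker)           # restore the original order

print(isTRUE(all.equal(combined, c_to_f(readings))))   # TRUE: same as sequential processing
```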

__State complicates parallelization__


- Need to join results across parallel computations

<img src="http://raw.githubusercontent.com/wi3jmu/AIS2020/master/notebooks/figures/05/agg_par.png" width="30%">
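For a stateful aggregation such as an average, each worker can only compute partial state, and the partial results have to be joined. The sketch below (two simulated workers, made-up readings) keeps a (sum, count) pair per worker and merges them; it also shows that simply averaging the local averages gives the wrong answer when partitions have different sizes.

```r
# Stateful aggregation in parallel: workers keep partial state that must be joined.
readings   <- c(2, 4, 6, 8, 10)                       # hypothetical stream
partitions <- list(readings[1:2], readings[3:5])      # two simulated workers, uneven load

# Each worker keeps (sum, count) instead of only its local average
partial_state <- lapply(partitions, function(p) c(sum = sum(p), count = length(p)))

# Join step: merge the partial states, then compute the global average
merged      <- Reduce(`+`, partial_state)
global_mean <- merged[["sum"]] / merged[["count"]]

# Averaging the local averages would be wrong for unequal partition sizes
naive_mean <- mean(vapply(partitions, mean, numeric(1)))

print(global_mean)   # 6
print(naive_mean)    # 5.5
```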


__Parallelization complicates fault-tolerance__


<img src="http://raw.githubusercontent.com/wi3jmu/AIS2020/master/notebooks/figures/05/fault.png" width="30%">


__We can parallelize joins__

- using partitioned hash joins
- but this, again, complicates fault-tolerance

<img src="http://raw.githubusercontent.com/wi3jmu/AIS2020/master/notebooks/figures/05/par_joins.png" width="30%">
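A partitioned hash join can be sketched with two small data frames standing in for buffered parts of two streams. Both inputs are partitioned by the join key, so matching rows always end up on the same (simulated) worker, which can then join its partition locally. The data, `partition_of()` (using `key %% n_workers` as a stand-in for a real hash function) and the choice of two workers are all assumptions; a real streaming join would also need windows or state expiry, which this sketch omits.

```r
# Partitioned hash join: partition both inputs by the join key, then join each partition locally.
# partition_of() uses key %% n_workers as a hypothetical stand-in for a real hash function.
orders <- data.frame(user = c(1, 2, 3, 4, 2),
                     item = c("a", "b", "c", "d", "e"), stringsAsFactors = FALSE)
users  <- data.frame(user = c(1, 2, 3, 4),
                     country = c("DE", "US", "FR", "DE"), stringsAsFactors = FALSE)
n_workers    <- 2
partition_of <- function(key) key %% n_workers

orders_parts <- split(orders, partition_of(orders$user))
users_parts  <- split(users,  partition_of(users$user))

# Rows with equal keys always land in the same partition, so each worker joins its own
# pieces without looking at the others (inner join: a partition missing on either side
# contributes no rows).
shared <- intersect(names(orders_parts), names(users_parts))
joined <- do.call(rbind, lapply(shared, function(p)
    merge(orders_parts[[p]], users_parts[[p]], by = "user")))
joined <- joined[order(joined$item), ]
print(joined)
```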



### Stream Processing frameworks

Different frameworks handle these challenges differently:

- Record acknowledgement (Storm)
- Micro-batches (Spark Streaming, Storm Trident)
- Transactional updates (Google Cloud Dataflow)
- Distributed snapshots (Flink)
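As a rough illustration of the micro-batch idea, the sketch below cuts a simulated stream of words into fixed-size batches, processes each batch like a small batch job with `table()`, and carries running word counts from one batch to the next. The batch size of 3 and all variable names are assumptions; frameworks such as Spark Streaming form batches by time interval and run them on a cluster.

```r
# Micro-batching: process the stream as a sequence of small batch jobs,
# carrying state (running word counts) from one batch to the next.
words <- c("Deer", "Bear", "River", "Car", "Car", "River", "Deer", "Car", "Bear")
batch_size <- 3                                        # hypothetical batch size
batches <- split(words, ceiling(seq_along(words) / batch_size))

running_counts <- integer(0)                           # state kept between batches
for (batch in batches) {
    batch_counts <- table(batch)                       # a small batch job
    for (w in names(batch_counts)) {
        previous <- if (w %in% names(running_counts)) running_counts[[w]] else 0L
        running_counts[w] <- previous + as.integer(batch_counts[[w]])
    }
}
print(running_counts)
```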

***
## 3. Exercises<a id="exercises"></a>

### 3.1. MapReduce - Sales analysis<a id="sales"></a>

You need to run a company-wide sales analysis. Your company uses a MapReduce system to handle the massive transaction data.

We will have a look at the data first:


In [10]:
library(tidyverse)
sales <- read_csv('https://raw.githubusercontent.com/wi3jmu/AIS2020/master/notebooks/data/sales.csv')
sales %>% head(10)


── Column specification ────────────────────────────────────────────────────────
cols(
  date = col_date(format = ""),
  customerID = col_double(),
  productID = col_double(),
  payment = col_character(),
  amount = col_double(),
  price = col_double(),
  cost = col_double(),
  category = col_character()
)




| date (date) | customerID (dbl) | productID (dbl) | payment (chr) | amount (dbl) | price (dbl) | cost (dbl) | category (chr) |
|---|---|---|---|---|---|---|---|
| 2017-01-16 | 64292 | 8403 | paypal | 3 | 560.74 | 234.89 | emergency |
| 2017-08-16 | 41174 | 7234 | paypal | 3 | 351.14 | 171.11 | specialty |
| 2017-10-26 | 49737 | 32738 | paypal | 3 | 343.38 | 105.14 | emergency |
| 2017-11-24 | 24021 | 70159 | cash | 2 | 905.96 | 345.4 | emergency |
| 2017-02-13 | 78762 | 2002 | cash | 2 | 799.99 | 407.3 | emergency |
| 2017-07-18 | 79148 | 86205 | credit card | 1 | 284.07 | 132.35 | emergency |
| 2017-08-23 | 79148 | 40784 | cash | 3 | 125.79 | 47.53 | specialty |
| 2017-11-06 | 23090 | 16224 | paypal | 3 | 85.77 | 36.61 | specialty |
| 2017-09-28 | 12307 | 82560 | credit card | 2 | 658.88 | 330.44 | emergency |
| 2017-04-19 | 45757 | 27578 | credit card | 1 | 458.31 | 269.8 | emergency |


Define the corresponding Map and Reduce functions:

__Map__: Calculates the total profit for each product id within each subset

In [11]:
calculate_profit <- function(df){
    # Write your code here 
    # df  %>% 
}

__Reduce__: Adds up the profit for each different product id

In [12]:
reduce_profit <- function(df){
    # Write your code here 
    # df  %>% 
}

In [13]:
# uncomment
#sales %>% #Input
#    split(sample(rep(1:5, 1000))) %>% #Splitting
#    map(calculate_profit) %>% #Mapping
#    map_df(rbind) %>% group_split(productID)%>% #Shuffling
#    map(reduce_profit) %>% #Reduce
#    map_df(cbind) %>% arrange(desc(total_profit)) %>%  #Merge and Sort
#    head(10) #Display only top 10

### 3.2. Exam Questions<a id="exam"></a>

***From exam WS 2020/21 Question 3: Big Data and ML Applications***

(a) __Stream processing__:  

i. (3 Points) Name and briefly explain three main traits that differentiate streaming
data from traditional data models (e.g., databases).

In [14]:
# Write your answer here

ii. (1 Point) What kind of operations are easy to implement for big data streaming?

In [15]:
# Write your answer here