*Analytical Information Systems*

# Worksheet 5 - Big Data and Streaming

Matthias Griebel<br>
Lehrstuhl für Wirtschaftsinformatik und Informationsmanagement

SS 2020

## MapReduce

[MapReduce](https://en.wikipedia.org/wiki/MapReduce) is a programming model and an associated implementation for processing and generating big data sets with a parallel, distributed algorithm on a cluster.

Let's have a look at the word count example again:

<img src="http://wi-wiki.de/lib/exe/fetch.php?cache=&w=899&h=417&tok=68959d&media=bigdata:mapreducewordcountoverview1.png" style="width:50%">

1. __Input__

1. __Splitting__: Prepare the Map() input

1. __Mapping__: Run the user-provided Map() code. Each worker node applies the map function to the local data, and writes the output to a temporary storage.

1. __Shuffling__: "Shuffle" the Map output to the Reduce processors. 

1. __Reduce__: Run the user-provided Reduce() code. The Reduce processors process each group of output data, per key, in parallel.

1. __Final result__: Produce the final output – the MapReduce system collects and sorts all the Reduce output

### MapReduce Libraries

MapReduce libraries have been written in many programming languages, with different levels of optimization. 
- A popular open-source implementation that has support for distributed shuffles is part of Apache Hadoop.
- [RHadoop](https://github.com/RevolutionAnalytics/RHadoop/wiki) is a collection of five R packages that allow users to manage and analyze data with Apache Hadoop. 
    - using RHadoop requires a Java and Hadoop installation, the Hadoop Distributed File System, etc.

Thus, we will only exemplify the MapReduce algorithm using base R and the `tidyverse`:

### Exemplary R MapReduce Word Count Implementation

__Defining the map function__

The map function breaks the line into words and outputs a key/value pair for each word.

In [613]:
count_words <- function(line){
    line %>%
            str_split(" ",simplify=FALSE) %>%
            unlist() %>%
            tibble(key=., value=1)
}

__Defining the reduce function__

In the word count example, the Reduce function sums the word counts and generates a single output of the word and the final sum.

In [614]:
reduce_count <- function(df){
    df %>%
        summarise(key=key[1],
                  count=sum(value))
}

__Going through the MapReduce steps__

__1. Input__

In [615]:
Input =  "Deer Bear River\nCar Car River\nDeer Car Bear"
Input

__2. Splitting__

We will split the input by line ('\n' indicates a new line)

In [616]:
Input %>%
    str_split("\n",simplify=FALSE) %>% unlist

__3. Mapping__

In [617]:
Input %>%
    str_split("\n",simplify=FALSE) %>% unlist %>%
    map(count_words)

key,value
Deer,1
Bear,1
River,1

key,value
Car,1
Car,1
River,1

key,value
Deer,1
Car,1
Bear,1


__4. Shuffling__

In [621]:
Input %>%
    str_split("\n",simplify=FALSE) %>% unlist %>%
    map(count_words) %>% 
    map_df(rbind) %>% group_split(key)

key,value
Bear,1
Bear,1

key,value
Car,1
Car,1
Car,1

key,value
Deer,1
Deer,1

key,value
River,1
River,1


__5. Reducing__

In [622]:
Input %>%
    str_split("\n",simplify=FALSE) %>% unlist %>%
    map(count_words) %>% 
    map_df(rbind) %>% group_split(key) %>%
    map(reduce_count)

key,count
Bear,2

key,count
Car,3

key,count
Deer,2

key,count
River,2


__6. Merge and sort__

In [623]:
Input %>%
    str_split("\n",simplify=FALSE) %>% unlist %>%
    map(count_words) %>% 
    map_df(rbind) %>% group_split(key) %>%
    map(reduce_count) %>%
    map_df(cbind) %>% arrange(desc(count))

key,count
Car,3
Bear,2
Deer,2
River,2


__Doing it the undistributed tidyverse way__

In [638]:
Input %>%
    str_replace_all("\n", " ") %>%
    str_split(" ",simplify=FALSE) %>% unlist %>% 
    tibble(key=.) %>%
    group_by(key) %>%
    summarize(count=n()) %>%  arrange(desc(count))

key,count
Car,3
Bear,2
Deer,2
River,2


## Stream Processing

__Credits__

- Jure Leskovec, Stanford University, http://web.stanford.edu/class/cs246/slides/15-streams1.pdf
- Michael Freedman, Princeton University, https://www.cs.princeton.edu/courses/archive/fall16/cos418/docs/L22-stream-processing.pdf

### Data Streams

- In many data mining situations, we do not know the entire data set in advance
- We can think of the data as infinite and non-stationary (the distribution changes over time)
- Stream Management is important when the input rate is controlled externally:
    - Google queries
    - Twitter or Facebook status updates

__The Stream Model__

- Input elements enter at a rapid rate, at one or more input ports (i.e., streams)
    - We call elements of the stream tuples
- The system cannot store the entire stream accessibly

    
How do you make critical calculations about the stream using a limited amount of (secondary) memory?
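One way to approach this question is to keep a small, fixed-size summary that is updated with every incoming tuple. The following base R sketch tracks a running mean and variance using Welford's update; the helper names `new_summary` and `update_summary` and the sample values are made up for illustration and do not come from the lecture material.

```r
# Running summary of a stream that keeps only three numbers in memory
# (n, mean, m2), no matter how many tuples have been seen.
# `new_summary` and `update_summary` are hypothetical helper names.
new_summary <- function() list(n = 0, mean = 0, m2 = 0)

update_summary <- function(state, x) {
    n     <- state$n + 1
    delta <- x - state$mean
    mean  <- state$mean + delta / n
    m2    <- state$m2 + delta * (x - mean)   # Welford's update
    list(n = n, mean = mean, m2 = m2)
}

stream <- c(21.5, 22.0, 19.5, 23.0)          # stands in for an unbounded stream
state  <- Reduce(update_summary, stream, new_summary())

state$mean                   # running mean
state$m2 / (state$n - 1)     # running sample variance
```

The state never grows with the length of the stream, which is the property the question above asks for.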

### Basic Stream Operators

__Stateless conversion__

<img src="https://raw.githubusercontent.com/wi3jmu/AIS2020/master/notebooks/figures/05/sc_ctoF.pdf" style="width:20%">

- Convert Celsius temperature to Fahrenheit: __emit__ (input * 9 / 5) + 32
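As a minimal base R sketch, a stateless conversion is just a pure function applied to each tuple. The function name `c_to_f` and the sample readings are assumptions for illustration.

```r
# Stateless conversion: each output depends only on the current input.
c_to_f <- function(celsius) celsius * 9 / 5 + 32

readings_c <- c(0, 21.5, 100)   # made-up sample tuples
c_to_f(readings_c)
```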

__Stateless filtering__

<img src="https://raw.githubusercontent.com/wi3jmu/AIS2020/master/notebooks/figures/05/sc_sf.pdf" style="width:20%">

Function can filter inputs: if (input > threshold) {__emit__ input}
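A stateless filter can be sketched the same way in base R. The name `emit_if_above`, the threshold, and the sample values are hypothetical.

```r
# Stateless filter: emit a tuple only if it exceeds the threshold.
emit_if_above <- function(input, threshold) input[input > threshold]

readings_f <- c(68.0, 75.2, 71.6, 80.6)   # made-up sample tuples
emit_if_above(readings_f, threshold = 72)
```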

__Stateful conversion__

<img src="https://raw.githubusercontent.com/wi3jmu/AIS2020/master/notebooks/figures/05/sc_ewa.pdf" style="width:20%">

Compute the exponentially weighted moving average (EWMA) of the Fahrenheit temperature:
- new_temp = ⍺ * ( CtoF(input) ) + (1- ⍺) * last_temp
- last_temp = new_temp
- emit new_temp
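The update rule above can be sketched in base R with a closure that remembers `last_temp` between calls. The constructor name `make_ewma`, the choice of ⍺ = 0.5, and the initial value of 68 are assumptions made for this example only.

```r
c_to_f <- function(celsius) celsius * 9 / 5 + 32

# Returns an operator that remembers `last_temp` between calls.
# `make_ewma` and `initial_temp` are hypothetical names.
make_ewma <- function(alpha, initial_temp) {
    last_temp <- initial_temp
    function(input) {
        new_temp  <- alpha * c_to_f(input) + (1 - alpha) * last_temp
        last_temp <<- new_temp      # update the state
        new_temp                    # emit
    }
}

ewma <- make_ewma(alpha = 0.5, initial_temp = 68)
sapply(c(20, 22, 21), ewma)         # feed three Celsius readings in order
```

Unlike the stateless operators, the result for a tuple depends on everything the operator has seen before, so the order of the inputs matters.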

__Aggregation (stateful)__

<img src="https://raw.githubusercontent.com/wi3jmu/AIS2020/master/notebooks/figures/05/sc_agg.pdf" style="width:20%">

E.g., average value per window
- Window can be # elements (10) or time (1s)
- Windows can be disjoint, also called “tumbling” (a 5s window every 5s)
- Windows can overlap, also called “sliding” (a 5s window every 1s)
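The difference between disjoint and overlapping windows can be sketched in base R with count-based windows. The function `window_means` and its `size` and `step` parameters are hypothetical names chosen for this illustration.

```r
values <- c(4, 8, 6, 2, 10, 12)   # made-up sample tuples

# Mean over windows of `size` elements, starting a new window every `step` elements.
window_means <- function(x, size, step) {
    starts <- seq(1, length(x) - size + 1, by = step)
    sapply(starts, function(s) mean(x[s:(s + size - 1)]))
}

window_means(values, size = 2, step = 2)   # disjoint windows
window_means(values, size = 2, step = 1)   # overlapping windows
```

With `step` equal to `size`, every element belongs to exactly one window; with a smaller `step`, consecutive windows share elements.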

__Stream processing as chain__

<img src="https://raw.githubusercontent.com/wi3jmu/AIS2020/master/notebooks/figures/05/sc_chain.pdf" style="width:20%">

__Stream processing as directed graph__

<img src="https://raw.githubusercontent.com/wi3jmu/AIS2020/master/notebooks/figures/05/sc_chain.pdf" style="width:20%">

### The challenge of stream processing for BIG DATA

Large amounts of data to process in real time

__Examples__:
- Social network trends (#trending)
- Intrusion detection systems (networks, datacenters)
- Sensors: Detect earthquakes by correlating vibrations of millions of smartphones
- Fraud detection
    - Visa: 2000 txn / sec on average, peak ~47,000 / sec

__Stateless operations: trivially parallelized__

<img src="https://raw.githubusercontent.com/wi3jmu/AIS2020/master/notebooks/figures/05/scale_out.pdf" style="width:20%">

__State complicates parallelization__


- Need to join results across parallel computations

<img src="https://raw.githubusercontent.com/wi3jmu/AIS2020/master/notebooks/figures/05/agg_par.pdf" style="width:30%">
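A small base R sketch of why results must be joined: to compute an average in parallel, each worker has to keep partial state (sum and count) that is combined afterwards. The assignment of readings to three workers is an arbitrary assumption for this example.

```r
readings   <- c(4, 8, 6, 2, 10, 12, 7)
partitions <- split(readings, c(1, 1, 1, 2, 2, 3, 3))   # three hypothetical workers

# Each worker keeps partial state (sum and count), not a finished average.
partial <- lapply(partitions, function(x) list(sum = sum(x), n = length(x)))

# Join step: combine the partial states, then finish the computation.
total_sum <- sum(sapply(partial, function(p) p$sum))
total_n   <- sum(sapply(partial, function(p) p$n))
total_sum / total_n

# Averaging the per-worker averages gives a different result
# when the partitions have unequal sizes.
mean(sapply(partitions, mean))
```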


__Parallelization complicates fault-tolerance__


<img src="https://raw.githubusercontent.com/wi3jmu/AIS2020/master/notebooks/figures/05/fault.pdf" style="width:30%">


__We can parallelize joins__

- using partitioned hash joins
- but again, this complicates fault-tolerance

<img src="https://raw.githubusercontent.com/wi3jmu/AIS2020/master/notebooks/figures/05/par_joins.pdf" style="width:30%">
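The idea of a partitioned hash join can be sketched in base R: both inputs are partitioned by the same function of the join key, so matching tuples always land on the same worker. The tables, the column names, and the toy hash function `part` (key modulo number of workers) are assumptions for illustration, not how any particular framework implements it.

```r
left  <- data.frame(user = c(1, 2, 3, 4, 5, 6), clicks = c(10, 20, 30, 40, 50, 60))
right <- data.frame(user = c(2, 3, 5, 7), country = c("DE", "US", "FR", "IT"))

n_workers <- 3
part <- function(key) key %% n_workers      # toy hash function, an assumption

# Both inputs are partitioned by the same function of the join key ...
left_parts  <- split(left,  factor(part(left$user),  levels = 0:(n_workers - 1)))
right_parts <- split(right, factor(part(right$user), levels = 0:(n_workers - 1)))

# ... so every worker can join its own pair of partitions independently.
joined_parts <- Map(function(l, r) merge(l, r, by = "user"), left_parts, right_parts)
joined <- do.call(rbind, joined_parts)
joined[order(joined$user), ]
```

The combined result contains the same rows as a single `merge(left, right, by = "user")`, but each partition pair could be processed on a different node.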



### Stream Processing frameworks

Different frameworks handle these challenges differently

- Record acknowledgement (Storm)
- Micro-batches (Spark Streaming, Storm Trident) 
- Transactional updates (Google Cloud Dataflow)
- Distributed snapshots (Flink)

### Streaming data with R

__The `sparklyr` interface for Spark Streaming__

from the official [Website](https://spark.rstudio.com/guides/streaming/):

Spark Streaming makes it easy to build scalable fault-tolerant streaming applications. Because it is part of the Spark API, it is possible to re-use query code that queries the current state of the stream, as well as to join the streaming data with historical data. Please see Spark’s official documentation for a deeper look into Spark Streaming.

The sparklyr interface provides the following:

- Ability to run dplyr, SQL, spark_apply(), and PipelineModels against a stream
- Read in multiple formats: CSV, text, JSON, parquet, Kafka, JDBC, and orc
- Write stream results to Spark memory and the following file formats: CSV, text, JSON, parquet, Kafka, JDBC, and orc
- An out-of-the-box graph visualization to monitor the stream
- A new reactiveSpark() function that allows Shiny apps to poll the contents of the stream


#### Interacting with a stream

A good way to think about how Spark streams update is as a three-stage operation:

1. __Input__ - Spark reads the data inside a given folder. The folder is expected to contain multiple data files, with new files being created containing the most current stream data.

1. __Processing__ - Spark applies the desired operations on top of the data. These operations could be data manipulations (dplyr, SQL), data transformations (sdf operations, PipelineModel predictions), or native R manipulations (spark_apply()).

1. __Output__ - The results of processing the input files are saved in a different folder.

#### `sparklyr` Example

__Install requirements__

In [None]:
system("apt-get install openjdk-8-jdk-headless -qq > /dev/null")
Sys.setenv(JAVA_HOME = "/usr/lib/jvm/java-8-openjdk-amd64")
install.packages(c("sparklyr", "future"))
library(sparklyr)
spark_install()
library(future)
library(tidyverse)

1. Open the Spark connection

In [655]:
sc <- spark_connect(master = "local")



Optional step. This resets the input and output folders. It makes it easier to run the code multiple times in a clean manner.

In [None]:
if(file.exists("source")) unlink("source", TRUE)
if(file.exists("source-out")) unlink("source-out", TRUE)

2. Produce a single test file inside the “source” folder. This allows the “read” function to infer CSV file definition.

In [None]:
stream_generate_test(iterations = 1)
list.files("source")

3. Point the stream reader to the folder where the streaming files will be placed. 

In [None]:
read_folder <- stream_read_csv(sc, "source") 

4. Process stream function: The stream_watermark() function adds a new timestamp variable that is then used in the group_by() command. This is required by Spark Streaming to accept summarized results as output of the stream. The second step is to decide what kinds of aggregations we need to perform. In this case, a simple max, min and count are performed.

In [None]:
process_stream <- read_folder %>%
  stream_watermark() %>%
  group_by(timestamp) %>%
  summarise(
    max_x = max(x, na.rm = TRUE),
    min_x = min(x, na.rm = TRUE),
    count = n()
  )

5. The output writer is what starts the streaming job. It will start monitoring the input folder, and then write the new results in the “source-out” folder.

In [None]:
write_output <- stream_write_csv(process_stream, "source-out")  # write results as CSV files to "source-out"

6. The test generation function will write 100 files, one every 0.2 seconds. To run the tests “out-of-sync” with the current R session, the future package is used.

In [None]:
invisible(future(stream_generate_test(interval = 0.2, iterations = 100)))

7. The “source-out” folder can be treated as if it were a single table within Spark. Using spark_read_csv(), the data can be mapped, but not brought into memory (memory = FALSE). This allows the current results to be further analyzed using regular dplyr commands.

In [None]:
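spark_read_csv(sc, "stream", "source-out", memory = FALSE) %>%
    # combine the per-timestamp results written so far into overall figures
    summarise(max_x = max(max_x, na.rm = TRUE), min_x = min(min_x, na.rm = TRUE),
              count = sum(count, na.rm = TRUE))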

## Exercises

### 1. MapReduce

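Sales analysis: compute the total profit per product ID using the MapReduce steps.

- The map function calculates the total profit for each product ID within each subset
- The reduce function adds up the profit for each product ID across all subsets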

In [643]:
sales <- read_csv('sales.csv')
sales %>% head(10)

Parsed with column specification:
cols(
  date = col_date(format = ""),
  customerID = col_double(),
  productID = col_double(),
  payment = col_character(),
  amount = col_double(),
  price = col_double(),
  cost = col_double(),
  category = col_character()
)


date,customerID,productID,payment,amount,price,cost,category
2017-01-16,64292,8403,paypal,3,560.74,234.89,emergency
2017-08-16,41174,7234,paypal,3,351.14,171.11,specialty
2017-10-26,49737,32738,paypal,3,343.38,105.14,emergency
2017-11-24,24021,70159,cash,2,905.96,345.4,emergency
2017-02-13,78762,2002,cash,2,799.99,407.3,emergency
2017-07-18,79148,86205,credit card,1,284.07,132.35,emergency
2017-08-23,79148,40784,cash,3,125.79,47.53,specialty
2017-11-06,23090,16224,paypal,3,85.77,36.61,specialty
2017-09-28,12307,82560,credit card,2,658.88,330.44,emergency
2017-04-19,45757,27578,credit card,1,458.31,269.8,emergency


In [644]:
calculate_profit <- function(df){
    df %>%
        # Write your code here 
        mutate(profit=(price-cost)*amount) %>% 
        group_by(productID) %>%
        summarise(total_profit = sum(profit))
}

In [645]:
reduce_profit <- function(df){
    df %>%
        # Write your code here 
        summarise(productID=productID[1],
                  total_profit=sum(total_profit))
}

In [646]:
sales %>% #Input
    split(sample(rep(1:5, 1000))) %>% #Splitting
    map(calculate_profit) %>% #Mapping
    map_df(rbind) %>% group_split(productID)%>% #Shuffling
    map(reduce_profit) %>% #Reduce
    map_df(cbind) %>% arrange(desc(total_profit)) %>%  #Merge and Sort
    head(10) #Display only top 10

productID,total_profit
43446,171232.25
24221,111657.54
60974,88118.82
70159,84644.56
89266,73408.04
86064,68414.58
9947,64715.4
61070,64571.4
62077,63238.5
2002,62437.71


### 2. Stream Processing

1. Why do stateful operations complicate parallelization?

> Need to join results across parallel computations

2. Why does parallelization complicate fault-tolerance?

> How to ensure exactly-once semantics if one node fails?