# "Big" Data Science

### Guest Lecture in Introduction to Data Science at TAU / Prof. Saharon Rosset
### Giora Simchoni
### June 4th, 2019

# What is Small? What is Big?

These definitions are constantly changing.

(1) "Everything processed in Excel is small data." ([Rufus Pollock, The Guardian](https://www.theguardian.com/news/datablog/2013/apr/25/forget-big-data-small-data-revolution))

(2) "[Big Data] is data so large it does not fit in main memory" (Leskovec et al., Mining of Massive Datasets)

Or maybe we should define the size of our data by how easy it is to process and understand?


(3) "[Small Data is] data that has small enough size for human comprehension." ([jWork.ORG](jWork.ORG))

(4) "data sets that are too large or complex for traditional data-processing application software to adequately deal with" ([Wikipedia](https://en.wikipedia.org/wiki/Big_data))

We'll talk about solutions to (3) and (4) today.

# When data is too big to fit in RAM...

### You want Regression? Deep Learning? Try computing an average!

### Distributed File System
* a new form of file system
* data is distributed over computing clusters (a collection of hundreds to thousands of "computer nodes")

### MapReduce
* a new software methodology which knows how to "speak" to data on DFS
* from computing `SELECT Country, COUNT(*) FROM table_name GROUP BY Country` to Machine Learning

# Distributed File System (DFS)

* Typically: [HDFS (Hadoop Distributed File System)](https://hadoop.apache.org/docs/r1.2.1/hdfs_design.html), an open source project by [Apache](https://www.apache.org/)

* "Computers" (nodes) are stacked in racks of 8-64 nodes each.

* Nodes on the same rack are connected by a network, and racks are connected to each other by another level of network or a switch

<img src = "images/computer_cluster.png">

# Probably the only time this image is called for

<img src = "images/actual_cluster.jpg">

# Fact: Computers Fail
* If each node's disk has a 1-in-a-million chance of failing in the next minute, what is the chance of at least one failure in the next minute on a cluster of 10,000 nodes?

* Solution: split your data into chunks of, say, 64MB, and replicate each chunk on, say, 3 nodes on different racks

<img src = "images/computer_cluster_files.png">
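The arithmetic behind the first question, as a small sketch. It assumes, as the question implies, that disks fail independently of each other and of previous minutes; `prob_any_failure` is a hypothetical helper name.

```python
import math

def prob_any_failure(p_node, n_nodes, n_minutes=1):
    """P(at least one failure) among n_nodes disks over n_minutes,
    assuming each disk fails independently with probability p_node per minute."""
    # P(no failure) = (1 - p_node) ** (n_nodes * n_minutes);
    # log1p/expm1 keep the computation precise when p_node is tiny
    return -math.expm1(n_nodes * n_minutes * math.log1p(-p_node))

p_minute = prob_any_failure(1e-6, 10_000)               # within the next minute
p_day = prob_any_failure(1e-6, 10_000, n_minutes=1440)  # within the next 24 hours
print(f"{p_minute:.5f}")  # 0.00995
print(f"{p_day:.6f}")
```

So there is roughly a 1% chance of some failure every single minute, and over a day a failure somewhere on the cluster is practically certain, which is why replication is built into the file system rather than left to the user.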

# MapReduce

* Typically: [Hadoop](https://hadoop.apache.org/) by Apache
* Three main features:
    1. Parallel computations, exploiting the cluster
    2. Tolerant to hardware failures
    3. All you need is a Map function and a Reduce function
    
## The Mapper
A function that takes one or more file chunks and turns them into a list of (key, value) pairs. The pairs from all the mappers are then grouped into (key, [values]) pairs and sorted by key; the framework does this for you.

## The Reducer
A function that takes one of those grouped (key, [values]) pairs and aggregates the values in some way. Each reducer call handles a single key, although one reducer worker typically processes many keys.
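To make the Mapper → group-and-sort → Reducer flow concrete, here is a minimal single-machine simulation in plain Python. It is only a sketch of the data flow (no distribution, no fault tolerance), and the names `map_reduce`, `wc_mapper` and `wc_reducer` are hypothetical.

```python
from itertools import groupby
from operator import itemgetter

def map_reduce(chunks, mapper, reducer):
    """Simulate one MapReduce job on a single machine (data flow only)."""
    # 1. Map: each chunk independently produces (key, value) pairs
    pairs = [kv for chunk in chunks for kv in mapper(chunk)]
    # 2. Group & sort: (key, [values]) per key, sorted by key (Hadoop does this step)
    pairs.sort(key=itemgetter(0))
    grouped = [(key, [v for _, v in group])
               for key, group in groupby(pairs, key=itemgetter(0))]
    # 3. Reduce: one reducer call per key
    return [reducer(kv) for kv in grouped]

def wc_mapper(chunk):
    # emit (word, 1) for every word in the chunk
    return [(word, 1) for word in chunk.split()]

def wc_reducer(key_values):
    key, values = key_values
    return key, sum(values)

chunks = ["the cat sat", "the dog sat", "a cat"]
print(map_reduce(chunks, wc_mapper, wc_reducer))
# [('a', 1), ('cat', 2), ('dog', 1), ('sat', 2), ('the', 2)]
```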

# The Classic MapReduce Example: Word Count

<img src = "images/word_count1.png">

# Word Count: The Mapper

<img src = "images/word_count2.png">

# Word Count: MapReduce then groups and sorts, regardless of input:

<img src = "images/word_count3.png">

# Word Count: The Reducer

<img src = "images/word_count4.png">

# MapReduce Functional View

<img src = "images/mapreduce_highlevel.png">

# MapReduce Architectural View

<img src = "images/mapreduce_architecture.png">

Question: why would we prefer a smaller number of Reducer workers?

# MapReduce: Failure is an Option

What happens when...

* The Master is down?
* A Map worker is down?
* A Reduce worker is down?

# MapReduce: In Action (almost)

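```python
def mapper(text):
    # emit a (word, 1) pair for every word in this piece of text
    list_of_key_values = []
    for word in text.split(' '):
        if word != '':  # skip empty strings produced by repeated spaces
            list_of_key_values.append((word, 1))
    return list_of_key_values

def reducer(single_key_values):
    # receives one key with all of its grouped values, e.g. ('Captain!', [1, 1])
    key, values = single_key_values
    return key, sum(values)
```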

```python
mapper('O Captain! my Captain! our fearful trip is done,')
```

```
[('O', 1),
 ('Captain!', 1),
 ('my', 1),
 ('Captain!', 1),
 ('our', 1),
 ('fearful', 1),
 ('trip', 1),
 ('is', 1),
 ('done,', 1)]
```

```python
mapper('The ship has weather’d every rack, the prize we sought is won,')
```

```
[('The', 1),
 ('ship', 1),
 ('has', 1),
 ('weather’d', 1),
 ('every', 1),
 ('rack,', 1),
 ('the', 1),
 ('prize', 1),
 ('we', 1),
 ('sought', 1),
 ('is', 1),
 ('won,', 1)]
```

```python
mapper('The port is near, the bells I hear, the people all exulting')
```

```
[('The', 1),
 ('port', 1),
 ('is', 1),
 ('near,', 1),
 ('the', 1),
 ('bells', 1),
 ('I', 1),
 ('hear,', 1),
 ('the', 1),
 ('people', 1),
 ('all', 1),
 ('exulting', 1)]
```

```python
# remember: Hadoop groups the map output by key for you!
# each key is handled by a single reducer, though a single reducer can handle several keys
reducer(('Captain!', [1, 1]))
```

```
('Captain!', 2)
```

# Running a Hadoop Job - Live Demo

```
bash-4.1$ bin/hadoop jar share/hadoop/tools/lib/hadoop-streaming-2.7.1.jar \
-file /home/mapper.py -mapper /home/mapper.py \
-file /home/reducer.py -reducer /home/reducer.py \
-input shakespeare -output output -numReduceTasks 4

packageJobJar: [/home/mapper.py, /home/reducer.py, /tmp/hadoop-unjar2660950065718379808/] [] /tmp/streamjob5124415768465865698.jar tmpDir=null
19/05/30 10:50:09 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
19/05/30 10:50:10 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
19/05/30 10:50:11 INFO mapred.FileInputFormat: Total input paths to process : 10
19/05/30 10:50:11 INFO mapreduce.JobSubmitter: number of splits:10
19/05/30 10:50:12 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1559226271713_0005
19/05/30 10:50:12 INFO impl.YarnClientImpl: Submitted application application_1559226271713_0005
19/05/30 10:50:12 INFO mapreduce.Job: The url to track the job: http://2cd2e5b21407:8088/proxy/application_1559226271713_0005/
19/05/30 10:50:12 INFO mapreduce.Job: Running job: job_1559226271713_0005
19/05/30 10:50:18 INFO mapreduce.Job: Job job_1559226271713_0005 running in uber mode : false
19/05/30 10:50:18 INFO mapreduce.Job:  map 0% reduce 0%
19/05/30 10:50:35 INFO mapreduce.Job:  map 60% reduce 0%
19/05/30 10:50:52 INFO mapreduce.Job:  map 90% reduce 0%
19/05/30 10:50:53 INFO mapreduce.Job:  map 90% reduce 15%
19/05/30 10:50:54 INFO mapreduce.Job:  map 90% reduce 23%
19/05/30 10:50:59 INFO mapreduce.Job:  map 100% reduce 32%
19/05/30 10:51:00 INFO mapreduce.Job:  map 100% reduce 50%
19/05/30 10:51:02 INFO mapreduce.Job:  map 100% reduce 100%
19/05/30 10:51:03 INFO mapreduce.Job: Job job_1559226271713_0005 completed successfully

bash-4.1$ bin/hadoop fs -ls output
Found 5 items
-rw-r--r--   1 root supergroup          0 2019-05-30 10:51 output3/_SUCCESS
-rw-r--r--   1 root supergroup      78211 2019-05-30 10:51 output3/part-00000
-rw-r--r--   1 root supergroup      77696 2019-05-30 10:51 output3/part-00001
-rw-r--r--   1 root supergroup      77111 2019-05-30 10:51 output3/part-00002
-rw-r--r--   1 root supergroup      78458 2019-05-30 10:51 output3/part-00003
```
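The demo's `mapper.py` and `reducer.py` are not shown. Hadoop Streaming runs them as ordinary programs: the mapper reads raw input lines on stdin and prints `key<TAB>value` lines, and each reducer reads those lines on stdin, already sorted by key. Here is a sketch of what such scripts might look like; for brevity both steps live in one hypothetical file chosen by a `map`/`reduce` argument, whereas the demo splits them into two files.

```python
#!/usr/bin/env python3
import sys
from itertools import groupby

def stream_map(lines):
    """Mapper: one 'word<TAB>1' line per word."""
    for line in lines:
        for word in line.split():
            yield f"{word}\t1"

def stream_reduce(lines):
    """Reducer: lines arrive sorted by key, so equal words are adjacent."""
    parsed = (line.rstrip("\n").split("\t", 1) for line in lines)
    for word, group in groupby(parsed, key=lambda kv: kv[0]):
        yield f"{word}\t{sum(int(count) for _, count in group)}"

if __name__ == "__main__" and sys.argv[1:] in (["map"], ["reduce"]):
    # hypothetical usage: "wordcount.py map" as the mapper, "wordcount.py reduce" as the reducer
    step = stream_map if sys.argv[1] == "map" else stream_reduce
    for out in step(sys.stdin):
        print(out)
```

Before submitting a job, such scripts can be tested locally with a Unix pipe that imitates the map, sort and reduce phases: `cat input.txt | python3 wordcount.py map | sort | python3 wordcount.py reduce`.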

# MapReduce: Plausible Exercise

Facebook has 2.5 billion users.

Suppose (and we're REALLY simplifying things here) that each file in HDFS contains a list of integers: the number of friends of each user.

Write a Mapper and a Reducer to compute the maximum number of friends a user has.

```python
def mapper(list_of_ints):
    ### Your code here
    pass

def reducer(single_key_values):
    ### Your code here
    pass
```

# One Solution

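```python
def mapper(list_of_ints):
    # a single (arbitrary, constant) key, so all values meet in one reducer;
    # the value is this chunk's local maximum
    list_of_key_values = [('whatever', max(list_of_ints))]
    return list_of_key_values

def reducer(single_key_values):
    # the maximum of the local maxima is the global maximum
    key, values = single_key_values
    return key, max(values)
```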
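A quick local check of this solution, with made-up friend counts split into ten "files" (the data and chunk sizes are hypothetical). Because `max` is associative and commutative, the maximum of per-chunk maxima equals the global maximum. And although the single key sends everything to one reducer, that reducer only receives one number per chunk.

```python
import random

def mapper(list_of_ints):
    # same solution as above: one (constant key, local max) pair per chunk
    return [('whatever', max(list_of_ints))]

def reducer(single_key_values):
    key, values = single_key_values
    return key, max(values)

random.seed(0)
friends = [random.randint(0, 5000) for _ in range(10_000)]              # hypothetical friend counts
chunks = [friends[i:i + 1_000] for i in range(0, len(friends), 1_000)]  # 10 "files"

# map every chunk, then do the framework's grouping by hand (there is only one key)
mapped = [kv for chunk in chunks for kv in mapper(chunk)]
grouped = ('whatever', [v for _, v in mapped])
print(reducer(grouped)[1] == max(friends))  # True
```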

# MapReduce: Another Plausible Exercise

Which of the following are NOT a natural fit for the MapReduce methodology? More than one answer is correct.

a. Getting the average annual income by country from a huge table containing every adult in the world and his/her annual income

b. Getting the median annual income by country from a huge table containing every adult in the world and his/her annual income

c. Multiplying a huge distributed 1-trillion x 1-trillion matrix by a 1-trillion-long vector

d. Fitting a Random Forest model with 1000 trees to a medium-size dataset that fits in a single node's RAM

e. Fitting a Gradient Boosted Trees model with 1000 trees to a medium-size dataset that fits in a single node's RAM

f. Getting the average income by settlement from a file containing each person in Israel, his/her city and income

# But surely you don't need to write a Mapper and a Reducer each time you want a simple average, let alone fit a Random Forest model?...

# Hadoop Ecosystem

<img src = "images/hadoop_ecosystem.png">

# Pig Script Example

```
A = LOAD 'data' AS (f1:int,f2:int,f3:int);

DUMP A;
(1,2,3)
(4,2,1)
(8,3,4)
(4,3,3)
(7,2,5)
(8,4,3)

B = GROUP A BY f1;

DUMP B;
(1,{(1,2,3)})
(4,{(4,2,1),(4,3,3)})
(7,{(7,2,5)})
(8,{(8,3,4),(8,4,3)})

X = FOREACH B GENERATE COUNT(A);

DUMP X;
(1L)
(2L)
(1L)
(2L)
```
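For readers new to Pig Latin, the three statements above do the same thing as this plain-Python sketch on the same six tuples: group by the first field, then count the tuples in each group. Note that `FOREACH B GENERATE COUNT(A)` outputs only the counts, not the group keys.

```python
from collections import defaultdict

A = [(1, 2, 3), (4, 2, 1), (8, 3, 4), (4, 3, 3), (7, 2, 5), (8, 4, 3)]

# B = GROUP A BY f1;  ->  {f1: [tuples with that f1]}
B = defaultdict(list)
for row in A:
    B[row[0]].append(row)

# X = FOREACH B GENERATE COUNT(A);  ->  one count per group
# (keys sorted here to match the DUMP order above)
X = [len(B[key]) for key in sorted(B)]
print(X)  # [1, 2, 1, 2]
```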

# Hive Script Example

```
SELECT COUNT(*) FROM table2;
```
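Hive (classically) compiles HiveQL queries like this into MapReduce jobs behind the scenes. To see that a `GROUP BY` query and a map-and-sum computation give the same answer, here is a sketch that runs the query from the start of the lecture with Python's built-in `sqlite3` as a stand-in SQL engine (this is not Hive), on a hypothetical `table_name` with made-up rows.

```python
import sqlite3
from collections import Counter

rows = [("Israel",), ("France",), ("Israel",), ("Japan",), ("Israel",), ("France",)]  # hypothetical data

# SQL route (sqlite3 standing in for Hive)
con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE table_name (Country TEXT)")
con.executemany("INSERT INTO table_name VALUES (?)", rows)
sql_result = dict(con.execute(
    "SELECT Country, COUNT(*) FROM table_name GROUP BY Country").fetchall())

# MapReduce-style route: map each row to (Country, 1), then sum the 1s per key
mr_result = Counter()
for (country,) in rows:
    mr_result[country] += 1

print(sql_result == dict(mr_result))  # True
```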

# Spark

* [Spark](https://spark.apache.org/) by Apache is a "unified analytics engine for large-scale data processing."
* Write (single) programs in Java, Scala, Python (PySpark), R (SparkR), and SQL.
* Works not only with Hadoop data: it also runs standalone (you can install Spark on your laptop), on Kubernetes, and on streaming data, among others
* Especially suitable for iterative workloads such as Machine Learning
* But for Deep Learning you're probably already using [TensorFlow](https://www.tensorflow.org/) by Google, or [Keras](https://keras.io/) on top of it
* And for parallel data manipulation on a multicore CPU, I'd also look at [Dask](https://dask.org/) by Anaconda

# Spark: Greatest advantage - In-memory computation

* Hadoop: Tasks are distributed among the nodes of a cluster, which in turn load/save data on disk
* Spark: keeps the data in RAM between steps rather than writing it to and reading it from disk

<img src = "images/spark_vs_hadoop.png">

# Spark: RDD and DataFrame

Every framework has its basic data objects or structures:

* Basic R has: vector, list, data.frame, matrix, factor
* Basic Python has: list, set, dictionary, tuple
* Tidyverse (R) has: tibble
* Pandas (Python) has: Series, DataFrame
* Spark has: RDD, DataFrame, Dataset (only in Scala/Java)

# RDD (Resilient Distributed Dataset)

* A collection of elements (lines of text, tuples/rows of a table) partitioned across multiple nodes in a cluster so they can be processed in parallel
* "Resilient" = can automatically recover from node failures
* Immutable: you can't change an RDD, you can only **transform** it into another RDD (Map) or perform an **action** on it (Reduce)
* Lazy evaluation: nothing happens when you transform an RDD! Only when you perform an action does Spark run the transformations that led your data to where it is, which it has recorded in a DAG, moving your data along these "pipes" in an optimized way
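To illustrate immutability and lazy evaluation (this is a toy, not how Spark is implemented), here is a hypothetical single-machine `LazyList` class. Transformations return a new object that only records the step, and nothing is computed until the `collect()` action runs the recorded pipeline.

```python
class LazyList:
    """Toy illustration of RDD-style laziness and immutability; not Spark's implementation."""

    def __init__(self, data, steps=()):
        self._data = data
        self._steps = steps  # recorded transformations (the "lineage")

    # transformations: return a NEW LazyList with one more recorded step; no work is done
    def map(self, f):
        return LazyList(self._data, self._steps + (("map", f),))

    def filter(self, pred):
        return LazyList(self._data, self._steps + (("filter", pred),))

    # action: only now is the data pushed through the recorded steps
    def collect(self):
        out = iter(self._data)
        for kind, f in self._steps:
            out = map(f, out) if kind == "map" else filter(f, out)
        return list(out)

calls = []
def add_one(x):
    calls.append(x)  # record when the function actually runs
    return x + 1

base = LazyList([1, 2, 3, 4, 5])
pipeline = base.map(add_one).filter(lambda x: x % 2 == 0)
print(calls)               # [] : nothing computed yet
print(pipeline.collect())  # [2, 4, 6]
print(base.collect())      # [1, 2, 3, 4, 5] : the original is unchanged
```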

```python
from pyspark import SparkContext

sc = SparkContext()
rdd = sc.parallelize([1,2,3,4,5])
rdd
```

```
ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:195
```

```python
rdd.collect()
```

```
[1, 2, 3, 4, 5]
```

# PySpark: Basic Transformations

```python
# map
rdd.map(lambda x: x + 1).collect()
```

```
[2, 3, 4, 5, 6]
```

```python
# flatMap
rdd.flatMap(lambda x: range(1, x)).collect()
```

```
[1, 1, 2, 1, 2, 3, 1, 2, 3, 4]
```

```python
# filter
rdd.filter(lambda x: x < 4).collect()
```

```
[1, 2, 3]
```

```python
# glom
rdd.glom().collect()
```

```
[[1], [2], [3], [4, 5]]
```
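`glom()` exposes how the five elements were split into partitions (four here, which depends on the default parallelism of this particular setup). Below is a sketch of one simple contiguous slicing rule, where slice `i` covers positions `i*n//k` up to `(i+1)*n//k`. It reproduces the output above, but treat it as an illustration rather than a guaranteed description of Spark's internals; `slice_evenly` is a hypothetical helper.

```python
def slice_evenly(data, num_slices):
    """Split data into num_slices contiguous, near-equal slices (hypothetical helper)."""
    n = len(data)
    return [data[i * n // num_slices:(i + 1) * n // num_slices]
            for i in range(num_slices)]

print(slice_evenly([1, 2, 3, 4, 5], 4))  # [[1], [2], [3], [4, 5]]
```

In Spark itself you can choose the number of partitions explicitly with `sc.parallelize(data, numSlices)` and inspect it with `rdd.getNumPartitions()`.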

```python
# sortBy
rdd.sortBy(lambda x: x, ascending=False).collect()
```

```
[5, 4, 3, 2, 1]
```

# PySpark: Basic Actions

```python
# take
rdd.take(2)
```

```
[1, 2]
```

```python
# first
rdd.first()
```

```
1
```

```python
# reduce
rdd.reduce(lambda a, b: a + b)
```

```
15
```

```python
# fold
rdd.fold(1000, lambda a, b: a + b)
```

```
5015
```
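Why 5015 and not 1015? `fold` applies the zero value once in every partition and once more when merging the partition results. With the four partitions shown by `glom()` above, 1000 is added five times: 15 + 5 × 1000 = 5015. A plain-Python sketch of that behavior, hardcoding the partitioning from the `glom()` output:

```python
from functools import reduce
from operator import add

partitions = [[1], [2], [3], [4, 5]]  # as reported by rdd.glom() above
zero = 1000

# 1. fold each partition separately, starting from the zero value
per_partition = [reduce(add, part, zero) for part in partitions]  # [1001, 1002, 1003, 1009]
# 2. fold the partition results, again starting from the zero value
total = reduce(add, per_partition, zero)
print(total)  # 5015
```

The practical lesson is to pass a neutral zero value (0 for addition); otherwise the result depends on how many partitions the data happens to have.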

```python
# count, sum, mean, min, max, stdev, variance...
rdd.count()
```

```
5
```

# PySpark: Functional Programming (Chaining)

```python
# increase each value by 10 and sum
rdd.map(lambda x: x + 10).sum()
```

```
65
```

```python
# group by remainder of division by 3,
# sum each group's values,
# sort RDD by remainder
# and filter by remainder smaller than 2
rdd.groupBy(lambda x: x % 3) \
    .mapValues(sum) \
    .sortByKey() \
    .filter(lambda x: x[0] < 2) \
    .collect()
```

```
[(0, 3), (1, 5)]
```

# PySpark: Word Count

```python
shakespeare = sc.textFile('/home/jovyan/hadoop_example/shakespeare')
```

```python
shakespeare.count()
```

```
44640
```

```python
wc = shakespeare \
    .flatMap(lambda line: line.split(' ')) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a + b)
```

```python
wc.top(10, key=lambda t: t[1])
```

```
[('the', 6788),
 ('and', 5113),
 ('I', 5036),
 ('to', 4298),
 ('of', 3930),
 ('a', 3160),
 ('my', 2829),
 ('in', 2532),
 ('you', 2490),
 ('is', 2101)]
```
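One reason this word count uses `reduceByKey` rather than `groupByKey` followed by a sum: `reduceByKey` first combines values within each partition, so far fewer records cross the network in the shuffle. Here is a plain-Python sketch of that saving on two hypothetical partitions of words. It only counts records and does not model Spark's actual shuffle.

```python
from collections import Counter

partitions = [
    ["the", "cat", "the", "hat", "the"],
    ["the", "dog", "the", "cat"],
]  # hypothetical words after flatMap

# groupByKey-style: every (word, 1) pair is shuffled as-is
shuffled_without_combine = sum(len(part) for part in partitions)

# reduceByKey-style: each partition first sums its own counts ("map-side combine")
local_counts = [Counter(part) for part in partitions]
shuffled_with_combine = sum(len(c) for c in local_counts)

# after the shuffle, the partial counts for each word are merged
final = sum(local_counts, Counter())

print(shuffled_without_combine, shuffled_with_combine)  # 9 6
print(final.most_common(1))                              # [('the', 5)]
```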

# DataFrames

* (inspired by Pandas which is inspired by R)
* Collection of `Row`s under named `Column`s
* Especially suitable for tabular data
* Syntax very similar to Pandas
* Like RDD: Resilient, Distributed, Lazily evaluated, Immutable

```python
from pyspark.sql import SparkSession

spark = SparkSession(sc)

df = spark.createDataFrame(data=[(1, 'A'), (2, 'B'), (3, 'C')], schema=['id', 'name'])
df
```

```
DataFrame[id: bigint, name: string]
```

```python
df.show()
```

```
+---+----+
| id|name|
+---+----+
|  1|   A|
|  2|   B|
|  3|   C|
+---+----+
```
# PySpark: Basic DataFrame Manipulations (I)

```python
saheart = spark.read.csv('SAheart.csv', header=True, inferSchema=True)
saheart.count()
```

```
462
```

```python
saheart.columns
```

```
['row.names',
 'sbp',
 'tobacco',
 'ldl',
 'adiposity',
 'famhist',
 'typea',
 'obesity',
 'alcohol',
 'age',
 'chd']
```

```python
saheart = saheart.drop('row.names')
```

```python
saheart.show(2)
```

```
+---+-------+----+---------+-------+-----+-------+-------+---+---+
|sbp|tobacco| ldl|adiposity|famhist|typea|obesity|alcohol|age|chd|
+---+-------+----+---------+-------+-----+-------+-------+---+---+
|160|   12.0|5.73|    23.11|Present|   49|   25.3|   97.2| 52|  1|
|144|   0.01|4.41|    28.61| Absent|   55|  28.87|   2.06| 63|  1|
+---+-------+----+---------+-------+-----+-------+-------+---+---+
only showing top 2 rows
```

# PySpark: Basic DataFrame Manipulations (II)

```python
saheart.select('famhist').distinct().show()
```

```
+-------+
|famhist|
+-------+
|Present|
| Absent|
+-------+
```

```python
saheart.groupby('famhist').agg({'chd': 'avg'}).show()
```

```
+-------+-------------------+
|famhist|           avg(chd)|
+-------+-------------------+
|Present|                0.5|
| Absent|0.23703703703703705|
+-------+-------------------+
```

```python
saheart.stat.corr('obesity', 'alcohol')
```

```
0.051619568611913025
```

# PySpark: Processing DataFrame for Classification

```python
# no seed is set, so the split (and these counts) will vary between runs
train, test = saheart.randomSplit([0.8, 0.2])
print(train.count())
print(test.count())
```

```
351
111
```
```python
from pyspark.ml.feature import RFormula

formula = RFormula(formula='chd ~ .', featuresCol='features', labelCol='label')

fit = formula.fit(train)
train_df = fit.transform(train)
train_df.show(1) # notice the two extra columns!
```

```
+---+-------+----+---------+-------+-----+-------+-------+---+---+--------------------+-----+
|sbp|tobacco| ldl|adiposity|famhist|typea|obesity|alcohol|age|chd|            features|label|
+---+-------+----+---------+-------+-----+-------+-------+---+---+--------------------+-----+
|102|    0.4|3.41|    17.22|Present|   56|  23.59|   2.06| 39|  1|[102.0,0.4,3.41,1...|  1.0|
+---+-------+----+---------+-------+-----+-------+-------+---+---+--------------------+-----+
only showing top 1 row
```
```python
test_df = fit.transform(test)
```

# PySpark: Logistic Regression (I)

```python
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(maxIter=100, regParam=0.0)

lrModel = lr.fit(train_df)

print("Coefficients: " + str(lrModel.coefficients))
print("Intercept: " + str(lrModel.intercept))
```

```
Coefficients: [0.008408247604829246,0.0819338712417374,0.1844407014226751,0.031730852428542095,-0.9351098151988192,0.03907230665069162,-0.09409933142418063,0.0016123733510384239,0.028466557617748908]
Intercept: -4.259414174733452
```
```python
pred_df = lrModel.transform(test_df)
pred_df.show(2) # notice the extra columns!
```

```
+---+-------+----+---------+-------+-----+-------+-------+---+---+--------------------+-----+--------------------+--------------------+----------+
|sbp|tobacco| ldl|adiposity|famhist|typea|obesity|alcohol|age|chd|            features|label|       rawPrediction|         probability|prediction|
+---+-------+----+---------+-------+-----+-------+-------+---+---+--------------------+-----+--------------------+--------------------+----------+
|101|   0.48|7.26|     13.0| Absent|   50|  19.82|   5.19| 16|  0|[101.0,0.48,7.26,...|  0.0|[2.00202242646562...|[0.88100925630563...|       0.0|
|108|    0.0|1.43|    26.26| Absent|   42|  19.38|    0.0| 16|  0|[108.0,0.0,1.43,2...|  0.0|[2.91657410259036...|[0.94865969878719...|       0.0|
+---+-------+----+---------+-------+-----+-------+-------+---+---+--------------------+-----+--------------------+--------------------+----------+
only showing top 2 rows
```
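For binary logistic regression, Spark stores `rawPrediction` as the pair of margins [−m, m], where m = intercept + coefficients · features, and `probability` applies the logistic (sigmoid) function to each entry, so the first entries are the scores for class 0. A quick check against the (truncated) numbers printed above; `sigmoid` is a helper defined here, not a Spark function.

```python
import math

def sigmoid(z):
    return 1.0 / (1.0 + math.exp(-z))

# first entries of rawPrediction / probability for the two rows shown above
# (copied from the printout, which truncates them)
raw = [2.00202242646562, 2.91657410259036]
prob = [0.88100925630563, 0.94865969878719]

for r, p in zip(raw, prob):
    print(round(sigmoid(r), 6), round(p, 6))
```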



# PySpark: Logistic Regression (II)

```python
# assuming the predictions are small enough to collect() to the driver
import numpy as np
y_pred = np.array([int(row.prediction) for row in pred_df.select('prediction').collect()])
y_true = np.array([int(row.label) for row in pred_df.select('label').collect()])
print('Test accuracy: %.2f' % np.mean(y_pred == y_true))
```

```
Test accuracy: 0.78
```
```python
# for AUC
from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluator = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction')
auc = evaluator.evaluate(pred_df)
print('Test AUC: %.2f' % auc)
```

```
Test AUC: 0.83
```
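`BinaryClassificationEvaluator`'s default metric is the area under the ROC curve. One way to read that number: it is the probability that a randomly chosen positive example gets a higher score than a randomly chosen negative one, with ties counting half. A tiny pure-Python sketch of this pairwise definition on made-up scores; Spark integrates the ROC curve instead of counting pairs, but the two quantities are equivalent. `pairwise_auc` is a hypothetical helper.

```python
def pairwise_auc(labels, scores):
    """AUC as P(score of a random positive > score of a random negative); ties count 0.5."""
    pos = [s for y, s in zip(labels, scores) if y == 1]
    neg = [s for y, s in zip(labels, scores) if y == 0]
    wins = sum(1.0 if p > n else 0.5 if p == n else 0.0 for p in pos for n in neg)
    return wins / (len(pos) * len(neg))

labels = [1, 0, 1, 0]
scores = [0.9, 0.4, 0.35, 0.1]        # made-up P(class 1) scores
print(pairwise_auc(labels, scores))  # 0.75
```

Only the ranking of the scores matters here, which is why the evaluator can work on `rawPrediction` just as well as on `probability`.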


# PySpark: Random Forest

```python
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

rf = RandomForestClassifier(numTrees=1000)

rfModel = rf.fit(train_df)

pred_df = rfModel.transform(test_df)

# when the predictions are too big to collect(), compute accuracy with MulticlassClassificationEvaluator,
# but beware: it scores the prediction column, which uses the default threshold (the most probable class)
evaluator = MulticlassClassificationEvaluator(metricName='accuracy')
accuracy = evaluator.evaluate(pred_df)
print('Test accuracy: %.2f' % accuracy)
```

```
Test accuracy: 0.75
```
```python
evaluator = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction')
auc = evaluator.evaluate(pred_df)
print('Test AUC: %.2f' % auc)
```

```
Test AUC: 0.79
```
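On the "automatic threshold": for a binary classifier, the `prediction` column is in effect the class with the larger probability, i.e. a 0.5 cutoff on P(class 1), and accuracy is computed from those labels. A plain-Python sketch of how moving the cutoff changes the predicted labels, using made-up probabilities. In PySpark you would first need to extract P(class 1) from the `probability` vector column; `predict_with_threshold` is a hypothetical helper.

```python
def predict_with_threshold(p_class1, threshold=0.5):
    """Label 1 when P(class 1) exceeds the threshold (hypothetical helper)."""
    return [1 if p > threshold else 0 for p in p_class1]

y_true = [0, 0, 1, 1, 0, 1]
p_class1 = [0.10, 0.45, 0.40, 0.80, 0.30, 0.60]  # made-up probabilities

for t in (0.5, 0.35):
    y_pred = predict_with_threshold(p_class1, t)
    caught = sum(1 for yt, yp in zip(y_true, y_pred) if yt == 1 and yp == 1)
    print(t, y_pred, f"positives caught: {caught}/{sum(y_true)}")
```

A lower cutoff catches more of the positives at the price of more false alarms, a trade-off that a single accuracy number hides.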
