# Welcome to this MapReduce with Spark lab!

During this lab, we are going to explore some simple examples of how to
write big data pipelines with Spark, based on the principles of MapReduce.

The most important resource when looking for help is [the Spark documentation](https://spark.apache.org/docs/latest/).

In [None]:
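import findspark
findspark.init()  # locate the Spark installation and make pyspark importable
import pyspark
from pyspark.sql import SparkSession

# Run Spark locally, with as many worker threads as there are logical cores.
conf = (pyspark.SparkConf()
        .setAppName("MySparkApp")
        .setMaster('local[*]')
        .set('spark.executor.memory', '5G')  # no effect in local mode: the executor runs inside the driver JVM
        .set('spark.driver.memory', '6G')
        .set('spark.driver.maxResultSize', '4G'))  # cap on results an action such as collect() sends back to the driver
spark = SparkSession.builder.config(conf=conf).getOrCreate()
sc = spark.sparkContext  # entry point for the RDD (MapReduce-style) API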

import matplotlib.pyplot as plt
import numpy as np

%matplotlib inline

# Part 1: Compute Pi

We are going to use a Monte-Carlo estimation of the number $\pi$.
For more information about this method, [see here.](https://academo.org/demos/estimating-pi-monte-carlo/)
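The estimator rests on an area ratio: a point drawn uniformly in the unit square $[0,1]^2$ falls inside the quarter disc $x^2 + y^2 \le 1$ with probability $p = \pi/4$. With $n$ independent samples $(x_i, y_i)$, the estimate and its standard deviation are

$$\hat{\pi}_n = \frac{4}{n}\sum_{i=1}^{n} \mathbf{1}\left[x_i^2 + y_i^2 \le 1\right], \qquad \sigma\left(\hat{\pi}_n\right) = 4\sqrt{\frac{p(1-p)}{n}} \approx \frac{1.64}{\sqrt{n}}$$

so one million samples should give roughly two to three correct decimals. Each term of the sum is computed independently of the others, which is what makes this computation easy to parallelize.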

To start, implement two ways of estimating Pi with the Monte-Carlo method: one in plain Python and one with Spark.

In [None]:
def regularPi(n: int) -> float:
    """Estimate Pi using plain Python."""
    pass

def sparkPi(n: int) -> float:
    """Estimate Pi using Spark."""
    pass

num_samples = 1000000
print("Estimation of Pi: {0}".format(sparkPi(num_samples)))
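A minimal sketch of both estimators, shown as one possible solution rather than the expected one. The Spark version is a MapReduce in its simplest form: map every sample index to 0 or 1, then reduce with a sum. The helper `sample_hit` is a hypothetical name introduced here, and `sparkPi` assumes the global SparkContext `sc` from the setup cell.

```python
import random


def sample_hit(_: int) -> int:
    """Draw one point uniformly in the unit square; return 1 if it lies inside the quarter disc."""
    x, y = random.random(), random.random()
    return 1 if x * x + y * y <= 1.0 else 0


def regularPi(n: int) -> float:
    """Estimate Pi in plain Python: 4 * (fraction of the n points inside the quarter disc)."""
    hits = sum(sample_hit(i) for i in range(n))
    return 4.0 * hits / n


def sparkPi(n: int) -> float:
    """Estimate Pi with Spark: map each sample index to 0 or 1, then reduce with a sum.

    Assumes the global SparkContext `sc` created in the setup cell.
    """
    hits = sc.parallelize(range(n)).map(sample_hit).reduce(lambda a, b: a + b)
    return 4.0 * hits / n
```

`sc.parallelize(range(n), numSlices)` lets you choose the number of partitions explicitly; by default Spark uses its default parallelism, which in `local[*]` mode is the number of cores.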

### To understand the power of parallel computing (provided your computer has several cores), let's measure the execution time over a large number of samples

In [None]:
import time

def chronometer(func, n: int) -> float:
    """Measure the execution time (in seconds) of func(n)."""
    before = time.perf_counter()  # monotonic clock, better suited to timing than time.time()
    func(n)
    after = time.perf_counter()
    return after - before


def gain(n: int) -> float:
    """Speed-up factor of sparkPi over regularPi when both are run on n samples."""
    return chronometer(regularPi, n) / chronometer(sparkPi, n)

### How does the gain evolve with the number of samples n? Why? Can you plot it?
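A sketch for measuring and plotting the gain, assuming `regularPi` and `sparkPi` have been implemented. The helper names `timed`, `gain_curve` and `plot_gain` are hypothetical. One would expect the gain to stay below 1 for small `n`, because Spark pays a fixed cost per job (scheduling, serializing tasks, starting Python workers), and then to grow with `n` as that overhead is amortized, levelling off at most around the number of cores.

```python
import time
from typing import Callable, List, Sequence


def timed(func: Callable[[int], object], n: int) -> float:
    """Seconds spent in func(n); perf_counter is a monotonic, high-resolution clock."""
    start = time.perf_counter()
    func(n)
    return time.perf_counter() - start


def gain_curve(baseline: Callable[[int], object],
               candidate: Callable[[int], object],
               sizes: Sequence[int]) -> List[float]:
    """For each n in sizes, return time(baseline) / time(candidate); > 1 means candidate is faster."""
    return [timed(baseline, n) / timed(candidate, n) for n in sizes]


def plot_gain(sizes: Sequence[int], gains: Sequence[float]) -> None:
    """Plot the gain against n with a logarithmic x axis."""
    import matplotlib.pyplot as plt  # local import so the timing helpers work without matplotlib

    plt.figure()
    plt.semilogx(sizes, gains, marker="o")
    plt.axhline(1.0, linestyle="--", color="grey")  # below this line Spark is slower
    plt.xlabel("number of samples n")
    plt.ylabel("gain = time(regularPi) / time(sparkPi)")
    plt.show()
```

In the notebook this could be called as `sizes = [10**k for k in range(3, 8)]` followed by `plot_gain(sizes, gain_curve(regularPi, sparkPi, sizes))`. A single timing per size is noisy; averaging a few runs gives a smoother curve.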

# Part 2: Variations on word count

We are going to use the raw text files of the BBC dataset, available at http://mlg.ucd.ie/datasets/bbc.html.

## First step: load the data

Write the function that loads the data as a Spark DataFrame.

In [None]:
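def getFilenames(folder) -> list:
    """Return the list of paths of the BBC text files stored under folder."""
    pass

def loadDataFrame(folder):
    """Load the BBC files found under folder into a DataFrame with one string column, `value`."""
    filenames = getFilenames(folder)
    # By default each row holds one line of a file; pass wholetext=True to get one row per file.
    return spark.read.text(filenames)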
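A minimal `getFilenames`, assuming the archive was extracted so that `folder` contains one sub-folder per category (business, entertainment, politics, sport, tech) holding one `.txt` file per article. Restricting the pattern to `*/*.txt` skips any file sitting directly in `folder`, such as a README.

```python
from pathlib import Path
from typing import List


def getFilenames(folder) -> List[str]:
    """Return the sorted paths of the .txt files one level below folder (one sub-folder per category)."""
    return sorted(str(path) for path in Path(folder).glob("*/*.txt"))
```

Sorting keeps the list deterministic, which helps when comparing runs.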

## Second step: Exploration

Let's explore what has been loaded.
- Can you display the value of one entry? Of several?
- How many entries are there?
- How many of them are non-empty?
- Each entry is a text: can you write a MapReduce job to plot the distribution of the sizes of those texts?
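A sketch of the exploration, assuming `df` is the DataFrame returned by `loadDataFrame` (column `value`). The size histogram is a small MapReduce: map each text to (size bucket, 1), then add up the ones per bucket. `size_mapper`, `reduce_by_key` and the 500-character bucket width are illustrative choices; `reduce_by_key` only mimics `RDD.reduceByKey` locally so that the logic can be checked without Spark, and the commented lines show how the pieces would be used on the DataFrame.

```python
from operator import add
from typing import Callable, Dict, Iterable, Tuple

BUCKET_WIDTH = 500  # arbitrary histogram bin width, in characters


def size_mapper(text: str) -> Tuple[int, int]:
    """Map step: emit (lower edge of the text's size bucket, 1)."""
    return (len(text) // BUCKET_WIDTH) * BUCKET_WIDTH, 1


def reduce_by_key(pairs: Iterable[Tuple[int, int]],
                  func: Callable[[int, int], int]) -> Dict[int, int]:
    """Local stand-in for RDD.reduceByKey: fold together the values sharing a key with func."""
    out: Dict[int, int] = {}
    for key, value in pairs:
        out[key] = func(out[key], value) if key in out else value
    return out

# With Spark (assumes df = loadDataFrame(folder) and `from pyspark.sql import functions as F`):
# df.take(3)                                                 # look at a few entries
# df.count()                                                 # number of entries
# df.filter(F.length(F.trim(F.col("value"))) > 0).count()    # non-empty entries
# hist = sorted(df.rdd.map(lambda row: size_mapper(row.value)).reduceByKey(add).collect())
# plt.bar([k for k, _ in hist], [v for _, v in hist], width=BUCKET_WIDTH, align="edge")
```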

## Third step: the word count MapReduce

- Can you write a MapReduce job that counts the number of occurrences of each word in the dataset?
- Are you satisfied with the output? Why?
- How can you clean the input?
- How can you save the result to disk?
- Can you write a version with a combiner? What is the difference?

In [None]:
def wordCountMapper(text) -> list:
    """Given a text, return a list of (term, count) tuples."""
    pass

def wordCountReduce(x, y):
    """Given two counts, return the combined count."""
    pass
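A sketch of the word count with basic cleaning folded into the mapper: lower-casing, then keeping only alphanumeric runs, optionally followed by an apostrophe suffix as in "don't". That token definition, the `tokenize` helper and the `wordCountMapperCombined` variant are assumptions, not the only reasonable choices. The combined variant pre-aggregates counts within each text (an in-mapper combiner), so fewer pairs reach the shuffle. Note that Spark's `reduceByKey` already merges values inside each partition before shuffling, which plays the role of a combiner, whereas `groupByKey` sends every individual value through the shuffle.

```python
import re
from collections import Counter
from typing import List, Tuple

# Assumption: a word is a run of lowercase letters/digits, optionally followed by 'suffix.
TOKEN_RE = re.compile(r"[a-z0-9]+(?:'[a-z]+)?")


def tokenize(text: str) -> List[str]:
    """Basic cleaning: lower-case the text and drop punctuation by keeping only token matches."""
    return TOKEN_RE.findall(text.lower())


def wordCountMapper(text: str) -> List[Tuple[str, int]]:
    """Given a text, return one (term, 1) tuple per token."""
    return [(word, 1) for word in tokenize(text)]


def wordCountMapperCombined(text: str) -> List[Tuple[str, int]]:
    """Hypothetical in-mapper combiner: emit one (term, count) tuple per distinct term of the text."""
    return list(Counter(tokenize(text)).items())


def wordCountReduce(x: int, y: int) -> int:
    """Given two partial counts of the same term, return the combined count."""
    return x + y

# With Spark (assumes df = loadDataFrame(folder); the output path is a placeholder):
# counts = df.rdd.flatMap(lambda row: wordCountMapper(row.value)).reduceByKey(wordCountReduce)
# counts.takeOrdered(20, key=lambda kv: -kv[1])   # 20 most frequent terms
# counts.saveAsTextFile("output/wordcount")       # writes a directory with one part file per partition
```

`saveAsTextFile` fails if the target directory already exists, so pick a fresh path or delete the old output before re-running.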

## Bonus Questions

- We are only counting single words: can you count n-grams?
- How can you find which n-grams are statistically meaningful?
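One way to approach the bonus questions, offered as a sketch: counting n-grams only requires changing the mapper so that it emits each run of `n` consecutive tokens instead of single words, while the additive reducer stays the same. To rank bigrams, a common measure is pointwise mutual information, $\mathrm{PMI}(a\,b) = \log_2 \frac{P(a\,b)}{P(a)\,P(b)}$, which is high when two words co-occur more often than independence would predict. It is only one option (likelihood-ratio or chi-squared tests are alternatives) and it overrates rare pairs, hence the `min_count` cut-off. The names `ngramMapper` and `bigram_pmi`, the cut-off value and the tokenizer are hypothetical; the unigram and bigram counts are assumed to come from the same corpus and to be small enough to collect on the driver as `Counter`s.

```python
import math
import re
from collections import Counter
from typing import Dict, List, Tuple

TOKEN_RE = re.compile(r"[a-z0-9]+(?:'[a-z]+)?")  # same simple token definition as the word count sketch


def tokenize(text: str) -> List[str]:
    return TOKEN_RE.findall(text.lower())


def ngramMapper(text: str, n: int = 2) -> List[Tuple[str, int]]:
    """Map step for n-grams: emit (n consecutive tokens joined by a space, 1)."""
    tokens = tokenize(text)
    return [(" ".join(tokens[i:i + n]), 1) for i in range(len(tokens) - n + 1)]


def bigram_pmi(unigrams: Counter, bigrams: Counter, min_count: int = 5) -> Dict[str, float]:
    """PMI of every bigram seen at least min_count times; both counters must come from one corpus."""
    total_uni = sum(unigrams.values())
    total_bi = sum(bigrams.values())
    scores = {}
    for bigram, count in bigrams.items():
        if count < min_count:
            continue  # PMI of rare pairs is unreliable
        a, b = bigram.split(" ")
        p_ab = count / total_bi
        p_a = unigrams[a] / total_uni
        p_b = unigrams[b] / total_uni
        scores[bigram] = math.log2(p_ab / (p_a * p_b))
    return scores
```

On the BBC data, the unigram counts would come from the word count job and the bigram counts from the same pipeline with `ngramMapper` as the mapper.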