# Multiprocessor programming
Created by Francesc Dantí & Lluis Garrido  

This lecture focuses on multiprocessor programming.

+ **Multiprocessor programming** is a programming technique that takes advantage of the multiple processors of a single computer or of a set of interconnected computers.
+ **Big Data** is a term that encompasses analyzing, visualizing and storing large data sets. Big data is usually difficult to handle with most desktop statistics and visualization packages, and therefore requires parallel software running on tens, hundreds or even thousands of processors. You'll see this with Francesc Dantí. Jordi Nin will introduce you to Spark, a specialized package for this kind of problem.

In this lecture we will focus on single computer multiprocessor programming, that is, on using the multiple processors available in a desktop computer to reduce the computation time necessary to solve a problem. 

## Introduction

Major chip manufacturers have largely given up trying to make individual processors run faster. Moore's law continues to hold: more and more transistors fit into the same space, but their clock speed cannot be raised much further without overheating. With current technology, pushing a silicon-based chip well beyond a few GHz dissipates more heat than can reasonably be removed. Instead, manufacturers have turned to "multicore" architectures, in which multiple processors (cores) communicate through hardware. 

The operating system is the software that manages these multiple cores. If two computation-intensive processes (i.e. applications) are run on the computer, the operating system can run each of them on a different core. If we only have a single computation-intensive task, it will only run on one core, even if our computer has multiple cores. If nothing is done explicitly, we will waste a lot of computation power!

Our objective in these lectures is to show the tools Python offers to take advantage of multiple processors. To benefit from the multicore capabilities, the number of tasks should equal the number of processors. There is not much sense in defining more tasks than we have cores: for example, in manually defining 8 computation-intensive tasks if our computer has only 4 cores. In that case the operating system will try to run 8 tasks on 4 cores, and you will see a loss of efficiency because the operating system loses computation time switching between the tasks.
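
As a quick check before choosing the number of tasks, the number of cores can be queried from Python. The following is a minimal sketch using only the standard library; the variable names `n_cores` and `n_tasks` are illustrative and not part of the lecture code.

```python
import os

# Number of logical CPUs visible to Python. os.cpu_count() may return None
# when the value cannot be determined, so we fall back to 1 in that case.
n_cores = os.cpu_count() or 1

# Rule of thumb from the text: one computation-intensive task per core.
n_tasks = n_cores

print("Logical cores:", n_cores)
print("Suggested number of computation-intensive tasks:", n_tasks)
```

Note that the reported value counts logical cores, which on some machines is larger than the number of physical cores.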

We will see how to do this in these lectures from a low-level point of view. That is, we will manually split the computation work into
multiple tasks so that each one is executed on a different core. This is what usually has to be done: the programmer
has to perform the split manually and the operating system will automatically execute
each task on a different core. This is the principle of parallel programming: harnessing
multiple processors to work on a single task by dividing it into multiple (smaller)
tasks. 

Parallelization can also be performed by means of distributed computing. Whereas in multicore systems the cores communicate with each other through the bus at the hardware level, in distributed systems a piece of software communicates with and coordinates the actions of computational entities located on a network. The computational entities are usually computers. In distributed computing a large number of discrete computers, called nodes, distributed across a network (e.g., the Internet) devote some or all of their computation time to solving a common problem; each node receives and completes many
small tasks, reporting the results to a central server which integrates the results into the overall solution. However, since information is exchanged through the network, care must be taken with the amount of information that is passed around in order to optimize the computational performance.

IPython offers an environment capable of dealing with both architectures in a manner that is
transparent for the programmer. Nevertheless, the user should be aware of the underlying
architecture on which the application will be run in order to avoid performance
degradation.

## What can be parallelized?

What kind of algorithms can be parallelized? Well, strictly speaking we may say that all algorithms can be parallelized to a certain degree. The more useful question is: what kind of algorithms can be parallelized easily? The answer is simple: those algorithms in which the elements of the data set can be processed independently. For instance, matrix multiplication is a parallelizable task. Take a look at the figure below: each element of the resulting matrix is computed independently of the other elements. This means that we can split the resulting matrix into parts and assign each part to a different task. Matrix multiplication appears in many different kinds of problems: think, for instance, of the matrix factorization algorithm used by recommenders.

Another kind of problem that can be easily parallelized is document analysis. For instance, assume we have several documents which need to be cleaned prior to analysis. This cleaning can be easily parallelized since each document is usually processed independently. The analysis itself may be parallelizable if each document can be processed independently of the others. Counting the words in a set of documents is another example.

<br/>
<center><img width="40%" src="images/matmul.png"></center>
<br/>
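
The independence shown in the figure can be sketched in a few lines of plain Python. This is only an illustration under simple assumptions: matrices are small lists of lists, and the helper name `matmul_rows` is hypothetical. Each call computes a block of rows of the result without needing anything from the other call, so the two calls could run on different cores.

```python
def matmul_rows(A, B, i0, i1):
    """Compute rows i0..i1-1 of the product A*B (matrices as lists of lists)."""
    n_inner = len(B)
    n_cols = len(B[0])
    return [[sum(A[i][r] * B[r][j] for r in range(n_inner)) for j in range(n_cols)]
            for i in range(i0, i1)]

A = [[1, 2], [3, 4], [5, 6], [7, 8]]   # 4 x 2 matrix
B = [[1, 0, 2], [0, 1, 3]]             # 2 x 3 matrix

# Two independent tasks: neither one needs the result of the other
top = matmul_rows(A, B, 0, 2)
bottom = matmul_rows(A, B, 2, 4)

# Assemble the result and compare with the product computed in one go
C = top + bottom
full = matmul_rows(A, B, 0, 4)
print(C == full)
```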

What is not easily parallelizable? With your current knowledge, don't try to parallelize algorithms such as supervised learning or graph algorithms. These kinds of algorithms need "all the data at once" to learn, and thus they are not easy to parallelize. Training several different support vector machines is a completely different problem: in this case each machine can be trained independently of the others. 

In conclusion, you can parallelize those problems in which you are able to divide the problem into independent tasks.

#### How can a code be parallelized?

This is a difficult question to answer but some hints are given here. In order to transform a sequential application into a parallel application, one needs to detect those parts of the code in which independent tasks are performed. Once those parts have been identified, the code has to be reorganized to take advantage of this independence.

Two different decomposition techniques (of the code) can be used:
+ Task decomposition: identify within the code those tasks that are independent of each other. For instance, independent function calls, independent iterations of a loop, and so on. E.g.: performing different matrix multiplications, classifying multiple elements using an SVM, ... 
+ Data decomposition: divide the data into smaller parts and assign each part to a different task. E.g.: a matrix multiplication divided into different parts, or a big file divided into parts so that each part is processed by a different engine (see the New York example below).
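
Data decomposition can be sketched as follows. This is a minimal illustration, not code from the lecture: the helpers `split_into_chunks` and `partial_sum_of_squares` are hypothetical names, and the partial results are computed one after another here, although each of them could be handed to a different engine.

```python
def split_into_chunks(items, n_parts):
    """Split a list into n_parts contiguous chunks of nearly equal length."""
    size, extra = divmod(len(items), n_parts)
    chunks = []
    start = 0
    for k in range(n_parts):
        # The first `extra` chunks receive one additional element
        end = start + size + (1 if k < extra else 0)
        chunks.append(items[start:end])
        start = end
    return chunks

def partial_sum_of_squares(chunk):
    """Independent task: it only needs its own chunk of the data."""
    return sum(x * x for x in chunk)

data = list(range(10))
chunks = split_into_chunks(data, 3)

# Each partial result is independent of the others
partials = [partial_sum_of_squares(c) for c in chunks]

# Combine the partial results into the overall result
total = sum(partials)
print(chunks)
print(total)
```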

There is no magic way to identify the way the code can be reorganized. However, it is useful to begin with those parts of the code that have a high computational load. 

Finally, one word of advice: do not enter the world of parallelization unless you are sure your algorithm works. Once you are sure it works, you may try parallelization if execution time is important for you. Use it if you have a lot of data to process!

## Architecture overview

We show here a simplified version of the architecture that allows us to take advantage of multiple cores (for a more detailed description please see this [link](http://ipython.org/ipython-doc/2/parallel/parallel_intro.html#architecture-overview)).

<br/>
<center><img width="50%" src="images/architecture.png"></center>

Each of the blocks is explained below:

+ **Each engine is a Python instance**, usually a Python interpreter, that receives commands through a network connection. When multiple engines are started, parallel and distributed computing becomes possible.
+ The **client is a Python object** created at an IPython interpreter or a notebook. This object will allow us to send commands to the Python engines.
+ The scheduler is an application that distributes the commands to the engines. We will see that there are two ways of distributing this work: the direct view and the load balanced view.

Take into account that

+ Each engine is an independent instance of a Python interpreter. All the variables declared at, e.g. engine 1 are not visible at the remaining engines. In a similar way, if we want to work with `numpy` functions we should import this package at all engines.
+ As commented before, we cannot control on which CPU each engine is executed. What we can do is send a different job to each engine, and the operating system will execute each Python instance (with its job) on a different CPU. The number of engines should thus be at most the number of available CPUs. In Python there are, however, some details that should be taken into account in order to maximize efficiency.
  

This lecture will assume that the client and the engines all run on the same computer. In the **Big Data II** lecture we've focused on a distributed architecture, that is, an architecture in which engines, schedulers and clients may run on different computers interconnected by a network.



## Getting started

The first thing to do is to start the cluster. There are two ways for doing it:

### From the notebook interface

This is the simplest way of proceeding and is the recommended way for newbies in this topic. Within the IPython notebook, you can use the Clusters tab of the dashboard, and press Start with the desired number of cores, under the desired profile. This will automatically run the correct commands to start your IPython cluster.

The notebook will be used as interface to the cluster, that is, we will be able to send diverse tasks to the engines.

### From the command-line

On the command-line, you can run the following command to start an IPython cluster:

    $ ipcluster start

The latter command will create a cluster with N engines, where N equals the number of cores. If you want to create a cluster with a different number of engines just run

    $ ipcluster start -n 4
    
With the latter command we start a cluster with 4 engines. 

In order to send tasks to the latter cluster you just need to run an ipython interpreter.

    $ ipython
    


## Connecting to the cluster (the engines)

The last section has shown us how to initialize the cluster. Whichever way you have selected, the following commands allow you to connect to it. These commands should be either entered through the notebook (as done here) or typed into the IPython command-line interpreter.

In [None]:
import ipyparallel as parallel
engines = parallel.Client()
engines.block = True
print(engines.ids)

The previous commands are executed by the notebook and print the list of identifiers of the engines in the cluster (one identifier per engine). If an error is shown when running the commands, the cluster has not been correctly created. We will explain the meaning of the **block** attribute later on.

The variable **engines** is an object that represents the available engines to which commands can be sent. Let us now see two different ways in which we may send tasks to the engines: the first, called **direct view**, is the simplest one and allows the user to directly control which tasks are sent to which engines; the second, called **load balanced view**, delegates to the scheduler the decision of which engine each task is sent to. 

As will be seen later, the first one is useful if a task can be evenly divided, in terms of computation, into smaller tasks, whereas the second is more useful if such a subdivision cannot be easily done. For instance, if we have
to analyze multiple data files the direct view is a good approach
if all files have approximately the same size. But if the files differ
(quite a lot) in size the load balanced view is a better approach.
Let us now see both approaches.




## Direct view of engines

#### Calling Python functions

You may run any command as usual in the notebook (or IPython command-line interpreter). Note that we currently do not take advantage of the multiple engines.

In [None]:
a = 1
b = 10
c = a + b
print(c)

How do we send a command to the cluster? For the moment we just show the principles; we will go into the details later. Recall that the `engines` variable just defined represents the engines of the cluster. Within the direct view, `engines[0]` represents the first engine, `engines[1]` the second engine, and so on. The next commands are sent to the first engine:

In [None]:
engines[0].execute('a = 2')
engines[0].execute('b = 10')
engines[0].execute('c = a + b')

The previous cell executes the same kind of commands as before, but on the first engine. We can retrieve the result by doing

In [None]:
engines[0].pull('c')

Observe that we do not have direct access to the command line of the
first engine. Rather, we may send commands to it through the client.

What about parallelization? Let us try the following:

In [None]:
engines[0].execute('a = 2')
engines[0].execute('b = 10')
engines[1].execute('a = 9')
engines[1].execute('b = 7')
engines[0:2].execute('c = a + b')

The previous commands initialize different values for a and b at engines
0 and 1 and execute the sum at both engines. Since each engine is
an independent process, the operating system may schedule each engine
on a different core and thus execution is performed in parallel. Again,
as before, we can retrieve both results using the pull command:

In [None]:
engines[0:2].pull('c')

With the previous commands we are directly accessing the engines and therefore this type of approach is direct view. We may use Python commands to simplify a bit the previous commands. 

In [None]:
dview2 = engines[0:2]                 

The variable 'dview2' references the first two engines. In order to access both engines at the same time we may just write

In [None]:
dview2.execute('d = 2 * a + b')
dview2.pull('d')

From now on, we will use the `dview2` variable to simultaneously execute commands on engines 0 and 1. 

In order to define a variable that refers to all the engines we may just write

In [None]:
dview = engines.direct_view()

The latter variable allows us to send jobs to all the engines at once.

Let us now show that we are really doing computations in parallel. Let us try with something bigger! A matrix multiplication, for instance. We begin by doing the computation sequentially on the notebook and measuring the total processing time. We will perform the product manually with explicit loops, since this type of operation appears, for instance, in the gradient descent of the matrix factorization used by recommenders.

In [None]:
import time
import numpy

size = 300

# Create two size x size matrices
A = numpy.random.rand(size,size)
B = numpy.random.rand(size,size)

# Resulting matrix
C = numpy.zeros(shape=(size, size))

print("We begin computations")
t0 = time.time() 
for i in range(size):
    for j in range(size):
        result = 0
        for r in range(size):
            result += A[i][r] * B[r][j]
        C[i][j] = result
    
print("Time in seconds: ", time.time() - t0)

And now we will do the computations in parallel. The idea is to use two cores, doing half of the computations on each of them. On the first core the variable i will run from 0 to size/2, and on the other core it will run from size/2 to size. 

For this we write the next code

In [None]:
def mul(A, B, size, i0, i1):
    import numpy
    C = numpy.zeros(shape=(i1-i0,size))
    for i in range(i0,i1):
        for j in range(size):
            result = 0
            for r in range(size):
                result += A[i][r] * B[r][j]
            C[i-i0][j] = result
    return C

The function `mul` is defined locally but it will be run on both engines. We need to execute `import numpy` inside the function to ensure that the scientific computing library is available on each engine. By means of the `map` function we may remotely execute functions that are defined locally. For instance, we may run the matrix product on the two cores as follows:

In [None]:
size = 300
half = 150

# This is the resulting matrix
C = numpy.zeros(shape=(size, size))

print("We begin computations")
t0 = time.time()

# We perform the products on each of the engines
[C0, C1] = dview2.map(mul, [A, A], [B, B], [size, size], [0, half], [half, size])

# And now construct matrix C
C[0:half,:] = C0
C[half:size,:] = C1

print("Time in seconds: ", time.time() - t0)

As we have seen, the total computing time has decreased thanks to the division of the computation into multiple (two) tasks. We would like to comment that
+ The function `mul` is defined locally but has been executed remotely on engine 0 and engine 1 via the `map` function: technically, this is called a **remote call**. In this example engine 0 executes `mul(A, B, size, 0, half)` whereas engine 1 executes `mul(A, B, size, half, size)`.
+ The `map` function does not return (technically, it blocks) until the engines are done with their computations. This is because we set `engines.block` to `True` at the beginning. We will come back to this issue later.
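
The way the argument lists are paired up can be illustrated with Python's built-in `map`, which combines the k-th element of every sequence into the k-th call, just as described above for engine 0 and engine 1. This sketch runs locally and uses a hypothetical function `describe` in place of `mul`.

```python
def describe(name, i0, i1):
    """Stand-in for mul: reports which rows a task would compute."""
    return f"{name}: rows {i0} to {i1 - 1}"

size = 300
half = size // 2

# The k-th call receives the k-th element of every sequence:
#   describe("task 0", 0, half) and describe("task 1", half, size)
calls = list(map(describe, ["task 0", "task 1"], [0, half], [half, size]))
print(calls)
```

This is why `A`, `B` and `size` have to be repeated in the lists passed to `dview2.map`: every call needs its own copy of each argument.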

#### Exercise

For those who have 4 (or more) cores: can you rewrite the code so that 4 of your cores are used?

In the matrix example one call to `mul` is executed on one engine and the other call on the other engine. But which call is executed on which engine? And what happens if the argument lists passed to `map` contain three or more elements? Let us see it with the following example!

In [None]:
engines[0].execute('my_id = "engineA"')
engines[1].execute('my_id = "engineB"')

def sleep_and_return_id(sec):
    import time
    time.sleep(sec)                 
    return my_id,sec

dview2.map(sleep_and_return_id, [3,3,3,1,1,1])

Observe that the returned result tells us which engine executed each call. In other words, it shows us how tasks are distributed among the engines. You may repeat this experiment as many times as you wish, but the result will always be the same. The tasks are distributed uniformly among the
engines before executing them, no matter which delay we pass
as argument to the function `sleep_and_return_id`. This is in fact a characteristic of the direct view interface: **the tasks are distributed among the engines before executing them**. 

For the proposed example, is this efficient? Certainly not! One engine receives the three 3-second tasks and the other the three 1-second tasks, so the total amount of time you'll have to wait is 9 seconds, since the `map` call blocks until all engines have finished.  

As we have commented before the direct view is a good way of proceeding if you expect each task to take the same amount of time. But if not, use the load balanced view as we will see shortly.
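
The 9-second figure can be reproduced with a small simulation that does not need a cluster. It is only a sketch: it assumes that the tasks are split up front into contiguous blocks, one block per engine, which is consistent with the 9 seconds mentioned above. The function name `static_makespan` is hypothetical.

```python
def static_makespan(durations, n_engines):
    """Waiting time when tasks are split into contiguous blocks before running."""
    size, extra = divmod(len(durations), n_engines)
    loads = []
    start = 0
    for k in range(n_engines):
        end = start + size + (1 if k < extra else 0)
        # Total busy time of engine k is the sum of its block of tasks
        loads.append(sum(durations[start:end]))
        start = end
    # A blocking map returns only when the slowest engine has finished
    return loads, max(loads)

loads, total_wait = static_makespan([3, 3, 3, 1, 1, 1], 2)
print(loads)
print(total_wait)

# The same tasks in a different order are spread more evenly
loads_mixed, total_wait_mixed = static_makespan([3, 1, 3, 1, 3, 1], 2)
print(loads_mixed)
print(total_wait_mixed)
```

Under this assumption the waiting time depends on the order of the tasks, which is exactly the weakness of distributing them before execution.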

The total computation time using the cluster may be higher than the local execution time. That may happen! Note that the local computation includes only the matrix product. The computation on the cluster, on the other hand, includes the following steps: scatter, push, matrix product, and gather. Parallelization does not necessarily improve the overall performance, since it adds steps that were not necessary in the local computation.

#### Blocking and non-blocking commands

Let us now focus on the blocking issue. Recall that at the beginning we have set `engines.block = True`. Then, by default, all the calls to functions such as `execute`, `map`, `push` or `pull` block and do not return until all engines are finished with their execution. We have seen this with the previous examples. 

We may change the default behaviour and make the calls non-blocking. Let us see it with the matrix multiplication example

In [None]:
dview2.block = False
# Note: `async` is a reserved word in recent Python versions, so we use the name async_result
async_result = dview2.map(mul, [A, A], [B, B], [size, size], [0, half], [half, size])
print("Hey, it's me again!")

By setting `dview2.block` to `False` the `map` call just sends the request to the engines and returns immediately. 

When running `map` in non-blocking mode, it returns an `AsyncResult` object (the same is valid for the other functions we have seen). The returned object can be used to ask whether the computations have finished. For instance, for the matrix multiplication performed previously we can check if the result is already available 

In [None]:
async_result.ready()

The member function `ready` is a non-blocking function (i.e. it will return immediately) that will return `False` if the result is not ready, and `True` otherwise.

We may use the `get` member to retrieve the result

In [None]:
[C0, C1] = async_result.get()

The latter function will **block** until the result is available; if the result is already available when it is called, it returns immediately. Thus, from a practical point of view, the following blocking code 
```python
[C0, C1] = dview2.map(mul, [A, A], [B, B], [size, size], [0, half], [half, size], block = True)
```
is equivalent to
```python
async_result = dview2.map(mul, [A, A], [B, B], [size, size], [0, half], [half, size], block = False)
[C0, C1] = async_result.get()
```
Which behaviour is better, blocking or non-blocking? If you want good efficiency, we recommend the non-blocking mode, since it allows the client to perform other tasks while the engines are computing, e.g. retrieving data from the internet or from disk for the next tasks the engines will have to perform. If the task the client performs is computationally intensive, you may need to "reserve" one core for it. That is, rather than having one engine running on each CPU, the number of engines should be the number of CPUs minus one. In general, it is better that the number of (computationally intensive) tasks equals the number of CPUs. Otherwise, the operating system will lose time switching between more tasks than there are CPUs. If, however, the task the client performs is not computationally intensive, there should be no problem in having all tasks running at once. Choose one method or the other depending on your experimental results.
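
The same submit, check and retrieve pattern can be tried without a cluster. The sketch below deliberately swaps in the standard library module `concurrent.futures` instead of IPython engines: `Future.done()` plays the role of `ready()` and `Future.result()` the role of `get()`. The function `slow_square` is a hypothetical stand-in for a long computation.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def slow_square(x):
    """Stand-in for a long computation performed by a worker."""
    time.sleep(0.2)
    return x * x

with ThreadPoolExecutor(max_workers=2) as pool:
    # submit returns immediately, like a non-blocking map
    future = pool.submit(slow_square, 7)

    # The client is free to do other work here while the worker computes
    ready_at_submit = future.done()     # analogous to ready()

    # result() blocks until the value is available, like get()
    value = future.result()
    ready_after = future.done()

print(ready_at_submit, value, ready_after)
```

The point of the pattern is the gap between submitting and retrieving: any useful client-side work placed there overlaps with the computation.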

#### Conclusions

As we have seen, parallelization can be used to reduce execution time. If sequential execution takes T seconds, parallelization with the presented technique can reduce the execution time to, at best, about T/N seconds, where N is the number of cores. Take into account that

+ It is not recommended to use parallelization until you are sure that the algorithm you have implemented works properly. 
+ Parallelization usually implies modifying the original sequential code. Some new functions may be needed, or the data to process may have to be manually divided into several parts in order to be processed by each of the engines. Obtaining a good performance improvement may not be an easy task. 
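
The T/N bound, and the effect of the extra steps mentioned earlier (scatter, push, gather), can be put into a rough formula. The model below is a simplifying assumption made for illustration only: the work is taken to be perfectly divisible, and all communication is lumped into a single fixed `overhead` in seconds. The function names are hypothetical.

```python
def estimated_parallel_time(t_sequential, n_engines, overhead=0.0):
    """Ideal time T/N plus a fixed communication overhead (simplified model)."""
    return t_sequential / n_engines + overhead

def speedup(t_sequential, n_engines, overhead=0.0):
    """How many times faster the parallel version is under this model."""
    return t_sequential / estimated_parallel_time(t_sequential, n_engines, overhead)

# A long computation benefits even with some overhead
print(estimated_parallel_time(12.0, 4))                 # ideal case, T/N
print(estimated_parallel_time(12.0, 4, overhead=1.0))
print(speedup(12.0, 4, overhead=1.0))

# A short computation can become slower than the sequential version
print(estimated_parallel_time(0.5, 4, overhead=1.0))
```

Under this model parallelization only pays off when T/N plus the overhead is smaller than T, which is why it is most useful when there is a lot of data to process.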


## Load balanced view of the engines

The load balanced view is an interface that, like the direct view interface, allows tasks to be parallelized. With this interface, however, the user has no direct access to individual engines. It is the IPython scheduler that assigns work to each engine. This interface is simultaneously simpler and more powerful.

Let us first recall the way the interpreter connects to the engines using a **Direct View**:
```python
import ipyparallel as parallel
engines = parallel.Client()
engines.block = True
dview = engines.direct_view()
```

To create a **Load Balanced View** we will use the following command

In [None]:
engines.block = True
lview2 = engines.load_balanced_view(targets=[0,1])    # Engines 0 and 1
lview = engines.load_balanced_view()                  # All engines

Note that we have created a load balanced view in blocking mode. 

Our first example will be centered on the `sleep_and_return_id` function we have shown before

In [None]:
engines[0].execute('my_id = "engineA"')
engines[1].execute('my_id = "engineB"')

def sleep_and_return_id(sec):
    import time
    time.sleep(sec)                 
    return my_id,sec

lview2.map(sleep_and_return_id, [3,3,3,1,1,1])   # Experiment and change these values! For example: [10,1,2,2,2,2,1,1,1]

Here we are using the load balanced scheduler! We can see that, by default, the load balanced view scheduler assigns a new task to an engine as soon as it becomes free. Changing this behavior is beyond the scope of this lesson, but this <a href="http://ipython.org/ipython-doc/stable/parallel/parallel_task.html#schedulers">link</a> can give you more details.
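
The effect of handing out tasks as engines become free can be simulated without a cluster. The sketch below is an idealised model, not the scheduler's actual implementation: it assumes that each new task goes to whichever engine becomes free first and it ignores communication time. The function name `dynamic_makespan` is hypothetical.

```python
import heapq

def dynamic_makespan(durations, n_engines):
    """Simulate giving the next task to whichever engine becomes free first."""
    # Heap of (time at which the engine becomes free, engine index)
    free_at = [(0, k) for k in range(n_engines)]
    heapq.heapify(free_at)
    assignment = []
    for d in durations:
        t, k = heapq.heappop(free_at)       # engine that is free the earliest
        assignment.append(k)
        heapq.heappush(free_at, (t + d, k))
    # The overall waiting time is when the last engine finishes
    return assignment, max(t for t, _ in free_at)

assignment, total_wait = dynamic_makespan([3, 3, 3, 1, 1, 1], 2)
print(assignment)   # engine index chosen for each task
print(total_wait)
```

In this model the six tasks finish after 6 seconds, because the three short tasks are all picked up by the engine that is not busy with the third long one.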

We will now see a more complete example to show the usefulness of this kind of behaviour. We will count the words of a set of files! We will first see the sequential (non-parallel) computation, then the direct view, and finally the load balanced view.

In [None]:
import time
import re
from collections import Counter

cnt_total = Counter()

# Count the words of a single file (in this cell the function runs locally)
def count_words(file):
    cnt = Counter()
    with open(file) as f:
        words = re.findall(r'\w+', f.read().lower())
    for word in words:
        cnt[word] += 1
    return cnt

# We now read the file that contains the list of files to process
with open("base_dades/llista_ordenada_120.cfg", "r") as myfile:
  data=myfile.read().split() 

print("We begin computations")
t0 = time.time()

for file in data:
    cnt_total += count_words(file)

print("Time in seconds: ", time.time() - t0)

cnt_total.most_common(5)

We see here the implementation using the direct view. Observe how the counter is used.

In [None]:
import time
from collections import Counter
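cnt = Counter()
dview2.block = True            # blocking mode, so that map and pull below wait for the engines
dview2.push(dict(cnt = cnt))   # each engine gets its own empty counter named cnt

# This is a remote function: it updates the counter cnt that lives on the engine
def count_words(file):
    from collections import Counter
    import re
    with open(file) as f:
        words = re.findall(r'\w+', f.read().lower())
    for word in words:
        cnt[word] += 1

# We now read the file that contains the list of files to process
with open("base_dades/llista_ordenada_120.cfg", "r") as myfile:
  data=myfile.read().split()       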

print("We begin computations")
t0 = time.time()

dview2.map(count_words, data);

[cnt1, cnt2] = dview2.pull('cnt')

cnt_total=cnt1+cnt2

print("Time in seconds: ", time.time() - t0)

cnt_total.most_common(5)

Observe how much time it takes to process these files. In this case, the files are distributed uniformly between the two engines. But the files have been ordered by size, so the computation is not evenly distributed between the engines. 

The code we have seen is the principle of the **MapReduce** programming model: a MapReduce program is composed of a Map() procedure that performs filtering and sorting (such as counting the number of times each word appears in a file) and a Reduce() procedure that performs a summary operation (that is, taking each of the results and computing the overall result).
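
The two phases can be sketched locally on a few in-memory strings. This is a minimal illustration and not a MapReduce framework: the names `map_count_words` and `reduce_counts` and the sample documents are hypothetical, and the map phase is executed sequentially here, although each call could run on a different engine.

```python
import re
from collections import Counter
from functools import reduce

def map_count_words(text):
    """Map step: count the words of a single document."""
    return Counter(re.findall(r'\w+', text.lower()))

def reduce_counts(total, partial):
    """Reduce step: merge one partial result into the running total."""
    return total + partial

documents = ["the cat and the dog", "The dog barks", "a cat"]

# Map phase: one independent result per document
partial_counts = [map_count_words(d) for d in documents]

# Reduce phase: combine all partial results into the overall result
cnt_total = reduce(reduce_counts, partial_counts, Counter())
print(cnt_total.most_common(1))
```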

Let us try the load balanced view!

In [None]:
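cnt = Counter()
dview2.block = True            # blocking mode, so that push and pull wait for the engines
dview2.push(dict(cnt = cnt))   # reset the counter on each engine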

print("We begin computations")
t0 = time.time()

lview2.map(count_words, data);   # The code changes here!!!!!

[cnt1, cnt2] = dview2.pull('cnt')

cnt_total=cnt1+cnt2

print("Time in seconds: ", time.time() - t0)

cnt_total.most_common(5)

Observe that we have reduced the computation time to process the files. Great! Observe, however, that with the load balanced view we do not know at which engine the task will be executed. We use the direct view to `push` and `pull` the results from the engines.

Some general conclusions
+ We may use both the direct view and the load balanced view interface for executing commands on engines. Just use the one that best fits your needs.
+ With the load balanced view, by default, the IPython scheduler assigns work to each engine as soon as it finishes its previously assigned task.
+ The `map` function returns as soon as all tasks have finished since we are using the blocking mode.

You can now try to process all the files (`llista_tot.cfg`) using a direct view and a load balanced view. Take into account that here the files are not ordered by size, so the improvement you obtain may not be very big. Use all the processors you have for this.

## An example using non-blocking commands: the New York taxi trips database

This section presents a real application of the parallel capabilities
of IPython and discusses several approaches to it. The dataset
is a database of taxi trips in New York, obtained through
a Freedom of Information Law (FOIL) request from the New York City
Taxi & Limousine Commission (NYCT&L) by the University of Illinois at
Urbana-Champaign (http://publish.illinois.edu/dbwork/open-data/).
The dataset consists of 12 comma-separated values (CSV) files
of about 2 GB each. Each file has approximately 14 million entries (lines) and
is already cleaned, so no special preprocessing is needed to be
able to process it. For our purposes we are interested only in the
following information from each entry: 

+ `pickup_datetime`: start time of the trip, in the format yyyy-mm-dd hh:mm:ss (24-hour clock),
EDT. 

How many pickups are performed
during week days and how many during weekends? And how many pickups
are performed in the morning? 

Implementing the previous classification is rather simple since it
only requires checking, for each entry, the pickup datetime. Performing this task
sequentially may take a rather large amount of time since the number
of entries, for a single CSV file, is rather large. In addition, special
care has to be taken when reading the file since a 2 GB file may
not fully fit into the computer's memory. 
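
As a rough illustration of that check, the two questions can be answered for a single timestamp with the standard `datetime` module. This is a sketch under assumptions: the timestamps are taken to be strings in the form `yyyy-mm-dd hh:mm:ss`, "morning" is taken to mean before 12:00, the function name `classify` is hypothetical and the two sample values are made up.

```python
from datetime import datetime

def classify(pickup):
    """Classify one pickup timestamp given as 'yyyy-mm-dd hh:mm:ss'."""
    d = datetime.strptime(pickup, "%Y-%m-%d %H:%M:%S")
    return {
        "is_morning": d.hour < 12,        # assumption: morning means before noon
        "is_weekend": d.weekday() >= 5,   # weekday(): Monday is 0, Sunday is 6
    }

saturday_morning = classify("2013-01-05 09:30:00")
monday_evening = classify("2013-01-07 18:15:00")
print(saturday_morning)
print(monday_evening)
```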

We may take advantage of the parallelization capabilities in order
to reduce the processing time. The idea is to divide the input data
into chunks so that each engine takes care of classifying the entries
of its corresponding chunks (i.e. data decomposition). We propose here an approach which 
is based on implementing a producer-consumer paradigm
in order to distribute the tasks. The producer, associated with the
client, reads the chunks from disk and distributes them among the
engines using a round robin technique. No explicit `map` function
is used in this case. Rather, we simulate the behavior of the `map`
function in order to have fine control of the parallel problem. Recall
that each engine is an independent process. Since we assign different
tasks to each engine, the operating system will try to execute each
engine on a different core.
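
The round robin distribution of chunks can be sketched without a cluster using `islice` and `cycle` from `itertools`. This is only an illustration of the idea: the generator `read_chunks`, the in-memory file and the engine identifiers are hypothetical, and instead of sending each block to an engine the sketch just records which engine would receive it.

```python
import io
from itertools import cycle, islice

def read_chunks(f, lines_per_block):
    """Yield successive blocks of at most lines_per_block lines from a file."""
    while True:
        block = list(islice(f, lines_per_block))
        if not block:
            return
        yield block

# A small in-memory file with 7 lines stands in for the large CSV file
fake_file = io.StringIO("".join(f"line {k}\n" for k in range(7)))
engine_ids = [0, 1]

# Producer: read one block at a time and hand it to the next engine in turn
assignments = []
for engine_id, block in zip(cycle(engine_ids), read_chunks(fake_file, 3)):
    assignments.append((engine_id, len(block)))

print(assignments)
```

Reading block by block keeps only one chunk in the client's memory at a time, which matters when the whole file does not fit into memory.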

### The source code

We begin by initializing the engines:

In [None]:
%reset -f

import ipyparallel as parallel
from itertools import islice
from itertools import cycle

# Connect to the IPython cluster
engines = parallel.Client()

# By default we use non-blocking mode
engines.block = False

#Create a DirectView to all engines
dview = engines.direct_view()

print("The number of engines in the cluster is: " + str(len(engines.ids)))

We next declare the functions that will be executed on the engines. We do this thanks to the `%%px` parallel magic command.

In [None]:
%%px

# The %%px magic executes the code of this cell on each engine.

# Define instructions to be executed on each engine.
from datetime import datetime
import pandas as pd
import numpy as np
from collections import Counter
import time

def is_morning(d):
    "Given a datetime, returns if it was on morning or not"
    h = datetime.strptime(d, "%Y-%m-%d %H:%M:%S").hour
    if (0 <= h and h < 12):
        return 1
    else:
        return 0

def is_weekend(d):
    "Given a datetime, returns if it was on weekend or not"
    # strptime transforms str to datetime; weekday() returns 0 for Monday ... 6 for Sunday
    wday = datetime.strptime(d, "%Y-%m-%d %H:%M:%S").weekday()
    if (5 <= wday <= 6):
       return 1
    else:
       return 0

# Function that, given a block of lines (b), classifies each entry
# and accumulates the counts in a Counter object
def process(b):
    # Receives a block (list of strings) and updates the result in the global variable local_total
    global local_total

    # Each line is a CSV record whose pickup datetime is in column 5
    datetimes = [line.split(",")[5] for line in b]
    df = pd.DataFrame({'datetime': datetimes})

    # Delete rows without a datetime (dropna returns a new DataFrame, so we keep the result)
    df = df.dropna(subset=['datetime'])

    # Apply the classification functions to the DataFrame
    df['is_morning'] = df['datetime'].apply(is_morning)
    df['is_weekend'] = df['datetime'].apply(is_weekend)

    # Increment the global variable local_total
    count_morning = int(df['is_morning'].sum())
    count_weekend = int(df['is_weekend'].sum())
    local_total += Counter({'is_morning': count_morning, 'is_weekend': count_weekend})

# Initialization function
def init():
    #Reset total var
    global local_total
    local_total = Counter()
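
The classification functions rely on how `datetime` reports hours and weekdays. The following is a small sketch that can be run locally, without engines, to check those conventions; the sample timestamps are made up for illustration and do not come from the CSV file.

```python
from datetime import datetime

# Same timestamp format as the one used by the classification functions.
FMT = "%Y-%m-%d %H:%M:%S"

# Illustrative timestamps: a Friday, a Saturday, a Sunday and a Monday.
samples = [
    "2015-01-02 08:15:00",
    "2015-01-03 23:40:00",
    "2015-01-04 11:59:59",
    "2015-01-05 12:00:00",
]

parsed = [datetime.strptime(s, FMT) for s in samples]

for s, dt in zip(samples, parsed):
    # hour is in 0..23; weekday() is 0 for Monday up to 6 for Sunday.
    print(s, "hour =", dt.hour, "weekday =", dt.weekday())
```

Under this convention Saturday and Sunday correspond to the weekday values 5 and 6, and the hours 0 to 11 correspond to the morning.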

Next we show the code executed by the client. It performs the following tasks:

+ It reads a chunk of `lines_per_block` lines from the file. The chunk is assigned to an engine, which performs the classification. The result of the classification is accumulated in a local variable on each engine. This process is repeated until all chunks have been processed by the engines.
+ Once finished, the client retrieves the local variable of each engine and computes the overall result.
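
The two steps above can be simulated serially to see how the pieces fit together. This is only a sketch under stated assumptions: the in-memory file, the helper `read_chunks`, the two "workers" and the `lines` counter are all hypothetical stand-ins, and no engine is involved.

```python
import io
from collections import Counter
from itertools import islice

def read_chunks(f, n, skip_header=True):
    """Yield lists of up to n lines from the open file f (hypothetical helper)."""
    if skip_header:
        next(f, None)  # discard the header line, if there is one
    while True:
        chunk = list(islice(f, n))
        if not chunk:
            return
        yield chunk

# Hypothetical in-memory file: one header line followed by five data lines.
fake_file = io.StringIO("header\n" + "".join(f"row{i}\n" for i in range(5)))

n_workers = 2  # illustrative number of engines
local_totals = [Counter() for _ in range(n_workers)]

for chunk_n, chunk in enumerate(read_chunks(fake_file, 2)):
    worker = chunk_n % n_workers                 # round-robin choice
    local_totals[worker]['lines'] += len(chunk)  # stands in for the classification

# The client step: add up the per-worker counters.
global_result = sum(local_totals, Counter())
print(global_result)
```

Adding `Counter` objects sums the values key by key, which is why each engine can keep its own partial count and the client only needs one addition per engine at the end.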


In [None]:
# This is the main code executed on the client
from collections import Counter

import time
t0 = time.time() 

#File to be processed
filename = './head_100000.csv'

# Defines a generator that reads lines from a file and serves them in blocks
def get_chunk(f,N,first):
    # Allows skipping the header on the first chunk
    if first: 
        line0 = 1
    else:
        line0 = 0

    # Returns blocks of N lines from the file f
    while True:
        yield list(islice(f, line0, N))

# A simple counter to verify execution
chunk_n = 0

# Number of lines to be sent to each engine at a time. Use carefully!
lines_per_block = 2000

# Create an empty list of async tasks. One element for each engine
async_tasks = [None] * len(engines.ids)

# Cycle Object to get an infinite iterator over the list of engines
c_engines = cycle(engines.ids)

# Initialize each engine. Observe that the execute is performed
# in a non-blocking fashion.
for i in engines.ids:
    async_tasks[i] = engines[i].execute('init()')

# The variable to store results
global_result = Counter()

# Open the file in ReadOnly mode
f = open(filename, 'r') #iterable

# Keep track of the first chunk to be able to remove the header
first_chunk = True

# Used to show the progress
print('We begin sending data')

# While the generator returns new chunks, send them to the engines
while True:
    # Read a new chunk from the generator
    new_chunk = next(get_chunk(f,lines_per_block,first_chunk))
    if not new_chunk: #if the list is empty, break the loop
        break
    
    #After the first loop, first_chunk is False. 
    first_chunk = False
    
    # Decide the engine to be used to classify the new chunk
    run_engine = next(c_engines)
    
    # Wait until the engine is ready
    while ( not async_tasks[run_engine].ready() ):
        print("Wait till the engine finalizes its previous task")
        time.sleep(1)
    
    #Send data to the assigned engine.
    mydict = dict(data = new_chunk)
    
    # The data is sent to the engine in blocking mode. The push function does not return
    # until the engine has received the data. 
    engines[run_engine].push(mydict,block=True)

    # We execute the classification task on the engine. Observe that the task is executed
    # in non-blocking mode. Thus the execute function returns immediately. 
    async_tasks[run_engine] = engines[run_engine].execute('process(data)')
    
    # Increase the counter    
    chunk_n += 1
    
    # Update the progress
    if chunk_n % 1000 == 0:
        print("Chunk: " + str(chunk_n))

# Get the results from each engine and accumulate in global_result
for engine in engines.ids:
    # Be sure that all async tasks are finished
    while ( not async_tasks[engine].ready() ):
        print("Wait till the engine finalizes its last task")
        time.sleep(1)
    global_result += engines[engine].pull('local_total', block=True)

#Close the file
f.close()

print("\n")
print("Total number of chunks processed: " + str(chunk_n))
print("---------------------------------------------")

print("Time in seconds: ", time.time() - t0)

Several experiments were previously performed on an i7-4790 CPU with 4 physical cores
with Hyper-Threading and 8 GB of RAM. We ran experiments with
different numbers of engines and different numbers of lines per block (variable
`lines_per_block` in the previous subsection). 

### Number of lines per block

We begin with the effect of the number of lines per block. The experiments
were run with 8 engines, that is, the number of logical processors of the computer. Thus, in our environment there is a total of 9 processes running:
the client (i.e. the producer), which is in charge of reading the CSV file and distributing its lines
among the engines in blocks whose size is given by the lines per
block variable, and 8 engines (i.e. the consumers) that take the blocks of data from the producer and
process them. For these experiments only 1 million lines were processed
(one may generate a file with one million lines by using, e.g., the Unix command `head -n 1000000 file.csv`).

</br>
    <center><img width="60%" src="images/1M8cores.png"></center>
    
As can be seen, the optimal execution
time is reached near 2,000 lines per block. With fewer lines per
block, efficiency is lost because the engines, and thus the cores, are idle
most of the time. Recall that the client begins by distributing the chunks among the
engines. If the chunk is too short it may happen that engine 0 or 1 has already
finished processing its chunk by the time the client sends a chunk
to engine 3 or 4, for instance. Chunks should be large enough so that the engine
finishes processing its chunk just before the client sends it a new one. 
In addition to the previous drawback, using small chunks may waste a lot of 
computational time managing short messages between processes. 

On the other hand, when working with more than 6,000 lines
per block, the messages to be passed between processes are too big to be
moved quickly. Moreover, an engine may not have finished processing its 
chunk when the client asks it whether it has finished. If the engine has
not finished, the client waits for one second and asks again. This
waiting time reduces the overall performance.

The effect of this waiting time is interesting. Tests can be done to show that with a
lower waiting time the optimal lines per block value is reduced. Nevertheless,
the optimal execution time does not change, because it
depends on not having idle cores.
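
To experiment with the waiting time, the polling loop can be wrapped in a helper with a configurable interval. The following is a minimal sketch: `wait_until_ready` and `FakeTask` are hypothetical names, and `FakeTask` only imitates an object with a `ready()` method, as used in the client code, so that the example runs without a cluster.

```python
import time

def wait_until_ready(task, poll_interval=0.05):
    """Block until task.ready() is True; return the number of polls needed."""
    polls = 0
    while not task.ready():
        polls += 1
        time.sleep(poll_interval)
    return polls

class FakeTask:
    """Hypothetical stand-in for an asynchronous task; ready after `duration` seconds."""
    def __init__(self, duration):
        self._done_at = time.monotonic() + duration

    def ready(self):
        return time.monotonic() >= self._done_at

task = FakeTask(duration=0.2)
t0 = time.monotonic()
polls = wait_until_ready(task, poll_interval=0.05)
elapsed = time.monotonic() - t0

print("polls:", polls, "elapsed:", round(elapsed, 2))
```

With a one-second interval the same 0.2 second task would keep the client waiting for a full second, which is the kind of loss discussed above for large blocks.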

### Number of engines

The number of engines determines the level of parallelization that the
code can reach. We have tested our algorithm using 2,000 lines per block and
different numbers of engines, using again a reduced version of one CSV file. In
this case 100,000 lines have been processed. 

</br>
    <center><img width="60%" src="images/parallelization.png"></center>
    
Once the minimum
is reached (in this case for 8 engines) there is no benefit in parallelizing
the job with more engines; on the contrary, with more processes, the operating system scheduler spends more time managing processes, so the
execution time may degrade. That is, the operating system scheduler may
become a bottleneck.
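
Since the minimum was found when the number of engines matched the number of logical processors, it can be useful to query that number before starting the cluster. A minimal sketch using only the standard library follows; the variable `suggested_engines` is just an illustrative rule of thumb, not a value measured in these experiments.

```python
import os

# Number of logical processors reported by the operating system.
# os.cpu_count() may return None on some platforms, hence the fallback to 1.
logical_cpus = os.cpu_count() or 1

# Illustrative rule of thumb: one engine per logical processor.
suggested_engines = logical_cpus

print("Logical processors:", logical_cpus)
print("Suggested number of engines:", suggested_engines)
```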

### Overall results

Using the optimal value of 2,000 for the `lines_per_block` variable, we have executed
our algorithm over a whole CSV file made up of 14.7 million lines. The execution
time with 8 engines is 1009 seconds (17 minutes) and, with 4 engines, it increases to
1895 seconds (32 minutes).
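
The figures above can be turned into a speedup and a throughput with a few lines of arithmetic. This sketch only uses the numbers reported in the text; the line count of 14.7 million is rounded, so the derived values are approximate.

```python
# Figures reported in the text.
total_lines = 14_700_000   # approximate size of the CSV file
lines_per_block = 2000
time_8_engines = 1009      # seconds
time_4_engines = 1895      # seconds

# Speedup obtained when going from 4 to 8 engines.
speedup = time_4_engines / time_8_engines

# Lines processed per second in each configuration.
throughput_8 = total_lines / time_8_engines
throughput_4 = total_lines / time_4_engines

# Approximate number of chunks sent by the client.
n_chunks = total_lines // lines_per_block

print(f"Speedup from 4 to 8 engines: {speedup:.2f}")
print(f"Throughput with 8 engines: {throughput_8:.0f} lines/s")
print(f"Throughput with 4 engines: {throughput_4:.0f} lines/s")
print(f"Approximate number of chunks: {n_chunks}")
```

Doubling the number of engines yields a speedup somewhat below 2, which is consistent with 4 of the 8 logical processors being Hyper-Threading siblings rather than additional physical cores.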

