# Table of Contents
* [1. Introduction](#1.-Introduction)
	* [1.1 Cluster Computing](#1.1-Cluster-Computing)
	* [1.2 Distributed Resource Managers](#1.2-Distributed-Resource-Managers)
* [2. Usage of DRMs](#2.-Usage-of-DRMs)
	* [2.1 Single Job Configuration](#2.1-Single-Job-Configuration)
	* [2.2 Array and Parallel jobs](#2.2-Array-and-Parallel-jobs)
	* [2.3 DRMAA](#2.3-DRMAA)
* [3. Complementary approaches](#3.-Complementary-approaches)
	* [3.1 Jug: Simple Task Based Parallelism](#3.1-Jug:-Simple-Task-Based-Parallelism)
	* [3.2 IPython Parallel](#3.2-IPython-Parallel)
	* [3.3 Modern Cluster Computing](#3.3-Modern-Cluster-Computing)
* [4. Conclusion](#4.-Conclusion)


# 1. Introduction

In this IPython/Jupyter Notebook (which was itself created and executed on a computing cluster) we will explore advanced uses of Distributed Resource Managers (DRMs) and their limitations for common parallel tasks. Then, some complementary approaches will be reviewed. This section introduces the concepts of cluster computing and DRMs, which are used throughout the document.

## 1.1 Cluster Computing

Both in science and industry, the use of computer clusters for speeding up data processing tasks and centralising computing resources is common practice. A computer cluster can be generally defined as a set of interconnected computers (i.e. nodes) that work together to perform coordinated tasks. Even though the design and scale of a computer cluster depend on the implementation and usage requirements, most modern computing clusters, apart from being in the same network, share a clustered file system, which can be used for storing and sharing heterogeneous data between nodes and cluster users.

For even larger problems that can be distributed, grid computing is a different approach, which uses a collection of heterogeneous computing resources (e.g. cluster nodes) at different locations but loosely connected. Depending on the scale and type of problem, using a computer cluster can be more convenient and faster than grid computing, because of data locality and easier resource management.

## 1.2 Distributed Resource Managers

In order to manage the available resources under heterogeneous workloads, a distributed resource manager (DRM, also referred to as a batch-queuing system) is usually installed on computing clusters. This system is in charge of job scheduling and computing resource management (e.g. memory and disk) for all user submissions. Some commonly used DRMs include SGE (Sun Grid Engine and its derivatives), PBS (Portable Batch System), HTCondor and SLURM.

While batch-queuing systems are very useful for cluster administration and simple job submission and scheduling, the job submission interfaces commonly provided are not convenient for the execution of complex coordinated tasks. In this work, we will start by reviewing advanced use cases of DRMs and then we will explore some complementary modern approaches to solve their limitations.

# 2. Usage of DRMs

In this section, some advanced use cases of the [SGE batch-queuing system](https://arc.liv.ac.uk/trac/SGE) (the distributed resource manager installed at the [IFCA cluster](https://grid.ifca.es/wiki/Cluster)) will be demonstrated. Son of Grid Engine (SoGE) is an open-source community continuation of the Sun Grid Engine project; a more extensive description of its uses and capabilities is provided on the [SoGE webpage](http://arc.liv.ac.uk/SGE/).

## 2.1 Single Job Configuration

The most basic command in any DRM is the one used for submitting jobs to the cluster queue, which in the case of SGE is *qsub*. Apart from *qsub*, SGE provides *qrsh* and *qlogin* for interactive sessions on one of the computing nodes. Job configuration parameters such as the project, wall clock time or available memory can be set with different arguments in a configuration file (i.e. .sge_request), in the job script itself or as command-line flags. In the following example, a simple SGE script is used to calculate pi with arbitrary precision (e.g. 10000 digits) using [Machin's formula](http://en.wikipedia.org/wiki/John_Machin). In this case, the precision to which pi is calculated can be set with a command-line argument when *qsub* is called. Many job configuration parameters are also specified in the configuration file.

In [8]:
from utils import highlight_source_bash
highlight_source_bash("pi_machin.sge")

In order to run an SGE job at the IFCA cluster, it is compulsory to specify a project (it could also have been included in the script file). Hence, the following command sends a job for the calculation of $\pi$ with 10000 significant digits to the cluster queue. The *-l immediate* flag requests fast scheduling of short jobs, while the *-P* flag defines the project.

In [9]:
!qsub -l immediate -P l.gaes pi_machin.sge 10000

Your job 5928932 ("pi_machin") has been submitted


The *qsub* command confirms that the job has been submitted and returns a job ID. We can check the status of running jobs using the *qstat* command (the *-u username* flag is useful for listing only the jobs of a given user).

In [17]:
!qstat -u $(whoami)

After a few minutes, the job has finished and we can check that the results have been written to *pi_machin.out*. The first few lines are shown in the next cell.

In [15]:
!head -5 pi_machin.out

3.141592653589793238462643383279502884197169399375105820974944592307\
81640628620899862803482534211706798214808651328230664709384460955058\
22317253594081284811174502841027019385211055596446229489549303819644\
28810975665933446128475648233786783165271201909145648566923460348610\
45432664821339360726024914127372458700660631558817488152092096282925\
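
As a cross-check of the digits above, Machin's formula, $\pi = 16\arctan(1/5) - 4\arctan(1/239)$, can be sketched in a few lines of Python using only integer arithmetic. This is not the content of *pi_machin.sge* (which is not reproduced here); the function names `arctan_inverse` and `machin_pi` and the number of guard digits are assumptions made for illustration.

```python
def arctan_inverse(x, unity):
    """Return arctan(1/x) scaled by `unity`, using the alternating Taylor series
    arctan(1/x) = 1/x - 1/(3 x^3) + 1/(5 x^5) - ... with integer division."""
    term = unity // x          # current power term: unity / x^(2k+1)
    total = term
    x_squared = x * x
    divisor = 3
    sign = -1
    while term:
        term //= x_squared
        total += sign * (term // divisor)
        sign = -sign
        divisor += 2
    return total


def machin_pi(digits, guard=10):
    """Return pi scaled by 10**digits as an integer (truncated).

    `guard` extra digits (an assumed safety margin) absorb the rounding
    errors of the integer divisions before the final truncation.
    """
    unity = 10 ** (digits + guard)
    scaled = 4 * (4 * arctan_inverse(5, unity) - arctan_inverse(239, unity))
    return scaled // 10 ** guard
```

Calling `print(machin_pi(30))` shows $\pi$ scaled by $10^{30}$ as a single integer, which can be compared digit by digit with the first line of *pi_machin.out*.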


Another useful SGE command is *qacct*, which can be used to summarize resource consumption and accounting information about jobs. 

In [18]:
!qacct -j 5928932

qname        cloud.short.q       
hostname     cloudprv-04-1.ifca.es
group        computacion         
owner        pablodcm            
project      l.gaes              
department   computacion         
jobname      pi_machin           
jobnumber    5928932             
taskid       undefined
account      sge                 
priority     -5                  
qsub_time    Sun May  3 13:31:16 2015
start_time   Sun May  3 13:31:19 2015
end_time     Sun May  3 13:32:01 2015
granted_pe   NONE                
slots        1                   
failed       0    
exit_status  0                   
ru_wallclock 42s
ru_utime     41.501s
ru_stime     0.535s
ru_maxrss    1.758KB
ru_ixrss     0.000B
ru_ismrss    0.000B
ru_idrss     0.000B
ru_isrss     0.000B
ru_minflt    37830               
ru_majflt    3                   
ru_nswap     0                   
ru_inblock   512                 
ru_oublock   184                 
ru_msgsnd    0                   
ru_msg

## 2.2 Array and Parallel jobs

The previous approach could be problematic when processing a large number of jobs (e.g. 100 or more) that are identical except for some parameters (e.g. the name of the input file or a random seed). In principle, a shell script could be created for each job, or the parameters could be passed as command-line arguments. However, SGE provides a unifying solution for this type of problem: array jobs. An SGE array job is a script that is run multiple times, with each execution identified by a task-ID environment variable.

This use case is very common in many disciplines (e.g. mechanical simulations with different parameters or data analysis tasks with several input files). Here, a real-world data processing task from High Energy Physics is used as an example. We have a compiled program, *process_tchain*, that selects only the events in a [ROOT](https://root.cern.ch/drupal/) file that satisfy certain topological conditions (e.g. two leptons in the final state which pass some quality criteria). Using an array job configuration file, this processing task can be launched at the same time over all the data files listed in a given file (i.e. *inputfiles.txt*), calling the *qsub* command only once.

In [4]:
from utils import highlight_source_bash
highlight_source_bash("process_tchain_array.sge")
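
The idea behind an array job can be sketched in Python: each task reads its task ID from the environment and uses it to pick one line of the input list. SGE exports the task ID as `SGE_TASK_ID`; the helper name `input_for_task` is hypothetical, and the actual *process_tchain_array.sge* script may be organised differently.

```python
import os


def input_for_task(list_path, environ=os.environ):
    """Return the input file assigned to the current array task.

    Task IDs are assumed to start at 1 (as with `qsub -t 1-N`),
    so task i gets the i-th non-empty line of the list file.
    """
    task_id = int(environ["SGE_TASK_ID"])
    with open(list_path) as handle:
        names = [line.strip() for line in handle if line.strip()]
    if not 1 <= task_id <= len(names):
        raise IndexError("task ID %d has no matching input file" % task_id)
    return names[task_id - 1]
```

Inside a task, `input_for_task("inputfiles.txt")` would then return the file that this particular task should process, so a single submission covers the whole list.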

Apart from array jobs, many DRM installations (such as the one at IFCA) allow jobs to be sent to parallel queues using the *parallel environment* flag (e.g. *-pe mpi*, which uses a special queue integrated with MPI).

## 2.3 DRMAA

So far, all the communication with the DRM for submitting and controlling jobs has been done using the command-line interface specific to SGE. However, the [Open Grid Forum](https://www.ogf.org/ogf/doku.php) maintains the [Distributed Resource Management Application API (DRMAA)](http://www.drmaa.org/) for programmatic and tightly coupled access to cluster, grid and cloud systems. The advantage of DRMAA over the specific tools of each DRM is that job submission, control and monitoring are standardized and can be driven by different applications or programs. Most DRM vendors have implemented the DRMAA specification, and bindings of the API are available in several programming languages (e.g. C, C++, Java or Python).

Some examples of the use of the [Python DRMAA binding](https://github.com/pygridtools/drmaa-python) are provided in this document (based on the DRMAA documentation). To verify that everything is working, we can check that a DRMAA session can be initialized and retrieve some information about the implementation:

In [3]:
import drmaa

# initialize session
s = drmaa.Session()
s.initialize()
# get information about session
print 'A DRMAA object was created'
print 'Supported contact strings: ' + s.contact
print 'Supported DRM systems: ' + str(s.drmsInfo)
print 'Supported DRMAA implementations: ' + str(s.drmaaImplementation)
print 'Version ' + str(s.version)
# exit session
print 'Exiting'
s.exit()

A DRMAA object was created
Supported contact strings: session=cloudprv-10-3.3280.125413370
Supported DRM systems: SGE 8.1.4
Supported DRMAA implementations: SGE 8.1.4
Version 1.0
Exiting


We can also use DRMAA for simple job submission and for getting the job status programmatically (instead of using *qsub* and *qstat*):

In [22]:
import drmaa
import os 
import time

# create drmaa session
with drmaa.Session() as s:
    print('Creating job template')
    jt = s.createJobTemplate()
    jt.nativeSpecification = u"-b no -cwd -shell yes"
    jt.remoteCommand = os.path.join(os.getcwd(), 'sleeper.sh')
    jt.args = ['42', 'Simon says:']
    jt.joinFiles=True

    jobid = s.runJob(jt)
    print('Your job has been submitted with ID %s' % jobid)

    # decode status
    decodestatus = {drmaa.JobState.UNDETERMINED: 'process status cannot be determined',
                    drmaa.JobState.QUEUED_ACTIVE: 'job is queued and active',
                    drmaa.JobState.SYSTEM_ON_HOLD: 'job is queued and in system hold',
                    drmaa.JobState.USER_ON_HOLD: 'job is queued and in user hold',
                    drmaa.JobState.USER_SYSTEM_ON_HOLD: 'job is queued and in user and system hold',
                    drmaa.JobState.RUNNING: 'job is running',
                    drmaa.JobState.SYSTEM_SUSPENDED: 'job is system suspended',
                    drmaa.JobState.USER_SUSPENDED: 'job is user suspended',
                    drmaa.JobState.DONE: 'job finished normally',
                    drmaa.JobState.FAILED: 'job finished, but failed'}

    # check for status
    for ix in range(3):
        print('Checking %s of 3 times' % ix)
        print(decodestatus[s.jobStatus(jobid)])
        time.sleep(10)

    print('Cleaning up')
    s.deleteJobTemplate(jt)

Creating job template
Your job has been submitted with ID 5930270
Checking 0 of 3 times
job is queued and active
Checking 1 of 3 times
job finished normally
Checking 2 of 3 times
job finished normally
Cleaning up
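
The loop above checks the status a fixed number of times. A more general pattern is to poll until a terminal state is reached. The sketch below is deliberately independent of DRMAA: `poll_until_finished` is a hypothetical helper that accepts any callable returning a state, so with the session above one would presumably pass `s.jobStatus` together with the terminal states `drmaa.JobState.DONE` and `drmaa.JobState.FAILED`.

```python
import time


def poll_until_finished(get_status, job_id, terminal_states,
                        interval=10.0, max_checks=360, sleep=time.sleep):
    """Call get_status(job_id) until it returns one of `terminal_states`.

    Returns the last status seen, which is not a terminal state if
    `max_checks` polls were used up first. `sleep` is injectable so the
    helper can be exercised without actually waiting.
    """
    status = None
    for _ in range(max_checks):
        status = get_status(job_id)
        if status in terminal_states:
            return status
        sleep(interval)
    return status
```

Bounding the number of checks avoids blocking the notebook forever if a job stays on hold.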


The next example creates an array job (with 10 tasks) and waits for all of them to finish:

In [23]:
import drmaa
import os 

# create drmaa session
with drmaa.Session() as s:
    print 'Creating job template'
    jt = s.createJobTemplate()
    jt.nativeSpecification = u"-b no -cwd -shell yes"
    jt.remoteCommand = os.getcwd() + '/sleeper.sh'
    jt.args = ['42','Simon says:']
    jt.joinFiles=True
    # send array of jobs
    joblist = s.runBulkJobs(jt,1,10,1)
    print 'Your job has been submitted with id ' + str(joblist)
    s.synchronize(joblist, drmaa.Session.TIMEOUT_WAIT_FOREVER, False)
    for curjob in joblist:
        print 'Collecting job ' + curjob
        retval = s.wait(curjob, drmaa.Session.TIMEOUT_WAIT_FOREVER)
        print 'Job: ' + str(retval.jobId) + ' finished with status '+ str(retval.hasExited)
    print 'Cleaning up'
    s.deleteJobTemplate(jt)

Creating job template
Your job has been submitted with id [u'5930275.1', u'5930275.2', u'5930275.3', u'5930275.4', u'5930275.5', u'5930275.6', u'5930275.7', u'5930275.8', u'5930275.9', u'5930275.10']
Collecting job 5930275.1
Job: 5930275.1 finished with status True
Collecting job 5930275.2
Job: 5930275.2 finished with status True
Collecting job 5930275.3
Job: 5930275.3 finished with status True
Collecting job 5930275.4
Job: 5930275.4 finished with status True
Collecting job 5930275.5
Job: 5930275.5 finished with status True
Collecting job 5930275.6
Job: 5930275.6 finished with status True
Collecting job 5930275.7
Job: 5930275.7 finished with status True
Collecting job 5930275.8
Job: 5930275.8 finished with status True
Collecting job 5930275.9
Job: 5930275.9 finished with status True
Collecting job 5930275.10
Job: 5930275.10 finished with status True
Cleaning up


# 3. Complementary approaches

In the last section, some examples of advanced usage of DRMs were provided. However, while the command-line usage of DRMs is good for simple job submission, it does not allow for complex parallel workloads and is specific to each DRM. The DRMAA approach, while powerful, is in general too low level for integration into day-to-day cluster computing tasks. In this section, some modern complementary approaches to parallel computing on clusters will be reviewed.

## 3.1 Jug: Simple Task Based Parallelism

The first alternative approach to be explored is [*jug*](https://github.com/luispedro/jug), a task-based parallelization framework written in Python. Basically, it allows writing code (including calls to libraries and programs in other languages) that is broken into tasks (which can have other tasks as dependencies) and run on different processors or workers.

The most interesting characteristic of this framework is that all communication between processes and workers is done through the filesystem (it also works with a networked filesystem such as GPFS). Hence, this framework makes the parallelization of complex processing workflows on a computing cluster almost trivial. Another advantage is that workers can be added or removed at any time during execution.

The framework automatically takes care of distributing tasks among all available workers and of exchanging all required information between tasks. The final and intermediate results are persisted to the filesystem and can be easily explored after the computation. While *jug* can introduce a large overhead for many small tasks (because of the filesystem communication), it can be reasonably performant and incredibly flexible for workflows that can be broken into larger tasks.

An example adapted from the [framework documentation](https://jug.readthedocs.org/en/latest/text-example.html) is used to demonstrate its use. In the example, the aim is to download the Wikipedia pages for the British Members of Parliament (MPs) named in a list (i.e. *MPs.txt*) and extract the words that are unique to each one of them (and that therefore somehow characterize each MP). The full code for this task is provided in *jugfile.py*, but here we are only interested in how the job is broken into independent tasks, which can be inferred from the next code fragment:

```python
counts = []
for mp in file('MPs.txt'):
    mp = mp.strip()
    document = Task(getdata, mp)
    counts.append(Task(countwords, mp, document))
avgs = Task(addcounts, counts)
results = []
for c in counts:
    results.append(Task(divergence,avgs, len(counts), c))
```

The problem is broken into four different tasks: getdata (from Wikipedia, for each MP), countwords (in each document), addcounts (over all the documents) and divergence (which returns the words that are specific to each document). To make the computation slower, some sleep time has been added to all of the tasks. We can check the initial status of the computation (e.g. the total number of tasks to be executed) by calling:

In [6]:
!jug status jugfile.py

     Waiting       Ready    Finished     Running  Task name                     
--------------------------------------------------------------------------------
         656           0           0           0  jugfile.countwords            
         656           0           0           0  jugfile.divergence            
           1           0           0           0  jugfile.addcounts             
           0         656           0           0  jugfile.getdata               
................................................................................
        1313         656           0           0  Total                         
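
The counts in this table follow directly from the structure of the loop: for $N$ MPs there are $N$ getdata, $N$ countwords and $N$ divergence tasks plus a single addcounts task, i.e. $3N + 1$ in total, of which only the $N$ getdata tasks have no dependencies. The sketch below reproduces this bookkeeping with a hypothetical stand-in `Task` class (it is not jug's implementation and runs nothing); `build_tasks` and `summarize` are illustrative names.

```python
class Task(object):
    """Hypothetical stand-in for jug's Task: records a name and its arguments."""

    def __init__(self, name, *args):
        self.name = name
        self.args = args

    def dependencies(self):
        """Return the Task objects found in the arguments (one list level deep)."""
        deps = []
        for arg in self.args:
            items = arg if isinstance(arg, list) else [arg]
            deps.extend(item for item in items if isinstance(item, Task))
        return deps


def build_tasks(mps):
    """Mirror the task structure of the code fragment above."""
    tasks, counts = [], []
    for mp in mps:
        document = Task('getdata', mp)
        count = Task('countwords', mp, document)
        tasks.extend([document, count])
        counts.append(count)
    avgs = Task('addcounts', counts)
    tasks.append(avgs)
    for c in counts:
        tasks.append(Task('divergence', avgs, len(counts), c))
    return tasks


def summarize(tasks):
    """Count tasks that can start immediately versus those waiting on others."""
    ready = sum(1 for t in tasks if not t.dependencies())
    return {'ready': ready, 'waiting': len(tasks) - ready, 'total': len(tasks)}
```

With 656 placeholder names, `summarize(build_tasks(['MP %d' % i for i in range(656)]))` gives 656 ready and 1313 waiting tasks, matching the table.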



As expected, no task has been executed yet and there are a total of 1969 tasks to be executed: 656 are ready and the remaining 1313 are waiting because they depend on other tasks. In order to start executing them, this command has to be called:

    jug execute jugfile.py
    
While this can be done directly on the login node (even several times, as different processes), we are going to call this command from the compute nodes, where the workers are automatically coordinated through the filesystem. The previous command (after setting up the right gcc and Python environment) can therefore be executed as an SGE array job on as many compute nodes as desired. An SGE script for this is given in *jug_task_array.sge*, which can then be sent to the queue:

In [8]:
!qsub -t 1-20 jug_task_array.sge

Your job-array 5933317.1-20:1 ("jug_task_array") has been submitted


After some time, progress can be checked again with the *jug status* command:

In [17]:
!jug status jugfile.py

     Waiting       Ready    Finished     Running  Task name                     
--------------------------------------------------------------------------------
         588           0          51          17  jugfile.countwords            
         656           0           0           0  jugfile.divergence            
           1           0           0           0  jugfile.addcounts             
           0         588          68           0  jugfile.getdata               
................................................................................
        1245         588         119          17  Total                         



Now all tasks whose dependencies are satisfied are automatically run by the workers until all of them finish. We could also add (or remove) workers at any time. After a while (due to the artificial sleep times added), nearly all tasks are finished, with only a few *divergence* tasks still running:

In [38]:
!jug status jugfile.py

     Waiting       Ready    Finished     Running  Task name                     
--------------------------------------------------------------------------------
           0           0         656           0  jugfile.getdata               
           0           0         656           0  jugfile.countwords            
           0           0         645          11  jugfile.divergence            
           0           0           1           0  jugfile.addcounts             
................................................................................
           0           0        1958          11  Total                         



All results have been saved to the filesystem (in the *jugfile.jugdata* directory). We can, for example, list the first 8 unique words for the first 10 MPs in the list (or perform any other computation with the results):

In [40]:
import jug
import jug.task
jug.init('jugfile.py', 'jugfile.jugdata')
import jugfile
results = jug.task.value(jugfile.results)
for mp,r in zip(file('MPs.txt'), results)[:10]:
    mp = mp.strip()
    print mp + ": "+ " "+" ".join(map(str,r[:8]))

Richard Benyon:  family august 20 contested again estate newbury rendel
Angus MacNeil:  westminster hotel teacher incident honours abuse best inquiry
Richard Younger-Ross:  mr voting claims proposed water susan walton profile
Bob Laxton:  representative union derby boycotting higher 1961 lamb technology
Sarah Teather:  she her liberal september group democrat 2007 4
Bob Marshall-Andrews:  you mr rebelled dowd own 9 widely 10
Virendra Sharma:  donation ealing register letter detail relationship telegraph vaz
Hywel Francis:  voted favour wales chair dr could allowing independent
Jacqui Smith:  she her home it over said i expenses
Angus Robertson:  snp scottish spokesperson affairs westminster european cinema amongst


## 3.2 IPython Parallel


The [IPython](http://ipython.org/) suite (which will be named Jupyter in future releases and changed to be language agnostic) defines a powerful and flexible architecture for parallel computing. [IPython parallel](http://ipython.org/ipython-doc/stable/parallel/index.html) can support many types of parallelism:
   - Single program, multiple data (SPMD) parallelism.
   - Multiple program, multiple data (MPMD) parallelism.
   - Message passing using MPI.
   - Task farming.
   - Data parallel.
   - Combinations of these approaches.
   - Custom user defined approaches.

Furthermore, IPython Parallel enables all types of parallel applications to be developed, executed, debugged and monitored interactively. Hence, this parallel framework can be applied to diverse use cases, including:
   - Easy parallelization of embarrassingly parallel problems to several processes, computers or a cluster.
   - Steering traditional MPI applications on a supercomputer from an IPython Notebook.
   - Analyzing and visualizing large datasets (that could be remote and/or distributed) interactively.
   - Tying together multiple MPI jobs running on different systems into one giant distributed and parallel system.

This system is relevant to cluster computing because it is designed so that it can be easily integrated with several DRMs (e.g. SGE or PBS) using the [*ipcluster*](https://ipython.org/ipython-doc/dev/parallel/parallel_process.html) utilities. For example, using that command, a total of 48 engines have been started on the IFCA cluster. We can easily connect to the controller of those engines and show the corresponding ids:

In [1]:
from IPython.parallel import Client
rc = Client()
print rc.ids

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47]


Now, we can steer those engines arbitrarily. For example, we can run a Monte Carlo estimation of $\pi$ on each of the engines and get the results asynchronously (using a direct view):

In [2]:
import numpy as np

def monte_carlo_pi(n):
    if (n < 1) : return 0
    n_shots = 0
    for i in range(n):
        x = np.random.random()
        y = np.random.random()
        r = np.sqrt(x*x+y*y)
        if ( r<=1 ): n_shots = n_shots + 1
    return 4.0*float(n_shots)/float(n) 

dview=rc[:]
dview.execute('import numpy as np')
ar = dview.apply_async(monte_carlo_pi, 1000000)

After the computation on all the engines has finished, we can get the results as a list and compute their mean as a better estimate of $\pi$ (beware that this method for estimating $\pi$ converges far more slowly than the Machin formula used before):

In [3]:
pi_list = ar.get()
print pi_list

[3.14188, 3.141048, 3.141392, 3.139952, 3.137364, 3.140196, 3.142076, 3.140796, 3.142112, 3.1421, 3.140496, 3.142392, 3.1393, 3.139176, 3.13872, 3.140696, 3.144952, 3.142044, 3.139472, 3.142672, 3.14154, 3.142648, 3.14134, 3.138248, 3.142972, 3.140476, 3.140468, 3.141784, 3.142796, 3.142076, 3.142244, 3.141148, 3.141312, 3.144044, 3.141548, 3.141272, 3.142988, 3.145472, 3.137328, 3.144864, 3.14276, 3.14266, 3.139764, 3.13996, 3.140784, 3.14116, 3.143836, 3.141796]


In [8]:
print "Mean value for pi: {:2.5f}".format(np.mean(pi_list))

Mean value for pi: 3.14142
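
To judge whether this mean is reasonable, the expected statistical error can be estimated. Each random point falls inside the quarter circle with probability $p = \pi/4$, so an estimate from $n$ points has a standard error of $4\sqrt{p(1-p)/n}$, and averaging $m$ independent runs divides it by $\sqrt{m}$. The short sketch below evaluates this; the function name `mc_pi_std_error` is hypothetical, and the result assumes that the engines use independent random streams, which is not checked here.

```python
import math


def mc_pi_std_error(n_samples, n_runs=1):
    """Expected standard error of the hit-or-miss Monte Carlo estimate of pi.

    n_samples: random points per run; n_runs: independent runs averaged.
    """
    p = math.pi / 4.0                                   # hit probability
    single_run = 4.0 * math.sqrt(p * (1.0 - p) / n_samples)
    return single_run / math.sqrt(n_runs)
```

For the run above, `mc_pi_std_error(1000000)` is about $1.6 \times 10^{-3}$ per engine and `mc_pi_std_error(1000000, 48)` about $2.4 \times 10^{-4}$ for the mean, so the observed deviation of roughly $1.7 \times 10^{-4}$ from $\pi$ is within the expected range.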


As an illustration of how this tool can speed up day-to-day data analysis tasks, a real-world example is used. The [use case](http://chidlow.id.au/blog/2013/09/24/distributed-computing-with-ipython/) consists of searching for the hyper-parameters of a support vector machine that give the best cross-validation score. The scikit-learn module is required and the full example code is provided in *svm_params_crossval.py*. The program compares the execution time in serial and in parallel using the 48 engines previously started:

In [1]:
%run svm_params_crossval.py

importing numpy on engine(s)
importing svm from sklearn on engine(s)

Running 150 tasks:
    Done  130 out of  150 | elapsed:    7.3s remaining:    1.1s
    Done  150 out of  150 | elapsed:    7.9s remaining:    0.0s

Parallel speedup: 4315%

Best: C = 1000000.0, gamma = 0.0001, err = 2.0%

[[ 100.    100.    100.    100.    100.    100.    100.    100.    100.  ]
 [ 100.    100.    100.     92.     12.      6.    100.    100.    100.  ]
 [ 100.    100.     79.33   10.      2.67    5.33   11.33   49.33   63.33]
 [ 100.     78.67   10.      4.      4.      6.     11.33   48.     63.33]
 [  78.     10.      4.      3.33    4.67    6.67   11.33   48.     63.33]
 [  10.      4.      3.33    4.67    6.67    6.67   11.33   48.     63.33]
 [   4.      3.33    3.33    4.67    7.33    6.67   11.33   48.     63.33]
 [   3.33    3.33    2.67    4.67    7.33    6.67   11.33   48.     63.33]
 [   3.33    2.      3.33    6.67    7.33    6.67   11.33   48.     63.33]
 [   4.      4.      4.67    6.  

As can be seen in the results, the use of IPython parallel on the cluster achieves a parallel speedup of $4315~\%$ for this task, so the problem (which is embarrassingly parallel) benefits substantially from parallelization.

## 3.3 Modern Cluster Computing

Apart from the cluster computing tools mentioned up to this point, it is worth mentioning two open-source frameworks that are currently de facto standards for cluster computing: [Apache Hadoop](http://hadoop.apache.org/) and [Apache Spark](https://spark.apache.org/). They could not be easily installed at the IFCA cluster, so no usage examples are provided.

The Hadoop framework, written in Java, consists of the combination of a distributed file system (HDFS) and a processing engine (MapReduce). MapReduce is a programming model (inspired by functional programming) for processing and generating large data sets with a parallel, distributed algorithm on a cluster. Basically, it is the combination of a *Map()* step (applied per element) and a *Reduce()* step (applied to several *Map()* results), which can be easily scaled to very large datasets.
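
The programming model itself can be illustrated without Hadoop. The following single-process Python sketch counts words with an explicit map step, a grouping ("shuffle") step and a reduce step; all function names are hypothetical, and a real framework would run the map and reduce calls on different nodes.

```python
from collections import defaultdict
from functools import reduce


def map_phase(document):
    """Map(): emit a (word, 1) pair for every word of one document."""
    return [(word.lower(), 1) for word in document.split()]


def shuffle(pairs):
    """Group the emitted values by key, as the framework does between phases."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return groups


def reduce_phase(key, values):
    """Reduce(): combine all the values emitted for one key."""
    return key, reduce(lambda a, b: a + b, values)


def word_count(documents):
    """Chain the three steps over a list of documents."""
    pairs = [pair for doc in documents for pair in map_phase(doc)]
    return dict(reduce_phase(key, values)
                for key, values in shuffle(pairs).items())
```

For instance, `word_count(["a b a", "b c"])` returns `{'a': 2, 'b': 2, 'c': 1}`. Because each `map_phase` call depends on a single document and each `reduce_phase` call on a single key, both steps can be distributed freely, which is what makes the model scale.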

Apache Spark follows a similar approach (in fact, it can use HDFS and the Hadoop administration tools), but allows programs to keep data in memory and query it repeatedly, making it faster and more appropriate for more complex tasks (e.g. machine learning or graph processing). With these tools, very large cluster computing workflows can be specified at a higher level of abstraction, so they are replacing traditional DRMs for this kind of workload.

# 4. Conclusion

In this document, different approaches to cluster computing have been explored. After introducing a few basic concepts of cluster computing, some advanced uses of SGE have been tested. The use of DRMAA to programmatically submit and monitor jobs in different DRMs has also been interactively demonstrated at the IFCA computing cluster. After highlighting the drawbacks of DRMs for complex workflows, *jug* and *IPython Parallel* have been explored as complementary tools and some real-world examples of their use have been provided. Finally, a basic description of the MapReduce concept (and its most popular open-source implementations) as an alternative to traditional cluster computing has been included.