# 01 - Les tâches

In [37]:
import logging
import os
import time
from pathlib import Path
from typing import List

import numpy as np
import ray
from numpy import loadtxt

## 1. Modèle de tâches parallèles

Ray transforme les fonctions décorées en tâches sans état **Ray Task**, planifiées n'importe où sur un worker Ray dans le cluster, simplement en ajoutant le décorateur `@ray.remote`.  Toutes ces fonctions seront exécutées par un processus worker dans un cluster Ray.

Où seront-elles exécutées ? Par quel worker ? Tout cela est pris en charge par Ray.

Notre travail consiste à prendre nos fonctions Python existantes et à les convertissez en tasks : c'est aussi simple que cela !

### Exemple 1: Execution en série VS en parallèle

Examinons les différentes manières d'executer des tâches.


In [38]:
def regular_function():
    return 1

In [39]:
@ray.remote
def remote_function():
    return 1

Que se passe-t-il quand on appelle ces fonctions ?

In [None]:
try:
    res = regular_function()
    print(res)
except Exception as e:
    print(f"{type(e).__name__}: {e}")

In [None]:
try:
    res = remote_function()
    print(res)
except Exception as e:
    print(f"{type(e).__name__}: {e}")

En effet, il existe quelques différences clés entre la fonction originale et la fonction décorée :

- **Invocation** : La version classique est appelée avec `regular_function()`, tandis que la version distante est appelée avec `remote_function.remote()`.

- **Valeurs de retour** : `regular_function` s'exécute de manière synchrone et retourne le résultat de la fonction (1), tandis que `remote_function` retourne immédiatement un `ObjectID` et exécute ensuite la tâche en arrière-plan dans un processus worker distinct. On appelle cet `ObjectID` un **future**, car il sera executé plus tard en appelant `ray.get`.

Démonstration :

In [None]:
ray.init(
    ignore_reinit_error=True,
    logging_level=logging.ERROR,
    log_to_driver=False,
)

In [None]:
remote_function.remote()

In [None]:
ray.get(remote_function.remote())

Imaginons que nous voulons executer cette tâche 10 fois.

**En série**: Invocations de `regular_function` dans une boucle

In [45]:
result = 0
for _ in range(10):
    result += regular_function()
assert result == 10

**En parallèle**: Invocations de `remote_function` avec un seul appel à `get` :

In [46]:
results = []
for _ in range(10):
    results.append(remote_function.remote())
assert sum(ray.get(results)) == 10

### Exercice 1 : Ajouter deux np.arrays

      +---------------+       +---------------+    
      | read_array    |       | read_array    |    
      +---------------+       +---------------+    
               |                       |           
               +-----------+-----------+           
                           |                       
                    +---------------+              
                    |   add_array   |              
                    +---------------+              
                           |                       
                    +---------------+              
                    |   *result*    |              
                    +---------------+              

**👇 À COMPLETER**

Définir la task `read_array`, qui prend en argument un `path` de type `Path` et retourne un `np.array`.

In [47]:
# SOLUTION
@ray.remote
def read_array(path: Path) -> np.array:
    return loadtxt(path, dtype=int)

**👇 À COMPLETER**

Définir la task `add_array`, qui prend deux arguments `left` et `right` de type `np.array` et qui retourne leur somme avec `np.add`.

In [48]:
# SOLUTION
@ray.remote
def add_array(left: np.array, right: np.array) -> np.array:
    return np.add(left, right)

On doit alors obtenir des `ObjectRef` en appelant ces tâches :

In [49]:
left_matrix_path, right_matrix_path = (
    Path("../course-data/01-Ray-Tasks/matrix_1.txt"),
    Path("../course-data/01-Ray-Tasks/matrix_2.txt"),
)

In [None]:
obj_ref_left_matrix = read_array.remote(left_matrix_path)
print(f"left matrix: {obj_ref_left_matrix}")

In [None]:
obj_ref_right_matrix = read_array.remote(right_matrix_path)
print(f"right matrix: {obj_ref_right_matrix}")

Ajoutons nos deux tableaux en utilisant Ray. Remarque : Nous envoyons des références Ray `ObjectRef` comme arguments.

**👇 À COMPLETER**

Executer les tasks en parallèle en utilisant `function.remote(args)`.

In [52]:
# SOLUTION
result_obj_ref = add_array.remote(obj_ref_left_matrix, obj_ref_right_matrix)

**👇 À COMPLETER**

Obtenir le résultat en utilisant `ray.get(function)`.


In [None]:
# SOLUTION
result = ray.get(result_obj_ref)
print(f"Result: add arr1 + arr2: \n {result}")

**👇 À FAIRE**

Changer une valeur dans un des fichiers qui continnent les matrices. Réexecuter la case précédente. Que se passe-t-il ?

In [54]:
# SOLUTION
# ...Rien. En fait les tâches sont executées lors d'un `remote` mais on obtient le résultat avec l'appel à get.

### Exercice 2 : Suite de Fibonnaci

Dans cet exercice, on va définir une fonction qui génère une séquence de Fibonnaci allant de 1 à n.

In [112]:
def generate_fibonacci(sequence_size: int) -> List[int]:
    fibonacci = []
    for i in range(0, sequence_size):
        if i < 2:
            fibonacci.append(i)
            continue
        fibonacci.append(fibonacci[i - 1] + fibonacci[i - 2])
    return fibonacci[-1]

On utilise un wrapper pour l'execution sur un cluster :

In [113]:
@ray.remote
def generate_fibonacci_distributed(sequence_size):
    return generate_fibonacci(sequence_size)

Le but est de voir la différence entre des executions locales, et des executions sur cluster. On va en lancer autant que le nombre de processeurs sur la machine.

In [None]:
number_of_tasks = os.cpu_count()
number_of_tasks

**👇 À COMPLÉTER**

Compléter la fonction pour qu'elle appelle la fonction `generate_fibonacci` `number_of_tasks` fois avec une sequence_size de `10 000`.

In [115]:
def run_local(sequence_size):
    # SOLUTION
    results = [generate_fibonacci(sequence_size) for _ in range(number_of_tasks)]
    return results

In [None]:
%%time
run_local(10000)

**👇 À COMPLÉTER**

Compléter la fonction pour qu'elle appelle la fonction `generate_fibonacci_distributed` `number_of_tasks` fois avec une sequence_size de `10 000`.

In [117]:
def run_remote(sequence_size):
    # SOLUTION
    results = ray.get(
        [
            generate_fibonacci_distributed.remote(sequence_size)
            for _ in range(number_of_tasks)
        ]
    )
    return results

In [None]:
%%time
run_remote(10000)

### Exercice 3 : La dépendances des tâches 

Les dépendances entre les tâches peuvent constituer des goulots d'étranglement.

Dans cette exercice, nous allons aggréger 8 valeurs ensemble. Nous allons de faire avec une addition naïve d'entiers, mais pour simuler une opération lourde, nous allons ajouter une instruction de `sleep`.

In [131]:
@ray.remote
def add(x, y):
    time.sleep(1)
    return x + y

Voici les deux approches que nous allons comparer.

<img src="../images/task_dependencies_graphs.png" height="50%" width="70%">

Les valeurs à ajouter seront les suivantes :

In [143]:
values = [i for i in range(1, 8)]

**👇 À COMPLÉTER**

Écrire le code permettant d'aggréger la liste de valeur en modélisant l'execution lente :

In [None]:
%%time

futures = values.copy()
while len(futures) > 1:
    # SOLUTION
    futures = [add.remote(futures[0], futures[1])] + futures[2:]
result = ray.get(futures[0])
print(result)

**👇 À COMPLÉTER**

Écrire le code permettant d'aggréger la liste de valeur en modélisant l'execution rapide :

In [None]:
%%time

futures = values.copy()
while len(futures) > 1:
    # SOLUTION
    futures = futures[2:] + [add.remote(futures[0], futures[1])]
result = ray.get(futures[0])
print(result)

**👇 À FAIRE**

Quelle est la complexité des deux manières ? En augmentant n, on retrouve bien une correlation temporelle ?

In [152]:
# SOLUTION
# 1. Ajout linéaire
#
# Pour une liste de n éléments [x1,x2,…,xn], on effectue les opérations séquentiellement :
#     Étape 1 : Addition x1 avec le reste -> il reste n - 1​ opérations.
#     Étape 2 : Addition x2 avec le reste -> il reste n - 2​ opérations.
#     Continue jusqu'à ce qu'il reste 1 élement.`
#
# Le processus s'arrête lorsque :
#     n - k = 1 -> k = n - 1
#     Complexité de profondeur : O(n)
#
# 2. Ajout par paires
#
# Pour une liste de nn éléments [x1,x2,…,xn][x1​,x2​,…,xn​], les opérations se font par paires :
#
#     Étape 1 : Addition par paires -> il reste n/2​ opérations.
#     Étape 2 : Addition des résultats par paire -> il reste n/4​ opérations.
#     Continue jusqu'à ce qu’il reste 1 élément.
#
# Le processus s'arrête lorsque :
# n/2^(k) = 1  ->  k=log⁡2(n)
# Complexité de profondeur en O(log(n))

In [153]:
ray.shutdown()

---

### Exercice pour les braves

**👇 À FAIRE**

Implémenter un bubble sort local et un bubble sort sur cluster. Regarder l'évolution du temps en fonction de la dimension.