In [None]:
import ray
import pandas as pd
import warnings
import math
import time
import os

# Suppress noisy requests warnings.
warnings.filterwarnings("ignore")
os.environ["PYTHONWARNINGS"] = "ignore"

* Ray es un proyecto de código abierto para Python paralelo y distribuido.

* La computación paralela y distribuida es un elemento básico de las aplicaciones modernas. Necesitamos aprovechar varios núcleos o varias máquinas para acelerar las aplicaciones o ejecutarlas a gran escala. La infraestructura para rastrear la web y responder a las consultas de búsqueda no son programas de un solo hilo que se ejecutan en el portátil de alguien, sino conjuntos de servicios que se comunican e interactúan entre sí.

### ¿Por qué Ray?

Muchos tutoriales explican cómo utilizar el módulo de multiprocesamiento de Python. Desafortunadamente, el módulo de multiprocesamiento está severamente limitado en su capacidad para manejar los requerimientos de las aplicaciones modernas. Estos requisitos incluyen los siguientes:

* Ejecutar el mismo código en más de una máquina.
* Construir microservicios y actores que tengan estado y puedan comunicarse.
* Manejar con gracia los fallos de la máquina.
* Manejo eficiente de objetos grandes y datos numéricos.

Ray aborda todos estos puntos, simplifica las cosas sencillas y hace posible un comportamiento complejo.

### Conceptos necesarios
La programación tradicional se basa en dos conceptos fundamentales: funciones y clases. Utilizando estos bloques de construcción, los lenguajes de programación nos permiten construir innumerables aplicaciones.

Sin embargo, cuando migramos nuestras aplicaciones al entorno distribuido, los conceptos suelen cambiar.

En un extremo del espectro, tenemos herramientas como OpenMPI, el multiprocesamiento de Python y ZeroMQ, que proporcionan primitivas de bajo nivel para enviar y recibir mensajes. Estas herramientas son muy potentes, pero proporcionan una abstracción diferente, por lo que las aplicaciones de un solo hilo deben reescribirse desde cero para utilizarlas.

En el otro extremo del espectro, tenemos herramientas de dominio específico como TensorFlow para la formación de modelos, Spark para el procesamiento de datos y SQL, y Flink para el procesamiento de flujos. Estas herramientas proporcionan abstracciones de alto nivel como redes neuronales, conjuntos de datos y flujos. Sin embargo, como difieren de las abstracciones utilizadas para la programación en serie, las aplicaciones deben reescribirse desde cero para aprovecharlas.

<br>

![Image](https://miro.medium.com/max/720/1*CFtsBP134JiGeQQqoKNcXw.webp)

Ray ocupa un lugar intermedio único. En lugar de introducir nuevos conceptos. Ray toma los conceptos existentes de funciones y clases y los traslada al entorno distribuido como tareas y actores. Esta elección de API permite paralelizar aplicaciones en serie sin grandes modificaciones.

### Iniciar Ray

El comando `ray.init()` inicia todos los procesos Ray relevantes. En un cluster, esta es la única línea que necesita cambiar (necesitamos pasar la dirección del cluster). Estos procesos incluyen lo siguiente:

* Un número de procesos trabajadores para ejecutar funciones Python en paralelo (aproximadamente un trabajador por núcleo de CPU).
* Un proceso planificador para asignar "tareas" a los trabajadores (y a otras máquinas). Una tarea es la unidad de trabajo programada por Ray y corresponde a una invocación de función o método.
* Un almacén de objetos en memoria compartida para compartir objetos eficientemente entre los trabajadores (sin crear copias).
* Una base de datos en memoria para almacenar los metadatos necesarios para volver a ejecutar tareas en caso de fallo de la máquina.

Los trabajadores Ray son procesos separados en lugar de hilos porque el soporte para multihilos en Python es muy limitado debido al bloqueo global del intérprete.

In [None]:
ray.init()

### Paralelismo con tareas

Para convertir una función f de Python en una "función remota" (una función que puede ejecutarse de forma remota y asíncrona), declaramos la función con el decorador `@ray.remote`. Entonces, las invocaciones a la función a través de f.remote() devolverán inmediatamente futuros (un futuro es una referencia a la eventual salida), y la ejecución real de la función tendrá lugar en segundo plano (nos referimos a esta ejecución como una tarea).

In [None]:
@ray.remote
def f(x):
    time.sleep(1)
    return x

In [None]:
# Start 4 tasks in parallel.
result_ids = []
for i in range(4):
    result_ids.append(f.remote(i))

In [None]:
results = ray.get(result_ids)
results

Dado que la llamada a `f.remote(i)` retorna inmediatamente, se pueden ejecutar cuatro copias de f en paralelo simplemente ejecutando esa línea cuatro veces.

### Dependencia de tareas

Las tareas también pueden depender de otras tareas. A continuación, la tarea multiply_matrices utiliza las salidas de las dos tareas create_matrix, por lo que no comenzará a ejecutarse hasta que se hayan ejecutado las dos primeras tareas. Las salidas de las dos primeras tareas se pasarán automáticamente como argumentos a la tercera tarea y los futuros se sustituirán por sus valores correspondientes). De este modo, las tareas pueden componerse con dependencias DAG arbitrarias.

In [None]:
import numpy as np

@ray.remote
def create_matrix(size):
    return np.random.normal(size=size)

@ray.remote
def multiply_matrices(x, y):
    return np.dot(x, y)

x_id = create_matrix.remote([1000, 1000])
y_id = create_matrix.remote([1000, 1000])
z_id = multiply_matrices.remote(x_id, y_id)

# Get the results.
z = ray.get(z_id)

### Agregación eficaz de valores

Las relaciones de tareas pueden utilizarse de formas mucho más sofisticadas. Por ejemplo, supongamos que queremos agregar 8 valores. Este ejemplo utiliza la suma de números enteros, pero en muchas aplicaciones, la agregación de grandes vectores a través de múltiples máquinas puede ser un cuello de botella. En este caso, el cambio de una sola línea de código puede cambiar el tiempo de ejecución de la agregación de lineal a logarítmico en el número de valores que se agregan.

<br>

![Image](https://miro.medium.com/max/720/1*vHz3troEmr4uLns0V8VmdA.webp)

Como se ha descrito anteriormente, para alimentar la salida de una tarea como entrada en una tarea posterior, basta con pasar el futuro devuelto por la primera tarea como argumento en la segunda tarea. El programador de Ray tendrá en cuenta automáticamente esta dependencia entre tareas. La segunda tarea no se ejecutará hasta que la primera haya terminado, y la salida de la primera tarea se enviará automáticamente a la máquina en la que se esté ejecutando la segunda.

In [None]:
import time

@ray.remote
def add(x, y):
    time.sleep(1)
    return x + y

# Aggregate the values slowly. This approach takes O(n) where n is the
# number of values being aggregated. In this case, 7 seconds.
id1 = add.remote(1, 2)
id2 = add.remote(id1, 3)
id3 = add.remote(id2, 4)
id4 = add.remote(id3, 5)
id5 = add.remote(id4, 6)
id6 = add.remote(id5, 7)
id7 = add.remote(id6, 8)
result = ray.get(id7)

# Aggregate the values in a tree-structured pattern. This approach
# takes O(log(n)). In this case, 3 seconds.
id1 = add.remote(1, 2)
id2 = add.remote(3, 4)
id3 = add.remote(5, 6)
id4 = add.remote(7, 8)
id5 = add.remote(id1, id2)
id6 = add.remote(id3, id4)
id7 = add.remote(id5, id6)
result = ray.get(id7)

In [None]:
result

In [None]:
# Slow approach.
values = [1, 2, 3, 4, 5, 6, 7, 8]
while len(values) > 1:
    values = [add.remote(values[0], values[1])] + values[2:]
result = ray.get(values[0])


# Fast approach.
values = [1, 2, 3, 4, 5, 6, 7, 8]
while len(values) > 1:
    values = values[2:] + [add.remote(values[0], values[1])]
result = ray.get(values[0])

### De las clases a los actores
Es un reto escribir aplicaciones interesantes sin usar clases, y esto es tan cierto en el entorno distribuido como en un único núcleo.

Ray te permite tomar una clase Python y declararla con el decorador `@ray.remote`. Cada vez que se instancia la clase, Ray crea un nuevo "actor", que es un proceso que se ejecuta en algún lugar del clúster y contiene una copia del objeto. Las invocaciones a métodos en ese actor se convierten en tareas que se ejecutan en el proceso del actor y pueden acceder y mutar el estado del actor. De este modo, los actores permiten que el estado mutable se comparta entre múltiples tareas de un modo que las funciones remotas no permiten.

Los actores individuales ejecutan métodos en serie (cada método individual es atómico) para que no haya condiciones de carrera. El paralelismo puede lograrse mediante la creación de múltiples actores.

In [None]:
@ray.remote
class Counter(object):
    def __init__(self):
        self.x = 0
    
    def inc(self):
        self.x += 1
    
    def get_value(self):
        return self.x

# Create an actor process.
c = Counter.remote()

# Check the actor's counter value.
print(ray.get(c.get_value.remote()))  # 0

# Increment the counter twice and check the value again.
c.inc.remote()
c.inc.remote()
print(ray.get(c.get_value.remote()))  # 2

El ejemplo anterior es el uso más simple posible de los actores. La línea `Counter.remote()` crea un nuevo proceso actor, que tiene una copia del objeto `Counter`. Las llamadas a `c.get_value.remote()` y `c.inc.remote()` ejecutan tareas en el proceso actor remoto y mutan el estado del actor.

#### Manejadores de Actor

En el ejemplo anterior, sólo invocamos métodos en el actor desde el script principal de Python. Uno de los aspectos más poderosos de los actores es que podemos pasar manejadores a un actor, lo que permite a otros actores u otras tareas invocar métodos en el mismo actor.

El siguiente ejemplo crea un actor que almacena mensajes. Varias tareas de trabajo envían mensajes al actor repetidamente, y el script principal de Python lee los mensajes periódicamente.

In [None]:
@ray.remote
class MessageActor(object):
    def __init__(self):
        self.messages = []
    
    def add_message(self, message):
        self.messages.append(message)
    
    def get_and_clear_messages(self):
        messages = self.messages
        self.messages = []
        return messages


# Define a remote function which loops around and pushes
# messages to the actor.
@ray.remote
def worker(message_actor, j):
    for i in range(100):
        time.sleep(1)
        message_actor.add_message.remote(
            "Message {} from worker {}.".format(i, j))


# Create a message actor.
message_actor = MessageActor.remote()

# Start 3 tasks that push messages to the actor.
[worker.remote(message_actor, j) for j in range(3)]

# Periodically get the messages and print them.
for _ in range(10):
    new_messages = ray.get(message_actor.get_and_clear_messages.remote())
    print("New messages:", new_messages)
    time.sleep(1)

* Los actores son extremadamente potentes. Permiten tomar una clase Python e instanciarla como un microservicio que puede ser consultado desde otros actores y tareas e incluso desde otras aplicaciones.

* Las tareas y los actores son las abstracciones básicas que proporciona Ray. Estos dos conceptos son muy generales y se pueden utilizar para implementar aplicaciones sofisticadas, incluidas las bibliotecas integradas de Ray para el aprendizaje por refuerzo, el ajuste de hiperparámetros, la aceleración de Pandas, y mucho más.

In [None]:
ray.shutdown()
ray.init()

In [None]:
import boto3
s3 = boto3.resource('s3')
# Download the file from S3.
bucket = 'nyc-tlc'
s3.Bucket(bucket).download_file('csv_backup/yellow_tripdata_2011-04.csv', '../data/yellow_tripdata_2011-04.csv')
s3.Bucket(bucket).download_file('csv_backup/yellow_tripdata_2009-12.csv', '../data/yellow_tripdata_2009-12.csv')
s3.Bucket(bucket).download_file('csv_backup/yellow_tripdata_2010-06.csv', '../data/yellow_tripdata_2010-06.csv')
s3.Bucket(bucket).download_file('csv_backup/yellow_tripdata_2010-07.csv', '../data/yellow_tripdata_2010-07.csv')

In [None]:
direct = '../data/'
list_files = os.listdir(direct)
list_filtered = []
for file in list_files:
    if file.startswith('yellow_tripdata'):
        ds_csv = ray.data.read_csv(os.path.abspath('../data/' + file))
        ds_csv.repartition(12).write_parquet(f'../data/parquet/{file[:-4]}')
        del ds_csv

In [35]:
ds = ray.data.read_parquet('../data/parquet/')

Metadata Fetch Progress:   0%|          | 0/8 [00:00<?, ?it/s]
Metadata Fetch Progress: 100%|██████████| 8/8 [00:00<00:00, 19.25it/s]
Parquet Files Sample:   0%|          | 0/2 [00:00<?, ?it/s]
Parquet Files Sample:  50%|█████     | 1/2 [00:00<00:00,  3.09it/s]


Parquet Files Sample: 100%|██████████| 2/2 [00:00<00:00,  2.89it/s]


In [40]:
# Create 10 blocks for parallelism
ds = ds.repartition(10)

Read: 100%|██████████| 48/48 [00:07<00:00,  6.60it/s]
Repartition: 100%|██████████| 10/10 [00:12<00:00,  1.21s/it]


In [38]:
total_rows = ds.count()
print('Total rows: {:,}'.format(total_rows))

Total rows: 44,927,917


In [41]:
ds.limit(5).to_pandas()

Unnamed: 0,vendor_name,Trip_Pickup_DateTime,Trip_Dropoff_DateTime,Passenger_Count,Trip_Distance,Start_Lon,Start_Lat,Rate_Code,store_and_forward,End_Lon,End_Lat,Payment_Type,Fare_Amt,surcharge,mta_tax,Tip_Amt,Tolls_Amt,Total_Amt
0,VTS,2009-12-17 07:35:00,2009-12-17 07:40:00,1,0.11,-73.987928,40.737885,,,-73.990335,40.74845,Credit,4.9,0.0,0.5,1.0,0.0,6.4
1,VTS,2009-12-21 14:19:00,2009-12-21 14:24:00,1,1.07,-73.956008,40.779558,,,-73.967303,40.787833,CASH,4.9,0.0,0.5,0.0,0.0,5.4
2,VTS,2009-12-18 03:09:00,2009-12-18 03:34:00,1,8.98,-73.955745,40.689503,,,-73.93773,40.737463,CASH,23.7,0.5,0.5,0.0,0.0,24.7
3,VTS,2009-12-14 21:24:00,2009-12-14 21:33:00,2,1.66,-73.983985,40.754645,,,-73.986195,40.73761,Credit,6.9,0.5,0.5,3.0,0.0,10.9
4,VTS,2009-12-18 08:17:00,2009-12-18 08:29:00,1,1.55,-73.959132,40.769265,,,-73.976267,40.760615,CASH,7.7,0.0,0.5,0.0,0.0,8.2


In [44]:
def transform_batch(df: pd.DataFrame) -> pd.DataFrame:
    return df[(df["Trip_Distance"] > 2) & (df["Passenger_Count"] > 2)]

transformed_ds = ds.map_batches(transform_batch)

Map_Batches: 100%|██████████| 10/10 [00:07<00:00,  1.31it/s]


In [45]:
transformed_ds.limit(10).to_pandas()

Unnamed: 0,vendor_name,Trip_Pickup_DateTime,Trip_Dropoff_DateTime,Passenger_Count,Trip_Distance,Start_Lon,Start_Lat,Rate_Code,store_and_forward,End_Lon,End_Lat,Payment_Type,Fare_Amt,surcharge,mta_tax,Tip_Amt,Tolls_Amt,Total_Amt
0,VTS,2009-12-21 12:16:00,2009-12-21 12:52:00,3,16.57,-73.790967,40.645922,,,-73.971593,40.752568,Credit,45.0,0.0,0.5,7.0,4.57,57.07
1,VTS,2009-12-16 01:31:00,2009-12-16 01:47:00,5,5.03,-74.003327,40.751415,,,-73.990005,40.690455,Credit,13.7,0.5,0.5,3.0,0.0,17.7
2,VTS,2009-12-20 19:57:00,2009-12-20 20:13:00,5,3.22,-73.994107,40.751255,,,-73.970408,40.78925,Credit,10.9,0.0,0.5,3.0,0.0,14.4
3,VTS,2009-12-14 22:55:00,2009-12-14 23:04:00,5,2.24,-73.991612,40.759822,,,-74.00378,40.737545,CASH,7.7,0.5,0.5,0.0,0.0,8.7
4,VTS,2009-12-21 08:26:00,2009-12-21 09:07:00,5,4.49,-73.974723,40.792132,,,-73.977962,40.748085,CASH,21.7,0.0,0.5,0.0,0.0,22.2
5,VTS,2009-12-21 05:40:00,2009-12-21 05:45:00,5,2.17,-73.971808,40.764552,,,-73.992482,40.740508,CASH,6.9,0.5,0.5,0.0,0.0,7.9
6,VTS,2009-12-17 14:17:00,2009-12-17 15:19:00,3,17.74,-73.986025,40.762557,,,-73.78958,40.642812,CASH,45.0,0.0,0.5,0.0,4.57,50.07
7,VTS,2009-12-14 20:21:00,2009-12-14 20:34:00,5,2.09,-73.96551,40.768725,,,-73.972115,40.746033,CASH,8.9,0.5,0.5,0.0,0.0,9.9
8,VTS,2009-12-21 07:32:00,2009-12-21 08:00:00,3,13.32,-74.015207,40.709402,,,-73.873348,40.774312,CASH,31.3,0.0,0.5,0.0,0.0,31.8
9,VTS,2009-12-26 18:04:00,2009-12-26 18:18:00,5,4.26,-73.995898,40.738567,,,-73.977603,40.78787,CASH,12.1,0.0,0.5,0.0,0.0,12.6


In [47]:
total_rows = transformed_ds.count()
print('Total rows: {:,}'.format(total_rows))

Total rows: 1,008,808


In [None]:
# Explicitly stop
ray.shutdown()
assert not ray.is_initialized()

## Ejemplo: Ingesta de ML a gran escala

En este ejemplo, aprenderá a crear, desplegar y escalar una canalización de ingestión aleatoria de aprendizaje automático utilizando Ray Dataset y Dataset Pipelines.

* Cómo construir una tubería de ingesta aleatoria que carga, mezcla y alimenta datos en entrenadores distribuidos en unas pocas líneas de código;

* Cómo escalar el pipeline desde la ingesta de datos de 100MiB a datos de 500GiB.

![Image](https://docs.ray.io/en/master/_images/dataset-repeat-2.svg)


In [None]:
import pyarrow
from ray.data.dataset_pipeline import DatasetPipeline
from ray.data.datasource.datasource import RandomIntRowDatasource
import tempfile
from typing import List

### Construir una cadena de ingesta aleatoria

Un proceso de ingesta típico de aprendizaje automático consta de los 4 pasos siguientes:

* Cargar los datos de entrenamiento desde un almacenamiento externo;
* Iterar sobre los datos durante varias épocas;
* En cada epoch, aplicar global shuffle para decorrelacionar los datos;
* En cada época, dividir los datos barajados en fragmentos, y alimentar los fragmentos a los entrenadores distribuidos;

Veamos cómo implementamos este proceso utilizando el conjunto de datos Ray:

In [None]:
def create_shuffle_pipeline(
    training_data_dir: str, num_epochs: int, num_shards: int
) -> List[DatasetPipeline]:

    return (
        ray.data.read_parquet(training_data_dir)
        .repeat(num_epochs)
        .random_shuffle_each_window()
        .split(num_shards, equal=True)
    )

Ahora hemos definido una función create_shuffle_pipeline que crea un pipeline de ingestión. Lee training_data_dir, itera durante num_epochs veces, donde en cada epoch baraja y divide los datos de entrenamiento en num_shards.

### Alimentar el pipeline a los entrenadores

Implementemos también un TrainingWorker que consuma los datos barajados de cada shard.

Para simplificar, definiremos un Ray Actor que emule a los training workers. Específicamente,

1. Toma un shard del shuffle pipeline para entrenar;
2. itera sobre el fragmento para obtener un conjunto de datos de entrenamiento por época;
3. Luego consume el conjunto de datos por lotes;

In [None]:
@ray.remote
class TrainingWorker:
    def __init__(self, rank: int, shard: DatasetPipeline):
        self.rank = rank
        self.shard = shard

    def train(self):
        for epoch, training_dataset in enumerate(self.shard.iter_epochs()):
            # Following code emulates epoch based SGD training.
            print(f"Training... worker: {self.rank}, epoch: {epoch}")
            for i, batch in enumerate(training_dataset.iter_batches()):
                # TODO: replace the code for real training.
                pass

Ejecutémoslo! Ahora vamos a ejecutar el canal de datos de extremo a extremo, primero, analicemos algunos argumentos.

In [None]:
NUM_TRAINING_WORKERS = 4
NUM_EPOCHS = 5
NUM_COLUMNS = 10
SIZE_100MiB = 100 * 1024 * 1024

# create a local ray cluster.
ray.init()

In [None]:
print(f"Count of nodes: {len(ray.nodes())}")

In [None]:
splits = create_shuffle_pipeline(
    filter_files, NUM_EPOCHS, NUM_TRAINING_WORKERS
)

training_workers = [
    TrainingWorker.remote(rank, shard) for rank, shard in enumerate(splits)
]

# Let's run the e2e pipeline
start = time.time()
ray.get([worker.train.remote() for worker in training_workers])
print(f"total ingestion time: {int(time.time() - start)}s")

In [None]:
# To clean the remote process
@ray.remote
class TrainingWorker:
    pass

In [None]:
import threading

@ray.remote
class Task:

    def __init__(self):
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()
    
    def _run(self):
        print('Started long-running task')
        time.sleep(10)
        print('You shouldn\'t see this because the process has been killed')

task = Task.remote()
task.__ray_terminate__.remote()   # kills the process with an `exit(0)`