**Tabla de contenido**

- [Technical requirements](#Technical-requirements)
- [Scaling with Spark](#Scaling-with-spark)
    - [Consejos y trucos de Spark](#Consejos-y-trucos-de-spark)
    - [Spark on the cloud](#Spark-on-the-cloud)
        - [Ejemplo de AWS EMR](#Ejemplo-de-AWS-EMR)
- [Poner en marcha infraestructura sin servidor](#Poner-en-marcha-infraestructura-sin-servidor)
- [Containerizing at scale with Kubernetes](#Containerizing-at-scale-with-Kubernetes)
- [Summary](#Summary)

El capítulo anterior trató sobre cómo iniciar la conversación sobre cómo hacemos llegar nuestras soluciones al mundo a través de diferentes patrones de implementación, así como algunas de las herramientas que podemos utilizar para hacerlo. Este capítulo tiene como objetivo ampliar esa conversación discutiendo los conceptos y herramientas que podemos usar para escalar nuestras soluciones y hacer frente a grandes volúmenes de datos o tráfico.

Ejecutar algunos modelos simples de Aprendizaje Automático (ML) en unos pocos miles de puntos de datos en tu computadora portátil es un buen ejercicio, especialmente cuando estás realizando los pasos de descubrimiento y prueba de concepto que mencionamos anteriormente al comienzo de cualquier proyecto de desarrollo de ML. Sin embargo, este enfoque no es apropiado si tenemos que manejar millones y millones de puntos de datos a una frecuencia relativamente alta, o si tenemos que entrenar miles de modelos de una escala similar al mismo tiempo. Esto requiere un enfoque, mentalidad y conjunto de herramientas diferentes.

En las siguientes páginas, cubriremos algunos detalles del marco más popular para distribuir computaciones de datos en uso hoy en día: Apache Spark. En particular, discutiremos algunos de los puntos clave sobre cómo funciona detrás de escena, para que, en el desarrollo, podamos tomar buenas decisiones sobre cómo usarlo, antes de pasar a discutir algunos de los enfoques de ML que podemos emplear con Spark. Esto te ayudará a construir sobre algunos de los ejemplos prácticos que ya vimos anteriormente en este libro, cuando usamos Spark para resolver nuestros problemas de ML, con una comprensión teórica más concreta y ejemplos prácticos detallados adicionales proporcionados.

Después de esto, aprenderás cómo escalar tu infraestructura utilizando funciones sin servidor. Luego, introduciremos brevemente algo de la teoría detrás del uso de clústeres de Kubernetes (K8s). El primero de estos proporciona un mecanismo para hacer que modelos simples escalen muy rápidamente. El segundo te permite mantener los beneficios de la contenedorización, pero también escalar horizontalmente. Finalmente, concluiremos, como de costumbre, proporcionando un resumen de lo que hemos aprendido.

En este capítulo, cubriremos los siguientes temas:

- Scaling with Spark
- Spinning up serverless infrastructure
- Containerizing at scale with Kubernetes


# Technical requirements

Para ejecutar los ejemplos en este capítulo necesitarás que las siguientes herramientas y paquetes estén instalados:
- Apache Spark
- Git
- AWS CLI v2

# Scaling with Spark

Apache Spark surgió del trabajo de algunos brillantes investigadores en la Universidad de California, Berkeley en 2012 y desde entonces, ha revolucionado la forma en que abordamos los problemas con grandes conjuntos de datos. Antes de Spark, el paradigma dominante para los grandes datos era Hadoop MapReduce, que ahora es mucho menos popular.

Spark es un marco de computación en clúster, lo que significa que funciona según el principio de que varios computadoras están conectadas de tal manera que permite que las tareas computacionales se compartan. Esto nos permite coordinar estas tareas de manera efectiva. Siempre que hablamos de ejecutar trabajos de Spark, siempre hablamos del clúster en el que estamos ejecutando. Esta es la colección de computadoras que realizan las tareas, los nodos trabajadores, y la computadora que alberga la carga de trabajo organizativa, conocida como el nodo principal.

Spark está escrito en Scala, un lenguaje con un fuerte sabor funcional que se compila en Máquinas Virtuales de Java (JVM). Dado que este es un libro sobre ingeniería de ML en Python, no discutimos demasiado sobre los componentes subyacentes de Scala en Spark, excepto donde nos ayudan a usarlo en nuestro trabajo. Spark tiene varias APIs populares que permiten a los programadores desarrollar con él en una variedad de lenguajes, incluido Python. Esto da lugar a la sintaxis de PySpark que hemos estado utilizando en varios ejemplos a lo largo de este libro.

Entonces, ¿cómo se organiza todo esto? Bueno, antes que nada, una de las cosas que hace que Apache Spark sea increíblemente popular es la gran cantidad de conectores, componentes y API que tiene disponibles. Por ejemplo, cuatro componentes principales:

- `Spark SQL, DataFrames, and Datasets`: Este componente te permite crear programas muy escalables que manejan datos estructurados. La capacidad de escribir consultas compatibles con SQL y crear tablas de datos que aprovechan el motor subyacente de Spark a través de una de las principales APIs estructuradas de Spark (Python, Java, Scala o R) da acceso muy fácil a la mayor parte de la funcionalidad de Spark.

- `Spark Structured Streaming`: Este componente permite a los ingenieros trabajar con datos en streaming que, por ejemplo, son proporcionados por una solución como Apache Kafka. El diseño es increíblemente simple y permite a los desarrolladores trabajar con datos en streaming como si fuera una tabla estructurada de Spark en crecimiento, con la misma funcionalidad de consulta y manipulación que para una tabla estándar. Esto proporciona una baja barrera de entrada para crear soluciones de streaming escalables.

- `GraphX`: Esta es una biblioteca que te permite implementar el procesamiento paralelo de grafos y aplicar algoritmos estándar a datos basados en grafos (por ejemplo, algoritmos como PageRank o conteo de triángulos). El proyecto GraphFrames de Databricks hace que esta funcionalidad sea aún más fácil de usar al permitirnos trabajar con API basadas en DataFrame en Spark y todavía analizar datos de grafos.

- `Spark MLlib`: Por último, pero no menos importante, tenemos el componente que es más apropiado para nosotros como ingenieros de ML: la biblioteca nativa de Spark para ML. Esta biblioteca contiene la implementación de muchos algoritmos y capacidades de ingeniería de características que ya hemos visto en este libro. Poder usar las API de DataFrame en la biblioteca la hace extremadamente fácil de usar, mientras que aún nos ofrece una ruta para crear código muy poderoso. Las posibles aceleraciones que puedes obtener para tu entrenamiento de ML al usar Spark MLlib en un clúster de Spark frente a ejecutar otra biblioteca de ML en un solo hilo pueden ser enormes. Hay otros trucos que podemos aplicar a nuestras implementaciones de ML favoritas y luego usar Spark para escalarlas; veremos esto más adelante.

La arquitectura de Spark se basa en la arquitectura de controlador/ejecutor. El controlador es el programa que actúa como el punto de entrada principal para la aplicación Spark y es donde se crea el objeto SparkContext. SparkContext envía tareas a los ejecutores (que se ejecutan en sus propias JVMs) y se comunica con el administrador del clúster de una manera apropiada para el administrador dado y para el modo en que la solución se está ejecutando. Una de las principales tareas del controlador es convertir el código que escribimos en un conjunto lógico de pasos en un Grafo Acíclico Dirigido (DAG) (el mismo concepto que usamos con Apache Airflow en el Capítulo 5, Patrones y Herramientas de Despliegue), y luego convertir ese DAG en un conjunto de tareas que necesitan ser ejecutadas en los recursos computacionales disponibles.

En las páginas que siguen, asumiremos que estamos ejecutando Spark con el gestor de recursos Hadoop YARN, que es una de las opciones más populares y que también es utilizada por defecto por la solución AWS Elastic Map Reduce (más sobre esto más adelante). Al ejecutar con YARN en modo clúster, el programa controlador se ejecuta en un contenedor en el clúster YARN, lo que permite a un cliente enviar trabajos o solicitudes a través del controlador y luego salir (en lugar de requerir que el cliente permanezca conectado al gestor del clúster, lo que puede suceder cuando ejecutas en el llamado modo cliente, que no discutiremos aquí).

El administrador de clúster es responsable de lanzar los ejecutores a través de los recursos que están disponibles en el clúster. La arquitectura de Spark nos permite, como ingenieros de ML, construir soluciones con la misma API y sintaxis, independientemente de si estamos trabajando localmente en nuestra computadora portátil o en un clúster con miles de nodos. La conexión entre el controlador, el administrador de recursos y los ejecutores es lo que permite que esta magia suceda.

## Consejos y trucos de Spark

En esta subsección, cubriremos algunos consejos simples pero efectivos para escribir soluciones eficientes con Spark. Nos enfocaremos en elementos clave de la sintaxis destinada a la manipulación y preparación de datos, que, como se discutió en otros lugares de este libro, siempre es el primer paso en cualquier pipeline de solución basado en ML. Comencemos:

1. Primero, cubriremos lo básico de escribir buen SQL en Spark. El punto de entrada para cualquier programa de Spark es el objeto SparkSession, que necesitamos importar como una instancia en nuestra aplicación. A menudo se instancia con la variable spark:

En este caso:

- .appName("BankCSV"): Nombre de la app (aparece en la UI de Spark)
- .master("local[*]"): Usa todos los núcleos de tu CPU en modo local
- .config("spark.hadoop.fs.defaultFS", "file:///"): # Trabaja con archivos locales (no HDFS)
- .getOrCreate(): Crea o reutiliza la sesión Spark

In [10]:
from pyspark import SparkContext
from pyspark.sql.session import SparkSession
sc = SparkContext('local')
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("BankCSV") \
    .master("local[*]") \
    .config("spark.hadoop.fs.defaultFS", "file:///") \
    .getOrCreate()

2. Luego puedes ejecutar comandos de Spark SQL contra tus datos disponibles utilizando el objeto spark y el método sql:

spark.sql('''select * from data_table''')

Hay una variedad de formas de hacer que los datos que necesitas estén disponibles dentro de tus programas de Spark, dependiendo de dónde existan. El siguiente ejemplo ha sido tomado de parte del código que revisamos en el Capítulo 3, De Modelo a Fábrica de Modelos, y muestra cómo extraer datos en un dataframe desde un archivo csv:

In [11]:
import os
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
from pyspark.sql.functions import when

file_path = lambda file: os.path.join(os.getcwd(),'data',file)
schema = StructType([
    StructField("age", IntegerType(), True),
    StructField("job", StringType(), True),
    StructField("marital", StringType(), True),
    StructField("education", StringType(), True),
    StructField("default", StringType(), True),
    StructField("balance", DoubleType(), True),
    StructField("housing", StringType(), True),
    StructField("loan", StringType(), True),
    StructField("contact", StringType(), True),
    StructField("day", StringType(), True),
    StructField("month", StringType(), True),
    StructField("duration", StringType(), True),
    StructField("campaign", StringType(), True),
    StructField("pdays", StringType(), True),
    StructField("previous", StringType(), True),
    StructField("poutcome", StringType(), True),
    StructField("deposit", StringType(), True)
])

df = spark.read.csv("file://" + file_path('bank.csv'), sep=',', header=True, schema=schema)
# Transformar la columna 'deposit' de 'yes'/'no' a 1/0
df = df.withColumn("deposit", when(df["deposit"] == "yes", 1).otherwise(0))
df.show(10)

+---+----------+--------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+-------+
|age|       job| marital|education|default|balance|housing|loan|contact|day|month|duration|campaign|pdays|previous|poutcome|deposit|
+---+----------+--------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+-------+
| 59|    admin.| married|secondary|     no| 2343.0|    yes|  no|unknown|  5|  may|    1042|       1|   -1|       0| unknown|      1|
| 56|    admin.| married|secondary|     no|   45.0|     no|  no|unknown|  5|  may|    1467|       1|   -1|       0| unknown|      1|
| 41|technician| married|secondary|     no| 1270.0|    yes|  no|unknown|  5|  may|    1389|       1|   -1|       0| unknown|      1|
| 55|  services| married|secondary|     no| 2476.0|    yes|  no|unknown|  5|  may|     579|       1|   -1|       0| unknown|      1|
| 54|    admin.| married| tertiary|     no|  184.0|     no|  no|unkno

Como un ejemplo concreto, construyamos una UDF que examine los datos bancarios con los que trabajamos en el Capítulo 3, De Modelo a Fábrica de Modelos, para crear una nueva columna llamada 'month_as_int' que convierta la representación actual del mes en una cadena a un número entero para su posterior procesamiento. No nos preocuparemos por las divisiones de entrenamiento/prueba ni para qué podría usarse esto; en su lugar, simplemente destacaremos cómo aplicar algo de lógica a una UDF de PySpark. Comencemos:


In [12]:
import datetime
def month_as_int(month):
    month_number = datetime.datetime.strptime(month, "%b").month
    return month_number


Si queremos aplicar nuestra función dentro de Spark SQL, entonces debemos registrar la función como una UDF. Los argumentos para la función register() son el nombre registrado de la función, el nombre de la función de Python que acabamos de escribir y el tipo de retorno. El tipo de retorno es StringType() por defecto, pero lo hemos especificado explícitamente aquí:


In [13]:
from pyspark.sql.types import StringType
spark.udf.register("monthAsInt", month_as_int, StringType())

<function __main__.month_as_int(month)>

Finalmente, ahora que hemos registrado la función, podemos aplicarla a nuestros datos. Primero, crearemos una vista temporal del conjunto de datos del banco y luego ejecutaremos una consulta Spark SQL contra él que haga referencia a nuestra UDF. Ejecutar la siguiente sintaxis con el comando show() da el resultado mostrado en la Figura 6.3, que es nuestro resultado deseado:

In [15]:
df.createOrReplaceTempView('bank_data_view')
spark.sql('''
select *, monthAsInt(month) as month_as_int from bank_data_view
''').show()

                                                                                

+---+-----------+--------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+-------+------------+
|age|        job| marital|education|default|balance|housing|loan|contact|day|month|duration|campaign|pdays|previous|poutcome|deposit|month_as_int|
+---+-----------+--------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+-------+------------+
| 59|     admin.| married|secondary|     no| 2343.0|    yes|  no|unknown|  5|  may|    1042|       1|   -1|       0| unknown|      1|           5|
| 56|     admin.| married|secondary|     no|   45.0|     no|  no|unknown|  5|  may|    1467|       1|   -1|       0| unknown|      1|           5|
| 41| technician| married|secondary|     no| 1270.0|    yes|  no|unknown|  5|  may|    1389|       1|   -1|       0| unknown|      1|           5|
| 55|   services| married|secondary|     no| 2476.0|    yes|  no|unknown|  5|  may|     579|       1|   -1|       0| u

Alternativamente, podemos crear nuestra UDF con la siguiente sintaxis y aplicar el resultado a un DataFrame de Spark. Esto nos da el mismo resultado que se muestra en la captura de pantalla anterior:

In [16]:
from pyspark.sql.functions import udf
month_as_int_udf = udf(month_as_int, StringType())
df = spark.table("bank_data_view")
df.withColumn('month_as_int', month_as_int_udf("month")).show()

+---+-----------+--------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+-------+------------+
|age|        job| marital|education|default|balance|housing|loan|contact|day|month|duration|campaign|pdays|previous|poutcome|deposit|month_as_int|
+---+-----------+--------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+-------+------------+
| 59|     admin.| married|secondary|     no| 2343.0|    yes|  no|unknown|  5|  may|    1042|       1|   -1|       0| unknown|      1|           5|
| 56|     admin.| married|secondary|     no|   45.0|     no|  no|unknown|  5|  may|    1467|       1|   -1|       0| unknown|      1|           5|
| 41| technician| married|secondary|     no| 1270.0|    yes|  no|unknown|  5|  may|    1389|       1|   -1|       0| unknown|      1|           5|
| 55|   services| married|secondary|     no| 2476.0|    yes|  no|unknown|  5|  may|     579|       1|   -1|       0| u

Finalmente, PySpark también proporciona una sintaxis de decorador útil para crear nuestro UDF. El siguiente bloque de código también ofrece los mismos resultados que la captura de pantalla anterior:

In [17]:
@udf("int")
def month_as_int_udf(month):
    month_number = datetime.datetime.strptime(month, "%b").month
    return month_number
df.withColumn('month_as_int', month_as_int_udf("month")).show()

+---+-----------+--------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+-------+------------+
|age|        job| marital|education|default|balance|housing|loan|contact|day|month|duration|campaign|pdays|previous|poutcome|deposit|month_as_int|
+---+-----------+--------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+-------+------------+
| 59|     admin.| married|secondary|     no| 2343.0|    yes|  no|unknown|  5|  may|    1042|       1|   -1|       0| unknown|      1|           5|
| 56|     admin.| married|secondary|     no|   45.0|     no|  no|unknown|  5|  may|    1467|       1|   -1|       0| unknown|      1|           5|
| 41| technician| married|secondary|     no| 1270.0|    yes|  no|unknown|  5|  may|    1389|       1|   -1|       0| unknown|      1|           5|
| 55|   services| married|secondary|     no| 2476.0|    yes|  no|unknown|  5|  may|     579|       1|   -1|       0| u

Esto muestra cómo podemos aplicar cierta lógica simple en un UDF, pero para desplegar un modelo a gran escala usando este enfoque, tenemos que poner la lógica de ML dentro de la función y aplicarla de la misma manera. Esto puede volverse un poco complicado si queremos trabajar con algunas de las herramientas estándar a las que estamos acostumbrados en el mundo de la ciencia de datos, como pandas y scikit-learn. Afortunadamente, hay otra opción que podemos usar que tiene algunos beneficios. La discutiremos ahora.

Las UDF que se están considerando actualmente tienen un pequeño problema cuando trabajamos en Python, ya que la traducción de datos entre la JVM y Python puede tardar un tiempo. Una forma de evitar esto es utilizar lo que se conoce como UDFs de pandas, que utilizan la biblioteca Apache Arrow en segundo plano para garantizar que los datos se lean rápidamente para la ejecución de nuestras UDFs. Esto nos brinda la flexibilidad de las UDFs sin ninguna desaceleración.

Las UDFs de pandas también son extremadamente poderosas porque funcionan con la sintaxis de, sí, lo adivinaste, los objetos Series y DataFrame de pandas. Esto significa que muchos científicos de datos que están acostumbrados a trabajar con pandas para construir modelos localmente pueden adaptar fácilmente su código para escalar utilizando Spark.

Como ejemplo, vamos a recorrer cómo aplicar un clasificador simple al conjunto de datos de vinos que usamos anteriormente en este libro. Cabe señalar que el modelo no fue optimizado para estos datos; solo estamos mostrando un ejemplo de cómo aplicar un clasificador preentrenado.

In [20]:
import sklearn.svm
import sklearn.datasets
clf = sklearn.svm.SVC()
X, y = sklearn.datasets.load_wine(return_X_y=True) 
clf.fit(X, y)

0,1,2
,C,1.0
,kernel,'rbf'
,degree,3
,gamma,'scale'
,coef0,0.0
,shrinking,True
,probability,False
,tol,0.001
,cache_size,200
,class_weight,


Luego podemos llevar los datos de las características a un DataFrame de Spark para mostrarte cómo aplicar la UDF de pandas en etapas posteriores:

In [21]:
df = spark.createDataFrame(X.tolist())

Los UDFs de pandas son muy fáciles de definir. Solo tenemos que escribir nuestra lógica en una función y luego agregar el decorador @pandas_udf, donde también debemos proporcionar el tipo de salida de la función. En el caso más simple, podemos simplemente envolver el proceso (normalmente serial o solo paralelizado localmente) de realizar una predicción con el modelo entrenado:

In [22]:
import pandas as pd
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import pandas_udf

@pandas_udf(returnType=IntegerType())
def predict_pd_udf(*cols):
    X = pd.concat(cols, axis=1)
    return pd.Series(clf.predict(X))

Finalmente, podemos aplicar esto al DataFrame de Spark que contiene los datos pasando los valores de entrada apropiados que necesitábamos para nuestra función. En este caso, vamos a pasar los nombres de las columnas de las características, de las cuales hay 13:

In [24]:
col_names = ['_{}'.format(x) for x in range(1, 14)]
df_pred = df.select('*', predict_pd_udf(*col_names).alias('class'))
df_pred.show()

[Stage 4:>                                                          (0 + 1) / 1]

+-----+----+----+----+-----+----+----+----+----+----+----+----+------+-----+
|   _1|  _2|  _3|  _4|   _5|  _6|  _7|  _8|  _9| _10| _11| _12|   _13|class|
+-----+----+----+----+-----+----+----+----+----+----+----+----+------+-----+
|14.23|1.71|2.43|15.6|127.0| 2.8|3.06|0.28|2.29|5.64|1.04|3.92|1065.0|    0|
| 13.2|1.78|2.14|11.2|100.0|2.65|2.76|0.26|1.28|4.38|1.05| 3.4|1050.0|    0|
|13.16|2.36|2.67|18.6|101.0| 2.8|3.24| 0.3|2.81|5.68|1.03|3.17|1185.0|    0|
|14.37|1.95| 2.5|16.8|113.0|3.85|3.49|0.24|2.18| 7.8|0.86|3.45|1480.0|    0|
|13.24|2.59|2.87|21.0|118.0| 2.8|2.69|0.39|1.82|4.32|1.04|2.93| 735.0|    2|
| 14.2|1.76|2.45|15.2|112.0|3.27|3.39|0.34|1.97|6.75|1.05|2.85|1450.0|    0|
|14.39|1.87|2.45|14.6| 96.0| 2.5|2.52| 0.3|1.98|5.25|1.02|3.58|1290.0|    0|
|14.06|2.15|2.61|17.6|121.0| 2.6|2.51|0.31|1.25|5.05|1.06|3.58|1295.0|    0|
|14.83|1.64|2.17|14.0| 97.0| 2.8|2.98|0.29|1.98| 5.2|1.08|2.85|1045.0|    0|
|13.86|1.35|2.27|16.0| 98.0|2.98|3.15|0.22|1.85|7.22|1.01|3.55|1045.0|    0|

                                                                                

Y eso completa nuestro recorrido exprés por los UDFs y los pandas UDFs en Spark, los cuales nos permiten tomar la lógica serial de Python, como transformaciones de datos o nuestros modelos de ML, y aplicarla de manera marcadamente paralela. En la siguiente sección, nos centraremos en cómo prepararnos para realizar cálculos basados en Spark en la nube.

## Spark on the cloud

Como debería estar claro a partir de la discusión anterior, escribir y desplegar soluciones de ML basadas en PySpark se puede hacer en tu portátil, pero para que puedas ver los beneficios al trabajar a gran escala, debes contar con un clúster de computación de tamaño adecuado. Proporcionar este tipo de infraestructura puede ser un proceso largo y doloroso, pero como ya se ha discutido en este libro, existe una gran cantidad de opciones de infraestructura disponibles por parte de los principales proveedores de nube pública.

Para Spark, AWS tiene una solución particularmente buena llamada Elastic Map Reduce (EMR), que es una plataforma de big data gestionada que te permite configurar fácilmente clústeres de varios tipos dentro del ecosistema de big data. En este libro, nos centraremos en soluciones basadas en Spark, por lo que nos enfocaremos en crear y usar clústeres que tengan herramientas de Spark disponibles.

En la siguiente sección, revisaremos un ejemplo concreto de cómo levantar un clúster de Spark en EMR y luego desplegar una aplicación sencilla basada en Spark MLlib en él. Así que, con eso, ¡exploremos Spark en la nube con AWS EMR!




### Ejemplo de AWS EMR

Para entender cómo funciona EMR, continuaremos en la línea práctica que seguirá el resto de este libro y nos sumergiremos en un ejemplo. Comenzaremos aprendiendo cómo crear un clúster completamente nuevo antes de discutir cómo escribir y desplegar nuestra primera solución de PySpark ML en él. ¡Comencemos!

Después de iniciar nuestro clúster EMR, queremos poder enviarle trabajos. Aquí, adaptaremos el ejemplo de la canalización Spark MLlib que producimos en el Capítulo 3, De Modelo a Fábrica de Modelos, para analizar el conjunto de datos bancarios y enviar esto como un paso a nuestro clúster recién creado. Haremos esto como un script independiente de PySpark que actúa como un solo paso en nuestra aplicación, pero es fácil ampliar esto para crear aplicaciones mucho más complejas.

Primero, tomaremos el código del Capítulo 3, De Modelo a Fábrica de Modelos, y realizaremos una buena refactorización basada en nuestras discusiones sobre buenas prácticas. Primero, podemos modularizar el código de manera más efectiva para que contenga una función que proporcione todos nuestros pasos de modelado (no todos los pasos se han reproducido aquí por brevedad). También hemos incluido un paso final que escribe los resultados del modelado en un archivo parquet:


In [None]:
from pyspark.sql import functions as f

def model_bank_data(spark, input_path, output_path):
    data = spark.read.format("csv")\
    .option("sep", ";")\
    .option("inferSchema", "true")\
    .option("header", "true")\
    .load(input_path)
    data = data.withColumn('label', f.when((f.col("y") == "yes"), 1).otherwise(0))
    data.write.format('parquet')\
    .mode('overwrite')\
    .save(output_path)

Esta función model_bank_data:

1. Lee un archivo CSV de datos bancarios.

2. Crea una columna binaria label basada en si la respuesta y fue "yes".

3. Guarda el resultado en formato Parquet

Basándonos en esto, envolveremos todo el código principal de plantilla en una función principal que se pueda llamar en el punto de entrada del programa if __name__=="__main__":

In [None]:
import argparse

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input_path', ayuda='Ruta del bucket S3 para los datos de entrada. Se asume que es csv en este caso.'
    )
    parser.add_argument(
        '--output_path', ayuda='Ruta del bucket S3 para los datos de salida. Se asume que es parquet en este caso'
    )
    args = parser.parse_args()
    # Create spark context
    sc = SparkContext("local", "pipelines")
    # Get spark session
    spark = SparkSession\
        .builder\
        .appName('MLEIP Bank Data Classifier EMR Example')\
        .getOrCreate()
    
    model_bank_data(
        spark,
        input_path=args.input_path,#"s3://mleip-emr-ml-simple/bank.csv",
        output_path=args.output_path#"s3://mleip-emr-ml-simple/results.parquet"
    )
    

Esta función main() está diseñada como punto de entrada de un script de PySpark, que probablemente se ejecuta en un entorno como Amazon EMR o localmente para procesar datos almacenados en Amazon S3.

1. Parseo de argumentos desde la línea de comandos: 

    Usa el módulo argparse para definir dos argumentos que se deben pasar al ejecutar el script:

        --input_path: ruta al archivo CSV (por ejemplo, en un bucket de S3).

        --output_path: ruta donde se guardarán los datos procesados (como Parquet).

    args.input_path y args.output_path almacenan los valores que se pasen al script desde la línea de comandos.

2. Inicialización de Spark:

Crea un contexto de Spark.

- "local" indica que se ejecuta en modo local (no en un clúster).

- "pipelines" es el nombre de la aplicación.

- Crea una SparkSession, que es la entrada principal para usar la API de DataFrame en Spark.

- La aplicación se llama "MLEIP Bank Data Classifier EMR Example".

3. Llama a la función model_bank_data

Usa los argumentos de entrada y salida proporcionados por el usuario para ejecutar la función model_bank_data, que:

- Lee el CSV desde S3 (input_path).

- Crea la columna label.

- Escribe el resultado como Parquet en output_path.


En definitiva, la función completa quedaria así:

In [None]:
import argparse
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql import functions as f
from pyspark.mllib.evaluation import BinaryClassificationMetrics, MulticlassMetrics
from pyspark.ml.feature import StandardScaler, OneHotEncoder, StringIndexer, Imputer,VectorAssembler
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.classification import LogisticRegression

def model_bank_data(spark, input_path, output_path):
    data = spark.read.format("csv")\
    .option("sep", ";")\
    .option("inferSchema", "true")\
    .option("header", "true")\
    .load(input_path)
    data = data.withColumn('label', f.when((f.col("y") == "yes"), 1).otherwise(0))
    data.write.format('parquet')\
    .mode('overwrite')\
    .save(output_path)


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input_path', ayuda='Ruta del bucket S3 para los datos de entrada. Se asume que es csv en este caso.'
    )
    parser.add_argument(
        '--output_path', ayuda='Ruta del bucket S3 para los datos de salida. Se asume que es parquet en este caso'
    )
    args = parser.parse_args()
    # Create spark context
    sc = SparkContext("local", "pipelines")
    # Get spark session
    spark = SparkSession\
        .builder\
        .appName('MLEIP Bank Data Classifier EMR Example')\
        .getOrCreate()
    
    model_bank_data(
        spark,
        input_path=args.input_path,#"s3://mleip-emr-ml-simple/bank.csv",
        output_path=args.output_path#"s3://mleip-emr-ml-simple/results.parquet"
    )

if __name__ == "__main__":
    main()

Ahora, para enviar este script al clúster EMR que acabamos de crear, necesitamos averiguar el ID del clúster, que podemos obtener desde la interfaz de usuario de AWS o ejecutando el siguiente comando:

`aws emr list-clusters --cluster-states WAITING`

Luego, necesitamos enviar el script emr_sparkmllib.py a S3 para que el clúster pueda leerlo. Podemos crear un bucket de S3 llamado mleip-emr-ml-simple para almacenar este y otros artefactos utilizando ya sea la CLI o la consola de AWS (ver Capítulo 5, Patrones y Herramientas de Despliegue). Una vez copiado, estamos listos para los pasos finales.

Ahora, debemos enviar el script usando el siguiente comando, reemplazando <CLUSTER_ID> con el ID del clúster que acabamos de crear. Después de unos minutos, el paso debería haberse completado y haber escrito los resultados en el archivo results.parquet en el mismo bucket de S3:

aws emr add-steps \
--cluster-id <CLUSTER_ID> \
--steps Type=Spark, Name="Spark Application Step",ActionOnFailure=CONTINUE,Args=
[s3://mleip-emr-ml-simple/emr_sparkmllib.py,--input_path,s3://mleip-emr-ml-
simple/bank.csv --output_path,s3://mleip-emr-ml-simple/results.parquet]

¡Y eso es todo: así es como podemos empezar a desarrollar tuberías de ML con PySpark en la nube usando AWS EMR!

# Poner en marcha infraestructura sin servidor

Siempre que hacemos cualquier tarea de aprendizaje automático o ingeniería de software, tenemos que ejecutar las tareas y cálculos requeridos en computadoras, a menudo con las redes, la seguridad y otros protocolos y software ya implementados, lo cual a menudo hemos referido previamente como nuestra infraestructura. Una gran parte de nuestra infraestructura son los servidores que usamos para ejecutar los cálculos reales. Esto podría parecer un poco extraño, así que comencemos hablando sobre la infraestructura sin servidor (¿cómo puede existir tal cosa?). Esta sección explicará este concepto y te mostrará cómo utilizarlo para escalar tus soluciones de aprendizaje automático.

Serverless es un término un poco engañoso, ya que no significa que no haya servidores físicos ejecutando tus programas. Sin embargo, sí significa que los programas que estás ejecutando no deben considerarse como alojados de manera estática en una sola máquina, sino como instancias efímeras en otra capa encima del hardware subyacente.

Los beneficios de las herramientas sin servidor para su solución de ML incluyen (pero no se limitan a) los siguientes:

- `¡Sin servidores!` No subestimes el ahorro de tiempo y energía que puedes obtener al delegar la gestión de la infraestructura a tu proveedor de la nube.
- `Simplified scaling`: Por lo general, es muy fácil definir el comportamiento de escalado de tus componentes serverless mediante el uso de un número máximo de instancias claramente definido, por ejemplo.
- `Low barrier to entry`: Estos componentes suelen ser extremadamente fáciles de configurar y ejecutar, lo que permite a usted y a los miembros de su equipo centrarse en escribir código, lógica y modelos de alta calidad.
- `Natural integration points`: Las herramientas sin servidor son a menudo agradables de usar para la transferencia entre otras herramientas y componentes. Su facilidad de configuración significa que puedes estar funcionando rápidamente con trabajos simples que transfieren datos o activan otros servicios en poco tiempo.
- `Simplified serving`: Algunas herramientas serverless son excelentes para proporcionar una capa de servicio para tus modelos de ML. La escalabilidad y la baja barrera de entrada mencionadas anteriormente significan que puedes crear rápidamente un servicio muy escalable que brinde predicciones bajo solicitud o al ser activado por algún otro evento.

Uno de los mejores y más utilizados ejemplos de funcionalidad sin servidor es AWS Lambda, que nos permite escribir programas en una variedad de lenguajes con una sencilla interfaz de navegador web o a través de nuestras herramientas de desarrollo habituales, y luego hacer que se ejecuten de manera completamente independiente de cualquier infraestructura que se haya configurado. Lambda es una solución increíble con una baja barrera de entrada para poner algún código en funcionamiento y escalarlo. Sin embargo, está muy orientado a la creación de APIs simples que pueden ser accedidas mediante una solicitud HTTP. Desplegar tu modelo de ML con Lambda es especialmente útil si apuntas a un sistema impulsado por eventos o solicitudes.

Para ver esto en acción, construyamos un sistema básico que reciba datos de imagen entrantes mediante una solicitud HTTP con un cuerpo JSON y devuelva un mensaje similar que contenga la clasificación de los datos utilizando un modelo de scikit-learn preconstruido. Esta guía se basa en el ejemplo de AWS en https://aws.amazon.com/blogs/compute/deploying-machine-learning-models-with-serverless-templates/.

Para esto, podemos ahorrar mucho tiempo aprovechando plantillas ya construidas y mantenidas como parte del marco de trabajo AWS Serverless Application Model (SAM) (https://aws.amazon.com/about-aws/whats-new/2021/06/aws-sam-launches-machine-learning-inference-templates-for-aws-lambda/).


Ahora, vamos a realizar los siguientes pasos para configurar una plantilla de despliegue de Lambda para alojar y servir un modelo de aprendizaje automático en una infraestructura sin servidor:

Primero, debemos ejecutar el comando sam init y seleccionar la opción de Plantillas de Inicio Rápido de AWS:

# Containerizing at scale with Kubernetes

Ya hemos cubierto cómo usar contenedores para construir y desplegar nuestras soluciones de ML. El siguiente paso es entender cómo orquestar y gestionar varios contenedores para desplegar y ejecutar aplicaciones a gran escala. Aquí es donde entra la herramienta de código abierto K8s.

K8s es una herramienta extremadamente poderosa que ofrece una variedad de funcionalidades diferentes que nos ayudan a crear y gestionar aplicaciones en contenedores muy escalables, incluyendo (pero no limitado a) lo siguiente:

- `Load Balancing`: K8s gestionará el enrutamiento del tráfico entrante hacia tus contenedores para que la carga se distribuya de manera equitativa.
- `Horizontal Scaling`:K8s proporciona interfaces simples para que puedas controlar la cantidad de instancias de contenedores que tienes en cualquier momento, lo que te permite escalar masivamente si es necesario.
- `Self Healing`: Existe una gestión integrada para reemplazar o reprogramar los componentes que no superan las verificaciones de estado.
- `Automated Rollbacks`: K8s almacena el historial de tu sistema para que puedas revertir a una versión anterior que funcione si algo sale mal.

Todas estas características ayudan a garantizar que sus soluciones desplegadas sean robustas y capaces de funcionar según lo requerido en todas las circunstancias. K8s está diseñado para asegurar que las características anteriores estén integradas desde el inicio mediante el uso de una arquitectura de microservicios, con un plano de control que interactúa con nodos (servidores), cada uno de los cuales aloja pods (uno o más contenedores) que ejecutan los componentes de su aplicación.

Lo más importante que K8s te ofrece es la capacidad de escalar tu aplicación en función de la carga mediante la creación de réplicas de la solución base. Esto es extremadamente útil si estás construyendo servicios con endpoints de API que podrían enfrentar aumentos en la demanda en diferentes momentos. Para aprender sobre algunas de las formas en que puedes hacer esto, consulta https://kubernetes.io/docs/concepts/workloads/controllers/deployment/#scaling-a-deployment:

Kubeflow se presenta a sí mismo como el conjunto de herramientas de ML para K8s (https://www.kubeflow.org/), por lo que, como ingenieros de ML, tiene sentido que estemos al tanto de esta solución en rápido desarrollo. Esta es una herramienta muy emocionante y un área de desarrollo activa. El concepto de escalado horizontal para K8s generalmente todavía se aplica aquí, pero Kubeflow proporciona algunas herramientas estandarizadas para convertir los pipelines que construyes en recursos estándar de K8s, los cuales luego pueden gestionarse y asignarse recursos de las maneras descritas anteriormente. Esto puede ayudar a reducir el código repetitivo y nos permite a nosotros, como ingenieros de ML, centrarnos en construir nuestra lógica de modelado en lugar de configurar la infraestructura, aunque K8s ya sea una buena abstracción.

# Summary

En este capítulo, analizamos cómo tomar las soluciones de ML que hemos estado construyendo en los capítulos anteriores y consideramos cómo escalarlas a volúmenes de datos más grandes o a un mayor número de solicitudes de predicciones. Para esto, nos centramos principalmente en Apache Spark, ya que es el motor de propósito general más popular para la computación distribuida. Durante nuestra discusión sobre Apache Spark, revisitamos algunos patrones de codificación y sintaxis que habíamos usado previamente en este libro. Al hacerlo, desarrollamos una comprensión más profunda de cómo y por qué realizar ciertas acciones al desarrollar en PySpark. Discutimos el concepto de UDFs en detalle y cómo estos pueden usarse para crear flujos de trabajo de ML altamente escalables.

Después de esto, exploramos cómo trabajar con Spark en la nube, específicamente a través del servicio Elastic MapReduce (EMR) proporcionado por AWS. Luego, analizamos algunas de las otras formas en que podemos escalar nuestras soluciones; es decir, mediante arquitecturas sin servidor y escalado horizontal con contenedores. En el primer caso, recorrimos cómo construir un servicio para ofrecer un modelo de ML utilizando AWS Lambda. Esto utilizó plantillas estándar proporcionadas por el marco de Gestión de Aplicaciones Serverless de AWS. Finalmente, proporcionamos una visión general de cómo usar K8s y Kubeflow para escalar horizontalmente tuberías de ML, así como algunos de los otros beneficios de usar estas herramientas.

En el próximo capítulo, reuniremos muchos aspectos de nuestro nuevo conocimiento en ingeniería de ML para construir un microservicio de pronóstico, basándonos en el ejemplo de modelo básico mostrado en el Capítulo 1, Introducción a la Ingeniería de ML.