# Spark

---

Spark le permite distribuir datos y cálculos en clústeres con múltiples nodos (piense en cada nodo como una computadora separada). La división de sus datos facilita el trabajo con conjuntos de datos muy grandes porque cada nodo solo funciona con una pequeña cantidad de datos.

Como cada nodo trabaja en su propio subconjunto de los datos totales, también realiza una parte de los cálculos totales necesarios, de modo que tanto el procesamiento de datos como el cálculo se realizan en paralelo sobre los nodos del clúster. Es un hecho que la computación en paralelo puede hacer que ciertos tipos de tareas de programación sean mucho más rápidas.

El primer paso para usar Spark es conectarse a un clúster. Habrá una computadora, llamada _master_, que administrará la división de los datos y los cálculos. El _master_ está conectado al resto de las computadoras del clúster, que se denominan _worker_. El _master_ envía a los _workers_ datos y cálculos para que se ejecuten, y ellos envían sus resultados al _master_.

## Spark DataFrames

La estructura de datos central de Spark es el Resilient Distributed Dataset (RDD). Este es un objeto de bajo nivel que permite que Spark haga su magia al dividir los datos en varios nodos del clúster. Sin embargo, es difícil trabajar directamente con los RDD, por lo que usaremos Spark DataFrame construido sobre los RDD.

In [0]:
import pyspark.sql.functions as F
from pyspark.sql.functions import when
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml.regression import LinearRegression, DecisionTreeRegressor
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType

### Spark Schema

Si conoce el tipo de dato es bueno definir el schema en Spark, para tener un mejor rendimiento. El schema se puede definir usando la clase `StructType()` que recibe una lista de los diferentes tipos de datos. Para cada variable(o columna) puedo definir el tipo de datos usando la clase `StructField("name", dataType, bool)` que recibe el nombre del campo, el tipo de dato y si los datos pueden ser nulos.

### Data

* mon: mes (entero entre 1 y 12).
* dom: dia del mes (entero entre 1 y 31).
* dow: dia de la semana (1: lunes, ..., 7: domingo).
* org: origen del aeropuerto.
* mile: distancia en millas.
* carrier: operador.
* depart: hora de salida (hora decimal).
* duration: duracion esperada (minutos).
* delay: retraso (minutos).

In [0]:
path_file = "/FileStore/tables/flights.csv"

schema = StructType([
  StructField("mon", IntegerType(), True),
  StructField("dom", IntegerType(), True),
  StructField("dow", IntegerType(), True),
  StructField("carrier", StringType(), True),
  StructField("flight", IntegerType(), True),
  StructField("org", StringType(), True),
  StructField("mile", IntegerType(), True),
  StructField("depart", DoubleType(), True),
  StructField("duration", IntegerType(), True),
  StructField("delay", IntegerType(), True)
])

flights = spark.read.csv(path_file, header=True, schema=schema)
flights.show(5)

### Lazy evaluation

Spark no realiza ninguna transformación hasta que se solicita una acción. Básicamente a medida que agregamos más código, simplemente estamos actualizando el proceso que va a ejecutar.

In [0]:
import numpy as np

random_numbers = np.random.normal(loc=1000, scale=150, size=10000).tolist()

random_numbers_rdd = sc.parallelize(random_numbers)
random_numbers_rdd

In [0]:
random_numbers_rdd.collect()[:50]

In [0]:
flights

### Crear columnas

Actualizar un Spark DataFrame es algo diferente a trabajar `Pandas` porque el Spark DataFrame es inmutable . Esto significa que no se puede cambiar y, por lo tanto, las columnas no se pueden actualizar en su lugar.

Por lo tanto, todos estos métodos devuelven un nuevo DataFrame.

En Spark puedes hacer esto usando el método `.withColumn()`, que toma dos parámetros. Primero, una cadena con el nombre de su nueva columna y, en segundo lugar, la nueva columna en sí.

La nueva columna debe ser un objeto de clase `Column`. Crear uno de estos es tan fácil como extraer una columna de su DataFrame usando `df.colName`.

In [0]:
flights = flights.withColumn('km', flights.mile * 1.60934)
flights = flights.withColumn('retraso', (flights.delay >= 15).cast("integer"))

flights.show(5)

En algunas ocasiones querrás cambiar algún valor o una variable basado en un condicional. Spark, proporciona algunas sentencias condicionales que actúan de forma similar a **_if_**, **_then_**, y **_else_**. Es posible realizar una declaración tradicional del estilo mencionado anteriormente, sin embargo, puede degradar el rendimiento.
Podemos usar entonces condicionales incorporados optimizados, tales como:
* `when(<if condition>, <then x>)`
* `otherwise()`

In [0]:
flights = flights.withColumn("retraso", when(flights.retraso == 1, "demorado")
                             .when(flights.retraso == 0, "a tiempo")
                             .otherwise("N/A"))
flights.show(4)

In [0]:
flights.withColumnRenamed("retraso", "descripcion").show(4)

### Filtrar datos

Podemos usar el método `.filter()` para filtrar datos. Este método toma una expresión que seguirá la `WHERE` sentencia de una expresión SQL como un string, o una columna Spark con valores booleanos (`True`/`False`).

**Ejemplo**:

```

df.filter("column > 150").show()

df.filter(df.Column > 150).show()

```

In [0]:
flights.filter("carrier = 'US'").show()

In [0]:
flights.filter(flights.carrier == "US").show(4)

In [0]:
flights.createOrReplaceTempView("flights")

In [0]:
spark.catalog.listTables()

In [0]:
query = """
        SELECT *
        FROM flights
        WHERE carrier = 'US'
        """

spark.sql(query).show()

### Seleccionar

La variante Spark para seleccionar es el método `.select()`. Este método toma varios argumentos, uno para cada columna que desee seleccionar. Estos argumentos pueden ser el nombre de la columna como una cadena(uno para cada columna) o un objeto de columna(usando `df.colName`). Cuando pasa un objeto de columna, puede realizar operaciones como sumar o restar en la columna para cambiar los datos que contiene, al igual que en el interior `.withColumn()`.

La diferencia entre los métodos `.select()` y `.withColumn()` es que `.select()` devuelve solo las columnas que especifique, mientras que `.withColumn()` devuelve todas las columnas del DataFrame además de la que definió. A menudo es una buena idea dejar columnas que no necesita al comienzo de una operación para no arrastrar datos adicionales mientras hace cálculos. En este caso, usaría `.select()` y no `.withColumn()`.

**Nota**:

* Cuando hacemos un cálculo o una operación dentro de un `.select()`, para nombrar el resultado podemos usar el método `.alias()`.
* Podemos usar un método `.selectExpr()` que tome expresiones SQL como un string.

In [0]:
flights.select("mile", "km", (flights.mile * 1.60934).alias("km_2")).show(6)

In [0]:
flights.select(flights.mile, flights.km).show(4)

### Funciones definidas por el usuario (UDF)

Es un método de Python que el usuario escribe para realizar un bit específico de lógica. Una vez escrito, el método se llama a través de `pyspark.sql.functions.udf`. El resultado se almacena como una variable y se puede llamar como una función de Spark normal.

**Ejemplo**:

```

def reverseString(mystr):
    return mystr[::-1]

udfReverseString = udf(reverseString, StringType())

```

La función `udf(func, dataType)` toma dos argumentos, la función que acaba de definir y el tipo de dato que Spark devolverá.

In [0]:
def lowercase(string):
  return string.lower()

udf_lowercase = udf(lowercase, StringType())

In [0]:
flights.withColumn("carrier", udf_lowercase(flights.carrier)).show(4)

### Agrupación

Métodos de agrupación como `.min()`, `.max()`, `.count()`, entre otros, se conocen como `GroupedData`. Estos se pueden usar después de usar el método `.groupBy()`.

**Ejemplo**

```

df.groupBy("column").min("column").show()

```

Además de los métodos `GroupedData` también podemos usar un método `.agg()`. Este método permite pasar una expresión de columna agregada que usa cualquier de las funciones agregadas del submódulo `pyspark.sql.functions`. Este submódulo contiene muchas funciones, y toma el nombre de una columna.

In [0]:
display(flights.groupBy("carrier").count())

carrier,count
UA,13170
AA,11316
B6,5267
OO,8148
US,2740
AQ,90
OH,3229
HA,707
WN,5333


In [0]:
flights.groupby("mon", "org").agg(F.min("duration").alias("min_duration"), F.mean("km").alias("avg_km")).show()

### Join

Para unir DataFrames podemos usar el método `.join()`. Este método recibe tres argumentos, el primero es el DataFrame con el cual voy a unir, el segundo, `on` es el nombre de la(s) columna(s) por las cual voy a unir; y el tercero, el argumento `how` especifica el tipo de join a realizar.

Por otro lado, también puedo usar Spark SQL para unir dos DataFrames. Para eso registramos los DataFrames como tablas en Spark.

**Ejemplo**:

```
df1.createOrRemplaceTempView("df1")
df2.createOrRemplaceTempView("df2")

sql = """
      SELECT *
      FROM df1
      LEFT JOIN df2
      ON df1.column = df2.column
      AND df1.column1 = df2.column1
      """

df = spark.sql(sql)
```

### Eliminar columna(s)

Para eliminar columna(s) en Spark podemos usar el método `.drop()`. Depende de si es una columna o varias, podemos hacerlo de la siguiente manera:

```
# Una sola columna

df = df.drop("Column")

# Varias columnas

cols_to_drop = ["col1", "col2"]

df = df.drop(*cols_to_drop)

```

In [0]:
flights.drop("flight").show(5)

In [0]:
flights.drop(*["mile", "km"]).show(5)

## Preprocesamiento

### Valores missing

En muchas ocasiones el set de datos tiene valores missing que tienen algún mecanismo en particular. Podemos usar varios métodos para detectar y tratar los valores missing:

* **.isNull()**: Devuelve `True` cuando la observación es un missing.
* **.fillna(value, subset=None)**: Para reemplazar datos missing por un valor definido. El primer parámetro es el valor a reemplazar, y el segundo, la lista de las columnas en donde se va a imputar el dato missing.
* **.dropna(how="any", thresh=None, subset=None)**: Para eliminar datos missing

In [0]:
for column in flights.columns:
  print(f"La variable {column} tiene {flights.where(flights[column].isNull()).count()} datos missing")

In [0]:
flights.fillna(0).show()

In [0]:
flights.dropna().show()

In [0]:
train_data, test_data = flights.randomSplit([0.7, 0.3], seed=42)

Recordemos que en sklearn, tenemos:

* Estimadores .fit()
* Transformadores .transform()
* Predictores .predict()

En PySpark.ml, tenemos:

* Estimadores .fit()
* Transformadores .transform()

In [0]:
from pyspark.ml.feature import Imputer

imputer = Imputer(strategy="mean", inputCol="delay", outputCol="delay_imp")

imputer = imputer.fit(train_data)

train_data = imputer.transform(train_data)

test_data = imputer.transform(test_data)


### Variables categóricas

Algunas clases que nos ayudan a codificar datos categóricos en la etapa de preprocesamiento son:

* `StringIndexer(inputCol, outputCol)` Los valores de índice que se asignan es basado en la frecuencia relativa de forma descendente.
* `OneHotEncoder()`

In [0]:
indexer = StringIndexer(inputCol='org', outputCol="org_idx")

train_data = train_data.drop("org_idx")

model_indexer = indexer.fit(train_data)

In [0]:
train_data = model_indexer.transform(train_data)

test_data = model_indexer.transform(test_data)

In [0]:
ohe = OneHotEncoder(inputCol='org_idx', outputCol='org_dummy').fit(train_data)

train_data = ohe.transform(train_data)

test_data = ohe.transform(test_data)

### Creación de vector predictor (Assembling)

En Spark se debe consolidar las distantes variables independientes en una sola columna

* `VectorAssembler(inputCols, outputCol)`

In [0]:
train_data.show(4)

In [0]:
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=['org_dummy', 'km'], outputCol='features')

In [0]:
train_data = assembler.transform(train_data)

test_data = assembler.transform(test_data)

In [0]:
test_data.show(5)

## Entrenamiento

In [0]:
linear_reg = LinearRegression(labelCol='duration', featuresCol='features').fit(train_data)

In [0]:
predictions = linear_reg.transform(test_data)
predictions.show()

In [0]:
RegressionEvaluator(labelCol='duration').evaluate(predictions)

In [0]:
print(f"El intercepto es: {linear_reg.intercept}")
for col, coeff in zip(train_data.columns, linear_reg.coefficients.tolist()):
  print(f"El coeficiente para la variable {col} es: {coeff}")

## Pipelines

In [0]:
train_data, test_data = flights.randomSplit([0.7, 0.3], seed=42)

In [0]:
indexer = StringIndexer(inputCol="org", outputCol="org_idx")

ohe = OneHotEncoder(inputCols=["org_idx", "dom"], outputCols=["org_dummy", "dom_dummy"])

assembler = VectorAssembler(inputCols=["km", "org_dummy", "dom_dummy"], outputCol="features")

dtree = DecisionTreeRegressor(labelCol="duration", featuresCol="features", maxDepth=4)

In [0]:
from pyspark.ml.pipeline import Pipeline 

In [0]:
model = Pipeline(stages=[indexer, ohe, assembler, dtree])

model = model.fit(train_data)

In [0]:
predictions = model.transform(test_data)

rmse = RegressionEvaluator(labelCol="duration").evaluate(predictions)

print(f"El RMSE es: {round(rmse, 3)}")

## Validación cruzada

In [0]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

model = Pipeline(stages=[indexer, ohe, assembler, dtree])

params = ParamGridBuilder().addGrid(dtree.maxDepth, [5, 7, 9]) \
                           .addGrid(dtree.maxBins, [10, 20, 30]) \
                           .build()

evaluator = RegressionEvaluator(labelCol="duration", metricName="rmse")

dtree_cv = CrossValidator(estimator=model, estimatorParamMaps=params, evaluator=evaluator, numFolds=5).fit(train_data)

In [0]:
best_model = dtree_cv.bestModel

for idx, stage in enumerate(best_model.stages):
  print(f"Para el indice {idx} el stage es: {str(stage).split(':')[0]}")

In [0]:
for key, value in best_model.stages[3].extractParamMap().items():
  print(f"{str(key).split('__')[1]}: {value}")

In [0]:
print(f"El RMSE promedio es: {np.mean(dtree_cv.avgMetrics)}")

In [0]:
rmse = RegressionEvaluator(labelCol="duration").evaluate(best_model.transform(test_data))

print(f"El RMSE es: {rmse}")