# Taller de procesamiento de BigData en Spark + R
Manuel Parra (manuelparra@decsai.ugr.es). <a href="http://sci2s.ugr.es/es">Soft Computing and Intelligent Information Systems</a>
. <a href="http://sci2s.ugr.es/dicits/">Distributed Computational Intelligence and Time Series</a>. **University of Granada**.
![logos](https://sites.google.com/site/manuparra/home/header.png)

# Operaciones sobre SparkDataFrames


Como siempre para todos nuestros `scripts` con **SparkR**, cargamos la biblioteca, y creamos una nueva sesión de SparkR.

En este caso:

<span style="background-color:red;color:white">&nbsp; &nbsp; Cuidado con la cantidad de MEMORIA que usamos para esta sección ! &nbsp; &nbsp; </span>

In [None]:
#Fijamos la ruta donde está instalado Spark
Sys.setenv("SPARK_HOME"='/usr/local/spark/')

.libPaths(c(file.path(Sys.getenv("SPARK_HOME"),"R/lib/"),.libPaths()))
library(SparkR)
sparkR.session(appName="EntornoInicio", master = "local[*]", sparkConfig = list(spark.driver.memory = "1g"))

Los ``SparkDataFrames`` soportan un alto número de funciones para hacer un procesado de datos estructurado. 

Vamos a poner en práctica las más utilizadas. 

**La lista completa de operaciones que se pueden aplicar se puede ver desde API de SparkR en https://spark.apache.org/docs/latest/api/R/index.html **

![funcSparkR](https://sites.google.com/site/manuparra/home/functionSparkR.jpg)


# Operaciones con SparkDataFrames

In [None]:
# Cargamos una versión reducida de los datos en CSV
df_nyctrips <- read.df("/SparkR/datasets/yellow_tripdata_2016-02_small1.csv", "csv", header = "true", inferSchema = "true")

In [None]:
# Probamos de nuevo sin INFERIR SCHEMA

# Cargamos una versión reducida de los datos en CSV
df_nyctrips <- read.df("/SparkR/datasets/yellow_tripdata_2016-02_small1.csv", "csv", header = "true", inferSchema = "false")


** ¿Cuál de las dos sentencias anteriores ha tardado más? **

Estudiamos de manera superficial el dataset

In [None]:
# Comprobamos los campos del dataset
printSchema(df_nyctrips)

# Comprobamos como son los datos:
head(df_nyctrips)

# Contamos el total del registros:
count(df_nyctrips)

## Selección de instancias y columnas

Para la selección de columnas y filas, usamos ``select`` y ``filter``. 

Todas las operaciones se pueden combinar para producir un nuevo dataset o ``SparkDataFrame``. **Son equivalentes a usar SPARKSQL **.

Estas operaciones son esenciales si queremos transformar el dataset en otra versión preprocesada del mismo.

In [None]:
# Seleccionamos sólo la columna longitud, por el id de la columna
# Por ID de columna 
head(select(df_nyctrips,df_nyctrips$pickup_longitude))

In [None]:
# Seleccionamos sólo la columna longitud, por el nombre de la columna.
# Por nombre de columna del dataset
head(select(df_nyctrips,"pickup_longitude"))

Para aplicar filtros de para las filas usamos ``filter`` que admite expresiones con operadores condicionales: 

```
    < = > ! & | ...
```

In [None]:
# Aplicamos un filtro para ver los viajes aquellos viajes de taxi de más de 10 millas.
head(filter(df_nyctrips, df_nyctrips$trip_distance > 10 & df_nyctrips$total_amount> 20 ))


<HR>
<div style="font-family:helvetica;padding:5px;font-size:1.5em;background-color:#CFE7E2">Ejercicio práctico:</div>

Selecciona todos los viajes que se hacen desde las 10 de la noche a las 6 de la mañana que tienen más de 3 pasajeros

<HR>




In [None]:
# Aplicamos un filtro para ver los viajes aquellos viajes de taxi de más de 10 millas y el importe mayor de $ 20
head(filter(df_nyctrips, df_nyctrips$trip_distance > 10 & df_nyctrips$total_amount> 20 ))

Para agrupar datos se usa ``agg``. 

In [None]:
# Aplicamos un filtro para ver el viaje más caro en Taxi que se ha hecho:
head( agg(df_nyctrips ,max = max(df_nyctrips$total_amount)))

In [None]:
# Aplicamos un filtro para ver el viaje menos caro en Taxi que se ha hecho:
head(agg(df_nyctrips, min = min(df_nyctrips$total_amount)))


<HR>
<div style="font-family:helvetica;padding:5px;font-size:1.5em;background-color:#CFE7E2">Ejercicio práctico:</div>

- Calcula cual es el viaje más largo que se ha hecho en kilometros (1 milla aprox 1.6 kilómetros).

<HR>



## Uso de Agrupamiento y Agregación

Los SparkDataFrames soportan funciones de agregado despues de agrupar:

- ``groupBy``
- ``summarize``

Por ejemplo podemos utilizar lo siguiente:

In [None]:
# Agrupamos por Vendedor y mostramos el número de viajes.
head(summarize(groupBy(df_nyctrips, df_nyctrips$VendorID), count = n(df_nyctrips$VendorID)))

In [None]:
# Agrupamos por Vendedor y mostramos el número de viajes.
head(summarize(groupBy(df_nyctrips, df_nyctrips$VendorID), max = max(df_nyctrips$total_amount)))

In [None]:
# Agrupamos y ordenamos

numsum <- summarize(groupBy(df_nyctrips, df_nyctrips$VendorID), num = n(df_nyctrips$VendorID))
head(arrange(numsum,asc(numsum$num)))

In [None]:
# Agrupamos por numero de pasajeros y mostramos el numero de viajes
trips_passenger <- summarize(groupBy(df_nyctrips, df_nyctrips$passenger_count), count = n(df_nyctrips$passenger_count))

In [None]:
# Cuidado con el COLLECT !
trips_df <- head(collect(trips_passenger))


<HR>
<div style="font-family:helvetica;padding:5px;font-size:1.5em;background-color:#CFE7E2">Ejercicio práctico:</div>

¿Qué ocurre si hacemos ``collect`` de un SparkDataFrame?


<HR>


In [None]:
head(trips_df)

## Operaciones con columnas

Otras operaciones en R, corresponden con la manipulación o transformación de valores en los registros de un dataset. En este caso la manipulación es muy sencilla:

In [None]:
# Convertimos la columna de millas a kilómetros, igual que en R.
df_nyctrips$trip_distance <- df_nyctrips$trip_distance*1.6

In [None]:
head(df_nyctrips)

## Añadir columnas

In [None]:
# Usamos mutate para añadir columnas que operan con elementos de las demás columnas.

# mutate(sql_nyc,  uniform = rand(10),  normal  = randn(27))

head(mutate(df_nyctrips,  uniform = rand(10),  normal  = randn(27)))
head(mutate(df_nyctrips,  uniform =df_nyctrips$total_amount*1.1355,  normal  = randn(27)))


<HR>
<div style="font-family:helvetica;padding:5px;font-size:1.5em;background-color:#CFE7E2">Pregunta</div>

Añáde una columna que sea el tiempo del viaje. Pista ``INT(unix....())``.


<HR>

In [None]:
# Otro modo de hacerlo es:

head(withColumn(df_nyctrips,"uniform",rand(20)))

### dapply -- dapplayCollect

Aplicar una función a un conjunto datos masivo con ``dapply`` y ``dapplyCollect`` 

**dapply**

Aplica una función a cada partición de un ``SparkDataFrame``. La función que será aplicada para cada partición y debería tener sólo un parámetro. La salida de la función deberá ser igualmente un data.frame. Además hay que especificar el ``schema`` del formato de los datos del ``SparkDataFrame`` resultante y deberá corresponder con tipo de datos del valor devuelto.



In [None]:
# Hacemos una copia del SparkDataFrame para usarla en una vista temporal en SQL
createOrReplaceTempView(df_nyctrips,"slqdf_filtered_nyc")

# Hacemos una selección de los registros, donde calculamos el tiempo del viaje de cada viaje
sql_nyc <- sql("select VendorID,INT(unix_timestamp(tpep_dropoff_datetime)- unix_timestamp(tpep_pickup_datetime)) AS trip_time,passenger_count,trip_distance,total_amount from slqdf_filtered_nyc")

# Mostramos un trozo de SparkDataFrame
head(sql_nyc)

schema(sql_nyc)

# Indicamos el Schema, que debe coincidir con lo que queremos
schema <- structType(
    structField("VendorID", "integer"),
    structField("trip_time", "integer"), 
    structField("passenger_count", "integer"),
    structField("trip_distance", "double"),
    structField("total_amount", "double"),
    structField("total_amount_euro", "double")
)

# Creamos la función que hará los cambios.
new_sql_nyc <- dapply(
    sql_nyc, 
    function(x) { 
        x <- cbind(x, x$total_amount*1.1355) 
    }, 
    schema)

# Vemos el cambio
head(new_sql_nyc)


### gapply -- gapplyCollect


Aplica una función a cada uno de los grupos de un ``SparkDataFrame``. La función será aplicada a cada grupo del ``SparkDataFrame`` y debería tener sólo dos parámetros: agrupamiento por llave y data.frame al que corresponde esa llave. La salida de la función debería ser un data.frame. 

In [None]:
# Esquema del SparkDataFrame
schema <- structType(
    structField("VendorID", "integer"),
    structField("trip_time", "integer"), 
    structField("passenger_count", "integer"),
    structField("trip_distance", "double"),
    structField("total_amount", "double"),
    structField("max_amount", "double")
)

# Aplicamos la función gapply. Calculamos el máximo de cada Vendedor.
result <- gapply(
    sql_nyc,
    "VendorID",
    function(key, x) {
        y <- data.frame(key, max(x$total_amount))
    },
    schema)

# Mostramos el resultado.
head(result[order(result$trip_distance, decreasing = TRUE), ])



In [None]:
head(sql_nyc)

# Ahora probamos el gapplycollect: 
# Como el gapply, aplica una funcion a cada partición y luego hace un collect del resultado en un data.frame en R.
result <- gapplyCollect(
            
    sql_nyc,
    "VendorID",
    function(key, x) {
        y <- data.frame(key, max(x$trip_distance))
        colnames(y) <- c("VendorID", "max_trip_distance")
        y
    })

# Vemos el resultado.
head(result[order(result$trip_distance, decreasing = TRUE), ])

## Operando con SparkSQL sobre cojuntos masivos de datos.

Todas las funciones de manejo de datos que se han usado con SparkR, pueden hacerse de una forma sencilla e intuitiva  con SparkSQL

In [None]:
# sql_nyc es nuestro DataFrameSpark de SQL
createOrReplaceTempView(sql_nyc,"slqdf_filtered_nyc")

# Hacemos una consulta para extraer el viaje de mayor distancia de cada venderor.
results <- sql("select VendorID, MAX(trip_distance) from slqdf_filtered_nyc GROUP BY VendorID ")

In [None]:
# Vemos el resultado.
head(results)

**Recuerda cerrar la sesión con Spark**

En secciones siguientes se revisará en profunidad SparkSQL.