<span style='font-size:2.4em'>Procesamiento masivo de datos con SparkR</span>

<span style='font-size:1.5em'>VIII Jornadas de usuarios de R. Albacete, Castilla-La Mancha, 17 y 18 de noviembre de 2016</span>

Taller impartido por: <span style='font-size:1.2em'>Manuel Jesús Parra Royón</span>


![Alt](https://sites.google.com/site/manuparra/home/logoparty.png)

<HR>

# Lectura de datos desde SparkR

![Spark+R](https://sites.google.com/site/manuparra/home/SparkRlogo.png)

Como siempre, para todos nuestros `scripts` con **SparkR**, cargamos la biblioteca, y creamos una nueva sesión de SparkR.

In [1]:
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"),"R/lib/"),.libPaths()))
library(SparkR)
sparkR.session(appName="EntornoInicio", master = "local[*]", sparkConfig = list(spark.driver.memory = "1g"))


Attaching package: ‘SparkR’

The following objects are masked from ‘package:stats’:

    cov, filter, lag, na.omit, predict, sd, var, window

The following objects are masked from ‘package:base’:

    as.data.frame, colnames, colnames<-, drop, endsWith, intersect,
    rank, rbind, sample, startsWith, subset, summary, transform, union

Spark package found in SPARK_HOME: /usr/local/spark/


Launching java with spark-submit command /usr/local/spark//bin/spark-submit   --driver-memory "1g" sparkr-shell /tmp/RtmpVALPn5/backend_porta2e676ae652 


Java ref type org.apache.spark.sql.SparkSession id 1 

Hoy en día el trabajo con BigData parece que siempre está asociado al ecosistema HADOOP. Hace unos años esto significaba que si también eras un buen programador en JAVA, trabajar en tal entorno, incluso un simple programa para hacer un WORDCOUNT, implicaba varias docenas de líneas de código. 

Pero hace 3-4 años la cuestión ha cambiado gracias a Apache Spark con su API de estilo funcional. 

Está escrito en SCALA pero también puede ser utilizado desde Python, JAVA y como estais viendo por este Taller: también en R

## Fuentes de datos

Dentro de una sesión de Spark, las aplicaciones pueden crear `SparkDataFrames` desde variadas fuentes de datos, como por ejemplo:  un fichero local (`data.frame`), desde HDFS (``hdfs:///``),  desde tablas en `HIVE` o desde otras múltiples fuentes de datos (AmazonS3, etc).

Concretamente las principales fuentes u orígenes de datos desde las que **cargar datos** son los siguientes:

* Ficheros locales
* Ficheros en sistemas distribuidos de almacenamiento Hadoop HDFS
* Sistemas de almacenamiento de datos tipo HIVE
* Desde bases de datos relacionales a través de JDBC
* ...


## Tipos de fuentes de datos

Una cosa son las **fuentes de datos** y otras cosa son los **tipo de fuentes de datos**. El tipo de fuente de datos puede ser visto como el formato de los datos.

Los conjuntos de datos pueden están almacenados en diferentes formatos, los más utilizados para SparkR (y Spark):

* Ficheros planos y CSV
* **Ficheros JSON**
* Ficheros de tipo ```avro``` (row-based)
* Ficheros de tipo ```parquet``` (column-based)
* Ficheros de tipo ```orc``` (column-based)

![ListOfSources](https://sites.google.com/site/manuparra/home/files_API.png)


## Repositorio de Datasets para todo el taller

Todos los conjuntos de datos que vamos a tratar para el Taller se encuentran disponibles en el directorio ```datasets```. Para consultar, modificar y añadir datasets, ficheros, etc, puedes hacerlo usando el gestor de fichero de ```Jupyter``` desde:  http://localhost:25980/tree/datasets

<BR>

## Trabajo con ficheros en formato CSV 

Vamos a revisar todas las funcionalidades que ofrece SparkR para el manejo de CSV

### Lectura

Comprobamos que hay en el directorio donde tenemos los datasets

In [3]:
list.files("/root/TallerSparkR/datasets/csv",full.names = T,recursive = T)


Leemos un fichero concreto de datos en formato ```CSV``` del directorio ```datasets/csv```. El fichero de ejemplo sólo tiene 1000 registros:


Para la lectura de datos con SparkR usamos la función ``read.df( )``

In [2]:
# Sólo indicamos un fichero concreto .... No hay problema Spark es muy listo ! ;)
df <- read.df("/root/TallerSparkR/datasets/csv/buy_costumers_amazon01.csv", "csv", header = "true", inferSchema = "true")
printSchema(df)
count(df)
head(df)

root
 |-- id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- buy_hours: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- credit_card: long (nullable = true)


id,first_name,last_name,buy_hours,amount,credit_card
1,Julie,Knight,22:50,2865.65,30038230000000.0
2,Frank,Hernandez,23:48,3783.93,3577441000000000.0
3,Andrew,Patterson,19:32,581.7,3546533000000000.0
4,Charles,Snyder,9:44,3257.08,5002359000000000.0
5,Earl,Little,19:23,439.36,5048378000000000.0
6,Pamela,Torres,16:00,3773.75,3531928000000000.0


In [7]:
# Sólo indicamos un fichero concreto .... No hay problema Spark es muy listo ! ;)
df <- read.df("/root/TallerSparkR/datasets/csv/buy_costumers_amazon01.csv", "csv", header = "true", inferSchema = "true")
print("Estructura sin parsear:")
printSchema(df)

# Creamos un esquema para definir cual será la estructura del fichero a leer.
schema_amazon <- structType(structField("id", "integer"),
                     structField("first_name", "string"),
                     structField("last_name", "string"),
                     structField("buy_hours", "string"),
                     structField("amount", "double"),
                     structField("credit_card", "string"))

df <- read.df("/root/TallerSparkR/datasets/csv/buy_costumers_amazon01.csv", "csv", header = "true", schema=schema_amazon)
print("Estructura parseada:")
printSchema(df)
head(df)


[1] "Estructura sin parsear:"
root
 |-- id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- buy_hours: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- credit_card: long (nullable = true)
[1] "Estructura parseada:"
root
 |-- id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- buy_hours: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- credit_card: string (nullable = true)


id,first_name,last_name,buy_hours,amount,credit_card
1,Julie,Knight,22:50,2865.65,30038229243005
2,Frank,Hernandez,23:48,3783.93,3577440681063074
3,Andrew,Patterson,19:32,581.7,3546532606705637
4,Charles,Snyder,9:44,3257.08,5002359371334068
5,Earl,Little,19:23,439.36,5048377824454388
6,Pamela,Torres,16:00,3773.75,3531927547020434


Si queremos leer todos los ficheros de un directorio sin entrar en los subdirectorios:

In [8]:
# Esto leería todos los ficheros de la carpeta pero no entraría a cada subdirectorio... Spark no eres muy listo !
df <- read.df("/root/TallerSparkR/datasets/csv/", "csv", header = "true", inferSchema = "true", schema=schema_amazon)
count(df)

Comprobamos que estructura de ficheros y directorios tenemos. Observamos que en ```datasets/csv/``` existen subdirectorios, por lo que hay que usar comodines: *****


In [9]:
list.files("/root/TallerSparkR/datasets/csv",full.names = T,recursive = T)

Ejecutamos la siguiente instrucción para poder leer todo lo que cuelgue del directorio donde están los CSV.

In [10]:
# Leemos todos los ficheros CSV que haya en el directorio y todo en las subcarpetas... Spark que bien, no?
df_full <- read.df("/root/TallerSparkR/datasets/csv/*/", "csv", header = "true", inferSchema = "true")


Verificamos que ahora tenemos todos los datos cargados desde todos los ficheros ```CSV``` de la estructura de directorios:


In [11]:
count(df_full)

### Escritura

Una vez que hemos realizado transformaciones con los datos del SparkDataFrame, podemos dejarlo en memoria o bien pasarlo a DISCO (local) o HDFS (distribuido).

La API de fuentes de datos puede también ser usada para guardar y almacenar ``SparkDataFrames`` en múltiples formatos. Por ejemplo podemos almacenar el ``SparkDataDrame`` desde/hacia otros formatos como ``CSV``, ``PARQUET`` usando la función ``write.df``. 

Esto da mucha versatilidad, ya que independiente del tipo de fuente, podemos almacenarlo y leerlo desde cualquiera otra fuente. Como no podía ser de otra forma.

In [None]:
# Escritura desde CSV a CSV:
write.df(df_full, path = "datasets/results/df_full.csv", source = "csv", mode = "overwrite")

# Escritura desde CSV a PARQUET
write.df(df_full, path = "datasets/results/df_full.parquet", source = "parquet", mode = "overwrite")

En ``mode``	podemos usar 'append', 'overwrite', 'error', 'ignore'.

## PARQUET

Parquet es un formato de **almacenamiento en columnas** disponible para cualquier proyecto dentro del ecosistema de Hadoop, enfocado en la mejora del procesamiento de datos, modelado de datos y programación.

Parquet está diseñado para soportar esquemas de compresión y codificación muy eficientes. Múltiples proyectos han demostrado el impacto en el rendimiento de aplicar el correcto sistema de compresión y codificación a los datos. Parquet permite que los esquemas de compresión se especifiquen a nivel de columna.

Es un formato bien estructurado para ser usado **para problemas de BigData**.

La estructura del fichero se **segmenta en N columnas partidas en M grupos de filas**:

![Alt](https://raw.github.com/Parquet/parquet-format/master/doc/images/FileLayout.gif)


Leemos el dataset en formato Parquet, luego el resultado de la lectura es un SparkDataFrame, compatible con el trabajo en SparkR

In [12]:
# Leemos un dataset que contiene los datos en formato Parquet
df_parquet <- read.df("/root/TallerSparkR/datasets/parquet/", "parquet")

In [13]:
# Vemos la estructura del fichero y sus atributos
printSchema(df_parquet)

root
 |-- user_id: string (nullable = true)
 |-- category: string (nullable = true)
 |-- variable1: double (nullable = true)
 |-- variable2: double (nullable = true)
 |-- variable3: double (nullable = true)


In [14]:
# Vemos un resumen de los datos del fichero ...
head(df_parquet)

user_id,category,variable1,variable2,variable3
609f2d681b994fd6af8ed1e62b3be6e4,Shopping,13.445387,8.605127,8.091069
96cf2de06d520133d9604f513dc22504,Advertising,9.076202,11.514858,6.252373
e0669c33ee079c43bc22f2f2bb615ccb,News,10.806771,9.578176,5.613295
3d9d21ea525895a464f4eab75f03cee9,Arts_and_Entertainment,9.877052,5.284544,8.680847
69a728ead2a5219337f59a5909561d02,Science,6.08514,15.951622,11.22335
9ac8c3edbfc4a5aa340d91886a73d588,Advertising,9.253095,4.606095,10.748271


In [17]:
# Hacemos un pequeño cambio en el nombre de las columnas del SparkDataFrame.
colnames(df_parquet) <- c("user_id","cat","R1","R2","R3")

Vemos de nuevo el cambio de las columnas:

In [16]:
head(df_parquet)

user_id,cat,R1,R2,R3
609f2d681b994fd6af8ed1e62b3be6e4,Shopping,13.445387,8.605127,8.091069
96cf2de06d520133d9604f513dc22504,Advertising,9.076202,11.514858,6.252373
e0669c33ee079c43bc22f2f2bb615ccb,News,10.806771,9.578176,5.613295
3d9d21ea525895a464f4eab75f03cee9,Arts_and_Entertainment,9.877052,5.284544,8.680847
69a728ead2a5219337f59a5909561d02,Science,6.08514,15.951622,11.22335
9ac8c3edbfc4a5aa340d91886a73d588,Advertising,9.253095,4.606095,10.748271


In [18]:
# Contamos los registros del dataset ... es pequeño, no es BigData...
count(df_parquet)

Aplicamos unas transformaciones sencillas al SparkDataFrame, copiando la tabla en una Vista Temporal para poder trabajar con ella en SQL.

In [20]:
# Creamos una vista SparkDataFrame con el nombre "tmp_parquet".
# Este nombre tmp_parquet es el nombre que se usará ahora.
createOrReplaceTempView(df_parquet,"tmp_parquet")

Una vista temporal, permite trabajar con una copia temporal de los datos.

Contamos el número de registros:

In [23]:
# Usamos SparkSQL para hacer consultas a los datos.
count_rows <- sql("SELECT user_id,count(user_id) as registers FROM tmp_parquet group by user_id")
# Cuidado como obtener las cosas en SparkR: ---> Nooooooooo !!!! ;)
# print(collect(count_rows))

Compara el tiempo la opción anterior y la siguiente

In [24]:
head(count_rows)

user_id,num_users
33a39f26353e0f3e7005587c29a003d0,3
10238bec09a1e497dd163154bcc7d348,7
1a74c36cdef8b4cc1bd571df80a3fe77,4
6765378752d4267b406dc5c640829676,2
29da1de651a905d1b9afa74f43df74b3,4
a3a7755aaf44dc3006d3b74c7aba5298,4


Si usamos una vista temporal, está estará disponible durante toda la sesión a no ser que se elimine la vista temporal con `unpersist(....)`

Probamos con otro ejemplo, para saber las categorías que hay:

In [25]:
# createOrReplaceTempView(df_parquet,"tmp_parquet") --> No volvermos a cargarla!
# Usamos SparkSQL para hacer consultas a los datos.
categories <- sql("SELECT cat FROM tmp_parquet group by cat")

In [26]:
head(categories)

cat
Competitors
Science
Family_and_Parenting
Sports
Careers
Reference_and_Education


¿Cómo se calcularía el número de elementos de cada categoría?

In [None]:
# createOrReplaceTempView(df_parquet,"tmp_parquet")   --> No volvemos a cargarla !
# Usamos SparkSQL para hacer consultas a los datos.
categories_list <- sql("SELECT cat,count(user_id) as num_users FROM tmp_parquet group by cat")

In [None]:
head(categories_list)

¿Cuando usuarios distintos hay y que suma total tienen por usuario?

In [None]:
# createOrReplaceTempView(df_parquet,"tmp_parquet") --> No volvemos a cargarla!
# Usamos SparkSQL para hacer consultas a los datos.
table_summary <- sql("SELECT user_id,SUM(R1) as sum_index FROM tmp_parquet group by user_id")

In [None]:
count(table_summary)
head(table_summary)

In [28]:
unpersist(table_summary)

ERROR: Error in unpersist(tmp_parquet): objeto 'tmp_parquet' no encontrado


### Escritura de los datos

Al igual que con los otros formatos, se pueden exportar a cualquier otro.

In [None]:
# Escritura del fichero de formato parquet a formato parquet
write.df(finals, path = "/root/TallerSparkR/datasets/results/finals.parquet", source = "parquet", mode = "overwrite")

# Escritura del fichero de formato csv a formato a CSV
write.df(finals, path = "/root/TallerSparkR/datasets/results/finals.csv", source = "csv", mode = "overwrite")

## JSON

JSON, acrónimo de JavaScript Object Notation, es un formato de texto ligero para el intercambio de datos. 

JSON es un subconjunto de la notación literal de objetos de JavaScript aunque hoy, debido a su amplia adopción como alternativa a XML, se considera un formato de lenguaje independiente.

Es un formato actualmente muy utilizado ya que se ha impuesto como modelo para la entrada y salida de datos desde multiples y variados servicios web. Por ejemplo por citar varios:

- Twitter. https://dev.twitter.com/rest/public 
- Google APIs
- FaceBook API
- ...

Veamos en http://localhost:25980/tree/datasets como son los ficheros JSON por dentro.

Es muy utilizado este formato para servicios web de información, donde lo que prima es la sencillez y versatilidad. 



### Lectura de datos

La sintaxis es la misma, pero varía el identificador del tipo de fuente, en este caso JSON.

In [32]:
costumers <- read.df("/root/TallerSparkR/datasets/json/buy_costumers_amazon.json", "json")

Revisamos la información del SparkDataFrame y su estructura.

In [33]:
# Un resumen
head(costumers)

# El numero de registros
count(costumers)

# La estructura
printSchema(costumers)

amount,buy_hours,credit_card,first_name,id,last_name
2582.27,9:41,5010121228072899,Roy,1,Hunt
2244.43,10:11,3560573751972466,Martin,2,Robinson
626.54,10:07,6381020418347013,Julie,3,Medina
3786.54,18:15,3537886797721600,Scott,4,Edwards
1620.38,21:12,3558329354058037,Amanda,5,Morrison
326.73,9:24,5602231558845873,Kelly,6,Parker


root
 |-- amount: double (nullable = true)
 |-- buy_hours: string (nullable = true)
 |-- credit_card: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- id: long (nullable = true)
 |-- last_name: string (nullable = true)


**¿Por qué no se lee correctamente el JSON?** Revisamos el dataset http://localhost:25980/tree/datasets y arreglamos el error.

La lectura de multiples ficheros es similar lo que ocurre con CSV, donde podemos indica que hay más archivos que queremos leer desde el directorio.

Lectura desde varios ficheros JSON:

In [36]:
costumers_double <- read.json(c("/root/TallerSparkR/datasets/json/buy_costumers_amazon.json", "/root/TallerSparkR/datasets/json/buy_costumers_amazon.json"))


¿Cuántos registros hay ahora?

In [37]:
count(costumers_double)

### Escritura de datos a formato JSON

Al igual que para los otros formatos se usa el mismo esquema para guardar datasets a disco.

In [None]:
write.df(costumers_double, path = "datasets/results/costumers.json", source = "json", mode = "overwrite")


Como siempre cerramos la sesion de Spark

In [38]:
sparkR.session.stop()

<HR>
# Zona de pruebas del NOTEBOOK en SparkR
![FooterSparkR](https://sites.google.com/site/manuparra/home/footer_SparkR_v2.png)


Escribe todas las pruebas en R que necesites a partir de aquí

<HR>