# Spark & Scala Module

## Spark SQL
#### @FJPiqueras - KeepCoding

#### Note that Jupyter notebooks create the following by default:

##### SparkContext in the variable sc
##### SparkSession in the variable spark

##### Notebook powered by Spark 2.2.0

### First, we define the case classes used throughout the module. The dataset is a movie dataset made up of three separate files: users.dat, movies.dat and ratings.dat

### Loading a dataset in JSON format

In [None]:
 import spark.implicits._

 val df = spark.read.json("Datasets/SparkSQL/personas.json")

### printSchema - Prints the schema of the DataFrame. When a JSON file is loaded, Spark infers the schema from the records, so the DataFrame is structured automatically.

In [None]:
 df.printSchema()
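The schema inference mentioned above can also be bypassed by passing an explicit schema. The following is only a sketch under stated assumptions: the `session` value, the two inline JSON records and `personSchema` are made up for this example and are not part of the course dataset. It suggests that inferred JSON columns come out in alphabetical order, while an explicit schema keeps the declared order.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

// getOrCreate() reuses the notebook session when one already exists
val session = SparkSession.builder().master("local[*]").appName("schema-sketch").getOrCreate()
import session.implicits._

// Hypothetical records shaped like personas.json (the second one has no age)
val jsonLines = Seq(
  """{"name":"Andy","age":30,"idDpto":3}""",
  """{"name":"Michael","idDpto":1}"""
).toDS()

// Inferred schema: Spark scans the records and sorts the fields by name
val inferred = session.read.json(jsonLines)
inferred.printSchema()

// Explicit schema (an assumption for this sketch): no inference pass, declared order is kept
val personSchema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("age", LongType, nullable = true),
  StructField("idDpto", LongType, nullable = true)
))
val typed = session.read.schema(personSchema).json(jsonLines)
typed.printSchema()
```

An explicit schema is worth considering for large files, since it skips the extra pass over the data that inference needs.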

### First queries with the Spark SQL API

In [None]:
  // show prints the contents of the DataFrame as a table
  df.select("name").show
  df.select("name", "age").show
  df.select("*").show

### Simple operations with the API

In [None]:
 df.select($"name", $"age" + 1).show()

### filter - Filtering, equivalent to the SQL query "select * from personas p where age>21"

In [None]:
 df.filter($"age" > 21).show

### where - Conditions (an alias of filter, used here with a SQL expression string)

In [None]:
df.where("age>21").show
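Since `where` is an alias of `filter`, both forms should return the same rows. The sketch below checks that on a small in-memory DataFrame. The `session` and `people` names and the three sample rows are assumptions for illustration, loosely mirroring personas.json.

```scala
import org.apache.spark.sql.SparkSession

// getOrCreate() reuses the notebook session when one already exists
val session = SparkSession.builder().master("local[*]").appName("filter-vs-where").getOrCreate()
import session.implicits._

// Hypothetical sample rows; Option.empty models a null age
val people = Seq(
  ("Michael", Option.empty[Long]),
  ("Andy", Option(30L)),
  ("Justin", Option(19L))
).toDF("name", "age")

val byColumn = people.filter($"age" > 21) // Column expression
val byString = people.where("age > 21")   // SQL expression string

byColumn.show()
byString.show()
```

Rows with a null age are dropped by both forms, because comparing null with 21 does not evaluate to true.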

### limit - Returns the number of rows given as a parameter

In [None]:
df.limit(1).show

### withColumnRenamed - Renaming columns

In [None]:
val newDf=df.withColumnRenamed("age", "edad")
newDf.limit(1).show

### orderBy - Sorting by one or more fields of the DataFrame

In [None]:
newDf.orderBy("edad", "name").show
newDf.sort("name").show

### createOrReplaceTempView - Registers the DataFrame as a temporary view with the name given as a parameter

In [None]:
 df.createOrReplaceTempView("personas")

### From the Spark API to declarative programming

In [None]:
spark.sql("SELECT * FROM personas").show()

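### Running queries on a new Spark session (temporary views are session-scoped, so the view is registered as a global one)

In [None]:
df.createOrReplaceGlobalTempView("personas")
spark.newSession().sql("SELECT * FROM global_temp.personas").show()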

### "Advanced" queries

In [None]:
spark.sql("SELECT name, age FROM personas WHERE age >= 13 AND age <= 19 and name like 'J%'").show

### Ad hoc queries directly on a file

In [None]:
spark.sql("SELECT * FROM json.`Datasets/SparkSQL/personas.json`").show

### Loading structured files into a class

In [10]:
case class Person(name: String, age: Long)

import spark.implicits._
val peopleDS = spark.read.json("Datasets/SparkSQL/personas.json").as[Person]

peopleDS.select("*").show

+----+------+-------+
| age|idDpto|   name|
+----+------+-------+
|null|     1|Michael|
|  30|     3|   Andy|
|  19|     4| Justin|
|null|     1| Javier|
|  38|     2|  Laura|
|  54|     5|Nicolas|
|  34|  null| Raquel|
+----+------+-------+



defined class Person
import spark.implicits._
peopleDS: org.apache.spark.sql.Dataset[Person] = [age: bigint, idDpto: bigint ... 1 more field]


### Turning unstructured datasets into structured data

In [12]:
 val peopleDF = spark.sparkContext
      .textFile("Datasets/SparkSQL/personas.txt")
      .map(_.split(",")).map(row => Person(row(0), row(1).trim.toLong))
      .toDF()

peopleDF.show

+----+------+-------+
| age|idDpto|   name|
+----+------+-------+
|null|     1|Michael|
|  30|     3|   Andy|
|  19|     4| Justin|
|null|     1| Javier|
|  38|     2|  Laura|
|  54|     5|Nicolas|
|  34|  null| Raquel|
+----+------+-------+



org.apache.spark.SparkException:  Job aborted due to stage failure: Task 0 in stage 11.0 failed 1 times, most recent failure: Lost task 0.0 in stage 11.0 (TID 11, localhost, executor driver): java.lang.ClassCastException: Person cannot be cast to Person
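The parsing step in the cell above assumes that every line has the form `name, age`. A more defensive variant could look like the sketch below, which is plain Scala and does not need Spark. `PersonRow`, `parsePerson` and the sample lines are hypothetical names and data introduced only for this illustration.

```scala
import scala.util.Try

// Hypothetical stand-in for the notebook's Person class
case class PersonRow(name: String, age: Long)

// Returns None when the line does not have exactly two fields
// or when the second field is not a valid number
def parsePerson(line: String): Option[PersonRow] =
  line.split(",").map(_.trim) match {
    case Array(name, age) => Try(age.toLong).toOption.map(PersonRow(name, _))
    case _                => None
  }

// Made-up input lines: two valid ones and two malformed ones
val lines = Seq("Michael, 29", "Andy, 30", "broken line", "Justin, nineteen")
val parsed = lines.flatMap(parsePerson)
parsed.foreach(println)
```

With Spark, the same function could presumably be used as `textFile(...).flatMap(parsePerson)` in place of the two `map` calls, so that malformed lines are skipped instead of failing the job.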

In [None]:
peopleDF.select("*").show

In [None]:
peopleDF.createOrReplaceTempView("people")
spark.sql("select * from people").show

### Applying maps in Spark SQL

In [None]:
// Get the names of the teenagers in the personas table (columns are read by name, since row(0) is age)
val dfTeenagers = spark.sql("select * from personas where age < 20").map(row => "Name: " + row.getAs[String]("name"))
dfTeenagers.show()

### Aggregate functions: avg, count

In [None]:
newDf.groupBy().avg("edad").show

In [None]:
newDf.groupBy("name", "edad").count().show

### Aggregate functions - max, min

In [None]:
import org.apache.spark.sql.functions.{max, min}

newDf.agg(min("edad")).show
newDf.agg(max("edad")).show

// Equivalent in declarative SQL
newDf.createOrReplaceTempView("personasNew")
spark.sql("select max(edad) from personasNew").show
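Several aggregates can also be computed in a single `agg` call. The sketch below uses a small in-memory DataFrame. The `session` and `sample` names and the three rows are assumptions for illustration only.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{avg, count, max, min}

// getOrCreate() reuses the notebook session when one already exists
val session = SparkSession.builder().master("local[*]").appName("agg-sketch").getOrCreate()
import session.implicits._

// Hypothetical sample with one null age
val sample = Seq(
  ("Andy", Option(30L)),
  ("Justin", Option(19L)),
  ("Michael", Option.empty[Long])
).toDF("name", "edad")

// One pass over the data, four aggregates; nulls are ignored by all of them
val stats = sample.agg(min("edad"), max("edad"), avg("edad"), count("edad"))
stats.show()
```

Note that `count("edad")` counts only non-null values, so it can differ from the number of rows.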

### Converting a DataFrame to an RDD

In [19]:
val dfToRDD = spark.read.json("Datasets/SparkSQL/personas.json")

dfToRDD.rdd

dfToRDD: org.apache.spark.sql.DataFrame = [age: bigint, idDpto: bigint ... 1 more field]
res12: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[63] at rdd at <console>:31


### Operations across several DataFrames
### join

In [3]:
 val dfPersonas = spark.read.json("Datasets/SparkSQL/personas.json")
 val dfDptos = spark.read.json("Datasets/SparkSQL/departamentos.json")

dfPersonas.show
dfDptos.show

+----+------+-------+
| age|idDpto|   name|
+----+------+-------+
|null|     1|Michael|
|  30|     3|   Andy|
|  19|     4| Justin|
|null|     1| Javier|
|  38|     2|  Laura|
|  54|     5|Nicolas|
|  34|  null| Raquel|
+----+------+-------+

+---+--------------+
| id|          name|
+---+--------------+
|  1|       BigData|
|  2|     Analítica|
|  3|      Business|
|  4|          RRHH|
|  5|OficinaTécnica|
|  6|         Staff|
+---+--------------+



dfPersonas: org.apache.spark.sql.DataFrame = [age: bigint, idDpto: bigint ... 1 more field]
dfDptos: org.apache.spark.sql.DataFrame = [id: bigint, name: string]


In [5]:
dfPersonas.createOrReplaceTempView("personasJoin")
dfDptos.createOrReplaceTempView("departamentosJoin")

In [9]:
val dfJoin = spark.sql("select p.name, d.name from personasJoin p, departamentosJoin d where p.idDpto=d.id")
dfJoin.show

+-------+--------------+
|   name|          name|
+-------+--------------+
|Michael|       BigData|
|   Andy|      Business|
| Justin|          RRHH|
| Javier|       BigData|
|  Laura|     Analítica|
|Nicolas|OficinaTécnica|
+-------+--------------+



dfJoin: org.apache.spark.sql.DataFrame = [name: string, name: string]
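The SQL join above is an inner join, which is why Raquel (null idDpto) and the Staff department do not appear in the result. The same join can be written with the DataFrame API, where the join type is an explicit argument. The sketch below uses hypothetical in-memory data, and the `session`, `personas`, `dptos` and `dptoName` names are assumptions for illustration.

```scala
import org.apache.spark.sql.SparkSession

// getOrCreate() reuses the notebook session when one already exists
val session = SparkSession.builder().master("local[*]").appName("join-sketch").getOrCreate()
import session.implicits._

// Hypothetical rows; Raquel has no department
val personas = Seq(
  ("Andy", Option(3L)),
  ("Michael", Option(1L)),
  ("Raquel", Option.empty[Long])
).toDF("name", "idDpto")

val dptos = Seq((1L, "BigData"), (3L, "Business"), (6L, "Staff")).toDF("id", "dptoName")

// Inner join: only rows with a matching department survive
val inner = personas.join(dptos, personas("idDpto") === dptos("id"))

// Left outer join: every person is kept, with a null department when there is no match
val leftOuter = personas
  .join(dptos, personas("idDpto") === dptos("id"), "left_outer")
  .select($"name", $"dptoName")

inner.show()
leftOuter.show()
```

Giving the department column its own name (`dptoName` here) also avoids ending up with two result columns both called `name`.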

### UDFs

In [14]:
spark.udf.register("strLen", (s: String) =>  s.length())

res7: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,IntegerType,Some(List(StringType)))


In [17]:
spark.sql("SELECT name, strLen(name) from personasJoin").show

+-------+----------------+
|   name|UDF:strLen(name)|
+-------+----------------+
|Michael|               7|
|   Andy|               4|
| Justin|               6|
| Javier|               6|
|  Laura|               5|
|Nicolas|               7|
| Raquel|               6|
+-------+----------------+
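A UDF can also be used from the DataFrame API without registering it by name. The sketch below additionally guards against null input, which `s.length()` alone would not tolerate. The `session`, `strLenSafe` and `names` names and the sample rows are assumptions for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

// getOrCreate() reuses the notebook session when one already exists
val session = SparkSession.builder().master("local[*]").appName("udf-sketch").getOrCreate()
import session.implicits._

// Null-safe variant of strLen: returns 0 for a null string
val strLenSafe = udf((s: String) => if (s == null) 0 else s.length)

// Hypothetical rows, one of them with a null name
val names = Seq[(Int, String)]((1, "Andy"), (2, null), (3, "Nicolas")).toDF("id", "name")

val withLen = names.select($"id", $"name", strLenSafe($"name").as("len"))
withLen.show()
```

For something as simple as a string length, the built-in `length` function in `org.apache.spark.sql.functions` is usually the better choice, since built-ins can be optimized by Spark while a UDF is opaque to the optimizer.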



### Save - Saving the results of a DataFrame

In [None]:
// Columns are dropped by name in Scala (df.age is PySpark syntax)
df.where("age < 20").drop("age").show
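Persisting a result goes through the DataFrame's `write` interface. The following is a minimal sketch, assuming a local file system. The `session` name, the sample rows and the temporary output directory are all hypothetical and chosen only so that the example is self-contained.

```scala
import java.nio.file.Files
import org.apache.spark.sql.{SaveMode, SparkSession}

// getOrCreate() reuses the notebook session when one already exists
val session = SparkSession.builder().master("local[*]").appName("save-sketch").getOrCreate()
import session.implicits._

// Hypothetical output location inside a fresh temporary directory
val outDir = Files.createTempDirectory("teenagers-").resolve("json").toString

// Hypothetical rows: keep the teenagers and drop the age column before saving
val teenagers = Seq(("Justin", 19L), ("Andy", 30L))
  .toDF("name", "age")
  .where("age < 20")
  .drop("age")

// Overwrite replaces any previous output at the same path
teenagers.write.mode(SaveMode.Overwrite).json(outDir)

// Read the files back to check what was written
val reloaded = session.read.json(outDir)
reloaded.show()
```

The output is a directory of part files rather than a single file. Other formats such as Parquet or CSV follow the same pattern through `write.parquet(...)` and `write.csv(...)`.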

### Interacting with Hive

In [None]:
// Does not work in Jupyter because of a problem with the Hive metastore
//val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)

In [None]:
spark.sql("CREATE TABLE IF NOT EXISTS people (age INT, name STRING)")

In [None]:
spark.sql("LOAD DATA LOCAL INPATH 'Datasets/Dataset2/ratings.data' INTO TABLE ratings")

In [None]:
spark.sql("FROM src SELECT key, value").collect().foreach(println)

### The Spark SQL exercises continue in the VM