##  Descripción del escenario.

El siguiente ejemplo carga dos tablas de Cassandra en Spark y luegos las procesa mediante SparkSQL 




### Se Cargan las librerias

se utilizan la libreria pyspark.sql la cual permite importat datos y cargarlos en dataframe de Spark

pyspark.sql.function: Incluye las bibliotecas necesarias para la carga de dataframe. 
pyspark.sql.types: Lista de tipos de Datos 

In [None]:
import time
from pyspark.sql.functions import UserDefinedFunction, lit, explode, desc, expr
from pyspark.sql.types import StringType, ArrayType

### Carga de la tabla movies desde Cassandra 

Se cargan los elementos de la tabla movies que se utilizaron en el ejemplo anterior (02_InsertCassandra). Luego se le notifica a Spark que hará una lectura de Cassandra.

In [None]:
inicio = time.time()

# Se seleccionan las opciones que va a leer en Cassandra, ejemplo Table, la clave y luego se define la división 
# de los datos. 

load_options = { "table": "movies", "keyspace": "test", "spark.cassandra.input.split.size_in_mb": "10"}
dfMovies = spark.read.format("org.apache.spark.sql.cassandra").options(**load_options).load()
dfMovies.show()
fin = time.time()

print("Tiempo: " + str((fin - inicio)))

### Carga de la tabla ratings desde Cassandra

De manera similar se ha cargado la tabla de Ratings que se tiene almacenada en Cassandra y que se calculo en el ejemplo anterior (02_InsertCassandra) y se almacenan en memoria en la variable dfRating. 

In [None]:
inicio = time.time()

# Se seleccionan las opciones que va a leer en Cassandra, ejemplo Table, la clave y luego se define la división 
# de los datos. 

load_options = { "table": "ratings", "keyspace": "test", "spark.cassandra.input.split.size_in_mb": "10"}
dfRatings = spark.read.format("org.apache.spark.sql.cassandra").options(**load_options).load()
fin = time.time()
dfRatings.show()

print("Tiempo: " + str((fin - inicio)))

### Comprobamos el número de registros cargados

Luego contamos el número de registro que se han extraido de Cassandra

In [None]:
print("Movies: " + str(dfMovies.count()))
print("Ratings: " + str(dfRatings.count()))

### Procesamiento de los datos

Obtenemos las 100 películas más populares

Por cada usuario, se añaden columnas con las 100 películas más populares y la puntuación que le ha dado cada uno, en las columnas que no tienen puntuación para las películas aparecen como NULL. 

In [None]:
inicio = time.time()

# Se realiza un agrupación de los datos por movie_id,  luego se cuentan por cada id. Se utiliza el sort para ordenarla
# y luego limita a las primeras 100 y finalmete obtiene un collect de todos los workers


popular = dfRatings.groupBy("movie_id").count().sort(desc("count")).limit(150).rdd.map(lambda r : r.movie_id).collect()

# En este punto se ha realizado un pivot de los datos para las peliculas más populares. 

ratings_pivot = dfRatings.groupBy("user_id").pivot("movie_id", popular).agg(expr("coalesce(first(rating),3)").cast("double"))
ratings_pivot.show()
fin = time.time()

print("Tiempo: " + str((fin - inicio)))