Notebook dedicado a explorar distintas formas de cache que se pueden aplicar sobre los datos de un DataFrame y algunos métodos de SparkSession.Catalog

Spark gestiona los metadatos asociados a cada tabla (gestionada o no). Esto se captura en el Catálogo (SparkSession.Catalog), Catalog es una abstracción de alto nivel en Spark SQL para almacenar metadatos. La funcionalidad del catálogo se amplió en Spark 2.x con nuevos métodos públicos que le permiten examinar los metadatos asociados con sus bases de datos, tablas y vistas. Spark 3.0 lo amplía para usar un catálogo externo

In [1]:
import spark.implicits._
val columns = Seq("name","age","weight", "height")
val data = Seq(("Jose", 40, 74, 1.63), ("María Isabel", 38, 58, 1.65), ("Antonio", 27, 60, 1.68), ("Norma", 63, 65, 1.54))
val df = data.toDF(columns:_*)

// declaramos variable para referenciar el catalogo
val catalog = spark.catalog

Intitializing Scala interpreter ...

Spark Web UI available at http://192.168.1.36:4042
SparkContext available as 'sc' (version = 3.0.2, master = local[*], app id = local-1615722556779)
SparkSession available as 'spark'


import spark.implicits._
columns: Seq[String] = List(name, age, weight, height)
data: Seq[(String, Int, Int, Double)] = List((Jose,40,74,1.63), (María Isabel,38,58,1.65), (Antonio,27,60,1.68), (Norma,63,65,1.54))
df: org.apache.spark.sql.DataFrame = [name: string, age: int ... 2 more fields]
catalog: org.apache.spark.sql.catalog.Catalog = org.apache.spark.sql.internal.CatalogImpl@47163b1a


In [2]:
df.cache() // cache la data
df.count() // Acción que materializa la cache


val df1 = df.select("name","age","weight")
// Se puede apreciar que los datos están cacheados al revisar el plan fisico y aparecer InMemoryTableScan
df1.explain(true)
df1.show() 

//df.unpersist() // Se elimina la cache

== Parsed Logical Plan ==
'Project [unresolvedalias('name, None), unresolvedalias('age, None), unresolvedalias('weight, None)]
+- Project [_1#4 AS name#13, _2#5 AS age#14, _3#6 AS weight#15, _4#7 AS height#16]
   +- LocalRelation [_1#4, _2#5, _3#6, _4#7]

== Analyzed Logical Plan ==
name: string, age: int, weight: int
Project [name#13, age#14, weight#15]
+- Project [_1#4 AS name#13, _2#5 AS age#14, _3#6 AS weight#15, _4#7 AS height#16]
   +- LocalRelation [_1#4, _2#5, _3#6, _4#7]

== Optimized Logical Plan ==
Project [name#13, age#14, weight#15]
+- InMemoryRelation [name#13, age#14, weight#15, height#16], StorageLevel(disk, memory, deserialized, 1 replicas)
      +- LocalTableScan [name#13, age#14, weight#15, height#16]

== Physical Plan ==
InMemoryTableScan [name#13, age#14, weight#15]
   +- InMemoryRelation [name#13, age#14, weight#15, height#16], StorageLevel(disk, memory, deserialized, 1 replicas)
         +- LocalTableScan [name#13, age#14, weight#15, height#16]

+------------+---

df1: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more field]


In [3]:
import org.apache.spark.sql.functions.lit

val df2 = df.withColumn("type", lit("person"))

// Muestra listado de tablas
catalog.listTables.show()

// Comprobamos si existe la tabla o vista con el nombre especificado
if (catalog.tableExists("persons")){
  // Eliminamos la vista
  catalog.dropTempView("persons")
}

// Creamos la vista
// NOTA: Crear la vista no significa que se haga cache sobre los datos
df2.createTempView("persons")

catalog.listTables.show()

// Cache de la vista mediante Spark SQL. Realiza el cache inmediatamente
spark.sql("cache table persons")
spark.sql("SELECT * FROM persons").show()
// Se elimina la Cache de la vista mediante Spark SQL
spark.sql("uncache table persons")

catalog.dropTempView("persons")

+----+--------+-----------+---------+-----------+
|name|database|description|tableType|isTemporary|
+----+--------+-----------+---------+-----------+
+----+--------+-----------+---------+-----------+

+-------+--------+-----------+---------+-----------+
|   name|database|description|tableType|isTemporary|
+-------+--------+-----------+---------+-----------+
|persons|    null|       null|TEMPORARY|       true|
+-------+--------+-----------+---------+-----------+

+------------+---+------+------+------+
|        name|age|weight|height|  type|
+------------+---+------+------+------+
|        Jose| 40|    74|  1.63|person|
|María Isabel| 38|    58|  1.65|person|
|     Antonio| 27|    60|  1.68|person|
|       Norma| 63|    65|  1.54|person|
+------------+---+------+------+------+



import org.apache.spark.sql.functions.lit
df2: org.apache.spark.sql.DataFrame = [name: string, age: int ... 3 more fields]
res1: Boolean = true


In [4]:
import org.apache.spark.sql.functions.{when, col}

val df3 = df2.withColumn("sex", when(col("name") === "Jose" || col("name") === "Antonio", "Male").otherwise("Female"))

if (catalog.tableExists("persons2")){
  catalog.dropTempView("persons2")
}

df3.createTempView("persons2")

// Cache lazy de la vista mediante Spark SQL
// En Spark 3.0, además de otras opciones, puede especificar una tabla como LAZY, lo que significa que solo debe almacenarse en caché cuando se usa por primera vez
// en lugar de inmediatamente
spark.sql("cache lazy table persons2")
spark.sql("SELECT * FROM persons2").show()

// Se elimina la Cache de la vista mediante Spark SQL
spark.sql("uncache table persons2")

catalog.dropTempView("persons2")

+------------+---+------+------+------+------+
|        name|age|weight|height|  type|   sex|
+------------+---+------+------+------+------+
|        Jose| 40|    74|  1.63|person|  Male|
|María Isabel| 38|    58|  1.65|person|Female|
|     Antonio| 27|    60|  1.68|person|  Male|
|       Norma| 63|    65|  1.54|person|Female|
+------------+---+------+------+------+------+



import org.apache.spark.sql.functions.{when, col}
df3: org.apache.spark.sql.DataFrame = [name: string, age: int ... 4 more fields]
res2: Boolean = true


In [5]:
val df4 = df3.withColumn("country", lit("Venezuela"))
if (catalog.tableExists("persons3")){
  catalog.dropTempView("persons3")
}

df4.createTempView("persons3")

// Comprobamos que la vista este cacheada
println(s"Is person3 cached -> ${catalog.isCached("persons3")}")

// Almacena en memoria caché la tabla/vista especificada.
catalog.cacheTable("persons3")

// Comprobamos que la vista está en caché
println(s"Is person3 cached -> ${catalog.isCached("persons3")}")

// Intentamos almacenar en caché los datos de una vista ya previamente guardados (en caché)
spark.sql("cache lazy table persons3")
spark.sql("SELECT * FROM persons3").show()

// Se elimina la Cache de la vista mediante Spark SQL, que fue cacheada mediante la función de SparkSession.Catalog.cacheTable
spark.sql("uncache table persons3")
// Si intentamos eliminar la cache de una tabla/vista que no existe nos arroja => AnalysisException: Table or view not found: default.persons4;
//spark.sql("uncache table persons4")

// Comprobamos que la vista ya no esta cacheada
println(s"Is person3 cached -> ${catalog.isCached("persons3")}")

catalog.dropTempView("persons3")



Is person3 cached -> false
Is person3 cached -> true
+------------+---+------+------+------+------+---------+
|        name|age|weight|height|  type|   sex|  country|
+------------+---+------+------+------+------+---------+
|        Jose| 40|    74|  1.63|person|  Male|Venezuela|
|María Isabel| 38|    58|  1.65|person|Female|Venezuela|
|     Antonio| 27|    60|  1.68|person|  Male|Venezuela|
|       Norma| 63|    65|  1.54|person|Female|Venezuela|
+------------+---+------+------+------+------+---------+

Is person3 cached -> false


df4: org.apache.spark.sql.DataFrame = [name: string, age: int ... 5 more fields]
res3: Boolean = true


In [6]:
// Comprobamos que el método isCached del catalogo funciona con los métodos cache tanto de los DataFrames como de SparkSQL

if (catalog.tableExists("df")){ // verificamos la existencia de la tabla/vista para evitar  -> AnalysisException: Table or view not found: df;
  println(s"1- Is df cached -> ${catalog.isCached("df")}")
}

if (catalog.tableExists("df2")){ // verificamos la existencia de la tabla/vista para evitar  -> AnalysisException: Table or view not found: df2;
  println(s"1- Is df2 cached -> ${catalog.isCached("df2")}")
}

if (catalog.tableExists("df3")){ // verificamos la existencia de la tabla/vista para evitar  -> AnalysisException: Table or view not found: df3;
  println(s"1- Is df3 cached -> ${catalog.isCached("df3")}")
}

df.cache()
df.createOrReplaceTempView("df")
df.count()

df2.cache()
df2.createOrReplaceTempView("df2")
df2.count()

df3.createOrReplaceTempView("df3")
spark.sql("cache table df3")

println(s"2- Is df cached -> ${catalog.isCached("df")}")
println(s"2- Is df2 cached -> ${catalog.isCached("df2")}")
println(s"2- Is df3 cached -> ${catalog.isCached("df3")}")

df.unpersist()
df3.unpersist()

println(s"3- Is df cached despues del unpersist de df -> ${catalog.isCached("df")}")
// Funciona la eliminación del cache tanto con df3.unpersist como con sparkSQL utilizando uncache
println(s"3- Is df3 cached despues del unpersist de df3 -> ${catalog.isCached("df3")}")

catalog.clearCache()
// Funciona la eliminación del cache también con catalog.clearCache()
println(s"3- Is df2 cached despues del catalog.clearCache() -> ${catalog.isCached("df2")}")

// NOTA: Al ejecutar la misma celda una y otra vez los DataFrame.cache() no almacenan los datos en cache, 
// mientras que únicamente funcionan aquellos invocados mediante Spark SQL
// No sé si se deba a algo propio de la plataforma o en cambio atañe a Spark



2- Is df cached -> true
2- Is df2 cached -> true
2- Is df3 cached -> true
3- Is df cached despues del unpersist de df -> false
3- Is df3 cached despues del unpersist de df3 -> false
3- Is df2 cached despues del catalog.clearCache() -> false
