## Parquet

In [30]:
import org.apache.spark.sql._
import org.apache.spark.sql.DataFrameWriter

import org.apache.spark.sql._
import org.apache.spark.sql.DataFrameWriter


In [3]:
val spark = SparkSession.builder.appName("Read").getOrCreate()

spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@45719e9e


In [4]:
spark.sql("""CREATE OR REPLACE TEMPORARY VIEW ejemplo USING parquet OPTIONS(path "ejemplo.parquet")""")

res0: org.apache.spark.sql.DataFrame = []


In [10]:
val ejDF=spark.sql("""SELECT name, (age+20) as ageModif FROM ejemplo""")
ejDF.show()

+------+--------+
|  name|ageModif|
+------+--------+
|Brooke|      40|
| Brook|      43|
| Jules|      50|
| Denny|      51|
|    TD|      55|
+------+--------+



ejDF: org.apache.spark.sql.DataFrame = [name: string, ageModif: int]


The write below produces a set of compact, compressed Parquet files at the specified path.

In [11]:
ejDF.write.format("parquet").mode("overwrite").option("compression","snappy").save("ejemploModif.parquet")

Bucketing is a technique for decomposing data sets into more manageable parts. For example, suppose a table that uses "date" as the top-level partition and "employee_id" as the second-level partition produces too many small partitions. If we instead bucket the employee table and use "employee_id" as the bucketing column, the value of that column is hashed into a user-defined number of buckets. Records with the same "employee_id" are always stored in the same bucket. Assuming the number of distinct employee_id values is much greater than the number of buckets, each bucket holds many employee_id values. When creating the table you can specify this (in HQL) with "CLUSTERED BY (employee_id) INTO XX BUCKETS", where XX is the number of buckets.

Bucketing has several advantages. The number of buckets is fixed, so it does not fluctuate with the data. If two tables are bucketed by employee_id, Hive can create a logically correct sampling. Bucketing also helps with efficient map-side joins, among other things.
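To make the idea of bucket assignment concrete, here is a minimal plain-Scala sketch. It is not how Spark or Hive implement bucketing: both use their own hash functions, and `hashCode` is only a stand-in here. The object name `BucketSketch`, its methods, and the sample ids are hypothetical.

```scala
// Toy model of bucketing: hash the bucketing column, then take a
// non-negative modulus by the fixed number of buckets.
object BucketSketch {
  // Assumption: hashCode stands in for the engine's real hash function.
  def bucketFor(employeeId: Int, numBuckets: Int): Int =
    Math.floorMod(employeeId.hashCode, numBuckets)

  // Group a batch of ids by the bucket each one falls into.
  def assign(ids: Seq[Int], numBuckets: Int): Map[Int, Seq[Int]] =
    ids.groupBy(id => bucketFor(id, numBuckets))

  def main(args: Array[String]): Unit = {
    val buckets = assign(Seq(101, 102, 103, 104, 105, 106), numBuckets = 2)
    buckets.toSeq.sortBy(_._1).foreach { case (bucket, ids) =>
      println(s"bucket $bucket -> ${ids.mkString(", ")}")
    }
  }
}
```

The same id always lands in the same bucket, and the number of buckets stays fixed no matter how many ids arrive, which is the property the paragraph above relies on.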

In [137]:
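// Bucketed write into a table (bucketBy is only supported together with saveAsTable)
ejDF.write.format("parquet").mode("overwrite").option("compression","snappy").bucketBy(2,"name").saveAsTable("ejemploModifi.parquet")
// Partitioned write to a path, one directory per value of name
ejDF.write.format("parquet").mode("overwrite").option("compression","snappy").partitionBy("name").save("ejemploModifi.parquet")
// Repartition by name, then write a bucketed CSV table at an explicit path
ejDF.repartition(2, col("name")).write.format("csv").mode("overwrite").option("path", "ipaÇ").bucketBy(2,"name").saveAsTable("ejemploModifii.parquet")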

In [138]:
// bucketBy combined with save() is not supported and raises an AnalysisException
ejDF.write.format("parquet").mode("overwrite").option("compression","snappy").bucketBy(2,"name").save("ejemploModifi.parquet")

In [139]:
ejDF.write.format("parquet").mode("overwrite").option("compression","snappy").bucketBy(2,"name").saveAsTable("ejemploModif.parquet")

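bucketBy() does not work in this environment. Two things are worth checking: Spark only supports bucketBy() together with saveAsTable() (combining it with save() raises an AnalysisException), and a table name that contains a dot, such as ejemploModif.parquet, is parsed as database.table.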

In [140]:
ejDF.write.mode("overwrite").saveAsTable("ejemploModificado.parquet")

## JSON

In [58]:
ejDF.write.format("json").mode("overwrite").option("compression","snappy").save("ejemploJsonDelays")

In [59]:
val eJsonDF = spark.read.format("json").load("ejemploJsonDelays")

eJsonDF: org.apache.spark.sql.DataFrame = [ageModif: bigint, name: string]


In [60]:
eJsonDF.show()

+--------+------+
|ageModif|  name|
+--------+------+
|      40|Brooke|
|      43| Brook|
|      51| Denny|
|      50| Jules|
|      55|    TD|
+--------+------+



The character encoding can be chosen with .option("charset", "UTF-16BE").
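As a small illustration of what choosing an encoding changes, the plain-Scala sketch below prints the bytes of the same string under UTF-8 and UTF-16BE. It does not touch Spark at all; the object name `EncodingSketch` and its `hexBytes` helper are hypothetical.

```scala
import java.nio.charset.{Charset, StandardCharsets}

object EncodingSketch {
  // Render the bytes of `text` under the given charset as hex pairs.
  def hexBytes(text: String, charset: Charset): String =
    text.getBytes(charset).map(b => f"${b & 0xff}%02X").mkString(" ")

  def main(args: Array[String]): Unit = {
    println(hexBytes("TD", StandardCharsets.UTF_8))    // 54 44
    println(hexBytes("TD", StandardCharsets.UTF_16BE)) // 00 54 00 44
  }
}
```

UTF-16BE uses two bytes per character here, with the high-order byte first, so the files written with that encoding are byte-for-byte different from their UTF-8 counterparts.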

We can also create views.

In [62]:
spark.sql("""CREATE OR REPLACE TEMPORARY VIEW ejemplo USING json OPTIONS(path "ejemploJsonDelays")""")
spark.sql("SELECT * FROM ejemplo").show()

+--------+------+
|ageModif|  name|
+--------+------+
|      40|Brooke|
|      43| Brook|
|      51| Denny|
|      50| Jules|
|      55|    TD|
+--------+------+



## CSV

PERMISSIVE mode sets field values to null when it detects corrupt records. If the mode option is not specified, Spark defaults to PERMISSIVE.

DROPMALFORMED mode ignores corrupt records, so if you choose this mode the corrupt records will not appear in the result.

Unlike DROPMALFORMED and PERMISSIVE, FAILFAST throws an exception when it detects corrupt records.
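The three behaviours can be modelled in a few lines of plain Scala. This is only a simplified sketch of the semantics described above, not Spark's parser: it nulls the whole row in PERMISSIVE mode, whereas Spark may keep the fields it managed to parse. The names `ParseModeSketch`, `Flight`, `parseLine`, and `read` are hypothetical.

```scala
import scala.util.Try

object ParseModeSketch {
  // One parsed row; a None field plays the role of a SQL null.
  final case class Flight(dest: Option[String], origin: Option[String], count: Option[Int])

  // Parse "dest,origin,count"; Left(raw line) marks a corrupt record.
  def parseLine(line: String): Either[String, Flight] =
    line.split(",", -1) match {
      case Array(d, o, c) =>
        Try(c.trim.toInt).toOption
          .map(n => Flight(Some(d), Some(o), Some(n)))
          .toRight(line)
      case _ => Left(line)
    }

  // Apply one of the three modes to a batch of raw lines.
  def read(lines: Seq[String], mode: String): Seq[Flight] = {
    val parsed = lines.map(parseLine)
    mode.toUpperCase match {
      // Keep every row, replacing corrupt ones with nulls.
      case "PERMISSIVE" => parsed.map(_.getOrElse(Flight(None, None, None)))
      // Silently drop corrupt rows.
      case "DROPMALFORMED" => parsed.collect { case Right(f) => f }
      // Abort on the first corrupt row.
      case "FAILFAST" => parsed.map {
        case Right(f)  => f
        case Left(raw) => throw new IllegalArgumentException(s"Malformed record: $raw")
      }
      case other => throw new IllegalArgumentException(s"Unknown mode: $other")
    }
  }
}
```

For an input of three lines where the second has a non-numeric count, PERMISSIVE returns three rows (the second all null), DROPMALFORMED returns two rows, and FAILFAST throws.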

In [85]:
val ejCSV = spark.read.format("csv").schema("DEST_COUNTRY_NAME STRING, ORIGIN_COUNTRY_NAME STRING, count INT")
  .option("header", "true")
  .option("nullValue", " ")
  .option("mode", "FAILFAST") // fail as soon as a malformed record is found
  .load("summaryFlightCSV")

ejCSV: org.apache.spark.sql.DataFrame = [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field]


In [86]:
ejCSV.show()

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|       United States|            Romania|    1|
|       United States|            Ireland|  264|
|       United States|              India|   69|
|               Egypt|      United States|   24|
|   Equatorial Guinea|      United States|    1|
|       United States|          Singapore|   25|
|       United States|            Grenada|   54|
|          Costa Rica|      United States|  477|
|             Senegal|      United States|   29|
|       United States|   Marshall Islands|   44|
|              Guyana|      United States|   17|
|       United States|       Sint Maarten|   53|
|               Malta|      United States|    1|
|             Bolivia|      United States|   46|
|            Anguilla|      United States|   21|
|Turks and Caicos ...|      United States|  136|
|       United States|        Afghanistan|    2|
|Saint Vincent and..

Let's try the .option("mode", ...) and .option("nullValue", " ") options with JSON (the cell below uses PERMISSIVE mode).

In [89]:
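val ejJSON = spark.read.format("json").schema("DEST_COUNTRY_NAME STRING, ORIGIN_COUNTRY_NAME STRING, count INT")
  .option("header", "true")     // CSV option; not among the JSON reader's documented options
  .option("nullValue", " ")     // CSV option; not among the JSON reader's documented options
  .option("mode", "PERMISSIVE") // malformed records yield null fields instead of an error
  .load("summaryFlightJSON")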

ejJSON: org.apache.spark.sql.DataFrame = [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field]


In [90]:
ejJSON.show()

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|       United States|            Romania|   15|
|       United States|            Croatia|    1|
|       United States|            Ireland|  344|
|               Egypt|      United States|   15|
|       United States|              India|   62|
|       United States|          Singapore|    1|
|       United States|            Grenada|   62|
|          Costa Rica|      United States|  588|
|             Senegal|      United States|   40|
|             Moldova|      United States|    1|
|       United States|       Sint Maarten|  325|
|       United States|   Marshall Islands|   39|
|              Guyana|      United States|   64|
|               Malta|      United States|    1|
|            Anguilla|      United States|   41|
|             Bolivia|      United States|   30|
|       United States|           Paraguay|    6|
|             Algeri

In [103]:
ejJSON.repartition(9, col("count")).write.format("json").mode("overwrite").saveAsTable("ejemploModifiFLIGHTJSON")
ejJSON.repartition(9, col("count")).write.format("json").mode("overwrite").save("ejemploModifiFLIGHTJSON")

In [105]:
spark.sql("SELECT * FROM ejemploModifiFLIGHTJSON").show()

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|       United States|            Croatia|    1|
|       United States|          Singapore|    1|
|             Senegal|      United States|   40|
|             Moldova|      United States|    1|
|       United States|   Marshall Islands|   39|
|               Malta|      United States|    1|
|       United States|          Gibraltar|    1|
|Saint Vincent and...|      United States|    1|
|         The Bahamas|      United States|  955|
|            Suriname|      United States|    1|
|       United States|             Cyprus|    1|
|       United States|           Suriname|   34|
|       United States|              Chile|  185|
|        Burkina Faso|      United States|    1|
|       United States|             Poland|   33|
|    Saint Barthelemy|      United States|   39|
|            Djibouti|      United States|    1|
|       United State

Views can also be created using Spark SQL.

In [106]:
spark.sql("""CREATE OR REPLACE TEMPORARY VIEW summaryCSV
USING csv
OPTIONS (
path "summaryFlightCSV",
header "true",
inferSchema "true",
mode "FAILFAST"
)""")

res82: org.apache.spark.sql.DataFrame = []


In [107]:
spark.sql("SELECT * FROM summaryCSV").show()

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|       United States|            Romania|    1|
|       United States|            Ireland|  264|
|       United States|              India|   69|
|               Egypt|      United States|   24|
|   Equatorial Guinea|      United States|    1|
|       United States|          Singapore|   25|
|       United States|            Grenada|   54|
|          Costa Rica|      United States|  477|
|             Senegal|      United States|   29|
|       United States|   Marshall Islands|   44|
|              Guyana|      United States|   17|
|       United States|       Sint Maarten|   53|
|               Malta|      United States|    1|
|             Bolivia|      United States|   46|
|            Anguilla|      United States|   21|
|Turks and Caicos ...|      United States|  136|
|       United States|        Afghanistan|    2|
|Saint Vincent and..

## AVRO

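Problems using Avro: in this environment the spark-avro module is not on the classpath, so both the import and the read below fail.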

In [111]:
import org.apache.spark.sql.avro._

<console>: 30: error: object avro is not a member of package org.apache.spark.sql

In [113]:
val ejAVRO = spark.read.format("avro").load("summaryAVRO")

org.apache.spark.sql.AnalysisException:   Failed to find data source: avro. Avro is built-in but external data source module since Spark 2.4. Please deploy the application as per the deployment section of "Apache Avro Data Source Guide".

Creating a SQL table from an Avro data source is no different from doing so with Parquet, JSON, or CSV.

## ORC

In [114]:
val ejORC = spark.read.format("orc").load("ejemploORC.orc")

ejORC: org.apache.spark.sql.DataFrame = [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field]


In [115]:
ejORC.show()

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|       United States|            Romania|    1|
|       United States|            Ireland|  264|
|       United States|              India|   69|
|               Egypt|      United States|   24|
|   Equatorial Guinea|      United States|    1|
|       United States|          Singapore|   25|
|       United States|            Grenada|   54|
|          Costa Rica|      United States|  477|
|             Senegal|      United States|   29|
|       United States|   Marshall Islands|   44|
|              Guyana|      United States|   17|
|       United States|       Sint Maarten|   53|
|               Malta|      United States|    1|
|             Bolivia|      United States|   46|
|            Anguilla|      United States|   21|
|Turks and Caicos ...|      United States|  136|
|       United States|        Afghanistan|    2|
|Saint Vincent and..

Views work exactly the same way.

In [116]:
spark.sql("""CREATE OR REPLACE TEMPORARY VIEW summaryORC
USING orc
OPTIONS (
path "ejemploORC.orc"
)""")

res86: org.apache.spark.sql.DataFrame = []


In [117]:
spark.sql("SELECT * FROM summaryORC").show()

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|       United States|            Romania|    1|
|       United States|            Ireland|  264|
|       United States|              India|   69|
|               Egypt|      United States|   24|
|   Equatorial Guinea|      United States|    1|
|       United States|          Singapore|   25|
|       United States|            Grenada|   54|
|          Costa Rica|      United States|  477|
|             Senegal|      United States|   29|
|       United States|   Marshall Islands|   44|
|              Guyana|      United States|   17|
|       United States|       Sint Maarten|   53|
|               Malta|      United States|    1|
|             Bolivia|      United States|   46|
|            Anguilla|      United States|   21|
|Turks and Caicos ...|      United States|  136|
|       United States|        Afghanistan|    2|
|Saint Vincent and..

Writing works the same way too.

In [118]:
ejORC.write.format("orc").mode("overwrite").option("compression", "snappy").save("df_ORC")

## IMAGES

In [119]:
import org.apache.spark.ml.source.image

import org.apache.spark.ml.source.image


In [120]:
val imagesDF = spark.read.format("image").load("ejemploIMAGE")

imagesDF: org.apache.spark.sql.DataFrame = [image: struct<origin: string, height: int ... 4 more fields>, label: int]


In [123]:
imagesDF.printSchema()

root
 |-- image: struct (nullable = true)
 |    |-- origin: string (nullable = true)
 |    |-- height: integer (nullable = true)
 |    |-- width: integer (nullable = true)
 |    |-- nChannels: integer (nullable = true)
 |    |-- mode: integer (nullable = true)
 |    |-- data: binary (nullable = true)
 |-- label: integer (nullable = true)



In [126]:
imagesDF.select("image.height", "image.width", "image.nChannels", "image.mode", "label").show()

+------+-----+---------+----+-----+
|height|width|nChannels|mode|label|
+------+-----+---------+----+-----+
|   288|  384|        3|  16|    0|
|   288|  384|        3|  16|    1|
|   288|  384|        3|  16|    0|
|   288|  384|        3|  16|    0|
|   288|  384|        3|  16|    0|
|   288|  384|        3|  16|    0|
|   288|  384|        3|  16|    0|
|   288|  384|        3|  16|    0|
|   288|  384|        3|  16|    1|
|   288|  384|        3|  16|    1|
|   288|  384|        3|  16|    0|
|   288|  384|        3|  16|    0|
|   288|  384|        3|  16|    0|
|   288|  384|        3|  16|    0|
|   288|  384|        3|  16|    1|
|   288|  384|        3|  16|    0|
|   288|  384|        3|  16|    1|
|   288|  384|        3|  16|    0|
|   288|  384|        3|  16|    0|
|   288|  384|        3|  16|    0|
+------+-----+---------+----+-----+
only showing top 20 rows



## BINARY FILES

In [127]:
val binaryFilesDF = spark.read.format("binaryFile")
  .option("pathGlobFilter", "*.jpg") // only load files whose names match this glob
  .load("ejemploIMAGE")

binaryFilesDF: org.apache.spark.sql.DataFrame = [path: string, modificationTime: timestamp ... 3 more fields]


In [130]:
binaryFilesDF.show(5)

+--------------------+--------------------+------+--------------------+-----+
|                path|    modificationTime|length|             content|label|
+--------------------+--------------------+------+--------------------+-----+
|file:/home/jovyan...|2022-06-23 10:04:...| 55037|[FF D8 FF E0 00 1...|    0|
|file:/home/jovyan...| 2022-06-23 10:05:09| 54634|[FF D8 FF E0 00 1...|    1|
|file:/home/jovyan...|2022-06-23 10:04:...| 54624|[FF D8 FF E0 00 1...|    0|
|file:/home/jovyan...|2022-06-23 10:04:...| 54505|[FF D8 FF E0 00 1...|    0|
|file:/home/jovyan...|2022-06-23 10:04:...| 54475|[FF D8 FF E0 00 1...|    0|
+--------------------+--------------------+------+--------------------+-----+
only showing top 5 rows



In [131]:
val binaryFilesDF2 = spark.read.format("binaryFile")
  .option("pathGlobFilter", "*.jpg") // only load files whose names match this glob
  .option("recursiveFileLookup", "true")
  .load("ejemploIMAGE")

binaryFilesDF2: org.apache.spark.sql.DataFrame = [path: string, modificationTime: timestamp ... 2 more fields]


Note that the label column disappears when recursiveFileLookup is true, because recursive file lookup disables partition inference.
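The plain-Scala sketch below models why the column comes and goes. It assumes the images sit in directories named like label=0 and label=1 under ejemploIMAGE, which is what the presence of a label column suggests but is not shown in this notebook. The file name img_001.jpg and the names `PartitionPathSketch`, `partitionColumns`, and `columnsFor` are hypothetical, and this is not Spark's own discovery code.

```scala
object PartitionPathSketch {
  // Extract key=value directory segments from a file path (the file name is skipped).
  def partitionColumns(path: String): Map[String, String] =
    path.split("/").toSeq.dropRight(1).flatMap { segment =>
      segment.split("=", 2) match {
        case Array(k, v) if k.nonEmpty => Some(k -> v)
        case _                         => None
      }
    }.toMap

  // Column names a binaryFile-style reader would expose, with or without
  // partition inference.
  def columnsFor(path: String, recursiveFileLookup: Boolean): Seq[String] = {
    val base = Seq("path", "modificationTime", "length", "content")
    if (recursiveFileLookup) base
    else base ++ partitionColumns(path).keys.toSeq.sorted
  }

  def main(args: Array[String]): Unit = {
    val file = "ejemploIMAGE/label=1/img_001.jpg" // hypothetical file name
    println(columnsFor(file, recursiveFileLookup = false).mkString(", "))
    println(columnsFor(file, recursiveFileLookup = true).mkString(", "))
  }
}
```

With partition inference on, the directory segment label=1 contributes a label column; with recursive lookup the files are simply collected from every subdirectory and the directory names carry no meaning.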

In [136]:
binaryFilesDF2.show()
binaryFilesDF2.select(col("content")).show()

+--------------------+--------------------+------+--------------------+
|                path|    modificationTime|length|             content|
+--------------------+--------------------+------+--------------------+
|file:/home/jovyan...|2022-06-23 10:04:...| 55037|[FF D8 FF E0 00 1...|
|file:/home/jovyan...| 2022-06-23 10:05:09| 54634|[FF D8 FF E0 00 1...|
|file:/home/jovyan...|2022-06-23 10:04:...| 54624|[FF D8 FF E0 00 1...|
|file:/home/jovyan...|2022-06-23 10:04:...| 54505|[FF D8 FF E0 00 1...|
|file:/home/jovyan...|2022-06-23 10:04:...| 54475|[FF D8 FF E0 00 1...|
|file:/home/jovyan...|2022-06-23 10:04:...| 54449|[FF D8 FF E0 00 1...|
|file:/home/jovyan...|2022-06-23 10:04:...| 54440|[FF D8 FF E0 00 1...|
|file:/home/jovyan...|2022-06-23 10:04:...| 54377|[FF D8 FF E0 00 1...|
|file:/home/jovyan...|2022-06-23 10:05:...| 54365|[FF D8 FF E0 00 1...|
|file:/home/jovyan...|2022-06-23 10:05:...| 54330|[FF D8 FF E0 00 1...|
|file:/home/jovyan...|2022-06-23 10:04:...| 54289|[FF D8 FF E0 0