In [0]:
##########################################################################################################
# VERSION  	DESARROLLADOR             FECHA        DESCRIPCION
# -------------------------------------------------------------
#  1        Walter Albites Azarte     02/10/2022   Empezando con Delta lake
##########################################################################################################

### Empezando con <img src="https://docs.delta.io/latest/_static/delta-lake-logo.png" width=300/>

Delta Lake es una capa de almacenamiento de código abierto que aporta confiabilidad a los lagos de datos. Delta Lake proporciona transacciones ACID, control escalable de metadatos y unifica el procesamiento de datos de streaming y por lotes. Delta Lake se ejecuta sobre su lago de datos existente y es totalmente compatible con las API de Apache Spark. Delta Lake en Azure Databricks permite configurar el lago Delta en función de los patrones de las cargas de trabajo.<br><br>

* **Transacciones ACID**: garantiza la integridad de los datos y la coherencia de lectura con datos concurrentes y complejos.
* **Sistema unificado para Streaming y Bacth**: La ingesta de datos en streaming, el reabastecimiento del histórico en batch y las consultas interactivas funcionan de inmediato para ambos sistemas de obtención de datos
* **Schema Enforcement and Evolution**: asegura la estructura de datos, y permite actualizarla bajo demanda facilmente
* **Recuperación de datos**: Consulta versiones anteriores de la tabla por tiempo o número de versión.
* **Elimina y actualiza**: admite la eliminación y la inserción en tablas con APIs.
* **Formato abierto**: almacenado como formato Parquet en el almacenamiento de blobs.
* **Historial de auditoría**: Historial de todas las operaciones que ocurrieron en la tabla.
* **Gestión de metadatos escalables**: Capaz de manejar millones de archivos, escalan las operaciones de metadatos con Spark.<br><br>
<img src="https://databricks.com/wp-content/uploads/2020/09/delta-lake-medallion-model-scaled.jpg" width=1012/>

In [0]:
#Importar librerias 
from pyspark.sql.types import *
from pyspark.sql.functions import *
from datetime import datetime

In [0]:
# Para el caso practico vamos a trabajar con el archivo ubigeo.csv

In [0]:
%fs
ls FileStore/tables

path,name,size,modificationTime
dbfs:/FileStore/tables/DE_CATEGORY/,DE_CATEGORY/,0,0
dbfs:/FileStore/tables/Listado_Nombres_Unicos_Centro_v10.csv,Listado_Nombres_Unicos_Centro_v10.csv,4839434,1639223918000
dbfs:/FileStore/tables/base_datos_muestra.csv,base_datos_muestra.csv,68865954,1664320235000
dbfs:/FileStore/tables/de_categoria/,de_categoria/,0,0
dbfs:/FileStore/tables/state_income-9f7c5.csv,state_income-9f7c5.csv,2412,1586450259000
dbfs:/FileStore/tables/ubigeo.csv,ubigeo.csv,109495,1588690184000


In [0]:
# leer el csv para mantenerlo en un dataframe de spark
path="/FileStore/tables/ubigeo.csv"
df_ubigeo=spark.read.options(header="true").csv(path)
df_ubigeo.show(5)

+------+--------------------+----------+------------+------------+------------+
|UBIGEO|            DISTRITO| PROVINCIA|DEPARTAMENTO|     LATITUD|    LONGITUD|
+------+--------------------+----------+------------+------------+------------+
|060401|               CHOTA|     CHOTA|   CAJAMARCA|-6.555281874|-78.64146956|
|010302|          CHISQUILLA|   BONGARA|    AMAZONAS| -5.89320537|-77.78280858|
|120807|SANTA BARBARA DE ...|     YAULI|       JUNÍN|-11.21381302|-76.42904358|
|140304|             JAYANCA|LAMBAYEQUE|  LAMBAYEQUE|-6.394345534|-79.81451536|
|200601|             SULLANA|   SULLANA|       PIURA|-4.898786419|-80.63618267|
+------+--------------------+----------+------------+------------+------------+
only showing top 5 rows



In [0]:
df_ubigeo.count()

Out[28]: 1814

## ![Delta Lake Tiny Logo](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png)  Convertir a formato Delta Lake
Delta Lake es 100% compatible con Apache Spark; lo que facilita comenzar si ya usa Spark para sus flujos de trabajo de big data.
Delta Lake cuenta con API para SQL,Python y Scala.

<img src="https://databricks.com/wp-content/uploads/2020/12/simplysaydelta.png" width=600/>

In [0]:
#Crear una capa Bronze
dbutils.fs.mkdirs("dbfs:/FileStore/Bronze")

In [0]:
#Path para guardar el dataframe en formato delta
DataPath="/FileStore/Bronze/ubigeo/"

In [0]:
# write to Delta Lake
df_ubigeo.write.mode("overwrite").format("delta").save(DataPath)

In [0]:
# Verificamos los archivos generados

In [0]:
%fs
ls /FileStore/Bronze/ubigeo/

path,name,size,modificationTime
dbfs:/FileStore/Bronze/ubigeo/_delta_log/,_delta_log/,0,0
dbfs:/FileStore/Bronze/ubigeo/part-00000-1f62cdc8-10b8-4f30-913b-6e9fbaf174e3-c000.snappy.parquet,part-00000-1f62cdc8-10b8-4f30-913b-6e9fbaf174e3-c000.snappy.parquet,2237,1665072792000
dbfs:/FileStore/Bronze/ubigeo/part-00000-21f47d7c-4943-43a4-b702-95c5204c744e-c000.snappy.parquet,part-00000-21f47d7c-4943-43a4-b702-95c5204c744e-c000.snappy.parquet,2237,1665072539000
dbfs:/FileStore/Bronze/ubigeo/part-00000-2cb2b102-cf32-4ffa-9657-ddfc3e4d499b-c000.snappy.parquet,part-00000-2cb2b102-cf32-4ffa-9657-ddfc3e4d499b-c000.snappy.parquet,67834,1665071996000
dbfs:/FileStore/Bronze/ubigeo/part-00000-2fcc7799-4a3a-487a-80a7-1e95c29ffa24-c000.snappy.parquet,part-00000-2fcc7799-4a3a-487a-80a7-1e95c29ffa24-c000.snappy.parquet,67574,1664227671000
dbfs:/FileStore/Bronze/ubigeo/part-00000-a573a0e3-f4a7-43f2-9b16-34167d038a86-c000.snappy.parquet,part-00000-a573a0e3-f4a7-43f2-9b16-34167d038a86-c000.snappy.parquet,67847,1664497193000
dbfs:/FileStore/Bronze/ubigeo/part-00000-c0cf2b7d-153e-424d-9f35-1d3d709fe4d1-c000.snappy.parquet,part-00000-c0cf2b7d-153e-424d-9f35-1d3d709fe4d1-c000.snappy.parquet,67920,1665076334000
dbfs:/FileStore/Bronze/ubigeo/part-00000-c36415b9-5998-4df9-89da-1fb0844597e0-c000.snappy.parquet,part-00000-c36415b9-5998-4df9-89da-1fb0844597e0-c000.snappy.parquet,67574,1665077021000
dbfs:/FileStore/Bronze/ubigeo/part-00003-6b5c53d9-509b-4e2f-b33e-c1add7231db8-c000.snappy.parquet,part-00003-6b5c53d9-509b-4e2f-b33e-c1add7231db8-c000.snappy.parquet,2211,1665075816000
dbfs:/FileStore/Bronze/ubigeo/part-00007-af6f0afd-9b3d-49d9-9cc7-7d67464957b5-c000.snappy.parquet,part-00007-af6f0afd-9b3d-49d9-9cc7-7d67464957b5-c000.snappy.parquet,2301,1665075816000


## ![Delta Lake Tiny Logo](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) Metadata

Almacenamos el nombre de la tabla, la ruta, la información de la base de datos en Hive metastore,
el esquema real se almacena en el directorio `_delta_log` como se muestra a continuación.

In [0]:
%fs
ls /FileStore/Bronze/ubigeo/_delta_log/

path,name,size,modificationTime
dbfs:/FileStore/Bronze/ubigeo/_delta_log/.s3-optimization-0,.s3-optimization-0,0,1664227684000
dbfs:/FileStore/Bronze/ubigeo/_delta_log/.s3-optimization-1,.s3-optimization-1,0,1664227685000
dbfs:/FileStore/Bronze/ubigeo/_delta_log/.s3-optimization-2,.s3-optimization-2,0,1664227685000
dbfs:/FileStore/Bronze/ubigeo/_delta_log/00000000000000000000.crc,00000000000000000000.crc,2201,1664227683000
dbfs:/FileStore/Bronze/ubigeo/_delta_log/00000000000000000000.json,00000000000000000000.json,2014,1664227676000
dbfs:/FileStore/Bronze/ubigeo/_delta_log/00000000000000000001.crc,00000000000000000001.crc,2274,1664497199000
dbfs:/FileStore/Bronze/ubigeo/_delta_log/00000000000000000001.json,00000000000000000001.json,2392,1664497195000
dbfs:/FileStore/Bronze/ubigeo/_delta_log/00000000000000000002.crc,00000000000000000002.crc,2274,1665072001000
dbfs:/FileStore/Bronze/ubigeo/_delta_log/00000000000000000002.json,00000000000000000002.json,1791,1665071997000
dbfs:/FileStore/Bronze/ubigeo/_delta_log/00000000000000000003.crc,00000000000000000003.crc,2277,1665072543000


In [0]:
#las consultas Spark SQL pueden ejecutarse directamente en un directorio de datos, para delta use la siguiente sintaxis:
display(spark.sql("SELECT * FROM delta.`{}` LIMIT 5".format(DataPath)))

UBIGEO,DISTRITO,PROVINCIA,DEPARTAMENTO,LATITUD,LONGITUD,FECHA
60401,CHOTA,CHOTA,CAJAMARCA,-6.555281874,-78.64146956,
10302,CHISQUILLA,BONGARA,AMAZONAS,-5.89320537,-77.78280858,
120807,SANTA BARBARA DE CARHUACAYAN,YAULI,JUNÍN,-11.21381302,-76.42904358,
140304,JAYANCA,LAMBAYEQUE,LAMBAYEQUE,-6.394345534,-79.81451536,
200601,SULLANA,SULLANA,PIURA,-4.898786419,-80.63618267,


In [0]:
%sql
describe delta.`/FileStore/Bronze/ubigeo/`

col_name,data_type,comment
UBIGEO,string,
DISTRITO,string,
PROVINCIA,string,
DEPARTAMENTO,string,
LATITUD,string,
LONGITUD,string,
FECHA,date,
,,
# Partitioning,,
Not partitioned,,


In [0]:
%sql
SELECT * FROM delta.`/FileStore/Bronze/ubigeo/` LIMIT 5

UBIGEO,DISTRITO,PROVINCIA,DEPARTAMENTO,LATITUD,LONGITUD,FECHA,PAIS
10302,CHISQUILLA,BONGARA,AMAZONAS,-5.89320537,-77.78280858,2022-10-06,PERU
120807,SANTA BARBARA DE CARHUACAYAN,YAULI,JUNÍN,-11.21381302,-76.42904358,2022-10-06,PERU
140304,JAYANCA,LAMBAYEQUE,LAMBAYEQUE,-6.394345534,-79.81451536,2022-10-06,PERU
200601,SULLANA,SULLANA,PIURA,-4.898786419,-80.63618267,2022-10-06,PERU
61303,CATACHE,SANTA CRUZ,CAJAMARCA,-6.735890508,-79.04859161,2022-10-06,PERU


## ![Delta Lake Tiny Logo](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) Transacciones ACID:
Ver el registro de transacciones de Delta Lake

In [0]:
%sql DESCRIBE HISTORY delta.`/FileStore/Bronze/ubigeo/`

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
7,2022-10-06T17:23:42.000+0000,4454288869856112,walbites@hotmail.com,WRITE,"Map(mode -> Overwrite, partitionBy -> [])",,List(954572819838097),1006-135601-ffhsikgx,6.0,WriteSerializable,False,"Map(numFiles -> 1, numOutputRows -> 1814, numOutputBytes -> 67574)",,Databricks-Runtime/10.4.x-scala2.12
6,2022-10-06T17:12:15.000+0000,4454288869856112,walbites@hotmail.com,OPTIMIZE,"Map(predicate -> [], zOrderBy -> [""UBIGEO""], batchId -> 0, auto -> false)",,List(954572819838097),1006-135601-ffhsikgx,5.0,SnapshotIsolation,False,"Map(numRemovedFiles -> 3, numRemovedBytes -> 72346, p25FileSize -> 67920, minFileSize -> 67920, numAddedFiles -> 1, maxFileSize -> 67920, p75FileSize -> 67920, p50FileSize -> 67920, numAddedBytes -> 67920)",,Databricks-Runtime/10.4.x-scala2.12
5,2022-10-06T17:03:37.000+0000,4454288869856112,walbites@hotmail.com,MERGE,"Map(predicate -> (l.UBIGEO = m.UBIGEO), matchedPredicates -> [{""actionType"":""update""}], notMatchedPredicates -> [{""actionType"":""insert""}])",,List(954572819838097),1006-135601-ffhsikgx,4.0,WriteSerializable,False,"Map(numTargetRowsCopied -> 0, numTargetRowsDeleted -> 0, numTargetFilesAdded -> 2, executionTimeMs -> 8102, numTargetRowsInserted -> 1, scanTimeMs -> 4569, numTargetRowsUpdated -> 1, numOutputRows -> 2, numTargetChangeFilesAdded -> 0, numSourceRows -> 2, numTargetFilesRemoved -> 1, rewriteTimeMs -> 3008)",,Databricks-Runtime/10.4.x-scala2.12
4,2022-10-06T16:13:12.000+0000,4454288869856112,walbites@hotmail.com,UPDATE,Map(predicate -> (cast(ubigeo#4102 as int) = 60401)),,List(954572819838097),1006-135601-ffhsikgx,3.0,WriteSerializable,False,"Map(numRemovedFiles -> 1, numCopiedRows -> 0, numAddedChangeFiles -> 0, executionTimeMs -> 3103, scanTimeMs -> 71, numAddedFiles -> 1, numUpdatedRows -> 1, rewriteTimeMs -> 3021)",,Databricks-Runtime/10.4.x-scala2.12
3,2022-10-06T16:08:59.000+0000,4454288869856112,walbites@hotmail.com,WRITE,"Map(mode -> Append, partitionBy -> [])",,List(954572819838097),1006-135601-ffhsikgx,2.0,WriteSerializable,True,"Map(numFiles -> 1, numOutputRows -> 1, numOutputBytes -> 2237)",,Databricks-Runtime/10.4.x-scala2.12
2,2022-10-06T15:59:57.000+0000,4454288869856112,walbites@hotmail.com,DELETE,"Map(predicate -> [""(CAST(spark_catalog.default.t_ubigeo_delta.ubigeo AS INT) = 60401)""])",,List(954572819838097),1006-135601-ffhsikgx,1.0,WriteSerializable,False,"Map(numRemovedFiles -> 1, numCopiedRows -> 1813, numAddedChangeFiles -> 0, executionTimeMs -> 4889, numDeletedRows -> 1, scanTimeMs -> 2187, numAddedFiles -> 1, rewriteTimeMs -> 2698)",,Databricks-Runtime/10.4.x-scala2.12
1,2022-09-30T00:19:55.000+0000,4454288869856112,walbites@hotmail.com,WRITE,"Map(mode -> Overwrite, partitionBy -> [])",,List(954572819838097),0929-235617-x7tith3d,0.0,WriteSerializable,False,"Map(numFiles -> 1, numOutputRows -> 1814, numOutputBytes -> 67847)",,Databricks-Runtime/10.4.x-scala2.12
0,2022-09-26T21:27:56.000+0000,4454288869856112,walbites@hotmail.com,WRITE,"Map(mode -> Overwrite, partitionBy -> [])",,List(954572819838097),0926-211157-bp6725uq,,WriteSerializable,False,"Map(numFiles -> 1, numOutputRows -> 1814, numOutputBytes -> 67574)",,Databricks-Runtime/10.4.x-scala2.12


## ![Delta Lake Tiny Logo](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) Schema Enforcement and Evolution:
Asegura la estructura de datos, y permite actualizarla bajo demanda facilmente

Para mostrarle cómo funciona la aplicación del esquema, creemos un nuevo dataframe  tenga una columna adicional, `PAIS`, que no coincide con nuestro esquema

In [0]:
df_ubigeo_delta=spark.sql("SELECT * FROM delta.`/FileStore/Bronze/ubigeo/`")
df_ubigeo_delta.count()

Out[33]: 1814

In [0]:
df_ubigeo_delta=df_ubigeo_delta.withColumn("PAIS",lit("PERU")).withColumn("FECHA",current_date())
df_ubigeo_delta.printSchema()

root
 |-- UBIGEO: string (nullable = true)
 |-- DISTRITO: string (nullable = true)
 |-- PROVINCIA: string (nullable = true)
 |-- DEPARTAMENTO: string (nullable = true)
 |-- LATITUD: string (nullable = true)
 |-- LONGITUD: string (nullable = true)
 |-- FECHA: date (nullable = false)
 |-- PAIS: string (nullable = false)



In [0]:
df_ubigeo_delta.show(5)

+------+--------------------+----------+------------+------------+------------+----------+----+
|UBIGEO|            DISTRITO| PROVINCIA|DEPARTAMENTO|     LATITUD|    LONGITUD|     FECHA|PAIS|
+------+--------------------+----------+------------+------------+------------+----------+----+
|060401|               CHOTA|     CHOTA|   CAJAMARCA|-6.555281874|-78.64146956|2022-10-06|PERU|
|010302|          CHISQUILLA|   BONGARA|    AMAZONAS| -5.89320537|-77.78280858|2022-10-06|PERU|
|120807|SANTA BARBARA DE ...|     YAULI|       JUNÍN|-11.21381302|-76.42904358|2022-10-06|PERU|
|140304|             JAYANCA|LAMBAYEQUE|  LAMBAYEQUE|-6.394345534|-79.81451536|2022-10-06|PERU|
|200601|             SULLANA|   SULLANA|       PIURA|-4.898786419|-80.63618267|2022-10-06|PERU|
+------+--------------------+----------+------------+------------+------------+----------+----+
only showing top 5 rows



**La aplicación del esquema ayuda a mantener nuestras tablas limpias y ordenadas para que podamos confiar en los datos que hemos almacenado en Delta Lake.** Las escrituras anteriores se bloquearon porque el esquema de los nuevos datos no coincidía con el esquema de la tabla

In [0]:
df_ubigeo_delta.write.mode("overwrite").format("delta").save(DataPath)

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
[0;32m<command-427097954111921>[0m in [0;36m<module>[0;34m[0m
[0;32m----> 1[0;31m [0mdf_ubigeo_delta[0m[0;34m.[0m[0mwrite[0m[0;34m.[0m[0mmode[0m[0;34m([0m[0;34m"overwrite"[0m[0;34m)[0m[0;34m.[0m[0mformat[0m[0;34m([0m[0;34m"delta"[0m[0;34m)[0m[0;34m.[0m[0msave[0m[0;34m([0m[0mDataPath[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m
[0;32m/databricks/spark/python/pyspark/sql/readwriter.py[0m in [0;36msave[0;34m(self, path, format, mode, partitionBy, **options)[0m
[1;32m    738[0m             [0mself[0m[0;34m.[0m[0m_jwrite[0m[0;34m.[0m[0msave[0m[0;34m([0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[1;32m    739[0m         [0;32melse[0m[0;34m:[0m[0;34m[0m[0;34m[0m[0m
[0;32m--> 740[0;31m             [0mself[0m[0;34m.[0m[0m_jwrite[0m[0;34m.[0m[0msa

In [0]:
# para efecto de alterar el Schema en caso de una nueva Columna
spark.sql("SET spark.databricks.delta.schema.autoMerge.enabled=true")

Out[40]: DataFrame[key: string, value: string]

In [0]:
df_ubigeo_delta.write.mode("overwrite").format("delta").save(DataPath)

In [0]:
%sql DESCRIBE  delta.`/FileStore/Bronze/ubigeo/`

col_name,data_type,comment
UBIGEO,string,
DISTRITO,string,
PROVINCIA,string,
DEPARTAMENTO,string,
LATITUD,string,
LONGITUD,string,
FECHA,date,
PAIS,string,
,,
# Partitioning,,


In [0]:
%sql DESCRIBE HISTORY delta.`/FileStore/Bronze/ubigeo/`

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
8,2022-10-06T17:34:48.000+0000,4454288869856112,walbites@hotmail.com,WRITE,"Map(mode -> Overwrite, partitionBy -> [])",,List(954572819838097),1006-135601-ffhsikgx,7.0,WriteSerializable,False,"Map(numFiles -> 1, numOutputRows -> 1814, numOutputBytes -> 68124)",,Databricks-Runtime/10.4.x-scala2.12
7,2022-10-06T17:23:42.000+0000,4454288869856112,walbites@hotmail.com,WRITE,"Map(mode -> Overwrite, partitionBy -> [])",,List(954572819838097),1006-135601-ffhsikgx,6.0,WriteSerializable,False,"Map(numFiles -> 1, numOutputRows -> 1814, numOutputBytes -> 67574)",,Databricks-Runtime/10.4.x-scala2.12
6,2022-10-06T17:12:15.000+0000,4454288869856112,walbites@hotmail.com,OPTIMIZE,"Map(predicate -> [], zOrderBy -> [""UBIGEO""], batchId -> 0, auto -> false)",,List(954572819838097),1006-135601-ffhsikgx,5.0,SnapshotIsolation,False,"Map(numRemovedFiles -> 3, numRemovedBytes -> 72346, p25FileSize -> 67920, minFileSize -> 67920, numAddedFiles -> 1, maxFileSize -> 67920, p75FileSize -> 67920, p50FileSize -> 67920, numAddedBytes -> 67920)",,Databricks-Runtime/10.4.x-scala2.12
5,2022-10-06T17:03:37.000+0000,4454288869856112,walbites@hotmail.com,MERGE,"Map(predicate -> (l.UBIGEO = m.UBIGEO), matchedPredicates -> [{""actionType"":""update""}], notMatchedPredicates -> [{""actionType"":""insert""}])",,List(954572819838097),1006-135601-ffhsikgx,4.0,WriteSerializable,False,"Map(numTargetRowsCopied -> 0, numTargetRowsDeleted -> 0, numTargetFilesAdded -> 2, executionTimeMs -> 8102, numTargetRowsInserted -> 1, scanTimeMs -> 4569, numTargetRowsUpdated -> 1, numOutputRows -> 2, numTargetChangeFilesAdded -> 0, numSourceRows -> 2, numTargetFilesRemoved -> 1, rewriteTimeMs -> 3008)",,Databricks-Runtime/10.4.x-scala2.12
4,2022-10-06T16:13:12.000+0000,4454288869856112,walbites@hotmail.com,UPDATE,Map(predicate -> (cast(ubigeo#4102 as int) = 60401)),,List(954572819838097),1006-135601-ffhsikgx,3.0,WriteSerializable,False,"Map(numRemovedFiles -> 1, numCopiedRows -> 0, numAddedChangeFiles -> 0, executionTimeMs -> 3103, scanTimeMs -> 71, numAddedFiles -> 1, numUpdatedRows -> 1, rewriteTimeMs -> 3021)",,Databricks-Runtime/10.4.x-scala2.12
3,2022-10-06T16:08:59.000+0000,4454288869856112,walbites@hotmail.com,WRITE,"Map(mode -> Append, partitionBy -> [])",,List(954572819838097),1006-135601-ffhsikgx,2.0,WriteSerializable,True,"Map(numFiles -> 1, numOutputRows -> 1, numOutputBytes -> 2237)",,Databricks-Runtime/10.4.x-scala2.12
2,2022-10-06T15:59:57.000+0000,4454288869856112,walbites@hotmail.com,DELETE,"Map(predicate -> [""(CAST(spark_catalog.default.t_ubigeo_delta.ubigeo AS INT) = 60401)""])",,List(954572819838097),1006-135601-ffhsikgx,1.0,WriteSerializable,False,"Map(numRemovedFiles -> 1, numCopiedRows -> 1813, numAddedChangeFiles -> 0, executionTimeMs -> 4889, numDeletedRows -> 1, scanTimeMs -> 2187, numAddedFiles -> 1, rewriteTimeMs -> 2698)",,Databricks-Runtime/10.4.x-scala2.12
1,2022-09-30T00:19:55.000+0000,4454288869856112,walbites@hotmail.com,WRITE,"Map(mode -> Overwrite, partitionBy -> [])",,List(954572819838097),0929-235617-x7tith3d,0.0,WriteSerializable,False,"Map(numFiles -> 1, numOutputRows -> 1814, numOutputBytes -> 67847)",,Databricks-Runtime/10.4.x-scala2.12
0,2022-09-26T21:27:56.000+0000,4454288869856112,walbites@hotmail.com,WRITE,"Map(mode -> Overwrite, partitionBy -> [])",,List(954572819838097),0926-211157-bp6725uq,,WriteSerializable,False,"Map(numFiles -> 1, numOutputRows -> 1814, numOutputBytes -> 67574)",,Databricks-Runtime/10.4.x-scala2.12


## ![Delta Lake Tiny Logo](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) Delta Lake Time Travel, recuperación de versiones
Las capacidades de Delta Lake simplifican la creación de canalizaciones de datos para casos de uso que incluyen:

* Auditoría de cambios de datos
* Reproducción de experimentos e informes
* Retrocesos

A medida que escribe en una tabla o directorio Delta, cada operación se versiona automáticamente.

<img src="https://github.com/risan4841/img/blob/master/transactionallogs.png?raw=true" width=250/>

Puedes consultar instantáneas de sus tablas con:
1. **Version number**, 
2. **Timestamp.**

utilizando sintaxis de Python, Scala y / o SQL; para estos ejemplos usaremos la sintaxis Spark SQL.

In [0]:
###### Usa el viaje en el tiempo para seleccionar y ver la versión original de nuestra tabla (Versión 0).

In [0]:
spark.sql("SELECT * FROM delta.`/FileStore/Bronze/ubigeo/` VERSION AS OF 0").show(3)

+------+--------------------+---------+------------+------------+------------+
|UBIGEO|            DISTRITO|PROVINCIA|DEPARTAMENTO|     LATITUD|    LONGITUD|
+------+--------------------+---------+------------+------------+------------+
|060401|               CHOTA|    CHOTA|   CAJAMARCA|-6.555281874|-78.64146956|
|010302|          CHISQUILLA|  BONGARA|    AMAZONAS| -5.89320537|-77.78280858|
|120807|SANTA BARBARA DE ...|    YAULI|       JUNÍN|-11.21381302|-76.42904358|
+------+--------------------+---------+------------+------------+------------+
only showing top 3 rows



-sandbox
### CREAR una tabla usando Delta Lake

Cree una tabla llamada `ubigeo_delta` usando `DELTA` a partir de los datos anteriores.

la notación es:
> `CREATE TABLE <table-name>` <br>
  `USING DELTA` <br>
  `LOCATION <path-do-data> ` <br>

In [0]:
#Creando la tabla con spark
spark.sql("""
  DROP TABLE IF EXISTS t_ubigeo_delta
""")
spark.sql("""
  CREATE TABLE t_ubigeo_delta
  USING DELTA
  LOCATION '{}'
""".format(DataPath))

Out[44]: DataFrame[]

In [0]:
%sql
SELECT count(*) FROM t_ubigeo_delta

count(1)
1814


In [0]:
%sql
SELECT * FROM t_ubigeo_delta LIMIT 5

UBIGEO,DISTRITO,PROVINCIA,DEPARTAMENTO,LATITUD,LONGITUD,FECHA,PAIS
10302,CHISQUILLA,BONGARA,AMAZONAS,-5.89320537,-77.78280858,2022-10-06,PERU
120807,SANTA BARBARA DE CARHUACAYAN,YAULI,JUNÍN,-11.21381302,-76.42904358,2022-10-06,PERU
140304,JAYANCA,LAMBAYEQUE,LAMBAYEQUE,-6.394345534,-79.81451536,2022-10-06,PERU
200601,SULLANA,SULLANA,PIURA,-4.898786419,-80.63618267,2022-10-06,PERU
61303,CATACHE,SANTA CRUZ,CAJAMARCA,-6.735890508,-79.04859161,2022-10-06,PERU


In [0]:
#Creando la tabla con SQL

In [0]:
%sql
CREATE TABLE t_ubigeo_delta_3
USING delta
AS SELECT * FROM csv.`/FileStore/tables/ubigeo.csv`

num_affected_rows,num_inserted_rows


## ![Delta Lake Tiny Logo](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) Transacciones  DML:
Delta Lake trae transacciones ACID y soporte DML completo a los lagos de datos.<br>
Parquet **no** admite estos comandos, son exclusivos de Delta Lake.

**DELETE: Elimine los datos del UBIGEO con un solo comando `DELETE` usando Delta Lake.**

In [0]:
%sql
DELETE FROM t_ubigeo_delta WHERE ubigeo=060401;
-- Confirmar la eliminacion
SELECT * FROM t_ubigeo_delta WHERE ubigeo=060401

UBIGEO,DISTRITO,PROVINCIA,DEPARTAMENTO,LATITUD,LONGITUD,FECHA,PAIS


**INSERT: Utilice el viaje en el tiempo e `INSERT INTO` para volver a agregar el ubigeo a nuestra tabla.**

In [0]:
%sql
DESCRIBE HISTORY t_ubigeo_delta

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
9,2022-10-06T17:39:23.000+0000,4454288869856112,walbites@hotmail.com,DELETE,"Map(predicate -> [""(CAST(spark_catalog.default.t_ubigeo_delta.ubigeo AS INT) = 60401)""])",,List(954572819838097),1006-135601-ffhsikgx,8.0,WriteSerializable,False,"Map(numRemovedFiles -> 1, numCopiedRows -> 1813, numAddedChangeFiles -> 0, executionTimeMs -> 2746, numDeletedRows -> 1, scanTimeMs -> 945, numAddedFiles -> 1, rewriteTimeMs -> 1801)",,Databricks-Runtime/10.4.x-scala2.12
8,2022-10-06T17:34:48.000+0000,4454288869856112,walbites@hotmail.com,WRITE,"Map(mode -> Overwrite, partitionBy -> [])",,List(954572819838097),1006-135601-ffhsikgx,7.0,WriteSerializable,False,"Map(numFiles -> 1, numOutputRows -> 1814, numOutputBytes -> 68124)",,Databricks-Runtime/10.4.x-scala2.12
7,2022-10-06T17:23:42.000+0000,4454288869856112,walbites@hotmail.com,WRITE,"Map(mode -> Overwrite, partitionBy -> [])",,List(954572819838097),1006-135601-ffhsikgx,6.0,WriteSerializable,False,"Map(numFiles -> 1, numOutputRows -> 1814, numOutputBytes -> 67574)",,Databricks-Runtime/10.4.x-scala2.12
6,2022-10-06T17:12:15.000+0000,4454288869856112,walbites@hotmail.com,OPTIMIZE,"Map(predicate -> [], zOrderBy -> [""UBIGEO""], batchId -> 0, auto -> false)",,List(954572819838097),1006-135601-ffhsikgx,5.0,SnapshotIsolation,False,"Map(numRemovedFiles -> 3, numRemovedBytes -> 72346, p25FileSize -> 67920, minFileSize -> 67920, numAddedFiles -> 1, maxFileSize -> 67920, p75FileSize -> 67920, p50FileSize -> 67920, numAddedBytes -> 67920)",,Databricks-Runtime/10.4.x-scala2.12
5,2022-10-06T17:03:37.000+0000,4454288869856112,walbites@hotmail.com,MERGE,"Map(predicate -> (l.UBIGEO = m.UBIGEO), matchedPredicates -> [{""actionType"":""update""}], notMatchedPredicates -> [{""actionType"":""insert""}])",,List(954572819838097),1006-135601-ffhsikgx,4.0,WriteSerializable,False,"Map(numTargetRowsCopied -> 0, numTargetRowsDeleted -> 0, numTargetFilesAdded -> 2, executionTimeMs -> 8102, numTargetRowsInserted -> 1, scanTimeMs -> 4569, numTargetRowsUpdated -> 1, numOutputRows -> 2, numTargetChangeFilesAdded -> 0, numSourceRows -> 2, numTargetFilesRemoved -> 1, rewriteTimeMs -> 3008)",,Databricks-Runtime/10.4.x-scala2.12
4,2022-10-06T16:13:12.000+0000,4454288869856112,walbites@hotmail.com,UPDATE,Map(predicate -> (cast(ubigeo#4102 as int) = 60401)),,List(954572819838097),1006-135601-ffhsikgx,3.0,WriteSerializable,False,"Map(numRemovedFiles -> 1, numCopiedRows -> 0, numAddedChangeFiles -> 0, executionTimeMs -> 3103, scanTimeMs -> 71, numAddedFiles -> 1, numUpdatedRows -> 1, rewriteTimeMs -> 3021)",,Databricks-Runtime/10.4.x-scala2.12
3,2022-10-06T16:08:59.000+0000,4454288869856112,walbites@hotmail.com,WRITE,"Map(mode -> Append, partitionBy -> [])",,List(954572819838097),1006-135601-ffhsikgx,2.0,WriteSerializable,True,"Map(numFiles -> 1, numOutputRows -> 1, numOutputBytes -> 2237)",,Databricks-Runtime/10.4.x-scala2.12
2,2022-10-06T15:59:57.000+0000,4454288869856112,walbites@hotmail.com,DELETE,"Map(predicate -> [""(CAST(spark_catalog.default.t_ubigeo_delta.ubigeo AS INT) = 60401)""])",,List(954572819838097),1006-135601-ffhsikgx,1.0,WriteSerializable,False,"Map(numRemovedFiles -> 1, numCopiedRows -> 1813, numAddedChangeFiles -> 0, executionTimeMs -> 4889, numDeletedRows -> 1, scanTimeMs -> 2187, numAddedFiles -> 1, rewriteTimeMs -> 2698)",,Databricks-Runtime/10.4.x-scala2.12
1,2022-09-30T00:19:55.000+0000,4454288869856112,walbites@hotmail.com,WRITE,"Map(mode -> Overwrite, partitionBy -> [])",,List(954572819838097),0929-235617-x7tith3d,0.0,WriteSerializable,False,"Map(numFiles -> 1, numOutputRows -> 1814, numOutputBytes -> 67847)",,Databricks-Runtime/10.4.x-scala2.12
0,2022-09-26T21:27:56.000+0000,4454288869856112,walbites@hotmail.com,WRITE,"Map(mode -> Overwrite, partitionBy -> [])",,List(954572819838097),0926-211157-bp6725uq,,WriteSerializable,False,"Map(numFiles -> 1, numOutputRows -> 1814, numOutputBytes -> 67574)",,Databricks-Runtime/10.4.x-scala2.12


In [0]:
%sql
INSERT INTO t_ubigeo_delta
SELECT * FROM t_ubigeo_delta VERSION AS OF 8 WHERE ubigeo=060401

num_affected_rows,num_inserted_rows
1,1


In [0]:
%sql
SELECT * FROM t_ubigeo_delta WHERE ubigeo=060401

UBIGEO,DISTRITO,PROVINCIA,DEPARTAMENTO,LATITUD,LONGITUD,FECHA,PAIS
60401,CHOTA,CHOTA,CAJAMARCA,-6.555281874,-78.64146956,2022-10-06,PERU


**UPDATE: Modifica los registros existentes en una tabla.**

In [0]:
%sql 
UPDATE t_ubigeo_delta SET FECHA = current_date() WHERE ubigeo = 060401

num_affected_rows
1


In [0]:
%sql
SELECT * FROM t_ubigeo_delta WHERE ubigeo=060401

UBIGEO,DISTRITO,PROVINCIA,DEPARTAMENTO,LATITUD,LONGITUD,FECHA,PAIS
60401,CHOTA,CHOTA,CAJAMARCA,-6.555281874,-78.64146956,2022-10-06,PERU


In [0]:
%sql
describe t_ubigeo_delta

col_name,data_type,comment
UBIGEO,string,
DISTRITO,string,
PROVINCIA,string,
DEPARTAMENTO,string,
LATITUD,string,
LONGITUD,string,
FECHA,date,
PAIS,string,
,,
# Partitioning,,


## ![Delta Lake Tiny Logo](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) INSERTAR o ACTUALIZAR use MERGE:<br>
Proceso de 2 pasos:
1. Identificar filas para insertar o actualizar
2. Utilice "MERGE"

In [0]:
# Create merge table with 1 row update, 1 insertion
#to_date('2022-10-06', 'yyyy-MM-dd')
data = [("888888", "NUEVA", "NUEVA", "NUEVA", "-6.555281874","-78.64146956",datetime.now(),"PERU"),  # record to insert
        ("060401", "CHOTA CASTILLO", "CHOTA", "CAJAMARCA","-6.555281874","-78.64146956",datetime.now(),"PERU")]  # record to update
schema = spark.table("t_ubigeo_delta").schema
spark.createDataFrame(data, schema).createOrReplaceTempView("merge_t_ubigeo_delta")
spark.sql("SELECT * FROM merge_t_ubigeo_delta").show()

+------+--------------+---------+------------+------------+------------+----------+----+
|UBIGEO|      DISTRITO|PROVINCIA|DEPARTAMENTO|     LATITUD|    LONGITUD|     FECHA|PAIS|
+------+--------------+---------+------------+------------+------------+----------+----+
|888888|         NUEVA|    NUEVA|       NUEVA|-6.555281874|-78.64146956|2022-10-06|PERU|
|060401|CHOTA CASTILLO|    CHOTA|   CAJAMARCA|-6.555281874|-78.64146956|2022-10-06|PERU|
+------+--------------+---------+------------+------------+------------+----------+----+



In [0]:
%sql
MERGE INTO t_ubigeo_delta AS l
USING merge_t_ubigeo_delta AS m
ON l.UBIGEO = m.UBIGEO
WHEN MATCHED THEN 
  UPDATE SET *
WHEN NOT MATCHED 
  THEN INSERT *;

num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
2,1,0,1


In [0]:
%sql
select * from t_ubigeo_delta where UBIGEO IN ("888888","060401")

UBIGEO,DISTRITO,PROVINCIA,DEPARTAMENTO,LATITUD,LONGITUD,FECHA,PAIS
60401,CHOTA CASTILLO,CHOTA,CAJAMARCA,-6.555281874,-78.64146956,2022-10-06,PERU
888888,NUEVA,NUEVA,NUEVA,-6.555281874,-78.64146956,2022-10-06,PERU


## ![Delta Lake Tiny Logo](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) Optimizaciones de rendimiento y compactación de archivos:<br>

In [0]:
%sql CACHE SELECT * FROM t_ubigeo_delta

In [0]:
%sql OPTIMIZE t_ubigeo_delta ZORDER BY UBIGEO

path,metrics
dbfs:/FileStore/Bronze/ubigeo,"List(1, 3, List(68188, 68188, 68188.0, 1, 68188), List(2444, 68111, 24363.0, 3, 73089), 0, List(minCubeSize(107374182400), List(0, 0), List(3, 73089), 0, List(3, 73089), 1, null), 1, 3, 0, false)"


In [0]:
%sql
DESCRIBE HISTORY t_ubigeo_delta

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
13,2022-10-06T17:42:36.000+0000,4454288869856112,walbites@hotmail.com,OPTIMIZE,"Map(predicate -> [], zOrderBy -> [""UBIGEO""], batchId -> 0, auto -> false)",,List(954572819838097),1006-135601-ffhsikgx,12.0,SnapshotIsolation,False,"Map(numRemovedFiles -> 3, numRemovedBytes -> 73089, p25FileSize -> 68188, minFileSize -> 68188, numAddedFiles -> 1, maxFileSize -> 68188, p75FileSize -> 68188, p50FileSize -> 68188, numAddedBytes -> 68188)",,Databricks-Runtime/10.4.x-scala2.12
12,2022-10-06T17:42:13.000+0000,4454288869856112,walbites@hotmail.com,MERGE,"Map(predicate -> (l.UBIGEO = m.UBIGEO), matchedPredicates -> [{""actionType"":""update""}], notMatchedPredicates -> [{""actionType"":""insert""}])",,List(954572819838097),1006-135601-ffhsikgx,11.0,WriteSerializable,False,"Map(numTargetRowsCopied -> 0, numTargetRowsDeleted -> 0, numTargetFilesAdded -> 2, executionTimeMs -> 7250, numTargetRowsInserted -> 1, scanTimeMs -> 3609, numTargetRowsUpdated -> 1, numOutputRows -> 2, numTargetChangeFilesAdded -> 0, numSourceRows -> 2, numTargetFilesRemoved -> 1, rewriteTimeMs -> 3318)",,Databricks-Runtime/10.4.x-scala2.12
11,2022-10-06T17:40:39.000+0000,4454288869856112,walbites@hotmail.com,UPDATE,Map(predicate -> (cast(ubigeo#13181 as int) = 60401)),,List(954572819838097),1006-135601-ffhsikgx,10.0,WriteSerializable,False,"Map(numRemovedFiles -> 1, numCopiedRows -> 0, numAddedChangeFiles -> 0, executionTimeMs -> 2394, scanTimeMs -> 35, numAddedFiles -> 1, numUpdatedRows -> 1, rewriteTimeMs -> 2358)",,Databricks-Runtime/10.4.x-scala2.12
10,2022-10-06T17:40:14.000+0000,4454288869856112,walbites@hotmail.com,WRITE,"Map(mode -> Append, partitionBy -> [])",,List(954572819838097),1006-135601-ffhsikgx,9.0,WriteSerializable,True,"Map(numFiles -> 1, numOutputRows -> 1, numOutputBytes -> 2470)",,Databricks-Runtime/10.4.x-scala2.12
9,2022-10-06T17:39:23.000+0000,4454288869856112,walbites@hotmail.com,DELETE,"Map(predicate -> [""(CAST(spark_catalog.default.t_ubigeo_delta.ubigeo AS INT) = 60401)""])",,List(954572819838097),1006-135601-ffhsikgx,8.0,WriteSerializable,False,"Map(numRemovedFiles -> 1, numCopiedRows -> 1813, numAddedChangeFiles -> 0, executionTimeMs -> 2746, numDeletedRows -> 1, scanTimeMs -> 945, numAddedFiles -> 1, rewriteTimeMs -> 1801)",,Databricks-Runtime/10.4.x-scala2.12
8,2022-10-06T17:34:48.000+0000,4454288869856112,walbites@hotmail.com,WRITE,"Map(mode -> Overwrite, partitionBy -> [])",,List(954572819838097),1006-135601-ffhsikgx,7.0,WriteSerializable,False,"Map(numFiles -> 1, numOutputRows -> 1814, numOutputBytes -> 68124)",,Databricks-Runtime/10.4.x-scala2.12
7,2022-10-06T17:23:42.000+0000,4454288869856112,walbites@hotmail.com,WRITE,"Map(mode -> Overwrite, partitionBy -> [])",,List(954572819838097),1006-135601-ffhsikgx,6.0,WriteSerializable,False,"Map(numFiles -> 1, numOutputRows -> 1814, numOutputBytes -> 67574)",,Databricks-Runtime/10.4.x-scala2.12
6,2022-10-06T17:12:15.000+0000,4454288869856112,walbites@hotmail.com,OPTIMIZE,"Map(predicate -> [], zOrderBy -> [""UBIGEO""], batchId -> 0, auto -> false)",,List(954572819838097),1006-135601-ffhsikgx,5.0,SnapshotIsolation,False,"Map(numRemovedFiles -> 3, numRemovedBytes -> 72346, p25FileSize -> 67920, minFileSize -> 67920, numAddedFiles -> 1, maxFileSize -> 67920, p75FileSize -> 67920, p50FileSize -> 67920, numAddedBytes -> 67920)",,Databricks-Runtime/10.4.x-scala2.12
5,2022-10-06T17:03:37.000+0000,4454288869856112,walbites@hotmail.com,MERGE,"Map(predicate -> (l.UBIGEO = m.UBIGEO), matchedPredicates -> [{""actionType"":""update""}], notMatchedPredicates -> [{""actionType"":""insert""}])",,List(954572819838097),1006-135601-ffhsikgx,4.0,WriteSerializable,False,"Map(numTargetRowsCopied -> 0, numTargetRowsDeleted -> 0, numTargetFilesAdded -> 2, executionTimeMs -> 8102, numTargetRowsInserted -> 1, scanTimeMs -> 4569, numTargetRowsUpdated -> 1, numOutputRows -> 2, numTargetChangeFilesAdded -> 0, numSourceRows -> 2, numTargetFilesRemoved -> 1, rewriteTimeMs -> 3008)",,Databricks-Runtime/10.4.x-scala2.12
4,2022-10-06T16:13:12.000+0000,4454288869856112,walbites@hotmail.com,UPDATE,Map(predicate -> (cast(ubigeo#4102 as int) = 60401)),,List(954572819838097),1006-135601-ffhsikgx,3.0,WriteSerializable,False,"Map(numRemovedFiles -> 1, numCopiedRows -> 0, numAddedChangeFiles -> 0, executionTimeMs -> 3103, scanTimeMs -> 71, numAddedFiles -> 1, numUpdatedRows -> 1, rewriteTimeMs -> 3021)",,Databricks-Runtime/10.4.x-scala2.12


In [0]:
%sql
SELECT * FROM t_ubigeo_delta LIMIT 5

UBIGEO,DISTRITO,PROVINCIA,DEPARTAMENTO,LATITUD,LONGITUD,FECHA,PAIS
10302,CHISQUILLA,BONGARA,AMAZONAS,-5.89320537,-77.78280858,2022-10-06,PERU
120807,SANTA BARBARA DE CARHUACAYAN,YAULI,JUNÍN,-11.21381302,-76.42904358,2022-10-06,PERU
140304,JAYANCA,LAMBAYEQUE,LAMBAYEQUE,-6.394345534,-79.81451536,2022-10-06,PERU
200601,SULLANA,SULLANA,PIURA,-4.898786419,-80.63618267,2022-10-06,PERU
61303,CATACHE,SANTA CRUZ,CAJAMARCA,-6.735890508,-79.04859161,2022-10-06,PERU
