####Read & Write en Delta Lake
1. Escribir datos en Delta Lake (Managed Table)
1. Escribir datos en Delta Lake (External Table)
1. Leer datos desde Delta Lake (Table)
1. Leer datos desde Delta Lake (File)

In [0]:
%sql
CREATE SCHEMA IF NOT EXISTS movie_demo
LOCATION "/mnt/mymoviehistory/demo";

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType, StringType, DateType

In [0]:
movie_schema = StructType(fields= [

    StructField("movieId", IntegerType(), False),
    StructField("title", StringType(), True),
    StructField("budget", DoubleType(), True),
    StructField("homePage", StringType(), True),
    StructField("overview", StringType(), True),
    StructField("popularity", DoubleType(), True),
    StructField("yearReleaseDate", IntegerType(), True),
    StructField("releaseDate", DateType(), True),
    StructField("revenue", DoubleType(), True),
    StructField("durationTime", IntegerType(), True),
    StructField("movieStatus", StringType(), True),
    StructField("tagline", StringType(), True),
    StructField("voteAverage", DoubleType(), True),
    StructField("voteCount", IntegerType(), True),
] )

In [0]:
movie_df = spark.read \
            .option("header", True) \
            .schema(movie_schema) \
            .csv("/mnt/mymoviehistory/bronze/2024-12-30/movie.csv")

In [0]:
#display(movie_df)

In [0]:
#así creamos una tabla administrada por databricks
movie_df.write.format("delta").mode("overwrite").saveAsTable("movie_demo.movies_managed")

In [0]:
%sql
select * from movie_demo.movies_managed;

In [0]:
#carpeta que guarda los datos en una ubicación expecíficada, creada en formato delta, pero no es una tabla
movie_df.write.format("delta").mode("overwrite").save("/mnt/mymoviehistory/demo/movies_external")

In [0]:
%sql
--ahora creamos una tabla desde esos datos guardados en formato delta
CREATE TABLE movie_demo.movies_external 
USING DELTA 
LOCATION "/mnt/mymoviehistory/demo/movies_external";


In [0]:
%sql
select * from movie_demo.movies_external;

In [0]:
#leer de la carpeta a un dataframe
movie_external_df = spark.read.format("delta").load("/mnt/mymoviehistory/demo/movies_external")

In [0]:
display(movie_external_df)

In [0]:
#particionar los datos por un campo
movie_df.write.format("delta").mode("overwrite").partitionBy("yearReleaseDate").saveAsTable("movie_demo.movies_partitioned")

In [0]:

%sql
--para ver las particiones de la tabla
show partitions movie_demo.movies_partitioned;

####Update & delete en Delta Lake
1. Update desde Delta Lake
2. Delete desde Delta Lake

In [0]:
%sql
--usaremos la tabla administrada por datalake
select * from movie_demo.movies_managed;

In [0]:
%sql
--actualizar datos
UPDATE movie_demo.movies_managed 
SET durationTime = 60 
WHERE yearReleaseDate = 2012;

In [0]:
%sql
select * from movie_demo.movies_managed where yearReleaseDate = 2012;
    
--borrar datos

In [0]:
from delta.tables import *

deltaTable = DeltaTable.forPath(spark, '/mnt/mymoviehistory/demo/movies_managed')

# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
  condition = "yearReleaseDate = '2013'",
  set = { "durationTime": "100" }
)

In [0]:
%sql
select * from movie_demo.movies_managed where yearReleaseDate = 2013;

In [0]:
%sql
DELETE FROM movie_demo.movies_managed 
WHERE yearReleaseDate = 2014;

In [0]:
%sql
select * from movie_demo.movies_managed where yearReleaseDate = 2014;

In [0]:
from delta.tables import *

deltaTable = DeltaTable.forPath(spark, '/mnt/mymoviehistory/demo/movies_managed')

# Declare the predicate by using a SQL-formatted string.
deltaTable.delete("yearReleaseDate = 2015")


In [0]:
%sql
select * from movie_demo.movies_managed where yearReleaseDate = 2015;

####Merge/Upsert en Delta Lake

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType, StringType, DateType

In [0]:
movie_schema = StructType(fields= [

    StructField("movieId", IntegerType(), False),
    StructField("title", StringType(), True),
    StructField("budget", DoubleType(), True),
    StructField("homePage", StringType(), True),
    StructField("overview", StringType(), True),
    StructField("popularity", DoubleType(), True),
    StructField("yearReleaseDate", IntegerType(), True),
    StructField("releaseDate", DateType(), True),
    StructField("revenue", DoubleType(), True),
    StructField("durationTime", IntegerType(), True),
    StructField("movieStatus", StringType(), True),
    StructField("tagline", StringType(), True),
    StructField("voteAverage", DoubleType(), True),
    StructField("voteCount", IntegerType(), True),
] )

In [0]:
movies_day1_df = spark.read \
            .option("header", True) \
            .schema(movie_schema) \
            .csv("/mnt/mymoviehistory/bronze/2024-12-30/movie.csv") \
            .filter("yearReleaseDate < 2000") \
            .select("movieId","title","yearReleaseDate","releaseDate","durationTime")


In [0]:
display(movies_day1_df)

In [0]:
movies_day1_df.createOrReplaceTempView("movies_day1")

In [0]:
from pyspark.sql.functions import upper
movies_day2_df = spark.read \
            .option("header", True) \
            .schema(movie_schema) \
            .csv("/mnt/mymoviehistory/bronze/2024-12-30/movie.csv") \
            .filter("yearReleaseDate Between 1998 and 2005") \
            .select("movieId",upper("title").alias("title"),"yearReleaseDate","releaseDate","durationTime")


In [0]:
display(movies_day2_df)

In [0]:
movies_day2_df.createOrReplaceTempView("movies_day2")

In [0]:

movies_day3_df = spark.read \
            .option("header", True) \
            .schema(movie_schema) \
            .csv("/mnt/mymoviehistory/bronze/2024-12-30/movie.csv") \
            .filter("yearReleaseDate between 1983 and 1998 or yearReleaseDate between 2006 and 2010") \
            .select("movieId",upper( "title").alias("title"),"yearReleaseDate","releaseDate","durationTime")

display(movies_day3_df)


In [0]:
%sql
create table if not exists movie_demo.movies_merge (
  movieId int,
  title string,
  yearReleaseDate int,
  releaseDate Date,
  durationTime int,
  createdDate Date,
  updatedDate Date
)

####Día1

In [0]:
%sql
--hacemos merge de las vistas para crear una nueva tabla para indicar cuando se haga match, se hace una actualización de los registros y si no existe conincidencia se inserta un nuevo registro
MERGE INTO movie_demo.movies_merge tgt
USING movies_day1 src --es una vista del día1
ON tgt.movieId = src.movieId --columnas de coincidencia entre las tablas origen y destino
WHEN MATCHED THEN
  UPDATE SET
    tgt.title = src.title,
    tgt.yearReleaseDate = src.yearReleaseDate,
    tgt.releaseDate = src.releaseDate,
    tgt.durationTime = src.durationTime,
    tgt.updatedDate = current_timestamp
WHEN NOT MATCHED
  THEN INSERT (
    movieId,
    title,
    yearReleaseDate,
    releaseDate,
    durationTime,
    createdDate
  )
  VALUES (
    movieId,
    title,
    yearReleaseDate,
    releaseDate,
    durationTime,
    current_timestamp
  )

In [0]:
%sql
select* from movie_demo.movies_merge;

####Día 2

In [0]:
%sql
--hacemos merge de las vistas para crear una nueva tabla para indicar cuando se haga match, se hace una actualización de los registros y si no existe conincidencia se inserta un nuevo registro
MERGE INTO movie_demo.movies_merge tgt
USING movies_day2 src --es una vista del día1
ON tgt.movieId = src.movieId --columnas de coincidencia entre las tablas origen y destino
WHEN MATCHED THEN
  UPDATE SET
    tgt.title = src.title,
    tgt.yearReleaseDate = src.yearReleaseDate,
    tgt.releaseDate = src.releaseDate,
    tgt.durationTime = src.durationTime,
    tgt.updatedDate = current_timestamp
WHEN NOT MATCHED
  THEN INSERT (
    movieId,
    title,
    yearReleaseDate,
    releaseDate,
    durationTime,
    createdDate
  )
  VALUES (
    movieId,
    title,
    yearReleaseDate,
    releaseDate,
    durationTime,
    current_timestamp
  )

In [0]:
%sql
select* from movie_demo.movies_merge;

####Día 3

In [0]:
from delta.tables import *

deltaTablePeople = DeltaTable.forPath(spark, '/mnt/mymoviehistory/demo/movies_merge')

deltaTablePeople.alias('tgt') \
  .merge(
    movies_day3_df.alias('src'),
    'tgt.movieId = src.movieId'
  ) \
  .whenMatchedUpdate(set =
    {
      "tgt.title": "src.title",
      "tgt.yearReleaseDate": "src.yearReleaseDate",
      "tgt.releaseDate": "src.releaseDate",
      "tgt.durationTime": "src.durationTime",
      "tgt.updatedDate": "current_timestamp()"
    }
  ) \
  .whenNotMatchedInsert(values =
    {
      "movieId": "movieId",
      "title": "title",
      "yearReleaseDate": "yearReleaseDate",
      "releaseDate": "releaseDate",
      "durationTime": "durationTime",
      "createdDate": "current_timestamp()"
    }
  ) \
  .execute()

In [0]:
%sql
select* from movie_demo.movies_merge;

####History, Time Travel y Vacuum
1. Historia y Control de Versiones
2. Viaje en el Tiempo
3. Vacío

In [0]:
%sql 
desc history movie_demo.movies_merge;

así voy viendo los puntos de versión de las tablas

In [0]:
%sql
select * from movie_demo.movies_merge version as of 1;

In [0]:
%sql
select * from movie_demo.movies_merge version as of 2;

aquí lo hacemos por timestamp

In [0]:
%sql
select * from movie_demo.movies_merge timestamp as of '2025-11-12T11:39:40.000+00:00';

In [0]:
df = spark.read.format("delta").option("timestampAsOf","2025-11-12T11:39:40.000+00:00").load("/mnt/mymoviehistory/demo/movies_merge")

In [0]:
display(df)

con esto se borra la historia de la tabla

In [0]:
%sql
vacuum movie_demo.movies_merge;

In [0]:
%sql
set spark.databricks.delta.retentionDurationCheck.enabled = false;
vacuum movie_demo.movies_merge retain 0 hours;
--da error porque databricks no permite borrar la historia de una tabla, por defecto almacena la historia durante 7 días.
--la única manera es forzar con enabled false para evitarlo.

In [0]:
%sql
select * from movie_demo.movies_merge timestamp as of '2025-11-12T11:39:40.000+00:00';
--esto da error porque no encuentra el fichero parquet, la tabla existe, pero se han borrado las versiones anteriores de su historia.

In [0]:
%sql
desc history movie_demo.movies_merge;

In [0]:
%sql
delete from movie_demo.movies_merge 
where yearReleaseDate = 2004;

In [0]:
%sql
desc history movie_demo.movies_merge;

In [0]:
%sql
select * from movie_demo.movies_merge version as of 9;
--aquí estarían los registros eliminados, al volver a una versión anterior al delete

In [0]:
%sql
merge into movie_demo.movies_merge tgt
using movie_demo.movies_merge version as of 9 src
on tgt.movieId = src.movieId
when not matched then insert *;



In [0]:
%sql
desc history movie_demo.movies_merge;

In [0]:
%sql
select * from movie_demo.movies_merge;

####Transaction Log en Delta Lake

In [0]:
%sql
create table if not exists movie_demo.movies_log (
  movieId int,
  title string,
  yearReleaseDate int,
  releaseDate Date,
  durationTime int,
  createdDate Date,
  updatedDate Date
)
using delta
--es una tabla de tipo delta, administrada por databricks
--cuando la crea, no tiene datos, sólo crea la tabla, al ser de tipo delta

In [0]:
%sql
desc history movie_demo.movies_log;

In [0]:
%sql
insert into movie_demo.movies_log
  select * from movie_demo.movies_merge
  where movieId = 125537;

In [0]:
%sql
select * from movie_demo.movies_log;

In [0]:
%sql
desc history movie_demo.movies_log;

In [0]:
%sql
insert into movie_demo.movies_log
  select * from movie_demo.movies_merge
  where movieId = 133575;

In [0]:
%sql
desc history movie_demo.movies_log;

In [0]:
%sql
delete from movie_demo.movies_log
where movieId=125537;

In [0]:
%sql
desc history movie_demo.movies_log;

In [0]:
list = [118452, 124606, 125052, 125123, 125263, 125537, 126141, 133575, 142132, 146269, 157185]
for movieId in list:
        spark.sql(f"""insert into movie_demo.movies_log
                  select * from movie_demo.movies_merge
                  where movieId = {movieId}""")

In [0]:
%sql
insert into movie_demo.movies_log
  select * from movie_demo.movies_merge;
--pasados 30 días se borran los transaction logs de databricks

####Convertir formato "Parquet" a "Delta"

In [0]:
%sql
create table if not exists movie_demo.movies_convert_to_delta (
  movieId int,
  title string,
  yearReleaseDate int,
  releaseDate Date,
  durationTime int,
  createdDate Date,
  updatedDate Date
)
using parquet
--es una tabla administrada por databricks
--al ser un tabla con formato parquet, no va a tener historia y no crea archivos, sólo la carpeta vacía

In [0]:
%sql
insert into movie_demo.movies_convert_to_delta
select * from movie_demo.movies_merge;
--ahora, que se han insertado registros, sí hay archivos de los datos

In [0]:
%sql
convert to delta movie_demo.movies_convert_to_delta
--ahora la convertimos en delta
--por lo que ahora sí tendremos la carpeta delta_log y podremos consultar su historia y al ser administrada por databricks, podemos convertir una ruta de archivos parquet en formato delta.

In [0]:
df = spark.table("movie_demo.movies_convert_to_delta")

In [0]:
display(df)

In [0]:
#guardar en formato parquet
df.write.format("parquet").save("/mnt/mymoviehistory/demo/movies_convert_to_delta_new")

In [0]:
%sql
--convertir en formato delta
CONVERT TO DELTA parquet.`/mnt/mymoviehistory/demo/movies_convert_to_delta_new`