### Read & Write en Delta Lake
1.  Escribir datos en Delta Lake (managed table)
2.  Escribir datos en Delta Lake (external table)
3.  Leer datos en Delta Lake (SQL)
4.  Leer datos en Delta Lake (Archivos)

In [0]:
%sql
-- Create the demo schema (if it is not there yet) and pin its storage
-- location to the mounted demo folder.
CREATE SCHEMA IF NOT EXISTS movie_demo
LOCATION "/mnt/moviehistorydany3/demo"

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType, StringType, DateType

In [0]:
# Explicit schema applied when reading movie.csv, so column types are declared
# up front instead of being inferred from the file.
# Each entry is: column name, Spark type, nullable flag.
movie_schema = StructType([
    StructField("movieId", IntegerType(), nullable=False),
    StructField("title", StringType(), nullable=True),
    StructField("budget", DoubleType(), nullable=True),
    StructField("homepage", StringType(), nullable=True),
    StructField("overview", StringType(), nullable=True),
    StructField("popularity", DoubleType(), nullable=True),
    StructField("yearReleaseDate", IntegerType(), nullable=False),
    StructField("releaseDate", DateType(), nullable=True),
    StructField("revenue", DoubleType(), nullable=True),
    StructField("durationTime", IntegerType(), nullable=True),
    StructField("movieStatus", StringType(), nullable=True),
    StructField("tagline", StringType(), nullable=True),
    StructField("voteAverage", DoubleType(), nullable=True),
    StructField("voteCount", IntegerType(), nullable=True),
])

In [0]:
# Load the bronze movie CSV (first row is the header) using the explicit schema.
movie_df = (
    spark.read
    .option("header", True)
    .schema(movie_schema)
    .csv("/mnt/moviehistorydany3/bronze/2024-12-30/movie.csv")
)

In [0]:
# Save the DataFrame as the Delta table movie_demo.movies_managed,
# overwriting whatever the table held before.
(
    movie_df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("movie_demo.movies_managed")
)


In [0]:
%sql
-- Read the managed table back through SQL.
SELECT * FROM movie_demo.movies_managed

In [0]:
# Write the same data as Delta files directly to a storage path (no table name);
# the path is registered as a table in a separate SQL cell.
(
    movie_df.write
    .format("delta")
    .mode("overwrite")
    .save("/mnt/moviehistorydany3/demo/movies_external")
)

In [0]:
%sql
-- Register the Delta files under movies_external as an external table.
-- IF NOT EXISTS keeps the cell re-runnable: without it a second run fails
-- because the table name is already registered.
CREATE TABLE IF NOT EXISTS movie_demo.movies_external
USING delta
LOCATION "/mnt/moviehistorydany3/demo/movies_external"

In [0]:
%sql
-- Read the external table through SQL.
SELECT * FROM movie_demo.movies_external

In [0]:
# Read the external Delta data straight from its storage path (file-based read).
movies_external_df = (
    spark.read
    .format("delta")
    .load("/mnt/moviehistorydany3/demo/movies_external")
)

In [0]:
# Render the DataFrame that was loaded from the Delta path.
display(movies_external_df)

In [0]:
# Save the data as a Delta table partitioned by release year,
# overwriting any previous contents.
(
    movie_df.write
    .format("delta")
    .mode("overwrite")
    .partitionBy("yearReleaseDate")
    .saveAsTable("movie_demo.movies_partitioned")
)

In [0]:
%sql
-- List the partitions of the table that was written partitioned by yearReleaseDate.
SHOW PARTITIONS movie_demo.movies_partitioned

### Update & Delete en Delta Lake

In [0]:
%sql
-- Look at the managed table before modifying it.
SELECT * FROM movie_demo.movies_managed;

In [0]:
%sql
-- SQL update: set durationTime to 60 for every movie released in 2012.
UPDATE movie_demo.movies_managed
SET durationTime = 60
WHERE yearReleaseDate = 2012;
    


In [0]:
%sql
-- Show the 2012 rows to check the result of the UPDATE.
SELECT * FROM movie_demo.movies_managed
WHERE yearReleaseDate = 2012;

In [0]:
from delta.tables import DeltaTable

# Update through the DeltaTable Python API (counterpart of the SQL UPDATE cell).
# The table is resolved by its metastore name instead of a hard-coded storage
# path, so the cell does not depend on where the schema stores managed tables.
deltaTable = DeltaTable.forName(spark, "movie_demo.movies_managed")
deltaTable.update(
    condition="yearReleaseDate = 2013",
    # Values in `set` are SQL expression strings; "100" evaluates to the literal 100.
    set={"durationTime": "100"},
)

In [0]:
%sql
-- Show the 2013 rows to check the Python-API update.
SELECT * FROM movie_demo.movies_managed
WHERE yearReleaseDate = 2013;

In [0]:
%sql
-- SQL delete: remove every movie released in 2014.
DELETE FROM movie_demo.movies_managed 
WHERE yearReleaseDate = 2014;

In [0]:
%sql
-- Query the 2014 rows to check the result of the DELETE.
SELECT * FROM movie_demo.movies_managed
WHERE yearReleaseDate = 2014;

In [0]:
from delta.tables import DeltaTable

# Delete through the DeltaTable Python API (counterpart of the SQL DELETE cell).
# The table is resolved by its metastore name instead of a hard-coded storage
# path, so the cell does not depend on where the schema stores managed tables.
deltaTable = DeltaTable.forName(spark, "movie_demo.movies_managed")

# The argument is a SQL predicate; every row matching it is removed.
deltaTable.delete("yearReleaseDate = 2015")


In [0]:
%sql
-- Query the 2015 rows to check the Python-API delete.
SELECT * FROM movie_demo.movies_managed
WHERE yearReleaseDate = 2015;

### Merge / Upsert en Delta Lake

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType, StringType, DateType

In [0]:
# Explicit schema for movie.csv, used by the three "day" reads of the merge demo.
# Each entry is: column name, Spark type, nullable flag.
movie_schema = StructType([
    StructField("movieId", IntegerType(), nullable=False),
    StructField("title", StringType(), nullable=True),
    StructField("budget", DoubleType(), nullable=True),
    StructField("homepage", StringType(), nullable=True),
    StructField("overview", StringType(), nullable=True),
    StructField("popularity", DoubleType(), nullable=True),
    StructField("yearReleaseDate", IntegerType(), nullable=False),
    StructField("releaseDate", DateType(), nullable=True),
    StructField("revenue", DoubleType(), nullable=True),
    StructField("durationTime", IntegerType(), nullable=True),
    StructField("movieStatus", StringType(), nullable=True),
    StructField("tagline", StringType(), nullable=True),
    StructField("voteAverage", DoubleType(), nullable=True),
    StructField("voteCount", IntegerType(), nullable=True),
])

In [0]:
# Day-1 batch: movies released before 2000, reduced to the columns used by the merge.
movies_day1_df = (
    spark.read
    .option("header", True)
    .schema(movie_schema)
    .csv("/mnt/moviehistorydany3/bronze/2024-12-30/movie.csv")
    .filter("yearReleaseDate < 2000")
    .select("movieId", "title", "yearReleaseDate", "releaseDate", "durationTime")
)

In [0]:
# Render the day-1 batch.
display(movies_day1_df)

In [0]:
# Expose the day-1 batch to SQL cells under the view name movies_day1.
movies_day1_df.createOrReplaceTempView("movies_day1")

In [0]:
from pyspark.sql.functions import upper

# Day-2 batch: movies released 1998-2005 (1998-1999 overlap with day 1),
# with the title upper-cased so updated rows differ from the day-1 values.
movies_day2_df = (
    spark.read
    .option("header", True)
    .schema(movie_schema)
    .csv("/mnt/moviehistorydany3/bronze/2024-12-30/movie.csv")
    .filter("yearReleaseDate BETWEEN 1998 AND 2005")
    .select(
        "movieId",
        upper("title").alias("title"),
        "yearReleaseDate",
        "releaseDate",
        "durationTime",
    )
)

In [0]:
# Render the day-2 batch.
display(movies_day2_df)

In [0]:
# Expose the day-2 batch to SQL cells under the view name movies_day2.
movies_day2_df.createOrReplaceTempView("movies_day2")

In [0]:
from pyspark.sql.functions import upper

# Day-3 batch: movies released 1983-1998 or 2006-2010, title upper-cased.
# No temp view is created for it; it is merged through the Python API.
movies_day3_df = (
    spark.read
    .option("header", True)
    .schema(movie_schema)
    .csv("/mnt/moviehistorydany3/bronze/2024-12-30/movie.csv")
    .filter("yearReleaseDate BETWEEN 1983 AND 1998 OR yearReleaseDate BETWEEN 2006 AND 2010")
    .select(
        "movieId",
        upper("title").alias("title"),
        "yearReleaseDate",
        "releaseDate",
        "durationTime",
    )
)

In [0]:
# Render the day-3 batch.
display(movies_day3_df)

In [0]:
%sql
-- Start from an empty target table so the merge walkthrough is repeatable.
DROP TABLE IF EXISTS movie_demo.movies_merge;
-- The format is declared explicitly: the MERGE INTO and time-travel cells that
-- use this table need Delta, so it should not depend on the session's default format.
CREATE TABLE IF NOT EXISTS movie_demo.movies_merge(
  movieId INT,
  title STRING,
  yearReleaseDate INT,
  releaseDate DATE,
  durationTime INT,
  createdDate DATE,
  updatedDate DATE
)
USING DELTA;

#### Día 1

In [0]:
%sql
-- Day-1 upsert from the movies_day1 view, matched on movieId:
--   * matched rows are refreshed from the source and stamped in updatedDate;
--   * unmatched rows are inserted and stamped in createdDate.
MERGE INTO movie_demo.movies_merge tgt
USING movies_day1 src
  ON tgt.movieId = src.movieId
WHEN MATCHED THEN UPDATE SET
  tgt.title           = src.title,
  tgt.yearReleaseDate = src.yearReleaseDate,
  tgt.releaseDate     = src.releaseDate,
  tgt.durationTime    = src.durationTime,
  tgt.updatedDate     = current_timestamp()
WHEN NOT MATCHED THEN INSERT (
  movieId, title, yearReleaseDate, releaseDate, durationTime, createdDate
) VALUES (
  src.movieId, src.title, src.yearReleaseDate, src.releaseDate, src.durationTime, current_timestamp()
)

In [0]:
%sql
-- Inspect the merge target after the day-1 load.
SELECT * FROM movie_demo.movies_merge;

#### Día 2

In [0]:
%sql
-- Day-2 upsert from the movies_day2 view, matched on movieId:
--   * matched rows are refreshed from the source and stamped in updatedDate;
--   * unmatched rows are inserted and stamped in createdDate.
-- title is part of the INSERT column list (as in the day-1 and day-3 merges);
-- leaving it out stored a NULL title for every movie first seen on day 2.
MERGE INTO movie_demo.movies_merge tgt
USING movies_day2 src
ON tgt.movieId = src.movieId
WHEN MATCHED THEN
  UPDATE SET
    tgt.title = src.title,
    tgt.yearReleaseDate = src.yearReleaseDate,
    tgt.releaseDate = src.releaseDate,
    tgt.durationTime = src.durationTime,
    tgt.updatedDate = current_timestamp
WHEN NOT MATCHED
  THEN INSERT (movieId, title, yearReleaseDate, releaseDate, durationTime, createdDate)
  VALUES (src.movieId, src.title, src.yearReleaseDate, src.releaseDate, src.durationTime, current_timestamp)

In [0]:
%sql
-- Inspect the merge target after the day-2 load.
SELECT * FROM movie_demo.movies_merge;

#### Día 3

In [0]:
from delta.tables import DeltaTable

# Day-3 upsert through the DeltaTable Python API (days 1 and 2 used SQL MERGE).
# The table is resolved by its metastore name instead of a hard-coded storage
# path, so the cell does not depend on where the schema stores managed tables.
# NOTE: the variable name is kept as-is so any other cell referring to it keeps working.
deltaTablePeople = DeltaTable.forName(spark, "movie_demo.movies_merge")

(
    deltaTablePeople.alias("tgt")
    .merge(movies_day3_df.alias("src"), "tgt.movieId = src.movieId")
    # Existing movies: refresh the attributes and stamp updatedDate.
    .whenMatchedUpdate(
        set={
            "tgt.title": "src.title",
            "tgt.yearReleaseDate": "src.yearReleaseDate",
            "tgt.releaseDate": "src.releaseDate",
            "tgt.durationTime": "src.durationTime",
            "tgt.updatedDate": "current_timestamp()",
        }
    )
    # New movies: insert them and stamp createdDate. Source columns are
    # qualified with the `src` alias to make their origin explicit.
    .whenNotMatchedInsert(
        values={
            "movieId": "src.movieId",
            "title": "src.title",
            "yearReleaseDate": "src.yearReleaseDate",
            "releaseDate": "src.releaseDate",
            "durationTime": "src.durationTime",
            "createdDate": "current_timestamp()",
        }
    )
    .execute()
)

In [0]:
%sql
-- Inspect the merge target after the day-3 load.
SELECT * FROM movie_demo.movies_merge;

### History, Time Travel y Vacuum

In [0]:
%sql
-- List the commit history of the merge table.
DESC HISTORY movie_demo.movies_merge;

In [0]:
%sql
-- Time travel by version number: read the table as of version 2.
SELECT * FROM movie_demo.movies_merge VERSION AS OF 2;

In [0]:
%sql
-- Time travel by timestamp: read the table as of the given instant.
-- NOTE(review): the timestamp is hard-coded — take a value from DESC HISTORY for your own run.
SELECT * FROM movie_demo.movies_merge TIMESTAMP AS OF '2025-08-19T09:39:33.000+00:00'

In [0]:
# Same timestamp-based time travel as the SQL cell, done with the DataFrame
# reader against the table's storage path.
df = (
    spark.read
    .format('delta')
    .option('timestampAsOf', '2025-08-19T09:39:33.000+00:00')
    .load('/mnt/moviehistorydany3/demo/movies_merge')
)
display(df)

In [0]:
%sql
-- Clean up data files no longer referenced by the table. No RETAIN clause is
-- given, so the default retention threshold applies.
VACUUM movie_demo.movies_merge;

In [0]:
%sql
-- Repeat the time-travel query after the default VACUUM.
SELECT * FROM movie_demo.movies_merge TIMESTAMP AS OF '2025-08-19T09:39:33.000+00:00'

In [0]:
%sql
-- RETAIN 0 HOURS is below the default retention threshold, so the
-- retention-duration safety check has to be switched off for it to run.
SET spark.databricks.delta.retentionDurationCheck.enabled = false;
-- Remove every data file that the current table version does not reference.
VACUUM movie_demo.movies_merge RETAIN 0 HOURS;
-- Switch the safety check back on so later VACUUM commands in this session
-- are protected again.
SET spark.databricks.delta.retentionDurationCheck.enabled = true;

In [0]:
%sql
-- Repeat the time-travel query after VACUUM ... RETAIN 0 HOURS.
-- NOTE(review): presumably expected to fail now that older data files were removed — confirm.
SELECT * FROM movie_demo.movies_merge TIMESTAMP AS OF '2025-08-19T09:39:33.000+00:00'

In [0]:
%sql
-- Read the current version of the table.
SELECT * FROM movie_demo.movies_merge

In [0]:
%sql
-- List the commit history again after the VACUUM runs.
DESC HISTORY movie_demo.movies_merge

In [0]:
%sql
-- Delete the movies released in 2004; a later cell restores them from an earlier version.
DELETE FROM movie_demo.movies_merge
WHERE yearReleaseDate =2004;

In [0]:
%sql
-- Read the table after the DELETE.
SELECT * FROM movie_demo.movies_merge

In [0]:
%sql
-- List the commit history to find the version that precedes the DELETE.
DESC HISTORY movie_demo.movies_merge;

In [0]:
%sql
-- Read the table as of version 9.
-- NOTE(review): the version number is hard-coded — confirm in DESC HISTORY that it
-- is the version just before the DELETE in your run.
SELECT * FROM movie_demo.movies_merge VERSION AS OF 9;

In [0]:
%sql
-- Restore deleted rows: use version 9 of the same table as the source and insert
-- every row whose movieId is no longer present in the current version.
MERGE INTO movie_demo.movies_merge tgt
USING movie_demo.movies_merge VERSION AS OF 9 src
ON tgt.movieId = src.movieId
WHEN NOT MATCHED THEN
  INSERT *

In [0]:
%sql
-- List the commit history after the restore merge.
DESC HISTORY movie_demo.movies_merge

In [0]:
%sql
-- Read the table after the restore merge.
SELECT * FROM movie_demo.movies_merge

### Transaction Log en Delta Lake

In [0]:
%sql
-- Recreate an empty Delta table with the same columns as movies_merge;
-- the following cells write to it and inspect its history after each step.
DROP TABLE IF EXISTS movie_demo.movies_log;
CREATE TABLE IF NOT EXISTS movie_demo.movies_log(
  movieId INT,
  title STRING,
  yearReleaseDate INT,
  releaseDate DATE,
  durationTime INT,
  createdDate DATE,
  updatedDate DATE
) 
USING DELTA

In [0]:
%sql
-- History of the freshly created table.
DESC HISTORY movie_demo.movies_log;

In [0]:
%sql
-- Copy the row with movieId 125537 from movies_merge into movies_log.
INSERT INTO movie_demo.movies_log
SELECT * FROM movie_demo.movies_merge 
WHERE movieID = 125537;

In [0]:
%sql
-- Read movies_log after the first insert.
SELECT * FROM movie_demo.movies_log

In [0]:
%sql
-- History after the first insert.
DESC HISTORY movie_demo.movies_log;

In [0]:
%sql
-- Copy the row with movieId 133575 from movies_merge into movies_log.
INSERT INTO movie_demo.movies_log
SELECT * FROM movie_demo.movies_merge 
WHERE movieID = 133575;

In [0]:
%sql
-- History after the second insert.
DESC HISTORY movie_demo.movies_log;

In [0]:
%sql
-- Remove the row with movieId 125537 from movies_log.
DELETE FROM movie_demo.movies_log
WHERE movieID = 125537;

In [0]:
%sql
-- History after the delete.
DESC HISTORY movie_demo.movies_log;

### Convertir formato "parquet" a "delta"

In [0]:
%sql
-- Recreate the table in plain Parquet format (same columns as movies_merge)
-- so that it can be converted to Delta in a later cell.
DROP TABLE IF EXISTS movie_demo.movies_convert_to_delta;
CREATE TABLE IF NOT EXISTS movie_demo.movies_convert_to_delta(
  movieId INT,
  title STRING,
  yearReleaseDate INT,
  releaseDate DATE,
  durationTime INT,
  createdDate DATE,
  updatedDate DATE
) 
USING PARQUET

In [0]:
%sql
-- Fill the Parquet table with all rows of movies_merge.
INSERT INTO movie_demo.movies_convert_to_delta
SELECT * FROM movie_demo.movies_merge;

In [0]:
%sql
-- Convert the Parquet table to Delta, addressing it by table name.
CONVERT TO DELTA movie_demo.movies_convert_to_delta;

In [0]:
# Load the converted table into a DataFrame.
df = spark.table("movie_demo.movies_convert_to_delta")

In [0]:
# Render the DataFrame read from the converted table.
display(df)

In [0]:
# Write the data as plain Parquet files to a new path; a later cell converts
# that path to Delta.
(
    df.write
    .format("parquet")
    .save("/mnt/moviehistorydany3/demo/movies_convert_to_delta_new")
)

In [0]:
%sql
-- Convert Parquet files to Delta, addressing them by storage path instead of by table name.
CONVERT TO DELTA parquet.`/mnt/moviehistorydany3/demo/movies_convert_to_delta_new`