In [0]:
%python
dbutils.fs.ls("dbfs:/databricks-datasets/online_retail/data-001/data.csv")

In [0]:
%sql
-- Create the catalogs first so the schema reset below never references a
-- catalog that does not exist yet (e.g. on a fresh workspace / Run All).
CREATE CATALOG IF NOT EXISTS dev;
CREATE CATALOG IF NOT EXISTS prod;

In [0]:
%sql
-- Reset the demo: CASCADE also drops every table and volume inside dev.bronze.
DROP SCHEMA IF EXISTS dev.bronze CASCADE;

In [0]:
%sql
-- Recreate an empty bronze schema for the objects created below.
CREATE SCHEMA IF NOT EXISTS dev.bronze;

In [0]:
%sql
-- Materialize the sample online-retail CSV as the table dev.bronze.sales.
-- IF NOT EXISTS makes this a no-op when the table is already present;
-- header => true treats the first CSV row as column names.
CREATE TABLE IF NOT EXISTS dev.bronze.sales
AS
SELECT *
FROM read_files(
  "dbfs:/databricks-datasets/online_retail/data-001/data.csv",
  header => true,
  format => "csv"
  )

In [0]:
%sql
-- Inspect the column definitions and detailed metadata of the new table.
DESCRIBE EXTENDED dev.bronze.sales

In [0]:
%sql
-- Preview the first 10 rows.
SELECT * FROM dev.bronze.sales LIMIT 10;

In [0]:
%sql
-- Show the DDL statement that describes how this table is defined.
SHOW CREATE TABLE dev.bronze.sales

In [0]:
%sql
-- Remove every row belonging to invoice 536366.
DELETE FROM dev.bronze.sales
WHERE InvoiceNo = '536366'

In [0]:
%sql
-- List the table's operation history; the DELETE above should appear as a new entry.
DESCRIBE HISTORY dev.bronze.sales

In [0]:
%sql
-- List the schemas that exist in the dev catalog.
SHOW SCHEMAS IN  dev

In [0]:
%python
# Read the same table through the DataFrame API and print 10 rows as plain text.
spark.read.table('dev.bronze.sales').show(10)

In [0]:
%sql
-- Run OPTIMIZE on the managed table.
OPTIMIZE dev.bronze.sales

In [0]:
%sql
-- Re-check the history to see the entry recorded for the OPTIMIZE run.
DESCRIBE HISTORY dev.bronze.sales

In [0]:
%sql
-- Time travel: the @v0 suffix reads the table as of version 0.
-- NOTE(review): the earlier DELETE targeted invoice '536366' while this filter
-- uses '536365' — confirm which invoice this query was meant to inspect.
SELECT * FROM dev.bronze.sales@v0
WHERE InvoiceNo = '536365'

In [0]:
%sql
-- DRY RUN only reports the files that would be removed; nothing is deleted.
-- RETAIN 168 HOURS is a 7-day retention window.
VACUUM dev.bronze.sales RETAIN 168 HOURS DRY RUN;

In [0]:
%sql
-- Current number of rows in the table.
SELECT COUNT(1) FROM dev.bronze.sales

In [0]:
%sql
-- Create the volume that holds the raw CSV and the path-based Delta tables
-- used below. IF NOT EXISTS keeps this cell safe to re-run on its own instead
-- of failing when the volume is already there.
CREATE VOLUME IF NOT EXISTS dev.bronze.sales_vol

In [0]:
%sql
-- Show the metadata of the volume.
DESCRIBE VOLUME dev.bronze.sales_vol

In [0]:
%python
# Create the "files" directory inside the volume; the raw CSV is copied here next.
dbutils.fs.mkdirs("/Volumes/dev/bronze/sales_vol/files")

In [0]:
%python
dbutils.fs.cp("dbfs:/databricks-datasets/online_retail/data-001/data.csv", "dbfs:/Volumes/dev/bronze/sales_vol/files")

In [0]:
%python
df_sales=(
          spark
          .read
          .format("csv")
          .option("header", "true")
          .option("inferSchema", "true")
          .load("/Volumes/dev/bronze/sales_vol/files/data.csv")
)

In [0]:
%python
(
    df_sales
    .repartition(16)
    .write
    .format("delta")
    .mode("overwrite")
    .save("/Volumes/dev/bronze/sales_vol/delta1")
)

In [0]:
from pyspark.sql.functions import col

In [0]:
# Read the delta1 table back by path and keep only invoice 536365.
df = (
    spark.read.format("delta")
    .load("/Volumes/dev/bronze/sales_vol/delta1")
    .filter(col("InvoiceNo") == "536365")
)
display(df)

In [0]:
from delta import DeltaTable

# Bind a DeltaTable handle to the delta1 path and display its commit history.
sales_delta = DeltaTable.forPath(
    spark,
    "/Volumes/dev/bronze/sales_vol/delta1",
)
df_his = sales_delta.history()
display(df_his)

In [0]:
from pyspark.sql.functions import lit

# Set Country to "UK" on every row of invoice 536390 (arguments: condition, then set-map).
sales_delta.update("InvoiceNo = '536390'", {"Country": lit("UK")})

In [0]:
# Show column and table metadata for the path-based Delta table,
# addressed with the delta.`<path>` syntax.
display(
spark.sql(
    """
    DESCRIBE EXTENDED    
    delta.`/Volumes/dev/bronze/sales_vol/delta1`
    """
)
)

In [0]:
# Show the operation history of the same path-based table via SQL.
display(
spark.sql(
    """
    DESCRIBE HISTORY    
    delta.`/Volumes/dev/bronze/sales_vol/delta1`
    """
)
)

In [0]:
%sql
-- Look at the rows of invoice 536390, the invoice targeted by the update to Country.
SELECT *
FROM delta.`/Volumes/dev/bronze/sales_vol/delta1/`
WHERE InvoiceNo = '536390'

In [0]:
%sql
-- For each underlying data file, report the smallest and largest InvoiceNo it holds.
SELECT MIN(InvoiceNo), MAX(InvoiceNo), _metadata.file_name
FROM delta.`/Volumes/dev/bronze/sales_vol/delta1/`
--WHERE InvoiceNo = '536367'
GROUP BY _metadata.file_name

In [0]:
%sql
-- Run OPTIMIZE on the path-based delta1 table.
OPTIMIZE delta.`/Volumes/dev/bronze/sales_vol/delta1/`

In [0]:
# Turns off Delta's retention-duration safety check for this Spark session.
# NOTE(review): the VACUUM below is a DRY RUN with no RETAIN clause, so it does
# not appear to need this; the setting stays in effect for any later VACUUM in
# the session — confirm it is intentional.
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")

In [0]:
%sql
-- DRY RUN only reports the files that would be removed; nothing is deleted.
VACUUM delta.`/Volumes/dev/bronze/sales_vol/delta1` DRY RUN;

In [0]:
# Reload the full delta1 table and count how many distinct invoices it contains.
df = (
    spark.read
    .format("delta")
    .load("/Volumes/dev/bronze/sales_vol/delta1/")
)
df.select("InvoiceNo").distinct().count()

In [0]:
%python
(
    df_sales
    .repartition(16)
    .write
    .format("delta")
    .mode("overwrite")
    .save("/Volumes/dev/bronze/sales_vol/delta2")
)

In [0]:
from delta import DeltaTable

# Bind a DeltaTable handle to the delta2 path and render its commit history.
sales2 = DeltaTable.forPath(
    spark,
    "/Volumes/dev/bronze/sales_vol/delta2",
)
sales2.history().display()

In [0]:
%sql
-- Operation history of the delta2 table, queried by path.
DESCRIBE HISTORY delta.`/Volumes/dev/bronze/sales_vol/delta2/`

In [0]:
%sql
-- For each underlying data file, report the smallest and largest InvoiceNo it holds.
SELECT MIN(InvoiceNo), MAX(InvoiceNo), _metadata.file_name
FROM delta.`/Volumes/dev/bronze/sales_vol/delta2/`
--WHERE InvoiceNo = '536367'
GROUP BY _metadata.file_name

In [0]:
%sql
-- Target file size for OPTIMIZE, in bytes: 64 * 1024 * 8 = 524288 (512 KiB).
-- SET stores the text after '=' verbatim and does not evaluate arithmetic,
-- so the value has to be written as a literal number.
SET spark.databricks.delta.optimize.maxFileSize = 524288;

In [0]:
%sql
-- Rewrite the delta2 data files, Z-ordering them by InvoiceNo.
OPTIMIZE delta.`/Volumes/dev/bronze/sales_vol/delta2/` ZORDER BY(InvoiceNo)

In [0]:
%sql
-- Enable liquid clustering on InvoiceNo for the delta2 table.
-- The ALTER TABLE clause is CLUSTER BY; CLUSTERED BY is the bucketing clause
-- of CREATE TABLE and is not accepted here.
ALTER TABLE delta.`/Volumes/dev/bronze/sales_vol/delta2/` CLUSTER BY (InvoiceNo)