PART III Prep

https://docs.databricks.com/en/sql/language-manual/index.html

How to use Hive Variables to substitute in string values derived from the account email of the current user

In [None]:
SELECT "${da.db_name}" AS db_name,
       "${da.paths.working_dir}" AS working_dir;


CREATE DATABASE IF NOT EXISTS ${da.db_name}_default_location;
CREATE DATABASE IF NOT EXISTS ${da.db_name}_custom_location LOCATION '${da.paths.working_dir}/_custom_location.db';
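A minimal Python sketch of what the substitution does. It assumes the course helper derives `da.db_name` from the user's email. The cleaning rule (lowercase, every non-alphanumeric character becomes `_`), the example email, and the working-directory path are hypothetical, not the helper's actual code. Databricks does the `${...}` replacement itself before running the SQL; `string.Template` only imitates it here.

```python
import re
from string import Template


class DottedTemplate(Template):
    # Like string.Template, but variable names may contain dots, e.g. ${da.db_name}
    idpattern = r"(?a:[_a-z][_a-z0-9]*(?:\.[_a-z][_a-z0-9]*)*)"


def db_name_from_email(email: str) -> str:
    # Hypothetical cleaning rule: lowercase, then every non-alphanumeric character -> "_"
    return re.sub(r"[^a-z0-9]", "_", email.lower())


def substitute(sql: str, variables: dict) -> str:
    # Replace every ${name} placeholder with its value before the SQL would run
    return DottedTemplate(sql).substitute(variables)


email = "jane.doe@example.com"  # hypothetical user
variables = {
    "da.db_name": db_name_from_email(email),
    "da.paths.working_dir": "dbfs:/user/jane.doe@example.com/working",  # hypothetical path
}
print(substitute("CREATE DATABASE IF NOT EXISTS ${da.db_name}_default_location;", variables))
```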

Identify where Delta Lake provides ACID transactions

Every write to a Delta table, including MERGE, is an ACID transaction, and Delta Lake guarantees that any read against a table returns the most recent committed version of the table. MERGE consolidates updates, inserts, and deletes into one command, much like an upsert, and applies them as a single transaction.

DE2.1

In [None]:
MERGE INTO students b
USING updates u
ON b.id=u.id
WHEN MATCHED AND u.type = "update"
  THEN UPDATE SET *
WHEN MATCHED AND u.type = "delete"
  THEN DELETE
WHEN NOT MATCHED AND u.type = "insert"
  THEN INSERT *
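To make the semantics concrete, here is a pure-Python sketch of the MERGE above. It assumes each row is a dict, that the target columns are `id`, `name`, `value` (hypothetical), and that there is at most one source row per `id` (a real MERGE raises an error when several source rows try to modify the same target row). The sketch builds the new state on a copy and only returns it at the end, which imitates, but does not implement, the all-or-nothing commit Delta Lake gives a MERGE.

```python
def merge_students(students, updates, columns=("id", "name", "value")):
    """Sketch of MERGE INTO students b USING updates u ON b.id = u.id ..."""
    # Work on a copy keyed by id; the caller's list is untouched until we return
    staged = {row["id"]: dict(row) for row in students}
    for u in updates:
        matched = u["id"] in staged
        if matched and u["type"] == "update":
            # UPDATE SET *: copy every target column from the source row
            staged[u["id"]] = {c: u[c] for c in columns}
        elif matched and u["type"] == "delete":
            del staged[u["id"]]
        elif not matched and u["type"] == "insert":
            # INSERT *: the extra "type" column is not part of the target
            staged[u["id"]] = {c: u[c] for c in columns}
        # Any other combination matches no clause and is ignored
    # "Commit": the caller swaps in the new state in one step
    return list(staged.values())


students = [
    {"id": 1, "name": "Yve", "value": 1.0},
    {"id": 2, "name": "Omar", "value": 2.5},
]
updates = [
    {"id": 2, "name": "Omar", "value": 3.0, "type": "update"},
    {"id": 1, "name": "Yve", "value": 1.0, "type": "delete"},
    {"id": 3, "name": "Elia", "value": 4.0, "type": "insert"},
]
students = merge_students(students, updates)
print(students)
```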

Managed vs. External

Managed = Delta by default (always Delta in Unity Catalog), better performance, metadata and data files both managed by Databricks, ACID. Dropping the table deletes both the metadata and the data.

External = data stored at a LOCATION you specify in cloud object storage; Databricks manages only the metadata. Can be Delta (in which case it still gets Delta Lake's ACID transactions) or another format. Dropping the table deletes only the metadata; the underlying data files still exist.
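A toy model of the DROP TABLE difference described above. The `metastore` and `storage` dicts, the helper functions, and the file name are hypothetical stand-ins for the metastore and cloud object storage, not Databricks APIs; the model shows only the logical outcome of a drop.

```python
metastore = {}  # table name -> {"type": ..., "location": ...}
storage = {}    # location -> list of data files


def create_table(name, table_type, location):
    metastore[name] = {"type": table_type, "location": location}
    storage.setdefault(location, [])


def drop_table(name):
    entry = metastore.pop(name)          # both kinds lose their metadata
    if entry["type"] == "MANAGED":
        storage.pop(entry["location"])   # managed: the data files go too
    # EXTERNAL: the files at the location are left in place


create_table("managed_default", "MANAGED", "dbfs:/user/hive/warehouse/managed_default")
create_table("external_default", "EXTERNAL", "dbfs:/mnt/demo/external_default")
storage["dbfs:/mnt/demo/external_default"].append("part-00000.parquet")

drop_table("managed_default")
drop_table("external_default")
print(metastore, storage)
```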

Create a managed table (Delta by default as it is backed by delta lake) Metadata + Data Management

Managed tables manage underlying data files alongside the metastore registration. Databricks recommends that you use managed tables whenever you create a new table. Unity Catalog managed tables are the default when you create tables in Databricks. They always use Delta Lake. See Work with managed tables.

By default, managed tables in a database without the location specified will be created in the dbfs:/user/hive/warehouse/<database_name>.db/ directory.

We can see that, as expected, the data and metadata for our Delta Table are stored in that location.

Dropping a managed table deletes both its metadata and its data files; the parent schema (database) directory is kept.



In [None]:
/*Full specification, Unity Catalog approach (catalog.schema.table)*/
CREATE TABLE catalog_name.schema_name.table_name
  (width INT, length INT, height INT);

INSERT INTO catalog_name.schema_name.table_name
VALUES (3, 2, 1);

/*Hive metastore approach: with no LOCATION, the table is stored under dbfs:/user/hive/warehouse */
CREATE TABLE managed_default
  (width INT, length INT, height INT);

INSERT INTO managed_default
VALUES (3, 2, 1);

/*Schemas created with the Hive variables defined above: one in the default location, one at a custom LOCATION*/
CREATE DATABASE IF NOT EXISTS ${da.db_name}_default_location;
CREATE DATABASE IF NOT EXISTS ${da.db_name}_custom_location LOCATION '${da.paths.working_dir}/_custom_location.db';

Create an External Table (Metadata managed by Databricks only)

Here you create a Delta table whose data files live at a location you specify in cloud object storage.

Why use external?
1. Cost management
2. Controlling access and keeping data in one region
3. Multiple applications need to access the same data, and you want to avoid duplicating it

In [None]:
/*Example 1 with the Hive metastore (DBFS mount path)*/
CREATE TABLE external_default
  (width INT, length INT, height INT)
LOCATION 'dbfs:/mnt/demo/external_default';

INSERT INTO external_default
VALUES (3, 2, 1);

Three-level (catalog.schema.table) name of the managed table in the Hive metastore: hive_metastore.default.managed_default

In [None]:
/*Example 2 with ADLS external location*/

CREATE TABLE my_external_table
    (width INT, length INT, height INT)
LOCATION 'abfss://<container>@<storage-account>.dfs.core.windows.net/<table-directory>';

Explore table Metadata

In [None]:
/*basic metadata*/
DESCRIBE TABLE customer;

DESCRIBE TABLE EXTENDED my_table_name;



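Return detailed information about a table

DESCRIBE DETAIL returns the table's format, location, partition columns, number of files, size in bytes, and so on. DESCRIBE EXTENDED returns the column schema plus table information such as the location and owner.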

In [None]:
/*Table details (works for managed or external Delta tables)*/
DESCRIBE DETAIL [schema_name.]table_name;

DESCRIBE EXTENDED students_table;

/*External location (Unity Catalog)*/
DESCRIBE EXTERNAL LOCATION location_name;

/*Show external locations*/
SHOW EXTERNAL LOCATIONS;

Identify the location of a table

**The same command works for external and managed (Delta) tables; read the location column of the output

In [None]:
/*External table*/
DESCRIBE DETAIL my_table;

/*Managed (Delta) table*/
DESCRIBE DETAIL delta_table_name;

Use the SHOW TABLES command

In [None]:
USE <your_database_name>;

/*show tables in the current schema*/
SHOW TABLES;

/*show tables in global_temp (global temporary views)*/
SHOW TABLES IN global_temp;

Inspect the directory structure of Delta Lake files

DE2.3

In [None]:
display(dbutils.fs.ls(f"{DA.paths.user_db}/students"))

# Using dbutils directly
dbutils.fs.ls('dbfs:/user/hive/warehouse/employees')

dbutils.fs.ls('dbfs:/user/hive/warehouse/buster_schema_defult_location.db/my_delta_table')

# The table's Delta transaction log (JSON commit files)
dbutils.fs.ls('dbfs:/user/hive/warehouse/buster_schema_defult_location.db/my_delta_table/_delta_log')
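The _delta_log directory holds one JSON commit file per table version (00000000000000000000.json, 00000000000000000001.json, ...), with one action per line. Replaying the add and remove actions up to a version yields the data files that make up that version, which is what time travel relies on. This is a simplified sketch: it writes two hypothetical commits to a temporary directory and replays them, ignoring checkpoint files and every action type other than add and remove.

```python
import json
import os
import tempfile


def write_commit(log_dir, version, actions):
    # Commit files are named with the version number zero-padded to 20 digits
    path = os.path.join(log_dir, f"{version:020d}.json")
    with open(path, "w") as f:
        for action in actions:
            f.write(json.dumps(action) + "\n")


def active_files(log_dir, as_of_version=None):
    """Replay add/remove actions in version order to get the table's data files."""
    files = set()
    for name in sorted(os.listdir(log_dir)):
        if not name.endswith(".json"):
            continue  # skip checkpoints and other non-commit files
        version = int(name[:-len(".json")])
        if as_of_version is not None and version > as_of_version:
            break
        with open(os.path.join(log_dir, name)) as f:
            for line in f:
                action = json.loads(line)
                if "add" in action:
                    files.add(action["add"]["path"])
                elif "remove" in action:
                    files.discard(action["remove"]["path"])
    return files


log_dir = tempfile.mkdtemp()
write_commit(log_dir, 0, [{"add": {"path": "part-0000.parquet"}}])
# Version 1: an UPDATE rewrote part-0000 into part-0001
write_commit(log_dir, 1, [{"remove": {"path": "part-0000.parquet"}},
                          {"add": {"path": "part-0001.parquet"}}])
print(active_files(log_dir))                   # latest version
print(active_files(log_dir, as_of_version=0))  # like VERSION AS OF 0
```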


Reasoning about data files

DESCRIBE DETAIL allows us to see some other details about our Delta table, including the number of files.

DE2.3

In [None]:
DESCRIBE DETAIL my_table

Identify who has written previous versions of a table

DE2.3

In [None]:
DESCRIBE HISTORY table_name;

Review a history of table transactions

DE2.3

In [None]:
DESCRIBE HISTORY my_table_name

Roll back a table to a previous version

DE2.3

In [None]:
/*Option 1*/
RESTORE TABLE employees TO VERSION AS OF 5

/*Option 2*/
RESTORE TABLE table_name TO TIMESTAMP AS OF 'yyyy-MM-dd HH:mm:ss';
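RESTORE does not delete the later versions: it writes a new commit whose table state equals the chosen older version, so the restore itself shows up in DESCRIBE HISTORY. A minimal sketch that models each version as a set of hypothetical data file names:

```python
def restore(history, version):
    """Append a new version whose contents equal history[version]."""
    history.append(set(history[version]))
    return len(history) - 1  # number of the newly written version


history = [
    {"part-0"},            # version 0
    {"part-0", "part-1"},  # version 1: INSERT
    {"part-2"},            # version 2: overwrite
]
new_version = restore(history, 1)
print(new_version, history[new_version])
```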

Identify that a table can be rolled back to a previous version

In [None]:
DESCRIBE HISTORY table_name;

Query a specific version of a table

In [None]:
SELECT * 
FROM employees VERSION AS OF 4

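Identify why Z-ordering is beneficial to Delta Lake tables.

OPTIMIZE compacts the table's small underlying Parquet data files into larger, more manageable ones. ZORDER BY, used together with OPTIMIZE, colocates rows with similar values in the specified column(s) within the same files, so queries that filter on those columns can skip more files using per-file statistics (data skipping).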

See 1.3 in my_free_trial_databricks & DE 2.3

In [None]:
OPTIMIZE employees
ZORDER BY id
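Why clustering helps data skipping: Delta keeps min/max statistics per column for each data file, so a query filtering on id can skip any file whose range cannot contain the value. The sketch simplifies Z-ordering to a plain sort on a single column (real Z-ordering interleaves several columns along a Z-order curve) and counts how many hypothetical files a point lookup must read before and after clustering.

```python
import random


def split_into_files(rows, rows_per_file):
    return [rows[i:i + rows_per_file] for i in range(0, len(rows), rows_per_file)]


def files_to_read(files, target_id):
    # Keep only files whose [min, max] range for id could contain target_id
    return [f for f in files if min(f) <= target_id <= max(f)]


random.seed(0)
ids = list(range(1000))
random.shuffle(ids)  # data arrives in no particular order

unclustered = split_into_files(ids, 100)
clustered = split_into_files(sorted(ids), 100)  # stand-in for OPTIMIZE ... ZORDER BY id

print(len(files_to_read(unclustered, 500)), len(files_to_read(clustered, 500)))
```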

VACUUM command for a table (including override)

In [None]:
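VACUUM my_table;

/*Override the default 7-day retention check. Use with care: afterwards, time travel to earlier versions may fail because their data files are deleted*/
SET spark.databricks.delta.retentionDurationCheck.enabled = false;
/*Record VACUUM operations in the table history*/
SET spark.databricks.delta.vacuum.logging.enabled = true;

VACUUM my_table RETAIN 0 HOURS;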
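How VACUUM decides what to delete, as a sketch: only files that are no longer referenced by the current table version and that left the table longer ago than the retention period (7 days by default) are eligible. RETAIN 0 HOURS makes every unreferenced file eligible, which is why it breaks time travel. The file names and timestamps are hypothetical.

```python
from datetime import datetime, timedelta

DEFAULT_RETENTION = timedelta(days=7)


def vacuum_candidates(current_files, removed_at, now, retain=DEFAULT_RETENTION):
    """Return the files VACUUM would delete.

    current_files: files referenced by the latest table version (never deleted)
    removed_at: file name -> time it stopped being part of the table
    """
    cutoff = now - retain
    return sorted(f for f, t in removed_at.items()
                  if f not in current_files and t < cutoff)


now = datetime(2024, 1, 10)
current = {"part-3.parquet"}
removed = {
    "part-1.parquet": datetime(2024, 1, 1),  # removed 9 days ago
    "part-2.parquet": datetime(2024, 1, 9),  # removed 1 day ago
}
print(vacuum_candidates(current, removed, now))                      # default retention
print(vacuum_candidates(current, removed, now, timedelta(hours=0)))  # RETAIN 0 HOURS
```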

Identify CTAS as a solution and create a table using CTAS




CTAS statements automatically infer schema information from query results and do not support manual schema declaration.

This means that CTAS statements are useful for external data ingestion from sources with well-defined schema, such as Parquet files and tables.

CTAS statements also do not support specifying additional file options.

We can see how this would present significant limitations when trying to ingest data from CSV files.

In [None]:
/*FROM A TABLE*/
CREATE OR REPLACE TABLE my_table AS
SELECT * FROM my_other_table;


/*FROM A FILE*/
CREATE OR REPLACE TABLE sales_unparsed AS
SELECT * FROM csv.`${da.paths.datasets}/raw/sales-csv/`;
SELECT * FROM sales_unparsed;

Create a generated column

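A generated column's values are computed automatically from an expression over other columns of the Delta table (here, a date derived from a timestamp).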

DE 4.3

In [None]:
CREATE OR REPLACE TABLE purchase_dates (
  id STRING, 
  transaction_timestamp STRING, 
  price STRING,
  date DATE GENERATED ALWAYS AS (
    cast(cast(transaction_timestamp/1e6 AS TIMESTAMP) AS DATE))
    COMMENT "generated based on `transaction_timestamp` column")

/*the generated column name is 'date'*/
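The generation expression treats transaction_timestamp as microseconds since the Unix epoch: dividing by 1e6 gives seconds, casting to TIMESTAMP gives a point in time, and casting to DATE keeps the calendar day. The same arithmetic in Python, assuming a UTC session time zone (Spark uses spark.sql.session.timeZone, so the date can differ in other zones); the input value is hypothetical.

```python
from datetime import date, datetime, timezone


def generated_date(transaction_timestamp: str) -> date:
    # The column is declared STRING, so convert before dividing (Spark casts implicitly)
    seconds = float(transaction_timestamp) / 1e6
    return datetime.fromtimestamp(seconds, tz=timezone.utc).date()


print(generated_date("1593878899217692"))
```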

Add a table comment

DE 4.3

In [None]:
CREATE OR REPLACE TABLE users_pii
COMMENT "Contains PII"
LOCATION "${da.paths.working_dir}/tmp/users_pii"
PARTITIONED BY (first_touch_date)
AS
  SELECT *, 
    cast(cast(user_first_touch_timestamp/1e6 AS TIMESTAMP) AS DATE) first_touch_date, 
    current_timestamp() updated,
    input_file_name() source_file
  FROM parquet.`${da.paths.datasets}/raw/users-historical/`;
  
SELECT * FROM users_pii;

Use CREATE OR REPLACE TABLE and INSERT OVERWRITE

DE 4.3 and 4.4

We can use overwrites to atomically replace all of the data in a table. There are multiple benefits to overwriting tables instead of deleting and recreating tables:

Overwriting a table is much faster because it doesn’t need to list the directory recursively or delete any files.
The old version of the table still exists, so you can easily retrieve the old data using Time Travel.
It’s an atomic operation: concurrent queries can still read the table while you are overwriting it.
Due to ACID transaction guarantees, if overwriting the table fails, the table will be in its previous state.
Spark SQL provides two easy methods to accomplish complete overwrites.

Some students may have noticed that the previous lesson on CTAS statements actually used CRAS statements (to avoid errors if a cell was run multiple times).

CREATE OR REPLACE TABLE (CRAS) statements fully replace the contents of a table each time they execute.

INSERT OVERWRITE:

Can only overwrite an existing table, not create a new one like our CRAS statement
Can overwrite only with new records that match the current table schema -- and thus can be a "safer" technique for overwriting an existing table without disrupting downstream consumers
Can overwrite individual partitions
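The practical difference can be sketched with a toy table that stores a schema and a list of versions (a hypothetical class, not a Spark API): CRAS swaps in whatever schema the query produces, while INSERT OVERWRITE rejects data whose schema does not match. Both add a new version instead of deleting the old one, which is what keeps Time Travel possible. Column names and rows are hypothetical.

```python
class ToyTable:
    def __init__(self, schema, rows):
        self.schema = tuple(schema)
        self.versions = [list(rows)]  # every overwrite appends a version

    def create_or_replace(self, schema, rows):
        # CRAS: schema and contents are both replaced
        self.schema = tuple(schema)
        self.versions.append(list(rows))

    def insert_overwrite(self, schema, rows):
        # INSERT OVERWRITE: contents replaced only if the schema matches
        if tuple(schema) != self.schema:
            raise ValueError(f"schema mismatch: {tuple(schema)} != {self.schema}")
        self.versions.append(list(rows))

    @property
    def current(self):
        return self.versions[-1]


sales = ToyTable(("order_id", "total"), [(1, 9.5)])
sales.insert_overwrite(("order_id", "total"), [(2, 12.0)])
try:
    sales.insert_overwrite(("order_id", "total", "coupon"), [(3, 5.0, "SAVE10")])
except ValueError as err:
    print("rejected:", err)
sales.create_or_replace(("order_id", "total", "coupon"), [(3, 5.0, "SAVE10")])
print(sales.schema, len(sales.versions))
```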

In [None]:
/*CRAS statement*/
CREATE OR REPLACE TABLE events AS
SELECT * FROM parquet.`${da.paths.datasets}/raw/events-historical`;

/*INSERT OVERWRITE*/
INSERT OVERWRITE sales
SELECT * FROM parquet.`${da.paths.datasets}/raw/sales-historical/`;

INSERT OVERWRITE vs. Append Operation (INSERT INTO) DE4.4

Why? INSERT INTO is more efficient than overwriting the entire table, and it is an atomic way to append data.

Why not? There is no built-in guarantee against duplicates: running the same INSERT INTO twice appends the same records twice.

In [None]:
INSERT INTO sales
SELECT * FROM parquet.`${da.paths.datasets}/raw/sales-30m`

Identify a scenario in which MERGE should be used

Use MERGE for upserts (update matching records and insert new ones) when the whole change must be applied atomically.

DE 4.4

MERGE UPDATES and its benefits

You can upsert data from a source table, view, or DataFrame into a target Delta table using the MERGE SQL operation. Delta Lake supports inserts, updates and deletes in MERGE, and supports extended syntax beyond the SQL standards to facilitate advanced use cases.

The main benefits of MERGE:

updates, inserts, and deletes are completed as a single transaction
multiple conditionals can be added in addition to matching fields
provides extensive options for implementing custom logic
very useful for preventing duplicates

In [None]:
/*BASIC SYNTAX*/
MERGE INTO target a
USING source b
ON {merge_condition}
WHEN MATCHED THEN {matched_action}
WHEN NOT MATCHED THEN {not_matched_action}


MERGE INTO users a
USING users_update b
ON a.user_id = b.user_id
WHEN MATCHED AND a.email IS NULL AND b.email IS NOT NULL THEN
  UPDATE SET email = b.email, updated = b.updated
WHEN NOT MATCHED THEN INSERT *

Identify MERGE as a command to deduplicate data upon writing

This is called an insert-only merge.

A common ETL use case is to collect logs or other ever-appending datasets into a Delta table through a series of append operations.
Many source systems can generate duplicate records. With merge, you can avoid inserting the duplicate records by performing an insert-only merge.
This optimized command uses the same MERGE syntax but only provides a WHEN NOT MATCHED clause.
Below, we use this to confirm that records with the same user_id and event_timestamp aren't already in the events table.

In [None]:
/*INSERT only MERGE*/
MERGE INTO events a
USING events_update b
ON a.user_id = b.user_id AND a.event_timestamp = b.event_timestamp
WHEN NOT MATCHED AND b.traffic_source = 'email' THEN 
  INSERT *
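Why an insert-only merge avoids duplicates while a plain INSERT INTO does not, sketched in Python with hypothetical rows: the merge inserts only source rows whose (user_id, event_timestamp) key is not already in the target and, as in the SQL above, whose traffic_source is 'email', so re-running it is harmless.

```python
def append(target, source):
    # INSERT INTO: every source row is added, even if it is already present
    return target + source


def insert_only_merge(target, source):
    # WHEN NOT MATCHED AND b.traffic_source = 'email' THEN INSERT *
    existing = {(r["user_id"], r["event_timestamp"]) for r in target}
    new_rows = [r for r in source
                if (r["user_id"], r["event_timestamp"]) not in existing
                and r["traffic_source"] == "email"]
    return target + new_rows


events = [{"user_id": "u1", "event_timestamp": 100, "traffic_source": "email"}]
batch = [
    {"user_id": "u1", "event_timestamp": 100, "traffic_source": "email"},  # already present
    {"user_id": "u2", "event_timestamp": 200, "traffic_source": "email"},  # new
]
print(len(append(events, batch)))                                       # duplicate kept
print(len(insert_only_merge(insert_only_merge(events, batch), batch)))  # run twice, no duplicate
```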

Identify why a COPY INTO statement is not duplicating data in the target table

DE 4.4


COPY INTO provides SQL engineers an idempotent option to incrementally ingest data from external systems. Idempotent = re-running the command skips files that were already loaded, so repeated runs don't duplicate data.

Note that this operation does have some expectations:

Data schema should be consistent
Duplicate records within the source files should be excluded or handled downstream
This operation is potentially much cheaper than full table scans for data that grows predictably.

**Difference from Auto Loader

COPY INTO = THOUSANDS of FILES, initial setup, record set is not expected to grow/stream
AUTO LOADER = MILLIONS OF FILES, record set is expected to grow

While here we'll show simple execution on a static directory, the real value is in multiple executions over time picking up new files in the source automatically.

In [None]:
COPY INTO sales
FROM "${da.paths.datasets}/raw/sales-30m"
FILEFORMAT = PARQUET
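The idempotency comes from COPY INTO remembering which source files it has already loaded into the target table and skipping them on later runs. A minimal sketch of that bookkeeping, with hypothetical file names and rows; the real command keeps this state with the target Delta table, not in a Python set.

```python
def copy_into(table_rows, loaded_files, source_files):
    """Load each source file at most once; return the number of rows added."""
    added = 0
    for name, rows in source_files.items():
        if name in loaded_files:
            continue  # already ingested on an earlier run
        table_rows.extend(rows)
        loaded_files.add(name)
        added += len(rows)
    return added


sales, loaded = [], set()
source = {"part-0.parquet": [1, 2], "part-1.parquet": [3]}
print(copy_into(sales, loaded, source))  # first run loads both files
print(copy_into(sales, loaded, source))  # re-run loads nothing
source["part-2.parquet"] = [4, 5]
print(copy_into(sales, loaded, source))  # only the new file is loaded
```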

Use COPY INTO to insert data

In [None]:
COPY INTO sales
FROM "${da.paths.datasets}/raw/sales-30m"
FILEFORMAT = PARQUET

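Identify the components necessary to create a new DLT pipeline

A pipeline needs one or more notebooks containing DLT source code like the cell below, plus pipeline settings such as the target schema, a storage location, and the cluster configuration.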

In [None]:
/*Create Bronze table*/

CREATE OR REFRESH STREAMING LIVE TABLE sales_orders_raw
COMMENT "The raw sales orders, ingested from /databricks-datasets."
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/sales_orders/", "json", map("cloudFiles.inferColumnTypes", "true"))

/*Create a Silver Table*/
CREATE OR REFRESH STREAMING LIVE TABLE sales_orders_cleaned(
  CONSTRAINT valid_order_number EXPECT (order_number IS NOT NULL) ON VIOLATION DROP ROW
)
COMMENT "The cleaned sales orders with valid order_number(s) and partitioned by order_datetime."
AS
  SELECT f.customer_id, f.customer_name, f.number_of_line_items, 
         timestamp(from_unixtime((cast(f.order_datetime as long)))) as order_datetime, 
         date(from_unixtime((cast(f.order_datetime as long)))) as order_date, 
         f.order_number, f.ordered_products, c.state, c.city, c.lon, c.lat, c.units_purchased, c.loyalty_segment
  FROM STREAM(LIVE.sales_orders_raw) f /*The STREAM() function is required around dataset names when specifying other tables or views in your pipeline as a streaming source. Note this is the table created above*/
  LEFT JOIN LIVE.customers c
    ON c.customer_id = f.customer_id
    AND c.customer_name = f.customer_name

/*Create a Gold table*/
CREATE OR REFRESH LIVE TABLE sales_order_in_la
COMMENT "Sales orders in LA."
AS
  SELECT city, order_date, customer_id, customer_name, ordered_products_explode.curr, 
         sum(ordered_products_explode.price) as sales, 
         sum(ordered_products_explode.qty) as quantity, 
         count(ordered_products_explode.id) as product_count
  FROM (SELECT city, order_date, customer_id, customer_name, explode(ordered_products) as ordered_products_explode
        FROM LIVE.sales_orders_cleaned 
        WHERE city = 'Los Angeles')
  GROUP BY order_date, city, customer_id, customer_name, ordered_products_explode.curr
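The CONSTRAINT ... EXPECT (...) ON VIOLATION DROP ROW clause on the silver table filters out failing rows and records how many were dropped in the pipeline's data quality metrics. The filtering part, sketched in plain Python with hypothetical rows and a hypothetical helper name:

```python
def apply_expectation(rows, name, predicate):
    """Mimic EXPECT ... ON VIOLATION DROP ROW: keep passing rows, count failures."""
    kept = [r for r in rows if predicate(r)]
    metrics = {"name": name, "passed": len(kept), "dropped": len(rows) - len(kept)}
    return kept, metrics


orders = [
    {"order_number": 1001, "customer_id": "c1"},
    {"order_number": None, "customer_id": "c2"},
]
cleaned, metrics = apply_expectation(
    orders, "valid_order_number", lambda r: r["order_number"] is not None)
print(cleaned, metrics)
```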

CDC with DLT

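CDC (change data capture) propagates row-level changes to a source (inserts, updates, and deletes) into a target table.

By default, APPLY CHANGES INTO treats incoming rows as inserts or updates. In the example below, a row whose books_id matches an existing key updates that record; a row with a new key is inserted. Rows matching APPLY AS DELETE WHEN are deleted instead.

Changes can be applied as SCD Type 1 (the default: keep only the latest values) or SCD Type 2 (keep history, with STORED AS SCD TYPE 2).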

In [None]:
/*General Syntax*/

APPLY CHANGES INTO LIVE.target_table 
FROM STREAM(LIVE.cdc_feed_table)
KEYS(key_field) /*primary key fields for table can be one or many*/
APPLY AS DELETE WHEN operation_field = "DELETE" /*checks for deleted records, this is an optional clause*/
SEQUENCE BY sequence_field
COLUMNS *

In [None]:
/*Create bronze table*/
CREATE OR REFRESH STREAMING LIVE TABLE books_bronze
COMMENT "the bronze table"
AS SELECT * FROM cloud_files('path', "json");

/*Create Silver Table*/
CREATE OR REFRESH STREAMING LIVE TABLE books_silver;

/*CDC*/
APPLY CHANGES INTO LIVE.books_silver
FROM STREAM(LIVE.books_bronze)
KEYS(books_id)
APPLY AS DELETE WHEN row_status = "DELETE"
SEQUENCE BY row_time
COLUMNS * EXCEPT(row_status, row_time)
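The behaviour of the CDC statement above, sketched for SCD Type 1 with hypothetical rows: changes are grouped by key, the change with the highest SEQUENCE BY value wins for each key (so out-of-order arrivals are handled), a winning DELETE removes the key, and the bookkeeping columns are dropped as in COLUMNS * EXCEPT(row_status, row_time). It is a simplification that processes one batch against the current target and ignores how DLT tracks deletes across batches.

```python
def apply_changes_scd1(target, changes, key="books_id", sequence="row_time",
                       status="row_status", drop_cols=("row_status", "row_time")):
    """SCD Type 1 sketch of APPLY CHANGES INTO; target maps key -> row."""
    latest = {}
    for change in changes:
        k = change[key]
        # Keep only the most recent change per key, by the SEQUENCE BY column
        if k not in latest or change[sequence] > latest[k][sequence]:
            latest[k] = change
    result = dict(target)  # leave the input untouched
    for k, change in latest.items():
        if change[status] == "DELETE":
            result.pop(k, None)  # APPLY AS DELETE WHEN row_status = "DELETE"
        else:
            result[k] = {c: v for c, v in change.items() if c not in drop_cols}
    return result


silver = {"B1": {"books_id": "B1", "title": "Old title", "price": 20}}
feed = [
    {"books_id": "B1", "title": "New title", "price": 18, "row_status": "UPDATE", "row_time": 2},
    {"books_id": "B1", "title": "Stale", "price": 25, "row_status": "UPDATE", "row_time": 1},
    {"books_id": "B2", "title": "Added", "price": 30, "row_status": "INSERT", "row_time": 1},
    {"books_id": "B2", "title": "Added", "price": 30, "row_status": "DELETE", "row_time": 3},
]
print(apply_changes_scd1(silver, feed))
```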

Identify which source location is utilizing Auto Loader

DE 6.0 Series

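Consolidated Incremental Data Ingestion Query

The source below is read with Auto Loader because the stream uses format("cloudFiles") and is configured through cloudFiles.* options (in DLT SQL, the cloud_files() function plays the same role).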

In [None]:
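def autoload_to_table(data_source, source_format, table_name, checkpoint_directory):
    query = (spark.readStream
                  .format("cloudFiles")                                       # Auto Loader source
                  .option("cloudFiles.format", source_format)                 # format of the incoming files
                  .option("cloudFiles.schemaLocation", checkpoint_directory)  # where the inferred schema is tracked
                  .load(data_source)
                  .writeStream
                  .option("checkpointLocation", checkpoint_directory)         # tracks which files were processed
                  .option("mergeSchema", "true")                              # allow new columns in the target
                  .toTable(table_name))                                       # start the stream into the table
    return query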

Explain change data capture and the behavior of APPLY CHANGES INTO