
<div style="text-align: center; line-height: 0; padding-top: 9px;">
  <img src="https://databricks.com/wp-content/uploads/2018/03/db-academy-rgb-1200px.png" alt="Databricks Learning">
</div>


# Load Data Lab

In this lab, you will load data into new and existing Delta tables.

## Learning Objectives
By the end of this lab, you should be able to:
- Create an empty Delta table with a provided schema
- Use `COPY INTO` and `CAST` to ingest data to an existing Delta table
- Use a CTAS statement to create a Delta table from files

## Run Setup

Run the following cell to configure variables and datasets for this lesson.

In [0]:
%run ./Includes/Classroom-Setup-03.6L

Note: you may need to restart the kernel using dbutils.library.restartPython() to use updated packages.


Resetting the learning environment:
| dropping the catalog "labuser9693205_1742837481_u8bu_da"...(0 seconds)
| removing the working directory "dbfs:/mnt/dbacademy-users/labuser9693205_1742837481@vocareum.com/data-engineering-with-databricks"...(0 seconds)

Skipping install of existing datasets to "dbfs:/mnt/dbacademy-datasets/data-engineer-learning-path/v04"

Validating the locally installed datasets:
| listing local files...(7 seconds)
| validation completed...(7 seconds total)
Creating & using the catalog "labuser9693205_1742837481_u8bu_da"...(1 seconds)

Predefined tables in "labuser9693205_1742837481_u8bu_da.default":
| events_json

Predefined paths variables:
| DA.paths.working_dir:  dbfs:/mnt/dbacademy-users/labuser9693205_1742837481@vocareum.com/data-engineering-with-databricks
| DA.paths.datasets:     dbfs:/mnt/dbacademy-datasets/data-engineer-learning-path/v04
| DA.paths.kafka_events: dbfs:/mnt/dbacademy-datasets/data-engineer-learning-path/v04/ecommerce/raw/events-kafka

Setu

## Data Overview

We will work with a sample of raw Kafka data written as JSON files. 

Each file contains all records consumed during a 5-second interval, stored with the full Kafka schema as a multiple-record JSON file. 

The schema for the table:

| field  | type | description |
| ------ | ---- | ----------- |
| key    | BINARY | The **`user_id`** field is used as the key; this is a unique alphanumeric field that corresponds to session/cookie information |
| offset | BIGINT | This is a unique value, monotonically increasing for each partition |
| partition | INTEGER | Our current Kafka implementation uses only 2 partitions (0 and 1) |
| timestamp | BIGINT    | This timestamp is recorded as milliseconds since epoch, and represents the time at which the producer appends a record to a partition |
| topic | STRING | While the Kafka service hosts multiple topics, only those records from the **`clickstream`** topic are included here |
| value | BINARY | This is the full data payload (to be discussed later), sent as JSON |
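
The `timestamp` column stores milliseconds since the epoch, so a raw value has to be converted before it reads as a date and time. The following is a minimal sketch using only the Python standard library. The helper name `epoch_ms_to_utc` is hypothetical, and the sample value is one that appears in this dataset.

```python
from datetime import datetime, timedelta, timezone


def epoch_ms_to_utc(ms: int) -> datetime:
    """Convert epoch milliseconds to a timezone-aware UTC datetime.

    Hypothetical helper for illustration; it is not part of the lab.
    """
    epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
    # timedelta does exact integer arithmetic, so no precision is lost.
    return epoch + timedelta(milliseconds=ms)


print(epoch_ms_to_utc(1593880885085).isoformat())
```

Adding a `timedelta` to the epoch keeps the millisecond component exact, which dividing by 1000 as a float does not guarantee.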


## Define Schema for Empty Delta Table
Create an empty managed Delta table named **`events_bronze`** using the same schema.

In [0]:
CREATE OR REPLACE TABLE events_bronze
(
    key BINARY,
    offset BIGINT,
    partition INTEGER,
    timestamp BIGINT,
    topic STRING,
    value BINARY
)

Run the cell below to confirm the table was created correctly.

In [0]:
%python
suite = DA.tests.new("Define Schema")
expected_table = lambda: spark.table("events_bronze")
suite.test_not_none(lambda: expected_table(), "Created the table \"events_bronze\"")
suite.test_equals(lambda: expected_table().count(), 0, "The table should have 0 records")

suite.test_schema_field(lambda: expected_table().schema, "key", "BinaryType")
suite.test_schema_field(lambda: expected_table().schema, "offset", "LongType")
suite.test_schema_field(lambda: expected_table().schema, "partition", "IntegerType")
suite.test_schema_field(lambda: expected_table().schema, "timestamp", "LongType")
suite.test_schema_field(lambda: expected_table().schema, "topic", "StringType")
suite.test_schema_field(lambda: expected_table().schema, "value", "BinaryType")

suite.display_results()
assert suite.passed

Points,Test,Result
1,"Created the table ""events_bronze""",
1,The table should have 0 records,
1,"Schema contains ""key"" of type BinaryType",
1,"Schema contains ""offset"" of type LongType",
1,"Schema contains ""partition"" of type IntegerType",
1,"Schema contains ""timestamp"" of type LongType",
1,"Schema contains ""topic"" of type StringType",
1,"Schema contains ""value"" of type BinaryType",


## Using `CAST` with JSON Data
In the next cell, you will use `COPY INTO` to ingest data into the table.  
  
To make the JSON data fit the schema you defined when you created the table, you will need to use the `CAST` keyword. The syntax for `CAST` is `CAST(column AS data_type)`. To use `CAST` with `COPY INTO`, replace the path in the `COPY INTO` command you learned in the previous lesson with a `SELECT` query (make sure you include the parentheses):
  
  <code>(SELECT<br />
  CAST(key AS BINARY) AS key,<br />
  CAST(offset AS BIGINT) AS offset,<br />
  CAST(partition AS INT) AS partition,<br />
  CAST(timestamp AS BIGINT) AS timestamp,<br />
  CAST(topic AS STRING) AS topic,<br />
  CAST(value AS BINARY) AS value<br />
FROM '${DA.paths.kafka_events}')</code>
  
Note: Because the data files are in JSON format, you will not need to use the "delimiter" or "header" options.
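
Because every column follows the same `CAST(column AS data_type) AS column` pattern, the subquery can also be generated from the target schema. This is a minimal Python sketch of that idea, not something the lab requires. The names `SCHEMA` and `build_cast_select` are hypothetical, and the function only builds a string; it does not run anything against Spark.

```python
# Target schema of events_bronze: column name -> SQL type (from the table above).
SCHEMA = {
    "key": "BINARY",
    "offset": "BIGINT",
    "partition": "INT",
    "timestamp": "BIGINT",
    "topic": "STRING",
    "value": "BINARY",
}


def build_cast_select(schema: dict, source: str) -> str:
    """Return a parenthesized SELECT that casts each column to its target type.

    Hypothetical helper for illustration only.
    """
    casts = ",\n  ".join(
        f"CAST({col} AS {dtype}) AS {col}" for col, dtype in schema.items()
    )
    return f"(SELECT\n  {casts}\nFROM '{source}')"


print(build_cast_select(SCHEMA, "${DA.paths.kafka_events}"))
```

The printed text has the same shape as the subquery shown above and can be placed after `FROM` in the `COPY INTO` statement.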

In [0]:
COPY INTO events_bronze
FROM (
  SELECT
    CAST(key AS BINARY) AS key,
    CAST(offset AS BIGINT) AS offset,
    CAST(partition AS INT) AS partition,
    CAST(timestamp AS BIGINT) AS timestamp,
    CAST(topic AS STRING) AS topic,
    CAST(value AS BINARY) AS value
  FROM '${DA.paths.kafka_events}'
)
FILEFORMAT = JSON;

num_affected_rows,num_inserted_rows,num_skipped_corrupt_files
2252,2252,0





Manually review the table contents to ensure data was written as expected.

In [0]:
SELECT * FROM events_bronze LIMIT 20;

key,offset,partition,timestamp,topic,value
VlVFd01EQXdNREF4TURjek9UZ3dOVFE9,219255030,0,1593880885085,clickstream,ZXlKa1pYWnBZMlVpT2lKQmJtUnliMmxrSWl3aVpXTnZiVzFsY21ObElqcDdmU3dpWlhabGJuUmZibUZ0WlNJNkltMWhhVzRpTENKbGRtVnVkRjkwYVcxbGMzUmhiWEFpT2pFMU9UTTRPREE0T0RVd016WXhNamtzSW1kbGJ5STY= (truncated)
VlVFd01EQXdNREF4TURjek9USTBOVGc9,219255043,0,1593880892303,clickstream,ZXlKa1pYWnBZMlVpT2lKcFQxTWlMQ0psWTI5dGJXVnlZMlVpT250OUxDSmxkbVZ1ZEY5dVlXMWxJam9pWVdSa1gybDBaVzBpTENKbGRtVnVkRjl3Y21WMmFXOTFjMTkwYVcxbGMzUmhiWEFpT2pFMU9UTTRPREF6TURBMk9UWTM= (truncated)
VlVFd01EQXdNREF4TURjek9UVTVOamc9,219255108,0,1593880889174,clickstream,ZXlKa1pYWnBZMlVpT2lKdFlXTlBVeUlzSW1WamIyMXRaWEpqWlNJNmUzMHNJbVYyWlc1MFgyNWhiV1VpT2lKd2NtVnRhWFZ0SWl3aVpYWmxiblJmY0hKbGRtbHZkWE5mZEdsdFpYTjBZVzF3SWpveE5Ua3pPRGd3T0RZeE1ETXc= (truncated)
VlVFd01EQXdNREF4TURjek9UZ3dNekE9,219255118,0,1593880889725,clickstream,ZXlKa1pYWnBZMlVpT2lKcFQxTWlMQ0psWTI5dGJXVnlZMlVpT250OUxDSmxkbVZ1ZEY5dVlXMWxJam9pYjNKcFoybHVZV3dpTENKbGRtVnVkRjl3Y21WMmFXOTFjMTkwYVcxbGMzUmhiWEFpT2pFMU9UTTRPREE0T0RJME1qazU= (truncated)
VlVFd01EQXdNREF4TURjek9ESXlNek09,219438025,1,1593880886106,clickstream,ZXlKa1pYWnBZMlVpT2lKQmJtUnliMmxrSWl3aVpXTnZiVzFsY21ObElqcDdmU3dpWlhabGJuUmZibUZ0WlNJNkltTmpYMmx1Wm04aUxDSmxkbVZ1ZEY5d2NtVjJhVzkxYzE5MGFXMWxjM1JoYlhBaU9qRTFPVE00T0RBek5qUXo= (truncated)
VlVFd01EQXdNREF4TURjek9ESXlNek09,219438069,1,1593880886106,clickstream,ZXlKa1pYWnBZMlVpT2lKQmJtUnliMmxrSWl3aVpXTnZiVzFsY21ObElqcDdmU3dpWlhabGJuUmZibUZ0WlNJNkltTmpYMmx1Wm04aUxDSmxkbVZ1ZEY5d2NtVjJhVzkxYzE5MGFXMWxjM1JoYlhBaU9qRTFPVE00T0RBek5qUXo= (truncated)
VlVFd01EQXdNREF4TURjek9UZ3dNemM9,219438089,1,1593880887640,clickstream,ZXlKa1pYWnBZMlVpT2lKQmJtUnliMmxrSWl3aVpXTnZiVzFsY21ObElqcDdmU3dpWlhabGJuUmZibUZ0WlNJNkltUmxiR2wyWlhKNUlpd2laWFpsYm5SZmNISmxkbWx2ZFhOZmRHbHRaWE4wWVcxd0lqb3hOVGt6T0Rnd09EZ3k= (truncated)
VlVFd01EQXdNREF4TURjek9UZ3hOVGs9,219438114,1,1593880894803,clickstream,ZXlKa1pYWnBZMlVpT2lKdFlXTlBVeUlzSW1WamIyMXRaWEpqWlNJNmUzMHNJbVYyWlc1MFgyNWhiV1VpT2lKdFlXbHVJaXdpWlhabGJuUmZkR2x0WlhOMFlXMXdJam94TlRrek9EZ3dPRGswTnpnNU5UYzVMQ0puWlc4aU9uc2k= (truncated)
VlVFd01EQXdNREF4TURjek56WTBOamM9,219438126,1,1593880888445,clickstream,ZXlKa1pYWnBZMlVpT2lKWGFXNWtiM2R6SWl3aVpXTnZiVzFsY21ObElqcDdmU3dpWlhabGJuUmZibUZ0WlNJNkltTmhjblFpTENKbGRtVnVkRjl3Y21WMmFXOTFjMTkwYVcxbGMzUmhiWEFpT2pFMU9UTTROemsyTVRrNE5USTI= (truncated)
VlVFd01EQXdNREF4TURjek9UZ3dNemM9,219438135,1,1593880887640,clickstream,ZXlKa1pYWnBZMlVpT2lKQmJtUnliMmxrSWl3aVpXTnZiVzFsY21ObElqcDdmU3dpWlhabGJuUmZibUZ0WlNJNkltUmxiR2wyWlhKNUlpd2laWFpsYm5SZmNISmxkbWx2ZFhOZmRHbHRaWE4wWVcxd0lqb3hOVGt6T0Rnd09EZ3k= (truncated)
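
The `key` and `value` columns are `BINARY`, and the result display appears to render them as Base64 text, which makes them hard to check by eye. The sketch below decodes a displayed key locally. It assumes, based on the sample rows rather than anything stated in the lab, that the source files already stored the key as Base64 text, so the displayed string is encoded twice. The helper name `decode_displayed_key` is hypothetical.

```python
import base64


def decode_displayed_key(displayed: str) -> str:
    """Decode a key as shown in the result grid back to readable text.

    Assumption: the grid shows Base64 of the stored bytes, and the stored
    bytes are themselves Base64 text from the source JSON files.
    """
    stored_bytes = base64.b64decode(displayed)      # undo the display encoding
    original_bytes = base64.b64decode(stored_bytes)  # undo the source encoding
    return original_bytes.decode("utf-8")


# Key from the first row of the output above.
print(decode_displayed_key("VlVFd01EQXdNREF4TURjek9UZ3dOVFE9"))
```

If a value decodes cleanly after only one pass, the double-encoding assumption does not hold for that column and the second `b64decode` call should be dropped.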






Run the cell below to confirm the data has been loaded correctly.

In [0]:
%python
import pyspark.sql.functions as F

suite = DA.tests.new("Validate events_bronze")
expected_table = lambda: spark.table("events_bronze")
suite.test_not_none(lambda: expected_table(), "Created the table \"events_bronze\"")
suite.test_equals(lambda: expected_table().count(), 2252, "The table should have 2252 records")

first_five = lambda: [r["timestamp"] for r in expected_table().orderBy(F.col("timestamp").asc()).limit(5).collect()]
suite.test_sequence(first_five, [1593879303631, 1593879304224, 1593879305465, 1593879305482, 1593879305746], True, "First 5 values are correct")

last_five = lambda: [r["timestamp"] for r in expected_table().orderBy(F.col("timestamp").desc()).limit(5).collect()]
suite.test_sequence(last_five, [1593881096290, 1593881095799, 1593881093452, 1593881093394, 1593881092076], True, "Last 5 values are correct")

suite.display_results()
assert suite.passed

Points,Test,Result
1,"Created the table ""events_bronze""",
1,The table should have 2252 records,
1,First 5 values are correct,
1,Last 5 values are correct,




## Create a Delta Table From Query Results

In addition to the new events data, let's also load a small lookup table with product details that we'll use later in the course.
Use a CTAS statement to create a managed Delta table named **`item_lookup`** that extracts data from the Parquet directory provided below.

In [0]:
CREATE OR REPLACE TABLE item_lookup
    AS SELECT * FROM parquet.`${DA.paths.datasets}/ecommerce/raw/item-lookup`

num_affected_rows,num_inserted_rows






Run the cell below to confirm the lookup table has been loaded correctly.

In [0]:
%python
suite = DA.tests.new("Validate item_lookup")
expected_table = lambda: spark.table("item_lookup")
suite.test_not_none(lambda: expected_table(), "Created the table \"item_lookup\"")

actual_values = lambda: [r["item_id"] for r in expected_table().collect()]
expected_values = ['M_PREM_Q','M_STAN_F','M_PREM_F','M_PREM_T','M_PREM_K','P_DOWN_S','M_STAN_Q','M_STAN_K','M_STAN_T','P_FOAM_S','P_FOAM_K','P_DOWN_K']
suite.test_sequence(actual_values, expected_values, False, "Contains the 12 expected item IDs")

suite.display_results()
assert suite.passed

Points,Test,Result
1,"Created the table ""item_lookup""",
1,Contains the 12 expected item IDs,




 
Run the following cell to delete the tables and files associated with this lesson.

In [0]:
%python
DA.cleanup()

Resetting the learning environment:
| dropping the catalog "labuser9693205_1742837481_u8bu_da"...(0 seconds)

Validating the locally installed datasets:
| listing local files...(6 seconds)
| validation completed...(6 seconds total)



&copy; 2024 Databricks, Inc. All rights reserved.<br/>
Apache, Apache Spark, Spark and the Spark logo are trademarks of the 
<a href="https://www.apache.org/">Apache Software Foundation</a>.<br/>
<br/><a href="https://databricks.com/privacy-policy">Privacy Policy</a> | 
<a href="https://databricks.com/terms-of-use">Terms of Use</a> | 
<a href="https://help.databricks.com/">Support</a>