In [5]:
%session_id_prefix native-delta-dataframe-
%glue_version 3.0
%idle_timeout 60
%%configure
{
  "--conf": "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog",
  "--datalake-formats": "delta"
}

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.4 
Setting session ID prefix to native-delta-dataframe-
Setting Glue version to: 3.0
Current idle_timeout is None minutes.
idle_timeout has been set to 60 minutes.
The following configurations have been updated: {'--conf': 'spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog', '--datalake-formats': 'delta'}


# Create database

In [1]:
# Run configuration: source/target buckets and which TPC-H scale factor to load.
target_bucket = "lakehouse-deltalake"
source_bucket = "tpc-h-dataset"
tpc_h_scale_factor = 1
tpc_h_database_name = "tpc_h_sf{}".format(tpc_h_scale_factor)
tpc_h_tables = [
    "region",
    "nation",
    "customer",
    "lineitem",
    "orders",
    "part",
    "partsupp",
    "supplier",
]

# NOTE: despite its name, `target_bucket_prefix` is "<bucket>/<database>/" -- it
# includes the bucket name, so it is not an S3 object-key prefix by itself.
target_bucket_prefix = "/".join([target_bucket, tpc_h_database_name, ""])
database_location = "s3://" + target_bucket_prefix

Trying to create a Glue session for the kernel.
Session Type: glueetl
Worker Type: G.1X
Number of Workers: 5
Session ID: add91ad0-e394-4ba0-bc61-054fba98e207
Applying the following default arguments:
--glue_kernel_version 1.0.4
--enable-glue-datacatalog true
--conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog
--datalake-formats delta
Waiting for session add91ad0-e394-4ba0-bc61-054fba98e207 to get into ready status...
Session add91ad0-e394-4ba0-bc61-054fba98e207 has been created.



In [2]:
import boto3
import time

## Delete existing Files in DeltaLake S3 Bucket
# `target_bucket_prefix` is "<bucket>/<database>/", but object keys inside the
# bucket do not start with the bucket name, so filtering on it matched nothing.
# Filter on the key prefix relative to the bucket instead. The trailing "/"
# keeps e.g. "tpc_h_sf1/" from also matching "tpc_h_sf10/".
target_key_prefix = f"{tpc_h_database_name}/"
s3 = boto3.resource('s3')
bucket = s3.Bucket(target_bucket)
bucket.objects.filter(Prefix=target_key_prefix).delete()

## Drop Tables in Glue Data Catalog
# One client for the whole loop; creating it outside the `try` also guarantees
# `glue.exceptions` is resolvable in the `except` clause.
# EntityNotFoundException is also raised when the database itself does not exist
# yet, in which case every table is reported as missing.
glue = boto3.client('glue')
for table_name in tpc_h_tables:
    try:
        glue.delete_table(DatabaseName=tpc_h_database_name, Name=table_name)
    except glue.exceptions.EntityNotFoundException:
        print(f"Table {tpc_h_database_name}.{table_name} does not exist")


Table tpc_h_sf1.region does not exist
Table tpc_h_sf1.nation does not exist
Table tpc_h_sf1.customer does not exist
Table tpc_h_sf1.lineitem does not exist
Table tpc_h_sf1.orders does not exist
Table tpc_h_sf1.part does not exist
Table tpc_h_sf1.partsupp does not exist
Table tpc_h_sf1.supplier does not exist


In [3]:
# Ensure the Glue database exists; create it at `database_location` if missing.
glue = boto3.client('glue')
try:
    res = glue.get_database(Name=tpc_h_database_name)
    print(f"Database {tpc_h_database_name} exists.")
    if 'LocationUri' not in res['Database']:
        print(f"Warning: Database {tpc_h_database_name} does not have Location. You need to configure location in the database.")
except glue.exceptions.EntityNotFoundException:
    print(f"Database {tpc_h_database_name} does not exist.")
    # Store the response separately: the original assigned it to `glue`,
    # replacing the client with a dict and breaking any later `glue.*` call.
    res = glue.create_database(
        DatabaseInput={
            'Name': tpc_h_database_name,
            'LocationUri': database_location
        }
    )
    print(f"Created a new database {tpc_h_database_name}.")

Database tpc_h_sf1 does not exist.
Created a new database tpc_h_sf1.


# Create tables

In [None]:
# Wall-clock start time; the timing cell after the CREATE TABLE cells subtracts this.
start = time.time()

In [4]:
# Register the REGION table in the Glue catalog as a Delta table under the database location.
query = """
CREATE TABLE {db}.region
(
    R_REGIONKEY INT,
    R_NAME STRING,
    R_COMMENT STRING
) USING delta LOCATION '{root}region';
""".format(db=tpc_h_database_name, root=database_location)
spark.sql(query)

DataFrame[]


In [5]:
# Register the NATION table in the Glue catalog as a Delta table under the database location.
query = """
CREATE TABLE {db}.nation
(
    N_NATIONKEY INT,
    N_NAME STRING,
    N_REGIONKEY INT,
    N_COMMENT STRING
) USING delta LOCATION '{root}nation';
""".format(db=tpc_h_database_name, root=database_location)
spark.sql(query)

DataFrame[]


In [6]:
# Register the CUSTOMER table in the Glue catalog as a Delta table under the database location.
query = """
CREATE TABLE {db}.customer
(
    C_CUSTKEY INT,
    C_NAME STRING,
    C_ADDRESS STRING,
    C_NATIONKEY INT,
    C_PHONE STRING,
    C_ACCTBAL DOUBLE,
    C_MKTSEGMENT STRING,
    C_COMMENT STRING
) USING delta LOCATION '{root}customer';
""".format(db=tpc_h_database_name, root=database_location)
spark.sql(query)

DataFrame[]


In [7]:
# Register the LINEITEM table in the Glue catalog as a Delta table under the database location.
# NOTE(review): key columns are INT; TPC-H order keys grow with the scale factor,
# so consider BIGINT if `tpc_h_scale_factor` is raised to very large values.
query = """
CREATE TABLE {db}.lineitem
(
    L_ORDERKEY INT,
    L_PARTKEY INT,
    L_SUPPKEY INT,
    L_LINENUMBER INT,
    L_QUANTITY DOUBLE,
    L_EXTENDEDPRICE DOUBLE,
    L_DISCOUNT DOUBLE,
    L_TAX DOUBLE,
    L_RETURNFLAG STRING,
    L_LINESTATUS STRING,
    L_SHIPDATE DATE,
    L_COMMITDATE DATE,
    L_RECEIPTDATE DATE,
    L_SHIPINSTRUCT STRING,
    L_SHIPMODE STRING,
    L_COMMENT STRING
) USING delta LOCATION '{root}lineitem';
""".format(db=tpc_h_database_name, root=database_location)
spark.sql(query)

DataFrame[]


In [8]:
# Register the ORDERS table in the Glue catalog as a Delta table under the database location.
# NOTE(review): O_ORDERKEY is INT; TPC-H order keys grow with the scale factor,
# so consider BIGINT if `tpc_h_scale_factor` is raised to very large values.
query = """
CREATE TABLE {db}.orders
(
    O_ORDERKEY INT,
    O_CUSTKEY INT,
    O_ORDERSTATUS STRING,
    O_TOTALPRICE DOUBLE,
    O_ORDERDATE DATE,
    O_ORDERPRIORITY STRING,
    O_CLERK STRING,
    O_SHIPPRIORITY INT,
    O_COMMENT STRING
) USING delta LOCATION '{root}orders';
""".format(db=tpc_h_database_name, root=database_location)
spark.sql(query)

DataFrame[]


In [9]:
# Register the PART table in the Glue catalog as a Delta table under the database location.
query = """
CREATE TABLE {db}.part
(
    P_PARTKEY INT,
    P_NAME STRING,
    P_MFGR STRING,
    P_BRAND STRING,
    P_TYPE STRING,
    P_SIZE INT,
    P_CONTAINER STRING,
    P_RETAILPRICE DOUBLE,
    P_COMMENT STRING
) USING delta LOCATION '{root}part';
""".format(db=tpc_h_database_name, root=database_location)
spark.sql(query)

DataFrame[]


In [10]:
# Register the PARTSUPP table in the Glue catalog as a Delta table under the database location.
query = """
CREATE TABLE {db}.partsupp
(
    PS_PARTKEY INT,
    PS_SUPPKEY INT,
    PS_AVAILQTY INT,
    PS_SUPPLYCOST DOUBLE,
    PS_COMMENT STRING
) USING delta LOCATION '{root}partsupp';
""".format(db=tpc_h_database_name, root=database_location)
spark.sql(query)

DataFrame[]


In [11]:
# Register the SUPPLIER table in the Glue catalog as a Delta table under the database location.
query = """
CREATE TABLE {db}.supplier
(
    S_SUPPKEY INT,
    S_NAME STRING,
    S_ADDRESS STRING,
    S_NATIONKEY INT,
    S_PHONE STRING,
    S_ACCTBAL DOUBLE,
    S_COMMENT STRING
) USING delta LOCATION '{root}supplier';
""".format(db=tpc_h_database_name, root=database_location)
spark.sql(query)

DataFrame[]


In [None]:
# Wall-clock seconds elapsed since `start` was captured ahead of the CREATE TABLE cells.
duration = time.time() - start
print("Total to create tables: " + str(duration))

# Load Table Data from S3

In [29]:
from delta.tables import *
# StructType/StructField were previously only in scope through the wildcard import
# above (a transitive re-export from delta.tables); import them explicitly so this
# cell does not depend on delta.tables' internal imports.
from pyspark.sql.types import StructField, StructType

# For each TPC-H table: read the schema of the empty Delta table created above,
# read the pipe-delimited source files from S3 with that schema, and overwrite the
# Delta table with the result, printing per-step timings.
for table_name in tpc_h_tables:
    print(f"Importing table {table_name}")
    table_location = f"{database_location}{table_name}"
    csv_location = f"s3://{source_bucket}/sf{tpc_h_scale_factor}/{table_name}/"

    start = time.time()

    # Read the schema of the Delta table
    delta_table_schema = spark.read.format("delta").load(table_location).schema

    duration_1 = time.time() - start
    print(f"Time for reading schema of {table_name}: {duration_1}")
    start_2 = time.time()

    # Copy name, type and nullability of each Delta column (field metadata is not carried over).
    csv_schema = StructType([StructField(field.name, field.dataType, field.nullable) for field in delta_table_schema])

    # Header-less, "|"-delimited files read with an explicit schema. The former
    # `inferSchema=True` option was dropped: Spark ignores it when a schema is given.
    # NOTE(review): the default PERMISSIVE mode turns unparseable values into nulls
    # instead of failing; consider mode="FAILFAST" if silent nulls are a concern.
    # `df_orders` holds whichever table is currently being loaded, not only ORDERS.
    df_orders = spark.read.options(delimiter="|", header=False).schema(csv_schema).csv(csv_location)

    # spark.read is lazy, so most of the actual CSV reading happens during the write
    # below; the "reading csv" timing mostly reflects query planning.
    duration_2 = time.time() - start_2
    print(f"Time for reading csv {table_name}: {duration_2}")
    start_3 = time.time()

    df_orders.write.format("delta").mode("overwrite").option("path", table_location).saveAsTable(f"{tpc_h_database_name}.{table_name}")

    duration_3 = time.time() - start_3
    print(f"Time for writing delta table {table_name}: {duration_3}")
    duration = time.time() - start
    print(f"Total time for {table_name}: {duration}")

Importing table region
Time for reading schema of region: 0.08390641212463379
Time for reading csv region: 0.2007737159729004
Time for writing delta table region: 4.351106405258179
Total time for region: 4.635859727859497
Importing table nation
Time for reading schema of nation: 0.0602872371673584
Time for reading csv nation: 0.1536424160003662
Time for writing delta table nation: 4.272294521331787
Total time for nation: 4.486300468444824
Importing table customer
Time for reading schema of customer: 0.06468987464904785
Time for reading csv customer: 0.1550302505493164
Time for writing delta table customer: 4.602980375289917
Total time for customer: 4.822767019271851
Importing table lineitem
Time for reading schema of lineitem: 0.05806779861450195
Time for reading csv lineitem: 0.17192649841308594
Time for writing delta table lineitem: 21.426116943359375
Total time for lineitem: 21.656177043914795
Importing table orders
Time for reading schema of orders: 0.062352657318115234
Time for re