In [None]:
!pip install duckdb

In [None]:
# Notebook parameters exposed as Databricks text widgets.
# Each one starts with the literal value "default" until the user edits it.
for widget_name in ("CATALOG", "SCHEMA", "MAX_WORKERS"):
    dbutils.widgets.text(widget_name, "default")

In [None]:
import duckdb, datetime

# One in-memory DuckDB database; every benchmark query runs against it.
conn = duckdb.connect(database=':memory:')

# httpfs/azure give access to abfss:// paths; delta provides the `TYPE delta` ATTACH used below.
conn.sql("INSTALL httpfs; INSTALL azure; INSTALL DELTA")
conn.sql("LOAD httpfs; LOAD azure; LOAD DELTA")

# Show which extension versions this run is using.
conn.sql("SELECT extension_name, extension_version, installed_from, install_mode FROM duckdb_extensions()").show()

# Select curl as the Azure HTTP transport before any table is attached.
conn.sql("set azure_transport_option_type = 'curl'")

# Root folder of the TPC-H Delta tables (presumably scale factor 100, per the name — confirm).
TPCH_ROOT = "abfss://dbrkext@adlsdbrkstore.dfs.core.windows.net/tpch/tpch100"

tables = [
    "nation", "region", "part", "supplier",
    "partsupp", "customer", "orders", "lineitem"
]

# Attach every table under its own name so the query SQL can refer to it directly.
# PIN_SNAPSHOT is a delta-extension ATTACH option (presumably pins the table
# version read at attach time — confirm in the DuckDB delta extension docs).
for table in tables:
    conn.sql(f"ATTACH '{TPCH_ROOT}/{table}' AS {table} (TYPE delta, PIN_SNAPSHOT);")

# Alternative without ATTACH: conn.sql(f"SELECT * FROM delta_scan('{TPCH_ROOT}/<table>')")

In [None]:
from queries import *
from concurrent.futures import ThreadPoolExecutor, as_completed

def run_query(q_num):
    """Run TPC-H query ``q_num`` against the shared DuckDB database and count its rows.

    The SQL text is looked up as the module-level variable ``tpch100_q{q_num}``,
    which is expected to come from ``from queries import *``.

    Args:
        q_num: Query number (the driver cell submits 1..22).

    Returns:
        Tuple ``(q_num, row_count)``.

    Raises:
        NameError: if no ``tpch100_q{q_num}`` variable is defined.
    """
    print(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') } Starting query {q_num}...")

    # Explicit lookup instead of eval(): same NameError on a missing query, no code evaluation.
    query_name = f"tpch100_q{q_num}"
    try:
        query_sql = globals()[query_name]
    except KeyError:
        raise NameError(f"name {query_name!r} is not defined; expected it from `queries`") from None

    # run_query is called from a thread pool, and a DuckDB connection must not be
    # used concurrently from several threads. Each call therefore gets its own
    # cursor: a separate connection to the same in-memory database.
    cursor = conn.cursor()
    try:
        # len() executes the lazy relation and returns its row count.
        return (q_num, len(cursor.sql(query_sql)))
    finally:
        cursor.close()

In [None]:
# TPC-H defines 22 queries; run_query looks each one up as tpch100_q<n>.
TPCH_QUERY_NUMBERS = range(1, 23)

# MAX_WORKERS widget: a positive integer, or its initial value "default" (or blank)
# to let ThreadPoolExecutor choose its own pool size.
max_workers_raw = dbutils.widgets.get('MAX_WORKERS').strip()
if max_workers_raw.lower() in ("", "default"):
    max_workers = None
else:
    try:
        max_workers = int(max_workers_raw)
    except ValueError:
        raise ValueError(
            f"MAX_WORKERS widget must be an integer or 'default', got {max_workers_raw!r}"
        ) from None

with ThreadPoolExecutor(max_workers=max_workers) as executor:
    # Map each future back to its query number so failures can be attributed.
    future_to_query = {executor.submit(run_query, q_num): q_num for q_num in TPCH_QUERY_NUMBERS}
    for completed_future in as_completed(future_to_query):
        q_num = future_to_query[completed_future]
        try:
            _, result = completed_future.result()
        except Exception as e:
            print(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} Query {q_num} generated an exception: {e!r}")
            continue
        print("\n")
        print(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} Query {q_num} returned {result} rows")