##### This Notebook loads csv files from lakehouse, run sql analytics query using duckdb and saves into MotherDuck

##### Azure KeyVault has MotherDuck token that allows to authenticate, create a table and loads aggregated data into MotherDuck

##### Make sure you have a KeyVault created and a secret called mdtoken has been added

In [None]:
!pip install panda

In [None]:
!pip install dlt

In [None]:
import os
import dlt
import requests
import duckdb

In [None]:
md_token = notebookutils.credentials.getSecret("https://<akv name>.vault.azure.net/", "mdtoken")

In [None]:
conn = duckdb.connect(f'md:?motherduck_token={md_token}')

In [None]:
os.environ["DESTINATION__MOTHERDUCK__CREDENTIALS__PASSWORD"] = md_token

##### dltHub pipeline creates csv files with gzip compression enabled which is better for performance

In [None]:
options = {
            'header': True,
            'sep': ",",
            'compression': "gzip"
          }

options_str = ', '.join(f'{key}={repr(value)}' for key, value in options.items())

file_path = "/lakehouse/default/Files/load_raw_breaches_dataset/breaches/*.csv"

query = f"""
                SELECT domain
                       ,total_breaches
                FROM (
                        SELECT COALESCE(domain, 'unknown') as domain,
                               COUNT(*) as total_breaches 
                        FROM read_csv_auto('{file_path}', {options_str})
                        GROUP BY domain
                )
                WHERE total_breaches > 1
            """

In [None]:
import duckdb
import pandas as pd
import json

@dlt.resource(write_disposition="replace")
def fetch_data():
    df = conn.execute(query).fetchdf()
    json_data = df.to_json(orient='records')
    json_list = json.loads(json_data)
    yield json_list

pipeline = dlt.pipeline(
    pipeline_name="quick_start",
    destination="motherduck",
    dataset_name="mydata"
)

load_info = pipeline.run(fetch_data(), table_name="breachesanalytics")

conn.close()

print(load_info)

In [None]:
print(pipeline.last_trace)