In [6]:
import logging
import os
import sys
from pathlib import Path
import dotenv
import json

import pandas as pd

In [7]:
# Attach a stream handler writing to stdout to the root logger so log records
# show up in the cell output. No level is passed here, so the root logger keeps
# its default threshold (WARNING).
# NOTE(review): basicConfig does nothing when the root logger already has
# handlers — confirm that is not the case in this kernel.
logging.basicConfig(stream=sys.stdout)

In [8]:
# Locate the project root and make it the working directory.
#
# The current working directory is walked from the filesystem root downwards;
# the first component whose name contains `project_name` marks the project
# root. If no component matches, the working directory is left unchanged.
#
# The components come from `Path.parts` rather than from splitting the path
# string on a backslash, so the lookup behaves the same on Windows, Linux and
# macOS (splitting on "\\" never finds a component on POSIX paths).
project_name = "dbt-elt"

cwd = Path.cwd().absolute()
project_root = cwd  # fallback: stay where we are when nothing matches
for depth, part in enumerate(cwd.parts, start=1):
    if project_name in part:
        # Re-assemble everything up to and including the matching component.
        project_root = Path(*cwd.parts[:depth])
        break

# Kept as a plain string, as before, in case later cells treat it as one.
project_path = str(project_root)
os.chdir(project_path)

In [9]:
# Imported here rather than with the other imports: the previous cell changes
# the working directory, and this project-local package is presumably resolved
# relative to it — TODO confirm.
from customlib.sqldao import DatabaseAccessObject

# Load the variables defined in the ".env" file of the current working
# directory into the process environment (later cells read them via os.environ).
dotenv.load_dotenv(dotenv_path=".env")

# NOTE(review): "MD" presumably selects the MotherDuck connection — verify
# against customlib.sqldao.
sql_conn = DatabaseAccessObject(db="MD")

# Connectivity check: list the databases visible to this connection.
sql_conn.query("show databases")

┌───────────────┐
│ database_name │
│    varchar    │
├───────────────┤
│ raw           │
│ sample_data   │
└───────────────┘

In [10]:
# Name of the target schema; interpolated into the schema and table statements
# issued by later cells.
SCHEMA = "prod"

In [35]:
# Prepare the schema statements first; the SQL text sent is unchanged.
create_schema_stmt = f"create schema if not exists '{SCHEMA}';"
select_schema_stmt = f"set schema '{SCHEMA}';"

# Switch to the `raw` database, make sure the schema exists, then select it
# for the statements that follow.
sql_conn.query("use raw;")
sql_conn.query(create_schema_stmt)
sql_conn.query(select_schema_stmt)

In [14]:
# Create (or replace) an S3-type secret stored in MotherDuck, built from the
# AWS key id, secret key and region found in the environment. A missing
# variable raises KeyError before any SQL is sent.
# NOTE(review): the values are interpolated into the SQL text verbatim, so the
# full statement (including the secret) exists as a plain string — make sure
# the query helper does not log or echo it, and never paste this cell's
# rendered SQL into shared output.
sql_conn.query(f"""
    CREATE OR REPLACE SECRET IN MOTHERDUCK (
        TYPE S3,
        KEY_ID '{os.environ['AWS_ACCESS_KEY_ID']}',
        SECRET '{os.environ['AWS_SECRET_ACCESS_KEY']}',
        REGION '{os.environ['REGION']}'
    );
""")

┌─────────┐
│ Success │
│ boolean │
├─────────┤
│ true    │
└─────────┘

In [36]:
# DDL for the raw landing table. Every attribute is stored as text; only `_id`
# is typed (integer) and acts as the primary key. `if not exists` keeps the
# statement safe to re-run.
create_table_ddl = f"""
create table if not exists raw.{SCHEMA}.prod__hdb_resale_records (
    _id integer primary key,
    month text,
    town text,
    flat_type text,
    block text,
    street_name text,
    storey_range text,
    floor_area_sqm text,
    flat_model text,
    lease_commence_date text,
    remaining_lease text,
    resale_price text									
);
"""

# NOTE(review): this cell goes through `query_db` while the other cells call
# `query` — confirm the difference is intended.
sql_conn.query_db(create_table_ddl)

In [17]:
# One monthly export, used as a quick read check before the bulk load: if the
# S3 secret and bucket name are right, this returns the file's row count.
sample_object_uri = (
    f"s3://{os.environ['AMAZON_S3_BUCKET_NAME']}"
    "/dagster-elt/hdb-resale-records/hdb_resale_records_2018-01.json"
)
sql_conn.query(f"SELECT count(*) FROM '{sample_object_uri}';")

┌──────────────┐
│ count_star() │
│    int64     │
├──────────────┤
│         1072 │
└──────────────┘

In [18]:
# Bulk-load every JSON export under the S3 prefix into the raw landing table.
#
# The target is qualified with SCHEMA, exactly like the CREATE TABLE statement,
# so the table that is created and the table that is loaded cannot drift apart
# when SCHEMA changes (the schema used to be hard-coded as "prod" here).
#
# NOTE(review): `select *` maps file columns to table columns by position —
# confirm the JSON key order matches the table definition. Re-running this
# cell inserts the same `_id` values again, which the primary key presumably
# rejects; truncate the table first or guard the load if a re-run is needed.
sql_conn.query(f"""
insert into raw.{SCHEMA}.prod__hdb_resale_records
select * 
from 's3://{os.environ['AMAZON_S3_BUCKET_NAME']}/dagster-elt/hdb-resale-records/*.json'
""")