---
cdt: "2024-09-19T12:16:21"
title: "clean and load chm, sequences"
description: "clean chm and break it into a sequences and chm tables, then load into the db."
status: "closed"
conclusion: "chm has been cleaned, sequence table has been created. chm has foreign keys to st and sequences for joining to them."
project: "database_etl"
---

Following on from 'chm_extractor', recreate the chm metadata table.

In [None]:
# %xmode minimal
from pathlib import Path
import polars as pl
import duckdb as db
from database_etl.code.chm_extractor import extract_run_data
from database_etl.definitions import DB_PATH

# Raise polars' display limits (table rows and string length) for inspection.
pl.Config.set_tbl_rows(99).set_fmt_str_lengths(999)
con = db.connect(DB_PATH)

# Set to True to drop the tables built by this notebook before rebuilding them.
overwrite: bool = False

# `chm` is dropped first, then `sequences` (with cascade).
drop_statements = (
    """--sql
    drop table if exists chm;
    """,
    """--sql
    drop table if exists sequences cascade;
    """,
)
if overwrite:
    for statement in drop_statements:
        con.sql(statement)

# Load Raw Metadata


In [None]:
# Folder that is searched for `*.D` entries.
lib_path = Path("/Users/jonathan/mres_thesis/database_etl/data/raw_uv")

# Flip to True to (re)run the extraction for every `*.D` entry, overwriting
# any existing output.
extract = False

run_dirs = lib_path.glob("*.D") if extract else ()
for run_dir in run_dirs:
    extract_run_data(path=run_dir, overwrite=True)

In [None]:
# Every `extract_*` entry found one level below a `*.D` entry.
extract_paths = list(lib_path.glob("*.D/extract_*"))

# Read each extract's metadata file and tag its rows with the parent `*.D`
# path, so every record can be traced back to where it came from.
metadata_dfs = [
    pl.read_parquet(extract_dir / "metadata.parquet").with_columns(
        pl.lit(str(extract_dir.parent)).alias("path")
    )
    for extract_dir in extract_paths
]

# Stack the per-run frames into a single table and preview it.
metadata = pl.concat(metadata_dfs)
metadata.head()

Wow that is quick.

In [None]:
# Dimensions of the concatenated metadata frame.
metadata.shape

# Clean Metadata

Now to recreate the 'clean' version of the table. What needs to be done?

Clean the strings, rename the columns, parse the acquisition dates into datetimes, replace the runid '116' with 'sigurdcb', and clean the remaining runids.


In [None]:
# List `notebook` values that occur more than once, with their row counts.
# NOTE(review): `metadata` is presumably resolved by DuckDB to the DataFrame
# of that name in this session — confirm no table of the same name exists.
duplicate_notebooks_sql = """--sql
select
    notebook,
    count(*) as count
from
    metadata
group by
    notebook
having
    count  > 1
"""
con.sql(duplicate_notebooks_sql).pl()

In [None]:
# Recreate the sequence that supplies `chm_loading.pk`, restarting it at 1.
reset_sequence_sql = """--sql
drop sequence if exists chm_seq cascade;
create sequence chm_seq start 1;
"""
con.sql(reset_sequence_sql)

# Build the `chm_loading` staging table from `metadata`:
#   * `notebook` is kept as `ch_runid` and mapped to `st_runid` by the case
#     rules (4-character ids with a leading '0' and trailing '1'/'2' are cut
#     down to their middle two characters; a handful of names are remapped
#     by hand; anything else passes through unchanged),
#   * `date` is parsed into a datetime,
#   * the text columns are lower-cased and trimmed,
#   * one specific sequence name is rewritten to the date/time format.
# The script ends with a select, so the first five rows come back as a frame.
load_chm_sql = """--sql
    create or replace table chm_loading (
    pk integer primary key default nextval('chm_seq'),
    ch_runid varchar unique,
    st_runid varchar unique,
    acq_date datetime unique,
    acq_method varchar not null,
    inj_vol float not null,
    seq_name varchar not null,
    seq_desc varchar,
    vialnum varchar not null,
    originalfilepath varchar not null,
    id varchar unique not null,
    description varchar,
    );
insert into chm_loading
select
    nextval('chm_seq') as pk,
    notebook as ch_runid,
    case
        when
            len(notebook) = 4
            and
                notebook[1] = '0'
            and
                (notebook[-1] = '1' or notebook[-1] = '2')
            then
                notebook[2:3]
        when
            notebook = 'mt-diff-bannock-pn'
        then
            'mt-diff-bannockburn-pn'
        when
            notebook = '2021-debortoli-cabernet-merlot_avantor'
        then
            '72'
        when
            notebook = 'stoney-rise-pn_02-21'
        then
            '73'
        when
            notebook = 'crawford-cab_02-21'
        then
            '74'
        when
            notebook = 'hey-malbec_02-21'
        then
            '75'
        when
            notebook = 'koerner-nellucio-02-21'
        then
            '76'
        when
            notebook = 'z3'
        then
            '00'
        when
            notebook = '116'
        then
            'sigurdcb'
        else
            notebook
        end
            as st_runid,
    cast(strptime(date, '%d-%b-%y, %H:%M:%S') as datetime) as acq_date,
    trim(lower(method)) as acq_method,
    injection_volume as inj_vol,
    replace(trim(lower(seq_name)), 'wines_2023-03-15_11-33-51', '2023-03-15_11:33:51') as seq_name,
    trim(lower(seq_desc)) as seq_desc,
    vialnum,
    trim(lower(originalfilepath)) as originalfilepath,
    id as id,
    trim(lower("desc")) as description,
from
    metadata;
select
    *
from
    chm_loading
limit
    5
"""
con.sql(load_chm_sql).pl()

Is `st_runid` now usable as a foreign key, i.e. does every value have a match in `st`?


In [None]:
# Rows of `chm_loading` whose `st_runid` has no matching `st.runid`.
chm_without_st_sql = """--sql
select
    *
from
    chm_loading
anti join
    st
on
    chm_loading.st_runid = st.runid
"""
con.sql(chm_without_st_sql).pl()

all entries in chm_loading.st_runid have a corresponding entry in st.


In [None]:
# The reverse check: up to five rows of `st` with no partner in `chm_loading`.
st_without_chm_sql = """--sql
select
    *
from
    st
anti join
    chm_loading
on
    chm_loading.st_runid = st.runid
limit 5
"""
con.sql(st_without_chm_sql).pl()

Why do these results differ from those in the previous notebook [correcting_sampletracker_name](./correcting_sampletracker_name.ipynb)? In the metadata table every runid has a match in st, but not the other way around, even within the raw subset. That's because there are subsets in st. If we want to use st as a base table for all of the sub-projects, we should label the projects.

Anyway, moving on. I might have deleted those runs, but I think we're good. Finally, I'd like to swap out the runids for the pk.

In [None]:
# Show the column definitions of the staging table.
describe_chm_loading_sql = """--sql
describe chm_loading
"""
con.sql(describe_chm_loading_sql).pl()

In [None]:
# Create a temp view that joins `chm_loading` to `st` on the run id, exposing
# `st.pk` as `pk_st` and `ch_runid` as `runid`; `st_runid` and the other `st`
# columns are not carried over. The view is then read back in full.
chm_st_join_sql = """--sql
create temp view chm_st_join as
    select
        chm.pk,
        chm.ch_runid as runid,
        chm.acq_date,
        chm.acq_method,
        chm.inj_vol,
        chm.seq_name,
        chm.seq_desc,
        chm.vialnum,
        chm.originalfilepath,
        chm.id,
        chm.description,
        st.pk as pk_st
    from
    chm_loading chm
join
    st
on
    chm.st_runid = st.runid;
select
    *
from
    chm_st_join
"""
joined = con.sql(chm_st_join_sql).pl()
cols = joined.columns

# Number of columns exposed by the view.
len(cols)

Make a `sequences` table.

In [None]:
# Create `sequences` and fill it with the distinct sequence names found in
# `chm_loading`. The run datetime is assembled from the date and time
# substrings of the name, except for 'singlesample', which gets a null.
create_sequences_sql = """--sql
    create table sequences (
        seq_name varchar primary key,
        dt_run datetime unique,
        seq_desc varchar
    );
    insert into sequences
        with
            parsing as (
            select
                distinct seq_name as seq_name,
                seq_name[11] as sep,
                seq_name[-8:-1] as hms,
                seq_name[-19:-10] as ymd,
                case
                    when
                        seq_name = 'singlesample'
                    then
                        null
                    else
                        cast(concat(ymd, 'T', replace(hms,'-',':')) as datetime)
                end as dt_run,
                seq_desc as seq_desc
            from
                chm_loading
                )
            select
                seq_name,
                dt_run,
                seq_desc
            from
                parsing
            order by
                dt_run
    """

# Preview of the first ten rows of the new table.
preview_sequences_sql = """--sql
    select
        *
    from
        sequences
    limit 10
    """

try:
    con.sql(create_sequences_sql)
    con.sql(preview_sequences_sql).pl().pipe(display)
except db.CatalogException as err:
    # On a catalog error, close and discard the connection before re-raising.
    con.close()
    del con
    raise err

In [None]:
# Sanity check on the number of rows loaded into `sequences`.
sequence_count_sql = """--sql
    select
        count(*)
    from
    sequences
    """
n_sequences = con.sql(sequence_count_sql).fetchone()[0]
assert n_sequences == 10, "expect 10 unique sequences in the raw dataset"

In [None]:
# Create the final `chm` table, with foreign keys to `st(pk)` and
# `sequences(seq_name)`, and fill it by column name from the `chm_st_join`
# view. Conflicting rows are skipped (`on conflict do nothing`). `seq_desc`
# is not part of `chm`.
create_chm_sql = """--sql
    create table chm (
        pk integer primary key,
        runid varchar unique,
        pk_st integer references st(pk) not null,
        acq_date datetime unique,
        acq_method varchar not null,
        inj_vol float not null,
        seq_name varchar references sequences(seq_name),
        vialnum varchar not null,
        originalfilepath varchar not null,
        id varchar unique not null,
        description varchar,
    );
    insert into chm by name
        select
            pk as pk,
            runid as runid,
            pk_st as pk_st,
            acq_date as acq_date,
            acq_method as acq_method,
            inj_vol as inj_vol,
            seq_name as seq_name,
            vialnum as vialnum,
            originalfilepath as originalfilepath,
            id as id,
            description as description,
        from
            chm_st_join
        on conflict do nothing
        ;
    select
        *
    from
        chm
    limit 5
    """

try:
    con.sql(create_chm_sql).pl()
except db.CatalogException as err:
    # On a catalog error, close and discard the connection before re-raising.
    con.close()
    del con
    raise err

In [None]:
# Compare the number of `chm_loading` rows that found a partner in `st` with
# the total number of `chm_loading` rows.
#
# A left join keeps every `chm_loading` row, filling the `st` side with NULLs
# when there is no match, so `count(*)` over the join can never drop below the
# total and would not reveal unmatched rows. `count(st.runid)` skips those
# NULLs and therefore counts matched rows only.
con.sql(
    """--sql
create temp view left_join_counts_chm_st as
    select
        (
            select
                count(st.runid)
            from
                chm_loading chm
            left join
                st
            on
                chm.st_runid = st.runid
        ) as chm_st_left_joins,
        (
        select
            count(*)
        from
            chm_loading chm
        ) as total_chm;
select
    *
from
    left_join_counts_chm_st
"""
).pl().pipe(display)

# Fail loudly when the matched-row count differs from the total row count.
assert con.sql(
    """--sql
select
    case
        when
            chm_st_left_joins = total_chm
        then
            true
        else
            false
        end
            as all_chm_joined
from
    left_join_counts_chm_st
"""
).fetchone()[0], "couldnt join all rows of chm to a row of st, check the runids."

In [None]:
# Show the column definitions of the final `chm` table.
describe_chm_sql = """--sql
describe chm
"""
con.sql(describe_chm_sql).pl()

In [None]:
# Remove the staging table and the helper views created by this notebook.
cleanup_sql = """--sql
drop table if exists chm_loading;
drop view if exists chm_st_join;
drop view if exists left_join_counts_chm_st;
"""
con.sql(cleanup_sql)

# List what remains in the database.
show_tables_sql = """--sql
show tables
"""
con.sql(show_tables_sql).pl().pipe(display)

# Close the connection and drop the handle.
con.close()
del con

# Conclusion

A clean `chm` table and a `sequences` table have been created. `chm` is restricted to the 'raw' dataset, and has `pk_st` and `seq_name` foreign keys for joining to those tables. It has been confirmed that every entry of `chm` has a corresponding entry in `st`.