In [None]:
import yaml
import pandas as pd
from pathlib import Path

import logging


log_format = "%(levelname)-8s %(asctime)s   %(message)s"
date_format = "%d/%m %H:%M:%S"
logging.basicConfig(format=log_format, datefmt=date_format, level=logging.DEBUG)



def clean_raw_excel_cemist_metadata(dbt_profile_path, outdir = "data_warehouse/soil", column_to_exclude = ['Sample no.', 'Created from CeMiSt A Code', 'CeMiSt\nPlasmid ID', 'Extract no.']):
    dbt_profile_path = Path(dbt_profile_path)
    dbt_dir = dbt_profile_path.parent

    logging.debug(f"Using dbt profile from directory: {dbt_dir.resolve()}")
    
    with open(str(dbt_profile_path), "r") as f:
        profile = yaml.safe_load(f)

    
    external_root = dbt_dir / profile["dbt_cemist"]["outputs"]["dev"]["external_root"]
    logging.debug(f"Reading external data from: {external_root.resolve()}")
    
    metadata = {}

    logging.info("Processing excel files...")
    logging.debug(f"Excluding columns: {column_to_exclude}")
    excel_files = [i for i in external_root.glob("*.xlsx") if i.name.startswith("CeMiSt")]
    for m in excel_files:
        logging.debug(f"Reading file: {m.name}")
        metadata_id = m.stem.replace(" ", "__")    
        df = pd.read_excel(m)
        logging.debug("Dropping empty rows...")
        logging.debug(f"Original row length: {len(df)}")
        df = df[df[[i for i in df.columns if i not in column_to_exclude]].notna().any(axis=1)]
        logging.debug(f"Cleaned row length: {len(df)}")
        metadata[metadata_id] = {"path" : m, "dataframe" : df}

    logging.debug(f"Replacing illegal SQL column character with '_'")
    for k, v in metadata.items():
        df = v["dataframe"]
        df.columns = df.columns.map(lambda x: x.replace(' ', '_').replace('\n', '').replace("(", "_").replace(")", "_").replace(",", "_").replace(".", "_").replace("/", "_").replace("?", "_").lower())
        v["dataframe"] = df

    logging.info(f"Writing output files to: {outdir}")
    outdir = Path(outdir)
    outdir.mkdir(parents=True, exist_ok=True)
    for k, v in metadata.items():
        outfile = outdir / f"{k}.parquet"
        logging.info(f"Converting {k} to parquet: {outfile}")
        df = v["dataframe"]
        if 'date' in df.columns:
            df['date'] = pd.to_datetime(df['date'], errors='coerce').dt.strftime('%d-%m-%Y')
        prefix = k.split("_")
        prefix = "".join([prefix[1][0], prefix[2][0], prefix[0][:2]])
        pid_terms = ["sample_no_", "extract_no_", "cemistplasmid_id"]
        for pid_term in pid_terms:
            if pid_term in df.columns:
                logging.debug(f"Using {pid_term} to label items in {k} with prefix: {prefix}")
                df['pid'] = df[pid_term].apply(lambda x: f"{prefix}{str(x).zfill(4)}")
                df = df.set_index('pid')
        df = df.astype(str)
        logging.info(f"Writing output files: {outfile}")
        df.to_parquet(outfile)
        df.to_csv(str(outfile).replace(".parquet", ".csv"))

In [None]:
clean_raw_excel_cemist_metadata(dbt_profile_path, outdir = "data_warehouse/soil")

In [None]:
for k, v in metadata.items():
    print(k)

In [None]:
metadata['CeMiSt_Soil_B__Label__-__Environmental__analysis']["dataframe"].head(2)

In [None]:
metadata['CeMiSt_Soil_A__Label__-__Environmental__Samples']["dataframe"].head(2)

In [None]:
label = 'CeMiSt_Soil_A__Label__-__Environmental__Samples'
df = metadata['CeMiSt_Soil_A__Label__-__Environmental__Samples']["dataframe"]
prefix = label.split("_")
prefix = "".join([prefix[1][0], prefix[2][0], prefix[0][:2]])

In [None]:
df['pid'] = df['sample_no_'].apply(lambda x: f"{prefix}{str(x).zfill(4)}")
df = df.set_index('pid')

In [None]:
df