In [1]:
%%time
###################################################################################################
### IMPORTS
###################################################################################################
from time import gmtime, strftime
print(strftime("%Y-%m-%d %H:%M:%S", gmtime()))

import warnings
warnings.filterwarnings('ignore')

import whyqd as qd
import modin.pandas as pd
from pathlib import Path
import re
import random

2023-11-02 15:53:41


2023-11-02 16:53:43,797	INFO util.py:159 -- Missing packages: ['ipywidgets']. Run `pip install -U ipywidgets`, then restart the notebook server for rich notebook output.


CPU times: user 1.97 s, sys: 1.15 s, total: 3.12 s
Wall time: 2.33 s


In [2]:
DIRECTORY = "/mnt/c/Users/turuk/Documents/GitHub/biohackathon-assets/test/data/bucket/"
FILENAMES = ["isa.study.xlsx", "isa.assay.xlsx"]
SCHEMA = "geo-format.SCHEMA"

In [3]:
%%time
###################################################################################################
### 1. IMPORT SOURCE BUCKET OF FILES / TABS
###################################################################################################
# 1. Loop bucket and extract all tables
# 2. Test is a process: fields 'Source Name', 'Sample Name', 'Raw Data File' 'Derived Data File'
# 3. Sort into list of df_processes and df_other
# , skip_blank_lines=True, set(['Source Name', 'Sample Name', 'Raw Data File' 'Derived Data File']) in set(df.columns)

# Special extra is identifying key-value terms only available in the process data files
special_other_terms = [
    {
        "source": "Parameter [Genome reference sequence]",
        "destination": "genome build/assembly",
        "value": "",
    },
    {
        "source": "Parameter [Processed data file format]",
        "destination": "processed data files format and content",
        "value": "",
    }
]

df_processes = []
df_other = []
for FILENAME in FILENAMES:
    datasource = qd.DataSourceDefinition()
    DATASOURCE_PATH = Path(DIRECTORY) / FILENAME
    MIMETYPE = "XLSX"
    datasource.derive_model(source=DATASOURCE_PATH, mimetype=MIMETYPE, keep_default_na=False) #, skip_blank_lines=True)
    for ds in datasource.get:
        ds_in = qd.DataSourceDefinition(source=ds)
        df_in = ds_in.get_data()
        df_in.dropna(how="all")
        if any([c.startswith("Unnamed:") for c in df_in.columns]):
            headers = df_in.iloc[0].values
            df_in.columns = headers
            df_in.drop(index=0, axis=0, inplace=True)
        if all([any([c.startswith("Input [") for c in df_in.columns]), any([c.startswith("Output [") for c in df_in.columns])]):
            df_processes.append(df_in)
            # Deal with the specials
            for special in special_other_terms:
                if special["source"] in df_in.columns:
                    special["value"] = df_in[special["source"]].iloc[0]                        
        else:
            df_other.append(df_in)

2023-11-02 16:53:49,343	INFO worker.py:1642 -- Started a local Ray instance.


CPU times: user 2.92 s, sys: 483 ms, total: 3.41 s
Wall time: 10.4 s


In [5]:
#df_in.loc[1, "Source Name"], df_in.loc[1, "Sample Name"]
len(df_other), len(df_processes)

(2, 4)

In [4]:
special_other_terms

[{'source': 'Parameter [Genome reference sequence]',
  'destination': 'genome build/assembly',
  'value': 'CCGAGTGGTA, GATTTAACGG'},
 {'source': 'Parameter [Processed data file format]',
  'destination': 'processed data files format and content',
  'value': 'mzML format'}]

In [155]:
# Sample data to emulate a branching tree structure for the ISA files, to be used in the recursive function

processes = [
    {
        "index": 0,
        "inputValue": "z",
        "outputValue": "a"
    },
    {
        "index": 1,
        "inputValue": "a",
        "outputValue": "b"
    },
    {
        "index": 2,
        "inputValue": "b",
        "outputValue": "c"
    },
    {
        "index": 3,
        "inputValue": "b",
        "outputValue": "f"
    },
    {
        "index": 4,
        "inputValue": "c",
        "outputValue": "e"
    },
    {
        "index": 5,
        "inputValue": "c",
        "outputValue": "d"
    },
    {
        "index": 6,
        "inputValue": "f",
        "outputValue": "g"
    },
    {
        "index": 7,
        "inputValue": "f",
        "outputValue": "h"
    },
    {
        "index": 8,
        "inputValue": "h",
        "outputValue": "i"
    },
]
random.shuffle(processes)
processes

[{'index': 4, 'inputValue': 'c', 'outputValue': 'e'},
 {'index': 5, 'inputValue': 'c', 'outputValue': 'd'},
 {'index': 1, 'inputValue': 'a', 'outputValue': 'b'},
 {'index': 3, 'inputValue': 'b', 'outputValue': 'f'},
 {'index': 0, 'inputValue': 'z', 'outputValue': 'a'},
 {'index': 7, 'inputValue': 'f', 'outputValue': 'h'},
 {'index': 2, 'inputValue': 'b', 'outputValue': 'c'},
 {'index': 8, 'inputValue': 'h', 'outputValue': 'i'},
 {'index': 6, 'inputValue': 'f', 'outputValue': 'g'}]

In [6]:
%%time
###################################################################################################
### 2. ORGANISE df_processes AND MERGE
###################################################################################################
# 1. Organise source -> sample -> raw -> derived
#    Going to use array of dicts with [source, sample, raw, derived]
# 2. Merge on the order and columns
# NOTE: this does not yet work for complex trees ... needs review and validation

def get_process_list(df_processes: list[pd.DataFrame]):
    processes = []
    for index, df_in in enumerate(df_processes):
        input = next(c for c in df_in.columns if c.startswith("Input ["))
        output = next(c for c in df_in.columns if c.startswith("Output ["))
        term = {
            "index": index,
            "input": input,
            "inputValue": df_in.loc[1, input],
            "output": output,
            "outputValue": df_in.loc[1, output]
        }
        processes.append(term)
    return processes

def get_root_process(processes: list[dict]):
    roots = []
    for prospect in processes:
        root = prospect
        for process in processes:
            if prospect["index"] == process["index"]:
                continue
            if prospect["inputValue"] == process["outputValue"]:
                root = None
                break
        if root:
            roots.append(root)
    return roots


def reduce_process_list(root: dict, processes: list[dict]):
    return [p for p in processes if p["index"] != root["index"]]

def order_processes(
    *, 
    processes: list[dict] = [], 
    ordered: list[dict] = [], 
    ordered_processes: list[list[dict]] = [], 
    done: set[int] = set(),
):
    roots = get_root_process(processes=processes)
    set_order = False
    if not ordered:
        set_order = True
    for root in roots:
        if set_order:
            ordered = [root]
        processes = reduce_process_list(root=root, processes=processes)
        # if not processes:
        #     print(root)
        #     ordered_processes.append(ordered)
        for process in processes:
            if root["outputValue"] == process["inputValue"]:
                ordered.append(process)
                if not any([process["outputValue"] == p["inputValue"] for p in processes]) and not process["index"] in done:
                    ordered_processes.append(ordered)
                    done.add(process["index"])
                    print(process["index"], "ordered", [p["index"] for p in ordered])
                else:
                    ordered_processes = order_processes(processes=processes, ordered=ordered, ordered_processes=ordered_processes, done=done)
    return ordered_processes

processes = get_process_list(df_processes=df_processes)
ordered_processes = order_processes(processes=processes)
ordered_processes

3 ordered [0, 1, 2, 3]
CPU times: user 120 ms, sys: 58.1 ms, total: 178 ms
Wall time: 191 ms


[[{'index': 0,
   'input': 'Input [Source Name]',
   'inputValue': '001_uncult_8°',
   'output': 'Output [Sample Name]',
   'outputValue': '001-007_uncult_8°_son'},
  {'index': 1,
   'input': 'Input [Sample Name]',
   'inputValue': '001-007_uncult_8°_son',
   'output': 'Output [Sample Name]',
   'outputValue': '001-007_uncult_8°_ext'},
  {'index': 2,
   'input': 'Input [Sample Name]',
   'inputValue': '001-007_uncult_8°_ext',
   'output': 'Output [Raw Data File]',
   'outputValue': '20210913_1558_001.fastq'},
  {'index': 3,
   'input': 'Input [Raw Data File]',
   'inputValue': '20210913_1558_001.fastq',
   'output': 'Output [Derived Data File]',
   'outputValue': '20210913_1558_001.mzml'}]]

In [7]:
%%time
###################################################################################################
### 2. Perform the merge
###################################################################################################

def get_merge_dataframe(df_processes: list[pd.DataFrame], ordered_processes: list[list[dict]]):
    chunks = []
    for processes in ordered_processes:
        root_process = processes[0]
        df_chunk = df_processes[root_process["index"]]
        for process in processes[1:]:
            root_on = [c for c in df_chunk.columns if c.startswith(root_process["output"])]
            if len(root_on) > 1:
                root_process["output"] = root_on[-1]
            try:
                df_chunk = df_chunk.merge(
                    df_processes[process["index"]], 
                    left_on=root_process["output"], 
                    right_on=process["input"],
                )
            except:
                return df_chunk
                
            root_process = process
        return df_chunk
        chunks.append(df_chunk)
    return pd.concat(chunks)

DIRECTORY = "/mnt/c/Users/turuk/Documents/GitHub/biohackathon-assets/test/data/"
FILENAME = "biohackathon.xlsx"
DATASOURCE_PATH = Path(DIRECTORY) / FILENAME

df = get_merge_dataframe(df_processes=df_processes, ordered_processes=ordered_processes)
df = df.drop_duplicates()
df.to_excel(DATASOURCE_PATH, index=False)

CPU times: user 5.53 s, sys: 245 ms, total: 5.78 s
Wall time: 8.26 s


In [8]:
%%time
###################################################################################################
### 3. Import data for crosswalk
###################################################################################################

DIRECTORY = "/mnt/c/Users/turuk/Documents/GitHub/biohackathon-assets/test/data/"
FILENAME = "biohackathon.xlsx"
SCHEMA = "geo-format.SCHEMA"
DATASOURCE_PATH = Path(DIRECTORY) / FILENAME
MIMETYPE = "XLSX"

datasource = qd.DataSourceDefinition()
datasource.derive_model(source=DATASOURCE_PATH, mimetype=MIMETYPE)

###################################################################################################
### 4. Derive source schema
###################################################################################################

#schema_source = qd.SchemaDefinition(source=datasource.get.dict())
schema_source = qd.SchemaDefinition()
schema_source.derive_model(data=datasource.get)
# Get script 'title'
prospects = [field.name for field in schema_source.fields.get_all() if field.name.startswith("Output [Sample Name]")]
prospects.sort()
title = prospects[-1]
SCRIPTS = [
    f"SELECT > 'library name' < ['{title}']",
    f"RENAME > 'title' < ['{title}']",
    "RENAME > 'organism' < ['Characteristic [organism]']",
    "RENAME > 'cell line' < ['Characteristic [cell line]']",
]
ignore = []
# Get 'cell type'
if schema_source.fields.get(name='Characteristics [Sample type]'):
    SCRIPTS.append("RENAME > 'cell type' < ['Characteristics [Sample type]']")
    ignore.append('Characteristics [Sample type]')
elif schema_source.fields.get(name='Characteristic [Sample type]'):
    SCRIPTS.append("RENAME > 'cell type' < ['Characteristic [Sample type]']")
    ignore.append('Characteristic [Sample type]')
# Get 'library strategy'
if schema_source.fields.get(name='Characteristic [library strategy]'):
    schema_source.fields.set_categories(name='Characteristic [library strategy]', terms=["paired-end", "single-end"])
    SCRIPTS.append("CATEGORISE  > 'single or paired-end'::'single' < 'Characteristic [library strategy]'::['single-end']")
    SCRIPTS.append("CATEGORISE  > 'single or paired-end'::'paired-end' < 'Characteristic [library strategy]'::['paired-end']")
    ignore.append('Characteristic [library strategy]')
# And the rest of the standards ['Component [instrument model]', 'Output [Raw Data File]', 'Output [Derived Data File]']
if schema_source.fields.get(name='Component [instrument model]'):
    SCRIPTS.append("RENAME > 'instrument model' < ['Component [instrument model]']")
    ignore.append('Component [instrument model]')
if schema_source.fields.get(name='Output [Raw Data File]'):
    SCRIPTS.append("RENAME > 'processed data file' < ['Output [Raw Data File]']")
    ignore.append('Output [Raw Data File]')
if schema_source.fields.get(name='Output [Derived Data File]'):
    SCRIPTS.append("RENAME > 'raw data file' < ['Output [Derived Data File]']")
    ignore.append('Output [Derived Data File]')
schema_source.save(directory=Path(DIRECTORY), filename="biohackathon-source.SCHEMA", created_by="Gavin Chait")

CPU times: user 5.23 s, sys: 115 ms, total: 5.34 s
Wall time: 5.56 s


True

In [9]:
%%time
###################################################################################################
### 5. Update destination schema and cover crosswalk
###################################################################################################

def get_new_destination_characteristics(fields: list[str], ignore: list[str] = []) -> list[dict]:
    characteristics = []
    for field in fields:
        if field.startswith("Characteristic") and field not in ignore:
            destination = re.search('\[(.+?)\]', field)
            if destination:
                term = {
                    "source": field,
                    "destination": destination.group(1)
                }
                characteristics.append(term)
    return characteristics

fields = [field.name for field in schema_source.fields.get_all() if field.name.startswith("Characteristic") and field not in ignore]
new_terms = get_new_destination_characteristics(fields=fields, ignore=ignore)

schema_destination = qd.SchemaDefinition(source=Path(DIRECTORY) / SCHEMA)
for term in new_terms:
    if not schema_destination.fields.get(name=term["destination"]):
        field = {
            "name": term["destination"],
            "type": "string"
        }
        schema_destination.fields.add(term=field)
        script = f"RENAME > '{term['destination']}' < ['{term['source']}']"
        SCRIPTS.append(script)

CPU times: user 18.6 ms, sys: 1.32 ms, total: 19.9 ms
Wall time: 21.9 ms


In [10]:
SCRIPTS

["SELECT > 'library name' < ['Output [Sample Name]_y']",
 "RENAME > 'title' < ['Output [Sample Name]_y']",
 "RENAME > 'organism' < ['Characteristic [organism]']",
 "RENAME > 'cell line' < ['Characteristic [cell line]']",
 "RENAME > 'cell type' < ['Characteristics [Sample type]']",
 "CATEGORISE  > 'single or paired-end'::'single' < 'Characteristic [library strategy]'::['single-end']",
 "CATEGORISE  > 'single or paired-end'::'paired-end' < 'Characteristic [library strategy]'::['paired-end']",
 "RENAME > 'instrument model' < ['Component [instrument model]']",
 "RENAME > 'processed data file' < ['Output [Raw Data File]']",
 "RENAME > 'raw data file' < ['Output [Derived Data File]']",
 "RENAME > 'Biological replicate' < ['Characteristics [Biological replicate]']",
 "RENAME > 'Isolate' < ['Characteristics [Isolate]']",
 "RENAME > 'Cultivar' < ['Characteristics [Cultivar]']",
 "RENAME > 'Ecotype' < ['Characteristics [Ecotype]']",
 "RENAME > 'Genotype' < ['Characteristics [Genotype]']",
 "RENA

In [11]:
%%time
###################################################################################################
### 6. Create crosswalk and perform transform
###################################################################################################

# Crosswalk
crosswalk = qd.CrosswalkDefinition()
crosswalk.set(schema_source=schema_source.get.dict(), schema_destination=schema_destination.get.dict())
crosswalk.actions.add_multi(terms=SCRIPTS)
# Transform
transform = qd.TransformDefinition(crosswalk=crosswalk, data_source=datasource.get)
transform.process()
transform.save(directory=DIRECTORY, filename="biohackathon-out", mimetype="XLSX")
# Deduplicate in file
df = transform.data
df = df.drop_duplicates()
df.to_excel(Path(transform.model.dataDestination.path), index=False)

CPU times: user 9.15 s, sys: 946 ms, total: 10.1 s
Wall time: 9.06 s


In [12]:
%%time
###################################################################################################
### 7. Create output file
###################################################################################################

# Get the distinct fields as lists
df_output = pd.DataFrame()
max_lengths = {}
rows = []
for library, df_group in df.groupby('library name'):
    row = []
    for c in df_group.columns:
        terms = df_group[c].unique()
        if len(terms) == 1 and pd.isnull(terms[0]):
            continue
        row.append({
            c: terms
        })
        if max_lengths.get(c, 0) <= len(terms):
            max_lengths[c] = len(terms)
    rows.append(row)
# Process into a headerless dataframe
chunks = []
# Headers
data = []
processed = []
raw = []
for field, value in max_lengths.items():
    dv = [field] * value
    if field == "processed data file":
        processed = dv
    elif field == "raw data file":
        raw = dv
    else:
        data.extend(dv)
data.extend(processed)
data.extend(raw)
chunks.append(pd.DataFrame(data=[data]))
# Rows
for row in rows:
    data = []
    processed = []
    raw = []
    for term in row:
        for field, values in term.items():
            max = max_lengths.get(field, 1)
            dv = list(values) + [""] * (max - len(values))
            if field == "processed data file":
                processed = dv
            elif field == "raw data file":
                raw = dv
            else:
                data.extend(dv)
    data.extend(processed)
    data.extend(raw)
    chunks.append(pd.DataFrame(data=[data]))
dfo = pd.concat(chunks)

CPU times: user 5.13 s, sys: 2.77 s, total: 7.9 s
Wall time: 5.79 s


In [80]:
other_terms = [
    {
        "source": "Study Title",
        "destination": "title",
        "value": "",
    },
    {
        "source": "Study Description",
        "destination": "summary (abstract)",
        "value": "",
    },
    {
        "source": "Study Design Type",
        "destination": "experimental design",
        "value": "",
    },
]

contributors = []
contributors_prefered = [
    {
        "first": "Assay Person First Name",
        "last": "Assay Person Last Name",
        "mid": "Assay Person Mid Initials",
    },
    {
        "first": "Study Person First Name",
        "last": "Study Person Last Name",
        "mid": "Study Person Mid Initials",
    }
]
    

for df_o in df_other:
    for other in other_terms:
        # general
        index = list(df_o[df_o[df_o.columns[0]] == other["source"]].index)
        if index:
            for k, v in df_o.iloc[index[0]-1].items():
                if k == other["source"]:
                    continue
                other["value"] = v
    for contr in contributors_prefered:
        # contributors
        index = list(df_o[df_o[df_o.columns[0]] == contr["last"]].index)
        if index:
            lasts = [t for t in df_o.iloc[index[0]-1].to_list() if t != contr["last"]]
            firsts = [t for t in df_o.iloc[index[0]].to_list() if t != contr["first"]]
            mids = [t for t in df_o.iloc[index[0]+1].to_list() if t != contr["mid"]]
            contributors.extend([", ".join([c for c in [t, m, l] if c]) for t, m, l in zip(firsts, mids, lasts)])

In [93]:
chunks = []
# STUDY
chunks.append(pd.DataFrame(data=[["STUDY"]]))
for other in other_terms:
    data = [other["destination"], other["value"]]
    chunks.append(pd.DataFrame(data=[data]))
contributors.sort()
for contr in contributors:
    data = ["contributor", contr]
    chunks.append(pd.DataFrame(data=[data]))
chunks.append(pd.DataFrame(data=[[""]]))
# SAMPLES
chunks.append(pd.DataFrame(data=[["SAMPLES"]]))
chunks.append(dfo)
chunks.append(pd.DataFrame(data=[[""]]))
# PROTOCOL
chunks.append(pd.DataFrame(data=[["PROTOCOL"]]))
for other in special_other_terms:
    data = [other["destination"], other["value"]]
    chunks.append(pd.DataFrame(data=[data]))
df_output = pd.concat(chunks).fillna('')

In [96]:
DIRECTORY = "/mnt/c/Users/turuk/Documents/GitHub/biohackathon-assets/test/data/"
FILENAME = "biohackathon-output.xlsx"
DATASOURCE_PATH = Path(DIRECTORY) / FILENAME
df_output.to_excel(DATASOURCE_PATH, index=False, header=False)