Cat-merge is currently implemented using Pandas, this is an experiment to follow the same steps using Duckdb

In [2]:
# install jupysql for pretty SQL output
!pip install jupysql
%load_ext sql


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.1.2[0m[39;49m -> [0m[32;49m23.3.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
[33mThere's a new jupysql version available (0.10.3), you're running 0.10.2. To upgrade: pip install jupysql --upgrade[0m


In [3]:
import duckdb

# config
input_dir = "../output/transform_output"
output_dir = "../output"
qc_dir = "../output/qc"
kg_name = "monarch-kg"
mapping_files = ['../data/monarch/mondo.sssom.tsv', '../data/monarch/gene_mappings.tsv', '../data/monarch/chebi-mesh.biomappings.sssom.tsv']

# initialize the database
db = duckdb.connect(database=f"{output_dir}/monarch-kg.duckdb")
%sql db

Loading multiple TSV files with overlapping but distinct schemas is a built-in feature of duckdb. 

The filename=true parameter of read_csv_auto captures the original filename, including the path, our provided_by value can be regex captured from this. The nested select allows for excluding the provided_by column which is populated with infores curies for a few ingests node ingests, but can't be included for edge ingests. This is a little awkward. We should probably pull the provided_by usage from those columns.


In [4]:
for file in ["edges", "nodes"]:
    db.execute(f"""
        CREATE OR REPLACE TABLE all_{file} as (
            SELECT * 
               EXCLUDE('filename'), 
               regexp_extract(filename, '/([a-z_]+).tsv', 1) as provided_by 
            FROM (
                SELECT * { "EXCLUDE('provided_by')" if file == 'nodes' else '' } 
                FROM read_csv_auto('{input_dir}/*_{file}.tsv', sep='\t', header=TRUE, union_by_name=true, filename=true)
            )
        );
    """)



FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

In [23]:
%sql select * from all_nodes limit 3;


id,category,name,description,xref,synonym,full_name,in_taxon,in_taxon_label,symbol,provided_by
ZFIN:ZDB-GENE-010226-1,biolink:Gene,gdnfa,,PANTHER:PTHR12173|NCBIGene:79379|ZFIN:ZDB-GENE-010226-1|ENSEMBL:ENSDARG00000039959|UniProtKB:A7E220|UniProtKB:Q98TU0,gdnf,glial cell derived neurotrophic factor a,NCBITaxon:7955,Danio rerio,gdnfa,alliance_gene_nodes
ZFIN:ZDB-GENE-000823-4,biolink:Gene,hoxa4a,,UniProtKB:Q9PWL5|PANTHER:PTHR24326|ZFIN:ZDB-GENE-000823-4|UniProtKB:A0A8M9PB41|UniProtKB:A0A8M9PNI5|NCBIGene:58050,hoxx4|ZF-26|zf-es36|hoxzf26|zf26|ns:zf-es36|im:6899783,homeobox A4a,NCBITaxon:7955,Danio rerio,hoxa4a,alliance_gene_nodes
ZFIN:ZDB-GENE-220914-1,biolink:Gene,uts2r2,,ENSEMBL:ENSDARG00000115189|ZFIN:ZDB-GENE-220914-1|NCBIGene:110440145,LO017791.1,urotensin 2 receptor 2,NCBITaxon:7955,Danio rerio,uts2r2,alliance_gene_nodes


In [24]:
%sql select * from all_edges limit 3;


id,subject,predicate,object,category,aggregator_knowledge_source,primary_knowledge_source,publications,has_evidence,negated,qualifiers,stage_qualifier,relation,frequency_qualifier,onset_qualifier,sex_qualifier,provided_by
uuid:4e2e2a42-7368-11ee-8e44-e629eea977ba,ZFIN:ZDB-GENE-210324-7,biolink:has_phenotype,ZP:0012643,biolink:GeneToPhenotypicFeatureAssociation,infores:monarchinitiative,infores:zfin,ZFIN:ZDB-PUB-201209-13,,,,,,,,,zfin_gene_to_phenotype_edges
uuid:4e2e3050-7368-11ee-8e44-e629eea977ba,ZFIN:ZDB-GENE-210324-7,biolink:has_phenotype,ZP:0002478,biolink:GeneToPhenotypicFeatureAssociation,infores:monarchinitiative,infores:zfin,ZFIN:ZDB-PUB-201209-13,,,,,,,,,zfin_gene_to_phenotype_edges
uuid:4e2e3406-7368-11ee-8e44-e629eea977ba,ZFIN:ZDB-GENE-210324-8,biolink:has_phenotype,ZP:0012643,biolink:GeneToPhenotypicFeatureAssociation,infores:monarchinitiative,infores:zfin,ZFIN:ZDB-PUB-201209-13,,,,,,,,,zfin_gene_to_phenotype_edges


In [25]:
# Performance when deleting dangling_edges from edges seems poor, so creating two tables as separate selects from a table that gets dropped may make more sense

db.execute("""
create or replace table dangling_edges as
select * from all_edges where not exists (select 1 from nodes where id = subject)
union
select * from all_edges where not exists (select 1 from nodes where id = object)
""")

db.execute("""
create or replace table edges as
select * from all_edges where exists (select 1 from nodes where id = subject)
union
select * from all_edges where exists (select 1 from nodes where id = object)
""")


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

<duckdb.duckdb.DuckDBPyConnection at 0x11fb1ce70>

In [26]:
# create a duplicate nodes table that includes any node in all_nodes that has a duplicate id
db.execute("""
create or replace table duplicate_nodes as
select * from all_nodes where id in (select id from all_nodes group by id having count(*) > 1)
""")

# create a nodes table that excludes any duplicate nodes (based on id) from the all_nodes table
db.execute("""
create or replace table nodes as
select * from all_nodes where id not in (select id from duplicate_nodes)
""")


<duckdb.duckdb.DuckDBPyConnection at 0x11fb1ce70>

In [27]:
db.execute("""
drop table all_nodes;
drop table all_edges;
""")


<duckdb.duckdb.DuckDBPyConnection at 0x11fb1ce70>

In [28]:
# create indexes on nodes.id, edges.id, edges.subject, edges.object

db.execute("""
create unique index nodes_id on nodes(id);
create unique index edges_id on edges(id);
create index edges_subject on edges(subject);
create index edges_object on edges(object);
""")

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

<duckdb.duckdb.DuckDBPyConnection at 0x11fb1ce70>

In [34]:
# export nodes, edges, duplicate_nodes and dangling_edges tables to TSV and parquet

db.execute(f"""
copy (select * from nodes) to '{output_dir}/nodes.tsv' (header, delimiter '\t');
copy (select * from edges) to '{output_dir}/edges.tsv' (header, delimiter '\t');
copy (select * from duplicate_nodes) to '{qc_dir}/duplicate_nodes.tsv' (header, delimiter '\t');
copy (select * from dangling_edges) to '{qc_dir}/dangling_edges.tsv' (header, delimiter '\t');
""")



<duckdb.duckdb.DuckDBPyConnection at 0x11e80e2f0>

In [36]:
db.execute(f"""
copy (select * from nodes) to '{output_dir}/{kg_name}_nodes.parquet' (format parquet);
copy (select * from edges) to '{output_dir}/{kg_name}_edges.parquet' (format parquet);
copy (select * from duplicate_nodes) to '{qc_dir}/{kg_name}-duplicate_nodes.parquet' (format parquet);
copy (select * from dangling_edges) to '{qc_dir}/{kg_name}-dangling_edges.parquet' (format parquet);
""")


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

<duckdb.duckdb.DuckDBPyConnection at 0x11e80e2f0>

In [5]:


db.execute(f"""
 create or replace table mappings as select * from read_csv_auto('{mapping_files[1]}', sep='\t', header=TRUE);
""")


<duckdb.duckdb.DuckDBPyConnection at 0x143426fb0>

In [66]:
db.execute(f"""
 select * exclude(subject, object),
    case when subject_mappings.subject_id is not null then subject_mappings.subject_id else subject end as subject 
 from all_edges 
 left join mappings as subject_mappings on subject = subject_mappings.object_id 
""")


SyntaxError: Missing parentheses in call to 'print'. Did you mean print(...)? (183395698.py, line 1)

In [13]:
%%sql

select * exclude(subject, object),
    case when subject_mappings.subject_id is not null then subject_mappings.subject_id else subject end as subject,
    case when subject_mappings.subject_id is not null then subject else null end as original_subject,
    case when subject_mappings.subject_id is not null then subject_mappings.subject_id else subject end as object,
    case when subject_mappings.subject_id is not null then subject else null end as original_subject,
 from all_edges 
 left join mappings as subject_mappings on subject = subject_mappings.object_id 

Count
12284692


count_star()
12276486
