In [1]:
import datetime
import io
import os
import time
import timeit

import numpy
import pandas
import psycopg2

import common

In [2]:
from IPython.display import display, HTML
# Stretch the notebook container to the full browser width.
_wide_container_css = "<style>.container { width:100% !important; }</style>"
display(HTML(_wide_container_css))

In [3]:
# Connection settings are read from the environment so credentials never live in the
# notebook; the fallbacks reproduce the previously hard-coded values (blank host/password).
conn = psycopg2.connect(
    dbname = os.environ.get('COVEO_DB_NAME', 'coveo'),
    host = os.environ.get('COVEO_DB_HOST', ''),
    user = os.environ.get('COVEO_DB_USER', 'public_loader'),
    password = os.environ.get('COVEO_DB_PASSWORD', ''),
    application_name = 'steger loader_notebook'
)
# Shared cursor used by every loading cell below (and by TimeLogCommit).
C = conn.cursor()

In [4]:
# PostgreSQL schema holding every table this notebook reads from and appends to.
schema = "datahub_0"

In [5]:
# Tag stored in every merge_log row written by TimeLogCommit, identifying this load run.
common_comment = "upgrade meta 230223"

In [6]:
class TimeLogCommit:
    """Context manager that times a notebook code block, optionally logging and committing.

    On a clean exit it:
      * inserts a row into ``{schema}.merge_log`` when ``table_name`` is given,
      * commits the notebook-level connection ``conn`` when ``commit`` is true,
      * prints the elapsed wall-clock time when ``verbose`` is true.

    If the block raises, nothing is logged or committed. When ``commit`` was requested,
    the transaction is rolled back instead, so partial work is not persisted. The
    exception then propagates.

    Relies on the notebook globals ``conn``, ``C``, ``schema`` and ``common_comment``.
    """
    def __init__(self, task, table_name = None, commit = True, verbose=True):
        self.table_name = table_name
        self.task = task
        self.verbose = verbose
        self.commit = commit

    def __enter__(self):
        self.t0 = datetime.datetime.now()
        self.start = timeit.default_timer()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.took = (timeit.default_timer() - self.start)
        if exc_type is not None:
            # Never record a failed block as done, and never commit its partial writes.
            if self.commit:
                conn.rollback()
            return False
        if self.table_name:
            # Values go in as query parameters. Only the schema (an identifier) is
            # interpolated, because identifiers cannot be passed as parameters.
            C.execute(
                f"INSERT INTO {schema}.merge_log VALUES (%s, %s, %s, %s, %s);",
                (common_comment, self.table_name, self.task, str(self.t0), str(datetime.datetime.now())),
            )
        if self.commit:
            conn.commit()
        if self.verbose:
            t = f'on {self.table_name} ' if self.table_name else ' '
            print(f'\n\033[38;5;208mCode block {self.task} {t}took:\t{self.took:.5f} seconds\033[0;0m')

# Dataset of 23 Feb 2023

In [7]:
# Local gzip copy of the ENA search result (snapshot dated 2023-02-23, per the file name).
archive = "/v/volumes/coveoarchive/TMP/meta_20230223.dat.gz"

In [8]:
# ENA portal search over read runs matching tax_tree(2697049), returned as TSV.
# The requested fields are joined with '%2C' (a URL-encoded comma).
_ena_fields = [
    'accession', 'sample_accession', 'experiment_accession', 'study_accession',
    'description', 'country', 'collection_date', 'first_created', 'first_public',
    'host', 'host_sex', 'host_tax_id', 'host_body_site', 'bio_material',
    'culture_collection', 'instrument_model', 'instrument_platform',
    'library_layout', 'library_name', 'library_selection', 'library_source',
    'library_strategy', 'sequencing_method', 'isolate', 'strain', 'base_count',
    'collected_by', 'broker_name', 'center_name', 'sample_capture_status',
    'fastq_ftp', 'collection_date_submitted', 'checklist',
]
url = (
    'https://www.ebi.ac.uk/ena/portal/api/search?result=read_run&query=tax_tree(2697049)'
    + '&fields=' + '%2C'.join(_ena_fields)
    + '&format=tsv&limit=0'
)

In [9]:
! curl "{url}" | gzip -9 > {archive}

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 3399M    0 3399M    0     0  1915k      0 --:--:--  0:30:17 --:--:--  838k7k      0 --:--:--  0:01:34 --:--:--  911k0     0  1981k      0 --:--:--  0:03:58 --:--:--  184k --:--:--  0:09:49 --:--:-- 5857k  0:10:05 --:--:-- 4151k 0     0  2068k      0 --:--:--  0:10:17 --:--:-- 2920k:10 --:--:-- 2655k5 --:--:--  147k --:--:--  163k      0 --:--:--  0:17:21 --:--:--  323k0  1959k      0 --:--:--  0:17:43 --:--:-- 3825k367M    0 2367M    0     0  1913k      0 --:--:--  0:21:07 --:--:--  255kM    0     0  1935k      0 --:--:--  0:21:32 --:--:--  749k:28:00 --:--:-- 2608k 0 --:--:--  0:28:39 --:--:--  238k0  1943k      0 --:--:--  0:28:44 --:--:--  156k-:--:--  0:29:07 --:--:-- 3891k  137k


In [10]:
with TimeLogCommit(task = 'load dataset', commit = False):
    # Columns that are string-parsed or used as merge keys further down. A previous run
    # printed a pandas warning at this call (presumably a DtypeWarning about mixed types).
    # With chunked type inference, one column can end up holding both ints and strs, which
    # breaks equality merges against the database tables. Forcing str keeps them uniform.
    _text_columns = [
        'run_accession', 'collection_date', 'collection_date_submitted', 'country',
        'host', 'host_sex', 'instrument_platform', 'instrument_model',
        'library_layout', 'library_selection', 'library_source', 'library_strategy',
        'broker_name', 'collected_by', 'center_name',
    ]
    meta = pandas.read_csv(
        archive, on_bad_lines = 'error', sep = '\t',
        dtype = {column: str for column in _text_columns},
    )

  meta = pandas.read_csv(archive, on_bad_lines = 'error', sep = '\t')



[38;5;208mCode block load dataset  took:	55.70363 seconds[0;0m


## Redundant or empty columns

In [11]:
# Distinct values of host_body_site; only NaN means the column carries no information.
pandas.unique(meta['host_body_site'])

array([nan])

In [12]:
# Distinct values of bio_material; only NaN means the column carries no information.
meta.bio_material.unique()

array([nan])

In [13]:
# host_body_site and bio_material were all-NaN when this was written (see the two probes
# above). Stop here rather than silently discard them if a newer dump starts filling them.
# sample_accession is dropped as redundant: the metadata cell writes `accession` into
# metadata.sample_accession.
_empty_columns = ['host_body_site', 'bio_material']
assert meta[_empty_columns].isna().all().all(), f'expected all-NaN columns now hold data: {_empty_columns}'
meta = meta.drop(columns = _empty_columns + ['sample_accession'])

## Table `instrument`

In [14]:
with TimeLogCommit(task = 'prepare instrument', commit = False):
    # Instruments already registered in the database.
    instrument_db = pandas.read_sql(sql = f'SELECT * FROM {schema}.instrument', con = conn)

    # Distinct (platform, model) pairs in the new dump. A right merge keeps every one of
    # them and leaves `id` empty where the database has no match yet.
    instrument_key = ('instrument_platform', 'instrument_model')
    inst_seen = meta[list(instrument_key)].copy().drop_duplicates()
    inst_merged = pandas.merge(
        left = instrument_db, right = inst_seen,
        left_on = instrument_key, right_on = instrument_key,
        how = 'right'
    )
    inst_unmatched = inst_merged['id'].isna()
    inst_new = inst_merged.loc[inst_unmatched].reset_index().drop(columns = ['id', 'index'])

    # Shift the index so it can be written out as fresh ids after the current maximum.
    if instrument_db.shape[0] > 0:
        first_free_id = 1 + max(instrument_db['id'])
    else:
        first_free_id = 1
    inst_new.index += first_free_id




[38;5;208mCode block prepare instrument  took:	4.14183 seconds[0;0m


In [15]:
# (platform, model) pairs not yet in the instrument table; an empty frame means nothing to insert.
inst_new

Unnamed: 0,instrument_platform,instrument_model


#### Merge necessary

In [16]:
# Bulk-load the new instruments. The frame index (already offset past the current max id)
# is written as the first CSV column.
if len(inst_new):
    with TimeLogCommit(task = 'insert items', table_name = 'instrument'):
        with io.StringIO() as pipe:
            inst_new.to_csv(pipe, sep = '\t', header = False, index = True)
            pipe.seek(0)
            C.copy_expert(f"COPY {schema}.instrument FROM STDIN WITH (format csv, delimiter '\t')", pipe)


## Table `host`

In [17]:
with TimeLogCommit(task = 'prepare host', commit = False):
    host_db = pandas.read_sql(sql = f'SELECT * FROM {schema}.host', con = conn)

    # Any host value starting (case-insensitively) with one of these names is collapsed to
    # the canonical spelling. No prefix is a prefix of another, so the order does not matter.
    host_prefixes = {
        'macaca mulatta': 'Macaca mulatta',
        'homo sapiens': 'Homo sapiens',
        'macaca fascicularis': 'Macaca fascicularis',
        'cercopithecus aethiops': 'Cercopithecus aethiops',
    }
    # Remaining exact-match spelling variants, applied after the prefix rule.
    host_map = {
        'Homo Sapien': 'Homo sapiens',
        'Homo Sapiens': 'Homo sapiens',
        'homo sapiens': 'Homo sapiens',
        'Homo sapiens': 'Homo sapiens',
        'HomoSapiens': 'Homo sapiens',
        'homo sapien': 'Homo sapiens',
        'Human': 'Homo sapiens',
        'human': 'Homo sapiens',
        'Homo\xa0sapiens\xa0': 'Homo sapiens',
        'mus musculus': 'Mus musculus',
    }

    def _canonical_host(x):
        """Return the canonical spelling of one raw host value; non-strings (NaN) pass through."""
        if isinstance(x, str):
            lowered = x.lower()
            for prefix, canonical in host_prefixes.items():
                if lowered.startswith(prefix):
                    x = canonical
                    break
        return host_map.get(x, x)

    # A single pass over the column instead of one pass per rule.
    meta['host'] = meta['host'].apply(_canonical_host)

    # Only rows with both a host name and a tax id are candidates for the host table.
    host = meta[['host', 'host_tax_id']].drop_duplicates().dropna().sort_values(['host']).reset_index(drop = True)
    host = host.astype({'host_tax_id': pandas.Int64Dtype()})
    host_joined = host.merge(host_db, left_on = 'host_tax_id', right_on = 'tax_id', how = 'left')
    m_new = host_joined['id'].isna()
    # One row per new tax id. The metadata cell joins the host table on tax_id, so two
    # spellings of the same tax id would duplicate metadata rows. Because `host` was sorted
    # by name and the left merge keeps that order, the alphabetically first name is kept.
    host_new = (
        host_joined[m_new][['host_x', 'host_tax_id']]
        .drop_duplicates(subset = ['host_tax_id'])
        .reset_index(drop = True)
        .reset_index()
    )
    # 'index' becomes the id to insert: continue after the current maximum.
    host_new['index'] += 1 + host_db['id'].max() if host_db.shape[0] > 0 else 1




[38;5;208mCode block prepare host  took:	16.40497 seconds[0;0m


In [18]:
# Distinct host names after normalisation, plus how many there are.
host_names = meta['host'].sort_values().unique()
host_names, len(host_names)

(array(['Canis lupus familiaris', 'Cercocebus aethiops',
        'Cercopithecus aethiops', 'Chlorocebus aethiops',
        'Chlorocebus sabaeus', 'Crocuta crocuta', 'Felis catus', 'Gorilla',
        'Homo sapiens', 'Lagothrix lagotricha', 'Macaca',
        'Macaca fascicularis', 'Macaca mulatta', 'Mesocricetus auratus',
        'Mus musculus', 'Mustela putorius furo', 'Neogale vison',
        'Neovison vison', 'Odocoileus virginianus', 'Panthera leo',
        'Panthera pardus', 'Panthera tigris jacksoni',
        'Phodopus roborovskii', 'Rhesus macaque',
        'Severe acute respiratory syndrome coronavirus 2', 'not provided',
        nan], dtype=object),
 27)

In [19]:
# Every distinct (host, host_tax_id) pair, including pairs with a missing tax id.
meta.loc[:, ['host', 'host_tax_id']].drop_duplicates().sort_values(by = 'host')

Unnamed: 0,host,host_tax_id
3142719,Canis lupus familiaris,9615.0
225633,Cercocebus aethiops,
286278,Cercopithecus aethiops,9534.0
366847,Cercopithecus aethiops,
102958,Chlorocebus aethiops,9534.0
1018171,Chlorocebus sabaeus,60711.0
4281455,Crocuta crocuta,9678.0
3116801,Felis catus,9685.0
2101084,Gorilla,9592.0
3,Homo sapiens,9606.0


<font color="red">Keep only rows where both the tax id and the host name are present</font>

In [20]:
# Hosts not yet in the host table ('index' is the id to assign); an empty frame means nothing to insert.
host_new

Unnamed: 0,index,host_x,host_tax_id


#### Merge needed

In [21]:
# Bulk-load the new hosts as (id, host name, tax id) rows.
if host_new.shape[0] > 0:
    # The merge_log entry must name the table actually written to: host.
    with TimeLogCommit(task = 'insert items', table_name = 'host'):
        with io.StringIO() as pipe:
            host_new[['index', 'host_x', 'host_tax_id']].to_csv(
                pipe, sep = '\t', header = False, index = False
            )
            pipe.seek(0)
            C.copy_expert(f"COPY {schema}.host FROM STDIN WITH (format csv, delimiter '\t')", pipe)

## Table `metadata`

In [22]:
with TimeLogCommit(task = 'append', table_name = 'runid'):
    # Current run-accession <-> integer-id mapping (see common.Map).
    the_map = common.Map(conn, C, f'{schema}.runid')

    # Snapshot of the metadata rows already loaded; the metadata cell uses it to skip known runs.
    metadata_old = pandas.read_sql(f"""
SELECT runid, collection_date, collection_date_valid, country_id,
       host_id, host_sex, instrument_id, 
       sample_accession, study_accession, experiment_accession
FROM {schema}.metadata
    """, con = conn)

    run_id = meta[['run_accession']].copy()
    run_id_map = run_id.merge(
        the_map.from_db,
        left_on = 'run_accession', right_on = 'ena_run',
        how = 'left'
    )

    # Runs with no id yet.
    unknown_run = run_id_map['ena_run'].isna()
    runid_new = run_id_map.loc[unknown_run].reset_index()
    # NOTE(review): reset_index() numbers rows from 0, so the first new id equals
    # the_map.largest_id. This is only collision-free if largest_id is the next free id
    # rather than the current maximum. Confirm against common.Map.
    runid_new['id'] = runid_new.index + the_map.largest_id
    runid_new = (
        runid_new
        .drop(columns = ['index', 'ena_run'])
        .rename(columns = {'run_accession': 'ena_run'})
    )

    with io.StringIO() as pipe:
        runid_new[['id', 'ena_run']].to_csv(pipe, sep = '\t', header = False, index = False)
        pipe.seek(0)
        C.copy_expert(f"COPY {schema}.runid FROM STDIN WITH (format csv, delimiter '\t')", pipe)



2023-02-23 10:25:46.379397 #6187417 ena_run items in db, largest id=6187417





[38;5;208mCode block append on runid took:	53.06522 seconds[0;0m


In [23]:
with TimeLogCommit(task = 'prepare country', commit = False):
    # Reload the run map so it includes the ids appended by the runid cell.
    the_map = common.Map(conn, C, f'{schema}.runid')
    country_back = pandas.read_sql(sql = f'SELECT * FROM {schema}.country', con = conn)

    metadata = pandas.merge(
        left = meta[['run_accession', 'collection_date', 'collection_date_submitted', 'country', 'host_tax_id', 'host_sex', 'instrument_model', 'instrument_platform', 'accession', 'study_accession', 'experiment_accession']].copy(),
        right = the_map.from_db,
        left_on = 'run_accession', right_on = 'ena_run',
        how = 'inner'
    )

    metadata.rename(columns={'id':'runid'}, inplace=True)

    # collection_date_valid is True for every present date, except that a date ending in
    # '-01-01' counts as valid only when the submitted date string starts with it.
    # Done per element instead of a row-wise apply with positional x[0]/x[1]: a missing
    # submitted date now yields False instead of raising, and an empty selection is harmless.
    dates = metadata['collection_date']
    submitted = metadata['collection_date_submitted']
    d_none = dates.isna()
    jan1 = dates.map(lambda x: isinstance(x, str) and x.endswith('-01-01')).astype(bool)
    jan1_match = [
        isinstance(s, str) and s.startswith(d)
        for d, s in zip(dates[jan1], submitted[jan1])
    ]
    metadata['collection_date_valid'] = ~ d_none
    metadata.loc[jan1, 'collection_date_valid'] = jan1_match

    # Keep only the text before the first ':' and map spellings onto the country table's names.
    metadata['country'] = metadata['country'].apply(lambda x: None if pandas.isna(x) else x.split(':')[0])
    country_map = {
        'USA': 'United States',
        'Russia': 'Russian Federation',
        'Czech Republic': 'Czechia',
        'Myanmar': 'Myanmar/Burma',
        'State of Palestine': 'Palestine',
    }
    metadata['country'] = metadata['country'].apply(lambda x: country_map.get(x, x))
    metadata__ = pandas.merge(
        left = metadata,  right = country_back,
        left_on = 'country', right_on = 'country_name',
        how = 'left'
    )




2023-02-23 10:26:41.111733 #6214466 ena_run items in db, largest id=6214466





[38;5;208mCode block prepare country  took:	38.22192 seconds[0;0m


In [24]:
# Countries (after normalisation) with no row in the country table. Such runs would be
# loaded with a null country_id, so stop Run All here. Extend country_map in the country
# cell and re-run it instead.
unmatched_country = (
    metadata__['country_name'].isna()
    & metadata__['country'].notna()
    & (metadata__['country'] != 'not collected')
)
display(metadata__.loc[unmatched_country, ['runid', 'run_accession', 'country_name', 'country']].head())
assert not unmatched_country.any(), 'unmatched country names: extend country_map and re-run the country cell'

Unnamed: 0,runid,run_accession,country_name,country


**If the table above is not empty**, add the unmatched country names to `country_map` in the country cell and re-run it before appending metadata.

In [25]:
with TimeLogCommit(task = 'append', table_name = 'metadata'):
    host_back = pandas.read_sql(sql = f'SELECT * FROM {schema}.host', con = conn)
    instrument_back = pandas.read_sql(sql = f'SELECT * FROM {schema}.instrument', con = conn)

    # Build a new frame rather than dropping in place. Mutating metadata__ would make this
    # cell (and the country check) fail with a KeyError on re-run.
    metadata = (
        metadata__
        .drop(columns = ['country_name', 'country_name_local', 'iso_a3', 'iso_a2', 'country'])
        .rename(columns = {'id': 'country_id'})
    )

    metadata['host_sex'] = metadata['host_sex'].apply(lambda x: None if pandas.isna(x) else x.lower())
    metadata = metadata.astype({'host_tax_id': pandas.Int64Dtype()})

    # Resolve host_tax_id -> host.id. Rows without a tax id get an all-NA host_id.
    tax_id_missing = metadata['host_tax_id'].isna()
    m_ok = (
        metadata[~ tax_id_missing]
        .set_index('host_tax_id')
        .join(host_back.set_index('tax_id'), how = 'left')
        .reset_index()
        .rename(columns = {'id': 'host_id'})
        # the tax id comes back as an 'index' column; it is dropped together with the host name
        .drop(columns = ['host', 'index'])
    )
    m_ok = m_ok.astype({'host_id': pandas.Int64Dtype()})
    m_nok = metadata[tax_id_missing].rename(columns = { 'host_tax_id': 'host_id' })

    metadata = pandas.concat([m_ok, m_nok])

    # Resolve (instrument_platform, instrument_model) -> instrument.id.
    metadata = (
        metadata
        .set_index(['instrument_platform', 'instrument_model'])
        .join(instrument_back.set_index(['instrument_platform', 'instrument_model']), how = 'left')
        .reset_index()
        .rename(columns = {'id': 'instrument_id'})
        .drop(columns = ['instrument_platform', 'instrument_model'])
    )

    # Anti-join against the runids already in the table (metadata_old is read in the runid
    # cell). isin avoids merging the whole old table and cannot fan out rows.
    m_new = ~ metadata['runid'].isin(metadata_old['runid'])

    K = [
        'runid', 'collection_date', 'collection_date_valid', 'country_id',
        'host_id', 'host_sex', 'instrument_id',
        'accession', 'study_accession', 'experiment_accession'
    ]
    # Nullable Int64: ids are written as integers (not "12.0") and missing ids as empty (NULL).
    metadata_new = metadata.loc[m_new, K].astype({
        'country_id': pandas.Int64Dtype(),
        'instrument_id': pandas.Int64Dtype(),
    })

    with io.StringIO() as pipe:
        metadata_new.to_csv(pipe, sep = '\t', header = False, index = False)
        pipe.seek(0)
        # Explicit column list: `accession` is loaded into sample_accession.
        C.copy_expert(
            f"COPY {schema}.metadata (runid, collection_date, collection_date_valid, country_id, host_id, host_sex, instrument_id, sample_accession, study_accession, experiment_accession) FROM STDIN WITH (format csv, delimiter '\t')",
            pipe,
        )




[38;5;208mCode block append on metadata took:	106.63845 seconds[0;0m


## Table `library`

In [26]:
with TimeLogCommit(task = 'prepare library', commit = False):
    # The table's `layout` column is aliased to type_layout for the merge below.
    library_db = pandas.read_sql(f"""
SELECT id, layout type_layout, source, selection, strategy
FROM {schema}.library
    """, con = conn)

    K_left = ['library_layout', 'library_source', 'library_selection', 'library_strategy']
    K_right = ['type_layout', 'source', 'selection', 'strategy']

    # Lower-case the layout *before* the final de-duplication, so 'PAIRED' and 'paired'
    # collapse into one candidate instead of two new rows with the same key.
    # Non-string values (NaN) are left untouched instead of raising.
    library = meta[['library_layout', 'library_selection', 'library_source', 'library_strategy']].drop_duplicates().copy()
    library['library_layout'] = library['library_layout'].map(lambda x: x.lower() if isinstance(x, str) else x)
    library = library.drop_duplicates().sort_values(['library_source', 'library_strategy']).reset_index(drop=True)

    library_joined = library.merge(library_db, left_on = K_left, right_on = K_right, how = 'left')
    library_joined = library_joined.astype({'id': pandas.Int64Dtype()})
    m_new = library_joined['id'].isna()

    library_new = library_joined[m_new][K_left].reset_index(drop = True).reset_index()
    # 'index' becomes the id to insert. Same empty-table guard as the instrument and host
    # cells, because max() of an empty id column is NaN.
    library_new['index'] += 1 + library_db['id'].max() if library_db.shape[0] > 0 else 1




[38;5;208mCode block prepare library  took:	2.98928 seconds[0;0m


In [27]:
# Library rows not yet in the library table ('index' is the id to assign); an empty frame means nothing to insert.
library_new

Unnamed: 0,index,library_layout,library_source,library_selection,library_strategy


In [28]:
# Bulk-load new library rows in (id, layout, source, selection, strategy) order. COPY has
# no column list here, so this order must match the table's column order.
if len(library_new):
    with TimeLogCommit(task = 'insert items', table_name = 'library'):
        with io.StringIO() as pipe:
            export_columns = ['index', 'library_layout', 'library_source', 'library_selection', 'library_strategy']
            library_new.loc[:, export_columns].to_csv(pipe, sep = '\t', header = False, index = False)
            pipe.seek(0)
            C.copy_expert(f"COPY {schema}.library FROM STDIN WITH (format csv, delimiter '\t')", pipe)

## Table `collector`

In [29]:
with TimeLogCommit(task = 'insert items', table_name = 'collector'):
    collector_db = pandas.read_sql(f"""
SELECT id, broker_name, collected_by, center_name
FROM {schema}.collector
    """, con = conn)

    K = ['broker_name', 'collected_by', 'center_name']
    collector = meta[K].drop_duplicates().sort_values(['center_name', 'broker_name']).reset_index(drop=True)
    # Rows with no collector information at all get no collector entry.
    nullidx = collector[collector[K].isna().all(axis = 1)].index
    collector.drop(index = nullidx, inplace = True)
    br_nan = collector['broker_name'].isna()
    # .loc instead of the chained collector['broker_name'][br_nan] = None, which writes
    # to a temporary (SettingWithCopyWarning) and is a no-op under copy-on-write.
    collector.loc[br_nan, 'broker_name'] = None

    collector_joined = collector.merge(collector_db, left_on = K, right_on = K, how = 'left')
    m_new = collector_joined['id'].isna()
    collector_joined = collector_joined.astype({'id': pandas.Int64Dtype()})
    collector_new = collector_joined[m_new][K].reset_index(drop = True).reset_index()
    # 'index' becomes the id to insert. Same empty-table guard as the instrument/host cells.
    collector_new['index'] += 1 + collector_db['id'].max() if collector_db.shape[0] > 0 else 1

    with io.StringIO() as pipe:
        collector_new[['index', 'broker_name', 'collected_by', 'center_name']].to_csv(
            pipe, sep = '\t', header = False, index = False
        )
        pipe.seek(0)
        C.copy_expert(
            f"COPY {schema}.collector (id, broker_name, collected_by, center_name) FROM STDIN WITH (format csv, delimiter '\t')",
            pipe,
        )




[38;5;208mCode block insert items on collector took:	3.06950 seconds[0;0m


## Table `meta_extension`

In [30]:
with TimeLogCommit(task = 'insert items', table_name = 'metaextension'):
    # Only the key is needed to decide which runs are already present.
    meta_extension_db = pandas.read_sql(f"""
SELECT runid
FROM {schema}.metaextension
    """, con = conn)

    # Re-read both lookup tables. library_db and collector_db from the earlier cells were
    # read before those cells inserted their new rows, so new ids would map to NULL.
    library_ids = pandas.read_sql(f"""
SELECT id, layout type_layout, source, selection, strategy
FROM {schema}.library
    """, con = conn)
    collector_ids = pandas.read_sql(f"""
SELECT id, broker_name, collected_by, center_name
FROM {schema}.collector
    """, con = conn)

    K = ['library_layout', 'library_source', 'library_selection', 'library_strategy']
    Kb = ['type_layout', 'source', 'selection', 'strategy']

    lib_slice = meta[K].copy()
    lib_slice['library_layout'] = lib_slice['library_layout'].map(lambda x: x.lower() if isinstance(x, str) else x)
    # validate='m:1' fails loudly on a duplicated lookup key. A fan-out would otherwise shift
    # every following id onto the wrong run. A left merge keeps row order, so the ids are
    # assigned positionally below.
    lib_id = lib_slice.merge(library_ids, left_on = K, right_on = Kb, how = 'left', validate = 'm:1')['id']

    K = [ 'broker_name', 'collected_by', 'center_name' ]

    col_slice = meta[K].copy()
    col_id = col_slice.merge(collector_ids, left_on = K, right_on = K, how = 'left', validate = 'm:1')['id']

    extension = meta[['run_accession', 'description', 'fastq_ftp', 'isolate', 'sample_capture_status', 'strain',
                     'checklist', 'base_count', 'library_name', 'first_created', 'first_public', 'country']].copy()

    extension['library_id'] = lib_id.to_numpy()
    extension['collector_id'] = col_id.to_numpy()

    extension = extension.astype({
        'collector_id': pandas.Int64Dtype(),
        'library_id': pandas.Int64Dtype(),
    })

    extension_ = pandas.merge(
        left = extension,
        right = the_map.from_db,
        left_on = 'run_accession', right_on = 'ena_run',
        how = 'inner'
    )

    # Anti-join on the run id. (Testing ena_run for NA would never select anything, because
    # the inner merge above guarantees it is set.)
    m_new = ~ extension_['id'].isin(meta_extension_db['runid'])
    # 'country' is meta's unparsed country string (the country cell only normalises a
    # copy), and it is what goes into country_raw.
    K = [
        'id', 'description', 'fastq_ftp', 'isolate', 'sample_capture_status', 'strain',
        'checklist', 'base_count', 'library_name', 'library_id', 'first_created', 'first_public',
        'collector_id', 'country'
    ]
    metadata_extension_new = extension_.loc[m_new, K]

    with io.StringIO() as pipe:
        metadata_extension_new.to_csv(pipe, sep = '\t', header = False, index = False)
        pipe.seek(0)
        C.copy_expert(
            f"COPY {schema}.metaextension (runid, description, fastq_ftp, isolate, sample_capture_status, strain, checklist, base_count, library_name, library_id, first_created, first_public, collector_id, country_raw) FROM STDIN WITH (format csv, delimiter '\t')",
            pipe,
        )




[38;5;208mCode block insert items on metaextension took:	38.86535 seconds[0;0m


In [None]:
# Manual fallback only: every TimeLogCommit block above that keeps the default commit=True
# already calls conn.commit() when it exits.
#conn.commit()