# Data Commons Pilot - TOPMed Public Metadata Export

This demo shows how metadata can be exported from the gen3 platform.

In [1]:
%matplotlib inline
import dcp_analysis_functions as dcp
dcp.add_keys('credentials.json')

### Get summary metrics for each data type in the data-model for one project:

In [2]:
dcp.query_summary_counts('topmed-public')

| Category | Counts |
|---|---|
| Cases | 5.0 |
| Studies | 1.0 |
| Demographic records | 5.0 |
| Diagnosis records | 0.0 |
| Exposure records | 0.0 |


## Get an index of alignment (CRAM) files

First, we'll define a query to collect the submitted reads for the Public TOPMed data.

In [3]:
query = """
query AlignedReads ($projectID: [String], $first: Int, $offset: Int) 
    {
     submitted_aligned_reads(project_id: $projectID, first: $first, offset: $offset) {
        id
        file_name
        file_size
        md5sum
        submitter_id
        updated_datetime
        created_datetime
       _links {
          id
          type
        }
      }
    }
"""
variables = {'first': 1, 'offset': 0, 'projectID': 'topmed-public'}
dcp.query_api(query, variables)

def print_query(query, variables):
    response = dcp.query_api(query, variables)
    #print(response)
    return response

pages = [{'first': 1, 'offset': i, 'projectID': 'topmed-public'} for i in range(107)]
%time alignments = map(lambda page: print_query(query, page), pages)

CPU times: user 2.76 s, sys: 104 ms, total: 2.87 s
Wall time: 1min 22s
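
The cell above requests one record per call, so it needs 107 round trips. The sketch below shows a loop that requests larger pages and stops at the first empty page. It assumes the API returns an empty list once the offset passes the last record, which this notebook does not confirm. `fetch_all_pages` and `query_fn` are hypothetical names; `query_fn` stands in for `dcp.query_api`, and a fake version is used here so the sketch runs offline.

```python
def fetch_all_pages(query_fn, query, project_id, node, page_size=25):
    """Collect every record for `node`, requesting `page_size` records per call.

    `query_fn(query, variables)` is assumed to behave like dcp.query_api and
    return {'data': {node: [...]}}. The loop stops at the first empty page.
    """
    records = []
    offset = 0
    while True:
        variables = {'first': page_size, 'offset': offset, 'projectID': project_id}
        page = query_fn(query, variables)['data'][node]
        if not page:
            break
        records.extend(page)
        offset += page_size
    return records


# Stand-in for the real API (assumption): serves 7 fake records.
def fake_query_api(query, variables):
    data = [{'id': str(i)} for i in range(7)]
    start = variables['offset']
    stop = start + variables['first']
    return {'data': {'submitted_aligned_reads': data[start:stop]}}


reads = fetch_all_pages(fake_query_api, 'unused', 'topmed-public',
                        'submitted_aligned_reads', page_size=3)
print(len(reads))
```

Note that this returns a flat list of records, whereas the cells below expect `alignments` to be a list of full responses with one record each, so the later indexing would need adjusting if this approach were adopted.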


### Get the indexes for each alignment

We'll now make a list of the indices for each alignment as well.

In [4]:
query = """
query ReadIndexes ($id: String) 
    {
     aligned_reads_index(id: $id) {
        id
        file_name
        file_size
        md5sum
        submitter_id
        updated_datetime
        created_datetime
      }
    }

"""
variables = {'first': 1, 'offset': 0, 'projectID': 'topmed-public'}

index_ids = []

for alignment in alignments:
    for link in alignment['data']['submitted_aligned_reads'][0]['_links']:
        if link['type'] == 'aligned_reads_index':
            index_ids.append(link['id'])

print(len(alignments))
print(len(index_ids))
pages = [{'id': x} for x in index_ids]
            
%time indices = map(lambda page: print_query(query, page), pages)

107
107
CPU times: user 2.67 s, sys: 84 ms, total: 2.76 s
Wall time: 1min 2s
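
The same "walk `_links` and keep one type" loop recurs several times in this notebook. A small helper, sketched below, could capture it once. `linked_ids` is a hypothetical name, and the sample response is trimmed from the first alignment record shown in this notebook.

```python
def linked_ids(responses, node, link_type):
    """Return the id of every `_links` entry of `link_type`, in response order.

    Each response is expected to look like {'data': {node: [record]}} where
    the record carries a `_links` list of {'type': ..., 'id': ...} dicts.
    """
    ids = []
    for response in responses:
        record = response['data'][node][0]
        for link in record['_links']:
            if link['type'] == link_type:
                ids.append(link['id'])
    return ids


# Trimmed copy of the first alignment response shown in this notebook.
sample_response = {'data': {'submitted_aligned_reads': [{
    '_links': [
        {'type': 'read_group', 'id': '337a3f79-2c69-4553-b887-16360d693254'},
        {'type': 'aligned_reads_index', 'id': '28264de4-93b3-44ba-9a0a-b8a45a4a1a22'},
    ],
}]}}

demo_ids = linked_ids([sample_response], 'submitted_aligned_reads', 'aligned_reads_index')
print(demo_ids)
```

The same call with `'core_metadata'` or `'read_group'` as the link type would cover the later cells that collect those identifiers.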


Now that we can get metadata for submitted aligned reads and their indices, we can begin to generate a bundle-like object that pairs each alignment with its index in a single tuple.

In [5]:
raw_bundles = zip(alignments, indices)
print(raw_bundles[0])

({u'data': {u'submitted_aligned_reads': [{u'updated_datetime': u'2018-04-15T21:54:21.545596+00:00', u'created_datetime': u'2018-01-30T21:37:29.252021+00:00', u'file_name': u'NWD319341.recab.cram', u'md5sum': u'0044399fa0f9356565c8cc0234217338', u'submitter_id': u'NWD319341-cram', u'_links': [{u'type': u'read_group', u'id': u'337a3f79-2c69-4553-b887-16360d693254'}, {u'type': u'aligned_reads_index', u'id': u'28264de4-93b3-44ba-9a0a-b8a45a4a1a22'}, {u'type': u'alignment_workflow', u'id': u'eb0dd251-f364-4085-9c21-a4a995e0fa15'}, {u'type': u'core_metadata', u'id': u'ea7a44f2-6edc-4f59-85ae-2830bec0e9a4'}], u'file_size': 23438579833, u'id': u'73b12502-f60c-5f81-bd35-b0afc4678759'}]}}, {u'data': {u'aligned_reads_index': [{u'updated_datetime': u'2018-04-14T19:57:36.324661+00:00', u'created_datetime': u'2018-01-30T21:47:17.975531+00:00', u'file_name': u'NWD319341.recab.cram.crai', u'md5sum': u'c5fd2b9973f68840a4d9f155376d009a', u'submitter_id': u'NWD319341-crai', u'file_size': 1429317, u'id': 

## Getting indexd URLs for submissions

Now that we have a way of generating the list of alignments to be exported, along with their indices, we also want to provide the cloud URLs where these can be found.

We can get these links from indexd using the identifiers gathered from the metadata service above.

### Find CRAM index IDs by hash

Due to a problem when the metadata were loaded, we cannot use the above identifiers to access the CRAM indices. Instead, we will demonstrate a way to work around this problem using indexd's "find by hash" feature.

Since the indices themselves were loaded, we can find them by hash and then update our metadata to include the correct identifier. We can then get the proper URLs for each item.

In [6]:
import requests

indexd_url = 'https://dcp.bionimbus.org/index'

hashes = [bundle[1]['data']['aligned_reads_index'][0]['md5sum'] for bundle in raw_bundles]

We can now request the items one by one and add the metadata to our bundle.

In [7]:
def get_indexd_id_by_checksum(checksum):
    response = requests.get("{}/index/?hash=md5:{}".format(indexd_url, checksum))
    return response.json()['ids'][0]

%time indexd_ids = [get_indexd_id_by_checksum(chk) for chk in hashes]

CPU times: user 2.98 s, sys: 72 ms, total: 3.06 s
Wall time: 40.4 s
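
`get_indexd_id_by_checksum` raises an `IndexError` if indexd has no record for a checksum. A more defensive variant might look like the sketch below. It assumes only what the cell above shows, namely that the response body has an `ids` list. `first_id_for_md5` and `fetch_json` are hypothetical names, and a fake fetcher replaces the HTTP call so the sketch runs offline.

```python
def first_id_for_md5(fetch_json, base_url, checksum):
    """Return the first indexd id matching an md5 checksum, or None.

    `fetch_json(url)` is a hypothetical callable returning the decoded JSON
    body, for example: lambda url: requests.get(url).json()
    """
    url = "{}/index/?hash=md5:{}".format(base_url, checksum)
    ids = fetch_json(url).get('ids') or []
    return ids[0] if ids else None


# Fake lookup table built from the checksum and did shown in this notebook.
KNOWN = {'c5fd2b9973f68840a4d9f155376d009a': '16664d78-b05e-56d3-8589-7d155eb2b3ef'}


def fake_fetch_json(url):
    checksum = url.rsplit('md5:', 1)[1]
    return {'ids': [KNOWN[checksum]] if checksum in KNOWN else []}


found = first_id_for_md5(fake_fetch_json, 'https://example.org/index',
                         'c5fd2b9973f68840a4d9f155376d009a')
missing = first_id_for_md5(fake_fetch_json, 'https://example.org/index', 'ffff')
print(found, missing)
```

Returning `None` lets the caller decide whether a missing index should abort the export or just be reported.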


Now, zipping them together, we'll have all the metadata collected in a list where each item is a tuple:

```
[((alignment_metadata, index_metadata), cram_indexd_id), ...]
```

To get the cloud URLs for each file, we ask indexd for the full record. The cell after next flattens each item into a four-element tuple: the two metadata records, followed by the indexd record for the index and the indexd record for the alignment.

In [8]:
fixed_bundles = zip(raw_bundles, indexd_ids)

In [9]:
def get_indexd_doc(indexd_id):
    response = requests.get("{}/index/{}".format(indexd_url, indexd_id))
    return response.json()

%time indexd_bundles = [(x[0][0], x[0][1], get_indexd_doc(x[1]), get_indexd_doc(x[0][0]['data']['submitted_aligned_reads'][0]['id'])) for x in fixed_bundles]

CPU times: user 5.75 s, sys: 208 ms, total: 5.96 s
Wall time: 1min 48s
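
Positional access such as `x[0][1]` or `bundle[3]` is easy to get wrong, especially since the indexd record for the index comes before the one for the alignment. One option, sketched below, is to name the four slots with a `namedtuple`. `IndexdBundle` and its field names are hypothetical, chosen to match the order produced by the cell above.

```python
from collections import namedtuple

# Field order mirrors the tuples built above:
# (alignment metadata, index metadata, indexd doc for index, indexd doc for alignment)
IndexdBundle = namedtuple('IndexdBundle', [
    'alignment_metadata',
    'index_metadata',
    'index_indexd',
    'alignment_indexd',
])


def to_named(bundle):
    """Wrap a plain 4-tuple so its slots can be read by name."""
    return IndexdBundle(*bundle)


# Placeholder dicts stand in for the real records.
plain = ({'kind': 'alignment'}, {'kind': 'index'},
         {'did': 'index-did'}, {'did': 'alignment-did'})
named = to_named(plain)
print(named.alignment_indexd['did'])
```

Because a `namedtuple` is still a tuple, existing code that indexes by position keeps working.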


We now have a pretty lengthy document we can flatten and offer as a minimum metadata schema.

In [10]:
print(indexd_bundles[0])

({u'data': {u'submitted_aligned_reads': [{u'updated_datetime': u'2018-04-15T21:54:21.545596+00:00', u'created_datetime': u'2018-01-30T21:37:29.252021+00:00', u'file_name': u'NWD319341.recab.cram', u'md5sum': u'0044399fa0f9356565c8cc0234217338', u'submitter_id': u'NWD319341-cram', u'_links': [{u'type': u'read_group', u'id': u'337a3f79-2c69-4553-b887-16360d693254'}, {u'type': u'aligned_reads_index', u'id': u'28264de4-93b3-44ba-9a0a-b8a45a4a1a22'}, {u'type': u'alignment_workflow', u'id': u'eb0dd251-f364-4085-9c21-a4a995e0fa15'}, {u'type': u'core_metadata', u'id': u'ea7a44f2-6edc-4f59-85ae-2830bec0e9a4'}], u'file_size': 23438579833, u'id': u'73b12502-f60c-5f81-bd35-b0afc4678759'}]}}, {u'data': {u'aligned_reads_index': [{u'updated_datetime': u'2018-04-14T19:57:36.324661+00:00', u'created_datetime': u'2018-01-30T21:47:17.975531+00:00', u'file_name': u'NWD319341.recab.cram.crai', u'md5sum': u'c5fd2b9973f68840a4d9f155376d009a', u'submitter_id': u'NWD319341-crai', u'file_size': 1429317, u'id': 

### Generating a file manifest

We'll now make a file manifest that uses a constrained vocabulary of our design.

In [11]:
def bundle_to_flat(bundle):
    dos_base = "https://dos.bionimbus.org/ga4gh/dos/v1/dataobjects"
    alignment = {}
    index = {}
    alignment_metadata = bundle[0]['data']['submitted_aligned_reads'][0]
    index_metadata = bundle[1]['data']['aligned_reads_index'][0]
    alignment_indexd = bundle[3]
    index_indexd = bundle[2]
    alignment['gsurl'] = filter(lambda x: 'gs://' in x, alignment_indexd['urls'])[0]
    alignment['s3url'] = filter(lambda x: 's3://' in x, alignment_indexd['urls'])[0]
    alignment['name'] = alignment_metadata['file_name']
    alignment['dosurl'] = "{}/{}".format(dos_base, alignment_indexd['did'])
    alignment['updated_datetime'] = alignment_metadata['updated_datetime']
    alignment['created_datetime'] = alignment_metadata['created_datetime']
    alignment['md5sum'] = alignment_metadata['md5sum']
    alignment['size'] = alignment_metadata['file_size']
    alignment['did'] = alignment_indexd['did']
    index['gsurl'] = filter(lambda x: 'gs://' in x, index_indexd['urls'])[0]
    index['s3url'] = filter(lambda x: 's3://' in x, index_indexd['urls'])[0]
    index['dosurl'] = "{}/{}".format(dos_base, index_indexd['did'])
    index['name'] = index_metadata['file_name']
    index['updated_datetime'] = index_metadata['updated_datetime']
    index['created_datetime'] = index_metadata['created_datetime']
    index['md5sum'] = index_metadata['md5sum']
    index['size'] = index_metadata['file_size']
    index['did'] = index_indexd['did']
    return alignment, index, bundle

flattened_bundles = [bundle_to_flat(b) for b in indexd_bundles]

We now have a minimal file listing for each item that is easy to index.

In [12]:
flattened_bundles[0][1]

{'created_datetime': u'2018-01-30T21:47:17.975531+00:00',
 'did': u'16664d78-b05e-56d3-8589-7d155eb2b3ef',
 'dosurl': 'https://dos.bionimbus.org/ga4gh/dos/v1/dataobjects/16664d78-b05e-56d3-8589-7d155eb2b3ef',
 'gsurl': u'gs://cgp-commons-multi-region-public/topmed_open_access/a4e58dc1-3425-54cf-bbe7-64dc01679da3/NWD319341.recab.cram.crai',
 'md5sum': u'c5fd2b9973f68840a4d9f155376d009a',
 'name': u'NWD319341.recab.cram.crai',
 's3url': u's3://cgp-commons-public/topmed_open_access/a4e58dc1-3425-54cf-bbe7-64dc01679da3/NWD319341.recab.cram.crai',
 'size': 1429317,
 'updated_datetime': u'2018-04-14T19:57:36.324661+00:00'}
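
`bundle_to_flat` selects URLs with `filter(...)[0]`, which works in Python 2 but fails in Python 3, where `filter` returns an iterator. It also raises if no URL matches. A version-neutral alternative is sketched below. `pick_url` is a hypothetical helper, and the sample URLs are taken from the indexd record shown in this notebook.

```python
def pick_url(urls, scheme):
    """Return the first URL that starts with `scheme://`, or None if absent."""
    prefix = scheme + '://'
    for url in urls:
        if url.startswith(prefix):
            return url
    return None


urls = [
    'gs://cgp-commons-multi-region-public/topmed_open_access/'
    'a4e58dc1-3425-54cf-bbe7-64dc01679da3/NWD319341.recab.cram',
    's3://cgp-commons-public/topmed_open_access/'
    'a4e58dc1-3425-54cf-bbe7-64dc01679da3/NWD319341.recab.cram',
]
print(pick_url(urls, 's3'))
print(pick_url(urls, 'https'))
```

Matching on the prefix rather than with `in` also avoids picking up a URL that merely contains `s3://` somewhere in its path.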

## Going for more metadata

The metadata we have collected so far give us what is necessary to access the files. The platform may host other metadata as well, so we should check for them.

In particular, the alignment contained a read group that was described in a sample. We would like to provide these publicly available metadata in another flat dictionary.


In [13]:
print(alignments[0]['data']['submitted_aligned_reads'][0]['_links'])

[{u'type': u'read_group', u'id': u'337a3f79-2c69-4553-b887-16360d693254'}, {u'type': u'aligned_reads_index', u'id': u'28264de4-93b3-44ba-9a0a-b8a45a4a1a22'}, {u'type': u'alignment_workflow', u'id': u'eb0dd251-f364-4085-9c21-a4a995e0fa15'}, {u'type': u'core_metadata', u'id': u'ea7a44f2-6edc-4f59-85ae-2830bec0e9a4'}]


In [14]:
core_metadata_ids = []

for alignment in alignments:
    for link in alignment['data']['submitted_aligned_reads'][0]['_links']:
        if link['type'] == 'core_metadata':
            core_metadata_ids.append(link['id'])

In [15]:
query = """
query CoreMetadata ($id: String) 
    {
     core_metadata(id: $id) {
          description
          data_type
          rights
          format
          creator
          contributor
          submitter_id
          language
          relation
          source
          coverage
          date
          title
          project_id
          subject
     }
    }

"""
dcp.query_api(query, {'id': 'ea7a44f2-6edc-4f59-85ae-2830bec0e9a4'})

{u'data': {u'core_metadata': [{u'contributor': u'Broad',
    u'coverage': u'US',
    u'creator': u'Francisco Ortuno',
    u'data_type': u'WGS',
    u'date': u'04/13/18',
    u'description': u'Aligned reads for NWD319341 sample',
    u'format': u'CRAM',
    u'language': u'en-US',
    u'project_id': u'topmed-public',
    u'relation': u'Uchicago',
    u'rights': u'open access',
    u'source': u'HapMap',
    u'subject': u'sequencing',
    u'submitter_id': u'NWD319341-cram-core-metadata',
    u'title': u'NWD319341 CRAM File'}]}}

In [16]:
pages = [{'id': x} for x in core_metadata_ids]
%time core_metadata = [dcp.query_api(query, page) for page in pages]

CPU times: user 2.95 s, sys: 88 ms, total: 3.04 s
Wall time: 43.4 s


In [17]:
flat_core_metadata = [x['data']['core_metadata'][0] for x in core_metadata]

### Getting Sample metadata

Since the index and the alignment come from the same aliquot, we can follow the alignment's read group to that aliquot and its sample to get unified bundle metadata.

In [18]:
query = """
query ReadGroupLinks ($id: String) 
    {
     read_group(id: $id) {
     _links {
       type
       id
     }
     }
    }

"""

read_group_ids = []

for alignment in alignments:
    for link in alignment['data']['submitted_aligned_reads'][0]['_links']:
        if link['type'] == 'read_group':
            read_group_ids.append(link['id'])
            
%time read_groups = [dcp.query_api(query, {'id': x}) for x in read_group_ids]

CPU times: user 2.8 s, sys: 68 ms, total: 2.86 s
Wall time: 1min 8s


In [19]:
query = """
query Aliquots ($id: String) 
    {
     aliquot(id: $id) {
        type
        project_id
        submitter_id
        aliquot_quantity
        aliquot_volume
        amount
        analyte_isolation_batch_id
        analyte_isolation_date
        analyte_isolation_method
        analyte_type
        concentration
        exclude
        exclusion_criteria
        experiment_batch_id
        experiment_date
        experimental_strategy
         _links {
           type
           id
         }
     }
    }

"""

aliquot_ids = []

for read_group in read_groups:
    for link in read_group['data']['read_group'][0]['_links']:
        if link['type'] == 'aliquot':
            aliquot_ids.append(link['id'])
            
%time aliquots = [dcp.query_api(query, {'id': x}) for x in aliquot_ids]

CPU times: user 3.12 s, sys: 68 ms, total: 3.19 s
Wall time: 1min 10s


In [20]:
query = """
query SampleMetadata ($id: String) 
    {
     sample(id: $id) {
        id
        type
        project_id
        submitter_id
        autolysis_score
        biospecimen_anatomic_site
        biospecimen_anatomic_site_detail
        biospecimen_anatomic_site_uberon_id
        biospecimen_anatomic_site_uberon_term
        biospecimen_physical_site
        biospecimen_type
        collection_kit
        collection_site
        composition
        current_weight
        freezing_method
        hours_to_collection
        hours_to_sample_procurement
        internal_notes
        is_ffpe
        method_of_sample_procurement
        oct_embedded
        pathology_notes
        preservation_method
        prosector_notes
        sample_type
        tissue_type
     }
    }

"""

sample_ids = []

for aliquot in aliquots:
    for link in aliquot['data']['aliquot'][0]['_links']:
        if link['type'] == 'sample':
            sample_ids.append(link['id'])

%time samples = [dcp.query_api(query, {'id': x}) for x in sample_ids]

CPU times: user 3.05 s, sys: 124 ms, total: 3.17 s
Wall time: 46.4 s


### Flatten Aliquot and Sample Metadata and Add to bundle

Now that we have some useful metadata for the bundle, we can serialize it so it can be easily loaded elsewhere.

In [21]:
flattened_aliquots = []
for aliquot in aliquots:
    aliquot = aliquot['data']['aliquot'][0]
    del aliquot['_links']
    flattened_aliquots.append(aliquot)

In [22]:
flattened_aliquots[0]

{u'aliquot_quantity': None,
 u'aliquot_volume': None,
 u'amount': None,
 u'analyte_isolation_batch_id': None,
 u'analyte_isolation_date': None,
 u'analyte_isolation_method': None,
 u'analyte_type': u'DNA',
 u'concentration': None,
 u'exclude': None,
 u'exclusion_criteria': None,
 u'experiment_batch_id': None,
 u'experiment_date': None,
 u'experimental_strategy': None,
 u'project_id': u'topmed-public',
 u'submitter_id': u'NWD319341',
 u'type': u'aliquot'}

In [23]:
flattened_samples = []
for sample in samples:
    sample = sample['data']['sample'][0]
    flattened_samples.append(sample)
flattened_samples[0]

{u'autolysis_score': None,
 u'biospecimen_anatomic_site': None,
 u'biospecimen_anatomic_site_detail': None,
 u'biospecimen_anatomic_site_uberon_id': None,
 u'biospecimen_anatomic_site_uberon_term': None,
 u'biospecimen_physical_site': None,
 u'biospecimen_type': u'Normal - other',
 u'collection_kit': None,
 u'collection_site': None,
 u'composition': u'B-lymphocyte',
 u'current_weight': None,
 u'freezing_method': None,
 u'hours_to_collection': None,
 u'hours_to_sample_procurement': None,
 u'id': u'e450679d-0c17-467c-8fd5-dba1510f041b',
 u'internal_notes': None,
 u'is_ffpe': None,
 u'method_of_sample_procurement': None,
 u'oct_embedded': None,
 u'pathology_notes': None,
 u'preservation_method': None,
 u'project_id': u'topmed-public',
 u'prosector_notes': None,
 u'sample_type': None,
 u'submitter_id': u'NA19238_sample',
 u'tissue_type': None,
 u'type': u'sample'}

In [25]:
bundles = zip(
    flat_core_metadata,
    flattened_aliquots,
    flattened_samples,
    [f[0] for f in flattened_bundles], # alignment
    [f[1] for f in flattened_bundles]) # index

In [26]:
bundles[0]

({u'contributor': u'Broad',
  u'coverage': u'US',
  u'creator': u'Francisco Ortuno',
  u'data_type': u'WGS',
  u'date': u'04/13/18',
  u'description': u'Aligned reads for NWD319341 sample',
  u'format': u'CRAM',
  u'language': u'en-US',
  u'project_id': u'topmed-public',
  u'relation': u'Uchicago',
  u'rights': u'open access',
  u'source': u'HapMap',
  u'subject': u'sequencing',
  u'submitter_id': u'NWD319341-cram-core-metadata',
  u'title': u'NWD319341 CRAM File'},
 {u'aliquot_quantity': None,
  u'aliquot_volume': None,
  u'amount': None,
  u'analyte_isolation_batch_id': None,
  u'analyte_isolation_date': None,
  u'analyte_isolation_method': None,
  u'analyte_type': u'DNA',
  u'concentration': None,
  u'exclude': None,
  u'exclusion_criteria': None,
  u'experiment_batch_id': None,
  u'experiment_date': None,
  u'experimental_strategy': None,
  u'project_id': u'topmed-public',
  u'submitter_id': u'NWD319341',
  u'type': u'aliquot'},
 {u'autolysis_score': None,
  u'biospecimen_anatomic_
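
The `zip` above relies on all five lists having the same length and order. If one query had returned fewer records, `zip` would silently truncate and the bundles could be misaligned. A length check before zipping, sketched below, would catch the first problem. `zip_strict` is a hypothetical helper name.

```python
def zip_strict(*lists):
    """zip() that refuses inputs of different lengths instead of truncating."""
    lengths = [len(items) for items in lists]
    if len(set(lengths)) > 1:
        raise ValueError("length mismatch: {}".format(lengths))
    return list(zip(*lists))


pairs = zip_strict(['core-0', 'core-1'], ['aliquot-0', 'aliquot-1'])
print(pairs)

try:
    zip_strict(['core-0', 'core-1'], ['aliquot-0'])
    mismatch_detected = False
except ValueError as error:
    mismatch_detected = True
    print(error)
```

This only checks lengths; verifying that item `i` in every list refers to the same file would need a shared key such as the submitter ID.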

## Serializing a bundle

Serializing to JSON allows us to easily send these data elsewhere.

In [46]:
import json
import os
def write_bundle_json(bundle, bundle_id):
    # bundle is (core_metadata, aliquot, sample, alignment, index)
    try:
        os.mkdir(bundle_id, 0o755)
    except OSError as e:
        print(str(e))
    with open('{}/metadata.json'.format(bundle_id), 'w') as outfile:
        json.dump(bundle[0], outfile)
    with open('{}/aliquot.json'.format(bundle_id), 'w') as outfile:
        json.dump(bundle[1], outfile)
    with open('{}/sample.json'.format(bundle_id), 'w') as outfile:
        json.dump(bundle[2], outfile)
    with open('{}/manifest.json'.format(bundle_id), 'w') as outfile:
        manifest = [bundle[3], bundle[4]]
        json.dump(manifest, outfile)

In [47]:
write_bundle_json(bundles[0], 'test-bundle')

[Errno 17] File exists: 'test-bundle'


In [48]:
!ls test-bundle
!cat test-bundle/manifest.json

aliquot.json  manifest.json  metadata.json  sample.json
[{"dosurl": "https://dos.bionimbus.org/ga4gh/dos/v1/dataobjects/73b12502-f60c-5f81-bd35-b0afc4678759", "name": "NWD319341.recab.cram", "updated_datetime": "2018-04-15T21:54:21.545596+00:00", "created_datetime": "2018-01-30T21:37:29.252021+00:00", "md5sum": "0044399fa0f9356565c8cc0234217338", "s3url": "s3://cgp-commons-public/topmed_open_access/a4e58dc1-3425-54cf-bbe7-64dc01679da3/NWD319341.recab.cram", "did": "73b12502-f60c-5f81-bd35-b0afc4678759", "gsurl": "gs://cgp-commons-multi-region-public/topmed_open_access/a4e58dc1-3425-54cf-bbe7-64dc01679da3/NWD319341.recab.cram", "size": 23438579833}, {"dosurl": "https://dos.bionimbus.org/ga4gh/dos/v1/dataobjects/16664d78-b05e-56d3-8589-7d155eb2b3ef", "name": "NWD319341.recab.cram.crai", "updated_datetime": "2018-04-14T19:57:36.324661+00:00", "created_datetime": "2018-01-30T21:47:17.975531+00:00", "md5sum": "c5fd2b9973f68840a4d9f155376d009a", "s3url": "s3://cgp-commons-public/topmed_open_

## Serializing them all

We'll create two serialization models. The first generates a single JSON file that includes all of the file manifests and metadata, which can then be parsed and iterated over to regenerate an index.

Let's name the fields of each item (instead of providing an array) so that downstream parsers can more easily introspect the contents.

To uniquely identify each bundle, we'll add a `bundle_uuid`.

In [50]:
import uuid

json_bundles = [{
    'bundle_uuid': str(uuid.uuid4()),
    'core_metadata': b[0],
    'aliquot': b[1],
    'sample': b[2],
    'manifest': [b[3], b[4]]} for b in bundles]

with open('topmed-public.json', 'w') as outfile:
    json.dump(json_bundles, outfile)

# It's less than 400Kb

## Deserialization

Now that we have a dataset, we might imagine submitting the bundles one by one to the DSS platform.

To do this, we'll need to load the file we just created and iterate over the items.

In [51]:
topmed_public = {}

with open('topmed-public.json', 'r') as infile:
  topmed_public = json.load(infile)

print(topmed_public[0]['aliquot']['analyte_type'])
print(len(topmed_public))
print("\n".join([x['manifest'][0]['gsurl'] for x in topmed_public][:10]))
print(topmed_public[0]['manifest'][0].keys())

DNA
107
gs://cgp-commons-multi-region-public/topmed_open_access/a4e58dc1-3425-54cf-bbe7-64dc01679da3/NWD319341.recab.cram
gs://cgp-commons-multi-region-public/topmed_open_access/6a19bc32-ddc5-55b7-9315-224331f4f1a0/NWD991001.recab.cram
gs://cgp-commons-multi-region-public/topmed_open_access/4fc383f9-2c21-599d-aa7d-ea1be7e8fc3e/NWD968809.recab.cram
gs://cgp-commons-multi-region-public/topmed_open_access/b2318775-7d17-5e37-857d-f260d3772b19/NWD967078.recab.cram
gs://cgp-commons-multi-region-public/topmed_open_access/9449321f-0ebd-5607-99b9-ba8f30a185ad/NWD953198.recab.cram
gs://cgp-commons-multi-region-public/topmed_open_access/fab5eca7-0f49-586a-9bca-a66f2df4f173/NWD952432.recab.cram
gs://cgp-commons-multi-region-public/topmed_open_access/a96950b9-eeb1-56f3-937f-f92c9f91eb7a/NWD929194.recab.cram
gs://cgp-commons-multi-region-public/topmed_open_access/9b418a88-134c-5567-8340-94269c2b9e52/NWD918554.recab.cram
gs://cgp-commons-multi-region-public/topmed_open_access/baae6014-5527-5dae-8f95-
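
Before submitting loaded bundles anywhere, it may be worth checking that each one carries the named fields written during serialization. The sketch below does this for the five top-level keys used in this notebook. `missing_keys` and `EXPECTED_KEYS` are hypothetical names, and the two sample bundles are made up for illustration.

```python
import json

# Top-level keys written by the serialization cell in this notebook.
EXPECTED_KEYS = ('bundle_uuid', 'core_metadata', 'aliquot', 'sample', 'manifest')


def missing_keys(bundle, expected=EXPECTED_KEYS):
    """Return the expected top-level keys that `bundle` does not contain."""
    return [key for key in expected if key not in bundle]


# Illustrative inputs only; real bundles come from topmed-public.json.
good = json.loads('{"bundle_uuid": "x", "core_metadata": {}, '
                  '"aliquot": {}, "sample": {}, "manifest": []}')
bad = json.loads('{"bundle_uuid": "y", "manifest": []}')

print(missing_keys(good))
print(missing_keys(bad))
```

Applied to the real file, something like `[b['bundle_uuid'] for b in topmed_public if missing_keys(b)]` would list the bundles that need attention.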

## Using bdbags for serialization

bdbags are a serialization technique we can leverage to make these data available. First, we'll make a bag for a single bundle.

In [52]:
from bdbag import bdbag_api as bdbag

def manifest_item_to_bd_item(mitem, urlselector):
    bd_item = {}
    # We have to pick a URL, should we request a fix upstream?
    # We can include the other URLs as metadata...
    bd_item['url'] = mitem[urlselector]
    bd_item['length'] = mitem['size']
    # FIXME bdbag seems to support only one item per filename
    # and so we append the protocol so that all the urls
    # make it to the fetch.txt
    bd_item['filename'] = mitem['name'] + '.' + urlselector
    bd_item['md5'] = mitem['md5sum']
    return bd_item

# We'll just include the same information twice so that all
# URLs are present. This does have the effect that blindly
# fetching the bag will download duplicate data.
gs_manifest = [manifest_item_to_bd_item(x, 'gsurl') for x in json_bundles[0]['manifest']]
s3_manifest = [manifest_item_to_bd_item(x, 's3url') for x in json_bundles[0]['manifest']]
remote_file_manifest = gs_manifest + s3_manifest
remote_file_manifest

[{'filename': u'NWD319341.recab.cram.gsurl',
  'length': 23438579833,
  'md5': u'0044399fa0f9356565c8cc0234217338',
  'url': u'gs://cgp-commons-multi-region-public/topmed_open_access/a4e58dc1-3425-54cf-bbe7-64dc01679da3/NWD319341.recab.cram'},
 {'filename': u'NWD319341.recab.cram.crai.gsurl',
  'length': 1429317,
  'md5': u'c5fd2b9973f68840a4d9f155376d009a',
  'url': u'gs://cgp-commons-multi-region-public/topmed_open_access/a4e58dc1-3425-54cf-bbe7-64dc01679da3/NWD319341.recab.cram.crai'},
 {'filename': u'NWD319341.recab.cram.s3url',
  'length': 23438579833,
  'md5': u'0044399fa0f9356565c8cc0234217338',
  'url': u's3://cgp-commons-public/topmed_open_access/a4e58dc1-3425-54cf-bbe7-64dc01679da3/NWD319341.recab.cram'},
 {'filename': u'NWD319341.recab.cram.crai.s3url',
  'length': 1429317,
  'md5': u'c5fd2b9973f68840a4d9f155376d009a',
  'url': u's3://cgp-commons-public/topmed_open_access/a4e58dc1-3425-54cf-bbe7-64dc01679da3/NWD319341.recab.cram.crai'}]
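
The FIXME above notes that the bag appears to keep only one entry per filename, which is why the URL selector is appended to each name. A quick check for colliding filenames, sketched below, can confirm that a manifest is safe before it is handed to bdbag. `duplicate_filenames` is a hypothetical helper, and the sample entries are abbreviated.

```python
from collections import Counter


def duplicate_filenames(remote_file_manifest):
    """Return the sorted filenames that occur more than once in the manifest."""
    counts = Counter(item['filename'] for item in remote_file_manifest)
    return sorted(name for name, count in counts.items() if count > 1)


# Without the suffix, the gs and s3 entries for one file would collide.
unsuffixed = [
    {'filename': 'NWD319341.recab.cram', 'url': 'gs://...'},
    {'filename': 'NWD319341.recab.cram', 'url': 's3://...'},
]
suffixed = [
    {'filename': 'NWD319341.recab.cram.gsurl', 'url': 'gs://...'},
    {'filename': 'NWD319341.recab.cram.s3url', 'url': 's3://...'},
]
print(duplicate_filenames(unsuffixed))
print(duplicate_filenames(suffixed))
```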

Here we'll add DOS URLs to the manifest as well, so the original indexd record can be retrieved when an id is available.

In [53]:
def indexd_to_dos_url(did):
    # Build the DOS URL for an indexd record from its `did`
    dos_base = "https://dos.bionimbus.org/ga4gh/dos/v1/dataobjects"
    dos_url = "{}/{}".format(dos_base, did)
    return dos_url

def manifest_item_to_dos_manifest(mitem):
    bd_item = manifest_item_to_bd_item(mitem, 's3url')
    bd_item['url'] = indexd_to_dos_url(mitem['did'])
    # kludge to remove our name padding
    bd_item['filename'] = mitem['name']
    return bd_item

indexd_to_dos_url(indexd_bundles[0][3]['did'])

'https://dos.bionimbus.org/ga4gh/dos/v1/dataobjects/73b12502-f60c-5f81-bd35-b0afc4678759'

In [54]:
def manifest_to_bdmanifest(manifest):
    gs_manifest = [manifest_item_to_bd_item(x, 'gsurl') for x in manifest]
    s3_manifest = [manifest_item_to_bd_item(x, 's3url') for x in manifest]
    dos_manifest = [manifest_item_to_dos_manifest(x) for x in manifest]
    remote_file_manifest = gs_manifest + s3_manifest + dos_manifest
    return remote_file_manifest

def bundle_to_bag(bundle, bundle_id):
    try:
        os.mkdir(bundle_id, 0o755)
    except OSError as e:
        print(str(e))
    # It's a peculiarity of the client that the manifest must be written to a file
    manifest_path = 'remote-file-manifest.json'
    with open(manifest_path, 'w') as outfile:
        json.dump(manifest_to_bdmanifest(bundle['manifest']), outfile)
    return bdbag.make_bag(
        bundle_id,
        metadata=bundle,
        remote_file_manifest=manifest_path)

In [55]:
bundle_to_bag(json_bundles[0], 'test-bag')


[Errno 17] File exists: 'test-bag'


<bdbag.bdbagit.BDBag at 0x7f88f6dde650>

In [36]:
!ls test-bag
!cat test-bag/fetch.txt

bag-info.txt  data	 manifest-md5.txt     tagmanifest-sha256.txt
bagit.txt     fetch.txt  tagmanifest-md5.txt
https://dos.bionimbus.org/ga4gh/dos/v1/dataobjects/73b12502-f60c-5f81-bd35-b0afc4678759	23438579833	data/NWD319341.recab.cram
https://dos.bionimbus.org/ga4gh/dos/v1/dataobjects/16664d78-b05e-56d3-8589-7d155eb2b3ef	1429317	data/NWD319341.recab.cram.crai
gs://cgp-commons-multi-region-public/topmed_open_access/a4e58dc1-3425-54cf-bbe7-64dc01679da3/NWD319341.recab.cram.crai	1429317	data/NWD319341.recab.cram.crai.gsurl
s3://cgp-commons-public/topmed_open_access/a4e58dc1-3425-54cf-bbe7-64dc01679da3/NWD319341.recab.cram.crai	1429317	data/NWD319341.recab.cram.crai.s3url
gs://cgp-commons-multi-region-public/topmed_open_access/a4e58dc1-3425-54cf-bbe7-64dc01679da3/NWD319341.recab.cram	23438579833	data/NWD319341.recab.cram.gsurl
s3://cgp-commons-public/topmed_open_access/a4e58dc1-3425-54cf-bbe7-64dc01679da3/NWD319341.recab.cram	23438579833	data/NWD319341.recab.cram.s3url
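
Each line of `fetch.txt` holds a URL, a length in bytes, and a path inside the bag, separated by whitespace. A consumer that wants to choose between the DOS, `gs://`, and `s3://` entries needs to parse these lines first. The sketch below is one minimal way to do so. `parse_fetch_line` is a hypothetical helper, and treating a length of `-` as unknown follows the BagIt convention as I understand it.

```python
def parse_fetch_line(line):
    """Split one fetch.txt line into its URL, length, and bag-relative path.

    A length of '-' is treated as unknown and returned as None.
    """
    url, length, path = line.rstrip('\r\n').split(None, 2)
    return {
        'url': url,
        'length': None if length == '-' else int(length),
        'path': path,
    }


# Line copied from the fetch.txt output above (tab separated).
line = ('https://dos.bionimbus.org/ga4gh/dos/v1/dataobjects/'
        '16664d78-b05e-56d3-8589-7d155eb2b3ef\t1429317\t'
        'data/NWD319341.recab.cram.crai\n')
entry = parse_fetch_line(line)
print(entry['length'], entry['path'])
```

Splitting at most twice keeps any whitespace in the path intact.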


## Bag the whole dataset

Now that we can bag a single bundle, we can make a bag that includes all 107 TOPMed public bundles with cloud URLs and metadata.

This is meant to serve as a demonstration to help us determine how best to relate fetch items to the metadata results.

In [58]:
from functools import reduce  # builtin in Python 2, must be imported in Python 3

def make_bag_of_bundles(bundle_list, bag_id):
    try:
        os.mkdir(bag_id, 0o755)
    except OSError as e:
        print(str(e))
    manifest_path = 'remote-file-manifest.json'
    with open(manifest_path, 'w') as outfile:
        # Concatenate the per-bundle manifests into one flat list
        manifest = reduce(lambda x, y: x + y, [manifest_to_bdmanifest(x['manifest']) for x in bundle_list])
        json.dump(manifest, outfile)
    # FIXME bdbag seems to act on our top level keys
    # so we add a 'metadata' top level key to avoid errors
    return bdbag.make_bag(
        bag_id,
        metadata={'metadata': bundle_list},
        remote_file_manifest=manifest_path)
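
Concatenating lists with `reduce` and `+` builds a new list at every step. For 107 small manifests this hardly matters, but `itertools.chain.from_iterable` expresses the same flattening in a single pass and behaves the same in Python 2 and 3. The sketch below uses made-up per-bundle manifests; `flatten` is a hypothetical helper name.

```python
from itertools import chain


def flatten(list_of_lists):
    """Concatenate an iterable of lists into one flat list, in a single pass."""
    return list(chain.from_iterable(list_of_lists))


# Illustrative per-bundle manifests; real ones come from manifest_to_bdmanifest.
per_bundle = [
    [{'filename': 'a.cram'}, {'filename': 'a.cram.crai'}],
    [{'filename': 'b.cram'}, {'filename': 'b.cram.crai'}],
]
combined = flatten(per_bundle)
print([item['filename'] for item in combined])
```

Unlike `reduce` without an initial value, this also returns an empty list for an empty input instead of raising.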

In [59]:
make_bag_of_bundles(json_bundles, 'topmed-public')

<bdbag.bdbagit.BDBag at 0x7f88f6d55e50>

In [60]:
!ls topmed-public
!head topmed-public/fetch.txt

bag-info.txt  data	 manifest-md5.txt     tagmanifest-sha256.txt
bagit.txt     fetch.txt  tagmanifest-md5.txt
https://dos.bionimbus.org/ga4gh/dos/v1/dataobjects/7733b6cb-811f-5839-a5dc-ac4262ded126	37501686827	data/NWD100953.recab.cram
https://dos.bionimbus.org/ga4gh/dos/v1/dataobjects/13d85741-e634-5f51-b50f-a0d0f166cfe8	2201638	data/NWD100953.recab.cram.crai
gs://cgp-commons-multi-region-public/topmed_open_access/44a8837b-4456-5709-b56b-54e23000f13a/NWD100953.recab.cram.crai	2201638	data/NWD100953.recab.cram.crai.gsurl
s3://cgp-commons-public/topmed_open_access/44a8837b-4456-5709-b56b-54e23000f13a/NWD100953.recab.cram.crai	2201638	data/NWD100953.recab.cram.crai.s3url
gs://cgp-commons-multi-region-public/topmed_open_access/44a8837b-4456-5709-b56b-54e23000f13a/NWD100953.recab.cram	37501686827	data/NWD100953.recab.cram.gsurl
s3://cgp-commons-public/topmed_open_access/44a8837b-4456-5709-b56b-54e23000f13a/NWD100953.recab.cram	37501686827	data/NWD100953.recab.cram.s3url
https://dos.bionimbu

In [61]:
bdbag.archive_bag('topmed-public', 'tgz')

'/home/david/git/dcp-demo/topmed-public.tgz'

### Opening the bag from file

More work is needed to demonstrate access to data in bdbags whose files live in a cloud storage environment.

For now, we'll simply demonstrate we can open the bag and reason about the metadata.

In [62]:
bdbag.validate_bag_structure('topmed-public')

## Future Work

How different is this process from the current export flow? What needs to be done to align the metadata so that we can use the export systems that exist?

Should these queries be added as named fields and more support given to the gen3 graphql client?

With a way of getting metadata and links to files and checksums, we can regenerate the metadata model in other systems.

Improvements to the serialization model could include semantic versioning, addition of JSON-LD context, and creation of schematic representations of types.

This should be used as a way to drive the serialization modes of the submissions API in sheepdog, as well as improve DSS ingestion capabilities. This code could be replaced with a few lines added to sheepdog.

The GraphQL interface presents some interesting capabilities, especially when combining the results of analysis. If a workflow creates an analytic, making this available in some indexable form can reduce the amount of recomputation.

The data generated by this notebook should be sufficient to load into the HCA DSS. It might be prudent to normalize the file manifest to look more like a DOS object, since indexd already presents this interface. This might allow DOS to be the interface for file manifests.

The HCA concept of a bundle doesn't completely align with the DCP data model. This means that there will likely be manual work involved for each dataset onboarded for the short term.

The azul indexer will have to be updated to support metadata on a project by project basis.

A service that we could simply post these bags to in order to start indexing would be of interest.

DOS Bundles are schema compliant, although some peculiarities of the metadata model deserve exploration. Automating data loading via the DOS interface seems like a reasonable step forward.

The `did` values included in the internal manifest format represent what will be GUIDs for the provided platform.