# Creating record set curation file

- Fields in recordsets are annotated as a manual effort done by multiple people.
- Curation was collected in a Google spreadsheet table.
- The table has columns that make annotation easier for the curators, and it also provides extra annotation for croissant ingestion.

## Columns

- `dataset_name` - name of the dataset for curators.
- `field_id` - identifier of the field for croissant.
- `column_name` - just the label of the field for curators.
- `column_description` - curators annotate fields with a description.
- `foreign_key` - curators add the `field_id` of the foreign field.
- `bioregistry_prefix` - if the data in a column comes from a database registered in Bioregistry, curators annotate the column with the corresponding prefix.
- `Example` - an example value from the column that helps curators.

## Process

1. Fetch the curation table from Google.
2. Compose the descriptions.
3. Iterate over the columns and build the output.
4. Save the curation as JSON.

In [1]:
import pandas as pd
import json

# TSV export of the Google spreadsheet holding the curation of all columns
# from all OpenTargets output datasets:
curation = 'https://docs.google.com/spreadsheets/d/132SKHMoaJePu4nTlBnQwfaz3dhfJiKmJUujfYkzXMdI/export?format=tsv&gid=179018892'

# Destination folder of the resulting curation file:
asset_folder = '../src/ot_croissant/assets/'

# Loading the table and casting the free-text columns to the pandas string dtype:
string_columns = ['column_description', 'foreign_key']
curation_table = pd.read_csv(curation, sep='\t').astype(
    {column: pd.StringDtype() for column in string_columns}
)
curation_table.head()

Unnamed: 0,dataset_name,field_id,column_name,column_description,foreign_key,bioregistry_prefix,Example
0,association_by_datasource_direct,association_by_datasource_direct/datatypeId,datatypeId,Identifier for the data type used to calculate...,,,MONDO_0800026
1,association_by_datasource_direct,association_by_datasource_direct/datasourceId,datasourceId,Identifier for the data source used to calcula...,,,
2,association_by_datasource_direct,association_by_datasource_direct/diseaseId,diseaseId,Identifier for the disease in the association.,disease/id,,
3,association_by_datasource_direct,association_by_datasource_direct/targetId,targetId,Identifier for the target in the association.,target/id,,C
4,association_by_datasource_direct,association_by_datasource_direct/score,score,Association score calculated independently for...,,,HPO:probinson[2021-09-23];HPO:probinson[2021-0...


In [54]:
# Curated fields are accumulated in this list:
curation_json = []


def compose_description(row: pd.Series) -> str:
    """
    Builds the description text of a curated field.

    The curated column description is returned untouched unless the row carries a
    bioregistry prefix. In that case a lower-cased `[bioregistry:<prefix>]` tag is
    appended to the description.

    Args:
        row (pd.Series): A row from the curation table.

    Returns:
        str: The composed description.
    """
    column_description = row['column_description']
    prefix = row['bioregistry_prefix']

    # Without a bioregistry prefix there is nothing to append:
    if pd.isna(prefix):
        return column_description

    return f"{column_description} [bioregistry:{prefix.lower()}]"

# Only fields with a curated description are exported:
described_fields = curation_table.loc[curation_table['column_description'].notna()]

for _, field_row in described_fields.iterrows():
    # Every exported field has an identifier and a composed description:
    record = {
        'id': field_row['field_id'],
        'description': compose_description(field_row),
    }

    # The foreign key is optional, it is only written when curated:
    if pd.notna(field_row['foreign_key']):
        record['foreign_key'] = field_row['foreign_key']

    # Collecting the curated field:
    curation_json.append(record)

# Writing the collected curation to the asset folder as JSON:
with open(f'{asset_folder}/recordset.json', 'w') as output:
    json.dump(curation_json, output, indent=2)


25/08/20 16:08:47 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 518746 ms exceeds timeout 120000 ms
25/08/20 16:08:47 WARN SparkContext: Killing executors is not supported by current scheduler.
25/08/20 16:08:49 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:101)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:85)
	at org.apache.spark.storage.BlockManagerMaster.registerBlockManager(BlockManagerMaster.scala:80)
	at org.apache.spark.storage.BlockManager.reregister(BlockManager.scala:642)
	at org.apache.spark.executor.Executor.reportHeartBeat(Executor.scala:1223)
	at o

In [None]:
# Fields annotated with a bioregistry prefix:
curation_table[curation_table['bioregistry_prefix'].notna()]

Unnamed: 0,dataset_name,field_id,column_name,column_description,foreign_key,bioregistry_prefix,Example
14,disease_phenotype,disease_phenotype/evidence/references,references,References or citations supporting the evidence.,,pubmed,[PMID:14566559]
20,mouse_phenotype,mouse_phenotype/biologicalModels/id,id,Unique identifier for the biological model.,,MGI,MGI:6140117
21,mouse_phenotype,mouse_phenotype/biologicalModels/literature,literature,References related to the mouse model.,,pubmed,[30949703]
23,mouse_phenotype,mouse_phenotype/modelPhenotypeClasses/id,id,Unique identifier for the phenotype class.,,MP,MP:0005389
25,mouse_phenotype,mouse_phenotype/modelPhenotypeId,modelPhenotypeId,Identifier for the specific phenotype observed...,,MP,MP:0005343
29,mouse_phenotype,mouse_phenotype/targetInModelEnsemblId,targetInModelEnsemblId,Ensembl identifier for the target gene in the ...,,ENSEMBL,ENSMUSG00000087651
30,mouse_phenotype,mouse_phenotype/targetInModelMgiId,targetInModelMgiId,MGI (Mouse Genome Informatics) identifier for ...,,MGI,MGI:1917034
48,reactome,reactome/id,id,Unique identifier for the Reactome pathway,,reactome,
55,expression,expression/id,id,Ensembl human gene identifier for the expresse...,,ENSEMBL,ENSG00000071243
57,expression,expression/tissues/efo_code,efo_code,Ontology ID of the biosample the expression da...,biosample/biosampleId,UBERON,


In [6]:
# Fields that still lack a curated description:
curation_table[curation_table['column_description'].isna()]

Unnamed: 0,dataset_name,field_id,column_name,column_description,foreign_key,bioregistry_prefix,Example
67,expression,expression/tissues/protein/reliability,reliability,,,,
71,expression,expression/tissues/protein/cell_type/reliability,reliability,,,,
72,expression,expression/tissues/protein/cell_type/level,level,,,,


# Adding tags to distributions

In [49]:
import pandas as pd
import json

# TSV export of the Google spreadsheet holding the dataset tags:
curation = 'https://docs.google.com/spreadsheets/d/1JBu9HsRqwdGYMsYVoNAghmyEoU34S7fFnNIjHuzl0To/export?format=tsv'

# Destination folder of the resulting curation file:
asset_folder = '../src/ot_croissant/assets/'

# Loading the table with the column names used in the distribution file:
tag_table = pd.read_csv(curation, sep='\t').rename(columns={'Dataset': 'id', 'Tag': 'tag'})

# Pipe-separated tags are turned into a list of tags:
tag_table['tags'] = tag_table['tag'].str.split('|')

# Only the dataset identifier and its list of tags are kept:
curation_table = tag_table[['id', 'tags']]

curation_table.head()


Unnamed: 0,id,tags
0,association_by_datasource_direct,[Target-Disease]
1,association_by_datasource_indirect,[Target-Disease]
2,association_by_datatype_direct,[Target-Disease]
3,association_by_datatype_indirect,[Target-Disease]
4,association_by_overall_indirect,[Target-Disease]


In [50]:
import numpy as np

# Reading existing curation of distributions:
distribution_df = pd.read_json(f'{asset_folder}/distribution.json', orient='records')

# Tag columns left over from a previous annotation (including the `_x`/`_y`
# suffixed columns a merge produces on a name clash) are dropped, so the
# freshly merged `tags` column is the only one written back:
distribution_df = distribution_df.drop(
    columns=['tags_x', 'tags_y', 'tags'],
    errors='ignore',
)

# Adding the curated tags; distributions without curation get a missing value:
merged_df = distribution_df.merge(curation_table, on='id', how='left')

# Collect distribution metadata. Missing values are converted to None so they
# are serialised as JSON `null` instead of the non-standard `NaN` token. The
# cast to object is needed for None to be kept in otherwise numeric columns:
distribution_data = (
    merged_df
    .astype(object)
    .where(merged_df.notna(), None)
    .to_dict(orient='records')
)

# Saving the curation to a JSON file:
with open(f'{asset_folder}/distribution.json', 'w') as f:
    json.dump(distribution_data, f, indent=2)

## Adding Intervals dataset

The code cell below prepares the table we used to annotate datasets for the croissant recordset JSON. I decided to keep the Google spreadsheet as the source of truth, so any modification needs to be propagated from there.

The resulting tsv file is then added to the spreadsheet. After annotating the table with column description and foreign keys, the above code needs to be re-run to update the recordset.json file.

In [55]:
from pyspark.sql import SparkSession, functions as f
import pandas as pd
import json

# Dataset name is hardcoded:
dataset_name = 'interval'

# Schema representation is collected in this list:
schema = []

# A piece of the dataset is read, as it represents the schema of the entire dataset:
spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet('../interval.parquet')

# The schema is converted to its JSON representation and parsed into a dictionary:
schema_json = df.schema.json()
data = json.loads(schema_json)

def process_field(fields: dict, parent: str) -> None:
    """
    Flattens a struct of a Spark JSON schema into the module-level `schema` list.

    Nested structs and arrays of structs are walked recursively. Every other field
    (primitives, arrays of primitives and any other non-struct type) is recorded
    as one entry with its slash-separated path.

    Args:
        fields (dict): JSON representation of a struct type; its `fields` key holds the list of field definitions.
        parent (str): Path of the parent, used as prefix of the field identifiers.

    Returns:
        None: Entries are appended to the module-level `schema` list.
    """
    for field in fields['fields']:
        name = field.get('name')
        field_id = f'{parent}/{name}'

        # Arrays (possibly nested) are unwrapped to reach the type of their elements:
        field_type = field['type']
        while isinstance(field_type, dict) and field_type.get('type') == 'array':
            field_type = field_type['elementType']

        if isinstance(field_type, dict) and field_type.get('type') == 'struct':
            # The whole struct definition is passed on, as the function reads its `fields` key:
            process_field(field_type, field_id)
        else:
            schema.append(
                {
                    'dataset_name': dataset_name,
                    'field_id': field_id,
                    'column_name': name,
                }
            )

# The schema is walked to collect the field names that can be annotated on the spreadsheet:
process_field(data, dataset_name)

# The collected fields are written to a tab-separated file:
schema_table = pd.DataFrame(schema)
schema_table.to_csv('interval_schema.tsv', sep='\t', index=False)

ConnectionRefusedError: [Errno 61] Connection refused

## Updating pharmacogenetics dataset

There was a larger-scale update of the pharmacogenetics dataset in the 25.09 release.

In [None]:
from pyspark.sql import SparkSession, functions as f
import pandas as pd
import json

# Dataset name is hardcoded:
dataset_name = 'pharmacogenomics'

# Schema representation is collected in this list:
schema = []

# A piece of the dataset is read, as it represents the schema of the entire dataset:
spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet('/Users/dsuveges/project_data/25.09/output/pharmacogenomics')

# The schema is converted to its JSON representation and parsed into a dictionary:
schema_json = df.schema.json()
data = json.loads(schema_json)

def process_fields(fields: dict, parent: str) -> None:
    """
    Flattens a struct of a Spark JSON schema into the module-level `schema` list.

    Nested structs and arrays of structs are walked recursively. Every other field
    (primitives, arrays of primitives and any other non-struct type) is recorded
    as one entry with its slash-separated path, so no column is silently left
    out of the table to annotate.

    Args:
        fields (dict): JSON representation of a struct type; its `fields` key holds the list of field definitions.
        parent (str): Path of the parent, used as prefix of the field identifiers.

    Returns:
        None: Entries are appended to the module-level `schema` list.
    """
    for field in fields['fields']:
        name = field.get('name')
        field_id = f'{parent}/{name}'

        # Arrays (possibly nested) are unwrapped to reach the type of their elements:
        field_type = field['type']
        while isinstance(field_type, dict) and field_type.get('type') == 'array':
            field_type = field_type['elementType']

        if isinstance(field_type, dict) and field_type.get('type') == 'struct':
            process_fields(field_type, field_id)
        else:
            schema.append(
                {
                    'dataset_name': dataset_name,
                    'field_id': field_id,
                    'column_name': name,
                }
            )



# The schema is walked to collect the field names that can be annotated on the spreadsheet:
process_fields(data, dataset_name)

# The collected fields are written to a tab-separated file:
schema_table = pd.DataFrame(schema)
schema_table.to_csv('pharmacogenomics_schema.tsv', sep='\t', index=False)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/08/20 20:12:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


25/08/20 21:33:52 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 1388914 ms exceeds timeout 120000 ms
25/08/20 21:33:52 WARN SparkContext: Killing executors is not supported by current scheduler.
25/08/20 21:33:52 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$