In [1]:
import pandas as pd
from pyspark.sql import DataFrame, SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import *

# Local Spark session: 'local[*]' master, 15g of driver memory.
# getOrCreate() hands back an already-running session if there is one.
spark = (
    SparkSession.builder
    .appName('spark')
    .master('local[*]')
    .config("spark.driver.memory", "15g")
    .getOrCreate()
)

## Generate table with main pieces of information from AOPWiki

To include:
- AOP id
- Adverse Outcome
- Mapped Adverse Outcome to EFO
- Target Name
- Mapped target to ENSEMBL
- Key Event title
- OECD Status
- Species applicability
- Reference

In [25]:
## Load datasets generated from the AOPWiki XML

# Input locations, kept in one place so they are easy to spot and change.
aop_path = '/Users/irene/Documents/dev/datasets/aopV1'
ke_path = '/Users/irene/Documents/dev/datasets/keV1/part-00000-966ef78b-2361-42b3-99fe-ab3cb9d07d6e-c000.json'

# AOP records (one row per AOP, see the schema printed below).
aop = spark.read.json(aop_path)

# Key Event records; the 'references' column is not used in this notebook.
ke = spark.read.json(ke_path).drop('references')

### 1. AOP id + Reference + status

In [4]:
# Inspect the columns available in the AOP dataset before selecting from it.
aop.printSchema()

root
 |-- id: long (nullable = true)
 |-- keyEvents: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- type: string (nullable = true)
 |-- status: string (nullable = true)
 |-- stressors: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- chemicalId: long (nullable = true)
 |    |    |-- inchiKey: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- preferredName: string (nullable = true)
 |    |    |-- qualityAssurance: string (nullable = true)
 |    |    |-- synonyms: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |-- title: string (nullable = true)



In [93]:
# Base of the work-in-progress table: one row per AOP with renamed id/title,
# plus a 'reference' link built from the AOP id.
aop_url_prefix = F.lit('https://aopwiki.org/aops/')

wip = aop.select(
    F.col('id').alias('aopId'),
    'keyEvents',
    'status',
    F.col('title').alias('aopName'),
).withColumn('reference', F.concat(aop_url_prefix, F.col('aopId')))

wip.show(5)

+-----+--------------------+------+--------------------+--------------------+
|aopId|           keyEvents|status|             aopName|           reference|
+-----+--------------------+------+--------------------+--------------------+
|  177|[{1104, key-event...|  null|Cyclooxygenase 1 ...|https://aopwiki.o...|
|  398|[{1880, molecular...|  null|Inhibition of ALD...|https://aopwiki.o...|
|  242|[{1466, key-event...|  null|Inhibition of lys...|https://aopwiki.o...|
|  241|[{68, key-event},...|  null|Latent Transformi...|https://aopwiki.o...|
|  260|[{1513, key-event...|  null|CYP2E1 activation...|https://aopwiki.o...|
+-----+--------------------+------+--------------------+--------------------+
only showing top 5 rows



### 2. Adverse Outcome

In [94]:
# Title of every Key Event, keyed so it can be joined on the adverse outcome id.
ao_titles = ke.select(
    F.col('id').alias('adverseOutcomeId'),
    F.col('title').alias('adverseOutcome'),
)

wip = (
    wip
    # One row per Key Event of the AOP, keeping only those typed as adverse outcome
    .withColumn('keyEvent', F.explode('keyEvents'))
    .filter(F.col('keyEvent.type') == 'adverse-outcome')
    .withColumn('adverseOutcomeId', F.col('keyEvent.id'))
    # Left join: the AOP row is kept even when the KE dataset has no title for it
    .join(ao_titles, on='adverseOutcomeId', how='left')
    .drop('keyEvent')
)

wip.show(5)

+----------------+-----+--------------------+------+--------------------+--------------------+--------------------+
|adverseOutcomeId|aopId|           keyEvents|status|             aopName|           reference|      adverseOutcome|
+----------------+-----+--------------------+------+--------------------+--------------------+--------------------+
|             351|  177|[{1104, key-event...|  null|Cyclooxygenase 1 ...|https://aopwiki.o...| Increased Mortality|
|             361|  177|[{1104, key-event...|  null|Cyclooxygenase 1 ...|https://aopwiki.o...| Decline, Population|
|             972|  398|[{1880, molecular...|  null|Inhibition of ALD...|https://aopwiki.o...|Decreased fertili...|
|             636|  242|[{1466, key-event...|  null|Inhibition of lys...|https://aopwiki.o...| Decreased, survival|
|            1467|  242|[{1466, key-event...|  null|Inhibition of lys...|https://aopwiki.o...|   Growth, reduction|
+----------------+-----+--------------------+------+--------------------

### 3. Target name

In [95]:
# Extracted from KE where the biological event is a Protein Ontology (PRO) ID.
# Show one example row to see what such a biological event looks like.
(
    ke
    .withColumn('be', F.explode('biologicalEvents'))
    .filter(F.col('be.object.source') == 'PR')
    .first()
)

Row(biologicalEvents=[Row(action='decreased', object=Row(name='NADH-ubiquinone oxidoreductase chain 1', source='PR', sourceId='PR:000031316'), process='NADH dehydrogenase (ubiquinone) activity')], biologicalOrganisationLevel='Cellular', id=887, keyEventStressors=["1',2'-dihydrorotenone"], organTerm=Row(name=None, sourceId=None), title='Inhibition, NADH-ubiquinone oxidoreductase  (complex I)', be=Row(action='decreased', object=Row(name='NADH-ubiquinone oxidoreductase chain 1', source='PR', sourceId='PR:000031316'), process='NADH dehydrogenase (ubiquinone) activity'))

In [96]:
# (keyEventId, targetName) pairs for every biological event whose object
# comes from the 'PR' source.
targets = (
    ke.select(
        F.col('id').alias('keyEventId'),
        F.explode('biologicalEvents').alias('be'),
    )
    .filter(F.col('be.object.source') == 'PR')
    .select('keyEventId', F.col('be.object.name').alias('targetName'))
)

targets.show(5)

+----------+--------------------+
|keyEventId|          targetName|
+----------+--------------------+
|       887|NADH-ubiquinone o...|
|       888|NADH-ubiquinone o...|
|      1265|nuclear hormone r...|
|      1266|cardioactive pept...|
|       858|peroxisome prolif...|
+----------+--------------------+
only showing top 5 rows



In [97]:
# Merge the target data with the wip dataset

wip = (
    wip
    # One row per Key Event id listed in the AOP
    .withColumn('keyEventId', F.explode('keyEvents.id'))
    # Inner join: only AOPs with at least one Key Event that has a target survive
    .join(targets, on='keyEventId', how='inner')
)

wip.show(n=5, truncate=False, vertical=True)

-RECORD 0--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 keyEventId       | 1103                                                                                                                                                                                                                                         
 adverseOutcomeId | 351                                                                                                                                                                                                                                          
 aopId            | 177                                                                                                                                                                                                           

### 4. Key Event name

In [98]:
# Title of every Key Event, keyed by keyEventId.
ke_titles = ke.select(
    F.col('id').alias('keyEventId'),
    F.col('title').alias('keyEventName'),
)

ke_names = (
    wip.select('keyEvents', 'aopId').distinct()
    # Explode KEs again to have all KEs of the AOP, not just the ones with a target
    .withColumn('keyEventId', F.explode('keyEvents.id'))
    # Bring in the name of the KE
    .join(ke_titles, on='keyEventId', how='left')
    # The KE dataset has a lot of duplication with null titles - drop those rows
    # TODO: Filter out KE describing AO
    .filter(F.col('keyEventName').isNotNull())
    # Collect the distinct (id, name) structs of each AOP into one array
    .groupby('aopId')
    .agg(F.collect_set(F.struct('keyEventId', 'keyEventName')).alias('keyEvents'))
)

ke_names.first()

Row(aopId=29, keyEvents=[Row(keyEventId=111, keyEventName='Agonism, Estrogen receptor'), Row(keyEventId=252, keyEventName='Increase, Renal pathology due to VTG deposition'), Row(keyEventId=364, keyEventName='Impaired development of, Reproductive organs'), Row(keyEventId=363, keyEventName='Altered, Reproductive behaviour'), Row(keyEventId=339, keyEventName='Altered, Larval development'), Row(keyEventId=360, keyEventName='Decrease, Population trajectory'), Row(keyEventId=78, keyEventName='Reduction, Cumulative fecundity and spawning'), Row(keyEventId=307, keyEventName='Increase, Vitellogenin synthesis in liver'), Row(keyEventId=220, keyEventName='Increase, Plasma vitellogenin concentrations')])

In [99]:
# Swap the raw per-row Key Event columns for the named Key Event array of each AOP.
wip_without_raw_ke = wip.drop('keyEvents', 'keyEventId')
wip = wip_without_raw_ke.join(ke_names, on='aopId', how='inner').distinct()

wip.show(n=2, truncate=False, vertical=True)

-RECORD 0-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 aopId            | 29                                                                                                                                                                                                                                                                                                                                                                                                                              
 adverseOutcomeId | 363                                                                                                       

### 5. Species -- not extracted yet (exported as an empty placeholder column)

### 6. Map target to Ensembl

In [116]:
def _labels_or_empty(column_name):
    """Return the array column `column_name`, with a null array replaced by an empty one.

    `F.flatten` yields NULL as soon as one of the inner arrays is NULL, which
    would discard every name of a target (approved symbol and name included)
    just because one of its synonym lists is missing.
    Assumes the labels are strings, as in the example row shown below.
    """
    return F.coalesce(F.col(column_name), F.array().cast('array<string>'))


# Lookup of every known label (approved symbol, approved name and all synonym
# lists) per target id, used further down to map target names to Ensembl ids.
target_idx = (
    spark.read.parquet('/Users/irene/Documents/dev/pyspark/21.09.5/targets')
    .select(
        'id',
        # Wrap the scalar fields in arrays so they can be flattened with the synonym lists
        F.array('approvedSymbol').alias('approvedSymbol'),
        F.array('approvedName').alias('approvedName'),
        _labels_or_empty('synonyms.label').alias('synonyms'),
        _labels_or_empty('symbolSynonyms.label').alias('symbolSynonyms'),
        _labels_or_empty('nameSynonyms.label').alias('nameSynonyms'),
    )
    .withColumn(
        'namesCombined',
        F.array_distinct(
            F.flatten(F.array('approvedSymbol', 'approvedName', 'synonyms', 'symbolSynonyms', 'nameSynonyms'))
        ),
    )
    .select(F.col('id').alias('targetId'), 'namesCombined')
)

target_idx.first()

Row(targetId='ENSG00000002016', namesCombined=['RAD52', 'RAD52 homolog, DNA repair protein', 'DNA repair protein RAD52 homolog', 'recombination protein RAD52', 'rhabdomyosarcoma antigen MU-RMS-40.23'])

In [119]:
# One row per (targetId, label) so the target name can be matched by plain equality.
name_to_target = target_idx.select(
    'targetId',
    F.explode('namesCombined').alias('targetName'),
)

# Left join: rows whose target name matches no label keep a null targetId.
wip = wip.join(name_to_target, on='targetName', how='left').distinct()

In [120]:
# Preview the table after the Ensembl mapping; targetId is the newly added column.
wip.show(5, vertical=True)

-RECORD 0--------------------------------
 targetName       | dual oxidase 1       
 aopId            | 193                  
 adverseOutcomeId | 1101                 
 status           | null                 
 aopName          | Dual oxidase (DUO... 
 reference        | https://aopwiki.o... 
 adverseOutcome   | Altered, Amphibia... 
 keyEvents        | [{1101, Altered, ... 
 targetId         | ENSG00000137857      
-RECORD 1--------------------------------
 targetName       | NFAT activation m... 
 aopId            | 154                  
 adverseOutcomeId | 984                  
 status           | EAGMST Under Review  
 aopName          | Inhibition of Cal... 
 reference        | https://aopwiki.o... 
 adverseOutcome   | Impairment, T-cel... 
 keyEvents        | [{979, Interferen... 
 targetId         | ENSG00000235568      
-RECORD 2--------------------------------
 targetName       | calmodulin           
 aopId            | 77                   
 adverseOutcomeId | 563           

In [121]:
# Number of distinct target names in the table (denominator of the mapping coverage).
wip.select('targetName').distinct().count()

138

In [None]:
# 92 out of 138 targets have been mapped to Ensembl.

### 7. Map AO to EFO

In [None]:
from ontoma import OnToma

# OnToma client; its find_term() is used below to look up an ontology term
# for each adverse outcome label.
otmap = OnToma()

In [138]:
# Unique, non-null adverse outcome labels, pulled to the driver as a plain Python list.
ao_labels = wip.select('adverseOutcome').toPandas()['adverseOutcome']
aos = ao_labels.dropna().unique().tolist()

aos[:5]

['Altered, Amphibian metamorphosis',
 'Impairment, T-cell dependent antibody response',
 'Death/Failure, Colony',
 'Increased Mortality',
 'Decrease, Population trajectory']

In [140]:
# Adverse outcome label -> ontology id of the first result returned by OnToma.
# Labels for which OnToma returns nothing are left out of the dict.
mappings = {}

for event in aos:
    hits = otmap.find_term(event)
    if len(hits) == 0:
        continue
    mappings[event] = hits[0].id_ot_schema

INFO     - ontoma.interface - Processed: Altered, Amphibian metamorphosis → []
INFO:ontoma.interface:Processed: Altered, Amphibian metamorphosis → []
INFO     - ontoma.interface - Processed: Impairment, T-cell dependent antibody response → []
INFO:ontoma.interface:Processed: Impairment, T-cell dependent antibody response → []
INFO     - ontoma.interface - Processed: Death/Failure, Colony → []
INFO:ontoma.interface:Processed: Death/Failure, Colony → []
INFO     - ontoma.interface - Processed: Increased Mortality → []
INFO:ontoma.interface:Processed: Increased Mortality → []
INFO     - ontoma.interface - Processed: Decrease, Population trajectory → []
INFO:ontoma.interface:Processed: Decrease, Population trajectory → []
INFO     - ontoma.interface - Processed: N/A, Cyanosis occurs → []
INFO:ontoma.interface:Processed: N/A, Cyanosis occurs → []
INFO     - ontoma.interface - Processed: Decreased sperm quantity or quality in the adult, Decreased fertility  → []
INFO:ontoma.interface:Process

In [141]:
# Show which adverse outcome labels were mapped, and to which ontology id.
mappings

{'Hypertension': 'EFO_0000537',
 'Hepatotoxicity': 'EFO_0011052',
 'Testicular atrophy': 'HP_0000029',
 'Pulmonary fibrosis': 'EFO_0009448',
 'obesity': 'EFO_0001073',
 'Liver Cancer': 'MONDO_0002691'}

In [143]:
# Coverage is extremely low: 6 out of 98 AOs have been successfully mapped

In [149]:
# Add mappings to the table

# Replace the AOP-Wiki adverse outcome id with the mapped ontology id, or null
# when the label has no mapping.
#
# The label is looked up directly in `mappings`. Telling mapped from unmapped
# values by checking for an underscore would turn any unmapped label that
# happens to contain '_' into a bogus ontology id.
if mappings:
    # create_map takes alternating key/value columns: k1, v1, k2, v2, ...
    ao_to_ontology_id = F.create_map(
        *[F.lit(item) for pair in mappings.items() for item in pair]
    )
    ontology_id = ao_to_ontology_id[F.col('adverseOutcome')]
else:
    # Nothing was mapped: keep the column, typed, but entirely null
    ontology_id = F.lit(None).cast('string')

wip = wip.withColumn('adverseOutcomeId', ontology_id)

Row(targetName='nitric oxide synthase, endothelial', aopId=149, adverseOutcomeId='EFO_0000537', status='Under Development', aopName='Peptide Oxidation Leading to Hypertension', reference='https://aopwiki.org/aops/149', adverseOutcome='Hypertension', keyEvents=[Row(keyEventId=937, keyEventName='KE7 : Impaired, Vasodilation'), Row(keyEventId=209, keyEventName='Peptide Oxidation'), Row(keyEventId=933, keyEventName='KE6 : Depletion, Nitric Oxide'), Row(keyEventId=952, keyEventName='Hypertension'), Row(keyEventId=951, keyEventName='KE8 : Increase, Vascular Resistance'), Row(keyEventId=935, keyEventName='KE2 : Decrease, GTPCH-1'), Row(keyEventId=927, keyEventName='KE1 : S-Glutathionylation, eNOS'), Row(keyEventId=934, keyEventName='KE3 : Decrease, Tetrahydrobiopterin'), Row(keyEventId=973, keyEventName='KE5 : Decrease, AKT/eNOS activity'), Row(keyEventId=932, keyEventName='KE4 : Uncoupling, eNOS')], targetId='ENSG00000164867')

### 8. Export resulting table

In [152]:
# Final table: fixed column order, with 'species' as an all-null placeholder
# because it has not been extracted.
output = wip.select(
    'aopId',
    'aopName',
    'targetName',
    'targetId',
    'adverseOutcome',
    'adverseOutcomeId',
    'keyEvents',
    F.lit(None).alias('species'),
    'status',
    'reference',
).distinct()

output.show(n=1, truncate=False, vertical=True)

-RECORD 0--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 aopId            | 193                                                                                                                                                                  
 aopName          | Dual oxidase (DUOX) inhibition leading to altered amphibian metamorphosis                                                                                            
 targetName       | dual oxidase 1                                                                                                                                                       
 targetId         | ENSG00000137857                                                                                                                                                      
 adverseOutcome   | Altered, Amphibian metamorphosis                  

In [153]:
# Write the refined table as JSON; coalesce(1) collapses it to a single partition first.
# mode('overwrite') lets the notebook be re-run: the default save mode raises
# an error when the output path already exists.
# NOTE(review): 'AOPWifi' in the path looks like a typo for 'AOPWiki'; left
# unchanged in case something downstream reads this location.
output.coalesce(1).write.mode('overwrite').json('/Users/irene/Documents/dev/random_notebooks/AOPWiki/outputs/AOPWifi_refined')