In [None]:
from functools import reduce

from pathlib import Path
from pyspark.sql import DataFrame, SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import *

spark = SparkSession.builder \
        .master('local[*]') \
        .config("spark.driver.memory", "15g") \
        .config("spark.ui.showConsoleProgress", "false") \
        .appName('spark') \
        .getOrCreate()

# Useful snippets for the analysis

In [2]:
def drop_null_columns(df: DataFrame) -> DataFrame:
    """
    Drops all columns which only contain null values.
    """
    df_count = df.count()
    null_counts = df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df.columns]).collect()[0].asDict()
    to_drop = [k for k, v in null_counts.items() if v == df_count]
    df = df.drop(*to_drop)
    return df

In [3]:
def aggregate_evidence(df: DataFrame) -> DataFrame:
    """
    Groups the evidence on the association level and returns a dataframe with the aggregated values of the direction of effect.
    """
    return df.groupBy('targetId', 'diseaseId', 'datasourceId').agg(
        F.size(F.collect_list('effectDirection_down')).alias('effectDirection_down'),
        F.size(F.collect_list('effectDirection_up')).alias('effectDirection_up'),
        F.size(F.collect_list('effectSize_down')).alias('effectSize_down'),
        F.size(F.collect_list('effectSize_up')).alias('effectSize_up'),
    )


In [35]:
def get_stats(df: DataFrame, all: bool = False) -> None:
    """
    Calculates overall counts of the direction of effect dataset.
    When all is set to True, the df is aggregated and counts are calculated for all evidence.
    """
    
    source_name = df.select('datasourceId').distinct().collect()[0][0]
    total = df.select('targetId', 'diseaseId').distinct().count()
    if all:
        source_name = 'all'
        df = df.groupBy('targetId', 'diseaseId').agg(
            *[
                F.aggregate(F.collect_list(col), F.lit(0), lambda acc, x: acc + x).alias(col)
                for col in g2p.columns
                if col not in ['diseaseId', 'targetId', 'datasourceId']
            ]
        )
    
    direction_discrepancies = (
        df.filter((F.col('effectDirection_down') > 0) & (F.col('effectDirection_up') > 0))
        .count()
    )
    size_discrepancies = (
        df.filter((F.col('effectSize_down') > 0) & (F.col('effectSize_up') > 0))
        .count()
    )

    print(f'STATS for {source_name}')
    print(f'... {total} total associations.')
    print(
        f'... {direction_discrepancies} associations with discrepant direction ({direction_discrepancies/total*100:.2f}%).'
    )
    print(f'... {size_discrepancies} associations with discrepant size ({size_discrepancies/total*100:.2f}%). \n')

In [4]:
quantitative_traits = (
    # EFO IDs that are under measurement
    spark.read.parquet('/Users/irene/Documents/dev/pyspark/22.06.2/diseases')
    .filter(F.array_contains(F.col('therapeuticAreas'), 'EFO_0001444'))
    .select(F.col('id').alias('diseaseId')).distinct()
    .toPandas()['diseaseId'].to_list()
)

quantitative_traits[:5]

['EFO_0005189', 'EFO_0008080', 'EFO_0008167', 'EFO_0008181', 'EFO_0010586']

# Data processing

## Known Drugs: Gold standard

Direction of effect comes from the mechanism of action of the drug that modulates the target.

In [5]:
lof_moas = [
    'AGONIST',
    'POSITIVE MODULATOR',
    'OPENER',
    'PARTIAL AGONIST',
    'ACTIVATOR',
    'STABILISER',
    'VACCINE ANTIGEN',
    'RELEASING AGENT',
    'POSITIVE ALLOSTERIC MODULATOR',
]

gof_moas = [
    'INHIBITOR',
    'ANTAGONIST',
    'BLOCKER',
    'NEGATIVE ALLOSTERIC MODULATOR',
    'HYDROLYTIC ENZYME',
    'RNAI INHIBITOR',
    'PROTEOLYTIC ENZYME',
    'NEGATIVE MODULATOR',
    'DISRUPTING AGENT',
    'ANTISENSE INHIBITOR',
    'DEGRADER',
    'INVERSE AGONIST',
    'ALLOSTERIC ANTAGONIST',
    'SEQUESTERING AGENT',
    'CHELATING AGENT',
    'OXIDATIVE ENZYME',
]


In [6]:
chembl = (
    drop_null_columns(spark.read.parquet('/Users/irene/Documents/dev/pyspark/22.06.2/evidence/sourceId=chembl'))
    .join(
        spark.read.parquet('/Users/irene/Documents/dev/pyspark/22.06.2/mechanismOfAction')
        .select(F.explode('chemblIds').alias('drugId'), 'actionType').distinct(),
        on='drugId', how='inner'
    )
    .filter((F.col('actionType').isin(lof_moas)) | (F.col('actionType').isin(gof_moas)))
    .withColumn('effectDirection_up', F.when(F.col('actionType').isin(gof_moas), F.lit(True)))
    .withColumn('effectDirection_down', F.when(F.col('actionType').isin(lof_moas), F.lit(True)))
    .withColumn('effectSize_up', F.lit(True))
    .withColumn('effectSize_down', F.lit(None))
    .transform(aggregate_evidence)
)

chembl.show(2, False, True)

22/07/12 11:54:01 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


-RECORD 0-------------------------------
 targetId             | ENSG00000007314 
 diseaseId            | EFO_0000612     
 datasourceId         | chembl          
 effectDirection_down | 0               
 effectDirection_up   | 5               
 effectSize_down      | 0               
 effectSize_up        | 5               
-RECORD 1-------------------------------
 targetId             | ENSG00000007314 
 diseaseId            | EFO_1000781     
 datasourceId         | chembl          
 effectDirection_down | 0               
 effectDirection_up   | 1               
 effectSize_down      | 0               
 effectSize_up        | 1               
only showing top 2 rows



## Gene2Phenotype

Directionality:
- absent_gene_product: SO_0002317
- increased_gene_product_level: SO_0002315

In [10]:
g2p = (
    drop_null_columns(spark.read.parquet('/Users/irene/Documents/dev/pyspark/22.06.2/evidence/sourceId=gene2phenotype'))
    .withColumn(
        'effectDirection_down',
        F.when(F.col('variantFunctionalConsequenceId') == 'SO_0002317', F.lit(True))
    )
    .withColumn('effectDirection_up', F.when(F.col('variantFunctionalConsequenceId') == 'SO_0002315', F.lit(True)))
    .withColumn('effectSize_up', F.lit(True))
    .withColumn('effectSize_down', F.lit(None))
    .filter(F.col('effectDirection_up').isNotNull() | F.col('effectDirection_down').isNotNull())
    .transform(aggregate_evidence)

)

g2p.show(2, False, True)

-RECORD 0-------------------------------
 targetId             | ENSG00000163093 
 diseaseId            | EFO_0009025     
 datasourceId         | gene2phenotype  
 effectDirection_down | 2               
 effectDirection_up   | 0               
 effectSize_down      | 0               
 effectSize_up        | 2               
-RECORD 1-------------------------------
 targetId             | ENSG00000186326 
 diseaseId            | MONDO_0012033   
 datasourceId         | gene2phenotype  
 effectDirection_down | 1               
 effectDirection_up   | 0               
 effectSize_down      | 0               
 effectSize_up        | 1               
only showing top 2 rows



## Orphanet

Directionality:
- loss_of_function_variant: SO_0002054
- gain_of_function_variant: SO_0002053

In [11]:
orphanet = (
    drop_null_columns(spark.read.parquet('/Users/irene/Documents/dev/pyspark/22.06.2/evidence/sourceId=orphanet'))
    .withColumn(
        'effectDirection_down',
        F.when(F.col('variantFunctionalConsequenceId') == 'SO_0002054', F.lit(True))
    )
    .withColumn('effectDirection_up', F.when(F.col('variantFunctionalConsequenceId') == 'SO_0002053', F.lit(True)))
    .withColumn('effectSize_up', F.lit(True))
    .withColumn('effectSize_down', F.lit(None))
    .filter(F.col('effectDirection_up').isNotNull() | F.col('effectDirection_down').isNotNull())
    .transform(aggregate_evidence)
)

orphanet.show(2,False,True)

-RECORD 0-------------------------------
 targetId             | ENSG00000130294 
 diseaseId            | MONDO_0019941   
 datasourceId         | orphanet        
 effectDirection_down | 1               
 effectDirection_up   | 0               
 effectSize_down      | 0               
 effectSize_up        | 1               
-RECORD 1-------------------------------
 targetId             | ENSG00000143520 
 diseaseId            | MONDO_0014555   
 datasourceId         | orphanet        
 effectDirection_down | 1               
 effectDirection_up   | 0               
 effectSize_down      | 0               
 effectSize_up        | 1               
only showing top 2 rows



## Gene burden

- Models that consider PTVs are always LoF
- Effect size come from the betas/ORs

In [12]:
lof_models = [
    'ptv5pcnt',
    'ptv',
    'ptvraredmg',
    'pLoF',
    'MLOF-VT',
    'MLOF-MB',
    'LOF-Burden',
    'LOF-VT',
    'ADD-WGR-FIRTH_M1.singleton',
    'ADD-WGR-FIRTH_M1.01',
    'ADD-WGR-FIRTH_M1.001',
    'ADD-WGR-FIRTH_M1.1',
    'ADD-WGR-FIRTH_M1.0001',
]

burden = (
    drop_null_columns(spark.read.parquet('/Users/irene/Documents/dev/pyspark/22.06.2/evidence/sourceId=gene_burden'))
    .filter(F.col('statisticalMethod').isin(lof_models))
    .withColumn('effectDirection_down', F.lit(True))
    .withColumn('effectDirection_up', F.lit(None))
    .withColumn('effectSize_down', F.when((F.col('beta') < 0) | (F.col('oddsRatio') < 1), F.lit(True)))
    .withColumn('effectSize_up', F.when((F.col('beta') > 0) | (F.col('oddsRatio') > 1), F.lit(True)))
    .transform(aggregate_evidence)
)

burden.show(2, False, True)


-RECORD 0-------------------------------
 targetId             | ENSG00000012048 
 diseaseId            | EFO_0000305     
 datasourceId         | gene_burden     
 effectDirection_down | 6               
 effectDirection_up   | 0               
 effectSize_down      | 1               
 effectSize_up        | 5               
-RECORD 1-------------------------------
 targetId             | ENSG00000105610 
 diseaseId            | EFO_0004526     
 datasourceId         | gene_burden     
 effectDirection_down | 14              
 effectDirection_up   | 0               
 effectSize_down      | 14              
 effectSize_up        | 0               
only showing top 2 rows



##  GWAS of coding variants (Genetics)

- Models that consider PTVs are always LoF
- Effect size come from the betas/ORs

In [13]:
lof_consequences = [
    'SO_0001589', # frameshift
    'SO_0002012', # start_lost
    'SO_0001587', # stop_gained
    'SO_0001818', # protein altering variant
]

In [14]:
ot_genetics = (
    drop_null_columns(spark.read.parquet('/Users/irene/Documents/dev/pyspark/22.06.2/evidence/sourceId=ot_genetics_portal'))
    .filter(F.col('variantFunctionalConsequenceId').isin(lof_consequences))
    .withColumn('effectDirection_down', F.lit(True))
    .withColumn('effectDirection_up', F.lit(None))
    .withColumn('effectSize_down', F.when((F.col('beta') < 0) | (F.col('oddsRatio') < 1), F.lit(True)))
    .withColumn('effectSize_up', F.when((F.col('beta') > 0) | (F.col('oddsRatio') > 1), F.lit(True)))
    .transform(aggregate_evidence)
)

ot_genetics.show(2, False, True)

-RECORD 0----------------------------------
 targetId             | ENSG00000205045    
 diseaseId            | EFO_0004833        
 datasourceId         | ot_genetics_portal 
 effectDirection_down | 2                  
 effectDirection_up   | 0                  
 effectSize_down      | 2                  
 effectSize_up        | 0                  
-RECORD 1----------------------------------
 targetId             | ENSG00000176920    
 diseaseId            | EFO_0005760        
 datasourceId         | ot_genetics_portal 
 effectDirection_down | 1                  
 effectDirection_up   | 0                  
 effectSize_down      | 0                  
 effectSize_up        | 1                  
only showing top 2 rows



## ClinVar

- LoF variants are considered
- Clinical significance addresses the size of the effect

In [15]:
significances = [
    'pathogenic',
    'likely pathogenic',
    'risk factor',
    'protective',
]

In [16]:
clinvar = (
    drop_null_columns(spark.read.parquet('/Users/irene/Documents/dev/pyspark/22.06.2/evidence/sourceId=eva*'))
    .withColumn('datasourceId', F.lit('clinvar'))
    .withColumn('significance', F.explode('clinicalSignificances'))
    .filter(F.col('significance').isin(significances))
    .filter(F.col('variantFunctionalConsequenceId').isin(lof_consequences))
    .withColumn('effectDirection_down', F.lit(True))
    .withColumn('effectDirection_up', F.lit(None))
    .withColumn('effectSize_down', F.when(F.col('significance') == 'protective', F.lit(True)))
    .withColumn('effectSize_up', F.when(F.col('significance') != 'protective', F.lit(True)))
    .transform(aggregate_evidence)
)

clinvar.show(2, False, True)

-RECORD 0-------------------------------
 targetId             | ENSG00000018236 
 diseaseId            | MONDO_0012929   
 datasourceId         | clinvar         
 effectDirection_down | 9               
 effectDirection_up   | 0               
 effectSize_down      | 0               
 effectSize_up        | 9               
-RECORD 1-------------------------------
 targetId             | ENSG00000049860 
 diseaseId            | MONDO_0010006   
 datasourceId         | clinvar         
 effectDirection_down | 79              
 effectDirection_up   | 0               
 effectSize_down      | 0               
 effectSize_up        | 79              
only showing top 2 rows



## Mouse models

All the genotypes refer to KO models

In [17]:
impc = (
    drop_null_columns(spark.read.parquet('/Users/irene/Documents/dev/pyspark/22.06.2/evidence/sourceId=impc'))
    .withColumn('effectDirection_down', F.lit(True))
    .withColumn('effectDirection_up', F.lit(None))
    .withColumn('effectSize_up', F.lit(True))
    .withColumn('effectSize_down', F.lit(None))
    .transform(aggregate_evidence)
)

impc.show(2, False, True)

-RECORD 0-------------------------------
 targetId             | ENSG00000001461 
 diseaseId            | EFO_1001451     
 datasourceId         | impc            
 effectDirection_down | 1               
 effectDirection_up   | 0               
 effectSize_down      | 0               
 effectSize_up        | 1               
-RECORD 1-------------------------------
 targetId             | ENSG00000001617 
 diseaseId            | MONDO_0009141   
 datasourceId         | impc            
 effectDirection_down | 1               
 effectDirection_up   | 0               
 effectSize_down      | 0               
 effectSize_up        | 1               
only showing top 2 rows



## Differential expression

- All experiments are disease vs control
    - Positive fold change: increase of expression
    - Negative fold change: decrease of expression

In [18]:
expression = (
    drop_null_columns(spark.read.parquet('/Users/irene/Documents/dev/pyspark/22.06.2/evidence/sourceId=expression_atlas'))
    .withColumn('effectDirection_down', F.when(F.col('log2FoldChangeValue') < 0, F.lit(True)))
    .withColumn('effectDirection_up', F.when(F.col('log2FoldChangeValue') > 0, F.lit(True)))
    .withColumn('effectSize_down', F.lit(None))
    .withColumn('effectSize_up', F.lit(True))
    .transform(aggregate_evidence)
)

expression.show(2, False, True)

-RECORD 0--------------------------------
 targetId             | ENSG00000003137  
 diseaseId            | EFO_0003096      
 datasourceId         | expression_atlas 
 effectDirection_down | 1                
 effectDirection_up   | 0                
 effectSize_down      | 0                
 effectSize_up        | 1                
-RECORD 1--------------------------------
 targetId             | ENSG00000004777  
 diseaseId            | EFO_0000676      
 datasourceId         | expression_atlas 
 effectDirection_down | 1                
 effectDirection_up   | 0                
 effectSize_down      | 0                
 effectSize_up        | 1                
only showing top 2 rows



## Project Score

Synthetic lethality -> all LoF

In [19]:
crispr = (
    drop_null_columns(spark.read.parquet('/Users/irene/Documents/dev/pyspark/22.06.2/evidence/sourceId=crispr'))
    .withColumn('effectDirection_down', F.lit(True))
    .withColumn('effectDirection_up', F.lit(None))
    .withColumn('effectSize_up', F.lit(True))
    .withColumn('effectSize_down', F.lit(None))
    .transform(aggregate_evidence)
)

crispr.show(2, False, True)

-RECORD 0-------------------------------
 targetId             | ENSG00000166595 
 diseaseId            | EFO_0001378     
 datasourceId         | crispr          
 effectDirection_down | 1               
 effectDirection_up   | 0               
 effectSize_down      | 0               
 effectSize_up        | 1               
-RECORD 1-------------------------------
 targetId             | ENSG00000197713 
 diseaseId            | EFO_0001075     
 datasourceId         | crispr          
 effectDirection_down | 1               
 effectDirection_up   | 0               
 effectSize_down      | 0               
 effectSize_up        | 1               
only showing top 2 rows



# Export data

In [20]:
datasets = [
    chembl,
    g2p,
    orphanet,
    burden,
    ot_genetics,
    clinvar,
    impc,
    expression,
    crispr,
    # oncology,
    # coloc,
]

In [39]:
all_data = reduce(DataFrame.unionByName, datasets).repartition(20).persist()

In [32]:
cwd = Path.cwd().parent

# all_data.write.parquet(str(cwd / 'outputs' / 'data_harmonisation'), mode='overwrite')

                                                                                

# Stats



## Overall

In [36]:
# Per datasource

[get_stats(e) for e in datasets]

STATS for chembl
... 70211 total associations.
... 4982 associations with discrepant direction (7.10%).
... 0 associations with discrepant size (0.00%). 

STATS for gene2phenotype
... 2425 total associations.
... 0 associations with discrepant direction (0.00%).
... 0 associations with discrepant size (0.00%). 

STATS for orphanet
... 1757 total associations.
... 0 associations with discrepant direction (0.00%).
... 0 associations with discrepant size (0.00%). 

STATS for gene_burden
... 2874 total associations.
... 0 associations with discrepant direction (0.00%).
... 144 associations with discrepant size (5.01%). 

STATS for ot_genetics_portal
... 114 total associations.
... 0 associations with discrepant direction (0.00%).
... 3 associations with discrepant size (2.63%). 

STATS for clinvar
... 10275 total associations.
... 0 associations with discrepant direction (0.00%).
... 1 associations with discrepant size (0.01%). 

STATS for impc
... 523293 total associations.
... 0 associat

[None, None, None, None, None, None, None, None, None]

In [51]:
# All

get_stats(all_data, all=True)

STATS for all
... 767340 total associations.
... 9102 associations with discrepant direction (1.19%).
... 172 associations with discrepant size (0.02%). 



## Provided examples

### LDLR/low density lipoprotein cholesterol measurement

LDLR down - low density lipoprotein cholesterol measurement up ->  Wrong therapeutic hypothesis for inhibitor

- We only have data from gene_burden because variants from Genetics Portal are not eligible


In [55]:
(
    all_data
    .filter((F.col('diseaseId') == 'EFO_0004611') & (F.col('targetId') == 'ENSG00000130164'))
    .show()
)

+---------------+-----------+------------+--------------------+------------------+---------------+-------------+
|       targetId|  diseaseId|datasourceId|effectDirection_down|effectDirection_up|effectSize_down|effectSize_up|
+---------------+-----------+------------+--------------------+------------------+---------------+-------------+
|ENSG00000130164|EFO_0004611| gene_burden|                   4|                 0|              0|            4|
+---------------+-----------+------------+--------------------+------------------+---------------+-------------+



### PCSK9/low density lipoprotein cholesterol measurement

PCSK9 down - low density lipoprotein cholesterol measurement down -> Good therapeutic hypothesis for inhibitor
- We only have data from gene_burden because variants from Genetics Portal are not eligible

In [56]:
(
    all_data
    .filter((F.col('diseaseId') == 'EFO_0004611') & (F.col('targetId') == 'ENSG00000169174'))
    .show()
)

+---------------+-----------+------------+--------------------+------------------+---------------+-------------+
|       targetId|  diseaseId|datasourceId|effectDirection_down|effectDirection_up|effectSize_down|effectSize_up|
+---------------+-----------+------------+--------------------+------------------+---------------+-------------+
|ENSG00000169174|EFO_0004611| gene_burden|                  11|                 0|             11|            0|
+---------------+-----------+------------+--------------------+------------------+---------------+-------------+



### FLG/atopic eczema

FLG down - atopic eczema up -> Wrong therapeutic hypothesis for inhibitor. Benchmark with expression.
- All the sources in the Platform populate the analysis
- Practically all available evidence support the hypothesis
- There is one evidence from the genetics_portal that is discordant (1_152313385_G_A)

In [57]:
(
    all_data
    .filter((F.col('diseaseId') == 'EFO_0000274') & (F.col('targetId') == 'ENSG00000143631'))
    .show()
)

+---------------+-----------+------------------+--------------------+------------------+---------------+-------------+
|       targetId|  diseaseId|      datasourceId|effectDirection_down|effectDirection_up|effectSize_down|effectSize_up|
+---------------+-----------+------------------+--------------------+------------------+---------------+-------------+
|ENSG00000143631|EFO_0000274|           clinvar|                  12|                 0|              0|           12|
|ENSG00000143631|EFO_0000274|              impc|                   2|                 0|              0|            2|
|ENSG00000143631|EFO_0000274|ot_genetics_portal|                   6|                 0|              1|            5|
|ENSG00000143631|EFO_0000274|  expression_atlas|                   2|                 0|              0|            2|
|ENSG00000143631|EFO_0000274|       gene_burden|                   8|                 0|              0|            8|
+---------------+-----------+------------------+

### IL4R/atopic eczema

IL4R up - atopic eczema up -> Good therapeutic hypothesis for inhibitor
- IMPC evidence is not present because the disease is not the same (associations have not been expanded)
- ClinVar evidence is not present because variants are not eligible
- ChEMBL and Expression Atlas are consistent

In [58]:
(
    all_data
    .filter((F.col('diseaseId') == 'EFO_0000274') & (F.col('targetId') == 'ENSG00000077238'))
    .show()
)

+---------------+-----------+----------------+--------------------+------------------+---------------+-------------+
|       targetId|  diseaseId|    datasourceId|effectDirection_down|effectDirection_up|effectSize_down|effectSize_up|
+---------------+-----------+----------------+--------------------+------------------+---------------+-------------+
|ENSG00000077238|EFO_0000274|          chembl|                   0|                36|              0|           36|
|ENSG00000077238|EFO_0000274|expression_atlas|                   0|                 1|              0|            1|
+---------------+-----------+----------------+--------------------+------------------+---------------+-------------+

