## Scoring POC Notebook

In [1]:
import java.nio.file.Paths
import org.apache.spark.sql.Row
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import scala.collection.JavaConversions.mapAsScalaMap
import scala.collection.JavaConversions._
import org.apache.spark.sql.expressions.Window
import org.yaml.snakeyaml.Yaml
import scala.io.Source

In [14]:
// Sanity check: print the runtime class behind `spark`. The handle is not created
// anywhere in this notebook, so it is presumably injected by the kernel.
spark.getClass()

class org.apache.spark.sql.SparkSession

In [2]:
// Locate the evidence extract under ~/data/ot/extract and load it as a DataFrame.
// Results are written later to the sibling directory ~/data/ot/results.
val input_dir = Paths.get(System.getProperty("user.home"), "data", "ot", "extract")
val output_dir = input_dir.resolveSibling("results")
val input_file = input_dir.resolve("evidence.json")
val df = spark.read.format("json").load(input_file.toString)

input_dir = /home/eczech/data/ot/extract
output_dir = /home/eczech/data/ot/results
input_file = /home/eczech/data/ot/extract/evidence.json
df = [access_level: string, disease: struct<biosample: struct<id: string, name: string>, efo_info: struct<efo_id: string, label: string ... 2 more fields> ... 3 more fields> ... 12 more fields]


[access_level: string, disease: struct<biosample: struct<id: string, name: string>, efo_info: struct<efo_id: string, label: string ... 2 more fields> ... 3 more fields> ... 12 more fields]

In [3]:
// Total number of evidence records loaded from the JSON extract.
df.count()

1626544

In [4]:
// Flatten the nested evidence records down to the fields used for scoring,
// giving each one a flat snake_case column name.
val dfe = df.select(
        col("target.id").alias("target_id"),
        col("private.efo_codes").alias("efo_codes"),
        col("disease.id").alias("disease_id"),
        col("scores.association_score").alias("score"),
        col("sourceID").alias("source_id"),
        col("id")
    )
val cols = dfe.columns

dfe = [target_id: string, efo_codes: array<string> ... 4 more fields]
cols = Array(target_id, efo_codes, disease_id, score, source_id, id)


Array(target_id, efo_codes, disease_id, score, source_id, id)

In [5]:
// Preview the first 5 rows, truncating every cell to 12 characters.
dfe.show(numRows = 5, truncate = 12)

+------------+------------+-----------+------------+------------+------------+
|   target_id|   efo_codes| disease_id|       score|   source_id|          id|
+------------+------------+-----------+------------+------------+------------+
|ENSG00000...|[EFO_0000...|EFO_0000384|0.1130747...|expressio...|38c0851b3...|
|ENSG00000...|[EFO_0000...|EFO_0000384|      0.3762|expressio...|899d05b84...|
|ENSG00000...|[EFO_0000...|EFO_0000384|      0.3465|expressio...|a56ee52c9...|
|ENSG00000...|[EFO_0000...|EFO_0000384|0.3266999...|expressio...|b3c46e0fa...|
|ENSG00000...|[EFO_0000...|EFO_0000384|0.4158000...|expressio...|eca9e41d0...|
+------------+------------+-----------+------------+------------+------------+
only showing top 5 rows



In [6]:
// Evidence record counts per data source, smallest source first.
dfe.groupBy(col("source_id")).count().sort(col("count").asc).show()

+------------------+------+
|         source_id| count|
+------------------+------+
|   uniprot_somatic|   284|
|           progeny|   308|
|            sysbio|   408|
|    gene2phenotype|  1586|
|            crispr|  1659|
|           intogen|  2375|
|uniprot_literature|  4553|
|       eva_somatic|  7585|
|          reactome| 10159|
|  genomics_england| 10528|
|           uniprot| 30480|
|    phewas_catalog| 55987|
|cancer_gene_census| 60310|
|        slapenrich| 74570|
|               eva| 96734|
|      gwas_catalog|180984|
|  expression_atlas|204229|
|            chembl|383122|
|         phenodigm|500683|
+------------------+------+



In [7]:
// Show the full efo_codes list for the first 3 records (cells up to 100 characters).
dfe.select(col("efo_codes")).show(numRows = 3, truncate = 100)

+------------------------------------------------------------------------------+
|                                                                     efo_codes|
+------------------------------------------------------------------------------+
|[EFO_0000540, EFO_0000405, EFO_0000384, EFO_0005140, EFO_0000408, EFO_0003767]|
|[EFO_0000540, EFO_0000405, EFO_0000384, EFO_0005140, EFO_0000408, EFO_0003767]|
|[EFO_0000540, EFO_0000405, EFO_0000384, EFO_0005140, EFO_0000408, EFO_0003767]|
+------------------------------------------------------------------------------+
only showing top 3 rows



In [8]:
// Distribution of the number of EFO codes attached to each evidence record.
dfe.groupBy(size(col("efo_codes")).as("n")).count().orderBy(col("n")).show()

+---+------+
|  n| count|
+---+------+
|  1|     4|
|  2| 31382|
|  3|117825|
|  4|177100|
|  5|143127|
|  6|199967|
|  7| 99531|
|  8|105265|
|  9|129871|
| 10|155527|
| 11|104358|
| 12| 57374|
| 13| 56029|
| 14| 30250|
| 15| 28318|
| 16| 13342|
| 17| 17993|
| 18| 12379|
| 19| 21961|
| 20| 18028|
+---+------+
only showing top 20 rows



In [9]:
// Verify that the efo_codes list contains the disease_id before exploding based on
// the efo_codes field (which could result in rows being lost if efo_codes is empty
// or does not contain the primary disease id).
// The check is evaluated as a SQL expression rather than a typed map over Rows, so
// a record with a null efo_codes or disease_id shows up in its own `null` group
// instead of aborting the whole job with a NullPointerException.
dfe.select(expr("array_contains(efo_codes, disease_id)").as("value"))
    .groupBy("value").count().show()

+-----+-------+
|value|  count|
+-----+-------+
| true|1626544|
+-----+-------+



In [10]:
// Build the list of disease ids each evidence record will be attributed to:
// for sources in direct_data_sources, efo_ids holds only the record's own
// disease_id; for every other source it is the full efo_codes list.
val direct_data_sources = List("expression_atlas")
val dfi = dfe
    .select(
        col("id"), col("source_id"), col("disease_id"),
        col("target_id"), col("efo_codes"), col("score"),
        col("source_id").isin(direct_data_sources: _*).as("is_direct_source")
    )
    .withColumn(
        "efo_ids",
        when(col("is_direct_source"), array(col("disease_id")))
            .otherwise(col("efo_codes"))
    )

dfi

direct_data_sources = List(expression_atlas)
dfi = [id: string, source_id: string ... 6 more fields]


[id: string, source_id: string ... 6 more fields]

In [11]:
// Explode efo_ids so there is one row per (evidence record, disease id).
// The record's own disease id is kept as orig_disease_id, and is_direct_id marks
// the rows where the exploded id is that original id.
val dfie = dfi
    .select(
        col("id"),
        col("source_id"),
        col("disease_id").as("orig_disease_id"),
        col("target_id"),
        col("score"),
        explode(col("efo_ids")).as("disease_id")
    )
    .withColumn("is_direct_id", col("orig_disease_id") === col("disease_id"))
dfie

dfie = [id: string, source_id: string ... 5 more fields]


[id: string, source_id: string ... 5 more fields]

In [12]:
// Row count after exploding efo_ids (one row per evidence record and disease id).
dfie.count()

14067077

In [13]:
// Spot-check a single evidence record after the explode.
// The predicates use dfie's own flat columns. The nested input fields
// (disease.id, target.id, sourceID) are not part of dfie's schema, so filtering on
// them only worked because Spark resolved columns that an upstream select had
// already dropped. orig_disease_id / target_id / source_id carry the same values.
dfie.filter(
    $"orig_disease_id" === "EFO_0001645" &&
    $"target_id" === "ENSG00000226777" &&
    $"source_id" === "sysbio"
).show()

+--------------------+---------+---------------+---------------+-----+-----------+------------+
|                  id|source_id|orig_disease_id|      target_id|score| disease_id|is_direct_id|
+--------------------+---------+---------------+---------------+-----+-----------+------------+
|abb040109d9414a1f...|   sysbio|    EFO_0001645|ENSG00000226777|0.859|EFO_0003777|       false|
|abb040109d9414a1f...|   sysbio|    EFO_0001645|ENSG00000226777|0.859|EFO_0000408|       false|
|abb040109d9414a1f...|   sysbio|    EFO_0001645|ENSG00000226777|0.859|EFO_0000319|       false|
|abb040109d9414a1f...|   sysbio|    EFO_0001645|ENSG00000226777|0.859|EFO_0001645|        true|
+--------------------+---------+---------------+---------------+-----+-----------+------------+



In [None]:
// Sample up to three indirect and three direct rows and display them together.
// is_direct_id is a boolean column, so it is compared against boolean literals
// instead of relying on 0/1 coercion, and the calls use ordinary dot syntax rather
// than infix chaining.
Seq(false, true)
    .map(flag => dfie.filter($"is_direct_id" === flag).limit(3))
    .reduce(_ union _)
    .show(6, 12)

In [None]:
// Per source: number of exploded rows with is_direct_id false vs. true.
dfie.groupBy(col("source_id")).pivot("is_direct_id").agg(count(col("id"))).show()

In [None]:
// How many (target, disease) pairs are backed by n rows, largest n first.
dfie.groupBy(col("target_id"), col("disease_id"))
    .agg(count(lit(1)).as("n"))
    .groupBy(col("n")).count()
    .orderBy(col("n").desc)
    .show()

In [None]:
import java.util.{Map => JMap}
import scala.collection.JavaConverters._

/** Runs `f` against the open source `r` and closes `r` afterwards, whether or not `f` throws. */
def using[A](r: Source)(f: Source => A): A = {try f(r) finally r.close()}

/** Parses the YAML document at `path` and returns its root cast to a Java map.
  *
  * NOTE(review): `Yaml.load` is SnakeYAML's general-purpose loader; only point it at
  * trusted configuration files.
  */
def loadConfig(path: String): JMap[String, Any] = {
    val content = using(Source.fromFile(path))(_.mkString)
    (new Yaml).load(content).asInstanceOf[JMap[String, Any]]
}

/** Loads the per-source weights stored under `scoring_weights.source` in the YAML file at `path`.
  *
  * Every numeric weight is converted with `doubleValue`, so integer-valued entries
  * (e.g. `1`) become real Doubles. Casting the raw map to `JMap[String, Double]` would
  * leave them boxed as Integers and fail later when they are unboxed.
  *
  * @param path location of the YAML configuration file
  * @return immutable map from source id to weight
  * @throws IllegalArgumentException if a section is missing or not a mapping, or if
  *                                  a weight is not numeric
  */
def loadScoringConfig(path: String): Map[String, Double] = {
    // Fetch a nested mapping, failing with a descriptive error instead of a bare NPE.
    def section(parent: JMap[String, Any], key: String): JMap[String, Any] =
        Option(parent).flatMap(p => Option(p.get(key))) match {
            case Some(m: JMap[_, _]) => m.asInstanceOf[JMap[String, Any]]
            case _ => throw new IllegalArgumentException(
                s"Expected a mapping under '$key' in $path")
        }
    section(section(loadConfig(path), "scoring_weights"), "source")
        .asScala
        .map {
            case (source, weight: Number) => source -> weight.doubleValue
            case (source, other) => throw new IllegalArgumentException(
                s"Weight for source '$source' in $path is not numeric: $other")
        }
        .toMap
}
val config = loadScoringConfig("../config/scoring.yml")
config

In [None]:
// Scale each evidence score by the weight configured for its source. Sources with
// no entry in `config` fall back to a weight of 1.0 via coalesce.
val lkp = typedLit(config)
val dfse = dfie
    .withColumnRenamed("score", "score_evidence")
    .withColumn(
        "score_source",
        col("score_evidence") * coalesce(lkp(col("source_id")), lit(1.0))
    )
dfse.show(numRows = 3)

In [177]:
// Rank the rows of each (target, disease, source) group by source-weighted score,
// highest first, and discount the row at rank `rid` to score_source / rid^2.
// `w` is never reassigned, so it is declared as a val.
val w = Window.partitionBy("target_id", "disease_id", "source_id").orderBy($"score_source".desc)
val dfseh = dfse
    .withColumn("rid", row_number().over(w))
    .withColumn("score", $"score_source" / pow($"rid", 2.0))
dfseh.show(5)

+--------------------+----------------+---------------+---------------+--------------+--------------+------------+------------+---+-------+
|                  id|       source_id|orig_disease_id|      target_id|score_evidence|    disease_id|is_direct_id|score_source|rid|  score|
+--------------------+----------------+---------------+---------------+--------------+--------------+------------+------------+---+-------+
|c2ddf8443100ed7f1...|       phenodigm|  Orphanet_3337|ENSG00000000460|        0.9317|   EFO_0009566|       false|     0.18634|  1|0.18634|
|c2ddf8443100ed7f1...|       phenodigm|  Orphanet_3337|ENSG00000000460|        0.9317| Orphanet_3337|        true|     0.18634|  1|0.18634|
|5f0d033bca543b007...|genomics_england| Orphanet_33574|ENSG00000001084|           1.0|Orphanet_68367|       false|         1.0|  1|    1.0|
|a622181d1a443117e...|genomics_england| Orphanet_33574|ENSG00000001084|           1.0|Orphanet_68367|       false|         1.0|  2|   0.25|
|6e6fcb2a7b6ef8285..

w = org.apache.spark.sql.expressions.WindowSpec@35ae1cc2
dfseh = [id: string, source_id: string ... 8 more fields]


[id: string, source_id: string ... 8 more fields]

In [178]:
// Summary statistics of the rank-discounted evidence score.
dfseh.describe("score").show()

+-------+--------------------+
|summary|               score|
+-------+--------------------+
|  count|            14067077|
|   mean| 0.05697707589949231|
| stddev|  0.1466786322141555|
|    min|4.590854684798087...|
|    max|                 1.0|
+-------+--------------------+



In [186]:
// Roll evidence up to one row per (target, disease, source):
// - score_raw is the plain sum of the rank-discounted evidence scores
// - score is score_raw clipped at 1
// - is_direct is the max of is_direct_id over the group, taken here so that max can
//   be applied again at association level
// - See: https://github.com/opentargets/data_pipeline/blob/7098546ee09ca1fc3c690a0bd6999b865ddfe646/mrtarget/modules/Association.py#L285
//   for how scores are clipped at source level but not at datatype or overall levels
val dfses = dfseh
    .groupBy(col("target_id"), col("disease_id"), col("source_id"))
    .agg(
        sum(col("score")).as("score_raw"),
        max(col("is_direct_id")).as("is_direct")
    )
    .withColumn(
        "score",
        when(col("score_raw") > 1, lit(1.0)).otherwise(col("score_raw"))
    )
dfses.show(numRows = 3)

+---------------+--------------+----------------+---------+---------+-------+
|      target_id|    disease_id|       source_id|score_raw|is_direct|  score|
+---------------+--------------+----------------+---------+---------+-------+
|ENSG00000000460|   EFO_0009566|       phenodigm|  0.18634|    false|0.18634|
|ENSG00000000460| Orphanet_3337|       phenodigm|  0.18634|     true|0.18634|
|ENSG00000001084|Orphanet_68367|genomics_england|     1.25|    false|    1.0|
+---------------+--------------+----------------+---------+---------+-------+
only showing top 3 rows



dfses = [target_id: string, disease_id: string ... 4 more fields]


[target_id: string, disease_id: string ... 4 more fields]

In [187]:
import org.apache.spark.ml.feature.Bucketizer
// Histogram of the unclipped source-level scores: score_bin is the index of the
// split interval that score_raw falls into.
val bucketizer = new Bucketizer()
    .setInputCol("score_raw")
    .setOutputCol("score_bin")
    .setSplits(Array(0.0, 0.1, 0.5, 0.8, 1.0, 1.5, 2.0, 1000.0))
// Sort by bin so the histogram reads low-to-high, consistent with the other count
// tables in this notebook, which are all ordered.
bucketizer.transform(dfses).groupBy("score_bin").count().orderBy("score_bin").show()

+---------+-------+
|score_bin|  count|
+---------+-------+
|      0.0| 339977|
|      1.0|1474378|
|      4.0| 181197|
|      3.0|  66604|
|      2.0| 212497|
|      5.0|  16993|
+---------+-------+



bucketizer = bucketizer_35fb4623cc5e


bucketizer_35fb4623cc5e

In [188]:
// Summary statistics of the unclipped source-level score (its max shows how far
// the summed scores exceed 1 before clipping).
dfses.describe("score_raw").show()

+-------+--------------------+
|summary|           score_raw|
+-------+--------------------+
|  count|             2291646|
|   mean|  0.3497490074440019|
| stddev|  0.3227700291513107|
|    min|2.124299626275006...|
|    max|   1.644636205517087|
+-------+--------------------+



In [189]:
// Aggregate to association level: within each (target, disease) pair the source
// scores are ranked highest first, the score at rank `rid` is discounted to
// score / rid^2, and the discounted scores are summed. is_direct is the max over
// the pair's sources, and source_ids collects the contributing sources.
// `w` is never reassigned, so it is declared as a val.
val w = Window.partitionBy("target_id", "disease_id").orderBy($"score".desc)
val dfsea = dfses
    .withColumn("rid", row_number().over(w))
    .withColumn("score", $"score" / pow($"rid", 2.0))
    .groupBy("target_id", "disease_id")
    .agg(
        sum("score").as("score"),
        max("is_direct").as("is_direct"),
        collect_set("source_id").as("source_ids")
    )
dfsea.show(5)

+---------------+--------------+-------------------+---------+-----------+
|      target_id|    disease_id|              score|is_direct| source_ids|
+---------------+--------------+-------------------+---------+-----------+
|ENSG00000000938|Orphanet_44890|                0.2|     true|   [chembl]|
|ENSG00000000971| Orphanet_1505|0.23495000000000002|    false|[phenodigm]|
|ENSG00000000971|Orphanet_98687|                0.2|    false|[phenodigm]|
|ENSG00000001497|   EFO_0000178|           0.405682|     true|   [crispr]|
|ENSG00000001626| Orphanet_2924|            0.22535|     true|[phenodigm]|
+---------------+--------------+-------------------+---------+-----------+
only showing top 5 rows



w = org.apache.spark.sql.expressions.WindowSpec@3e8739d6
dfsea = [target_id: string, disease_id: string ... 3 more fields]


[target_id: string, disease_id: string ... 3 more fields]

In [196]:
// Persist the source-level scores as Parquet, replacing any previous output.
// `path` is never reassigned within the cell, so it is declared as a val.
val path = output_dir.resolve("score_source.parquet")
dfses.write.format("parquet").mode("overwrite").save(path.toString)
path

path = /home/eczech/data/ot/results/score_source.parquet


/home/eczech/data/ot/results/score_source.parquet

In [197]:
// Persist the association-level scores as Parquet, replacing any previous output.
// `path` is never reassigned within the cell, so it is declared as a val.
val path = output_dir.resolve("score_association.parquet")
dfsea.write.format("parquet").mode("overwrite").save(path.toString)
path

path = /home/eczech/data/ot/results/score_association.parquet


/home/eczech/data/ot/results/score_association.parquet

In [201]:
/** Identity transformation, used to try out chaining through `Dataset.transform`. */
def testtr(df: DataFrame): DataFrame = df
dfsea.transform(testtr).show(numRows = 1)

testtr: (df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame


+---------------+--------------+-----+---------+----------+
|      target_id|    disease_id|score|is_direct|source_ids|
+---------------+--------------+-----+---------+----------+
|ENSG00000000938|Orphanet_44890|  0.2|     true|  [chembl]|
+---------------+--------------+-----+---------+----------+
only showing top 1 row

