In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
from functools import reduce
from pyspark.sql import DataFrame

In [2]:
# Obtain the SparkSession used by every cell below (getOrCreate reuses an
# already-running session if there is one, otherwise it starts a new one).
spark = SparkSession.builder.getOrCreate()

In [5]:
# All three tables live in the same SQLite file, so the JDBC URL is defined once
# instead of being repeated in every reader.
ARTICLES_DB_URL = "jdbc:sqlite:/nfs/workspaces/datasets/covid-19/articles.sqlite"


def read_articles_db(source_option, sql, **extra_options):
    """Load one relation from the articles SQLite database over JDBC.

    Args:
        source_option: Name of the JDBC source option to set ("dbtable" or "query").
        sql: Value for that option (here a parenthesised SELECT statement).
        **extra_options: Further reader options, e.g. ``customSchema``.

    Returns:
        The DataFrame returned by the JDBC reader's ``load()``.
    """
    # A new reader is created on every call so options set for one table are
    # never carried over to the next.
    reader = (
        spark.read
        .format("jdbc")
        .option("url", ARTICLES_DB_URL)
        .option(source_option, sql)
    )
    for key, value in extra_options.items():
        reader = reader.option(key, value)
    return reader.load()


dfs_sections = read_articles_db("dbtable", "(SELECT * FROM sections)")
dfs_sections.show()

# Published and Entry are requested as STRING columns via customSchema.
dfs_articles = read_articles_db(
    "query",
    "(SELECT * FROM articles)",
    customSchema="Published STRING, Entry STRING",
)
dfs_articles.show()

dfs_citations = read_articles_db("query", "(SELECT * FROM citations)")
dfs_citations.show()

+---+--------+--------+------+-----+--------------------+-----------+
| Id| Article|    Tags|Design| Name|                Text|     Labels|
+---+--------+--------+------+-----+--------------------+-----------+
|  0|wvx6q999|COVID-19|     0|TITLE|Note from the edi...|   FRAGMENT|
|  1|wvx6q999|COVID-19|     0| null|The situation has...|       null|
|  2|wvx6q999|COVID-19|     0| null|Meanwhile, on 7 J...|       null|
|  3|wvx6q999|COVID-19|     0| null|In order to suppo...|       null|
|  4|wvx6q999|COVID-19|     0| null|For example, ther...|   FRAGMENT|
|  5|wvx6q999|COVID-19|     0| null|In this issue of ...|   FRAGMENT|
|  6|wvx6q999|COVID-19|     0| null|One is a research...|   FRAGMENT|
|  7|wvx6q999|COVID-19|     0| null|on the developmen...|       null|
|  8|wvx6q999|COVID-19|     0| null|Before this publi...|       null|
|  9|wvx6q999|COVID-19|     0| null|The other is a ra...|       null|
| 10|wvx6q999|COVID-19|     0| null|It also points ou...|   FRAGMENT|
| 11|wvx6q999|COVID-

In [6]:
# NER corpora whose extracted entities are merged into a single DataFrame.
ner_type = ["BC2GM","BC4CHEMD","BC5CDR-chem","BC5CDR-disease","JNLPBA","NCBI-disease","linnaeus","s800"]

# One DataFrame per corpus, each row tagged (column "type") with its corpus name.
ner_app = [
    spark.read.format("csv")
    .option("header", "true")
    .load("/home/leeivan/bio_corpus/ner/" + corpus + "/all_entity.csv")
    .withColumn("type", lit(corpus))
    for corpus in ner_type
]

# Stack the per-corpus frames, then expose the tag under the name "ner_type".
df_ner_all = reduce(DataFrame.unionAll, ner_app).withColumnRenamed("type", "ner_type")
df_ner_all.show()

+--------------------+---+----------+--------+
|              entity|  n|section_id|ner_type|
+--------------------+---+----------+--------+
|                AAK1|  1|      1522|   BC2GM|
|      Januase Kinase|  1|      1523|   BC2GM|
|              IL - 8|  1|      3461|   BC2GM|
|              IL - 8|  2|      3461|   BC2GM|
|              IL - 8|  3|      3461|   BC2GM|
|                b A1|  1|      6798|   BC2GM|
|                BafA|  2|      6798|   BC2GM|
|                BafA|  1|      6799|   BC2GM|
|vacuolar type H (...|  2|      6799|   BC2GM|
|          v - ATPase|  3|      6799|   BC2GM|
|                   C|  1|      6800|   BC2GM|
|                BafA|  2|      6800|   BC2GM|
|mammalian target ...|  3|      6800|   BC2GM|
|              mTORC1|  4|      6800|   BC2GM|
|              mTORC1|  1|      6801|   BC2GM|
|              mTORC1|  2|      6801|   BC2GM|
|              mTORC1|  1|      6803|   BC2GM|
|transcription fac...|  1|      6804|   BC2GM|
|            

In [7]:
# Relation-extraction datasets to load.
re_type = ["GAD","euadr"]

# One DataFrame per dataset, each row tagged (column "type") with its dataset name.
re_app = [
    spark.read.format("csv")
    .option("header", "true")
    .load("/home/leeivan/bio_corpus/re_all_in_one/" + dataset + "/all_relations.csv")
    .withColumn("type", lit(dataset))
    for dataset in re_type
]

In [8]:
# Stack the per-dataset relation frames and rename the tag column to "re_type".
df_re_all = reduce(
    lambda stacked, frame: stacked.unionAll(frame),
    re_app,
).withColumnRenamed("type", "re_type")
df_re_all.show()

+----------+-------+
|section_id|re_type|
+----------+-------+
|       222|    GAD|
|      2780|    GAD|
|      4270|    GAD|
|      8822|    GAD|
|      9142|    GAD|
|      9614|    GAD|
|      9998|    GAD|
|     10000|    GAD|
|     11154|    GAD|
|     12708|    GAD|
|     12709|    GAD|
|     12710|    GAD|
|     13385|    GAD|
|     14966|    GAD|
|     15913|    GAD|
|     15918|    GAD|
|     17060|    GAD|
|     18664|    GAD|
|     18665|    GAD|
|     20429|    GAD|
+----------+-------+
only showing top 20 rows



In [9]:
# Keep only the NER rows whose section also appears in the relation-extraction data.
df_ner_re = df_ner_all.join(df_re_all, "section_id")
df_ner_re.show()


def _with_suffix(frame, suffix):
    """Return `frame` with entity, n, ner_type and re_type renamed to <name>_<suffix>."""
    for name in ("entity", "n", "ner_type", "re_type"):
        frame = frame.withColumnRenamed(name, name + "_" + suffix)
    return frame


# Two renamed copies of the same frame, so it can be joined with itself.
df_ner_re_src = _with_suffix(df_ner_re, "src")
df_ner_re_src.show()
df_ner_re_dst = _with_suffix(df_ner_re, "dst")
df_ner_re_dst.show()

# Pair up every two rows that share a section_id.
df_ner_re_src_dst = df_ner_re_src.join(df_ner_re_dst, "section_id")
df_ner_re_src_dst.show()

# Keep pairs of different entities that carry the same relation tag and NER tag.
for condition in (
    "entity_src != entity_dst",
    "re_type_src == re_type_dst",
    "ner_type_src == ner_type_dst",
):
    df_ner_re_src_dst = df_ner_re_src_dst.filter(condition)

+----------+--------------------+---+--------+-------+
|section_id|              entity|  n|ner_type|re_type|
+----------+--------------------+---+--------+-------+
|      1523|      Januase Kinase|  1|   BC2GM|  euadr|
|      6799|                BafA|  1|   BC2GM|  euadr|
|      6799|vacuolar type H (...|  2|   BC2GM|  euadr|
|      6799|          v - ATPase|  3|   BC2GM|  euadr|
|      6803|              mTORC1|  1|   BC2GM|  euadr|
|      6805|                BafA|  1|   BC2GM|  euadr|
|      6805|              mTORC1|  2|   BC2GM|  euadr|
|     10479|                   h|  1|   BC2GM|  euadr|
|     16293|   ribosome helicase|  1|   BC2GM|  euadr|
|     18261|                  CR|  1|   BC2GM|  euadr|
|     18261|              Cas13b|  2|   BC2GM|  euadr|
|     18265|              Cas13b|  1|   BC2GM|  euadr|
|     20120|      HLA - B27 mRNA|  1|   BC2GM|  euadr|
|     20122|      HLA - B27 mRNA|  1|   BC2GM|  euadr|
|     23056|               renin|  1|   BC2GM|  euadr|
|     2305

In [10]:
from pyspark.sql.functions import col
from pyspark.sql.functions import *

In [11]:
# Keep a single row per unordered {entity_src, entity_dst} pair, so that (A, B) and
# (B, A) count as the same edge. Which of the duplicate rows survives is not specified.
# The sorted two-element array is used directly as the de-duplication key: joining it
# into one string with "_" made different pairs collide whenever an entity name
# itself contains "_" (e.g. ["a_b", "c"] and ["a", "b_c"] both became "a_b_c").
e_drop_duplicate = (
    df_ner_re_src_dst
    .withColumn("entity_pair", array_sort(array(col("entity_src"), col("entity_dst"))))
    .dropDuplicates(["entity_pair"])
    .drop("entity_pair")
)

In [12]:
from pyspark.sql.window import Window as W
# Vertex table: one row per distinct entity string, with the set of section ids
# in which that entity was found.
sections_per_entity = collect_set("section_id").alias("sections")
v = df_ner_all.groupBy("entity").agg(sections_per_entity)

In [13]:
# Mark the vertex table for caching so later cells can reuse it instead of
# recomputing the groupBy.
v.cache()

DataFrame[entity: string, sections: array<string>]

In [14]:
# Give every vertex a consecutive integer id starting at 1: tag each row with a
# monotonically increasing idx, rank the rows by that idx, then drop the helper column.
windowSpec = W.orderBy("idx")
v = (
    v.withColumn("idx", monotonically_increasing_id())
    .withColumn("id", row_number().over(windowSpec))
    .drop("idx")
)
v.show()

+--------------------+--------------------+---+
|              entity|            sections| id|
+--------------------+--------------------+---+
|"Taxifolin 3 ' - ...|           [1278739]|  1|
|$ \ beta $ - hairpin|           [1539578]|  2|
|         ( E , E ) -|           [1503298]|  3|
|- 19 and H1N1 swi...|  [5220609, 1411652]|  4|
|              - Nrf2|           [6906166]|  5|
|        - nCV N gene|           [3823453]|  6|
|         - occlusion|   [1123441, 642104]|  7|
|- related infections|   [1354288, 665292]|  8|
|1 , 2 , 3 , 4 - t...|[1291599, 8620851...|  9|
|1 , 3 , 6 - trihy...|           [1538143]| 10|
|       1 microglobul|           [6290762]| 11|
|                 13C|[874533, 1470080,...| 12|
|     17β - estradiol|           [3906171]| 13|
|     1ab polyprotein|[622787, 920614, ...| 14|
|2 , 2 ' - Azino -...|            [853792]| 15|
|2019 - nCoV main ...|[682594, 7808126,...| 16|
|2019 Corona Virus...|[7274121, 1352238...| 17|
|3 , 5 - Di - O - ...|  [1064612, 304072

In [232]:
# Show the vertex row (including its id) whose entity is exactly 'COVID - 19'.
v.filter("entity = 'COVID - 19'").show()

+----------+--------------------+-----+
|    entity|            sections|   id|
+----------+--------------------+-----+
|COVID - 19|[6430795, 1018353...|45423|
+----------+--------------------+-----+



In [15]:
# Trim the edge table to the two endpoint names and the relation label.
edge_columns = ["entity_src", "entity_dst", "re_type_src"]
e_drop_duplicate = e_drop_duplicate.select(*edge_columns)
e_drop_duplicate.show()

+--------------------+--------------------+-----------+
|          entity_src|          entity_dst|re_type_src|
+--------------------+--------------------+-----------+
|sesquiterpen hydr...|( E ) - ß - farne...|      euadr|
|                   ,|   Alloyohimbine , ,|      euadr|
|                 3CL|           - ACE - 2|      euadr|
|  palindrome repeats|               - Cas|      euadr|
|             - LARP1|        5 ′ TOP mRNA|        GAD|
|                coki|            / IL - 6|        GAD|
|1 , 10 - phenanth...|[ Co ( indo - O ,...|      euadr|
|                 111|               human|      euadr|
|14 - 3 - 3 protei...|               YWHAE|      euadr|
|       spike protein|2 - O - ribose me...|        GAD|
|                trip|             2 ′ - C|      euadr|
|         2019 - nCoV|      angiotensin II|      euadr|
|receptor binding ...|        2019 - spike|      euadr|
|      SARS - CoV - 2|            2019 nCO|      euadr|
|             24 , 25|                   e|     

In [16]:
# Replace the endpoint names by the numeric vertex ids from v, producing the edge
# table e with columns dst, src and relationship.
# The intermediate frame has its own name instead of being assigned back to
# e_drop_duplicate: the first join needs the entity_src column, which the select
# removes, so overwriting e_drop_duplicate made this cell fail when run a second time.
e_with_src = (
    e_drop_duplicate
    .join(v, e_drop_duplicate.entity_src == v.entity)
    .select("id", "entity_dst", "re_type_src")
    .withColumnRenamed("id", "src")
)
e = (
    e_with_src
    .join(v, e_with_src.entity_dst == v.entity)
    .select("id", "src", "re_type_src")
    .withColumnRenamed("id", "dst")
    .withColumnRenamed("re_type_src", "relationship")
)

In [17]:
# Mark the edge table for caching; it is reused by the graph cells below.
e.cache()

DataFrame[dst: int, src: int, relationship: string]

In [18]:
# Preview the edge table (dst id, src id, relationship label).
e.show()

+---+-----+------------+
|dst|  src|relationship|
+---+-----+------------+
|  1|50272|       euadr|
|  1|34433|       euadr|
|  1|26113|       euadr|
|  1|77094|       euadr|
|  5|53014|       euadr|
|  9|18661|         GAD|
| 10|50656|       euadr|
| 13|95216|       euadr|
| 13|28406|       euadr|
| 18|48101|       euadr|
| 18| 4370|       euadr|
| 18|26521|       euadr|
| 18|93377|       euadr|
| 19|17642|         GAD|
| 19|93139|       euadr|
| 19|33829|       euadr|
| 19|47794|         GAD|
| 19|91314|       euadr|
| 19|32219|       euadr|
| 19|70152|       euadr|
+---+-----+------------+
only showing top 20 rows



In [19]:
from graphframes import *
# Build the graph from the vertex table v (which has an "id" column) and the edge
# table e (which has "src" and "dst" columns).
g = GraphFrame(v,e)

In [191]:
# Subgraph restricted to vertices whose entity text contains "respiratory" or
# "COVID", with vertices that are left without any edge removed.
# NOTE(review): the next cell rebinds gad_g, so this result is not the one the
# later cells work with.
gad_g = g.filterVertices("entity like '%respiratory%' or entity like '%COVID%'").dropIsolatedVertices()

In [233]:
# Ego network of the 'COVID - 19' entity: every edge that touches that vertex, plus
# the vertices those edges connect.
# The vertex id is looked up by entity name instead of being hard-coded (it used to be
# the literal 45423): ids are produced by row_number() over a
# monotonically_increasing_id() ordering, so the same entity is not guaranteed to
# receive the same id on another run.
covid_row = g.vertices.filter("entity = 'COVID - 19'").select("id").first()
if covid_row is None:
    raise ValueError("no vertex with entity 'COVID - 19' in the graph")
covid_id = covid_row["id"]
gad_g = g.filterEdges("src = {0} or dst = {0}".format(covid_id)).dropIsolatedVertices()

In [234]:
# Preview the vertices that remain in the filtered subgraph.
gad_g.vertices.show()

+---+--------------------+--------------------+
| id|              entity|            sections|
+---+--------------------+--------------------+
| 59| COVID - 19 distress|[1439412, 1494838...|
| 64|          COVID - op|            [858177]|
| 71|                 CRS|[1186406, 4756921...|
| 72|    CV complications|[935312, 1110874,...|
|114|                 GIS|  [1330911, 6925845]|
|140|                   K|[5196416, 5474943...|
|146|  Kidney dysfunction|[7868742, 4174959...|
|148|                  LT|[1183680, 1613277...|
|151|    MERS coronavirus|[657579, 1436661,...|
|154|                  MM|[8291247, 843908,...|
|168|               NMOSD|[1259429, 1490185...|
|218| SARS - CoV - 2 - on|[2503539, 2795172...|
|226|   SARS - CoV2 virus|[2392326, 1063890...|
|227|SARS - associated...|           [1754433]|
|239|Severe acute resp...|    [877046, 618577]|
|250|US covirus diseas...|            [947468]|
|279|            arrhymia|[1092038, 639157,...|
|287|      bronchiectasis|[2115523, 2127

In [244]:
# Number of edges in the filtered subgraph.
gad_g.edges.count()

5682

In [236]:
# Flat Python list of the subgraph's vertex ids; a vertex's position in this list
# is the index used to refer to it in the exported links.
gad_g_v = [row["id"] for row in gad_g.vertices.select("id").collect()]

In [237]:
# Build the node list for D3 in exactly the order of gad_g_v, because every link
# refers to its endpoints by their position in gad_g_v. The vertices are collected
# here a second time, and Spark does not promise that two separate collect() calls
# return rows in the same order, so the rows are keyed by id and emitted in
# gad_g_v order rather than in whatever order this collect() happens to return.
# 'size' is the number of sections in which the entity occurs.
vertex_rows = {
    row["id"]: row
    for row in gad_g.vertices.select("id", "entity", size("sections").alias("size")).collect()
}
nodes = [{'name': vertex_rows[vertex_id]["entity"], 'size': vertex_rows[vertex_id]["size"]}
         for vertex_id in gad_g_v]

In [238]:
# Translate each edge's endpoint ids into positions in gad_g_v, which is how the D3
# force layout addresses nodes. The positions are computed once up front; the earlier
# gad_g_v.index(...) call rescanned the whole list for both ends of every edge.
node_position = {}
for position, vertex_id in enumerate(gad_g_v):
    # setdefault keeps the first occurrence, matching what list.index returns.
    node_position.setdefault(vertex_id, position)

links = [{'source': node_position[edge["src"]], 'target': node_position[edge["dst"]]}
         for edge in gad_g.edges.select("src", "dst").collect()]

In [239]:
# Node-link structure that is written to graph.json for the D3 visualisation.
graph = dict(nodes=nodes, links=links)

In [240]:
# Display the assembled node-link dictionary.
graph

{'nodes': [{'name': 'COVID - 19 distress', 'size': 8},
  {'name': 'COVID - op', 'size': 1},
  {'name': 'CRS', 'size': 252},
  {'name': 'CV complications', 'size': 6},
  {'name': 'GIS', 'size': 2},
  {'name': 'K', 'size': 227},
  {'name': 'Kidney dysfunction', 'size': 3},
  {'name': 'LT', 'size': 14},
  {'name': 'MERS coronavirus', 'size': 20},
  {'name': 'MM', 'size': 59},
  {'name': 'NMOSD', 'size': 66},
  {'name': 'SARS - CoV - 2 - on', 'size': 3},
  {'name': 'SARS - CoV2 virus', 'size': 67},
  {'name': 'SARS - associated coronavirus 2', 'size': 1},
  {'name': 'Severe acute respiratory syndrome coronavirus 2 related',
   'size': 2},
  {'name': 'US covirus disease 2019', 'size': 1},
  {'name': 'arrhymia', 'size': 4},
  {'name': 'bronchiectasis', 'size': 86},
  {'name': 'chronic lower respiratory disease', 'size': 5},
  {'name': 'co of 2019', 'size': 1},
  {'name': 'covir', 'size': 13},
  {'name': 'hemolytic uremic syndrome', 'size': 2},
  {'name': 'inflammatory', 'size': 262},
  {'nam

In [241]:
import json

# Serialise the node-link dictionary to graph.json (4-space indentation) so the
# JavaScript cell can fetch it.
with open('graph.json', 'w') as out_file:
    out_file.write(json.dumps(graph, indent=4))

In [None]:
%%html
<!-- Container element; the JavaScript cell selects #d3-example and draws its SVG inside it. -->
<div id="d3-example"></div>
<style>
/* Appearance of the graph's circles (.node) and lines (.link). */
.node {stroke: #fff; stroke-width: 1.5px;}
.link {stroke: #999; stroke-opacity: .9;}
</style>

In [243]:
%%javascript
// Render graph.json as a D3 (v3) force-directed graph inside the #d3-example element.
// d3.js is loaded from the web through RequireJS. HTTPS is used so the request is
// not blocked as mixed content when the notebook itself is served over HTTPS.
require.config({paths:
    {d3: "https://d3js.org/d3.v3.min"}});
require(["d3"], function(d3) {
  // The code in this block is executed when the
  // d3.js library has been loaded.

  // First, we specify the size of the canvas
  // containing the visualization (size of the
  // <div> element).
  var width = 900, height = 600;

  // We create a color scale.
  var color = d3.scale.category10();

  // We create a force-directed dynamic graph layout.
  var force = d3.layout.force()
    .charge(-120)
    .linkDistance(30)
    .size([width, height]);

  // In the <div> element, we create a <svg> graphic
  // that will contain our interactive visualization.
  // An existing <svg> (from a previous run of this cell) is reused.
  var svg = d3.select("#d3-example").select("svg")
  if (svg.empty()) {
    svg = d3.select("#d3-example").append("svg")
          .attr("width", width)
          .attr("height", height);
  }

  // We load the JSON file.
  d3.json("graph.json", function(error, graph) {
    // If graph.json could not be fetched or parsed, 'graph' is undefined and
    // the code below would throw; report the problem and stop instead.
    if (error) {
      console.error("Could not load graph.json:", error);
      return;
    }

    // In this block, the file has been loaded
    // and the 'graph' object contains our graph.

    // We load the nodes and links in the
    // force-directed graph.
    force.nodes(graph.nodes)
      .links(graph.links)
      .start();

    // We create a <line> SVG element for each link
    // in the graph.
    var link = svg.selectAll(".link")
      .data(graph.links)
      .enter().append("line")
      .attr("class", "link");

    // We create a <circle> SVG element for each node
    // in the graph, and we specify a few attributes.
    var node = svg.selectAll(".node")
      .data(graph.nodes)
      .enter().append("circle")
      .attr("class", "node")
      .attr("r", 5)  // radius
      .style("fill", function(d) {
         // The node color is picked from the node's 'size' field.
         return color(d.size);
      })
      .call(force.drag);

    // Each node gets a <title> holding its name (the entity text).
    node.append("title")
        .text(function(d) { return d.name; });

    // We bind the positions of the SVG elements
    // to the positions of the dynamic force-directed
    // graph, at each time step.
    force.on("tick", function() {
      link.attr("x1", function(d){return d.source.x})
          .attr("y1", function(d){return d.source.y})
          .attr("x2", function(d){return d.target.x})
          .attr("y2", function(d){return d.target.y});

      node.attr("cx", function(d){return d.x})
          .attr("cy", function(d){return d.y});
    });
  });
});

<IPython.core.display.Javascript object>

In [None]:
import networkx as nx
# Pull the edges whose dst is vertex 1 into pandas and load them into a networkx graph.
edges_into_first = g.edges.where(col("dst") == "1").toPandas()
gp = nx.from_pandas_edgelist(edges_into_first, 'src', 'dst')

In [None]:
import matplotlib.pyplot as plt
%matplotlib inline

In [None]:
# Draw gp on a single 8 x 6 inch axes.
fig, ax = plt.subplots(figsize=(8, 6))
nx.draw_networkx(gp, ax=ax)

In [None]:
import json
import numpy as np
import networkx as nx
import matplotlib.pyplot as plt
%matplotlib inline

In [None]:
# Load networkx's built-in karate club example graph.
# NOTE(review): this rebinds g, which earlier in the notebook holds the GraphFrame
# built from v and e; cells that use that GraphFrame will not work if they are
# re-run after this one. Consider a different name here.
g = nx.karate_club_graph()