In [1]:
import os

# Export the location of the project's OpenJDK 11 install for this process.
# This runs before the SparkSession is built further down; presumably PySpark
# picks the JVM to launch from JAVA_HOME (confirm against the Spark setup).
os.environ.update(
    JAVA_HOME="/mnt/extproj/projekte/textmining/jdk/openlogic-openjdk-11.0.22+7-linux-x64"
)


In [2]:
from collections import defaultdict
from pyspark.sql import SparkSession

from pyspark.sql.functions import *
from pyspark.sql.types import *

from pyspark.sql import Window


In [3]:
# Build the Spark session once per kernel: when the cell is re-run and `spark`
# already exists (and is not None), the existing session is kept as is.
# All resource settings are passed on the builder.
if "spark" not in globals() or spark is None:
    print("Creating builder")
    spark = (
        SparkSession.builder
        .config("spark.executor.memory", "70g")
        .config("spark.driver.memory", "50g")
        .config("spark.memory.offHeap.enabled", True)
        .config("spark.memory.offHeap.size", "16g")
        .config("spark.sql.shuffle.partitions", 300)
        .appName("mirexplore")
        .getOrCreate()
    )

Creating builder




In [4]:
# Load the miRNA-gene interaction records from the Parquet dataset into a
# Spark DataFrame; every later cell derives from this `df`.
df = spark.read.parquet("/mnt/extproj/projekte/textmining/mx_feb24/mx_mirna_gene_parquet/")

In [5]:
# Peek at the first ten rows to check the loaded schema and values.
df.show(10)

+------------+-----------+-----------+---------+--------+---------------------+----------------+---------------+-----------------+--------------------+----------------+
|miRNA_family| miRNA_name|gene_family|gene_name|organism|interaction_direction|interaction_type|evidence_source|evidence_document|       sent_evidence|     interaction|
+------------+-----------+-----------+---------+--------+---------------------+----------------+---------------+-----------------+--------------------+----------------+
|     miR-155|    miR-155|      ACSS2|      ACS|     hsa|             GENE_MIR|            DOWN|     mirexplore|         25319951|{25319951.2.8, {1...|{GENE_MIR, DOWN}|
|     miR-155|    miR-155|       ACCS|      ACS|     hsa|             GENE_MIR|            DOWN|     mirexplore|         25319951|{25319951.2.8, {1...|{GENE_MIR, DOWN}|
|     miR-155|    miR-155|   SERPINA5|      PCI|     hsa|             GENE_MIR|            DOWN|     mirexplore|         25319951|{25319951.2.8, {1...|{GEN

In [6]:
# List the distinct raw values of the organism column (still a plain string here).
df.select("organism").distinct().show()

+--------+
|organism|
+--------+
|     mmu|
| mmu;hsa|
|     hsa|
+--------+



In [7]:
# Turn the ';'-separated organism string (e.g. "mmu;hsa") into an array column.
# The cell rebinds `df` in place, so it is guarded on the column's current
# type: on a re-run the column is already an array and the split is skipped
# instead of being applied to a non-string column.
if dict(df.dtypes)["organism"] == "string":
    df = df.withColumn("organism", split(df['organism'], ';'))

In [8]:
def _join_unique_sorted(nested):
    """Flatten a list of lists of strings into one ';'-joined string.

    Duplicates are removed first and the remaining values are then sorted, so
    the same set of values always yields the same string (e.g. "hsa;mmu").
    The previous form, ``set(sorted(...))``, sorted first and then built a set,
    which discards the ordering again and leaves the join order arbitrary.

    Args:
        nested: iterable of iterables of strings (here: the collected
            per-row organism arrays of one group).

    Returns:
        The unique values in ascending order, joined by ';'.
    """
    return ";".join(sorted({item for group in nested for item in group}))


# Spark UDF wrapper; used as flatten2(collect_set(<array column>)).
flatten2 = udf(_join_unique_sorted, StringType())

In [9]:
# Collapse the table to one row per (miRNA family, gene family, interaction):
# the organisms seen for the group, the distinct supporting documents and the
# distinct sentence-level evidences, plus the sizes of the latter two sets.
df_small = (
    df.groupBy("miRNA_family", "gene_family", "interaction")
    .agg(
        flatten2(collect_set("organism")).alias("organisms"),
        collect_set("evidence_document").alias("evidence_documents"),
        collect_set("sent_evidence").alias("sent_evidences"),
    )
    .orderBy("miRNA_family", "gene_family")
    .withColumn("evidence_count", size("evidence_documents"))
    .withColumn("sent_count", size("sent_evidences"))
)

# Show the groups with the most supporting documents first.
df_small.orderBy("evidence_count", ascending=False).show()


+------------+-----------+----------------+---------+--------------------+--------------------+--------------+----------+
|miRNA_family|gene_family|     interaction|organisms|  evidence_documents|      sent_evidences|evidence_count|sent_count|
+------------+-----------+----------------+---------+--------------------+--------------------+--------------+----------+
|    miR-146a|      TRAF6| {MIR_GENE, NEU}|  hsa;mmu|[PMC4355369, PMC4...|[{PMC10125762.2.4...|           586|      1186|
|    miR-146a|      IRAK1| {MIR_GENE, NEU}|  hsa;mmu|[PMC4355369, PMC4...|[{PMC4641946.3.54...|           522|       968|
|     miR-34a|      SIRT1| {MIR_GENE, NEU}|  hsa;mmu|[PMC10346838, 320...|[{PMC7402144.3.74...|           520|       956|
|     miR-34a|      SIRT1|{MIR_GENE, DOWN}|  hsa;mmu|[PMC6769631, PMC3...|[{PMC10377597.3.1...|           494|       848|
|     miR-155|      SOCS1| {MIR_GENE, NEU}|  hsa;mmu|[PMC3296165, PMC6...|[{PMC10351202.3.6...|           465|       904|
|     miR-34a|       TP5

In [10]:
# Sanity check: list the distinct combined organism strings after aggregation.
df_small.select("organisms").distinct().show()

+---------+
|organisms|
+---------+
|  hsa;mmu|
|      mmu|
|      hsa|
+---------+



In [11]:
# For every (miRNA family, gene family) pair, flag the interaction row(s) whose
# evidence_count equals the highest evidence_count within that pair.
# Rows that tie for the maximum all receive is_consensus = 1.
# Note: `max` here is the Spark aggregate brought in by the wildcard import of
# pyspark.sql.functions, not Python's built-in max.
w = Window.partitionBy("miRNA_family", "gene_family")
df_cons = df_small.alias("cons").withColumn(
    "is_consensus",
    when(col("evidence_count") == max("evidence_count").over(w), 1).otherwise(0),
)
df_cons.show()

+------------+-------------+----------------+---------+--------------------+--------------------+--------------+----------+------------+
|miRNA_family|  gene_family|     interaction|organisms|  evidence_documents|      sent_evidences|evidence_count|sent_count|is_consensus|
+------------+-------------+----------------+---------+--------------------+--------------------+--------------+----------+------------+
|      let-7a|1700099I09RIK| {MIR_GENE, NEU}|      mmu|          [34480870]|[{34480870.2.11, ...|             1|         1|           1|
|      let-7a|     AASDHPPT|{MIR_GENE, DOWN}|  hsa;mmu|[PMC4385863, PMC5...|[{PMC4385863.3.16...|             2|         2|           1|
|      let-7a|        AAVS1| {MIR_GENE, NEU}|      hsa|        [PMC9169108]|[{PMC9169108.3.10...|             1|         1|           1|
|      let-7a|        AAVS1| {GENE_MIR, NEU}|      hsa|        [PMC9169108]|[{PMC9169108.3.10...|             1|         1|           1|
|      let-7a|        ABCB1|  {MIR_GENE, 

In [12]:
# Show the flagged table with the best-supported interactions first.
df_cons.orderBy(col("evidence_count").desc()).show()

+------------+-----------+----------------+---------+--------------------+--------------------+--------------+----------+------------+
|miRNA_family|gene_family|     interaction|organisms|  evidence_documents|      sent_evidences|evidence_count|sent_count|is_consensus|
+------------+-----------+----------------+---------+--------------------+--------------------+--------------+----------+------------+
|    miR-146a|      TRAF6| {MIR_GENE, NEU}|  hsa;mmu|[PMC4355369, PMC4...|[{PMC10125762.2.4...|           586|      1186|           1|
|    miR-146a|      IRAK1| {MIR_GENE, NEU}|  hsa;mmu|[PMC4355369, PMC4...|[{PMC4641946.3.54...|           522|       968|           1|
|     miR-34a|      SIRT1| {MIR_GENE, NEU}|  hsa;mmu|[PMC10346838, 320...|[{PMC7402144.3.74...|           520|       956|           1|
|     miR-34a|      SIRT1|{MIR_GENE, DOWN}|  hsa;mmu|[PMC6769631, PMC3...|[{PMC10377597.3.1...|           494|       848|           0|
|     miR-155|      SOCS1| {MIR_GENE, NEU}|  hsa;mmu|[P

In [13]:
# Persist the consensus table as Parquet, replacing any previous export at this path.
df_cons.write.mode("overwrite").parquet(
    "/mnt/extproj/projekte/textmining/mx_feb24/mx_mirna_gene_consensus_parquet"
)

In [14]:
# Sanity check: list the distinct organism strings present in the consensus table.
df_cons.select("organisms").distinct().show()

+---------+
|organisms|
+---------+
|  hsa;mmu|
|      mmu|
|      hsa|
+---------+

