# Scenario 3.1 Resnet Search

In [1]:
#  import spark related modules
import pyspark
import socket
from pyspark.sql import SparkSession, SQLContext

 
# prevent error on rerunning if session is still alive
if 'sc' in globals():
    sc.stop()
    
# most configuration of the sparksession is done for you in the back
application_name = 'datastory 3 - resnet'

sparkSession = SparkSession.builder.appName(application_name)\
                           .config('spark.executor.memory', '4g')\
                           .config('spark.executor.instances', '5')\
                           .config('spark.sql.shuffle.partitions', '50')\
                           .config('spark.driver.memory', '4g')\
                           .config('spark.executor.cores', '3')\
                           .config('spark.driver.host', socket.gethostbyname(socket.gethostname()))\
                           .getOrCreate()

sc = sparkSession.sparkContext
sqlCtx = SQLContext(sc)

sc

In [2]:
from pyspark.sql.functions import broadcast
from pyspark.sql.functions import upper
from pyspark.sql.functions import lower
from pyspark.sql.functions import explode
from pyspark.sql.functions import col
from pyspark.sql.functions import split
from pyspark.sql.functions import countDistinct

In [3]:
from collections import defaultdict


## 3.1

In [4]:
hdfs_path = "hdfs://demo-full-load-hdfs-namenode:8020"
data_path = hdfs_path + '/resnet/' 

In [5]:
def cell_type_df():
    file = data_path+'elsevier-csv-resnet-celltype_deduplicated/*.csv'
    return sparkSession \
            .read \
            .option("header","true") \
            .csv(file) \
            .select(["CellType", "hasName", "hasProperty", "isMemberOf"])


def protein_df():
    file = data_path+'elsevier-csv-resnet-protein_deduplicated/*.csv'
    return sparkSession \
            .read \
            .option("header","true") \
            .csv(file) \
            .select(["Protein", "hasName", "hasProperty", "isMemberOf"])

def pathway_df():
    file = data_path+'elsevier-csv-resnet-pathway_deduplicated/*.csv'
    return sparkSession \
            .read \
            .option("header","true") \
            .csv(file) \
            .select(["Pathway", "hasName", "hasProperty", "isMemberOf"])

def group_df():
    file = data_path+'elsevier-csv-resnet-group_deduplicated/*.csv'
    return sparkSession \
            .read \
            .option("header","true") \
            .csv(file) \
            .select(["Group", "hasName", "hasProperty", "isMemberOf"])

def small_mol_df():
    file = data_path+'elsevier-csv-resnet-smallmol_deduplicated/*.csv'
    return sparkSession \
            .read \
            .option("header","true") \
            .csv(file) \
            .select(["SmallMol", "hasName", "hasProperty", "isMemberOf"])


def disease_df():
    file = data_path+'elsevier-csv-resnet-disease_deduplicated/*.csv'
    return sparkSession \
            .read \
            .option("header","true") \
            .csv(file) \
            .select(["Disease", "hasName", "hasProperty", "isMemberOf"])

def entity_property_df():
    file = data_path+'elsevier-csv-resnet-entityproperty_deduplicated/*.csv'
    return sparkSession \
            .read \
            .option("header","true") \
            .csv(file) \
            .select(["EntityProperty", "hasType", "hasPrimitiveValue"])


def property_types_df():
    file = data_path+'elsevier-csv-resnet-propertytype_deduplicated/*.csv'
    return sparkSession \
            .read \
            .option("header","true") \
            .csv(file) \
            .select(["PropertyType", "core_prefLabel"])

def description_df():
    types = property_types_df()
    
    description_uri = types.filter(types.core_prefLabel == "Description").toPandas().loc[0]['PropertyType']
    
    entity_properties = entity_property_df()
    return entity_properties \
            .filter(entity_properties.hasType == description_uri)\
            .where(entity_properties.hasPrimitiveValue.isNotNull())\
            .select(['EntityProperty', 'hasPrimitiveValue'])


def alias_df():
    types = property_types_df()
    
    alias_uri = types.filter(types.core_prefLabel == "Alias").toPandas().loc[0]['PropertyType']
    
    entity_properties = entity_property_df()
    return entity_properties \
            .filter(entity_properties.hasType == alias_uri)\
            .where(entity_properties.hasPrimitiveValue.isNotNull())\
            .select(['EntityProperty', 'hasPrimitiveValue'])




In [6]:
def attach_properties(spark_df):
    spark_df = spark_df.withColumn("hasProperty", explode(split(col("hasProperty"), "\\|\\|")).alias("hasProperty"))
    alias_items = alias_df()
    spark_df = spark_df.join(alias_items, alias_items.EntityProperty == spark_df.hasProperty, how='left')
    spark_df = spark_df.withColumnRenamed("hasPrimitiveValue", "hasAlias")
    spark_df = spark_df.drop('EntityProperty')

    description_items = description_df()
    spark_df = spark_df.join(description_items, description_items.EntityProperty == spark_df.hasProperty, how='left')
    spark_df = spark_df.withColumnRenamed("hasPrimitiveValue", "hasDescription")
    spark_df = spark_df.drop(*["EntityProperty", "hasProperty"])
    
    spark_df = spark_df.drop_duplicates()
    
    return spark_df


In [7]:
def get_pathways_by_uris(uris: list):
    pathways = attach_properties(pathway_df())
    
    uris_broadcast = sc.broadcast(uris)
    return pathways.filter(pathways.Pathway.isin(uris_broadcast.value))\
                .select(["Pathway", "hasName", "isMemberOf", "hasDescription", "hasAlias"])

def get_groups_by_uris(uris: list):
    groups = attach_properties(group_df())

    uris_broadcast = sc.broadcast(uris)
    return groups.filter(groups.Group.isin(uris_broadcast.value))\
                .select(["Group", "hasName", "isMemberOf", "hasDescription", "hasAlias"])



def resolve_memberships(memberships: str):
    if memberships is None:
        return None
    membership_items = memberships.split('||')
    
    memberships = defaultdict(list)
    
    for item in membership_items:
        membership_type = item.split('/')[-2]
        
        memberships[membership_type].append(item)
        
    return memberships


def explode_memberships(memberships: str, to_explode: list = ['pathway']):
    resolved_memberships = resolve_memberships(memberships)
    
    for key, item in resolved_memberships.items():
        if key == 'pathway':
            pathways_k = get_pathways_by_uris(item)
            
        elif key == 'group':
            groups_k = get_groups_by_uris(item)
            


In [8]:
import logging

pyspark_log = logging.getLogger('pyspark').setLevel(logging.ERROR)
py4j_logger = logging.getLogger("py4j").setLevel(logging.ERROR)


loggers = {}

def get_logger(logger_name):
        # create logger for prd_ci
        if loggers.get(logger_name):
            return loggers.get(logger_name)
        else:
            log = logging.getLogger(logger_name)
            log.setLevel(level=logging.INFO)

            # create formatter and add it to the handlers
            formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')


            # reate console handler for logger.
            ch = logging.StreamHandler()
            ch.setLevel(level=logging.INFO)
            ch.setFormatter(formatter)

            log.addHandler(ch)
            loggers[logger_name] = log
            return log
        


In [9]:

import pandas as pd


class ResNetSearch():
    def __init__(self, intensity: str = 'advanced'):
        """
        intensity can be 'advanced' or 'simple'
        """
        self.intensity = intensity
        
        self._validate_input()
        
        self.logger = get_logger("resnet.search")

        self.logger.info("ResNet search ready.")
        
        self.result_df_columns = ['Name', 'Entity', 'isMemberOf', 'Description', 'Alias']
        self.result_df = pd.DataFrame({}, columns=self.result_df_columns)
        
    def _validate_input(self):
        allowed_intensity = ["simple", "advanced"]
        
        if self.intensity not in allowed_intensity:
            raise ValueError("Unsupported search intentity, allowed: {}".format(allowed_intensity))

    def update_output(self):

        print("Found {} results...".format(len(self.result_df.index)), end='\r')
    
        
    def _update_results(self, new_results):
        if not new_results.empty:
            self.result_df = self.result_df.append(new_results[self.result_df_columns], ignore_index=True)
            self.update_output()
        
    def _format_results(self, df, entity_label: str):
        selection = df.rename(columns={
            "hasName": "Name", 
            "hasDescription": "Description",
            "hasAlias": "Alias"})
        selection["Entity"] = entity_label
        
        return selection
    
    def _reset_data(self):
        self.result_df = pd.DataFrame({}, columns=self.result_df_columns)
        
    def apply_simple_search(self, spark_df, entity_label, term):
        filtered_df = spark_df.where(lower(spark_df.hasName).contains(term))\
            .select([entity_label, 'isMemberOf', 'hasName', 'hasDescription', 'hasAlias'])
        
        return filtered_df
    
    def apply_advanced_search(self, spark_df, entity_label, term):
        filtered_df = spark_df.filter(
                lower(spark_df.hasName).contains(term) | \
                lower(spark_df.hasAlias).contains(term) | \
                lower(spark_df.hasDescription).contains(term)) \
            .select([entity_label, 'isMemberOf', 'hasName', 'hasDescription', 'hasAlias'])
        
        return filtered_df
        
    def apply_search_term(self, spark_df, entity_label, term):
        search_results = None
        if self.intensity == 'simple':
            search_results = self.apply_simple_search(spark_df, entity_label, term)
        elif self.intensity == 'advanced':
            search_results = self.apply_advanced_search(spark_df, entity_label, term)
        
        search_results_df = search_results.toPandas()
        formatted_results = self._format_results(df=search_results_df, entity_label=entity_label)
        self._update_results(formatted_results)
        
        return search_results_df
        
        
    def search_proteins(self, term):
        self.logger.info("Searching for proteins...")
        df = attach_properties(protein_df())

        proteins = self.apply_search_term(df, 'Protein', term)
        return proteins
    
    def search_small_mol(self, term):
        self.logger.info("Searching for small molecules...")
        df = attach_properties(small_mol_df())

        mols = self.apply_search_term(df, 'SmallMol', term)
        return mols
    
    def search_cells(self, term):
        self.logger.info("Searching for cells...")
        df = attach_properties(cell_type_df())

        cells = self.apply_search_term(df, 'CellType', term)
        return cells
    
    def search_diseases(self, term):
        self.logger.info("Searching for diseases...")
        df = attach_properties(disease_df())

        diseases = self.apply_search_term(df, 'Disease', term)
        return diseases
    
    def search_pathways(self, term):
        self.logger.info("Searching for pathways by text...")
        df = attach_properties(pathway_df())

        pathways = self.apply_search_term(df, 'Pathway', term)
        return pathways
        
    def get_groups_for_uris(self, group_uris: list):
        self.logger.info("Searching for groups...")

        groups_df = get_groups_by_uris(group_uris).toPandas()
        formatted_groups = self._format_results(df=groups_df, entity_label='Group')
        self._update_results(formatted_groups)
        
        return groups_df

    def get_pathways_for_uris(self, pathway_uris: list):
        self.logger.info("Searching for pathways...")

        pathways_df = get_pathways_by_uris(pathway_uris).toPandas()
        formatted_pathways = self._format_results(df=pathways_df, entity_label='Pathway')
        self._update_results(formatted_pathways)
        
        return pathways_df
    
    def explode_memberships(self, entities: pd.DataFrame):
        self.logger.info("Searching for memberships...")

        membership_series = entities['isMemberOf'].apply(lambda x: resolve_memberships(x))
        
        all_pathways = []
        all_groups = []
        
        for membership_dict in membership_series:
            if membership_dict is not None:
                all_pathways = all_pathways + membership_dict['pathway']
                all_pathways = all_pathways + membership_dict['group']
            
        all_pathways = list(set(all_pathways))
        all_groups = list(set(all_groups))
        
        return all_pathways, all_groups
            
        
    def get_descriptions(self):
        """Unused..."""
        self.logger.info("Fetching descriptions...")
        
        sdf = sqlCtx.createDataFrame(self.result_df.copy())
        
        sdf = sdf.withColumn("hasProperty", explode(split(col("hasProperty"), "\\|\\|")).alias("hasProperty"))
        
        all_descriptions = description_df()

        sdf = sdf.join(all_descriptions, sdf.hasProperty == all_descriptions.EntityProperty, how='left')
        
        df = sdf.toPandas()
        
        self._reset_data()

        self.result_df = df
        
        self.result_df = self.result_df.rename(columns={"hasPrimitiveValue": "Description"})
        
        
    def search(self, term: str, limit=10) -> pd.DataFrame:
        self._reset_data()
        
        term = term.lower()
        
        # Protein
        proteins_df = self.search_proteins(term)
        
        smallmol_df = self.search_small_mol(term)
        
        cells_df = self.search_cells(term)
        
        diseases_df = self.search_diseases(term)
        
        pathways_text_df = self.search_pathways(term)
        
        entities_df = proteins_df.append(smallmol_df.append(cells_df.append(diseases_df)))
        
        all_pathways, all_groups = self.explode_memberships(entities_df)

        pathways = self.get_pathways_for_uris(all_pathways)
        groups = self.get_groups_for_uris(all_groups)
        
        self.logger.info("Formatting and deduplicating...")
        
        deduped = self.result_df[['Name', 'Description', 'Entity', 'Alias']]\
                    .sort_values(by=['Entity','Name', 'Description']) \
                    .drop_duplicates(subset=['Name', 'Entity'], keep='first')\
                    .reset_index(drop=True)
        
        self.logger.info("Showing {} results...".format(len(deduped)))
        return deduped
    
    
    
    
    
    
    

In [10]:
search_application = ResNetSearch(intensity="advanced")

2020-09-09 12:34:26,763 - resnet.search - INFO - ResNet search ready.


In [11]:
search_results = search_application.search("hela")
search_results

2020-09-09 12:34:26,771 - resnet.search - INFO - Searching for proteins...
2020-09-09 12:34:51,245 - resnet.search - INFO - Searching for small molecules...


Found 16 results...

2020-09-09 12:35:05,003 - resnet.search - INFO - Searching for cells...


Found 121 results...

2020-09-09 12:35:12,997 - resnet.search - INFO - Searching for diseases...


Found 148 results...

2020-09-09 12:35:21,194 - resnet.search - INFO - Searching for pathways by text...


Found 154 results...

2020-09-09 12:35:28,005 - resnet.search - INFO - Searching for memberships...
2020-09-09 12:35:28,007 - resnet.search - INFO - Searching for pathways...


Found 158 results...

2020-09-09 12:35:34,285 - resnet.search - INFO - Searching for groups...


Found 176 results...

2020-09-09 12:35:36,583 - resnet.search - INFO - Formatting and deduplicating...
2020-09-09 12:35:36,588 - resnet.search - INFO - Showing 122 results...


Unnamed: 0,Name,Description,Entity,Alias
0,H1HeLa,uterine cervical cancer cell line,CellType,
1,HeLa,uterine cervical cancer cell line,CellType,
2,HeLa 229,uterine cervical cancer cell line,CellType,
3,HeLa P4,uterine cervical cancer cell line,CellType,
4,HeLa S,uterine cervical cancer cell line,CellType,
...,...,...,...,...
117,vat yellow 4,,SmallMol,C.I. Vat Yellow 4||CCRIS 176||Tinon Golden Yel...
118,zinc,,SmallMol,zincum||Zinc (dust or fume)||Granular zinc||me...
119,zinc acetylacetonate,,SmallMol,Bis(pentanedionato)zinc||Zinc 2 4-pentanediona...
120,zinc chelator calcium EDTA,,SmallMol,


In [12]:
pd.set_option('display.max_rows', 500)

display(search_results)

pd.reset_option('display.max_rows')

Unnamed: 0,Name,Description,Entity,Alias
0,H1HeLa,uterine cervical cancer cell line,CellType,
1,HeLa,uterine cervical cancer cell line,CellType,
2,HeLa 229,uterine cervical cancer cell line,CellType,
3,HeLa P4,uterine cervical cancer cell line,CellType,
4,HeLa S,uterine cervical cancer cell line,CellType,
5,HeLa S3,uterine cervical cancer cell line,CellType,
6,HeLa-APL,uterine cervical cancer cell line,CellType,
7,HeLa-luc,luciferase-transfected cell line luciferase-t...,CellType,
8,HeLaRC32,uterine cervical cancer cell line,CellType,
9,erythropoietic protoporphyria,,Disease,protoporphyrinuria||deficiency of ferrochelata...
