In [16]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
from pyspark.sql.functions import explode, split
from pyspark.sql.functions import upper
from pyspark.sql import DataFrame

In [2]:
spark = SparkSession.builder.getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/01/10 13:28:33 INFO SparkEnv: Registering MapOutputTracker
24/01/10 13:28:33 INFO SparkEnv: Registering BlockManagerMaster
24/01/10 13:28:33 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
24/01/10 13:28:33 INFO SparkEnv: Registering OutputCommitCoordinator


In [7]:
import pandas as pd

# Try different encodings until one works
try:
    initial_approvals = pd.read_csv("../../data/2023/2023_approvals_in_v0.csv", encoding='ISO-8859-1')
except UnicodeDecodeError:
    try:
        initial_approvals = pd.read_csv("../../data/2023/2023_approvals_in_v0.csv", encoding='latin1')
    except UnicodeDecodeError:
        initial_approvals = pd.read_csv("../../data/2023/2023_approvals_in_v0.csv", encoding='cp1252')

initial_approvals = spark.createDataFrame(initial_approvals)
# initial_approvals = spark.read.option("header", "true").csv("../../data/2023/2023_approvals_in_v0.csv").persist()

# initial_approvals.show()

In [11]:
# originalDrugName explode
initial_approvals = initial_approvals.withColumn("originalDrugNameArray", split(initial_approvals.originalDrugName, ","))
exploded_approvals = initial_approvals.withColumn("drugName", explode(initial_approvals.originalDrugNameArray))

exploded_approvals.show()


+-------------+--------------------+--------------------+------+--------------+--------------------+---------------+--------------------+-----------+----------+------------+----------+------------+---------------------+
|brandDrugName|    originalDrugName|            drugName|drugId|       sponsor|          properties|therapeuticArea|          indication|diseaseName|diseaseIds|diseaseClass|reviewType|yearApproval|originalDrugNameArray|
+-------------+--------------------+--------------------+------+--------------+--------------------+---------------+--------------------+-----------+----------+------------+----------+------------+---------------------+
|      Leqembi|           Lecanemab|           Lecanemab|   NaN|  Eisai/Biogen|Amyloid-_-targete...|            NaN|   Alzheimer disease|        NaN|       NaN|         NaN|       NaN|        2023|          [Lecanemab]|
|    Brenzavvy|       Bexagliflozin|       Bexagliflozin|   NaN|   Theracosbio|     SGLT2 inhibitor|            NaN|Glyc

### Finding drugId by drugName

In [12]:
molecule_path = "gs://open-targets-data-releases/23.12/output/etl/json/molecule"
molecule = spark.read.json(molecule_path)
molecule.persist()
molecule.show()

[Stage 6:>                                                          (0 + 1) / 1]

+---------------+--------------------+--------------------+--------------------+--------------------+--------------+----------------+-------------+--------------------+----------+--------------------+--------------------+-------------------------+--------------------+-------------+--------------------+--------------------+-------------------+
+---------------+--------------------+--------------------+--------------------+--------------------+--------------+----------------+-------------+--------------------+----------+--------------------+--------------------+-------------------------+--------------------+-------------+--------------------+--------------------+-------------------+
|          false|Cc1cc(CN2CCN(c3c(...|                null|                null|Small molecule drug.|Small molecule|           false|CHEMBL1086582|UUGWPYPNRZQDFO-UH...|      null|                null|                null|                     null|       CHEMBL1086582|         null|                  []|       

                                                                                

In [14]:
# Convert the content of the column to uppercase
exploded_approvals_up = exploded_approvals.withColumn("drugName", upper(exploded_approvals["drugName"]))

# Show the result
exploded_approvals_up.show()

+-------------+--------------------+--------------------+------+--------------+--------------------+---------------+--------------------+-----------+----------+------------+----------+------------+---------------------+
|brandDrugName|    originalDrugName|            drugName|drugId|       sponsor|          properties|therapeuticArea|          indication|diseaseName|diseaseIds|diseaseClass|reviewType|yearApproval|originalDrugNameArray|
+-------------+--------------------+--------------------+------+--------------+--------------------+---------------+--------------------+-----------+----------+------------+----------+------------+---------------------+
|      Leqembi|           Lecanemab|           LECANEMAB|   NaN|  Eisai/Biogen|Amyloid-_-targete...|            NaN|   Alzheimer disease|        NaN|       NaN|         NaN|       NaN|        2023|          [Lecanemab]|
|    Brenzavvy|       Bexagliflozin|       BEXAGLIFLOZIN|   NaN|   Theracosbio|     SGLT2 inhibitor|            NaN|Glyc

In [17]:
def join_dataframes(initial_df: DataFrame, 
                    second_df: DataFrame, 
                    initial_key_column: str, 
                    second_key_column: str,
                    columns_to_join: list) -> DataFrame:
    """
    Joins two PySpark DataFrames on specified key columns.

    Args:
    initial_df (DataFrame): The initial PySpark DataFrame.
    second_df (DataFrame): The second PySpark DataFrame to join with.
    initial_key_column (str): The key column name in the initial DataFrame.
    second_key_column (str): The key column name in the second DataFrame.
    columns_to_join (list): List of column names from the second DataFrame to include in the join.

    Returns:
    DataFrame: The resulting DataFrame after the join.
    """

    # Selecting specified columns from the second DataFrame, including its key column
    second_df_selected = second_df.select([second_key_column] + columns_to_join)

    # Performing the left join
    joined_df = initial_df.join(second_df_selected, 
                                initial_df[initial_key_column] == second_df_selected[second_key_column], 
                                how='left')

    # Drop the second key column if not needed
    joined_df = joined_df.drop(second_df_selected[second_key_column])

    return joined_df

In [18]:
# List of columns from molecule table
list = ["id"]

name2chembl = join_dataframes(exploded_approvals_up, molecule, "drugName", "name", list).persist()

In [20]:
# Delete the column
name2chembl = name2chembl.drop('drugId').withColumnRenamed('id', 'drugId')

### Indications search

In [21]:
from pyspark.sql import SparkSession
import pandas as pd
import requests

In [22]:
def ols_query(query):
    api_url = f"https://www.ebi.ac.uk/ols/api/search?q={query}&groupField=iri&start=0&ontology=efo"
    response = requests.get(api_url)
    response_json = response.json()
    
    if response_json["response"]["numFound"] > 0:
        docs = response_json["response"]["docs"]
        obo_id = docs[0]["iri"]
        label = docs[0]["label"]
        return [obo_id, label]
    else:
        return [None, None]


In [23]:
# Convert Spark DataFrame to Pandas DataFrame
name2chembl_pandas = name2chembl.toPandas()

In [24]:
# Apply the function to the column
results = name2chembl_pandas['indication'].apply(ols_query)

In [33]:
# Convert the results to a Pandas DataFrame and concatenate with the original DataFrame
results_df = pd.DataFrame(results.tolist(), columns=['obo_id', 'label'])
indications = pd.concat([name2chembl_pandas, results_df], axis=1)\
                .drop(columns=['diseaseName', 'diseaseIds', 'originalDrugNameArray'])\
                .rename(columns={'label': 'diseaseName'})\
                .rename(columns={'obo_id': 'diseaseIds'})\

indications['diseaseIds'] = indications['diseaseIds'].str.replace('http://purl.obolibrary.org/obo/', '').str.replace('http://www.ebi.ac.uk/efo/', '')

indications.to_csv("../../data/2023/2023_approvals_in_v01.csv")

  indications['diseaseIds'] = indications['diseaseIds'].str.replace('http://purl.obolibrary.org/obo/', '').str.replace('http://www.ebi.ac.uk/efo/', '')
