In [2]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
from pyspark.sql import DataFrame
from pyspark.sql.functions import col
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from pyspark.sql import Row
from pyspark.sql.functions import col, max, expr
from pyspark.sql.functions import when

In [3]:
spark = SparkSession.builder.getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/12/15 14:14:27 INFO SparkEnv: Registering MapOutputTracker
23/12/15 14:14:28 INFO SparkEnv: Registering BlockManagerMaster
23/12/15 14:14:28 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
23/12/15 14:14:28 INFO SparkEnv: Registering OutputCommitCoordinator


# Data parsing

In [4]:
def join_dataframes(initial_df: DataFrame, 
                    second_df: DataFrame, 
                    initial_key_column: str, 
                    second_key_column: str,
                    columns_to_join: list) -> DataFrame:
    """
    Joins two PySpark DataFrames on specified key columns.

    Args:
    initial_df (DataFrame): The initial PySpark DataFrame.
    second_df (DataFrame): The second PySpark DataFrame to join with.
    initial_key_column (str): The key column name in the initial DataFrame.
    second_key_column (str): The key column name in the second DataFrame.
    columns_to_join (list): List of column names from the second DataFrame to include in the join.

    Returns:
    DataFrame: The resulting DataFrame after the join.
    """

    # Selecting specified columns from the second DataFrame, including its key column
    second_df_selected = second_df.select([second_key_column] + columns_to_join)

    # Performing the left join
    joined_df = initial_df.join(second_df_selected, 
                                initial_df[initial_key_column] == second_df_selected[second_key_column], 
                                how='left')

    # Drop the second key column if not needed
    joined_df = joined_df.drop(second_df_selected[second_key_column])

    return joined_df


In [5]:
def count_unique_values(df: DataFrame, column_name: str) -> int:
    """
    Count unique values in a specific column of a PySpark DataFrame.

    Args:
    df (DataFrame): The PySpark DataFrame.
    column_name (str): The name of the column to analyze.

    Returns:
    int: The number of unique values in the column.
    """
    # Get distinct values in the column and count them
    unique_count = df.select(column_name).distinct().count()

    return unique_count

# Example usage
# unique_count = count_unique_values(your_dataframe, 'your_column_name')
# print(f"Number of unique values: {unique_count}")


In [6]:
def show_unique_values_and_counts(df: DataFrame, column_name: str):
    """
    Shows unique values and their counts for a specified column in a Spark DataFrame.

    Parameters:
    df (DataFrame): The Spark DataFrame to analyze.
    column_name (str): The name of the column for which to count unique values.
    """
    if column_name not in df.columns:
        raise ValueError(f"Column {column_name} not found in DataFrame")

    unique_values_counts = df.groupBy(column_name).count()
    unique_values_counts.show()


In [7]:
# Take list of unique drugs (obtained from target with evidence in Platform or chemProbes)
import pandas as pd

unique_drugs_pd_df = pd.read_csv("../data/drug_to_target_unique_drugs.csv")
drug_list = spark.createDataFrame(unique_drugs_pd_df)
# drug_list = spark.read.csv(drug_list_dir, header=True, inferSchema=True)
drug_list.show()

[Stage 0:>                                                          (0 + 1) / 1]

+-------------+
|       drugId|
+-------------+
|CHEMBL1200632|
|   CHEMBL1231|
|CHEMBL1233511|
|   CHEMBL1637|
|CHEMBL1743017|
| CHEMBL185885|
|CHEMBL1949708|
|CHEMBL2105675|
|CHEMBL2107826|
|CHEMBL2109673|
|CHEMBL2346976|
| CHEMBL279115|
|CHEMBL3181832|
|CHEMBL3545096|
|CHEMBL3545103|
|CHEMBL3545145|
|CHEMBL3545312|
| CHEMBL363648|
|CHEMBL3707249|
|CHEMBL3989766|
+-------------+
only showing top 20 rows



                                                                                

In [8]:
in_drugs = count_unique_values(drug_list, "drugId")
print("Number of initial drugs: ", in_drugs)



Number of initial drugs:  12835


                                                                                

### For each drug find a max phase of clinical trial

In [9]:
molecule_path = "gs://open-targets-data-releases/23.12/output/etl/json/molecule"
molecule = spark.read.json(molecule_path)
molecule.persist()

# List of columns from molecule table
list_molecule = ["drugType", 
                "maximumClinicalTrialPhase", 
                "isApproved", 
                "linkedTargets", 
                "linkedDiseases"]
                 
# Join list of drugs and max_phase from molecule table
drug_list_phase = join_dataframes(drug_list, molecule, "drugId", "id", list_molecule).persist()

# Show number of drugs by approval status
print("Number of drugs by approval status: ")
show_unique_values_and_counts(drug_list_phase, "isApproved")

                                                                                

Number of drugs by approval status: 




+----------+-----+
|isApproved|count|
+----------+-----+
|      null| 4085|
|      true| 3340|
|     false| 5410|
+----------+-----+



                                                                                

### For each unique drug find bioactivity data from chembl_33_activity

In [10]:
activity_path = "gs://open-targets-pre-data-releases/chembl-columns/chembl-inputs/chembl_33_activity.jsonl"
activity = spark.read.json(activity_path)
activity.persist()

# List of columns from activity table
list_activity = ["assay_chembl_id",
                "assay_type",
                "action_type",
                "pchembl_value",
                "standard_type",
                "standard_units",
                "standard_value",
                "standard_relation",
                "target_organism",
                "target_pref_name",
                "target_chembl_id",
                "data_validity_comment",
                "data_validity_description"]
                # "standard_flag",
                # "ligand_efficiency",
                # "assay_variant_mutation"
                # "assay_variant_accession"

# Join list of drugs and chembl_33_activity
drug_to_activity = join_dataframes(drug_list_phase, activity, "drugId", "molecule_chembl_id", list_activity)\
                                    .filter(col("assay_chembl_id").isNotNull()).persist()

# Calculate for how many drugs and targets we have bioactivities
drug_list_count = count_unique_values(drug_list, 'drugId')
drug_to_activity_count = count_unique_values(drug_to_activity, 'drugId')
drug_to_activity_count_targets = count_unique_values(drug_to_activity, 'target_chembl_id')

print("Number of unique drugs from targets dataset: ", drug_list_count)
print("Number of unique drugs with any bioactivities: ", drug_to_activity_count)
print("Number of unique targets with any drug bioactivities: ", drug_to_activity_count_targets)

                                                                                

Number of unique drugs from targets dataset:  12835
Number of unique drugs with any bioactivities:  6215
Number of unique targets with any drug bioactivities:  4848


### For each bioactivity assay find parameters from chembl_33_assay

In [11]:
assay_path = "gs://open-targets-pre-data-releases/chembl-columns/chembl-inputs/chembl_33_assay.jsonl"
assay = spark.read.json(assay_path)
assay.persist()

# List of columns from assay table
list_assay = ["confidence_score",
            "confidence_description",
            "assay_category"]

drug_to_assay = join_dataframes(drug_to_activity, assay, "assay_chembl_id", "assay_chembl_id", list_assay).persist()

                                                                                

## Assay filters

In [12]:
# assay_type
assay_type_filter = drug_to_assay.filter(
    (col("assay_type") != "P") &
    (col("assay_type") != "U"))
assay_type_d = count_unique_values(assay_type_filter, "drugId")
assay_type_t = count_unique_values(assay_type_filter, "target_chembl_id")

print("Unique drugs with bioactivities for non P and U assays:", assay_type_d)
print("Unique targets with bioactivities for non P and U assays:", assay_type_t)

# confidence_score
confidence_score_filter = assay_type_filter.filter(col("confidence_score").isin([9, 7]))
confidence_score_d = count_unique_values(confidence_score_filter, "drugId")
confidence_score_t = count_unique_values(confidence_score_filter, "target_chembl_id")

print("Unique drugs with bioactivities for single/homolog proteins:", confidence_score_d)
print("Unique targets with bioactivities for single/homolog proteins:", confidence_score_t)

# target_organism
target_organism_filter = confidence_score_filter.filter(col("target_organism") == "Homo sapiens")
target_organism_d = count_unique_values(target_organism_filter, "drugId")
target_organism_t = count_unique_values(target_organism_filter, "target_chembl_id")
drug_target_organism_t = target_organism_filter.count()

print("Unique drug-target pairs with bioactivities for human targets:", drug_target_organism_t)

print("Unique drugs with bioactivities for human targets:", target_organism_d)
print("Unique targets with bioactivities for human targets:", target_organism_t)

                                                                                

Unique drugs with bioactivities for non P and U assays: 6174
Unique targets with bioactivities for non P and U assays: 4848


                                                                                

Unique drugs with bioactivities for single/homolog proteins: 3663
Unique targets with bioactivities for single/homolog proteins: 2136


                                                                                

Unique drug-target pairs with bioactivities for human targets: 18860
Unique drugs with bioactivities for human targets: 3373
Unique targets with bioactivities for human targets: 1369


### Protein classification by uniprot from SwissProt

In [13]:
# Map target_chembl_id to uniprots via accession

target_path = "gs://open-targets-pre-data-releases/chembl-columns/chembl-inputs/chembl_33_target.jsonl"
target = spark.read.json(target_path)
target.persist()

list_target = ["target_components",
            "target_type"]

target_to_uniprot = join_dataframes(target_organism_filter, target, "target_chembl_id", "target_chembl_id", list_target).persist()

# Explode target_components
# Define a UDF to extract the 'accession' field
def extract_accession(rows):
    # Assuming you want to extract the 'accession' from the first Row object in the list
    return rows[0].accession if rows else None

# Register UDF
extract_accession_udf = udf(extract_accession, StringType())

# Apply UDF to create a new column with the 'accession' values
target_to_uniprot_extr = target_to_uniprot.withColumn("accession", extract_accession_udf(target_to_uniprot["target_components"]))

                                                                                

In [14]:
# # What does protein_classifications mean?

# target_component_path = "gs://open-targets-pre-data-releases/chembl-columns/chembl-inputs/chembl_33_target_component.jsonl"
# target_component = spark.read.json(target_component_path)
# target_component.persist()
# target_component.show()

In [15]:
proteinclass_path = pd.read_csv("../data/uniprot2family.csv")
proteinclass_str = proteinclass_path.astype(str).drop_duplicates()
proteinclass = spark.createDataFrame(proteinclass_str)

proteinclass_list = ["proteinClass"]

uniprot_to_class = join_dataframes(target_to_uniprot_extr, proteinclass, "accession", "accession", proteinclass_list).persist()

23/12/15 14:15:47 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [16]:
show_unique_values_and_counts(uniprot_to_class, 'proteinClass')



+--------------+-----+
|  proteinClass|count|
+--------------+-----+
|        Enzyme| 5476|
|          GPCR| 1789|
|        Kinase| 7317|
|            TF|  209|
|          None| 1664|
|            IC|  359|
|            NR|  660|
|    Epigenetic|  769|
|   Transporter|  583|
|TF; Epigenetic|   29|
|          null|    5|
+--------------+-----+



                                                                                

In [18]:
uniprot_to_class.count()

18860

In [19]:
uniprot_to_class.write.parquet("gs://ot-team/polina/uniprot_to_class")

                                                                                