### Benchmark file
- In this file I removed the heatmap generation part since it wasn't meant for efficiency but rather than visuals. Here I am getting the execution time of the whole calculation until the filtered dataframe count. I also removed the persist part because the calculations are not so heavy.
- I will add the persist to differentiate between embedding creation and cosine similarity df creation.

The combinations of configurations I am using which I added in the whole_time.csv file (for the whole runtime).

In [1]:
# BioPython library for collecting the sequences from cif files
from Bio.PDB import PDBList
from Bio.PDB.MMCIFParser import MMCIFParser

In [2]:
# Data manipulation libraries
import os
import io
from collections import Counter
import pandas as pd
import numpy as np
import seaborn as sns
import time

# Pyspark libraries
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import Row
from pyspark.sql.types import ArrayType, DoubleType
from pyspark.ml.linalg import DenseVector, VectorUDT
from pyspark.ml.functions import vector_to_array

In [3]:
# Creating the spark session
spark = SparkSession.builder \
    .master("spark://master:7077")\
    .appName("Proteindata spark application")\
    .config("spark.dynamicAllocation.enabled", "false")\
    .config("spark.driver.host", "10.67.22.219") \
    .config("spark.driver.port","6066")\
    .config("spark.executor.memory", "4g")\
    .config("spark.executor.cores","2")\
    .config("spark.sql.optimizer.enableRangeJoin", "true")\
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/02/08 22:11:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
#.config("spark.memoryOverheadFactor", "0.05")\

In [5]:
# Creating the spark context
sc = spark.sparkContext

In [6]:
#start_time = time.time()
start_time0 = time.time()

In [7]:
# Getting the file paths
base_path = '/data_files/pdb_files'
base_path_edit = '/data_files/pdb_files/{}'
file_names = os.listdir(base_path)
# The size of our original folder is about 2GB consisting of 1712 cif files
# we needed to reduce the data size in order to be able to compute without crashing
# we will examine the reasons of the crashing in the benchmarking
file_list = [base_path_edit.format(i) for i in file_names][-300:]
files_rdd = sc.parallelize(file_list)#numSlices=12

In [8]:
# Counting all of the paths to see if there are any errors
files_rdd.count()

                                                                                

300

In [9]:
#files_rdd.coalesce(12)
files_rdd.getNumPartitions()

2

In [10]:
def parse_file(file):

    cif_parser = MMCIFParser(QUIET=True) # CIF file parser
    length = 0 # Setting the length initially to 0 for error correction
    name = file.split('/')[3].split('.')[0] # Getting the id of the protein
    structure = cif_parser.get_structure("protein", file) # getting structure ? try "protein"

    # Dictionary for residue names
    d3to1 = {'CYS': 'C', 'ASP': 'D', 'SER': 'S', 'GLN': 'Q', 'LYS': 'K',
    'ILE': 'I', 'PRO': 'P', 'THR': 'T', 'PHE': 'F', 'ASN': 'N',
    'GLY': 'G', 'HIS': 'H', 'LEU': 'L', 'ARG': 'R', 'TRP': 'W',
    'ALA': 'A', 'VAL':'V', 'GLU': 'E', 'TYR': 'Y', 'MET': 'M'}

    # Creating the sequence   
    for model in structure:
        for chain in model:
            sequence = [d3to1.get(residue.get_resname(), 'X') for residue in chain.get_residues()]
            length = len(sequence)
    
    return name,sequence,length

In [11]:
# Creating an RDD for the tokens
def tokens_df_creator(file_path):

    data = []
    name, sequence, length = parse_file(file_path)
    row_value = {
        'id':name,
        'length':length,
        'tokens':sequence, # sequence is tokenized like ["A","B","K",...] (each residue is a token)
    }
    data.append(Row(**row_value)) # Creating rows containing id, length and the sequence of each file in the folder
    
    return data

# Turning the RDD into a DF for easier usage
tokens_rdd = files_rdd.flatMap(tokens_df_creator) # FlatMap applied to the RDD
tokens_df = tokens_rdd.toDF()

                                                                                

In [12]:
# Creating the dictionary from the .vec file
# .vec file contains the keys (residues) and their corresponding embedding with size 1024
def load_vectors(fname):
    fin = io.open(fname, 'r', encoding='utf-8', newline='\n', errors='ignore')
    data = {}
    for line in fin:
        letter_token = line.rstrip().split()
        # For each residue creating a DenseVector containing the embedding obtained from the .vec file
        data[letter_token[0]] = DenseVector([float(letter) for letter in letter_token[1:]]) 
    return data

vec_dict = load_vectors('/data_files/prot_bert.vec')

In [13]:
# Broadcasting the dictionary
vec_broadcast = sc.broadcast(vec_dict)

In [14]:
# Embedding the sequence
# The embedding is done using a UDF
@F.udf(ArrayType(VectorUDT()))
def embed_sequence(tokens_list):
    return [vec_broadcast.value[token] for token in tokens_list if token in vec_dict]


In [15]:
# The embeddings are added as a new column (UDF applied to the DataFrame)
tokens_df = tokens_df.withColumn("embeddings",embed_sequence(tokens_df.tokens))

In [16]:
# Creating another udf to get the mean of each embedding row
@F.udf(VectorUDT())
def mean_calculator(embedding,length):
    mean_embedding = sum(embedding)/length # getting the mean of the embeddings in axis=0
    return mean_embedding   

# Created mean embedding is added as a new column
tokens_df = tokens_df.withColumn("mean_embed",mean_calculator(tokens_df.embeddings,tokens_df.length))

In [17]:
# Selecting a subset of the tokens_df
mean_embed_df = tokens_df.select("id","mean_embed")

##### For calculating how long each complete task take

In [18]:
end_time0 = time.time()
execution_time0 = end_time0 - start_time0
mean_embed_df = mean_embed_df.persist()

start_time1 = time.time()

In [19]:
# Creating 2 views from the same mean_embed_df
mean_embed_df.withColumnRenamed('id', 'id1').withColumnRenamed('mean_embed', 'embed_1').createOrReplaceTempView("df1")

mean_embed_df.withColumnRenamed('id', 'id2').withColumnRenamed('mean_embed', 'embed_2').createOrReplaceTempView("df2")

# Using sql to join the dfs using a condition which reduces the number of unnecessary calculation
# caused by the joining 2 dfs to create all the possible pairs
joined_df = spark.sql(
"""SELECT *
FROM df1, df2
WHERE df1.id1 < df2.id2""") # using this condition we are creating only the upper triangle of the expected cross joined "matrix"

In [20]:
# Convert DenseVector to an array
joined_df = joined_df.withColumn("embed_1", vector_to_array(F.col("embed_1")))
joined_df = joined_df.withColumn("embed_2", vector_to_array(F.col("embed_2")))

In [21]:
# Compute dot product, norms, and cosine similarity directly
df_cos_sim = (
    joined_df.withColumn("dot_prod", F.expr("aggregate(zip_with(embed_1, embed_2, (x, y) -> x * y), 0D, (acc, v) -> acc + v)"))
      .withColumn("norm_1", F.expr("sqrt(aggregate(embed_1, 0D, (acc, v) -> acc + v * v))"))
      .withColumn("norm_2", F.expr("sqrt(aggregate(embed_2, 0D, (acc, v) -> acc + v * v))"))
      .withColumn("cosine_similarity", F.col("dot_prod") / (F.col("norm_1") * F.col("norm_2")))
      .drop("dot_prod", "norm_1", "norm_2","embed_1","embed_2")
)

In [22]:
# Filtering the data since being really close to 1 is not surprising
filtered_df = df_cos_sim.filter("cosine_similarity < 0.955")
filtered_df.count() # We are expecting a small amount

                                                                                

2

In [23]:
#end_time = time.time()
#execution_time = end_time - start_time
end_time1 = time.time()
execution_time1 = end_time1 - start_time1

In [24]:
execution_time0

4.365358114242554

In [25]:
execution_time1

151.8496322631836

### Stopping the context and the spark app

In [26]:
sc.stop()

In [27]:
spark.stop()