In [1]:
import json
from json import JSONDecodeError

import requests
from functools import reduce
import pandas as pd
from pyspark.sql.functions import (
    col, udf, struct, lit, split, expr, collect_set, struct, 
    regexp_replace, min as pyspark_min, explode, when,
    array_contains, count, first, element_at, size, sum as pyspark_sum, array
)
from pyspark.sql.types import (
    FloatType, ArrayType, StructType, StructField, BooleanType, StringType, IntegerType
)
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from collections import defaultdict
from pyspark.context import SparkContext
from plip.basic import config

config.DNARECEPTOR = True

# establish spark connection
spark = (
    SparkSession.builder
    .master('local[*]')
    .getOrCreate()
)

# Dataset witht all the details, produced earlier:
input_dataset = (
    spark.read.csv("output_files/structure_for_plip_small_set.csv", sep=",", header=True)
    .groupBy("pdbStructureId")
    .agg(collect_set(col("pdbCompoundId")).alias("pdbCompoundId"))
    .toPandas()
)

print(input_dataset.head())
print(len(input_dataset))

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/03/27 23:05:32 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/03/27 23:05:33 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
[Stage 1:>                                                          (0 + 1) / 1]

  pdbStructureId pdbCompoundId
0           1a31         [5IU]
1           1a4l         [DCF]
2           1a6y         [5IU]
3           1apz         [ASP]
4           1az1         [ALR]
496


                                                                                

In [2]:
from plip.structure.preparation import PDBComplex, PLInteraction
from plip.exchange.report import BindingSiteReport
from plip.basic import config


class GetPDB:
    
    PDB_URL = 'https://www.ebi.ac.uk/pdbe/entry-files/download/pdb{}.ent'
    
    def __init__(self, data_folder: str) -> None:
        self.data_folder = data_folder
        
    
    def get_pdb(self, pdb_structure_id: str) -> str:
        """Reading file from a given loaction fetch and save if not found"""
        try:
            # Readind data from the given location:
            with open(f'{self.data_folder}/pdb{pdb_structure_id}.ent', 'rt') as f:
                data = f.read()
    
        except FileNotFoundError:
            # Fetch data from the web
            data = self.fetch_pdb(pdb_structure_id)
            
            # Save file
            with open(f'{self.data_folder}/pdb{pdb_structure_id}.ent', 'wt') as f:
                f.write(data)
    
        return data
    

    def fetch_pdb(self, pdb_structure_id: str)-> str:
        """This function fetches the pdb file from ePDB server as a string

        Args:
            pdb_structure_id (str)
        Returns:
            structure data in pdb format as string eg 'AIN:A:1202'
        """
        data = ""
    
        headers={'Content-Type': 'text/plain'}
    
        if not pdb_structure_id:
            return ''

        try:
            response = requests.get(self.PDB_URL.format(pdb_structure_id), headers=headers)
            if response.headers['Content-Type'] != 'text/plain; charset=UTF-8':
                pass
            else:
                data = response.text
    
        except:
            data = ''

        return data


def run_plip(row):
    """This function fetches the pdb file from ePDB server as a string

    Args:
        valid_types (str)
        plip_wanted_columns
        combination_dict
    Returns:
        A PySpark dataframe
    """
    (structure, drugs) = row

    try:
        pdb = gpdb.get_pdb(structure)
    except:
        return 'failed_to_fetch.'
    
    if pdb == '':
        return 'returned empty'
    

    protlig = PDBComplex()
    
    try:
        protlig.load_pdb(pdb, as_string=True)  # load the pdb file
        return 'parsed alright'
    except:
        return 'parsing failed'


def parse_interaction(interaction: PLInteraction, compound_id:str, pdb_id:str) -> dict:

    interaction_type = interaction.__doc__.split('(')[0]
    
    if interaction_type == 'waterbridge':
        return {}

    # Parsing data form the interaction:
    return {
        'pdb_structure_id': pdb_id,
        'compound_id': compound_id,
        'interaction_type': interaction_type,
        'prot_residue_number': interaction.resnr,
        'prot_residue_type': interaction.restype,
        'prot_chain_id': interaction.reschain
    }

def characerize_complex(row):
    # Get pdb data:
    pdb_id = row['pdbStructureId']
    compounds = row['pdbCompoundId']
    
    pdb_data = gpdb.get_pdb(pdb_id)
    if pdb_data:

        # Load into plip:
        mol_complex = PDBComplex()
        
        try:
            mol_complex.load_pdb(pdb_data, as_string=True)

        except:
            pass
        
        if mol_complex.ligands:
            
            # Filtering out only the relevant ligands:
            ligands_of_interest = [ligand for ligand in mol_complex.ligands if ligand.hetid in compounds]
                
            # Characterizing relevant complex:
            [mol_complex.characterize_complex(ligand) for ligand in ligands_of_interest]

            # Extract details from ligands:
            return [parse_interaction(interaction, compound.split(':')[0], pdb_id) for compound, interaction_set in mol_complex.interaction_sets.items() for interaction in interaction_set.all_itypes]

        else:
            return []
    
    else:
        return []


In [3]:
input_dataset

Unnamed: 0,pdbStructureId,pdbCompoundId
0,1a31,[5IU]
1,1a4l,[DCF]
2,1a6y,[5IU]
3,1apz,[ASP]
4,1az1,[ALR]
...,...,...
491,7oca,[CLR]
492,7p9t,[DCM]
493,7pw7,[ATP]
494,7voe,[9SC]


In [8]:
from multiprocessing import Process

def f(name):
    print('hello', name)

if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/Users/marinegirardey/miniforge3/envs/plip_env/lib/python3.8/multiprocessing/spawn.py", line 116, in spawn_main
    exitcode = _main(fd, parent_sentinel)
  File "/Users/marinegirardey/miniforge3/envs/plip_env/lib/python3.8/multiprocessing/spawn.py", line 126, in _main
    self = reduction.pickle.load(from_parent)
AttributeError: Can't get attribute 'f' on <module '__main__' (built-in)>


In [4]:
from pandarallel import pandarallel
pandarallel.initialize()

gpdb = GetPDB(data_folder='output_files/pdb_structure_files')

input_dataset['new_col'] = input_dataset.parallel_apply(
        characerize_complex, axis=1
    )

INFO: Pandarallel will run on 8 workers.
INFO: Pandarallel will use standard multiprocessing data transfer (pipe) to transfer data between the main process and workers.


22/03/28 01:31:24 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 842532 ms exceeds timeout 120000 ms
22/03/28 01:31:24 WARN SparkContext: Killing executors is not supported by current scheduler.


In [4]:
# launch a local Dask cluster using all available cores
from dask.distributed import Client
client = Client()
client.dashboard_link

'http://127.0.0.1:8787/status'

In [5]:
client

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 4
Total threads: 8,Total memory: 8.00 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:62379,Workers: 4
Dashboard: http://127.0.0.1:8787/status,Total threads: 8
Started: Just now,Total memory: 8.00 GiB

0,1
Comm: tcp://127.0.0.1:62441,Total threads: 2
Dashboard: http://127.0.0.1:62447/status,Memory: 2.00 GiB
Nanny: tcp://127.0.0.1:62386,
Local directory: /Users/marinegirardey/Documents/OpenTargetInternship/OT_internship_repo_clone/dask-worker-space/worker-zhr2mm6c,Local directory: /Users/marinegirardey/Documents/OpenTargetInternship/OT_internship_repo_clone/dask-worker-space/worker-zhr2mm6c

0,1
Comm: tcp://127.0.0.1:62442,Total threads: 2
Dashboard: http://127.0.0.1:62446/status,Memory: 2.00 GiB
Nanny: tcp://127.0.0.1:62385,
Local directory: /Users/marinegirardey/Documents/OpenTargetInternship/OT_internship_repo_clone/dask-worker-space/worker-rzn9vizo,Local directory: /Users/marinegirardey/Documents/OpenTargetInternship/OT_internship_repo_clone/dask-worker-space/worker-rzn9vizo

0,1
Comm: tcp://127.0.0.1:62443,Total threads: 2
Dashboard: http://127.0.0.1:62445/status,Memory: 2.00 GiB
Nanny: tcp://127.0.0.1:62384,
Local directory: /Users/marinegirardey/Documents/OpenTargetInternship/OT_internship_repo_clone/dask-worker-space/worker-koqpueei,Local directory: /Users/marinegirardey/Documents/OpenTargetInternship/OT_internship_repo_clone/dask-worker-space/worker-koqpueei

0,1
Comm: tcp://127.0.0.1:62440,Total threads: 2
Dashboard: http://127.0.0.1:62444/status,Memory: 2.00 GiB
Nanny: tcp://127.0.0.1:62387,
Local directory: /Users/marinegirardey/Documents/OpenTargetInternship/OT_internship_repo_clone/dask-worker-space/worker-3qtrpw_s,Local directory: /Users/marinegirardey/Documents/OpenTargetInternship/OT_internship_repo_clone/dask-worker-space/worker-3qtrpw_s


In [6]:
import dask.dataframe as dd
gpdb = GetPDB(data_folder='output_files/pdb_structure_files')

ddf = dd.from_pandas(input_dataset, npartitions=30)

input_dataset = (
    ddf
    .assign(
        new_col = ddf.map_partitions(
            lambda df: df.apply(lambda row: characerize_complex(row), axis=1), meta=(None, 'f8')
        )
#        .map_partitions(lambda df: df.apply(run_plip, axis=1), meta=(None, 'f8'))
    )
       .compute(scheduler='processes')
)

22/03/27 10:24:06 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 828659 ms exceeds timeout 120000 ms
22/03/27 10:24:07 WARN SparkContext: Killing executors is not supported by current scheduler.


In [24]:
print(type(input_dataset))

<class 'pandas.core.frame.DataFrame'>


In [19]:
from itertools import chain

plip_output_pd_df = pd.DataFrame(list(chain.from_iterable(
    input_dataset
    .loc[lambda df: df.new_col.apply(lambda x: len(x) >0)]
    .assign(new_col = lambda df: df.new_col.apply(lambda l: [value for value in l if value != {}]))
    .new_col
    .to_list()
)))

In [20]:
plip_output_pd_df

Unnamed: 0,pdb_structure_id,compound_id,interaction_type,prot_residue_number,prot_residue_type,prot_chain_id
0,1a4l,DCF,hbond,184,GLY,A
1,1a4l,DCF,hbond,103,SER,A
2,1a4l,DCF,pistack,17,HIS,A
3,1a4l,DCF,hydroph_interaction,65,PHE,A
4,1a4l,DCF,metal_complex,15,HIS,A
...,...,...,...,...,...,...
7180,7voe,9SC,hydroph_interaction,151,TRP,A
7181,7voe,9SC,hydroph_interaction,243,PHE,A
7182,7voe,9SC,hydroph_interaction,332,PHE,A
7183,7voj,ACY,hbond,49,ASP,A


distributed.utils - ERROR - 'start'
Traceback (most recent call last):
  File "/Users/marinegirardey/miniforge3/envs/plip_env/lib/python3.8/site-packages/distributed/utils.py", line 695, in log_errors
    yield
  File "/Users/marinegirardey/miniforge3/envs/plip_env/lib/python3.8/site-packages/distributed/dashboard/components/shared.py", line 285, in update
    ts = metadata["keys"][self.key]
KeyError: 'start'
tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <zmq.eventloop.ioloop.ZMQIOLoop object at 0x29dc4a3a0>>, <Task finished name='Task-142830' coro=<_needs_document_lock.<locals>._needs_document_lock_wrapper() done, defined at /Users/marinegirardey/miniforge3/envs/plip_env/lib/python3.8/site-packages/bokeh/server/session.py:77> exception=KeyError('start')>)
Traceback (most recent call last):
  File "/Users/marinegirardey/miniforge3/envs/plip_env/lib/python3.8/site-packages/tornado/ioloop.py", line 741, in _run_callba

In [27]:
plip_output_pd_df.to_csv("output_files/interaction_structure_drug_plip_output.csv", index=False, header=True)