## Benchmark for Reading and Datamining PDB Structures with mmtf-pyspark

In [1]:
from pyspark.sql import SparkSession
from mmtfPyspark.io import mmtfReader
from mmtfPyspark.filters import ContainsGroup
from mmtfPyspark.utils import ColumnarStructure
from mmtfPyspark.interactions import InteractionExtractorPd

import gzip
import pandas as pd
import numpy as np
import os
import time

## Setup the benchmark
Set the path to the MMTF Hadoop Sequence file. Here we retrieve the value of the environment variable MMTF_FULL.

In [2]:
# Location of the MMTF Hadoop Sequence file (taken from the MMTF_FULL environment variable).
path = mmtfReader.get_mmtf_full_path()

Hadoop Sequence file path: MMTF_FULL=/Users/peter/MMTF_Files/full


Specify a list with the number of cores

In [3]:
# Core counts to benchmark; each entry becomes N in the Spark "local[N]" master URL.
cores = [4]

In [4]:
# Create the results directory. exist_ok=True replaces the separate
# existence check and avoids the race between checking and creating.
results_dir = '../results'
os.makedirs(results_dir, exist_ok=True)

## Structure Benchmark
This benchmark reads structures and counts them.

In [13]:
def structure(path, num_cores):
    """Read structures from an MMTF Hadoop Sequence file and count them.

    Args:
        path: location of the MMTF Hadoop Sequence file.
        num_cores: number of local cores, used as N in the "local[N]" master URL.

    Returns:
        The value returned by ``count()`` on the structures that were read
        (with ``first_model=True``).
    """
    spark = SparkSession.builder.master("local[" + str(num_cores) + "]").appName("Benchmark3").getOrCreate()
    try:
        structures = mmtfReader.read_sequence_file(path, first_model=True)
        return structures.count()
    finally:
        # Stop the session even when the job fails; otherwise a later
        # getOrCreate() could reuse it with a stale core count.
        spark.stop()

In [14]:
# Collect one record per core count and build the table once at the end.
# DataFrame.append (removed in pandas 2.0) copied the whole frame on every call.
records = []

for num_cores in cores:
    # perf_counter() is a monotonic clock intended for measuring intervals
    start = time.perf_counter()
    count = structure(path, num_cores)
    end = time.perf_counter()
    print('structures, cores:', num_cores, 'time:', end-start, 'seconds')
    records.append({'cores': num_cores, 'structures': end-start, 'count': count})

# Columns in alphabetical order (cores, count, timing); counts stay integers
# instead of being upcast to float.
df_s = pd.DataFrame(records, columns=['cores', 'count', 'structures'])

structures, cores: 4 time: 204.89131379127502 seconds


In [15]:
# Write the timing table to the results directory as CSV, without the index column.
df_s.to_csv(os.path.join(results_dir, 'structures.csv'), index=False)

In [16]:
# Display the timings collected for the structure-reading benchmark.
df_s

Unnamed: 0,cores,count,structures
0,4,140825.0,204.891314


## Structure calc_core_group_data Benchmark


In [5]:
def structure(path, num_cores):
    """Read structures and call ``calc_core_group_data()`` on each of them.

    Args:
        path: location of the MMTF Hadoop Sequence file.
        num_cores: number of local cores, used as N in the "local[N]" master URL.

    Returns:
        Always 0. ``foreach`` produces no result to count, so a placeholder
        is reported to keep the same return shape as the other benchmarks.
    """
    spark = SparkSession.builder.master("local[" + str(num_cores) + "]").appName("Benchmark3").getOrCreate()
    try:
        structures = mmtfReader.read_sequence_file(path, first_model=True)
        # s is indexed as a pair; element 1 is the structure object
        structures.foreach(lambda s: s[1].calc_core_group_data())
        return 0
    finally:
        # Stop the session even when the job fails; otherwise a later
        # getOrCreate() could reuse it with a stale core count.
        spark.stop()

In [6]:
# Collect one record per core count and build the table once at the end.
# DataFrame.append (removed in pandas 2.0) copied the whole frame on every call.
records = []

for num_cores in cores:
    # perf_counter() is a monotonic clock intended for measuring intervals
    start = time.perf_counter()
    count = structure(path, num_cores)
    end = time.perf_counter()
    print('structures, cores:', num_cores, 'time:', end-start, 'seconds')
    records.append({'cores': num_cores, 'structures_group_data': end-start, 'count': count})

# Columns in alphabetical order (cores, count, timing); counts stay integers
# instead of being upcast to float.
df_s = pd.DataFrame(records, columns=['cores', 'count', 'structures_group_data'])

structures, cores: 4 time: 625.5085597038269 seconds


In [7]:
# Write the timing table to the results directory as CSV, without the index column.
df_s.to_csv(os.path.join(results_dir, 'structures_group_data.csv'), index=False)

In [8]:
# Display the timings collected for the calc_core_group_data benchmark.
df_s

Unnamed: 0,cores,count,structures_group_data
0,4,0.0,625.50856


## Structure To Pandas Benchmark
This benchmark reads structures and converts them to pandas dataframes.
default: 891, 819, 855 (no JIT), 1039 (cache=True)
use_categories: 1079
multi_index: 

In [17]:
def structure(path, num_cores):
    """Read structures, convert each one with ``to_pandas()``, and count the results.

    Args:
        path: location of the MMTF Hadoop Sequence file.
        num_cores: number of local cores, used as N in the "local[N]" master URL.

    Returns:
        The value returned by ``count()`` on the mapped ``to_pandas()`` results.
    """
    spark = SparkSession.builder.master("local[" + str(num_cores) + "]").appName("Benchmark3").getOrCreate()
    try:
        structures = mmtfReader.read_sequence_file(path, first_model=True)
        # s is indexed as a pair; element 1 is the structure object
        dfs = structures.map(lambda s: s[1].to_pandas())
        return dfs.count()
    finally:
        # Stop the session even when the job fails; otherwise a later
        # getOrCreate() could reuse it with a stale core count.
        spark.stop()

In [18]:
# Collect one record per core count and build the table once at the end.
# DataFrame.append (removed in pandas 2.0) copied the whole frame on every call.
records = []

for num_cores in cores:
    # perf_counter() is a monotonic clock intended for measuring intervals
    start = time.perf_counter()
    count = structure(path, num_cores)
    end = time.perf_counter()
    print('structures, cores:', num_cores, 'time:', end-start, 'seconds')
    records.append({'cores': num_cores, 'structures_to_pandas': end-start, 'count': count})

# Columns in alphabetical order (cores, count, timing); counts stay integers
# instead of being upcast to float.
df_s = pd.DataFrame(records, columns=['cores', 'count', 'structures_to_pandas'])

structures, cores: 4 time: 823.9224970340729 seconds


In [19]:
# Write the timing table to the results directory as CSV, without the index column.
df_s.to_csv(os.path.join(results_dir, 'structures_to_pandas.csv'), index=False)

In [20]:
# Display the timings collected for the structure-to-pandas benchmark.
df_s

Unnamed: 0,cores,count,structures_to_pandas
0,4,140825.0,823.922497


## Structure to Chain Benchmark
This benchmark reads structures and flat-maps them to polymer chains.

In [13]:
def structure_to_chains(path, num_cores):
    """Read structures, flat-map them to chains, and count the chains.

    Args:
        path: location of the MMTF Hadoop Sequence file.
        num_cores: number of local cores, used as N in the "local[N]" master URL.

    Returns:
        The value returned by ``count()`` on the flat-mapped ``get_chains()`` results.
    """
    spark = SparkSession.builder.master("local[" + str(num_cores) + "]").appName("Interactions").getOrCreate()
    try:
        structures = mmtfReader.read_sequence_file(path)
        # s is indexed as a pair; element 1 is the structure object
        chains = structures.flatMap(lambda s: s[1].get_chains())
        return chains.count()
    finally:
        # Stop the session even when the job fails; otherwise a later
        # getOrCreate() could reuse it with a stale core count.
        spark.stop()

In [14]:
# Collect one record per core count and build the table once at the end.
# DataFrame.append (removed in pandas 2.0) copied the whole frame on every call.
records = []

for num_cores in cores:
    # perf_counter() is a monotonic clock intended for measuring intervals
    start = time.perf_counter()
    count = structure_to_chains(path, num_cores)
    end = time.perf_counter()
    print('structure_to_chains, cores:', num_cores, 'time:', end-start, 'seconds')
    records.append({'cores': num_cores, 'structure_to_chains': end-start, 'count': count})

# Columns in alphabetical order (cores, count, timing); counts stay integers
# instead of being upcast to float.
df_s2c = pd.DataFrame(records, columns=['cores', 'count', 'structure_to_chains'])

structure_to_chains, cores: 4 time: 243.24330711364746 seconds


In [15]:
# Write the timing table to the results directory as CSV, without the index column.
df_s2c.to_csv(os.path.join(results_dir, 'structure_to_chains.csv'), index=False)

In [16]:
# Display the timings collected for the structure-to-chains benchmark.
df_s2c

Unnamed: 0,cores,count,structure_to_chains
0,4,447250.0,243.243307


## Structure to Chain to pandas Benchmark
This benchmark reads structures, flat-maps them to polymer chains, and converts each chain to a pandas dataframe.

In [17]:
def structure_to_chains(path, num_cores):
    """Read structures, flat-map to chains, convert each chain with ``to_pandas()``, and count.

    Args:
        path: location of the MMTF Hadoop Sequence file.
        num_cores: number of local cores, used as N in the "local[N]" master URL.

    Returns:
        The value returned by ``count()`` on the per-chain ``to_pandas()`` results.
    """
    spark = SparkSession.builder.master("local[" + str(num_cores) + "]").appName("Interactions").getOrCreate()
    try:
        structures = mmtfReader.read_sequence_file(path)
        # s is indexed as a pair; element 1 is the structure object
        chains = structures.flatMap(lambda s: s[1].get_chains())
        dfc = chains.map(lambda c: c.to_pandas())
        return dfc.count()
    finally:
        # Stop the session even when the job fails; otherwise a later
        # getOrCreate() could reuse it with a stale core count.
        spark.stop()

In [18]:
# Collect one record per core count and build the table once at the end.
# DataFrame.append (removed in pandas 2.0) copied the whole frame on every call.
records = []

for num_cores in cores:
    # perf_counter() is a monotonic clock intended for measuring intervals
    start = time.perf_counter()
    count = structure_to_chains(path, num_cores)
    end = time.perf_counter()
    print('structure_to_chains, cores:', num_cores, 'time:', end-start, 'seconds')
    records.append({'cores': num_cores, 'structure_to_chains': end-start, 'count': count})

# Columns in alphabetical order (cores, count, timing); counts stay integers
# instead of being upcast to float.
df_s2c = pd.DataFrame(records, columns=['cores', 'count', 'structure_to_chains'])

structure_to_chains, cores: 4 time: 1272.5538392066956 seconds


In [19]:
# Write the timing table to the results directory as CSV, without the index column.
df_s2c.to_csv(os.path.join(results_dir, 'structure_to_chains_to_pandas.csv'), index=False)

In [20]:
# Display the timings collected for the chains-to-pandas benchmark.
df_s2c

Unnamed: 0,cores,count,structure_to_chains
0,4,447250.0,1272.553839


## Saltbridges Benchmark
This benchmark finds salt bridges in protein structures. Structures with multiple models, e.g., NMR structures, are excluded.

In [5]:
def saltbridges(path, num_cores):
    """Count salt-bridge interactions found in single-model structures.

    Reads the sequence file with ``fraction=0.05`` (presumably a 5% sample;
    confirm against mmtfReader), keeps only entries whose ``num_models`` is 1,
    and runs ``InteractionExtractorPd.get_interactions`` between acidic
    side-chain oxygens (ASP/GLU) and basic side-chain nitrogens (ARG/LYS/HIS).

    Args:
        path: location of the MMTF Hadoop Sequence file.
        num_cores: number of local cores, used as N in the "local[N]" master URL.

    Returns:
        The value returned by ``count()`` on the extracted interactions.
    """
    spark = SparkSession.builder.master("local[" + str(num_cores) + "]").appName("Saltbridges").getOrCreate()
    try:
        structures = mmtfReader.read_sequence_file(path, fraction=0.05)
        # s is indexed as a pair; element 1 is the structure object
        structures = structures.filter(lambda s: s[1].num_models == 1)

        # cutoff units are not stated here, presumably Angstrom; TODO confirm
        distance_cutoff = 3.5
        query = "polymer and (group_name in ['ASP', 'GLU']) and (atom_name in ['OD1', 'OD2', 'OE1', 'OE2'])"
        target = "polymer and (group_name in ['ARG', 'LYS', 'HIS']) and (atom_name in ['NH1', 'NH2', 'NZ', 'ND1', 'NE2'])"

        interactions = InteractionExtractorPd.get_interactions(structures, distance_cutoff, query, target, bio=None)
        return interactions.count()
    finally:
        # Stop the session even when the job fails; otherwise a later
        # getOrCreate() could reuse it with a stale core count.
        spark.stop()

In [6]:
# Collect one record per core count and build the table once at the end.
# DataFrame.append (removed in pandas 2.0) copied the whole frame on every call.
records = []

for num_cores in cores:
    # perf_counter() is a monotonic clock intended for measuring intervals
    start = time.perf_counter()
    count = saltbridges(path, num_cores)
    end = time.perf_counter()
    print('saltbridges_pd, cores:', num_cores, 'time:', end-start, 'seconds')
    records.append({'cores': num_cores, 'saltbridges_pd': end-start, 'count': count})

# Columns in alphabetical order (cores, count, timing); counts stay integers
# instead of being upcast to float.
df_saltbridges = pd.DataFrame(records, columns=['cores', 'count', 'saltbridges_pd'])

Py4JJavaError: An error occurred while calling o60.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 1.0 failed 1 times, most recent failure: Lost task 1.0 in stage 1.0 (TID 2, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/anaconda3/envs/mmtf-pyspark-benchmarks-dev/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 253, in main
    process()
  File "/anaconda3/envs/mmtf-pyspark-benchmarks-dev/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 248, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/anaconda3/envs/mmtf-pyspark-benchmarks-dev/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 379, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/anaconda3/envs/mmtf-pyspark-benchmarks-dev/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/util.py", line 55, in wrapper
    return f(*args, **kwargs)
  File "/anaconda3/envs/mmtf-pyspark-benchmarks-dev/lib/python3.7/site-packages/mmtfPyspark/interactions/interaction_extractor_pd.py", line 182, in __call__
    tt = t_chains.get_group(t_chain).reset_index(drop=True)
  File "/anaconda3/envs/mmtf-pyspark-benchmarks-dev/lib/python3.7/site-packages/pandas/core/groupby/groupby.py", line 877, in get_group
    raise KeyError(name)
KeyError: 'E'

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:330)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:470)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:453)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:284)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1651)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1639)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1638)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1638)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1872)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1821)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1810)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2055)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2074)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:297)
	at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2775)
	at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2774)
	at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3259)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3258)
	at org.apache.spark.sql.Dataset.count(Dataset.scala:2774)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/anaconda3/envs/mmtf-pyspark-benchmarks-dev/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 253, in main
    process()
  File "/anaconda3/envs/mmtf-pyspark-benchmarks-dev/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 248, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/anaconda3/envs/mmtf-pyspark-benchmarks-dev/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 379, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/anaconda3/envs/mmtf-pyspark-benchmarks-dev/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/util.py", line 55, in wrapper
    return f(*args, **kwargs)
  File "/anaconda3/envs/mmtf-pyspark-benchmarks-dev/lib/python3.7/site-packages/mmtfPyspark/interactions/interaction_extractor_pd.py", line 182, in __call__
    tt = t_chains.get_group(t_chain).reset_index(drop=True)
  File "/anaconda3/envs/mmtf-pyspark-benchmarks-dev/lib/python3.7/site-packages/pandas/core/groupby/groupby.py", line 877, in get_group
    raise KeyError(name)
KeyError: 'E'

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:330)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:470)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:453)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:284)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	... 1 more


In [None]:
# Write the salt-bridge timing table to the results directory as CSV
# (without the index column), then display it as the cell output.
df_saltbridges.to_csv(os.path.join(results_dir, 'saltbridges_pd.csv'), index=False)
df_saltbridges
