# Benchmark for Dask Dataframe
This benchmark demonstrates the efficiency of columnar data formats. We run three benchmarks on the complete PDB to UniProt residue-level mapping, which contained 105,594,955 records as of July 28, 2018.

1. Count number of records
2. Run a query
3. Join datasets

In [1]:
# set the number of workers (cores) and the number of threads per worker
cores = 2
threads = 2

In [2]:
import time
start = time.time()

## Setup Dask Client

In [3]:
from dask.distributed import Client
client = Client(n_workers=cores, threads_per_worker=threads, processes=True)
client

Client:  Scheduler: tcp://127.0.0.1:59146  Dashboard: http://127.0.0.1:8787/status
Cluster:  Workers: 2  Cores: 4  Memory: 17.18 GB


# 1. Count number of records
Read the PDB to UniProt mapping file in the ORC columnar data format. Rows with missing values are dropped, so the count below covers only complete mappings.

In [4]:
import dask.dataframe as dd

df = dd.read_orc('../data/pdb2uniprot_residues.orc.lzo').dropna()
df = df.astype({'uniprotNum': 'int32'})
df

Dask DataFrame Structure:
,structureChainId,pdbResNum,pdbSeqNum,uniprotId,uniprotNum
npartitions=20,,,,,
,object,object,int32,object,int32
,...,...,...,...,...
...,...,...,...,...,...
,...,...,...,...,...
,...,...,...,...,...


In [5]:
%%time
print("total number of records: ", df.shape[0].compute())

total number of records:  96162206
CPU times: user 907 ms, sys: 93.1 ms, total: 1 s
Wall time: 24 s
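
The count above is taken after `dropna()`, so it only includes rows with no missing values. A minimal sketch of that step on a small pandas frame follows; Dask dataframes mirror this part of the pandas API. The chain IDs and values are made up for illustration and are not taken from the mapping file.

```python
import pandas as pd

# Toy stand-in for the mapping table; all values are made up for illustration.
toy = pd.DataFrame({
    "structureChainId": ["1ABC.A", "1ABC.A", "1ABC.A", "2XYZ.B"],
    "pdbResNum": ["1", None, "3", "10"],    # None: no PDB residue number
    "uniprotId": ["P00001", "P00001", "P00001", "P00002"],
    "uniprotNum": [1.0, 2.0, 3.0, None],    # a missing value forces a float column
})

# Keep only rows in which every column has a value.
complete = toy.dropna()

# The integer cast is only safe once the missing values are gone.
complete = complete.astype({"uniprotNum": "int32"})

n_total = toy.shape[0]
n_complete = complete.shape[0]
print("all rows:", n_total, "complete rows:", n_complete)
```

Casting before dropping the incomplete rows would fail, because a column that contains missing values cannot be converted to a plain integer type.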


In [6]:
df.head()

,structureChainId,pdbResNum,pdbSeqNum,uniprotId,uniprotNum
0,1A5E.A,1,1,P42771,1
1,1A5E.A,2,2,P42771,2
2,1A5E.A,3,3,P42771,3
3,1A5E.A,4,4,P42771,4
4,1A5E.A,5,5,P42771,5


# 2. Run a query
## Find Mitogen-activated protein kinase 14
Here we run a query for PDB - UniProt mappings for UniProt ID Q16539 (MK14_HUMAN) and retrieve their residue-level mappings for residues that are observed in the PDB structure.

In [7]:
%%time
mk14_human = df.query("uniprotId == 'Q16539'").compute()

print("Number of distinct chains :", mk14_human['structureChainId'].nunique())
print("Number of residue mappings:", mk14_human.shape[0])

Number of distinct chains : 243
Number of residue mappings: 82277
CPU times: user 1.13 s, sys: 120 ms, total: 1.25 s
Wall time: 28.5 s
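
As a rough illustration of what the query expresses, the sketch below runs the same filter on a small pandas frame, once with `query` and once with a boolean mask. The toy rows are assumptions for illustration (the chain `3XYZ.B` is hypothetical); only the column names and the UniProt ID come from the cell above.

```python
import pandas as pd

# Toy rows for illustration; "3XYZ.B" is a hypothetical chain ID.
toy = pd.DataFrame({
    "structureChainId": ["2ZB1.A", "2ZB1.A", "1A5E.A", "3XYZ.B"],
    "uniprotId": ["Q16539", "Q16539", "P42771", "Q16539"],
    "uniprotNum": [4, 5, 1, 4],
})

# String expression, as used in the benchmark cell.
by_query = toy.query("uniprotId == 'Q16539'")

# Equivalent boolean-mask selection.
by_mask = toy[toy["uniprotId"] == "Q16539"]

same = by_query.equals(by_mask)
n_chains = by_query["structureChainId"].nunique()
n_rows = by_query.shape[0]
print("identical results:", same)
print("distinct chains:", n_chains, "residue mappings:", n_rows)
```

Both forms select the same rows; the string form is simply more compact when the condition is a single comparison.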


In [8]:
mk14_human.head()

,structureChainId,pdbResNum,pdbSeqNum,uniprotId,uniprotNum
159186,2ZB1.A,4,4,Q16539,4
159187,2ZB1.A,5,5,Q16539,5
159188,2ZB1.A,6,6,Q16539,6
159189,2ZB1.A,7,7,Q16539,7
159190,2ZB1.A,8,8,Q16539,8


# 3. Join operation

In [9]:
# create a random dataset of ~10,000 chains
sample = df.sample(frac=0.0001, random_state=1)['structureChainId'].drop_duplicates().to_frame().compute()

print("Sample size:", sample.shape[0])
sample.head()

Sample size: 9425


,structureChainId
4805020,4V88.Dn
1647933,5E17.D
2517021,3E1F.4
4575670,3JAM.L
3043930,5CDI.H


Now we use this sample to run an inner join that selects the residue mappings for these ~10,000 chains.

In [10]:
%%time
subset = df.merge(sample, on='structureChainId')
print("Number of residues in subset:", subset.shape[0].compute())

Number of residues in subset: 3706634
CPU times: user 1.25 s, sys: 123 ms, total: 1.37 s
Wall time: 29.5 s
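
The subset has far more rows than the sample has chains because an inner join keeps every residue row whose chain ID appears in the sample. The sketch below shows this on toy pandas frames; the chain IDs are hypothetical and the frames stand in for `df` and `sample` only in shape, not in content.

```python
import pandas as pd

# Toy residue table: several rows per chain (hypothetical chain IDs).
residues = pd.DataFrame({
    "structureChainId": ["1AAA.A", "1AAA.A", "2BBB.B", "3CCC.C", "3CCC.C"],
    "uniprotNum": [1, 2, 7, 3, 4],
})

# Toy sample of distinct chains; "9ZZZ.Z" has no residues in the table.
sample = pd.DataFrame({"structureChainId": ["1AAA.A", "3CCC.C", "9ZZZ.Z"]})

# merge() defaults to how="inner": only chains present in both frames survive.
subset = residues.merge(sample, on="structureChainId")

n_rows = subset.shape[0]
chains = set(subset["structureChainId"])
print("rows in subset:", n_rows, "chains in subset:", sorted(chains))
```

Deduplicating the sample before the join matters: a chain ID that appeared twice in the sample would duplicate every matching residue row.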


In [11]:
subset.head()

,structureChainId,pdbResNum,pdbSeqNum,uniprotId,uniprotNum
0,1CSO.E,16,1,P00777,115
1,1CSO.E,17,2,P00777,116
2,1CSO.E,18,3,P00777,117
3,1CSO.E,19,4,P00777,118
4,1CSO.E,29,5,P00777,119


In [12]:
end = time.time()
print("Dask dataframe total time", end-start, "sec.")

Dask dataframe total time 118.05193066596985 sec.
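
The total above is measured from `In [2]`, so it also includes client startup, the `head()` calls, and the sampling step, not just the three timed cells. One way to record each step separately is a small timing helper. This is a sketch only: `timed` and `timings` are hypothetical names, and the workloads are stand-ins for the Dask calls.

```python
import time
from contextlib import contextmanager


@contextmanager
def timed(label, results):
    """Store the wall-clock duration of the enclosed block under `label`."""
    start = time.perf_counter()
    try:
        yield
    finally:
        results[label] = time.perf_counter() - start


timings = {}

with timed("count", timings):
    total = sum(range(1_000_000))  # stand-in for the record count

with timed("query", timings):
    evens = [n for n in range(100_000) if n % 2 == 0]  # stand-in for the query

for label, seconds in timings.items():
    print(f"{label}: {seconds:.3f} sec.")
```

`time.perf_counter()` is used here instead of `time.time()` because it is a monotonic clock intended for measuring intervals.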


In [None]:
%load_ext watermark
%watermark 