# Running Blast with Apache Spark

Important sources:
- ChEMBL schema (used for getting protein data and create local data bank): ftp://ftp.ebi.ac.uk/pub/databases/chembl/ChEMBLdb/latest/chembl_22_1_schema_documentation.html
- BioPython: http://biopython.org/DIST/docs/tutorial/Tutorial.html
- FASTA format: https://blast.ncbi.nlm.nih.gov/Blast.cgi?CMD=Web&PAGE_TYPE=BlastDocs&DOC_TYPE=BlastHelp, https://ncbi.github.io/cxx-toolkit/pages/ch_demo#ch_demo.T5

In [1]:
# imports
from pyspark import SparkContext, SparkConf
from chemblhelper import ChEMBLHelper
import pprint
import re
import os
import subprocess
import shutil
import shlex

In [2]:
# initialize Spark
if not 'sc' in globals():
    conf = SparkConf().setMaster('yarn')
    sc = SparkContext(conf=conf)
    
sc.setLogLevel("INFO")

## Creating a local protein data bank

For this, we will take data from ChEMBL.  Of particular interest are the `accession` and `sequence` fields in `component_sequences` table.

Following the [NCBI C++ Toolkit documentation](https://ncbi.github.io/cxx-toolkit/pages/ch_demo#ch_demo.T5), we create a FASTA file in this format:
- each line is no more than 80 chars length
- the line description will be of the format:
 - `>ebl|[accession]|[locus]`
 
Since locus is not available, we will omit this. Once the fasta file is created, then we need to create a local BLAST database by invoking the `makeblastdb` command line utility.  This tool is part of the `ncbi-blast+` package.  To install:

```bash
sudo apt-get install ncbi-blast+
```

A sample command line looks like:

```bash
makeblastdb -in mydatabank.fasta -parse_seqids -dbtype prot
```


In [3]:
# get data from ChEMBL
chemblhelper = ChEMBLHelper()
databankfilename = chemblhelper.createDataBank("http://hadoop1:50070", "/home/hduser/Lab", "/user/hduser/ics5200", 100000)

In [4]:
rawdata = sc.textFile(databankfilename) \
            .map(lambda line: line.split("\t"))            

In [9]:
# clean up before saving file to disk
hdfsFasta = "/home/hduser/Lab/chembl100000.fasta"
shutil.rmtree(hdfsFasta, ignore_errors=True)

In [10]:
# manipulate raw data rdd and create FASTA file
rawdata.map(lambda line: (line[9], line[10])) \
        .distinct() \
        .map(lambda t: str(">ebl|" + t[0] + "|\r" + "\r".join(re.findall(".{1,80}",t[1])))) \
        .coalesce(1) \
        .saveAsTextFile("file://" + hdfsFasta)

In [11]:
# copy spark job file to protein bank folder
localFasta = "/home/hduser/Lab/proteinbank/chemblsample.fasta"
os.rename(os.path.join(hdfsFasta, "part-00000"), localFasta)

In [12]:
# create local blast db
process = subprocess.Popen(shlex.split("makeblastdb -in %s -parse_seqids -dbtype prot" % os.path.basename(localFasta)),                           
                           stdout = subprocess.PIPE,
                           stderr = subprocess.PIPE,  
                           cwd = os.path.dirname(localFasta),
                           shell = False)

out, err = process.communicate()

print out
print err



Building a new DB, current time: 01/12/2017 19:09:44
New DB name:   /home/hduser/Lab/proteinbank/chemblsample.fasta
New DB title:  chemblsample.fasta
Sequence type: Protein
Keep Linkouts: T
Keep MBits: T
Maximum file size: 1000000000B
Adding sequences from FASTA; added 46 sequences in 0.008214 seconds.




## BLAST

Using BioPython library:(http://biopython.org/DIST/docs/tutorial/Tutorial.html)

In [13]:
from Bio.Blast.Applications import NcbiblastxCommandline
from Bio.Blast import NCBIXML

### Run BLAST query

In [14]:
proteinSeqs = rawdata.map(lambda line: (line[9], line[10])).distinct()
proteinSeqs.count()

46

In [33]:
# prepare the blastx command\n
# note that we have to set the cmd = 'blastp' to run the protein counterpart instead of nucleoid

outFile = "/home/hduser/Lab/fastaTest.xml"

(query_acc, query_seq) = proteinSeqs.collect()[33]

print query_acc
print query_seq

P43681
MELGGPGAPRLLPPLLLLLGTGLLRASSHVETRAHAEERLLKKLFSGYNKWSRPVANISDVVLVRFGLSIAQLIDVDEKNQMMTTNVWVKQEWHDYKLRWDPADYENVTSIRIPSELIWRPDIVLYNNADGDFAVTHLTKAHLFHDGRVQWTPPAIYKSSCSIDVTFFPFDQQNCTMKFGSWTYDKAKIDLVNMHSRVDQLDFWESGEWVIVDAVGTYNTRKYECCAEIYPDITYAFVIRRLPLFYTINLIIPCLLISCLTVLVFYLPSECGEKITLCISVLLSLTVFLLLITEIIPSTSLVIPLIGEYLLFTMIFVTLSIVITVFVLNVHHRSPRTHTMPTWVRRVFLDIVPRLLLMKRPSVVKDNCRRLIESMHKMASAPRFWPEPEGEPPATSGTQSLHPPSPSFCVPLDVPAEPGPSCKSPSDQLPPQQPLEAEKASPHPSPGPCRPPHGTQAPGLAKARSLSVQHMSSPGEAVEGGVRCRSRSIQYCVPRDDAAPEADGQAAGALASRNTHSAELPPPDQPSPCKCTCKKEPSSVSPSATVKTRSTKAPPPHLPLSPALTRAVEGVQYIADHLKAEDTDFSVKEDWKYVAMVIDRIFLWMFIIVCLLGTVGLFLPPWLAGMI


In [34]:
blastp_cline = NcbiblastxCommandline(cmd = "blastp",
                                     db = localFasta,
                                     evalue = 0.01,
                                     outfmt = 5,
                                     remote = False,
                                     out = outFile)

(out, err) = blastp_cline(stdin = query_seq)
print out
print err





### Parse BLAST output file

In [35]:
result_handle = open(outFile)

#use NCBIXML.read if there is only one record, otherwise use .parse()
blast_record = NCBIXML.read(result_handle)
#blast_records = NCBIXML.parse(result_handle)

In [36]:
for alignment in blast_record.alignments:
    i = 1
    print("-------------------------------------------")
    for hsp in alignment.hsps:
        print("  **** Alignment (%d) ****" % i)
        i = i + 1
        print("   sequence: %s" % alignment.accession)            
        print("   e value: %s" % hsp.expect) 

-------------------------------------------
  **** Alignment (1) ****
   sequence: P43681
   e value: 0.0
-------------------------------------------
  **** Alignment (1) ****
   sequence: P09483
   e value: 0.0
-------------------------------------------
  **** Alignment (1) ****
   sequence: P12389
   e value: 0.0
  **** Alignment (2) ****
   sequence: P12389
   e value: 4.62337e-30
-------------------------------------------
  **** Alignment (1) ****
   sequence: P32297
   e value: 4.53918e-168
  **** Alignment (2) ****
   sequence: P32297
   e value: 3.97331e-21
-------------------------------------------
  **** Alignment (1) ****
   sequence: P04757
   e value: 5.9835e-168
  **** Alignment (2) ****
   sequence: P04757
   e value: 3.06058e-22
-------------------------------------------
  **** Alignment (1) ****
   sequence: P02708
   e value: 2.79895e-123
  **** Alignment (2) ****
   sequence: P02708
   e value: 1.57876e-13
-------------------------------------------
  **** Alignme

In [74]:
# create matching proteins list.
# each list entry is a tuple in the format:
#  (Accession, Hit, High-Scoring-Pair expect value)
result_handle = open(outFile)
blast_record = NCBIXML.read(result_handle)
matches = []
for alignment in blast_record.alignments:   
    i = 1
    for hsp in alignment.hsps:
        matches.append((alignment.accession, (i, hsp.expect)))
        i = i + 1        

In [75]:
print(matches)

[(u'P43681', (1, 0.0)), (u'P09483', (1, 0.0)), (u'P12389', (1, 0.0)), (u'P12389', (2, 4.62337e-30)), (u'P32297', (1, 4.53918e-168)), (u'P32297', (2, 3.97331e-21)), (u'P04757', (1, 5.9835e-168)), (u'P04757', (2, 3.06058e-22)), (u'P02708', (1, 2.79895e-123)), (u'P02708', (2, 1.57876e-13)), (u'P17787', (1, 2.15841e-122)), (u'P17787', (2, 7.42827e-23)), (u'P12390', (1, 2.21328e-122)), (u'P12390', (2, 3.94746e-26)), (u'P30926', (1, 7.73243e-121)), (u'P30926', (2, 3.7696e-22)), (u'P12392', (1, 3.20585e-119)), (u'P12392', (2, 7.69644e-22)), (u'P11230', (1, 3.39991e-92)), (u'P11230', (2, 7.96386e-14)), (u'P07510', (1, 5.16782e-83)), (u'P07510', (2, 0.00163193)), (u'P18508', (1, 7.66042e-17)), (u'P15431', (1, 8.78793e-17)), (u'P63079', (1, 5.67068e-16)), (u'P30191', (1, 6.91179e-16)), (u'P28471', (1, 7.27628e-16)), (u'P63138', (1, 2.63311e-15)), (u'P47870', (1, 3.28385e-15)), (u'P20236', (1, 4.62198e-15)), (u'P28473', (1, 4.64932e-15)), (u'P23574', (1, 1.75029e-14)), (u'P14867', (1, 7.91482e-14

In [89]:
sm = sc.parallelize(matches) 
sm.collect()

[(u'P43681', (1, 0.0)),
 (u'P09483', (1, 0.0)),
 (u'P12389', (1, 0.0)),
 (u'P12389', (2, 4.62337e-30)),
 (u'P32297', (1, 4.53918e-168)),
 (u'P32297', (2, 3.97331e-21)),
 (u'P04757', (1, 5.9835e-168)),
 (u'P04757', (2, 3.06058e-22)),
 (u'P02708', (1, 2.79895e-123)),
 (u'P02708', (2, 1.57876e-13)),
 (u'P17787', (1, 2.15841e-122)),
 (u'P17787', (2, 7.42827e-23)),
 (u'P12390', (1, 2.21328e-122)),
 (u'P12390', (2, 3.94746e-26)),
 (u'P30926', (1, 7.73243e-121)),
 (u'P30926', (2, 3.7696e-22)),
 (u'P12392', (1, 3.20585e-119)),
 (u'P12392', (2, 7.69644e-22)),
 (u'P11230', (1, 3.39991e-92)),
 (u'P11230', (2, 7.96386e-14)),
 (u'P07510', (1, 5.16782e-83)),
 (u'P07510', (2, 0.00163193)),
 (u'P18508', (1, 7.66042e-17)),
 (u'P15431', (1, 8.78793e-17)),
 (u'P63079', (1, 5.67068e-16)),
 (u'P30191', (1, 6.91179e-16)),
 (u'P28471', (1, 7.27628e-16)),
 (u'P63138', (1, 2.63311e-15)),
 (u'P47870', (1, 3.28385e-15)),
 (u'P20236', (1, 4.62198e-15)),
 (u'P28473', (1, 4.64932e-15)),
 (u'P23574', (1, 1.75029e-14

In [90]:
uniqueAccessions = sm.map(lambda t: t[0]).distinct().collect()
uniqueAccessions
len(uniqueAccessions)

28

In [91]:
accessionsCompId = rawdata.filter(lambda t: t[9] in uniqueAccessions).map(lambda p: (p[9], p[8])).distinct()
accessionsCompId.take(10)

[(u'P11230', u'16'),
 (u'P20236', u'30'),
 (u'O09028', u'1'),
 (u'P43681', u'46'),
 (u'P32297', u'43'),
 (u'P23576', u'32'),
 (u'P18508', u'28'),
 (u'P12389', u'18'),
 (u'P17787', u'26'),
 (u'P02708', u'2')]

In [92]:
from pyspark.sql.types import *
from pyspark.sql.functions import col, desc
from pyspark.sql import SQLContext
simSchema = StructType([StructField("accession", StringType(), False),
                                StructField("hit", IntegerType(), False),
                                StructField("similarity", FloatType(), False),
                                StructField("compId", LongType(), False)])

In [98]:
sm2 = sm.join(accessionsCompId).map(lambda t: (t[0], t[1][0][0], t[1][0][1], long(t[1][1])))
#accessionsCompId.join(sm).collect()

In [99]:
sim = sqlContext.createDataFrame(sm2, simSchema) 

In [100]:
sim.show()

+---------+---+-----------+------+
|accession|hit| similarity|compId|
+---------+---+-----------+------+
|   P28471|  1|7.27628E-16|    38|
|   P18506|  1|1.50255E-10|    27|
|   P30926|  1|        0.0|    42|
|   P30926|  2| 3.7696E-22|    42|
|   P15431|  1|8.78793E-17|    24|
|   P43681|  1|        0.0|    46|
|   P32297|  1|        0.0|    43|
|   P32297|  2|3.97331E-21|    43|
|   P23574|  1|1.75029E-14|    31|
|   P04757|  1|        0.0|     4|
|   P04757|  2|3.06058E-22|     4|
|   P63138|  1|2.63311E-15|    50|
|   P07510|  1|        0.0|     9|
|   P07510|  2| 0.00163193|     9|
|   P20236|  1|4.62198E-15|    30|
|   P12390|  1|        0.0|    19|
|   P12390|  2|3.94746E-26|    19|
|   P63079|  1|5.67068E-16|    49|
|   P28473|  1|4.64932E-15|    39|
|   P19969|  1|9.04537E-14|    29|
+---------+---+-----------+------+
only showing top 20 rows

