<img style="float: left" src="images/spark.png" />
<img style="float: right" src="images/surfsara.png" />
<hr style="clear: both" />

# Running Blast with Apache Spark

In our third notebook we will look at some metagenomics data. We will use blast(x) to compare our sequences to (part) of the NCBI non-redundant protein database.

_You can edit the cells below and execute the code by selecting the cell and press Shift-Enter. Code completion is supported by use of the Tab key._

During the exercises you may want to refer to the [PySpark documentation](https://spark.apache.org/docs/1.6.1/api/python/pyspark.html#pyspark.RDD) for more information on possible transformations and actions.

In [None]:
# initialize Spark
from pyspark import SparkContext, SparkConf
if not 'sc' in globals():
    conf = SparkConf().setMaster('local[*]')
    sc = SparkContext(conf=conf)

## RDD from a FASTA file

We will start by creating an RDD from a FASTA file. We will read this using the `sc.textFile` method we also used in the previous notebooks. Note that this is a smaller sample with around a 100 sequences.

In [None]:
import pprint
pp = pprint.PrettyPrinter(indent=2)

reads = sc.textFile("file:////home/jovyan/work/data/blast/input/sample.fa")

pp.pprint(reads.take(3))

Like before the division in records is incorrect. The metadata and sequence data of a single read are now split over two separate records. The clean solution to this would be to write a custom InputFormat in Java or Scala. For simplicity in this tutorial we will do some text munching in Spark to get the same effect.

## Merge metadata and sequence into a single record

In this case we are going to run NCBI BLAST - a third-party binary appplication - on the sequences and for that we will need to have the data in the exact format that BLAST expects (e.g. fasta metadata and sequence records). In order to create these records we can first add an index to all the records. We will then use a series of [`map`](https://spark.apache.org/docs/1.6.1/api/python/pyspark.html#pyspark.RDD.map) function applications to change the index of the odd-numberd records (subtract those indices by 1) and create tuples where the first element is the index.

In [None]:
indexedReads = reads.zipWithIndex()
pp.pprint(indexedReads.take(2))

In [None]:
# Write appropriate map functions to renumber odd indices and to switch the position of the tuple values.
keyedReads = (indexedReads
              # PLEASE FILL IN YOUR CODE HERE
             )

pp.pprint(keyedReads.take(2))

After executing the previous cell we should have an RDD that contains tuples with index, line-from-fasta pairs. The index of the records can be used to recreate the sequence and metadata combinations. In order to do so you can use the [`groupByKey`](https://spark.apache.org/docs/1.6.1/api/python/pyspark.html#pyspark.RDD.groupByKey) operation. This will, for each key create a list of values. 

In [None]:
# Create an RDD 'metaAndSeq' that is grouped by the index. 
# PLEASE FILL IN YOUR CODE HERE

# Print the results (HINT: you can do a 'map' on the results of the grouping to cast 
# the iterable to a Python list: list(some_iterable)
pp.pprint(metaAndSeq.take(2))

With the grouping done, we can combine the values of the grouping into the metadata and sequence text combination. We will drop the index from the tuple in this step. The result should look like a single record (meta and sequence) from  fasta file.

In [None]:
# Merge the results of the grouping into a single string record.
inputSeqs = metaAndSeq.map(lambda tup: tup[1][0] + "\n" + tup[1][1])
pp.pprint(inputSeqs.take(1)[0])

## Blasting the sequences
With each record being a meta and sequence combination we can call blast to run on each of these records in parallel. The amount of parallel executions are determined here by the amount of RDD partitions. First print the amount of partitions of the inputSeq RDD.

In [None]:
# Get the amount of partitions in the current RDD
print inputSeqs.getNumPartitions()

As there are currently 12 cores assigned to this environment we can repartition the RDD in 12 parts so that 12 blast executions can be run in parallel.

In [None]:
# Repartion the RDD; note that this means that data may be shuffled over the network
inputSeqsRepart = inputSeqs.repartition(12)

Finally we can run blastx. We have created a small shell script on the host that we can call via the RDD [`pipe`](https://spark.apache.org/docs/1.6.1/api/python/pyspark.html#pyspark.RDD.pipe) function. This function pipes the contents of each record to a subprocess running the script or command you specified. Pipe will collect the standard output (each line as a record). 

While handy in this exercise - think of all the requirements needed on a typical cluster configurations and of all the possible things that could go wrong when running a workflow such as this at scale.

In [None]:
# Run blastx for each of the records. Use the pipe function to run the command present in the script variable.
# Name the resulting RDD 'blastedSeqs'
script = "/home/jovyan/work/data/blast/program/ncbi-blast-2.4.0+/runblast"
# PLEASE FILL IN YOUR CODE HERE
pp.pprint(blastedSeqs.take(4))
blastedSeqs.cache()

## Post-processing 1: taxonomic distribution of Blast hits

Spark, Pyspark and Jupyter together provide a great platform for interactive analysis. It is possible to have different views on your data, work at scale and leverage existing Python and Scala libraries. In this next section we will use the [ETE toolkit](http://etetoolkit.org) to enrich our blast data with extra taxonomic information and look at the distribution of organisms inferred from the blast hits at various taxonomic ranks.

Before enriching we will restructure the blast output. First split all the output lines on the tab character and create a blast report with only the read id, blast hit, e-value, tax id and species elements. 

In [None]:
# Split the blast output lines
blastedSeqsVals = blastedSeqs.map(lambda s: s.split("\t"))

# Create a report with only a subset of the data. Some fields contain multiple matches. For demonstration
# purposes we only consider the first of such a value. What would be a better way to deal with this?
annotatedReport = blastedSeqsVals.map(lambda a: [a[0], a[1], a[5], a[8].split(";")[0], a[9].split(";")[0]])
print annotatedReport.first()

With that in place we can start add other data to our annotation. We want to have an idea of the makeup of our sample at different taxonomic ranks. The ETE toolkit has functionality for parsing the NCBI taxonomy data files. We will start by defining a function that from a tax id present in the blast output can infer the taxonomic hierarchy of each tax id (the lineage). We will only extract the kingdom, phylum and class for a given tax id.

In [None]:
from ete3 import NCBITaxa
def ncbiQueryLineage(taxid):
    try:
        ncbi = NCBITaxa()
        lineage = ncbi.get_lineage(taxid)
        names = ncbi.get_taxid_translator(lineage)
        report = [names[taxid] for taxid in lineage]
        return [report[2], report[3], report[4]]
    except ValueError as e: 
        return ["", "", ""]

Lets test our function on two id's. The first time `NCBITaxa` is run the database is populated. This may take some time.

In [None]:
pp.pprint(ncbiQueryLineage(525371))
pp.pprint(ncbiQueryLineage(696844))

Now use a `map` to apply the function to the tax id in our annotatedReport

In [None]:
taxReport = annotatedReport.map(lambda a: a + ncbiQueryLineage(a[3]))
taxReport.cache()
pp.pprint(taxReport.take(5))

## Post-processing 2: Plotting taxonomic distributions

With the enrichment in place we can make a flew plots of the frequencies of distinct species, phyla and kingdoms. In this exercise we will also convert our data to a [`DataFrame`](http://spark.apache.org/docs/1.6.1/api/python/pyspark.sql.html#pyspark.sql.DataFrame). DataFrames are great for working with tabular, typed data and support a custom DSL for data selection as well as the possibility to run SQL queries against the data stored in the DataFrame.

First we will define the structure of our table by defining a [`Row`](https://spark.apache.org/docs/1.6.1/api/python/pyspark.sql.html#pyspark.sql.Row) object to use when converting records in the RDD to Rows in the DataFrame.

In [None]:
from pyspark.sql import Row
taxRecord = Row("readid", "giid", "evalue", "taxid", "species", "kingdom", "phylum", "class")

The values in our RDD are all strings. Cast the e-value to a float before converting to a DataFrame

In [None]:
taxReportRows = taxReport.map(lambda a: taxRecord(a[0], a[1], float(a[2]), a[3], a[4], a[5], a[6], a[7]))

Next we can initialize a [`SQLContext`](https://spark.apache.org/docs/1.6.1/api/python/pyspark.sql.html#pyspark.sql.SQLContext) to convert our RDD to a DataFrame. Do the conversion, print the schema and the first 10 records.

In [None]:
from pyspark.sql import SQLContext

sqlCtx = SQLContext(sc)
taxReportDF = sqlCtx.createDataFrame(taxReportRows)
taxReportDF.printSchema()
taxReportDF.select("readid", "giid", "species", "phylum").show(n=10, truncate=False)

Finally, we can use functions on this DataFrame to group on columns, count values, sort etc. If you have worked with relational databases this will look very familiar.

In [None]:
from pyspark.sql.functions import col

speciesDistribution = taxReportDF.select("readid", "species").groupBy("species").count().sort(col("count").desc())
speciesDistribution.show(truncate=False)

Jupyter integrates nicely with plotting libraries such as [matplotlib](http://matplotlib.org). Create a simple bargraph of the species that where found more than 15 times in the blast report.

In [None]:
speciesPlotVals =  speciesDistribution.filter(col("count") >= 15).rdd.map(lambda r: (r['species'], r['count']))

import matplotlib.pyplot as plt
plt.rcdefaults()
import numpy as np
import matplotlib.pyplot as plt


species = speciesPlotVals.keys().collect()
y_pos = np.arange(len(species))
freq = speciesPlotVals.values().collect()

plt.barh(y_pos, freq, align='center')
plt.yticks(y_pos, species)
plt.xlabel('Frequency')
plt.title('Distribution of species matches in sample')

plt.show()

We can do the same for the phylum and kingdom level in the taxonomic hierarchy. Fill in the relevant sections to create these graphs.

In [None]:
# PLEASE FILL IN YOUR CODE HERE

phylum = phylumPlotVals.keys().collect()
y_pos = np.arange(len(phylum))
freq = phylumPlotVals.values().collect()

plt.barh(y_pos, freq, align='center')
plt.yticks(y_pos, phylum)
plt.xlabel('Frequency')
plt.title('Distribution of phylum matches in sample')

plt.show()

In [None]:
# PLEASE FILL IN YOUR CODE HERE

kingdom = kingDomPlotVals.keys().collect()
y_pos = np.arange(len(kingdom))
freq = kingDomPlotVals.values().collect()

plt.barh(y_pos, freq, align='center')
plt.yticks(y_pos, kingdom)
plt.xlabel('Frequency')
plt.title('Distribution of kingdom matches in sample')

plt.show()