**Homework 12**

**Note 1**: You should do this in a PySpark notebook, not a Python 3 notebook. Execute the first 3 cells that are provided.

**Note 2**: In the following exercises, keep the amount of data returned by Spark to the local notebook session to the minimum needed for that exercise. In other words, all the work should be done via distributed computing and not by returning a large collection that is then processed in regular Python.

**Note 3**: To minimize waitinng times, do the exercises using the C. elegans genome. Data for the human genome at `/data/human/*fa` but since it takes a long time, running your code against the human genome is optional.

In [1]:
import os
os.environ['PYSPARK_PYTHON'] = '/usr/bin/python3'

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
62,,pyspark,idle,,,✔


SparkContext available as 'sc'.
HiveContext available as 'sqlContext'.


In [2]:
%%spark

In [3]:
# Change path when debugging is compete to work on human genome

# fasta_path = '/data/human/*fa'
fasta_path = '/data/c_elegans/*fa'

**Exercise 1 (50 points)**

Write a program using `spark` to find 5 most common k-mers (shifting windows of length k) in the human genome. Ignore case when processing k-mers. You can work one line at a time - we will ignore k-mers that wrap around lines. You should write a function that takes a path to FASTA files and a value for k, and returns an key-value RDD of k-mer counts. Remember to strip comment lines that begin with '>' from the anlaysis.  

Use k = 20.

**Note**: The textFile method takes an optional second argument for controlling the number of partitions of the file. By default, Spark creates one partition for each block of the file (blocks being 128MB by default in HDFS), but you can also ask for a higher number of partitions by passing a larger value. Please set this paramter to 60 - it will speed up processing.

**Check**: Use the C. elegans genome at `/data/c_elegans/*fa`. You should get 

```
[
(u'ATATATATATATATATATAT', 2168), 
(u'TATATATATATATATATATA', 2142), 
(u'CTCTCTCTCTCTCTCTCTCT', 1337), 
(u'TCTCTCTCTCTCTCTCTCTC', 1327), 
(u'AGAGAGAGAGAGAGAGAGAG', 1007)
]
```

In [4]:
c_elegans = sc.textFile('/data/c_elegans/*fa',60)

In [5]:
from collections import Counter

In [6]:
def count(line,k):
    num=len(line)
    line=line.upper()
    counter=[]
    if line[0]==">":
        return counter
    else:
        for j in range(0,num-k+1):
            window=line[j:j+k]
            counter.append(window)
        return counter

def k_mers(path,k):
    dictionary={}
    file_name=sc.textFile(path,60)
    words = file_name.flatMap(lambda line: count(line,k))
    words1 = words.map(lambda x: (x, 1))
    counts = words1.reduceByKey(lambda a, b: a + b)
    return(counts,words)  

In [7]:
k_mers_20=k_mers('/data/c_elegans/*fa',20)

In [8]:
k_mers_20[0].takeOrdered(10, key=lambda x: -x[1])

[(u'ATATATATATATATATATAT', 2217), (u'TATATATATATATATATATA', 2184), (u'CTCTCTCTCTCTCTCTCTCT', 1373), (u'TCTCTCTCTCTCTCTCTCTC', 1361), (u'AGAGAGAGAGAGAGAGAGAG', 1033), (u'GAGAGAGAGAGAGAGAGAGA', 1017), (u'TGCCGATTTGCCGGAAATTT', 772), (u'TGTGTGTGTGTGTGTGTGTG', 771), (u'TTGCCGATTTGCCGGAAATT', 766), (u'GTGTGTGTGTGTGTGTGTGT', 748)]

**Exercise 2 (10 points)**

Find all k-mers that are palindromes (i.e the sequence is the same when read back-to-front). How many are there?

In [9]:
palindromes=k_mers_20[1].map(lambda x: x==x[::-1])

In [10]:
pals1 = palindromes.map(lambda x: (x, 1))
count_pals = pals1.reduceByKey(lambda a, b: a + b)
count_pals.takeOrdered(2, key=lambda x: -x[1])

[(False, 68516766), (True, 2816)]

**Exercise 3 (10 points)** 

As a simple QC measure, we can assume that the k-mers that have a count of only 1 are due to sequencing errors. Put all the k-mers with a count of 2 or more in a Spark DataFrame with two columns (sequence, count). Count how many rows in the DataFrame have counts between 5 and 10 (inclusive of both 5 and 10).

In [11]:
from pyspark import SparkContext, SparkConf

In [12]:
from pyspark.sql import SQLContext
sqlc = SQLContext(sc)

In [13]:
spark_df = sqlc.createDataFrame(k_mers_20[0])

In [14]:
spark_df.show(n=3)

+--------------------+---+
|                  _1| _2|
+--------------------+---+
|TTTTGAACACTATCAAAAAA|  1|
|TTAAATATACAAAAACATTG|  1|
|GGTAGGCAGGCATGAGGTAG|  1|
+--------------------+---+
only showing top 3 rows

In [15]:
spark_df.registerTempTable('k_mers')
q = sqlc.sql('select _1 as sequence,_2 as count from k_mers where _2 > 1')
q.show(n=3)

+--------------------+-----+
|            sequence|count|
+--------------------+-----+
|CTGAAAAAATCAGCTTTTAT|    5|
|AAATGCGTTTTTTGAACTTA|    3|
|TACATCTTCTTTTGTAAGAC|    2|
+--------------------+-----+
only showing top 3 rows

In [16]:
q2 = sqlc.sql('select COUNT(_1) as Number_3_to_5 FROM k_mers WHERE _2>4 AND _2 < 6')
q2.show()

+-------------+
|Number_3_to_5|
+-------------+
|        62224|
+-------------+

**Exercsie 4 (30 points)**

Make a Markov transition matrix for any nucleotide ('A', 'C', 'T', 'G') to any other nucleotide. The (i,j) entry should indicate the probability of finding the jth nucleotide appearing immediaely after the ith nucleotide in the genome. For example, the entry (0, 2) shows the probability of finding a T immediately followng an A. The matrix should have shape (4,4).

In [17]:
k_mers_2=k_mers('/data/c_elegans/*fa',2)
markov_list=k_mers_2[0].takeOrdered(17, key=lambda x: -x[1])

In [18]:
import numpy as np
markov_list

[(u'TT', 13345793), (u'AA', 13345542), (u'AT', 8726699), (u'TA', 6254722), (u'GA', 6127569), (u'TC', 6124442), (u'TG', 6101784), (u'CA', 6101366), (u'CT', 4994901), (u'AG', 4990349), (u'AC', 4765404), (u'GT', 4760088), (u'CC', 3309602), (u'GG', 3288847), (u'GC', 3284945), (u'CG', 3079340)]

In [19]:
A_total=markov_list[1][1]+markov_list[2][1]+markov_list[9][1]+markov_list[10][1]
C_total=markov_list[7][1]+markov_list[8][1]+markov_list[12][1]+markov_list[15][1]
T_total=markov_list[0][1]+markov_list[3][1]+markov_list[6][1]+markov_list[5][1]
G_total=markov_list[4][1]+markov_list[11][1]+markov_list[13][1]+markov_list[14][1]

In [20]:
Markov_Transition=np.zeros([4,4])

In [21]:
Markov_Transition[0,0]=float(markov_list[1][1])/float(A_total)
Markov_Transition[0,1]=float(markov_list[10][1])/float(A_total)
Markov_Transition[0,2]=float(markov_list[2][1])/float(A_total)
Markov_Transition[0,3]=float(markov_list[9][1])/float(A_total)

Markov_Transition[1,0]=float(markov_list[7][1])/float(C_total)
Markov_Transition[1,1]=float(markov_list[12][1])/float(C_total)
Markov_Transition[1,2]=float(markov_list[8][1])/float(C_total)
Markov_Transition[1,3]=float(markov_list[15][1])/float(C_total)

Markov_Transition[2,0]=float(markov_list[3][1])/float(T_total)
Markov_Transition[2,1]=float(markov_list[5][1])/float(T_total)
Markov_Transition[2,2]=float(markov_list[0][1])/float(T_total)
Markov_Transition[2,3]=float(markov_list[6][1])/float(T_total)

Markov_Transition[3,0]=float(markov_list[4][1])/float(G_total)
Markov_Transition[3,1]=float(markov_list[14][1])/float(G_total)
Markov_Transition[3,2]=float(markov_list[11][1])/float(G_total)
Markov_Transition[3,3]=float(markov_list[13][1])/float(G_total)


In [22]:
Markov_Transition

array([[ 0.41930201,  0.14972367,  0.27418313,  0.15679119],
       [ 0.34894441,  0.18928009,  0.28566436,  0.17611113],
       [ 0.19652411,  0.1924307 ,  0.41932641,  0.19171878],
       [ 0.35091985,  0.18812557,  0.27260556,  0.18834903]])

In [23]:
np.sum(Markov_Transition,axis=1)

array([ 1.,  1.,  1.,  1.])