# PySpark
* Familiar interface (map, reduce, take, fold, etc.)
* Uses YARN and HDFS

### Monitoring:
    http://<driver-node>:4040
    
Assembler based on: 
    `https://pmelsted.wordpress.com/2013/11/23/naive-python-implementation-of-a-de-bruijn-graph/`

In [1]:
#from pyspark import SparkContext
import pyspark

In [None]:
# sc.stop()

In [None]:
# conf = pyspark.SparkConf()

In [None]:
# conf.setMaster("yarn-client")

# conf.set('spark.driver.memory','4g')
# conf.set('spark.executor.memory','8g')
#conf.set('spark.executor.cores', '1')
# conf.set('spark.shuffle.service.enabled','true')
# conf.set('spark.dynamicAllocation.enabled','true')
# conf.set('spark.dynamicAllocation.initialExecutors','2')
# conf.set('spark.dynamicAllocation.minExecutors','2')

In [None]:
# sc = pyspark.SparkContext(conf=conf)

In [None]:
sc.defaultParallelism

Data is stored internally in an RDD (Resilient Distributed Dataset)

In [None]:
sc.defaultMinPartitions

In [2]:
compliment = {'A': 'T', 'C': 'G', 'G': 'C', 'T': 'A'}

In [None]:
raw_data = sc.textFile('hdfs://172.31.26.32:8020/genome/Ba10k.sim1.fq')
raw_data.getNumPartitions()

## fastq file parser

In [None]:
data = raw_data.filter(lambda x: x[0] in ['A','C','G','T'] if len(x) > 0)

In [None]:
k = 17

## calculate list of kmers
Both forward and the reverse complement.

In [None]:
fwd_list = data.flatMap(lambda x: [x[i:i+k] for i in range(len(x.rstrip())-k+1)])


rev_comp = data.map(lambda x:''.join(reversed([complement.get(base, base) for base in x])))

rev_list = rev_comp.flatMap(lambda x: [x[i:i+k] for i in range(len(x.rstrip())-k+1)])

In [None]:
kmer_list = fwd_list + rev_list

## now we need to coalesce them, like WordCount

In [None]:
emitter = kmer_list.map(lambda x: (x, 1))

In [None]:
emitter.take(5)

In [None]:
kmer_counts = emitter.reduceByKey(lambda x, y: x+y)
# kmer_counts.take(10)

In [None]:
kmer_counts.count()

In [None]:
kmer_counts.keys().take(5)

In [None]:
kmer_counts.values().take(5)

In [None]:
# kmer_dict = kmer_counts.collectAsMap()
# kmer_dict.take(5)

## Next, find the contigs by creating a graph and walking it.
Note: The original source expects d = {'kmer':count}
* Maybe build a dict in all_contigs then go forward?

In [3]:
def twin(km):
    compliment = {'A': 'T', 'C': 'G', 'G': 'C', 'T': 'A'}
    # return Seq.reverse_complement(km)
    return "".join(compliment.get(base, base) for base in reversed(km))


In [4]:
def fw(km):
    for x in 'ACGT':
        yield km[1:]+x

def bw(km):
    for x in 'ACGT':
        yield x + km[:-1]

In [5]:
def contig_to_string(c):
    return c[0] + ''.join(x[-1] for x in c[1:])

def get_contig(d,km):
    '''
    Find kmer's contig.
    Return: the string, list of kmers in contig
    '''
    c_fw = get_contig_forward(d,km)

    c_bw = get_contig_forward(d,twin(km))

    if km in fw(c_fw[-1]):
        c = c_fw
    else:
        c = [twin(x) for x in c_bw[-1:0:-1]] + c_fw
    return contig_to_string(c),c


def get_contig_forward(d,km):
    c_fw = [km]

    while True:
        if sum(x in d for x in fw(c_fw[-1])) != 1:
            break

        cand = [x for x in fw(c_fw[-1]) if x in d][0]
        if cand == km or cand == twin(km):
            break # break out of cycles or mobius contigs
        if cand == twin(c_fw[-1]):
            break # break out of hairpins

        if sum(x in d for x in bw(cand)) != 1:
            break

        c_fw.append(cand)

    return c_fw

def all_contigs(k_tuples):
    d = dict(k_tuples)
    done = set()
    r = []
    for x in d:
        if x not in done:
            s,c = get_contig(d,x)
            for y in c:
                done.add(y)
                done.add(twin(y))
            r.append(s)
    return r

# Code below is for putting the graph in GFA format
# Could use it as part of a visualizer step: GFA -> .dot (for GraphViz)
#     G = {}
#     heads = {}
#     tails = {}
#     for i,x in enumerate(r):
#         G[i] = ([],[])
#         heads[x[:k]] = (i,'+')
#         tails[twin(x[-k:])] = (i,'-')

#     for i in G:
#         x = r[i]
#         for y in fw(x[-k:]):
#             if y in heads:
#                 G[i][0].append(heads[y])
#             if y in tails:
#                 G[i][0].append(tails[y])
#         for z in fw(twin(x[:k])):
#             if z in heads:
#                 G[i][1].append(heads[z])
#             if z in tails:
#                 G[i][1].append(tails[z])

#     return G,r



In [None]:
%time contigs = kmer_counts.mapPartitions(all_contigs).collect()

In [None]:
contigs[:5]

In [None]:
contigs.count()

In [None]:
contigs.getNumPartitions()

## Now let's try a bigger data set

In [6]:
raw_data = sc.textFile('hdfs://172.31.26.32:8020/genome/sra_data.fastq', 100)
raw_data.getNumPartitions()

100

In [7]:
k = 21

In [8]:
data = raw_data.filter(lambda x: len(x) > 0 and x[0] in ['A','C','G','T'] )

In [9]:
fwd_list = data.flatMap(lambda x: [x[i:i+k] for i in range(len(x.rstrip())-k+1)])


rev_comp = data.map(lambda x:''.join(reversed([compliment.get(base, base) for base in x])))

rev_list = rev_comp.flatMap(lambda x: [x[i:i+k] for i in range(len(x.rstrip())-k+1)])

In [10]:
kmer_list = fwd_list + rev_list
emitter = kmer_list.map(lambda x: (x, 1))
kmer_counts = emitter.reduceByKey(lambda x, y: x+y)


In [11]:

contigs = kmer_counts.mapPartitions(all_contigs)
contigs.saveAsTextFile('hdfs://172.31.26.32:8020/genome/sra_output')

Py4JJavaError: An error occurred while calling o78.saveAsTextFile.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 0.0 failed 4 times, most recent failure: Lost task 2.3 in stage 0.0 (TID 8, ip-172-31-26-33.ec2.internal): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/hadoop/yarn/local/usercache/ubuntu/appcache/application_1477954749815_0007/container_e22_1477954749815_0007_01_000009/pyspark.zip/pyspark/worker.py", line 64, in main
    ("%d.%d" % sys.version_info[:2], version))
Exception: Python in worker has different version 2.7 than that in driver 3.5, PySpark cannot run with different minor versions

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
	at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
	at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:342)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
	at org.apache.spark.scheduler.Task.run(Task.scala:89)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
	at scala.Option.foreach(Option.scala:236)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1855)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1868)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1945)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1213)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1156)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1156)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:323)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1156)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply$mcV$sp(PairRDDFunctions.scala:1060)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1026)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1026)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:323)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1026)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply$mcV$sp(PairRDDFunctions.scala:952)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:952)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:952)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:323)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:951)
	at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply$mcV$sp(RDD.scala:1464)
	at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1443)
	at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1443)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:323)
	at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1443)
	at org.apache.spark.api.java.JavaRDDLike$class.saveAsTextFile(JavaRDDLike.scala:507)
	at org.apache.spark.api.java.AbstractJavaRDDLike.saveAsTextFile(JavaRDDLike.scala:46)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
	at py4j.Gateway.invoke(Gateway.java:259)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:209)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/hadoop/yarn/local/usercache/ubuntu/appcache/application_1477954749815_0007/container_e22_1477954749815_0007_01_000009/pyspark.zip/pyspark/worker.py", line 64, in main
    ("%d.%d" % sys.version_info[:2], version))
Exception: Python in worker has different version 2.7 than that in driver 3.5, PySpark cannot run with different minor versions

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
	at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
	at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:342)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
	at org.apache.spark.scheduler.Task.run(Task.scala:89)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	... 1 more


## Now for the really big data set
And let's try to chain it, like the big boys do...

In [None]:
raw_data = sc.textFile('hdfs://172.31.26.32:8020/genome/Ecoli-RR359304-2.fastq')
raw_data.getNumPartitions()