# Configuration

In [1]:
# Expectes a *.fasta file
input_file = "hdfs:///files/salmonella/assembledASM694v2"
input_segment_size = 80

intermediate_window_size = 1024
shotgun_sequence_length = 75

partitions = 128

# Outputs a sequence file with (position, sequence)
output_file = "hdfs:///files/salmonella/window"

## TODO:

1. [ ] Check that groupByKey is indeed corretly ordered (just sort it, as atomic values have index)
2. [ ] Change grouping method to use -offset, so to make it actually shift the offset right, instead of left
3. [ ] Validate indices are correctly preserved by resizing method, even with offset

In [2]:
import findspark
findspark.init()
findspark.find()

'/usr/local/spark/python/pyspark'

In [3]:
from pyspark.sql import SparkSession

spark = (SparkSession
         .builder
         .master("yarn")
         .appName("python-testing")
         .config("spark.executor.instances", 16)
         .config("spark.executor.memory", "1536m")
         .getOrCreate())

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2022-04-24 17:00:36,149 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2022-04-24 17:00:45,682 WARN yarn.Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.


In [4]:
sc = spark.sparkContext

In [5]:
sc

In [6]:
raw_data = sc.textFile(input_file).filter(lambda x: not x.startswith('>'))
raw_data.take(1)

                                                                                

['AGAGATTACGTCTGGTTGCAAGAGATCATGACAGGGGGAATTGGTTGAAAATAAATATATCGCCAGCAGCACATGAACAA']

In [7]:
data = raw_data#.sample(False, 0.05, 1)
#data.take(1)

In [8]:
def preprocess(raw, segmentLength):
    indexed = raw.zipWithIndex().map(lambda x: (x[1], x[0]))
    atomized = indexed.flatMap(lambda x: [(x[0] * segmentLength + i, v) for i, v in enumerate(x[1])])
    return atomized

In [9]:
def resize(atomized, newSize, offset = 0):
    keyed = atomized.map(lambda x: (((x[0] - offset) // newSize) * newSize + offset, (x[0], x[1])))
    grouped = keyed.groupByKey()
    ordered = grouped.mapValues(lambda x: sorted(x, key=lambda k: k[0]))
    stringified = ordered.mapValues(lambda x: (x[0][0], "".join([v[1] for v in x])))
    return stringified

def slidingWindow(stringified, window_size):
    def window(x):
        s = x[1][1]
        windows = []
        for i in range(len(s) - window_size):
            windows.append((x[0] + i, s[i:i+window_size]))
            
        return windows

    return stringified.flatMap(window)

In [10]:
pre = preprocess(data, input_segment_size).repartition(partitions)
pre.take(5)

                                                                                

[(340, 'G'), (341, 'A'), (342, 'G'), (343, 'T'), (344, 'G')]

In [11]:
window_offset_0 = resize(pre, intermediate_window_size, 0)
window_0 = slidingWindow(window_offset_0, shotgun_sequence_length)

window_offset_1 = resize(pre, intermediate_window_size, shotgun_sequence_length)
window_1 = slidingWindow(window_offset_1, shotgun_sequence_length)

In [12]:
unioned = window_0.union(window_1).repartition(partitions)

In [13]:
windowed = unioned.distinct()
windowed.count()

                                                                                

4951308

In [15]:
windowed.take(2)

[(3532223,
  'GCCACGCTATCGACGGTACCTTTTAATACCCGGTTGCTGCCAAGCGGCGTGATTTCGGCACGATATCCCGGACGC'),
 (3729573,
  'AGCTCTTTGGTCTCTTTCGGGTTAAGGCCAGCCGCCGGTTCGTCGAGCATCAGAATTTCTGGCTGCGTCACCATG')]

In [14]:
windowed.saveAsSequenceFile(output_file)