In [2]:
from pyspark.sql import SparkSession
#or use .master("local[1]") instead of .master("spark://192.168.1.153:7077")
spark_session = SparkSession\
        .builder\
        .master("spark://192.168.1.153:7077") \
        .appName("team8")\
        .config("spark.dynamicAllocation.enabled", True)\
        .config("spark.shuffle.service.enabled", True)\
        .config("spark.dynamicAllocation.executorIdleTimeout","30s")\
        .config("spark.executor.cores",4)\
        .getOrCreate()
        
sc = spark_session.sparkContext

In [2]:
# get data from here - https://portal.hmpdacc.org/search/f?filters=%7B%22content%22:%5B%7B%22op%22:%22in%22,%22content%22:%7B%22field%22:%22files.file_format%22,%22value%22:%5B%22FASTQ%22%5D%7D%7D,%7B%22op%22:%22in%22,%22content%22:%7B%22field%22:%22cases.subject.gender%22,%22value%22:%5B%22female%22%5D%7D%7D,%7B%22op%22:%22in%22,%22content%22:%7B%22field%22:%22cases.sample_body_site%22,%22value%22:%5B%22buccal%20mucosa%22%5D%7D%7D,%7B%22op%22:%22in%22,%22content%22:%7B%22field%22:%22cases.subject_gender%22,%22value%22:%5B%22female%22%5D%7D%7D%5D,%22op%22:%22and%22%7D&facetTab=cases&pagination=%7B%22files%22:%7B%22from%22:1,%22size%22:20,%22sort%22:%22file_name.raw:asc%22%7D%7D
# use wget

# load file to rdd
# microbiome = spark_context.textFile("/home/ubuntu/microbiome/SRS055118.tar.bz2")
microbiome = sc.textFile("hdfs://192.168.1.153:9000/team08/SRS055118.tar.bz2")

In [5]:
# Count number of lines

print(microbiome.count())

46137917


In [3]:
# filter for sequences

microbiome_sequences = microbiome.zipWithIndex().map(lambda x: (x[1]-1,x[0])).filter(lambda x: x[0]%4 == 0)


In [4]:
# break by every nucleotide
microbiome_sequences_nucleo = microbiome_sequences.map(lambda x: x[1])
microbiome_sequences_nucleo = microbiome_sequences_nucleo.flatMap(lambda line: line.strip())
microbiome_sequences_nucleo = microbiome_sequences_nucleo.map(lambda word: (word, 1))
microbiome_sequences_nucleo = microbiome_sequences_nucleo.reduceByKey(lambda a, b: a + b, 1)
microbiome_sequences_nucleo = microbiome_sequences_nucleo.sortByKey(1, 1)
microbiome_sequences_nucleo = microbiome_sequences_nucleo.sortBy(lambda pair:pair[1], False)

# import pysam
# samfile = pysam.AlignmentFile("/home/ubuntu/data/NA06985.final.cram", "rc")
# print('ok')

In [5]:
# count GC%
total_sum = microbiome_sequences_nucleo.map(lambda x : x[1]).sum()
#g_sum = microbiome_sequences_nucleo.filter(lambda x: x[0] == "G")
#c_sum = microbiome_sequences_nucleo.filter(lambda x: x[0] == "C")
print(total_sum)

821233960


In [23]:
g_sum = microbiome_sequences_nucleo.filter(lambda x: x[0] == "G")
g_sum = g_sum.filter(lambda x: x[1])

c_sum = microbiome_sequences_nucleo.filter(lambda x: x[0] == "C")
c_sum = g_sum.filter(lambda x: x[1])

In [27]:
gc_sum = g_sum.take(1)[0][1] + c_sum.take(1)[0][1]
print(( gc_sum / total_sum)*100 )

64.3421360704567


In [4]:
# Final function

import datetime

def CountGCwSpark(path):
    microbiome = sc.textFile(path)
    start_time = 'Start time:' + datetime.date.strftime(datetime.datetime.now(), format="%H:%M:%S %d/%m/%Y") + '\n'
    print(start_time)
    time_diff_a = datetime.datetime.now()
    # filter for sequences
    microbiome_sequences = microbiome.zipWithIndex().map(lambda x: (x[1]-1,x[0])).filter(lambda x: x[0]%4 == 0)
    # break by nucleotide
    microbiome_sequences_nucleo = microbiome_sequences.map(lambda x: x[1]).flatMap(lambda line: line.strip())
    # map-reduce
    microbiome_sequences_nucleo = microbiome_sequences_nucleo.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b, 1)
    # count GC
    total_sum = microbiome_sequences_nucleo.map(lambda x : x[1]).sum()
    g_sum = microbiome_sequences_nucleo.filter(lambda x: x[0] == "G")
    g_sum = g_sum.filter(lambda x: x[1])
    c_sum = microbiome_sequences_nucleo.filter(lambda x: x[0] == "C")
    c_sum = c_sum.filter(lambda x: x[1])
    gc_sum = g_sum.take(1)[0][1] + c_sum.take(1)[0][1]
    print(( gc_sum / total_sum)*100 )
    end_time = 'End time:' + datetime.date.strftime(datetime.datetime.now(), format="%H:%M:%S %d/%m/%Y") + '\n'
    print(end_time)
    time_diff_b = datetime.datetime.now()
    total_time = time_diff_b - time_diff_a
    total_time_message = 'It took ' + str(total_time)
    print(total_time_message)
    return

#CountGCwSpark("hdfs://192.168.1.153:9000/team08/SRS055118.tar.bz2")

In [5]:
CountGCwSpark("hdfs://192.168.1.153:9000/team08/joined.tar.bz2")

Start time:15:10:38 04/06/2019

49.79655371408647
End time:15:21:11 04/06/2019

It took 0:10:32.633786


In [5]:
CountGCwSpark("hdfs://192.168.1.153:9000/team08/SRS143728.tar.bz2")

Start time:16:14:28 04/06/2019

2.3067595648953416
End time:16:30:28 04/06/2019

It took 0:15:59.678349
