In [53]:
from pyspark.sql import SparkSession

# Create (or reuse) the SparkSession for this notebook and keep a handle on
# its SparkContext for the RDD API used below; the bare `spark` at the end
# displays the session summary.
spark = (
    SparkSession.builder
    .appName("Assignment3")
    .getOrCreate()
)
sc = spark.sparkContext
spark

In [54]:
# Read the exclude list verbatim: one entry per line, line terminators still
# attached (they are cleaned up in the next cell).
with open('data/exclude.txt') as exclude_file:
    list_exclude = list(exclude_file)

In [55]:
# Normalize the raw lines into gene ids. strip() removes all surrounding
# whitespace (not just '\n'), so an id written with stray spaces/tabs still
# matches; blank lines are skipped so they do not add '' to the exclude list.
list_exclude = [gene.strip() for gene in list_exclude if gene.strip()]
list_exclude

['gene21', 'gene33', 'gene44', 'gene88', 'gene77']

In [56]:
# One RDD element per line of the input file; show the line count and the
# first 20 raw lines (each on its own output line).
rdd = sc.textFile('data/genes.txt')
print(rdd.count(), rdd.take(20), sep='\n')

110
['gene01,3.0', 'gene03,5.0,1.0,2.0,4.0,3.0,1.0', 'gene01,3.0,6.0', 'gene03,1.0,2.0,4.0,4.0,3.0,1.0,0.0', 'gene21,2.4,1.2,2.0', 'gene33,1.0,1.0,2.0,3.0', 'gene09,2.0,2.0,3.0,4.0,4.0,4.0', 'gene44,1.0,1.0,2.0,3.0', 'gene55,3.0', 'gene44,5.0,1.0,2.0,4.0,3.0,1.0', 'gene44,3.0,6.0', 'gene21,3.4,1.2,2.0,5.0,3.5,2.0,1.0', 'gene22,3.0', 'gene79,2.0,4.0,3.0,1.0', 'gene01,3.0,3.0,3.0,4.0,5.0,5.0,5.0', 'gene03,5.0,1.0,2.0,4.0,3.0,1.0', 'gene01,3.0,6.0', 'gene03,1.0,2.0,4.0,4.0,3.0,1.0,0.0', 'gene21,2.4,1.2,2.0', 'gene33,1.0,1.0,2.0,3.0']


In [57]:

def splitAndFilter(record, exclude=None):
    """Parse one ``gene_id,v1,v2,...`` line into a per-gene summary.

    Parameters
    ----------
    record : str
        A raw comma-separated line from the genes file.
    exclude : collection of str, optional
        Gene ids (compared before upper-casing) whose records are dropped.
        Defaults to the notebook-level ``list_exclude``.

    Returns
    -------
    tuple or None
        ``('GENEXX', (count, min, max))`` for a valid record. ``None`` when the
        record has no values after the gene id, the gene id is excluded, or
        a value cannot be parsed as a float; callers filter these out.
    """
    if exclude is None:
        exclude = list_exclude
    # Strip each field so ids like ' gene21 ' still match the exclude list.
    tokens = [token.strip() for token in record.split(',')]
    gene_id, raw_values = tokens[0], tokens[1:]
    # Drop records with fewer than two tokens and records of excluded genes.
    if not raw_values or gene_id in exclude:
        return None
    try:
        values = [float(value) for value in raw_values]
    except ValueError:
        # A malformed value (e.g. an empty field from a trailing comma) would
        # otherwise raise inside the Spark task and abort the whole job;
        # treat the record as invalid like the other rejected records.
        return None
    return (gene_id.upper(), (len(values), min(values), max(values)))

# Preview the parsed form of the first 10 lines; None marks a line that
# splitAndFilter rejected.
rdd.map(splitAndFilter).take(num=10)

[('GENE01', (1, 3.0, 3.0)),
 ('GENE03', (6, 1.0, 5.0)),
 ('GENE01', (2, 3.0, 6.0)),
 ('GENE03', (7, 0.0, 4.0)),
 None,
 None,
 ('GENE09', (6, 2.0, 4.0)),
 None,
 ('GENE55', (1, 3.0, 3.0)),
 None]

In [58]:
def find_count_min_max(a, b):
    """Combine two ``(count, min, max)`` partial summaries (reduceByKey merge).

    Counts are added, minima and maxima are combined with min()/max(), which
    makes the merge associative and commutative as reduceByKey requires.
    """
    total = a[0] + b[0]
    lowest = min(a[1], b[1])
    highest = max(a[2], b[2])
    return (total, lowest, highest)

# Pipeline: parse each line (splitAndFilter returns None for rejected
# records) -> drop the Nones -> merge per-gene (count, min, max) summaries
# -> flatten each pair to (gene, count, min, max) for output.
res = (
    rdd.map(splitAndFilter)
    .filter(lambda x: x is not None)
    .reduceByKey(find_count_min_max)
    .map(lambda x: (x[0], x[1][0], x[1][1], x[1][2]))
)
# Materialize once: res is not cached, so calling res.count() and then
# res.collect() would run two jobs that each re-read the input and redo the
# shuffle. The result has one row per gene key, so collect and take len().
res_rows = res.collect()
print("length of result rdd is:", len(res_rows))
print("result:\n", res_rows)

length of result rdd is: 9
result:
 [('GENE03', 160, 0.0, 8.0), ('GENE55', 12, 0.0, 4.0), ('GENE22', 3, 3.0, 3.0), ('GENE79', 16, 1.0, 4.0), ('GENE99', 9, 1.0, 3.0), ('GENE01', 42, 3.0, 6.0), ('GENE09', 11, 1.0, 5.0), ('GENE89', 16, 2.0, 4.0), ('GENE02', 1, 3.0, 3.0)]


In [59]:
# Shut down the SparkSession created in the first cell; per the PySpark docs
# this also stops its underlying SparkContext (`sc`), so `rdd`/`res` cannot
# be used by any later cell.
spark.stop()