# Start Spark Context

Make sure to execute first and execute only once per session

In [2]:
from pyspark import SparkContext
sc = SparkContext(master="local[4]")

# Read in the (small) file

In [2]:
raw = sc.textFile('../data/rxSmallSubset.csv')

# Task 1

Write a program that computes the total "net ingredient cost" of prescription items dispensed for each PERIOD in the data set (total pounds and pence from the NIC field).

As you do this, be aware that this data (like all real data) can be quite noisy and dirty. The first line in the file might describe the schema, and so it doesn’t have any valid data, just a bunch of text. You may or may not find lines that do not have enough entries on them, or where an entry is of the wrong type (for example, the NIC or ACT COST cannot be converted into a decimal number. Basically, you need to write robust code. If you find any error on a line, simply discard the line. Your code should still output the correct result.


For your results, print out each period, in sorted order, followed by the total net ingredient cost for that period.

The following steps are just a guide. Feel free to do it your own way.

#### Define a function that checks if a string is a valid number and preprocess

In [3]:
from pyspark.sql.types import *
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from itertools import islice
from pyspark import SparkContext, SparkConf
from pyspark import sql
from pyspark.sql.types import DoubleType
from pyspark.sql import functions as F
from pyspark.sql.functions import *

def clean_and_filter(df):

    ## Converting to appropriate datatypes
    df = df.withColumn('NIC', df['NIC'].cast('float'))
    df = df.withColumn('BNF_CODE', df['BNF_CODE'].cast('string'))
    df = df.withColumn('ITEMS', df['ITEMS'].cast('integer'))
    df = df.withColumn('QUANTITY', df['QUANTITY'].cast('integer'))
    df = df.withColumn('ACT_COST', df['ACT_COST'].cast('float'))
    df = df.withColumn('SHA', df['SHA'].cast('string'))
    df = df.withColumn('PCT', df['PCT'].cast('string'))
    df = df.withColumn('PRACTICE', df['PRACTICE'].cast('string'))
    df = df.withColumn('BNF_NAME', df['BNF_NAME'].cast('string'))
    ## Dropping null rows
    df = df.na.drop()
    return df

def convert_to_dataframe(data):
    trans = data.map(lambda x: x.encode("ascii", "ignore"))
    tagsheader = trans.first()
    header = sc.parallelize([tagsheader])
    trans_data = trans.subtract(header)
    tuple_data = trans_data.map(lambda x: tuple(str(x).split(",")))
    df = tuple_data.toDF(["SHA","PCT","PRACTICE","BNF_CODE","BNF_NAME","ITEMS","NIC","ACT_COST","QUANTITY","PERIOD"])
    return df


spark = SparkSession(sc)



import pandas as pd

raw_df = convert_to_dataframe(raw)

raw_df = clean_and_filter(raw_df)

#### Print the result in order

In [4]:
raw_df.groupBy("PERIOD").agg(F.sum("NIC")).orderBy('PERIOD').show()

+------+------------------+
|PERIOD|          sum(NIC)|
+------+------------------+
|201607|3563.1400191783905|
|201608| 5234.559994220734|
|201609|2747.1500222682953|
|201610| 5375.820019602776|
|201611|3918.3799645900726|
|201612|4052.8500669002533|
|201701| 3838.030029833317|
|201702| 6235.619965791702|
|201703| 3263.129995942116|
|201704|7862.6200060248375|
|201705|  5922.64001005888|
|201706| 8012.429938673973|
|201707| 6010.360013484955|
|201708| 4226.469961047173|
|201709| 4062.250020980835|
+------+------------------+



# Task 2

Find the 5 practices that issued the prescriptions with the highest total net ingredient cost in the data set.

In [5]:
raw_df.groupBy("PRACTICE").agg(F.sum("NIC")).orderBy(["sum(NIC)"], ascending=[0]).show(5)

+--------+------------------+
|PRACTICE|          sum(NIC)|
+--------+------------------+
|  C81033|  4592.39990234375|
|  P81772| 2573.469970703125|
|  D82064|2070.1499996185303|
|  D82048|  1241.31005859375|
|  J82139| 1027.989990234375|
+--------+------------------+
only showing top 5 rows



# Task 3

Your task is to classify each sequence in the contaminated tardigrade file as being most likely bacteria or tardigrade.

There are many ways to approach this job. Here are some steps at a high level:

a) A function that calculates Edit Distance between two sequences

b) Calculate Edit Distance for each sample against every clean and bacterial contig

c) Find the shortest distance for each sample

d) Classify samples

You are likely to use much more RDD operations than previous tasks. Check documents for some handy functions.

#### Load data files

In [3]:
bacterialRaw = sc.textFile('../data/exp1.oneline.fa.small')
cleanRaw = sc.textFile('../data/nHd.2.3.abv500.oneline.fa.small')
contaminatedRaw = sc.textFile('../data/LMYF01.1.oneline.fa.small')

In [4]:
# myRDDlist = [bacterialRaw,cleanRaw,contaminatedRaw]

def checkSequence(l1, l2):
    x = 0
    check = "ACTG"
   
    for i in range(len(l1)):
        if l1[i] not in check or l2[i] not in check:
            return float('inf')
        
        if l1[i] != l2[i]:
            x += 1
            
    return x

def Func(lines):
    lines = lines.split('|')
    return lines

line = contaminatedRaw.map(lambda s: s.replace(">","").replace("<","|"))
line = line.map(Func)
line.take(100)
sample_list = line.collect()
# print(sample_list)

ba = bacterialRaw.map(lambda s: s.replace(">","").replace("<","|"))
ba = ba.map(Func)
# ba.take(10)
Ba_list = ba.collect()

clean = cleanRaw.map(lambda s: s.replace(">","").replace("<","|"))
clean = clean.map(Func)
clean_list = clean.collect()


# clean.take(10)
clean_dist = []
ba_dist = []

for s in sample_list:
    t = []
    for c in clean_list:
        t.append(checkSequence(s[2], c[2]))
    
    clean_dist.append(t)

# print(clean_dist)

for s in sample_list:
    t = []
    for b in Ba_list:
        t.append(checkSequence(s[2], b[2]))
    
    ba_dist.append(t)
    
# print(ba_dist)

min_clean = [min(c) for c in clean_dist]
# print(min_clean)

min_ba = [min(b) for b in ba_dist]
# print(min_ba)

import collections

res = []
sample_name = [s[1].split()[0] for s in sample_list]
for i in range(len(min_clean)):
    if min_clean[i] < min_ba[i]:
        res.append((sample_name[i], 'Clean'))
    elif min_clean[i] > min_ba[i]:
        res.append((sample_name[i], 'Bacteria'))
    else:
        t1 = collections.Counter(clean_dist[i])
        t2 = collections.Counter(ba_dist[i])
        if t1[min_clean[i]] > t2[min_ba[i]]:
            res.append((sample_name[i], 'Clean'))
        elif t1[min_clean[i]] < t2[min_ba[i]]:
            res.append((sample_name[i],'Bacteria'))
        else:
            res.append((sample_name[i],'Not Sure'))
# print(res)

res_rdd = sc.parallelize(res)
res_rdd.take(100)

[('LMYF01000001.1', 'Bacteria'),
 ('LMYF01000002.1', 'Bacteria'),
 ('LMYF01000003.1', 'Clean'),
 ('LMYF01000004.1', 'Clean'),
 ('LMYF01000005.1', 'Bacteria'),
 ('LMYF01000006.1', 'Clean'),
 ('LMYF01000007.1', 'Not Sure'),
 ('LMYF01000008.1', 'Not Sure'),
 ('LMYF01000009.1', 'Not Sure'),
 ('LMYF01000010.1', 'Bacteria'),
 ('LMYF01000011.1', 'Bacteria'),
 ('LMYF01000012.1', 'Bacteria'),
 ('LMYF01000013.1', 'Clean'),
 ('LMYF01000014.1', 'Not Sure'),
 ('LMYF01000015.1', 'Clean'),
 ('LMYF01000016.1', 'Clean'),
 ('LMYF01000017.1', 'Bacteria'),
 ('LMYF01000018.1', 'Clean'),
 ('LMYF01000019.1', 'Bacteria'),
 ('LMYF01000020.1', 'Clean'),
 ('LMYF01000021.1', 'Clean'),
 ('LMYF01000022.1', 'Clean'),
 ('LMYF01000023.1', 'Bacteria'),
 ('LMYF01000024.1', 'Bacteria'),
 ('LMYF01000025.1', 'Bacteria'),
 ('LMYF01000026.1', 'Bacteria'),
 ('LMYF01000027.1', 'Not Sure'),
 ('LMYF01000028.1', 'Clean'),
 ('LMYF01000029.1', 'Bacteria'),
 ('LMYF01000030.1', 'Clean'),
 ('LMYF01000031.1', 'Bacteria'),
 ('LMYF010000

In [5]:
res_rdd.filter(lambda x : x[1] == 'Bacteria').count()

35