In [1]:
from Bio import SeqIO
from Bio import Align
import re
import statistics
aligner = Align.PairwiseAligner(mode='global', match_score=2, mismatch_score=-1)

In [2]:
from collections import deque

def hamming_distance(s1, s2):
    return sum(ch1 != ch2 for ch1, ch2 in zip(s1, s2))

def approx_match(reference, string, max_mismatches=5):
    # if abs(len(string) - len(reference)) > max_mismatches:
    #     if hamming_distance(reference, string) <= max_mismatches:
    #         return True
    if hamming_distance(reference, string) <= max_mismatches:
        return True
    return False

def count_mismatches(s1, s2):
    alignment = aligner.align(s1, s2)
    score = alignment.score
    n_mismatches = sum(1 for a, b in zip(alignment[0][0], alignment[0][1]) if a != b)
    return n_mismatches

def print_stats(records):
    sizes = [len(rec) for rec in records]
    print("Mean read length:", statistics.mean(sizes))
    print("Median:", statistics.median(sizes))
    print("Mode:", statistics.mode(sizes))
    print("Max:", max(sizes))
    print("Min:", min(sizes))

In [6]:
reference_string = "reference"
all_strings = ["reference", "refernce", "inreferenceto", "referece", "apple", "banana", "ref"]

In [7]:
for string in all_strings:
    alignment = aligner.align(reference_string, string)
    score = alignment.score
    mismatch_count = sum(1 for a, b in zip(alignment[0][0], alignment[0][1]) if a != b)
    insertions, deletions, mismatches = 0, 0, 0
    for char1, char2 in zip(alignment[0][0], alignment[0][1]):
        if char1 == "-" and char2 != "-":
            insertions += 1
        elif char1 != "-" and char2 == "-":
            deletions += 1
        elif char1 != char2:
            mismatches += 1
    n = count_mismatches(reference_string, string)
    print(f'{string}:')
    print(alignment[0])
    print(f'\tHamming distance: {hamming_distance(reference_string, string)}')
    print(f'\tAlignment score: {score}')
    print(f'\tInsertions: {insertions}\tDeletions: {deletions}\tMismatches: {mismatch_count}')
    print(f'Mismatch function: {n}')
    print()

reference:
target            0 reference 9
                  0 ||||||||| 9
query             0 reference 9

	Hamming distance: 0
	Alignment score: 18.0
	Insertions: 0	Deletions: 0	Mismatches: 0
Mismatch function: 0

refernce:
target            0 reference 9
                  0 |||||-||| 9
query             0 refer-nce 8

	Hamming distance: 3
	Alignment score: 16.0
	Insertions: 0	Deletions: 1	Mismatches: 1
Mismatch function: 1

inreferenceto:
target            0 --reference--  9
                  0 --|||||||||-- 13
query             0 inreferenceto 13

	Hamming distance: 7
	Alignment score: 18.0
	Insertions: 4	Deletions: 0	Mismatches: 4
Mismatch function: 4

referece:
target            0 reference 9
                  0 ||||||-|| 9
query             0 refere-ce 8

	Hamming distance: 2
	Alignment score: 16.0
	Insertions: 0	Deletions: 1	Mismatches: 1
Mismatch function: 1

apple:
target            0 r----eference  9
                  0 -----|------- 13
query             0 -apple-------  5



In [8]:
f1 = 'TATTGCGATAGCTGAGAGAGAAGACGCGAGGG'
f2 = 'GCGAAAACAAAAAACAAAAATAAGAATCCAAGCAGCAGCAACA'

In [9]:
test_file = '../oligo/data/processed/oligo_1_wash_merged/oligo_1_bg.merged.fasta'
records = list(SeqIO.parse(test_file, "fasta"))

In [10]:
# parse input
print("Total reads in:", len(records))
print_stats(records)

Total reads in: 4740316
Mean read length: 88.94119316096227
Median: 89.0
Mode: 89
Max: 100
Min: 31


In [12]:
# prune
matches = [rec for rec in records if rec.seq.startswith(f1) or rec.seq.endswith(f2)]
# parse output
print("Total reads out:", len(matches))
print_stats(matches)
print("% of reads retained:", (len(matches)/len(records))*100)

Total reads out: 4218415
Mean read length: 88.96693450028032
Median: 89
Mode: 89
Max: 100
Min: 53
% of reads retained: 88.99016436878891


In [13]:
# prune
front_len = len(f1)
end_len = len(f2)
apprx_matches = [rec for rec in records if approx_match(f1, rec.seq[0:front_len]) and approx_match(f2, rec.seq[-end_len:])]
# parse output
print("Total reads out:", len(apprx_matches))
print_stats(apprx_matches)
print("% of reads retained:", round((len(apprx_matches)/len(records))*100, 2))

Total reads out: 2451
Mean read length: 89.97103223174214
Median: 90
Mode: 90
Max: 91
Min: 68
% of reads retained: 0.05


In [None]:
start_matches = [rec for rec in records if rec.seq.startswith(f1)]
print(f'# reads with exact 5\' matches: {len(start_matches)}')
for i in range(front_len):
    apprx_start_matches = [rec for rec in records if approx_match(f1, rec.seq[0:front_len], i)]
    print(f'# reads with {i} 5\' mismatches: {len(apprx_start_matches)}')

In [None]:
end_matches = [rec for rec in records if rec.seq.endswith(f2)]
print(f'# reads with exact 3\' matches: {len(end_matches)}')
for i in range(end_len):
    apprx_end_matches = [rec for rec in records if approx_match(f2, rec.seq[-end_len:], i)]
    print(f'# reads with {i} 3\' mismatches: {len(apprx_end_matches)}')

In [None]:
score = aligner.score(reference_string, string)
end_matches = [rec for rec in records if rec.seq.endswith(f2)]
print(f'# reads with exact 3\' matches: {len(end_matches)}')
for i in range(end_len):
    apprx_end_matches = [rec for rec in records if count_mismatch(f2, rec.seq[-end_len:]) <= i]
    print(f'# reads with {i} 3\' mismatches: {len(apprx_end_matches)}')

In [16]:
apprx_start_matches = [rec for rec in records if count_mismatches(f1, rec.seq[0:len(f1)]) <= 5]
print(f'# reads with 5 5\' mismatches: {len(apprx_start_matches)}')

# reads with 5 5' mismatches: 4728458


In [15]:
apprx_end_matches = [rec for rec in records if count_mismatches(f2, rec.seq[-end_len:]) <= 5]
print(f'# reads with 5 3\' mismatches: {len(apprx_end_matches)}')

# reads with 5 3' mismatches: 4702919


In [None]:
matches = [rec for rec in records if mismatch_count(f2, rec.seq[0:len(f1)]) <= n and mismatch_count(f2, rec.seq[-len(f2):]) <= n]