In [3]:
from Bio import Restriction
from Bio import SeqIO
from Bio.SeqRecord import SeqRecord
from Bio.Seq import Seq
from pathlib import Path
import pandas

In [13]:
#records = [rec for rec in SeqIO.parse(eu01_gbff, 'genbank')]
b31_fna = Path('../src/rst_caller/test_seqs/GCF_000008685.2_ASM868v2_genomic.fna')
bol26_fna = Path('../src/rst_caller/test_seqs/GCA_000181575.2_ASM18157v2_genomic.fna')
pabe_fna = Path('../src/rst_caller/test_seqs/GCA_002151485.1_ASM215148v1_genomic.fna')
eu01_gbff = Path('../src/rst_caller/test_seqs/EU_Bb_01.gbff')
eu01_fna = Path('../src/rst_caller/test_seqs/EU_Bb_01.fna')

In [5]:
enzymes = [
    Restriction.HinfI, Restriction.MseI
]
for enzyme in enzymes:
    print(f"{enzyme}: {enzyme.site}")

HinfI: GANTC
MseI: TTAA


In [None]:
# this approach only functions with guaranteed consistent rRNA annotation. (and then just barely.)
rrna_its = SeqRecord(Seq(""), id='rRNA_ITS', name='rRNA_ITS', description='rRNA_ITS sequence')
its_start = 0
its_end = 0
for rec in records:
    for feat in rec.features:
        if feat.type == 'gene':
            gene_name = feat.qualifiers.get('gene','')
            gene_loc = feat.location
            if 'rrf' in gene_name:
                print(gene_name, gene_loc.start, gene_loc.end)
                its_start = gene_loc.end - 20
                its_end = its_start + 1712 + 20
                rrna_its.seq = rec.seq[slice(its_start, its_end)]
                break
            # if 'rrs' in gene_name:
            #     print(gene_name, gene_loc.start, gene_loc.end)
            #     rrs_start.append(gene_loc.start)
            #     rrs_end.append(gene_loc.end)
            #     if len(rrs_start) == 2:
            #         its_end = sorted(rrs_start)[0]
            #         print('ITS_coords', its_start, its_end)
            #         print('Len sequence', its_end - its_start)
            #         rrna_its.seq = rec.seq[slice(its_start, its_end)]
            #         break

In [None]:
import subprocess
import io
from Bio import Restriction
from Bio.SeqRecord import SeqRecord
from Bio.Seq import Seq
from pathlib import Path

# patterns, primers, and type definitions are from Liveris et al. (1995)
RFLP_PATTERNS = {
    'H1':[1078,372,310],
    'H2':[1078,310,241,131],
    'M1':[258,149,136,128,102],
    'M2':[364,258,136,102],
}

RST_TYPES = {
    1: ['H1', 'M1'],
    2: ['H2', 'M2'],
    3: ['H2', 'M1'],
}

def fuzzy_match(observed, expected, tolerance=10):
    unmatched = expected[:]
    for obs in observed:
        for exp in unmatched:
            if abs(obs-exp) <= tolerance:
                unmatched.remove(exp)
                break
    return len(unmatched) == 0

def get_best_pattern(obs_frags, enzyme, tolerance=10):
    candidates = []
    for pat_name, pat_frags in RFLP_PATTERNS.items():
        if pat_name.startswith(enzyme[0]) and fuzzy_match(obs_frags, sorted(pat_frags), tolerance):
            candidates.append(pat_name)
    return candidates

def amplify_and_cut(input_fa_file, quiet=False, output=Path('results'), **kwargs):
    # primers from Liveris et al. (1995)
    fwd = "GGTATGTTTAGTGAGGG"
    rev = "CAGGCTCTACACTTCTG"

    file_id = input_fa_file.stem

    with open(input_fa_file, 'r') as f:
        proc_out = subprocess.run(
            ["seqkit", "amplicon", "--bed",
             "-F", fwd, "-R", rev,
             "-m", "2", "-M", "-I"],
            stdin=f,
            text=True,
            capture_output=True,
        )
        output_file = output / f"{file_id}_RFLP_RST_amplicon.fna"
        with open(output_file, 'w') as outhandle:
            # convert to BED6+1 to add coordinates to header?
            # let's try it.
            #(0, 'chrom') (1, 'start') (2, 'end') (3, 'name') (4, 'score') (5, 'strand') (6, 'sequence') (7, 'mismatches') (8, 'mismatches_5') (9, 'mismatches_3')
            #keys: list[str] = ["chrom", "start", "end", "name", "score", "strand", "sequence", "mismatches", "mismatches_5", "mismatches_3"]
            amplicons = []
            for line in proc_out.stdout.split('\n'):
                if not line.strip():
                    continue
                fields: list[str] = line.split('\t')
                rec_id: str = fields[0]
                rec_name: str = "rRNA_ITS_amplicon"
                rec_description: str = f"[{fields[1]}:{fields[2]}] {file_id}_{rec_name} mismatches={fields[7]}({fields[8]}+{fields[9]})"
                amplicons.append(SeqRecord(
                                            seq=Seq(fields[6]),
                                            id = rec_id,
                                            name=rec_name,
                                            description=rec_description
                                            )
                )
                amplicon: SeqRecord = amplicons[0]
                amplicon_out: int = SeqIO.write(amplicons, outhandle, 'fasta')
                print(f"{amplicon_out} amplicons written.")
        # run the digestion but toss out the MseI fragments below 100.
        HinfI_fragments: list[int] = sorted({len(frag) for frag in Restriction.HinfI.catalyze(amplicon.seq, linear=True)})
        MseI_fragments: list[int] = sorted({len(frag) for frag in Restriction.MseI.catalyze(amplicon.seq, linear=True) if len(frag) >= 100})

        # this is not optimal but this is the only way I can get it to return types that have been experimentally determined.
        H_match: list | None = get_best_pattern(HinfI_fragments, 'HinfI', 45)
        M_match: list | None = get_best_pattern(MseI_fragments, 'MseI', 17)
        if not quiet:
            print(amplicon.description.split(' ')[-1], len(amplicon.seq))
            print(f"HinfI fragments: {HinfI_fragments}")
            print(f"MseI fragments:  {MseI_fragments}")
            print(f"Matched HinfI Pattern: {H_match}")
            print(f"Matched MseI Pattern:  {M_match}")

        if not H_match or not M_match:
            # I want to return 0 if one of the patterns doesn't match
            # but isn't RST3 anything not 1 or 2? 0 for now to make debugging easier.
            if not quiet:
               print("Error: Novel fragment pattern detected!")
               # should make it always output all fragments
               # TODO: see above
            return 0,  HinfI_fragments, MseI_fragments, amplicons, amplicon, H_match, M_match

        for rst, (hpat, mpat) in RST_TYPES.items():
            if hpat in H_match and mpat in M_match:
                called_RST: int = rst
        if not quiet:
            print(f"Matched RST: {called_RST}")

        return called_RST, HinfI_fragments, MseI_fragments, amplicons, amplicon, H_match, M_match

In [95]:
test = Path('test')
test.mkdir(parents=True, exist_ok=True)


In [96]:

frags = amplify_and_cut(b31_fna, output=test)
print(frags)
frags = amplify_and_cut(eu01_fna, output=test)
print(frags)

1 amplicons written.
mismatches=1(0+1) 1712
HinfI fragments: [302, 373, 1037]
MseI fragments:  [105, 129, 137, 148, 258]
Matched HinfI Pattern: ['H1']
Matched MseI Pattern:  ['M1']
Matched RST: 1
(1, [302, 373, 1037], [105, 129, 137, 148, 258], [SeqRecord(seq=Seq('GGTATGTTTAGTGAGGGGGGTGAAGTCGTAACAAGGTAGCCGTACTGGAAAGTG...CTA'), id='NC_001318.1', name='rRNA_ITS_amplicon', description='[442941:444653] GCF_000008685.2_ASM868v2_genomic_rRNA_ITS_amplicon mismatches=1(0+1)', dbxrefs=[])], SeqRecord(seq=Seq('GGTATGTTTAGTGAGGGGGGTGAAGTCGTAACAAGGTAGCCGTACTGGAAAGTG...CTA'), id='NC_001318.1', name='rRNA_ITS_amplicon', description='[442941:444653] GCF_000008685.2_ASM868v2_genomic_rRNA_ITS_amplicon mismatches=1(0+1)', dbxrefs=[]), ['H1'], ['M1'])
1 amplicons written.
mismatches=2(0+2) 3369
HinfI fragments: [40, 68, 172, 307, 373, 589, 805, 1015]
MseI fragments:  [100, 119, 122, 127, 174, 211, 258, 424]
Matched HinfI Pattern: []
Matched MseI Pattern:  []
Error: Novel fragment pattern detected!
(0, [4

In [17]:
amplify_and_cut(bol26_fna, output=test)

1 amplicons written.
mismatches=1(0+1) 1712
HinfI fragments: [302, 373, 1037]
 MseI fragments: [105, 129, 137, 148, 258]
Matched HinfI Pattern: ['H1']
 Matched MseI Pattern: ['M1']
Matched RST: 1
Matched RST: 1
Matched RST: 1


(1,
 [302, 373, 1037],
 [105, 129, 137, 148, 258],
 [SeqRecord(seq=Seq('GGTATGTTTAGTGAGGGGGGTGAAGTCGTAACAAGGTAGCCGTACTGGAAAGTG...CTA'), id='ABCW02000003.1', name='ABCW02000003.1', description='ABCW02000003.1 Borreliella burgdorferi Bol26 gcontig_1118719648276, whole genome shotgun sequence mismatches=1(0+1)', dbxrefs=[])],
 SeqRecord(seq=Seq('GGTATGTTTAGTGAGGGGGGTGAAGTCGTAACAAGGTAGCCGTACTGGAAAGTG...CTA'), id='ABCW02000003.1', name='ABCW02000003.1', description='ABCW02000003.1 Borreliella burgdorferi Bol26 gcontig_1118719648276, whole genome shotgun sequence mismatches=1(0+1)', dbxrefs=[]),
 ['H1'],
 ['M1'])

In [519]:
amp_n_cut(pabe_fna)

(1, [302, 373, 1037], [105, 129, 137, 148, 258])

In [520]:
metadata = pandas.read_csv('old_metadata.tsv', sep='\t')
metadata

Unnamed: 0,Strain_ID,Alias,Assembly_ID,RST
0,B-17/2013,Gr-39,ASM1913465v1,3
1,PAli,,ASM215146v1,1
2,PAbe,,ASM215148v1,1
3,B31_NRZ,,ASM215150v1,1
4,B408,,ASM2466215v1,3
...,...,...,...,...
77,UWI247P,MC104,,2
78,UWI248P,MC105,,2
79,UWI263P,MC123,,2
80,UWI283P,MC149,,3


In [None]:
files = Path('/home/mf019/longread_pangenome/expanded_dataset_analysis/assemblies/dataset_v5').glob('**/*.fna')
header = ["assembly_id", "existing_type", "called_type", "calls_match"]
lines = ["\t".join(header)]

for file in files:
    filename = file.stem
    rst_out = rstc.amp_n_cut(file)
    old_type = ''
    if filename.startswith("GCF"):
        asm_name = filename.split('_')[2]
        #df.loc[df['col_B'] == 'apple', 'col_C']
        old_type = metadata.loc[metadata["Assembly_ID"] == asm_name, "RST"].to_list()
    else:
        asm_name = filename
        old_type = metadata.loc[metadata["Strain_ID"] == asm_name, "RST"].to_list()
        if len(old_type) == 0:
            old_type = metadata.loc[metadata["Alias"] == asm_name, "RST"].to_list()
    if len(old_type) > 0:
        old_type = old_type[0]
    else:
        old_type = 0

    line = f"{asm_name}\t{old_type}\t{rst_out[0]}\t{old_type == rst_out[0]}"
    lines.append(line)
    print(line)

ASM4079076v1	3	3	True
B418P	3	3	True
ASM215146v1	1	1	True
URI87H	1	1	True
URI34H	3	3	True
URI88H	2	2	True
URI33H	2	2	True
UCT110H	2	2	True
URI39H	1	1	True
URI91H	1	1	True
UCT35H	3	3	True
ESI361H	3	3	True
UWI247P	2	2	True
URI120H	1	1	True
URI107H	1	1	True
ASM215150v1	1	1	True
UWI263P	2	2	True
ASM4079079v1	2	2	True
ASM4079078v1	3	3	True
URI89H	1	1	True
URI42H	1	1	True
URI44H	2	2	True
UCT109H	1	1	True
URI40H	1	1	True
B500P	3	3	True
URI117H	2	2	True
URI47H	2	2	True
URI86H	2	2	True
ASM4079071v1	2	2	True
ASM4079075v1	3	3	True
URI36H	3	3	True
UNY208P	2	2	True
ASM4079074v1	3	3	True
ESI26H	1	1	True
UCT31H	1	1	True
ASM4079080v1	1	1	True
ESI403H	1	1	True
URI56H	3	3	True
XYZ459H	3	3	True
UCT30H	2	2	True
ASM4079073v1	2	2	True
PFhe	0	1	False
URI103H	2	2	True
UCT29H	1	1	True
UNY1128P	3	3	True
URI112H	2	2	True
UNY1032P	1	1	True
UWI248P	2	2	True
UNY203P	3	3	True
UCT96H	1	1	True
ASM1913465v1	3	3	True
ESI425H	3	3	True
UCT32H	2	2	True
UNY990P	2	2	True
UNY193P	3	3	True
UCT113H	2	2	True
URI93H	1	1	True
ASM2

In [514]:
with open('rst_caller_test.tsv', 'w') as outf:
    contents = "\n".join(lines)
    outf.write(contents)

In [86]:
output_file = "test_RFLP_RST_amplicon.fna" #output / f"file_name.fna"
fwd = "GGTATGTTTAGTGAGGG"
rev = "CAGGCTCTACACTTCTG"

with open(b31_fna, 'r') as inhandle:
    file_id = b31_fna.stem
    with open(output_file, 'w') as outhandle:
        proc_out = subprocess.run(
            [ "seqkit", "amplicon", "--bed",
                    "-F", fwd, "-R", rev,
                    "-m", "2", "-M", "-I" ],
            stdin=inhandle,
            text=True,
            capture_output=True,
        )
        rec = proc_out.stdout
        #(0, 'chrom') (1, 'start') (2, 'end') (3, 'name') (4, 'score') (5, 'strand') (6, 'sequence') (7, 'mismatches') (8, 'mismatches_5') (9, 'mismatches_3')
        keys: list[str] = ["chrom", "start", "end", "name", "score", "strand", "sequence", "mismatches", "mismatches_5", "mismatches_3"]
        amplicons = []
        for line in proc_out.stdout.split('\n'):
            if not line.strip():
                continue
            fields: list[str] = line.split('\t')
            rec_id: str = fields[0]
            rec_name: str = "rRNA_ITS_amplicon"
            rec_description: str = f"[{fields[1]}:{fields[2]}] {file_id}_{rec_name} mismatches={fields[7]}({fields[8]}+{fields[9]})"
            amplicons.append(SeqRecord(
                                        seq=Seq(fields[6]),
                                        id = rec_id,
                                        name=rec_name,
                                        description=rec_description
                                        )
            )
        # bed6+1 has tab sep cols:
        # chrom \t start \t end \t name \t score \t strand \t sequence \t mismatches \t mismatches_5 \t mismatches_3 \n
        #amplicons = [rec for rec in SeqIO.parse(io.StringIO(proc_out.stdout), 'fasta')]
        print("type of amplicons object:", type(amplicons))
        amplicon = amplicons[0]
        print("type of amplicon object:", type(amplicon))
        amplicon_out = SeqIO.write(amplicons, outhandle, 'fasta')
        print(f"{amplicon_out} amplicons written.")



type of amplicons object: <class 'list'>
type of amplicon object: <class 'Bio.SeqRecord.SeqRecord'>
1 amplicons written.
