Skip to content

Commit

Permalink
Various updates for improved compatability with round tripping the PD…
Browse files Browse the repository at this point in the history
…B and other sequence importers

specifically:
1. update to support NEF-Pipelines round trip fasta headers
2. better support of a variety of fasta headers (including pdb/rcsb headers)
3. better compatability with other sequence importers
  • Loading branch information
locsmith committed May 2, 2024
1 parent da2d0d5 commit 9ac35e3
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 29 deletions.
8 changes: 4 additions & 4 deletions src/nef_pipelines/tests/fasta/test_export_sequence.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
app.command()(sequence)

EXPECTED_3AA = """
>test CHAIN: A | START RESIDUE: 1
>test NEFPLS | CHAIN: A | START: 1
AAA
"""
Expand All @@ -29,10 +29,10 @@ def test_3aa():


EXPECTED_3AA_2_CHAINS = """
>test CHAIN: A | START RESIDUE: 1
>test NEFPLS | CHAIN: A | START: 1
AAA
>test CHAIN: B | START RESIDUE: 1
>test NEFPLS | CHAIN: B | START: 1
CCC
"""
Expand All @@ -47,7 +47,7 @@ def test_3aa_2_chains():


EXPECTED_THX = """
>test CHAIN: thx | START RESIDUE: 1
>test NEFPLS | CHAIN: thx | START: 1
AAA
"""

Expand Down
133 changes: 108 additions & 25 deletions src/nef_pipelines/transcoders/fasta/importers/sequence.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from dataclasses import dataclass
from itertools import chain, cycle, islice
from pathlib import Path
from typing import Iterable, List
Expand All @@ -24,6 +25,15 @@
from nef_pipelines.lib.util import STDIN, exit_error, parse_comma_separated_options
from nef_pipelines.transcoders.fasta import import_app

CHAIN_STARTS_HELP = """first residue number of sequences can be a comma separated list or added multiple times
note the offsets are applied on top of any chain starts present in the file headers unless
the option --no-header is used"""

NO_HEADER_HELP = """don't read the NEF-Pipelines sequence header if present
the format of the header is >{entry_name} CHAIN: {chain} | START RESIDUE: {chain_start}
an example would be >test CHAIN: A | START: -2 for the entry test where chain A starts at -2
"""

app = typer.Typer()

NO_CHAIN_START_HELP = """don't include the start chain link type on a chain for the first residue [linkage will be
Expand All @@ -47,7 +57,7 @@ def sequence(
starts: List[str] = typer.Option(
[],
"--starts",
help="first residue number of sequences can be a comma separated list or ",
help=CHAIN_STARTS_HELP,
metavar="<START>",
),
no_chain_starts: List[str] = typer.Option(
Expand All @@ -57,7 +67,7 @@ def sequence(
[], "--no-chain-end", help=NO_CHAIN_END_HELP
),
# TODO: unused inputs!
entry_name: str = typer.Option("fasta", help="a name for the entry if required"),
entry_name: str = typer.Option(None, help="a name for the entry if required"),
in_file: Path = typer.Option(
STDIN,
"-i",
Expand All @@ -71,6 +81,7 @@ def sequence(
"--molecule-type",
help="molecule type",
),
no_header: bool = typer.Option(True, "--no-header", help=NO_HEADER_HELP),
file_names: List[Path] = typer.Argument(
..., help="the file to read", metavar="<FASTA-FILE>"
),
Expand Down Expand Up @@ -118,8 +129,9 @@ def sequence(
no_chain_starts,
no_chain_ends,
molecule_types,
no_header,
file_names,
in_file,
entry_name,
)

print(entry)
Expand All @@ -132,27 +144,34 @@ def pipe(
no_chain_starts: List[bool],
no_chain_ends: List[bool],
molecule_types: List[MoleculeType],
no_header: bool,
file_names,
input: Path,
entry_name: str,
):
fasta_frames = []

chain_code_iterator = chain_code_iter(chain_codes)
fasta_sequences = OrderedSet()

fasta_sequences.update(
_read_sequences(file_names, chain_code_iterator, molecule_types)
fasta_residues, read_entry_name = _read_sequences(
file_names, chain_codes, molecule_types, not no_header
)

read_chain_codes = residues_to_chain_codes(fasta_sequences)
if read_entry_name and not entry_name:
entry.entry_id = read_entry_name
elif entry_name:
entry.entry_id = entry_name
else:
entry.entry_id = "fasta"

read_chain_codes = residues_to_chain_codes(fasta_residues)

offsets = _get_sequence_offsets(read_chain_codes, starts)

fasta_sequences = offset_chain_residues(fasta_sequences, offsets)
fasta_residues = offset_chain_residues(fasta_residues, offsets)

fasta_sequences = sorted(fasta_sequences)
fasta_residues = sorted(fasta_residues)

fasta_frames.append(sequence_to_nef_frame(fasta_sequences))
fasta_frames.append(
sequence_to_nef_frame(fasta_residues, no_chain_starts, no_chain_ends)
)

entry = add_frames_to_entry(entry, fasta_frames)

Expand All @@ -179,30 +198,83 @@ def residues_to_chain_codes(residues: List[SequenceResidue]) -> List[str]:
return list(OrderedSet([residue.chain_code for residue in residues]))


@dataclass
class Sequence:
entry_id: str
comment: str
sequence: List[str]
chain_code: str = None
starts: int = 1


# could do with taking a list of offsets
# noinspection PyUnusedLocal
#
# >pdb|4ciw|A
# >CW42
# >crab_anapl ALPHA CRYSTALLIN B CHAIN (ALPHA(B)-CRYSTALLIN).
# >gnl|mdb|bmrb16965:1 HET-s
# >gggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggg
# >2DLV_1|Chain A|Regulator of G-protein signaling 18|Homo sapiens (9606)
# >2SRC_1|Chain A|TYROSINE-PROTEIN KINASE SRC|Homo sapiens (9606)
# >H1N1 nucleoprotein, monomeric 416A mutant


def _parse_fasta(fasta_sequence, parse_header: bool = True) -> Sequence:
definition = fasta_sequence.formatted_definition_line()
bar_count = definition.count("|")
last_part = definition.split()[-1]
is_bracketed = last_part.startswith("(") and last_part.endswith(")")
is_last_field_number = last_part.strip("()").isdigit()

chain_code = None
start = 1
if fasta_sequence.description.startswith("NEFPLS") and parse_header:
fields = definition.split("|")
for field in fields:
if field.startswith("CHAIN:"):
chain_code = field.split()[-1]
if field.startswith("START:"):
start = field.split()[-1]
entry_id, comment = fasta_sequence.id, fasta_sequence.description

elif bar_count == 3 and is_bracketed and is_last_field_number:
fields = definition.split("|")
entry_id = fields[0]
comment = "|".join(fields[1:])
else:
entry_id, comment = fasta_sequence.id, fasta_sequence.description

letters = [letter_code.letter_code for letter_code in fasta_sequence.sequence]
return Sequence(entry_id, comment, letters, chain_code, start)


def _read_sequences(
file_paths: List[Path],
chain_codes: Iterable[str],
molecule_types: List[MoleculeType],
parse_header=False,
) -> List[SequenceResidue]:

sequences = []
headers = []
sequence_records = []

for file_path in file_paths:
try:
with open(file_path) as handle:
try:
reader = Reader(handle)
sequences.extend([sequence for sequence in reader])
headers.extend([header for header in reader])
for fasta_sequence in reader:
sequence_records.append(
_parse_fasta(fasta_sequence, parse_header)
)

except Exception as e:
# check if relative to os.getcwd
exit_error(f"Error reading fasta file {str(file_path)}", e)
except IOError as e:
exit_error(f"couldn't open {file_path} because:\n{e}", e)

number_sequences = len(sequences)
number_sequences = len(sequence_records)
number_molecule_types = len(molecule_types)

if number_molecule_types == 1:
Expand All @@ -216,21 +288,32 @@ def _read_sequences(
residues = OrderedSet()
# read as many chain codes as there are sequences
# https://stackoverflow.com/questions/16188270/get-a-fixed-number-of-items-from-a-generator
for sequence, chain_code, molecule_type in zip(
sequences, chain_code_iter(chain_codes), molecule_types
for sequence_record, chain_code, molecule_type in zip(
sequence_records, chain_code_iter(chain_codes), molecule_types
):

sequence = [letter_code.letter_code for letter_code in sequence]

try:
sequence_3_let = translate_1_to_3(sequence, molecule_type=molecule_type)
sequence_3_let = translate_1_to_3(
sequence_record.sequence, molecule_type=molecule_type
)
except BadResidue as e:
exit_error(
f"Error translating sequence {sequence} to 3 letter code {e}",
f"Error translating sequence {sequence_record} to 3 letter code {e}",
e,
)

chain_code = (
sequence_record.chain_code if sequence_record.chain_code else chain_code
)
chain_residues = sequence_3let_to_res(sequence_3_let, chain_code)

chain_residues = offset_chain_residues(
chain_residues,
[
sequence_record.starts - 1,
],
)

residues.update(chain_residues)

return residues
return residues, sequence_records[0].entry_id

0 comments on commit 9ac35e3

Please sign in to comment.