In [1]:
%env PROTEINS_FILE proteins.txt
%env PDB_DIR pdbs/
%env FASTA_DIR fasta/
%env SS_DIR ss/

env: PROTEINS_FILE=proteins.txt
env: PDB_DIR=pdbs/
env: FASTA_DIR=fasta/
env: SS_DIR=ss/


In [2]:
!mkdir -p $FASTA_DIR $SS_DIR $PDB_DIR

In [3]:
# !rm -rf pdbs/*
!./batch_download.sh -f proteins.txt -p -o pdbs
!gunzip -k --force pdbs/*.pdb.gz
!ls pdbs

Downloading https://files.rcsb.org/download/3JSN.pdb.gz to pdbs/3JSN.pdb.gz
3JSN.pdb  3JSN.pdb.gz


In [4]:
from typing import *
import os
import itertools
import re

In [5]:
# https://www.ddbj.nig.ac.jp/ddbj/code-e.html
AMINO_ACIDS = {
    'ALA': 'A',  # Alanine
    'ARG': 'R',  # Arginine
    'ASN': 'N',  # Asparagine
    'ASP': 'D',  # Aspartic acid
    'CYS': 'C',  # Cysteine
    'GLN': 'Q',  # Glutamine
    'GLU': 'E',  # Glutamic acid
    'GLY': 'G',  # Glycine
    'HIS': 'H',  # Histidine
    'ILE': 'I',  # Isoleucine
    'LEU': 'L',  # Leucine
    'LYS': 'K',  # Lysine
    'MET': 'M',  # Methionine
    'PHE': 'F',  # Phenylalanine
    'PRO': 'P',  # Proline
    'PYL': 'O',  # Pyrrolysine
    'SER': 'S',  # Serine
    'SEC': 'U',  # Selenocysteine
    'THR': 'T',  # Threonine
    'TRP': 'W',  # Tryptophan
    'TYR': 'Y',  # Tyrosine
    'VAL': 'V',  # Valine
    'ASX': 'B',  # Aspartic acid or Asparagine
    'GLX': 'Z',  # Glutamic acid or Glutamine
    'XAA': 'X',  # Any amino acid
    'XLE': 'J',  # Leucine or Isoleucine
}

In [6]:
def read_proteins_file(proteins_file_path: str):
    with open(proteins_file_path, 'r') as f:
        proteins = [line.replace('\n', '') for line in f.readlines()]
    return proteins

In [7]:
proteins_file = os.environ['PROTEINS_FILE']
pdb_dir = os.environ['PDB_DIR']
fasta_dir = os.environ['FASTA_DIR']
secondary_structure_dir = os.environ['SS_DIR']

In [8]:
def parse_ss_line(line: str):
    if line.startswith('HELIX '):
        return {
            'record_name': 'HELIX',
            'serial_number': int(line[7:10].strip() or '-1'),
            'id': line[11:14].strip(),
            'initial_residue_name': line[15:18].strip(),
            'initial_chain_id': line[19].strip(),
            'initial_sequence_number': int(line[21:25].strip() or '-1'),
            'initial_residue_insertion_code': line[25].strip(),
            'terminal_residue_name': line[27:30].strip(),
            'terminal_chain_id': line[31].strip(),
            'terminal_sequence_number': int(line[34:37].strip() or '-1'),
            'terminal_residue_insertion_code': line[37].strip(),
            'helix_class': int(line[38:40].strip() or '-1'),
            'comment': line[40:70].strip(),
            'length': int(line[71:76].strip() or '-1'),
        }
    elif line.startswith('SHEET '):
        return {
            'record_name': 'SHEET',
            'serial_number': int(line[7:10].strip() or '-1'),
            'id': line[11:14].strip(),
            'number_of_strands': int(line[14:16].strip() or '-1'),
            'initial_residue_name': line[17:20].strip(),
            'initial_chain_id': line[21].strip(),
            'initial_sequence_number': int(line[22:26]),
            'initial_residue_insertion_code': line[26].strip(),
            'terminal_residue_name': line[28:31].strip(),
            'terminal_chain_id': line[32].strip(),
            'terminal_sequence_number': int(line[33:37].strip() or '-1'),
            'terminal_residue_insertion_code': line[37].strip(),
            'sense': int(line[38:40].strip() or '-1'),
            'current_atom': line[41:45].strip(),
            'current_residue_name': line[45:48].strip(),
            'current_chain_id': line[49].strip(),
            'current_residue_sequence': int(line[50:54].strip() or '-1'),
            'current_insertion_code': line[54].strip(),
            'previous_atom': line[56:60].strip(),
            'previous_residue_name': line[60:63].strip(),
            'previous_chain_id': line[64].strip(),
            'previous_residue_sequence': int(line[65:69].strip() or '-1'),
            'previous_insertion_code': line[69].strip(),
        }
    elif line.startswith('TURN '):
        return {
            'record_name': 'TURN',
            'serial_number': int(line[7:10].strip() or '-1'),
            'id': line[11:14].strip(),
            'initial_residue_name': line[15:18].strip(),
            'initial_chain_id': line[19].strip(),
            'initial_sequence_number': int(line[20:24].strip() or '-1'),
            'initial_residue_insertion_code': line[24].strip(),
            'terminal_residue_name': line[26:29].strip(),
            'terminal_chain_id': line[30].strip(),
            'terminal_sequence_number': int(line[31:35].strip() or '-1'),
            'terminal_residue_insertion_code': line[35].strip(),
            'comment': line[40:70].strip(),
        }

In [9]:
def parse_seqres_line(line: str):
    if line.startswith('SEQRES'):
        return {
            'record_name': 'SEQRES',
            'serial_number': int(line[7:10].strip() or '-1'),
            'chain_id': line[11].strip(),
            'number_of_residues': int(line[13:17].strip() or '-1'),
            'residues': [
                line[i:i+3].strip() for i in range(19, 71, 4)
            ]
        }

In [10]:
def parse_line(line: str):
    if re.match('^(SHEET|HELIX|TURN)', line):
        return parse_ss_line(line)
    elif re.match('^SEQRES', line):
        return parse_seqres_line(line)

In [11]:
def parse_single_chain_pdb_file(pdb_file_path: str):
    with open(pdb_file_path, 'r') as f:
        return list(filter(lambda item: item is not None, map(parse_line, f.readlines())))

In [12]:
def extract_sequence_from_pdb_records(pdb_records: List[dict]):
    seqres_records = list(filter(lambda item: item.get('record_name') == 'SEQRES', pdb_records))
    chains_records = {group: list(group_items) for group, group_items in itertools.groupby(seqres_records, lambda item: (item['chain_id'], int(item['number_of_residues'])))}
    sequences = dict()
    for (chain_id, number_of_residues), records in chains_records.items():
        seq = "".join([
            "".join([
                AMINO_ACIDS.get(residue) 
                for residue in item['residues'] 
                if residue.strip()
            ]) 
            for item in records
        ])
        sequences[chain_id] = (seq, number_of_residues)
    return sequences

In [13]:
def get_secondary_structure_map(pdb_records: List[dict]):
    ss_records = list(filter(lambda item: item.get('record_name') in ['HELIX', 'SHEET', 'TURN'], pdb_records))
    return {
        group: list(group_items) 
        for group, group_items in itertools.groupby(
            ss_records,
            lambda item: (item['record_name'], item['initial_chain_id'], item['terminal_chain_id'])
        )
    }

In [14]:
def get_sequence_secondary_structure_segments(
    secondary_structure_map: Dict[Tuple[str, str, str], List[dict]],
):
    _segments = dict()
    for (structure_type, initial_chain, terminal_chain), seq_parts in secondary_structure_map.items():
        if initial_chain != terminal_chain:
            raise Exception('Unsupported')
        _chain = initial_chain
        if _chain not in _segments:
            _segments[_chain] = set()
        for _seq_part in seq_parts:
            _id = _seq_part['id']
            _initial_seq_num = _seq_part['initial_sequence_number']
            _terminal_seq_num = _seq_part['terminal_sequence_number']
            _item = (structure_type, _id, _initial_seq_num, _terminal_seq_num)
            if _item not in _segments[_chain]:
                _segments[_chain].add(_item)
    return _segments

In [15]:
def get_secondary_structure_masks(
    secondary_structure_segments: Dict[str, Tuple[str, int, int]],
    chain_sequences: Dict[str, Tuple[str, int]],
):
    masks = dict()
    for chain_id, segments in secondary_structure_segments.items():
        lines = []
        _sequence, _ = chain_sequences[chain_id]
        if chain_id not in masks:
            masks[chain_id] = 'C' * len(_sequence)
        for (structure_type, segment_id, seq_start, seq_end) in segments:
            _length = seq_end - seq_start + 1
            masks[chain_id] = masks[chain_id][:seq_start-1] + structure_type[0] * _length + masks[chain_id][seq_end:]
    return masks

In [16]:
def write_fasta(
    residue_id: str,
    chain_sequences: Dict[str, Tuple[str, int]],
    prefix: str = None,
    line_width: int = 60,
):
    prefix = prefix or ""
    for chain_id, (sequence, _) in chain_sequences.items():
        _seq_lines = [sequence[i:i+line_width] + '\n' for i in range(0, len(sequence), line_width)]
        lines = [
            f'>{residue_id}:{chain_id}\n',
            *_seq_lines,
        ]
        with open(prefix + residue_id + '-' + chain_id + '.fasta', 'w') as f:
            f.writelines(lines)

In [17]:
def write_ss_file(
    residue_id: str,
    secondary_structure_segments: Dict[str, Tuple[str, int, int]],
    chain_sequences: Dict[str, Set[Tuple[str, str, int, int]]],
    prefix: str = None,
):
    prefix = prefix or ""
    for chain_id, segments in secondary_structure_segments.items():
        lines = []
        _sequence, _ = chain_sequences[chain_id]
        for (structure_type, segment_id, seq_start, seq_end) in segments:
            lines.append(f"{segment_id}\n")
            lines.append(f"{seq_start}:{seq_end}\n")
            lines.append(f"{_sequence[seq_start-1:seq_end]}\n")
            lines.append(f"{structure_type}\n")
        with open(prefix + residue_id + '-' + chain_id + ".ss", 'w') as f:
            f.writelines(lines)

In [18]:
def read_ss_file(
    residue_id: str,
    chain_id: str,
    prefix: str = None,
):
    prefix = prefix or ""
    with open(prefix + residue_id + '-' + chain_id + ".ss", 'r') as f:
        lines = list(map(lambda item: item.strip(), f.readlines()))
    if len(lines) % 4 != 0:
        raise Exception('Invalid ss file')
    segments = [(lines[i+3], lines[i], int(lines[i+1].split(':')[0]), int(lines[i+1].split(':')[1])) for i in range(0, len(lines), 4)]
    return set(segments)

In [19]:
pdb_path = 'pdbs/3JSN.pdb'

In [20]:
pdb_records = parse_single_chain_pdb_file(pdb_path)
pdb_records

[{'record_name': 'SEQRES',
  'serial_number': 1,
  'chain_id': 'A',
  'number_of_residues': 318,
  'residues': ['MET',
   'ALA',
   'ASP',
   'LEU',
   'SER',
   'SER',
   'ARG',
   'VAL',
   'ASN',
   'GLU',
   'LEU',
   'HIS',
   'ASP']},
 {'record_name': 'SEQRES',
  'serial_number': 2,
  'chain_id': 'A',
  'number_of_residues': 318,
  'residues': ['LEU',
   'LEU',
   'ASN',
   'GLN',
   'TYR',
   'SER',
   'TYR',
   'GLU',
   'TYR',
   'TYR',
   'VAL',
   'GLU',
   'ASP']},
 {'record_name': 'SEQRES',
  'serial_number': 3,
  'chain_id': 'A',
  'number_of_residues': 318,
  'residues': ['ASN',
   'PRO',
   'SER',
   'VAL',
   'PRO',
   'ASP',
   'SER',
   'GLU',
   'TYR',
   'ASP',
   'LYS',
   'LEU',
   'LEU']},
 {'record_name': 'SEQRES',
  'serial_number': 4,
  'chain_id': 'A',
  'number_of_residues': 318,
  'residues': ['HIS',
   'GLU',
   'LEU',
   'ILE',
   'LYS',
   'ILE',
   'GLU',
   'GLU',
   'GLU',
   'HIS',
   'PRO',
   'GLU',
   'TYR']},
 {'record_name': 'SEQRES',
  'serial

In [21]:
chain_sequences = extract_sequence_from_pdb_records(pdb_records)
chain_sequences

{'A': ('MADLSSRVNELHDLLNQYSYEYYVEDNPSVPDSEYDKLLHELIKIEEEHPEYKTVDSPTVRVGGEAQASFNKVNHDTPMLSLGNAFNEDDLRKFDQRIREQIGNVEYMCELKIDGLAVSLKYVDGYFVQGLTRGDGTTGEDITENLKTIHAIPLKMKEPLNVEVRGEAYMPRRSFLRLNEEKEKNDEQLFANPRNAAAGSLRQLDSKLTAKRKLSVFIYSVNDFTDFNARSQSEALDELDKLGFTTNKNRARVNNIDGVLEYIEKWTSQRESLPYDIDGIVIKVNDLDQQDEMGFTQKSPRWAIAYKFPAEEHHHHHH',
  318)}

In [22]:
secondary_structure_map = get_secondary_structure_map(pdb_records)
secondary_structure_map

{('HELIX',
  'A',
  'A'): [{'record_name': 'HELIX',
   'serial_number': 1,
   'id': '1',
   'initial_residue_name': 'ASP',
   'initial_chain_id': 'A',
   'initial_sequence_number': 3,
   'initial_residue_insertion_code': '',
   'terminal_residue_name': 'TYR',
   'terminal_chain_id': 'A',
   'terminal_sequence_number': 23,
   'terminal_residue_insertion_code': '',
   'helix_class': 1,
   'comment': '',
   'length': 21}, {'record_name': 'HELIX',
   'serial_number': 2,
   'id': '2',
   'initial_residue_name': 'SER',
   'initial_chain_id': 'A',
   'initial_sequence_number': 33,
   'initial_residue_insertion_code': '',
   'terminal_residue_name': 'HIS',
   'terminal_chain_id': 'A',
   'terminal_sequence_number': 49,
   'terminal_residue_insertion_code': '',
   'helix_class': 1,
   'comment': '',
   'length': 17}, {'record_name': 'HELIX',
   'serial_number': 3,
   'id': '3',
   'initial_residue_name': 'PRO',
   'initial_chain_id': 'A',
   'initial_sequence_number': 50,
   'initial_residue_in

In [23]:
_segments = get_sequence_secondary_structure_segments(secondary_structure_map)
_segments['A']

{('HELIX', '1', 3, 23),
 ('HELIX', '10', 231, 243),
 ('HELIX', '11', 255, 269),
 ('HELIX', '12', 286, 294),
 ('HELIX', '2', 33, 49),
 ('HELIX', '3', 50, 53),
 ('HELIX', '4', 57, 63),
 ('HELIX', '5', 87, 103),
 ('HELIX', '6', 142, 147),
 ('HELIX', '7', 171, 186),
 ('HELIX', '8', 192, 202),
 ('HELIX', '9', 205, 211),
 ('SHEET', 'A', 72, 74),
 ('SHEET', 'A', 138, 140),
 ('SHEET', 'B', 84, 85),
 ('SHEET', 'B', 107, 113),
 ('SHEET', 'B', 251, 253),
 ('SHEET', 'B', 277, 284),
 ('SHEET', 'B', 303, 307),
 ('SHEET', 'C', 116, 123),
 ('SHEET', 'C', 126, 132),
 ('SHEET', 'C', 162, 169),
 ('SHEET', 'C', 215, 221),
 ('SHEET', 'D', 295, 296),
 ('SHEET', 'D', 299, 300)}

In [24]:
masks = get_secondary_structure_masks(_segments, chain_sequences)
masks['A']

'CCHHHHHHHHHHHHHHHHHHHHHCCCCCCCCCHHHHHHHHHHHHHHHHHHHHHCCCHHHHHHHCCCCCCCCSSSCCCCCCCCCSSCHHHHHHHHHHHHHHHHHCCCSSSSSSSCCSSSSSSSSCCSSSSSSSCCCCCSSSCHHHHHHCCCCCCCCCCCCCCSSSSSSSSCHHHHHHHHHHHHHHHHCCCCCHHHHHHHHHHHCCHHHHHHHCCCSSSSSSSCCCCCCCCCHHHHHHHHHHHHHCCCCCCCSSSCHHHHHHHHHHHHHHHCCCCCCCSSSSSSSSCHHHHHHHHHSSCCSSCCSSSSSCCCCCCCCCCC'

In [25]:
proteins = read_proteins_file(proteins_file)

for protein in proteins:
    protein_name = protein.upper()
    pdb_path = f'{pdb_dir}{protein_name}.pdb'

    pdb_records = parse_single_chain_pdb_file(pdb_path)  # list of dicts, each dict represents a parsed line in pdb file
    chain_sequences = extract_sequence_from_pdb_records(pdb_records)  # chain_id: sequence
    secondary_structure_map = get_secondary_structure_map(pdb_records)  # {(struct_type(HELIX,SHEET,etc),start_chain,end_chain):[...records]}
    sequence_ss_segments = get_sequence_secondary_structure_segments(secondary_structure_map)  # {(struct_type, id, start, end)}
    chain_masks = get_secondary_structure_masks(sequence_ss_segments, chain_sequences)  # chain_id: mask
    write_fasta(protein_name, chain_sequences, (fasta_dir.rstrip('/') + '/'))
    write_ss_file(protein_name, sequence_ss_segments, chain_sequences, (secondary_structure_dir.rstrip('/') + "/"))