# Generate json

- order: 1

In [None]:
#| default_exp af3.json

In [None]:
#| export
import re, shutil, json, pandas as pd, numpy as np
from pathlib import Path

from rdkit import Chem as rd_chem
from rdkit.Chem import AllChem,rdmolfiles
from rdkit import Chem

from Bio.PDB import PDBParser
from kdock.data.core import *

## Single protein sequence (default)
> Default pipeline: AlphaFold 3 runs the MSA and template search itself

In [None]:
#| export
def dump_json(data, save_path):
    "Write `data` to `save_path` as JSON indented by 4 spaces, overwriting any existing file"
    with open(save_path, mode='w') as handle:
        json.dump(data, handle, indent=4)

In [None]:
#| export
def get_protein_json(name, # job name
                     seq, # aa sequence
                     save_path=None, # .json
                     seeds=[1] # model seeds; copied into the result, never stored by reference
                     ):
    """Generate json of single protein sequence for input of docker command

    Returns the input as a dict and, when `save_path` is given, also writes it
    to that file (parent directories are created as needed).
    """

    json_data = {
        "name": name,
        # Copy the seeds: storing the argument itself would hand callers a
        # reference to the shared default list, so editing one result would
        # silently change the default of every later call.
        "modelSeeds": list(seeds),
        "sequences": [
            {
                "protein": {
                    "id": "A",
                    "sequence": seq,
                }
            },
        ],
        "bondedAtomPairs": [],
        "dialect": "alphafold3",
        "version": 3
    }
    if save_path:
        Path(save_path).parent.mkdir(parents=True, exist_ok=True)
        dump_json(json_data,save_path)
    return json_data

In [None]:
# Example: build the input for job 'proteinA' with three seeds; the same JSON is also written to data/proteinA.json
data = get_protein_json('proteinA','AAA','data/proteinA.json',seeds=[1,2,3])
data

{'name': 'proteinA',
 'modelSeeds': [1, 2, 3],
 'sequences': [{'protein': {'id': 'A', 'sequence': 'AAA'}}],
 'bondedAtomPairs': [],
 'dialect': 'alphafold3',
 'version': 2}

## Protein-SMILES

- First run the normal sequence-only pipeline for the protein.
- Then read the `data.json` file it outputs and reuse its protein entry (e.g. `["sequences"][0]["protein"]`).

In [None]:
#| export
def read_json(file_path):
    "Load the JSON file at `file_path` and return the parsed object"
    with open(file_path, mode='r') as handle:
        return json.load(handle)

In [None]:
# Load an existing AF3 json (presumably the data.json written by a sequence-only run — confirm) so its protein entry can be reused below
protein_json = read_json('data/seq_only_data.json')

In [None]:
#| export
def get_protein_smiles_json(smi_id:str, # job name
                            SMILES:str, # ligand SMILES string
                            protein_json, # json type
                            save_path=None, # .json
                            seeds=[1] # model seeds; copied into the result, never stored by reference
                            ):
    """Get json for protein-ligand docking task

    The protein entry is the first item of `protein_json["sequences"]` that has
    a "protein" key; it is reused as-is (not copied). Returns the input as a
    dict and, when `save_path` is given, also writes it to that file.

    Raises `ValueError` if `protein_json` holds no protein entry.
    """
    # No manual escaping is needed: json.dump escapes backslashes (common in
    # SMILES stereo bonds) when the file is written.
    smiles = str(SMILES)

    # A default is passed to next() so a missing protein produces a clear error
    # instead of a leaked StopIteration.
    protein_index = next((i for i, item in enumerate(protein_json["sequences"]) if "protein" in item), None)
    if protein_index is None:
        raise ValueError("protein_json['sequences'] contains no 'protein' entry")

    json_data = {
        "name": smi_id,
        # Copy so the shared default list (or the caller's list) is not aliased.
        "modelSeeds": list(seeds),
        "sequences": [
            {
                "ligand": {
                    "id": "L",
                    "smiles": smiles,
                }
            },
            {
                "protein": protein_json["sequences"][protein_index]["protein"]
            },
        ],
        "bondedAtomPairs": [],
        "dialect": "alphafold3",
        # NOTE(review): the other builders in this file emit version 3 — confirm whether 2 is intentional here.
        "version": 2
    }
    if save_path:
        Path(save_path).parent.mkdir(parents=True, exist_ok=True)
        dump_json(json_data,save_path)
    return json_data

In [None]:
# Example: pair ligand 'CCC' with the loaded protein; the JSON is also written to data/protein_smi.json
out = get_protein_smiles_json('smi_name','CCC',protein_json,'data/protein_smi.json',seeds=[1,2,3])

Let's take a look at the generated JSON:

In [None]:
# Preview only the first 100 characters of the dict's string form
str(out)[:100]

"{'name': 'smi_name', 'modelSeeds': [1, 2, 3], 'sequences': [{'ligand': {'id': 'L', 'smiles': 'CCC'}}"

In [None]:
# Toy table of ligands: `idx` is used as the job name and `smi` as the SMILES string in the loop below
df = pd.DataFrame({'idx':['a','b'],'smi':['CCC','OCO']})
df

Unnamed: 0,idx,smi
0,a,CCC
1,b,OCO


In [None]:
# Write one protein-ligand input JSON per DataFrame row into af_input/<project_name>/<idx>.json
project_name='sdf'
for idx, smi in df.values:  # each row unpacks as (idx, smi), following the column order of df
    _ = get_protein_smiles_json(idx,smi,protein_json,f'af_input/{project_name}/{idx}.json',seeds=[1,2,3])

## Protein-CCDcode

In [None]:
#| export
def get_protein_ccdcode_json(protein_json,  # dict with protein sequence
                              ccd_code,      # str or list of str
                              job_id: str,   # job/task ID
                              save_path=None,  # optional output path
                              seeds=[1]):      # optional random seeds
    """Create AlphaFold3 docking JSON with CCD code(s).

    `ccd_code` may be a single code or a list/tuple of codes. The protein entry
    is the first item of `protein_json["sequences"]` that has a "protein" key;
    it is reused as-is (not copied). Returns the input as a dict and, when
    `save_path` is given, also writes it to that file.

    Raises `TypeError` for an unsupported `ccd_code` type and `ValueError` if
    `protein_json` holds no protein entry.
    """

    # Normalize ccd_code to a fresh list so the caller's container is not aliased
    if isinstance(ccd_code, str):
        ccd_codes = [ccd_code]
    elif isinstance(ccd_code, (list, tuple)):
        ccd_codes = list(ccd_code)
    else:
        raise TypeError("ccd_code must be a string or a list of strings.")

    # A default is passed to next() so a missing protein produces a clear error
    # instead of a leaked StopIteration.
    protein_index = next((i for i, item in enumerate(protein_json["sequences"]) if "protein" in item), None)
    if protein_index is None:
        raise ValueError("protein_json['sequences'] contains no 'protein' entry")

    json_data = {
        "name": job_id,
        # Copy so the shared default list (or the caller's list) is not aliased.
        "modelSeeds": list(seeds),
        "sequences": [
            {
                "ligand": {
                    "id": "L",
                    "ccdCodes": ccd_codes
                }
            },
            {
                "protein": protein_json["sequences"][protein_index]["protein"]
            },
        ],
        "dialect": "alphafold3",
        "version": 3
    }

    if save_path:
        Path(save_path).parent.mkdir(parents=True, exist_ok=True)
        dump_json(json_data, save_path)

    return json_data

## Protein-CCD for covalent

### sdf2CCD

`mol_to_ccd_cif` Reference: https://github.com/google-deepmind/alphafold3/issues/178

About hydrogens: https://github.com/google-deepmind/alphafold3/issues/212

In [None]:
#| export

# Maps RDKit bond types to the codes written in the `_chem_comp_bond.value_order` mmCIF column.
# Only these four bond types are covered; looking up any other bond type raises KeyError.
_RDKIT_BOND_TYPE_TO_MMCIF = {
    rd_chem.BondType.SINGLE: 'SING',
    rd_chem.BondType.DOUBLE: 'DOUB',
    rd_chem.BondType.TRIPLE: 'TRIP',
    rd_chem.BondType.AROMATIC: 'AROM'
}

def assign_atom_names_from_graph(mol):
    "Set an `atom_name` property on every atom (element symbol + 1-based position, e.g. C1, N2) in place and return `mol`"
    for position, atom in enumerate(mol.GetAtoms(), start=1):
        atom.SetProp('atom_name', f"{atom.GetSymbol()}{position}")
    return mol

def mol_to_ccd_text(mol, component_id, pdbx_smiles=None, include_hydrogens=False):
    """Render an RDKit molecule as CCD-style mmCIF text.

    Works on a copy of `mol`. Atoms are named by `assign_atom_names_from_graph`
    (element symbol + 1-based atom position) and the coordinates come from the
    molecule's default conformer.

    mol: RDKit molecule with at least one conformer.
    component_id: used for `data_`, `_chem_comp.id`, `_chem_comp.name` and the
        atom table's comp_id column.
    pdbx_smiles: if given, written as `_chem_comp.pdbx_smiles`.
    include_hydrogens: if True, hydrogens are added and written; if False, any
        atom whose symbol is 'H' (and every bond touching one) is skipped.

    Returns the mmCIF text as a single newline-joined string.

    Raises `ValueError` if the molecule has no conformer and `KeyError` for a
    bond type that has no entry in `_RDKIT_BOND_TYPE_TO_MMCIF`.
    """
    mol = rd_chem.Mol(mol)  # copy, so the caller's molecule is left untouched
    if include_hydrogens:
        # addCoords=True places the new hydrogens from the heavy-atom geometry;
        # without it every added hydrogen is written at 0.000 0.000 0.000.
        mol = rd_chem.AddHs(mol, addCoords=True)
    # NOTE(review): clearAromaticFlags=True also clears the bond aromaticity
    # flags, so the pdbx_aromatic_flag column below always comes out as 'N'.
    # Confirm this is intended.
    rd_chem.Kekulize(mol, clearAromaticFlags=True)

    if mol.GetNumConformers() == 0:
        raise ValueError('The molecule has no conformers')
    coords = mol.GetConformer().GetPositions()

    mol = assign_atom_names_from_graph(mol)
    atom_map = {atom.GetIdx(): atom.GetProp('atom_name') for atom in mol.GetAtoms()}

    lines = [
        f"data_{component_id}",
        "#",
        f"_chem_comp.id {component_id}",
        f"_chem_comp.name '{component_id}'",
        "_chem_comp.type non-polymer",
        "_chem_comp.formula '?'",
        "_chem_comp.mon_nstd_parent_comp_id ?",
        "_chem_comp.pdbx_synonyms ?",
        "_chem_comp.formula_weight '?'",
    ]
    if pdbx_smiles:
        lines.append(f"_chem_comp.pdbx_smiles {pdbx_smiles}")
    lines += [
        "#",
        "loop_",
        "_chem_comp_atom.comp_id",
        "_chem_comp_atom.atom_id",
        "_chem_comp_atom.type_symbol",
        "_chem_comp_atom.charge",
        "_chem_comp_atom.pdbx_leaving_atom_flag",
        "_chem_comp_atom.pdbx_model_Cartn_x_ideal",
        "_chem_comp_atom.pdbx_model_Cartn_y_ideal",
        "_chem_comp_atom.pdbx_model_Cartn_z_ideal"
    ]

    # Atom table: one row per written atom; the leaving-atom flag is always 'N'.
    for atom in mol.GetAtoms():
        symbol = atom.GetSymbol()
        if not include_hydrogens and symbol == 'H':
            continue
        idx = atom.GetIdx()
        x, y, z = coords[idx]
        lines.append(f"{component_id} {atom_map[idx]} {symbol} {atom.GetFormalCharge()} N {x:.3f} {y:.3f} {z:.3f}")

    lines += [
        "#",
        "loop_",
        "_chem_comp_bond.atom_id_1",
        "_chem_comp_bond.atom_id_2",
        "_chem_comp_bond.value_order",
        "_chem_comp_bond.pdbx_aromatic_flag"
    ]

    # Bond table: bonds to skipped hydrogens are dropped together with the atoms.
    for bond in mol.GetBonds():
        a1, a2 = bond.GetBeginAtomIdx(), bond.GetEndAtomIdx()
        if not include_hydrogens and 'H' in (mol.GetAtomWithIdx(a1).GetSymbol(), mol.GetAtomWithIdx(a2).GetSymbol()):
            continue
        bond_kind = bond.GetBondType()
        # Dative bonds are written as single bonds, as the AlphaFold 3 reference
        # implementation quoted at the end of this notebook does, instead of
        # failing with a KeyError.
        if bond_kind == rd_chem.BondType.DATIVE:
            bond_kind = rd_chem.BondType.SINGLE
        bond_type = _RDKIT_BOND_TYPE_TO_MMCIF[bond_kind]
        aromatic_flag = 'Y' if bond.GetIsAromatic() else 'N'
        lines.append(f"{atom_map[a1]} {atom_map[a2]} {bond_type} {aromatic_flag}")
    lines.append("#")

    return "\n".join(lines)

In [None]:
#| export
def sdf2ccd(sdf_path,
            CCD_name='lig-1', # do not use '_'; keep the name as short as possible; 'lig-any' leads to extra ligands
            ):
    """Convert the compound to the AF3 required CCD format

    Only the first molecule of the SDF file is converted. Returns the CCD
    (mmCIF) text produced by `mol_to_ccd_text`.

    Raises `ValueError` if the file contains no molecule or the first record
    cannot be parsed by RDKit.
    """
    # str() so that pathlib.Path arguments are accepted as well.
    supplier = Chem.SDMolSupplier(str(sdf_path))
    if len(supplier) == 0:
        raise ValueError(f"No molecule found in SDF file: {sdf_path}")
    mol = supplier[0]  # Get the first molecule
    # RDKit yields None for a record it cannot parse; fail here with a clear
    # message rather than deep inside the conversion.
    if mol is None:
        raise ValueError(f"RDKit could not parse the first molecule of: {sdf_path}")
    return mol_to_ccd_text(mol,CCD_name)

In [None]:
# Preview the first 100 characters of the CCD text generated from the example ligand SDF
sdf2ccd('covalent_test/lig-HKI.sdf')[:100]

"data_lig-any\n#\n_chem_comp.id lig-any\n_chem_comp.name 'lig-any'\n_chem_comp.type non-polymer\n_chem_com"

### json

In [None]:
#| export
def get_protein_ccd_json(protein_json, # dict with protein sequence
                         rec_residue_num:int, # 1-indexed, for bondedAtomPairs, e.g., ["A", 145, "SG"] 
                         rec_atom_id:str, # for bondedAtomPairs, e.g., ["A", 145, "SG"] 
                         lig_sdf_path, # path to the ligand .sdf file (converted to CCD text with sdf2ccd)
                         lig_atom_id:str, # ligand atom name as written in the generated CCD (element symbol + 1-based atom position), e.g., "C4" in ["L", 1, "C4"]
                         job_id:str, # str, job/task ID
                         save_path=None,# optional output path
                         seeds=[1], # optional random seeds
                         ):               
    """Create AlphaFold3 docking JSON with customized CCD ligand and bondedAtomPairs.

    The ligand SDF is converted with `sdf2ccd` and embedded as `userCCD`; one
    bond is declared between protein chain "A" (residue `rec_residue_num`, atom
    `rec_atom_id`) and ligand "L" (residue 1, atom `lig_atom_id`). The protein
    entry is the first item of `protein_json["sequences"]` that has a "protein"
    key; it is reused as-is (not copied). Returns the input as a dict and, when
    `save_path` is given, also writes it to that file.

    Raises `ValueError` if the generated CCD text has no `_chem_comp.id` or
    `protein_json` holds no protein entry.
    """

    # get userCCD
    userCCD=sdf2ccd(lig_sdf_path)
    # Read the component id back from the CCD text so it always matches what was written.
    id_match = re.search(r"_chem_comp.id\s+([^\s#]+)", userCCD)
    if id_match is None:
        raise ValueError("Could not find '_chem_comp.id' in the generated CCD text")
    ccd_id = id_match.group(1)

    # A default is passed to next() so a missing protein produces a clear error
    # instead of a leaked StopIteration.
    protein_index = next((i for i, item in enumerate(protein_json["sequences"]) if "protein" in item), None)
    if protein_index is None:
        raise ValueError("protein_json['sequences'] contains no 'protein' entry")

    json_data = {
        "name": job_id,
        # Copy so the shared default list (or the caller's list) is not aliased.
        "modelSeeds": list(seeds),
        "sequences": [
            {
                "ligand": {
                    "id": "L",
                    "ccdCodes": [ccd_id]
                }
            },
            {
                "protein": protein_json["sequences"][protein_index]["protein"]
            },
        ],
        "bondedAtomPairs": [[["A", rec_residue_num, rec_atom_id],["L", 1, lig_atom_id]]],
        "userCCD": userCCD,
        "dialect": "alphafold3",
        "version": 3
    }

    if save_path:
        Path(save_path).parent.mkdir(parents=True, exist_ok=True)
        dump_json(json_data, save_path)

    return json_data

Alternative version that takes the user CCD text and the bonded atom pair directly as input:

In [None]:
def get_protein_ccd_json2(protein_json,             # dict with protein sequence
                         userCCD,                  # ccd text
                         pair1,                    # protein side of the bond, e.g., ["A", 145, "SG"] (residue number is 1-indexed)
                         pair2,                    # ligand side of the bond, e.g., ["L", 1, "C04"]
                         job_id,                   # str, job/task ID
                         save_path=None,           # optional output path
                         seeds=[1]):               # optional random seeds
    """Create AlphaFold3 docking JSON with customized CCD ligand and bondedAtomPairs.

    The ligand's CCD code is read from the `_chem_comp.id` line of `userCCD`.
    The protein entry is the first item of `protein_json["sequences"]` that has
    a "protein" key; it is reused as-is (not copied). Returns the input as a
    dict and, when `save_path` is given, also writes it to that file.

    Raises `ValueError` if `userCCD` has no `_chem_comp.id` or `protein_json`
    holds no protein entry.
    """

    # The id must be parsed from the `userCCD` argument (the original code
    # referenced an undefined name here and failed on every call).
    id_match = re.search(r"_chem_comp.id\s+([^\s#]+)", userCCD)
    if id_match is None:
        raise ValueError("Could not find '_chem_comp.id' in userCCD")
    ccd_id = id_match.group(1)

    # Look the protein up by key, like the other builders in this file,
    # instead of assuming it is the first sequence entry.
    protein_index = next((i for i, item in enumerate(protein_json["sequences"]) if "protein" in item), None)
    if protein_index is None:
        raise ValueError("protein_json['sequences'] contains no 'protein' entry")

    json_data = {
        "name": job_id,
        # Copy so the shared default list (or the caller's list) is not aliased.
        "modelSeeds": list(seeds),
        "sequences": [
            {
                "ligand": {
                    "id": "L",
                    "ccdCodes": [ccd_id]
                }
            },
            {
                "protein": protein_json["sequences"][protein_index]["protein"]
            },
        ],
        "bondedAtomPairs": [[pair1,pair2]],
        "userCCD": userCCD,
        "dialect": "alphafold3",
        "version": 3
    }

    if save_path:
        Path(save_path).parent.mkdir(parents=True, exist_ok=True)
        dump_json(json_data, save_path)

    return json_data

## Split the files into subfolders
> For running on multiple GPUs

In [None]:
#| export
def split_nfolder(folder_dir, # folder that directly contains the .json files
                  n=4): # number of subfolders to create; must be >= 1
    """Move json files from a folder into subfolders (folder_0, folder_1, ..., folder_N).

    The `*.json` files directly inside `folder_dir` are sorted by path and
    dealt out round-robin, so subfolder sizes differ by at most one file.
    All `n` subfolders are created even when there are fewer files than
    subfolders. Other files are left where they are. Prints a summary and
    returns None.

    Raises `ValueError` if `n` is smaller than 1.
    """
    # Reject n < 1 up front: n == 0 would otherwise fail with ZeroDivisionError
    # mid-way (or silently do nothing when the folder has no json files).
    if n < 1:
        raise ValueError(f"n must be at least 1, got {n}")

    folder_dir = Path(folder_dir)

    files = sorted(folder_dir.glob("*.json"))
    subfolders = [folder_dir / f"folder_{i}" for i in range(n)]
    for folder in subfolders:
        folder.mkdir(exist_ok=True)

    for idx, file in enumerate(files):
        target_folder = subfolders[idx % n]
        # Plain strings for both arguments keep shutil.move working on older Python versions.
        shutil.move(str(file), str(target_folder / file.name))

    print(f"Distributed {len(files)} files into {n} folders.")

In [None]:
# Distribute the json inputs written above across the default 4 subfolders
split_nfolder(f'af_input/{project_name}')

Distributed 2 files into 4 folders.


## End

In [None]:
#| hide
# Run nbdev's export step (presumably writes the `#| export` cells to the module named by `default_exp` — confirm)
import nbdev; nbdev.nbdev_export()

### Reference:

In [None]:
# import collections
# from collections.abc import Mapping, Sequence

# from absl import logging
# from alphafold3.cpp import cif_dict
# import numpy as np
# import rdkit.Chem as rd_chem
# from rdkit.Chem import AllChem as rd_all_chem

# def mol_to_ccd_cif(
#     mol: rd_chem.Mol,
#     component_id: str,
#     pdbx_smiles: str | None = None,
#     include_hydrogens: bool = True,
# ) -> cif_dict.CifDict:
#   """Creates a CCD-like mmcif data block from an rdkit Mol object.

#   Only a subset of associated mmcif fields is populated, but that is
#   sufficient for further usage, e.g. in featurization code.

#   Atom names can be specified via `atom_name` property. For atoms with
#   unspecified value of that property, the name is assigned based on element type
#   and the order in the Mol object.

#   If the Mol object has associated conformers, atom positions from the first of
#   them will be populated in the resulting mmcif file.

#   Args:
#      mol: An rdkit molecule.
#      component_id: Name of the molecule to use in the resulting mmcif. That is
#        equivalent to CCD code.
#      pdbx_smiles: If specified, the value will be used to populate
#        `_chem_comp.pdbx_smiles`.
#      include_hydrogens: Whether to include atom and bond data involving
#        hydrogens.

#   Returns:
#      An mmcif data block corresponding for the given rdkit molecule.

#   Raises:
#     UnsupportedMolBond: When a molecule contains a bond that can't be
#       represented with mmcif.
#   """
#   mol = rd_chem.Mol(mol)
#   if include_hydrogens:
#     mol = rd_chem.AddHs(mol)
#   rd_chem.Kekulize(mol)

#   if mol.GetNumConformers() > 0:
#     ideal_conformer = mol.GetConformer(0).GetPositions()
#     ideal_conformer = np.vectorize(lambda x: f'{x:.3f}')(ideal_conformer)
#   else:
#     # No data will be populated in the resulting mmcif if the molecule doesn't
#     # have any conformers attached to it.
#     ideal_conformer = None

#   mol_cif = collections.defaultdict(list)
#   mol_cif['data_'] = [component_id]
#   mol_cif['_chem_comp.id'] = [component_id]
#   if pdbx_smiles:
#     mol_cif['_chem_comp.pdbx_smiles'] = [pdbx_smiles]

#   mol = assign_atom_names_from_graph(mol, keep_existing_names=True)

#   for atom_idx, atom in enumerate(mol.GetAtoms()):
#     element = atom.GetSymbol()
#     if not include_hydrogens and element in ('H', 'D'):
#       continue

#     mol_cif['_chem_comp_atom.comp_id'].append(component_id)
#     mol_cif['_chem_comp_atom.atom_id'].append(atom.GetProp('atom_name'))
#     mol_cif['_chem_comp_atom.type_symbol'].append(atom.GetSymbol().upper())
#     mol_cif['_chem_comp_atom.charge'].append(str(atom.GetFormalCharge()))
#     if ideal_conformer is not None:
#       coords = ideal_conformer[atom_idx]
#       mol_cif['_chem_comp_atom.pdbx_model_Cartn_x_ideal'].append(coords[0])
#       mol_cif['_chem_comp_atom.pdbx_model_Cartn_y_ideal'].append(coords[1])
#       mol_cif['_chem_comp_atom.pdbx_model_Cartn_z_ideal'].append(coords[2])

#   for bond in mol.GetBonds():
#     atom1 = bond.GetBeginAtom()
#     atom2 = bond.GetEndAtom()
#     if not include_hydrogens and (
#         atom1.GetSymbol() in ('H', 'D') or atom2.GetSymbol() in ('H', 'D')
#     ):
#       continue
#     mol_cif['_chem_comp_bond.comp_id'].append(component_id)
#     mol_cif['_chem_comp_bond.atom_id_1'].append(
#         bond.GetBeginAtom().GetProp('atom_name')
#     )
#     mol_cif['_chem_comp_bond.atom_id_2'].append(
#         bond.GetEndAtom().GetProp('atom_name')
#     )
#     try:
#       bond_type = bond.GetBondType()
#       # Older versions of RDKit did not have a DATIVE bond type. Convert it to
#       # SINGLE to match the AF3 training setup.
#       if bond_type == rd_chem.BondType.DATIVE:
#         bond_type = rd_chem.BondType.SINGLE
#       mol_cif['_chem_comp_bond.value_order'].append(
#           _RDKIT_BOND_TYPE_TO_MMCIF[bond_type]
#       )
#       mol_cif['_chem_comp_bond.pdbx_stereo_config'].append(
#           _RDKIT_BOND_STEREO_TO_MMCIF[bond.GetStereo()]
#       )
#     except KeyError as e:
#       raise UnsupportedMolBondError from e
#     mol_cif['_chem_comp_bond.pdbx_aromatic_flag'].append(
#         'Y' if bond.GetIsAromatic() else 'N'
#     )

#   return cif_dict.CifDict(mol_cif)