In [1]:
import BioSimSpace as bss
import os as _os
from glob import glob as _glob
import alchemlyb as _alchemlyb
from alchemlyb.parsing.gmx import extract_u_nk as _gmx_extract_u_nk
from alchemlyb.preprocessing.subsampling import equilibrium_detection as _equilibrium_detection
from alchemlyb.estimators import AutoMBAR as _AutoMBAR
from alchemlyb.postprocessors.units import to_kcalmol as _to_kcalmol
import warnings as _warnings
from BioSimSpace import Units as _Units
import glob
import nglview as nv


Sending anonymous Sire usage statistics to http://siremol.org.
For more information, see http://siremol.org/analytics
To disable, set the environment variable 'SIRE_DONT_PHONEHOME' to 1
To see the information sent, set the environment variable 
SIRE_VERBOSE_PHONEHOME equal to 1. To silence this message, set
the environment variable SIRE_SILENT_PHONEHOME to 1.



  setattr(self, word, getattr(machar, word).flat[0])
  return self._float_to_str(self.smallest_subnormal)
  setattr(self, word, getattr(machar, word).flat[0])
  return self._float_to_str(self.smallest_subnormal)




In [2]:
# from https://github.com/michellab/BioSimSpace/blob/benchmark-2022/python/BioSimSpace/FreeEnergy/_relative.py
# Author: Anna Hertz

def _preprocessing_extracted_data(data):
        """_summary_
        Parameters
        ----------
            data : pandas.DataFrame
                Dataframe of extracted dHdl or u_nk data.
        Returns
        -------
            processed_data : pandas.DataFrame
            Dataframe of dHdl or u_nk data processed using automated equilibration
            detection followed by statistical inefficiency.
        """

        # Subsample according to equilibration detection.
        eq_okay = False
        try:
            sampled_data = [_equilibrium_detection(i, i.iloc[:, 0])
                       for i in data]
            eq_okay = True
        except:
            pass
        
        # Throw errors if either failed
        if not eq_okay:
            _warnings.warn("Could not detect equilibration.")
            sampled_data = data

        # make sure there are more than 50 samples for the analysis
        if eq_okay:
            for i in sampled_data:
                if len(i.iloc[:, 0]) < 50:
                    _warnings.warn(
                        "Less than 50 samples as a result of preprocessing. No preprocessing will be performed.")
                    sampled_data = data

        # concatanate in alchemlyb format
        processed_data = _alchemlyb.concat(sampled_data)

        return processed_data

def _analyse_mbar(files, temperatures, lambdas, engine):
    """Analyse existing free-energy data using MBAR and the alchemlyb library.
        Parameters
        ----------
        files : list
            List of files for all lambda values to analyse. Should be sorted.
        temperatures : list
            List of temperatures at which the simulation was carried out at for each lambda window.
            Index of the temperature value should match it's corresponding lambda window index in files.
        lambdas : list
            Sorted list of lambda values used for the simulation.
        engine : str
            Engine with which the simulation was run.
        Returns
        -------
        pmf : [(float, :class:`Energy <BioSimSpace.Types.Energy>`, :class:`Energy <BioSimSpace.Types.Energy>`)]
            The potential of mean force (PMF). The data is a list of tuples,
            where each tuple contains the lambda value, the PMF, and the
            standard error.
        overlap : numpy.matrix 
            The overlap matrix. This gives the overlap between each lambda
            window.
    """

    function_glob_dict = {
        "GROMACS": (_gmx_extract_u_nk)
    }

    # Extract the data.
    func = function_glob_dict[engine]
    try:
        u_nk = [func(x, T=t) for x, t in zip(files, temperatures)]
    except:
        print("Could not extract the data from the provided files!")

    # Preprocess the data.
    try:
        processed_u_nk = _preprocessing_extracted_data(u_nk)
    except:
        _warnings.warn("Could not preprocess the data.")
        processed_u_nk = u_nk

    try:
        mbar = _AutoMBAR().fit(processed_u_nk)
    except:
        print("MBAR free-energy analysis failed!")

    # Extract the data from the mbar results.
    data = []
    # Convert the data frames to kcal/mol.
    delta_f_ = _to_kcalmol(mbar.delta_f_)
    d_delta_f_ = _to_kcalmol(mbar.d_delta_f_)
    for lambda_, t in zip(lambdas, temperatures):
        x = lambdas.index(lambda_)
        mbar_value = delta_f_.iloc[0, x]
        mbar_error = d_delta_f_.iloc[1, x]

        # Append the data.
        data.append((lambda_,
                    (mbar_value) * _Units.Energy.kcal_per_mol,
                    (mbar_error) * _Units.Energy.kcal_per_mol))

    # Calculate overlap matrix.
    overlap = mbar.overlap_matrix

    return (data, overlap)


def _analyse_gromacs(work_dir=None, estimator=None, method="alchemlyb"):
        """Analyse the GROMACS free energy data.
           Parameters
           ----------
           work_dir : str
               The path to the working directory.
           estimator : str
               The estimator ('MBAR' or 'TI') used.
           Returns
           -------
           pmf : [(float, :class:`Energy <BioSimSpace.Types.Energy>`, :class:`Energy <BioSimSpace.Types.Energy>`)]
               The potential of mean force (PMF). The data is a list of tuples,
               where each tuple contains the lambda value, the PMF, and the
               standard error.
           overlap or dHdl : numpy.matrix or alchemlyb.estimators.ti_.TI
               For MBAR, this returns the overlap matrix for the overlap between each lambda window.
               For TI, this returns the gradients for plotting a graph.
        """

        if not isinstance(work_dir, str):
            raise TypeError("'work_dir' must be of type 'str'.")
        if not _os.path.isdir(work_dir):
            raise ValueError("'work_dir' doesn't exist!")


        if method == "alchemlyb":

            files = sorted(_glob(work_dir + "/lambda_*/gromacs.xvg"))
            lambdas = [float(x.split("/")[-2].split("_")[-1]) for x in files]

            # find the temperature at each lambda window
            temperatures = []
            for file in files:
                found_temperature = False
                with open(file, 'r') as f:
                    for line in f.readlines():
                        t = None
                        start = 'T ='
                        end = '(K)'
                        if start and end in line:
                            t = int(
                                ((line.split(start)[1]).split(end)[0]).strip())
                            temperatures.append(t)
                            if t is not None:
                                found_temperature = True
                                break

                if not found_temperature:
                    raise ValueError(
                        f"The temperature was not detected in the GROMACS output file, {file}")

            if temperatures[0] != temperatures[-1]:
                raise ValueError(
                    "The temperatures at the endstates don't match!")

            if estimator == 'MBAR':
                data, overlap = _analyse_mbar(
                    files, temperatures, lambdas, "GROMACS")


            return (data, overlap)

In [18]:
import os
paths = glob.glob(f"../kpc2/outputs/GROMACS/*/")
for path in paths:
    unbound_lambdas = sorted(glob.glob(f"{path}/unbound/*"))
    bound_lambdas = sorted(glob.glob(f"{path}/bound/*"))
    for i in range(len(unbound_lambdas)):
        unbound_file = unbound_lambdas[i] + "/afe/gromacs.xvg"
        os.system(f"cp {unbound_file} {unbound_lambdas[i]}")
        bound_file = bound_lambdas[i] + "/afe/gromacs.xvg"
        os.system(f"cp {bound_file} {bound_lambdas[i]}")

In [28]:
analysis_file = "../kpc2/afe/results_analysis.txt"
with open(analysis_file, "w") as file:
    for path in paths:
        transformation = path.split("/")[-2]
        file.write(transformation+"\n")
        free_directory = path + "unbound/"
        bound_directory = path + "bound/"
        try:
            pmf_free, overlap_matrix_free = _analyse_gromacs(free_directory, "MBAR")
            pmf_bound, overlap_matrix_bound = _analyse_gromacs(bound_directory, "MBAR")    
            free_energy_difference, free_energy_error = bss.FreeEnergy.Relative.difference(pmf_bound, pmf_free)
            file.write(f"{free_energy_difference} \u00B1 {free_energy_error} kcal mol\u207B\u00B9 \n")    
        except IndexError as e:
            file.write(str(e)+"\n")
        except ValueError as e:
            file.write(str(e)+"\n")

In [14]:
path = f"../kpc2/outputs/GROMACS/lig_10~lig_8/"
free_directory = path + "unbound/"
bound_directory = path + "bound/"

pmf_free, overlap_matrix_free = _analyse_gromacs(free_directory, "MBAR")
pmf_bound, overlap_matrix_bound = _analyse_gromacs(bound_directory, "MBAR")    
free_energy_difference, free_energy_error = bss.FreeEnergy.Relative.difference(pmf_bound, pmf_free)
print(free_energy_difference, free_energy_error)    

IndexError: list index out of range