In [1]:

import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from collections import namedtuple
from tqdm import tqdm
import seaborn as sns

To-do
- [x] Load in activitiy coefficients from test predictions
- [x] Download corresponding antoine coefficients


In [2]:
# Read in list of molecule names and CAS numbers
molecule_list = pd.read_csv("../data/01_raw/molecule_list.csv")
molecule_list = molecule_list[["solvent_name_1", "smiles", "cas_number"]]
molecule_list

Unnamed: 0,solvent_name_1,smiles,cas_number
0,"(1z,5z)-cycloocta-1,5-diene",C1=CCCC=CCC1,111-78-4
1,"(2e,4e)-2,4-hexadiene",CC=CC=CC,5194-51-4
2,(dichloromethyl)benzene,ClC(Cl)c1ccccc1,98-87-3
3,"(e)-1,3-pentadiene",C=CC=CC,2004-70-8
4,(e)-2-butenenitrile,CC=CC#N,627-26-9
...,...,...,...
610,tert-amyl methyl ether,CCC(C)(C)OC,994-05-8
611,tert-amyl ethyl ether,CCOC(C)(C)CC,
612,tetrahydro furfuryl alcohol,OCC1CCCO1,97-99-4
613,tetramethyl tetrahydrofurane,CC1(C)CCC(C)(C)O1,15045-43-9


In [185]:
# Read in test data
test_df = pd.read_csv("../data/05_model_input/cosmo/test_indp.csv")
test_df_features = pd.read_csv("../data/05_model_input/cosmo/test_indp_features.csv")
test_df = pd.concat([test_df, test_df_features], axis=1)
test_df_pred = pd.read_csv("../data/07_model_output/cosmo/DG-TLCB_test_indp_preds.csv")
test_df_pred = test_df_pred[["ln_gamma_1", "ln_gamma_2"]]
test_df_pred = test_df_pred.rename(lambda x: x + "_pred", axis=1)
test_df = pd.concat([test_df, test_df_pred], axis=1)
test_df = test_df.merge(molecule_list, left_on="smiles_1", right_on="smiles", how="left")
test_df = test_df.drop(columns=["smiles"]).rename(columns={"solvent_name_1": "name_1"})
test_df = test_df.merge(molecule_list, left_on="smiles_2", right_on="smiles", how="left")
test_df = test_df.drop(columns=["smiles"]).rename(columns={"solvent_name_1": "name_2"})
for i in range(1, 3):
    test_df["error"]  = (test_df[f"ln_gamma_{i}"] - test_df[f"ln_gamma_{i}_pred"]).abs()
    test_df[f"gamma_{i}"] = np.exp(test_df[f"ln_gamma_{i}"])
    test_df[f"gamma_{i}_pred"] = np.exp(test_df[f"ln_gamma_{i}_pred"])
test_df_grouped = test_df.groupby(by=["cas_number_x", "cas_number_y"]).mean().reset_index()
test_df_grouped = test_df_grouped[["cas_number_x", "cas_number_y", "error"]]
test_df_grouped.sort_values("error")

Unnamed: 0,cas_number_x,cas_number_y,error
3071,542-18-7,10025-87-3,0.006473
3284,542-18-7,96-33-3,0.007024
3211,542-18-7,554-12-1,0.009839
2890,496-11-7,629-04-9,0.010264
596,105-57-7,98-82-8,0.010451
...,...,...,...
1549,119-64-2,509-14-8,2.942904
2033,352-93-2,509-14-8,2.990716
3555,626-67-5,509-14-8,3.022911
2481,493-02-7,509-14-8,3.841458


In [11]:
# Import get function from requests module because is the function in charge of
# getting the HTTP GET request with the given url.
from requests import get

# Import BeautifulSoup from bs4 because make the html parse and help us to
# handle de DOM.
from bs4 import BeautifulSoup

# Import closing for ensure that any network resource will free when they go out
# of scope.
from contextlib import closing

import urllib.parse
import re

def get_antoine_coef(cas_number, temperatures):

    """ Return a list with the coefficients A, B and C if they exist for the
        given Temperature. If not, return None and print it.
    :param Name:
        A string with the name of the compound in English.
    :param Temperature:
        A float number with the temperature in Kelvin.
    :rtype: List
    :return coef with [A, B, C]
    """

    # Obtaining the table using the get_html function showed below. Table is a
    # BeautifulSoup Object.
    table = get_html_table(cas_number)

    # Extract the rows from the table. Knowing what tags have an HTML table.
    # Also, knowing that the fist row with he table header does not have the
    # class attribute 'exp' so we obtain just the rows with data.
    # The find_all function from BeautifulSoup return a list
    try:
        rows = table.find_all('tr', class_='exp')
    except AttributeError:
        for _ in range(len(temperatures)):
            yield [None, None, None]
        return

    # Declaring the lists for storage Temperatures, and coefficients.
    Temperatures_range, As, Bs, Cs = [], [], [], []

    # Looping over rows to extract and fill As, Bs, and Cs variables because now
    # we are sure the Temperatues is between some range.
    for row in rows:

        # As the rows, we extract the columns for the current row. Knowing that
        # the cols have the <td> tag in HTML as well
        # The find_all function from BeautifulSoup return a list
        cols =  row.find_all('td')

        # Remote uncertainty
        clean = lambda text: float(re.split(r"±\s+[\d\.\w]+", text)[0])
        # for i in range(len(cols)):
        #     res = re.split(r"±\s+[\d\.\w]+", cols[i].text)
        #     cols[i].text = res[0]

        # First transform the strings into float numbers and put them in their
        # respective list
        As.append(clean(cols[1].text))
        Bs.append(clean(cols[2].text))
        Cs.append(clean(cols[3].text))

        # For the temperatures, we have a range and we need to extract each
        # limit (lower and higher) and put them in an extra list. So
        # Temperatures variable will be a list of lists.
        lower_lim = float(cols[0].text.replace(" ","").split('-')[0])
        higher_lim = float(cols[0].text.replace(" ","").split('-')[1])
        Temperatures_range.append([lower_lim, higher_lim])


    # Checking if the Temperature gave fits in some interval
    index = None
    for temperature in temperatures:
        for i, interval in enumerate(Temperatures_range):
            if (interval[0] <= temperature
                and temperature <= interval[1]):
                index = i
                break
            else:
                index = None

        if index == None:
            yield [None, None, None]
        else:
            A = As[index]
            B = Bs[index]
            C = Cs[index]
            yield [A, B, C]


def get_html_table(cas_number):

    """ Return the html already parsed using the a helper function listed below.
    :param Name:
        A string with the name of the compound in English.
    :rtype: BeautifulSoup Object
    """

    # The name parameter is part of the url. For example, if you want the
    # methane data, the url is
    # https://webbook.nist.gov/cgi/cbook.cgi?Name=methane&Mask=4.
    # Name = urllib.parse.quote(Name)
    # url = str.format('https://webbook.nist.gov/cgi/cbook.cgi?Name={0}&Mask=4', Name)
    cas_number = urllib.parse.quote(cas_number)
    url = str.format('https://webbook.nist.gov/cgi/cbook.cgi?ID={0}&Mask=4', cas_number)

    # Function to get the request made, see below.
    raw_html = get_response(url)

    # Parse the html using BeautifulSoup.
    html = BeautifulSoup(raw_html, 'html.parser')

    # Extract the table that contains the data, the table has a specific
    # attributes 'aria-label' as 'Antoine Equation Parameters'.
    table = html.find('table', attrs={'aria-label': 'Antoine Equation Parameters'})

    return table


def get_response(url):

    """ Return the raw_html for parsing later or None if can't reach the page
    :param url:
        The string for the GET request.
    :rtype: BeautifulSoup Object
    :rtype: None if can't reach the website
    """

    try: 
        with closing(get(url, stream=True)) as resp:
            if is_good_response(resp):
                return resp.content
            else:
                return None
    
    except:
        print('Not found')
        return None


def is_good_response(resp):
    """
    Returns True if the response seems to be HTML, False otherwise.
    """
    content_type = resp.headers['Content-Type'].lower()
    return (resp.status_code == 200 
            and content_type is not None 
            and content_type.find('html') > -1) 


In [171]:
# Download Antoine coefficients from NIST
all_names = pd.concat([
    test_df[["cas_number_x", "temperature (K)"]].rename(columns={"cas_number_x": "cas_number"}), 
    test_df[["cas_number_y", "temperature (K)"]].rename(columns={"cas_number_y": "cas_number"})
])
all_names_grouped = all_names.drop_duplicates().groupby("cas_number")
n = all_names["cas_number"].nunique()
antoine_data = pd.DataFrame(columns=["A", "B", "C", "temperature (K)", "cas_number"])
for cas_number, data in tqdm(all_names_grouped, total=n):
    coeffs = list(get_antoine_coef(cas_number, data["temperature (K)"].values))
    coeffs = np.array(coeffs)
    coeffs = pd.DataFrame(coeffs, columns=["A", "B", "C"])
    coeffs["temperature (K)"] = data["temperature (K)"].values
    coeffs["cas_number"] = cas_number
    antoine_data = pd.concat([antoine_data, coeffs], ignore_index=True)

100%|██████████| 440/440 [02:44<00:00,  2.68it/s]


In [186]:
test_df = test_df.merge(
    antoine_data, 
    left_on=["temperature (K)", "cas_number_x"], 
    right_on=["temperature (K)", "cas_number"],
    how="left"
)
test_df = test_df.drop("cas_number", axis=1)
test_df = test_df.rename(columns={"A": "A_1", "B": "B_1", "C": "C_1"})
test_df = test_df.merge(
    antoine_data, 
    left_on=["temperature (K)", "cas_number_y"], 
    right_on=["temperature (K)", "cas_number"],
    how="left"
)
test_df = test_df.drop("cas_number", axis=1)
test_df = test_df.rename(columns={"A": "A_2", "B": "B_2", "C": "C_2"})
test_df["antoine_exists"] = test_df["A_1"].notnull() & test_df["B_1"].notnull() & test_df["C_1"].notnull() & test_df["A_2"].notnull() & test_df["B_2"].notnull() & test_df["C_2"].notnull()

In [194]:
# Rank pairs by error and only include those with Antoine data
test_df[test_df["antoine_exists"] == True].groupby(["cas_number_x", "cas_number_y"]).mean().reset_index().sort_values("error", ascending=False)

Unnamed: 0,cas_number_x,cas_number_y,ln_gamma_1,ln_gamma_2,temperature (K),x(1),ln_gamma_1_pred,ln_gamma_2_pred,error,gamma_1,gamma_1_pred,gamma_2,gamma_2_pred,antoine_exists
719,352-93-2,509-14-8,-0.528198,-0.540824,366.65,0.5,1.656739,2.553515,3.106301,0.652602,13.253072,0.654933,229.167069,1.0
581,123-75-1,509-14-8,-0.354806,-0.386738,365.15,0.5,1.367552,2.325459,2.712197,0.729697,4.352789,0.730426,49.524396,1.0
943,509-14-8,540-84-1,-0.282770,-0.286711,372.15,0.5,3.245013,2.251714,2.538424,0.777608,828.714478,0.777922,35.578958,1.0
950,509-14-8,75-69-4,-0.699789,-0.670406,314.65,0.5,2.998709,1.709026,2.380626,0.613254,678.130765,0.589653,16.673891,1.0
940,509-14-8,121-44-8,-0.289033,-0.347277,363.15,0.5,2.623770,1.914530,2.261807,0.774144,370.579895,0.757274,21.455268,1.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
126,105-57-7,56-23-5,0.068279,0.075905,350.15,0.5,0.036398,0.080439,0.012256,1.073637,1.042479,1.081227,1.085457,1.0
424,110-81-6,493-02-7,0.165215,0.161342,427.65,0.5,0.096715,0.157850,0.011462,1.196032,1.116152,1.187788,1.184484,1.0
423,110-81-6,493-01-6,0.165215,0.161342,427.65,0.5,0.096715,0.157850,0.011462,1.196032,1.116152,1.187788,1.184484,1.0
377,110-81-6,108-88-3,-0.008627,-0.004996,411.15,0.5,-0.023185,-0.011669,0.011136,0.991601,0.977675,0.995023,0.988468,1.0


In [205]:
antoine_coefficient = namedtuple('antoine_coefficient', ['A', 'B', 'C'])

def antoine_equation(T, coefficients: antoine_coefficient):
    return 10**(coefficients.A - coefficients.B/(T + coefficients.C))

def gamma_phi_pressure(T, x, gamma_1, gamma_2, component_1_antoine, component_2_antoine):
    Psat_1  = antoine_equation(T, component_1_antoine)
    Psat_2 = antoine_equation(T, component_2_antoine)
    P = x*gamma_1* Psat_1 + (1-x)*gamma_2*Psat_2
    return  P

def make_pxy(data, suffix=""):
    Ps = []
    ys = []
    data = data.copy()
    antoine_cols_1 = ["A_1", "B_1", "C_1"]
    antoine_cols_2 = ["A_2", "B_2", "C_2"]
    for i in range(data.shape[0]):
        x = data["x(1)"].iloc[i]
        T = data["temperature (K)"].iloc[i]

        # Get antoine coefficients
        antoine_data_1 = data[antoine_cols_1].iloc[i].values.astype(float)
        if np.isnan(antoine_data_1).any():
            Ps.append(np.nan)
            ys.append(np.nan)
            continue
        ac_1 = antoine_coefficient(*antoine_data_1)
        
        antoine_data_2 = data[antoine_cols_2].iloc[i].values.astype(float)
        if np.isnan(antoine_data_2).any():
            Ps.append(np.nan)
            ys.append(np.nan)
            continue
        ac_2 = antoine_coefficient(*antoine_data_2)

        # Calculate compositions
        gamma_1 = data["gamma_1" + suffix].iloc[i]
        gamma_2 = data["gamma_2"+suffix].iloc[i]
        P_sys = gamma_phi_pressure(T, x, gamma_1, gamma_2, ac_1, ac_2)
        Ps.append(P_sys)
        Psat_1  = antoine_equation(T, ac_1)
        ys.append(x*gamma_1*Psat_1/P_sys)
    data["y"+suffix] = ys
    data["P"+suffix] = Ps
    return data

def make_pxy_plot(cas_number_1, cas_number_2, test_data, n_cols=2):
    data = test_data[
        (test_data["cas_number_x"] == cas_number_1) & 
        (test_data["cas_number_y"] == cas_number_2)
    ].copy()
    name_1 = data["name_1"].iloc[0]
    name_2 = data["name_2"].iloc[0]
    data = data.sort_values(by="temperature (K)")
    data = make_pxy(data)
    data = make_pxy(data, suffix="_pred")
    data = data.dropna()
    if data["temperature (K)"].nunique() > 1:
        data["T"] = data["temperature (K)"].astype(str) + " K"
        g = sns.FacetGrid(data, col="T", col_wrap=n_cols, legend_out=True)
        g.map(sns.lineplot, "x(1)", "P", color="k", label="COSMO-RS", alpha=0.5)
        g.map(sns.lineplot, "y", "P", color="k",alpha=0.5)
        g.map(sns.scatterplot, "x(1)", "P_pred", color="#025b66", label="DG")
        g.map(sns.scatterplot, "y", "P_pred", color="#025b66")
        g.set_xlabels("x,y")
        g.set_ylabels("P (bar)")
        n_rows = len(g.axes)
        g.figure.subplots_adjust(top=1 - 0.6/n_rows)
        g.figure.suptitle(f"{name_1} + {name_2}", fontsize=11)
        g.add_legend()
        fig = g.figure
    else:
        fig, ax = plt.subplots(1)
        T = data["temperature (K)"].iloc[0]
        sns.lineplot(data=data, x="x(1)", y="P", color="k", label="COSMO-RS",ax=ax, alpha=0.5)
        sns.lineplot(data=data, x="y", y="P", color="k",ax=ax, alpha=0.5)
        sns.scatterplot(data=data, x="x(1)", y="P_pred", color="#025b66", label="DG",ax=ax)
        sns.scatterplot(data=data, x="y", y="P_pred", color="#025b66",ax=ax)
        ax.set_xlabel("x,y", fontsize=14)
        ax.set_ylabel("P (bar)", fontsize=14)
        ax.set_title(f"{name_1} + {name_2} (T = {T} K)", fontsize=14)
        ax.tick_params(labelsize=10)
        # ax.xaxis.set_tick_params(labelsize=16)
        # ax.yaxis.set_tick_params(labelsize=16)
        ax.tick_params(direction="in", which="both")
        # g.legend()
    return fig
# Low error
fig = make_pxy_plot(
    "105-57-7",
    "75-15-0",
    test_df,
    n_cols=2
)
fig.savefig("../data/08_reporting/cosmo/low_error_pxy.png", dpi=300)

# High error
g = make_pxy_plot(
    "105-57-7",
    "56-23-5",
    test_df
)
g.figure.savefig("../data/08_reporting/cosmo/high_error_pxy.png", dpi=300)