In [51]:
import pandas as pd
import plotly.express as px
import numpy as np
from scipy.stats import linregress

In [52]:
class pk_data:
    def __init__(self, data):
        '''Initialize the pk_data object. Accepts either a DataFrame or a CSV file path.'''
        if isinstance(data, str):  # If a file path is given
            self.df = pd.read_csv(data)
        elif isinstance(data, pd.DataFrame):
            self.df = data
        else:
            raise ValueError("Input data must be a DataFrame or a path to a CSV file.")

        self.list_ids = self.df['ID'].unique()

    def summarize(self):
        summ = self.df.groupby('TIME')['CONC'].agg(["count", "mean", "std", "median", "min", "max"]).reset_index()
        return summ
        
    def summ_stats(self, vals:list, stat=['mean', 'sd', 'min', 'max', 'Q1', 'median', 'Q3', 'IQR']):
        # Ensures user requests valid stats
        for i in stat:
            if i not in ['mean', 'sd', 'min', 'max', 'Q1', 'median', 'Q3', 'IQR']:
                raise Exception("""Unsupported statistic. Please enter an array of one or more of 'mean', 'sd', 'min', 'max', 'Q1', 'median', 'Q3', or 'IQR'.
               Leave blank to calculate all statistics.""")
    
        vals_series = pd.Series(vals)
    
        stats = {
                'mean': vals_series.mean(),
                'sd': vals_series.std(),
                'min': vals_series.min(),
                'max': vals_series.max(),
                'Q1': vals_series.quantile(0.25),
                'median': vals_series.median(),
                'Q3': vals_series.quantile(0.75),
                'IQR': vals_series.quantile(0.75) - vals_series.quantile(0.25)
        }
    
        stats = {k: v for k, v in stats.items() if k in stat}   
    
        return pd.DataFrame([stats])

    def half_life(self, term_elim_times:list, stat=['mean', 'sd', 'min', 'max', 'Q1', 'median', 'Q3', 'IQR']):
        half_lives = []
        for ID in self.list_ids:
            subset = self.df.loc[self.df['ID'] == ID]
            ln_conc = np.log(subset['CONC'])
            slope, _, r_value, _, _ = linregress(subset['TIME'], ln_conc)
            if slope >= 0:
                continue  # biologically invalid slope, skip
            half_life = np.log(2) / -slope
            half_lives.append(half_life)
        return self.summ_stats(half_lives, stat=stat)

    def cmax(self, stat=['mean', 'sd', 'min', 'max', 'Q1', 'median', 'Q3', 'IQR']):
        cmax_vals = self.df.groupby('ID')['CONC'].max()
        return self.summ_stats(cmax_vals, stat=stat)

    def tmax(self, stat=['mean', 'sd', 'min', 'max', 'Q1', 'median', 'Q3', 'IQR']):
        tmax_vals = []
        for ID in self.list_ids:
            subset = self.df.loc[self.df['ID'] == ID]
            tmax = subset.loc[subset['CONC'] == subset['CONC'].max()]['TIME'].iloc[0]
            tmax_vals.append(tmax)
        return self.summ_stats(tmax_vals, stat=stat)

    def auc(self, start:int, end:int, stat=['mean', 'sd', 'min', 'max', 'Q1', 'median', 'Q3', 'IQR'], ind=False):
        auc_vals = []
        for ID in self.list_ids:
            subset_id = self.df.loc[self.df['ID'] == ID]
            subset_time = subset_id[(subset_id['TIME'] >= start) & (subset_id['TIME'] <= end)]
            auc = np.trapezoid(subset_time['CONC'], subset_time['TIME'])
            auc_vals.append(auc)

        if ind:
            return auc_vals
        else:
            return self.summ_stats(auc_vals, stat=stat)

    def vd(self, stat=['mean', 'sd', 'min', 'max', 'Q1', 'median', 'Q3', 'IQR'], silence_message=False):
        if not silence_message:
            print("NB: The current iteration of 'vd()' only works for a single bolus dose given at 'TIME' == 0.")
        
        vd_vals = []
        for ID in self.list_ids:
            subset = self.df.loc[(self.df['ID'] == ID) & (self.df['TIME'] == 0)]
            dose = subset['DOSE'].iloc[0]
            c0 = subset['CONC'].iloc[0]
            vd = dose / c0
            vd_vals.append(vd)
        return self.summ_stats(vd_vals, stat=stat)

    def cl(self, start, end, stat=['mean', 'sd', 'min', 'max', 'Q1', 'median', 'Q3', 'IQR'], silence_message=False):
        if not silence_message:
            print("NB: The current iteration of 'cl()' only works for a single bolus dose given at 'TIME' == 0.")
            
        subset = self.df.loc[self.df['DOSE'] != 0].copy().reset_index()
        auc = self.auc(start=start, end=end, ind=True)
        cl_vals = subset['DOSE'] / auc
        return self.summ_stats(cl_vals, stat=stat)
    
    def plot(self, summarized=False, log_scale=False):
        if summarized:
            summary = self.summarize()
            fig = px.line(summary, x="TIME", y="mean", error_y="std", markers=True, log_y=log_scale,
                          title="Mean (SD) PK concentration profile")
        else:
            fig = px.line(self.df, x="TIME", y="CONC", color="ID", markers=True, log_y=log_scale,
                          title="Individual PK concentration profiles")
        
        fig.update_layout(xaxis_title="Time", yaxis_title="Concentration")
        return fig

    def report(self, term_elim_times:list, start, end):
        print(f"Summary of PK data: \n{self.summarize()}\n\n")
        print("Results of NCA:\n")
        print(f"Cmax: \n{self.cmax()}\n\n")
        print(f"Tmax: \n{self.tmax()}\n\n")
        print(f"t1/2: \n{self.half_life(term_elim_times=term_elim_times)}\n\n")
        print(f"AUC({start}-{end}): \n{self.auc(start=start, end=end)}\n\n")
        print(f"Vd: \n{self.vd(silence_message=True)}\n\n")
        print(f"CL: \n{self.cl(start=start, end = end, silence_message=True)}\n\n")

    def report_df(self, term_elim_times:list, start, end):
        df_cmax = self.cmax()
        df_tmax = self.tmax()
        df_t12 = self.half_life(term_elim_times=term_elim_times)
        df_auc = self.auc(start=start, end=end)
        df_vd = self.vd(silence_message=True)
        df_cl = self.cl(start=start, end = end, silence_message=True)
        
        df_cmax.insert(0, "Parameter", "Cmax")
        df_tmax.insert(0, "Parameter", "Tmax")
        df_t12.insert(0, "Parameter", "t1/2")
        df_auc.insert(0, "Parameter", "AUC")
        df_vd.insert(0, "Parameter", "Vd")
        df_cl.insert(0, "Parameter", "CL")

        df_merge = pd.concat([df_cmax, df_tmax, df_t12, df_auc, df_vd, df_cl], ignore_index=True)
        
        return df_merge

        

In [53]:
pk = pk_data("pk_dummy_iv_bolus_1cmt.csv")

In [54]:
pk.report_df(term_elim_times = [48, 72, 168], start = 0, end = 168)

Unnamed: 0,Parameter,mean,sd,min,max,Q1,median,Q3,IQR
0,Cmax,98.683,3.653723,90.63,102.98,98.035,98.375,101.5775,3.5425
1,Tmax,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
2,t1/2,12.593292,0.038571,12.512272,12.648467,12.575971,12.593826,12.614522,0.038551
3,AUC,1869.785,45.78558,1793.955,1945.475,1841.2825,1859.84,1902.4725,61.19
4,Vd,1.01464,0.038868,0.971062,1.103387,0.984601,1.01652,1.020044,0.035443
5,CL,0.053511,0.001308,0.051401,0.055743,0.052563,0.053768,0.05431,0.001747
