Depthseek Pipeline

Make sure you had completed sort your sample data by col1.
If didnot, please use the following command in your terminal:

"sort -S 50% --parallel 16 -k1,1 /path/validpairs > /output_path/sorted.validpairs",

then let's start.

Oh, right. For the small samples in the early sequencing stage, the sorting process won't take long. If you want to verify with samples that have a deeper sequencing depth, 

the sort process may take longer. We recommend that when Depthseek is used for deep sequencing samples, it is best not to exceed the range of predicting 200million from 

15 million reads. 

You know, this is indeed a kind of limit. hahah

# Import Libraries for Depthseek Pipeline

In [None]:
import os
import sys
import argparse
import subprocess
import time
from concurrent.futures import ThreadPoolExecutor
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import Lasso
from sklearn.ensemble import RandomForestRegressor
from sklearn.svm import SVR
from xgboost import XGBRegressor
from sklearn.metrics import mean_squared_error
import matplotlib.pyplot as plt
from scipy.optimize import curve_fit
import warnings
import ipywidgets as widgets
from IPython.display import display, HTML, clear_output
from matplotlib import rcParams

# Set Global Parameters 

In [None]:
plt.rcParams['font.family'] = 'Arial'
plt.rcParams['font.size'] = 6
warnings.filterwarnings("ignore", category=RuntimeWarning)
warnings.filterwarnings("ignore", category=UserWarning)

In [None]:
input_file = "./Demo/test_data/SRR10093262.sorted.validPairs" 
output_dir = "./Demo/test_output" 
temp_dir = "./Demo/test_tmp"

os.makedirs(output_dir, exist_ok=True)
os.makedirs(temp_dir, exist_ok=True)

# Set the Parameters for Processing and Predicting tasks of Depthseek Pipeline

## Parameters for Processing

In [None]:
sort_threads = 64
sort_memory = "6G"
max_parallel_jobs = 20


## Parameters for Predicting

When the prediction task is Forward prediction, what is executed is the library complexity of predicting the Target value based on the data. Conversely, when the task values are Inverse, the task is to predict the depth of library sequencing required for the Target (in this case, it is the library complexity or the number of effective interactions) based on the data.

In [None]:
prediction_mode = "forward"  ##TODO Mode："forward" or "inverse"
target_value = 150          ##TODO Target: forward mode is Million reads, while inverse mode is reads.(Important!)
model_type = "depthseek"         ##TODO Formula: "lw", "depthseek", or "both". (lw is Lander-Waterman Formula, depthseek is Self-adaptive lw)

# Analog sampling sequencing (FIFO downsampling and deduplication)

In [None]:
def run_fifo_downsample_dedup(input_file, output_dir, temp_dir, sort_threads, sort_memory, max_parallel_jobs):
    """Run the FIFO downsampling and deduplication process"""
    
    def validate_args(args):
        if not os.path.isfile(args.i):
            sys.exit(f"Error: Input file {args.i} does not exist")
        os.makedirs(args.o, exist_ok=True)
        os.makedirs(args.t, exist_ok=True)

    def run_task(line, args, sample_name):
        start_time = time.time()
        sample_tmp_dir = os.path.join(args.t, sample_name, str(line))
        os.makedirs(sample_tmp_dir, exist_ok=True)
        
        fifo_path = os.path.join(sample_tmp_dir, 'data.fifo')
        try:
            os.mkfifo(fifo_path)
        except FileExistsError:
            pass

        # Extract the first N lines to the FIFO
        head_cmd = f"head -n {line} {args.i} > {fifo_path}"
        head_proc = subprocess.Popen(head_cmd, shell=True)
        
        # Process Data
        sort_cmd = [
            'sort',
            f'--parallel={args.p}',
            f'-T', sample_tmp_dir,
            f'-S', args.s,
            '-k2,2V',
            '-k3,3n',
            '-k5,5V',
            '-k6,6n',
            fifo_path
        ]
        
        awk_cmd = [
            'awk',
            '-F\t',
            'BEGIN{c1=0;c2=0;s1=0;s2=0}(c1!=$2 || c2!=$5 || s1!=$3 || s2!=$6){print;c1=$2;c2=$5;s1=$3;s2=$6}'
        ]
        
        wc_cmd = ['wc', '-l']
        
        # Build a processing Pipe
        sort_proc = subprocess.Popen(sort_cmd, stdout=subprocess.PIPE)
        awk_proc = subprocess.Popen(awk_cmd, stdin=sort_proc.stdout, stdout=subprocess.PIPE)
        wc_proc = subprocess.Popen(wc_cmd, stdin=awk_proc.stdout, stdout=subprocess.PIPE)
        
        # Obtain the final result
        dedup_lines = wc_proc.communicate()[0].decode().strip()
        
        # Generate the output file
        data_file = os.path.join(args.o, f"{sample_name}_{line}_data.txt")
        with open(data_file, 'w') as f:
            normalized_line = line // 1000000
            f.write(f"{normalized_line}\t{dedup_lines}\n")
        
        # Record the running time
        runtime = int(time.time() - start_time)
        runtime_log = os.path.join(args.o, f"{sample_name}_runtime_log.txt")
        with open(runtime_log, 'a') as f:
            f.write(f"Task {line} completed in {runtime} seconds\n")
        
        # Clean up temporary files
        head_proc.wait()
        os.remove(fifo_path)
        os.rmdir(sample_tmp_dir)

    # Create parameter objects
    class Args:
        def __init__(self, i, o, t, p, s, f):
            self.i = i
            self.o = o
            self.t = t
            self.p = p
            self.s = s
            self.f = f
    
    args = Args(input_file, output_dir, temp_dir, sort_threads, sort_memory, max_parallel_jobs)
    validate_args(args)
    
    # Extract sample name
    file_name = os.path.basename(args.i)
    sample_name = file_name.split('_sorted')[0]
    
    # Generate task list
    tasks = [i * 1000000 for i in range(1, 21)]  ##TODO range(1,x) should be changed for your data sample.
    
    # Parallel processing tasks
    with ThreadPoolExecutor(max_workers=args.f) as executor:
        futures = []
        for line in tasks:
            futures.append(executor.submit(run_task, line, args, sample_name))
        
        # Wait for all tasks to be completed
        for future in futures:
            try:
                future.result()
            except Exception as e:
                print(f"Task failed: {e}")
    
    # Merge result files
    combined_data = []
    for line in tasks:
        data_file = os.path.join(args.o, f"{sample_name}_{line}_data.txt")
        if os.path.exists(data_file):
            with open(data_file) as f:
                combined_data.append(f.read().strip())
    
    # Sort and write to the final file
    final_output = os.path.join(args.o, f"{sample_name}_final_output.txt")
    sorted_data = sorted(combined_data, key=lambda x: int(x.split('\t')[0]))
    with open(final_output, 'w') as f:
        f.write("\n".join(sorted_data))
    
    # Clean up temporary files
    for line in tasks:
        data_file = os.path.join(args.o, f"{sample_name}_{line}_data.txt")
        if os.path.exists(data_file):
            os.remove(data_file)

    runtime_log = os.path.join(args.o, f"{sample_name}_runtime_log.txt")
    max_runtime = 0
    if os.path.exists(runtime_log):
        with open(runtime_log) as f:
            for line in f:
                if 'completed' in line:
                    rt = int(line.split()[-2])
                    max_runtime = max(max_runtime, rt)
        with open(runtime_log, 'a') as f:
            f.write(f"Longest runtime: {max_runtime} seconds\n")
    
    print(f"Results saved to {final_output}")
    print(f"Longest runtime: {max_runtime} seconds")
    return final_output

# Run Analog Sampling Sequencing

In [None]:
print("Statr -- Analog Sampling Sequencing...")
final_output_file = run_fifo_downsample_dedup(input_file, output_dir, temp_dir, sort_threads, sort_memory, max_parallel_jobs)
print("Finished -- Analog Sampling Sequencing!")

# Data Analysis and Prediction

In [None]:
def load_and_preprocess_data(file_path):
    """Loading and Preprocessing data"""
    data = np.loadtxt(file_path)
    X = data[:, 0].reshape(-1, 1)
    y = data[:, 1].reshape(-1, 1)
    
    scaler_X = StandardScaler()
    X = scaler_X.fit_transform(X)
    
    scaler_y = StandardScaler()
    y = scaler_y.fit_transform(y)
    
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
    return X_train, X_test, y_train, y_test, scaler_X, scaler_y


In [None]:
def build_model(model_type):
    """Building the prediction model"""  ##TODO Make a choice~(although we had tested for many times, the results is similar )
    if model_type == 'lasso':
        model = Lasso(alpha=0.1, random_state=42)
    elif model_type == 'random_forest':
        model = RandomForestRegressor(n_estimators=100, random_state=42)
    elif model_type == 'xgboost':
        model = XGBRegressor(random_state=42)
    elif model_type == 'svm':
        model = SVR(kernel='rbf')
    else:
        raise ValueError("Unsupported model type")
    return model

In [None]:
def train_model(model, X_train, y_train):
    """Training"""
    model.fit(X_train, y_train.ravel())
    return model

In [None]:
def evaluate_model(model, X_train, y_train, X_test, y_test):
    """Evaluate the Models"""
    train_pred = model.predict(X_train)
    test_pred = model.predict(X_test)
    
    train_loss = mean_squared_error(y_train, train_pred)
    test_loss = mean_squared_error(y_test, test_pred)
    
    print(f"Train Loss: {train_loss}")
    print(f"Test Loss: {test_loss}")
    return train_loss, test_loss


In [None]:
def custom_function_1(x, m):
    """lw"""
    return m - m * np.exp(-1000000 * x / m)

def custom_function_2(x, a, m):
    """Depthseek"""  # self-adaptive lw
    return m - m * np.exp(-((1000000*x)**a) / m)

In [None]:
# Define the inverse functions
def inverse_function_1(y, m):
    """inverse - lw"""
    y_clipped = np.minimum(y, m * 0.999)
    return (-m / 1000000) * np.log(1 - y_clipped / m)

def inverse_function_2(y, a, m):
    """inverse - Depthseek"""
    y_clipped = np.minimum(y, m * 0.999)
    inner = -m * np.log(1 - y_clipped / m)
    return (inner ** (1/a)) / 1000000

In [None]:
def estimate_parameters(X, y, y_max):
    """Parameter estimation optimization"""
    # lw -- for m -- Theoretical sequencing library complexity
    try:
        initial_guess_1 = [y_max * 0.8]
        bounds_1 = ([0.5*y_max], [np.inf])
        popt_1, _ = curve_fit(custom_function_1, 
                             X.ravel(), 
                             y.ravel(),
                             p0=initial_guess_1,
                             bounds=bounds_1,
                             method='trf',
                             maxfev=10000)
        m_est_1 = popt_1[0]
    except RuntimeError:
        m_est_1 = y_max

    # self-adaptive-lw -- for a and m -- a Library specific parameters,m Theoretical sequencing library complexity
    try:
        initial_guess_2 = [0.95, y_max*0.9]
        bounds_2 = ([0.8, 0.7*y_max], [1.2, np.inf])
        popt_2, _ = curve_fit(custom_function_2,
                             X.ravel(),
                             y.ravel(),
                             p0=initial_guess_2,
                             bounds=bounds_2,
                             method='trf',
                             maxfev=10000)
        a_est_2, m_est_2 = popt_2
    except RuntimeError:
        a_est_2, m_est_2 = 1.0, y_max

    return m_est_1, a_est_2, m_est_2


In [None]:
def calculate_deviation(y_true, y_pred):
    """Calculation deviation"""
    deviation = np.mean(np.abs((y_true - y_pred) / y_true))
    return deviation


In [None]:
def calculate_inverse_deviation(X_true, X_pred):
    """Calculation deviation for inverse mode"""
    valid_indices = (X_true > 1) & (X_pred > 1)
    if np.any(valid_indices):
        deviation = np.mean(np.abs((X_true[valid_indices] - X_pred[valid_indices]) / X_true[valid_indices]))
    else:
        deviation = np.mean(np.abs((X_true - X_pred) / np.maximum(X_true, 1)))
    return deviation

In [None]:
def plot_fitted_curve(X, y, m_est_1, a_est_2, m_est_2, scaler_X, scaler_y, label, color, ax1, ax2):
    """Plot the fit curve"""
    X = scaler_X.inverse_transform(X)
    y = scaler_y.inverse_transform(y)
    
    x_fit = np.linspace(0, min(X.max(), 1200), 100)
    y_fit_1 = custom_function_1(x_fit, m_est_1)
    y_fit_2 = custom_function_2(x_fit, a_est_2, m_est_2)
    
    ax1.plot(x_fit, y_fit_1, color='#1f77b4', linestyle='--', label=f'LW Model')
    ax2.plot(x_fit, y_fit_2, color='#ff7f0e', linestyle='-.', label=f'Depthseek Model')


In [None]:
def plot_single_fitted_curve(X, y, a_est, m_est, scaler_X, scaler_y, output_dir, dataset_name):
    """Plot one-fit curves (for HTML reporting)"""
    X = scaler_X.inverse_transform(X)
    y = scaler_y.inverse_transform(y)
    
    fig, ax = plt.subplots(figsize=(5/2.54, 5/2.54))
    
    # Plot the raw data points
    ax.scatter(X, y, s=5, color='#1E90FF', alpha=0.6, label='Observed Data')
    
    # Plot the fit curve
    x_fit = np.linspace(0, min(X.max(), 1200), 100)
    y_fit = custom_function_2(x_fit, a_est, m_est)
    ax.plot(x_fit, y_fit, color='#EB5B25', linestyle='-', linewidth=1, label='Fitted Curve')
    
    # Set graph properties
    ax.set_xlim(0, 800)
    ax.set_xlabel('Sequencing Depth (Million Reads)', fontsize=6)
    ax.set_ylabel('Unique Reads', fontsize=6)
    
    # Set the tick and border
    ax.tick_params(axis='both', which='major', direction='in', length=3, width=0.5)
    for spine in ax.spines.values():
        spine.set_linewidth(0.5)
    
    # Add formula text
    formula = r'$y = m - m \cdot \exp\left(-\frac{x^a}{m}\right)$'
    ax.text(0.95, 0.075, formula, 
        transform=ax.transAxes, 
        fontsize=5,
        horizontalalignment='right',
        verticalalignment='bottom'
        )

    # Add legend
    ax.legend(loc='lower right', 
          fontsize=5,
          framealpha=0.8,
          bbox_to_anchor=(0.95, 0.125),
          facecolor='white',
          edgecolor='none',
          borderpad=0.3)
    
    # save the output svg file
    output_svg = os.path.join(output_dir, f"{dataset_name}_fit_result.svg")
    plt.savefig(output_svg, format='svg', bbox_inches='tight', dpi=300)
    plt.close()

In [None]:
def generate_html_report(output_dir, dataset_name, a_est, m_est, y_pred_n, deviation, y_max, n_value):
    """Generate HTML"""
    html_content = f"""
<!DOCTYPE html>
<html>
<head>
    <title>Fitting Report - {dataset_name}</title>
    <style>
        body {{ font-family: Arial, sans-serif; font-size: 10px; }}
        .container {{ width: 80%; margin: 0 auto; }}
        .image-container {{ text-align: center; margin: 10px 0; }}
        .results {{ margin: 15px 0; }}
        table {{ width: 100%; border-collapse: collapse; }}
        th, td {{ border: 1px solid #ddd; padding: 5px; text-align: left; }}
        th {{ background-color: #f2f2f2; }}
    </style>
</head>
<body>
    <div class="container">
        <h2>Fitting Report - {dataset_name}</h2>
        
        <div class="image-container">
            <img src="{dataset_name}_fit_result.svg" alt="Fitting Result" style="width: 5cm; height: 5cm;">
        </div>
        
        <div class="results">
            <h3>Fitting Results</h3>
            <table>
                <tr>
                    <th>Parameter</th>
                    <th>Value</th>
                </tr>
                <tr>
                    <td>Max observed y</td>
                    <td>{y_max:.2f}</td>
                </tr>
                <tr>
                    <td>Parameter a</td>
                    <td>{a_est:.3f}</td>
                </tr>
                <tr>
                    <td>Parameter m</td>
                    <td>{m_est:.2f}</td>
                </tr>
                <tr>
                    <td>Predicted value at {n_value}M</td>
                    <td>{y_pred_n:.2f}</td>
                </tr>
                <tr>
                    <td>Deviation</td>
                    <td>{deviation:.2%}</td>
                </tr>
            </table>
        </div>
    </div>
</body>
</html>
    """
    html_file = os.path.join(output_dir, f"{dataset_name}_report.html")
    with open(html_file, 'w') as f:
        f.write(html_content)


In [None]:
def run_prediction_analysis(input_file, output_dir, prediction_mode, target_value, model_type):
    """Operation prediction analysis"""
    dataset_name = os.path.basename(input_file).replace('.txt', '')
    
    # Load and preprocess data
    X_train, X_test, y_train, y_test, scaler_X, scaler_y = load_and_preprocess_data(input_file)
    X_train_inv = scaler_X.inverse_transform(X_train)
    y_train_inv = scaler_y.inverse_transform(y_train)
    X_test_inv = scaler_X.inverse_transform(X_test)
    y_test_inv = scaler_y.inverse_transform(y_test)
    
    # Estimated parameters
    y_max = np.max(y_train_inv)
    m_est_1, a_est_2, m_est_2 = estimate_parameters(X_train_inv, y_train_inv, y_max)
    
    # Forward Mode (x -> y)
    if prediction_mode == "forward":
        # XGBoost
        model = build_model('xgboost')
        model = train_model(model, X_train, y_train)
        train_loss, test_loss = evaluate_model(model, X_train, y_train, X_test, y_test)
        
        # Both formulas
        y_pred_1 = custom_function_1(target_value, m_est_1)
        y_pred_2 = custom_function_2(target_value, a_est_2, m_est_2)
        
        # Evaluate function performance on the test set
        test_pred_1 = custom_function_1(X_test_inv.ravel(), m_est_1)
        test_pred_2 = custom_function_2(X_test_inv.ravel(), a_est_2, m_est_2)
        deviation_1 = calculate_deviation(y_test_inv.ravel(), test_pred_1)
        deviation_2 = calculate_deviation(y_test_inv.ravel(), test_pred_2)
        
        # Select the output based on the model type
        if model_type == "lw":
            print(f"LW Model prediction: At the sequencing depth of {target_value}M reads, the estimated library complexity is {y_pred_1:.2f}")
            print(f"LW Model deviation: {deviation_1: 0.2%}")

            # Write in Logs
            log_file = os.path.join(output_dir, f"{dataset_name}_lw_log.txt")
            with open(log_file, 'w') as f:
                f.write(f"Dataset: {dataset_name}\n")
                f.write(f"Max observed y: {y_max:.2f}\n")
                f.write(f"LW Model params: m={m_est_1:.2f}\n")
                f.write(f"Predicted value at {target_value}M: {y_pred_1:.2f}\n")
                f.write(f"Deviation: {deviation_1:.2%}\n")
                
        elif model_type == "depthseek":
            print(f"Depthseek Model prediction: At the sequencing depth of {target_value}M reads, the estimated library complexity is {y_pred_2:.2f}")
            print(f"Depthseek Model deviation: {deviation_2:.2%}")       

            # Generate the HTML file
            plot_single_fitted_curve(X_train, y_train, a_est_2, m_est_2, scaler_X, scaler_y, output_dir, dataset_name)
            generate_html_report(output_dir, dataset_name, a_est_2, m_est_2, y_pred_2, deviation_2, y_max, target_value)
            
            # Write in logs
            log_file = os.path.join(output_dir, f"{dataset_name}_depthseek_log.txt")
            with open(log_file, 'w') as f:
                f.write(f"Dataset: {dataset_name}\n")
                f.write(f"Max observed y: {y_max:.2f}\n")
                f.write(f"Depthseek Model params: a={a_est_2:.3f}, m={m_est_2:.2f}\n")
                f.write(f"Predicted value at {target_value}M: {y_pred_2:.2f}\n")
                f.write(f"Deviation: {deviation_2:.2%}\n")
                
        else:
            print(f"LW Model prediction: At the sequencing depth of {target_value}M reads, the estimated library complexity is {y_pred_1:.2f}")
            print(f"Depthseek Model prediction: At the sequencing depth of {target_value}M reads, the estimated library complexity is {y_pred_2:.2f}")
            print(f"LW Model deviation: {deviation_1: 0.2%}")
            print(f"Depthseek Model deviation: {deviation_2:.2%}")

            # Draw the fitting curves of the two models
            cm = 1/2.54
            fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12*cm, 6*cm))
            plot_fitted_curve(X_train, y_train, m_est_1, a_est_2, m_est_2, 
                             scaler_X, scaler_y, label=dataset_name, color='#EB5B25',
                             ax1=ax1, ax2=ax2)
            
            ax1.scatter(X_train_inv, y_train_inv, s=5, color='gray', alpha=0.3, label='Training Data')
            ax2.scatter(X_train_inv, y_train_inv, s=5, color='gray', alpha=0.3, label='Training Data')
            
            for ax in [ax1, ax2]:
                ax.set_xlim(0, 800)
                ax.tick_params(axis='both', labelsize=6)
                ax.grid(False)
                ax.yaxis.get_offset_text().set_fontsize(6)
                ax.legend(loc='lower right', fontsize=4,edgecolor='white')
                ax.spines['top'].set_visible(False)
                ax.spines['right'].set_visible(False)
            
            ax1.set_xlabel('Sequencing Depth (Million Reads)', fontsize=6)
            ax1.set_ylabel('Library Complexity (Read Counts)', fontsize=6)
            ax2.set_xlabel('')
            ax2.set_ylabel('')

            ax1.text(0.5, 1.075, f'LW Model (Deviation: {deviation_1:.2%})', 
                 fontsize=6, ha='center', va='bottom', transform=ax1.transAxes)
            ax2.text(0.5, 1.075, f'Depthseek Model (Deviation: {deviation_2:.2%})', 
                 fontsize=6, ha='center', va='bottom', transform=ax2.transAxes)

            plt.tight_layout()
            output_svg = os.path.join(output_dir, f"{dataset_name}_both_fit.svg")
            plt.savefig(output_svg, format='svg', bbox_inches='tight')
            plt.close()
            
            # Write in logs
            log_file = os.path.join(output_dir, f"{dataset_name}_both_log.txt")
            with open(log_file, 'w') as f:
                f.write(f"Dataset: {dataset_name}\n")
                f.write(f"Max observed y: {y_max:.2f}\n")
                f.write(f"LW Model params: m={m_est_1:.2f}\n")
                f.write(f"Depthseek Model params: a={a_est_2:.3f}, m={m_est_2:.2f}\n")
                f.write(f"Predicted value at {target_value}M (LW): {y_pred_1:.2f}\n")
                f.write(f"Predicted value at {target_value}M (Depthseek): {y_pred_2:.2f}\n")
                f.write(f"Deviation (LW): {deviation_1:.2%}\n")
                f.write(f"Deviation (Depthseek): {deviation_2:.2%}\n")
    
    # Inverse mode (y -> x)
    else:
        target_y = target_value
        
        # Predict the value of x using the inverse function
        predicted_x1 = inverse_function_1(target_y, m_est_1)
        predicted_x2 = inverse_function_2(target_y, a_est_2, m_est_2)
        
        # Evaluate the performance of inverse functions on the training set
        train_x_pred_1 = inverse_function_1(y_train_inv, m_est_1)
        train_x_pred_2 = inverse_function_2(y_train_inv, a_est_2, m_est_2)
        deviation_1 = calculate_inverse_deviation(X_train_inv, train_x_pred_1)
        deviation_2 = calculate_inverse_deviation(X_train_inv, train_x_pred_2)
        
        # Evaluate the performance of inverse functions on the test set
        test_x_pred_1 = inverse_function_1(y_test_inv, m_est_1)
        test_x_pred_2 = inverse_function_2(y_test_inv, a_est_2, m_est_2)
        test_deviation_1 = calculate_inverse_deviation(X_test_inv, test_x_pred_1)
        test_deviation_2 = calculate_inverse_deviation(X_test_inv, test_x_pred_2)
        
        # Select the output based on the model type
        if model_type == "lw":
            print(f"LW Model inverse inference: To achieve the library complexity of {target_y}, \
                  it is expected to require the sequencing depth of {predicted_x1:.2f}M reads ")
            print(f"LW Model training set deviation: {deviation_1: 0.2%}")
            print(f"LW Model test set bias: {test_deviation_1: 1.2%}")

            # Write in logs
            log_file = os.path.join(output_dir, f"{dataset_name}_lw_inverse_log.txt")
            with open(log_file, 'w') as f:
                f.write(f"Dataset: {dataset_name}\n")
                f.write(f"Max observed y: {y_max:.2f}\n")
                f.write(f"LW Model params: m={m_est_1:.2f}\n")
                f.write(f"Target Y value: {target_y:.2f}\n")
                f.write(f"Predicted X (LW Model): {predicted_x1:.2f}\n")
                f.write(f"Train Deviation (LW Model): {deviation_1:.2%}\n")
                f.write(f"Test Deviation (LW Model): {test_deviation_1:.2%}\n")
                
        elif model_type == "depthseek":
            print(f"Depthseek Model inverse inference: To achieve the library complexity of {target_y}, \
                  it is expected to require the sequencing depth of {predicted_x2:.2f}M reads ")
            print(f"Depthseek Model training set deviation: {deviation_2:.2%}")
            print(f"Depthseek Model test set bias: {test_deviation_2:.2%}")

            # Write in logs
            log_file = os.path.join(output_dir, f"{dataset_name}_depthseek_inverse_log.txt")
            with open(log_file, 'w') as f:
                f.write(f"Dataset: {dataset_name}\n")
                f.write(f"Max observed y: {y_max:.2f}\n")
                f.write(f"Depthseek Model params: a={a_est_2:.3f}, m={m_est_2:.2f}\n")
                f.write(f"Target Y value: {target_y:.2f}\n")
                f.write(f"Predicted X (Depthseek Model): {predicted_x2:.2f}\n")
                f.write(f"Train Deviation (Depthseek Model): {deviation_2:.2%}\n")
                f.write(f"Test Deviation (Depthseek Model): {test_deviation_2:.2%}\n")
                
        else:
            print(f"LW Model reverse inference: To achieve the library complexity of {target_y}, \
                  it is expected to require the sequencing depth of {predicted_x1:.2f}M reads ")
            print(f"Depthseek Model reverse inference: To achieve the library complexity of {target_y}, \
                  it is expected to require the sequencing depth of {predicted_x2:.2f}M reads ")
            print(f"LW Model training set deviation: {deviation_1: 0.2%}")
            print(f"Depthseek Model training set deviation: {deviation_2:.2%}")
            print(f"LW Model test set bias: {test_deviation_1: 1.2%}")
            print(f"Depthseek Model test set bias: {test_deviation_2:.2%}")

            # Write in logs
            log_file = os.path.join(output_dir, f"{dataset_name}_both_inverse_log.txt")
            with open(log_file, 'w') as f:
                f.write(f"Dataset: {dataset_name}\n")
                f.write(f"Max observed y: {y_max:.2f}\n")
                f.write(f"LW Model params: m={m_est_1:.2f}\n")
                f.write(f"Depthseek Model params: a={a_est_2:.3f}, m={m_est_2:.2f}\n")
                f.write(f"Target Y value: {target_y:.2f}\n")
                f.write(f"Predicted X (LW Model): {predicted_x1:.2f}\n")
                f.write(f"Predicted X (Depthseek Model): {predicted_x2:.2f}\n")
                f.write(f"Train Deviation (LW Model): {deviation_1:.2%}\n")
                f.write(f"Train Deviation (Depthseek Model): {deviation_2:.2%}\n")
                f.write(f"Test Deviation (LW Model): {test_deviation_1:.2%}\n")
                f.write(f"Test Deviation (Depthseek Model): {test_deviation_2:.2%}\n")


# Run Data Analysis and Prediction

In [None]:
print("Start predictive analysis...")
run_prediction_analysis(final_output_file, output_dir, prediction_mode, target_value, model_type)
print("Predictive analysis completed!")

# Result presentation(Optional)

In [None]:
output_files = os.listdir(output_dir)
print(" Output file list :")
for file in output_files:
    print(f"  - {file}")

# If an HTML report is generated, you could display it
html_report = os.path.join(output_dir, f"{os.path.basename(final_output_file).replace('.txt', '')}_report.html")
if os.path.exists(html_report):
    display(HTML(f'<a href="{html_report}" target="_blank">View HTML report</a>'))

# Fit at full depth (0 to 400) and plot the repetition rate growth curve

In [None]:
# Predict and save the results from the maximum sequencing depth of the current input +1 to 400 million
def predict_to_400M(input_file, output_dir):
    """Predict and save the results from the maximum sequencing depth of the current input +1 to 400 million"""
    # Load data
    data = np.loadtxt(input_file)
    X_original = data[:, 0]
    y_original = data[:, 1]
    
    # Get the current maximum depth
    max_depth = int(np.max(X_original))
    y_max = np.max(y_original)
    
    # Estimate the parameters using the Depthseek model
    _, a_est, m_est = estimate_parameters(X_original, y_original, y_max)
    
    # Predict the values from max_depth+1 to 400
    predict_depths = np.arange(max_depth + 1, 401)
    predicted_values = custom_function_2(predict_depths, a_est, m_est)
    
    # Merge the original data and the predicted data
    combined_depths = np.concatenate([X_original, predict_depths])
    combined_values = np.concatenate([y_original, predicted_values])
    
    # Save the result of the current input +1 to 400 million
    dataset_name = os.path.basename(input_file).replace('.txt', '')
    output_file = os.path.join(output_dir, f"{dataset_name}_prediction_to_400M.txt")
    np.savetxt(output_file, np.column_stack((combined_depths, combined_values)), 
               fmt=['%.1f', '%.1f'], delimiter='\t')
    
    print(f"The prediction is completed and the result is saved to: {output_file}")
    print(f"The number of input data points: {len(X_original)}")
    print(f"The number of predicted data points: {len(predict_depths)}")
    print(f"Total number of data points: {len(combined_depths)}")

# Run the function predicted to 400M
print("Start predicting to a depth of 400 meters...")
predict_to_400M(final_output_file, output_dir)
print("Prediction to a depth of 400 meters is completed!")

## Fit the curve with the output file above

In [None]:
dataset_name = os.path.basename(input_file).replace('.txt', '')
fit_dup_Curve_input = os.path.join(output_dir, f"{dataset_name}_prediction_to_400M.txt")
output_file = os.path.join(output_dir, f"{dataset_name}_Dup_Rate_plot.svg")

rcParams['font.family'] = 'Arial'
rcParams['font.size'] = 8
plt.rcParams['axes.unicode_minus'] = False

def calculate_duplicate_rate(row):
    return (row[0] * 1e6 - row[1]) / (row[0] * 1e6)

def plot_single_sample(file_path, output_path):
    fig, ax = plt.subplots(figsize=(6/2.54, 6/2.54))
    
    df = pd.read_csv(file_path, sep='\t', header=None, names=['x', 'y'])
    df['duplicate_rate'] = df.apply(calculate_duplicate_rate, axis=1)
    
    ax.plot(df['x'], df['duplicate_rate'], color='#2ca02c', linewidth=1.5)
    
    ax.set_xticks([10, 20, 30, 40, 50, 60, 70, 80, 90, 100])
    ax.set_yticks([0.0, 0.05, 0.1, 0.15])
    ax.tick_params(axis='both', length=2, labelsize=5)
    
    ax.set_xlim(0, 100)
    ax.set_ylim(0.05, 0.15)
    
    ax.set_xlabel('Sequencing Depth (Million Reads)', fontsize=8)
    ax.set_ylabel('Duplication Rate', fontsize=8)
    
    ax.set_title(f'The Duplication Rate Growth Curve\nSample "{dataset_name}"', fontsize=8)

    ax.spines['top'].set_visible(False)
    ax.spines['right'].set_visible(False)
    ax.spines['left'].set_linewidth(0.5)
    ax.spines['bottom'].set_linewidth(0.5)
    
    plt.tight_layout(pad=0.5)
    plt.savefig(output_path, dpi=600, bbox_inches='tight')
    plt.show()

plot_single_sample(fit_dup_Curve_input, output_file)