In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import numpy as np
import matplotlib.pyplot as plt
from scipy import optimize, integrate

In [None]:
def recursive_fit_piecewise(x,y, err_thresh, level=0, slope=10.):
    """ Recursive top-down algorithm for piece-wise linear
        fitting. Consecutive splits of domain based on error
        threshold.

    Args:
        x (1D array): independent variable (float)
        y (1D array): dependent variable (float)
        err_thresh (float): rms err threshold for curve segment

    Returns:
        elbow points: list of (x,y) points
    """
    p, cov, left_rms, right_rms = fit_piecewise(x,y)
    lpoints, rpoints = [], []
    if left_rms > err_thresh:
        lx, ly = x[x<=p[0]], y[x<=p[0]]
        if len(lx) > 10:
            lpoints = recursive_fit_piecewise(lx,ly, err_thresh, level=level+1)
    if right_rms > err_thresh:
        rx, ry = x[x>=p[0]], y[x>=p[0]]
        if len(rx) > 10:
            rpoints = recursive_fit_piecewise(rx,ry, err_thresh, level=level+1)

    if level ==0:
        lpoints = [(x[0],y[0])]+lpoints
        rpoints = rpoints + [(x[-1],y[-1])]
    if left_rms < err_thresh or right_rms < err_thresh:
        ret = lpoints + [(p[0],p[1])] + rpoints
    else:
        ret = lpoints + rpoints

    if level == 0:
        ret = check_segments(x,y,ret,thresh=err_thresh)
        ret = merge_similar_segments(ret, slope=slope)
        ret = np.array(ret)

    return ret

def check_segments(x,y,pts,thresh=7.5):
    new_pts =[pts[0]]
    for i in range(1,len(pts)):
        x0,y0 = pts[i-1]
        x1,y1 = pts[i]

        inds = (x>=x0)*(x<=x1)
        xp,yp = x[inds],y[inds]

        yi = (y1-y0)*(xp-x0) + y0
        rms = np.sqrt(np.mean((yi-yp)**2))
        if rms > thresh:
            p, cov, left_rms, right_rms = fit_piecewise(xp,yp)
            new_pts.append((p[0],p[1]))
        new_pts.append(pts[i])
    return new_pts

def piecewise_linear(x, x0, y0, k1, k2):
    x = np.array(x, dtype=np.float)
    return np.piecewise(x, [x <= x0, x>x0],
                        [lambda x:k1*x + y0-k1*x0, lambda x:k2*x + y0-k2*x0])

def linear(x, x0, y0, k1):
    return lambda x:k1*x + y0-k1*x0

def fit_linear(x,y):
    ymax= max(y.min()+1,y.max())
    p, cov = optimize.curve_fit(linear, x, y, p0=(np.mean(x),np.mean(y),0.),
                                bounds=([x.min(),y.min(),-np.inf],
                                        [x.max(),ymax, np.inf]))
    rms = np.sqrt( np.mean((linear(x,*p) - y)**2) )
    return p, cov, rms

def fit_piecewise(x,y):
    """Fits 2-piece linear function to data

    Args:
        x (1D array): independent variable (float)
        y (1D array): dependent variable (float)

    Returns:
        p: parameter tuple (x0,y0,k1,k2)
        cov: covariance matrix
        left_rms: left curve rms
        right_rms: right curve rms
    """
    ymax= max(y.min()+1,y.max())
    p, cov = optimize.curve_fit(piecewise_linear, x, y, p0=(np.mean(x),np.mean(y),0.,0.),
                                bounds=([x.min(),y.min(),-np.inf, -np.inf],
                                        [x.max(),ymax, np.inf, np.inf]))
    left_rms = np.sqrt( np.mean((piecewise_linear(x[x<=p[0]],*p) - y[x<=p[0]])**2) )
    right_rms = np.sqrt( np.mean((piecewise_linear(x[x>p[0]],*p) - y[x>p[0]])**2) )
    return p, cov, left_rms, right_rms

def smooth(x,window_len=11,window='hanning'):
    """smooth the data using a window with requested size.

    This method is based on the convolution of a scaled window with the signal.
    The signal is prepared by introducing reflected copies of the signal
    (with the window size) in both ends so that transient parts are minimized
    in the begining and end part of the output signal.

    input:
        x: the input signal
        window_len: the dimension of the smoothing window; should be an odd integer
        window: the type of window from 'flat', 'hanning', 'hamming', 'bartlett', 'blackman'
            flat window will produce a moving average smoothing.

    output:
        the smoothed signal

    example:

    t=linspace(-2,2,0.1)
    x=sin(t)+randn(len(t))*0.1
    y=smooth(x)

    see also:

    numpy.hanning, numpy.hamming, numpy.bartlett, numpy.blackman, numpy.convolve
    scipy.signal.lfilter

    TODO: the window parameter could be the window itself if an array instead of a string
    NOTE: length(output) != length(input), to correct this: return y[(window_len/2-1):-(window_len/2)] instead of just y.
    """

    x = np.array(x)
    if x.ndim != 1:
        raise ValueError("smooth only accepts 1 dimension arrays.")

    if x.size < window_len:
        raise ValueError("Input vector needs to be bigger than window size.")

    if window_len<3:
        return x

    if not window in ['flat', 'hanning', 'hamming', 'bartlett', 'blackman']:
        raise ValueError("Window is one of 'flat', 'hanning', 'hamming', 'bartlett', 'blackman'")

    #s=np.r_[x[window_len-1:0:-1],x,x[-2:-window_len-1:-1]]
    #print(len(s))
    if window == 'flat': #moving average
        w=np.ones(window_len,'d')
    else:
        w=eval('np.'+window+'(window_len)')

    N = window_len
    spad = np.pad(x, (N//2, N-1-N//2), mode='edge')
    ret = np.convolve(w/w.sum(),spad,mode='valid')
    #ret = y[int(window_len/2)-1:-int(window_len/2)-1]

    assert len(ret) == len(x)
    return ret

def merge_similar_segments(pts, slope=10.):
    ### Merge adjacent segements with similar slopes
    clean_pts = [pts[0]]
    for i in range(1,len(pts)-1):
        dx1,dx2 = pts[i][0] - pts[i-1][0], pts[i+1][0] - pts[i][0]
        dy1,dy2 = pts[i][1] - pts[i-1][1], pts[i+1][1] - pts[i][1]

        m1, m2 = dy1/dx1, dy2/dx2

        if abs(m1-m2)>slope:
            clean_pts.append(pts[i])
    clean_pts.append(pts[-1])
    return clean_pts

def remove_small_segments(clean_pts):
    ### Eliminate small segments between larger segments
    if len(clean_pts) >= 5:
        extra_clean = [clean_pts[0]]
        pts = np.array(clean_pts)
        dp = np.diff(pts,axis=0)
        xnorm = pts[:,0].max() - pts[:,0].min()
        ynorm = pts[:,1].max() - pts[:,1].min()
        dp[:,0] /= xnorm
        dp[:,1] /= ynorm

        i=1
        while i < len(clean_pts)-2:
            dl1 = np.linalg.norm(dp[i-1])
            dl2 = np.linalg.norm(dp[i])
            dl3 = np.linalg.norm(dp[i+1])

            if dl1/dl2 > 5 and dl3/dl2 > 5:
                extra_clean.append(clean_pts[i+1])
                i += 2
            else:
                extra_clean.append(clean_pts[i])
                i += 1
        extra_clean.append(clean_pts[-2])
        extra_clean.append(clean_pts[-1])
        return np.array(extra_clean)
    else:
        return np.array(clean_pts)

def getOutlierMask(metric,threshold=2,method="stdev"):
    """Returns mask for outliers in time series

    Args:
        metric (array): 1D numpy array, presumably time series
        threshold (int, optional): Threshold value. Default is 2 for stdev.
        method (str, optional): "stdev" or "percent". Defaults to "stdev".

    Returns:
        mask: numpy masked array
    """
    mask = np.zeros_like(metric)
    if len(metric) > 25:
        smetric = smooth(metric)
        diff = abs(smetric - metric)

        if method=="percent":
            rdiff = abs(diff/(metric + 1e-3))
            mask += rdiff > threshold

        if method=="stdev":
            dmean = np.mean(diff)
            dstd = np.std(diff)
            mask += diff > dmean + threshold*dstd
    else:
        print("Input array len must be > 25")
    return mask

def clean_series(x,y):
    inds = ~np.isnan(y)
    xn = x[inds]
    yn = y[inds]
    mask = getOutlierMask(yn)
    ynn = yn[mask==0]
    xnn = xn[mask==0]
    return xnn.copy(),ynn.copy()

def integral_estimator(xi,yi, thresh=40., slope=10.):
    x,y = clean_series(xi,yi)
    y_int = integrate.cumtrapz(y, x, initial=0)
    pts = recursive_fit_piecewise(x,y_int,thresh,slope=slope)
    pts = remove_small_segments(pts)
    return np.array(pts)

def infer_stable_portions(t,yi, fraction_thresh=0.1, buffer_dt=[10,2]):
    wd = min(len(t)//20, 95)
    #extract each condition segment
    y = smooth(yi,window_len=wd)
    # pick out value at halfway mark
    n = len(t)//2
    constval = np.nanmean(yi[n:n+20])
    # find first point where value stabilizes within threshold (e.g. 10%)
    dy,dymax = abs(y - constval), constval*fraction_thresh
    ts = t[0:n]
    unstable = ts[dy[0:n] > dymax]
    try:
        stable_start = unstable[-1] + buffer_dt[0]
    except:
        stable_start = ts[0] + buffer_dt[0]
    # find last point where value stabilizes within threshold (e.g. 10%)
    stable = t[dy < dymax]
    try:
        stable_end = stable[-1] - buffer_dt[1]
    except:
        stable_end = t[-1] - buffer_dt[1]

    # plt.plot(t,dy)
    # print(constval)
    # plt.show()
    return stable_start, stable_end, constval

def infer_conditions(t,yi, thresh=40., minval=10.):
    pts = integral_estimator(t,yi, thresh=thresh, slope=minval)
    dt = np.gradient(pts[:,0])
    dI = np.gradient(pts[:,1])
    conditions = []
    for i in range(0,len(dt)):
        if dt[i] >5 and dI[i]/dt[i] > minval:
            inds = (t>pts[i,0])*(t<pts[i+1,0])
            if len(conditions) == 0:
                buffer_dt = [10,3]
            else:
                buffer_dt = [2,3]
            stable_start, stable_end, avg = infer_stable_portions(t[inds],yi[inds],buffer_dt=buffer_dt)
            conditions.append({'value':avg, 'start':pts[i,0], 'end':pts[i+1,0],
                               'stable_start':stable_start, 'stable_end':stable_end})
    return conditions

def get_intersection(a0,a1,b0,b1):
    t0,t1 = max(a0,b0),min(a1,b1)
    if t0 > t1:
        return None, None
    else:
        return t0,t1

def collate_conditions(condsI,condsF):
    allconds = []
    for icond in condsI:
        for fcond in condsF:
            t0,t1 = get_intersection(icond['start'],icond['end'],fcond['start'],fcond['end'])
            ts0,ts1 = get_intersection(icond['stable_start'],icond['stable_end'],fcond['stable_start'],fcond['stable_end'])
            ival,fval = icond['value'],fcond['value']
            if t0 is not None:
                allconds.append({'I_val':ival,'F_val':fval, 'start':t0, 'end':t1,
                                 'stable_start':ts0, 'stable_end':ts1})
    return allconds

def get_ConditionInstance(cond_dict, ListInsts, thresh=.15):
    score = np.zeros(len(ListInsts))
    for i in range(0,len(ListInsts)):
        cinst = ListInsts[i]
        I = cinst.condition.current
        F = cinst.condition.plasma_gas_flow
        ierr = abs(cond_dict['I_val'] - I)/cond_dict['I_val']
        ferr = abs(cond_dict['F_val']-F)/cond_dict['F_val']
        score[i] = np.sqrt(ierr**2 + ferr**2)
    if score.min() < thresh:
        return ListInsts[int(np.argmin(score))]
    else:
        return None

def get_condition_match(cond_dict, cinst, thresh=.15):
    I = cinst.condition.current
    F = cinst.condition.plasma_gas_flow
    ierr = abs(cond_dict['I_val'] - I)/cond_dict['I_val']
    ferr = abs(cond_dict['F_val']-F)/cond_dict['F_val']
    score = np.sqrt(ierr**2 + ferr**2)
    if score < thresh:
        return True
    else:
        return False

In [None]:
class Conv1DNet(nn.Module):
    def __init__(self):
        super(Conv1DNet, self).__init__()
        self.conv1 = nn.Conv1d(in_channels=1, out_channels=32, kernel_size=7, stride=2, padding=3)
        self.conv2 = nn.Conv1d(in_channels=32, out_channels=16, kernel_size=7, stride=2, padding=3)
        self.convtrans1 = nn.ConvTranspose1d(in_channels=16, out_channels=16, kernel_size=7, stride=2, padding=3, output_padding=1)
        self.convtrans2 = nn.ConvTranspose1d(in_channels=16, out_channels=32, kernel_size=7, stride=2, padding=3, output_padding=1)
        self.convtrans3 = nn.ConvTranspose1d(in_channels=32, out_channels=3, kernel_size=3, stride=1, padding=1)
        self.dropout = nn.Dropout(0.25)

    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = self.dropout(x)
        x = F.relu(self.conv2(x))
        x = self.dropout(x)
        x = F.relu(self.convtrans1(x))
        x = self.dropout(x)
        x = F.relu(self.convtrans2(x))
        x = self.dropout(x)
        x = self.convtrans3(x)
        # Applying softmax on the last dimension (channels) to get probabilities
        x = F.softmax(x, dim=1)
        return x

In [None]:
### Artificial data generation function
def gen_trace(amp =0.9, amp2=0.7, noise_amp=.015, n=500):
    # Generate fake top hat data
    rnd = sorted(np.random.random_sample((3,)))
    start,middle,stop = rnd[0],rnd[1],rnd[2]
    t = np.linspace(0, 1, n)
    vals = np.zeros(n)
    vals[t>start]= amp
    vals[t>middle]= amp2
    vals[t>stop] = 0
    spikeamp = sorted(np.random.random_sample((1,)))[0]/2
    if spikeamp> 0.2:
        vals[abs(t-start)<0.015] = spikeamp
    vals = smooth(vals)
    noise = np.random.normal(0, noise_amp, (500))
    raw = vals + noise
    #newrnd

    # label fake top hat data
    labels = np.zeros(n)
    labels[t>start]= 2
    labels[t>stop] = 0
    labels[abs(t-start)<.025] = 1
    labels[abs(t-stop)<.025] = 1
    labels[abs(t-middle)<.025] = 1

    encoded_labels = np.zeros((n,3))
    encoded_labels[:,0] = labels==0
    encoded_labels[:,1] = labels==1
    encoded_labels[:,2] = labels==2

    return  raw, encoded_labels

In [None]:
# Assuming gen_trace and smooth are defined as given and above
# Generate data
num_samples = 1000  # Number of samples to generate
x_train = []
y_train = []

for _ in range(num_samples):
    raw, labels = gen_trace()
    x_train.append(raw)
    # Convert labels to single class labels if necessary. Here we choose the max label as class.
    y_train.append(np.argmax(labels, axis=1))

x_train = np.array(x_train, dtype=np.float32)
y_train = np.array(y_train, dtype=np.long)

# Convert to PyTorch tensors
x_train = torch.from_numpy(x_train).long() # Shape: [batch, channels, length]
y_train = torch.from_numpy(y_train).long()
# Create DataLoader
dataset = TensorDataset(x_train_tensor, y_train_tensor)  # Add channel dimension
train_loader = DataLoader(dataset, batch_size=64, shuffle=True)

In [None]:
# Initialize the model, loss function, and optimizer
model = Conv1DNet()
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training loop
epochs = 40
for epoch in range(epochs):
    running_loss = 0.0
    for inputs, labels in train_loader:
        optimizer.zero_grad()

        # Forward pass
        inputs = inputs.unsqueeze(1)
        outputs = model(inputs)
        loss = criterion(outputs, labels)

        # Backward and optimize
        loss.backward()
        optimizer.step()


        running_loss += loss.item()
    print(f'Epoch {epoch+1}, Loss: {running_loss/len(train_loader)}')

torch.save( model.state_dict(),"time_seg.pth")
print('model saved')

# Use the Colab file management to download
# from google.colab import files
# files.download('time_seg.pth')

In [None]:
# Generate a single test input sequence
test_raw, test_labels = gen_trace()
test_input = torch.tensor(test_raw, dtype=torch.float).unsqueeze(0).unsqueeze(1)  # Shape: [1, 1, 500]

In [None]:
test_input = torch.tensor(test_raw, dtype=torch.float).unsqueeze(0).unsqueeze(1)  # Shape: [1, 1, 500]

model.eval()  # Set the model to evaluation mode
with torch.no_grad():
    test_output = model(test_input)
    # # Convert logits to probabilities (optional)
    probabilities = F.softmax(test_output, dim=1)
    # Get the predicted class for each time step
    predictions = torch.argmax(test_output, dim=1)#.squeeze()  # Shape: [500]
    print(probabilities)
# Flatten test_input for plotting
test_input_flat = test_input.squeeze().cpu().numpy()

# Convert probabilities to numpy for easy handling
probabilities_np = predictions.squeeze().cpu().numpy()  # Shape: [num_classes, 500]

# Time steps for x-axis
time_steps = np.arange(test_input_flat.shape[0])

In [None]:
plt.figure(figsize=(14, 6))

# Plot the input sequence
plt.plot(time_steps, test_input_flat, label='Input Sequence', color='black', linewidth=1, alpha=0.7)

# Plot prediction probabilities for each class
class_colors = ['red', 'green', 'blue']  # Colors for classes 0, 1, 2
class_labels = ['Class 0 Prob', 'Class 1 Prob', 'Class 2 Prob']  # Labels for classes 0, 1, 2

for i, color in enumerate(class_colors):
    plt.plot(time_steps, probabilities_np, label=class_labels[i], color=color, alpha=0.5)

plt.title('Input Sequence and Class Prediction Probabilities')
plt.xlabel('Time Step')
plt.ylabel('Amplitude / Probability')
plt.legend()
plt.show()

In [None]:
# Selecting the first sequence in the batch for visualization
sequence_probabilities = probabilities[0].cpu().numpy()  # Convert to numpy array for easy handling

time_steps = np.arange(sequence_probabilities.shape[1])  # Assuming sequence_length is the size

# Plotting the probabilities for each class
plt.figure(figsize=(14, 4))
for class_index in range(sequence_probabilities.shape[0]):  # Iterate over each class
    plt.plot(time_steps, sequence_probabilities[class_index], label=f'Class {class_index}')

plt.title('Class Probabilities over Time')
plt.xlabel('Time Step')
plt.ylabel('Probability')
plt.legend()
plt.show()