In [None]:
import numpy as np
from glob import glob
import matplotlib
from scipy import integrate
import matplotlib.pyplot as plt
import time
from tqdm import tqdm
import gc
import platform
import warnings
import copy
import matplotlib.transforms as mtransforms
from scipy.stats import bootstrap
from scipy.stats._common import ConfidenceInterval

%matplotlib inline
%config InlineBackend.figure_format='retina'

In [None]:
# MATPLOTLIB

SMALL_SIZE = 12
MEDIUM_SIZE = 14
BIGGER_SIZE = 16

plt.rc("font", size=SMALL_SIZE)  # controls default text sizes
plt.rc("axes", titlesize=SMALL_SIZE)  # fontsize of the axes title
plt.rc("axes", labelsize=MEDIUM_SIZE)  # fontsize of the x and y labels
plt.rc("xtick", labelsize=SMALL_SIZE)  # fontsize of the tick labels
plt.rc("ytick", labelsize=SMALL_SIZE)  # fontsize of the tick labels
plt.rc("legend", fontsize=SMALL_SIZE)  # legend fontsize
plt.rc("figure", titlesize=BIGGER_SIZE)  # fontsize of the figure title

In [None]:
# UTILS


# Functions for V_1 and V_2 in bounds
class bounded_zoo:
    def __init__(self, bounds):
        self.bounds = bounds

    def b_integrate(self, data, ch):
        if ch == "CH1":
            return integrate.trapezoid(data[self.bounds[0] : self.bounds[1]])
        else:
            return integrate.trapezoid(data[self.bounds[2] : self.bounds[3]])

    def b_max(self, data, ch):
        if ch == "CH1":
            return np.max(data[self.bounds[0] : self.bounds[1]])
        else:
            return np.max(data[self.bounds[2] : self.bounds[3]])

    def b_mean(self, data, ch):
        if ch == "CH1":
            return np.mean(data[self.bounds[0] : self.bounds[1]])
        else:
            return np.mean(data[self.bounds[2] : self.bounds[3]])


def temp_to_kelvin(tempstr):
    if tempstr[-1] == "c":
        return float(tempstr[:-1]) + 273.15
    if tempstr[-1] == "k":
        return float(tempstr[:-1])
    warnings.warn("Temperature units are not specified, defaulted to k.")
    return float(tempstr)


def plot_raw_data(data, bounds=None):
    if bounds is not None:
        plt.plot(data[bounds[0] : bounds[1]])
    else:
        plt.plot(data)
    plt.show()


def visual_check(raw_data_dict, typ="raw-raw", boundaries=[0, 1000, 0, 1000]):
    i = list(raw_data_dict["data"].keys())[0]
    if typ == "raw-raw":
        print(i, "CH1")
        plot_raw_data(raw_data_dict["data"][i]["CH1"][0][boundaries[0] : boundaries[1]])
        print(i, "CH2")
        plot_raw_data(raw_data_dict["data"][i]["CH2"][0][boundaries[2] : boundaries[3]])

    else:
        print(i, "CH1")
        plot_raw_data(raw_data_dict["data"][i]["CH1"])
        print(i, "CH2")
        plot_raw_data(raw_data_dict["data"][i]["CH2"])


# Due to 13 ns shift check which is the true zero
def check_pulse_zero(pulse_len_points, data_ch1, ch3_places, laser_err_points):
    for i in range(1, len(ch3_places)):
        if np.abs(ch3_places[i] - ch3_places[i - 1]) >= laser_err_points:
            print(ch3_places[i])
            plt.plot(
                data_ch1[i][
                    len(data_ch1[i]) // 2
                    - 100 : len(data_ch1[i]) // 2
                    + pulse_len_points * 2
                ]
            )
            plt.show()
            print(ch3_places[i - 1])
            plt.plot(
                data_ch1[i - 1][
                    len(data_ch1[i - 1]) // 2
                    - 100 : len(data_ch1[i - 1]) // 2
                    + pulse_len_points * 2
                ]
            )
            plt.show()
            print("Which laser point is the true zero? 1 or 2")
            true_val = int(input())
            if true_val == 1:
                return ch3_places[i]
            else:
                return ch3_places[i - 1]
    return ch3_places[-1]


# Recalc the delay according to true zero point
def calculate_delay(
    laser_err_ns,
    laser_err_points,
    supposed_delay,
    supposed_ch3,
    zero_point,
    x_or,
    x_inc,
    rtol=1e-2,
):
    if np.abs(supposed_delay) > np.abs(x_or * (1 - rtol)):
        return supposed_delay
    if (
        np.abs(supposed_ch3 + (supposed_delay / x_inc) - zero_point) >= laser_err_points
    ):  # becuse pulse goes "back"
        sign = np.sign(supposed_ch3 - (supposed_delay / x_inc) - zero_point)
        return supposed_delay + sign * laser_err_ns
    return supposed_delay


# Rebuild the results with the 13 ns fix applied
def replace_correctly(result_old):
    result_new = copy.deepcopy(result_old)
    delays_real = set()
    for i in list(result_new["data"].keys()):
        result_new["data"][i]["real_delay"] = [
            j for j in range(len(result_new["data"][i]["CH1"]))
        ]
        for j in range(len(result_new["data"][i]["CH1"])):
            result_new["data"][i]["real_delay"][j] = calculate_delay(
                result_new["laser_error_ns"],
                result_new["laser_error_points"],
                i,
                result_new["data"][i]["CH3"][j],
                result_new["zero_point"],
                result_new["X_or"],
                result_new["X_inc"],
            )
            delays_real.add(result_new["data"][i]["real_delay"][j])

    result_out = copy.deepcopy(result_old)
    result_out["data"] = {}
    for i in sorted(list(delays_real)):
        result_out["data"][i] = {}
        result_out["data"][i]["CH1"] = []
        result_out["data"][i]["CH2"] = []
        result_out["data"][i]["CH3"] = []
        result_out["data"][i]["T"] = []

    for i in list(result_old["data"].keys()):
        for j in range(len(result_old["data"][i]["CH1"])):
            result_out["data"][result_new["data"][i]["real_delay"][j]]["CH1"].append(
                copy.deepcopy(result_old["data"][i]["CH1"][j])
            )
            result_out["data"][result_new["data"][i]["real_delay"][j]]["CH2"].append(
                copy.deepcopy(result_old["data"][i]["CH2"][j])
            )
            result_out["data"][result_new["data"][i]["real_delay"][j]]["CH3"].append(
                copy.deepcopy(result_old["data"][i]["CH3"][j])
            )
            result_out["data"][result_new["data"][i]["real_delay"][j]]["T"].append(
                copy.deepcopy(result_old["data"][i]["T"][j])
            )

    return result_out


# It is very possible that we want points only if they were averaged N times
def filter_points(res, min_points):
    out = {}
    for i in list(res.keys()):
        if len(res[i]["R"]) >= min_points:
            out[i] = {}
            out[i]["R"] = res[i]["R"]
            out[i]["R_mean"] = res[i]["R_mean"]
            out[i]["R_conf"] = res[i]["R_conf"]
    return out


# Plot the results
def plotter(res, filtering=True, min_points=10, fname=None, title=""):
    plt.figure(figsize=(8, 5))

    if filtering:
        res = filter_points(res, min_points)

    delays = np.array(list(res.keys()))
    resistances = [res[i]["R_mean"] for i in delays]
    yerrs = [(res[i]["R_conf"].low, res[i]["R_conf"].high) for i in delays]
    for i in range(len(yerrs)):
        yerrs[i] = (resistances[i] - yerrs[i][0], yerrs[i][1] - resistances[i])
    plt.errorbar(
        delays,
        resistances,
        linestyle="-",
        marker="o",
        markersize=2,
        yerr=np.swapaxes(np.array(yerrs), 0, 1),
        ecolor="red",
    )
    plt.xscale("symlog")
    plt.ylabel("R, Ohms")
    plt.xlabel("Delay, ns")
    plt.tight_layout()
    plt.title(title)
    if fname:
        plt.savefig(fname)
    plt.show()

In [None]:
def read(filenames, meta_temp="mean", zero_point=True, fix_delays=True):
    if platform.system() == "Linux" or platform.system() == "Darwin":
        split_symbol = "/"
    else:
        split_symbol = "\\"
    add_symbol = "o"
    laser_err_ns = 13  # Laser error
    tol_ns = 1  # Absolute tolerance for laser error
    rtol_xor = 1e-2  # Relative tolerance (the shift is not exactly 13 ns)
    tmp = []
    delays = set()
    points = 0
    result = {}
    result["X_inc"] = None
    result["X_or"] = None

    for filename in filenames:
        if "preamble" in filename:
            LASER_PULSE_CONSTANT = int(
                filename.split("_")[-3].split(".")[0]
            )  # get the 40 \mu s +- some ns for the 0 delay
            break

    for filename in filenames:
        # Get delay list and the supposed amount of points for averaging
        if filename.split(split_symbol)[-1].split("_")[1] != "preamble":
            delays.add(
                round(
                    float(filename.split(split_symbol)[-1].split("_")[3]) * 1e9
                    - LASER_PULSE_CONSTANT,
                    0,
                )
            )
            tmp.append(filename)
            points = max(int(filename.split("-")[-1].split(".")[0]), points)
        else:
            # Check that data is consistent
            with open(filename, "r") as f:
                preamble = f.readline()
            preamble = preamble.strip().split(",")

            if result["X_inc"] is None:
                result["X_inc"] = float(preamble[4]) * 1e9
            elif result["X_inc"] != float(preamble[4]) * 1e9:
                raise ValueError("Preamble X_inc changed!")
            else:
                pass

            if result["X_or"] is None:
                result["X_or"] = float(preamble[5]) * 1e9
            elif result["X_or"] - float(preamble[5]) * 1e9 <= result["X_or"] * rtol_xor:
                raise ValueError(
                    f"Preamble X_or changed! {result['X_or']} to {float(preamble[5]) * 1e9}"
                )
            else:
                pass

    result["points"] = points + 1  # numeration from 0
    result["laser_error_points"] = int(
        (laser_err_ns - tol_ns) / result["X_inc"]
    )  # In osc. units (samples)
    result["laser_error_ns"] = laser_err_ns

    filenames = copy.deepcopy(tmp)
    del tmp
    filenames.sort(
        key=lambda x: round(
            float(x.split(split_symbol)[-1].split("_")[3]) * 1e9 - LASER_PULSE_CONSTANT,
            0,
        )
    )
    broken = []
    result["data"] = {}
    result["sample_type"] = filenames[0].split(split_symbol)[-1].split("_")[1]
    result["sample_id"] = filenames[0].split(split_symbol)[-1].split("_")[2]

    # TODO - add other params
    for i in sorted(list(delays)):
        # Prealloc space and names
        result["data"][i] = {}
        result["data"][i]["CH1"] = [j for j in range(result["points"])]
        result["data"][i]["CH2"] = [j for j in range(result["points"])]
        result["data"][i]["CH3"] = [j for j in range(result["points"])]
        result["data"][i]["T"] = [j for j in range(result["points"])]

    for filename in tqdm(filenames):
        with open(filename, "r") as f:
            filestr = f.readline()
            # Check for broken files & inform later
            if len(filestr.strip()) == 0:
                broken.append(filename)
                continue
            filename_splitted = filename.split("/")[-1].split("_")
            delay = round(float(filename_splitted[3]) * 1e9 - LASER_PULSE_CONSTANT, 0)
            # If a thermometer is used
            # In case of 2 thermometers in one system
            if add_symbol in filename_splitted[4]:
                if meta_temp == "mean":
                    result["data"][delay]["T"][
                        int(filename_splitted[-1].split("-")[-1].split(".")[0])
                    ] = np.mean(
                        [
                            temp_to_kelvin(filename_splitted[4].split(add_symbol)[0]),
                            temp_to_kelvin(filename_splitted[4].split(add_symbol)[1]),
                        ]
                    )

                elif type(meta_temp) == int:
                    result["data"][delay]["T"][
                        int(filename_splitted[-1].split("-")[-1].split(".")[0])
                    ] = float(
                        temp_to_kelvin(
                            filename_splitted[4].split(add_symbol)[meta_temp]
                        )
                    )

            else:
                result["data"][delay]["T"][
                    int(filename_splitted[-1].split("-")[-1].split(".")[0])
                ] = temp_to_kelvin(filename_splitted[4])

            result["pulse_dur_ns"] = int(filename_splitted[5])

            # Get the photodiode peak place
            if filename_splitted[-1][:-4].split("-")[0] == "CH3":
                result["data"][delay]["CH3"][
                    int(filename_splitted[-1].split("-")[-1].split(".")[0])
                ] = np.argmax(np.array(filestr.split(",")[:-1], dtype=float))

            # This is the data from the measurement, record to its place
            else:
                result["data"][delay][filename_splitted[-1][:-4].split("-")[0]][
                    int(filename_splitted[-1].split("-")[-1].split(".")[0])
                ] = np.array(filestr.split(",")[:-1], dtype=float)

    # If the number of points for averaging changed -- clean
    # Might happen when we don't really care much about points at high delays / preliminary review
    for i in sorted(list(delays)):
        for j in reversed(range(len(result["data"][i]["CH1"]))):
            if type(result["data"][i]["CH1"][j]) == int:
                result["data"][i]["CH1"].pop(j)
                result["data"][i]["CH2"].pop(j)
                result["data"][i]["CH3"].pop(j)
                result["data"][i]["T"].pop(j)

    # Do we find the 0 point?
    if zero_point:
        result["zero_point"] = check_pulse_zero(
            int(result["pulse_dur_ns"] / result["X_inc"]),
            result["data"][0]["CH1"],
            result["data"][0]["CH3"],
            result["laser_error_points"],
        )

    # Do we fix all the points?
    if fix_delays:
        return replace_correctly(result)
    else:
        return result


# Raw oscilloscope data -> V_1 and V_2
def to_volts(raw_data_dict, bounds=[0, 100, 0, 100], method="max"):
    result = {}
    if method == "max":
        func = np.max
    elif method == "integrate":
        func = integrate.trapezoid
    elif method == "bounded_integrate":
        func = bounded_zoo(bounds).b_integrate
    elif method == "bounded_mean":
        func = bounded_zoo(bounds).b_mean
    elif method == "bounded_max":
        func = bounded_zoo(bounds).b_max
    else:
        warnings.warn('Unknown oscillogram to volts method, defaulting to "max".')
        method = "max"
        func = np.max

    for i in list(raw_data_dict["data"].keys()):
        tmp = range(len(raw_data_dict["data"][i]["CH1"]))
        result[i] = {}
        result[i]["V1"] = []
        result[i]["V2"] = []
        for j in tmp:
            if "bounded" in method:
                result[i]["V1"].append(func(raw_data_dict["data"][i]["CH1"][j], "CH1"))
                result[i]["V2"].append(func(raw_data_dict["data"][i]["CH2"][j], "CH2"))
            else:
                result[i]["V1"].append(func(raw_data_dict["data"][i]["CH1"][j]))
                result[i]["V2"].append(func(raw_data_dict["data"][i]["CH2"][j]))
    return result


# V_1 and V_2 data to resistance
# soglres is the R_m, the impedance matching
def to_resistance(raw_volt_dict, soglres=51):
    result = {}
    for i in list(raw_volt_dict.keys()):
        result[i] = {}
        result[i]["R"] = []
        for j in range(len(raw_volt_dict[i]["V1"])):
            result[i]["R"].append(
                50
                * (
                    (2 * raw_volt_dict[i]["V2"][j] / raw_volt_dict[i]["V1"][j] - 1)
                    / (1 + 50 / soglres)
                    - 1
                )
            )

    for i in list(raw_volt_dict.keys()):
        result[i]["R_mean"] = np.mean(result[i]["R"])
        if np.array(result[i]["R"]).shape[0] > 1:
            # In case we have more than 1 point, let's find a conf interval
            result[i]["R_conf"] = bootstrap(
                (np.array(result[i]["R"]),),
                np.mean,
                confidence_level=0.95,
                random_state=42,
            ).confidence_interval
        else:
            result[i]["R_conf"] = ConfidenceInterval(
                low=np.mean(result[i]["R"]), high=np.mean(result[i]["R"])
            )
            warnings.warn(f"Only one sample for delay {i}, conf_int is invalid!")
    return result

In [None]:
rr = read(glob("")) # path/*.txt

In [None]:
visual_check(rr, boundaries=[]) # int, int, int, int

In [None]:
# int, int
ch1 = []
ch2 = []

boundaries = ch1 + ch2

In [None]:
vv = to_volts(rr, boundaries, method="bounded_integrate")

In [None]:
res = to_resistance(vv)

In [None]:
plotter(res, min_points=10, title="")