In [1]:
from pgnano.stats_analysis.primitives import PGPoreType
from pgnano.stats_analysis.jupyter_data_preparation import flatten_sample_data
from functools import partial
from pgnano.stats_analysis.coding_analysis_scripts import *
from scipy.stats import geom, halfnorm
from scipy.linalg import lstsq
from scipy.fft import fft
from inspect import signature
import numpy as np
import numpy.typing as npt
import matplotlib.pyplot as plt
import statistics

In [2]:
signal_data, chunk_data = flatten_sample_data(PGPoreType.P10_4_1,100)
one_signal = signal_data[0]
error = transform_signal_to_error(one_signal)
code = transform_error_to_code(error)
low_part = code & 0x1F

['/data/datananoraw/data_analysis/10_4_1/2d6ce589-PAG70058_pass_90cff557_229fb1c2_1643.pod5', '/data/datananoraw/data_analysis/10_4_1/58662236-PAG70133_pass_976dfe21_5dadfada_5.pod5', '/data/datananoraw/data_analysis/10_4_1/679e8449-PAG70058_pass_90cff557_229fb1c2_1581.pod5', '/data/datananoraw/data_analysis/10_4_1/7646a9e4-PAG65784_pass_f306681d_16a70748_508.pod5', '/data/datananoraw/data_analysis/10_4_1/889add85-PAG67404_fail_a8a15ce6_4a74c11f_28.pod5', '/data/datananoraw/data_analysis/10_4_1/898fa111-PAG68757_fail_39c39833_26077d5d_167.pod5', '/data/datananoraw/data_analysis/10_4_1/8ba56eeb-PAG65784_pass_f306681d_16a70748_1319.pod5', '/data/datananoraw/data_analysis/10_4_1/a60ded57-PAG65902_pass_96491aed_0156c9a3_1335.pod5', '/data/datananoraw/data_analysis/10_4_1/a7ff4244-PAG68757_pass_39c39833_26077d5d_944.pod5', '/data/datananoraw/data_analysis/10_4_1/af5d6be7-PAG70133_pass_976dfe21_5dadfada_1065.pod5', '/data/datananoraw/data_analysis/10_4_1/c2f433b8-PAG65902_fail_96491aed_0156c

In [3]:
def add_previous_sample(xs: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
    previous = np.append([0], xs[:-1])
    return (xs, previous)

In [4]:
def unwrap_ctxs(o) -> np.ndarray:
    ctxs, idxs, positions = o
    return ctxs

In [5]:
def adaptive_probability_length_estimate(xs: np.ndarray):
    prob = np.ones(len(np.bincount(xs)))
    count = len(prob)
    code_len_low_estimate = 0
    for x in xs:
        code_len_low_estimate -= np.log2(prob[x] / count)
        prob[x] += 1
        count += 1
    return code_len_low_estimate

In [6]:
def contextualize_inplace(xs, ctxs, ctxs_idxs, ctxs_positions):
    i = 0
    for (x, idx) in zip(xs, ctxs_idxs):
        ctxs[idx].append(x)
        ctxs_positions[idx].append(i)
        i += 1


In [7]:
def process_entropy_lower_bound(xss: List[np.ndarray]):
    code_len_low_estimate = 0
    cummulative_len = sum(map(lambda x: len(x), xss))
    for xs in xss:
        code_len_low_estimate += adaptive_probability_length_estimate(xs)
    return code_len_low_estimate / cummulative_len

In [8]:
def max_bits(x: np.unsignedinteger) -> np.unsignedinteger:
    i = 0
    while x / (2**i) > 0:
        i += 1
    return i

In [9]:
def contextualize_even_odd_previous(xs: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
    _, previous = add_previous_sample(xs)
    ctxs = [[], []]
    ctxs_positions = [[], []]
    ctxs_idxs = previous & 0x1
    contextualize_inplace(xs, ctxs, ctxs_idxs, ctxs_positions)
    return ctxs, ctxs_idxs, ctxs_positions

In [10]:
def contextualize_by_derivative_absolute_value(xs: np.ndarray) -> List[np.ndarray]:
    _, prev = add_previous_sample(xs)#np.concatenate([0], xs[:-1])
    _, prev_prev = add_previous_sample(prev)
    deriv = prev - prev_prev
    f = np.vectorize(lambda x: 0 if abs(x) > 7 else 1)
    ctxs_idxs = f(deriv)
    ctxs = [[],[]]
    ctxs_positions = [[], []]
    contextualize_inplace(xs, ctxs, ctxs_idxs, ctxs_positions)
    return ctxs, ctxs_idxs, ctxs_positions

In [11]:
def contextualize_by_derivative_sign(xs: np.ndarray) -> List[np.ndarray]:
    _, prev = add_previous_sample(xs)
    _, prev_prev = add_previous_sample(prev)
    deriv = prev - prev_prev
    f = np.vectorize(lambda x: 0 if x > 0 else 1)
    ctxs_idxs = f(deriv)
    ctxs = [[],[]]
    ctxs_positions = [[], []]
    contextualize_inplace(xs, ctxs, ctxs_idxs, ctxs_positions)
    return ctxs, ctxs_idxs, ctxs_positions

In [12]:
def contextualize_by_rugosity(xs: np.ndarray) -> List[np.ndarray]:
    _, xs_1 = add_previous_sample(xs)
    _, xs_2 = add_previous_sample(xs_1)
    _, xs_3 = add_previous_sample(xs_2)
    der_1 = xs_1 - xs_2
    der_2 = xs_2 - xs_3
    contexts = [[],[],[],[]]
    for x, d_1, d_2 in zip(xs, der_1, der_2):
        context_idx = []

In [13]:
def contextualize_by_signal_1(xs: np.ndarray, signal: np.ndarray) -> List[np.ndarray]:
    signal_max = max(signal)
    signal_min = min(signal)
    threshold = (signal_max + signal_min) / 2
    f = np.vectorize(lambda x: 0 if x > threshold else 1)
    ctxs_idxs = f(add_previous_sample(signal)[1])
    ctxs = [[], []]
    ctxs_positions = [[], []]
    contextualize_inplace(xs, ctxs, ctxs_idxs, ctxs_positions)
    return ctxs, ctxs_idxs, ctxs_positions

In [14]:
def contextualize_by_signal_2(xs: np.ndarray, signal: np.ndarray) -> List[np.ndarray]:
    signal_max = max(signal)
    signal_min = min(signal)
    middle_point = (signal_max + signal_min) / 2
    f = np.vectorize(
        lambda x: 0 if x > (middle_point + signal_max) / 2 else
                  1 if x > (((middle_point + signal_max) / 2) + (middle_point))/2 else
                  2 if x > (signal_min + middle_point) / 2 else
                  3
    )
    ctxs_idxs = f(add_previous_sample(signal)[1])
    ctxs = [[], [], [], []]
    ctxs_positions = [[], [], [], []]
    contextualize_inplace(xs, ctxs, ctxs_idxs, ctxs_positions)
    return ctxs, ctxs_idxs, ctxs_positions

In [15]:
def cartesian_context(xs, f, h, signal = None) -> List[np.ndarray]:
    def call_by_signature(f, xs, signal):
        if len(signature(f).parameters.values()) == 1:
            return f(xs)
        elif len(signature(f).parameters.values()) == 2:
            return f(xs, signal)
        else:
            raise "Not recognized signature"

    ctxs_f, ctxs_idxs_f, ctxs_positions_f = call_by_signature(f, xs, signal)
    ctxs_h, ctxs_idxs_h, ctxs_positions_h = call_by_signature(h, xs, signal)# FIXME: ctxt_idxs should actually be the index in the input array
    
    ctxs_res = []
    ctxs_idxs_res = []
    ctxs_positions_res = []
    f_num_idxs = len(set(ctxs_idxs_f))
    h_num_idxs = len(set(ctxs_idxs_h))

    for _ in range(len(ctxs_f) * len(ctxs_h)):
        ctxs_res.append([])
        ctxs_positions_res.append([])

    for i in range(len(xs)):
        new_idx = f_num_idxs * ctxs_idxs_h[i] + ctxs_idxs_f[i]
        ctxs_idxs_res.append(new_idx)
        ctxs_idxs_f[i]
        ctxs_f[ctxs_idxs_f[i]]
        ctxs_res[new_idx].append(ctxs_f[ctxs_idxs_f[i]][0])
        ctxs_f[ctxs_idxs_f[i]].pop()
    return ctxs_res, ctxs_idxs_res, ctxs_positions_res

In [16]:
process_entropy_lower_bound([low_part])

4.955913889795664

In [17]:
process_entropy_lower_bound(unwrap_ctxs(contextualize_even_odd_previous(low_part)))

4.920541600263688

In [18]:
process_entropy_lower_bound(unwrap_ctxs(contextualize_by_derivative_absolute_value(low_part)))

4.956400860293818

In [19]:
process_entropy_lower_bound(unwrap_ctxs(contextualize_by_derivative_sign(low_part)))

4.956357602663067

In [20]:
process_entropy_lower_bound(unwrap_ctxs(contextualize_by_signal_1(low_part, one_signal)))

4.955052142149329

In [21]:
process_entropy_lower_bound(unwrap_ctxs(contextualize_by_signal_2(low_part, one_signal)))

4.95064642489646

In [22]:
process_entropy_lower_bound(unwrap_ctxs(cartesian_context(low_part,contextualize_by_signal_2,contextualize_even_odd_previous,one_signal)))

0.004342823560820523