In [136]:
from dataclasses import dataclass
from typing import List, Tuple
import numpy as np
import pandas as pd
import os
import glob
import matplotlib.pyplot as plt
from scipy.stats import norm
import scipy.stats as st
from statistics import NormalDist
from scipy.stats import multivariate_normal as mvn

@dataclass
class NormalDistribution:
    mean: float
    std: float
    
@dataclass
class Part:
    
    type: str
    sub_part_name: str
    sensor: str
    signals: List   # Signal is numpy array of (500,3) with [frequency, Z, X]

In [None]:

def load_part_data(part_type: str) -> List[Part]:
    
    parts = []
    for part_dir in os.listdir(f'psig_matcher/data/{part_type}'):
        
        sensor = part_dir[1:]
        measurement_files = glob.glob(f'psig_matcher/data/{part_type}/{part_dir}/*.npy')
        measurements = [np.load(f) for f in measurement_files]
        parts.append(Part(part_type, part_dir, sensor, measurements))
    
    return parts

con_parts = load_part_data('CON')
#conlid_parts = load_part_data('CONLID') # Need to handle the damage files
lid_parts = load_part_data('LID')

In [None]:
import dataclasses


def limit_deminsionality(parts: List[Part], frequeny_indexes: List[int]) -> List[Part]:
    """Use only a subset of the frequencies for the analysis. This effectivley transforms the 
    500 dimension multivariant distribution to a n-dimentional distribution where n is the
    length of the frequency_indexes.
    
    Further, this assumes use of the X axis"""
    
    return [
        dataclasses.replace(part, signals=[[signal[index][1] for index in frequeny_indexes] for signal in part.signals])
        for part in parts]

In [None]:
def estimate_normal_dist(x: List[float], confidence: float) -> NormalDistribution:
    """Estimate the normal distribution for the given data.
    This is done using: https://handbook-5-1.cochrane.org/chapter_7/7_7_3_2_obtaining_standard_deviations_from_standard_errors_and.htm#:~:text=The%20standard%20deviation%20for%20each,should%20be%20replaced%20by%205.15.
    
    TODO (henry): I'm not sure this is correct.
    """
    
    # Use T distribution for small sample sizes
    if len(x) < 30:
        lower, upper = st.t.interval(confidence, len(x)-1, loc=np.mean(x), scale=st.sem(x))
        t_value = st.t.ppf(confidence, len(x)-1)
        std = np.sqrt(len(x))*(upper-lower)*t_value
    
    # Use normal distribution for larger sample sizes
    else:
        lower, upper = st.norm.interval(confidence, loc=np.mean(x), scale=st.sem(x))
        z_value = st.norm.ppf(confidence)
        std = np.sqrt(len(x))*(upper-lower)*z_value
    
    return NormalDistribution(np.mean(x, axis=0), std)

In [None]:
def concrete_normal_dist(x: List[float]) -> NormalDistribution:
    return NormalDistribution(np.mean(x, axis=0), np.std(x, axis=0))

In [None]:
def plot_single_distributions(pdfs: List[NormalDistribution], labels: List[str], title: str):
    """ pdfs is a list of tuples of (mean, std) for each distribution."""
    
    for pdf, label in zip(pdfs, labels):
        
        x = np.linspace(pdf.mean - 3* pdf.std, pdf.mean + 3* pdf.std, 100)
        plt.plot(x, norm.pdf(x, pdf.mean, pdf.std), label=label)
    
    plt.legend(loc='best')
    plt.title(title)
    plt.show()

In [None]:
def find_overlap(normal_d_1: NormalDistribution, normal_d_2: NormalDistribution) -> float:
    """Finds the overlap between two distributions."""
    
    return NormalDist(mu=normal_d_1.mean, sigma=normal_d_1.std).overlap(NormalDist(mu=normal_d_2.mean, sigma=normal_d_2.std))


In [None]:
def find_overlap_of_set(pdfs: List[NormalDistribution]) -> float:
    """Finds the overlap between a set of distributions."""
    
    overlaps = []
    for i in range(len(pdfs)):
        for k in range(i+1, len(pdfs)):
            # Currently this method redundantly counts overlaps that may have already been accounted for. 
            # If A and B overlap on the edge of B, but C also overlaps on the edge of B, we're double
            # counting that overlap. Maybe we want to do this? 
            overlaps.append(find_overlap(pdfs[i], pdfs[k]))
    
    return np.mean(overlaps)

In [None]:
def perform_1d_analysis(parts: List[Part]):
    
    single_d_parts = limit_deminsionality(parts, [0])
    print(single_d_parts[0].signals)
    pdfs = [estimate_normal_dist(part.signals, 0.95) for part in single_d_parts]
    plot_single_distributions(pdfs, [f"{part.type} - {part.sub_part_name}" for part in single_d_parts], f'1D Analysis - Estimated Confidence at 95%')
    print(f"Overlap of Estimated pdf's at 95% confidence: {find_overlap_of_set(pdfs)}")
    
    pdfs = [concrete_normal_dist(part.signals) for part in single_d_parts]
    plot_single_distributions(pdfs, [f"{part.type} - {part.sub_part_name}" for part in single_d_parts], f'1D Analysis - Concrete')
    print(f"Overlap of concrete pdf's: {find_overlap_of_set(pdfs)}")
    
    
    

perform_1d_analysis(con_parts)
#perform_1d_analysis(lid_parts)


In [148]:
def estimate_overlap_of_set(pdfs: List[NormalDistribution], samples: int, confidence_bound: float) -> float:
    """Estimates the overlap between a set of distributions.
    
    The meta pdf is really just the combined pdfs of all the distributions, then we're drawing from that
    and seeing how many samples would cause conflicts. How can we prove the distribution we're pulling samples
    from is representative of the entire population? Is the estimated confidence good enough? 
    
    Could we potentially randomly sample from each distribution and just see which ones end up overlapping
    with the other distributions? TODO (henry): Think about this more """
    
    pdf_means = [pdf.mean for pdf in pdfs]    
    meta_pdf = estimate_normal_dist(pdf_means, 0.95)
    samples = np.random.multivariate_normal(meta_pdf.mean, np.diag(meta_pdf.std), samples)
    min_confidence = 1 - confidence_bound
    
    sample_confidences = [
        [mvn(mean=pdf.mean, cov=pdf.std).cdf(sample) 
        for sample in samples]
        for pdf in pdfs]
    
    filtered_confidences = [
        list(filter(lambda confidence: confidence > min_confidence, sample_confidence))
        for sample_confidence in sample_confidences]
    
    # We're ok with up to 1 match, but every one more than that is a conflict.
    collisions = [min(len(confidences)-1, 0) for confidences in filtered_confidences]
    return (sum(collisions)/samples)
    
    
    

In [149]:
def perform_multivariant_analsis(parts: List[Part]):
    
    multivariant_parts = limit_deminsionality(parts, list(range(500)))
    pdfs = [estimate_normal_dist(part.signals, 0.95) for part in multivariant_parts]
    estimated_collision_rate = estimate_overlap_of_set(pdfs, 2, 0.95)
    print(f"Estimated collision rate: {estimated_collision_rate}")
    
    
    

perform_multivariant_analsis(con_parts)

Estimated collision rate: [[-0.00228368 -0.0022973  -0.00245822 -0.00244293 -0.00257974 -0.00250048
  -0.00255609 -0.00294516 -0.0029291  -0.0028985  -0.00298094 -0.00282669
  -0.00327084 -0.00320156 -0.00299901 -0.00330149 -0.00327155 -0.00342526
  -0.00373731 -0.00347958 -0.00325078 -0.00348476 -0.00371873 -0.00386561
  -0.00409649 -0.00467353 -0.00395093 -0.00401733 -0.00421094 -0.0047179
  -0.00456817 -0.00397749 -0.0042952  -0.00454321 -0.0044598  -0.00411328
  -0.00506144 -0.00436395 -0.00515157 -0.00481664 -0.00510896 -0.0052352
  -0.00591252 -0.00467591 -0.00541045 -0.0068676  -0.00574652 -0.00687603
  -0.00534061 -0.00648637 -0.00615721 -0.00535302 -0.00617933 -0.00492329
  -0.00557907 -0.00610416 -0.00682947 -0.00630692 -0.00707016 -0.00593625
  -0.00612705 -0.00635973 -0.00644751 -0.00868459 -0.00628634 -0.00712607
  -0.00660627 -0.00701461 -0.00770586 -0.00677606 -0.00644139 -0.00673975
  -0.00670749 -0.00643107 -0.0066229  -0.00707988 -0.00687796 -0.00705387
  -0.00638045 