In [104]:
from src.signal_generator import SignalGenerator
from src.metrics import Metric
from src.data_type import Measurement
# import matplotlib.pyplot as plt
from src.data_compressor.other import CompressNTHS
from src.data_compressor.other import CompressMinMax
from src.data_compressor.other import CompressPWP
from src.data_compressor.pip import CompressPIP_ED
from src.data_compressor.pip import CompressPIP_PD
from src.data_compressor.pip import CompressPIP_VD
from src.data_compressor.paa import CompressPAA
from src.data_compressor.paa import CompressPAAVI
from src.data_compressor.paa import CompressByChunk
from src.data_compressor.pla import CompressAPCADFT
from src.data_compressor.pla import CompressAPCAFFT
from src.data_compressor.pla import CompressSTC
from src.data_compressor.pla import CompressHigherDeriveration

In [105]:
from src.data_type import Measurement
from src.data_compressor.compressor import Compressor
from typing import List, Dict
from collections import defaultdict as dict

In [106]:
# Build the demo signal with the SignalGenerator builder chain, one step per line.
signal_generator = (
  SignalGenerator(0, 100)
  .with_peaks(3)
  .with_peaks(3, direction=-1)
  .sin(0.2, 0.2)
)
# Wrap every generated sample in a Measurement; the second argument is the
# sample index times 100 (presumably the timestamp — confirm against Measurement).
measurements = [
  Measurement(sample, position * 100)
  for position, sample in enumerate(signal_generator.data)
]

In [107]:
import matplotlib.pyplot as plt
from typing import Tuple, Any, List
def vizualize(original_data, compressed_data, show_compressed: bool = True, show_interpolation: bool = True):
  """Plot the original series and, optionally, its compressed counterpart.

  Args:
    original_data: iterable of objects exposing `.timestamp` and `.value`;
      drawn in blue.
    compressed_data: iterable of the same kind of objects, drawn in red.
      Only read when `show_compressed` is true.
    show_compressed: whether the compressed series is drawn at all.
    show_interpolation: whether a connecting line is drawn under the point
      markers of each series.

  Returns:
    None. A new 150-dpi figure is created, gridded and shown.
  """
  def draw_series(xs, ys, color):
    # Optional connecting line first, then the point markers on top of it.
    if show_interpolation:
      plt.plot(xs, ys, color)
    plt.plot(xs, ys, color + 'o')

  original_x = [point.timestamp for point in original_data]
  original_y = [point.value for point in original_data]

  plt.figure(dpi=150)
  draw_series(original_x, original_y, 'b')

  if show_compressed:
    compressed_x = [point.timestamp for point in compressed_data]
    compressed_y = [point.value for point in compressed_data]
    draw_series(compressed_x, compressed_y, 'r')

  plt.grid()
  plt.show()

In [116]:
class AlgorythmSelector:
  """Pick the compressor whose summed metric scores are the lowest.

  Every registered compressor is run on the input, its output is scored
  against the original data with the `Metric` methods listed in
  `METRIC_METHODS`, and the scores are added up per compressor. The compressor
  with the smallest total wins.

  NOTE(review): this treats every `*_score` as "lower is better" and as
  directly summable on a common scale — confirm against `Metric`.
  """

  # (result key, name of the `Metric` method producing it), in evaluation order.
  METRIC_METHODS = (
    ('compression_rate', 'compression_ratio_score'),
    ('sum_differences', 'sum_differences_score'),
    ('arithmetic_average', 'arithmetic_average_score'),
    ('standard_derivative', 'standard_derivative_score'),
    ('function_field', 'function_field_score'),
    ('diff_of_min', 'diff_of_min_score'),
    ('diff_of_max', 'diff_of_max_score'),
    ('min_max_diff', 'min_max_diff_score'),
    ('value_crossing', 'value_crossing_score'),
    ('positive_value_crossing', 'positive_value_crossing_score'),
    ('negative_value_crossing', 'negative_value_crossing_score'),
    ('peak_count', 'peak_count_score'),
    ('positive_peak_count', 'positive_peak_count_score'),
    ('negative_peak_count', 'negative_peak_count_score'),
    ('median', 'median_score'),
    ('covariance', 'covariance_score'),
    ('corelation_pearson', 'corelation_pearson_score'),
    ('corelation_spearman', 'corelation_spearman_score'),
  )

  # Multipliers applied to individual scores before summing; metrics that are
  # not listed here are used unscaled.
  METRIC_WEIGHTS = {'compression_rate': 2}

  def __init__(self, compressors: "Dict[str, Compressor] | None" = None) -> None:
    """Create a selector.

    Args:
      compressors: optional mapping of display name -> compressor instance to
        choose from. When omitted, the selector compares `CompressNTHS` and
        `CompressMinMax`.
    """
    self.metrics_containter = Metric()
    if compressors is None:
      compressors = {
        'CompressNTHS': CompressNTHS(),
        'CompressMinMax': CompressMinMax()
      }
    # Copied with a comprehension on purpose: the notebook rebinds the name
    # `dict` to `collections.defaultdict`, so `dict(compressors)` would fail.
    self.compressors: Dict[str, Compressor] = {
      name: compressor for name, compressor in compressors.items()
    }

  def set_metrics(self):
    """Placeholder for configuring which metrics are used; not implemented yet."""
    pass # todo

  def _score(self, compressor, data):
    """Run `compressor` on `data` and return a dict of result key -> score."""
    compressor.set_data(data)
    compressor.compress()
    compressed_data = compressor.compressed_data
    scores = {}
    for key, method_name in self.METRIC_METHODS:
      value = getattr(self.metrics_containter, method_name)(data, compressed_data)
      if key in self.METRIC_WEIGHTS:
        value = value * self.METRIC_WEIGHTS[key]
      scores[key] = value
    return scores

  def get_best(self, data: "List[Measurement]"):
    """Return the registered compressor with the lowest total score on `data`.

    Each compressor is fed `data` through `set_data` and `compress`, so the
    returned instance still holds the compressed result of this call.

    Args:
      data: the measurements to compress and score.

    Returns:
      The compressor instance from `self.compressors` with the smallest sum of
      scores; on a tie, the one registered first.

    Raises:
      ValueError: if no compressors are registered.
    """
    if not self.compressors:
      raise ValueError('AlgorythmSelector has no compressors to choose from')
    agregated_metrics = {}
    for method_name, compressor in self.compressors.items():
      agregated_metrics[method_name] = sum(self._score(compressor, data).values())
    # todo: get best with the largest compress ratio
    # `min` returns the first minimal key, matching a stable ascending sort.
    best_method_name = min(agregated_metrics, key=agregated_metrics.get)
    return self.compressors[best_method_name]

# Run the selector on the generated measurements. As the last expression in the
# cell, the returned compressor instance is rendered as the cell output.
AlgorythmSelector().get_best(measurements)

<src.data_compressor.other.compress_nths.CompressNTHS at 0x10a44e910>