# About Program

## License
This piece of software is licensed under GNU Public License v3. You can find full text of license by this [link](https://www.gnu.org/licenses/gpl-3.0.en.html).

## Description
This program tries to prioritize equations retrieved with dimentional analysis. To do so it uses additional measurements CSV table for known quantities and heuristics applied to unknown quantity. More detailed description (in russian) you can find by following this [link](here link to blogpost).

*Now you can select **"Cell"->"Run all" then scroll down** and you will see user interface for interacting with program.*

In [1]:
# quantity parser

import sympy
from enum import Enum
import re
from collections import OrderedDict


class MultiplicationParser(object):
    class ParseState(Enum):
        unit = 0
        unit_power = 1

    def __init__(self, str_expr):
        self.units = []
        self.str_expr = str_expr

        self.cur_state = self.ParseState.unit
        self.__reset_current_unit()
        self.__parse_mul()

    def get_units_with_powers(self):
        return self.units

    def __add_unit(self, name, power):
        if name == '':
            raise Exception("Empty unit name while parsing '{}'".format(self.str_expr))
        if name[0] in ('0', '1', '2', '3', '4', '5', '6', '7', '8', '9'):
            raise Exception("Unit names starting with digits are not allowed (while parsing '{}')".format(self.str_expr))
        power_rational = sympy.Rational(power[0], power[1])
        if power_rational == sympy.nan:
            raise Exception("Unexpected power value (while parsing '{}')".format(self.str_expr))
        self.units.append((name, power_rational))

    def __reset_current_unit(self):
        self.cur_unit = ''
        self.cur_unit_power = [None, None]
        self.cur_unit_power_signs = [1, 1]
        self.cur_unit_power_index = 0

    def __switch_state(self, state):
        if state == self.ParseState.unit:
            if not self.cur_unit == '':
                # set numerator and denumerator of power to 1 if they were not specified
                powers = [1 if p is None else p for p in self.cur_unit_power]
                result_power = [sign * power for sign, power in zip(powers, self.cur_unit_power_signs)]
                self.__add_unit(self.cur_unit, result_power)
            self.__reset_current_unit()
        self.cur_state = state

    def __parse_mul(self):
        for ch in self.str_expr:
            if ch in ' \t':
                continue
            if self.cur_state == self.ParseState.unit:
                if ch == '^':
                    self.__switch_state(self.ParseState.unit_power)
                elif ch == '*':
                    self.__switch_state(self.ParseState.unit)
                else:
                    self.cur_unit += ch
            elif self.cur_state == self.ParseState.unit_power:
                if ch == '-':
                    if not self.cur_unit_power[self.cur_unit_power_index] is None:
                        raise Exception("Unexpected '-' in {}".format(self.str_expr))
                    self.cur_unit_power_signs[self.cur_unit_power_index] = -1
                elif ch == '/':
                    if self.cur_unit_power_index != 0:
                        raise Exception('Only one denominator is allowed (while parsing {})'.format(self.str_expr))
                    self.cur_unit_power_index = 1
                elif ch in ('0', '1', '2', '3', '4', '5', '6', '7', '8', '9'):
                    ch_int = int(ch)
                    if self.cur_unit_power[self.cur_unit_power_index] is None:
                        self.cur_unit_power[self.cur_unit_power_index] = ch_int
                    else:
                        self.cur_unit_power[self.cur_unit_power_index] = \
                            self.cur_unit_power[self.cur_unit_power_index] * 10 + ch_int
                elif ch == '*':
                    self.__switch_state(self.ParseState.unit)
        self.__switch_state(self.ParseState.unit)


class QuantityParser(object):
    def __init__(self, str_expr):
        self.units_and_powers = OrderedDict()
        self.quantity_name = None

        self.__parse_quantity_expr(str_expr)

    def get_units_and_powers(self):
        return self.units_and_powers

    def get_quantity_name(self):
        return self.quantity_name

    def __parse_quantity_expr(self, quantity_expr):
        pattern = r'[ \t\r\n]*([a-zA-Z\\][a-zA-Z0-9_]*)(\[([^\]]+)\])?'
        match = re.match(pattern, quantity_expr)
        if match is None:
            raise ValueError('Unable to parse expression "{}"'.format(quantity_expr))
        groups = match.groups()
        if len(groups) != 1 and len(groups) != 3:
            raise ValueError('Unable to parse expression "{}"'.format(quantity_expr))
        self.quantity_name = groups[0]
        if len(groups) == 3:
            units_str = groups[2]
            self.__parse_units(units_str)

    def __add_units_mul(self, powers_sign, units_with_powers):
        powers = [(unit, powers_sign * power) for unit, power in units_with_powers]
        for unit, power in powers:
            if unit in self.units_and_powers:
                self.units_and_powers[unit] *= power
            else:
                self.units_and_powers[unit] = power

    def __parse_units(self, fraction_str):
        if fraction_str == '':
            raise Exception("Empty fraction (while parsing '{}')".format(fraction_str))
        multiplies = fraction_str.split('//')
        if len(multiplies) > 2:
            raise Exception(
                "Only one fraction division is allowed (while parsing '{}')".format(fraction_str))
        numerator_parser = MultiplicationParser(multiplies[0])
        self.__add_units_mul(1, numerator_parser.get_units_with_powers())
        if len(multiplies) == 2:
            denominator_parser = MultiplicationParser(multiplies[1])
            self.__add_units_mul(-1, denominator_parser.get_units_with_powers())

            
#if __name__ == '__main__':
#    expr_str = 'Q[kg*m^-3*s^2/3//a^14*b^2/-3*c^1]'
#    parser = QuantityParser(expr_str)
#    result = parser.get_units_and_powers()
#    print(result)

In [2]:
# report printing

import pylatex as pl
from sympy import fraction
import os.path
import urllib.parse


def gen_latex_formula(quantity_names, powers):
    positive_powers = [i for i in range(len(powers)) if powers[i] > 0]
    negative_powers = [i for i in range(len(powers)) if powers[i] < 0]

    def quantity_with_power_to_latex(name, power):
        numerator, denominator = fraction(power)
        assert numerator != 0
        # negative powers are processed separately
        numerator, denumerator = abs(numerator), abs(denominator)
        quant_latex = name
        if numerator != 1:
            quant_latex = quant_latex + '^{' + str(numerator) + '}'
        if denumerator != 1:
            quant_latex = '\\sqrt{' + quant_latex + '}'
        return quant_latex

    numerator = ''
    if len(positive_powers) == 0:
        numerator = '1'
    else:
        positive_quantities_wth_powers = [(quantity_names[indx], powers[indx]) for indx in positive_powers]
        numerator = ' '.join([quantity_with_power_to_latex(*quant_and_pow)
                             for quant_and_pow in positive_quantities_wth_powers])
    formula_latex = numerator

    if len(negative_powers) != 0:
        negative_quantities_wth_powers = [(quantity_names[indx], powers[indx]) for indx in negative_powers]
        denominator = ' '.join([quantity_with_power_to_latex(*quant_and_pow)
                                for quant_and_pow in negative_quantities_wth_powers])
        formula_latex = '$\\frac{' + formula_latex + '}{' + denominator + '}$'
        
    domain = 'https://approach0.xyz/search/?q='
    query = urllib.parse.quote(formula_latex)
    formula_with_href = pl.NoEscape('\\href{' + domain + query + '}{' + formula_latex + '}')

    return formula_with_href


def print_report(report_table, columns_ordered, quantity_names_ordered, out_path):
    doc = pl.Document()
    doc.documentclass = pl.Command(
        'documentclass',
        options=['landscape', 'a4paper','oneside', 'legno', 'notitlepage'],
        arguments=['report'])
    #doc.packages.append(pl.Package('hyperref'))
    doc.packages.append(pl.Package('hyperref', options=['hidelinks',
                                                        'colorlinks=true',
                                                        'linkcolor=blue',
                                                        'urlcolor=blue',
                                                        'citecolor=blue',
                                                        'anchorcolor=blue']))
    doc.packages.append(pl.Package('geometry', options=['margin=0.5 in']))

    full_columns = ('N',) + tuple(columns_ordered)  # + ('Unknown component', )
    formats = ['l']*len(full_columns)
    formats[columns_ordered.index('formula')+1] = 'p{2cm}'

    with doc.create(pl.Section('Summaries')):
        with doc.create(pl.LongTable(' '.join(formats))) as summaries_table:
            summaries_table.add_hline()
            summaries_table.add_row(full_columns)
            summaries_table.add_hline()
            summaries_table.end_table_header()
            summaries_table.add_hline()
            summaries_table.add_row((pl.MultiColumn(len(full_columns), align='r',
                                     data='Containued on Next Page'),))
            summaries_table.add_hline()
            summaries_table.end_table_footer()
            summaries_table.add_hline()
            summaries_table.add_row((pl.MultiColumn(len(full_columns), align='r',
                                     data='Not Containued on Next Page'),))
            summaries_table.add_hline()
            summaries_table.end_table_last_footer()

            for row_indx, row in enumerate(report_table):
                row_ordered = ['{0:.3f}'.format(row[col_name]) if col_name != 'formula'
                               else gen_latex_formula(quantity_names_ordered, row[col_name])
                               for col_name in columns_ordered]
                row_ordered = [row_indx + 1] + row_ordered
                summaries_table.add_row(row_ordered)

                
    path_and_ext = os.path.splitext(out_path)
    if len(path_and_ext) != 2:
        raise ValueError('Wrong output path: "{}"'.format(out_path))
    ext = path_and_ext[1].lower()
    if ext != '.pdf':
        raise ValueError('Only pdf files are supported')
    doc.generate_pdf(path_and_ext[0], silent=False)


In [3]:
# common tools

import sympy as sp
import numpy as np

def power_cost(x):
    n, d = sp.fraction(sp.Rational(x))
    return abs(n) + abs(d)


def clamp(minimum, maximum, x):
    return min(maximum, max(minimum, x))


def nearly_equal(x, y):
    eps = np.finfo(float).eps
    diff = abs(x-y)
    x = abs(x)
    y=abs(y)
    scaled_eps = eps * max(x, y)
    return diff <= scaled_eps


def normalize_80(container, key, revert=False):
    """
    normalization based on 80% of elements
    """
    elements = [elem[key] for elem in container if key in elem]
    elements.sort()
    ten_percent_count = int(len(elements) * 0.01 * 10)
    assert ten_percent_count * 2 < len(elements)
    elements = elements[ten_percent_count:len(elements)-ten_percent_count]
    maximum = max(elements)
    minimum = min(elements)
    clamp_min, clamp_max = -0.2, 1.2

    if maximum == minimum:
        for elem in container:
            elem[key] = 0.5
        return

    if not revert:
        for elem in container:
            # todo: optional exception?
            if key not in elem:
                continue
            elem[key] = clamp(clamp_min, clamp_max, (elem[key] - minimum) / (maximum - minimum))
    else:
        elements = np.ndarray((len(container), 1), dtype=np.float128)
        for i, elem in enumerate(container):
            if key not in elem:
                assert False
            elements[i] = elem[key]
        elements = np.clip(1.0 - (elements - minimum) / (maximum-minimum), clamp_min, clamp_max)
        for i, elem in enumerate(container):
            if key not in elem:
                assert False
            elem[key] = float(elements[i])


In [4]:
# formula estimators
import numpy as np
from scipy.interpolate import UnivariateSpline

# increase estimation
def make_increase_estimation(unknown_component_points):
    table_Y = unknown_component_points

    y0, yn = table_Y[0], table_Y[-1]
    if nearly_equal(y0, yn):
        return 0.5
    return 1.0 if yn > y0 else 0.0


# linearity estimation
def make_linearity_estimation(param, unknown_component_points, dbg_figname=None):
    table_X, table_Y = param, unknown_component_points
    domain = np.min(table_X), np.max(table_X)
    min_y, max_y = np.min(table_Y), np.max(table_Y)
    if min_y == max_y:
        return 1.0

    # todo: x already in [0, 1]
    Y_normalized = [(y - min_y) / (max_y - min_y) for y in table_Y]


#    if dbg_figname is not None:
#        import matplotlib.pyplot as plt
#
#        plt.plot(table_X, Y_normalized)
#        plt.savefig('plot_dump/{}.png'.format(dbg_figname))
#        plt.close()
#        with open('plot_dump/{}.txt'.format(dbg_figname), 'w') as ys_log:
#            for y in Y_normalized:
#                print(y, file=ys_log)

    spline = UnivariateSpline(table_X, Y_normalized, k=4, s=0)

    segment_count = 20
    step = 1.0 / segment_count
    segment_center_xs = [domain[0] + (domain[1] - domain[0]) * (step * i + step * 0.5) for i in range(segment_count)]
    segment_center_xs = [clamp(domain[0], domain[1], x) for x in segment_center_xs]
    assert len(segment_center_xs) == segment_count

    segment_center_ys = [spline(x) for x in segment_center_xs]

    segment_corner_xs = [domain[0] + (domain[1] - domain[0]) * (step * i) for i in range(segment_count + 1)]
    segment_corner_ys = [spline(x) for x in segment_corner_xs]
    assert len(segment_corner_xs) == segment_count + 1

    segments_linearity = []
    for i in range(segment_count):
        segment_center_pt = np.array([segment_center_xs[i], segment_center_ys[i]], dtype=np.float128)

        segment_left_pt = np.array((segment_corner_xs[i], segment_corner_ys[i]), dtype=np.float128)
        segment_right_pt = np.array((segment_corner_xs[i+1], segment_corner_ys[i+1]), dtype=np.float128)

        segment_left_dir = segment_left_pt - segment_center_pt
        segment_left_dir /= np.linalg.norm(segment_left_dir)

        segment_right_dir = segment_right_pt - segment_center_pt
        segment_right_dir /= np.linalg.norm(segment_right_dir)

        angle_cos = np.dot(segment_left_dir, segment_right_dir)
        segment_linearity = 0.5 - angle_cos*0.5
        segments_linearity.append(segment_linearity)

    total_linearity = sum(segments_linearity) / segment_count

    return total_linearity


# magnitude estimation
def make_magnitude_penalty(evaluated_unknown_points):
    max_value = np.max(np.abs(evaluated_unknown_points), axis=-1)
    return max_value

def make_avg_derivative_magnitude_estimation(param, evaluated_unknown_points):
    table_X, table_Y = param, evaluated_unknown_points

    dxdy_avg = (table_Y[-1] - table_Y[0]) / (table_X[-1] - table_X[0])
    return abs(dxdy_avg)


# monothonicity estimation
def make_non_monothonicity_estimation(param, unknown_component_points, dbg_figname=None):
    table_X, table_Y = param, unknown_component_points
    domain = np.min(table_X), np.max(table_X)
    min_y, max_y = np.min(table_Y), np.max(table_Y)
    #if min_y == max_y:
    #    return 1.0

    # todo: x already in [0, 1]
    X_normalized = table_X
    Y_normalized = [(y - min_y) / (max_y - min_y) for y in table_Y]

    derivatives = []
    for i in range(0, len(Y_normalized)-1):
        dy = (Y_normalized[i+1] - Y_normalized[i])
        dx = (X_normalized[i+1] - X_normalized[i])
        assert dx != 0
        dydx = dy / dx
        derivatives.append(dydx)

    sum_positive = sum(d for d in derivatives if d > 0)
    sum_negative = sum(d for d in derivatives if d < 0)

    estimation = abs(min(sum_negative, sum_positive))

    return estimation


#periodicity estimation
def are_function_segments_equal(function, segment1_domain, segment2_domain, accuracy):
    point_count = int(1.0/accuracy + 1)
    xs1 = [segment1_domain[0] + (segment1_domain[1] - segment1_domain[0]) * accuracy * i for i in range(point_count)]
    xs1 = [clamp(segment1_domain[0], segment1_domain[1], x) for x in xs1]
    ys1 = [function(x) for x in xs1]

    xs2 = [segment2_domain[0] + (segment2_domain[1] - segment2_domain[0]) * accuracy * i for i in range(point_count)]
    xs2 = [clamp(segment2_domain[0], segment2_domain[1], x) for x in xs2]
    ys2 = [function(x) for x in xs2]

    for y1, y2 in zip(ys1, ys2):
        equal = abs(y1 - y2) <= accuracy
        if not equal:
            return False
    return True


def split_domain_on_segments(domain, segment_length, accuracy):
    '''
    разбивает область определения функции на сегменты
    длиной segment_length (+ остаточный последний сегмент, если он длинее точности)
    '''
    domain_length = domain[1] - domain[0]
    full_segment_count = int(domain_length / segment_length)

    segment_starts = [domain[0] + segment_length * i for i in range(full_segment_count)]
    segment_ends = [domain[0] + segment_length * (i + 1) for i in range(full_segment_count)]

    last_segment_start = segment_ends[-1]  # должен совпадать с концом предыдущего сегмента
    last_segment_end = domain[1]  # идёт до конца области определения
    last_segment_length = last_segment_end - last_segment_start
    if last_segment_length > accuracy:
        segment_starts.append(last_segment_start)
        segment_ends.append(last_segment_end)

    segments_domains = list(zip(segment_starts, segment_ends))
    return segments_domains


def make_periodicity_estimation(table_X, table_Y, dbg_figname=None):
    '''
    взять стартовую точку
    найти все y совпадающие с данной в пределах точности
    сравнивать период начинающийся с первой точки с периодои, начинающимся с 2, затем с 3
    (если точка совпадает с экстремумом, то и следующая по очереди может быть началом периода)
    как сравнивать два периода:
        * составить список точек одинакового количества в каждом предполагаемом периоде
        * сравнивать поточечно в пределах точности
    если 2 сравниваемых периода совпало, необходимо проверить все вмещающиеся в область определения периоды
    (включая возможно незавершенный период в конце области определения)
    если периоды совпали: оценка периодичности = 1 - accuracy
    '''
    domain = np.min(table_X), np.max(table_X)
    min_y, max_y = np.min(table_Y), np.max(table_Y)
    if min_y == max_y:
        return 1.0

    Y_normalized = [(y - min_y) / (max_y - min_y) for y in table_Y] # todo: x already in [0, 1]

    if dbg_figname is not None:
        import matplotlib.pyplot as plt

        plt.plot(table_X, Y_normalized)
        plt.savefig('plot_dump/{}.png'.format(dbg_figname))
        plt.close()
        with open('plot_dump/{}.txt'.format(dbg_figname), 'w') as ys_log:
            for y in Y_normalized:
                print(y, file=ys_log)

    spline = UnivariateSpline(table_X, Y_normalized, k=4, s=0)

    step = 0.01
    accuracy = 0.0
    periodicity_found = False
    while (not periodicity_found) and accuracy < 1.0:
        accuracy += step
        point_count = int(1.0 / accuracy + 1)
        xs = [domain[0] + (domain[1]-domain[0]) * accuracy * i for i in range(point_count)]
        xs = [clamp(domain[0], domain[1], x) for x in xs]
        ys = [spline(x) for x in xs]
        control_x = xs[0]
        control_y = ys[0]
        similar_y_indices = list(filter(lambda i: abs(ys[i] - control_y) <= accuracy, range(1, point_count)))
        for indx in similar_y_indices:
            segment_x_length = xs[indx] - control_x
            segments_domains = split_domain_on_segments(domain, segment_x_length, accuracy)
            control_segment = segments_domains[0]
            compared_segments = segments_domains[1:]
            segments_equality = [are_function_segments_equal(spline, control_segment, segment_i, accuracy)
                                 for segment_i in compared_segments]
            if all(segments_equality):
                periodicity_found = True
                break
    if not periodicity_found:
        return 0.0
    return clamp(0.0, 1.0, 1.0 - accuracy)


# implicity estimation
def make_simplicity_estimation(powers_list):
    power_costs = [power_cost(power) for power in powers_list]
    total_cost = sum(power_costs)
    return float(total_cost)

In [5]:
# formula search

import sympy as sp


class DimensionalFormulaSearch(object):
    @staticmethod
    def find_powers_equations(required_quantity_units, influencing_quantities_units):
        num_rows = len(influencing_quantities_units)
        assert num_rows > 0
        num_cols = len(influencing_quantities_units[0])
        assert num_rows > 0
        assert num_cols == len(required_quantity_units)

        # solve linear equation system to find applicable powers of quantities
        unit_matrix = sp.Matrix(influencing_quantities_units)
        quantities_pow_variables = sp.Matrix(1, num_rows, sp.symbols('x0:{}'.format(num_rows)))
        right_eq_side = sp.Matrix(1, num_cols, required_quantity_units)
        eq = list(quantities_pow_variables * unit_matrix - right_eq_side)
        quantities_pow_variables = list(quantities_pow_variables)

        solutions = sp.solve(eq, quantities_pow_variables, set=True)
        if len(solutions) == 0:
            raise ValueError('Required units cannot be derived from given quantities')
        dependent_variables = tuple(solutions[0])
        assert len(solutions[1])==1
        solution = list(solutions[1])[0]

        independent_variables = tuple([variable for variable in quantities_pow_variables
                                 if variable not in dependent_variables])
        return dependent_variables, independent_variables, solution, quantities_pow_variables

    @staticmethod
    def generate_powers_for_weight(power_weight):
        if power_weight == 0:
            return (0,),
        elif power_weight == 1:
            return (1,), (-1,)

        powers = []
        numerator = power_weight
        denumerator = 1
        end = numerator + 1
        while denumerator != end:
            power = sp.Rational(numerator, denumerator)
            result_numerator = sp.fraction(power)[0]
            # ones and other reduced fractions
            if result_numerator != numerator:
                numerator -= 1
                denumerator += 1
                continue
            powers.extend([(power,), (-power,)])
            numerator -= 1
            denumerator += 1
        return tuple(powers)

    @staticmethod
    def generate_powers_for_n_independent_quantities(independent_quantity_count, power_weight):
        #todo: debug
        def recursive_power_list_generation(left_weight, quantity_count):
            if quantity_count == 1:
                return DimensionalFormulaSearch.generate_powers_for_weight(left_weight)
            if left_weight == 0:
                return (0,)*quantity_count,
            result_grid = []
            for i in range(left_weight+1):
                cur_quantity_powers = DimensionalFormulaSearch.generate_powers_for_weight(i)
                next_quantities_powers = recursive_power_list_generation(left_weight-i, quantity_count-1)
                cur_grid = [cur_quantity_pow + next_quantity_pow for cur_quantity_pow in cur_quantity_powers
                               for next_quantity_pow in next_quantities_powers]
                result_grid.extend(cur_grid)
            return tuple(result_grid)

        independent_power_grid = recursive_power_list_generation(power_weight, independent_quantity_count)
        return independent_power_grid

    @staticmethod
    def power_cost(x):
        n, d = sp.fraction(sp.Rational(x))
        return abs(n) + abs(d)

    @staticmethod
    def power_set_cost(power_set):
        return sum([DimensionalFormulaSearch.power_cost(power) for power in power_set])

    @staticmethod
    def generate_quantities_powers_multiplies(max_formulas, max_downcycles, required_units, quantities_units):
        dependent_power_variables, independent_power_variables, dependent_powers_equations, all_power_vars_ordered = \
            DimensionalFormulaSearch.find_powers_equations(required_units, quantities_units)

        if len(independent_power_variables) == 0:
            exact_solution = []
            for var in all_power_vars_ordered:
                var_solution = dependent_powers_equations[var]
                assert var_solution.is_number
                exact_solution.append(var_solution)
            return [tuple(exact_solution)]

        independent_power_count = len(independent_power_variables)

        found_formulas = []

        downcycle_count = -1
        current_weight = 0
        future_powers = dict()
        while len(found_formulas) < max_formulas and downcycle_count < max_downcycles:
            found_formula = False
            generated_independent_powers = DimensionalFormulaSearch.generate_powers_for_n_independent_quantities(
                independent_power_count,
                current_weight)
            for generated_indep_pows in generated_independent_powers:
                assert len(generated_indep_pows) == independent_power_count
                indep_powers_by_vars ={indep_pow_var: power
                                            for indep_pow_var, power
                                            in zip(independent_power_variables, generated_indep_pows)}
                dependent_powers_by_vars = {power: power_equation.subs(indep_powers_by_vars)
                                              for power, power_equation
                                            in zip(dependent_power_variables, dependent_powers_equations)}
                all_powers_by_vars = {**indep_powers_by_vars, **dependent_powers_by_vars}
                all_powers_ordered = tuple(all_powers_by_vars[var] for var in all_power_vars_ordered)

                formula_cost = DimensionalFormulaSearch.power_set_cost(all_powers_ordered)
                assert formula_cost >= current_weight
                if formula_cost > current_weight:
                    if formula_cost in future_powers.keys():
                        future_powers[formula_cost].append(all_powers_ordered)
                    else:
                        future_powers[formula_cost] = [all_powers_ordered]
                else:
                    found_formula = True
                    found_formulas.append(all_powers_ordered)

            if current_weight in future_powers.keys():
                assert DimensionalFormulaSearch.power_set_cost(future_powers[current_weight][0]) == current_weight
                found_formulas.extend(future_powers[current_weight])
                del future_powers[current_weight]
                found_formula = True

            if found_formula:
                downcycle_count = 0
            else:
                downcycle_count += 1
            current_weight += 1

        return found_formulas

In [6]:
# formula estimate

import numpy as np


def make_formulas_estimation(formulas_list,
                             quantities_table,
                             required_quantity_name,
                             influencing_quantity_names_ordered,
                             magnitude_greater_is_better,
                             increase_is_better,
                             greater_change_is_better):
    """

    """
    # todo: make more convenient
    # find one quantity with measurements to determine table height
    table_height = None
    for quantity_data in quantities_table.values():
        if 'measurements' in quantity_data:
            table_height = len(quantity_data['measurements'])
            break
    assert table_height is not None

    #  move all quantities to one side of equations
    quantity_names_ordered = tuple(influencing_quantity_names_ordered) + (required_quantity_name, )
    formulas_list = [formula + (-1,) for formula in formulas_list]

    known_quantities_mask = ['measurements' in quantities_table[formula_name] for formula_name in quantity_names_ordered]
    measurements_only_table = np.transpose(np.array([quantities_table[quantity_name]['measurements']
                                        if 'measurements' in quantities_table[quantity_name]
                                        else [1.0]*table_height
                                        for quantity_name in quantity_names_ordered]))
    # todo: optionally load from csv?
    todo_param = np.arange(0.0, 1.0, 1.0/table_height)
    raw_formulas_estimations = []
    for formula in formulas_list:
        evaluated_unknown_component_points = eval_unknown_component_points(
            formula=formula,
            table=measurements_only_table,
            knowns_mask=known_quantities_mask)

        for i in range(len(evaluated_unknown_component_points)):
            if (np.isnan(evaluated_unknown_component_points[i])
                    or np.isinf(evaluated_unknown_component_points[i])
                    or np.isneginf(evaluated_unknown_component_points[i])):
                raw_formulas_estimations.append({'invalid': 'invalid in given measurements'})
                continue

        #dbg_filename = [str(power).replace('/', '_') for power in formula]
        #dbg_filename = '__'.join(dbg_filename)

        raw_simplicity_estim = make_simplicity_estimation(formula)
        raw_magnitude_estim = make_magnitude_penalty(evaluated_unknown_component_points)
        raw_change_estim = make_avg_derivative_magnitude_estimation(
            todo_param,
            evaluated_unknown_component_points)
        raw_linearity_estim = make_linearity_estimation(
            todo_param,
            evaluated_unknown_component_points)
        raw_periodicity_estim = make_periodicity_estimation(
            todo_param,
            evaluated_unknown_component_points)
        raw_monotonicity_estim = make_non_monothonicity_estimation(
            todo_param,
            evaluated_unknown_component_points)
        raw_increase_estim = make_increase_estimation(evaluated_unknown_component_points)
        raw_formulas_estimations.append({
            'simplicity': raw_simplicity_estim,
            'magnitude': raw_magnitude_estim,
            'linearity': raw_linearity_estim,
            'periodicity': raw_periodicity_estim,
            'monotonicity': raw_monotonicity_estim,
            'change magnitude': raw_change_estim,
            'change sign': raw_increase_estim,
            'parameter_points': todo_param,
            'unknowns_points': evaluated_unknown_component_points}
        )

    normalized_estimations = raw_formulas_estimations
    normalize_80(normalized_estimations, 'simplicity', revert=True)
    normalize_80(normalized_estimations, 'magnitude', revert=not magnitude_greater_is_better)
    normalize_80(normalized_estimations, 'linearity', revert=False)
    normalize_80(normalized_estimations, 'periodicity', revert=False)
    normalize_80(normalized_estimations, 'monotonicity', revert=True)
    normalize_80(normalized_estimations, 'change magnitude', revert=not greater_change_is_better)
    normalize_80(normalized_estimations, 'change sign', revert=not increase_is_better)

    return normalized_estimations


def eval_unknown_component_points(formula,
                                  table,
                                  knowns_mask):
    # степени, в которые требуется возвести известные величины для получения произведения неизвестных
    # todo: когда лучше инвертировать степени? (на данный момент - когда неизвестна только искомая величина)
    all_unknowns_in_denominator = True
    for i, power in enumerate(formula):
        if not knowns_mask[i] and power > 0:
            all_unknowns_in_denominator = False
            break

    power_inverter = -1.0
    if all_unknowns_in_denominator:
        power_inverter = 1.0

    knowns_powers_inverse = np.array([power_inverter*float(base_unit_power) if known else 0.0
                             for base_unit_power, known in zip(formula, knowns_mask)])
    powed = np.power(table, knowns_powers_inverse)
    unknown_component_points = np.prod(powed, axis=-1)
    return unknown_component_points

In [7]:
# parse csv
import csv

def load_measurements_csv(csv_data):
        quantities = OrderedDict()
        quantities_by_column_indices = OrderedDict()
        not_measured_quantities = list()

        csv_table = csv.reader(csv_data, dialect='excel') # libreoffice allows to choose dialect
        # TODO: fix
        #if len(csv_table) < 2:
        #    raise ValueError('Specified csv table has no required data')
        csv_table = list(csv_table)
        quantities_row = csv_table[0]
        for col_indx, quantity_str in enumerate(quantities_row):
            quantity_parser = QuantityParser(quantity_str)
            quantity_name = quantity_parser.get_quantity_name()
            if quantity_name in quantities.keys():
                raise ValueError('Quantity named "{}" specified twice in file "{}"'
                                 .format(quantity_name, csv_path))
            quantity_data = OrderedDict()
            quantity_units = quantity_parser.get_units_and_powers()
            if quantity_units is not None:
                quantity_data['units'] = quantity_units
            quantities[quantity_name] = quantity_data
            quantities_by_column_indices[col_indx] = quantity_name

        first_measurements_row = csv_table[1]
        for col_indx, csv_col in enumerate(first_measurements_row):
            if col_indx not in quantities_by_column_indices.keys():
                raise ValueError('Unable to parse "{}" file: different number of columns in rows'
                                 .format(csv_file))
            if len(csv_col) == 0:
                not_measured_quantities.append(col_indx)
                continue
            quantities[quantities_by_column_indices[col_indx]]['measurements'] = []

        for row_indx, csv_row in zip(range(2, len(csv_table) - 2), csv_table[2:]):
            for col_indx, csv_col in enumerate(csv_row):
                if col_indx not in quantities_by_column_indices.keys():
                    raise ValueError('Unable to parse "{}" file: different number of '
                                     'columns in rows'.format(csv_file))
                if len(csv_col) == 0:
                    if col_indx not in not_measured_quantities:
                        raise ValueError(
                            'Unable to parse "{}" file: quantity measurements'
                            ' are expected to be fully specified or fully unspecified'
                            .format(csv_path))
                    continue
                if col_indx in not_measured_quantities:
                    raise ValueError(
                        'Unable to parse "{}" file: quantity measurements '
                        'are expected to be fully specified or fully unspecified'
                        .format(csv_path))
                quantities[quantities_by_column_indices[col_indx]]['measurements']\
                    .append(float(csv_col))
        return quantities

In [8]:
# main program

from copy import deepcopy

def make_quantities_base_units_vectors(quantities_table):
        from sympy import Rational

        all_base_unit_names = set()
        for quantity_data in quantities_table.values():
            if 'units' not in quantity_data.keys():
                continue
            all_base_unit_names |= set(quantity_data['units'].keys())
        all_base_unit_names = sorted(list(all_base_unit_names))

        quantities_base_unit_vectors = OrderedDict()
        for quantity_name, quantity_data \
                in zip(quantities_table.keys(), quantities_table.values()):
            if 'units' not in quantity_data.keys():
                quantities_base_unit_vectors[quantity_name] = (0,)*len(all_base_unit_names)
                continue
            units_vector = []
            for unit_name in all_base_unit_names:
                if unit_name not in quantity_data['units']:
                    units_vector.append(Rational(0))
                else:
                    units_vector.append(quantity_data['units'][unit_name])
            quantities_base_unit_vectors[quantity_name] = tuple(units_vector)

        return quantities_base_unit_vectors, all_base_unit_names

    
def prepare_report_table(
        formulas,
        formula_estims,
        simplicity_weight,
        magnitude_weight,
        linearity_weight,
        periodicity_weight,
        monotonicity_weight,
        change_magnitude_weight,
        change_sign_weight):
    report_table = list()
    for formula, estim_row in zip(formulas, formula_estims):
        if 'invalid' in estim_row:
            continue
        report_row = deepcopy(estim_row)
        report_row['simplicity'] = simplicity_weight * report_row['simplicity']
        report_row['magnitude'] = magnitude_weight * report_row['magnitude']
        report_row['linearity'] = linearity_weight * report_row['linearity']
        report_row['periodicity'] = periodicity_weight * report_row['periodicity']
        report_row['monotonicity'] = monotonicity_weight * report_row['monotonicity']
        report_row['change magnitude'] = change_magnitude_weight * report_row['change magnitude']
        report_row['change sign'] = change_sign_weight * report_row['change sign']
        report_row['total'] = report_row['simplicity'] \
                              + report_row['magnitude'] \
                              + report_row['linearity'] \
                              + report_row['periodicity'] \
                              + report_row['monotonicity'] \
                              + report_row['change magnitude'] \
                              + report_row['change sign']
        report_row['formula'] = formula
        report_table.append(report_row)
    report_table.sort(key=lambda formula_row: formula_row['total'], reverse=True)
    return report_table
    
def dimentional_analysis_estimate(max_formulas,
                                  max_downcycles, 
                                  simplicity_weight, 
                                  magnitude_weight, 
                                  magnitude_greater_better,
                                  linearity_weight,
                                  periodicity_weight,
                                  change_sign_weight,
                                  increase_is_better,
                                  change_magnitude_weight,
                                  change_greater_better,
                                  monothonicity_weight,
                                  input_csv_data,
                                  searched_quantity_name,
                                  report_filename):
    quantities_table = load_measurements_csv(input_csv_data)
    
    required_quantity_name = searched_quantity_name
    all_quantity_names = sorted(list(set(quantities_table.keys())))
    if required_quantity_name not in all_quantity_names:
        raise ValueError('Searched quantity "{}" is not present in "{}" table'.format(
            required_quantity_name,
            args.input_csv))
    influencing_quantity_names_ordered = \
        sorted(list(set(all_quantity_names) - {required_quantity_name}))
    if len(influencing_quantity_names_ordered) == 0:
        raise ValueError('There should be more quantities in "{}"'.format(args.input_csv))
    
    quantities_base_unit_vectors, all_unit_names = \
            make_quantities_base_units_vectors(quantities_table)
    required_quantity_units_vec = quantities_base_unit_vectors[required_quantity_name]
    influencing_quantities_units_matr = [quantities_base_unit_vectors[quantity_name]
                                         for quantity_name
                                         in influencing_quantity_names_ordered]
    
    formulas_represented_by_powers = \
        DimensionalFormulaSearch.generate_quantities_powers_multiplies(
            max_formulas,
            max_downcycles,
            required_quantity_units_vec,
            influencing_quantities_units_matr)
    
    
    formulas_estimations = make_formulas_estimation(
        formulas_represented_by_powers, quantities_table,
        required_quantity_name, influencing_quantity_names_ordered,
        magnitude_greater_better,
        increase_is_better,
        change_greater_better)
    
    report_table = prepare_report_table(
    formulas_represented_by_powers,
    formulas_estimations,
    simplicity_weight,
    magnitude_weight,
    linearity_weight,
    periodicity_weight,
    monothonicity_weight,
    change_magnitude_weight,
    change_sign_weight)
    
    print_report(report_table,
                 ('formula',
                  'simplicity',
                  'magnitude',
                  'change magnitude',
                  'change sign',
                  'linearity',
                  'periodicity',
                  'monotonicity'),
                 influencing_quantity_names_ordered,
                 report_filename)

In [9]:
# user interface

import ipywidgets as widgets
from IPython.display import display, HTML, clear_output

csv_uploader = widgets.FileUpload(
    accept='.csv', 
    multiple=False,
    description = 'Open CSV table'
)
searched_quantity_name = widgets.Select(description='Searched quantity name', options=[])
display(widgets.HBox([csv_uploader, searched_quantity_name]))

max_formulas = widgets.IntSlider(value=20, min=10, max=100, step=1, description='Max formulas')
display(max_formulas)
#max_downcycles = widgets.IntSlider(value=200, min=100, max=1000, step=1, description='Max downcycles')
#display(widgets.HBox([max_formulas, max_downcycles]))

display(widgets.Label(value='Heuristics weight:'))

simplicity_weight = widgets.FloatSlider(value=1.0, min=0.0, max=1.0, step=0.01, description='Simplicity')
linearity_weight = widgets.FloatSlider(value=0.0, min=0.0, max=1.0, step=0.01, description='Linearity')
display(widgets.HBox([simplicity_weight, linearity_weight]))


periodicity_weight = widgets.FloatSlider(value=0.0, min=0.0, max=1.0, step=0.01, description='Periodicity')
monothonicity_weight = widgets.FloatSlider(value=0.0, min=0.0, max=1.0, step=0.01, description='Monothonicity')
display(widgets.HBox([periodicity_weight, monothonicity_weight]))

magnitude_weight = widgets.FloatSlider(value=0.0, min=0.0, max=1.0, step=0.01, description='Magnitude')
magnitude_greater_better = widgets.Checkbox(value = False, description='Greater magnitude is better')
display(widgets.HBox([magnitude_weight, magnitude_greater_better]))

change_sign_weight = widgets.FloatSlider(value=0.0, min=0.0, max=1.0, step=0.01, description='Change sign')
increase_is_better = widgets.Checkbox(value = False, description='Increase is better')
display(widgets.HBox([change_sign_weight, increase_is_better]))

change_magnitude_weight = widgets.FloatSlider(value=0.0, min=0.0, max=1.0, step=0.01, description='Change magnitude')
change_magnitude_greater_better = widgets.Checkbox(value=False, description='Greater change magnitude better')
display(widgets.HBox([change_magnitude_weight, change_magnitude_greater_better]))

output = widgets.Output()

def on_file_chosen(b):
    with output:
        try:
            csv_content = csv_uploader.value[list(csv_uploader.value)[-1]]['content']
            csv_content = csv_content.decode('utf-8').splitlines()
            quantities = load_measurements_csv(csv_content)
            searched_quantity_name.options = list(quantities.keys())
            searched_quantity_name.value = searched_quantity_name.options[0]
        except Exception as e:
            clear_output()
            print(e)
            
csv_uploader.observe(on_file_chosen, names='data')            

def run_program(b):    
    with output:
        clear_output()
        if not csv_uploader.value:
            print('Select csv file first.')
            return
        if not searched_quantity_name.value:
            print('Select searched quantity name first.')
            return
        
        csv_content = csv_uploader.value[list(csv_uploader.value)[-1]]['content']
        csv_content = csv_content.decode('utf-8').splitlines()
        
        print('Please wait...')
        try:
            dimentional_analysis_estimate(max_formulas.value,
                                          400, 
                                          simplicity_weight.value, 
                                          magnitude_weight.value, 
                                          magnitude_greater_better.value,
                                          linearity_weight.value,
                                          periodicity_weight.value,
                                          change_sign_weight.value,
                                          increase_is_better.value,
                                          change_magnitude_weight.value,
                                          change_magnitude_greater_better.value,
                                          monothonicity_weight.value,
                                          csv_content,
                                          searched_quantity_name.value,
                                          'report.pdf')
            clear_output()
            display(HTML('<A href=report.pdf target="_blank">Click to open report</A>'))
        except Exception as e:
            clear_output()
            print(e)


run_btn = widgets.Button(description='Run')
run_btn.on_click(run_program)
display(run_btn)
display(output)


HBox(children=(FileUpload(value={}, accept='.csv', description='Open CSV table'), Select(description='Searched…

IntSlider(value=20, description='Max formulas', min=10)

Label(value='Heuristics weight:')

HBox(children=(FloatSlider(value=1.0, description='Simplicity', max=1.0, step=0.01), FloatSlider(value=0.0, de…

HBox(children=(FloatSlider(value=0.0, description='Periodicity', max=1.0, step=0.01), FloatSlider(value=0.0, d…

HBox(children=(FloatSlider(value=0.0, description='Magnitude', max=1.0, step=0.01), Checkbox(value=False, desc…

HBox(children=(FloatSlider(value=0.0, description='Change sign', max=1.0, step=0.01), Checkbox(value=False, de…

HBox(children=(FloatSlider(value=0.0, description='Change magnitude', max=1.0, step=0.01), Checkbox(value=Fals…

Button(description='Run', style=ButtonStyle())

Output()