In [66]:
%run CAR_creation.ipynb

In [67]:
import re
import numpy as np


def make_intervalfunc(minv, maxv, left_inclusivity, right_inclusivity):
    """Build a predicate telling whether a value lies between two bounds.

    Args:
        minv: lower bound.
        maxv: upper bound.
        left_inclusivity: truthy when ``minv`` itself counts as inside.
        right_inclusivity: truthy when ``maxv`` itself counts as inside.

    Returns:
        A one-argument function returning ``True`` or ``False``.
    """
    def inner_func(value):
        # The upper bound is only tested once the lower bound is satisfied.
        above_lower = greaterthan(value, minv, left_inclusivity)
        return above_lower and lesserthan(value, maxv, right_inclusivity)

    return inner_func
        
def greaterthan(a, b, inclusivity):
    """Return True when ``a`` is above ``b``.

    With a truthy ``inclusivity`` the test is ``a >= b``, otherwise the
    strict ``a > b``. The result is always a plain ``bool``.
    """
    satisfied = (a >= b) if inclusivity else (a > b)
    return True if satisfied else False
        
def lesserthan(a, b, inclusivity):
    """Return True when ``a`` is below ``b``.

    With a truthy ``inclusivity`` the test is ``a <= b``, otherwise the
    strict ``a < b``. The result is always a plain ``bool``.
    """
    satisfied = (a <= b) if inclusivity else (a < b)
    return True if satisfied else False


class Interval:
    """Numeric interval whose two bounds are independently open or closed.

    Attributes:
        minval, maxval: lower and upper bound.
        left_inclusive, right_inclusive: whether the respective bound
            belongs to the interval.
        left_bracket, right_bracket: characters used when printing;
            "<" / ">" mark a closed bound, "(" / ")" an open one.
    """

    def __init__(self, minval, maxval, left_inclusive, right_inclusive):
        self.minval = minval
        self.maxval = maxval
        self.left_inclusive = left_inclusive
        self.right_inclusive = right_inclusive

        self.left_bracket = "<" if left_inclusive else "("
        self.right_bracket = ">" if right_inclusive else ")"

    def __hash__(self):
        # repr-based so that it stays consistent with __eq__ below
        return hash(repr(self))

    def __eq__(self, other):
        """Two intervals are equal when their printed forms are identical.

        Comparing the representations directly (rather than their hashes)
        rules out false positives caused by hash collisions, and unrelated
        objects are never reported as equal.
        """
        if not isinstance(other, Interval):
            return NotImplemented

        return repr(self) == repr(other)

    def refit(self, vals):
        """Shrink the interval to the smallest and largest covered value.

        Args:
            vals: array-like of candidate values.

        Returns:
            A new closed Interval spanning the covered values.

        Raises:
            ValueError: if none of ``vals`` lies inside this interval.
        """
        values = np.array(vals)
        members = values[self.test_membership(values)]

        if members.size == 0:
            raise ValueError(
                "cannot refit {!r}: no value lies inside it".format(self)
            )

        return Interval(members.min(), members.max(), True, True)

    def test_membership(self, value):
        """Test which of the given values lie inside the interval.

        Args:
            value: a scalar or array-like of values.

        Returns:
            Boolean numpy array with the same shape as the input
            (0-dimensional for a scalar). Empty input gives an empty mask.
        """
        values = np.asarray(value)

        # whole-array comparisons instead of a per-element Python call
        if self.left_inclusive:
            above_lower = values >= self.minval
        else:
            above_lower = values > self.minval

        if self.right_inclusive:
            below_upper = values <= self.maxval
        else:
            below_upper = values < self.maxval

        return np.asarray(above_lower & below_upper, dtype=bool)

    def string(self):
        """Return the bare textual form, e.g. "<1.2;2.3)"."""
        return "{}{};{}{}".format(self.left_bracket, self.minval, self.maxval, self.right_bracket)

    def __repr__(self):
        return "Interval[{}{};{}{}]".format(self.left_bracket, self.minval, self.maxval, self.right_bracket)

In [68]:
import re

class IntervalReader():
    """Parses textual intervals such as "<1.2;2.3)" into Interval objects.

    Brackets, infinity symbols, the decimal separator and the separator
    between the two bounds are configurable through properties. After
    changing any of them, call ``compile_reader()`` so that the regular
    expression used by ``read`` is rebuilt.
    """

    # Fixed pattern for the default notation (non-negative finite bounds).
    # ``read`` does not use it; it works with the pattern built by
    # ``compile_reader``.
    interval_regex = re.compile(r"(<|\()(\d+(?:\.(?:\d)+)?);(\d+(?:\.(?:\d)+)?)(\)|>)")

    def __init__(self):
        # opened interval brackets
        self.__open_bracket = "(", ")"

        # closed interval brackets
        self.__closed_bracket = "<", ">"

        # negative and positive infinity symbol,
        # e.g. -inf, +inf
        self.__infinity_symbol = "-inf", "+inf"

        # decimal separator, e.g. ".", ","
        self.__decimal_separator = "."

        # interval members separator
        self.__members_separator = ";"

        self.compile_reader()

    def compile_reader(self):
        """Build the regular expression from the current configuration.

        The pattern has four groups: left bracket, lower bound, upper
        bound and right bracket.
        """
        # e.g. (\(|<)
        left_bracket_regex_string = "({}|{})".format(
            re.escape(self.open_bracket[0]),
            re.escape(self.closed_bracket[0])
        )

        # e.g. (\)|>)
        right_bracket_regex_string = "({}|{})".format(
            re.escape(self.open_bracket[1]),
            re.escape(self.closed_bracket[1])
        )

        # optionally signed decimal number, e.g. 3, -1.5
        number_regex_string = r"-?\d+(?:{}\d+)?".format(
            re.escape(self.decimal_separator)
        )

        # a number or the negative infinity symbol
        left_number_regex_string = "({}|{})".format(
            number_regex_string,
            re.escape(self.infinity_symbol[0])
        )

        # a number or the positive infinity symbol
        right_number_regex_string = "({}|{})".format(
            number_regex_string,
            re.escape(self.infinity_symbol[1])
        )

        interval_regex_string = "{}{}{}{}{}".format(
            left_bracket_regex_string,
            left_number_regex_string,
            re.escape(self.members_separator),
            right_number_regex_string,
            right_bracket_regex_string
        )

        self.__interval_regex = re.compile(interval_regex_string)

    def __to_float(self, token, infinity_symbol, infinity_value):
        """Convert one matched bound to a float.

        The configured infinity symbol and decimal separator are translated
        first, because ``float`` only understands the default notation.
        """
        if token == infinity_symbol:
            return infinity_value

        return float(token.replace(self.decimal_separator, "."))

    def read(self, interval_string):
        """Parse the first interval found in ``interval_string``.

        Args:
            interval_string: text such as "<1.2;2.3>" or "(-inf;5)".

        Returns:
            Interval with float bounds.

        Raises:
            ValueError: if the text contains no interval in the configured
                notation.
        """
        match = self.__interval_regex.search(interval_string)

        if match is None:
            raise ValueError(
                "cannot read an interval from {!r}".format(interval_string)
            )

        left_bracket, minval, maxval, right_bracket = match.groups()

        left_inclusive = left_bracket == self.closed_bracket[0]
        right_inclusive = right_bracket == self.closed_bracket[1]

        negative_infinity, positive_infinity = self.infinity_symbol

        return Interval(
            self.__to_float(minval, negative_infinity, float("-inf")),
            self.__to_float(maxval, positive_infinity, float("inf")),
            left_inclusive,
            right_inclusive
        )

    # configuration accessors; remember to call compile_reader() after
    # changing any of them

    @property
    def open_bracket(self):
        return self.__open_bracket

    @open_bracket.setter
    def open_bracket(self, val):
        self.__open_bracket = val

    @property
    def closed_bracket(self):
        return self.__closed_bracket

    @closed_bracket.setter
    def closed_bracket(self, val):
        self.__closed_bracket = val

    @property
    def infinity_symbol(self):
        return self.__infinity_symbol

    @infinity_symbol.setter
    def infinity_symbol(self, val):
        self.__infinity_symbol = val

    @property
    def decimal_separator(self):
        return self.__decimal_separator

    @decimal_separator.setter
    def decimal_separator(self, val):
        self.__decimal_separator = val

    @property
    def members_separator(self):
        return self.__members_separator

    @members_separator.setter
    def members_separator(self, val):
        self.__members_separator = val
    
    
        
# Reader instance kept as a notebook-level global.
interval_reader = IntervalReader()

# IntervalReader.__init__ already calls compile_reader(); this call rebuilds
# the same pattern again.
interval_reader.compile_reader()

# Smoke test; as the last expression of the cell its repr is displayed.
interval_reader.read("<1.2;2.3>")

Interval[<1.2;2.3>]

In [69]:
%run ../../main.py

import copy

class QuantitativeCAR:
    """Class association rule whose antecedent literals are intervals.

    Built from an existing rule object: every antecedent value is parsed
    into an Interval, the remaining properties are copied over.

    Instances compare equal when their ``rid`` matches. Because ``__eq__``
    is defined without ``__hash__``, instances are not hashable.
    """

    # reader shared by all rules for parsing antecedent values
    interval_reader = IntervalReader()

    def __init__(self, rule):
        self.antecedent = self.__create_intervals_from_antecedent(rule.antecedent)
        self.consequent = copy.copy(rule.consequent)

        self.confidence = rule.confidence
        self.support = rule.support
        self.rulelen = rule.rulelen
        self.rid = rule.rid

        # property which indicates whether the rule was extended or not
        self.was_extended = False
        # literal which extended the rule
        self.extension_literal = None
        # RuleExtender stores and reads the extending literal under this
        # name, so it is initialised as well
        self.extended_literal = None

    def __create_intervals_from_antecedent(self, antecedent):
        """Turn (attribute, interval string) pairs into a sorted list of
        (attribute, Interval) pairs."""
        interval_antecedent = [
            (attribute, self.interval_reader.read(value))
            for attribute, value in antecedent
        ]

        return self.__sort_antecedent(interval_antecedent)

    def __sort_antecedent(self, antecedent):
        return sorted(antecedent)

    def update_properties(self, quant_dataframe):
        """Recompute support, confidence and rulelen.

        Args:
            quant_dataframe: QuantitativeDataFrame the statistics are
                calculated on.

        Raises:
            TypeError: if ``quant_dataframe`` is not a QuantitativeDataFrame.
        """
        if not isinstance(quant_dataframe, QuantitativeDataFrame):
            raise TypeError(
                "type of quant_dataframe must be QuantitativeDataFrame"
            )

        support, confidence = quant_dataframe.calculate_rule_statistics(self)

        self.support = support
        self.confidence = confidence
        # length of antecedent + length of consequent
        self.rulelen = len(self.antecedent) + 1

    def copy(self):
        """Return a deep copy of the rule."""
        return copy.deepcopy(self)

    def __repr__(self):
        ant_string_arr = [key + "=" + val.string() for key, val in self.antecedent]
        ant_string = "{" + ",".join(ant_string_arr) + "}"

        args = [
            ant_string,
            "{" + self.consequent.string() + "}",
            self.support,
            self.confidence,
            self.rulelen,
            self.rid
        ]

        return "CAR {} => {} sup: {:.2f} conf: {:.2f} len: {}, id: {}".format(*args)

    def __gt__(self, other):
        """Precedence operator.

        A rule has higher precedence when it has higher confidence; ties
        are broken by higher support, then by shorter length, then by
        lower id.
        """
        if self.confidence != other.confidence:
            return bool(self.confidence > other.confidence)

        if self.support != other.support:
            return bool(self.support > other.support)

        if self.rulelen != other.rulelen:
            return bool(self.rulelen < other.rulelen)

        return bool(self.rid < other.rid)

    def __lt__(self, other):
        """Strict counterpart of ``__gt__``.

        Defined as ``other > self`` so that a rule is never "less than" a
        rule of identical precedence (which ``not self > other`` reported).
        """
        return other > self

    def __eq__(self, other):
        if not isinstance(other, QuantitativeCAR):
            return NotImplemented

        return self.rid == other.rid


In [70]:
class RuleCoverCache:
    """Empty placeholder class; it defines no attributes or methods."""
    

In [71]:
class LiteralCache:
    """Mapping from a literal to its truth values, e.g. {
        "food=banana": [True, True, False, False, True],
        "food=apple" : [True, True, True, True, False]
    }
    """

    def __init__(self):
        self.__entries = dict()

    def insert(self, literal, truth_values):
        """Store (or overwrite) the truth values of a literal."""
        self.__entries.update({literal: truth_values})

    def get(self, literal):
        """Return the stored truth values; KeyError for an unknown literal."""
        return self.__entries[literal]

    def __contains__(self, literal):
        """Support for ``literal in cache``."""
        return literal in self.__entries
    
    
    
# Smoke test of LiteralCache: store two literals, then check membership.
cache = LiteralCache()

cache.insert("food=apple", np.array([True, True, False, False, True]))
cache.insert("food=gingerbread", np.array([False, False, False, False, True]))

# `in` is served by LiteralCache.__contains__
assert "food=apple" in cache
assert "blabla" not in cache
assert "food=gingerbread" in cache

In [72]:
import pandas
import numpy as np


class QuantitativeDataFrame:
    
    def __init__(self, dataframe):
        if type(dataframe) != pandas.DataFrame:
            raise Exception("type of dataframe must be pandas.dataframe")
        
        
        self.__dataframe = dataframe
        
        # sorted and unique columns of the dataframe
        # saved as a numpy array
        self.__preprocessed_columns = self.__preprocess_columns(dataframe)
        
        
        # literal cache for computing rule statistics
        # - support and confidence
        self.__literal_cache = LiteralCache()

        # so that it doesn't have to be computed over and over
        self.size = dataframe.index.size
        
        
    @property
    def dataframe(self):
        return self.__dataframe
    
    
    def column(self, colname):
        return self.__preprocessed_columns[colname]
    
    
    def mask(self, vals):
        return self.__dataframe[vals]
    
    
    def find_covered_by_antecedent_mask(self, antecedent):
        """
        returns:
            mask - an array of boolean values indicating which instances
            are covered by antecedent
        """
        
        # todo: compute only once to make function faster
        dataset_size = self.__dataframe.index.size
        
        for literal in antecedent:
            attribute, interval = literal
            
            # the column that concerns the
            # iterated attribute
            # instead of pandas.Series, grab the ndarray
            # using values attribute
            relevant_column = self.__dataframe[[attribute]].values.reshape(dataset_size)
            
            # this tells us which instances satisfy the literal
            current_mask = self.get_literal_coverage(literal, relevant_column)
            
            # add cummulated and current mask using logical AND
            cummulated_mask &= current_mask
    
    
    def find_covered_by_literal_mask(self, literal):
        """
        returns:
            mask - an array of boolean values indicating which instances
            are covered by literal
        """
        
        for literal in rule.antecedent:
            attribute, interval = literal
            
            # the column that concerns the
            # iterated attribute
            # instead of pandas.Series, grab the ndarray
            # using values attribute
            relevant_column = self.__dataframe[[attribute]].values.reshape(dataset_size)
            
            # this tells us which instances satisfy the literal
            current_mask = self.get_literal_coverage(literal, relevant_column)
            
            # add cummulated and current mask using logical AND
            cummulated_mask &= current_mask
    
    
    def find_covered_by_rule_mask(self, rule):
        """
        returns:
            covered_by_antecedent_mask:
                - array of boolean values indicating which
                dataset rows satisfy antecedent
                
            covered_by_consequent_mask:
                - array of boolean values indicating which
                dataset rows satisfy conseqeunt
        """
        
        dataset_size = self.__dataframe.index.size
        
        # initialize a mask filled with True values
        # it will get modified as futher literals get
        # tested
        
        # for optimization - create cummulated mask once
        # in constructor
        cummulated_mask = np.array([True] * dataset_size)
        
        for literal in rule.antecedent:
            attribute, interval = literal
            
            # the column that concerns the
            # iterated attribute
            # instead of pandas.Series, grab the ndarray
            # using values attribute
            relevant_column = self.__dataframe[[attribute]].values.reshape(dataset_size)
            
            # this tells us which instances satisfy the literal
            current_mask = self.get_literal_coverage(literal, relevant_column)
            
            # add cummulated and current mask using logical AND
            cummulated_mask &= current_mask
            
            
        
        instances_satisfying_antecedent_mask = cummulated_mask
        instances_satisfying_consequent_mask = self.__get_consequent_coverage_mask(rule)
        instances_satisfying_consequent_mask = instances_satisfying_consequent_mask.reshape(dataset_size)
        
        return instances_satisfying_antecedent_mask, instances_satisfying_consequent_mask
        
        
    
    def calculate_rule_statistics(self, rule):
        """calculates rule's confidence and
        support using efficient numpy functions
        
        
        returns:
        --------
        
            support:
                float
            
            confidence:
                float
        """
        
        dataset_size = self.__dataframe.index.size
        
        # initialize a mask filled with True values
        # it will get modified as futher literals get
        # tested
        
        # for optimization - create cummulated mask once
        # in constructor
        cummulated_mask = np.array([True] * dataset_size)
        
        for literal in rule.antecedent:
            attribute, interval = literal
            
            # the column that concerns the
            # iterated attribute
            # instead of pandas.Series, grab the ndarray
            # using values attribute
            relevant_column = self.__dataframe[[attribute]].values.reshape(dataset_size)
            
            # this tells us which instances satisfy the literal
            current_mask = self.get_literal_coverage(literal, relevant_column)
            
            # add cummulated and current mask using logical AND
            cummulated_mask &= current_mask
            
            
        
        
        instances_satisfying_antecedent = self.__dataframe[cummulated_mask].index
        instances_satisfying_antecedent_count = instances_satisfying_antecedent.size
        
        # using cummulated mask to filter out instances that satisfy consequent
        # but do not satisfy antecedent
        instances_satisfying_consequent_mask = self.__get_consequent_coverage_mask(rule)
        instances_satisfying_consequent_mask = instances_satisfying_consequent_mask.reshape(dataset_size)
        
        instances_satisfying_consequent_and_antecedent = self.__dataframe[
            instances_satisfying_consequent_mask & cummulated_mask
        ].index
        
        instances_satisfying_consequent_and_antecedent_count = instances_satisfying_consequent_and_antecedent.size
        instances_satisfying_consequent_count = self.__dataframe[instances_satisfying_consequent_mask].index.size
        
        # instances satisfying consequent both antecedent and consequent 
        support = instances_satisfying_consequent_and_antecedent_count / dataset_size
        confidence = instances_satisfying_consequent_and_antecedent_count / instances_satisfying_antecedent_count
        
        return support, confidence
    
    
    def __get_consequent_coverage_mask(self, rule):
        consequent = rule.consequent
        attribute, value = consequent
        
        class_column = self.__dataframe[[attribute]].values
        
        literal_key = "{}={}".format(attribute, value)

        mask = []
        
        if literal_key in self.__literal_cache:
            mask = self.__literal_cache.get(literal_key)
        else:
            mask = class_column == value
        
        return mask
    
    
    def get_literal_coverage(self, literal, values):
        """returns mask which describes the instances that
        satisfy the interval
        
        function uses cached results for efficiency
        """
        
        if type(values) != np.ndarray:
            raise Exception("Type of values must be numpy.ndarray")
            
        mask = []
        
        attribute, interval = literal
        
        literal_key = "{}={}".format(attribute, interval)
        
        # check if the result is already cached, otherwise
        # calculate and save the result
        if literal_key in self.__literal_cache:
            mask = self.__literal_cache.get(literal_key)
        else:
            mask = interval.test_membership(values)
            
            self.__literal_cache.insert(literal_key, mask)
            
        # reshape mask into single dimension
        mask = mask.reshape(values.size)
            
        return mask
    
    
    def __preprocess_columns(self, dataframe):
        
        # covert to dict
        # column -> list
        # need to convert it to numpy array
        dataframe_dict = dataframe.to_dict(orient="list")
        
        dataframe_ndarray = {}
        
        
        for column, value_list in dataframe_dict.items():
            transformed_list = np.sort(np.unique(value_list))
            dataframe_ndarray[column] = transformed_list
            
        return dataframe_ndarray
        
        
    
    
    
# movies_undiscr_txns is not defined in the cells of this notebook;
# presumably it comes from one of the %run scripts above — TODO confirm.
qds = QuantitativeDataFrame(movies_undiscr_txns)

# short alias for the raw dataframe
ds = movies_undiscr_txns





In [73]:
import pandas

class RuleExtender:
    
    def __init__(self, dataframe):
    
        if type(dataframe) != QuantitativeDataFrame:
            raise Exception(
                "type of dataset must be pandas.DataFrame"
            )
            
        self.__dataframe = dataframe
        
        
        
    def transform(self, rules):
        
        copied_rules = [ rule.copy() for rule in rules ]
        
        extended_rules = [ self.__extend(rule) for rule in copied_rules ]
        
        return extended_rules
    
    
    
    def __extend(self, rule):
        ext = self.__extend_rule(rule)
        
        return ext
        
    def __extend_rule(self, rule, min_improvement=0, min_conditional_improvement=-0.01):
        
        # check improvemnt argument ranges
        
        current_best = rule
        direct_extensions = self.__get_extensions(rule)
        
        
        while True:
            extension_succesful = False
            
            for candidate in direct_extensions:
                
                candidate.update_properties(self.__dataframe)
                
                delta_confidence = candidate.confidence - current_best.confidence
                delta_support = candidate.support - current_best.support
                
                
                if self.__crisp_accept(delta_confidence, delta_support, min_improvement):
                    current_best = candidate
                    extension_succesful = True
                    break
                    
                
                if self.__conditional_accept(delta_confidence, min_conditional_improvement):
                    enlargement = candidate
                    
                    while True:
                        
                        enlargement = self.get_beam_extensions(enlargement)
                        
                        if not enlargement:
                            break
                            
                        candidate.update_properties(self.__dataframe)
                        enlargement.update_properties(self.__dataframe)

                        delta_confidence = enlargement.confidence - current_best.confidence
                        delta_support = enlargement.support - current_best.support

                        if self.__crisp_accept(delta_confidence, delta_support, min_improvement):
                            current_best = enlargement
                            extension_succesful = True
                            
                        elif self.__conditional_accept(delta_confidence, min_conditional_improvement):
                            continue
                        
                        else:
                            break
            
            
                    if extension_succesful == True:
                        break
                        

                else:
                    # continue to next candidate
                    continue
           
        
            if extension_succesful == False:
                break
                    
        return current_best
        
        
    def __get_extensions(self, rule):
        extended_rules = set()
        
        for literal in rule.antecedent:
            attribute, interval = literal
            
            neighborhood = self.__get_direct_extensions(literal)
            
            for extended_literal in neighborhood:
                # copy the rule so the extended literal
                # can replace the default literal
                copied_rule = rule.copy()
                
                # find the index of the literal
                # so that it can be replaced
                current_literal_index = copied_rule.antecedent.index(literal)
                
                copied_rule.antecedent[current_literal_index] = extended_literal
                copied_rule.was_extended = True
                copied_rule.extended_literal = extended_literal
                
                extended_rules.add(copied_rule)
                
        return extended_rules
            
    
    def __get_direct_extensions(self, literal):
        """
        ensure sort and unique
        before calling functions
        """
        
        attribute, interval = literal
        
        vals = self.__dataframe.column(attribute)
        vals_len = vals.size

        mask = interval.test_membership(vals)

        # indices of interval members
        # we want to extend them 
        # once to the left
        # and once to the right
        # bu we have to check if resulting
        # indices are not larger than value size
        member_indexes = np.where(mask)[0]

        first_index = member_indexes[0]
        last_index = member_indexes[-1]

        first_index_modified = first_index - 1
        last_index_modified = last_index + 1
        
        no_left_extension = False
        no_right_extension = False

        if first_index_modified < 0:
            no_left_extension = True

        # if last_index_modified is larger than
        # available indices
        if last_index_modified > vals_len - 1:
            no_right_extension = True


        new_left_bound = interval.minval
        new_right_bound = interval.maxval

        if not no_left_extension:
            new_left_bound = vals[first_index_modified]

        if not no_right_extension:
            new_right_bound = vals[last_index_modified]


        # prepare return values
        extensions = []

        if not no_left_extension:
            extension = new_left_bound, interval.maxval
            
            # when values are [1, 2, 3, 3, 4, 5]
            # and the corresponding interval is (2, 4)
            # instead of resulting interval being (1, 4)
            
            temp_interval = Interval(
                new_left_bound,
                interval.maxval,
                True,
                interval.right_inclusive
            )

            extensions.append((attribute, temp_interval))

        if not no_right_extension:
            extensoin = interval.minval, new_right_bound

            temp_interval = Interval(
                interval.minval,
                new_right_bound,
                interval.left_inclusive,
                True
            )

            extensions.append((attribute, temp_interval))

        return extensions
        
    
    # make private
    def get_beam_extensions(self, rule):
        if not rule.was_extended:
            return None

        # literal which extended the rule
        literal = rule.extended_literal
        
        extended_literal = self.__get_direct_extensions(literal)
        
        if not extended_literal:
            return None
        
        copied_rule = rule.copy()
        
        literal_index = copied_rule.antecedent.index(literal)
        
        # so that literal is not an array
        copied_rule.antecedent[literal_index] = extended_literal[0]
        copied_rule.was_extended = True
        copied_rule.extended_literal = extended_literal[0]
        
        return copied_rule

    
    
    def __crisp_accept(self, delta_confidence, delta_support, min_improvement):
        if delta_confidence >= min_improvement and delta_support > 0:
            return True
        else:
            return False
    
    def __conditional_accept(self, delta_conf, min_improvement):
        if delta_conf >= min_improvement:
            return True
        
        
        
# Demo: extend the mined rules and print them before and after.
# `rules` is not defined in the cells of this notebook; presumably it comes
# from one of the %run scripts above — TODO confirm.
# NOTE: the output saved with this cell is a TypeError
# ("unhashable type: 'QuantitativeCAR'").
rule_ext = RuleExtender(qds)      

qrules = [ QuantitativeCAR(r) for r in rules ]

extended = rule_ext.transform(qrules)


# recompute the statistics of the original rules on qds before printing
for qr in qrules:
    qr.update_properties(qds)

[ print(qr) for qr in qrules ]
print()
print()
[ print(ext) for ext in extended ]

# last expression of the cell: (support, confidence) of the first rule
qds.calculate_rule_statistics(qrules[0])

TypeError: unhashable type: 'QuantitativeCAR'

In [None]:
class RuleRefitter:
    """Snaps the interval bounds of rule literals onto values of the data.
    """

    def __init__(self, quantitative_dataframe):
        self.__dataframe = quantitative_dataframe

    def transform(self, rules):
        """Return refitted copies of ``rules``; the inputs stay untouched."""
        return [self.__refit(duplicate) for duplicate in [rule.copy() for rule in rules]]

    def __refit(self, rule):
        """Replace every antecedent interval of ``rule`` by its refit
        over the values of the literal's column, in place.
        """
        antecedent = rule.antecedent

        for position in range(len(antecedent)):
            attribute, interval = antecedent[position]

            column_values = self.__dataframe.column(attribute)

            antecedent[position] = (attribute, interval.refit(column_values))

        return rule
            
    
    
            
    
    
# Demo: refit the rules; transform() works on copies, and as the last
# expression of the cell its result is displayed.
rule_refitter = RuleRefitter(qds)

rule_refitter.transform(qrules)

In [None]:
class RuleLiteralPruner:
    """Drops antecedent literals whose removal raises rule confidence."""

    def __init__(self, quantitative_dataframe):
        self.__dataframe = quantitative_dataframe

    def transform(self, rules):
        """Return pruned copies of ``rules``; the inputs are not modified."""
        copied_rules = [rule.copy() for rule in rules]

        return [self.__trim(rule) for rule in copied_rules]

    def produce_combinations(self, array):
        """Yield ``array`` with each single element left out in turn."""
        for i in range(len(array)):
            yield array[:i] + array[i + 1:]

    def __trim(self, rule):
        """Remove literals one at a time while confidence strictly improves.

        Every pass starts from the rule's current antecedent, so several
        literals can be removed one after another. At least one literal is
        always kept.
        """
        if not isinstance(rule, QuantitativeCAR):
            raise Exception("type of rule must be QuantClassAssociationRule")

        rule.update_properties(self.__dataframe)

        while True:
            # reset on every pass; otherwise a pass that tries no
            # combination would repeat forever after an earlier removal
            attr_removed = False

            for literals_combination in self.produce_combinations(rule.antecedent):
                # never produce a rule with an empty antecedent
                if not literals_combination:
                    continue

                copied_rule = rule.copy()

                copied_rule.antecedent = literals_combination
                copied_rule.update_properties(self.__dataframe)

                if copied_rule.confidence > rule.confidence:
                    rule.support = copied_rule.support
                    rule.confidence = copied_rule.confidence
                    rule.rulelen = copied_rule.rulelen

                    rule.antecedent = copied_rule.antecedent

                    attr_removed = True

                    break

            if not attr_removed:
                break

        return rule
    
    
# Demo: prune redundant literals; transform() works on copies, and as the
# last expression of the cell its result is displayed.
literal_pruner = RuleLiteralPruner(qds)

literal_pruner.transform(qrules)

In [None]:
class RuleTrimmer:
    """Trims rule intervals to the values of the rows the rule gets right.
    """

    def __init__(self, quantitative_dataframe):
        self.__dataframe = quantitative_dataframe

    def transform(self, rules):
        """Return trimmed copies of ``rules``; the inputs are not modified."""
        copied_rules = [rule.copy() for rule in rules]

        return [self.__trim(rule) for rule in copied_rules]

    def __trim(self, rule):
        """Shrink each literal to the closed interval between the smallest
        and largest value among rows satisfying both antecedent and
        consequent. Literals are left untouched when no such row exists.
        """
        if not isinstance(rule, QuantitativeCAR):
            raise Exception("type of rule must be QuantClassAssociationRule")

        covered_by_antecedent_mask, covered_by_consequent_mask = self.__dataframe.find_covered_by_rule_mask(rule)

        covered_by_rule_mask = covered_by_antecedent_mask & covered_by_consequent_mask

        # instances covered by rule
        correctly_covered_by_r = self.__dataframe.mask(covered_by_rule_mask)

        antecedent = rule.antecedent

        for idx, (attribute, _) in enumerate(antecedent):
            current_column = correctly_covered_by_r[[attribute]].values

            # skip only when there is nothing to trim to; a column holding
            # nothing but zeros is still valid data
            if current_column.size == 0:
                continue

            # .item() converts the numpy scalar to a plain Python number
            minv = current_column.min().item()
            maxv = current_column.max().item()

            antecedent[idx] = attribute, Interval(minv, maxv, True, True)

        return rule
    
    
    
# Demo: print the rules, then trim them; transform() works on copies.
rule_trimmer = RuleTrimmer(qds)


[ print(r) for r in qrules ]

print()

# last expression of the cell, so the trimmed rules are displayed
rule_trimmer.transform(qrules)

In [None]:
import collections
from scipy import stats

class RulePostPruner:
    """Cuts a ranked rule list at the rule that minimises training error.

    The rules are walked from highest to lowest rank. Each rule removes the
    instances its antecedent covers from the pool of still-uncovered
    instances. After every rule that classifies at least one of them
    correctly, the error of "rules so far + a default rule" is estimated.
    The list is cut after the rule with the lowest estimate and a default
    rule (empty antecedent) is appended.

    The class label is expected in the last column of the wrapped dataframe.
    """

    def __init__(self, quantitative_dataset):
        # The wrapped object must expose `.dataframe` and
        # `find_covered_by_rule_mask(rule)`; both are used below.
        self.__dataframe = quantitative_dataset

    def transform(self, rules):
        """Prune copies of `rules` and return the pruned list.

        Each rule is duplicated with its own copy() before pruning.
        """
        copied_rules = [rule.copy() for rule in rules]

        return self.prune(copied_rules)

    def preprocess_dataframe(self):
        """Return the row index values of the wrapped dataframe."""
        return self.__dataframe.dataframe.index.values

    def get_most_frequent_class(self):
        """Return (class, count) of the most frequent class in the dataset.

        Requires the class column to be the last one in the dataframe.
        When several classes tie, only the first of them is returned.
        """
        index_counts, possible_classes = pd.factorize(self.__dataframe.dataframe.iloc[:, -1].values)
        counts = np.bincount(index_counts)
        counts_max = counts.max()
        most_frequent_classes = possible_classes[counts == counts_max]

        # return only one
        return most_frequent_classes[0], counts_max

    def get_most_frequent_from_numpy(self, ndarray):
        """Return (mode, count) of a non-empty numpy array.

        Ties are resolved in favour of the smallest value, because
        np.unique returns the distinct values sorted.
        """
        unique, pos = np.unique(ndarray, return_inverse=True)
        # ravel: bincount needs a one-dimensional array of positions
        counts = np.bincount(pos.ravel())
        maxpos = counts.argmax()

        return (unique[maxpos], counts[maxpos])

    def find_covered(self):
        """Placeholder, not implemented."""
        pass

    def prune(self, rules):
        """Sort `rules` in place, cut the list and append a default rule.

        Parameters:
            rules: list of rules; they must be orderable (sorted with
                reverse=True), offer copy(), and be accepted by the
                dataset's find_covered_by_rule_mask().
                # assumes that call returns two boolean masks aligned with
                # the dataframe rows -- TODO confirm

        Returns:
            A new list with the rules up to and including the cutoff rule,
            followed by a default rule whose antecedent is empty and whose
            consequent is (class column name, default class). If no rule
            lowers the error below the majority-class baseline, every rule
            is kept. An empty input yields an empty list, because there is
            no rule to use as a template for the default rule.
        """
        if not rules:
            return []

        frame = self.__dataframe.dataframe
        class_labels = np.asarray(frame.iloc[:, -1])
        dataset_len = class_labels.size

        # True for instances not yet covered by a higher-ranked rule
        remaining_mask = np.ones(dataset_len, dtype=bool)

        cutoff_class, cutoff_class_count = self.get_most_frequent_class()
        default_class = cutoff_class

        total_errors_without_default = 0

        # baseline: error of predicting the most frequent class everywhere
        lowest_total_error = dataset_len - cutoff_class_count

        # relies on comparators being implemented on the rule class
        rules.sort(reverse=True)

        # Track the cutoff by position in the sorted list, so that it does
        # not depend on rule equality or on the order of the input.
        cutoff_index = len(rules) - 1

        for index, rule in enumerate(rules):
            covered_antecedent, covered_consequent = self.__dataframe.find_covered_by_rule_mask(rule)

            # only instances still in the pool can be claimed by this rule
            covered = np.asarray(covered_antecedent, dtype=bool) & remaining_mask
            correctly_covered = covered & np.asarray(covered_consequent, dtype=bool)

            # dataset -= covered
            remaining_mask &= ~covered

            if not correctly_covered.any():
                continue

            misclassified = covered.sum() - correctly_covered.sum()
            total_errors_without_default += misclassified

            remaining_labels = class_labels[remaining_mask]

            if remaining_labels.size:
                default_class, default_class_count = self.get_most_frequent_from_numpy(remaining_labels)
            else:
                # nothing is left for a default rule to classify: it makes
                # no errors and the previous default class is kept
                default_class_count = 0

            # error of the default rule on the instances that are left
            default_rule_error = remaining_labels.size - default_class_count
            total_errors_with_default = default_rule_error + total_errors_without_default

            if total_errors_with_default < lowest_total_error:
                cutoff_index = index
                lowest_total_error = total_errors_with_default
                cutoff_class = default_class

        # remove all rules below the cutoff rule
        rules_pruned = rules[:cutoff_index + 1]

        # append new default rule
        empty_rule = rules[cutoff_index].copy()
        empty_rule.antecedent = []
        empty_rule.consequent = frame.columns[-1], cutoff_class

        rules_pruned.append(empty_rule)

        return rules_pruned
        
        
        
        

# Build a post pruner around `qds` and run it over `qrules`.
rulepostpruner = RulePostPruner(qds)



rulepostpruner.transform(qrules)



In [None]:
# Scratch check: hash of a frozenset built from a list with a repeated
# element (the set keeps 2 only once).
hash(frozenset([2, 2, 3]))
