In [129]:
import numpy as np
import matplotlib.pyplot as plt

In [249]:
class Catagorization:
    def __init__(self, num_ants, num_rules_converge, data, attributes, options, classes):
        self.num_ants = num_ants
        self.num_rules_converge = num_rules_converge
        self.data = data
        self.original_data = data.copy()
        self.attributes = attributes
        self.options = options
        self.classes = classes
        self.heuristic = self.calc_heuristic()
        self.pharamones = self.init_pharamones()
        self.discovered_rules = {}
        self.qualities = []
        self.leftover_cases = len(self.data) * 0.1
        self.init_ants() 
        
    def init_ants(self):
        self.ants = [Ant(self.pharamones, self.heuristic, self.attributes, 50, self.options, self.classes) for i in range(self.num_ants)]
        
    def run_simulation(self):
        while len(self.data) > self.leftover_cases:
            self.converged_rules = 1
            best_rule = []
            best_quality = -1
            consequent = None
            
            for j, ant in enumerate(self.ants):
                self.run_one_ant(ant)
                
                    
                if best_quality < 0 and set(self.ants[j-1].rule) == set(ant.rule) and ant.rule:
                    self.converged_rules += 1
                else:
                    self.converged_rules = 1
                if self.converged_rules == self.num_rules_converge:
                    print('rules have converged')
                    break
                if ant.quality > best_quality:
                    best_quality = ant.quality
                    best_rule = ant.rule
                    consequent = ant.consequent
            n = self.remove_relevant_cases(best_rule, consequent)
            print(len(self.data))
            res = self.discovered_rules.get(consequent, [])
            res.append((best_rule, best_quality, n))
            self.discovered_rules[consequent] = res
            self.pharamones = self.init_pharamones()
            self.init_ants()
        print('simulation done. Remaining cases: ', len(self.data))
            
    def run_one_ant(self, ant):
        ant.pharamones = self.pharamones
        ant.add_terms(self.data)
        q = self.prune_ant(ant)
        consequent = self.calc_consequent(ant.rule)
        self.update_pharamones(ant.rule, q)
        ant.quality = q
        ant.consequent = consequent
        
    def calc_heuristic(self):
        total_num = len(self.attributes)*len(self.options)*len(self.classes)
        probs = {}
        heuristic = {}
        for i in attributes:
            for j in options:
                for k in classes:
                    a = [game for game in data_listed if k == game[-1]]
                    b = [game for game in a if j == game[i]]
                    p = len(b)/len(data_listed)
                    c = probs.get((i,j),[]) 
                    c.append(np.log2(p**p))
                    probs[(i,j)] = c
                heuristic[(i,j)] = -sum(probs[(i,j)])
        return heuristic

    def pick_best_rule(self):
        most_correct = 0
        best_ant = None
        for ant in self.ants:
            consequence, num_correct = self.calc_num_correct(ant.rule)
            ant.consequence = consequence
            if num_correct > most_correct:
                most_correct = num_correct
                best_ant = ant
        return most_correct, best_ant
    
    def find_relevant_cases(self, rule):
        relevant_cases = self.data.copy()
        for case in self.data:
            for term in rule:
                if case[term[0]] != term[1]:
                    relevant_cases.remove(case)
                    break
        return relevant_cases
    
    def remove_relevant_cases(self, rule, consequent):
        relevant_cases = self.find_relevant_cases(rule)
        count = 0
        for case in relevant_cases:
            if case[-1] == consequent:
                self.data.remove(case)
                count += 1
        return count
    def calc_consequent(self, rule):
        relevant_cases = self.find_relevant_cases(rule)
        classes = {}
        if not relevant_cases:
            relevant_cases = self.data
            
        for case in relevant_cases:
            classes[str(case[-1])] = classes.get(str(case[-1]), 0) + 1
        res = max(classes, key=classes.get)
        return res
    
    def prune_ant(self, ant):
        """Iteratively goes through the rulelist for an ant and sees which rules are not helping the quality of the rule"""
        n = len(ant.rule)
        if n == 0:
            return 0
        for iteration in range(n):
            max_delta_quality = 0
            best_new_rule = ant.rule
            consequent = self.calc_consequent(ant.rule)
            base_quality = self.calc_quality(ant.rule, consequent)
            
            for i, term in enumerate(ant.rule):
                new_rule = ant.rule[0:i] + ant.rule[i+1:]
                new_consequent = self.calc_consequent(new_rule)
                new_quality = self.calc_quality(new_rule, new_consequent)
                if max_delta_quality <= new_quality - base_quality:
                    max_delta_quality = new_quality - base_quality
                    best_new_rule = new_rule
            if max_delta_quality >= 0:
                if best_new_rule:
                    ant.rule = best_new_rule
                    max_delta_quality = 0
                else:
                    print('adding empty rule???')
            else:
                break
        return max_delta_quality + base_quality
        
    def calc_quality(self, rule, consequent):
        relevant_cases = self.find_relevant_cases(rule)
        num_cases_covered = len(relevant_cases)
        num_cases_not_covered = len(self.data) - len(relevant_cases)
        
        true_positives = len([case for case in relevant_cases if case[-1] == consequent])
        false_positives = len(relevant_cases) - true_positives
        false_negatives = len([case for case in self.data if case not in relevant_cases and case[-1] == consequent])
        true_negatives = len(self.data) - len(relevant_cases) - false_negatives
        
        sensitivity = true_positives / (true_positives + false_negatives)
        if true_negatives == 0: return 0
        specificity = true_negatives / (true_negatives + false_positives)
        
        return sensitivity * specificity
        
    def run_sim(self):
        ants = {}
        ants['ant0'] = Ant(pharamones)
        for i in range(self.num_ants):
            ants["ant" + str(i)] = Ant(ants["ant" + str(i-1)].pharamones)
            
    def init_pharamones(self):
        pharamones = {}
        initial_value = 1/(len(self.attributes) * len(self.options))
        for i in attributes:
            for j in options:
                pharamones[(i,j)] = initial_value
        return pharamones
    
    def update_pharamones(self, rule, quality):
        for term in rule:
            self.pharamones[(term[0], term[1])] += quality
        normalization_factor = sum(self.pharamones.values())
        keys = self.pharamones.keys()
        for key in keys:
            self.pharamones[key] /= normalization_factor
        
        assert(sum(self.pharamones.values()) - 1 < 0.0001)
        
    def evaluate_discovered_rules(self, data):
        num_correct = 0
        num_total = len(data)
        self.data = data
        for consequent in self.discovered_rules.keys():
            rules = self.discovered_rules[consequent]
            for rule in rules:
#                 print(rule[0])
#                 print(consequent)
#                 relevant_cases = self.find_relevant_cases(rule[0])
#                 print(relevant_cases)
#                 num_correct += len([case for case in relevant_cases if case[-1] == consequent])
                num_correct += self.remove_relevant_cases(rule[0], consequent)
        return num_correct / num_total

In [245]:
class Ant:
    delta_p = .05
    def __init__(self,pharamones, heuristic, attributes, min_cases_per_rule, options, classes):
        ""
        self.rule = []
        self.pharamones = pharamones
        self.heuristic = heuristic
        self.decision = np.zeros((9,3))
        self.k = 2
        self.classes = classes
        self.attributes = attributes
        self.min_cases_per_rule = min_cases_per_rule
        self.options = options
        self.used_attributes = []
        
    def add_terms(self, data):
        "adds a term to the ruleset based on the pharamone trail and heuristic function"
        quit = False
        tries = 0
        for i in self.attributes:
            probs = self.calc_prob()
#             print('probs are: ', probs)
#             print('')
            picking = True
            while picking:
                term = self.pick_term(probs)
                if len(self.find_relevant_cases(self.rule+[term], data)) > self.min_cases_per_rule:
                    if term[0] not in self.used_attributes:
                        self.used_attributes.append(term[0])
                        picking = False
                else:
                    tries += 1
                if tries > 10 and self.rule:
                    picking = False
                    quit = True
                    
            if not quit:
                self.rule.append(term)
            else:
                break
    def normalize(self, function):
        norm = {}
        for i in self.attributes:
            for j in self.options:
                num = function(i, j)
                if num == 0:
                    norm[(i,j)] = 0
                    continue
                unused_attributes = len(self.attributes)-len(self.rule)
                normalization_factor = 0
                for jj in self.options:
                    normalization_factor += function(i, jj)
                norm[(i,j)] = num / (unused_attributes * normalization_factor)
        return norm
        
    def normalize_heuristic(self):
        def f(i, j):
            return np.log2(self.k) - self.heuristic[(i,j)]
        return self.normalize(f)
    
    def calc_prob(self):
        self.normalized_heuristic = self.normalize_heuristic()
        def f(i, j):
            if i not in self.used_attributes:
                return self.pharamones[(i,j)] * self.normalized_heuristic[(i,j)]
            else:
                return 0
        return self.normalize(f)
    
    def pick_term(self, probs):
        index = np.random.choice(len(probs), 1, p = list(probs.values()))
        index = index[0]
        term = list(probs.keys())[index]
        return term
    
    def find_relevant_cases(self, rule, data):
        relevant_cases = data.copy()
        for case in data:
            for term in rule:
                if case[term[0]] != term[1]:
                    relevant_cases.remove(case)
                    break
        return relevant_cases


In [262]:
import os
data = open('../data/tic-tac-toe.data', 'r')
# data = open('../data/breast-cancer.data', 'r')
data_listed = []
#for i in range(0,957):
#    entry = []
#    for i in range(0,9):
#        continue
for i in data.readlines():
    data_listed.append(tuple(str.split(i[:-1], ',')))
n=len(data_listed)
train_ind = np.random.choice(n, round(n*0.8), replace=False)
train = [data_listed[index] for index in train_ind]
test = [case for case in data_listed if case not in train]
len(train), len(test)

(766, 192)

In [263]:
import numpy as np
attributes = list(range(0,9))
options = ['x','o','b']
classes = ['positive', 'negative']


s = Catagorization(100, 10, train, attributes, options, classes)
# pharamones = s.pharamones
# h = s.heuristic
# aa = Ant(pharamones, h, attributes, options, classes)
s.run_simulation()
s.discovered_rules
# s.evaluate_discovered_rules()

479
389
264
196
146
adding empty rule???
adding empty rule???
adding empty rule???
adding empty rule???
adding empty rule???
adding empty rule???
adding empty rule???
adding empty rule???
adding empty rule???
adding empty rule???
adding empty rule???
adding empty rule???
adding empty rule???
adding empty rule???
adding empty rule???
adding empty rule???
adding empty rule???
adding empty rule???
adding empty rule???
adding empty rule???
adding empty rule???
adding empty rule???
adding empty rule???
adding empty rule???
adding empty rule???
adding empty rule???
adding empty rule???
adding empty rule???
adding empty rule???
adding empty rule???
adding empty rule???
adding empty rule???
adding empty rule???
adding empty rule???
adding empty rule???
adding empty rule???
adding empty rule???
adding empty rule???
adding empty rule???
adding empty rule???
adding empty rule???
adding empty rule???
adding empty rule???
adding empty rule???
adding empty rule???
adding empty rule???
adding empty r

{'negative': [([(0, 'o')], 0.3557133028957409, 125),
  ([(7, 'o')], 0, 60),
  ([(0, 'x')], 0, 59)],
 'positive': [([(4, 'x')], 0.4236162361623616, 287),
  ([(4, 'b')], 0.3672296338347999, 90),
  ([(2, 'x'), (4, 'o')], 0.4341769212909218, 68),
  ([(6, 'x'), (4, 'o')], 0.7808219178082192, 50)]}

In [264]:
s.evaluate_discovered_rules(test)

0.953125

In [89]:
s.ants[0].temp_heuristic
s.ants[0].rule
sa = s.ants[0]
s.prune_ant(s.ants[0])
c= s.calc_consequent(sa.rule)
s.calc_quality(sa.rule, c)

0.25452769544632203

In [153]:
b = {'a': 4, 'b':3, 'c':19}
sum(b.values())
b['b'] /= 3
b
c=b.get(44, [])
c.append(3)

In [54]:
s.calc_quality(s.ants[-1].rule, s.calc_consequent(s.ants[-1].rule))

sldkf
295


0.29665787751645556

In [57]:
a = [1,2,4,5]
a+[5]
a

[1, 2, 4, 5]

In [143]:
len(s.data)
s.leftover_cases

AttributeError: 'Catagorization' object has no attribute 'leftover_cases'