# **CZ4032 Project 1: Classification Based on Associations**

# Import Packages

In [None]:
import numpy as np
import pandas as pd
import time
import random
import functools
import sys
import math
import csv

## Functions to read from .data and .names files

1. .names has the attribute names and types (numerical, categorical and the class label)
2. .data file has the actual raw data

In [None]:

# Read dataset and convert into a list.
# path: directory of *.data file.
def read_data(path):
    data = []
    with open(path, 'r') as csv_file:
        reader = csv.reader(csv_file, delimiter=',')
        for line in reader:
            data.append(line)
        while [] in data:
            data.remove([])
    return data


# Read scheme file *.names and write down attributes and value types.
# path: directory of *.names file.
def read_scheme(path):
    with open(path, 'r') as csv_file:
        reader = csv.reader(csv_file, delimiter=',')
        attributes = next(reader)
        value_type = next(reader)
    return attributes, value_type


# convert string-type value into float-type.
# data: data list returned by read_data.
# value_type: list returned by read_scheme.
def str2numerical(data, value_type):
    #ct = 0
    #print("str2numerical")
    size = len(data) 
    columns = len(data[0])
    for i in range(size):
        #ct = 0
        for j in range(columns - 1):
            if value_type[j] == 'numerical' and data[i][j] != '?':
                #ct += 1
                data[i][j] = float(data[i][j])
        #print("ct: ",ct)
    return data


# Main method in this file, to get data list after processing and scheme list.
# data_path: tell where *.data file stores.
# scheme_path: tell where *.names file stores.
def read(data_path, scheme_path):
    data = read_data(data_path)
    attributes, value_type = read_scheme(scheme_path)
    data = str2numerical(data, value_type)
    return data, attributes, value_type

# Preprocessing

The following preprocessing functions are performed on every dataset before rule generation and classification, and is for the discretization of continuous attributes
<br> Another important function performed here is the binning of data to set bin labels for each attribute

In [None]:
# A block to be split
# It has 4 member:
#   data: the data table with a column of continuous-valued attribute and a column of class label
#   size: number of data case in this table
#   number_of_classes: obviously, the number of class in this table
#   entropy: entropy of dataset
class Block:
    def __init__(self, data):
        self.data = data
        self.size = len(data)
        classes = set([x[1] for x in data])     # get distinct class labels in this table
        self.number_of_classes = len(set(classes))
        self.entropy = calculate_entropy(data)


# Calculate the entropy of dataset
# parameter data: the data table to be used
def calculate_entropy(data):
    number_of_data = len(data)
    classes = set([x[1] for x in data])
    class_count = dict([(label, 0) for label in classes])
    for data_case in data:
        class_count[data_case[1]] += 1      # count the number of data case of each class
    entropy = 0
    for c in classes:
        p = class_count[c] / number_of_data
        entropy -= p * math.log2(p)         # calculate information entropy by its formula, where the base is 2
    return entropy


# Compute Gain(A, T: S) mentioned in Dougherty, Kohavi & Sahami (1995), i.e. entropy gained by splitting original_block
#   into left_block and right_block
# original_block: the block before partition
# left_block: the block split which its value below boundary
# right_block: the block above boundary
def entropy_gain(original_block, left_block, right_block):
    gain = original_block.entropy - \
            ((left_block.size / original_block.size) * left_block.entropy +
            (right_block.size / original_block.size) * right_block.entropy)
    return gain


# Get minimum entropy gain required for a split of original_block into 2 blocks "left" and "right", see Dougherty,
#   Kohavi & Sahami (1995)
# original_block: the block before partition
# left_block: the block split which its value below boundary
# right_block: the block above boundary
def min_gain(original_block, left_block, right_block):
    delta = math.log2(math.pow(3, original_block.number_of_classes) - 2) - \
            (original_block.number_of_classes * original_block.entropy -
             left_block.number_of_classes * left_block.entropy -
             right_block.number_of_classes * right_block.entropy)
    gain_sup = math.log2(original_block.size - 1) / original_block.size + delta / original_block.size
    return gain_sup


# Identify the best acceptable value to split block
# block: a block of dataset
# Return value: a list of (boundary, entropy gain, left block, right block) or
#   None when it's unnecessary to split
def split(block):
    candidates = [x[0] for x in block.data]     # candidates is a list of values can be picked up as boundary
    candidates = list(set(candidates))          # get different values in table
    candidates.sort()                           # sort ascending
    candidates = candidates[1:]                 # discard smallest, because by definition no value is smaller

    wall = []       # wall is a list storing final boundary
    for value in candidates:
        # split by value into 2 groups, below & above
        left_data = []
        right_data = []
        for data_case in block.data:
            if data_case[0] < value:
                left_data.append(data_case)
            else:
                right_data.append(data_case)

        left_block = Block(left_data)
        right_block = Block(right_data)

        gain = entropy_gain(block, left_block, right_block)
        threshold = min_gain(block, left_block, right_block)

        # minimum threshold is met, the value is an acceptable candidate
        if gain >= threshold:
            wall.append([value, gain, left_block, right_block])

    if wall:    # has candidate
        wall.sort(key=lambda wall: wall[1], reverse=True)   # sort descending by "gain"
        return wall[0]      # return best candidate with max entropy gain
    else:
        return None         # no need to split


# Top-down recursive partition of a data block, append boundary into "walls"
# block: a data block
def partition(block):
    walls = []

    # inner recursive function, accumulate the partitioning values
    # sub_block: just a data block
    def recursive_split(sub_block):
        wall_returned = split(sub_block)        # binary partition, get bin boundary
        if wall_returned:                       # still can be spilt
            walls.append(wall_returned[0])      # record this partitioning value
            recursive_split(wall_returned[2])   # recursively process left block
            recursive_split(wall_returned[3])   # recursively split right block
        else:
            return                              # end of recursion

    recursive_split(block)      # call inner function
    walls.sort()                # sort boundaries descending
    return walls

In [None]:
def get_mode(arr):
    mode = []
    arr_appear = dict((a, arr.count(a)) for a in arr)   # count appearance times of each key
    if max(arr_appear.values()) == 1:       # if max time is 1
        return      # no mode here
    else:
        for k, v in arr_appear.items():     # else, mode is the number which has max time
            if v == max(arr_appear.values()):
                mode.append(k)
    return mode[0]  # return first number if has many modes


# Fill missing values in column column_no, when missing values ration below 50%.
# data: original data list
# column_no: identify the column No. of that to be filled
def fill_missing_values(data, column_no):
    size = len(data)
    column_data = [x[column_no] for x in data]      # get that column
    while '?' in column_data:
        column_data.remove('?')
    mode = get_mode(column_data)
    for i in range(size):
        if data[i][column_no] == '?':
            data[i][column_no] = mode              # fill in mode
    return data


# Get the list needed by rmep.py, just glue the data column with class column.
# data_column: the data column
# class_column: the class label column
def get_discretization_data(data_column, class_column):
    size = len(data_column)
    result_list = []
    for i in range(size):
        result_list.append([data_column[i], class_column[i]])
    return result_list


# Replace numerical data with the No. of interval, i.e. consecutive positive integers.
# data: original data table
# column_no: the column No. of that column
# walls: the split point of the whole range
def replace_numerical(data, column_no, walls):
    size = len(data)
    num_spilt_point = len(walls)
    for i in range(size):
        if data[i][column_no] > walls[num_spilt_point - 1]:
            data[i][column_no] = num_spilt_point + 1
            continue
        for j in range(0, num_spilt_point):
            if data[i][column_no] <= walls[j]:
                data[i][column_no] = j + 1
                break
    return data


# Replace categorical values with a positive integer.
# data: original data table
# column_no: identify which column to be processed
def replace_categorical(data, column_no):
    size = len(data)
    classes = set([x[column_no] for x in data])
    classes_no = dict([(label, 0) for label in classes])
    j = 1
    for i in classes:
        classes_no[i] = j
        j += 1
    for i in range(size):
        data[i][column_no] = classes_no[data[i][column_no]]
    return data, classes_no


# Discard all the column with its column_no in discard_list
# data: original data set
# discard_list: a list of column No. of the columns to be discarded
def discard(data, discard_list):
    size = len(data)
    length = len(data[0])
    data_result = []
    for i in range(size):
        data_result.append([])
        for j in range(length):
            if j not in discard_list:
                data_result[i].append(data[i][j])
    return data_result


# Main method here, see Description in detail
# data: original data table
# attribute: a list of the name of attribute
# value_type: a list identifying the type of each column
# Returned value: a data table after process
def pre_process(data, attribute, value_type):
    column_num = len(data[0])
    size = len(data)
    class_column = [x[-1] for x in data]
    discard_list = []
    for i in range(0, column_num - 1):
        data_column = [x[i] for x in data]

        # process missing values
        missing_values_ratio = data_column.count('?') / size
        if missing_values_ratio > 0.5:
            discard_list.append(i)
            continue
        elif missing_values_ratio > 0:
            data = fill_missing_values(data, i)
            data_column = [x[i] for x in data]

        # discretization
        if value_type[i] == 'numerical':
            discretization_data = get_discretization_data(data_column, class_column)
            block = Block(discretization_data)
            walls = partition(block)
            if len(walls) == 0:
                max_value = max(data_column)
                min_value = min(data_column)
                print("Attribute: ",attribute[i])
                step = (max_value - min_value) / 3
                walls.append(min_value + step)
                walls.append(min_value + 2 * step)
            #print(attribute[i] + ":", walls)        # print out split points
            data = replace_numerical(data, i, walls)
        elif value_type[i] == 'categorical':
            data, classes_no = replace_categorical(data, i)
            print(attribute[i] + ":", classes_no)   # print out replacement list

    # discard
    if len(discard_list) > 0:
        data = discard(data, discard_list)
        print("discard:", discard_list)             # print out discard list
    return data

# Rule Generation

This preprocess function is specific to the rule generation step

In [None]:
# Preprocesses a dataframe to a transaction list
# Parameters:
# [DataFrame] df = the pandas dataframe to preprocess
# Returns:
# [list] list of preprocessed transactions
# [dict] dict of string to int replacements
# [dict] dict of int to string replacements

def preprocess_data(df):
    values = set([np.nan])
    for col in df.columns:
        values = values.union(set(df[col].unique()))
    replacement_dict = {k: v for v, k in enumerate(values)}
    inverse_dict = dict(map(reversed, replacement_dict.items()))
    preprocessed_df = df.replace(replacement_dict)
    transactions = [[element for element in row if element != 0] for row in preprocessed_df.values.tolist()]

    return transactions, replacement_dict, inverse_dict

In [None]:
# Postprocess to a dataframe from rules set
# Parameters:
# [set] rules = the rules set
# Returns:
# [DataFrame] returns the rules as a dataframe

def postprocess_data(rules, inverse_dict):
    import pandas as pd
    df = pd.DataFrame(rules, columns=['LHS', 'RHS', 'Support', 'Confidence'])
    df['RHS'] = df['RHS'].apply(lambda x: inverse_dict[x])
    df['LHS'] = df['LHS'].apply(lambda x: [inverse_dict[y] for y in list(x)] )

    return df

In [None]:
# Splits replacement dict values to ids and classes
# Parameters:
# [dict] replacement_dict = dict of string to int replacements
# [list] classes_list = list of strings for classes
# Returns:
# [set] the ids for LHS
# [set] the classes for RHS

def split_classes_ids(replacement_dict, classes_list):
    classes = set([replacement_dict[i] for i in classes_list])
    ids = set(replacement_dict.values()) - classes
    return ids, classes

In [None]:
class CARapriori:

    def __init__(self, transactions):
        self.transactions = transactions

    # Performs the init pass
    # Parameters:
    # [list] transactions = The transaction list
    # [set] ids = list of all occuring ids
    # [set] target_ids = list of all class ids
    # [float] min_support = Minimum specified support
    # [float] min_confidence = Minimum specified confidence
    # Returns:
    # [dict] condsupCount_pruned = Returns a dict for all pruned candidates for counting occurences in transactions
    # [dict] rulesupCount_pruned = Returns a dict for all pruned candidates for counting occurences together with target classes
    # [set] target_ids = Set of all class ids
    
    def init_pass(self, ids, target_ids, min_support, min_confidence):
        #print("Init_pass")
        candidates = self.car_candidate_gen(target_ids, ids)
        condsupCount, rulesupCount = self.init_counters(candidates)
        
        condsupCount, rulesupCount = self.search(self.transactions, candidates, target_ids, condsupCount, rulesupCount)
        print(condsupCount)
        counters_rc_pruned, rulesupCount_pruned = self.prune(len(self.transactions), condsupCount, rulesupCount, min_support, min_confidence)
        
        return counters_rc_pruned, rulesupCount_pruned

    # Used to initate counters to count support and confidence
    # Parameters:
    # [set] candidate_sets = Returns a set of all expanded test sets
    # Returns:
    # [dict] condsupCount = Returns an empty dict for all candidates for counting occurences in transactions
    # [dict] rulesupCount = Returns an empty dict for all candidates for counting occurences together with target classes
    
    def init_counters(self, candidate_sets):
        rulesupCount = {}
        condsupCount = {}
        for c in candidate_sets:
            rulesupCount[c] = 0
            condsupCount[c[0]] = 0
            
        return condsupCount, rulesupCount

    # Used to generate new testable permutations
    # Parameters:
    # [list] target_ids = list of all class ids
    # [set] f_k1 = set of tuples of items
    # [set] c_condition_set = set of already explored items
    # Returns:
    # [list] c = List of candidates
    
    def car_candidate_gen(self, target_ids, f_k, c_condition_set = set()):
        c = list()
        for class_ in target_ids:
            for item_ in f_k:
                item_set = c_condition_set.copy()
                if isinstance(item_, tuple):
                    item_set.add(item_[0])
                else:
                    item_set.add(item_)
                item_set = tuple(item_set)
                c.append(tuple([item_set,class_]))
        return c
        
    # Used to create new test sets
    # Parameters:
    # [set] last_pruned = the remaining item ids
    # [set] target_ids = Set of all class ids
    # Returns:
    # [set] candidate_sets = Returns a set of all expanded test sets
    
    def expand(self, last_pruned, target_ids):
        common_set = set()
        
        for key in last_pruned.keys():
            common_set.add(key[0])
        
        candidate_sets = set()
        for key in last_pruned.keys():
            new_set = common_set.copy()
            new_set.remove(key[0])
            
            candidates = self.car_candidate_gen(target_ids, new_set, set(key[0]))
            candidate_sets = candidate_sets.union(candidates)
        
        return candidate_sets

    # Search the transaction list and count occurences.
    # Parameters:
    # [list] transactions = The transaction list
    # [set] candidate_sets = A set of all expanded test sets
    # [set] target_ids = Set of all class ids
    # [dict] condsupCount = An empty dict for all candidates for counting occurences in transactions
    # [dict] rulesupCount = An empty dict for all candidates for counting occurences together with target classes
    # Returns:
    # [dict] condsupCount = Returns a dict for all candidates for counting occurences in transactions
    # [dict] rulesupCount = Returns a dict for all candidates for counting occurences together with target classes
    
    def search(self, transactions, candidate_sets, target_ids, condsupCount, rulesupCount):
        #print("Search begins, before outer loop")
        for t in transactions:
            t_set = set(t)
            classes_in_trans = t_set.intersection(target_ids)
            found_in_transaction = {}
            #print("Before inner loop")
            for c in candidate_sets:
                #print("Inner loop")
                items_set = set(c[0])
                items_in_trans = t_set.intersection(items_set)
                
                if items_in_trans == items_set:
                    t_item_set = tuple(items_set)
                    if t_item_set not in found_in_transaction:
                        #print(t_item_set)
                        #print(condsupCount)
                        if condsupCount.get(t_item_set) == None:
                            condsupCount[t_item_set] = 0
                        condsupCount[t_item_set] += 1
                        found_in_transaction[t_item_set] = True
                    
                    if c[1] in classes_in_trans:
                        rulesupCount[tuple(c)] += 1
            #print(condsupCount)
                        
        return condsupCount, rulesupCount

    # Prunes the results based on the given counters and thresholds
    # Parameters:
    # [int] transactions_length = The transaction list length
    # [dict] condsupCount = A dict for all candidates for counting occurences in transactions
    # [dict] rulesupCount = A dict for all candidates for counting occurences together with target classes
    # [float] min_support = Minimum specified support
    # [float] min_confidence = Minimum specified confidence
    # Returns:
    # [dict] condsupCount_pruned = Returns a dict for all pruned candidates for counting occurences in transactions
    # [dict] rulesupCount_pruned = Returns a dict for all pruned candidates for counting occurences together with target classes
    
    def prune(self, transactions_length, condsupCount, rulesupCount, min_support, min_confidence):
        condsupCount_pruned = dict()
        rulesupCount_pruned = dict()
        
        for key, val in condsupCount.items():
            if val > 0:
                support = round(val/transactions_length, 3)
                if support >= min_support:
                    condsupCount_pruned[key] = support
            
        for key, val in rulesupCount.items():
            if val > 0 and key[0] in condsupCount_pruned:
                confidence = round(val/condsupCount[key[0]], 3)
                if confidence >= min_confidence:
                    rulesupCount_pruned[key] = confidence
        
        return condsupCount_pruned, rulesupCount_pruned


    # Add the rules to the set
    # Parameters:
    # [set] rules = New generated rules
    # [dict] condsupCount_pruned = A dict for all pruned candidates for counting occurences in transactions
    # [dict] rulesupCount_pruned = A dict for all pruned candidates for counting occurences together with target classes
    # Returns:
    # [bool] Returns True if new rules are added
    
    def add_rules(self, rules, counters_rc_pruned, rulesupCount_pruned):
    
        rules_before = len(rules)
        for key, val in rulesupCount_pruned.items():
            rules.add(tuple([key[0],key[1],counters_rc_pruned[key[0]],val]))   
        rules_after = len(rules)
        
        #return True if new rules added
        return rules_after > rules_before 
        
    # Main function
    # Parameters:
    # [set] ids = list of all occuring ids
    # [set] target_ids = list of all class ids
    # [float] min_support = Minimum specified support
    # [float] min_confidence = Minimum specified confidence
    # [int] max_length = Maximum rule length to search for
    # Returns:
    # [set] Returns the data mined rules
    
    def run(self, ids, target_ids, min_support=0.01, min_confidence=0.5, max_length=2):
        rules = set()
        
        #print("inital pass")
        counters_rc_pruned, rulesupCount_pruned = self.init_pass(ids, target_ids, min_support, min_confidence)
        #print("Counter: ",counters_rc_pruned)
        #print("RulesupCount: ",rulesupCount_pruned)

        #try to add new rules
        rules_added = self.add_rules(rules, counters_rc_pruned, rulesupCount_pruned)

        #print("rules_added",rules_added)
        if rules_added:
            for iteration in range(max_length):

                #print("expand new candidates")
                candidate_sets = self.expand(rulesupCount_pruned, target_ids)
                #print("init counters")
                counters_rc, rulesupCount = self.init_counters(candidate_sets)
                #print(counters_rc)
                #search for test sets
                counters_rc, rulesupCount = self.search(self.transactions, candidate_sets, target_ids, counters_rc, rulesupCount)
                #prune
                counters_rc_pruned, rulesupCount_pruned = self.prune(len(self.transactions), counters_rc, rulesupCount, min_support, min_confidence)
                #add
                rules_added = self.add_rules(rules, counters_rc_pruned, rulesupCount_pruned)

                if not rules_added:
                #early stopping
                    break


        return rules


Read data from data and names files using Read function

In [None]:
data_path = 'iris.data' # only change filename here for different datasets
scheme_path = 'iris.names'
data, attributes, value_type = read(data_path, scheme_path)

In [None]:
#random.shuffle(data)
d = pre_process(data, attributes, value_type)
#print(type(df))
print(d)
#df = data

[[1, 3, 1, 1, 'Iris-setosa'], [1, 1, 1, 1, 'Iris-setosa'], [1, 2, 1, 1, 'Iris-setosa'], [1, 2, 1, 1, 'Iris-setosa'], [1, 3, 1, 1, 'Iris-setosa'], [1, 3, 1, 1, 'Iris-setosa'], [1, 2, 1, 1, 'Iris-setosa'], [1, 2, 1, 1, 'Iris-setosa'], [1, 1, 1, 1, 'Iris-setosa'], [1, 2, 1, 1, 'Iris-setosa'], [1, 3, 1, 1, 'Iris-setosa'], [1, 2, 1, 1, 'Iris-setosa'], [1, 1, 1, 1, 'Iris-setosa'], [1, 1, 1, 1, 'Iris-setosa'], [2, 3, 1, 1, 'Iris-setosa'], [2, 3, 1, 1, 'Iris-setosa'], [1, 3, 1, 1, 'Iris-setosa'], [1, 3, 1, 1, 'Iris-setosa'], [2, 3, 1, 1, 'Iris-setosa'], [1, 3, 1, 1, 'Iris-setosa'], [1, 2, 1, 1, 'Iris-setosa'], [1, 3, 1, 1, 'Iris-setosa'], [1, 3, 1, 1, 'Iris-setosa'], [1, 2, 1, 1, 'Iris-setosa'], [1, 2, 1, 1, 'Iris-setosa'], [1, 1, 1, 1, 'Iris-setosa'], [1, 2, 1, 1, 'Iris-setosa'], [1, 3, 1, 1, 'Iris-setosa'], [1, 2, 1, 1, 'Iris-setosa'], [1, 2, 1, 1, 'Iris-setosa'], [1, 2, 1, 1, 'Iris-setosa'], [1, 2, 1, 1, 'Iris-setosa'], [1, 3, 1, 1, 'Iris-setosa'], [1, 3, 1, 1, 'Iris-setosa'], [1, 2, 1, 1, 

Convert values of list from <value.> to <column no.,value>

In [None]:
for i in range(len(d)):
  for j in range(len(d[i]) - 1):
    #print(df[i][j])
    d[i][j] = str("(") + str(j) + "," + str(d[i][j]) + str(")") #this has to be a datatype acceptable by pandas unique
    #df[i][j] = tuple(str(j)) + tuple(str(df[i][j]))
    #print(df[i][j])

In [None]:
d = pd.DataFrame(d)

In [None]:
df = d.sample(frac=0.7,random_state = 25)
test_df = d.drop(df.index)

In [None]:
d = d.values.tolist()

In [None]:
test_df = test_df.values.tolist()

In [None]:
classLabels = pd.unique(df[len(df.columns)-1])
print((classLabels))

['Iris-setosa' 'Iris-versicolor' 'Iris-virginica']


In [None]:
transactions, replacement_dict, inverse_dict = preprocess_data(df)
# print("Transactions: ",transactions)
# print("Replacement_dict: ", replacement_dict)
# print("Inverse_dict: ", inverse_dict)
car = CARapriori(transactions)
# print("Car: ",car)
ids, classes = split_classes_ids(replacement_dict, classLabels)
# print("Id: ",ids)
# print("Classes: ",classes)

minsup = 0.01
minconf = 0.5
rules = car.run(ids, classes,minsup,minconf,3)
print('CARs')
#print(rules)
final = postprocess_data(rules, inverse_dict)
print(final)

{(0,): 0, (1,): 45, (2,): 35, (3,): 17, (4,): 25, (5,): 44, (8,): 40, (10,): 35, (11,): 57, (12,): 34, (13,): 31, (14,): 36, (15,): 21}
CARs
                              LHS              RHS  Support  Confidence
0                  [(1,1), (3,3)]   Iris-virginica    0.114       1.000
1    [(0,1), (2,2), (1,1), (3,2)]  Iris-versicolor    0.076       1.000
2           [(0,3), (2,3), (3,3)]   Iris-virginica    0.152       1.000
3                  [(1,3), (3,3)]   Iris-virginica    0.010       1.000
4           [(0,3), (1,1), (3,2)]  Iris-versicolor    0.124       0.615
..                            ...              ...      ...         ...
108                [(0,2), (3,2)]  Iris-versicolor    0.162       0.765
109                [(0,3), (1,1)]   Iris-virginica    0.200       0.619
110  [(3,1), (0,1), (1,3), (2,1)]      Iris-setosa    0.133       1.000
111                [(2,2), (1,1)]  Iris-versicolor    0.286       0.933
112         [(2,2), (0,3), (3,2)]  Iris-versicolor    0.086       1

Preparing Dataset and Rules for comparison

In [None]:
cars = final.values.tolist()

In [None]:
df = df.values.tolist()

# Classification
This section has all relevant functions and classes for our classifier

is_satisfy is the main check function which compares each datacase to each generated rule

In [None]:
def is_satisfy(datacase,rulecase):
  #print("Datacase: ",datacase[:-2])
  #print("Rulecase: ",rulecase[0])
  result =  all(elem in datacase[:-1] for elem in rulecase[0])
  #print("result: ",result)
  if not result:
  #if datacase[:-2] != rulecase[0] or datacase[-1] != rulecase[1]: #if datacase elements do not match condset, return none
    return None
  elif datacase[-1] == rulecase[1]: #if condset & class label match
    return True
  else:
    return False

### CBA-CB  - M1

In [None]:
class Classifier:
    """
    This class is our classifier.
    """
    def __init__(self):
        self.rule_list = list()
        self.default_class = None
        self._error_list = list()
        self._default_class_list = list()

    # insert a rule into rule_list, then choose a default class, and calculate the errors (see line 8, 10 & 11)
    def insert(self, rule, dataset):
        #print("Insert")
        self.rule_list.append(rule)             # insert r at the end of C
        self.sel_defclass(dataset)     # select a default class for the current C
        self.comp_err(dataset)            # compute the total number of errors of C

    # select the majority class in the remaining data
    def sel_defclass(self, dataset):
        #print("Select default class")
        class_column = [x[-1] for x in dataset]
        class_label = set(class_column)
        max = 0
        current_default_class = None
        for label in class_label:
            if class_column.count(label) >= max:
                max = class_column.count(label)
                current_default_class = label
        self._default_class_list.append(current_default_class)

    # compute the total number of errors
    def comp_err(self, dataset):
        #print("Compute error")
        if len(dataset) <= 0:
            self._error_list.append(sys.maxsize)
            return

        error_number = 0

        # the total number of errors that have been made by all the selected rules in C
        for case in dataset:
            is_cover = False
            for rule in self.rule_list:
                if is_satisfy(case, rule):
                    is_cover = True
                    break
            if not is_cover:
                error_number += 1

        # the number of errors to be made by the default class in the training set
        class_column = [x[-1] for x in dataset]
        error_number += len(class_column) - class_column.count(self._default_class_list[-1])
        self._error_list.append(error_number)

    # see line 14 and 15, to get the final classifier
    def discard(self):
        #print("discard")
        # find the first rule p in C with the lowest total number of errors and drop all the rules after p in C
        index = self._error_list.index(min(self._error_list))
        self.rule_list = self.rule_list[:(index+1)]
        self._error_list = None

        # assign the default class associated with p to default_class
        self.default_class = self._default_class_list[index]
        self._default_class_list = None

    # just print out all selected rules and default class in our classifier
    def print(self):
        print("Selected Rules: ") 
        for rule in self.rule_list:
            print(rule)
        print("default_class:", self.default_class)


Precedence calculation is a key step for our classification because our rule list is sorted according to the precedence of the rules and this helps us to quickly cover each training case with the most apt rule

In [None]:
# sort the set of generated rules car according to the precedence and return the sorted rule list
def prec_sort(car):
    def cmp_method(a, b):
        if a[3] < b[3]:     # 1. the confidence of ri > rj
            return 1
        elif a[3] == b[3]:
            if a[2] < b[2]:       # 2. their confidences are the same, but support of ri > rj
                return 1
            elif a[2] == b[2]:
                if len(a[0]) < len(b[0]):   # 3. both confidence & support are the same, ri earlier than rj
                    return -1
                elif len(a[0]) == len(b[0]):
                    return 0
                else:
                    return 1
            else:
                return -1
        else:
            return -1

    rule_list = car
    rule_list.sort(key=functools.cmp_to_key(cmp_method))
    # for a,b in itertools.combinations_with_replacement(rule_list,2):
    #   while(rule_list.index(a)!=rule_list.index(a)):
    #     cmp_method(a,b)
    return rule_list


#### CBA - M1 main classifier builder

In [None]:
# main method of CBA-CB: M1
def classifier_builder_m1(cars, dataset):
    #print("Entered builder")
    classifier = Classifier()
    #print("after Classifier()")
    cars_list = prec_sort(cars)
    print(cars_list)
    for rule in cars_list:
        #print("inside loop")
        temp = []
        mark = False
        for i in range(len(dataset)):
            #print("inside inner loop")
            is_satisfy_value = is_satisfy(dataset[i], rule)
            #print("is_satisfy_value: ",is_satisfy_value)
            if is_satisfy_value is not None:
                temp.append(i)
                if is_satisfy_value:
                    mark = True
        #print("mark: ",mark,"dataset: ",dataset[i])
        if mark:
            temp_dataset = list(dataset)
            for index in temp:
                temp_dataset[index] = []
            while [] in temp_dataset:
                temp_dataset.remove([])
            dataset = temp_dataset
            classifier.insert(rule, dataset)
            #print("inserted")
    classifier.discard()
    return classifier

In [None]:
classifier_m1 = classifier_builder_m1(cars, df)
classifier_m1.print()

[[['(2,1)'], 'Iris-setosa', 0.343, 1.0], [['(3,1)', '(2,1)'], 'Iris-setosa', 0.343, 1.0], [['(0,1)', '(2,1)'], 'Iris-setosa', 0.324, 1.0], [['(3,1)', '(0,1)', '(2,1)'], 'Iris-setosa', 0.324, 1.0], [['(3,3)'], 'Iris-virginica', 0.2, 1.0], [['(2,3)', '(3,3)'], 'Iris-virginica', 0.2, 1.0], [['(0,3)', '(3,3)'], 'Iris-virginica', 0.152, 1.0], [['(3,1)', '(1,3)'], 'Iris-setosa', 0.152, 1.0], [['(1,3)', '(2,1)'], 'Iris-setosa', 0.152, 1.0], [['(0,3)', '(2,3)', '(3,3)'], 'Iris-virginica', 0.152, 1.0], [['(3,1)', '(1,3)', '(2,1)'], 'Iris-setosa', 0.152, 1.0], [['(0,1)', '(1,3)'], 'Iris-setosa', 0.133, 1.0], [['(0,1)', '(1,3)', '(2,1)'], 'Iris-setosa', 0.133, 1.0], [['(3,1)', '(0,1)', '(1,3)'], 'Iris-setosa', 0.133, 1.0], [['(3,1)', '(0,1)', '(1,3)', '(2,1)'], 'Iris-setosa', 0.133, 1.0], [['(1,1)', '(3,3)'], 'Iris-virginica', 0.114, 1.0], [['(1,1)', '(2,3)', '(3,3)'], 'Iris-virginica', 0.114, 1.0], [['(0,1)', '(2,2)'], 'Iris-versicolor', 0.095, 1.0], [['(0,1)', '(2,2)', '(1,1)'], 'Iris-versicolo

### CBA-CB - M2 Improved Classifier

In [None]:
class Classifier_m2:
    """
    The definition of classifier formed in CBA-CB: M2. It contains a list of rules order by their precedence, a default
    class label.
    """
    def __init__(self):
        self.rule_list = list()
        self.default_class = None
        self._default_class_list = list()
        self._total_errors_list = list()

    # insert a new rule into classifier
    def add(self, rule, default_class, total_errors):
        #print("Adding")
        self.rule_list.append(rule)
        self._default_class_list.append(default_class)
        self._total_errors_list.append(total_errors)

    # discard those rules that introduce more errors. See line 18-20, CBA-CB: M2 (Stage 3).
    def discard(self):
        index = self._total_errors_list.index(min(self._total_errors_list))
        self.rule_list = self.rule_list[:(index + 1)]
        self._total_errors_list = None

        self.default_class = self._default_class_list[index]
        self._default_class_list = None

    # just print out rules and default class label
    def print(self):
        print("Selected Rules: ")
        for rule in self.rule_list:
            print(rule[:4])
        print("default_class:", self.default_class)

def return_cases(dataset):
        class_column = [x[-1] for x in dataset]
        class_label = set(class_column)
        x = dict((x, 0) for x in class_label)
        #print(x)
        return x

# convert ruleitem of class RuleItem to rule of class Rule
def ruleitem2rule(rule_item, dataset):
    rule_item.append(return_cases(dataset))
    rule_item.append(set())
    #rule = Rule(rule_item.cond_set, rule_item.class_label, dataset)
    #print(rule_item)
    return rule_item


# finds the highest precedence rule that covers the data case d from the set of rules having the same class as d.
def maxCoverRule_correct(cars_list, data_case):
    for i in range(len(cars_list)):
        if cars_list[i][1] == data_case[-1]:
            if is_satisfy(data_case, cars_list[i]):
                return i
    return None


# finds the highest precedence rule that covers the data case d from the set of rules having the different class as d.
def maxCoverRule_wrong(cars_list, data_case):
    for i in range(len(cars_list)):
        if cars_list[i][1] != data_case[-1]:
            temp_data_case = data_case[:-1]
            temp_data_case.append(cars_list[i][1])
            if is_satisfy(temp_data_case, cars_list[i]):
                return i
    return None


# compare two rule, return the precedence.
#   -1: rule1 < rule2, 0: rule1 < rule2 (randomly here), 1: rule1 > rule2
def compare(rule1, rule2):
    if rule1 is None and rule2 is not None:
        return -1
    elif rule1 is None and rule2 is None:
        return 0
    elif rule1 is not None and rule2 is None:
        return 1

    if rule1[3] < rule2[3]:     # 1. the confidence of ri > rj
        return -1
    elif rule1[3] == rule2[3]:
        if rule1[2] < rule2[2]:       # 2. their confidences are the same, but support of ri > rj
            return -1
        elif rule1[2] == rule2[2]:
            if len(rule1[0]) < len(rule2[0]):   # 3. confidence & support are the same, ri earlier than rj
                return 1
            elif len(rule1[0]) == len(rule2[0]):
                return 0
            else:
                return -1
        else:
            return 1
    else:
        return 1


# finds all the rules in U that wrongly classify the data case and have higher precedences than that of its cRule.
def allCoverRules(u, data_case, c_rule, cars_list):
    w_set = []
    for rule_index in u:
        # have higher precedences than cRule
        if compare(cars_list[rule_index], c_rule) > 0:
            # wrongly classify the data case
            if is_satisfy(data_case, cars_list[rule_index]) == False:
                w_set.append(rule_index)
    return w_set


# counts the number of training cases in each class
def compClassDistr(dataset):
    class_distr = dict()

    if len(dataset) <= 0:
        class_distr = None

    dataset_without_null = dataset
    while [] in dataset_without_null:
        dataset_without_null.remove([])

    class_column = [x[-1] for x in dataset_without_null]
    class_label = set(class_column)
    for c in class_label:
        class_distr[c] = class_column.count(c)
    return class_distr


# sort the rule list order by precedence
def sort_with_index(q, cars_list):
    def cmp_method(a, b):
        # 1. the confidence of ri > rj
        if cars_list[a][3] < cars_list[b][3]:
            return 1
        elif cars_list[a][3] == cars_list[b][3]:
            # 2. their confidences are the same, but support of ri > rj
            if cars_list[a][2] < cars_list[b][2]:
                return 1
            elif cars_list[a][2] == cars_list[b][2]:
                # 3. both confidence & support are the same, ri earlier than rj
                if len(cars_list[a][0]) < len(cars_list[b][0]):
                    return -1
                elif len(cars_list[a][0]) == len(cars_list[b][0]):
                    return 0
                else:
                    return 1
            else:
                return -1
        else:
            return -1

    rule_list = q
    rule_list.sort(key=functools.cmp_to_key(cmp_method))
    return rule_list


# get how many errors the rule wrongly classify the data case
def errorsOfRule(rule, dataset):
    error_number = 0
    for case in dataset:
        if case:
            if is_satisfy(case, rule) == False:
                error_number += 1
    return error_number


# choose the default class (majority class in remaining dataset)
def selectDefault(class_distribution):
    if class_distribution is None:
        return None

    max = 0
    default_class = None
    for index in class_distribution:
        if class_distribution[index] > max:
            max = class_distribution[index]
            default_class = index
    return default_class


# count the number of errors that the default class will make in the remaining training data
def defErr(default_class, class_distribution):
    if class_distribution is None:
        import sys
        return sys.maxsize

    error = 0
    for index in class_distribution:
        if index != default_class:
            error += class_distribution[index]
    return error


#### CBA - M2 main classifier builder

In [None]:
# main method, implement the whole classifier builder
def classifier_builder_m2(cars, dataset):
    classifier = Classifier_m2()

    cars_list = prec_sort(cars)
    for i in range(len(cars_list)):
        cars_list[i] = ruleitem2rule(cars_list[i], dataset)
        #print(cars_list[i])
    #for j in range()

    # stage 1
    q = [] # set
    u = [] # set
    a = [] # set
    mark_set = [] # set
    for i in range(len(dataset)):
        c_rule_index = maxCoverRule_correct(cars_list, dataset[i])
        w_rule_index = maxCoverRule_wrong(cars_list, dataset[i])
        if c_rule_index is not None:
            u.append(c_rule_index)
        if c_rule_index:
            cars_list[c_rule_index][4][dataset[i][-1]] += 1
        if c_rule_index and w_rule_index:
            if compare(cars_list[c_rule_index], cars_list[w_rule_index]) > 0:
                q.append(c_rule_index)
                mark_set.append(c_rule_index)
            else:
                a.append((i, dataset[i][-1], c_rule_index, w_rule_index))
        elif c_rule_index is None and w_rule_index is not None:
            a.append((i, dataset[i][-1], c_rule_index, w_rule_index))

    print("a: ",a)
    print("mark_set: ",mark_set)
    # stage 2
    for entry in a:
        lol = entry[3]
        if cars_list[lol] in mark_set:
            if entry[2] is not None:
                cars_list[entry[2]][4][entry[1]] -= 1
            cars_list[entry[3]][4][entry[1]] += 1
        else:
            if entry[2] is not None:
                w_set = allCoverRules(u, dataset[entry[0]], cars_list[entry[2]], cars_list)
            else:
                w_set = allCoverRules(u, dataset[entry[0]], None, cars_list)
            for w in w_set:
                cars_list[w][5].add((entry[2], entry[0], entry[1]))
                cars_list[w][4][entry[1]] += 1
            q = list(set().union(q,w_set))
            #q |= w_set

    # stage 3
    rule_errors = 0
    q = sort_with_index(q, cars_list)
    data_cases_covered = list([False] * len(dataset))
    for r_index in q:
        if cars_list[r_index][4][cars_list[r_index][1]] != 0:
            for entry in cars_list[r_index][5]:
                if data_cases_covered[entry[1]]:
                    cars_list[r_index][4][entry[2]] -= 1
                else:
                    if entry[0] is not None:
                        cars_list[entry[0]][4][entry[2]] -= 1
            for i in range(len(dataset)):
                datacase = dataset[i]
                if datacase:
                    is_satisfy_value = is_satisfy(datacase, cars_list[r_index])
                    if is_satisfy_value:
                        dataset[i] = []
                        data_cases_covered[i] = True
            rule_errors += errorsOfRule(cars_list[r_index], dataset)
            class_distribution = compClassDistr(dataset)
            default_class = selectDefault(class_distribution)
            default_errors = defErr(default_class, class_distribution)
            total_errors = rule_errors + default_errors
            classifier.add(cars_list[r_index], default_class, total_errors)
    classifier.discard()

    return classifier

In [None]:
classifier_m2 = classifier_builder_m2(cars, df)
classifier_m2.print()

a:  [(1, 'Iris-versicolor', 94, 80), (35, 'Iris-versicolor', 94, 80), (44, 'Iris-virginica', None, 74), (60, 'Iris-virginica', None, 74), (69, 'Iris-versicolor', 94, 80), (100, 'Iris-versicolor', 92, 81)]
mark_set:  [4, 19, 81, 17, 17, 80, 19, 80, 17, 17, 42, 17, 4, 17, 4, 19, 4, 4, 4, 4, 80, 4, 4, 80, 17, 81, 19, 17, 19, 4, 42, 4, 80, 17, 19, 80, 4, 19, 4, 19, 19, 80, 4, 17]
Selected Rules: 
[['(3,3)'], 'Iris-virginica', 0.2, 1.0]
[['(0,1)', '(2,2)'], 'Iris-versicolor', 0.095, 1.0]
[['(0,3)', '(2,2)'], 'Iris-versicolor', 0.086, 1.0]
[['(3,1)', '(2,2)'], 'Iris-versicolor', 0.038, 1.0]
[['(2,2)'], 'Iris-versicolor', 0.333, 0.943]
[['(0,3)', '(2,3)'], 'Iris-virginica', 0.248, 0.885]
default_class: Iris-setosa


# Accuracy Calculator

In [None]:
# calculates and returns the accuracy of the classifier on the dataset
def get_accuracy(classifier, dataset):
    size = len(dataset)
    correct_match = 0
    error_number = 0
    for case in dataset:
        is_satisfy_value = False
        for rule in classifier.rule_list:
            is_satisfy_value = is_satisfy(case, rule)
            #print(is_satisfy_value)
            if is_satisfy_value == True:
                correct_match += 1
                break
        if is_satisfy_value == False:
            error_number += 1
    return correct_match / (error_number + correct_match)

### 10 - Cross Validation

In [None]:
block_size = int(len(test_df) / 10)
split_point = [k * block_size for k in range(0, 10)]
split_point.append(len(test_df))

For M1

In [None]:
for k in range(len(split_point)-1):
    testt_df = d[split_point[k]:split_point[k+1]]
    print("Accuracy",k+1,": ",get_accuracy(classifier_m1,testt_df))

Accuracy 1 :  1.0
Accuracy 2 :  1.0
Accuracy 3 :  1.0
Accuracy 4 :  1.0
Accuracy 5 :  1.0
Accuracy 6 :  1.0
Accuracy 7 :  1.0
Accuracy 8 :  1.0
Accuracy 9 :  1.0
Accuracy 10 :  1.0


In [None]:
# accuracy of M1
accuracy_m1 = get_accuracy(classifier_m1,d)
print("Accuracy for M1: ",accuracy_m1)

Accuracy for M1:  0.9787234042553191


In [None]:
# accuracy of M2
accuracy_m2 = get_accuracy(classifier_m2,d)
print("Accuracy for M2: ",accuracy_m2)

Accuracy for M2:  1.0


In [None]:
d = test_df