In [None]:
#from google.colab import drive
#drive.mount('/content/drive')

In [None]:
import math
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import cv2
from IPython.display import display, display_png

pd.options.display.max_columns = None

In [None]:
def load_data (image_concepts_path, ids_patterns_path, cart_patterns_path, exp_patterns_path):
    """Read the four CSV inputs into DataFrames.

    Args:
        image_concepts_path: path of the per-image concepts CSV.
        ids_patterns_path: path of the IDS patterns CSV.
        cart_patterns_path: path of the CART patterns CSV.
        exp_patterns_path: path of the Exp patterns CSV.

    Returns:
        A 4-tuple ``(image_concepts, ids_patterns, cart_patterns, exp_patterns)``
        of DataFrames, in the same order as the arguments.
    """
    csv_paths = (image_concepts_path, ids_patterns_path, cart_patterns_path, exp_patterns_path)
    # Files are read in argument order, one DataFrame per path.
    return tuple(pd.read_csv(csv_path) for csv_path in csv_paths)

In [None]:
def load_patterns ():
    """Load, merge, score and rank the rule patterns, then load the matching image concepts.

    Takes no arguments; everything is read from notebook-level globals:
    ``rule_methods``, ``exp_patterns_path``, ``ids_patterns_path``,
    ``cart_patterns_path``, ``remove_inactivated_patterns``, ``meta_cols``,
    ``low_value``, ``max_patterns`` and ``concepts_path``.

    Steps:
      1. Read one patterns CSV per method listed in ``rule_methods`` and tag its rows
         with a ``method`` column ('Exp', 'IDS' or 'CART').
      2. If ``remove_inactivated_patterns`` is set, drop every pattern that has a
         non-meta column equal to ``low_value``.
      3. Round support/confidence/accuracy to two decimals and compute ``score``
         with ``compute_pattern_score``.
      4. Merge rows that are identical in every column except ``method``, joining
         the method names with ', '.
      5. Sort descending by score, confidence, support, accuracy and method; add a
         1-based ``index`` column; keep the first ``max_patterns`` rows; drop
         columns whose values are all -1.
      6. Read the image concepts CSV and keep only the remaining concept columns
         plus 'pred', 'label', 'id', 'file' and 'path'.

    Returns:
        ``(all_patterns, image_concepts, concept_cols)`` where ``concept_cols`` is the
        list of non-meta columns left in ``all_patterns``.

    Raises:
        ValueError: from ``pd.concat`` when no entry of ``rule_methods`` matches
            'exp', 'ids' or 'cart' (nothing to concatenate).
    """

    all_patterns_list = []
	
    if "exp" in rule_methods: 
        exp_patterns = pd.read_csv(exp_patterns_path)
        exp_patterns['method'] = 'Exp'
        all_patterns_list.append(exp_patterns)
        
    if "ids" in rule_methods:
        ids_patterns = pd.read_csv(ids_patterns_path)
        ids_patterns['method'] = 'IDS'
        all_patterns_list.append(ids_patterns)
        
    if "cart" in rule_methods:
        cart_patterns = pd.read_csv(cart_patterns_path)
        cart_patterns['method'] = 'CART'
        all_patterns_list.append(cart_patterns)

    # Stack the per-method tables; columns missing from one table become NaN in its rows.
    all_patterns = pd.concat(all_patterns_list, ignore_index=True)

    # ids_patterns['method'] = 'IDS'
    # cart_patterns['method'] = 'CART'
    # exp_patterns['method'] = 'Exp'
    # all_patterns = pd.concat([ids_patterns, cart_patterns, exp_patterns], ignore_index=True)

    patterns_to_remove = []
    # Only referenced by the disabled "max number of Exp patterns" code below.
    exp_patterns_count = 0
    
    for i,pattern in all_patterns.iterrows():
        if remove_inactivated_patterns:
            # Removing patterns with "no" features, because they're not very accurate and useful: 
            to_remove = False
            for attr in list(pattern.index): 
                pattern_value = pattern[attr]
                if (attr not in meta_cols) and (pattern_value == low_value):
                    to_remove = True
                    break
                    
            if to_remove:
                print('Pattern {{{}}} to be removed because of having inactivated features!'.format(get_pattern_description(pattern)))
                patterns_to_remove.append(i)
                continue

        # Everything after this guard is currently commented out, so the rest of the
        # loop body has no effect for any method.
        if pattern['method'] != 'Exp':
            continue

        # Using only the specified maximum number of Exp patterns: 
        # exp_patterns_count += 1
        # if exp_patterns_count > exp_num_patterns:
        #     print('Exp pattern {{{}}} to be removed because of max number of Exp patterns ({})!'.format(get_pattern_description(pattern), exp_num_patterns))
        #     patterns_to_remove.append(i)
        #     continue

        # Handling those patterns by Exp Tables which have confidence lower than 0.5 and need to be inverted to be meaningful and useful: 
        # pred = pattern['pred']
        # conf = pattern['confidence']
        # if conf < 0.5:
        #     new_pred = pred
        #     new_conf = conf
        #     classes = list(class_titles.keys())
        #     if len(classes) == 2:
        #         new_pred = 1 if pred == 0 else 0
        #         new_conf = 1.0 - conf
        #     else:
        #         # Need to find the new prediction and accurate confidence value, as there may be more than one other classes in this case: 
        #         pattern_cp = pattern.copy(deep=True)
        #         for c in classes:
        #             if pred == c:
        #                 continue
        #             pattern_cp['pred'] = c
        #             supporting_indices = find_images_supporting_pattern(image_concepts, pattern_cp)
        #             matching_indices = find_images_matching_pattern(image_concepts, pattern_cp, supporting_indices)
        #             temp_conf = len(matching_indices) / len(supporting_indices)
        #             if temp_conf > new_conf:
        #                 new_pred = c
        #                 new_conf = temp_conf

        #     all_patterns.loc[i, 'pred'] = new_pred
        #     all_patterns.loc[i, 'confidence'] = new_conf
        #     print('Exp pattern with pred {} and conf {} changed to new pred {} and new conf {}'.format(pred, conf, new_pred, new_conf))

    if len(patterns_to_remove) > 0:
        all_patterns.drop(patterns_to_remove, axis=0, inplace=True)

    all_patterns['support'] = all_patterns['support'].round(2)
    all_patterns['confidence'] = all_patterns['confidence'].round(2)
    all_patterns['accuracy'] = all_patterns['accuracy'].round(2)

    # Every column that is not a meta column is treated as a concept column.
    concept_cols = list(set(all_patterns.columns) - set(meta_cols))
    all_patterns['score'] = all_patterns.apply(lambda p: compute_pattern_score(p, concept_cols), axis=1)

    # Merge rows that agree on every column except 'method' (same rule found by several methods).
    # NOTE(review): pandas groupby drops rows whose key columns contain NaN by default; if the
    # per-method CSVs have different concept columns, the concat above introduces NaN - confirm
    # that no patterns are lost here when more than one method is enabled.
    group_cols = list(all_patterns.columns)
    group_cols.remove('method')
    all_patterns_grouped = all_patterns.groupby(group_cols, as_index=False)

    all_patterns = all_patterns_grouped.agg({'method': lambda p: ', '.join(p.unique())})

    # Best patterns first; 'index' is a 1-based rank used later to select a pattern.
    all_patterns.sort_values(by=['score', 'confidence', 'support', 'accuracy', 'method'], ascending=False, inplace=True)
    all_patterns.reset_index(drop=True, inplace=True)
    all_patterns.insert(loc=0, column='index', value=(all_patterns.index + 1))
    all_patterns['score'] = all_patterns['score'].round(2)

    # Keep the top max_patterns rows, then drop columns that are -1 in every remaining row.
    all_patterns = all_patterns.iloc[:max_patterns]
    all_patterns = all_patterns.loc[:, (all_patterns != -1).any(axis=0)]
    #all_patterns.dropna(axis=1, how='all', inplace=True)
    concept_cols = list(set(all_patterns.columns) - set(meta_cols))

    # Restrict the image concepts table to the concepts that survived plus the image bookkeeping columns.
    image_concepts = pd.read_csv(concepts_path)
    concepts_to_keep = set(concept_cols).union(set(['pred', 'label', 'id', 'file', 'path']))
    concepts_to_remove = list(set(image_concepts.columns) - concepts_to_keep)
    image_concepts.drop(concepts_to_remove, axis=1, inplace=True)

    return all_patterns, image_concepts, concept_cols

In [None]:
def find_images_supporting_pattern (image_concepts, pattern):
    """Return the index labels of the images that satisfy every condition of a pattern.

    A pattern column counts as a condition when it is not listed in the global
    ``meta_cols`` and its value is not -1. With the global ``binning_features``
    enabled, a fractional value (e.g. 0.5 or 1.5) matches images whose value
    equals either its floor or its ceiling; otherwise the value must match exactly.

    Args:
        image_concepts: DataFrame with one row per image and one column per concept.
        pattern: Series describing one pattern (a row of the patterns table).

    Returns:
        List of index labels of ``image_concepts`` rows fulfilling all conditions.
    """
    # One flag per image; conditions are AND-ed into it one after another.
    keep = np.ones(len(image_concepts), dtype=bool)

    for attr, pattern_value in pattern.items():
        if (attr in meta_cols) or (pattern_value == -1):
            continue

        if binning_features and not (pattern_value % 1 == 0):
            # Handling the case of 0.5 or 1.5 values for a pattern feature:
            lower = math.floor(pattern_value)
            upper = math.ceil(pattern_value)
            print('attr {} with value {}, floor {}, and ceil {}'.format(attr, pattern_value, lower, upper))
            column = image_concepts[attr]
            hit = (column == lower) | (column == upper)
        else:
            hit = image_concepts[attr] == pattern_value

        keep &= hit.to_numpy()

    return list(image_concepts.index.values[keep])

In [None]:
def find_images_matching_pattern (image_concepts, pattern, supporting_indices=None): 
    """Return the supporting images whose predicted class equals the pattern's prediction.

    Args:
        image_concepts: DataFrame with one row per image; must contain a 'pred' column.
        pattern: pattern row (anything indexable with 'pred').
        supporting_indices: index labels of the images fulfilling the pattern's
            conditions. Computed with ``find_images_supporting_pattern`` when omitted.

    Returns:
        The entries of ``supporting_indices`` whose 'pred' value equals
        ``pattern['pred']``, in their original order.
    """
    if supporting_indices is None:
        supporting_indices = find_images_supporting_pattern(image_concepts, pattern)
    pattern_label = pattern['pred']

    # The supporting indices are index *labels* (find_images_supporting_pattern returns
    # df.index.values), so they must be resolved with .loc. The previous .iloc lookup
    # was only correct for a default 0..n-1 index.
    supporting_labels = image_concepts.loc[supporting_indices, 'pred']

    return [
        image_index
        for image_index, label in zip(supporting_indices, supporting_labels)
        if label == pattern_label
    ]

In [None]:
def find_images_supporting_pattern_not_matching (image_concepts, pattern, supporting_indices=None, matching_indices=None):
    """Return the images that fulfil a pattern's conditions but not its prediction.

    Args:
        image_concepts: DataFrame with one row per image.
        pattern: pattern row.
        supporting_indices: images fulfilling the pattern's conditions; computed
            with ``find_images_supporting_pattern`` when omitted.
        matching_indices: supporting images that also have the pattern's prediction;
            computed with ``find_images_matching_pattern`` when omitted.

    Returns:
        Sorted list of the indices in ``supporting_indices`` that are not in
        ``matching_indices`` (duplicates removed).
    """
    if supporting_indices is None:
        supporting_indices = find_images_supporting_pattern(image_concepts, pattern)
    if matching_indices is None:
        matching_indices = find_images_matching_pattern(image_concepts, pattern, supporting_indices)

    matched = set(matching_indices)
    return sorted({image_index for image_index in supporting_indices if image_index not in matched})

In [None]:
def find_images_matching_pattern_wrong_predicted (image_concepts, pattern, matching_indices=None):
    """Return the matching images whose prediction differs from their label.

    Args:
        image_concepts: DataFrame with one row per image; must contain 'pred' and
            'label' columns.
        pattern: pattern row; only used when ``matching_indices`` is omitted.
        matching_indices: index labels of the images matching the pattern. Computed
            with ``find_images_matching_pattern`` when omitted.

    Returns:
        List of index labels (subset of ``matching_indices``, same order) for which
        'pred' != 'label'.
    """
    if matching_indices is None:
        matching_indices = find_images_matching_pattern(image_concepts, pattern)

    # matching_indices are index labels, and labels are also what gets returned, so the
    # rows are selected with .loc. The previous .iloc selection mixed positions (input)
    # with labels (output) and only worked for a default 0..n-1 index.
    matching_concepts = image_concepts.loc[matching_indices, ['pred', 'label']]
    is_wrong = (matching_concepts['pred'] != matching_concepts['label']).to_numpy()

    return matching_concepts.index[is_wrong].tolist()

In [None]:
def find_images_having_concept (image_concepts, target_concept): 
    """Return the row positions of the images whose ``target_concept`` value equals 1.

    Args:
        image_concepts: DataFrame with one row per image and one column per concept.
        target_concept: name of the concept column to inspect.

    Returns:
        List of 0-based row positions (not index labels) where the column equals 1.
    """
    concept_values = list(image_concepts[target_concept])
    return [position for position, value in enumerate(concept_values) if value == 1]

In [None]:
def get_pattern_description (pattern):
    """Render a pattern as a one-line 'If ..., then ...' rule string.

    Every column that is not in the global ``meta_cols`` and whose value is not -1
    contributes an ``attr=value`` condition. The predicted class is translated
    through the global ``class_titles`` mapping.

    Args:
        pattern: Series describing one pattern, including 'pred', 'support',
            'confidence' and 'accuracy'.

    Returns:
        The rule string with its support, confidence and accuracy.
    """
    conditions = [
        attr + '=' + str(pattern[attr])
        for attr in list(pattern.index)
        if (attr not in meta_cols) and (pattern[attr] != -1)
    ]

    pred, sup, conf, acc = [pattern[field] for field in ('pred', 'support', 'confidence', 'accuracy')]
    rule_body = ' & '.join(conditions)
    return 'If {}, then {} (sup: {}, conf: {}, acc: {})'.format(rule_body, class_titles[pred], sup, conf, acc)

In [None]:
def get_image_description (img_concepts, target_concept=None, target_channel=None):
    """Build the (mathtext-formatted) title shown above an image.

    The first line always names the predicted and the labeled class, translated
    through the global ``class_titles``. Without ``target_concept`` the following
    lines list the concepts of the image that equal the global ``high_value`` (and,
    when the global ``binning_features`` is set, those equal to ``mid_value``).
    With ``target_concept`` a single line names the highlighted concept and,
    if given, the filter (channel).

    Args:
        img_concepts: Series holding one image's row of the concepts table.
        target_concept: name of the highlighted concept, or None.
        target_channel: filter/channel number of the highlighted concept, or None.

    Returns:
        The multi-line description string.
    """
    def italic (text):
        # matplotlib mathtext markup for italic text
        return r'$\it{' + text + '}$'

    desc = 'Predicted ' + italic(class_titles[img_concepts['pred']])
    desc += ', Labeled ' + italic(class_titles[img_concepts['label']])

    if target_concept is not None:
        desc += '\nConcept ' + italic(target_concept) + ' highlighted'
        if target_channel is not None:
            desc += ' (filter ' + italic(str(target_channel)) + ')'
        return desc

    non_concept_fields = ('pred', 'label', 'id', 'file', 'path')
    high_concepts = []
    mid_concepts = []
    for attr in list(img_concepts.index):
        if attr in non_concept_fields:
            continue
        value = img_concepts[attr]
        # The mid level only exists for binned features and is checked first.
        if binning_features and (value == mid_value):
            mid_concepts.append(attr)
        elif value == high_value:
            high_concepts.append(attr)

    if high_concepts:
        heading = '\nHigh concepts: ' if binning_features else '\nConcepts: '
        desc += heading + italic(', '.join(high_concepts))
    if mid_concepts:
        desc += '\nMid concepts: ' + italic(', '.join(mid_concepts))
    if not high_concepts and not mid_concepts:
        desc += '\nNo concepts activated'

    return desc

In [None]:
def list_image_activation_images (image_fname, activations_path, target_concept=None):
    """Collect the activation images stored for one original image.

    Activation files are expected to be named
    ``<image name without extension>_<channel>_<concept><ext>`` or, for binned
    features, ``<image name without extension>_<value category>_<channel>_<concept><ext>``.
    Files starting with the image prefix that do not follow this scheme are
    reported with a printed error and skipped.

    Args:
        image_fname: file name of the original image, with or without extension.
        activations_path: directory containing the activation images.
        target_concept: when given, only files of this concept are returned.

    Returns:
        Dict mapping concept name to a list of ``(channel, full_path)`` tuples,
        ordered by file name.

    Raises:
        ValueError: if the channel part of a file name is not an integer.
        OSError: if ``activations_path`` cannot be listed.
    """
    # splitext leaves a name without extension untouched; the former rfind('.')
    # returned -1 in that case and silently cut off the last character.
    image_fname_raw = os.path.splitext(image_fname)[0]
    prefix = image_fname_raw + "_"

    activations_info = {}
    # os.listdir returns entries in arbitrary order; sorting keeps the per-concept
    # lists (and the "first" entry callers pick from them) reproducible.
    for fname in sorted(os.listdir(activations_path)):
        if not fname.startswith(prefix):
            continue

        stem, ext = os.path.splitext(fname)
        if (target_concept is not None) and (not fname.endswith(target_concept + ext)):
            continue

        parts = stem[len(prefix):].split('_')

        if len(parts) == 2:
            channel = int(parts[0])
            concept = parts[1]
        elif len(parts) == 3:   # the case of binned features where the mid or high value of concept is also part of the file name
            # parts[0] is the feature value category; it is not needed here
            channel = int(parts[1])
            concept = parts[2]
        else:
            print('Error: Name of image file {} does not include feature value, concept and channel parts: {}'.format(fname, parts))
            continue

        if (target_concept is not None) and (concept != target_concept):
            print('Error: Concept {} in name of file {} not matching the target concept {}'.format(concept, fname, target_concept))
            continue

        full_path = os.path.join(activations_path, fname)
        activations_info.setdefault(concept, []).append((channel, full_path))

    return activations_info

In [None]:
def prepare_image_items_for_display (matching_image_concepts, activations_path=None, target_concept=None):
    """Turn rows of the image concepts table into items for ``plot_images``.

    Without ``target_concept`` each item points at the original image. With
    ``target_concept`` each item points at the first activation image found for
    that concept; images without such an activation image are reported with a
    printed error and left out.

    Args:
        matching_image_concepts: DataFrame of the images to show; needs the
            'path' and 'file' columns.
        activations_path: directory with the activation images (only used with
            ``target_concept``).
        target_concept: concept to highlight, or None.

    Returns:
        List of dicts with the keys 'path' and 'desc'.
    """
    prepared = []

    for _, row in matching_image_concepts.iterrows():
        original_path, file_name = row['path'], row['file']

        if target_concept is None:
            prepared.append({'path': original_path, 'desc': get_image_description(row)})
            continue

        per_concept = list_image_activation_images(file_name, activations_path, target_concept)
        if target_concept not in per_concept:
            print('Error: Activations info for image {} does not include the target concept {}: {}'.format(file_name, target_concept, per_concept))
            continue

        # Only the first activation image of the concept is shown.
        first_entry = per_concept[target_concept][0]
        prepared.append({
            'path': first_entry[1],
            'desc': get_image_description(row, target_concept, target_channel=first_entry[0]),
        })

    return prepared

In [None]:
def plot_images (image_items, n_cols=3): 
    """Plot images in a grid, each with its description as title.

    Args:
        image_items: list of dicts with the keys 'path' (image file to read with
            OpenCV) and 'desc' (title text).
        n_cols: number of grid columns.

    Returns:
        None. Nothing is drawn when ``image_items`` is empty.
    """
    n_images = len(image_items)
    if n_images == 0:
        return

    n_rows = math.ceil(n_images / n_cols)

    # squeeze=False always yields a 2-D array of axes; without it a 1x1 grid returns a
    # bare Axes object that has no flatten().
    fig, axs = plt.subplots(n_rows, n_cols, figsize=(n_cols * 4, n_rows * 4), squeeze=False)
    axs = axs.flatten()

    for ax, img_item in zip(axs, image_items):
        ax.axes.xaxis.set_visible(False)
        ax.axes.yaxis.set_visible(False)
        img_path = img_item['path']
        img_desc = img_item['desc']
        ax.set_title(img_desc, fontsize=11)

        img = cv2.imread(img_path)
        if img is None:
            # cv2.imread signals a missing/unreadable file by returning None; report it
            # instead of failing inside cvtColor and losing the whole figure.
            print('Error: Image {} could not be read'.format(img_path))
            continue
        ax.imshow(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))

    # Grid cells beyond the last image would otherwise show up as empty framed axes.
    for ax in axs[n_images:]:
        ax.axis('off')

    plt.tight_layout()
    plt.show()

In [None]:
def display_images_matching_pattern (pattern, image_concepts, activations_path=None, target_concept=None, max_images=20):
    """Plot the images that fulfil a pattern's conditions and have its predicted class.

    Args:
        pattern: pattern row.
        image_concepts: DataFrame with one row per image.
        activations_path: directory with activation images (needed with ``target_concept``).
        target_concept: concept to highlight instead of showing the original image, or None.
        max_images: maximum number of images to plot.
    """
    matching_indices = find_images_matching_pattern(image_concepts, pattern)
    # The finder returns index labels, so rows are selected with .loc (the former .iloc
    # was only equivalent for a default 0..n-1 index).
    matching_image_concepts = image_concepts.loc[matching_indices].iloc[:max_images]
    image_items = prepare_image_items_for_display(matching_image_concepts, activations_path, target_concept)

    print('\nImages matching pattern {{{}}}'.format(get_pattern_description(pattern)) + 
          (', with concept {} highlighted\n'.format(target_concept) if target_concept is not None else '\n'))
    plot_images(image_items, n_cols=4)

In [None]:
def display_images_supporting_pattern_not_matching (pattern, image_concepts, activations_path=None, target_concept=None, max_images=20):
    """Plot the images that fulfil a pattern's conditions but have another predicted class.

    Args:
        pattern: pattern row.
        image_concepts: DataFrame with one row per image.
        activations_path: directory with activation images (needed with ``target_concept``).
        target_concept: concept to highlight instead of showing the original image, or None.
        max_images: maximum number of images to plot.
    """
    nonmatching_indices = find_images_supporting_pattern_not_matching(image_concepts, pattern)
    # The finder returns index labels, so rows are selected with .loc (the former .iloc
    # was only equivalent for a default 0..n-1 index).
    nonmatching_image_concepts = image_concepts.loc[nonmatching_indices].iloc[:max_images]
    image_items = prepare_image_items_for_display(nonmatching_image_concepts, activations_path, target_concept)

    print('\nImages supporting but not matching pattern {{{}}}'.format(get_pattern_description(pattern)) + 
          (', with concept {} highlighted\n'.format(target_concept) if target_concept is not None else '\n'))
    plot_images(image_items, n_cols=4)

In [None]:
def display_images_matching_pattern_wrong_predicted (pattern, image_concepts, activations_path=None, target_concept=None, max_images=20):
    """Plot the images that match a pattern but whose prediction differs from their label.

    Args:
        pattern: pattern row.
        image_concepts: DataFrame with one row per image.
        activations_path: directory with activation images (needed with ``target_concept``).
        target_concept: concept to highlight instead of showing the original image, or None.
        max_images: maximum number of images to plot.
    """
    wrong_indices = find_images_matching_pattern_wrong_predicted(image_concepts, pattern)
    # The finder returns index labels, so rows are selected with .loc (the former .iloc
    # was only equivalent for a default 0..n-1 index).
    wrong_image_concepts = image_concepts.loc[wrong_indices].iloc[:max_images]
    image_items = prepare_image_items_for_display(wrong_image_concepts, activations_path, target_concept)

    print('\nImages matching but predicted wrong for pattern {{{}}}'.format(get_pattern_description(pattern)) + 
          (', with concept {} highlighted\n'.format(target_concept) if target_concept is not None else '\n'))
    plot_images(image_items, n_cols=4)

In [None]:
def display_image_activated_concepts (img_concepts, activations_path):
    """Plot every activation image found for one image, one figure per concept.

    Args:
        img_concepts: Series holding one image's row of the concepts table; needs
            the 'id' and 'file' entries.
        activations_path: directory containing the activation images.
    """
    img_id = img_concepts['id']
    img_fname = img_concepts['file']
    per_concept = list_image_activation_images(img_fname, activations_path)

    for concept, entries in per_concept.items():
        # Each entry is a (channel, path) pair of one activation image.
        image_items = [
            {
                'path': entry[1],
                'desc': get_image_description(img_concepts, target_concept=concept, target_channel=entry[0]),
            }
            for entry in entries
        ]

        print('\nImage {} with file name {}, with concept {} highlighted\n'.format(img_id, img_fname, concept))
        plot_images(image_items, n_cols=3)

In [None]:
def display_images_having_concept (image_concepts, target_concept, activations_path=None, max_images=20):
    """Plot the activation images of the images whose ``target_concept`` value is 1.

    Args:
        image_concepts: DataFrame with one row per image and a ``target_concept`` column.
        target_concept: concept to look for and to highlight.
        activations_path: directory containing the activation images.
        max_images: maximum number of images to plot.
    """
    # find_images_having_concept returns row positions, hence the positional lookup.
    positions = find_images_having_concept(image_concepts, target_concept)
    selected_rows = image_concepts.iloc[positions].iloc[:max_images]
    image_items = prepare_image_items_for_display(selected_rows, activations_path, target_concept)

    print('\nImages having concept {}\n'.format(target_concept))
    plot_images(image_items, n_cols=4)

In [None]:
def display_single_images (image_concepts, target_concept=None, target_channel=None):
    """Plot every image of a concepts table directly from its 'path' column.

    Args:
        image_concepts: DataFrame with one row per image; needs the 'path' column
            and whatever ``get_image_description`` reads.
        target_concept: concept named in each title as highlighted, or None.
        target_channel: filter/channel named in each title, or None.
    """
    image_items = [
        {
            'path': row['path'],
            'desc': get_image_description(row, target_concept, target_channel),
        }
        for _, row in image_concepts.iterrows()
    ]

    plot_images(image_items, n_cols=4)

In [None]:
def extract_class_titles (ds_name):
    """Derive the class-id -> class-title mapping from a dataset name.

    The name is split on '_'; the first part is skipped and every following part
    becomes the title of class 0, 1, ... in order. When the global
    ``binning_classes`` is set, class ``i + n_classes`` is additionally mapped to
    ``'maybe ' + title``.

    Args:
        ds_name: dataset name, e.g. ``'<name>_<class0>_<class1>'``.

    Returns:
        Dict mapping class id to title; empty when the name contains no '_'.
    """
    _, *class_names = ds_name.split('_')
    titles = {}
    if not class_names:
        return titles

    offset = len(class_names)
    for class_id, class_name in enumerate(class_names):
        titles[class_id] = class_name
        if binning_classes:
            titles[class_id + offset] = 'maybe ' + class_name

    return titles

In [None]:
def compute_pattern_score (row, concept_cols):
    """Score a pattern as ``support * confidence**2 / size**2``.

    ``size`` is the number of columns in ``concept_cols`` whose value in ``row`` is
    not -1, i.e. the number of conditions of the pattern.

    Args:
        row: pattern row (indexable by column name) with 'support', 'confidence'
            and the concept columns.
        concept_cols: names of the concept columns to inspect.

    Returns:
        The score; 0.0 for a pattern without any condition.
    """
    sup = row['support']
    conf = row['confidence']
    size = sum(1 for col in concept_cols if row[col] != -1)

    # A pattern without conditions has size 0; dividing by it raised
    # ZeroDivisionError (or produced inf/nan with numpy scalars). Rank it last instead.
    if size == 0:
        return 0.0

    # Alternatives tried before: (sup * conf) / size   and   (sup * (conf ** 2)) / (math.sqrt(size))
    return (sup * (conf ** 2)) / (size ** 2)

In [None]:
def get_feature_value_desc (val):
    """Translate a pattern's feature value into the text shown in the patterns table.

    -1 becomes ''. Without the global ``binning_features`` the result is 'yes' for
    ``high_value`` and 'no' for anything else. With binning, ``high_value``,
    ``mid_value`` and ``low_value`` become 'yes', 'maybe' and 'no'; a value lying
    between two adjacent levels becomes 'no/maybe' or 'maybe/yes'; anything else ''.

    Args:
        val: numeric feature value of a pattern.

    Returns:
        The description string.
    """
    if val == -1:
        return ''

    if not binning_features:
        if val == high_value:
            return 'yes'
        return 'no'

    # Exact levels, checked in the order high, mid, low.
    for level, text in ((high_value, 'yes'), (mid_value, 'maybe'), (low_value, 'no')):
        if val == level:
            return text

    # In-between values such as 0.5 or 1.5 cover two neighbouring levels.
    below = math.floor(val)
    above = math.ceil(val)
    if (below == low_value) and (above == mid_value):
        return 'no/maybe'
    if (below == mid_value) and (above == high_value):
        return 'maybe/yes'

    return ''

In [None]:
# --- Notebook configuration --------------------------------------------------
# Defines the module-level settings that the functions above read as globals and
# copies the required result files from Google Drive into the working directory.

# The first line of this file names the setting to visualize.
current_setting_path = '/content/drive/My Drive/Python Projects/POEM Pipeline Results/current_setting.txt'
with open(current_setting_path, 'r') as f:
    current_setting_title = f.read().splitlines()[0]
    print('Current setting:', current_setting_title)

# The title is split on '_': the first part is the model name, the remainder the dataset name.
title_parts = current_setting_title.split('_')
model_name = title_parts[0]
dataset_name = '_'.join(title_parts[1:]) 

old_process = True
binning_classes = False
binning_features = False
# Class titles are taken from the '_'-separated parts of the dataset name (see extract_class_titles).
class_titles = extract_class_titles(dataset_name)

# Values a concept can take in the tables; with binning there are three levels (0, 1, 2),
# without binning high_value and mid_value are both 1.
high_value = 2 if binning_features else 1
mid_value = 1
low_value = 0

remove_inactivated_patterns = False
# CART patterns are always loaded; Exp and IDS only when old_process is off.
rule_methods = ['cart']
if not old_process:
    rule_methods.append('exp')
    rule_methods.append('ids')

# These values are appended to the pattern file names below.
ids_param = 0.01
cart_param = 0.03
exp_param = 0.03
# Number of top-ranked patterns kept by load_patterns.
max_patterns = 10

# Result folder on Drive: '<model>_<dataset>', with an '_old' suffix when old_process is on.
drive_result_path = '/content/drive/My Drive/Python Projects/POEM Pipeline Results/' + model_name + '_' + dataset_name
if old_process:
    drive_result_path += '_old'

# Copy the dataset archive into the working directory and unpack it
# (-n: never overwrite existing files).
dataset_file = 'dataset.zip'
drive_dataset_dir = drive_result_path + '/' + dataset_file
!cp "$drive_dataset_dir" '.'
!unzip -qq -n $dataset_file -d '.'

# Pattern file names carry the parameter value as suffix, e.g. 'cart_patterns_0.03.csv'.
concepts_file = 'image_concepts.csv'
ids_patterns_file = 'ids_patterns' + ('_' + str(ids_param) if (ids_param != None) else '') + '.csv'
cart_patterns_file = 'cart_patterns' + ('_' + str(cart_param) if (cart_param != None) else '') + '.csv'
exp_patterns_file = 'exp_patterns' + ('_' + str(exp_param) if (exp_param != None) else '') + '.csv'
activation_images_file = 'activation_images.zip'

# The *_path variables point at the files on Drive; load_patterns reads them from there.
concepts_path = drive_result_path + '/' + concepts_file
ids_patterns_path = drive_result_path + '/' + ids_patterns_file
cart_patterns_path = drive_result_path + '/' + cart_patterns_file
exp_patterns_path = drive_result_path + '/' + exp_patterns_file
activation_images_path = drive_result_path + '/' + activation_images_file

# Local copies of the result files.
!cp "$concepts_path" '.'
!cp "$ids_patterns_path" '.'
!cp "$cart_patterns_path" '.'
!cp "$exp_patterns_path" '.'
!cp "$activation_images_path" '.'

# Unpack the activation images; activations_path is the directory the display
# functions read them from (presumably the folder contained in the archive - confirm).
!unzip -qq -n $activation_images_file -d '.'
activations_path = 'activation_images'

In [None]:
# Columns of the patterns table that describe a rule as a whole rather than a concept.
meta_cols = ['index', 'pred', 'support', 'confidence', 'accuracy', 'method', 'score']
#image_concepts, ids_patterns, cart_patterns, exp_patterns = load_data(concepts_path, ids_patterns_path, cart_patterns_path, exp_patterns_path)
all_patterns, image_concepts, concept_cols = load_patterns()

# Use a copy of the patterns dataframe for display:
all_patterns_df = all_patterns.copy(deep=True)
all_patterns_df.set_index('index', inplace=True)
all_patterns_df['pred'] = all_patterns_df['pred'].apply(lambda p: class_titles[p])

# For test: displayed support values below 0.03 are raised to 0.03.
all_patterns_df['support'] = all_patterns_df['support'].apply(lambda p: p if p >= 0.03 else 0.03)

# Show concept values as text and strip the part after the last '-' from concept names.
renamed_cols = {}
for con in concept_cols:
    all_patterns_df[con] = all_patterns_df[con].apply(lambda v: get_feature_value_desc(v))
    if '-' in con:
        i = con.rfind('-')
        renamed_cols[con] = con[:i]

# Meta columns are shown in upper case; 'pred' is spelled out.
for col in meta_cols:
    renamed_cols[col] = col.upper()
    # Strings must be compared with ==; `is` tests object identity, only works by accident
    # of interning and raises a SyntaxWarning on Python >= 3.8.
    if col == 'pred':
        renamed_cols[col] = 'PREDICTION'

all_patterns_df.rename(columns=renamed_cols, inplace=True)
all_patterns_df = all_patterns_df.rename_axis(None)

display(all_patterns_df)

In [None]:

# Select the pattern to inspect by the 1-based 'index' column of the patterns table.
pattern_index = 5
#pattern = all_patterns.iloc[3]
pattern = all_patterns.loc[all_patterns['index'] == pattern_index].iloc[0]

# Original images that fulfil the pattern's conditions and have its predicted class.
display_images_matching_pattern(pattern, image_concepts)


In [None]:

# Same images as above, shown through their activation image for the concept 'blue-c'.
display_images_matching_pattern(pattern, image_concepts, activations_path, target_concept='blue-c')


In [None]:

# Original images that fulfil the pattern's conditions but have a different predicted class.
display_images_supporting_pattern_not_matching(pattern, image_concepts)


In [None]:

# Same selection, shown through the activation image for the concept 'red'.
display_images_supporting_pattern_not_matching(pattern, image_concepts, activations_path, target_concept='red')


In [None]:

# Original images that match the pattern but whose prediction differs from their label.
display_images_matching_pattern_wrong_predicted(pattern, image_concepts)


In [None]:

# Same selection, shown through the activation image for the concept 'shelf'.
display_images_matching_pattern_wrong_predicted(pattern, image_concepts, activations_path, target_concept='shelf')


In [None]:

# Take the row of the image whose 'id' is 25 (raises IndexError if there is none).
img_concepts = image_concepts[image_concepts['id'] == 25].iloc[0]

# Plot every activation image found for that image, one figure per concept.
display_image_activated_concepts(img_concepts, activations_path)


In [None]:

# Re-read the complete concepts table: load_patterns drops the concept columns that are
# not used by the kept patterns, so 'image_concepts' may no longer contain this concept.
full_image_concepts = pd.read_csv(concepts_path)
target_concept = 'red-c'
display_images_having_concept(full_image_concepts, target_concept, activations_path)


In [None]:

# Test visualizations: 
# Plots a hand-written list of images with a fixed concept/filter named in each title.
# NOTE(review): this cell rebinds the notebook-level names 'image_concepts' and
# 'target_concept'; re-running the earlier pattern cells afterwards will use this small
# table instead of the one returned by load_patterns - confirm this is intended.

# 'path' holds bare file names, so the images are read relative to the working directory.
image_concepts_list = [
    {'sea': 1, 'pred': 0, 'label': 0, 'file': '00003622.png', 'path': '00003622.png'},
    {'sea': 1, 'pred': 1, 'label': 1, 'file': '00000404.png', 'path': '00000404.png'},
    {'sea': 1, 'pred': 0, 'label': 1, 'file': '00000886.png', 'path': '00000886.png'},
    {'sea': 1, 'pred': 1, 'label': 1, 'file': '00001312.png', 'path': '00001312.png'},
    {'sea': 1, 'pred': 1, 'label': 1, 'file': '00001933.png', 'path': '00001933.png'},
    {'sea': 1, 'pred': 1, 'label': 1, 'file': '00002914.png', 'path': '00002914.png'},
    {'sea': 1, 'pred': 1, 'label': 1, 'file': '00003013.png', 'path': '00003013.png'},
    {'sea': 1, 'pred': 1, 'label': 1, 'file': '00004180.png', 'path': '00004180.png'},
]

image_concepts = pd.DataFrame(image_concepts_list)
target_concept = 'sea'
target_channel = 144

# The concept and filter only appear in the titles; the images themselves come from 'path'.
display_single_images(image_concepts, target_concept, target_channel)
