In [2]:
%%writefile helpers.py

import pickle5 as pickle
import _pickle as cPickle
import gzip
import pandas as pd
import numpy as np
import math
import matplotlib.pyplot as plt
from matplotlib import colors
import seaborn as sns
from sklearn.metrics import confusion_matrix
from skimage.transform import resize as sk_resize
from scipy import ndimage
import random
from skimage.transform import rescale, resize, rotate


def save(object, filename, protocol = -1):
    """Save an object to a gzip-compressed pickle file.

    Works well with huge objects.

    :param object: -> any picklable object to serialise
    :param filename: -> str | path of the compressed file to write
    :param protocol: -> int | pickle protocol handed to ``dump`` (default -1)
    """
    # the context manager closes the gzip stream even if pickling fails part-way
    with gzip.GzipFile(filename, 'wb') as file:
        cPickle.dump(object, file, protocol)

def load(filename):
    """Load an object from a gzip-compressed pickle file.

    :param filename: -> str | path of the compressed file to read

    Returns the unpickled object.
    """
    # NOTE: unpickling can run arbitrary code - only load files from a trusted source
    # the context manager closes the gzip stream even if unpickling fails
    with gzip.GzipFile(filename, 'rb') as file:
        return pickle.load(file)


def plot_lot(df1, lot, fig_size=(10, 10), col='waferMap', cmap='viridis'):
    """
    Draw the wafers of a single lot on a fixed 5 x 5 grid.
    The lot must contain at least one wafer; at most 25 are shown and
    unused cells are padded with an all-zero map.

    :param df1: -> dataframe | must hold 'lotName', 'failureType' and the `col` column
    :param lot: -> str | lotName that will be plotted e.g. 'lot1'
    :param fig_size: -> tuple | size of plot
    :param col: -> str | column that contains waferMap image
    :param cmap: -> str | color scheme to use
    """

    grid = 5
    wafers = df1[df1['lotName'] == lot].reset_index()
    n_wafers = len(wafers.index)

    print(f'{lot}')

    fig, axs = plt.subplots(grid, grid, figsize=fig_size)
    fig.tight_layout()

    for cell in range(grid * grid):
        if cell >= n_wafers:
            # past the end of the lot: pad with a blank map and empty labels
            img = np.zeros_like(wafers[col][0])
            index, ftype = '', ''
        else:
            img = wafers[col][cell]
            index = wafers["index"][cell]
            ftype = wafers.failureType[cell]

        # cells are filled column by column: quotient -> column, remainder -> row
        j, i = divmod(cell, grid)
        ax = axs[i, j]
        ax.imshow(img, interpolation='none', cmap=cmap)
        ax.axis('off')

        # title carries the original dataframe index and the defect class
        ax.set_title(f'{index}\n{ftype}', fontsize=10)

    plt.show()
    
    
def plot_list(df1, wafer_list, fig_size=(10, 10), col='waferMap', cmap='viridis', mode='index'):
    """
    Helper function to plot a list of wafers from df1 on a square grid.
    Works for any non-empty list, including a single sample.

    :param df1: -> dataframe | must hold 'ID', 'failureType' and the `col` column
    :param wafer_list: -> list | list of indices or ids to be plotted
    :param fig_size: -> tuple | size of plot
    :param col: -> str | column that contains waferMap image
    :param cmap: -> str | color scheme to use
    :param mode: -> str | 'index' or 'id'

    Raises ValueError if `mode` is not recognised or `wafer_list` is empty.
    """

    if mode == 'index':
        index_list = list(wafer_list)
    elif mode == 'id':
        # translate each ID into the first dataframe index carrying it
        index_list = [df1.index[df1.ID == i][0] for i in wafer_list]
    else:
        raise ValueError(f"mode must be 'index' or 'id', got {mode!r}")

    if len(index_list) == 0:
        raise ValueError('wafer_list is empty; nothing to plot')

    list_df = df1.loc[index_list, :].reset_index()

    total_rows = len(list_df.index)
    # smallest square grid that holds every sample
    ax_cnt = int(math.ceil(math.sqrt(total_rows)))

    # squeeze=False keeps axs two-dimensional even for a 1 x 1 grid
    fig, axs = plt.subplots(ax_cnt, ax_cnt, figsize=fig_size, squeeze=False)
    fig.tight_layout()

    for n_row in range(ax_cnt**2):
        if n_row < total_rows:
            img = list_df[col][n_row]
            index = list_df["ID"][n_row]
            ftype = list_df.failureType[n_row]
        else:
            # pad the rest of the grid with a blank map
            img = np.zeros_like(list_df[col][0])
            index = ''
            ftype = ''

        # grid is filled column by column
        j, i = divmod(n_row, ax_cnt)
        axs[i, j].imshow(img,
                         interpolation='none',
                         cmap=cmap)
        axs[i, j].axis('off')

        # label the figure with the ID and defect classification [for future reference]
        axs[i, j].set_title(f'{index}\n{ftype}', fontsize=10)

    plt.show()

    
def defect_distribution(data, note='', mode='classify'):
    """Helper function to visualize distribution of defects as a horizontal bar chart

       :param data -> dataframe | must hold 'classifyLabels' (classify) or 'detectLabels' (detect)
       :param note -> str | text shown in brackets under the title
       :param mode -> str | classify or detect

       Raises ValueError for any other mode.
       Prints a message and returns without plotting when there is nothing to count."""

    if mode == 'classify':
        col = 'classifyLabels'
        names = {8: 'none', 0: 'Loc', 1: 'Edge-Loc', 2: 'Center', 3: 'Edge-Ring',
                 4: 'Scratch', 5: 'Random', 6: 'Near-full', 7: 'Donut'}
    elif mode == 'detect':
        col = 'detectLabels'
        # assumes 0 = no defect and 1 = defect, matching the original label order - confirm
        names = {0: 'None', 1: 'Defect'}
    else:
        raise ValueError(f"mode must be 'classify' or 'detect', got {mode!r}")

    # count how many of each defect is present, smallest count first
    dist = data.groupby(col)[col].count().sort_values()
    y = dist.tolist()

    if not y:
        print('No samples to plot')
        return

    # name every bar from its own label so names follow the sorted counts
    x = [names[i] for i in dist.index.tolist()]

    # bar plot
    plt.barh(x, y)
    xlim = math.ceil(max(y)*1.15)
    plt.xlim(0, xlim)
    plt.title(f'Failure Type Distribution\n({note})')

    # write each count at the end of its bar
    for index, value in enumerate(y):
        plt.text(value, index,
                 str(value))

    plt.show()
    

def flip_rotate(df, col, defect, classLabel, labels, number, frac=25):
    """Helper function to produce number of new samples
       by randomly flipping and rotating.
       Assumes that all samples are the same class.

       :param df -> dataframe | source data
       :param col -> column containing wafer map
       :param defect -> str | failureType value
       :param classLabel -> int | classifyLabel value
       :param labels -> list | list of source data indices
       :param number -> int | number of new samples to generate
       :param frac -> int | out of 100, upper bound on the share of samples
                            flipped in each direction (left-right and up-down)

       Returns df of new samples with columns ID, failureType, classifyLabels and `col`"""

    # how many to flip in each direction
    f = math.ceil(random.randint(0, frac) / 100 * number)
    # rounding up can push 2*f past `number`; cap it so exactly `number` samples come out
    f = min(f, number // 2)

    # how many to rotate
    r = number - 2*f

    def new_row(img):
        # every generated sample shares the same ID placeholder and class labels
        return {'ID': 'A', 'failureType': defect, 'classifyLabels': classLabel, col: img}

    # rows are collected in a list and turned into a dataframe once at the end
    rows = []

    # generate new flipped samples
    for i in random.choices(labels, k=f):
        rows.append(new_row(np.fliplr(df[col].loc[i])))

    for i in random.choices(labels, k=f):
        rows.append(new_row(np.flipud(df[col].loc[i])))

    # generate new rotated samples, each by a random whole-degree angle
    for i in random.choices(labels, k=r):
        img = df[col].loc[i]
        theta = random.randint(1, 359)
        rows.append(new_row(rotate(img, theta)))

    return pd.DataFrame(rows)


def plot_confusion_matrix(y_test, y_pred, mode='classify', normalize=True, figsize=(7,5)):
    """Helper function for plotting confusion matrix of model results
       Modes: detect, classify, all
       For all, assumes that none is labeled as 8

       :param y_test -> true labels
       :param y_pred -> predicted labels
       :param mode -> str | selects the set of tick labels
       :param normalize -> bool | normalize each true-label row if True, raw counts otherwise
       :param figsize -> tuple | size of plot

       Raises ValueError for an unknown mode."""

    tick_names = {
        'classify': ['L', 'EL', 'C', 'ER', 'S', 'R', 'NF', 'D'],
        'detect': ['None', 'Defect'],
        'all': ['L', 'EL', 'C', 'ER', 'S', 'R', 'NF', 'D', 'N'],
    }
    if mode not in tick_names:
        raise ValueError(f"mode must be one of {sorted(tick_names)}, got {mode!r}")
    defects = tick_names[mode]

    # classes are assumed to be encoded 0..len(defects)-1 in tick order; passing them
    # explicitly keeps the matrix aligned with the ticks when a class is absent
    class_ids = list(range(len(defects)))

    fig, ax = plt.subplots(figsize=figsize)

    if normalize:
        cm = confusion_matrix(y_test, y_pred, labels=class_ids, normalize='true')
        f = sns.heatmap(cm, annot=True, xticklabels=defects, yticklabels=defects)
    else:
        cm = confusion_matrix(y_test, y_pred, labels=class_ids, normalize=None)
        # raw counts are integers, so annotate them without decimals
        f = sns.heatmap(cm, annot=True, xticklabels=defects, yticklabels=defects, fmt='d')

    f.set(xlabel='Predicted Label', ylabel='True Label')

    
def visualize_misclassified(test_data, y_test, y_pred, true_label, pred_label, n, 
                            figsize=(10, 10), col='waferMap', cmap='viridis'):
    """Helper function that visualizes a random n samples
       that have true_label but are predicted as pred_label.
       Uses helper function plot_list to visualize samples.

       :param test_data -> dataframe handed to plot_list; sample positions are used as its indices
       :param y_test -> sequence of true labels
       :param y_pred -> sequence of predicted labels
       :param true_label -> int | true label of the sample
       :param pred_label -> int | label that the model mistakenly predicted
       :param n -> int | number of samples to visualize
       :param figsize, col, cmap -> passed through to plot_list

       Prints a message and returns without plotting when no sample matches."""

    # collect positions where the true/predicted pair matches the requested confusion
    mistakes = [i for i, (truth, guess) in enumerate(zip(y_test, y_pred))
                if truth == true_label and guess == pred_label]

    if not mistakes:
        print(f'No samples with true label {true_label} predicted as {pred_label}')
        return

    # take a random n samples (all of them if fewer than n exist)
    random_n = random.sample(mistakes, min(n, len(mistakes)))

    # visualize using plot_list
    plot_list(test_data, random_n, figsize, col, cmap)

Overwriting helpers.py


In [4]:
%%writefile keras_model_s3_wrapper.py

import s3fs
import zipfile
import tempfile
import numpy as np
from tensorflow import keras
from pathlib import Path
import logging
import os

# Source: https://gist.github.com/ramdesh/f00ec1f5d01f03114264e8f3d0c226e8

# AWS credentials are read from the environment when this module is imported;
# os.getenv returns None for a variable that is not set
AWS_ACCESS_KEY=os.getenv("AWS_ACCESS_KEY_ID")
AWS_SECRET_KEY=os.getenv("AWS_SECRET_ACCESS_KEY")
# S3 bucket used by the model save/load helpers below
BUCKET_NAME='wafer-capstone'

def get_s3fs():
    """Return an S3FileSystem created with the module-level AWS key and secret."""
    credentials = {"key": AWS_ACCESS_KEY, "secret": AWS_SECRET_KEY}
    return s3fs.S3FileSystem(**credentials)


def zipdir(path, ziph):
    """Add every file found under `path` to the open zip archive `ziph`.

    Files are stored under their location inside `path`, so the parent
    folders of `path` do not appear in the archive.
    """
    prefix_len = len(path)
    for current_dir, _subdirs, filenames in os.walk(path):
        # cut the leading `path` off so only the inner folder structure is kept
        inner_dir = current_dir[prefix_len:]
        for filename in filenames:
            source = os.path.join(current_dir, filename)
            archive_name = os.path.join(inner_dir, filename)
            ziph.write(source, archive_name)

            
def s3_save_keras_model(model, model_name):
    """Save a keras model, zip the saved folder and upload the zip to S3.

    The archive is written to `{BUCKET_NAME}/models/{model_name}.zip`.
    All intermediate files live in a temporary directory that is removed afterwards.

    :param model: model object exposing `save(path)`
    :param model_name: name used for the saved folder and the zip file
    """
    with tempfile.TemporaryDirectory() as tempdir:
        model_dir = f"{tempdir}/{model_name}"
        zip_path = f"{tempdir}/{model_name}.zip"
        remote_path = f"{BUCKET_NAME}/models/{model_name}.zip"

        model.save(model_dir)

        # Zip it up first; the context manager closes the archive even if zipping fails
        with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_STORED) as zipf:
            zipdir(model_dir, zipf)

        # named `fs` so the imported s3fs module is not shadowed
        fs = get_s3fs()
        fs.put(zip_path, remote_path)
        logging.info("Saved zipped model at path s3://%s", remote_path)
 

def s3_get_keras_model(model_name: str) -> keras.Model:
    """Download `{BUCKET_NAME}/models/{model_name}.zip` from S3 and load it as a keras model.

    The zip is fetched and unpacked inside a temporary directory, and the
    model is loaded from there before the directory is cleaned up.
    """
    with tempfile.TemporaryDirectory() as tempdir:
        archive_path = f"{tempdir}/{model_name}.zip"
        model_dir = f"{tempdir}/{model_name}"

        # pull the archive down into the temporary directory
        fs = get_s3fs()
        fs.get(f"{BUCKET_NAME}/models/{model_name}.zip", archive_path)

        # unpack it next to the archive
        with zipfile.ZipFile(archive_path) as archive:
            archive.extractall(model_dir)

        # load while the temporary directory still exists
        return keras.models.load_model(model_dir)

import logging
import boto3
from botocore.exceptions import ClientError
import os


def s3_upload_file(file_name, bucket, object_name=None):
    """Upload a local file to an S3 bucket.

    :param file_name: File to upload
    :param bucket: Bucket to upload to
    :param object_name: S3 object name. Defaults to the base name of file_name
    :return: True if file was uploaded, False if the client raised ClientError
    """

    # fall back to the bare file name when no object name is given
    key = os.path.basename(file_name) if object_name is None else object_name

    s3_client = boto3.client('s3')
    try:
        s3_client.upload_file(file_name, bucket, key)
    except ClientError as e:
        logging.error(e)
        return False
    else:
        return True

Overwriting keras_model_s3_wrapper.py


In [1]:
%%writefile dashboard.py

# import libraries
import os
import time
import math
import random
import numpy as np
import pandas as pd
import pickle5 as pickle

import matplotlib.pyplot as plt
import matplotlib.cm as cm
import matplotlib as matplotlib
import seaborn as sns

from pylab import *
from skimage.transform import resize as sk_resize
from skimage.util import img_as_ubyte


def generate_dashboard_data(saved_data=True, data_path=None, data=None, 
                            saved_predictions=True, predictions_path=None, predictions=None):
    """Helper function that generates dataframe and dictionary
       needed for dashboard of results

       :param saved_data: -> bool | whether data being loaded is saved pkl file or a dataframe
       :param data_path: -> str | where the data pkl file is located, if using saved file
       :param data: -> dataframe of data, if not using saved file (must hold 'ID' and 'lotName')
       :param saved_predictions: -> bool | whether predictions being loaded is saved pkl file or dataframe
       :param predictions_path: -> str | where the predictions pkl file is located, if using saved file
       :param predictions: -> list of lists containing model predictions (see model pipeline for format):
                              [0] IDs of the wafers sent to the classifier,
                              [1] detection probabilities, one entry per row of data,
                              [2] classification probabilities (numpy arrays), one entry per ID in [0],
                              [3] final predicted labels, one per row of data (8 = none)

       Outputs:
       - data dataframe augmented with prediction labels + probabilities 
         and second prediction labels + probabilities (columns are added to `data` in place)
       - dictionary of counts of predicted defective wafers per lot

       Raises ValueError if a wafer predicted as defective has no entry in predictions[0]."""

    # NOTE: unpickling can run arbitrary code - only load files from a trusted source
    if saved_data:
        # load data
        with open(data_path, "rb") as fh:
            data = pickle.load(fh)

    if saved_predictions:
        # load predictions
        with open(predictions_path, "rb") as fh:
            predictions = pickle.load(fh)

    # unpack predictions
    defect_ids = predictions[0]
    detect_probs = predictions[1]
    classify_probs = predictions[2]
    labels = predictions[3]

    # probabilities for the highest class for each model
    detect_max_prob = [max(x) for x in detect_probs]
    classify_max_prob = [max(x) for x in classify_probs]

    # second highest class for defective sample
    classify_label2 = [x.argsort()[-2] for x in classify_probs]

    # second highest class probability
    classify_max_prob2 = [x[i] for x, i in zip(classify_probs, classify_label2)]

    # wafer ID -> position in the classifier outputs, built once instead of a
    # list search per row; the first occurrence wins, like list.index
    defect_pos = {}
    for pos, wafer_id in enumerate(defect_ids):
        defect_pos.setdefault(wafer_id, pos)

    # add columns to dataframe
    data['pred_labels'] = labels

    # one pass over the rows; `i` is the row position, which lines up with the
    # per-row prediction lists whatever index the dataframe carries
    pred_prob = []
    pred2_labels = []
    pred2_prob = []
    for i, (wafer_id, label) in enumerate(zip(data['ID'], data['pred_labels'])):
        if label == 8:
            # not defective: only the detection model applies, no second prediction
            pred_prob.append(detect_max_prob[i] * 100)
            pred2_labels.append(8)
            pred2_prob.append(0)
            continue

        if wafer_id not in defect_pos:
            raise ValueError(f'{wafer_id!r} is not in the list of defect ids')
        j = defect_pos[wafer_id]

        # defective: combine detection and classification probabilities
        pred_prob.append((detect_max_prob[i] * classify_max_prob[j]) * 100)
        pred2_labels.append(classify_label2[j])
        pred2_prob.append((detect_max_prob[i] * classify_max_prob2[j]) * 100)

    data['pred_prob'] = pred_prob
    data['pred2_labels'] = pred2_labels
    data['pred2_prob'] = pred2_prob

    print(f'Augmented results dataset shape: {data.shape}')

    # count how many defective wafers in each lot; every lot starts at zero
    lot_count = {x: 0 for x in data.lotName.unique()}
    for lot_name, label in zip(data['lotName'], data['pred_labels']):
        if label != 8:
            lot_count[lot_name] += 1

    print(f'Number of lots in lot count dictionary: {len(lot_count)}')

    return data, lot_count


def defect_distribution(data, note='', mode='all', color=None):
    """Helper function to visualize distribution of predicted labels as a horizontal bar chart

       :param data -> dataframe | must hold 'pred_labels'; it is not modified
       :param note -> str | text shown in brackets under the title
       :param mode -> str | 'all' (every label), 'classify' (defects only, label 8 dropped)
                            or 'detect' (none vs defect)
       :param color -> bar color handed to barh, default matplotlib color if not given

       Prints a message and returns without plotting when there is nothing to count."""

    fail_dict = {8: 'none', 0: 'Loc', 1: 'Edge-Loc', 2: 'Center', 3: 'Edge-Ring', 
                 4: 'Scratch', 5: 'Random', 6: 'Near-full', 7: 'Donut'}

    if mode == 'detect':
        # derive the binary label locally rather than writing a column into the caller's frame
        labels = data.pred_labels.apply(lambda x: 0 if x == 8 else 1)
        names = {0: 'None', 1: 'Defect'}
    else:
        labels = data.pred_labels
        if mode == 'classify':
            labels = labels[labels != 8]
        names = fail_dict

    # count how many of each label is present, smallest count first
    dist = labels.groupby(labels).count().sort_values()
    y = dist.tolist()

    if not y:
        print('No samples to plot')
        return

    # name every bar from its own label so names follow the sorted counts
    x = [names[i] for i in dist.index.tolist()]

    # bar plot
    if color:
        plt.barh(x, y, color=color)
    else:
        plt.barh(x, y)

    xlim = math.ceil(max(y)*1.15)
    plt.xlim(0, xlim)

    if mode == 'all':
        plt.title(f'Overall Failure Type Distribution\n({note})')
    elif mode == 'classify':
        plt.title(f'Defect Distribution\n({note})')
    elif mode == 'detect':
        plt.title(f'None vs Defect Distribution\n({note})')

    # write each count at the end of its bar
    for index, value in enumerate(y):
        plt.text(value, index,
                 str(value))

    plt.show()
    

def visualize_defective_lots(lot_count, cmap='viridis', white=True):
    """Helper function that creates a pie chart based on 
       the number of predicted defective wafers in each lot

       Lots are grouped into four tiers: no defects, 1-9, 10-19 and 20 or more.
       Tiers without any lot are left out of the chart.

       :param lot_count: -> dictionary of counts of predicted defective wafers per lot
       :param cmap: -> color scheme for pie chart
       :param white: -> bool | whether the autotext in the pie chart is white or black

       Prints a message and returns without plotting when lot_count is empty."""

    tiers = ['No Defects', '< 10 Defects', '< 20 Defects', '20+ Defects']
    defect_count = {x: 0 for x in tiers}
    for value in lot_count.values():
        if value == 0:
            defect_count['No Defects'] += 1
        elif value < 10:
            defect_count['< 10 Defects'] += 1
        elif value < 20:
            defect_count['< 20 Defects'] += 1
        else:
            defect_count['20+ Defects'] += 1

    # keep only the tiers that actually occur
    labels = [x for x in tiers if defect_count[x] > 0]
    sizes = [defect_count[x] for x in labels]

    if not sizes:
        print('No lots to plot')
        return

    # Pie chart, where the slices will be ordered and plotted counter-clockwise:
    fig1, ax1 = plt.subplots()
    theme = plt.get_cmap(cmap)
    # spread the slice colors evenly over the chosen colormap
    ax1.set_prop_cycle("color", [theme(1. * i / len(sizes)) for i in range(len(sizes))])
    total = sum(sizes)
    # autopct receives a percentage; convert it back to the number of lots
    patches, texts, autotexts = ax1.pie(sizes, labels=labels, startangle=90,
                                        autopct=lambda p: '{:.0f}'.format(p * total / 100))
    if white:
        for autotext in autotexts:
            autotext.set_color('white')
    ax1.axis('equal')  # Equal aspect ratio ensures that pie is drawn as a circle.

    plt.suptitle('Lot Distribution')

    plt.show()
    
    
def plot_lot(df1, lot, fig_size=(10, 10), col='waferMap', cmap_img='gray_r', box_color='gray',
             resize=False, img_dims=[224,224], pct_color=True, cmap_pct='magma_r', binary=False):
    """
    Helper function to plot the 25 wafer slots of a lot from df1 on a 5 x 5 grid.
    Wafers are placed row by row by their waferIndex (1-25); a slot with no
    wafer in the data is left blank. Wafers whose predicted label is not
    'None' are outlined with a box.

    :param df1: -> dataframe | must hold 'lotName', 'waferIndex', 'ID', 'pred_labels', 'pred_prob' and `col`
    :param lot: -> str | lotName that will be plotted e.g. 'lot1'
    :param fig_size: -> tuple | size of plot
    :param col: -> str | column that contains waferMap image
    :param cmap_img: -> str | color scheme to use on image plot
    :param box_color: -> str | color of box identifying defective wafers
    :param resize: -> bool | whether or not to apply resize to figure
    :param img_dims: -> resize dimensions
    :param pct_color: -> bool | whether or not to change the font color of the labels based on probability
    :param cmap_pct: -> str | color scheme to use on font color, if changing based on probability
    :param binary: -> bool | true if thinned map
    """

    lot_df = df1[df1['lotName'] == lot].set_index('waferIndex')

    ax_cnt = 5

    print(f'{lot}')

    fail_dict = {8:'None', 0:'Loc', 1:'Edge-Loc', 2:'Center', 3:'Edge-Ring', 
                 4:'Scratch', 5:'Random', 6:'Near-full', 7:'Donut'}

    # thinned (binary) maps are displayed on a 0-1 scale, other maps on 0-2
    vmax = 1 if binary else 2

    if pct_color:
        # probability (0-100) -> title color, built once for the whole grid
        norm = matplotlib.colors.Normalize(vmin=0, vmax=100)
        pct_cmap = plt.get_cmap(cmap_pct)

    fig, axs = plt.subplots(ax_cnt, ax_cnt, figsize=fig_size)
    fig.tight_layout()

    for n_row in range(ax_cnt * ax_cnt):
        # grid is filled row by row
        i, j = divmod(n_row, ax_cnt)
        ax = axs[i, j]
        wafer_idx = n_row + 1

        if wafer_idx not in lot_df.index:
            # no wafer recorded for this slot: leave the cell empty
            ax.axis('off')
            continue

        img = lot_df[col][wafer_idx]
        if resize:
            img = img_as_ubyte(sk_resize(img, img_dims, anti_aliasing=True))
        index = lot_df["ID"][wafer_idx]
        ftype = fail_dict[lot_df.pred_labels[wafer_idx]]
        pct = lot_df.pred_prob[wafer_idx]

        ax.imshow(img,
                  interpolation='none',
                  cmap=cmap_img,
                  vmin=0, vmax=vmax)
        ax.axis('off')

        if ftype != 'None':
            # outline predicted-defective wafers using the current axis limits
            autoAxis = ax.axis()
            rec = Rectangle((autoAxis[0],autoAxis[2]),
                            (autoAxis[1]-autoAxis[0]),
                            (autoAxis[3]-autoAxis[2]),
                            fill=False, lw=1, color=box_color)
            rec = ax.add_patch(rec)
            rec.set_clip_on(False)

        # label the figure with the ID, predicted class and probability;
        # optionally color the text by probability
        title_kwargs = {'fontsize': 12, 'fontweight': "bold"}
        if pct_color:
            rgb = pct_cmap(norm(abs(pct)))[:3]  # colormap returns rgba, keep rgb only
            title_kwargs['color'] = matplotlib.colors.rgb2hex(rgb)
        ax.set_title(f'{index}: {ftype}\n{pct:.2f}%', **title_kwargs)

    plt.show()
    
    
def plot_list(df1, wafer_list, fig_size=(10, 10), col='waferMap', cmap_img='gray_r', mode='index', box_color='gray',
              resize=False, img_dims=[224,224], pct_color=True, cmap_pct='magma_r', binary=False):
    """
    Helper function to plot a list of wafers from df1 on a square grid with prediction labels.
    Works for any non-empty list, including a single sample; grid cells
    beyond the end of the list are left blank. Wafers whose predicted label
    is not 'None' are outlined with a box.

    :param df1: -> dataframe | must hold 'ID', 'pred_labels', 'pred_prob' and `col`
    :param wafer_list: -> list | list of indices or ids to be plotted
    :param fig_size: -> tuple | size of plot
    :param col: -> str | column that contains waferMap image
    :param cmap_img: -> str | color scheme to use on image plot
    :param mode: -> str | 'index' or 'id'
    :param box_color: -> str | color of box identifying defective wafers
    :param resize: -> bool | whether or not to apply resize to figure
    :param img_dims: -> resize dimensions
    :param pct_color: -> bool | whether or not to change the font color of the labels based on probability
    :param cmap_pct: -> str | color scheme to use on font color, if changing based on probability
    :param binary: -> bool | true if thinned map

    Raises ValueError if `mode` is not recognised or `wafer_list` is empty.
    """

    if mode == 'index':
        index_list = list(wafer_list)
    elif mode == 'id':
        # translate each ID into the first dataframe index carrying it
        index_list = [df1.index[df1.ID == i][0] for i in wafer_list]
    else:
        raise ValueError(f"mode must be 'index' or 'id', got {mode!r}")

    if len(index_list) == 0:
        raise ValueError('wafer_list is empty; nothing to plot')

    list_df = df1.loc[index_list, :].reset_index()

    total_rows = len(list_df.index)
    # smallest square grid that holds every sample
    ax_cnt = int(math.ceil(math.sqrt(total_rows)))

    fail_dict = {8:'None', 0:'Loc', 1:'Edge-Loc', 2:'Center', 3:'Edge-Ring', 
                 4:'Scratch', 5:'Random', 6:'Near-full', 7:'Donut'}

    # thinned (binary) maps are displayed on a 0-1 scale, other maps on 0-2
    vmax = 1 if binary else 2

    if pct_color:
        # probability (0-100) -> title color, built once for the whole grid
        norm = matplotlib.colors.Normalize(vmin=0, vmax=100)
        pct_cmap = plt.get_cmap(cmap_pct)

    # squeeze=False keeps axs two-dimensional even for a 1 x 1 grid
    fig, axs = plt.subplots(ax_cnt, ax_cnt, figsize=fig_size, squeeze=False)
    fig.tight_layout()

    for n_row in range(ax_cnt**2):
        # grid is filled row by row
        i, j = divmod(n_row, ax_cnt)
        ax = axs[i, j]

        if n_row >= total_rows:
            # filler cell: nothing to draw, no label and no defect box
            ax.axis('off')
            continue

        img = list_df[col][n_row]
        if resize:
            img = img_as_ubyte(sk_resize(img, img_dims, anti_aliasing=True))
        index = list_df["ID"][n_row]
        ftype = fail_dict[list_df.pred_labels[n_row]]
        pct = list_df.pred_prob[n_row]

        ax.imshow(img,
                  interpolation='none',
                  cmap=cmap_img,
                  vmin=0, vmax=vmax)
        ax.axis('off')

        if ftype != 'None':
            # outline predicted-defective wafers using the current axis limits
            autoAxis = ax.axis()
            rec = Rectangle((autoAxis[0], autoAxis[2]),
                            (autoAxis[1]-autoAxis[0]),
                            (autoAxis[3]-autoAxis[2]),
                            fill=False, lw=1, color=box_color)
            rec = ax.add_patch(rec)
            rec.set_clip_on(False)

        # label the figure with the ID, predicted class and probability;
        # optionally color the text by probability
        title_kwargs = {'fontsize': 12, 'fontweight': "bold"}
        if pct_color:
            rgb = pct_cmap(norm(abs(pct)))[:3]  # colormap returns rgba, keep rgb only
            title_kwargs['color'] = matplotlib.colors.rgb2hex(rgb)
        ax.set_title(f'{index}: {ftype}\n{pct:.2f}%', **title_kwargs)

    plt.show()
    
    
def plot_lot_probs(df1, lot, cmap='cividis_r'):
    """
    Helper function to plot heatmap of model prediction probabilities for a lot.
    Probabilities are laid out on a 5 x 5 grid by waferIndex (1-25), row by row;
    a slot with no wafer in the data is NaN in the grid.

    :param df1: -> dataframe | must hold 'lotName', 'waferIndex' and 'pred_prob'
    :param lot: -> str | lotName that will be plotted e.g. 'lot1'
    :param cmap: -> str | color scheme to use
    """

    lot_df = df1[df1['lotName'] == lot].set_index('waferIndex')

    # collect probabilities into a 5x5 array; reindex inserts NaN for missing wafer slots
    probs = lot_df['pred_prob'].reindex(range(1, 26)).to_numpy(dtype=float).reshape(5, 5)

    print(f'{lot}')

    sns.heatmap(probs, annot=True, cmap=cmap,
                xticklabels=False, yticklabels=False, fmt='.2f')

Overwriting dashboard.py


In [1]:
%%writefile gradcam.py

# import libraries
import tensorflow as tf
import matplotlib.pyplot as plt
from tensorflow import keras
from tensorflow.keras import datasets, layers, models, losses, optimizers, callbacks, Model

import cv2
import numpy as np

# Source: https://towardsdatascience.com/understand-your-algorithm-with-grad-cam-d3b62fce353

def GradCam(model, img, layer_name, eps=1e-8, class_idx=0):
    '''
    Creates a grad-cam heatmap given a model and a layer name contained with that model


    Args:
      model: tf model
      img: numpy array holding a single image (a batch axis is added here)
      layer_name: str, name of the layer whose output is used for the heatmap
      eps: float, added to the normalisation denominator to avoid division by zero
      class_idx: int, column of the model output to explain (default 0)


    Returns 
      float numpy array with values in [0, 1], resized to the spatial size of img

    '''

    img_array = np.expand_dims(img, axis=0)

    # model that exposes both the chosen layer's output and the final prediction
    gradModel = Model(inputs=[model.inputs],
                      outputs=[model.get_layer(layer_name).output, model.output])

    with tf.GradientTape() as tape:
        # cast the image tensor to a float-32 data type, pass the
        # image through the gradient model, and grab the score
        # of the requested output column
        inputs = tf.cast(img_array, tf.float32)
        (convOutputs, predictions) = gradModel(inputs)
        loss = predictions[:, class_idx]

    # use automatic differentiation to compute the gradients
    grads = tape.gradient(loss, convOutputs)

    # compute the guided gradients: keep gradients only where both the
    # layer output and the gradient itself are positive
    castConvOutputs = tf.cast(convOutputs > 0, "float32")
    castGrads = tf.cast(grads > 0, "float32")
    guidedGrads = castConvOutputs * castGrads * grads

    # the convolution and guided gradients have a batch dimension
    # (which we don't need) so let's grab the volume itself and
    # discard the batch
    convOutputs = convOutputs[0]
    guidedGrads = guidedGrads[0]

    # compute the average of the gradient values, and using them
    # as weights, compute the ponderation of the filters with
    # respect to the weights
    weights = tf.reduce_mean(guidedGrads, axis=(0, 1))
    cam = tf.reduce_sum(tf.multiply(weights, convOutputs), axis=-1)

    # grab the spatial dimensions of the input image and resize
    # the output class activation map to match the input image
    # dimensions
    (w, h) = (img_array.shape[2], img_array.shape[1])
    heatmap = cv2.resize(cam.numpy(), (w, h))

    # normalize the heatmap such that all values lie in the range [0, 1];
    # conversion to uint8 is left to the caller
    numer = heatmap - np.min(heatmap)
    denom = (heatmap.max() - heatmap.min()) + eps
    heatmap = numer / denom

    # return the resulting heatmap to the calling function
    return heatmap


def sigmoid(x, a, b, c):
    """Scaled logistic curve: height c, steepness a, midpoint b."""
    exponent = -a * (x-b)
    return c / (1 + np.exp(exponent))


def superimpose(img_bgr, cam, thresh=0.5, emphasize=False, img_wt=1, heatmap_wt=0.75):
    '''
    Blends a grad-cam heatmap onto an image for model interpretation and visualization.
    *Modified from original function - uses cv2 to superimpose instead


    Args:
      img_bgr: 3-channel image as numpy array
      cam: grad-cam heatmap as 2-D numpy array; it is scaled by 255 before coloring
      thresh: float, midpoint of the sigmoid used when emphasize is set
      emphasize: boolean, pass the heatmap through a steep sigmoid first
      img_wt: weight of the image in the blend
      heatmap_wt: weight of the colored heatmap in the blend

    Returns 
      weighted sum of img_bgr and the JET-colored heatmap (no color conversion is applied)

    '''
    height, width = img_bgr.shape[0], img_bgr.shape[1]

    # bring the heatmap to the image size (cv2 expects width first)
    scaled_cam = cv2.resize(cam, (width, height))
    if emphasize:
        scaled_cam = sigmoid(scaled_cam, 50, thresh, 1)

    # color the heatmap, then blend it with the image
    colored = cv2.applyColorMap(np.uint8(255 * scaled_cam), cv2.COLORMAP_JET)
    return cv2.addWeighted(img_bgr, img_wt, colored, heatmap_wt, 0)

def generate_img_bgr(img, binary=False):
    """Helper function to turn a wafer map into a 3-channel image for superimposing

       Args:
       img: waferMap image, numpy array with shape (img_height, img_width);
            values [0, 2], or [0, 1] for a thinned map
       binary: boolean, true if thinned wafermap

       Thinned maps are scaled by 255 and converted as-is; other maps are
       scaled by 255/2, converted, and then inverted."""

    if binary:
        return cv2.cvtColor(np.uint8(img*255), cv2.COLOR_GRAY2BGR)

    gray = np.uint8(img/2*255)
    return cv2.bitwise_not(cv2.cvtColor(gray, cv2.COLOR_GRAY2BGR))

Overwriting gradcam.py


In [3]:
%%writefile eda.py

import pandas as pd
import numpy as np
import math
import matplotlib.pyplot as plt
from matplotlib import colors
from skimage.transform import resize as sk_resize
from scipy import ndimage


def plot_lot(df1, lot, fig_size=(10, 10), img_dims=(30, 30), resize=False,
             filter_size=3, mfilter=False, vmax=2):
    """
    Helper function to plot an entire lot of wafers from df1 on a 5x5 grid.
    The lot must contain at least one wafer; only the first 25 are drawn and
    unused grid cells are left blank.

    :param df1: -> DataFrame | must provide 'lotName', 'waferMap' and 'failureType' columns
    :param lot: -> str | lotName that will be plotted e.g. 'lot1'
    :param fig_size: -> tuple (w, h) | overall figure size passed to plt.subplots
    :param img_dims: -> sequence (rows, cols) | shape each wafer map is resized to
    :param resize: -> bool | Resize the image to `img_dims` if True
    :param filter_size: -> int to set median filter size
    :param mfilter: -> bool | apply median filter if True
    :param vmax: -> int/float | max pixel value
    :raises KeyError: if no row of df1 has lotName == lot
    """

    # reset_index() returns a fresh frame (no in-place mutation of a slice of
    # df1) and keeps the original row labels in the 'index' column for titles.
    lot_df = df1[df1['lotName'] == lot].reset_index()

    total_rows = len(lot_df.index)
    ax_cnt = 5

    print(f'{lot}')

    if total_rows == 0:
        # Fail before creating a figure instead of with an opaque KeyError: 0
        raise KeyError(f'no wafers found for lot {lot!r}')

    fig, axs = plt.subplots(ax_cnt, ax_cnt, figsize=fig_size)
    fig.tight_layout()

    # make a color map of fixed colors - blue passing die, fuchsia failing die
    cm_xkcd = colors.XKCD_COLORS.copy()
    cmap = colors.ListedColormap(
        [cm_xkcd['xkcd:white'], cm_xkcd['xkcd:azure'], cm_xkcd['xkcd:fuchsia']])

    # all-zero placeholder drawn in grid cells that have no wafer
    blank = np.zeros_like(lot_df['waferMap'][0])

    # walk every grid cell; cells beyond the number of wafers stay blank
    for n_row in range(ax_cnt ** 2):
        if n_row < total_rows:
            img = lot_df['waferMap'][n_row]
            index = lot_df['index'][n_row]
            ftype = lot_df['failureType'][n_row]

            if resize:
                img = sk_resize(img, img_dims,
                                order=0, preserve_range=True, anti_aliasing=False)

            if mfilter:
                img = ndimage.median_filter(img, size=filter_size)

        else:
            img = blank
            index = ''
            ftype = ''

        # the grid is filled column by column: i is the row, j the column
        j, i = divmod(n_row, ax_cnt)
        axs[i, j].imshow(img,
                         interpolation='none',
                         cmap=cmap,
                         vmin=0, vmax=vmax)
        axs[i, j].axis('off')

        # label the figure with the index# and defect classification [for future reference]
        axs[i, j].set_title(f'{index}\n{ftype}', fontsize=10)

    plt.show()
    
    
def plot_list(df1, wafer_list, fig_size=(10, 10), img_dims=(30, 30), resize=False,
              filter_size=3, mfilter=False, vmax=2):
    """
    Helper function to plot a list of indices from df1 on a square grid.
    The grid side is ceil(sqrt(len(wafer_list))); unused cells are left blank.

    :param df1: -> DataFrame | must provide 'waferMap' and 'failureType' columns
    :param wafer_list: -> list | index labels of df1 to plot (at least one)
    :param fig_size: -> tuple (w, h) | overall figure size passed to plt.subplots
    :param img_dims: -> sequence (rows, cols) | shape each wafer map is resized to
    :param resize: -> bool | Resize the image to `img_dims` if True
    :param filter_size: -> int to set median filter size
    :param mfilter: -> bool | apply median filter if True
    :param vmax: -> int/float | max pixel value
    :raises ValueError: if `wafer_list` selects no rows
    """

    # reset_index() returns a fresh frame (no in-place mutation of a selection
    # of df1) and keeps the original row labels in the 'index' column.
    list_df = df1.loc[wafer_list, :].reset_index()

    total_rows = len(list_df.index)
    if total_rows == 0:
        # a 0x0 grid cannot be created; fail with an explicit message
        raise ValueError('wafer_list must select at least one wafer')

    ax_cnt = int(math.ceil(math.sqrt(total_rows)))

    # squeeze=False keeps axs two-dimensional even for a 1x1 grid, so a
    # single-wafer list can be indexed as axs[i, j] like any other
    fig, axs = plt.subplots(ax_cnt, ax_cnt, figsize=fig_size, squeeze=False)
    fig.tight_layout()

    # make a color map of fixed colors - blue passing die, fuchsia failing die
    cm_xkcd = colors.XKCD_COLORS.copy()
    cmap = colors.ListedColormap(
        [cm_xkcd['xkcd:white'], cm_xkcd['xkcd:azure'], cm_xkcd['xkcd:fuchsia']])

    # all-zero placeholder drawn in grid cells that have no wafer
    blank = np.zeros_like(list_df['waferMap'][0])

    # walk every grid cell; cells beyond the number of wafers stay blank
    for n_row in range(ax_cnt ** 2):
        if n_row < total_rows:
            img = list_df['waferMap'][n_row]
            index = list_df['index'][n_row]
            ftype = list_df['failureType'][n_row]

            if resize:
                img = sk_resize(img, img_dims,
                                order=0, preserve_range=True, anti_aliasing=False)

            if mfilter:
                img = ndimage.median_filter(img, size=filter_size)

        else:
            img = blank
            index = ''
            ftype = ''

        # the grid is filled column by column: i is the row, j the column
        j, i = divmod(n_row, ax_cnt)
        axs[i, j].imshow(img,
                         interpolation='none',
                         cmap=cmap, vmin=0, vmax=vmax)
        axs[i, j].axis('off')

        # label the figure with the index# and defect classification [for future reference]
        axs[i, j].set_title(f'{index}\n{ftype}', fontsize=10)

    plt.show()


def filter_comparison(df, index, filter_size=3, img_dims=(30, 30), resize=False, vmax=2):
    """Helper function for looking at effect of median filter on one wafer map

    Prints the lot name and failure type of the wafer, then shows the
    (optionally resized) wafer map next to its median-filtered version.

    :param df: -> DataFrame | must provide 'lotName', 'failureType' and 'waferMap' columns
    :param index: -> index label of the wafer in df
    :param filter_size: -> int to set median filter size
    :param img_dims: -> sequence (rows, cols) | shape the wafer map is resized to
    :param resize: -> bool | Resize the image to `img_dims` before filtering if True
    :param vmax: -> int/float | max pixel value
    """

    print(f"{df['lotName'].loc[index]}")
    print(f"{df['failureType'].loc[index]}")

    fig = plt.figure()

    # make a color map of fixed colors - blue passing die, fuchsia failing die
    cm_xkcd = colors.XKCD_COLORS.copy()
    cmap = colors.ListedColormap(
            [cm_xkcd['xkcd:white'], cm_xkcd['xkcd:azure'], cm_xkcd['xkcd:fuchsia']])

    ax1 = fig.add_subplot(121)  # left side
    ax2 = fig.add_subplot(122)  # right side

    ex = df['waferMap'].loc[index]

    if resize:
        # Only the source map needs resizing: the filtered image is derived
        # from it below, so it automatically has the same dimensions.
        ex = sk_resize(ex, img_dims,
                       order=0, preserve_range=True, anti_aliasing=False)

    img = ndimage.median_filter(ex, size=filter_size)

    ax1.imshow(ex, cmap=cmap, vmin=0, vmax=vmax)
    ax1.set_axis_off()
    ax1.set_title('Original')
    ax2.imshow(img, cmap=cmap, vmin=0, vmax=vmax)
    ax2.set_axis_off()
    ax2.set_title('Filtered')

    plt.show()

Overwriting eda.py
