In [None]:
#https://github.com/kushalvyas/Bag-of-Visual-Words-Python/blob/master/Bag.py
'''
Trevor Little & Steven Taylor
'''
import time
import cv2
import numpy as np 
from glob import glob 
import sys
import os
import argparse
from matplotlib import pyplot as plt 
import numpy as np 
from matplotlib import pyplot as plt 
from sklearn.cluster import KMeans
from sklearn.datasets import make_blobs 
from sklearn.svm import SVC
from sklearn.preprocessing import StandardScaler
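# NOTE: this comment originally described creating a sample dataset with
# make_blobs to test KMeans. No code in this file does that; make_blobs is
# imported above but not used here. KMeans is fitted on SIFT descriptors.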
def conversion(s):
    """Print a duration given in seconds as ``hours:minutes:seconds``.

    Hours and minutes are printed as integers; the seconds component keeps
    whatever numeric type and fractional part ``s`` carried in.

    Args:
        s: Elapsed time in seconds (int or float).

    Returns:
        None. The formatted duration is written to standard output.
    """
    # Peel off whole minutes first, then whole hours from those minutes.
    whole_minutes, seconds = divmod(s, 60)
    hours, minutes = divmod(whole_minutes, 60)
    print("Total Run Time= {0}:{1}:{2}".format(int(hours), int(minutes), seconds))
    
class ImageHelpers:
    """Small wrapper around an OpenCV SIFT object used for feature extraction."""

    def __init__(self):
        # A single SIFT instance is created here and reused for every image.
        self.sift_object = cv2.SIFT_create()

    def gray(self, image):
        """Return ``image`` converted with ``cv2.COLOR_BGR2GRAY``."""
        return cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)

    def features(self, image):
        """Detect keypoints and compute their descriptors for ``image``.

        Calls ``detectAndCompute`` on the stored SIFT object with no mask.

        Returns:
            A two-element list ``[keypoints, descriptors]``.
        """
        detected = self.sift_object.detectAndCompute(image, None)
        kps, descs = detected
        return [kps, descs]


class BOVHelpers:
    """Clustering, histogram building, scaling and SVM helpers for a
    bag-of-visual-words pipeline.

    Attributes:
        n_clusters: Number of KMeans clusters, i.e. the vocabulary size.
        kmeans_obj: The ``KMeans`` estimator used to build the vocabulary.
        kmeans_ret: Cluster index of every stacked descriptor (set by
            ``cluster``).
        descriptor_vstack: All descriptors stacked row-wise (set by
            ``formatND``).
        mega_histogram: One row of visual-word counts per image (set by
            ``developVocabulary``, rescaled in place by ``standardize``).
        scale: The scaler applied by ``standardize``; ``None`` until then.
        clf: The ``SVC`` classifier trained by ``train``.
    """

    def __init__(self, n_clusters = 75):
        self.n_clusters = n_clusters
        self.kmeans_obj = KMeans(n_clusters = n_clusters)
        self.kmeans_ret = None
        self.descriptor_vstack = None
        self.mega_histogram = None
        # Defined up front so the attribute always exists, even before
        # standardize() has been called.
        self.scale = None
        self.clf = SVC()

    def cluster(self):
        """Fit KMeans on ``descriptor_vstack`` and store each row's cluster
        index in ``kmeans_ret``.
        """
        self.kmeans_ret = self.kmeans_obj.fit_predict(self.descriptor_vstack)

    def developVocabulary(self, n_images, descriptor_list, kmeans_ret = None):
        """Build the per-image visual-word histogram.

        Each cluster denotes one visual word, and every image is represented
        by how often each word occurs among its descriptors. The cluster
        labels are consumed in the same order the descriptors were stacked:
        the first ``len(descriptor_list[0])`` labels belong to image 0, the
        next block to image 1, and so on.

        Args:
            n_images: Number of images (rows of the histogram).
            descriptor_list: Per-image descriptor arrays. A ``None`` entry
                (an image with no descriptors) yields an all-zero row and
                consumes no labels.
            kmeans_ret: Optional cluster labels to use instead of
                ``self.kmeans_ret``.
        """
        # Resolve the label source once instead of once per descriptor.
        labels = self.kmeans_ret if kmeans_ret is None else kmeans_ret
        self.mega_histogram = np.zeros((n_images, self.n_clusters))
        offset = 0
        for i in range(n_images):
            des = descriptor_list[i]
            n_des = 0 if des is None else len(des)
            if n_des:
                words = np.asarray(labels[offset:offset + n_des])
                # bincount tallies all labels of this image in one pass.
                self.mega_histogram[i] = np.bincount(words, minlength=self.n_clusters)
            offset += n_des
        print ("Vocabulary Histogram Generated")

    def standardize(self, std=None):
        """Standardize ``mega_histogram`` in place.

        Without scaling, visual words with large raw counts can dominate
        the classifier.

        Args:
            std: Optional already-fitted scaler exposing ``transform``. When
                omitted, a ``StandardScaler`` is fitted on the histogram.
                Either way the scaler that was used is kept in ``self.scale``
                so later inputs can be transformed the same way.
        """
        if std is None:
            self.scale = StandardScaler().fit(self.mega_histogram)
        else:
            print ("STD not none. External STD supplied")
            # Keep the external scaler too; otherwise self.scale would not
            # reflect the transform that was actually applied.
            self.scale = std
        self.mega_histogram = self.scale.transform(self.mega_histogram)

    def formatND(self, l):
        """Stack per-image descriptor arrays into one M x N array.

        Args:
            l: List of 2-D descriptor arrays, one per image. ``None``
                entries are skipped.

        Returns:
            The stacked array. An independent copy is stored in
            ``self.descriptor_vstack``.

        Raises:
            ValueError: If ``l`` contains no descriptor arrays at all.
        """
        arrays = [d for d in l if d is not None]
        if not arrays:
            raise ValueError("formatND needs at least one descriptor array")
        # One vstack call instead of re-stacking (and re-copying) the growing
        # array once per image.
        vStack = np.vstack(arrays)
        self.descriptor_vstack = vStack.copy()
        return vStack

    def train(self, train_labels):
        """Fit the SVC classifier on ``mega_histogram`` against
        ``train_labels``.
        """
        print ("Training SVM")
        print (self.clf)
        self.clf.fit(self.mega_histogram, train_labels)
        print ("Training completed")

    def predict(self, iplist):
        """Return the classifier's predictions for the rows of ``iplist``."""
        predictions = self.clf.predict(iplist)
        return predictions

    def plotHist(self, vocabulary = None):
        """Plot the total frequency of every visual word as a bar chart.

        Args:
            vocabulary: Histogram array with one column per visual word.
                Defaults to ``self.mega_histogram``.
        """
        print ("Plotting histogram")
        if vocabulary is None:
            vocabulary = self.mega_histogram
        print("clusters", self.n_clusters)
        x_scalar = np.arange(self.n_clusters)
        # Column-wise totals, accumulated as int32 and made non-negative.
        y_scalar = np.abs(np.sum(vocabulary[:, :self.n_clusters], axis=0, dtype=np.int32))
        plt.bar(x_scalar, y_scalar)
        plt.xlabel("Visual Word Index")
        plt.ylabel("Frequency")
        plt.title("Complete Vocabulary Generated")
        # Bars are centred on their x position by default, so the tick
        # labels go at the same positions (no 0.4 offset).
        plt.xticks(x_scalar, x_scalar)
        plt.show()

class FileHelpers:
    """Loads class-labelled images from a directory tree."""

    def __init__(self):
        pass

    def getFiles(self, path):
        """Load every ``*.jpg`` found in the immediate sub-directories of
        ``path``.

        Each sub-directory is one object class. Images are read as
        grayscale and min-max normalized to the 0-255 range.

        Args:
            path: Directory whose sub-directories are named after the
                object classes. A trailing separator is optional.

        Returns:
            A two-element list ``[imlist, count]`` where ``imlist`` maps
            class name => list of loaded images and ``count`` is the total
            number of images loaded.
        """
        imlist = {}
        count = 0
        # sorted() makes the class order (and therefore the label numbering
        # a caller derives from it) independent of the directory listing
        # order.
        for class_dir in sorted(glob(os.path.join(path, "*"))):
            # Stray files next to the class folders are not classes.
            if not os.path.isdir(class_dir):
                continue
            # basename works with either path separator, unlike splitting
            # on a hard-coded backslash.
            word = os.path.basename(class_dir).strip()
            imlist[word] = []
            for imagefile in sorted(glob(os.path.join(class_dir, '*.jpg'))):
                im = cv2.imread(imagefile, 0)
                if im is None:
                    # imread gives None for files it cannot decode; passing
                    # that to normalize would raise.
                    print("Skipping unreadable image", imagefile)
                    continue
                normalized = cv2.normalize(im, None, 0, 255, cv2.NORM_MINMAX)
                imlist[word].append(normalized)
                count += 1
        print("images loaded in")
        return [imlist, count]

        
        
class BOV:
    """End-to-end bag-of-visual-words image classifier.

    Set ``train_path`` and ``test_path`` and then call ``trainModel``
    followed by ``testModel``.

    Attributes:
        no_clusters: Vocabulary size (number of KMeans clusters).
        train_path / test_path: Directories holding one sub-folder per class.
        images: Training images as returned by the file helper
            (class name => list of images).
        trainImageCount: Number of training images loaded.
        train_labels: Float array with the numeric label of every training
            image that contributed descriptors.
        name_dict: Maps ``str(label index)`` => class name.
        descriptor_list: SIFT descriptor array of every training image used.
    """

    # Filename prefix per predicted class; any other class falls back to 'L'.
    _FILENAME_PREFIXES = {
        "car": 'C',
        "plane": 'A',
        "motorcycle": 'M',
        "face": 'F',
    }

    def __init__(self, no_clusters):
        self.no_clusters = no_clusters
        self.train_path = None
        self.test_path = None
        self.im_helper = ImageHelpers()
        self.bov_helper = BOVHelpers(no_clusters)
        self.file_helper = FileHelpers()
        self.images = None
        self.trainImageCount = 0
        self.train_labels = np.array([])
        self.name_dict = {}
        self.descriptor_list = []

    def trainModel(self):
        """Train the model on the images under ``train_path``.

        Steps: load the images, extract SIFT descriptors, cluster them into
        visual words (this step is timed and the duration printed), build
        and standardize the per-image histograms, then fit the classifier.

        Images for which SIFT yields no descriptors are skipped, so labels,
        descriptors and histogram rows always stay aligned.

        Raises:
            ValueError: If no image produced any descriptors.
        """
        # read file. prepare file lists.
        self.images, self.trainImageCount = self.file_helper.getFiles(self.train_path)

        # Start from a clean slate so a second call does not append to the
        # labels and descriptors of a previous run.
        self.name_dict = {}
        self.descriptor_list = []
        labels = []

        # extract SIFT Features from each image
        for label_count, (word, imlist) in enumerate(self.images.items()):
            self.name_dict[str(label_count)] = word
            print ("Computing Features for ", word)
            for im in imlist:
                kp, des = self.im_helper.features(im)
                if des is None:
                    print("Skipping image without SIFT descriptors in", word)
                    continue
                labels.append(label_count)
                self.descriptor_list.append(des)
            print("completed sift for", word)
        # Built in one go instead of np.append per image (which copies the
        # whole array every time); float dtype matches what np.append onto
        # an empty array used to produce.
        self.train_labels = np.array(labels, dtype=np.float64)

        if not self.descriptor_list:
            raise ValueError("No SIFT descriptors could be extracted from the training images")

        start_time = time.time()
        # perform clustering
        print("Computing stack")
        self.bov_helper.formatND(self.descriptor_list)
        print("descriptor complete")
        self.bov_helper.cluster()
        end_time = time.time()
        runtime = end_time - start_time
        conversion(runtime)
        print("K clustering done")
        # One histogram row per image that actually contributed descriptors.
        self.bov_helper.developVocabulary(n_images = len(self.descriptor_list), descriptor_list=self.descriptor_list)
        self.bov_helper.standardize()
        self.bov_helper.train(self.train_labels)

    def recognize(self,test_img, test_image_path=None):
        """Classify a single image; can also be used on its own.

        Args:
            test_img: Image to classify.
            test_image_path: Unused; kept for interface compatibility.

        Returns:
            The classifier's prediction array for the image (one element).
        """
        kp, des = self.im_helper.features(test_img)
        vocab = np.zeros((1, self.no_clusters), dtype=int)
        # An image without keypoints has no descriptors; it is classified
        # from an all-zero histogram instead of crashing in predict().
        if des is not None and len(des):
            test_ret = self.bov_helper.kmeans_obj.predict(des)
            vocab[0] = np.bincount(test_ret, minlength=self.no_clusters)

        vocab = self.bov_helper.scale.transform(vocab)
        lb = self.bov_helper.clf.predict(vocab)
        return lb

    def testModel(self):
        """Classify every image under ``test_path`` and report the results.

        For each test image the predicted class is appended to
        ``Images/outputImg.txt``, the image is written into the predicted
        class's folder under ``test_path`` with a class-specific filename
        prefix and running index, and the image is shown with its
        predicted class as the title.
        """
        self.testImages, self.testImageCount = self.file_helper.getFiles(self.test_path)
        predictions = []
        # Iterate the freshly loaded TEST images, not the training images.
        for word, imlist in self.testImages.items():
            print ("processing " ,word)
            for im in imlist:
                cl = self.recognize(im)
                predictions.append({
                    'image':im,
                    'class':cl,
                    'object_name':self.name_dict[str(int(cl[0]))]
                    })

        counters = {}  # running index per filename prefix
        log_path = os.path.join("Images", "outputImg.txt")
        # 'with' guarantees the log is closed even if writing or plotting
        # raises part-way through.
        with open(log_path, "a") as f:
            for each in predictions:
                object_name = each['object_name']
                filenamePrefix = self._FILENAME_PREFIXES.get(object_name, 'L')
                index = counters.get(filenamePrefix, 0)
                counters[filenamePrefix] = index + 1
                out_file = os.path.join(
                    self.test_path,
                    object_name,
                    filenamePrefix + str(index) + object_name + '.jpg',
                )
                # imwrite reports failure through its return value rather
                # than by raising, so surface it.
                if not cv2.imwrite(out_file, each['image']):
                    print("Could not write", out_file)
                f.write("Image: %s,\n" % str(each['image']))
                f.write("Classified as: %s \n" % object_name)
                plt.imshow(cv2.cvtColor(each['image'], cv2.COLOR_GRAY2RGB))
                plt.title(object_name)
                plt.show()

    def print_vars(self):
        """Placeholder; currently does nothing."""
        pass


def main(train_path='Images/minTrain/', test_path='Images/minTest/', no_clusters=75):
    """Train and test the bag-of-visual-words model, then print the run time.

    Args:
        train_path: Directory with one sub-folder of training images per
            class.
        test_path: Directory with the test images, laid out the same way.
        no_clusters: Number of visual words (KMeans clusters).
    """
    start_time = time.time()
    bov = BOV(no_clusters=no_clusters)
    # set training path
    bov.train_path = train_path
    # set output path
    bov.test_path = test_path
    # train the model
    bov.trainModel()
    print("completed training")
    # test model
    bov.testModel()
    end_time = time.time()
    runtime = end_time - start_time
    conversion(runtime)


# Guarded so importing this module does not start a full train/test run;
# running it as a script or notebook cell still does.
if __name__ == "__main__":
    main()

images loaded in
Computing Features for  car
completed sift for car
Computing Features for  face
