<a href="https://colab.research.google.com/github/ruthbrennankk/scalable_group_project/blob/master/Copy_of_captcha_detection.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Imports
import os
import cv2
import numpy as np
import string
import random
import argparse
import tensorflow as tf
import tensorflow.keras as keras
import scipy.ndimage
import random
import captcha.image

from PIL import Image
import tflite_runtime.interpreter as tflite
import shutil
import pandas as pd


In [None]:
# Install the third-party packages this notebook needs that the runtime had to download
# (tflite_runtime comes from the Coral package index, the others from PyPI).
# NOTE(review): the import cell above this one already imports tflite_runtime and captcha,
# so on a fresh kernel this cell has to run first.
!pip3 install --index-url https://google-coral.github.io/py-repo/ tflite_runtime
!pip install captcha
# NOTE(review): the notebook imports scipy.ndimage, not stsci.ndimage -- confirm this package is still needed.
!pip install stsci.ndimage

Looking in indexes: https://google-coral.github.io/py-repo/
Collecting tflite_runtime
  Downloading https://github.com/google-coral/pycoral/releases/download/v2.0.0/tflite_runtime-2.5.0.post1-cp37-cp37m-linux_x86_64.whl (1.5 MB)
[K     |████████████████████████████████| 1.5 MB 5.4 MB/s 
Installing collected packages: tflite-runtime
Successfully installed tflite-runtime-2.5.0.post1
Collecting captcha
  Downloading captcha-0.3-py3-none-any.whl (101 kB)
[K     |████████████████████████████████| 101 kB 4.1 MB/s 
Installing collected packages: captcha
Successfully installed captcha-0.3
Collecting stsci.ndimage
  Downloading stsci.ndimage-0.10.3.tar.gz (99 kB)
[K     |████████████████████████████████| 99 kB 3.6 MB/s 
Building wheels for collected packages: stsci.ndimage
  Building wheel for stsci.ndimage (setup.py) ... [?25l[?25hdone
  Created wheel for stsci.ndimage: filename=stsci.ndimage-0.10.3-cp37-cp37m-linux_x86_64.whl size=235557 sha256=8d06effac15e70b308d1a5760d54e6e09add781216d

In [None]:
import tensorflow as tf
# Display the GPU device name TensorFlow reports; the training cell pins its work to
# '/device:GPU:0', so this is a quick check that the runtime actually has that device.
tf.test.gpu_device_name()

'/device:GPU:0'

In [None]:
# Symbol set used by the classification cells to turn a predicted class index back
# into a character, so the ORDER of this string must match the symbol string the
# model was trained with (see the training cell).  The previous value was missing
# 'l' and 't', which shifted every index after 'k' and left the last two classes
# without a character.  The trailing space is the padding class used for captchas
# shorter than six characters; the backslash is escaped explicitly.
captcha_symbols = "0123456789ceghijklnpoqstuvwxyzABCDFJKMPQRSTUVWXYZ#[]+:></%{}\\-|_ "

In [None]:
# Preprocess
def preprocess(raw_data):
    """Prepare a raw image for the classifier.

    Single switch point for the preprocessing variant: currently delegates to
    smallPreprocess and returns its result unchanged.
    """
    prepared = smallPreprocess(raw_data)
    return prepared

def erase_circles(img, circles):
    """Paint every detected circle outline white (255) and return the image.

    `circles` is the nested result of cv2.HoughCircles: only its first element
    is used, and each entry of that element is read as (x, y, radius, ...).
    Coordinates and radius are rounded before drawing a 1-pixel-thick outline.
    """
    for detected in circles[0]:
        x_pos, y_pos, radius = detected[0], detected[1], detected[2]
        midpoint = (round(x_pos), round(y_pos))
        # Drawing in the background colour "erases" the circle.
        img = cv2.circle(img, midpoint, radius=round(radius), color=255, thickness=1)
    return img

def detect_and_remove_circles(img):
    """Run a Hough circle search (radius 0-2) and erase whatever it finds.

    Returns the input image untouched when the search reports no circles,
    otherwise the image returned by erase_circles.
    """
    found = cv2.HoughCircles(
        img,
        method=cv2.HOUGH_GRADIENT,
        dp=1,
        minDist=1,
        param1=50,
        param2=5,
        minRadius=0,
        maxRadius=2,
    )
    if found is None:
        return img
    return erase_circles(img, found)

def smallPreprocess(raw_data):
    """Light-weight preprocessing: grayscale, one erode, one dilate, rescale.

    Steps, in order:
      1. convert the BGR input to a single gray channel,
      2. erode with a 2x2 kernel, then dilate with a 3x3 kernel (one pass each),
      3. expand back to three channels,
      4. divide by 255.0 and prepend a batch axis.

    Returns a float array of shape (1,) + <shape of the 3-channel image>.
    """
    gray = cv2.cvtColor(raw_data, cv2.COLOR_BGR2GRAY)
    small_kernel = np.ones((2, 2), np.uint8)
    large_kernel = np.ones((3, 3), np.uint8)
    gray = cv2.erode(gray, small_kernel, iterations=1)
    gray = cv2.dilate(gray, large_kernel, iterations=1)
    three_channel = cv2.cvtColor(gray, cv2.COLOR_GRAY2RGB)
    scaled = np.array(three_channel) / 255.0
    dim0, dim1, dim2 = scaled.shape
    return scaled.reshape([-1, dim0, dim1, dim2])

def bigPreprocess(raw_data) :
    """Heavier denoising pipeline; an alternative to smallPreprocess.

    Converts the BGR input to grayscale, applies a fixed sequence of
    erode/dilate and median-filter passes plus a Hough-circle erase, converts
    back to three channels, divides by 255.0 and prepends a batch axis.

    The per-line comments describe the author's intent and assume dark glyphs
    on a light background -- TODO confirm against the actual captcha images.
    The statement order matters: each pass operates on the previous result.
    """
    # Collapse to a single gray channel.
    img = cv2.cvtColor(raw_data, cv2.COLOR_BGR2GRAY)
    # First pass: get rid of easy-to-remove noise.
    img = ~img  # invert (intended: white letters, black background)
    img = cv2.erode(img, np.ones((2, 2), np.uint8), iterations=1)  # intended to weaken circle noise and line noise
    img = ~img  # invert back (intended: black letters, white background)
    img = scipy.ndimage.median_filter(img, (5, 1))  # 5x1 median window; intended to remove line noise
    img = scipy.ndimage.median_filter(img, (1, 3))  # 1x3 median window; intended to weaken circle noise
    img = cv2.erode(img, np.ones((2, 2), np.uint8), iterations=1)  # on the non-inverted image erode grows dark regions, i.e. thickens dark strokes again
    img = scipy.ndimage.median_filter(img, (3, 3))  # intended to remove any remaining 'weak' noise (line or circle)
    # Detect any remaining circle noise.
    img = detect_and_remove_circles(img)  # Hough transform search; found circles are painted white
    # Second pass: eradicate noise that was not removed previously.
    img = cv2.dilate(img, np.ones((3, 3), np.uint8), iterations=1)  # on the non-inverted image dilate grows light regions, i.e. thins dark strokes
    img = scipy.ndimage.median_filter(img, (5, 1))  # intended to remove any extra noise that remains
    img = cv2.erode(img, np.ones((3, 3), np.uint8), iterations=2)  # thickens dark strokes again (two passes), intended to restore the original look
    img = cv2.dilate(img, np.ones((3, 3), np.uint8), iterations=1)  # thins dark strokes slightly to polish fine details
    # Edge detection (disabled).
    # img = cv2.Canny(img, 100, 200)
    # Back to three channels.
    img = cv2.cvtColor(img, cv2.COLOR_GRAY2RGB)
    # Format - (different for pi)
    image = np.array(img) / 255.0  # rescale 8-bit values by 1/255
    # NOTE(review): the names c/h/w are misleading -- the three dimensions are simply kept
    # in their existing order (presumably height, width, channels) behind a new batch axis.
    (c, h, w) = image.shape
    return image.reshape([-1, c, h, w])


In [None]:
# Train
#!/usr/bin/env python3

import warnings
warnings.filterwarnings("ignore", category=FutureWarning)
warnings.filterwarnings("ignore", category=DeprecationWarning)

# Build a Keras model given some parameters
def create_model(captcha_length, captcha_num_symbols, input_shape, model_depth=5, module_size=2):
    """Build the multi-output convolutional captcha classifier.

    The network is `model_depth` stages; each stage is `module_size`
    repetitions of Conv2D(3x3, 'same', he_uniform) -> BatchNormalization ->
    ReLU followed by one 2x2 max-pool.  The filter count starts at 32 and
    doubles per stage up to 256.  The flattened features feed
    `captcha_length` independent softmax heads named 'char_1', 'char_2', ...,
    each with `captcha_num_symbols` classes.

    Returns the (uncompiled) keras.Model.
    """
    input_tensor = keras.Input(input_shape)
    features = input_tensor

    for stage in range(model_depth):
        filters = 32 * 2 ** min(stage, 3)
        for _ in range(module_size):
            features = keras.layers.Conv2D(filters, kernel_size=3, padding='same', kernel_initializer='he_uniform')(features)
            features = keras.layers.BatchNormalization()(features)
            features = keras.layers.Activation('relu')(features)
        features = keras.layers.MaxPooling2D(2)(features)

    flattened = keras.layers.Flatten()(features)

    heads = []
    for position in range(1, captcha_length + 1):
        head = keras.layers.Dense(captcha_num_symbols, activation='softmax', name='char_%d' % position)
        heads.append(head(flattened))

    return keras.Model(inputs=input_tensor, outputs=heads)

# A Sequence represents a dataset for training in Keras
# In this case, we have a folder full of images
# Elements of a Sequence are *batches* of images, of some size batch_size
class ImageSequence(keras.utils.Sequence):
    """Keras Sequence that serves random batches of labelled captcha images.

    Labels come from a text file with one `<file name>,<label>` entry per line.
    Every batch is drawn at random (with replacement) from the labelled files;
    the requested batch index is not used to pick the images.

    Each image is converted to grayscale, thresholded (binary + Otsu flag),
    inverted, morphologically opened with a 2x2 kernel and median-blurred, then
    stored as one channel.  Pixel values are NOT rescaled: they are stored as
    produced by the thresholding step.
    """

    def __init__(self, directory_name, batch_size, label_file, captcha_symbols, captcha_width, captcha_height):
        """Read the label file and remember the dataset parameters.

        Args:
            directory_name: folder that contains the image files.
            batch_size: number of images per batch.
            label_file: path of the `<file name>,<label>` text file.
            captcha_symbols: string whose character positions are the class indices.
            captcha_width: image width in pixels.
            captcha_height: image height in pixels.
        """
        self.directory_name = directory_name
        self.batch_size = batch_size
        self.captcha_length = 6
        self.captcha_symbols = captcha_symbols
        self.captcha_width = captcha_width
        self.captcha_height = captcha_height
        self.label_file = label_file
        file_list = os.listdir(self.directory_name)
        self.files = {}
        # Read-only access is all that is needed (the file used to be opened read/write).
        with open(self.label_file, "r") as f:
            for line in f:
                entry = line.rstrip("\r\n")  # keep the trailing spaces that pad short labels
                if not entry:
                    continue  # tolerate blank lines
                # Split on the first comma only so the label is taken verbatim.
                file_name, label = entry.split(",", 1)
                self.files[file_name] = label
        self.used_files = []
        # The epoch length is based on the number of files in the directory.
        self.count = len(file_list)

    def __len__(self):
        """Number of batches per epoch: floor(file count / batch size)."""
        return int(np.floor(self.count / self.batch_size))

    def __getitem__(self, idx):
        """Build one random batch.

        Returns:
            (X, y): X has shape (batch_size, height, width, 1) float32; y is a
            list with one (batch_size, num_symbols) one-hot array per
            character position.

        Raises:
            FileNotFoundError: if a labelled image cannot be read from disk.
        """
        X = np.zeros((self.batch_size, self.captcha_height, self.captcha_width, 1), dtype=np.float32)
        y = [np.zeros((self.batch_size, len(self.captcha_symbols)), dtype=np.uint8) for i in range(self.captcha_length)]

        # Materialise the candidate names once per batch instead of once per sample.
        candidates = list(self.files.keys())

        for i in range(min(self.batch_size, len(candidates))):
            random_image_file = random.choice(candidates)
            random_image_label = self.files[random_image_file]

            image_path = os.path.join(self.directory_name, random_image_file)
            raw_data = cv2.imread(image_path)
            if raw_data is None:
                # cv2.imread signals failure by returning None; fail with a clear message.
                raise FileNotFoundError("Could not read captcha image: " + image_path)

            gray = cv2.cvtColor(raw_data, cv2.COLOR_BGR2GRAY)
            ret, thresh1 = cv2.threshold(gray, 120, 255, cv2.THRESH_BINARY +
                                         cv2.THRESH_OTSU)
            thresh1 = cv2.bitwise_not(thresh1)
            kernel = np.ones((2, 2), np.uint8)
            opening = cv2.morphologyEx(thresh1, cv2.MORPH_OPEN, kernel)
            opening = cv2.medianBlur(opening, 3)
            processed_data = np.array(opening)
            X[i] = processed_data[:, :, np.newaxis]  # add the single channel axis

            for j, ch in enumerate(random_image_label):
                y[j][i, :] = 0
                # NOTE: str.find returns -1 for a character that is not in the symbol
                # set, which selects the LAST class instead of failing.
                y[j][i, self.captcha_symbols.find(ch)] = 1
        return X, y

def main():
    """Train the captcha model on the generated data sets.

    Builds the model for 64x128 single-channel inputs and six output
    characters, writes the architecture to `<output_model_name>.json`,
    checkpoints the model to `<output_model_name>.h5` after every epoch and,
    when training is interrupted from the keyboard, stores the current weights
    in `<output_model_name>_resume.h5`.

    The output paths point into Google Drive, so Drive must be mounted first.
    """
    # inputs
    width = 128
    height = 64
    length = 6
    batch_size = 32
    epochs = 5
    output_model_name = '/content/drive/MyDrive/MS/Sem1/Scalable Computing/Project2/model04'
    validate_outputFile = '/content/validation_lables.txt'
    validate_dataset = '/content/validation_data/'
    train_dataset = '/content/training_data/'
    train_outputFile = '/content/training_lables.txt'
    # Character order defines the class indices; the trailing space is the padding
    # class for captchas shorter than `length`.  The backslash is escaped explicitly.
    captcha_symbols = "0123456789ceghijklnpoqstuvwxyzABCDFJKMPQRSTUVWXYZ#[]+:></%{}\\-|_ "

    with tf.device('/device:GPU:0'):
        model = create_model(length, len(captcha_symbols), (height, width, 1))

        model.compile(loss='categorical_crossentropy',
                      optimizer=keras.optimizers.Adam(1e-3, amsgrad=True),
                      metrics=['accuracy'])

        model.summary()

        training_data = ImageSequence(train_dataset, batch_size, train_outputFile, captcha_symbols, width, height)
        validation_data = ImageSequence(validate_dataset, batch_size, validate_outputFile, captcha_symbols, width, height)

        callbacks = [keras.callbacks.EarlyStopping(patience=3),
                     # keras.callbacks.CSVLogger('log.csv'),
                     keras.callbacks.ModelCheckpoint(output_model_name+'.h5', save_best_only=False)]

        # Save the model architecture to JSON
        with open(output_model_name+".json", "w") as json_file:
            json_file.write(model.to_json())

        try:
            # Model.fit accepts a keras.utils.Sequence directly; fit_generator is deprecated.
            model.fit(training_data,
                      validation_data=validation_data,
                      epochs=epochs,
                      callbacks=callbacks,
                      use_multiprocessing=True)
        except KeyboardInterrupt:
            print('KeyboardInterrupt caught, saving current weights as ' + output_model_name+'_resume.h5')
            model.save_weights(output_model_name+'_resume.h5')

if __name__ == '__main__':
    main()

Model: "model_1"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_2 (InputLayer)            [(None, 64, 128, 1)] 0                                            
__________________________________________________________________________________________________
conv2d_10 (Conv2D)              (None, 64, 128, 32)  320         input_2[0][0]                    
__________________________________________________________________________________________________
batch_normalization_10 (BatchNo (None, 64, 128, 32)  128         conv2d_10[0][0]                  
__________________________________________________________________________________________________
activation_10 (Activation)      (None, 64, 128, 32)  0           batch_normalization_10[0][0]     
____________________________________________________________________________________________



Epoch 1/5
1472/7812 [====>.........................] - ETA: 21:00 - loss: 12.1651 - char_1_loss: 2.5289 - char_2_loss: 2.7786 - char_3_loss: 2.5498 - char_4_loss: 2.0310 - char_5_loss: 1.4476 - char_6_loss: 0.8292 - char_1_accuracy: 0.3658 - char_2_accuracy: 0.3345 - char_3_accuracy: 0.4073 - char_4_accuracy: 0.5376 - char_5_accuracy: 0.6827 - char_6_accuracy: 0.8345

In [None]:
# with open('/content/drive/My Drive/foo.txt', '+w') as f:
#   f.write('Hello Google Drive!')
# Mount Google Drive at /content/drive so the /content/drive/MyDrive/... paths used by
# other cells of this notebook resolve.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# List /content to confirm that the data folders and label files are in place.
!ls /content/

drive	     training_data	  validation_data
sample_data  training_lables.txt  validation_lables.txt


In [None]:
from google.colab import files

# Download a trained model file to the local machine.  Guard on existence so the
# cell reports a missing file instead of raising FileNotFoundError.
model_path = '/content/model02'
if os.path.exists(model_path):
    files.download(model_path)
else:
    print('Nothing to download: ' + model_path + ' does not exist')

FileNotFoundError: ignored

In [None]:
# Sanity check: load one generated training image and print its raw pixel array.
# NOTE(review): cv2.imread presumably returns None instead of raising when the file
# cannot be read -- confirm before trusting this output.
raw_data = cv2.imread(os.path.join('/content/training_data/', 'img_1209.png'))
print(raw_data)

[[[239 251 254]
  [239 251 254]
  [239 251 254]
  ...
  [239 251 254]
  [239 251 254]
  [239 251 254]]

 [[239 251 254]
  [239 251 254]
  [239 251 254]
  ...
  [239 251 254]
  [239 251 254]
  [239 251 254]]

 [[239 251 254]
  [239 251 254]
  [239 251 254]
  ...
  [239 251 254]
  [239 251 254]
  [239 251 254]]

 ...

 [[239 251 254]
  [239 251 254]
  [239 251 254]
  ...
  [226 235 237]
  [133 124 116]
  [ 66  45  29]]

 [[239 251 254]
  [239 251 254]
  [239 251 254]
  ...
  [226 235 237]
  [199 203 202]
  [ 66  45  29]]

 [[239 251 254]
  [239 251 254]
  [239 251 254]
  ...
  [239 251 254]
  [239 251 254]
  [239 251 254]]]


In [None]:
# Generate
#!/usr/bin/env python3
def main():
    """Generate random captcha images together with a label file.

    Creates `count` images named `img_<i>.png` in `output_dir`.  Each captcha
    holds between 1 and `length` random symbols; the label written to
    `outputFile` (`<file name>,<label>` per line, appended) is padded with
    spaces to exactly `length` characters.

    If an image name is already taken, a versioned name (`img_<i>_<n>.png`) is
    used, and that same name is recorded in the label file so image and label
    always stay paired.  File names never contain captcha text, which may
    include characters such as '/'.
    """
    width = 128
    height = 64
    length = 6  # maximum captcha length; shorter labels are space-padded to this size
    count = 250000
    outputFile = '/content/training_lables.txt'
    output_dir = '/content/training_data/'
    # The backslash is escaped explicitly; the padding space is deliberately not part
    # of the set symbols are drawn from.
    captcha_symbols = "0123456789ceghijklnpoqstuvwxyzABCDFJKMPQRSTUVWXYZ#[]+:></%{}\\-|_"

    captcha_generator = captcha.image.ImageCaptcha(width=width, height=height)

    print("Generating captchas with symbol set {" + captcha_symbols + "}")

    if not os.path.exists(output_dir):
        print("Creating output directory " + output_dir)
        os.makedirs(output_dir)

    # Open the label file once instead of re-opening it for every image.
    with open(outputFile, 'a') as label_file:
        for i in range(count):
            caplength = np.random.randint(1, length + 1)  # upper bound is exclusive
            random_str = ''.join(random.choice(captcha_symbols) for _ in range(caplength))

            img_name = 'img_' + str(i) + '.png'
            image_path = os.path.join(output_dir, img_name)
            version = 1
            while os.path.exists(image_path):
                img_name = 'img_' + str(i) + '_' + str(version) + '.png'
                image_path = os.path.join(output_dir, img_name)
                version += 1

            # The image shows the unpadded text; only the label is padded.
            image = np.array(captcha_generator.generate_image(random_str))
            cv2.imwrite(image_path, image)
            label_file.write(img_name + "," + random_str.ljust(length, ' ') + "\n")

if __name__ == '__main__':
    main()

Generating captchas with symbol set {0123456789ceghijklnpoqstuvwxyzABCDFJKMPQRSTUVWXYZ#[]+:></%{}\-|_}
Creating output directory /content/training_data/


In [None]:
#@title
# Move Files
# Split the generated images into a validation set (val_size randomly chosen files)
# and a training set (everything else) by moving them out of base_dir.
import warnings
warnings.filterwarnings("ignore", category=FutureWarning)
warnings.filterwarnings("ignore", category=DeprecationWarning)

base = '/content/'
base_dir = '/content/drive/MyDrive/MS/Sem1/Scalable Computing/Project2/training_data'
# files setup: map "name without extension" -> "file name", png images only
file_list = [name for name in os.listdir(base_dir) if name.endswith('.png')]
files = dict(zip(map(lambda x: x.split('.')[0], file_list), file_list))
print('base len = ' + str(len(files)))

# Train Setup
train_dir = "train_data"

# Val Setup
val_dir = "val_data"
val_size = 12800

#   Addressing Files
baseTake = base_dir
basePutTrain = os.path.join(base, train_dir)
basePutVal = os.path.join(base, val_dir)
# The target folders must exist before anything can be moved into them.
os.makedirs(basePutTrain, exist_ok=True)
os.makedirs(basePutVal, exist_ok=True)

#   Val Takes
# random.sample picks distinct files in one go; never ask for more than there are.
for random_image_label in random.sample(list(files.keys()), min(val_size, len(files))):
    random_image_label_file = files.pop(random_image_label)
    # shutil.move also works when source and target are on different file systems
    # (Drive mount -> local disk), where os.replace fails.
    shutil.move(os.path.join(baseTake, random_image_label_file), os.path.join(basePutVal, random_image_label_file))

#   Train Takes
for label in files.values():
    shutil.move(os.path.join(baseTake, label), os.path.join(basePutTrain, label))

# Check Size - Test
base_list = os.listdir(base_dir)
print('base len = ' + str(len(base_list)))
# Check Size - Train (list the folders that were actually written to)
train_list = os.listdir(basePutTrain)
print('train_data len = ' + str(len(train_list)))
# Check Size - Val
val_list = os.listdir(basePutVal)
print('val_list len = ' + str(len(val_list)))

In [None]:
#!/usr/bin/env python3
# Converter

import tensorflow as tf
import tensorflow.keras as keras

def main(model_name):
    """Convert a trained Keras model to a TensorFlow Lite flatbuffer.

    Reads the architecture from `/content/<model_name>.json` and the weights
    from `/content/<model_name>.h5`, converts the rebuilt model and writes the
    result to `converted_<model_name>.tflite` on Google Drive.
    """
    architecture_path = f'/content/{model_name}.json'
    with open(architecture_path) as architecture_file:
        architecture_json = architecture_file.read()

    keras_model = keras.models.model_from_json(architecture_json)
    keras_model.load_weights(f'/content/{model_name}.h5')

    converter = tf.lite.TFLiteConverter.from_keras_model(keras_model)
    flatbuffer = converter.convert()

    target_path = f'/content/drive/MyDrive/Year V/converted_{model_name}.tflite'
    with open(target_path, 'wb') as target_file:
        target_file.write(flatbuffer)

if __name__ == '__main__':
    model_name = 'test'
    main(model_name)





INFO:tensorflow:Assets written to: /tmp/tmpi20stg_4/assets


INFO:tensorflow:Assets written to: /tmp/tmpi20stg_4/assets


In [None]:
#   Classify
#!/usr/bin/env python3

import warnings
warnings.filterwarnings("ignore", category=FutureWarning)
warnings.filterwarnings("ignore", category=DeprecationWarning)

def decode(characters, y):
    """Turn per-position class scores into the predicted string.

    `y` holds one score array per character position, each with a leading
    batch axis; the highest-scoring class of the first batch entry is looked
    up in `characters` for every position.
    """
    scores = np.array(y)
    best_per_position = scores.argmax(axis=2)[:, 0]
    return ''.join(characters[index] for index in best_per_position)

def main():
    """Classify every captcha image in a folder with the Keras model.

    Loads the architecture from `<model_name>.json` and the weights from
    `<model_name>_resume.h5`, predicts each image in `captcha_dir` and writes
    `<file name>, <prediction>` lines to `output`.  A prediction counts as
    correct when it equals the file name without its extension.  Entries that
    cannot be read as an image are reported and skipped.

    Relies on the notebook-level `captcha_symbols` and `preprocess`.
    """
    model_name = '/content/train_model_01'
    captcha_dir = '/content/testers_5_lowercase'
    output = '/content/output.csv'

    print("Classifying captchas with symbol set {" + captcha_symbols + "}")
    correct = 0
    incorrect = 0

    with tf.device('/cpu:0'):
        with open(model_name+'.json', 'r') as json_file:
            loaded_model_json = json_file.read()
        model = keras.models.model_from_json(loaded_model_json)
        model.load_weights(model_name+'_resume.h5')
        model.compile(loss='categorical_crossentropy',
                      optimizer=keras.optimizers.Adam(1e-3, amsgrad=True),
                      metrics=['accuracy'])

        with open(output, 'w') as output_file:
            # Sorted for a reproducible output order, like the lite classifier.
            for x in sorted(os.listdir(captcha_dir)):
                # load image and preprocess it
                raw_data = cv2.imread(os.path.join(captcha_dir, x))
                if raw_data is None:
                    # cv2.imread returns None for anything it cannot decode
                    # (sub-folders, checkpoint folders, non-image files).
                    print('Skipping unreadable file ' + x)
                    continue

                #   Preprocess Image
                image = preprocess(raw_data)

                prediction = model.predict(image)
                predictedAnswer = decode(captcha_symbols, prediction)
                output_file.write(x + ", " + predictedAnswer + "\n")

                print('Classified ' + x + '///' + predictedAnswer)

                # The expected answer is the file name without its extension.
                answer = os.path.splitext(x)[0]
                if answer == predictedAnswer:
                    correct = correct + 1
                else:
                    incorrect = incorrect + 1

    print('correct = '+str(correct))
    print('incorrect ='+str(incorrect))


if __name__ == '__main__':
    main()

In [None]:
# Classify - Lite
#!/usr/bin/env python3
import os
import cv2
import numpy as np
import tflite_runtime.interpreter as tflite
import warnings
warnings.filterwarnings("ignore", category=FutureWarning)
warnings.filterwarnings("ignore", category=DeprecationWarning)

def decode(characters, y):
    """Turn a (positions, classes) score table into the predicted string.

    For every row of `y` the index of the highest score is looked up in
    `characters`; the looked-up characters are concatenated in row order.
    """
    winners = np.array(y).argmax(axis=1)
    picked = []
    for class_index in winners:
        picked.append(characters[class_index])
    return ''.join(picked)

def main():
    """Classify every captcha image in a folder with the TFLite model.

    Each image is converted from BGR to RGB, scaled by 1/255 as float32 and
    given a leading batch axis before it is fed to the interpreter.  The
    outputs of all output nodes are stacked into one row per node and decoded.
    `<file name>,<prediction>` lines are written to `output`; a prediction
    counts as correct when it equals the file name without its extension.
    Entries that cannot be read as an image are reported and skipped.

    Relies on the notebook-level `captcha_symbols`.
    """
    model_name = '/content/converted_test.tflite'
    captcha_dir = '/content/testers_4_characters_no_lowercase'
    output = '/content/converted_4_output.csv'

    # Pi Addresses
    # model_name = '/users/ugrad/brennar5/captcha_detection/converted_test.tflite'
    # captcha_dir = '/users/ugrad/brennar5/captcha_detection/testers_4'
    # output = '/users/ugrad/brennar5/captcha_detection/converted_4_output.csv'

    print("Classifying captchas with symbol set {" + captcha_symbols + "}")
    correct = 0
    incorrect = 0

    with open(output, 'w') as output_file:

        tf_interpreter = tflite.Interpreter(model_name)
        tf_interpreter.allocate_tensors()
        input_tf = tf_interpreter.get_input_details()
        output_tf = tf_interpreter.get_output_details()

        files = sorted(os.listdir(captcha_dir))

        for x in files:
            # Load & Preprocess Image - Currently to work with test
            raw_data = cv2.imread(os.path.join(captcha_dir, x))
            if raw_data is None:
                # cv2.imread returns None for anything it cannot decode
                # (sub-folders, checkpoint folders, non-image files).
                print('Skipping unreadable file ' + x)
                continue
            rgb_data = cv2.cvtColor(raw_data, cv2.COLOR_BGR2RGB)
            image = np.array(rgb_data, dtype=np.float32) / 255.0
            # Prepend the batch axis; the image dimensions keep their order.
            image = np.expand_dims(image, axis=0)

            tf_interpreter.set_tensor(input_tf[0]['index'], image)
            tf_interpreter.invoke()
            prediction = [tf_interpreter.get_tensor(output_node['index']) for output_node in output_tf]
            # One row of class scores per output node.
            prediction = np.reshape(prediction, (len(output_tf), -1))
            decoded_symbol = decode(captcha_symbols, prediction)
            output_file.write(x + "," + decoded_symbol + "\n")

            print('Classified ' + x + '///' + decoded_symbol)

            # The expected answer is the file name without its extension.
            answer = os.path.splitext(x)[0]
            if answer == decoded_symbol:
                correct = correct + 1
            else:
                incorrect = incorrect + 1

    print('correct = '+str(correct))
    print('incorrect ='+str(incorrect))

if __name__ == '__main__':
    main()