# This will demenstate how to pull images stored in Kinetica, run Tensorflow inferencing against the images, and store results back into Kinetica

In [1]:
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import argparse
import os.path
import re
import sys
import tarfile

import numpy as np
from six.moves import urllib
import tensorflow as tf

In [2]:
FLAGS = None

# pylint: disable=line-too-long
DATA_URL = 'http://download.tensorflow.org/models/image/imagenet/inception-2015-12-05.tgz'
# pylint: enable=line-too-long


my_collection = "MASTER"
inferenced_table = "caltech_inferenced"
image_count=0
images_table="caltech256"

In [3]:
class NodeLookup(object):
  """Converts integer node ID's to human readable labels."""

  def __init__(self,
               label_lookup_path=None,
               uid_lookup_path=None):
    if not label_lookup_path:
      label_lookup_path = os.path.join(
          '/tmp/imagenet', 'imagenet_2012_challenge_label_map_proto.pbtxt')
    if not uid_lookup_path:
      uid_lookup_path = os.path.join(
          '/tmp/imagenet', 'imagenet_synset_to_human_label_map.txt')
    self.node_lookup = self.load(label_lookup_path, uid_lookup_path)

  def load(self, label_lookup_path, uid_lookup_path):
    """Loads a human readable English name for each softmax node.
    Args:
      label_lookup_path: string UID to integer node ID.
      uid_lookup_path: string UID to human-readable string.
    Returns:
      dict from integer node ID to human-readable string.
    """
    if not tf.gfile.Exists(uid_lookup_path):
      tf.logging.fatal('File does not exist %s', uid_lookup_path)
    if not tf.gfile.Exists(label_lookup_path):
      tf.logging.fatal('File does not exist %s', label_lookup_path)

    # Loads mapping from string UID to human-readable string
    proto_as_ascii_lines = tf.gfile.GFile(uid_lookup_path).readlines()
    uid_to_human = {}
    p = re.compile(r'[n\d]*[ \S,]*')
    for line in proto_as_ascii_lines:
      parsed_items = p.findall(line)
      uid = parsed_items[0]
      human_string = parsed_items[2]
      uid_to_human[uid] = human_string

    # Loads mapping from string UID to integer node ID.
    node_id_to_uid = {}
    proto_as_ascii = tf.gfile.GFile(label_lookup_path).readlines()
    for line in proto_as_ascii:
      if line.startswith('  target_class:'):
        target_class = int(line.split(': ')[1])
      if line.startswith('  target_class_string:'):
        target_class_string = line.split(': ')[1]
        node_id_to_uid[target_class] = target_class_string[1:-2]

    # Loads the final mapping of integer node ID to human-readable string
    node_id_to_name = {}
    for key, val in node_id_to_uid.items():
      if val not in uid_to_human:
        tf.logging.fatal('Failed to locate: %s', val)
      name = uid_to_human[val]
      node_id_to_name[key] = name

    return node_id_to_name

  def id_to_string(self, node_id):
    if node_id not in self.node_lookup:
      return ''
    return self.node_lookup[node_id]


In [4]:
def create_graph():
  """Creates a graph from saved GraphDef file and returns a saver."""
  # Creates graph from saved graph_def.pb.
  with tf.gfile.FastGFile(os.path.join(
      "/tmp/imagenet", 'classify_image_graph_def.pb'), 'rb') as f:
    graph_def = tf.GraphDef()
    graph_def.ParseFromString(f.read())
    _ = tf.import_graph_def(graph_def, name='')


In [5]:
def run_inference_on_image(images,primaryKeys):
  """Runs inference on an image.
  Args:
    image: Image file name.
  Returns:
    Nothing
  """
  import time
  images_data = images



  with tf.Session() as sess:
    encoded_obj_list = []
    # Some useful tensors:
    # 'softmax:0': A tensor containing the normalized prediction across
    #   1000 labels.
    # 'pool_3:0': A tensor containing the next-to-last layer containing 2048
    #   float description of the image.
    # 'DecodeJpeg/contents:0': A tensor containing a string providing JPEG
    #   encoding of the image.
    # Runs the softmax tensor by feeding the image_data as input to the graph.
    softmax_tensor = sess.graph.get_tensor_by_name('softmax:0')
    

    start = time.time()
    
    for ix in range(len(images_data)):
        predictions = sess.run(softmax_tensor,
                               {'DecodeJpeg/contents:0': images_data[ix]})
        predictions = np.squeeze(predictions)

        # Creates node ID --> English string lookup.
        node_lookup = NodeLookup()

       # top_k = predictions.argsort()[-FLAGS.num_top_predictions:][::-1]
        top_k = predictions.argsort()[-5:][::-1]



        for node_id in top_k:
          human_string = node_lookup.id_to_string(node_id)
          score = predictions[node_id]
          splits = human_string.split(',')
          for inference in splits:
            encoded_obj_list.append(gpudb.GPUdbRecord(image_inference_record_type, [primaryKeys[ix], inference, score.item()]).binary_data)
          #print('%s (score = %.5f)' % (human_string, score))   
    
    end = time.time()
    elapsed = end - start
    
    print("Total model run time: "+str(elapsed))
    
    writeToKinetica(encoded_obj_list)
            
    
    #return inference, scores
      #print(score)
      #print('%s (score = %.5f)' % (human_string, score))


In [6]:
def maybe_download_and_extract():
  """Download and extract model tar file."""
  dest_directory = "/tmp/imagenet"
  if not os.path.exists(dest_directory):
    os.makedirs(dest_directory)
  filename = DATA_URL.split('/')[-1]
  filepath = os.path.join(dest_directory, filename)
  if not os.path.exists(filepath):
    def _progress(count, block_size, total_size):
      sys.stdout.write('\r>> Downloading %s %.1f%%' % (
          filename, float(count * block_size) / float(total_size) * 100.0))
      sys.stdout.flush()
    filepath, _ = urllib.request.urlretrieve(DATA_URL, filepath, _progress)
    print()
    statinfo = os.stat(filepath)
    print('Successfully downloaded', filename, statinfo.st_size, 'bytes.')
  tarfile.open(filepath, 'r:gz').extractall(dest_directory)

In [7]:
 maybe_download_and_extract()

## Create table to store inference output

In [8]:
import sys
sys.path.insert(0, '/opt/gpudb/api/python/gpudb/')
sys.path.insert(0, '/opt/gpudb/api/python/gpudb/packages')
import base64
import gpudb
import numpy

gpudb_host = "127.0.0.1"
my_collection = "MASTER"
my_table = "caltech256"
h_db = gpudb.GPUdb(encoding = 'BINARY', host = gpudb_host, port = '9191')

columns = []
columns.append(gpudb.GPUdbRecordColumn("id", gpudb.GPUdbRecordColumn._ColumnType.STRING, [gpudb.GPUdbColumnProperty.CHAR16, gpudb.GPUdbColumnProperty.SHARD_KEY]))
columns.append(gpudb.GPUdbRecordColumn("inference", gpudb.GPUdbRecordColumn._ColumnType.STRING, [gpudb.GPUdbColumnProperty.CHAR64]))
columns.append(gpudb.GPUdbRecordColumn("score", gpudb.GPUdbRecordColumn._ColumnType.FLOAT))
# Create the type object
image_inference_record_type = gpudb.GPUdbRecordType(columns, label="image_inference")
print(image_inference_record_type)

""" Create the type in the database and save the type ID, needed to create
    a table in the next step """
image_inference_record_type.create_type(h_db)
image_type_id = image_inference_record_type.type_id


<gpudb.GPUdbRecordType object at 0x7fe3270ca3d0>


In [9]:
h_db.clear_table( table_name = inferenced_table, authorization = '', options = {} )

{'status_info': {u'data_type': u'clear_table_response_avro',
  u'message': u'',
  'response_time': 0.00688,
  u'status': u'OK'},
 u'table_name': u'caltech_inferenced'}

In [10]:
response = h_db.create_table(table_name=inferenced_table, type_id=image_type_id,options = {"collection_name":my_collection})
print(response['status_info']['status'])

OK


In [11]:
def writeToKinetica(encoded_obj_list):
    import time

    start = time.time()

    print("writing")
    response = h_db.insert_records(table_name=inferenced_table, data=encoded_obj_list, list_encoding="binary", options={})
    
    print(response)
    
    end = time.time()
    elapsed = end - start
    
    print("Total write time: "+str(elapsed))

## Find total count for image table

In [12]:
import sys
sys.path.insert(0, '/opt/gpudb/api/python/gpudb/')
sys.path.insert(0, '/opt/gpudb/api/python/gpudb/packages')
import base64
import gpudb
import numpy
import datetime

gpudb_host = "127.0.0.1"
my_collection = "MASTER"
my_table = "caltech256"
h_db = gpudb.GPUdb(encoding = 'BINARY', host = gpudb_host, port = '9191')

response=h_db.aggregate_group_by(table_name=images_table, column_names=['count(*) AS c'], offset=0, limit=10, encoding="binary", options={})
tcount = gpudb.GPUdbRecord.decode_binary_data(response["response_schema_str"], response["binary_encoded_response"])

for k, v in tcount.items():
    if str(k) == 'column_1':
        image_count=v[0]
        print("Total images in "+images_table+": "+str(image_count))
    


Total images in caltech256: 30607


## Loop over images and run inference and output in db/engine

In [13]:
from multiprocessing.dummy import Pool as ThreadPool 
import sys
sys.path.insert(0, '/opt/gpudb/api/python/gpudb/')
sys.path.insert(0, '/opt/gpudb/api/python/gpudb/packages')
import base64
import gpudb
import numpy
import datetime


#pool = ThreadPool(16) 


gpudb_host = "127.0.0.1"
my_collection = "MASTER"
my_table = "caltech256"
h_db = gpudb.GPUdb(encoding = 'BINARY', host = gpudb_host, port = '9191')




# define worker function before a Pool is instantiated
'''def worker(record):
    primary_k=None
    gimage=None
    try:
        encoded_obj_list = []
        
        for k, v in record.items():
            if str(k) == 'id':
                primary_k=v
            if str(k) == 'image':
                gimage=v
        a,b = run_inference_on_image(gimage)

        for ix in range(len(a)):
            encoded_obj_list.append(gpudb.GPUdbRecord(image_inference_record_type, [primary_k, a[ix], b[ix].item()]).binary_data)
        response = h_db.insert_records(table_name=inferenced_table, data=encoded_obj_list, list_encoding="binary", options={})
        #print(response)
    except Exception as e: print(e)  
'''
        
#keep count of number of images processed for while loop
images_processed=0

print("Total number of images to process: "+str(image_count))



while (image_count > images_processed ):

    response = h_db.get_records(table_name='caltech256',offset=images_processed,limit=10000)
    res_decoded = gpudb.GPUdbRecord.decode_binary_data(response["type_schema"], response["records_binary"])
    
    print("Number of images fetched: "+ str(len(res_decoded)))
    
    glist=[]
    pklist=[]
    
    for index in range(len(res_decoded)):
        for k, v in res_decoded[index].items():
            if str(k) == 'id':
                pklist.append(v)
            if str(k) == 'image':
                glist.append(v)

    create_graph()
    run_inference_on_image(glist,pklist)
    
    images_processed+=len(res_decoded)
    print("Number of images processed: "+str(images_processed))
    print("Total remaining images to process: " + str(image_count-images_processed))





Total number of images to process: 30607
Number of images fetched: 10000
Total model run time: 1844.32697392
writing
{u'count_updated': 0, u'record_ids': [], 'status_info': {u'status': u'OK', u'data_type': u'insert_records_response_avro', u'message': u'', 'response_time': 0.12553}, u'count_inserted': 93925}
Total write time: 0.73143696785
Number of images processed: 10000
Total remaining images to process: 20607
Number of images fetched: 10000
Total model run time: 1869.92629504
writing
{u'count_updated': 0, u'record_ids': [], 'status_info': {u'status': u'OK', u'data_type': u'insert_records_response_avro', u'message': u'', 'response_time': 0.11456}, u'count_inserted': 93906}
Total write time: 0.708550930023
Number of images processed: 20000
Total remaining images to process: 10607
Number of images fetched: 10000
Total model run time: 1885.29081082
writing
{u'count_updated': 0, u'record_ids': [], 'status_info': {u'status': u'OK', u'data_type': u'insert_records_response_avro', u'message'

KeyError: 'type_schema'

### Testing..ignore below

In [None]:
'''from multiprocessing.dummy import Pool as ThreadPool 
import sys
sys.path.insert(0, '/opt/gpudb/api/python/gpudb/')
sys.path.insert(0, '/opt/gpudb/api/python/gpudb/packages')
import base64
import gpudb
import numpy
import datetime


pool = ThreadPool(16) 


gpudb_host = "127.0.0.1"
my_collection = "MASTER"
my_table = "caltech256"
h_db = gpudb.GPUdb(encoding = 'BINARY', host = gpudb_host, port = '9191')




# define worker function before a Pool is instantiated
def worker(record):
    primary_k=None
    gimage=None
    try:
        encoded_obj_list = []
        
        for k, v in record.items():
            if str(k) == 'id':
                primary_k=v
            if str(k) == 'image':
                gimage=v
        a,b = run_inference_on_image(gimage)

        for ix in range(len(a)):
            encoded_obj_list.append(gpudb.GPUdbRecord(image_inference_record_type, [primary_k, a[ix], b[ix].item()]).binary_data)
        response = h_db.insert_records(table_name=inferenced_table, data=encoded_obj_list, list_encoding="binary", options={})
        #print(response)
    except Exception as e: print(e)  
        
        

response = h_db.get_records(table_name='caltech256',offset=0,limit=image_count)
res_decoded = gpudb.GPUdbRecord.decode_binary_data(response["type_schema"], response["records_binary"])

print(len(res_decoded))

encoded_obj_list = []

create_graph()

results = pool.map(worker, res_decoded)


pool.close()
pool.join()

    #for ix in range(len(inference)):
    #    val1 = numpy.asscalar(scores[ix])
    #    encoded_obj_list.append(gpudb.GPUdbRecord(image_inference_record_type, [primary_k, inference[ix], val1]).binary_data)           

'''

In [None]:
'''from multiprocessing.pool import ThreadPool as Pool
import sys
sys.path.insert(0, '/opt/gpudb/api/python/gpudb/')
sys.path.insert(0, '/opt/gpudb/api/python/gpudb/packages')
import base64
import gpudb
import numpy
import datetime


pool_size = 20  # your "parallelness"


gpudb_host = "127.0.0.1"
my_collection = "MASTER"
my_table = "caltech256"
h_db = gpudb.GPUdb(encoding = 'BINARY', host = gpudb_host, port = '9191')




# define worker function before a Pool is instantiated
def worker(record):
    primary_k=None
    gimage=None
    try:
        encoded_obj_list = []
        create_graph()
        
        for k, v in record.items():
            if str(k) == 'id':
                primary_k=v
            if str(k) == 'image':
                gimage=v
        a,b = run_inference_on_image(gimage)

        for ix in range(len(a)):
            encoded_obj_list.append(gpudb.GPUdbRecord(image_inference_record_type, [primary_k, a[ix], b[ix].item()]).binary_data)
        response = h_db.insert_records(table_name=inferenced_table, data=encoded_obj_list, list_encoding="binary", options={})
        #print(response)
    except Exception as e: print(e)  
        
        
        
pool = Pool(pool_size)

print(pool.is_alive())


response = h_db.get_records(table_name='caltech256',offset=0,limit=image_count)
res_decoded = gpudb.GPUdbRecord.decode_binary_data(response["type_schema"], response["records_binary"])


encoded_obj_list = []


for index in range(image_count):
    pool.apply_async(worker, (res_decoded[index],))
           #run_inference_on_image(gimage)
        


pool.close()
pool.join()

    #for ix in range(len(inference)):
    #    val1 = numpy.asscalar(scores[ix])
    #    encoded_obj_list.append(gpudb.GPUdbRecord(image_inference_record_type, [primary_k, inference[ix], val1]).binary_data)          
    
    '''
