In [70]:
from pyspark.sql import Row, SparkSession
import pyspark.sql.functions
from pyspark.ml.image import ImageSchema
import pandas as pd

In [46]:
spark = SparkSession.builder\
    .master("local[4]")\
    .appName("sql.functions tests")\
    .getOrCreate()
sc = spark.sparkContext

In [47]:
TRAINING_FILE_LOCATION = '/Users/johnmorin/Documents/GitHub/Obj-Det-Tutorial/TensorFlow-Object-Detection-API-Tutorial-Train-Multiple-Objects-Windows-10/images/train/*.jpg'
LABEL_FILE_LOCATION = '/Users/johnmorin/Documents/GitHub/Obj-Det-Tutorial/TensorFlow-Object-Detection-API-Tutorial-Train-Multiple-Objects-Windows-10/images/train_labels.csv'

In [48]:
image_df = spark.read.format('image').load(TRAINING_FILE_LOCATION)

In [49]:
# read in CSV attach
label_files = spark.read.format('csv').option("header", "true").load(LABEL_FILE_LOCATION)

In [50]:
image_df.printSchema()

root
 |-- image: struct (nullable = true)
 |    |-- origin: string (nullable = true)
 |    |-- height: integer (nullable = true)
 |    |-- width: integer (nullable = true)
 |    |-- nChannels: integer (nullable = true)
 |    |-- mode: integer (nullable = true)
 |    |-- data: binary (nullable = true)



In [51]:
label_files.printSchema()

root
 |-- filename: string (nullable = true)
 |-- width: string (nullable = true)
 |-- height: string (nullable = true)
 |-- class: string (nullable = true)
 |-- xmin: string (nullable = true)
 |-- ymin: string (nullable = true)
 |-- xmax: string (nullable = true)
 |-- ymax: string (nullable = true)



In [52]:
from pyspark.sql.functions import udf
from pyspark.sql.functions import *
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType

In [53]:
image_df = image_df.withColumn('filename', image_df.image.origin.cast("string")[145:300])

In [54]:
image_df.printSchema()

root
 |-- image: struct (nullable = true)
 |    |-- origin: string (nullable = true)
 |    |-- height: integer (nullable = true)
 |    |-- width: integer (nullable = true)
 |    |-- nChannels: integer (nullable = true)
 |    |-- mode: integer (nullable = true)
 |    |-- data: binary (nullable = true)
 |-- filename: string (nullable = true)



In [55]:
image_df.first().filename

'cam_image26.jpg'

In [56]:
combined_df = image_df.join(label_files, image_df.filename == label_files.filename, 'inner').drop(image_df.filename)

In [57]:
combined_df.printSchema()

root
 |-- image: struct (nullable = true)
 |    |-- origin: string (nullable = true)
 |    |-- height: integer (nullable = true)
 |    |-- width: integer (nullable = true)
 |    |-- nChannels: integer (nullable = true)
 |    |-- mode: integer (nullable = true)
 |    |-- data: binary (nullable = true)
 |-- filename: string (nullable = true)
 |-- width: string (nullable = true)
 |-- height: string (nullable = true)
 |-- class: string (nullable = true)
 |-- xmin: string (nullable = true)
 |-- ymin: string (nullable = true)
 |-- xmax: string (nullable = true)
 |-- ymax: string (nullable = true)



In [58]:
combined_df.first().filename

'cam_image26.jpg'

In [59]:
def class_text_to_int(row_label):
    if row_label == 'nine':
        return 1
    elif row_label == 'ten':
        return 2
    elif row_label == 'jack':
        return 3
    elif row_label == 'queen':
        return 4
    elif row_label == 'king':
        return 5
    elif row_label == 'ace':
        return 6
    else:
        return None

udf_labels = udf(class_text_to_int, IntegerType())

In [60]:
combined_df = combined_df.withColumn('num_class', udf_labels(combined_df['class']))

In [61]:
combined_df2 = combined_df.withColumn('img_data', combined_df['image'].data)

In [62]:
combined_df2 = combined_df2.withColumn('origin', combined_df['image'].origin)

In [63]:
combined_df2 = combined_df2.drop('image')

In [64]:
combined_df2.printSchema()

root
 |-- filename: string (nullable = true)
 |-- width: string (nullable = true)
 |-- height: string (nullable = true)
 |-- class: string (nullable = true)
 |-- xmin: string (nullable = true)
 |-- ymin: string (nullable = true)
 |-- xmax: string (nullable = true)
 |-- ymax: string (nullable = true)
 |-- num_class: integer (nullable = true)
 |-- img_data: binary (nullable = true)
 |-- origin: string (nullable = true)



In [65]:
combined_df2.count()

95

In [103]:
from pyspark.sql.functions import col


In [85]:
filenames_list = 


()


In [100]:
filenames_list = [str(x)[14:-2] for x in combined_df2.select('filename').distinct().collect()]

In [121]:
for name in filenames_list:
    combined_df2.where(combined_df2.filename==name).select('width')
    combined_df2.where(combined_df2.filename==name).select('height')
#    [list(row) for row in df.collect()]

    x = [list(row) for row in combined_df2.where(combined_df2.filename==name).select('xmin').collect()]
    combined_df2.where(combined_df2.filename==name).select('ymin').collect()
    combined_df2.where(combined_df2.filename==name).select('xmax').collect()
    combined_df2.where(combined_df2.filename==name).select('ymax').collect()
    combined_df2.where(combined_df2.filename==name).select('num_class')
    combined_df2.where(combined_df2.filename==name).select('class')
    combined_df2.where(combined_df2.filename==name).select('img_data')
    break
    #print(combined_df2.where(col('filename')==name).select(col('xmin'))).collect()
#blah.show()


In [122]:
print(x[0])

['286']


In [92]:
filenames_list2 = [str(i[0])[14:-2] for i in filenames_list]
print(filenames_list2[0])




In [72]:
grouped = combined_df2.groupBy('filename')

my_df = pd.DataFrame()

for group in grouped:
    xmins = []
    xmaxs = []
    ymins = []
    ymaxs = []
    classes_text = []
    classes = []
    for i in range(0,group.count()-1):
        im = group.take[i][0]
        xmins.append(im.xmin / im.width)
        xmaxs.append(im.xmax / im.width)
        ymins.append(im.ymin / im.height)
        ymaxs.append(im.ymax / im.height)
#        classes_text.append(im.class)
        # Changed to directly get class num from column
        classes.append(im.num_class)
    filename = group.first().filename
    origin = group.first().origin
    height = group.first().height
    width = group.first().width
    data = group.first().data
    pd_data = pd.DataFrame({'image/height':height,'image/width':width, 'image/filename':filename, 
                  'image/encoded':data, 'image/format':'jpg', 'image/object/bbox/xmin':xmin,
                  'image/object/bbox/ymin':ymin, 'image/object/bbox/xmax':xmax, 'image/object/bbox/ymax':ymax,
                  'image/object/class/text':classes_text})
    # , 'image/object/class/label':classes
    bleh = pd.concat(my_df, pd_data)
    
fixed_df = createDataFrame(bleh)
    #column_names = ['image/height', 'image/width', 'image/filename', 'image/encoded',
    #                'image/format', 'image/object/bbox/xmin','image/object/bbox/ymin',
    #                'image/object/bbox/xmax','image/object/bbox/ymax', 'image/object/class/text',
    #                'image/object/class/label']

TypeError: 'GroupedData' object is not iterable

In [None]:
#pddf = combined_df2.toPandas()

In [None]:
import tensorflow as tf
from object_detection.utils import dataset_util
from collections import namedtuple, OrderedDict
from pyspark.sql.types import *

In [None]:
combined_df2.write.format('tfrecords').option('recordType', 'Example').save('tf_train.record')

In [71]:
def create_tf_example(group, path):
    #with tf.gfile.GFile(os.path.join(path, '{}'.format(group.filename)), 'rb') as fid:
        #encoded_jpg = fid.read()
    #encoded_jpg_io = io.BytesIO(encoded_jpg)
    #image = Image.open(encoded_jpg_io)
    #width, height = image.size
    height = group.height[0]
    width = group.width[0]
    encoded_jpg = group.img_data[0]
    
    filename = group.filename.encode('utf8')
    image_format = b'jpg'
    xmins = []
    xmaxs = []
    ymins = []
    ymaxs = []
    classes_text = []
    classes = []

    for index, row in group.object.iterrows():
        xmins.append(row['xmin'] / width)
        xmaxs.append(row['xmax'] / width)
        ymins.append(row['ymin'] / height)
        ymaxs.append(row['ymax'] / height)
        classes_text.append(row['class'].encode('utf8'))
        # Changed to directly get class num from column
        classes.append(row['num_class'])

    tf_example = tf.train.Example(features=tf.train.Features(feature={
        'image/height': dataset_util.int64_feature(height),
        'image/width': dataset_util.int64_feature(width),
        'image/filename': dataset_util.bytes_feature(filename),
        'image/source_id': dataset_util.bytes_feature(filename),
        'image/encoded': dataset_util.bytes_feature(encoded_jpg),
        'image/format': dataset_util.bytes_feature(image_format),
        'image/object/bbox/xmin': dataset_util.float_list_feature(xmins),
        'image/object/bbox/xmax': dataset_util.float_list_feature(xmaxs),
        'image/object/bbox/ymin': dataset_util.float_list_feature(ymins),
        'image/object/bbox/ymax': dataset_util.float_list_feature(ymaxs),
        'image/object/class/text': dataset_util.bytes_list_feature(classes_text),
        'image/object/class/label': dataset_util.int64_list_feature(classes),
    }))
    return tf_example

In [None]:

# Need to alter for .ipynb

def main(_):
    #writer = tf.python_io.TFRecordWriter(FLAGS.output_path)
    #path = os.path.join(FLAGS.image_dir)
    #examples = pd.read_csv(FLAGS.csv_input)
    # changed to Pandas Version of Combined_DF from above DataFrame
    grouped = split(pddf, 'filename')
    for group in grouped:
        tf_example = create_tf_example(group, path)
        writer.write(tf_example.SerializeToString())

    writer.close()
    output_path = os.path.join(os.getcwd(), FLAGS.output_path)
    print('Successfully created the TFRecords: {}'.format(output_path))


# Potentially Faster way to run labels?
from pyspark.sql.functions import pandas_udf, PandasUDFType
@pandas_udf('double', PandasUDFType.SCALAR)
def pandas_text_to_int(row_label):
    if row_label is 'nine':
        return 1
    elif row_label is 'ten':
        return 2
    elif row_label is 'jack':
        return 3
    elif row_label is 'queen':
        return 4
    elif row_label is 'king':
        return 5
    elif row_label is 'ace':
        return 6
    #else:
        #return None
        
blah = combined_df.withColumn('num_class', pandas_text_to_int(combined_df['class']))

In [None]:
data = df.take(2)    # take the first four rows of the dataframe

for i in range(0,2):
    im = data[i][0]
    print("image index: {}".format(i))
    print("image type: {}, number of fields: {}".format(type(im), len(im)))
    print("image path: {}".format(im.origin))
    print("height: {}, width: {}, OpenCV type: {}".format(im.height, im.width, im.mode))
    print("nChannels: {}".format(im.nChannels))
    print("\n")