#### Setting up Spark

In [1]:
# Find spark.
import findspark
findspark.init()

In [2]:
# Submit arguements to install spark deep learning package.
SUBMIT_ARGS = "--packages databricks:spark-deep-learning:1.5.0-spark2.4-s_2.11 pyspark-shell"

In [3]:
import os

In [4]:
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS

In [5]:
# Create spark configuration object.
import pyspark
conf = pyspark.SparkConf()

In [6]:
# Create spark session.
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("g-lab").config(conf = conf).getOrCreate()

In [7]:
spark

#### Importing the required packages

In [8]:
import pandas as pd
import hvplot.pandas

import keras
import tensorflow as tf
from sparkdl import *
from sparkdl.image import imageIO
from sparkdl import DeepImageFeaturizer

from pyspark.ml.image import ImageSchema
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, IndexToString, VectorAssembler

Using TensorFlow backend.


#### Import labels dataset

In [9]:
# Reading in labels csv from Google cloud storage.
labels = spark.createDataFrame(pd.read_csv('https://storage.googleapis.com/openimages/v5/validation-annotations-human-imagelabels-boxable.csv'))

In [10]:
labels.printSchema()

root
 |-- ImageID: string (nullable = true)
 |-- Source: string (nullable = true)
 |-- LabelName: string (nullable = true)
 |-- Confidence: long (nullable = true)



In [11]:
labels.show()

+----------------+------------+----------+----------+
|         ImageID|      Source| LabelName|Confidence|
+----------------+------------+----------+----------+
|0001eeaf4aed83f9|verification|  /m/0cmf2|         1|
|0004886b7d043cfd|verification| /m/01g317|         0|
|0004886b7d043cfd|verification| /m/04hgtk|         0|
|0004886b7d043cfd|verification|  /m/09j2d|         0|
|0004886b7d043cfd|verification| /m/0463sg|         0|
|000595fe6fee6369|verification|  /m/07j7r|         0|
|000595fe6fee6369|verification|  /m/02xwb|         1|
|000595fe6fee6369|verification|  /m/05s2s|         0|
|000595fe6fee6369|verification| /m/0c9ph5|         0|
|000595fe6fee6369|verification|  /m/02wbm|         1|
|00075905539074f2|verification|  /m/09j2d|         1|
|00075905539074f2|verification|  /m/04rky|         1|
|00075905539074f2|verification|/m/02p0tk3|         1|
|00075905539074f2|verification| /m/05y5lj|         1|
|00075905539074f2|verification|  /m/0dzf4|         1|
|00075905539074f2|verificati

The label names are encoded. We can join them to the label key to determine the names of the labels

In [47]:
# Loading the dataframe with the actual name of the labels
label_names = spark.createDataFrame(pd.read_csv('https://storage.googleapis.com/openimages/v5/class-descriptions-boxable.csv', header = None, names = ["label_id","label_name"]))

In [48]:
# Printing the schema
label_names.printSchema()

root
 |-- label_id: string (nullable = true)
 |-- label_name: string (nullable = true)



In [49]:
label_names.count()

601

In [82]:
# Joining df and the actual label names
labelled_df = labels.join(label_names, labels["LabelName"] == label_names["label_id"])

In [83]:
# Dropping the repeated column
labelled_df=labelled_df.drop("LabelName","Source","Confidence")

In [79]:
# Printing the schema
labelled_df.printSchema()

root
 |-- ImageID: string (nullable = true)
 |-- label_id: string (nullable = true)
 |-- label_name: string (nullable = true)



In [54]:
labelled_df.show(n=3)

+----------------+------------+----------+---------+----------+
|         ImageID|      Source|Confidence| label_id|label_name|
+----------------+------------+----------+---------+----------+
|536d4d20aa48fb9b|verification|         1|/m/02vqfm|    Coffee|
|579e2886e28fd8e6|verification|         0|/m/02vqfm|    Coffee|
|57e23f2e9ec38dea|verification|         1|/m/02vqfm|    Coffee|
+----------------+------------+----------+---------+----------+
only showing top 3 rows



#### Importing the image files

In [17]:
# Reading in images.
imagesDF = imageIO.readImagesWithCustomFn("s3://open-images-dataset/validation", decode_f=imageIO.PIL_decode)

In [18]:
type(imagesDF)

pyspark.sql.dataframe.DataFrame

In [19]:
imagesDF.printSchema()

root
 |-- image: struct (nullable = true)
 |    |-- origin: string (nullable = true)
 |    |-- height: integer (nullable = false)
 |    |-- width: integer (nullable = false)
 |    |-- nChannels: integer (nullable = false)
 |    |-- mode: integer (nullable = false)
 |    |-- data: binary (nullable = false)



In [20]:
imagesDF.count()

41620

#### Exploratory Analysis

##### Counts for each of 'ImageID'Â¶

We see that some images are mentioned more than once. This means that each image can have more than one label (positive or negative included)

In [57]:
labelled_df.groupBy("ImageID").count().orderBy('count', ascending=False).show(n=10)

+----------------+-----+
|         ImageID|count|
+----------------+-----+
|c9beadfaec7b5043|   25|
|bdb8487ccbb6d988|   25|
|a52964795b0e2475|   25|
|935acc24545d071a|   24|
|f243d6d523d3ce1c|   24|
|dd56ac13a2c82644|   24|
|9f8550843e6f1ba6|   24|
|ecbbb807ff44e14d|   24|
|8330ba335d902294|   24|
|618dbeb8fb32291f|   23|
+----------------+-----+
only showing top 10 rows



In [58]:
labelled_df.groupBy("ImageID").count().orderBy('count', ascending=True).show(n=10)

+----------------+-----+
|         ImageID|count|
+----------------+-----+
|943aa2bbcd569fc0|    1|
|bb68d8ee3243c449|    1|
|175b6ab0f97c28e2|    1|
|de28cb110df155fc|    1|
|932fd97687d89da3|    1|
|140ab911aaf54c2a|    1|
|2b319452011f49b9|    1|
|af76966c17b351a2|    1|
|ec26e6417302bb1c|    1|
|63eb53b282bcf8dc|    1|
+----------------+-----+
only showing top 10 rows



In [88]:
labelled_df=labelled_df.toPandas()

AttributeError: 'DataFrame' object has no attribute 'toPandas'

In [86]:
images = labelled_df.groupby('ImageID').count()
images.columns = ['label_id_Count', 'label_name_Count']
display(images.sort_values('label_id_Count', ascending=False).head(10))
images.hvplot.hist('label_id_Count', bins=50, height=400, width=600)

Unnamed: 0_level_0,label_id_Count,label_name_Count
ImageID,Unnamed: 1_level_1,Unnamed: 2_level_1
bdb8487ccbb6d988,25,25
a52964795b0e2475,25,25
c9beadfaec7b5043,25,25
8330ba335d902294,24,24
ecbbb807ff44e14d,24,24
9f8550843e6f1ba6,24,24
dd56ac13a2c82644,24,24
f243d6d523d3ce1c,24,24
935acc24545d071a,24,24
3ecd3b0292520228,23,23


AttributeError: 'DataFrame' object has no attribute 'hvplot'

##### Label Name Counts

In [None]:
dcounts = labelled_df.label_name.value_counts(normalize=True)
dcounts_df = pd.DataFrame({'label': dcounts.index.tolist(), 'pct_of_images': dcounts})
dcounts_df.reset_index(drop=True, inplace=True)
dcounts_df[0:50].hvplot.bar(x='label', y='pct_of_images', invert=True, flip_yaxis=True, 
                            height=600, width=600, ylim=(0,0.12))

##### Hierarchical clustering among label names based on occurances

In [None]:
import missingno as msno

test = labelled_df[labelled_df.label_name.isin(dcounts_df.loc[0:48,'label'])]
trainpiv = test.pivot_table(index='ImageID', columns='label_name', aggfunc='size')
msno.dendrogram(trainpiv, inline=True)

In [87]:
labelled_df.groupBy("label_name").count().orderBy('count',ascending = False).show()

AttributeError: 'DataFrame' object has no attribute 'groupBy'

#### Preparing the data

Since we want data with positive labels of cars or trees, we need to filter our data accordingly

In [55]:
# Filtering by Confidence = 1 since we want only positive labels
labels_filtered = labelled_df.filter(labels["Confidence"]==1)

In [13]:
# Filtering labels of cars and trees since for the binary classification
labels_filtered = labels_filtered.filter("LabelName = '/m/07j7r' OR LabelName = '/m/0k4j'")

In [14]:
# For the ML model, we only need the ImageID and the label name hence we can drop other columns.
labels_filtered = labels_filtered.drop("Confidence")
labels_filtered = labels_filtered.drop("Source")

In [15]:
labels_filtered.printSchema()

root
 |-- ImageID: string (nullable = true)
 |-- LabelName: string (nullable = true)



In [16]:
labels_filtered.count()

8823

We need an image ID column in order to join our image and label datasets

In [21]:
# The origin field contains the id of the image
imagesDF.select("image.origin").first()

Row(origin='s3://open-images-dataset/validation/103bd62a82a79144.jpg')

In [22]:
# Extracting id from image origin
imagesDF =imagesDF.select("image").withColumn('id', imagesDF['image.origin'].substr(37, 16))

In [23]:
imagesDF.select("id").first()

Row(id='1e15c584ac2337d8')

In [24]:
imagesDF.printSchema()

root
 |-- image: struct (nullable = true)
 |    |-- origin: string (nullable = true)
 |    |-- height: integer (nullable = false)
 |    |-- width: integer (nullable = false)
 |    |-- nChannels: integer (nullable = false)
 |    |-- mode: integer (nullable = false)
 |    |-- data: binary (nullable = false)
 |-- id: string (nullable = true)



Spark ML does not work well with null values hence we will drop all null values from our joined dataset

In [25]:
# Dropping the nulls
imagesDFNotNull = imagesDF.na.drop()

In [26]:
# Checking how many rows we have after dropping the nulls
imagesDFNotNull.count()

41064

In [45]:
# Inner Join with label dataset since we only want the images that have the car and tree labels 
imagesDF_joined = imagesDFNotNull.join(labels_filtered,imagesDFNotNull['id'] == labels_filtered['ImageID'])

In [28]:
imagesDF_joined.count()

8755

#### Building the model 

In [29]:
# Splitting data into train and test
df_train, df_test = imagesDF_joined.randomSplit([0.7,0.3])

In [31]:
# Converting the string label to a numeric value and skip null values
stringIndexer_label = StringIndexer(inputCol="LabelName", outputCol="label",handleInvalid='skip')

In [32]:
# Converting the image into features using the using the Inceptionv3 pre-trained convolutional neural network 
featurizer = DeepImageFeaturizer(inputCol="image", outputCol="features", modelName="InceptionV3")

In [33]:
# Adding a logistic model layer on top of the neural network
lr = LogisticRegression(maxIter=20, regParam=0.05, elasticNetParam=0.3, labelCol="label")

In [35]:
# Defining the pipeline 
p = Pipeline(stages=[stringIndexer_label,featurizer, lr])

In [36]:
# Fitting the model on the training data
p_model = p.fit(df_train)

#### Predicting on test data

In [37]:
# Predicting on the test data
predictions = p_model.transform(df_test)

In [38]:
# View the predictions
# There is a probability column and a prediction column
predictions.show()

+--------------------+----------------+----------------+---------+-----+--------------------+--------------------+--------------------+----------+
|               image|              id|         ImageID|LabelName|label|            features|       rawPrediction|         probability|prediction|
+--------------------+----------------+----------------+---------+-----+--------------------+--------------------+--------------------+----------+
|[s3://open-images...|08e34d5847106ac3|08e34d5847106ac3|  /m/0k4j|  0.0|[0.0,0.0,0.0,0.42...|[2.12782137270333...|[0.89357800639232...|       0.0|
|[s3://open-images...|26062053361f57cb|26062053361f57cb| /m/07j7r|  1.0|[0.0,0.0,0.0,0.0,...|[-1.4208722331831...|[0.19452488136220...|       1.0|
|[s3://open-images...|27a36d80be726d15|27a36d80be726d15|  /m/0k4j|  0.0|[0.0,1.3226109743...|[2.67081822003892...|[0.93528257514181...|       0.0|
|[s3://open-images...|37638a64ab4510af|37638a64ab4510af| /m/07j7r|  1.0|[0.0,0.0,0.511036...|[-3.0662826587652...|[0.0

In [39]:
# Create an evaluator to measure accuracy
evaluatorLR = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")

In [40]:
accuracy = evaluatorLR.evaluate(predictions)

In [44]:
print(accuracy)

0.9265667051134179


In [None]:
# Create an evaluator to measure f1score
evaluatorLR = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")

In [None]:
f1 = evaluatorLR.evaluate(predictions)

In [55]:
print(f1)

0.9266176795318206

In [None]:
# Create an evaluator to measure weightedPrecision
evaluatorLR = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="weightedPrecision")

In [None]:
weightedPrecision = evaluatorLR.evaluate(predictions)

In [60]:
print(weightedPrecision)

0.9267024825351136


In [None]:
# Create an evaluator to measure weightedRecall
evaluatorLR = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="weightedRecall")

In [63]:
print(weightedRecall)

0.9265667051134179


In [91]:
evaluator = BinaryClassificationEvaluator()\
.setMetricName("areaUnderROC")\
.setRawPredictionCol("prediction")\
.setLabelCol("label")

In [92]:
AUC = evaluator.evaluate(predictions)

In [93]:
AUC

0.9286619761772512

In [72]:
# End the spark session
spark.stop()