In [1]:
# !wget http://download.tensorflow.org/example_images/flower_photos.tgz
# !tar -xvf "flower_photos.tgz"

In [1]:
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext

In [2]:
# Create (or reuse) the Spark session shared by every cell below.
spark = (
    SparkSession.builder
    .appName('Pyspark-Image-Classification')
    .getOrCreate()
)
spark

In [3]:
# !pip install pandas
# !pip install pillow
# !pip install tensorflow
# !pip install PyArrow

In [4]:
import io
from typing import Iterator

import numpy as np
import pandas as pd
import tensorflow as tf
from PIL import Image
from tensorflow.keras.applications.efficientnet import EfficientNetB0, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array

from pyspark.sql.types import *
from pyspark.sql.functions import col, pandas_udf, PandasUDFType, split, reverse, udf
from pyspark.ml import Pipeline
from pyspark.ml.classification import LinearSVC, NaiveBayes, LogisticRegression, OneVsRest
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import VectorAssembler, VectorIndexer,StringIndexer
from pyspark.ml.linalg import Vectors, VectorUDT

In [5]:
# Load every *.jpg found (recursively) under flower_photos/ as a binary-file
# DataFrame: one row per image with its path, metadata and raw bytes.
df = (
    spark.read.format("binaryFile")
    .option("pathGlobFilter", "*.jpg")
    .option("recursiveFileLookup", "true")
    .load('flower_photos')
)

In [6]:
# Show the columns produced by the binaryFile reader.
df.printSchema()

root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)



In [7]:
# Derive the class label from the image's parent directory: after reversing the
# '/'-split path, index 0 is the file name and index 1 is the folder it lives in.
df = df.withColumn('label', reverse(split(df['path'], '/')).getItem(1))
# Preview five rows only. limit() runs on the Spark side, so the driver does not
# have to collect the raw bytes of every image just to display a handful of rows.
df.limit(5).toPandas()

Unnamed: 0,path,modificationTime,length,content,label
0,file:/opt/bitnami/spark/Pyspark-Flower-Classif...,2016-01-11 06:54:55,281953,"[255, 216, 255, 224, 0, 16, 74, 70, 73, 70, 0,...",tulips
1,file:/opt/bitnami/spark/Pyspark-Flower-Classif...,2016-01-11 06:18:33,277326,"[255, 216, 255, 224, 0, 16, 74, 70, 73, 70, 0,...",sunflowers
2,file:/opt/bitnami/spark/Pyspark-Flower-Classif...,2016-01-11 06:55:53,265806,"[255, 216, 255, 224, 0, 16, 74, 70, 73, 70, 0,...",tulips
3,file:/opt/bitnami/spark/Pyspark-Flower-Classif...,2016-01-11 06:19:25,257418,"[255, 216, 255, 224, 0, 16, 74, 70, 73, 70, 0,...",sunflowers
4,file:/opt/bitnami/spark/Pyspark-Flower-Classif...,2016-01-11 06:06:37,248540,"[255, 216, 255, 224, 0, 16, 74, 70, 73, 70, 0,...",daisy


In [8]:
# Build EfficientNetB0 without its top (classification) layer on the driver;
# its weights are reused by the cells below. summary() prints the layer table.
model = EfficientNetB0(include_top=False)
model.summary()

Model: "efficientnetb0"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_1 (InputLayer)            [(None, None, None,  0                                            
__________________________________________________________________________________________________
rescaling (Rescaling)           (None, None, None, 3 0           input_1[0][0]                    
__________________________________________________________________________________________________
normalization (Normalization)   (None, None, None, 3 7           rescaling[0][0]                  
__________________________________________________________________________________________________
stem_conv_pad (ZeroPadding2D)   (None, None, None, 3 0           normalization[0][0]              
_____________________________________________________________________________________

In [9]:
# Broadcast the driver-side model weights so model_fn() can read them
# through bc_model_weights.value instead of loading them again.
bc_model_weights = spark.sparkContext.broadcast(model.get_weights())

def model_fn():
    """
    Build an EfficientNetB0 with the top layer removed and load the broadcast weights.

    The network is created with weights=None and then populated from
    bc_model_weights.

    :return: the EfficientNetB0 model carrying the broadcast weights
    """
    headless_net = EfficientNetB0(include_top=False, weights=None)
    headless_net.set_weights(bc_model_weights.value)
    return headless_net

In [10]:
def preprocess(content):
    """
    Preprocesses raw image bytes for prediction.

    :param content: the encoded image file contents (bytes)
    :return: the result of preprocess_input applied to the 224x224 RGB image array
    """
    # Force three channels before resizing: a grayscale, palette, CMYK or RGBA
    # file would otherwise produce an array with a different channel count,
    # and batches of mixed shapes cannot be stacked together for prediction.
    img = Image.open(io.BytesIO(content)).convert('RGB').resize([224, 224])
    arr = img_to_array(img)
    return preprocess_input(arr)

def featurize_series(model, content_series):
    """
    Run the given model over a pd.Series of raw image bytes.

    :param model: object exposing predict(); called once on the stacked batch
    :param content_series: pd.Series whose elements are passed to preprocess()
    :return: a pd.Series with one flattened feature vector per input image
    """
    # Preprocess each image and stack the results into a single batch array.
    batch = np.stack(content_series.map(preprocess))
    feature_maps = model.predict(batch)
    # Per-image outputs may be multi-dimensional tensors; flatten each one so it
    # can be stored as a plain array column in a Spark DataFrame.
    flat_features = [feature_map.flatten() for feature_map in feature_maps]
    return pd.Series(flat_features)

In [11]:
@pandas_udf('array<float>')
def featurize_udf(content_series_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    '''
    Scalar Iterator pandas UDF wrapping our featurization function.

    The UDF kind is declared through the Python type hints
    (Iterator[pd.Series] -> Iterator[pd.Series]) rather than the deprecated
    PandasUDFType.SCALAR_ITER constant. The decorator specifies that this
    returns a Spark DataFrame column of type ArrayType(FloatType).

    :param content_series_iter: iterator over batches of data, where each batch
                              is a pandas Series of image data.
    :return: yields one pandas Series of flattened feature vectors per input batch.
    '''
    # With Scalar Iterator pandas UDFs, we can load the model once and then re-use it
    # for multiple data batches.  This amortizes the overhead of loading big models.
    model = model_fn()
    for content_series in content_series_iter:
        yield featurize_series(model, content_series)



In [12]:
# Set the Arrow max-records-per-batch option to 1024.
# NOTE(review): presumably this bounds the size of each pandas Series handed to
# featurize_udf (and thus the image batch sent to model.predict) -- confirm.
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "1024")

In [13]:
# Spread the images over 8 partitions, then compute a feature array per image
# while keeping the path and label columns alongside it.
features_df = (
    df.repartition(8)
    .select(
        col("path"),
        featurize_udf("content").alias("features"),
        col('label'),
    )
)
features_df.show(5)

+--------------------+--------------------+----------+
|                path|            features|     label|
+--------------------+--------------------+----------+
|file:/opt/bitnami...|[-0.11560434, -0....|sunflowers|
|file:/opt/bitnami...|[0.9454496, -0.25...|     daisy|
|file:/opt/bitnami...|[-0.083090514, -0...|    tulips|
|file:/opt/bitnami...|[-0.16955833, -0....|sunflowers|
|file:/opt/bitnami...|[-0.19819307, -0....|sunflowers|
+--------------------+--------------------+----------+
only showing top 5 rows



In [14]:
# Spark ML estimators expect a Vector column, so convert the array<float>
# features produced by the pandas UDF into dense ML vectors.
def _to_dense_vector(values):
    """Wrap a sequence of floats in a dense ML vector."""
    return Vectors.dense(values)

list_to_vector_udf = udf(_to_dense_vector, VectorUDT())
features_df = features_df.select(
    col("path"),
    list_to_vector_udf(col("features")).alias("features"),
    col("label"),
)

In [15]:
# Split into train (80%) and test (20%) sets; the fixed seed keeps the split repeatable.
train_df, test_df = features_df.randomSplit(weights=[0.8, 0.2], seed=42)

In [16]:
# Assemble the 'features' column into the single vector column ('featuresModel')
# that the classifiers below read.
vectorAssembler = VectorAssembler(
    inputCols=['features'],
    outputCol="featuresModel",
)

# Map the string class label to a numeric 'indexLabel'; the indexer is fitted
# on the full features_df.
labelIndexer = (
    StringIndexer(inputCol="label", outputCol="indexLabel")
    .fit(features_df)
)

## Logistic Regression

In [17]:
# Logistic regression over the assembled feature vector, with elastic-net
# regularisation (regParam / elasticNetParam) and at most 5 iterations.
lr = LogisticRegression(
    featuresCol="featuresModel",
    labelCol="indexLabel",
    maxIter=5,
    regParam=0.03,
    elasticNetParam=0.5,
)

In [None]:
%%time
# Chain label indexing, feature assembly and the logistic-regression classifier
# into one pipeline, then fit it on the training split (the cell is timed).
pipeline = Pipeline(stages=[labelIndexer, vectorAssembler, lr])
lrmodel = pipeline.fit(train_df)

In [None]:
# Score the held-out test split with the fitted logistic-regression pipeline
# and preview predicted vs. indexed true labels for five rows.
pred = lrmodel.transform(test_df)
pred.select("prediction", "indexLabel", "features").show(5)

In [None]:
# LR - compare predictions with the ground truth and report test metrics
evaluator = MulticlassClassificationEvaluator(labelCol="indexLabel", predictionCol="prediction")

# (printed caption, evaluator metric name) pairs, reported in this order
for caption, metric in (
    ('F1-Score ', 'f1'),
    ('Precision ', 'weightedPrecision'),
    ('Recall ', 'weightedRecall'),
    ('Accuracy ', 'accuracy'),
):
    print(caption, evaluator.evaluate(pred, {evaluator.metricName: metric}))

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import itertools
import seaborn as sns
from sklearn.metrics import confusion_matrix

def plot_confusion_matrix(cm, classes,
                          normalize=False,
                          title='Confusion matrix',
                          cmap=plt.cm.GnBu):
    """
    Draw a confusion matrix as an annotated heat map on the current figure.

    :param cm: square confusion matrix (rows = true labels, columns = predictions)
    :param classes: tick labels, one per row/column of cm, in index order
    :param normalize: when True, each row is divided by its total so cells show
                      the fraction of that true class; cells are then printed
                      with two decimals instead of as integers
    :param title: figure title
    :param cmap: matplotlib colormap used for the heat map
    """
    cm = np.asarray(cm)
    if normalize:
        row_totals = cm.sum(axis=1, keepdims=True)
        # Rows with no samples stay at 0 instead of producing NaN from 0/0.
        cm = np.divide(cm, row_totals,
                       out=np.zeros(cm.shape, dtype=float),
                       where=row_totals != 0)
    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes, rotation=45)
    plt.yticks(tick_marks, classes)
    fmt = '.2f' if normalize else 'd'
    # Cells darker than half the maximum get white text so the numbers stay readable.
    thresh = cm.max() / 2.
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        plt.text(j, i, format(cm[i, j], fmt),
                 horizontalalignment="center",
                 color="white" if cm[i, j] > thresh else "black")
    plt.tight_layout()
    plt.ylabel('True label')
    plt.xlabel('Predicted label')

In [None]:
# LR - Confusion Matrix
y_true = pred.select("indexLabel").toPandas()
y_pred = pred.select("prediction").toPandas()
confusion_matrix(lrmodel, y_true, y_pred, range(5))

%matplotlib inline
sns.set_style("darkgrid")
plt.figure(figsize=(7,7))
plt.grid(False)

plot_confusion_matrix(cnf_matrix, classes=['dandelion'])

## SVM

In [17]:
# define Classifier
# LinearSVC only handles binary labels and has no elasticNetParam, so wrap it in
# OneVsRest, which trains one binary SVM per flower class. OneVsRest passes its
# own label/feature columns on to the base classifier.
svm = OneVsRest(
    classifier=LinearSVC(maxIter=5, regParam=0.03),
    labelCol="indexLabel",
    featuresCol="featuresModel",
)

In [None]:
%%time
# Chain label indexing, feature assembly and the SVM classifier stage into one
# pipeline, then fit it on the training split (the cell is timed).
pipeline = Pipeline(stages=[labelIndexer, vectorAssembler, svm])
svmmodel = pipeline.fit(train_df)

In [None]:
# Score the held-out test split with the fitted SVM pipeline and preview
# predicted vs. indexed true labels for five rows.
pred = svmmodel.transform(test_df)
pred.select("prediction", "indexLabel", "features").show(5)

In [None]:
# SVM - compare predictions with the ground truth and report test metrics
evaluator = MulticlassClassificationEvaluator(labelCol="indexLabel", predictionCol="prediction")

# (printed caption, evaluator metric name) pairs, reported in this order
for caption, metric in (
    ('F1-Score ', 'f1'),
    ('Precision ', 'weightedPrecision'),
    ('Recall ', 'weightedRecall'),
    ('Accuracy ', 'accuracy'),
):
    print(caption, evaluator.evaluate(pred, {evaluator.metricName: metric}))

In [None]:
# SVM - Confusion Matrix
y_true = pred.select("indexLabel").toPandas()
y_pred = pred.select("prediction").toPandas()
confusion_matrix(lrmodel, y_true, y_pred, range(5))

%matplotlib inline
sns.set_style("darkgrid")
plt.figure(figsize=(7,7))
plt.grid(False)

plot_confusion_matrix(cnf_matrix, classes=['dandelion'])

## Naive Bayes

In [17]:
# define Classifier
# NaiveBayes accepts none of maxIter / regParam / elasticNetParam, so they are
# dropped. The CNN features contain negative values, which the default
# multinomial model rejects; the Gaussian variant handles continuous features.
nb = NaiveBayes(
    modelType="gaussian",
    labelCol="indexLabel",
    featuresCol="featuresModel",
)

In [None]:
%%time
# define Pipeline
pipeline = Pipeline(stages=[labelIndexer, vectorAssembler, lr])
nbmodel = pipeline.fit(train_df)

In [None]:
# Score the held-out test split with the fitted `nbmodel` pipeline and preview
# predicted vs. indexed true labels for five rows.
pred = nbmodel.transform(test_df)
pred.select("prediction", "indexLabel", "features").show(5)

In [None]:
# NB - compare predictions with the ground truth and report test metrics
evaluator = MulticlassClassificationEvaluator(labelCol="indexLabel", predictionCol="prediction")

# (printed caption, evaluator metric name) pairs, reported in this order
for caption, metric in (
    ('F1-Score ', 'f1'),
    ('Precision ', 'weightedPrecision'),
    ('Recall ', 'weightedRecall'),
    ('Accuracy ', 'accuracy'),
):
    print(caption, evaluator.evaluate(pred, {evaluator.metricName: metric}))

In [None]:
# NB - Confusion Matrix
y_true = pred.select("indexLabel").toPandas()
y_pred = pred.select("prediction").toPandas()
confusion_matrix(lrmodel, y_true, y_pred, range(5))

%matplotlib inline
sns.set_style("darkgrid")
plt.figure(figsize=(7,7))
plt.grid(False)

plot_confusion_matrix(cnf_matrix, classes=['dandelion'])