In [None]:
import pyspark
from pyspark.sql.functions import udf, col
from pyspark.sql.types import IntegerType, StringType, DoubleType
from pyspark.ml import Transformer, Estimator, Pipeline
from pyspark.ml.classification import LogisticRegression
from mmlspark import CNTKModel, ModelDownloader

from mmlspark import *

import numpy as np, pandas as pd, os, sys, time
from os.path import join, abspath, exists

In [None]:
# Fetch the model registered under the name "ResNet50".
# "models" is the downloader's second argument (presumably the local
# directory the model is stored in -- TODO confirm against ModelDownloader).
modelDownloader = ModelDownloader(spark, "models")
model = modelDownloader.downloadByName("ResNet50")

In [None]:
# Glob pattern handed to the image reader.
image_path = 'personalities/*/*.jpg'

def getLabel(path):
    """Map an image path to its numeric class label.

    The label is chosen by substring match on ``path``; the checks run in
    order, so a path containing both substrings gets the first label.

    Parameters
    ----------
    path : str
        Path of the image file.

    Returns
    -------
    float
        ``1.0`` if ``'jobs'`` occurs in ``path``, otherwise ``2.0`` if
        ``'zuckerberg'`` occurs in ``path``.

    Raises
    ------
    ValueError
        If neither substring occurs in ``path``. (Previously this case fell
        through to ``return label`` with ``label`` unbound and surfaced as an
        opaque ``UnboundLocalError``.)
    """
    if 'jobs' in path:
        return 1.0
    if 'zuckerberg' in path:
        return 2.0
    # Fail loudly, naming the offending path, rather than silently mislabeling.
    raise ValueError(
        "Cannot derive a label from image path {!r}: expected it to contain "
        "'jobs' or 'zuckerberg'".format(path))

# Wrap getLabel as a Spark UDF returning a double. The UDF receives the whole
# "image" column value and passes its first field to getLabel
# (presumably the file path -- TODO confirm against the image schema).
getLabelUDF = udf(lambda image: getLabel(image[0]), DoubleType())

# Read the images matching image_path and attach a "labels" column.
imageDF = (
    spark.readImages(image_path)
         .withColumn("labels", getLabelUDF(col("image")))
)

imageDF.printSchema()

In [None]:
# Baseline pipeline: logistic regression on features derived from resized images.

# Resize to 256x256 and write the result to the "scaled" column.
it = ImageTransformer().setOutputCol("scaled").resize(width = 256, height = 256)

# UnrollImage reads "scaled" and writes "features".
ur = UnrollImage().setOutputCol("features").setInputCol("scaled")

# Drop the image columns that are no longer needed once "features" exists.
dc1 = DropColumns().setCols(["scaled", "image"])

# Classifier trained on "features" against the "labels" column.
lr1 = (LogisticRegression()
    .setLabelCol("labels")
    .setFeaturesCol("features"))

# Drop "features" from the final output.
dc2 = DropColumns().setCols(["features"])

basicModel = Pipeline(stages=[
    it,
    ur,
    dc1,
    lr1,
    dc2,
])

In [None]:
# Deep pipeline: logistic regression on features produced by ImageFeaturizer
# configured from the downloaded model (its uri and layerNames).
resnet = (
    ImageFeaturizer()
        .setInputCol("image")
        .setOutputCol("features")
        .setModelLocation(model.uri)
        .setLayerNames(model.layerNames)
        .setCutOutputLayers(1)
)

# Drop the "image" column once "features" exists.
dc3 = DropColumns().setCols(["image"])

# Classifier trained on "features" against the "labels" column.
lr2 = (LogisticRegression()
    .setLabelCol("labels")
    .setFeaturesCol("features"))

# Drop "features" from the final output.
dc4 = DropColumns().setCols(["features"])

deepModel = Pipeline(stages=[
    resnet,
    dc3,
    lr2,
    dc4,
])

In [None]:
def timedExperiment(model, train, test):
    """Fit ``model`` on ``train``, apply it to ``test``, and report the elapsed time.

    Parameters
    ----------
    model
        Object exposing ``fit(train)``; the fitted result must expose
        ``transform(test)``, whose result must expose ``toPandas()``.
    train
        Data passed to ``model.fit``.
    test
        Data passed to the fitted model's ``transform``.

    Returns
    -------
    The value returned by ``toPandas()`` on the transformed ``test`` data.

    The elapsed time covers fit, transform, and ``toPandas()``, and is printed
    to stdout in seconds.
    """
    # perf_counter is monotonic, so the measured duration cannot be skewed by
    # system clock adjustments the way a time.time() difference can.
    start = time.perf_counter()
    result = model.fit(train).transform(test).toPandas()
    print("Experiment took {}s".format(time.perf_counter() - start))
    return result

In [None]:
# 80/20 train/test split with a fixed seed, so the split (and the results
# computed from it) can be reproduced across runs.
# NOTE(review): a seeded split is only stable if imageDF itself is evaluated
# deterministically -- consider caching imageDF before splitting; confirm.
train, test = imageDF.randomSplit([.8, .2], seed=42)
train.count(), test.count()

In [None]:
# Fit and score the baseline pipeline, printing how long it took.
basicResults = timedExperiment(model=basicModel, train=train, test=test)

In [None]:
# Fit and score the ImageFeaturizer-based pipeline, printing how long it took.
deepResults = timedExperiment(model=deepModel, train=train, test=test)