-
Notifications
You must be signed in to change notification settings - Fork 3
/
SaveFeaturizedData.scala
149 lines (136 loc) · 5.54 KB
/
SaveFeaturizedData.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package com.cloudera.datascience.dl4j.cnn.examples.caltech256
import java.io.File
import com.cloudera.datascience.dl4j.cnn.Utils
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.SparkContext
import org.datavec.image.loader.NativeImageLoader
import org.deeplearning4j.nn.api.Layer
import org.deeplearning4j.nn.graph.ComputationGraph
import org.deeplearning4j.nn.modelimport.keras.trainedmodels.{TrainedModelHelper, TrainedModels}
import org.deeplearning4j.util.ModelSerializer
import org.nd4j.linalg.dataset.DataSet
import org.nd4j.linalg.factory.Nd4j
import scopt.OptionParser
/**
* This program is used to featurize a directory of images, obtaining intermediate output
* from a specified layer of the VGG16 convolutional network.
*/
object SaveFeaturizedData {
  // VGG16 expects 224x224 RGB input.
  private val imageHeight = 224
  private val imageWidth = 224
  private val numChannels = 3

  private[this] case class Params(
    numClasses: Int = 257,
    outputLayer: String = null,
    imagePath: String = null,
    savePath: String = null,
    modelPath: Option[String] = None)

  private[this] object Params {
    /**
     * Parse command-line arguments into a [[Params]].
     *
     * Fails fast with a descriptive message when parsing fails or when a
     * required option is missing, rather than surfacing an opaque error later.
     */
    def parseArgs(args: Array[String]): Params = {
      val parser = new OptionParser[Params]("Load and save jpegs") {
        opt[Int]("numClasses")
          .text("number of class labels")
          .action((x, c) => c.copy(numClasses = x))
        opt[String]("outputLayer")
          .text("the layer that serves as the output layer for the chopped model")
          .action((x, c) => c.copy(outputLayer = x))
        opt[String]("imagePath")
          .text("the path of jpeg data")
          .action((x, c) => c.copy(imagePath = x))
        opt[String]("savePath")
          .text("the path to save the featurized or byte[] version of jpeg data")
          .action((x, c) => c.copy(savePath = x))
        opt[String]("modelPath")
          .text("the path for the imported model")
          .action((x, c) => c.copy(modelPath = Some(x)))
      }
      // BUG FIX: the original called `.get` on the parse result, which throws an
      // uninformative NoSuchElementException when argument parsing fails.
      val params = parser.parse(args, Params()).getOrElse {
        throw new IllegalArgumentException("Could not parse command-line arguments.")
      }
      // BUG FIX: the original checked `params.modelPath != null`, which is always
      // true (modelPath is an Option defaulting to None), and never validated
      // outputLayer even though splitModelAt requires it. A missing --outputLayer
      // would silently split the graph at index 0 instead of failing here.
      require(params.imagePath != null && params.savePath != null && params.outputLayer != null,
        "You must supply the image data path, the output layer name, and the path to save output.")
      params
    }
  }

  /**
   * Entry point: load (or download) VGG16, chop it at `--outputLayer`, featurize
   * the train/test/valid image directories through the chopped graph, and write
   * the (features, labels) pairs as byte arrays to Parquet under `--savePath`.
   */
  def main(args: Array[String]): Unit = {
    val param = Params.parseArgs(args)
    val numClasses = param.numClasses
    val imagePath = param.imagePath
    val savePath = param.savePath
    val modelPath = param.modelPath
    val spark = SparkSession.builder()
      .appName("Save output of convolutional layers")
      .getOrCreate()
    val logger = org.apache.log4j.LogManager.getLogger(this.getClass)
    try {
      val sc = spark.sparkContext
      // Image directories are nested (one subdirectory per class), so input
      // listing must recurse.
      sc.hadoopConfiguration.set("mapreduce.input.fileinputformat.input.dir.recursive", "true")
      import spark.sqlContext.implicits._
      // Either restore a serialized model from disk or fetch the pretrained
      // VGG16 weights via the DL4J model zoo helper.
      val vgg16 = modelPath.map { path =>
        val modelFile = new File(path)
        ModelSerializer.restoreComputationGraph(modelFile)
      }.getOrElse {
        logger.info("No model provided, fetching VGG...")
        val helper = new TrainedModelHelper(TrainedModels.VGG16)
        helper.loadModel
      }
      /*
      Split the layers where `param.outputLayer` is the last layer in the left section.
      What this would mean is that the frozen graph would have the NN layers up until
      `param.outputLayer` and the unfrozen graph will contain layers from the layer after the
      output layer, to the final "predictions" layer.
      Note: you can review the layer info within a graph using `unfrozen.summary()`.
      */
      val (_, unfrozen) = splitModelAt(param.outputLayer, vgg16)
      val choppedGraph = Utils.removeLayer(vgg16, param.outputLayer, unfrozen)
      Seq("train", "test", "valid").foreach { dir =>
        val predictions = featurizeJpegs(sc, s"$imagePath$dir/", numClasses, choppedGraph)
        // Serialize the NDArrays to byte arrays so they round-trip through Parquet.
        val df = predictions.map { ds =>
          (Nd4j.toByteArray(ds.getFeatureMatrix), Nd4j.toByteArray(ds.getLabels))
        }.toDF()
        df.write.parquet(s"$savePath$dir/")
      }
    } finally {
      spark.stop()
    }
  }

  /**
   * Split a ComputationGraph.
   * @param splitLayer The name of the last layer in the left portion.
   * @return (layers up to and including `splitLayer`, remaining layers).
   *         If `splitLayer` is not found the left portion is empty.
   */
  def splitModelAt(splitLayer: String, model: ComputationGraph): (Array[Layer], Array[Layer]) = {
    val layers = model.getLayers
    layers.splitAt(layers.map(_.conf().getLayer.getLayerName).indexOf(splitLayer) + 1)
  }

  /**
   * Load individual .jpg images and featurize them through a neural network graph.
   *
   * The class label is parsed from the Caltech-256 file path convention
   * `<nnn>_<nnnn>.jpg`, where `<nnn>` is the 1-based class id.
   *
   * @param sc Active Spark context.
   * @param imagePath Path to load input jpgs from.
   * @param numClasses Length of the one-hot encoded label vectors.
   * @param choppedGraph The network to get predictions from.
   * @return RDD of DataSet, one DataSet per input image.
   * @throws IllegalArgumentException (at action time) if a file path does not
   *         match the expected naming convention.
   */
  def featurizeJpegs(
    sc: SparkContext,
    imagePath: String,
    numClasses: Int,
    choppedGraph: ComputationGraph): RDD[DataSet] = {
    val jpegs = sc.binaryFiles(imagePath).mapPartitions { it =>
      val loader = new NativeImageLoader(imageHeight, imageWidth, numChannels)
      // PERF FIX: compile the label regex once per partition instead of once per
      // image, as the original did by declaring it inside the per-element map.
      val regex = ".+\\/(\\d{3})_\\d{4}\\.jpg".r
      it.map { case (path, img) =>
        val inputStream = img.open()
        val mat = loader.asMatrix(inputStream)
        inputStream.close()
        val label = path match {
          case regex(l) => l.toInt - 1
          case _ =>
            throw new IllegalArgumentException(s"Could not parse label from path: $path")
        }
        val labelArray = Nd4j.zeros(numClasses)
        labelArray.putScalar(label, 1F)
        new DataSet(mat, labelArray)
      }
    }
    Utils.getPredictions(jpegs, choppedGraph, sc)
  }
}