In [1]:
%%init_spark
# Pull Spark NLP 2.4.5 onto the classpath; the TensorflowWrapper/TensorResources helpers used below come from it.
launcher.packages = ["JohnSnowLabs:spark-nlp:2.4.5"]

In [2]:
//We need a pipeline

import org.apache.spark.ml.Pipeline

Intitializing Scala interpreter ...

Spark Web UI available at http://192.168.1.149:4040
SparkContext available as 'sc' (version = 2.4.5, master = local[*], app id = local-1588596533031)
SparkSession available as 'spark'


import org.apache.spark.ml.Pipeline


In [3]:
// Load the preprocessed test and train splits (read with the session's default data source).
val test_data: org.apache.spark.sql.DataFrame = spark.read.load("test.processed.parquet")
val train_data: org.apache.spark.sql.DataFrame = spark.read.load("train.processed.parquet")

test_data: org.apache.spark.sql.DataFrame = [category: string, description: string ... 10 more fields]
train_data: org.apache.spark.sql.DataFrame = [category: string, description: string ... 10 more fields]


In [12]:
//Defining all the custom classes in one cell is not pretty, but it is convenient and quick to iterate on
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.ml._
import org.apache.spark.ml.param.{ParamMap, Params}
import org.apache.spark.ml.param.shared.{HasInputCol, HasOutputCol, HasLabelCol, HasPredictionCol}
import org.apache.spark.ml.util.{DefaultParamsReadable, DefaultParamsWritable, Identifiable}
import org.apache.spark.ml.linalg.SparseVector
import org.tensorflow.Graph
import com.johnsnowlabs.ml.tensorflow._
import com.johnsnowlabs.nlp.util.io.ResourceHelper
import org.apache.commons.io.IOUtils
import scala.util.Random

/*
    A Scala wrapper around a TF graph. Loads the graph from a .pb file and provides
    basic functionality for working with the model, i.e. training and testing.
    The graph node names are hard-coded; only batch size and number of epochs are configurable.
    Inspired by the SparkNLP source, but simplified.
*/
class DummyTFClassifier(var tfGraphFileName: String) {
    // Names of nodes that must exist in the exported graph.
    private val inputKey = "inputs"
    private val labelKey = "outputs"
    private val trainingKey = "Adam"

    private val predictionKey = "predictions"
    private val lossKey = "loss"
    private val accuracyKey = "accuracy"
    private val initKey = "init"

    // Built on first use. The stream and the temporary Graph are only needed to parse the
    // bytes into a GraphDef, so both are closed once the wrapper has been created.
    lazy private val tf_model = {
        val graphStream = ResourceHelper.getResourceStream(tfGraphFileName)
        val graphBytesDef =
            try IOUtils.toByteArray(graphStream)
            finally graphStream.close()

        val graph = new Graph()
        try {
            graph.importGraphDef(graphBytesDef)
            // Empty variables: weights are initialized by running `init` in `train`.
            new TensorflowWrapper(
                Variables(Array.empty[Byte], Array.empty[Byte]),
                graph.toGraphDef
            )
        } finally {
            graph.close()
        }
    }

    /**
     * Initializes the graph's variables and trains for `endEpoch` epochs of mini-batches,
     * printing the mean batch loss and accuracy after each epoch.
     *
     * The (feature, label) pairs are shuffled once up front; every epoch then visits the
     * same sequence of batches.
     *
     * @param features  one dense feature row per example
     * @param labels    one label row per example (fed to the `outputs` node); must have
     *                  the same length as `features`
     * @param endEpoch  number of epochs to run
     * @param batchSize examples per mini-batch (the last batch may be smaller)
     */
    def train(features: Array[Array[Float]], labels: Array[Array[Float]], endEpoch: Int = 100, batchSize : Int = 10) = {
        import scala.collection.JavaConverters._

        require(features.length == labels.length,
            s"features (${features.length}) and labels (${labels.length}) must have the same length")

        // Shuffle and split once; the batches do not change between epochs.
        val batches = Random.shuffle(features.zip(labels).toSeq)
            .grouped(batchSize)
            .map(batch => (batch.map(_._1).toArray, batch.map(_._2).toArray))
            .toVector
        val batchCount = batches.size

        tf_model.createSession().runner.addTarget(initKey).run()

        for (epoch <- 1 to endEpoch) {
            var loss = 0.0f
            var acc = 0.0f

            for ((featuresArray, labelsArray) <- batches) {
                val tensors = new TensorResources()
                try {
                    val calculated = tf_model
                        .getSession()
                        .runner
                        .feed(inputKey, tensors.createTensor(featuresArray))
                        .feed(labelKey, tensors.createTensor(labelsArray))
                        .addTarget(trainingKey)
                        .fetch(lossKey)
                        .fetch(accuracyKey)
                        .run()
                    // Fetched tensors hold native memory and are not tracked by `tensors`,
                    // so they must be closed explicitly.
                    try {
                        loss += TensorResources.extractFloats(calculated.get(0))(0)
                        acc += TensorResources.extractFloats(calculated.get(1))(0)
                    } finally {
                        calculated.asScala.foreach(_.close())
                    }
                } finally {
                    tensors.clearTensors()
                }
            }

            println(f"Epoch ${epoch}/$endEpoch loss: ${loss / batchCount} - accuracy: ${acc / batchCount}")
        }
    }

    /**
     * Runs the graph on all `features` at once, prints the graph's accuracy against
     * `labels`, and returns the values of the `predictions` node as longs.
     */
    def predict(features: Array[Array[Float]], labels: Array[Array[Float]]) = {
        import scala.collection.JavaConverters._

        val tensors = new TensorResources()
        try {
            val calculated = tf_model
                .getSession()
                .runner
                .feed(inputKey, tensors.createTensor(features))
                .feed(labelKey, tensors.createTensor(labels))
                .fetch(predictionKey)
                .fetch(accuracyKey)
                .run()
            try {
                val result = TensorResources.extractLongs(calculated.get(0))
                val acc = TensorResources.extractFloats(calculated.get(1))(0)
                println(f"Prediction accuracy is ${acc}")
                result
            } finally {
                calculated.asScala.foreach(_.close())
            }
        } finally {
            tensors.clearTensors()
        }
    }
}

/** Column parameters (input, label, prediction) shared by [[DummyEstimator]] and [[DummyModel]]. */
trait DummyParams
  extends Params
  with HasInputCol
  with HasLabelCol
  with HasPredictionCol

/*
    A simple and very inefficient way of annotating a Spark dataframe: everything is
    collected to the driver. To make training easier, only the first TRAIN_SET_SIZE rows are used.
*/
trait DummyDataProcessor {

    private val TRAIN_SET_SIZE = 100000

    /**
     * Converts `data` into (features, one-hot labels, annotated dataframe).
     *
     * Label ids are positions in the sorted list of distinct non-null values of
     * `labelCol`. Sorting makes the ids depend only on the label set, so datasets with
     * the same labels (e.g. train and test) are numbered identically; `distinct()` on its
     * own has no defined order. A null label gets id -1 and an all-zero label vector.
     *
     * The returned dataframe holds at most TRAIN_SET_SIZE rows and adds `<labelCol>_id`
     * and `mon_id` (monotonically_increasing_id). It is cached, so later actions on it see
     * the same rows and ids as the returned arrays. Callers that do not need it may
     * `unpersist()` it.
     *
     * @param data     input rows; `inputCol` must hold Spark ML vectors
     * @param labelCol column holding the class label
     * @param inputCol column holding the feature vector
     * @return dense float features and one-hot labels (row-aligned), and the annotated dataframe
     */
    def process(data: Dataset[_], labelCol: String, inputCol: String): (Array[Array[Float]], Array[Array[Float]], DataFrame) = {
        import org.apache.spark.ml.linalg.{Vector => MLVector}

        // Retrieve the label vocabulary in a deterministic order.
        val uniqueLabels: Array[String] = data.select(labelCol).distinct().collect()
            .flatMap(row => Option(row.get(0)).map(_.toString))
            .sorted
        val labelIndex: Map[String, Int] = uniqueLabels.zipWithIndex.toMap
        val numLabels = uniqueLabels.length

        // Index of a label in the vocabulary, -1 if absent (i.e. null).
        val labelToIndex = udf { (label: String) => labelIndex.getOrElse(label, -1) }

        val labelId = labelCol + "_id"

        val ndata = data
            .limit(TRAIN_SET_SIZE)
            .withColumn(labelId, labelToIndex(col(labelCol)))
            .withColumn("mon_id", monotonically_increasing_id())
            .cache()

        // A single pass keeps features and labels aligned row by row.
        val rows = ndata.select(labelId, inputCol).collect()

        val labels = rows.map { row =>
            val index = row.getInt(0)
            Array.tabulate(numLabels)(j => if (j == index) 1.0f else 0.0f)
        }

        // Works for both sparse and dense ML vectors.
        val features = rows.map(row => row.getAs[MLVector](1).toArray.map(_.toFloat))

        (features, labels, ndata)
    }
}

/*
    Spark model that annotates a dataframe with the labels predicted by a trained DummyTFClassifier.
*/
class DummyModel (override val uid: String, tfdModel: DummyTFClassifier) 
    extends Model[DummyModel] with DummyParams with DummyDataProcessor {

    /** Copies this model (sharing the same classifier) with `extra` params applied. */
    // defaultCopy reflectively needs a (uid: String) constructor, which this class lacks.
    override def copy(extra: ParamMap): DummyModel =
        copyValues(new DummyModel(uid, tfdModel), extra).setParent(parent)

    /**
     * Runs the classifier on `dataset` and adds a string `predictionCol`.
     *
     * Returns the annotated dataframe from `process` (limited in size, with extra
     * `<labelCol>_id` and `mon_id` columns). A row gets a null prediction when its
     * predicted class id has no matching label in this dataset.
     */
    override def transform(dataset: Dataset[_]): DataFrame = {
        val (features, labels, ndata) = this.process(dataset, $(labelCol), $(inputCol))
        val labelIdCol = $(labelCol) + "_id"

        // Recover id -> label from the ids `process` assigned, rather than from a separate
        // distinct() whose order is unrelated to those ids.
        // NOTE(review): predictions are only meaningful if `process` numbers labels the same
        // way on the training data and on this dataset — confirm.
        val labelsById: Map[Int, String] = ndata
            .select(col(labelIdCol), col($(labelCol)))
            .where(col(labelIdCol) >= 0)
            .distinct()
            .collect()
            .map(row => row.getInt(0) -> row.get(1).toString)
            .toMap

        val predictions = tfdModel.predict(features, labels)

        // mon_id is used as the row's index into `predictions`.
        // NOTE(review): this assumes ndata is a single partition (as after `limit`), so mon_id
        // runs 0..n-1 in collection order. The id is taken as a Long to avoid a silent
        // narrowing cast, and an id without a prediction fails loudly.
        val getPrediction = udf { (monId: Long) =>
            require(monId >= 0 && monId < predictions.length,
                s"mon_id $monId has no prediction (${predictions.length} predictions)")
            labelsById.get(predictions(monId.toInt).toInt).orNull
        }

        ndata.withColumn($(predictionCol), getPrediction(col("mon_id")))
    }

    /** Schema produced by `transform`: the input plus `<labelCol>_id`, `mon_id` and `predictionCol`. */
    override def transformSchema(schema: StructType): StructType =
        Seq(
            StructField($(labelCol) + "_id", IntegerType, nullable = false),
            StructField("mon_id", LongType, nullable = false),
            StructField($(predictionCol), StringType, nullable = true)
        ).foldLeft(schema)(putField)

    // Mirrors Dataset.withColumn: replaces a field of the same name, otherwise appends.
    private def putField(schema: StructType, field: StructField): StructType =
        if (schema.fieldNames.contains(field.name))
            StructType(schema.fields.map(f => if (f.name == field.name) field else f))
        else
            schema.add(field)
}

/*
    Spark estimator for training the dummy TF model
*/
class DummyEstimator(override val uid: String) extends Estimator[DummyModel]
  with DummyParams with DefaultParamsWritable with DummyDataProcessor{
  import org.apache.spark.ml.param.{IntParam, Param, ParamValidators}

  def this() = this(Identifiable.randomUID("DummyEstimator"))

  /** Path (or resource name) of the serialized TensorFlow graph (.pb) to train. */
  final val tfGraphPath: Param[String] =
    new Param[String](this, "tfGraphPath", "path of the serialized TensorFlow graph (.pb) to train")

  /** Number of training epochs (> 0). */
  final val epochs: IntParam =
    new IntParam(this, "epochs", "number of training epochs", ParamValidators.gt(0))

  /** Mini-batch size used during training (> 0). */
  final val batchSize: IntParam =
    new IntParam(this, "batchSize", "mini-batch size used during training", ParamValidators.gt(0))

  // Defaults reproduce the previously hard-coded training setup.
  setDefault(tfGraphPath -> "/home/i/projects/johnsnow/mymodel.pb", epochs -> 100, batchSize -> 100)

  def setInputCol(value: String): this.type = set(inputCol -> value)
  def setLabelCol(value: String): this.type = set(labelCol -> value)
  def setPredictionCol(value: String): this.type = set(predictionCol -> value)
  def setTfGraphPath(value: String): this.type = set(tfGraphPath -> value)
  def setEpochs(value: Int): this.type = set(epochs -> value)
  def setBatchSize(value: Int): this.type = set(batchSize -> value)

  /** Schema of the fitted model's output: the input plus `<labelCol>_id`, `mon_id` and `predictionCol`. */
  override def transformSchema(schema: StructType): StructType =
    Seq(
      StructField($(labelCol) + "_id", IntegerType, nullable = false),
      StructField("mon_id", LongType, nullable = false),
      StructField($(predictionCol), StringType, nullable = true)
    ).foldLeft(schema)(putField)

  override def copy(extra: ParamMap): Estimator[DummyModel] = defaultCopy(extra)

  /** Collects features/labels from `dataset`, trains a new classifier and wraps it in a [[DummyModel]]. */
  override def fit(dataset: Dataset[_]): DummyModel = {
    val (features, labels, ndata) = this.process(dataset, $(labelCol), $(inputCol))
    // Only the collected arrays are needed for training; release the dataframe in case it was cached.
    ndata.unpersist()

    val classifier = new DummyTFClassifier($(tfGraphPath))
    classifier.train(features, labels, $(epochs), $(batchSize))

    copyValues(new DummyModel(uid, classifier).setParent(this))
  }

  // Mirrors Dataset.withColumn: replaces a field of the same name, otherwise appends.
  private def putField(schema: StructType, field: StructField): StructType =
    if (schema.fieldNames.contains(field.name))
      StructType(schema.fields.map(f => if (f.name == field.name) field else f))
    else
      schema.add(field)
}

import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.ml._
import org.apache.spark.ml.param.{ParamMap, Params}
import org.apache.spark.ml.param.shared.{HasInputCol, HasOutputCol, HasLabelCol, HasPredictionCol}
import org.apache.spark.ml.util.{DefaultParamsReadable, DefaultParamsWritable, Identifiable}
import org.apache.spark.ml.linalg.SparseVector
import org.tensorflow.Graph
import com.johnsnowlabs.ml.tensorflow._
import com.johnsnowlabs.nlp.util.io.ResourceHelper
import org.apache.commons.io.IOUtils
import scala.util.Random
defined class DummyTFClassifier
defined trait DummyParams
defined trait DummyDataProcessor
defined class DummyModel
defined class DummyEstimator


In [13]:
// Compose a single-stage pipeline around the custom estimator
val dummy = new DummyEstimator()
  .setInputCol("features")
  .setLabelCol("category")
  .setPredictionCol("prediction")
val pipeline: Pipeline = new Pipeline().setStages(Array[PipelineStage](dummy))


dummy: DummyEstimator = DummyEstimator_7a2d79103e3e
pipeline: org.apache.spark.ml.Pipeline = pipeline_0d08671908dd


In [14]:
// Fit the pipeline, i.e. train the dummy TF model.
// Only the first TRAIN_SET_SIZE rows are used, to keep resource usage manageable on a laptop.
val m1: PipelineModel = pipeline.fit(train_data)


Epoch 1/100 loss: 0.65244454 - accuracy: 0.7649796
Epoch 2/100 loss: 0.5553829 - accuracy: 0.7923904
Epoch 3/100 loss: 0.5466695 - accuracy: 0.79494053
Epoch 4/100 loss: 0.54079574 - accuracy: 0.79668015
Epoch 5/100 loss: 0.5359634 - accuracy: 0.7979803
Epoch 6/100 loss: 0.53156924 - accuracy: 0.79931015
Epoch 7/100 loss: 0.52734375 - accuracy: 0.8006404
Epoch 8/100 loss: 0.52313375 - accuracy: 0.80223006
Epoch 9/100 loss: 0.51882946 - accuracy: 0.80361944
Epoch 10/100 loss: 0.5143486 - accuracy: 0.8050593
Epoch 11/100 loss: 0.5096366 - accuracy: 0.8068894
Epoch 12/100 loss: 0.50465864 - accuracy: 0.8088391
Epoch 13/100 loss: 0.49939582 - accuracy: 0.8108293
Epoch 14/100 loss: 0.4938512 - accuracy: 0.81299967
Epoch 15/100 loss: 0.4880399 - accuracy: 0.81526995
Epoch 16/100 loss: 0.48197564 - accuracy: 0.8174898
Epoch 17/100 loss: 0.4756737 - accuracy: 0.8199401
Epoch 18/100 loss: 0.46914673 - accuracy: 0.8225399
Epoch 19/100 loss: 0.46241516 - accuracy: 0.82542014
Epoch 20/100 loss: 0.

m1: org.apache.spark.ml.PipelineModel = pipeline_0d08671908dd


In [15]:
// Apply the fitted pipeline to the test data
val results: DataFrame = m1.transform(test_data)

Prediction accuracy is 0.7501316


results: org.apache.spark.sql.DataFrame = [category: string, description: string ... 13 more fields]


In [None]:
// Overall test accuracy: fraction of rows whose prediction equals the true category
results.select(avg((col("category") === col("prediction")).cast("double")).as("accuracy")).show()

In [17]:
// Spot-check 20 random rows: true category vs. predicted label
results
  .select(col("category"), col("prediction"))
  .orderBy(rand())
  .show(20)

+--------+----------+
|category|prediction|
+--------+----------+
|Business|     World|
|  Sports|    Sports|
|  Sports|    Sports|
|   World|  Business|
|Business|  Business|
|Business|     World|
|Sci/Tech|  Sci/Tech|
|Sci/Tech|  Sci/Tech|
|Sci/Tech|  Sci/Tech|
|   World|  Business|
|Sci/Tech|  Sci/Tech|
|  Sports|    Sports|
|  Sports|  Business|
|  Sports|    Sports|
|Business|     World|
|Business|     World|
|  Sports|    Sports|
|Business|  Sci/Tech|
|  Sports|    Sports|
|  Sports|    Sports|
+--------+----------+
only showing top 20 rows

