In [3]:

import scala.util.Random

import org.apache.hadoop.fs.Path
import org.apache.spark.ml.feature.{LSH, LSHModel}
import org.apache.spark.annotation.Since
import org.apache.spark.ml.linalg.{Vector, Vectors, VectorUDT}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.param.shared.HasSeed
import org.apache.spark.ml.util._
import org.apache.spark.sql.types.StructType


/**
 * Model fitted by `RandomHyperplanesLSH`, holding one random hyperplane per hash table.
 *
 * Each input vector is hashed to a sequence of single-element vectors, one per hyperplane,
 * whose value is 1.0 or -1.0 depending on which side of the hyperplane the input lies.
 *
 * NOTE: this class uses `private[ml]` / `protected[ml]` qualifiers and extends `LSHModel`, so
 * it only compiles when placed inside the `org.apache.spark.ml` package hierarchy.
 *
 * @param uid               unique identifier of this model instance
 * @param randomHyperplanes normal vectors of the hyperplanes, one per hash table
 */
@Since("2.1.0")
class RandomHyperplanesLSHModel private[ml](
    override val uid: String,
    private[ml] val randomHyperplanes: Array[Vector])
  extends LSHModel[RandomHyperplanesLSHModel] {

  /** @group setParam */
  @Since("2.4.0")
  override def setInputCol(value: String): this.type = set(inputCol, value)

  /** @group setParam */
  @Since("2.4.0")
  override def setOutputCol(value: String): this.type = set(outputCol, value)

  /**
   * Hashes `elems` against every hyperplane.
   *
   * @param elems input vector; must contain at least one non-zero entry
   * @return one single-element dense vector per hyperplane: 1.0 when the dot product with the
   *         hyperplane is strictly positive, -1.0 otherwise (zero or negative)
   */
  @Since("2.1.0")
  override protected[ml] def hashFunction(elems: Vector): Array[Vector] = {
    require(elems.nonZeroIterator.nonEmpty, "Must have at least 1 non zero entry.")
    randomHyperplanes.map { hyperplane =>
      val side = if (elems.dot(hyperplane) > 0) 1.0 else -1.0
      Vectors.dense(side)
    }
  }

  /**
   * Cosine distance between two vectors: `1 - (x . y) / (|x| * |y|)`.
   *
   * @return the cosine distance, or 1 when the product of the two L2 norms is zero
   */
  @Since("2.1.0")
  override protected[ml] def keyDistance(x: Vector, y: Vector): Double = {
    val normProduct = Vectors.norm(x, 2) * Vectors.norm(y, 2)
    if (normProduct == 0) 1.0 else 1.0 - x.dot(y) / normProduct
  }

  /**
   * Distance between two hash sequences: for each pair of corresponding hash vectors, counts
   * the positions at which they differ, and returns the smallest such count.
   */
  @Since("2.1.0")
  override protected[ml] def hashDistance(x: Seq[Vector], y: Seq[Vector]): Double = {
    // Since it's generated by hashing, it will be a pair of dense vectors.
    // TODO: This hashDistance function requires more discussion in SPARK-18454
    val mismatchCounts = x.iterator.zip(y.iterator).map { case (left, right) =>
      left.toArray.zip(right.toArray).count { case (a, b) => a != b }
    }
    mismatchCounts.min.toDouble
  }

  /** Creates a copy sharing the same uid, hyperplanes and parent, with `extra` params applied. */
  @Since("2.1.0")
  override def copy(extra: ParamMap): RandomHyperplanesLSHModel =
    copyValues(new RandomHyperplanesLSHModel(uid, randomHyperplanes).setParent(parent), extra)

  /** Returns a writer that persists this model's metadata and hyperplanes. */
  @Since("2.1.0")
  override def write: MLWriter =
    new RandomHyperplanesLSHModel.RandomHyperplanesLSHModelWriter(this)

  @Since("3.0.0")
  override def toString: String =
    s"RandomHyperplanesLSHModel: uid=$uid, numHashTables=${$(numHashTables)}"
}



/**
 * Estimator that builds a [[RandomHyperplanesLSHModel]] by drawing one random hyperplane per
 * hash table.
 *
 * NOTE: this class extends `LSH` and overrides `protected[ml]` members, so it only compiles
 * when placed inside the `org.apache.spark.ml` package hierarchy.
 *
 * @param uid unique identifier of this estimator instance
 */
@Since("2.1.0")
class RandomHyperplanesLSH(override val uid: String)
  extends LSH[RandomHyperplanesLSHModel] with HasSeed {

  /** @group setParam */
  @Since("2.1.0")
  override def setInputCol(value: String): this.type = super.setInputCol(value)

  /** @group setParam */
  @Since("2.1.0")
  override def setOutputCol(value: String): this.type = super.setOutputCol(value)

  /** @group setParam */
  @Since("2.1.0")
  override def setNumHashTables(value: Int): this.type = super.setNumHashTables(value)

  /** Creates an estimator with a randomly generated uid. */
  @Since("2.1.0")
  def this() = {
    this(Identifiable.randomUID("RandomHyperplanesLSH"))
  }

  /** @group setParam */
  @Since("2.1.0")
  def setSeed(value: Long): this.type = set(seed, value)

  /**
   * Draws `numHashTables` random hyperplanes of dimension `inputDim`, seeded by `seed`.
   *
   * @param inputDim dimensionality of the input vectors
   * @return a model holding the generated hyperplanes and sharing this estimator's uid
   */
  @Since("2.1.0")
  override protected[ml] def createRawLSHModel(inputDim: Int): RandomHyperplanesLSHModel = {
    val rand = new Random($(seed))
    val randomHyperplanes: Array[Vector] = Array.fill($(numHashTables)) {
      // Components are i.i.d. standard Gaussian so that the direction of each normal vector is
      // uniformly distributed on the sphere. Sampling each component uniformly from [-1, 1]
      // (a cube) biases directions towards the diagonals and breaks the sign-hash collision
      // probability of 1 - theta / pi that random-hyperplane LSH relies on.
      Vectors.dense(Array.fill(inputDim)(rand.nextGaussian()))
    }
    new RandomHyperplanesLSHModel(uid, randomHyperplanes)
  }

  /** Checks that the input column is a vector column, then applies the LSH schema transform. */
  @Since("2.1.0")
  override def transformSchema(schema: StructType): StructType = {
    SchemaUtils.checkColumnType(schema, $(inputCol), new VectorUDT)
    validateAndTransformSchema(schema)
  }

  @Since("2.1.0")
  override def copy(extra: ParamMap): this.type = defaultCopy(extra)
}


/**
 * Companion object providing persistence (read/load and the writer implementation) for
 * `RandomHyperplanesLSHModel`.
 */
@Since("2.1.0")
object RandomHyperplanesLSHModel extends MLReadable[RandomHyperplanesLSHModel] {

  /** Returns a reader able to load a model saved by `RandomHyperplanesLSHModelWriter`. */
  @Since("2.1.0")
  override def read: MLReader[RandomHyperplanesLSHModel] = new RandomHyperplanesLSHModelReader

  /** Loads a model from `path`; delegates to the inherited implementation. */
  @Since("2.1.0")
  override def load(path: String): RandomHyperplanesLSHModel = super.load(path)

  /**
   * Writer that stores the model's params as metadata and its hyperplanes as a single-row
   * Parquet dataset under `<path>/data`.
   */
  private[RandomHyperplanesLSHModel] class RandomHyperplanesLSHModelWriter(
      instance: RandomHyperplanesLSHModel)
    extends MLWriter {

    // The field name becomes the Parquet column name that the reader selects.
    private case class Data(randomHyperplanes: Array[Vector])

    override protected def saveImpl(path: String): Unit = {
      DefaultParamsWriter.saveMetadata(instance, path, sc)
      val payload = Seq(Data(instance.randomHyperplanes))
      val target = new Path(path, "data").toString
      sparkSession.createDataFrame(payload).repartition(1).write.parquet(target)
    }
  }

  /** Reader that restores the metadata first, then the hyperplanes from `<path>/data`. */
  private class RandomHyperplanesLSHModelReader extends MLReader[RandomHyperplanesLSHModel] {

    /** Checked against metadata when loading model */
    private val className = classOf[RandomHyperplanesLSHModel].getName

    override def load(path: String): RandomHyperplanesLSHModel = {
      val metadata = DefaultParamsReader.loadMetadata(path, sc, className)

      val row = sparkSession.read
        .parquet(new Path(path, "data").toString)
        .select("randomHyperplanes")
        .head()
      val restored =
        new RandomHyperplanesLSHModel(metadata.uid, row.getSeq[Vector](0).toArray)

      // Re-apply the params recorded in the metadata onto the freshly built model.
      metadata.getAndSetParams(restored)
      restored
    }
  }
}


Name: Compile Error
Message: <console>:31: error: class LSHModel in package feature cannot be accessed in package org.apache.spark.ml.feature
         extends LSHModel[RandomHyperplanesLSHModel] {
                 ^
<console>:30: error: ml is not an enclosing class
           private[ml] val randomHyperplanes: Array[Vector])
                           ^
<console>:28: error: ml is not an enclosing class
       class RandomHyperplanesLSHModel private[ml](
                                       ^
<console>:42: error: ml is not an enclosing class
         override protected[ml] def hashFunction(elems: Vector): Array[Vector] = {
                                    ^
<console>:49: error: ml is not an enclosing class
         override protected[ml] def keyDistance(x: Vector, y: Vector): Double = {
                                    ^
<console>:58: error: ml is not an enclosing class
         override protected[ml] def hashDistance(x: Seq[Vector], y: Seq[Vector]): Double = {
                 