# Scala Basics

In [68]:
// Sample collections, both containing duplicate elements.
val c: Seq[Int] = Seq(1, 2, 3, 3, 6, 9, 0)
val d = Vector("a", "b", "c", "c")

// Total number of elements in c.
println(c.length)
// Number of elements of c that are equal to 3.
println(c.count(n => n == 3))

// Frequency tables: group equal elements together, then keep only each group's size.
println(c.groupBy(x => x).mapValues(group => group.length))
println(d.groupBy(x => x).mapValues(group => group.length))

7
2
Map(0 -> 1, 1 -> 1, 6 -> 1, 9 -> 1, 2 -> 1, 3 -> 2)
Map(b -> 1, a -> 1, c -> 2)


c: Seq[Int] = List(1, 2, 3, 3, 6, 9, 0)
d: scala.collection.immutable.Vector[String] = Vector(a, b, c, c)


In [None]:
/** Returns `a` increased by 3. */
def suma(a: Int): Int = 3 + a
suma(8)

In [None]:
// The integers 1 through 10, inclusive.
val x = 1 to 10
// Function value that doubles its argument.
val double: Int => Int = _ * 2
// Apply `double` to every element of x.
val double_list = for (i <- x) yield double(i)

In [None]:
// Add up the doubled values; starting the fold from 0.0 makes the total a Double.
val final_sum = double_list.foldLeft(0.0)((total, n) => total + n)
println(final_sum)

In [16]:
// Build the lowercase alphabet as an inclusive range of Chars (a NumericRange),
// next to a plain List of Chars for comparison.
val alphabet = 'a' to 'z'
val alpha = 'a' :: 'b' :: 'c' :: Nil
println(alphabet)
println(alpha)

NumericRange a to z
List(a, b, c)


alphabet: scala.collection.immutable.NumericRange.Inclusive[Char] = NumericRange a to z
alpha: List[Char] = List(a, b, c)


In [53]:
// Signature for reference: def foldLeft[B](z: B)(op: (B, A) => B): B
// Start from the empty string and append each donut name followed by " Donut ".
val donuts: Seq[String] = Seq("Plain", "Strawberry", "Glazed")
val donut_words = donuts.foldLeft("") { (sentence, donut) =>
  sentence + donut + " Donut "
}

// Variations to try:
// val donut_words = donuts.foldLeft("")((a, b) => b + " Donut ") // ignores the accumulator: Glazed Donut (last entry)
// println(s"All donuts = ${donuts.foldLeft("")((a, b) => a + b + " Donut ")}")

donuts: Seq[String] = List(Plain, Strawberry, Glazed)
donut_words: String = "Plain Donut Strawberry Donut Glazed Donut "


In [None]:
// Reverse the alphabet into a List, then fold from the right, putting each Char in
// front of the String accumulated so far (which starts out empty).
val r = alphabet.reverse.toList
val smash = r.foldRight("")((ch, acc) => ch + acc)
println(smash)

In [17]:
// Concatenate two collections with ++: the lowercase range followed by the uppercase range.
// The result is a single IndexedSeq[Char] holding all 52 letters in that order.
val chars = ('a' to 'z') ++ ('A' to 'Z')

chars: scala.collection.immutable.IndexedSeq[Char] = Vector(a, b, c, d, e, f, g, h, i, j, k, l, m, n, o, p, q, r, s, t, u, v, w, x, y, z, A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R, S, T, U, V, W, X, Y, Z)


# SparkML + Scala

In [1]:
// Initialize: reuse the SparkSession that already exists, or create one with these settings.
val spark = org.apache.spark.sql.SparkSession.builder
  .master("local")
  .appName("Spark CSV Reader")
  .getOrCreate()

// Import to Spark df.
// The first line of the CSV is used as the header. No schema is supplied or inferred,
// so every column is read as a string (see the printed schema).
// The frame is built as one immutable chain instead of reassigning a `var`:
// drop the "index" column, then keep only the four columns used below.
val df = spark.read
  .format("csv")
  .option("header", "true")
  .load("df_pandas.csv")
  .drop("index")
  .select("advance", "los", "pax", "paid")

df.show(5)
println(df.schema)

Intitializing Scala interpreter ...

Spark Web UI available at http://192.168.1.4:4040
SparkContext available as 'sc' (version = 3.0.1, master = local[*], app id = local-1607981557397)
SparkSession available as 'spark'


+-------+----+---+-----+
|advance| los|pax| paid|
+-------+----+---+-----+
|    1.0| 5.0|1.0|222.2|
|    1.0| 0.0|2.0|442.2|
|    4.0| 0.0|1.0|207.1|
|    1.0| 0.0|1.0|249.1|
|   12.0|18.0|2.0|628.4|
+-------+----+---+-----+
only showing top 5 rows

StructType(StructField(advance,StringType,true), StructField(los,StringType,true), StructField(pax,StringType,true), StructField(paid,StringType,true))


spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@127d4660
df: org.apache.spark.sql.DataFrame = [advance: string, los: string ... 2 more fields]
df: org.apache.spark.sql.DataFrame = [advance: string, los: string ... 2 more fields]
df: org.apache.spark.sql.DataFrame = [advance: string, los: string ... 2 more fields]


In [2]:
// foldLeft walk-through, using the donut example:
//   donuts.foldLeft("")((a, b) => a + b + " Donut ")
//   - ""  is the starting value of the accumulator
//   - a   is the accumulator: everything combined so far
//   - b   is the next item taken from the list
//   - the function body combines the accumulator with the next item

// Explicit import so this cell does not depend on the shell pre-importing sql.functions._
import org.apache.spark.sql.functions.col

// Convert all cols to float: the accumulator starts as df, and each step replaces one
// column with the same column cast to float, so names and column order are unchanged.
val castedDF = df.columns.foldLeft(df) { (current, c) =>
  current.withColumn(c, col(c).cast("float"))
}
println(castedDF.schema)

StructType(StructField(advance,FloatType,true), StructField(los,FloatType,true), StructField(pax,FloatType,true), StructField(paid,FloatType,true))


castedDF: org.apache.spark.sql.DataFrame = [advance: float, los: float ... 2 more fields]


### Vector Assemble to Dense Vector

In [3]:
import org.apache.spark.ml.feature.VectorAssembler

// Combine every column of castedDF into a single vector column named "features".
val assembler = {
  val inputColumns = castedDF.columns
  new VectorAssembler()
    .setOutputCol("features")
    .setInputCols(inputColumns)
}

// Apply the assembler: the original columns are kept and "features" is appended.
val output = assembler.transform(castedDF)
println("Assembled columns to vector column 'features'")
output.show(5)

Assembled columns to vector column 'features'
+-------+----+---+-----+--------------------+
|advance| los|pax| paid|            features|
+-------+----+---+-----+--------------------+
|    1.0| 5.0|1.0|222.2|[1.0,5.0,1.0,222....|
|    1.0| 0.0|2.0|442.2|[1.0,0.0,2.0,442....|
|    4.0| 0.0|1.0|207.1|[4.0,0.0,1.0,207....|
|    1.0| 0.0|1.0|249.1|[1.0,0.0,1.0,249....|
|   12.0|18.0|2.0|628.4|[12.0,18.0,2.0,62...|
+-------+----+---+-----+--------------------+
only showing top 5 rows



import org.apache.spark.ml.feature.VectorAssembler
assembler: org.apache.spark.ml.feature.VectorAssembler = VectorAssembler: uid=vecAssembler_c1639385a496, handleInvalid=error, numInputCols=4
output: org.apache.spark.sql.DataFrame = [advance: float, los: float ... 3 more fields]


### Scale Data

In [4]:
import org.apache.spark.ml.feature.StandardScaler

// Configure the scaler: scale by standard deviation only, without centering on the mean.
// It reads the "features" column and writes "scaledFeatures".
val scaler = new StandardScaler()
  .setWithMean(false)
  .setWithStd(true)
  .setInputCol("features")
  .setOutputCol("scaledFeatures")

// Fitting computes the summary statistics the scaler needs from the assembled data.
val scalerModel = scaler.fit(output)

// Apply the fitted scaler so each feature has unit standard deviation.
val scaledData = scalerModel.transform(output)
scaledData.show(5)

+-------+----+---+-----+--------------------+--------------------+
|advance| los|pax| paid|            features|      scaledFeatures|
+-------+----+---+-----+--------------------+--------------------+
|    1.0| 5.0|1.0|222.2|[1.0,5.0,1.0,222....|[0.02434390012342...|
|    1.0| 0.0|2.0|442.2|[1.0,0.0,2.0,442....|[0.02434390012342...|
|    4.0| 0.0|1.0|207.1|[4.0,0.0,1.0,207....|[0.09737560049369...|
|    1.0| 0.0|1.0|249.1|[1.0,0.0,1.0,249....|[0.02434390012342...|
|   12.0|18.0|2.0|628.4|[12.0,18.0,2.0,62...|[0.29212680148107...|
+-------+----+---+-----+--------------------+--------------------+
only showing top 5 rows



import org.apache.spark.ml.feature.StandardScaler
scaler: org.apache.spark.ml.feature.StandardScaler = stdScal_96662d5317ef
scalerModel: org.apache.spark.ml.feature.StandardScalerModel = StandardScalerModel: uid=stdScal_96662d5317ef, numFeatures=4, withMean=false, withStd=true
scaledData: org.apache.spark.sql.DataFrame = [advance: float, los: float ... 4 more fields]


In [13]:
// Keep only the scaled vector column, then rename it to "features" for the clustering step.
val cluster_input = {
  val scaledOnly = scaledData.select("scaledFeatures")
  scaledOnly.withColumnRenamed("scaledFeatures", "features")
}
cluster_input.show(3)

+--------------------+
|            features|
+--------------------+
|[0.02434390012342...|
|[0.02434390012342...|
|[0.09737560049369...|
+--------------------+
only showing top 3 rows



cluster_input: org.apache.spark.sql.DataFrame = [features: vector]


In [14]:
import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.evaluation.ClusteringEvaluator

// Configure k-means with k = 5 and seed 1L, then fit it on the scaled features.
val kmeans = new KMeans()
  .setSeed(1L)
  .setK(5)
val model = kmeans.fit(cluster_input)

// Assign every row to a cluster; this adds the "prediction" column.
val predictions = model.transform(cluster_input)

// Score the clustering with the evaluator's default settings
// (silhouette metric, squared euclidean distance).
val evaluator = new ClusteringEvaluator()
val silhouette = evaluator.evaluate(predictions)
println(s"Silhouette with squared euclidean distance = $silhouette")

// Print one line per cluster center.
println("Cluster Centers: ")
for (center <- model.clusterCenters) println(center)

Silhouette with squared euclidean distance = 0.4957853443785829
Cluster Centers: 
[0.9656586930619593,4.199334110006287,1.3732132708016558,0.8856660921141534]
[0.5252672801325673,0.4071165697085678,1.0953494606871508,0.5011525193840705]
[0.7451700564523532,0.569683853233855,2.4237409901042137,1.2378200980845149]
[1.6357300487850643,0.9443531201628226,4.39560592487628,3.850356921350474]
[2.9724173878954794,0.7162076257176955,1.6839187041990253,1.0679513058799524]


import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.evaluation.ClusteringEvaluator
kmeans: org.apache.spark.ml.clustering.KMeans = kmeans_f70638c5f72f
model: org.apache.spark.ml.clustering.KMeansModel = KMeansModel: uid=kmeans_f70638c5f72f, k=5, distanceMeasure=euclidean, numFeatures=4
predictions: org.apache.spark.sql.DataFrame = [features: vector, prediction: int]
evaluator: org.apache.spark.ml.evaluation.ClusteringEvaluator = ClusteringEvaluator: uid=cluEval_95260352ae37, metricName=silhouette, distanceMeasure=squaredEuclidean
silhouette: Double = 0.4957853443785829


In [15]:
// Preview the first five rows of the k-means output: the feature vector and the
// cluster index assigned to it in the "prediction" column.
predictions.show(5)

+--------------------+----------+
|            features|prediction|
+--------------------+----------+
|[0.02434390012342...|         1|
|[0.02434390012342...|         2|
|[0.09737560049369...|         1|
|[0.02434390012342...|         1|
|[0.29212680148107...|         0|
+--------------------+----------+
only showing top 5 rows



# SparkML Pipeline Steps

In [23]:
// Initialize: reuse the SparkSession that already exists, or create one with these settings.
val spark = org.apache.spark.sql.SparkSession.builder
  .master("local")
  .appName("Spark CSV Reader")
  .getOrCreate()

// Import to Spark df: read the CSV using its first line as the header, drop the
// "index" column and keep the four columns used for clustering.
val df = spark.read
  .format("csv")
  .option("header", "true")
  .load("df_pandas.csv")
  .drop("index")
  .select("advance", "los", "pax", "paid")

// Explicit import so this cell does not depend on the shell pre-importing sql.functions._
import org.apache.spark.sql.functions.col

// Convert all cols to float: starting from df, each fold step replaces one column
// with the same column cast to float.
val castedDF = df.columns.foldLeft(df) { (current, c) =>
  current.withColumn(c, col(c).cast("float"))
}
println(castedDF.schema)

StructType(StructField(advance,FloatType,true), StructField(los,FloatType,true), StructField(pax,FloatType,true), StructField(paid,FloatType,true))


spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@127d4660
df: org.apache.spark.sql.DataFrame = [advance: string, los: string ... 2 more fields]
castedDF: org.apache.spark.sql.DataFrame = [advance: float, los: float ... 2 more fields]


In [26]:
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.clustering.{KMeans, KMeansModel}
import org.apache.spark.ml.evaluation.ClusteringEvaluator
import org.apache.spark.ml.feature.StandardScaler
import org.apache.spark.ml.feature.VectorAssembler


// Build a three-stage ML pipeline: VectorAssembler -> StandardScaler -> KMeans.

// Stage 1: pack all columns of castedDF into a single vector column.
val assembler = new VectorAssembler()
  .setOutputCol("pre_scaled_vector")
  .setInputCols(castedDF.columns)

// Stage 2: scale the assembled vector. The input column name is read back from the
// assembler rather than repeated as a literal; the output column is named "features".
val scaler = new StandardScaler()
  .setWithMean(false)
  .setWithStd(true)
  .setInputCol(assembler.getOutputCol)
  .setOutputCol("features")

// Stage 3: k-means with k = 5 and seed 1L.
val kmeans = new KMeans()
  .setSeed(1L)
  .setK(5)

// Chain the stages in execution order.
val pipeline = new Pipeline().setStages(Array(assembler, scaler, kmeans))

// Fit all stages on castedDF, producing a PipelineModel.
val model = pipeline.fit(castedDF)

// Run the fitted pipeline over the same data and preview the result.
val pred = model.transform(castedDF)
pred.show(5)

+-------+----+---+-----+--------------------+--------------------+----------+
|advance| los|pax| paid|   pre_scaled_vector|            features|prediction|
+-------+----+---+-----+--------------------+--------------------+----------+
|    1.0| 5.0|1.0|222.2|[1.0,5.0,1.0,222....|[0.02434390012342...|         1|
|    1.0| 0.0|2.0|442.2|[1.0,0.0,2.0,442....|[0.02434390012342...|         2|
|    4.0| 0.0|1.0|207.1|[4.0,0.0,1.0,207....|[0.09737560049369...|         1|
|    1.0| 0.0|1.0|249.1|[1.0,0.0,1.0,249....|[0.02434390012342...|         1|
|   12.0|18.0|2.0|628.4|[12.0,18.0,2.0,62...|[0.29212680148107...|         0|
+-------+----+---+-----+--------------------+--------------------+----------+
only showing top 5 rows



import org.apache.spark.ml.{Pipeline, PipelineModel}
assembler: org.apache.spark.ml.feature.VectorAssembler = VectorAssembler: uid=vecAssembler_e13bf123c1b4, handleInvalid=error, numInputCols=4
scaler: org.apache.spark.ml.feature.StandardScaler = stdScal_de93d603d52b
kmeans: org.apache.spark.ml.clustering.KMeans = kmeans_8ce0783a5751
pipeline: org.apache.spark.ml.Pipeline = pipeline_9cd507aa1559
model: org.apache.spark.ml.PipelineModel = pipeline_9cd507aa1559
pred: org.apache.spark.sql.DataFrame = [advance: float, los: float ... 5 more fields]


In [36]:
// Show all stages of the fitted pipeline. Each entry is a Transformer; in the recorded
// output these are the VectorAssembler, the StandardScalerModel and the KMeansModel.
model.stages

res29: Array[org.apache.spark.ml.Transformer] = Array(VectorAssembler: uid=vecAssembler_e13bf123c1b4, handleInvalid=error, numInputCols=4, StandardScalerModel: uid=stdScal_de93d603d52b, numFeatures=4, withMean=false, withStd=true, KMeansModel: uid=kmeans_8ce0783a5751, k=5, distanceMeasure=euclidean, numFeatures=4)


In [40]:
import org.apache.spark.ml.clustering.{KMeansModel}

// Evaluate clustering by computing the Silhouette score with the evaluator's defaults.
// pred comes from the pipeline, whose scaler writes "features" and whose k-means stage
// adds "prediction".
val evaluator = new ClusteringEvaluator()

val silhouette = evaluator.evaluate(pred)
println(s"Silhouette with squared euclidean distance = $silhouette")

// Shows the result.
// The fitted k-means model is the last pipeline stage. Match on its type instead of
// casting, so an unexpected pipeline layout is reported rather than failing the cast.
println("Cluster Centers: ")
model.stages.lastOption match {
  case Some(kmeansModel: KMeansModel) =>
    kmeansModel.clusterCenters.foreach(println)
  case Some(other) =>
    println(s"Last pipeline stage is ${other.getClass.getSimpleName}, not a KMeansModel")
  case None =>
    println("Pipeline has no stages")
}

Silhouette with squared euclidean distance = 0.4957853443785829
Cluster Centers: 
[0.9656586930619593,4.199334110006287,1.3732132708016558,0.8856660921141534]
[0.5252672801325673,0.4071165697085678,1.0953494606871508,0.5011525193840705]
[0.7451700564523532,0.569683853233855,2.4237409901042137,1.2378200980845149]
[1.6357300487850643,0.9443531201628226,4.39560592487628,3.850356921350474]
[2.9724173878954794,0.7162076257176955,1.6839187041990253,1.0679513058799524]


import org.apache.spark.ml.clustering.KMeansModel
evaluator: org.apache.spark.ml.evaluation.ClusteringEvaluator = ClusteringEvaluator: uid=cluEval_09fb8bbfebe1, metricName=silhouette, distanceMeasure=squaredEuclidean
silhouette: Double = 0.4957853443785829


# Save Model to Disk/Load from Disk

In [43]:
// Single location used for both saving and loading. Previously the model was saved to
// "./tmp/..." but loaded from "/tmp/...", so the load could pick up a different
// (possibly stale) copy of the pipeline.
val pipelinePath = "/tmp/spark-scala-KMeansPipeline"

// Now we can optionally save the fitted pipeline to disk (replacing any earlier save)
model.write.overwrite().save(pipelinePath)

// And load it back in during production
val sameModel = PipelineModel.load(pipelinePath)

// Print the cluster centers of the reloaded model to confirm they match the fitted one.
// Match on the stage type instead of casting, so an unexpected layout is reported.
sameModel.stages.lastOption match {
  case Some(kmeansModel: KMeansModel) =>
    kmeansModel.clusterCenters.foreach(println)
  case Some(other) =>
    println(s"Last pipeline stage is ${other.getClass.getSimpleName}, not a KMeansModel")
  case None =>
    println("Pipeline has no stages")
}

[0.9656586930619593,4.199334110006287,1.3732132708016558,0.8856660921141534]
[0.5252672801325673,0.4071165697085678,1.0953494606871508,0.5011525193840705]
[0.7451700564523532,0.569683853233855,2.4237409901042137,1.2378200980845149]
[1.6357300487850643,0.9443531201628226,4.39560592487628,3.850356921350474]
[2.9724173878954794,0.7162076257176955,1.6839187041990253,1.0679513058799524]


sameModel: org.apache.spark.ml.PipelineModel = pipeline_9cd507aa1559
