In [None]:
// Only show ERROR-level log messages (and anything more severe) to keep notebook output readable
sc.setLogLevel(logLevel = "ERROR")
// Import libs
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row
import scala.util.hashing.MurmurHash3

In [None]:
import org.apache.spark.sql.SparkSession //creates Spark Session

// Get the existing SparkSession, or create one named "Spark Session" if none exists.
val spark = SparkSession.builder().appName("Spark Session").getOrCreate()

// Load the wrangled weather data: comma-separated, the first row is a header,
// and column types are inferred from the data.
val df = spark.read
  .format("csv")
  .options(Map(
    "header" -> "true",
    "inferSchema" -> "true",
    "sep" -> ","
  ))
  .load("wrangle_final_for_yury.txt")

df.show()

In [None]:
 // Keep the target (RainTomorrow), Location and the numeric predictors used in later cells.
// The name "datarset" (sic) is kept because later cells refer to it.
val selectedColumns = Seq("RainTomorrow", "Location", "MinTemp", "MaxTemp", "Rainfall",
  "Evaporation", "Sunshine", "Humidity9am", "Temp9am")
val datarset = df.select(selectedColumns.head, selectedColumns.tail: _*)

datarset.show()


In [None]:
import org.apache.spark.sql.functions._
val dataset2 = datarset.withColumn(
  "RainTomorrow",
  // Encode the target as the string "1" when it equals "Yes" and "0" otherwise.
  // NOTE(review): null RainTomorrow values also end up as "0" — confirm that is intended.
  when(col("RainTomorrow") === "Yes", "1").otherwise("0")
)

dataset2.show()

In [None]:
// Turn the Location string into a hashed term-frequency vector ("locationfeatures").
// TODO (original note): update hashing and get rid of the tokenizer.
// NOTE(review): only 20 hash buckets are used, so distinct locations may collide — confirm.

// Tokenize the Location column into "locfeatres".
val tokenizer = new Tokenizer()
  .setOutputCol("locfeatres")
  .setInputCol("Location")

// Hash the tokens into a term-frequency vector with 20 features.
val hashingTF = new HashingTF()
  .setOutputCol("locationfeatures")
  .setInputCol(tokenizer.getOutputCol)
  .setNumFeatures(20)

// Let's see the TF outputs
val tokenizedDf = tokenizer.transform(dataset2)
val hash01 = hashingTF.transform(tokenizedDf)
// hash01.show()

// Keep the label, the hashed location vector and the numeric predictors.
val featuresdf = hash01.select(
  "RainTomorrow", "locationfeatures",
  "MinTemp", "MaxTemp", "Rainfall", "Evaporation", "Sunshine", "Humidity9am", "Temp9am")
featuresdf.show()

In [6]:
//

In [None]:
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}


// Numeric predictor columns fed to the VectorAssembler.
// NOTE: the hashed "locationfeatures" vector is not assembled yet; add it to this array to
// use Location as a feature.
val numericCols = Array("MinTemp", "MaxTemp", "Rainfall", "Evaporation", "Sunshine", "Humidity9am", "Temp9am")

// DataFrames are immutable: withColumn returns a NEW DataFrame, so the casts must be chained
// and the result kept. Cast to double, the element type of ML vectors, to avoid float rounding.
val castedFeaturesdf = numericCols.foldLeft(featuresdf) { (acc, c) =>
  acc.withColumn(c, col(c).cast("double"))
}

// Assemble the numeric predictors into one "features" vector; rows with invalid
// (null/NaN) inputs are dropped ("skip").
val assembler = new VectorAssembler()
  .setInputCols(numericCols)
  .setOutputCol("features")
  .setHandleInvalid("skip")

val output = assembler.transform(castedFeaturesdf)
output.select("features", "RainTomorrow").show(false)

// Spark ML estimators read the target from a column named "label" by default.
val output2 = output.withColumnRenamed("RainTomorrow", "label")
output2.select("features", "label").show(false)

// The label was encoded as the strings "1"/"0"; make it numeric (Double) for the classifiers.
val output3 = output2.withColumn("label", col("label").cast("double"))

In [None]:
// Logistic regression with at most 10 iterations and regularization parameter 0.001.
val lr = new LogisticRegression()
  .setRegParam(0.001)
  .setMaxIter(10)
lr

In [None]:
// Split into training (75%) and test (25%) sets; the fixed seed makes the split reproducible.
val splits: Array[org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]] =
  output3.randomSplit(Array(0.75, 0.25), 98765L)

// randomSplit returns the pieces in the same order as the weights:
// splits(0) is the 75% share (training) and splits(1) is the 25% share (test).
val (trainingData, testData) = (splits(0), splits(1))



In [None]:
// Single-stage pipeline: the features are assembled before this point, so only `lr` runs here.
val pipeline = new Pipeline().setStages(Array(lr))


In [None]:
// Fit the pipeline on the training split; the result is a PipelineModel holding the fitted LR.
val model = pipeline
  .fit(trainingData)

In [None]:
// Optionally persist the fitted pipeline, replacing anything already saved at this path.
val lrModelPath = "/tmp/spark-logistic-regression-model"
model.write.overwrite().save(lrModelPath)

In [None]:
// Load the persisted pipeline back, as a production job would.
val sameModel = PipelineModel.read.load("/tmp/spark-logistic-regression-model")

In [None]:
// Compare the original model with its reloaded copy on the training split:
// both tables should show the same probabilities and predictions.
model.transform(trainingData)
  .select("probability", "features", "prediction", "label")
  .show(50)

sameModel.transform(trainingData)
  .select("probability", "features", "prediction", "label")
  .show(50)

In [None]:
// Make predictions on the test split and print one line per row.
// The Row pattern must list the columns in the same order as the select.
// The label is cast to double here, so the pattern matches whatever numeric type it had upstream.
model.transform(testData)
  .select(col("probability"), col("features"), col("prediction"), col("label").cast("double").as("label"))
  .collect()
  .foreach { case Row(prob: Vector, features: Vector, pred: Double, label: Double) =>
    println(s"($label, $features)\t --> prob=$prob\t prediction=$pred")
  }

In [None]:
//Create LibSVM
import org.apache.spark.mllib.util.MLUtils

// Export (label, features) in LIBSVM text format so the RDD-based MLlib API can read it.
// CSV cannot hold a vector column, and a CSV file is not LIBSVM. The libsvm data source
// requires exactly two columns: a Double label followed by a Vector of features.
val libsvmPath = "featurevect.libsvm"

output3
  .select(col("label").cast("double").as("label"), col("features"))
  .coalesce(1)
  .write
  .mode("overwrite")
  .format("libsvm")
  .save(libsvmPath)

// The writer produces a directory of part files; loadLibSVMFile reads that same path.
val data = MLUtils.loadLibSVMFile(sc, libsvmPath)

In [None]:

import org.apache.spark.mllib.tree.RandomForest
import org.apache.spark.mllib.tree.model.RandomForestModel


// Train and evaluate an RDD-based MLlib RandomForest on the LIBSVM RDD `data`.
// Earlier input, kept for reference (Weather-aus-cleanup-file):
//val data = MLUtils.loadLibSVMFile(sc, "weatherAUS.csv")

// Split the data into training and test sets (30% held out for testing).
// A fixed seed keeps the split reproducible across runs.
val splits = data.randomSplit(Array(0.7, 0.3), seed = 98765L)
val (trainingData, testData) = (splits(0), splits(1))

// Train a RandomForest model.
// Empty categoricalFeaturesInfo indicates all features are continuous.
val numClasses = 2
val categoricalFeaturesInfo = Map[Int, Int]()
val numTrees = 3 // Use more in practice.
val featureSubsetStrategy = "auto" // Let the algorithm choose.
val impurity = "gini"
val maxDepth = 4
val maxBins = 32

val model = RandomForest.trainClassifier(trainingData, numClasses, categoricalFeaturesInfo,
  numTrees, featureSubsetStrategy, impurity, maxDepth, maxBins)

// Evaluate the model on the test instances: test error = fraction of misclassified points.
val labelAndPreds = testData.map { point =>
  val prediction = model.predict(point.features)
  (point.label, prediction)
}
val testErr = labelAndPreds.filter { case (label, pred) => label != pred }.count.toDouble / testData.count()
println(s"Test Error = $testErr")
println(s"Learned classification forest model:\n${model.toDebugString}")

// Save the model and load it back from the SAME path (the load path previously differed).
// NOTE(review): re-running this cell may fail if the save path already exists — confirm.
val rfModelPath = "target/tmp/myRandomForestClassificationModel"
model.save(sc, rfModelPath)
val sameModel = RandomForestModel.load(sc, rfModelPath)