In [1]:
import scala.sys.process._
// Spark ML libraries

import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer, IndexToString, VectorAssembler}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.{Model, Pipeline, PipelineStage, PipelineModel}
import org.apache.spark.sql.SparkSession

"wget https://apsportal.ibm.com/exchange-api/v1/entries/ba9a3008817bbf458eea4980294e618b/data?accessKey=203a8cb3d4a96e2033cc686dd9d36158".!

val filename = "data?accessKey=203a8cb3d4a96e2033cc686dd9d36158"

// @hidden_cell
// This function is used to setup the access of Spark to your Object Storage. The definition contains your credentials.
// You might want to remove those credentials before you share your notebook.
def setHadoopConfigd733d4ba5cf1416391b329fb037363a1(name: String) = {
    // This function sets the Hadoop configuration so it is possible to
    // access data from Bluemix Object Storage using Spark

    val prefix = "fs.swift.service." + name
    sc.hadoopConfiguration.set(prefix + ".auth.url", "https://identity.open.softlayer.com" + "/v3/auth/tokens")
    sc.hadoopConfiguration.set(prefix + ".auth.endpoint.prefix","endpoints")
    sc.hadoopConfiguration.set(prefix + ".tenant", "b096337885ed4543a5fa954c5935a4a2")
    sc.hadoopConfiguration.set(prefix + ".username", "38d21a8a7330459f81d015c09aa1f3ec")
    sc.hadoopConfiguration.set(prefix + ".password", "WbMsy.OkTa{5_3FE")
    sc.hadoopConfiguration.setInt(prefix + ".http.port", 8080)
    sc.hadoopConfiguration.set(prefix + ".region", "dallas")
    sc.hadoopConfiguration.setBoolean(prefix + ".public", false)
}

// you can choose any name
val name = "keystone"
setHadoopConfigd733d4ba5cf1416391b329fb037363a1(name)

//read data source
val train_data = spark.
    read.format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat").
    option("header", "true").
    option("inferSchema", "true").
    load("swift://dsxsample." + name + "/banking.csv")
    //load(filename)


// to load data as rdd
/*val rddData3 = sc.textFile("swift://dsxsample." + name + "/banking.csv")
rddData3.take(5)

train_data.take(5)
train_data.printSchema()
train_data.show()

train_data.count() */
// prepare data for ml model
val splits = train_data.randomSplit(Array(0.8, 0.18, 0.02), seed = 24L)
val training_data = splits(0).cache()
val test_data = splits(1)
val prediction_data = splits(2)

println("Number of training records: ", training_data.count())
println("Number of testing records : ", test_data.count())
println("Number of prediction records : ", prediction_data.count())

val stringIndexer_label = new StringIndexer().setInputCol("y").setOutputCol("label").fit(train_data)
val stringIndexer_job = new StringIndexer().setInputCol("job").setOutputCol("job_ix")
val stringIndexer_marital = new StringIndexer().setInputCol("marital").setOutputCol("marital_ix")
val stringIndexer_education = new StringIndexer().setInputCol("education").setOutputCol("education_ix")
val stringIndexer_default = new StringIndexer().setInputCol("default").setOutputCol("default_ix")
val stringIndexer_housing = new StringIndexer().setInputCol("housing").setOutputCol("housing_ix")
val stringIndexer_loan = new StringIndexer().setInputCol("loan").setOutputCol("loan_ix")
val stringIndexer_contact = new StringIndexer().setInputCol("contact").setOutputCol("contact_ix")
val stringIndexer_month = new StringIndexer().setInputCol("month").setOutputCol("month_ix")
val stringIndexer_day_of_week = new StringIndexer().setInputCol("day_of_week").setOutputCol("day_of_week_ix")
val stringIndexer_poutcome = new StringIndexer().setInputCol("poutcome").setOutputCol("poutcome_ix")

//create a feature vector by combining all features together.
val vectorAssembler_features = new VectorAssembler().setInputCols(Array("age", "job_ix", "marital_ix", "education_ix", "default_ix", "housing_ix", "loan_ix", "contact_ix", "month_ix", "day_of_week_ix", "duration", "campaign", "pdays", "previous", "poutcome_ix", "emp_var_rate", "cons_price_idx", "cons_conf_idx", "euribor3m", "nr_employed")).setOutputCol("features")

//define estimatore you want to use for classification.

val rf = new RandomForestClassifier().setLabelCol("label").setFeaturesCol("features").setNumTrees(10)
// index lables back to original labels
val labelConverter = new IndexToString().setInputCol("prediction").setOutputCol("predictedLabel").setLabels(stringIndexer_label.labels)

val pipeline_rf = new Pipeline().setStages(Array(stringIndexer_label, stringIndexer_job, stringIndexer_marital, stringIndexer_education, stringIndexer_default, stringIndexer_housing, stringIndexer_loan, stringIndexer_contact, stringIndexer_month, stringIndexer_day_of_week, stringIndexer_poutcome, vectorAssembler_features, rf, labelConverter))

training_data.printSchema()

//training the model

val model_rf = pipeline_rf.fit(training_data)

//model accuracy
val predictions = model_rf.transform(test_data)
val evaluatorRF = new MulticlassClassificationEvaluator().setLabelCol("label").setPredictionCol("prediction").setMetricName("accuracy")
val accuracy = evaluatorRF.evaluate(predictions)
println("Accuracy = " + accuracy)
println("Test Error = " + (1.0 - accuracy))

// Persist model- store model in watson machine learning repo by usinf scala client libraries

import com.ibm.analytics.ngp.repository._
import scalaj.http.{Http, HttpOptions}
import scala.util.{Success, Failure}
import java.util.Base64
import java.nio.charset.StandardCharsets
import play.api.libs.json._

val service_path = "https://ibm-watson-ml.mybluemix.net"
val user = "3c01de8f-30e9-4630-a4ce-c0f7f27fa71f"
val password ="357ab4b5-bf25-45fb-8c92-21b45a224f12"

//authorize with wml
val client = MLRepositoryClient(service_path)
client.authorize(user, password)
//create model artifact
val model_artifact = MLRepositoryArtifact(model_rf, training_data, "Banking data Prediction Model using random forest")
//get saved model metadata from wml
val saved_model = client.models.save(model_artifact).get
saved_model.meta.availableProps

//get model details
println("modelType: " + saved_model.meta.prop("modelType"))
println("trainingDataSchema: " + saved_model.meta.prop("trainingDataSchema"))
println("creationTime: " + saved_model.meta.prop("creationTime"))
println("modelVersionHref: " + saved_model.meta.prop("modelVersionHref"))
println("label: " + saved_model.meta.prop("label"))

//load model and make predictions
val model_version_href = saved_model.meta.prop("modelVersionHref").get
val loaded_model_artifact = client.models.version(model_version_href).get
//print model name loaded
loaded_model_artifact.name.mkString
//make predictions
loaded_model_artifact match {
        case SparkPipelineModelLoader(Success(model)) => {
          val predictions = model.transform(prediction_data)
        }
        case SparkPipelineModelLoader(Failure(e)) => "Loading failed."
        case _ => println(s"Unexpected artifact class: ${loaded_model_artifact.getClass}")
    }
predictions.select("age", "job", "marital", "education", "default", "housing", "loan", "contact", "month", "day_of_week", "duration", "campaign", "pdays", "previous", "poutcome", "emp_var_rate", "cons_price_idx", "cons_conf_idx", "euribor3m", "nr_employed","predictedLabel").show()

predictions.select("predictedLabel").groupBy("predictedLabel").count().show()



--2017-05-24 17:17:55--  https://apsportal.ibm.com/exchange-api/v1/entries/ba9a3008817bbf458eea4980294e618b/data?accessKey=203a8cb3d4a96e2033cc686dd9d36158
Resolving apsportal.ibm.com (apsportal.ibm.com)... 169.54.245.70
Connecting to apsportal.ibm.com (apsportal.ibm.com)|169.54.245.70|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: unspecified [application/octet-stream]
Saving to: ‘data?accessKey=203a8cb3d4a96e2033cc686dd9d36158.48’

     0K .......... .......... .......... .......... .......... 4.38M
    50K .......... .......... .......... .......... .......... 16.3M
   100K .......... .......... .......... .......... .......... 9.58M
   150K .......... .......... .......... .......... .......... 7.30M
   200K .......... .......... .......... .......... .......... 15.1M
   250K .......... .......... .......... .......... .......... 5.76M
   300K .......... .......... .......... .......... .......... 72.4M
   350K .......... .......... .......... .......... 

In [2]:
//get WML instance token

val wml_auth_header = "Basic " + Base64.getEncoder.encodeToString((user + ":" + password).getBytes(StandardCharsets.UTF_8))
val wml_url = service_path + "/v2/identity/token"
val wml_response = Http(wml_url).header("Authorization", wml_auth_header).asString
val wmltoken_json: JsValue = Json.parse(wml_response.body)

val wmltoken = (wmltoken_json \ "token").asOpt[String] match {
    case Some(x) => x
    case None => ""
}

print(wmltoken)

eyJhbGciOiJSUzUxMiIsInR5cCI6IkpXVCJ9.eyJ0ZW5hbnRJZCI6Ijg4M2UyYjBmLTI5YmEtNDhhZi05MTc3LTcxOWE3NmNiOTNmZiIsImluc3RhbmNlSWQiOiI4ODNlMmIwZi0yOWJhLTQ4YWYtOTE3Ny03MTlhNzZjYjkzZmYiLCJwbGFuSWQiOiIzZjZhY2Y0My1lZGU4LTQxM2EtYWM2OS1mOGFmM2JiMGNiZmUiLCJyZWdpb24iOiJ1cy1zb3V0aCIsInVzZXJJZCI6IjNjMDFkZThmLTMwZTktNDYzMC1hNGNlLWMwZjdmMjdmYTcxZiIsImlzcyI6Imh0dHA6Ly8xMjkuNDEuMjI5LjE4ODo4MDgwL3YyL2lkZW50aXR5IiwiaWF0IjoxNDk1NjY0NDA1LCJleHAiOjE0OTU2OTMyMDV9.YYcDDc4HOcf0uB2T0-IsIfu5prgnNd5WIOzqpetGYVepSEozdt_CLkny-JQHZHWS3R-PdL3QQxJzWkauQTUErT7atf39JP9kXj9mWSCaOJYxMrwbdIBUoWpE22-eJpHYA3IsO5yPSWfAPdi5c6iW3N1XqzhmPA73cNIY3jQSKRPyl__5zW0yBM2ckJIGqhOHiEqXrC6nTrL0XBqLNjAZh-oBd5C3hOYKpYhm6ULfBtKI53tBXnZpUTdHAREAwES1l73m7jBej0fDNU70R7pX3zQWQvznDxpp5eY6VSVRnHpwx6i1jbYbGXlCN_QgHW1z5VpW7I4KalbWMeSC6q2Y-g

In [3]:
//create online deployment and scoring edndpoint..you can do this same from WML console.
val endpoint_online = "https://ibm-watson-ml.mybluemix.net/v2/online/deployments/"
val payload_artifactVersionHref = loaded_model_artifact.meta.prop("modelVersionHref").get
val payload_name = "ML model from DSX"
val payload_data_online = Json.stringify(Json.toJson(Map("artifactVersionHref" -> payload_artifactVersionHref, "name" -> payload_name)))
val response_online = Http(endpoint_online).postData(payload_data_online).header("Content-Type", "application/json").header("Authorization", wmltoken).option(HttpOptions.connTimeout(10000)).option(HttpOptions.readTimeout(50000)).asString
val scoring_href_json: JsValue = Json.parse(response_online.body)
val scoring_href = (scoring_href_json \ "entity" \ "scoringHref").asOpt[String] match {
    case Some(x) => x
    case None => ""
}
print(scoring_href)

https://ibm-watson-ml.mybluemix.net/32768/v2/scoring/3418

In [5]:
//prepare payload data for test data
val payload_scoring = Json.stringify(Json.toJson(Map("record" -> Json.toJson(List(Json.toJson(18), Json.toJson("student"), Json.toJson("single"), Json.toJson("high.school"), Json.toJson("no"), Json.toJson("no"), Json.toJson("no"), Json.toJson("cellular"), Json.toJson("nov"), Json.toJson("fri"),Json.toJson(256), Json.toJson(2), Json.toJson(7), Json.toJson(2), Json.toJson("failure"), Json.toJson(-3.4), Json.toJson(92.649), Json.toJson(-30.1), Json.toJson(0.714), Json.toJson(5017.5))))))
//get prediction results
val response_scoring = Http(scoring_href).postData(payload_scoring).header("Content-Type", "application/json").header("Authorization", "Bearer " + wmltoken).option(HttpOptions.method("PUT")).option(HttpOptions.connTimeout(10000)).option(HttpOptions.readTimeout(50000)).asString
print(response_scoring)

HttpResponse({"result":{"duration":256,"housing_ix":1.0,"education_ix":1.0,"housing":"no","day_of_week":"fri","previous":2,"features":{"values":[18.0,10.0,1.0,1.0,0.0,1.0,0.0,0.0,4.0,4.0,256.0,2.0,7.0,2.0,1.0,-3.4,92.649,-30.1,0.714,5017.5]},"campaign":2,"cons_conf_idx":-30.1,"job":"student","predictedLabel":"1","poutcome_ix":1.0,"month_ix":4.0,"prediction":1.0,"nr_employed":5017.5,"loan_ix":0.0,"contact":"cellular","job_ix":10.0,"age":18,"loan":"no","euribor3m":0.714,"education":"high.school","default_ix":0.0,"marital":"single","marital_ix":1.0,"default":"no","pdays":7,"contact_ix":0.0,"cons_price_idx":92.649,"rawPrediction":{"values":[3.3179473559161448,6.682052644083854]},"day_of_week_ix":4.0,"emp_var_rate":-3.4,"poutcome":"failure","month":"nov","probability":{"values":[0.3317947355916145,0.6682052644083855]}}},200,Map(Access-Control-Allow-Methods -> Vector(POST, GET, OPTIONS, PUT, DELETE), Access-Control-Allow-Origin -> Vector(*), Connection -> Vector(Keep-Alive), Content-Type -> 