# Test Driving IBM Watson IoT Driver Behavior Service on Bluemix
Below are the high level steps involved in processing input data to get the driver behavior information.

- Scan the input raw vehicle prob data.
- Use Context Mapping service to map raw vehicle prob data to geospatial data. This is optional step. But it enhances the quality of the analyzed driver behavior results.
 - Get map-matched car probe data by using the mapMatching API.
 - Get road type data with the getLinkInformation API.
- Analyze the uploaded data by submitting a job request using Driving Behavior service's sendJobRequest API.
- Get the detailed driver behavior information from analyzed data by using Driving Behavior service's getAnalyzedTripSummaryList and getAnalyzedTripInfo APIs

## 1. Initialization code
- Using scalaj-http library for making REST calls, so add this jar
- Using play-json library for Json parsing of RESt results, add this jar
- Required imports

In [1]:
%AddJar -magic http://central.maven.org/maven2/org/scalaj/scalaj-http_2.10/2.3.0/scalaj-http_2.10-2.3.0.jar
%AddJar -magic http://central.maven.org/maven2/com/typesafe/play/play-json_2.10/2.4.6/play-json_2.10-2.4.6.jar
import scalaj.http._
import play.api.libs.json._
import scala.util.{Try,Success,Failure}

Using cached version of scalaj-http_2.10-2.3.0.jar
Using cached version of play-json_2.10-2.4.6.jar


## 2. Read Input raw trip data
- Input format is in json lines
- Read, parse and convert to TripDataPoint RDD

In [5]:
// Quick way to load test dataset created
val inputTestDataUrl = "https://raw.githubusercontent.com/smatlapudi/testdrive-bluemix-driverbehavior-service/master/testdata/udacity-ch3-north.json"
val inputTestData =  scala.io.Source.fromURL(inputTestDataUrl).getLines.to[collection.immutable.Seq]
val tripDataLines_rdd = sc.parallelize(inputTestData)

In [6]:
// TripData class
case class TripDataPoint(timestamp: String, heading: Double, speed: Double, latitude: Double, longitude: Double )
// Utility function to covert json line to Scala Object
def parseTripDataLine(jsonLine: String): Option[TripDataPoint] = {
    val jsObj  = Json.parse(jsonLine)
    implicit val modelFormat = Json.format[TripDataPoint]
    Json.fromJson[TripDataPoint](jsObj).asOpt //ignoring any possible error messages here
}
// Convert tripDataLines_rdd to TripRdd 
val tripData_rdd = tripDataLines_rdd.map(parseTripDataLine).flatMap(_.toSeq)
tripData_rdd.cache
tripData_rdd.take(5).foreach(println)

TripDataPoint(2016-11-17T22:23:03Z,357.4195341311482,15.085078615161539,37.4019584656,-122.114883423)
TripDataPoint(2016-11-17T22:23:04Z,357.12198437345154,15.001162178540001,37.4020271301,-122.115020752)
TripDataPoint(2016-11-17T22:23:05Z,355.95965463973073,14.618271412569998,37.4021072388,-122.115158081)
TripDataPoint(2016-11-17T22:23:06Z,330.98606173371456,14.17331637527667,37.4021835327,-122.115287781)
TripDataPoint(2016-11-17T22:23:07Z,351.05080395396425,13.715587838680003,37.4022636414,-122.115409851)


## 3. Map the TripData using MapInsights mapservice
- Parameter initialization
- Call mapservice matching endpoint
- Get Link Information from Mapped TripData to get Road Type

### Parameter initialization

In [7]:
val contextMapServiceProps = Map(
    "match_api_url" ->  "https://automotive.internetofthings.ibmcloud.com/mapinsights/mapservice/map/matching",
    "link_api_url" ->  "https://automotive.internetofthings.ibmcloud.com/mapinsights/mapservice/link",
    "tenant_id" -> "SOME_VALUE", // Get this from Context Mapping Service DashBoard in Bluemix
    "username" -> "SOME_VALUE",         // Get this from Context Mapping Service DashBoard in Bluemix
    "password" -> "SOME_VALUE",   // Get this from Context Mapping Service DashBoard in Bluemix
    "mo_id" -> "udacity-ch3",         // Dummy device id value for testing
    "trip_id" -> "udacity-ch3-north"  // Dummy value for testing
)
val contextMapServiceInfo = sc.broadcast(contextMapServiceProps)

### Use mapservice matching endpoint
 - Defined required case classes and utility functions
 - Make REST call to match raw coordinates to map matched coordinates
 - Refer api documentation at https://developer.ibm.com/api/view/id-263:title-IBM_Watson_IoT_Context_Mapping#Mapmatching

In [8]:
//Case classes to store input and rest results
case class RestCallResult(code: Int, body: String)
case class MappedTripAttr(   
    link_id: String,
    matched_map_id: Double,
    matched_longitude: Double,
    matched_latitude: Double,
    matched_heading: Double
)
case class MappedTripData(rawAttr: TripDataPoint, mappedAttr: Option[MappedTripAttr])

// Utility function to covert Rest Results from json to Scala Object
// Note that API result is a list of Objects
def parseMappedTripDataJson(jsonLine: String): Option[List[MappedTripAttr]] = {
    val jsObj  = Json.parse(jsonLine)
    implicit val modelFormat = Json.format[MappedTripAttr]
    Json.fromJson[List[MappedTripAttr]](jsObj).asOpt //ignoring any possible error messages here
}

In [17]:
//Make map service matching endpoint calls here
val mappedTripData_rdd = tripData_rdd.mapPartitions{iter =>
    val serviceProps = contextMapServiceInfo.value
    iter.map { tdp =>
        val response = Http(serviceProps.get("match_api_url").get).
            param("tenant_id", serviceProps.get("tenant_id").get).
            auth( serviceProps.get("username").get, serviceProps.get("password").get).
            param("mo_id",     serviceProps.get("mo_id").get).
            param("trip_id",   serviceProps.get("trip_id").get).
            param("timestamp", s"${tdp.timestamp}").
            param("heading",   s"${tdp.heading}").
            param("latitude",  s"${tdp.latitude}").
            param("longitude", s"${tdp.longitude}").
            option(HttpOptions.readTimeout(30000)).asString
              
        response.code match {
            case 200 => parseMappedTripDataJson(response.body) match {
                // from Option[List[MappedTripAttr]] ==> MappedTripData
                case Some(matchedAttr) => MappedTripData( tdp, Some(matchedAttr(0)))
                case _ => MappedTripData(tdp, None)
            }
            case _ => MappedTripData(tdp, None)
        }
    }
}

mappedTripData_rdd.cache
println("Output Sample:")
mappedTripData_rdd.take(2).foreach(println)

Output Sample:
MappedTripData(TripDataPoint(2016-11-17T22:23:34Z,1.9000000386407154,0.0,37.4032897949,-122.116546631),Some(MappedTripAttr(258517197000,5.0,-122.1166795,37.4032391,30.27786064506597)))
MappedTripData(TripDataPoint(2016-11-17T22:52:41Z,0.5999999953495317,0.0,37.4902687073,-122.239448547),Some(MappedTripAttr(95876262000,5.0,-122.2394565851877,37.49026589386588,344.2528605654734)))


### Link Information from Mapped TripData
 - Get distinct link ids
 - Call mapservice link endpoint
 - Parse output to get the the road type. Possible values are 
   ( Motorway=1, Urban Highway=2,  Urban Primary=3, Urban Road=4, Others = 5)

In [None]:
println(mappedTripData_rdd.count)

In [19]:
// from MappedTripData[TripDataPoint, Option[MappedTripAttr]] -> Some(link_id) -> link_id -> distinct Link_ids
val uniqueLinks = mappedTripData_rdd.map(_.mappedAttr.map(_.link_id)).flatMap(_.toSeq).distinct
uniqueLinks.cache
println("Sample link_ids:")
uniqueLinks.take(2).foreach(println)

Sample link_ids:
95876262000
258517197000


## Now get Link Information for above results
- Refer api documentation at https://developer.ibm.com/api/view/id-263:title-IBM_Watson_IoT_Context_Mapping#Getlinkinformation

In [20]:
// Utilityfunction to extract road_type from Rest Results from json to Scala Object
// that is json field from links[0].properties.type
def extractRoadTypeFromLinkInfoJson(jsonLine: String): Try[String] = {
    val jsonValue  = Json.parse(jsonLine)
    Try( ((jsonValue \ "links")(0) \ "properties" \ "type").as[String] )
}

case class LinkRoadType(link_id: String, road_type: String)

//Make map service link endpoint calls here
val linkInfo_rdd = uniqueLinks.repartition(10).mapPartitions{iter =>
    val serviceProps = contextMapServiceInfo.value
    iter.map { link_id =>
        val response = Http(serviceProps.get("link_api_url").get).
            param("tenant_id", serviceProps.get("tenant_id").get).
            auth(serviceProps.get("username").get, serviceProps.get("password").get).
            param("link_id", link_id).
            option(HttpOptions.readTimeout(30000)).asString
        response.code match {
            case 200 => extractRoadTypeFromLinkInfoJson(response.body) match {
                case Success(road_type) => LinkRoadType(link_id, road_type)
                case _ => LinkRoadType(link_id, "5") //default to road type 5 (Others)
            }
            case _ => LinkRoadType(link_id, "5") //default to road type 5
        }
    }
}
linkInfo_rdd.cache
linkInfo_rdd.count

2

### Prepare Data for Driver Insights service - Car Prob Data endpoint
- Refer api documentation at https://developer.ibm.com/api/view/id-261:title-IBM_Watson_IoT_Driver_Behavior#Sendcarprobedata
- join MappedTripData with LinkInfomation to add Road Type to all mapped records

In [21]:
val l_rdd = mappedTripData_rdd.map( x => (x.mappedAttr.fold("NA")(_.link_id), x) )
val r_rdd =  linkInfo_rdd.map( x => (x.link_id, x.road_type))
val mappedTripDataWithRoadInfo_rdd = l_rdd.leftOuterJoin(r_rdd)

//class for car prob data record
case class CarProbData(
    trip_id: String,
    timestamp: String,
    matched_heading: Double,
    speed: Double,
    matched_longitude: Double,
    mo_id: String,
    longitude: Double,
    matched_latitude: Double,
    matched_link_id: Option[String],
    latitude: Double,
    road_type: Option[String],
    heading: Double
)

val carProbData_rdd = mappedTripDataWithRoadInfo_rdd.map { 
    case(link_id, (mappedTripData, road_type_opt)) =>
        val tripDataAttr = mappedTripData.rawAttr
        val mappedTripAttrOpt = mappedTripData.mappedAttr
        CarProbData(
            trip_id            = contextMapServiceInfo.value.get("trip_id").get,
            timestamp          = tripDataAttr.timestamp,
            matched_heading    = mappedTripAttrOpt.fold(tripDataAttr.heading)(_.matched_heading),
            speed              = tripDataAttr.speed,
            matched_longitude  = mappedTripAttrOpt.fold(tripDataAttr.longitude)(_.matched_longitude),
            mo_id              = contextMapServiceInfo.value.get("mo_id").get,
            longitude          = tripDataAttr.longitude,
            matched_latitude   = mappedTripAttrOpt.fold(tripDataAttr.latitude)(_.matched_latitude),
            matched_link_id    = if(link_id != "NA") Some(link_id) else None,
            latitude           = tripDataAttr.latitude,
            road_type          = road_type_opt,
            heading            = tripDataAttr.heading
        )    
}
carProbData_rdd.cache
carProbData_rdd.take(2).foreach(println)

CarProbData(udacity-ch3-north,2016-11-17T22:23:34Z,30.27786064506597,0.0,-122.1166795,udacity-ch3,-122.116546631,37.4032391,Some(258517197000),37.4032897949,Some(5),1.9000000386407154)
CarProbData(udacity-ch3-north,2016-11-17T22:52:41Z,344.2528605654734,0.0,-122.23945657541587,udacity-ch3,-122.239448547,37.490265866252045,Some(95876262000),37.4902687073,Some(5),0.5999999953495317)


## Send Car Probe Data to Driver Insights service
- Initialize service related paramaters
- Send the data by calling the serive

In [22]:
val driverInsightsServiceProps = Map(
    "carprobe_datastore_url" ->  "https://automotive.internetofthings.ibmcloud.com/driverinsights/datastore/carProbe",
    "job_control_url"        -> "https://automotive.internetofthings.ibmcloud.com/driverinsights/jobcontrol/job",
    "trip_summary_list_url"  -> "https://automotive.internetofthings.ibmcloud.com/driverinsights/drbresult/tripSummaryList",
    "trip_analytics_url"     -> "https://automotive.internetofthings.ibmcloud.com/driverinsights/drbresult/trip",
    "tenant_id" -> "SOME_VALUE",       // Get this value from Driver Behavior Service Dashboard
    "username" -> "SOME_VALUE",        // Get this value from Driver Behavior Service Dashboard
    "password" -> "SOME_VALUE",        // Get this value from Driver Behavior Service Dashboard
    "mo_id" -> "udacity-ch3",          // Dummy device_id for this testing
    "trip_id" -> "udacity-ch3-north"   // Dummy trip_id for this testing 
)
val driverInsightsServiceInfo = sc.broadcast(driverInsightsServiceProps)

#### Now we have all the required attributes in right format, send the data

In [None]:
// utility function to covert CarProbData object to json string
def CarProbData2JsonString(modelObj: CarProbData): String = {
    implicit val modelFormat = Json.writes[CarProbData]
    Json.toJson[CarProbData](modelObj).toString
}

//Make driver insights card prob datastore alls here
val carProbDataResult_rdd = carProbData_rdd.mapPartitions{iter =>
    val serviceProps = driverInsightsServiceInfo.value
    iter.map { carProbData =>
        val data2post = CarProbData2JsonString(carProbData)
        val response = Http(serviceProps.get("carprobe_datastore_url").get).
            param("tenant_id", serviceProps.get("tenant_id").get).
            auth( serviceProps.get("username").get, serviceProps.get("password").get).
            option(HttpOptions.readTimeout(30000)).
            header("accept", "application/json").
            header("content-type", "application/json").
            postData(data2post).asString
        (response.code, response.body)
    }
}
carProbDataResult_rdd.cache
val results = carProbDataResult_rdd.map( x => (x._1, 1)).reduceByKey( _ + _).collect
results.foreach( x => println(s"code: ${x._1}\tcount: ${x._2}"))

## Get Driver Behavior for Above Trip
- Send a Driver Insights Job Request to Analyse the above Car probe Data
- Check  Job completion status
- Get the Analyzed Trip Information

### Send a Job Request to Analyze the Car Prob Data
- Refer API documentation at https://developer.ibm.com/api/view/id-261:title-IBM_Watson_IoT_Driver_Behavior#SendJobRequest

In [23]:
val from_date = "2016-11-17"
val to_date   = "2016-11-17"
val sendJobRequest2Post = s"""{"from":"$from_date", "to":"$to_date"}"""
println(s"postData: ${sendJobRequest2Post}")

val response = Http(driverInsightsServiceProps.get("job_control_url").get).
    param("tenant_id", driverInsightsServiceProps.get("tenant_id").get).
    auth(driverInsightsServiceProps.get("username").get, driverInsightsServiceProps.get("password").get).
    option(HttpOptions.readTimeout(30000)).
    header("accept", "application/json").
    header("content-type", "application/json").
    postData(sendJobRequest2Post).asString

println(s"response.code: ${response.code}")
println(response.body)

postData: {"from":"2016-11-17", "to":"2016-11-17"}
response.code: 200
{
  "job_id" : "247b07b4-affb-45cd-b041-fa37443f7147"
}


### Check  Job Completion Status 
- Refer API documentation at https://developer.ibm.com/api/view/id-261:title-IBM_Watson_IoT_Driver_Behavior#getjobinformation

In [26]:
val job_id = "247b07b4-affb-45cd-b041-fa37443f7147"
val response = Http(driverInsightsServiceProps.get("job_control_url").get).
    param("tenant_id", driverInsightsServiceProps.get("tenant_id").get).
    auth(driverInsightsServiceProps.get("username").get, driverInsightsServiceProps.get("password").get).
    param("job_id", job_id).
    option(HttpOptions.readTimeout(30000)).
    header("accept", "application/json").
    header("content-type", "application/json").asString

println(s"response.code: ${response.code}")
println(response.body)

response.code: 200
{
  "from" : "2016-11-17",
  "to" : "2016-11-17",
  "tenant_id" : "c97244f8-e5b1-4982-94c4-0b4bd5c5b711",
  "job_id" : "247b07b4-affb-45cd-b041-fa37443f7147",
  "job_submit_time" : 1480987489902,
  "job_status" : "SUCCEEDED"
}


### Get the Analyzed Trip Information
- Refer API documentation at https://developer.ibm.com/api/view/id-261:title-IBM_Watson_IoT_Driver_Behavior#Getanalyzedtripinformation
- First get analyzed trip summary list which includes the services generated trip_uuid
- get the Trip Information with Driver Behavior using trip_uuid

In [33]:
val job_id = "247b07b4-affb-45cd-b041-fa37443f7147"
val response = Http(driverInsightsServiceProps.get("trip_summary_list_url").get).
    param("tenant_id", driverInsightsServiceProps.get("tenant_id").get).
    param("job_id", job_id).
    auth(driverInsightsServiceProps.get("username").get, driverInsightsServiceProps.get("password").get).
    option(HttpOptions.readTimeout(30000)).
    header("accept", "application/json").
    header("content-type", "application/json").asString
                    
println(s"response.code: ${response.code}")
println(response.body)

response.code: 200
[ {
  "id" : {
    "trip_uuid" : "7fbdc6af-179b-4346-9f2d-65d382f30429",
    "tenant_id" : "c97244f8-e5b1-4982-94c4-0b4bd5c5b711"
  },
  "end_time" : 1479424149000,
  "generated_time" : 1480987518794,
  "job_id" : "247b07b4-affb-45cd-b041-fa37443f7147",
  "mo_id" : "udacity-ch3",
  "driver_id" : "",
  "start_time" : 1479421383000,
  "trip_id" : "udacity-ch3-north"
} ]


In [34]:
val trip_uuid = "7fbdc6af-179b-4346-9f2d-65d382f30429"
val response = Http(driverInsightsServiceProps.get("trip_analytics_url").get).
    param("tenant_id", driverInsightsServiceInfo.value.get("tenant_id").get).
    param("trip_uuid", trip_uuid).
    auth(driverInsightsServiceProps.get("username").get, driverInsightsServiceProps.get("password").get).
    option(HttpOptions.readTimeout(300000)).
    header("accept", "application/json").
    header("content-type", "application/json").asString
println(s"response.code: ${response.code}")
println(response.body)

response.code: 200
{
  "id" : {
    "trip_uuid" : "7fbdc6af-179b-4346-9f2d-65d382f30429",
    "tenant_id" : "c97244f8-e5b1-4982-94c4-0b4bd5c5b711"
  },
  "end_altitude" : 0.0,
  "end_latitude" : 37.5503832582108,
  "end_longitude" : -122.3120112947789,
  "end_time" : 1479424149000,
  "generated_time" : 1480987518794,
  "mo_id" : "udacity-ch3",
  "driver_id" : "",
  "start_altitude" : 0.0,
  "start_latitude" : 37.4019766,
  "start_longitude" : -122.114744,
  "start_time" : 1479421383000,
  "trip_id" : "udacity-ch3-north",
  "ctx_sub_trips" : [ {
    "id" : {
      "sub_trip_id" : "d4f784c9-21b1-46cd-8560-e218b1e3853b",
      "tenant_id" : "c97244f8-e5b1-4982-94c4-0b4bd5c5b711"
    },
    "length" : 62.84220840634381,
    "avg_speed" : 31.421104203171904,
    "end_latitude" : 37.4664285,
    "end_longitude" : -122.2078622,
    "end_time" : 1479422617000,
    "mo_id" : "udacity-ch3",
    "driver_id" : "",
    "start_latitude" : 37.46667346515379,
    "start_longitude" : -122.2085025290575