# Client Retention Demo

## Set Up the Environment
**Install the Rocket JDBC driver**

We need to install the Rocket JDBC driver to access DB2 and VSAM data through MDS.
> Ideally, for scalability, this JAR should be hosted on a web server rather than on the local file system.

In [None]:
// Install the Rocket Driver
%addjar file:///home/jovyan/work/dv-jdbc-3.1.22510.jar

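**Install the MongoDB and Logging Libraries**

Next, we add the JARs for the Stratio Spark-MongoDB data source, the Casbah Scala driver for MongoDB, the MongoDB Java driver, and the Log4j 2 API and core libraries.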

In [None]:
// Install MongoDB and Log4j JARs (Maven Central is served over HTTPS from repo1.maven.org)
%addjar https://repo1.maven.org/maven2/com/stratio/datasource/spark-mongodb_2.10/0.11.0/spark-mongodb_2.10-0.11.0.jar
%addjar https://repo1.maven.org/maven2/org/mongodb/casbah-commons_2.10/2.8.0/casbah-commons_2.10-2.8.0.jar
%addjar https://repo1.maven.org/maven2/org/mongodb/casbah-core_2.10/2.8.0/casbah-core_2.10-2.8.0.jar
%addjar https://repo1.maven.org/maven2/org/mongodb/casbah-query_2.10/2.8.0/casbah-query_2.10-2.8.0.jar
%addjar https://repo1.maven.org/maven2/org/mongodb/mongo-java-driver/2.13.0/mongo-java-driver-2.13.0.jar
%addjar https://repo1.maven.org/maven2/org/apache/logging/log4j/log4j-api/2.4.1/log4j-api-2.4.1.jar
%addjar https://repo1.maven.org/maven2/org/apache/logging/log4j/log4j-core/2.4.1/log4j-core-2.4.1.jar
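
Each URL above follows the standard Maven 2 repository layout: the group ID with its dots turned into slashes, then the artifact ID, the version, and `artifactId-version.jar`. The sketch below rebuilds such a URL from its coordinates, which can make version bumps less error-prone. The helper name `mavenCentralJar` is hypothetical, and the default base `https://repo1.maven.org/maven2` is Maven Central's HTTPS endpoint; swap in a mirror if your environment uses one.

```scala
// Hypothetical helper: build a JAR download URL from Maven coordinates using the
// Maven 2 repository layout (group dots become path separators).
def mavenCentralJar(group: String,
                    artifact: String,
                    version: String,
                    repo: String = "https://repo1.maven.org/maven2"): String =
  s"$repo/${group.replace('.', '/')}/$artifact/$version/$artifact-$version.jar"

// Example: the URL for the MongoDB Java driver used in this notebook
println(mavenCentralJar("org.mongodb", "mongo-java-driver", "2.13.0"))
```

The printed URL can then be pasted into an `%addjar` line.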

**Reload Java Classpath**

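Now that the new JARs have been downloaded, we need to reload the classpath. The cell below uses reflection to add every JAR URL known to the notebook interpreter to the JVM's system class loader, so that code which loads classes through that loader can also find them.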

In [None]:
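// Expose the protected URLClassLoader.addURL method via reflection
@transient val systemClassLoader = ClassLoader.getSystemClassLoader().asInstanceOf[java.net.URLClassLoader]
@transient val m = classOf[java.net.URLClassLoader].getDeclaredMethod("addURL", classOf[java.net.URL])
m.setAccessible(true)

// Append one JAR URL to the system class loader
def load_jar(myUrl: java.net.URL) = {
    m.invoke(systemClassLoader, myUrl)
}

// Copy every JAR URL known to the notebook interpreter onto the system class loader
kernel.interpreter.classLoader.asInstanceOf[java.net.URLClassLoader].getURLs.foreach(load_jar)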
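
The reflective `addURL` call only works when the system class loader is a `java.net.URLClassLoader`. That holds on Java 8, but on Java 9 and later the system class loader is a different type and the cast above throws a `ClassCastException`. A minimal defensive variant, assuming you would rather have the cell report than fail, checks the loader type first. The function name `addToSystemClassLoader` is hypothetical.

```scala
import java.net.{URL, URLClassLoader}

// Hypothetical helper: append `url` to the system class loader when it is a
// URLClassLoader (Java 8 and earlier); return false instead of throwing otherwise.
def addToSystemClassLoader(url: URL): Boolean =
  ClassLoader.getSystemClassLoader match {
    case loader: URLClassLoader =>
      val addURL = classOf[URLClassLoader].getDeclaredMethod("addURL", classOf[URL])
      addURL.setAccessible(true) // addURL is protected
      addURL.invoke(loader, url)
      true
    case _ =>
      false
  }
```

If it returns `false`, the JARs have to be put on the classpath by some other mechanism.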

## Set Connection Variables
In this section, we specify the connection variables used later in the notebook. Some of them may already be set as environment variables; if so, leave those lines as they are. If not, replace the corresponding `sys.env("...")` call with a string literal. Note that `sys.env("NAME")` throws a `NoSuchElementException` when `NAME` is not set.

In [None]:
// Environment Specific Settings
val jdbcUser = sys.env("JDBC_USER")
val jdbcPass = sys.env("JDBC_PASS")
val jdbcHost = sys.env("JDBC_HOST")

val mongoUser = sys.env("MONGO_USER")
val mongoPass = sys.env("MONGO_PASS")
val mongoHost = sys.env("MONGO_HOST")
val mongoDB = "demo"
val mongoColl = "client_features"
print(jdbcHost)
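
Instead of editing each `sys.env(...)` call by hand, one option is to give every variable a fallback. Scala's standard library provides `scala.util.Properties.envOrElse` for exactly this. In the sketch below, the fallback host names are placeholders, not real endpoints.

```scala
import scala.util.Properties.envOrElse

// Use the environment variable when it is set, otherwise the literal fallback.
// The fallback values are placeholders; replace them with your own.
val jdbcHost  = envOrElse("JDBC_HOST", "zos.example.com")
val mongoHost = envOrElse("MONGO_HOST", "localhost:27017")
val mongoDB   = "demo"

println(jdbcHost)
```

Credentials can be handled the same way, although keeping them out of the notebook (in environment variables) is usually preferable.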

## Import Apache Spark and MongoDB Classes
In this section, we import the Spark SQL and Casbah (MongoDB) classes used throughout this notebook.

In [None]:
import sqlContext.implicits._
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import com.mongodb.casbah.Imports._

## Access MDS Data
In the following section, we access DB2 and VSAM data stored on the z/OS host using the Rocket JDBC driver for MDS.

**Create an SQL Context**

First, we create an `SQLContext` from the notebook's `SparkContext` (`sc`); it is used to read the JDBC data sources below.

In [None]:
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

### Access Client Income Data Stored in a VSAM File
In the following section, we load the client information stored in a VSAM data set as a Spark DataFrame.

In [None]:
var url = "jdbc:rs:dv://" + jdbcHost + ":1200;DSN=AZKS; setLevel=-1; DBMD=GRAPHIC; DBTY=DVS;" 
url += " HOST=" + jdbcHost + "; LGID=ENC; LoginTimeOut=10; PLAN=SDBC1010; PORT=1200;" 
url += " PWD=" + jdbcPass + "; SUBSYS=NONE; UID=" + jdbcUser + "; enableCancel=True; queryTimeout=30;"

// Spark's "dbtable" option accepts a parenthesized subquery with an alias
val dbtable = "(SELECT * FROM AZKSQL.CLIENT_INFO_VSAMKSDS) AS customerRows"

val clientIncome_df = sqlContext.read.format("jdbc").options(Map(
        "driver" -> "com.rs.jdbc.dv.DvDriver",
        "url" -> url,
        "dbtable" -> dbtable)).load() 

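The same connection URL is assembled again for the DB2 table below. One way to keep the two copies in sync is a small function that builds the URL from its parts. `mdsJdbcUrl` is a hypothetical helper; the key/value settings are copied from the cell above, and only their grouping into a list is new.

```scala
// Hypothetical helper: assemble the MDS JDBC URL used by both read cells.
// The settings mirror the hand-built string; only the structure is new.
def mdsJdbcUrl(host: String, user: String, pass: String, port: Int = 1200): String = {
  val props = Seq(
    "DSN" -> "AZKS", "setLevel" -> "-1", "DBMD" -> "GRAPHIC", "DBTY" -> "DVS",
    "HOST" -> host, "LGID" -> "ENC", "LoginTimeOut" -> "10", "PLAN" -> "SDBC1010",
    "PORT" -> port.toString, "PWD" -> pass, "SUBSYS" -> "NONE", "UID" -> user,
    "enableCancel" -> "True", "queryTimeout" -> "30")
  // "key=value" pairs separated by "; ", with a trailing ";" as in the original
  s"jdbc:rs:dv://$host:$port;" + props.map { case (k, v) => s"$k=$v" }.mkString("; ") + ";"
}
```

Each read cell could then pass `"url" -> mdsJdbcUrl(jdbcHost, jdbcUser, jdbcPass)` in its options map.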
### Access Client Transaction Data Stored in a DB2 Table
In the following section, we will load client transaction data stored in a DB2 table as a Spark DataFrame.

In [None]:
var url = "jdbc:rs:dv://" + jdbcHost + ":1200;DSN=AZKS; setLevel=-1; DBMD=GRAPHIC; DBTY=DVS;" 
url += " HOST=" + jdbcHost + "; LGID=ENC; LoginTimeOut=10; PLAN=SDBC1010; PORT=1200;"
url += " PWD=" + jdbcPass + "; SUBSYS=NONE; UID=" + jdbcUser + "; enableCancel=True; queryTimeout=30;"

// Spark's "dbtable" option accepts a parenthesized subquery with an alias
val dbtable = "(SELECT * FROM AZKSQL.SPPAYTB) AS customerRows"

val clientTrans_df = sqlContext.read.format("jdbc").options(Map(
        "driver" -> "com.rs.jdbc.dv.DvDriver",
        "url" -> url,
        "dbtable" -> dbtable)).load()  

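Both reads pull every column with `SELECT *`, although the aggregation below only uses `CONT_ID` and `ACAUREQ_AUREQ_TX_DT_TTLAMT` from the transaction table. Because Spark places the `dbtable` value in the `FROM` clause of the queries it sends over JDBC, listing only the needed columns can reduce the data transferred from the host. `pushdownQuery` is a hypothetical helper that builds the `(SELECT ...) AS alias` string.

```scala
// Hypothetical helper: build the "(SELECT cols FROM table) AS alias" string that
// Spark's JDBC "dbtable" option accepts in place of a plain table name.
def pushdownQuery(table: String,
                  columns: Seq[String] = Seq("*"),
                  alias: String = "customerRows"): String =
  s"(SELECT ${columns.mkString(", ")} FROM $table) AS $alias"

// Only the two columns the transaction aggregation needs
val transQuery = pushdownQuery("AZKSQL.SPPAYTB", Seq("CONT_ID", "ACAUREQ_AUREQ_TX_DT_TTLAMT"))
```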
## Compute Aggregate Statistics Using Apache Spark
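In this section, we compute per-client transaction statistics (total amount, average daily transactions, total number of transactions, and average transaction amount) with the Spark DataFrame API.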

In [None]:
// Per-client transaction statistics, keyed by CONT_ID
val calcTrans_df = clientTrans_df.groupBy("CONT_ID").agg(
    sum("ACAUREQ_AUREQ_TX_DT_TTLAMT").cast("float").as("total_txn_amount"),
    // dividing by 365 assumes the transaction data covers one year
    (count("ACAUREQ_AUREQ_TX_DT_TTLAMT")/365).cast("float").as("avg_daily_txns"),
    count("ACAUREQ_AUREQ_TX_DT_TTLAMT").cast("int").as("total_txns"),
    // sum / count of non-null amounts, i.e. the mean transaction amount
    (sum("ACAUREQ_AUREQ_TX_DT_TTLAMT")/count("ACAUREQ_AUREQ_TX_DT_TTLAMT")).cast("float").as("avg_txn_amount")
)
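
To make the four formulas concrete, here is a plain-Scala analogue of the aggregation on a small, made-up list of transactions; the `Txn` and `Stats` types and the `aggregate` function are hypothetical. Like the Spark code, it divides the transaction count by 365, which assumes the data spans one year. Unlike Spark's `sum` and `count`, it never has to skip null amounts, because its input contains none.

```scala
// Hypothetical types standing in for the DB2 transaction rows and the result
case class Txn(contId: Int, amount: Double)
case class Stats(totalTxnAmount: Double, avgDailyTxns: Double, totalTxns: Int, avgTxnAmount: Double)

// Group by client ID and compute the same four statistics as the Spark cell
def aggregate(txns: Seq[Txn]): Map[Int, Stats] =
  txns.groupBy(_.contId).map { case (id, ts) =>
    val total = ts.map(_.amount).sum
    val n = ts.size
    id -> Stats(total, n / 365.0, n, total / n)
  }

val stats = aggregate(Seq(Txn(1, 10.0), Txn(1, 30.0), Txn(2, 5.0)))
```

Here client 1 has two transactions totalling 40.0, so its average transaction amount is 20.0.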

## Join the Computed Aggregate Statistics with the Client Information
In this section, we join the aggregate transaction statistics with the client information. The join is a full outer join on the client ID, so clients without transactions, and transactions without a matching client, are both kept.

In [None]:
val client_df = clientIncome_df.select(
    $"CONT_ID".cast("int").as("customer_id"),
    $"GENDER".cast("int").as("gender"),
    $"AGE_YEARS".cast("float").as("age_years"),
    $"HIGHEST_EDU".cast("int").as("highest_edu"),
    $"ANNUAL_INVEST".cast("float").as("annual_investment_rev"),
    $"ANNUAL_INCOME".cast("float").as("annual_income"),
    $"ACTIVITY_LEVEL".cast("int").as("activity_level"),
    $"CHURN".cast("int").as("churn")
    ).join(calcTrans_df.select(
    $"CONT_ID", 
    $"total_txn_amount",
    $"avg_daily_txns",
    $"total_txns",
    $"avg_txn_amount"
    ), $"CONT_ID" === $"customer_id", "outer")

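The `"outer"` join type in Spark is a full outer join: every client ID from either side appears in the result, and the columns from the side without a match are null. The sketch below mimics that on two plain Scala maps, using `None` where Spark would produce null; `fullOuterJoin` is a hypothetical helper. One consequence for this notebook is that a `CONT_ID` present only in the transaction data yields a row whose `customer_id` is null.

```scala
// Hypothetical helper: full outer join of two keyed maps. Every key from either
// side appears once; None marks the side that has no matching entry.
def fullOuterJoin[K, A, B](left: Map[K, A], right: Map[K, B]): Map[K, (Option[A], Option[B])] =
  (left.keySet ++ right.keySet).iterator.map(k => k -> (left.get(k), right.get(k))).toMap

// Clients 1 and 2 on the left, transaction totals for 2 and 3 on the right
val joined = fullOuterJoin(Map(1 -> "a", 2 -> "b"), Map(2 -> 10, 3 -> 30))
```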
## Write to MongoDB
In this section, we collect the contents of the DataFrame to the driver and write them to a MongoDB collection.

In [None]:
// Connect to MongoDB with SCRAM-SHA-1 authentication (the collection is dropped in a later cell)
val uri = MongoClientURI("mongodb://" + mongoUser + ":" + mongoPass + "@" + mongoHost + "/?authMechanism=SCRAM-SHA-1")
val mongoClient = MongoClient(uri)
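
The URI above concatenates the user name and password directly. MongoDB connection strings require characters such as `@`, `:` and `/` in credentials to be percent-encoded, so a password containing any of them would break the URI. A minimal sketch using `java.net.URLEncoder` follows; `encodeCredential` and `mongoUri` are hypothetical helpers.

```scala
import java.net.URLEncoder

// Hypothetical helper: percent-encode a user name or password for a mongodb:// URI.
// URLEncoder does form encoding (space -> "+"), so "+" is rewritten to "%20";
// a literal "+" in the input has already become "%2B" at that point.
def encodeCredential(s: String): String =
  URLEncoder.encode(s, "UTF-8").replace("+", "%20")

// Hypothetical helper: the same URI as above, with encoded credentials
def mongoUri(user: String, pass: String, host: String): String =
  s"mongodb://${encodeCredential(user)}:${encodeCredential(pass)}@$host/?authMechanism=SCRAM-SHA-1"
```

`MongoClientURI(mongoUri(mongoUser, mongoPass, mongoHost))` could then replace the string concatenation in the cell above.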

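**Set Up a Function to Load a DataFrame's Contents into a MongoDB Collection**

In this section, we define a function that writes each row of a DataFrame to a MongoDB collection as one document. Null fields are left out of the document, except `annual_investment_rev`, which is written as 0, and the redundant join key `CONT_ID` is always dropped.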

In [None]:
def dfToMongo(df: DataFrame, coll: MongoCollection): Unit = {
    // the DataFrame's column names, in schema order
    val schema = df.schema.map(_.name)

    // collect every row to the driver as a column-name -> value map,
    // then build and insert one MongoDB document per row
    for ( row <- df.map(_.getValuesMap[Any]( schema )).collect() ) {
        val builder = MongoDBObject.newBuilder
        for ( field <- schema ) {
            if ( row(field) != null ) {
                // CONT_ID is the join key from the transaction side; skip it
                if ( field != "CONT_ID" ) {
                    builder += field -> row(field)
                }
            } else if ( field == "annual_investment_rev" ) {
                // store a missing investment revenue as 0 instead of omitting it
                builder += field -> 0.0f
            }
            // any other null field is omitted from the document
        }
        coll.insert(builder.result)
    }
}
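
The per-field rules in `dfToMongo` can be checked without Spark or MongoDB. The sketch below applies the same rules to a plain `Map[String, Any]` row and returns the fields that would go into the document. `toDocumentFields` is a hypothetical helper, and unlike `row(field)` in the function above, it treats a missing key the same as a null value.

```scala
// Hypothetical helper mirroring dfToMongo's rules: keep non-null fields except
// CONT_ID, and write 0.0f when annual_investment_rev is null or missing.
def toDocumentFields(row: Map[String, Any], schema: Seq[String]): Seq[(String, Any)] =
  schema.flatMap { field =>
    row.get(field) match {
      case Some(v) if v != null =>
        if (field == "CONT_ID") None else Some(field -> v)
      case _ if field == "annual_investment_rev" =>
        Some(field -> 0.0f)
      case _ =>
        None
    }
  }

val schema = Seq("customer_id", "annual_investment_rev", "CONT_ID", "total_txns")
val row: Map[String, Any] =
  Map("customer_id" -> 7, "annual_investment_rev" -> null, "CONT_ID" -> 7, "total_txns" -> null)
val fields = toDocumentFields(row, schema)
```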

**Drop MongoDB Collection to Prepare for New Data**

Because this is a demo environment, we first drop the collection so that each run starts from an empty collection; the `count()` call that follows should return 0.

In [None]:
mongoClient(mongoDB)(mongoColl).drop()
mongoClient(mongoDB)(mongoColl).count()

**Load the Client DataFrame into MongoDB**

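In this section, we write the contents of `client_df` to the MongoDB collection and then count the documents. Because `dfToMongo` inserts one document per row, the count should match `client_df.count()`.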

In [None]:
dfToMongo(client_df, mongoClient(mongoDB)(mongoColl))
mongoClient(mongoDB)(mongoColl).count()