## Objective

Deploy a DataRobot Codegen model to Postgres via PL/Java.

## Why?

* Postgres is a popular database (and a personal favorite). In the 2018 Stack Overflow Developer [survey](https://insights.stackoverflow.com/survey/2018/#technology-most-loved-dreaded-and-wanted-databases), it was the second most loved database (after Redis) and the third most commonly used.

* [Greenplum](https://greenplum.org/) is an open-source, massively parallel data platform for analytics, machine learning, and AI, based on Postgres. Greenplum’s massively parallel processing architecture provides automatic parallelization of all data and queries in a scale-out, shared-nothing architecture. Pivotal Greenplum is part of a larger offering from Pivotal.io.
* Many of Pivotal's [customers](https://pivotal.io/customers) overlap with DataRobot's.

## PL/Java

[PL/Java](https://github.com/tada/pljava) is a free open-source extension for PostgreSQL that allows stored procedures, triggers, and functions to be written in the Java language and executed in the backend. 

## Considerations

### How to interact with the model?

* Batch scoring
* Online scoring via database calls
* Ad hoc analysis

### My requirements 

My main requirement was to make scoring feel very SQL-ish. To that end, I wrote a function parameterized by the model's features, with the end goal of running something like:

```
select score(feature1, feature2, ...)
from data_table;
```

The secondary requirement was a function for batch scoring. I accomplished this with a function that takes a single string argument naming the table that holds the dataset to score. It returns one scored row per input row. When more than one model is available, the function can also be parameterized by model ID (corresponding to the scoring JAR) and by the deployment details for the MLOps agents.

Example call:
```
create table for_scoring as 
select * from data_table limit 100;

select * from batchScore('for_scoring');
```
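`batchScore` receives the table name as a plain string and splices it into a `SELECT` statement. That means a caller could pass arbitrary SQL in place of a name. Below is a minimal sketch of a guard. It assumes that only plain, unquoted (optionally schema-qualified) identifiers need to be supported. `TableNames` is a hypothetical helper, not part of PL/Java or DataRobot:

```java
import java.util.regex.Pattern;

// Hypothetical guard for batchScore's table argument: accept only plain,
// unquoted identifiers, optionally qualified by a schema name.
final class TableNames {
    private static final Pattern SAFE_NAME =
        Pattern.compile("[A-Za-z_][A-Za-z0-9_]*(\\.[A-Za-z_][A-Za-z0-9_]*)?");

    private TableNames() {}

    // True only if the whole string is identifier or schema.identifier.
    static boolean isSafe(String table) {
        return table != null && SAFE_NAME.matcher(table).matches();
    }

    // Builds the batch query, refusing anything that is not a plain identifier.
    static String selectAll(String table) {
        if (!isSafe(table)) {
            throw new IllegalArgumentException("invalid table name: " + table);
        }
        return "SELECT * FROM " + table;
    }

    public static void main(String[] args) {
        System.out.println(selectAll("for_scoring"));            // SELECT * FROM for_scoring
        System.out.println(isSafe("for_scoring; DROP TABLE x")); // false
    }
}
```

Rejecting everything else keeps the check simple. Quoted identifiers (mixed case, spaces) would need more careful handling.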

MLOps agents can easily be introduced into both scoring methods as is. On a distributed system like Greenplum, however, MLOps listeners will probably need to run on each node of the cluster. I'm also not sure the batch scoring routine will be effective on a distributed system (more research is required).

## Performance

Not rigorously tested, but scoring inside Postgres seems to be on par with scoring directly from a standalone Java app.
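One way to make this comparison a little more rigorous is to time the scoring call itself over many rows. Below is a minimal sketch. It assumes a hypothetical `scorer` function that stands in for the codegen predictor's `score(row)` call. The toy scorer in `main` is not a real model:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.ToDoubleFunction;

// Hypothetical micro-timing harness; `scorer` stands in for predictor.score(row).
final class ScoringTimer {
    // Written after each run so the JIT cannot discard the scoring calls.
    private static volatile double blackhole;

    private ScoringTimer() {}

    // Scores every row once and returns the mean wall-clock time per row in microseconds.
    static double meanMicrosPerRow(List<Map<String, Object>> rows,
                                   ToDoubleFunction<Map<String, Object>> scorer) {
        if (rows.isEmpty()) {
            throw new IllegalArgumentException("no rows to score");
        }
        double sum = 0.0;
        long start = System.nanoTime();
        for (Map<String, Object> row : rows) {
            sum += scorer.applyAsDouble(row);
        }
        long elapsed = System.nanoTime() - start;
        blackhole = sum;
        return elapsed / 1_000.0 / rows.size();
    }

    public static void main(String[] args) {
        List<Map<String, Object>> rows = new ArrayList<>();
        for (int i = 0; i < 10_000; i++) {
            Map<String, Object> row = new HashMap<>();
            row.put("loan_amnt", 1_000 + i);
            rows.add(row);
        }
        // Toy stand-in scorer, NOT the DataRobot model.
        ToDoubleFunction<Map<String, Object>> toy =
            r -> ((Integer) r.get("loan_amnt")) / 100_000.0;
        meanMicrosPerRow(rows, toy); // warm-up pass so the JIT has compiled the loop
        System.out.printf("%.3f us per row%n", meanMicrosPerRow(rows, toy));
    }
}
```

Run this harness with the real predictor in a standalone app. Then compare it with the elapsed time Postgres reports for the equivalent `select score(...)` query (for example, via `psql`'s `\timing`). That would give a rough like-for-like number.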

## Getting Started

I downloaded the codegen model from DataRobot, published it to a local Maven repository, and wrote a Java app to score with it. I'll come back to this later.
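The standalone app needs to turn each line of the tab-delimited Lending Club file into a map from column name to value, which the predictor then scores. Below is a minimal sketch, assuming the first line of the file is the header. `TsvRows` is a hypothetical helper. The sketch keeps values as strings and maps empty fields to `null`. That choice is an assumption worth checking against how the codegen model handles types and missing values; the Scala prototype further down passes typed values instead.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: one tab-delimited line -> column-name/value map.
final class TsvRows {
    private TsvRows() {}

    static Map<String, Object> toRow(String[] header, String line) {
        // limit -1 keeps trailing empty fields instead of silently dropping them
        String[] values = line.split("\t", -1);
        if (values.length != header.length) {
            throw new IllegalArgumentException(
                "expected " + header.length + " fields but got " + values.length);
        }
        Map<String, Object> row = new LinkedHashMap<>();
        for (int i = 0; i < header.length; i++) {
            // represent an empty field as null rather than ""
            row.put(header[i], values[i].isEmpty() ? null : values[i]);
        }
        return row;
    }

    public static void main(String[] args) {
        String[] header = "application_id\tloan_amnt\temp_title".split("\t");
        System.out.println(toRow(header, "1\t5000\t"));
        // {application_id=1, loan_amnt=5000, emp_title=null}
    }
}
```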

## Switching over to Postgres

To use our functions in Postgres, we need to package them up and expose them to Postgres.

This was all accomplished by rewriting the scoring code in Java, setting up the `pom.xml`, and packaging the app with Maven.

Everything that follows is based on the Lending Club dataset.



## Adding JARs to the PL/Java Classpath

I add all the necessary JARs via `SET` in Postgres. This is not exactly how it is intended to be used. However, I ran into issues with the PL/Java class loader and found that this approach works as desired. Alternatively, the classpath can be defined in the Postgres config.
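For comparison, PL/Java's documented route for loading application JARs is `sqlj.install_jar` for each JAR, followed by `sqlj.set_classpath` for the schema. The sketch below only generates those statements. `PlJavaJarSql` is a hypothetical helper. The JAR paths, the JAR names, and the `public` schema are illustrative assumptions.

```java
import java.util.List;

// Hypothetical helper that prints the SQL for PL/Java's jar-loading functions.
final class PlJavaJarSql {
    private PlJavaJarSql() {}

    // Quotes a value as a SQL string literal, doubling embedded single quotes.
    static String literal(String s) {
        return "'" + s.replace("'", "''") + "'";
    }

    // deploy = true asks PL/Java to run the jar's deployment descriptor (pljava.ddr).
    static String installJar(String jarPath, String jarName, boolean deploy) {
        return "SELECT sqlj.install_jar(" + literal("file:" + jarPath) + ", "
            + literal(jarName) + ", " + deploy + ");";
    }

    // The per-schema classpath is a colon-separated list of installed jar names.
    static String setClasspath(String schema, List<String> jarNames) {
        return "SELECT sqlj.set_classpath(" + literal(schema) + ", "
            + literal(String.join(":", jarNames)) + ");";
    }

    public static void main(String[] args) {
        // Illustrative paths and names only.
        System.out.println(installJar("/opt/jars/model.jar", "model", false));
        System.out.println(installJar("/opt/jars/scoring.jar", "scoring", true));
        System.out.println(setClasspath("public", List.of("scoring", "model")));
    }
}
```

Only the scoring JAR carries the descriptor generated from the `@Function` annotations, so only it is installed with `deploy` set. The exact statement order may need adjusting for the PL/Java version in use.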

## Creating SQL Functions

Every function we write in Java is annotated with the `@Function` annotation provided by PL/Java. The annotation tells PL/Java that the function is meant to be used from Postgres. At build time, PL/Java then generates a `pljava.ddr` deployment descriptor containing the SQL needed to deploy the functions in Postgres.

## `modelId` function

The `modelId` function simply returns the model ID. Other functions could be introduced as requirements dictate.

## `score` function

Given the requirement of a SQL-ish call to the model, we'll work toward a function that executes as follows (along with our `modelId()` function):

```
select uid, score(feature1, feature2, ...), modelId()
from dataset
```

PL/Java generated the SQL needed to create these functions when I packaged up the scoring app. A simple copy and paste of that SQL and we are ready to go.
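The long run of `row.put(...)` calls in `score` pairs each positional argument with the feature name the model expects. Below is a minimal generic sketch of that step. A length check makes a missing or extra argument fail loudly. `FeatureRows` is a hypothetical helper. In the `score` function further down, the SQL-facing `description` argument is stored under the model's original column name, `desc`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: pair positional feature values with the model's feature names.
final class FeatureRows {
    private FeatureRows() {}

    static Map<String, Object> of(String[] featureNames, Object... values) {
        if (featureNames.length != values.length) {
            throw new IllegalArgumentException(
                featureNames.length + " feature names but " + values.length + " values");
        }
        Map<String, Object> row = new HashMap<>();
        for (int i = 0; i < featureNames.length; i++) {
            row.put(featureNames[i], values[i]);
        }
        return row;
    }

    public static void main(String[] args) {
        // The name list uses the model's column name "desc", not the parameter name.
        String[] names = {"loan_amnt", "term", "desc"};
        Map<String, Object> row = of(names, 5000, "36 months", "debt consolidation");
        System.out.println(row.get("desc")); // debt consolidation
    }
}
```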

## `batchScore` function

There is not much to this function on the Postgres side, but the Java implementation feels very rigid. The `batchScore` function takes as its argument a string naming a table, which may be a nightly snapshot of the scoring dataset. The function queries that table, scores each row of the result set, and returns a set of values of a composite (complex) type. A composite type is essentially a tuple of fields of differing types.

For example, my `batchScore` function returns a set of `scoredRecord`s. Each scored record has an `int` ID, a `double` score, and a `long` time (the listing below fills in only the ID and the score).
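To make the row-at-a-time flow concrete without a database, the sketch below mimics it in plain Java. PL/Java calls `assignRowValues` once per output row until it returns `false`. Here, rows are likewise pulled one at a time, and each becomes a record with the three `scoredRecord` fields. `BatchFlow` and its `idColumn` parameter are assumptions for illustration, as is treating `time` as epoch milliseconds.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.ToDoubleFunction;

// Hypothetical, database-free model of the batchScore flow.
final class BatchFlow {
    private BatchFlow() {}

    // Mirrors the scoredRecord composite type: int id, double score, long time.
    static final class ScoredRecord {
        final int id;
        final double score;
        final long time; // assumed here to be epoch milliseconds

        ScoredRecord(int id, double score, long time) {
            this.id = id;
            this.score = score;
            this.time = time;
        }

        @Override
        public String toString() {
            return "(" + id + ", " + score + ", " + time + ")";
        }
    }

    // Pulls rows one at a time and emits one ScoredRecord per row; running out
    // of rows plays the role of assignRowValues returning false.
    static List<ScoredRecord> scoreAll(Iterator<Map<String, Object>> rows, String idColumn,
                                       ToDoubleFunction<Map<String, Object>> scorer) {
        List<ScoredRecord> out = new ArrayList<>();
        while (rows.hasNext()) {
            Map<String, Object> row = rows.next();
            int id = ((Number) row.get(idColumn)).intValue();
            out.add(new ScoredRecord(id, scorer.applyAsDouble(row), System.currentTimeMillis()));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> rows = List.of(
            Map.<String, Object>of("application_id", 1, "loan_amnt", 5000),
            Map.<String, Object>of("application_id", 2, "loan_amnt", 12000));
        // Toy stand-in scorer, NOT the DataRobot model.
        System.out.println(scoreAll(rows.iterator(), "application_id",
            r -> ((Integer) r.get("loan_amnt")) / 100_000.0));
    }
}
```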

## What does it look like?


```java
package com.datarobot.java;

import com.datarobot.prediction.IClassificationPredictor;
import com.datarobot.prediction.Predictors;

import org.postgresql.pljava.ResultSetProvider;
import org.postgresql.pljava.annotation.Function;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;

import java.util.HashMap;
import java.util.Map;

// Streams every row of a table through the codegen model and returns
// one scoredRecord per input row.
public class BatchScoring implements ResultSetProvider
{
  // The codegen model, looked up once by its DataRobot model ID.
  static IClassificationPredictor predictor = Predictors.getPredictor("5d5da72a3fa59e2850f824fc");

  private final Statement stmt;
  private final ResultSet rs;
  private final ResultSetMetaData md;
  private final int columnCount;

  public BatchScoring(String table) throws SQLException
  {
    // The table name is spliced into the SQL text, so accept only plain
    // (optionally schema-qualified) identifiers.
    if (!table.matches("[A-Za-z_][A-Za-z0-9_]*(\\.[A-Za-z_][A-Za-z0-9_]*)?"))
      throw new SQLException("invalid table name: " + table);

    // Inside PL/Java, the default connection reuses the calling session,
    // so no host, user, or password has to be hard-coded.
    Connection conn = DriverManager.getConnection("jdbc:default:connection");
    stmt = conn.createStatement();
    rs = stmt.executeQuery(String.format("SELECT * FROM %s", table));
    md = rs.getMetaData();
    columnCount = md.getColumnCount();
  }

  // Called by PL/Java once per output row; returning false ends the set.
  public boolean assignRowValues(ResultSet receiver, int currentRow) throws SQLException
  {
    if (!rs.next())
      return false;

    // Build the feature map the predictor expects: column name -> value.
    Map<String, Object> row = new HashMap<>();
    for (int i = 1; i <= columnCount; ++i) {
      row.put(md.getColumnName(i), rs.getObject(i));
    }
    Map<String, Double> classProbabilities = predictor.score(row);

    // Column 1 of the input table is taken as the record ID; the score is
    // the probability of the positive class "1".
    receiver.updateInt(1, rs.getInt(1));
    receiver.updateDouble(2, classProbabilities.get("1"));
    return true;
  }

  public void close() throws SQLException
  {
    // Closing the statement also closes its result set.
    stmt.close();
  }

  @Function(type = "scoredRecord")
  public static ResultSetProvider batchScore(String table) throws SQLException
  {
    return new BatchScoring(table);
  }
}
```



```scala
import com.datarobot.prediction.IClassificationPredictor
import com.datarobot.prediction.Predictors
import java.util.HashMap
import org.postgresql.pljava.annotation.Function

object Scoring {

  // The codegen model, looked up by its DataRobot model ID.
  val predictor: IClassificationPredictor = Predictors.getPredictor("5d5da72a3fa59e2850f824fc")

  // Returns the ID of the model that produces the scores.
  @Function
  def modelId(): String = {
    predictor.getModelId
  }

  // Scores one row, passed as one argument per feature, and returns the
  // probability of the positive class "1".
  @Function
  def score(
    application_id: Int,
    loan_amnt: Int,
    funded_amnt: Int,
    term: String,
    int_rate: String,
    int_rate_1: String,
    installment: Double,
    grade: String,
    sub_grade: String,
    emp_title: String,
    emp_length: String,
    home_ownership: String,
    annual_inc: String,
    verification_status: String,
    pymnt_plan: String,
    url: String,
    description: String,
    purpose: String,
    title: String,
    zip_code: String,
    addr_state: String,
    dti: Double,
    delinq_2yrs: String,
    earliest_cr_line: String,
    inq_last_6mths: String,
    mths_since_last_delinq: String,
    mths_since_last_record: String,
    open_acc: String,
    pub_rec: String,
    revol_bal: Int,
    revol_util: String,
    total_acc: String,
    initial_list_status: String,
    mths_since_last_major_derog: String,
    policy_code: Int,
    is_bad: Int // accepted but not passed to the model
  ): Double = {

    val row = new HashMap[String, Any]()

    row.put("application_id", application_id)
    row.put("loan_amnt", loan_amnt)
    row.put("funded_amnt", funded_amnt)
    row.put("term", term)
    row.put("int_rate", int_rate)
    row.put("int_rate_1", int_rate_1)
    row.put("installment", installment)
    row.put("grade", grade)
    row.put("sub_grade", sub_grade)
    row.put("emp_title", emp_title)
    row.put("emp_length", emp_length)
    row.put("home_ownership", home_ownership)
    row.put("annual_inc", annual_inc)
    row.put("verification_status", verification_status)
    row.put("pymnt_plan", pymnt_plan)
    row.put("url", url)
    row.put("desc", description) // the model expects the original column name `desc`
    row.put("purpose", purpose)
    row.put("title", title)
    row.put("zip_code", zip_code)
    row.put("addr_state", addr_state)
    row.put("dti", dti)
    row.put("delinq_2yrs", delinq_2yrs)
    row.put("earliest_cr_line", earliest_cr_line)
    row.put("inq_last_6mths", inq_last_6mths)
    row.put("mths_since_last_delinq", mths_since_last_delinq)
    row.put("mths_since_last_record", mths_since_last_record)
    row.put("open_acc", open_acc)
    row.put("pub_rec", pub_rec)
    row.put("revol_bal", revol_bal)
    row.put("revol_util", revol_util)
    row.put("total_acc", total_acc)
    row.put("initial_list_status", initial_list_status)
    row.put("mths_since_last_major_derog", mths_since_last_major_derog)
    row.put("policy_code", Integer.toString(policy_code))

    val class_probabilities = predictor.score(row)

    class_probabilities.get("1")
  }
}
```


```scala
// Read the tab-delimited Lending Club sample; the first line is the header.
val data = scala.io.Source.fromFile("/Users/timothy.whittaker/Desktop/sbt-projects/dr-gp-scoring/data/10K_Lending_Club_Loans.txt").getLines()
val columns = data.next().split("\t")
```

```scala
// Score the next 11 rows of the file and print each result.
for (i <- 0 to 10) {
  // limit -1 keeps trailing empty fields instead of silently dropping them
  val values = data.next().split("\t", -1)
  val application_id: java.lang.Integer = values(0).toInt
  val loan_amnt: java.lang.Integer = values(1).toInt
  val funded_amnt: java.lang.Integer = values(2).toInt
  val term: String = values(3)
  val int_rate: String = values(4)
  val int_rate_1: String = values(5)
  val installment: java.lang.Double = values(6).toDouble
  val grade: String = values(7)
  val sub_grade: String = values(8)
  val emp_title: String = values(9)
  val emp_length: String = values(10)
  val home_ownership: String = values(11)
  val annual_inc: String = values(12)
  val verification_status: String = values(13)
  val pymnt_plan: String = values(14)
  val url: String = values(15)
  val description: String = values(16)
  val purpose: String = values(17)
  val title: String = values(18)
  val zip_code: String = values(19)
  val addr_state: String = values(20)
  val dti: java.lang.Double = values(21).toDouble
  val delinq_2yrs: String = values(22)
  val earliest_cr_line: String = values(23)
  val inq_last_6mths: String = values(24)
  val mths_since_last_delinq: String = values(25)
  val mths_since_last_record: String = values(26)
  val open_acc: String = values(27)
  val pub_rec: String = values(28)
  val revol_bal: java.lang.Integer = values(29).toInt
  val revol_util: String = values(30)
  val total_acc: String = values(31)
  val initial_list_status: String = values(32)
  val mths_since_last_major_derog: String = values(33)
  val policy_code: java.lang.Integer = values(34).toInt
  val is_bad: java.lang.Integer = values(35).toInt

  val score = Scoring.score(
    application_id,
    loan_amnt,
    funded_amnt,
    term,
    int_rate,
    int_rate_1,
    installment,
    grade,
    sub_grade,
    emp_title,
    emp_length,
    home_ownership,
    annual_inc,
    verification_status,
    pymnt_plan,
    url,
    description,
    purpose,
    title,
    zip_code,
    addr_state,
    dti,
    delinq_2yrs,
    earliest_cr_line,
    inq_last_6mths,
    mths_since_last_delinq,
    mths_since_last_record,
    open_acc,
    pub_rec,
    revol_bal,
    revol_util,
    total_acc,
    initial_list_status,
    mths_since_last_major_derog,
    policy_code,
    is_bad)

  println(f"application id ${application_id} has a default score of ${score}%2.2f based on model ${Scoring.modelId()}")
}
```