In [None]:
!pip install pytest
!pip install pytest-notebook

In [None]:
!wget https://raw.githubusercontent.com/akaihola/ipython_pytest/master/ipython_pytest.py -O ipython_pytest.py

In [None]:
%load_ext ipython_pytest

In [None]:
%%pytest

from pyspark.sql import SparkSession
from splicemachine.spark import PySpliceContext
from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import *
from pyspark.ml.regression import *
from pyspark.sql.types import *
from pyspark.ml.feature import *
from pyspark.ml import Pipeline
from splicemachine.mlflow_support import *
import pytest
from splicemachine.mlflow_support import get_user

# Shared fixtures for every test function in this cell.
schema = get_user()
spark = SparkSession.builder.getOrCreate()
splice = PySpliceContext(spark)

# Layout of the training CSV: two nullable double scores and a nullable
# integer outcome.
scoreSchema = [
    StructField("Score1", DoubleType(), True),
    StructField("Score2", DoubleType(), True),
    StructField("Result", IntegerType(), True),
]
examSchema = StructType(scoreSchema)

# Training data; the "Result" column is exposed to the estimators as "label".
df = (
    spark.read.schema(examSchema)
    .load("s3a://zach-splice-bucket/src_main_resources_scores.csv", format="csv")
    .withColumnRenamed('Result', 'label')
)
df.show()

# Packs both scores into the single "features" vector column.
assembler = VectorAssembler(inputCols=["Score1", "Score2"], outputCol="features")

# Hand-written rows that are scored both in Spark and through the deployed table.
testingSchema = [StructField(column, DoubleType(), True) for column in ("Score1", "Score2")]
testValues = [
    (40.0, 40.0), (20.0, 80.0), (20.0, 30.0), (90.0, 80.0), (10.0, 20.0),
    (10.0, 50.0), (50.0, 30.0), (40.0, 75.0), (90.0, 30.0), (100.0, 100.0),
]
testingDf = spark.createDataFrame(testValues, StructType(testingSchema))

mlflow.register_splice_context(splice)


def test_logistic_regression():
    """Check that a deployed LogisticRegression pipeline scores rows like Spark does.

    Fits the pipeline on ``df``, logs it, deploys it with ``mlflow.deploy_db``
    into ``{schema}.ScoresTableLogisticRegression``, inserts ``testValues`` and
    asserts that the PREDICTION/C0/C1 columns read back for every MOMENT_ID
    equal the output of ``model.transform(testingDf)``.
    """

    print('========== Testing Logistic Regression ==========')

    with mlflow.start_run(run_name='Logistic Regression'):

        model = LogisticRegression()
        model = Pipeline(stages=[assembler, model])
        model = model.fit(df)
        mlflow.log_model(model, 'model')

        # Start from a clean table so rows from an earlier run cannot leak in.
        splice.dropTableIfExists(f"{schema}.ScoresTableLogisticRegression")

        jid = mlflow.deploy_db(
            db_schema_name=f"{schema}",
            db_table_name="ScoresTableLogisticRegression",
            run_id=mlflow.current_run_id(),
            df=df.drop("label"),
            create_model_table=True,
            primary_key={'MOMENT_ID': 'INT', 'Score1': 'DOUBLE', 'Score2': 'DOUBLE'},
            classes=["C0", "C1"],
        )
        mlflow.watch_job(jid)

        LogisticRegressionDf = model.transform(testingDf)

        # Presumably the deployed model fills PREDICTION/C0/C1 when a row is
        # inserted -- only the feature columns are supplied here.
        for moment_id, val in enumerate(testValues):
            splice.execute(f"INSERT INTO {schema}.ScoresTableLogisticRegression (MOMENT_ID,Score1,Score2) VALUES ({moment_id},{val[0]},{val[1]})")

        splice.execute(f"select * from {schema}.ScoresTableLogisticRegression")

        for i, row in enumerate(LogisticRegressionDf.collect()):
            # row[5] is compared with PREDICTION, row[4][0]/row[4][1] with C0/C1.
            dfp = [row[5], row[4][0], row[4][1]]
            # The table is schema-qualified like every other statement in this
            # test; the unqualified name only resolved when the session's
            # current schema happened to be the user's schema.
            table_pred = list(splice.df(f'SELECT case when prediction=\'C0\' then 0 else 1 end PREDICTION,"C0","C1" FROM {schema}.ScoresTableLogisticRegression WHERE MOMENT_ID = {i}').collect()[0])
            assert dfp == table_pred, f'Problem. {dfp} from model, {table_pred} from table'

def test_decision_tree():
    """Check that a deployed DecisionTreeClassifier pipeline scores rows like Spark does.

    Fits the pipeline on ``df``, logs it, deploys it with ``mlflow.deploy_db``
    into ``{schema}.ScoresTableDecisionTree``, inserts ``testValues`` and
    compares the PREDICTION/C0/C1 columns read back for every MOMENT_ID with
    the output of ``model.transform(testingDf)``.
    """

    print('========== Testing Decision Tree Classification ==========')

    with mlflow.start_run(run_name='Decision Tree Classification'):

        model = DecisionTreeClassifier()
        model = Pipeline(stages=[assembler, model])
        model = model.fit(df)
        mlflow.log_model(model, 'model')

        # Start from a clean table so rows from an earlier run cannot leak in.
        splice.dropTableIfExists(f"{schema}.ScoresTableDecisionTree")

        jid = mlflow.deploy_db(
            db_schema_name=f"{schema}",
            db_table_name="ScoresTableDecisionTree",
            run_id=mlflow.current_run_id(),
            df=df.drop("label"),
            create_model_table=True,
            primary_key={'MOMENT_ID': 'INT', 'Score1': 'DOUBLE', 'Score2': 'DOUBLE'},
            classes=["C0", "C1"],
        )
        mlflow.watch_job(jid)

        DecisionTreeDf = model.transform(testingDf)

        for moment_id, val in enumerate(testValues):
            splice.execute(f"INSERT INTO {schema}.ScoresTableDecisionTree (MOMENT_ID,Score1,Score2) VALUES ({moment_id},{val[0]},{val[1]})")

        splice.execute(f"select * from {schema}.ScoresTableDecisionTree")

        for i, row in enumerate(DecisionTreeDf.collect()):
            # row[5] is compared with PREDICTION, row[4][0]/row[4][1] with C0/C1.
            dfp = [row[5], row[4][0], row[4][1]]
            table_pred = list(splice.df(f'SELECT case when prediction=\'C0\' then 0 else 1 end PREDICTION,"C0","C1" FROM {schema}.ScoresTableDecisionTree WHERE MOMENT_ID = {i}').collect()[0])

            for tab, d in zip(table_pred, dfp):
                # Compare at the precision of the shorter textual representation.
                # Clamp at 0: the integer label prints as one character, which
                # used to yield ndigits=-1 and rounded both 0 and 1 to 0, so a
                # wrong label could never fail the assertion.
                digits = max(min(len(str(tab)), len(str(d)), 15) - 2, 0)
                assert round(d, digits) == round(tab, digits), f'Problem. {d} from model, {tab} from table'
        
def test_random_forest():
    """Check that a deployed RandomForestClassifier pipeline scores rows like Spark does.

    Fits the pipeline on ``df``, logs it, deploys it with ``mlflow.deploy_db``
    into ``{schema}.ScoresTableRandomForest``, inserts ``testValues`` and
    compares the PREDICTION/C0/C1 columns read back for every MOMENT_ID with
    the output of ``model.transform(testingDf)``.
    """

    print('========== Testing Random Forest ==========')

    with mlflow.start_run(run_name='Random Forest'):

        # This used to build a DecisionTreeClassifier (copy-paste from the
        # decision tree test), so random forests were never exercised.
        model = RandomForestClassifier()
        model = Pipeline(stages=[assembler, model])
        model = model.fit(df)
        mlflow.log_model(model, 'model')

        # Start from a clean table so rows from an earlier run cannot leak in.
        splice.dropTableIfExists(f"{schema}.ScoresTableRandomForest")

        jid = mlflow.deploy_db(
            db_schema_name=f"{schema}",
            db_table_name="ScoresTableRandomForest",
            run_id=mlflow.current_run_id(),
            df=df.drop("label"),
            create_model_table=True,
            primary_key={'MOMENT_ID': 'INT', 'Score1': 'DOUBLE', 'Score2': 'DOUBLE'},
            classes=["C0", "C1"],
        )
        mlflow.watch_job(jid)

        RandomForestDf = model.transform(testingDf)

        for moment_id, val in enumerate(testValues):
            splice.execute(f"INSERT INTO {schema}.ScoresTableRandomForest (MOMENT_ID,Score1,Score2) VALUES ({moment_id},{val[0]},{val[1]})")

        splice.execute(f"select * from {schema}.ScoresTableRandomForest")

        for i, row in enumerate(RandomForestDf.collect()):
            # row[5] is compared with PREDICTION, row[4][0]/row[4][1] with C0/C1.
            dfp = [row[5], row[4][0], row[4][1]]
            table_pred = list(splice.df(f'SELECT case when prediction=\'C0\' then 0 else 1 end PREDICTION,"C0","C1" FROM {schema}.ScoresTableRandomForest WHERE MOMENT_ID = {i}').collect()[0])
            for tab, d in zip(table_pred, dfp):
                # Compare at the precision of the shorter textual representation.
                # Clamp at 0: the integer label prints as one character, which
                # used to yield ndigits=-1 and rounded both 0 and 1 to 0, so a
                # wrong label could never fail the assertion.
                digits = max(min(len(str(tab)), len(str(d)), 15) - 2, 0)
                assert round(d, digits) == round(tab, digits), f'Problem. {d} from model, {tab} from table'

def test_gbt():
    """Check that a deployed GBTClassifier pipeline scores rows like Spark does.

    Fits the pipeline on ``df``, logs it, deploys it with ``mlflow.deploy_db``
    into ``{schema}.ScoresTableGBT``, inserts ``testValues`` and compares the
    PREDICTION/C0/C1 columns read back for every MOMENT_ID with the output of
    ``model.transform(testingDf)``.
    """

    print('========== Testing GBT ==========')

    with mlflow.start_run(run_name='GBT'):

        model = GBTClassifier()
        model = Pipeline(stages=[assembler, model])
        model = model.fit(df)
        mlflow.log_model(model, 'model')

        # Start from a clean table so rows from an earlier run cannot leak in.
        splice.dropTableIfExists(f"{schema}.ScoresTableGBT")

        jid = mlflow.deploy_db(
            db_schema_name=f"{schema}",
            db_table_name="ScoresTableGBT",
            run_id=mlflow.current_run_id(),
            df=df.drop("label"),
            create_model_table=True,
            primary_key={'MOMENT_ID': 'INT', 'Score1': 'DOUBLE', 'Score2': 'DOUBLE'},
            classes=["C0", "C1"],
        )
        mlflow.watch_job(jid)

        GBTDf = model.transform(testingDf)

        for moment_id, val in enumerate(testValues):
            splice.execute(f"INSERT INTO {schema}.ScoresTableGBT (MOMENT_ID,Score1,Score2) VALUES ({moment_id},{val[0]},{val[1]})")

        for i, row in enumerate(GBTDf.collect()):
            # row[5] is compared with PREDICTION, row[4][0]/row[4][1] with C0/C1.
            dfp = [row[5], row[4][0], row[4][1]]
            table_pred = list(splice.df(f'SELECT case when prediction=\'C0\' then 0 else 1 end PREDICTION,"C0","C1" FROM {schema}.ScoresTableGBT WHERE MOMENT_ID = {i}').collect()[0])
            for tab, d in zip(table_pred, dfp):
                # Compare at the precision of the shorter textual representation.
                # Clamp at 0: the integer label prints as one character, which
                # used to yield ndigits=-1 and rounded both 0 and 1 to 0, so a
                # wrong label could never fail the assertion.
                digits = max(min(len(str(tab)), len(str(d)), 15) - 2, 0)
                assert round(d, digits) == round(tab, digits), f'Problem. {d} from model, {tab} from table'

def test_lsvc():
    """Check that a deployed LinearSVC pipeline predicts the same labels as Spark.

    Fits the pipeline on ``df``, logs it, deploys it with ``mlflow.deploy_db``
    into ``{schema}.ScoresTableLSVC``, inserts ``testValues`` and asserts that
    the PREDICTION column read back for every MOMENT_ID equals, as an integer,
    the last column of ``model.transform(testingDf)``.
    """

    print('========== Testing LSVC ==========')

    with mlflow.start_run(run_name='Linear SVC'):
        model = LinearSVC()
        model = Pipeline(stages=[assembler, model])
        model = model.fit(df)
        mlflow.log_model(model, 'model')

        # Start from a clean table so rows from an earlier run cannot leak in.
        splice.dropTableIfExists(f"{schema}.ScoresTableLSVC")

        jid = mlflow.deploy_db(
            db_schema_name=f"{schema}",
            db_table_name="ScoresTableLSVC",
            run_id=mlflow.current_run_id(),
            df=df.drop("label"),
            create_model_table=True,
            primary_key={'MOMENT_ID': 'INT', 'Score1': 'DOUBLE', 'Score2': 'DOUBLE'},
            classes=["C0", "C1"],
        )
        mlflow.watch_job(jid)

        LSVCDf = model.transform(testingDf)
        LSVCDf.show()

        for moment_id, val in enumerate(testValues):
            splice.execute(f"INSERT INTO {schema}.ScoresTableLSVC (MOMENT_ID,Score1,Score2) VALUES ({moment_id},{val[0]},{val[1]})")

        # Collect once: calling collect() inside the loop re-ran the whole
        # transform for every row that was checked.
        for i, row in enumerate(LSVCDf.collect()):
            table_pred = splice.df(f"SELECT PREDICTION FROM {schema}.SCORESTABLELSVC WHERE MOMENT_ID = {i}").collect()[0][0]
            assert int(row[-1]) == int(table_pred), f'Problem. {row[-1]} from model, {table_pred} from table'

def test_naive_bayes():
    """Check that a deployed NaiveBayes pipeline predicts the same labels as Spark.

    Fits the pipeline on ``df``, logs it, deploys it with ``mlflow.deploy_db``
    into ``{schema}.ScoresTableNaiveBayes``, inserts ``testValues`` and asserts
    that the PREDICTION column read back for every MOMENT_ID equals, as an
    integer, column 5 of ``model.transform(testingDf)``.
    """

    print('========== Testing Naive Bayes ==========')

    with mlflow.start_run(run_name='Naive Bayes'):
        model = NaiveBayes()
        model = Pipeline(stages=[assembler, model])
        model = model.fit(df)
        mlflow.log_model(model, 'model')

        # Start from a clean table so rows from an earlier run cannot leak in.
        splice.dropTableIfExists(f"{schema}.ScoresTableNaiveBayes")

        jid = mlflow.deploy_db(
            db_schema_name=f"{schema}",
            db_table_name="ScoresTableNaiveBayes",
            run_id=mlflow.current_run_id(),
            df=df.drop("label"),
            create_model_table=True,
            primary_key={'MOMENT_ID': 'INT', 'Score1': 'DOUBLE', 'Score2': 'DOUBLE'},
            classes=["C0", "C1"],
        )
        mlflow.watch_job(jid)

        NaiveBayesDf = model.transform(testingDf)
        NaiveBayesDf.show()

        for moment_id, val in enumerate(testValues):
            splice.execute(f"INSERT INTO {schema}.ScoresTableNaiveBayes (MOMENT_ID,Score1,Score2) VALUES ({moment_id},{val[0]},{val[1]})")

        # Collect once: calling collect() inside the loop re-ran the whole
        # transform for every row that was checked.
        for i, row in enumerate(NaiveBayesDf.collect()):
            table_pred = splice.df(f"SELECT PREDICTION FROM {schema}.SCORESTABLENAIVEBAYES WHERE MOMENT_ID = {i}").collect()[0][0]
            assert int(row[5]) == int(table_pred), f'Problem. {row[5]} from model, {table_pred} from table'

def test_decision_tree_regression():
    """Check that a deployed DecisionTreeRegressor pipeline predicts the same values as Spark.

    Fits the pipeline on ``df``, logs it, deploys it with ``mlflow.deploy_db``
    into ``{schema}.ScoresTableDecisionTreeRegression``, inserts ``testValues``
    and asserts that the PREDICTION column read back for every MOMENT_ID
    equals column 3 of ``model.transform(testingDf)``.
    """

    print('========== Testing Decision Tree Regression ==========')
    with mlflow.start_run(run_name='Decision Tree Regression'):
        model = DecisionTreeRegressor()
        model = Pipeline(stages=[assembler, model])
        model = model.fit(df)
        mlflow.log_model(model, 'model')

        # Start from a clean table so rows from an earlier run cannot leak in.
        splice.dropTableIfExists(f"{schema}.ScoresTableDecisionTreeRegression")

        jid = mlflow.deploy_db(
            db_schema_name=f"{schema}",
            db_table_name="ScoresTableDecisionTreeRegression",
            run_id=mlflow.current_run_id(),
            df=df.drop("label"),
            create_model_table=True,
            primary_key={'MOMENT_ID': 'INT', 'Score1': 'DOUBLE', 'Score2': 'DOUBLE'},
        )
        mlflow.watch_job(jid)

        DecisionTreeRegressionDf = model.transform(testingDf)
        DecisionTreeRegressionDf.show()

        for moment_id, val in enumerate(testValues):
            splice.execute(f"INSERT INTO {schema}.ScoresTableDecisionTreeRegression (MOMENT_ID,Score1,Score2) VALUES ({moment_id},{val[0]},{val[1]})")

        # Collect once: calling collect() inside the loop re-ran the whole
        # transform for every row that was checked.
        for i, row in enumerate(DecisionTreeRegressionDf.collect()):
            table_pred = splice.df(f"SELECT PREDICTION FROM {schema}.SCORESTABLEDECISIONTREEREGRESSION WHERE MOMENT_ID = {i}").collect()[0][0]
            assert float(row[3]) == float(table_pred), f'Problem. {row[3]} from model, {table_pred} from table'

def test_random_forest_regression():
    """Check that a deployed RandomForestRegressor pipeline predicts the same values as Spark.

    Fits the pipeline on ``df``, logs it, deploys it with ``mlflow.deploy_db``
    into ``{schema}.ScoresTableRandomForestRegression``, inserts ``testValues``
    and asserts that the PREDICTION column read back for every MOMENT_ID
    equals column 3 of ``model.transform(testingDf)``.
    """

    print('========== Testing Random Forest Regression ==========')
    with mlflow.start_run(run_name='Random Forest Regression'):
        model = RandomForestRegressor()
        model = Pipeline(stages=[assembler, model])
        model = model.fit(df)
        mlflow.log_model(model, 'model')

        # Start from a clean table so rows from an earlier run cannot leak in.
        splice.dropTableIfExists(f"{schema}.ScoresTableRandomForestRegression")

        jid = mlflow.deploy_db(
            db_schema_name=f"{schema}",
            db_table_name="ScoresTableRandomForestRegression",
            run_id=mlflow.current_run_id(),
            df=df.drop("label"),
            create_model_table=True,
            primary_key={'MOMENT_ID': 'INT', 'Score1': 'DOUBLE', 'Score2': 'DOUBLE'},
        )
        mlflow.watch_job(jid)

        RandomForestRegressionDf = model.transform(testingDf)
        RandomForestRegressionDf.show()

        for moment_id, val in enumerate(testValues):
            splice.execute(f"INSERT INTO {schema}.ScoresTableRandomForestRegression (MOMENT_ID,Score1,Score2) VALUES ({moment_id},{val[0]},{val[1]})")

        # Collect once: calling collect() inside the loop re-ran the whole
        # transform for every row that was checked.
        for i, row in enumerate(RandomForestRegressionDf.collect()):
            table_pred = splice.df(f"SELECT PREDICTION FROM {schema}.SCORESTABLERANDOMFORESTREGRESSION WHERE MOMENT_ID = {i}").collect()[0][0]
            assert float(row[3]) == float(table_pred), f'Problem. {row[3]} from model, {table_pred} from table'
        
def test_gradient_boosted_tree_regression():
    """Check that a deployed GBTRegressor pipeline predicts the same values as Spark.

    Fits the pipeline on ``df``, logs it, deploys it with ``mlflow.deploy_db``
    into ``{schema}.ScoresTableGBTRegression``, inserts ``testValues`` and
    asserts that the PREDICTION column read back for every MOMENT_ID equals
    column 3 of ``model.transform(testingDf)``.
    """

    print('========== Testing GBT Regression ==========')
    with mlflow.start_run(run_name='GBT Regression'):
        model = GBTRegressor()
        model = Pipeline(stages=[assembler, model])
        model = model.fit(df)
        mlflow.log_model(model, 'model')

        # Start from a clean table so rows from an earlier run cannot leak in.
        splice.dropTableIfExists(f"{schema}.ScoresTableGBTRegression")

        jid = mlflow.deploy_db(
            db_schema_name=f"{schema}",
            db_table_name="ScoresTableGBTRegression",
            run_id=mlflow.current_run_id(),
            df=df.drop("label"),
            create_model_table=True,
            primary_key={'MOMENT_ID': 'INT', 'Score1': 'DOUBLE', 'Score2': 'DOUBLE'},
        )
        mlflow.watch_job(jid)

        GBTRegressionDf = model.transform(testingDf)
        GBTRegressionDf.show()

        for moment_id, val in enumerate(testValues):
            splice.execute(f"INSERT INTO {schema}.ScoresTableGBTRegression (MOMENT_ID,Score1,Score2) VALUES ({moment_id},{val[0]},{val[1]})")

        # Collect once: calling collect() inside the loop re-ran the whole
        # transform for every row that was checked.
        for i, row in enumerate(GBTRegressionDf.collect()):
            table_pred = splice.df(f"SELECT PREDICTION FROM {schema}.SCORESTABLEGBTREGRESSION WHERE MOMENT_ID = {i}").collect()[0][0]
            assert float(row[3]) == float(table_pred), f'Problem. {row[3]} from model, {table_pred} from table'

In [None]:
%%pytest

from pyspark.sql import SparkSession
from splicemachine.spark import PySpliceContext
from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import *
from pyspark.ml.regression import *
from pyspark.sql.types import *
from pyspark.ml.feature import *
from pyspark.ml import Pipeline
from splicemachine.mlflow_support import *
import pytest
import pyspark.sql.functions as F
from splicemachine.mlflow_support import get_user

# Shared fixtures for the regression tests in this cell.
schema = get_user()

spark = SparkSession.builder.getOrCreate()

splice = PySpliceContext(spark)

# One tuple per row, in the column order declared by admitSchema below.
testValues = [
    (391, 314, 102, 2, 2.0, 2.5, 8.24, 0, 0.64),
    (392, 318, 106, 3, 2.0, 3.0, 8.65, 0, 0.71),
    (393, 326, 112, 4, 4.0, 3.5, 9.12, 1, 0.84),
    (394, 317, 104, 2, 3.0, 3.0, 8.76, 0, 0.77),
    (395, 329, 111, 4, 4.5, 4.0, 9.23, 1, 0.89),
    (396, 324, 110, 3, 3.5, 3.5, 9.04, 1, 0.82),
    (397, 325, 107, 3, 3.0, 3.5, 9.11, 1, 0.84),
    (398, 330, 116, 4, 5.0, 4.5, 9.45, 1, 0.91),
    (399, 312, 103, 3, 3.5, 4.0, 8.78, 0, 0.67),
    (400, 333, 117, 4, 5.0, 4.0, 9.66, 1, 0.95),
]

admitSchema = [
    StructField("Serial Number", IntegerType(), True),
    StructField("GRE", IntegerType(), True),
    StructField("TOEFL", IntegerType(), True),
    StructField("Rating", IntegerType(), True),
    StructField("SOP", DoubleType(), True),
    StructField("LOR", DoubleType(), True),
    StructField("CGPA", DoubleType(), True),
    StructField("Research", IntegerType(), True),
    StructField("label", DoubleType(), True),
]

collegeSchema = StructType(admitSchema)

# Every column except "Serial Number" and "label" feeds the feature vector.
assembler = VectorAssembler(inputCols=["GRE", "TOEFL", "Rating", "SOP", "LOR", "CGPA", "Research"], outputCol="features")

# collegeSchema is already a StructType; wrapping it in StructType() again
# stored a StructType where a list of StructField objects is expected.
testingDf = spark.createDataFrame(testValues, collegeSchema)

mlflow.register_splice_context(splice)

def test_linear_regression():
    """Check that a deployed LinearRegression pipeline predicts the same values as Spark.

    Fits the pipeline on ``testingDf`` itself, logs it, deploys it with
    ``mlflow.deploy_db`` into ``{schema}.ScoresTableLinearRegression``, inserts
    the feature columns of ``testValues`` and compares the PREDICTION column
    read back for every MOMENT_ID with column 10 of
    ``model.transform(testingDf)``, rounded to a shared precision.
    """

    print('========== Testing Linear Regression ==========')
    with mlflow.start_run(run_name='Linear Regression'):
        model = LinearRegression()
        model = Pipeline(stages=[assembler, model])
        model = model.fit(testingDf)
        mlflow.log_model(model, 'model')

        # Start from a clean table so rows from an earlier run cannot leak in.
        splice.dropTableIfExists(f"{schema}.ScoresTableLinearRegression")

        jid = mlflow.deploy_db(
            db_schema_name=f"{schema}",
            db_table_name="ScoresTableLinearRegression",
            run_id=mlflow.current_run_id(),
            df=testingDf.drop("label").drop("Serial Number"),
            create_model_table=True,
            primary_key={'MOMENT_ID': 'INT'},
        )
        mlflow.watch_job(jid)

        LinearRegressionDf = model.transform(testingDf)

        for moment_id, val in enumerate(testValues):
            # val[0] is the serial number and val[8] the label; only the seven
            # feature values in between are inserted.
            values = ",".join(str(v) for v in (moment_id, *val[1:8]))
            splice.execute(f"INSERT INTO {schema}.ScoresTableLinearRegression (MOMENT_ID,GRE,TOEFL,Rating,SOP,LOR,CGPA,Research) VALUES ({values})")

        # Collect once: calling collect() inside the loop re-ran the whole
        # transform for every row that was checked.
        for i, row in enumerate(LinearRegressionDf.collect()):
            d = float(row[10])
            tab = float(splice.df(f"SELECT PREDICTION FROM {schema}.SCORESTABLELINEARREGRESSION WHERE MOMENT_ID = {i}").collect()[0][0])
            # Compare at the precision of the shorter textual representation,
            # capped at 13 decimals.
            digits = min(len(str(tab)), len(str(d)), 15) - 2
            assert round(d, digits) == round(tab, digits), f'Problem. {round(d, digits)} from model, {round(tab, digits)} from table'
        
def test_generalized_linear_regression():
    """Check that a deployed GeneralizedLinearRegression pipeline predicts the same values as Spark.

    Fits the pipeline on ``testingDf`` itself, logs it, deploys it with
    ``mlflow.deploy_db`` into ``{schema}.ScoresTableGeneralizedLinearRegression``,
    inserts the feature columns of ``testValues`` and compares the PREDICTION
    column read back for every MOMENT_ID with column 10 of
    ``model.transform(testingDf)``, rounded to a shared precision.
    """

    print('========== Testing Generalized Linear Regression ==========')
    with mlflow.start_run(run_name='Generalized Linear Regression'):
        model = GeneralizedLinearRegression()
        model = Pipeline(stages=[assembler, model])
        model = model.fit(testingDf)
        mlflow.log_model(model, 'model')

        # Start from a clean table so rows from an earlier run cannot leak in.
        splice.dropTableIfExists(f"{schema}.ScoresTableGeneralizedLinearRegression")

        jid = mlflow.deploy_db(
            db_schema_name=f"{schema}",
            db_table_name="ScoresTableGeneralizedLinearRegression",
            run_id=mlflow.current_run_id(),
            df=testingDf.drop("label").drop("Serial Number"),
            create_model_table=True,
            primary_key={'MOMENT_ID': 'INT'},
        )
        mlflow.watch_job(jid)

        GeneralizedLinearRegressionDf = model.transform(testingDf)

        for moment_id, val in enumerate(testValues):
            # val[0] is the serial number and val[8] the label; only the seven
            # feature values in between are inserted.
            values = ",".join(str(v) for v in (moment_id, *val[1:8]))
            splice.execute(f"INSERT INTO {schema}.ScoresTableGeneralizedLinearRegression (MOMENT_ID,GRE,TOEFL,Rating,SOP,LOR,CGPA,Research) VALUES ({values})")

        # Collect once: calling collect() inside the loop re-ran the whole
        # transform for every row that was checked.
        for i, row in enumerate(GeneralizedLinearRegressionDf.collect()):
            d = float(row[10])
            tab = float(splice.df(f"SELECT PREDICTION FROM {schema}.ScoresTableGeneralizedLinearRegression WHERE MOMENT_ID = {i}").collect()[0][0])
            # Compare at the precision of the shorter textual representation,
            # capped at 13 decimals.
            digits = min(len(str(tab)), len(str(d)), 15) - 2
            assert round(d, digits) == round(tab, digits), f'Problem. {round(d, digits)} from model, {round(tab, digits)} from table'


def test_isotonic_regression():
    """Check that a deployed IsotonicRegression pipeline predicts the same values as Spark.

    Fits the pipeline on ``testingDf`` itself, logs it, deploys it with
    ``mlflow.deploy_db`` into ``{schema}.ScoresTableIsotonicRegression``,
    inserts the feature columns of ``testValues`` and compares the PREDICTION
    column read back for every MOMENT_ID with column 10 of
    ``model.transform(testingDf)``, rounded to a shared precision.
    """

    print('========== Testing Isotonic Regression ==========')
    with mlflow.start_run(run_name='Isotonic Regression'):
        model = IsotonicRegression()
        model = Pipeline(stages=[assembler, model])
        model = model.fit(testingDf)
        mlflow.log_model(model, 'model')

        # Start from a clean table so rows from an earlier run cannot leak in.
        splice.dropTableIfExists(f"{schema}.ScoresTableIsotonicRegression")

        jid = mlflow.deploy_db(
            db_schema_name=f"{schema}",
            db_table_name="ScoresTableIsotonicRegression",
            run_id=mlflow.current_run_id(),
            df=testingDf.drop("label").drop("Serial Number"),
            create_model_table=True,
            primary_key={'MOMENT_ID': 'INT'},
        )
        mlflow.watch_job(jid)

        isotonicRegressionDf = model.transform(testingDf)

        for moment_id, val in enumerate(testValues):
            # val[0] is the serial number and val[8] the label; only the seven
            # feature values in between are inserted.
            values = ",".join(str(v) for v in (moment_id, *val[1:8]))
            splice.execute(f"INSERT INTO {schema}.ScoresTableIsotonicRegression (MOMENT_ID,GRE,TOEFL,Rating,SOP,LOR,CGPA,Research) VALUES ({values})")

        # Collect once: calling collect() inside the loop re-ran the whole
        # transform for every row that was checked.
        for i, row in enumerate(isotonicRegressionDf.collect()):
            d = float(row[10])
            tab = float(splice.df(f"SELECT PREDICTION FROM {schema}.ScoresTableIsotonicRegression WHERE MOMENT_ID = {i}").collect()[0][0])
            # Compare at the precision of the shorter textual representation,
            # capped at 13 decimals.
            digits = min(len(str(tab)), len(str(d)), 15) - 2
            assert round(d, digits) == round(tab, digits), f'Problem. {round(d, digits)} from model, {round(tab, digits)} from table'
            
def test_survival_regression():
    """Check that a deployed AFTSurvivalRegression pipeline predicts the same values as Spark.

    Uses its own five-row (label, censor) data set and a one-column assembler,
    which shadow the cell-level ``testValues``, ``testingDf`` and
    ``assembler``. The pipeline is fitted on that data, logged, deployed with
    ``mlflow.deploy_db`` into ``{schema}.ScoresTableSurvivalRegression``, and
    the PREDICTION column read back for every MOMENT_ID is compared with the
    second-to-last column of ``model.transform(testingDf)``, rounded to a
    shared precision.
    """

    print('========== Testing Survival Regression ==========')

    testValues = [
        (1.218, 1.0),
        (2.949, 0.0),
        (3.627, 0.0),
        (0.273, 1.0),
        (4.199, 0.0),
    ]

    testingDf = spark.createDataFrame(testValues, ["label", "censor"])

    with mlflow.start_run(run_name='Survival Regression'):

        assembler = VectorAssembler(inputCols=["censor"], outputCol="features")
        quantileProbabilities = [0.3, 0.6]
        model = AFTSurvivalRegression(quantileProbabilities=quantileProbabilities,
                                      quantilesCol="quantiles")
        model = Pipeline(stages=[assembler, model])
        model = model.fit(testingDf)
        mlflow.log_model(model, 'model')

        # Start from a clean table so rows from an earlier run cannot leak in.
        splice.dropTableIfExists(f"{schema}.ScoresTableSurvivalRegression")

        jid = mlflow.deploy_db(
            db_schema_name=f"{schema}",
            db_table_name="ScoresTableSurvivalRegression",
            run_id=mlflow.current_run_id(),
            df=testingDf.drop("label"),
            create_model_table=True,
            primary_key={'MOMENT_ID': 'INT'},
        )
        mlflow.watch_job(jid)

        survivalRegressionDf = model.transform(testingDf)

        for moment_id, val in enumerate(testValues):
            # Only the censor value (val[1]) is inserted; val[0] is the label.
            x = f"INSERT INTO {schema}.ScoresTableSurvivalRegression (MOMENT_ID,censor) VALUES ({moment_id},{val[1]})"
            splice.execute(x)

        # Collect once: calling collect() inside the loop re-ran the whole
        # transform for every row that was checked.
        for i, row in enumerate(survivalRegressionDf.collect()):
            d = float(row[-2])
            tab = float(splice.df(f"SELECT PREDICTION FROM {schema}.ScoresTableSurvivalRegression WHERE MOMENT_ID = {i}").collect()[0][0])
            # Compare at the precision of the shorter textual representation,
            # capped at 13 decimals.
            digits = min(len(str(tab)), len(str(d)), 15) - 2
            assert round(d, digits) == round(tab, digits), f'Problem. {round(d, digits)} from model, {round(tab, digits)} from table'


In [None]:
%%pytest

from pyspark.sql import SparkSession
from splicemachine.spark import PySpliceContext
from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import *
from pyspark.sql.types import *
from pyspark.ml.feature import *
from pyspark.ml import Pipeline
from splicemachine.mlflow_support import *
import pytest
from splicemachine.mlflow_support import get_user

# Shared fixtures for the multilayer perceptron test in this cell.
schema = get_user()

spark = SparkSession.builder.getOrCreate()

splice = PySpliceContext(spark)

# Ten nullable integer feature columns (S1, C1, ..., S5, C5) plus the label.
handSchema = [
    StructField("S1", IntegerType(), True),
    StructField("C1", IntegerType(), True),
    StructField("S2", IntegerType(), True),
    StructField("C2", IntegerType(), True),
    StructField("S3", IntegerType(), True),
    StructField("C3", IntegerType(), True),
    StructField("S4", IntegerType(), True),
    StructField("C4", IntegerType(), True),
    StructField("S5", IntegerType(), True),
    StructField("C5", IntegerType(), True),
    StructField("label", IntegerType(), True),
]

cardSchema = StructType(handSchema)

assembler = VectorAssembler(inputCols=["S1", "C1", "S2", "C2", "S3", "C3", "S4", "C4", "S5", "C5"], outputCol="features")

# One tuple per row, in the column order declared by handSchema above.
testValues = [
    (1, 1, 1, 13, 2, 4, 2, 3, 1, 12, 0),
    (3, 12, 3, 2, 3, 11, 4, 5, 2, 5, 1),
    (1, 9, 4, 6, 1, 4, 3, 2, 3, 9, 1),
    (1, 4, 3, 13, 2, 13, 2, 1, 3, 6, 1),
    (3, 10, 2, 7, 1, 2, 2, 11, 4, 9, 0),
    (1, 3, 4, 5, 3, 4, 1, 12, 4, 6, 0),
    (2, 6, 4, 11, 2, 3, 4, 9, 1, 7, 0),
    (3, 2, 4, 9, 3, 7, 4, 3, 4, 5, 0),
    (4, 4, 3, 13, 1, 8, 3, 9, 3, 10, 0),
    (1, 9, 3, 8, 4, 4, 1, 7, 3, 5, 0),
    (4, 7, 3, 12, 1, 13, 1, 9, 2, 6, 0),
    (2, 12, 1, 3, 2, 11, 2, 7, 4, 8, 0),
    (4, 2, 2, 9, 2, 7, 1, 5, 3, 11, 0),
    (1, 13, 2, 6, 1, 6, 2, 11, 3, 5, 1),
    (3, 8, 2, 7, 1, 9, 3, 6, 2, 3, 0),
    (2, 10, 1, 11, 1, 9, 3, 1, 1, 13, 0),
    (4, 2, 4, 12, 2, 12, 2, 7, 3, 10, 1),
    (4, 5, 2, 2, 4, 9, 1, 5, 4, 1, 1),
    (2, 3, 3, 9, 2, 1, 2, 6, 4, 10, 0),
    (1, 7, 2, 11, 4, 1, 2, 9, 3, 13, 0),
]

# cardSchema is already a StructType; wrapping it in StructType() again
# stored a StructType where a list of StructField objects is expected.
testingDf = spark.createDataFrame(testValues, cardSchema)

testingDf.show()

mlflow.register_splice_context(splice)

def test_multilayer_perceptron_classifier():
    """Check that a deployed MultilayerPerceptronClassifier pipeline scores rows like Spark does.

    Fits the pipeline on ``testingDf`` itself, logs it, deploys it with
    ``mlflow.deploy_db`` into ``{schema}.ScoresTableMLPC``, inserts the feature
    columns of ``testValues`` and compares the PREDICTION/"0"/"1" columns read
    back for every MOMENT_ID with the last two columns of
    ``model.transform(testingDf)``.
    """

    print('========== Testing Multi Layer Perceptron ==========')
    with mlflow.start_run(run_name='Multi Layer Perceptron'):
        model = MultilayerPerceptronClassifier(layers=[10, 7, 2])
        model = Pipeline(stages=[assembler, model])
        model = model.fit(testingDf)
        mlflow.log_model(model, 'model')

        # Start from a clean table so rows from an earlier run cannot leak in.
        splice.dropTableIfExists(f"{schema}.ScoresTableMLPC")

        jid = mlflow.deploy_db(
            db_schema_name=schema,
            db_table_name="ScoresTableMLPC",
            run_id=mlflow.current_run_id(),
            df=testingDf.drop("label"),
            create_model_table=True,
            primary_key={'MOMENT_ID': 'INT'},
            classes=["0", "1"],
        )
        mlflow.watch_job(jid)

        MLPCDf = model.transform(testingDf)

        for moment_id, val in enumerate(testValues):
            # val[10] is the label; only the ten feature values are inserted.
            values = ",".join(str(v) for v in (moment_id, *val[:10]))
            splice.execute(f"INSERT INTO {schema}.ScoresTableMLPC (MOMENT_ID,S1,C1,S2,C2,S3,C3,S4,C4,S5,C5) VALUES ({values})")

        for i, row in enumerate(MLPCDf.collect()):
            # The last column is compared with PREDICTION, the elements of the
            # second-to-last column with the per-class columns "0" and "1".
            dfp = [row[-1]] + [idx for idx in row[-2]]
            table_pred = list(splice.df(f'SELECT cast(PREDICTION as int) PREDICTION,"0","1" FROM {schema}.ScoresTableMLPC WHERE MOMENT_ID = {i}').collect()[0])

            for tab, d in zip(table_pred, dfp):
                # Compare at the precision of the shorter textual representation.
                # Clamp at 0: the integer label prints as one character, which
                # used to yield ndigits=-1 and rounded both 0 and 1 to 0, so a
                # wrong label could never fail the assertion.
                digits = max(min(len(str(tab)), len(str(d)), 15) - 2, 0)
                assert round(d, digits) == round(tab, digits), f'Problem. {d} from model, {tab} from table'
            
        
# Disabled: MLeap does not support OneVsRest at the moment (0.15.0).
# def test_one_vs_rest():

#     print('========== Testing OVR ==========')
#     with mlflow.start_run(run_name='Multi Layer Perceptron'):
#         model = MultilayerPerceptronClassifier(layers=[10,7,2])
#         model = OneVsRest(classifier=model)
#         model = Pipeline(stages=[assembler,model])
#         model = model.fit(testingDf)
#         mlflow.log_model(model,'model')
    
#         splice.dropTableIfExists(f"{schema}.ScoresTableOVR")

#         jid = mlflow.deploy_db(db_schema_name=schema, db_table_name="ScoresTableOVR", run_id=mlflow.current_run_id(), df=testingDf.drop("label"), create_model_table=True, primary_key={'MOMENT_ID':'INT'})
#         mlflow.watch_job(jid)
        
#         OVRDf = model.transform(testingDf)   

#         for moment_id,val in enumerate(testValues):
#             splice.execute(f"INSERT INTO {schema}.ScoresTableOVR (MOMENT_ID,S1,C1,S2,C2,S3,C3,S4,C4,S5,C5) VALUES (" + str(moment_id) + "," + str(val[0]) + "," + str(val[1]) + "," + str(val[2]) + "," + str(val[3]) + "," + str(val[4]) + "," + str(val[5]) + "," + str(val[6]) + "," + str(val[7]) + "," + str(val[8]) + "," + str(val[9]) + ")")

#         for i in range (len(testValues)):
#             assert float(OVRDf.collect()[i][-1]) == float(splice.df(f"SELECT PREDICTION FROM {schema}.ScoresTableOVR WHERE MOMENT_ID = {i}").collect()[0][0])
            