# Final Scoring - Full QP checks

#### Business Rule:
- No full QP is eligible to receive a score. 
- Every full QP must still be present in the final score data, with a neutral payment adjustment. 

*How can HIVVS tweak this test?*
- You may already know which NPIs are full QPs. If so, tweak the query to accommodate your needs. 


In [None]:
# Initialize sparkmagic: load the extension, then write its configuration file.
# NOTE(review): the config file is written after the extension is (re)loaded — confirm
# that sparkmagic picks up the new file before the session is created further down.
%reload_ext sparkmagic.magics
# Overwrite ~/.sparkmagic/config.json with the settings used by this notebook: idle and
# session-startup timeouts, SSL errors ignored, an X-Requested-By header, and the
# PostgreSQL JDBC driver jar. "proxyUser" is set to the literal "XX".
!echo '{ \
"wait_for_idle_timeout_seconds": 60, \
"livy_session_startup_timeout_seconds": 180, \
"ignore_ssl_errors":true, \
"custom_headers":{"X-Requested-By":"admin"}, \
"session_configs":{ \
 "executorCores":1,"proxyUser":"XX", \
"jars":["/final-scoring/jars/postgresql.jar"]} \
}' > ~/.sparkmagic/config.json  

### Provide your credentials:

In [None]:
# Credentials passed to `%spark add` in the next cell as $myuser / $mypass.
# Change `myuser` to your own username before running.
import getpass
myuser='J5D6'
# Prompt for the password without echoing it, so it is not typed into the notebook.
mypass=getpass.getpass('Enter your A&R Internal LDAP password:')

### Connect to Spark:

In [None]:
# Remove Livy sessions left over from an earlier run of this notebook.
%spark cleanup
# Open a Scala (-l scala) Livy session named after the user (-s) on the given endpoint (-u),
# authenticating with the username (-a) and password (-p) entered in the previous cell.
# NOTE(review): -k is understood to skip creation when the session already exists — confirm.
%spark add -s $myuser -l scala  -k -u https://ambari.impl.qppar.internal:8443/qpp-ar-impl-hadoop/default/livy/v1 -a $myuser -p $mypass

You must wait for the above step to finish. <span style="color:red">**Do you see the YARN application id?**</span>

Run the housekeeping steps below:

In [None]:
%%spark 
// Final-scoring run under test; interpolated into the table names queried in later cells.
val RUN_ID = "24"
// UDS database hosts keyed by a short name.
val DB_SERVERS = Map("uds" -> "uds.impl.qppar.internal", "deid" -> "uds-deidentify.impl.qppar.internal")

// One row per line of pgpass.txt, held in a single string column named "value".
// loadFromUDS splits the matching line on commas and reads fields 1 and 2 as user and password.
val credentials = sc.textFile("pgpass.txt").toDF()
  

/**
 * Runs a query against the de-identified UDS PostgreSQL database over JDBC and returns
 * the result as a DataFrame.
 *
 * The query is embedded as a derived table (`(<sql>) as tab`), so it has to be a single
 * SELECT without a trailing semicolon. Credentials come from the first matching line of
 * pgpass.txt that starts with "uds", split on commas: field 1 is the user name
 * (lower-cased before use) and field 2 is the password.
 *
 * @param sql SELECT statement to run on the server
 * @return DataFrame backed by the JDBC query
 * @throws IllegalStateException if pgpass.txt has no line starting with "uds", or if that
 *                               line has fewer than three comma-separated fields
 */
def loadFromUDS(sql:String): org.apache.spark.sql.DataFrame = {
    // Same host as before, looked up in DB_SERVERS instead of repeating the literal.
    val server = DB_SERVERS("deid")

    // take(1).headOption instead of head: an empty match gets an explicit, actionable error.
    val credential = credentials.filter($"value".startsWith("uds")).as[String].take(1).headOption.
        getOrElse(throw new IllegalStateException(
            "pgpass.txt has no line starting with \"uds\"; cannot look up the UDS credentials")).
        split(",")

    // The offending line is deliberately left out of the message: it contains the password.
    if (credential.length < 3) {
        throw new IllegalStateException(
            "the \"uds\" line in pgpass.txt must have at least three comma-separated fields " +
            s"(name,user,password) but has ${credential.length}")
    }
    val userName = credential(1).toLowerCase()
    val password = credential(2)

    spark.read.format("jdbc").
    option("driver", "org.postgresql.Driver").
    option("url", s"jdbc:postgresql://$server:5432/uds?sslfactory=org.postgresql.ssl.NonValidatingFactory&ssl=true&sslmode=require").
    option("dbtable", s"($sql) as tab").
    option("user", userName).
    option("password", password).
    load()
} 


In [None]:
%%spark 
import java.lang.Double


/**
 * Value object for one final-score record.
 *
 * `Double` here is `java.lang.Double` (see the import at the top of this cell), a boxed
 * reference type; that is what allows the bonus fields to default to `null`.
 * NOTE(review): presumably mirrors the columns of final_scoring.finalscore_<RUN_ID> —
 * confirm against the table schema.
 */
case class FinalScoreVO(
    // Identifiers. tin and npi are required; the APM and submission ids default to null.
    tin: String,
    npi: String,
    apmEntityId: String = null,
    apmId: String = null,
    submissionId: String = null,
    apmEntitySubmissionId: String = null,
    overallScore: Double,
    // Per-category score, weight and reason: quality, PI, IA, cost.
    qualityScore: Double,
    qualityWeight: Double,
    qualityReason: String,
    piScore: Double,
    piWeight: Double,
    piReason: String,
    iaScore: Double,
    iaWeight: Double,
    iaReason: String,
    costScore: Double,
    costWeight: Double,
    costReason: String,
    // Optional bonuses; null when not supplied.
    complexPatientBonus: Double = null,
    smallPracticeBonus: Double = null,
    reasons: List[String] = null,
    // Status flags (Scala Boolean, so they cannot be null).
    isFinal: Boolean,
    isVoluntary: Boolean,
    isEligible: Boolean,
    scoreId: String = null
) 

### Load the final score data from Spark

In [None]:
%%spark -o fs
// Every row of the final-score table for the run under test (final_scoring.finalscore_<RUN_ID>).
val fs = spark.sql(s"SELECT * FROM final_scoring.finalscore_$RUN_ID")

In [None]:
%%spark 
// Report how many final-score rows the run produced; count() runs a Spark job over the table.
print (s" The total number of final score entries in run ${RUN_ID}: ${fs.count()}")

In [None]:
%%spark
// Preview the first rows of the final-score data. The statement must sit in the cell body:
// text on the %%spark header line is parsed as a magic subcommand, not sent to Spark.
fs.show()


### Load the list of full QPs in UDS

In [None]:
%%spark -o qps 
// Distinct (npi, qp_status) pairs from active.provider for year 2018 and runs 0 and 3,
// restricted to qp_status 'Y' or 'Q'.
// NOTE(review): both 'Y' and 'Q' are treated as full QP here — confirm against the UDS definition.
val qps = loadFromUDS("""
    SELECT DISTINCT npi, qp_status FROM active.provider
    WHERE run in (0, 3) AND year = 2018 AND ( qp_status = 'Y' OR qp_status = 'Q' )
 """)

In [None]:
%%spark
// Preview the first rows of the full-QP list loaded from UDS.
qps.show()

In [None]:
%%spark
// Keep only the final-score rows whose npi appears in the full-QP list.
// An npi listed with more than one qp_status in qps yields one joined row per status.
val joined =  fs.join(qps, Seq("npi"), "inner")
// Register the result under the name "joined" so later cells can query it with spark.sql.
joined.createOrReplaceTempView("joined")

In [None]:
%%spark
// Show the column names and types of the joined data (final-score columns plus qp_status).
joined.printSchema()


In [None]:
%%spark
// Split the full-QP rows by overall score. A zero score is the expected outcome for a
// full QP; any non-zero score is a rule violation to investigate.
// Rows whose overallScore is null match neither predicate and are counted in neither group.
val zeroScores = joined.where("overallScore = 0.0")
val nonZeroScores = joined.where("overallScore <> 0.0")

// Each count() triggers its own Spark job over the join.
val zeroCount = zeroScores.count()
val nonZeroCnt = nonZeroScores.count()

println(s"folks with QP status having zero scores: ${zeroCount} and folks with erratic (non zero) score count: ${nonZeroCnt}")

In [None]:
%%spark
// List the offending rows: full QPs that nevertheless carry a non-zero overall score.
nonZeroScores.select("tin", "npi", "qp_status" ,"overallScore", "isEligible", "isFinal").show()

In [None]:
%%spark 
// Sample five rows of the joined view. The column names are written in lower case here,
// which relies on Spark SQL resolving column names case-insensitively (its default).
spark.sql("select tin, npi, qp_status, overallscore, iseligible, isfinal from joined limit 5").show()

In [None]:
%%spark -o r 
// Details recorded in final_scoring.fs_runs for the run under test.
// r must stay a DataFrame so that `-o r` can export it; show() returns Unit, so it is
// called as a separate statement instead of being assigned. The run id comes from RUN_ID
// so this cell follows the same run as the rest of the notebook.
val r = spark.sql(s"select details from final_scoring.fs_runs where runid = $RUN_ID")
r.show()

In [None]:
%%spark 
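// Identifiers examined in the cells below: apmEntityId A2154; 0000152161 (identifier type not recorded here — TODO confirm)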

In [None]:
%%spark -o rollup
// Every row of the rollup table for the run under test (final_scoring.rollup_<RUN_ID>).
val rollup = spark.sql(s"SELECT * FROM final_scoring.rollup_$RUN_ID")

In [None]:
%%spark 
// Preview the first rows of the unfiltered rollup data.
rollup.show()

In [None]:
%%spark
// Narrow the rollup to the single APM entity A2154.
// This redefines `rollup`, replacing the unfiltered DataFrame from the earlier cell in this session.
val rollup = spark.sql(s"SELECT * FROM final_scoring.rollup_$RUN_ID where apmEntityId = 'A2154' ")

In [None]:
%%spark 
// Preview the rollup rows for APM entity A2154.
rollup.show()

In [None]:
%%spark
// Rows of final_scoring.rollup_participants_<RUN_ID> for APM entity A2154, shown right away.
val rollupParticipants = spark.sql(s""" SELECT * FROM 
    final_scoring.rollup_participants_$RUN_ID where apmEntityId = 'A2154' """)
rollupParticipants.show()