# ICD10PCS-2: First char of ICD should be (0, 1, 2, 3, 4, 5, 6, 7, 8, 9, B, C, D, F, G, H, X)

QA step description: check whether the first character of each ICD-10-PCS code is valid.
The valid first characters of an ICD code are: (0, 1, 2, 3, 4, 5, 6, 7, 8, 9, B, C, D, F, G, H, X).

Starting Author: Amy Jin (amy@careset.com)

Date: April 16th, 2018

https://docs.google.com/spreadsheets/d/1IYg01IpssJaWHo6KxO4_dSDgXtYNFy41S5cIHFLvlGQ/edit#gid=604789549

## Connect to Parenthood Server

In [1]:
# Packages import
import os
import sys
import numpy as np
import pandas as pd
from collections import Counter
import operator
import mysql.connector
import sshtunnel
import pureyaml

# Handle path
# Directory of the current notebook. Kept as a one-element list so that
# `project_dir[0]` keeps working for any later cell that indexes it
# (the value used to come from a `!pwd` shell capture).
project_dir = [os.getcwd()]

# Read credentials from db.yaml; the context manager guarantees the file
# handle is closed even if parsing fails.
with open(os.path.join(project_dir[0], "db.yaml")) as config_file:
    config = pureyaml.load(config_file.read())

# Argument dictionary for sshtunnel
# NOTE(review): binding the local end to 0.0.0.0 exposes the forwarded MySQL
# port on every network interface of this machine; consider '127.0.0.1'
# if config['mysql_host'] points at the loopback address -- confirm before changing.
ssh_config = {
    'ssh_address_or_host': ('parenthood.set.care', 22),
    'ssh_username':        config['ssh_username'],
    'ssh_password':        config['ssh_password'],
    'remote_bind_address': ('127.0.0.1', 3306),
    'local_bind_address':  ('0.0.0.0', 3333),
}

# Argument dictionary for mysql.connector
# The port must match the local end of the SSH tunnel configured above.
mysql_config = {
    'user':     config['mysql_user'],
    'password': config['mysql_passwd'],
    'host':     config['mysql_host'],
    'database': 'patch',
    'port':     3333,
}

# Connect to Parenthood server (smoke test of the tunnel and the credentials).
# The connection only works while the tunnel is open, so it is closed
# explicitly before the tunnel context exits instead of being left dangling.
with sshtunnel.SSHTunnelForwarder(**ssh_config) as tunnel:
    print('SSH tunneling successful on port: {}'.format(tunnel.local_bind_port))
    connection = mysql.connector.connect(**mysql_config)
    try:
        cur = connection.cursor()
        print('MySQL server connected successfully!')
        cur.close()
    finally:
        connection.close()

SSH tunneling successful on port: 3333
MySQL server connected successfully!


## Test Function

In [5]:
# --------------------------------------- Inputs: ---------------------------------------
# 1) db_name:                database name in server
# 2) table_name:             table name
# 3) col_name:               column to test
# --------------------------------------- Outputs: --------------------------------------
# 1) Test result:            PASS/FAIL
# 2) If FAIL, the test will print out distinct bad ICD codes.


def icd10pcs_2(db_name, table_name, col_name):
    """Check that every ICD-10-PCS code in a column starts with a valid character.

    Valid first characters are 0-9, B, C, D, F, G, H and X. Both upper- and
    lower-case letters are accepted, so the check does not depend on the
    collation of the column.

    Parameters
    ----------
    db_name : str
        Database (schema) that holds the table.
    table_name : str
        Table to test.
    col_name : str
        Column containing the ICD codes.

    Returns
    -------
    None
        The result is printed: ``Test result: PASS`` when no offending value
        exists, otherwise ``Test result: FAIL`` followed by the distinct
        offending values.

    Notes
    -----
    * NULL values are not reported: ``NULL NOT LIKE '...'`` evaluates to NULL
      in SQL, so such rows never satisfy the WHERE clause. Empty strings are
      reported.
    * The three arguments are interpolated into the SQL text as identifiers
      (they cannot be bound as query parameters), so only pass trusted names.
    * Relies on the notebook-level ``ssh_config`` and ``mysql_config`` dicts.
    """
    valid_first_chars = '0123456789BCDFGHX'
    # Accept the lower-case form of every letter as well.
    prefixes = list(valid_first_chars) + [
        ch.lower() for ch in valid_first_chars if ch.isalpha()
    ]
    # One "col NOT LIKE 'p%'" predicate per accepted prefix, AND-ed together:
    # a row matches only if its value starts with none of the valid characters.
    where_clause = '\n              AND '.join(
        "{col} NOT LIKE '{prefix}%'".format(col=col_name, prefix=prefix)
        for prefix in prefixes
    )
    # A single DISTINCT query both decides PASS/FAIL (no rows -> PASS) and
    # yields the offending values to report.
    query = '''
        SELECT DISTINCT {col}
        FROM {db}.{tbl}
        WHERE {where};
    '''.format(col=col_name, db=db_name, tbl=table_name, where=where_clause)

    with sshtunnel.SSHTunnelForwarder(**ssh_config):
        connection = mysql.connector.connect(**mysql_config)
        try:
            cur = connection.cursor()
            try:
                print('Test file: {}.{}'.format(db_name, table_name))
                print('\n')
                cur.execute(query)
                bad_codes = [row[0] for row in cur.fetchall()]
            finally:
                # Release the cursor even when the query raises.
                cur.close()
        finally:
            # Close the connection before the tunnel context tears down.
            connection.close()

    if not bad_codes:
        print("Test result: PASS")
        return

    print("Test result: FAIL" + '\n')
    print("Distinct {} values with an invalid first character:".format(col_name) + '\n')
    for code in bad_codes:
        print(str(code), end=", ")
        print('\n')

## Test Example

In [6]:
# Example run: check column icd_prcdr_cd of table npi_inst_icdproc.npi_inst_icdproc_RQ17.
icd10pcs_2('npi_inst_icdproc', 'npi_inst_icdproc_RQ17','icd_prcdr_cd')

Test file: npi_inst_icdproc.npi_inst_icdproc_RQ17


Test result: PASS


In [7]:
# Example run: check column icd_dgns_cd of table _amy.test_data_bad1
# (the recorded output for this cell is FAIL).
icd10pcs_2('_amy', 'test_data_bad1','icd_dgns_cd')

Test file: _amy.test_data_bad1


Test result: FAIL

The count of rows with blank icd_dgns_cd is:

U, 

U0, 

, 

