In [1]:
# -*- coding: utf8 -*-

In [2]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
# Spark configuration for this notebook; only the application name is set.
conf = SparkConf().setAppName("hospitalDiseaseAll")
# getOrCreate reuses a SparkContext that is already running in this kernel, so
# re-executing the cell no longer fails with "Cannot run multiple SparkContexts
# at once". When a context already exists, it is returned as-is and `conf` is
# not re-applied.
sc = SparkContext.getOrCreate(conf=conf)

In [3]:
# SQL entry point built on the SparkContext; every CSV read below goes through it.
sqlContext = SQLContext(sc)

#### mode: determines the parsing mode. By default it is PERMISSIVE. Possible values are:
- PERMISSIVE: tries to parse all lines; nulls are inserted for missing tokens and extra tokens are ignored.
- DROPMALFORMED: drops lines which have fewer or more tokens than expected, or tokens which do not match the schema.
- FAILFAST: aborts with a RuntimeException if it encounters any malformed line.

charset: defaults to 'UTF-8' but can be set to other valid charset names.

### 加载病案首页

In [4]:
# Location of the diagnosis records
# (an earlier run used /data/icd10/hospitaldischarge.csv instead).
filePathICH = "/data/icd10-2/ich_record_diagnosis.csv"

# Read the CSV through the spark-csv data source: the first line is the header,
# column types are inferred, and malformed rows are dropped rather than failing.
df = (
    sqlContext.read.format("com.databricks.spark.csv")
    .option("header", "true")
    .option("inferschema", "true")
    .option("mode", "DROPMALFORMED")
    .load(filePathICH)
)

# Report the raw row count and preview the first rows.
print("datasets count=", df.count())
df.show(5)

datasets count= 50542609
+-----------------+-------------------+---+--------------+----------------+------------+-----------------+--------------+-------------------+----------+---------+
|               ID|             DCS_ID| SN|DIAGNOSIS_TYPE|    DISEASE_CODE|     DISEASE|IS_CAWM_DIAGNOSIS|DIAGNOSIS_DATE|ADMISSION_CONDITION|CREATED_AT|NODE_TYPE|
+-----------------+-------------------+---+--------------+----------------+------------+-----------------+--------------+-------------------+----------+---------+
|49300074532186399|493000745z002604-32|  3|             0|493000745I25.103|冠状动脉粥样硬化性心脏病|                2|      20171218|               null|      null|        1|
|49300074532186400|493000745z002604-32|  4|             0|         I25.500|      缺血性心肌病|                2|      20171218|               null|      null|        1|
|49300074532186401|493000745z002604-32|  5|             0|493000745I50.901|       心功能不全|                2|      20171218|               null|      null|        

In [5]:
# Remove rows that are identical across every column, then report how many remain.
df_ = df.dropDuplicates()
print("DropDuplicates counts=", df_.count())

DropDuplicates counts= 36037002


In [6]:
# Discard records without a usable disease name: SQL NULLs and the literal
# string "NULL".
# df_ is already de-duplicated and a filter cannot introduce duplicates, so the
# former trailing dropDuplicates() removed nothing while forcing another full
# shuffle; it is omitted here. The resulting rows and count are unchanged.
df_ich = df_.filter((df_.DISEASE!="NULL") &(df_.DISEASE.isNotNull()))
print("Remove NULL counts:",df_ich.count())

Remove NULL counts: 36036934


In [7]:
#df_ich_count = 36036934

In [8]:
# Number of cleaned records; used as the denominator of the percentages printed below.
# NOTE(review): no cache() is visible on df_ich, so this count presumably re-runs
# the read / de-duplicate / filter pipeline — confirm before adding more counts.
df_ich_count = df_ich.count()

In [9]:
# Load the ICD-10 reference table with the same CSV settings as the diagnosis
# records (header row, inferred types, malformed rows dropped).
icd = (
    sqlContext.read.format("com.databricks.spark.csv")
    .option("header", "true")
    .option("inferschema", "true")
    .option("mode", "DROPMALFORMED")
    .load("/data/icd10-2/icd.csv")
)

# Collect the (disease name, code) pairs on the driver as two parallel lists;
# position i of icd10_disease corresponds to position i of icd10_code.
icd10_disease_code = icd.select("Disease", "Code").toPandas()
icd10_disease = icd10_disease_code["Disease"].tolist()
icd10_code = icd10_disease_code["Code"].tolist()


In [10]:
# Name -> code lookup, built lazily on the first call and then reused.
# Re-run this cell after reloading icd10_disease / icd10_code to reset it.
_icd_code_by_disease = None


def diseaseToCode(disease):
    """Return the ICD-10 code for an exact disease-name match.

    The lookup table is derived from the parallel lists ``icd10_disease`` and
    ``icd10_code``. It is built once instead of on every call, and membership
    is tested with a dict lookup rather than a linear scan of the name list,
    so the per-row cost no longer grows with the size of the ICD table.

    Args:
        disease: Disease name to look up (may be None).

    Returns:
        The matching code, or the sentinel string "888888888" when the name is
        not in the ICD-10 list. If a name occurs more than once in the list,
        the code of its last occurrence is returned (same as before).
    """
    global _icd_code_by_disease
    if _icd_code_by_disease is None:
        # Later duplicates overwrite earlier ones, matching the original loop.
        _icd_code_by_disease = dict(zip(icd10_disease, icd10_code))
    return _icd_code_by_disease.get(disease, "888888888")

In [11]:
import re

# Compiled once when the cell runs rather than on every row the UDF processes.
# Matches a parenthesized group (shortest match) including the parentheses.
_PAREN_GROUP = re.compile(r'\((.*?)\)')
# Matches runs of ASCII digits, ASCII punctuation and common full-width punctuation.
_NOISE_CHARS = re.compile(u'[0-9_’!"#$%&\'*+,-./:;<=>?@，。、…【】《》？“”‘’！[\\]^_`{|}~]+')


def doWordFormat(word):
    """Normalize a disease name before matching it against the ICD-10 names.

    Steps, in order: trim surrounding whitespace, convert full-width
    parentheses to ASCII ones, delete ASCII spaces, delete every parenthesized
    group, and replace each run of digits/punctuation with a single space.

    Args:
        word: Raw disease name, or None.

    Returns:
        The normalized string, or None when ``word`` is None (previously a
        None input raised AttributeError inside the UDF).
    """
    if word is None:
        return None
    word = word.strip()
    word = word.replace("（","(")
    word = word.replace("）",")")
    word = word.replace(" ","")
    wordFilter = _PAREN_GROUP.sub('', word)
    # Note: the replacement is a space, so the result can contain spaces again.
    filtered_strs = _NOISE_CHARS.sub(" ", wordFilter)
    return filtered_strs

In [12]:
from pyspark.sql.functions import udf
# Wrap the Python helpers as Spark UDFs so they can be applied to DataFrame columns.
# newCode: disease name -> ICD-10 code (or the "not found" sentinel from diseaseToCode).
newCode = udf(lambda x: diseaseToCode(x))
# diseaseFormat: raw disease name -> normalized name via doWordFormat.
diseaseFormat = udf(lambda x: doWordFormat(x))

In [13]:
# Add two columns derived from the raw disease name:
#   CODE_ICD       - ICD-10 code looked up from the name as recorded
#   DISEASE_FORMAT - the normalized name
df_ich_join = (
    df_ich
    .withColumn("CODE_ICD", newCode(df_ich["DISEASE"]))
    .withColumn("DISEASE_FORMAT", diseaseFormat(df_ich["DISEASE"]))
)

# FORMAT_CODE - ICD-10 code looked up from the normalized name.
df_diag_join = df_ich_join.withColumn(
    "FORMAT_CODE", newCode(df_ich_join["DISEASE_FORMAT"])
)
df_diag_join.show(3)

+------------------+------------------+----+--------------+--------------------+-------+-----------------+--------------+-------------------+--------------------+---------+---------+--------------+-----------+
|                ID|            DCS_ID|  SN|DIAGNOSIS_TYPE|        DISEASE_CODE|DISEASE|IS_CAWM_DIAGNOSIS|DIAGNOSIS_DATE|ADMISSION_CONDITION|          CREATED_AT|NODE_TYPE| CODE_ICD|DISEASE_FORMAT|FORMAT_CODE|
+------------------+------------------+----+--------------+--------------------+-------+-----------------+--------------+-------------------+--------------------+---------+---------+--------------+-----------+
|493000577005371561|493000577005371561|   1|            03|             P23.901|  新生儿肺炎|                2|      20180104|               null|                null|        1|  P23.901|         新生儿肺炎|    P23.901|
|493000577005374142|493000577005374142|   2|            03|             O69.101| 脐带绕颈2周|                2|      20180104|               null|                nul

#### 疾病名称与icd10一致

In [15]:
# Keep only the records whose normalized disease name appears verbatim in the
# ICD-10 name list, then report their count and their share of all cleaned records.
df_ich_dis = df_ich_join.filter(df_ich_join['DISEASE_FORMAT'].isin(icd10_disease))
df_ich_dis_count = df_ich_dis.count()
print("disease same count: %d , percent=%f " %(df_ich_dis_count, df_ich_dis_count*1.0/df_ich_count))

disease same count: 18019185 , percent=0.500020 


In [16]:
# Sanity check of the slicing used by codeFormat: the last 7 characters of a
# prefixed code such as "493000745I50.901" are the bare code "I50.901".
tmp="493000745I50.901"
print(tmp[-7:])
# UDF returning the last 7 characters of its input.
# NOTE(review): it is not applied in this cell (its only use, below, is
# commented out) and it would raise on a None input.
codeFormat = udf(lambda x: x[-7:] )
# Drop name-matched rows that have no disease code.
df_ich_dis_ = df_ich_dis.filter(df_ich_dis.DISEASE_CODE.isNotNull())
#df_ich_dis__ = df_ich_dis.withColumn("Code_FORMAT", codeFormat(df_ich_dis.DISEASE_CODE) )
df_ich_dis_.show(2)
# NOTE(review): this overwrites df_ich_dis_count from the name-match cell; from
# here on it holds the number of name-matched rows with a non-null DISEASE_CODE.
df_ich_dis_count = df_ich_dis_.count()
print("disease sanme and code isnot null:",df_ich_dis_count )

I50.901
+------------------+------------------+----+--------------+--------------------+-------+-----------------+--------------+-------------------+--------------------+---------+--------+--------------+-----------+
|                ID|            DCS_ID|  SN|DIAGNOSIS_TYPE|        DISEASE_CODE|DISEASE|IS_CAWM_DIAGNOSIS|DIAGNOSIS_DATE|ADMISSION_CONDITION|          CREATED_AT|NODE_TYPE|CODE_ICD|DISEASE_FORMAT|Code_FORMAT|
+------------------+------------------+----+--------------+--------------------+-------+-----------------+--------------+-------------------+--------------------+---------+--------+--------------+-----------+
|493000577005371561|493000577005371561|   1|            03|             P23.901|  新生儿肺炎|                2|      20180104|               null|                null|        1| P23.901|         新生儿肺炎|    P23.901|
|   493000665175676|   493000665175676|null|             1|17558.00000000000000|   手指损伤|                1|      05/08/20|               null|29-DEC-17 12.30

#### 疾病名称和code与icd10一致

In [19]:
# Disabled earlier approach: compare a Code_FORMAT column against CODE_ICD.
#df_ich_dis_code = df_ich_dis__.filter((df_ich_dis.Code_FORMAT.isNotNull() )&( df_ich_dis.CODE_ICD.isNotNull()))
#df_ich_dis_code_ = df_ich_dis__.filter(df_ich_dis__['Code_FORMAT']==df_ich_dis__["CODE_ICD"])
#df_ich_code_count = df_ich_dis_code_.count()
#print("disease and code both same count: %d , percent=%f " %(df_ich_code_count, df_ich_code_count*1.0/df_ich_count))

# Join the name-matched records to the ICD-10 table on exact equality between
# the record's DISEASE_CODE (as stored, without any trimming) and the ICD Code.
# NOTE(review): if a Code value occurs more than once in icd, a record matches
# several rows and the count is inflated — confirm Code is unique.
df_ich_dis_join = df_ich_dis_.join(icd, df_ich_dis_['DISEASE_CODE']==icd['Code'])
df_ich_dis_join_count = df_ich_dis_join.count()
# The closing parenthesis of print(...) was missing, which made this cell a SyntaxError.
print("join count=%d, percent=%f" %(df_ich_dis_join_count, df_ich_dis_join_count*1.0/df_ich_count))


join count= 8326887


In [20]:
# Share of cleaned records whose name and exact code both match ICD-10.
# 8326887 is the join count printed by the exact-code join cell, copied in by hand;
# NOTE(review): it goes stale if the input data or the join changes.
8326887*1.0/df_ich_count

0.23106535644791534

In [None]:
# Looser match: join on the first six characters of the record's DISEASE_CODE
# and of the ICD-10 Code (substr is 1-based: start at 1, length 6).
df_ich_dis_join5 = df_ich_dis_.join(icd, df_ich_dis_['DISEASE_CODE'].substr(1,6)==icd['Code'].substr(1,6))
# The count was printed without ever being computed (NameError); compute it here.
df_ich_code5_count = df_ich_dis_join5.count()
print("disease and code5 both same count: %d , percent=%f " %(df_ich_code5_count, df_ich_code5_count*1.0/df_ich_count))