In [1]:
%matplotlib inline

In [2]:
import findspark
import os
import sys
from matplotlib import pyplot as plt

In [3]:
os.environ["SPARK_HOME"]="/spark"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
sys.path.insert(0, os.environ["PYLIB"] +"/py4j-0.9-src.zip")

In [4]:
findspark.init()

In [5]:
from pyspark import SparkConf, SparkContext, SQLContext

In [6]:
conf = SparkConf().set('spark.executor.instances',1). \
    set("spark.executor.memory", "5g").set("AppName", "testapp")

In [7]:
sc = SparkContext(master="local", conf=conf)

from pyspark.mllib.clustering import KMeans
from numpy import array

training_data = array([[1,1],[2,2],[1,3],[0.5,0],[0.3,3],[0.9,0.8],[0.9,1.2],[1.1,0.8],[1.8,1.5],[0.8,2.1],
                 [3.1,3.3],[3.2,2.9],[3,5],[2.9,4.5],[0.3,0.4],[3,3],[4,4],[3.5,3.1],[3.9,4.2],[2.5,2.9]])

dist_training_data = sc.parallelize(training_data)

clusters = KMeans.train(dist_training_data, 2, maxIterations=10, runs=10, initializationMode="random")

clusters.centers

clusters.predict([1,4])

clusters.predict([1,3])

plt.scatter(*zip(*training_data), color='r', marker='*') 
plt.scatter(*zip(*clusters.centers), color='g', marker='s')

In [8]:
sqlcontext=SQLContext(sc)

In [9]:
datafile = '/Users/Oldjun/Desktop/webapp/hospital/data/Mercy_HCAHPS_Survey_BXTN_04282016.dsv'

df=sqlcontext.read.text(datafile)

df.head(2)

In [10]:
df = sqlcontext.read.format("com.databricks.spark.csv")\
.option("header", "true").option("inferschema", "true")\
.option("mode", "DROPMALFORMED")\
.option("delimiter","|")\
.load(datafile)

In [11]:
df.dtypes

[('SRVY_DESCR', 'string'),
 ('HOSP_ADMSN_TSP', 'int'),
 ('HOSP_DSCHRG_TSP', 'int'),
 ('PTNT_ORIG_LOCTN_CD', 'string'),
 ('ADMSN_SRC_CD', 'int'),
 ('ADMSN_SRC_NM', 'string'),
 ('DSCHRG_DSPTN_CD', 'int'),
 ('DSCHRG_DSPTN_NM', 'string'),
 ('HOSP_SRVC_CD', 'int'),
 ('HOSP_SRVC_NM', 'string'),
 ('PTNT_ENCNTR_CSN_ID', 'string'),
 ('PTNT_MRN_NBR', 'string'),
 ('ST_CD', 'string'),
 ('ZIP_CD', 'int'),
 ('PTNT_AGE_QTY', 'string'),
 ('PTNT_BIRTH_DT', 'int'),
 ('PTNT_RACE_CD', 'int'),
 ('PTNT_RACE_TXT', 'string'),
 ('PTNT_GENDER_ABBR', 'string'),
 ('PTNT_RLGN_CD', 'int'),
 ('PTNT_RLGN_TXT', 'string'),
 ('PTNT_LANG_CD', 'string'),
 ('PTNT_LANG_NM', 'string'),
 ('SRVC_AREA_ID', 'int'),
 ('SRVC_AREA_NM', 'string'),
 ('LOCTN_ID', 'int'),
 ('LOCTN_NM', 'string'),
 ('DEPT_ID', 'int'),
 ('DEPT_NM', 'string'),
 ('DEPT_SPCLTY_NM', 'string'),
 ('LGTH_OF_STAY_QTY', 'int'),
 ('BNFT_PLAN_ID', 'int'),
 ('BNFT_PLAN_NM', 'string'),
 ('DRG_MPI_CD', 'int'),
 ('DRG_NM', 'string'),
 ('PRMRY_DX_CD', 'string'),
 ('PRMR

In [12]:
df1=df.filter(df["ANSWR_ID"]>0).filter(df["ST_CD"]=='MO').filter(df["QUES_ID"]<= 145)

In [13]:
df1.head()

Row(SRVY_DESCR=u'PSS-INPTNT GEN', HOSP_ADMSN_TSP=2013, HOSP_DSCHRG_TSP=2013, PTNT_ORIG_LOCTN_CD=u'ER', ADMSN_SRC_CD=1, ADMSN_SRC_NM=u'Non-Health Care Facility Point of Origin', DSCHRG_DSPTN_CD=12001, DSCHRG_DSPTN_NM=u'Home or Self Care', HOSP_SRVC_CD=12017, HOSP_SRVC_NM=u'Medical', PTNT_ENCNTR_CSN_ID=u'PHI', PTNT_MRN_NBR=u'PHI', ST_CD=u'MO', ZIP_CD=630, PTNT_AGE_QTY=u'58', PTNT_BIRTH_DT=1954, PTNT_RACE_CD=12012, PTNT_RACE_TXT=u'Caucasian', PTNT_GENDER_ABBR=u'F', PTNT_RLGN_CD=12063, PTNT_RLGN_TXT=u'NONDENOMINATIONAL', PTNT_LANG_CD=u'120140', PTNT_LANG_NM=u'English', SRVC_AREA_ID=12035, SRVC_AREA_NM=u'MERCY HEALTH EAST COMMUNITIES', LOCTN_ID=12049, LOCTN_NM=u'PARENT MERCY HOSPITAL ST LOUIS', DEPT_ID=130754, DEPT_NM=u'STLO MEDICAL SURGICAL 6', DEPT_SPCLTY_NM=u'Multi Specialty', LGTH_OF_STAY_QTY=2, BNFT_PLAN_ID=4881, BNFT_PLAN_NM=u'MEDICARE PART A AND B', DRG_MPI_CD=None, DRG_NM=u'', PRMRY_DX_CD=u'', PRMRY_DX_NM=u'', PRMRY_PROC_ID=u'', PRMRY_PROC_NM=u'', VNDR_SRVC_TYP_CD=100.3, VNDR_SRVC_T

In [14]:
df1.count()

355323

#create column age

In [15]:
from pyspark.sql.functions import lit,substring

In [16]:
df2=df1.withColumn("age",lit(0))

In [17]:
df3=df2.withColumn("age",substring(df2["PTNT_AGE_QTY"],1,2))

In [18]:
df2.head()

Row(SRVY_DESCR=u'PSS-INPTNT GEN', HOSP_ADMSN_TSP=2013, HOSP_DSCHRG_TSP=2013, PTNT_ORIG_LOCTN_CD=u'ER', ADMSN_SRC_CD=1, ADMSN_SRC_NM=u'Non-Health Care Facility Point of Origin', DSCHRG_DSPTN_CD=12001, DSCHRG_DSPTN_NM=u'Home or Self Care', HOSP_SRVC_CD=12017, HOSP_SRVC_NM=u'Medical', PTNT_ENCNTR_CSN_ID=u'PHI', PTNT_MRN_NBR=u'PHI', ST_CD=u'MO', ZIP_CD=630, PTNT_AGE_QTY=u'58', PTNT_BIRTH_DT=1954, PTNT_RACE_CD=12012, PTNT_RACE_TXT=u'Caucasian', PTNT_GENDER_ABBR=u'F', PTNT_RLGN_CD=12063, PTNT_RLGN_TXT=u'NONDENOMINATIONAL', PTNT_LANG_CD=u'120140', PTNT_LANG_NM=u'English', SRVC_AREA_ID=12035, SRVC_AREA_NM=u'MERCY HEALTH EAST COMMUNITIES', LOCTN_ID=12049, LOCTN_NM=u'PARENT MERCY HOSPITAL ST LOUIS', DEPT_ID=130754, DEPT_NM=u'STLO MEDICAL SURGICAL 6', DEPT_SPCLTY_NM=u'Multi Specialty', LGTH_OF_STAY_QTY=2, BNFT_PLAN_ID=4881, BNFT_PLAN_NM=u'MEDICARE PART A AND B', DRG_MPI_CD=None, DRG_NM=u'', PRMRY_DX_CD=u'', PRMRY_DX_NM=u'', PRMRY_PROC_ID=u'', PRMRY_PROC_NM=u'', VNDR_SRVC_TYP_CD=100.3, VNDR_SRVC_T

In [19]:
df3.head()

Row(SRVY_DESCR=u'PSS-INPTNT GEN', HOSP_ADMSN_TSP=2013, HOSP_DSCHRG_TSP=2013, PTNT_ORIG_LOCTN_CD=u'ER', ADMSN_SRC_CD=1, ADMSN_SRC_NM=u'Non-Health Care Facility Point of Origin', DSCHRG_DSPTN_CD=12001, DSCHRG_DSPTN_NM=u'Home or Self Care', HOSP_SRVC_CD=12017, HOSP_SRVC_NM=u'Medical', PTNT_ENCNTR_CSN_ID=u'PHI', PTNT_MRN_NBR=u'PHI', ST_CD=u'MO', ZIP_CD=630, PTNT_AGE_QTY=u'58', PTNT_BIRTH_DT=1954, PTNT_RACE_CD=12012, PTNT_RACE_TXT=u'Caucasian', PTNT_GENDER_ABBR=u'F', PTNT_RLGN_CD=12063, PTNT_RLGN_TXT=u'NONDENOMINATIONAL', PTNT_LANG_CD=u'120140', PTNT_LANG_NM=u'English', SRVC_AREA_ID=12035, SRVC_AREA_NM=u'MERCY HEALTH EAST COMMUNITIES', LOCTN_ID=12049, LOCTN_NM=u'PARENT MERCY HOSPITAL ST LOUIS', DEPT_ID=130754, DEPT_NM=u'STLO MEDICAL SURGICAL 6', DEPT_SPCLTY_NM=u'Multi Specialty', LGTH_OF_STAY_QTY=2, BNFT_PLAN_ID=4881, BNFT_PLAN_NM=u'MEDICARE PART A AND B', DRG_MPI_CD=None, DRG_NM=u'', PRMRY_DX_CD=u'', PRMRY_DX_NM=u'', PRMRY_PROC_ID=u'', PRMRY_PROC_NM=u'', VNDR_SRVC_TYP_CD=100.3, VNDR_SRVC_T

In [20]:
df4=df3.withColumn("age",df3.age.cast("int"))

In [21]:
df4.head()

Row(SRVY_DESCR=u'PSS-INPTNT GEN', HOSP_ADMSN_TSP=2013, HOSP_DSCHRG_TSP=2013, PTNT_ORIG_LOCTN_CD=u'ER', ADMSN_SRC_CD=1, ADMSN_SRC_NM=u'Non-Health Care Facility Point of Origin', DSCHRG_DSPTN_CD=12001, DSCHRG_DSPTN_NM=u'Home or Self Care', HOSP_SRVC_CD=12017, HOSP_SRVC_NM=u'Medical', PTNT_ENCNTR_CSN_ID=u'PHI', PTNT_MRN_NBR=u'PHI', ST_CD=u'MO', ZIP_CD=630, PTNT_AGE_QTY=u'58', PTNT_BIRTH_DT=1954, PTNT_RACE_CD=12012, PTNT_RACE_TXT=u'Caucasian', PTNT_GENDER_ABBR=u'F', PTNT_RLGN_CD=12063, PTNT_RLGN_TXT=u'NONDENOMINATIONAL', PTNT_LANG_CD=u'120140', PTNT_LANG_NM=u'English', SRVC_AREA_ID=12035, SRVC_AREA_NM=u'MERCY HEALTH EAST COMMUNITIES', LOCTN_ID=12049, LOCTN_NM=u'PARENT MERCY HOSPITAL ST LOUIS', DEPT_ID=130754, DEPT_NM=u'STLO MEDICAL SURGICAL 6', DEPT_SPCLTY_NM=u'Multi Specialty', LGTH_OF_STAY_QTY=2, BNFT_PLAN_ID=4881, BNFT_PLAN_NM=u'MEDICARE PART A AND B', DRG_MPI_CD=None, DRG_NM=u'', PRMRY_DX_CD=u'', PRMRY_DX_NM=u'', PRMRY_PROC_ID=u'', PRMRY_PROC_NM=u'', VNDR_SRVC_TYP_CD=100.3, VNDR_SRVC_T

In [22]:
sqlcontext.registerDataFrameAsTable(df4, "table4")

In [23]:
df_data2=sqlcontext.sql("select\
               age,\
               BXNT_GENDER,\
               BXTN_RACE,\
               BXTN_RELIGION,\
               BXTN_LANGUAGE,\
               BXTN_FACILITY,\
               BXTN_LGTH_OF_STAY,\
               BXTN_PAYOR_GRP,\
               BXTN_INCOME_DESC,\
               BXTN_EDUCATION_LEVEL,\
               PRMRY_DX_CD,\
               PRMRY_PROC_ID,\
               DRG_MPI_CD,\
               BNFT_PLAN_ID,\
               QUES_ID,\
               ANSWR_ID\
               from table4")

In [24]:
df_data2.head()

Row(age=58, BXNT_GENDER=u'F', BXTN_RACE=u'Caucasian', BXTN_RELIGION=u'NONDENOMINATIONAL', BXTN_LANGUAGE=u'English', BXTN_FACILITY=u'PARENT MERCY HOSPITAL ST LOUIS', BXTN_LGTH_OF_STAY=2, BXTN_PAYOR_GRP=u'SELF PAY', BXTN_INCOME_DESC=u'$75,000-$99,999', BXTN_EDUCATION_LEVEL=u'SOME COLLEGE', PRMRY_DX_CD=u'', PRMRY_PROC_ID=u'', DRG_MPI_CD=None, BNFT_PLAN_ID=4881, QUES_ID=143, ANSWR_ID=4)

In [25]:
df_data2.select(df_data2["ANSWR_ID"]).show()

+--------+
|ANSWR_ID|
+--------+
|       4|
|       4|
|       4|
|       1|
|       1|
|       1|
|       1|
|       3|
|       3|
|       4|
|       4|
|       1|
|       4|
|       4|
|       4|
|       4|
|       2|
|       1|
|       3|
|       2|
+--------+
only showing top 20 rows



In [26]:
df_data2.describe("ANSWR_ID").show()

+-------+------------------+
|summary|          ANSWR_ID|
+-------+------------------+
|  count|            355323|
|   mean|2.8512283190224106|
| stddev|1.8608650145247863|
|    min|                 1|
|    max|                88|
+-------+------------------+



In [27]:
df_data3=df_data2.groupBy('age','BXNT_GENDER',\
                'BXTN_RACE',\
               'BXTN_RELIGION',\
               'BXTN_LANGUAGE',\
               'BXTN_FACILITY',\
               'BXTN_LGTH_OF_STAY',\
               'BXTN_PAYOR_GRP',\
               'BXTN_INCOME_DESC',\
               'BXTN_EDUCATION_LEVEL',\
               'PRMRY_DX_CD',\
               'PRMRY_PROC_ID',\
               'DRG_MPI_CD',\
               'BNFT_PLAN_ID')\
               .pivot("QUES_ID").avg("ANSWR_ID")

In [28]:
df_data2.show(5)

+---+-----------+---------+-----------------+-------------+--------------------+-----------------+--------------+----------------+--------------------+-----------+-------------+----------+------------+-------+--------+
|age|BXNT_GENDER|BXTN_RACE|    BXTN_RELIGION|BXTN_LANGUAGE|       BXTN_FACILITY|BXTN_LGTH_OF_STAY|BXTN_PAYOR_GRP|BXTN_INCOME_DESC|BXTN_EDUCATION_LEVEL|PRMRY_DX_CD|PRMRY_PROC_ID|DRG_MPI_CD|BNFT_PLAN_ID|QUES_ID|ANSWR_ID|
+---+-----------+---------+-----------------+-------------+--------------------+-----------------+--------------+----------------+--------------------+-----------+-------------+----------+------------+-------+--------+
| 58|          F|Caucasian|NONDENOMINATIONAL|      English|PARENT MERCY HOSP...|                2|      SELF PAY| $75,000-$99,999|        SOME COLLEGE|           |             |      null|        4881|    143|       4|
| 58|          F|Caucasian|NONDENOMINATIONAL|      English|PARENT MERCY HOSP...|                2|      SELF PAY| $75,000-$9

df_data3.show(2)

In [29]:
df_test=df_data2.groupBy('age','BXNT_GENDER',\
                'BXTN_RACE',\
               'BXTN_RELIGION',\
               'BXTN_LANGUAGE',\
               'BXTN_FACILITY',\
               'BXTN_LGTH_OF_STAY',\
               'BXTN_PAYOR_GRP',\
               'BXTN_INCOME_DESC',\
               'BXTN_EDUCATION_LEVEL',\
               'PRMRY_DX_CD',\
               'PRMRY_PROC_ID',\
               'DRG_MPI_CD',\
               'BNFT_PLAN_ID')\
               .pivot("QUES_ID").count()

In [30]:
df_data3.columns

['age',
 'BXNT_GENDER',
 'BXTN_RACE',
 'BXTN_RELIGION',
 'BXTN_LANGUAGE',
 'BXTN_FACILITY',
 'BXTN_LGTH_OF_STAY',
 'BXTN_PAYOR_GRP',
 'BXTN_INCOME_DESC',
 'BXTN_EDUCATION_LEVEL',
 'PRMRY_DX_CD',
 'PRMRY_PROC_ID',
 'DRG_MPI_CD',
 'BNFT_PLAN_ID',
 '13',
 '14',
 '15',
 '16',
 '17',
 '18',
 '19',
 '20',
 '21',
 '22',
 '23',
 '24',
 '25',
 '26',
 '27',
 '28',
 '29',
 '30',
 '31',
 '32',
 '33',
 '34',
 '45',
 '46',
 '47',
 '48',
 '49',
 '50',
 '51',
 '52',
 '53',
 '54',
 '55',
 '56',
 '57',
 '58',
 '59',
 '60',
 '61',
 '62',
 '63',
 '64',
 '65',
 '67',
 '69',
 '70',
 '73',
 '143',
 '144',
 '145']

In [31]:
df_data3.dtypes

[('age', 'int'),
 ('BXNT_GENDER', 'string'),
 ('BXTN_RACE', 'string'),
 ('BXTN_RELIGION', 'string'),
 ('BXTN_LANGUAGE', 'string'),
 ('BXTN_FACILITY', 'string'),
 ('BXTN_LGTH_OF_STAY', 'int'),
 ('BXTN_PAYOR_GRP', 'string'),
 ('BXTN_INCOME_DESC', 'string'),
 ('BXTN_EDUCATION_LEVEL', 'string'),
 ('PRMRY_DX_CD', 'string'),
 ('PRMRY_PROC_ID', 'string'),
 ('DRG_MPI_CD', 'int'),
 ('BNFT_PLAN_ID', 'int'),
 ('13', 'double'),
 ('14', 'double'),
 ('15', 'double'),
 ('16', 'double'),
 ('17', 'double'),
 ('18', 'double'),
 ('19', 'double'),
 ('20', 'double'),
 ('21', 'double'),
 ('22', 'double'),
 ('23', 'double'),
 ('24', 'double'),
 ('25', 'double'),
 ('26', 'double'),
 ('27', 'double'),
 ('28', 'double'),
 ('29', 'double'),
 ('30', 'double'),
 ('31', 'double'),
 ('32', 'double'),
 ('33', 'double'),
 ('34', 'double'),
 ('45', 'double'),
 ('46', 'double'),
 ('47', 'double'),
 ('48', 'double'),
 ('49', 'double'),
 ('50', 'double'),
 ('51', 'double'),
 ('52', 'double'),
 ('53', 'double'),
 ('54', 'd

df_data3['16'].filter(df_data3['16']> 4) = NULL

In [32]:
from pyspark.sql.functions import udf

In [33]:
from pyspark.sql.types import StringType,DoubleType

In [34]:
def addNA1(value,colname,dfspark):
    new_column_udf=udf(lambda name: None if name > value else name, DoubleType())
    return dfspark.withColumn(colname,new_column_udf(dfspark[colname]))

In [35]:
add_col=[[4,'16'],[10,'33'],[6,'45'],[5,'145'],[5,'67'],[5,'70'],[5,'73']]

In [36]:
test_new_df=df_data3

In [37]:
for pair in add_col:
    value,colname=pair
    test_new_df=addNA1(value,colname,test_new_df)
    

In [38]:
df_data3.where(df_data3["16"]>4).count()

1673

In [39]:
test_new_df.where(test_new_df['16'].isNull()).count()

1769

In [40]:
def addNA2(colname,dfspark):
    new_column_udf=udf(lambda name: None if name == 5 else name, DoubleType())
    return dfspark.withColumn(colname,new_column_udf(dfspark[colname]))

In [41]:
addna_col=[17,18,19,20,21,23,25,26,28,29,34,144]
addna_col=map(lambda x:str(x),addna_col)

In [42]:
for colname in addna_col:
    test_new_df=addNA2(colname,test_new_df)
    

In [43]:
test_new_df.dtypes

[('age', 'int'),
 ('BXNT_GENDER', 'string'),
 ('BXTN_RACE', 'string'),
 ('BXTN_RELIGION', 'string'),
 ('BXTN_LANGUAGE', 'string'),
 ('BXTN_FACILITY', 'string'),
 ('BXTN_LGTH_OF_STAY', 'int'),
 ('BXTN_PAYOR_GRP', 'string'),
 ('BXTN_INCOME_DESC', 'string'),
 ('BXTN_EDUCATION_LEVEL', 'string'),
 ('PRMRY_DX_CD', 'string'),
 ('PRMRY_PROC_ID', 'string'),
 ('DRG_MPI_CD', 'int'),
 ('BNFT_PLAN_ID', 'int'),
 ('13', 'double'),
 ('14', 'double'),
 ('15', 'double'),
 ('16', 'double'),
 ('17', 'double'),
 ('18', 'double'),
 ('19', 'double'),
 ('20', 'double'),
 ('21', 'double'),
 ('22', 'double'),
 ('23', 'double'),
 ('24', 'double'),
 ('25', 'double'),
 ('26', 'double'),
 ('27', 'double'),
 ('28', 'double'),
 ('29', 'double'),
 ('30', 'double'),
 ('31', 'double'),
 ('32', 'double'),
 ('33', 'double'),
 ('34', 'double'),
 ('45', 'double'),
 ('46', 'double'),
 ('47', 'double'),
 ('48', 'double'),
 ('49', 'double'),
 ('50', 'double'),
 ('51', 'double'),
 ('52', 'double'),
 ('53', 'double'),
 ('54', 'd

In [44]:
# mark all the questions, category them into topic
# teamwork, the lower the better

In [45]:
df_teamwork=test_new_df.withColumn('teamwork',4.75 - test_new_df['45'] * 0.75 )

In [46]:
from pyspark.sql.functions import col,avg,struct

In [47]:
df_teamwork.dtypes

[('age', 'int'),
 ('BXNT_GENDER', 'string'),
 ('BXTN_RACE', 'string'),
 ('BXTN_RELIGION', 'string'),
 ('BXTN_LANGUAGE', 'string'),
 ('BXTN_FACILITY', 'string'),
 ('BXTN_LGTH_OF_STAY', 'int'),
 ('BXTN_PAYOR_GRP', 'string'),
 ('BXTN_INCOME_DESC', 'string'),
 ('BXTN_EDUCATION_LEVEL', 'string'),
 ('PRMRY_DX_CD', 'string'),
 ('PRMRY_PROC_ID', 'string'),
 ('DRG_MPI_CD', 'int'),
 ('BNFT_PLAN_ID', 'int'),
 ('13', 'double'),
 ('14', 'double'),
 ('15', 'double'),
 ('16', 'double'),
 ('17', 'double'),
 ('18', 'double'),
 ('19', 'double'),
 ('20', 'double'),
 ('21', 'double'),
 ('22', 'double'),
 ('23', 'double'),
 ('24', 'double'),
 ('25', 'double'),
 ('26', 'double'),
 ('27', 'double'),
 ('28', 'double'),
 ('29', 'double'),
 ('30', 'double'),
 ('31', 'double'),
 ('32', 'double'),
 ('33', 'double'),
 ('34', 'double'),
 ('45', 'double'),
 ('46', 'double'),
 ('47', 'double'),
 ('48', 'double'),
 ('49', 'double'),
 ('50', 'double'),
 ('51', 'double'),
 ('52', 'double'),
 ('53', 'double'),
 ('54', 'd

In [48]:
df_test_avg = sqlcontext.createDataFrame(
    [(1, None, 23.0), (None, 2, -23.0),(2,3,4.0),(None,None,None)], ("x1", "x2", "x3"))


In [49]:
df_test_avg.show()

+----+----+-----+
|  x1|  x2|   x3|
+----+----+-----+
|   1|null| 23.0|
|null|   2|-23.0|
|   2|   3|  4.0|
|null|null| null|
+----+----+-----+



In [50]:
avg_row=udf(lambda row:\
            sum([x for x in row if x!=None])/(len([x for x in row if x!=None])) if len([x for x in row if x!=None])!=0 \
                                              else None ,DoubleType())

In [51]:
new_df = df_test_avg.withColumn("avg_row",avg_row(struct([df_test_avg[x] for x in df_test_avg.columns])))

In [52]:
new_df.show()

+----+----+-----+-------+
|  x1|  x2|   x3|avg_row|
+----+----+-----+-------+
|   1|null| 23.0|   12.0|
|null|   2|-23.0|  -10.5|
|   2|   3|  4.0|    3.0|
|null|null| null|   null|
+----+----+-----+-------+



In [53]:
test_nurse=df_teamwork.withColumn("nurse",avg_row(struct([df_teamwork['13'],df_teamwork['14'],df_teamwork['15']])))

test_nurse.show(2)

In [54]:
df_doctor=test_nurse.withColumn('doctor',avg_row(struct([df_teamwork['17'],df_teamwork['18'],df_teamwork['19']])))

In [55]:
df_enviroment=df_doctor.withColumn('environment',avg_row(struct([df_teamwork['20'],df_teamwork['21']])))

In [56]:
df_pain=df_enviroment.withColumn('pain',avg_row(struct([df_teamwork['25'],df_teamwork['26']])))

In [57]:
df_medication=df_pain.withColumn('medication',avg_row(struct([df_teamwork['28'],df_teamwork['29']])))

In [58]:
df_discharge=df_medication.withColumn('discharge',7-3*avg_row(struct([df_teamwork['31'],df_teamwork['32']])))

In [59]:
df_response=df_discharge.withColumn('response',avg_row(struct([df_teamwork['23'],df_teamwork['16']])))

In [60]:
df_overall=df_response.withColumn('overall',\
                                  (df_teamwork['33']*0.4+df_teamwork['34']+ 4.75 - df_teamwork['73'] * 0.75)/3)

In [61]:
df_overall.columns

['age',
 'BXNT_GENDER',
 'BXTN_RACE',
 'BXTN_RELIGION',
 'BXTN_LANGUAGE',
 'BXTN_FACILITY',
 'BXTN_LGTH_OF_STAY',
 'BXTN_PAYOR_GRP',
 'BXTN_INCOME_DESC',
 'BXTN_EDUCATION_LEVEL',
 'PRMRY_DX_CD',
 'PRMRY_PROC_ID',
 'DRG_MPI_CD',
 'BNFT_PLAN_ID',
 '13',
 '14',
 '15',
 '16',
 '17',
 '18',
 '19',
 '20',
 '21',
 '22',
 '23',
 '24',
 '25',
 '26',
 '27',
 '28',
 '29',
 '30',
 '31',
 '32',
 '33',
 '34',
 '45',
 '46',
 '47',
 '48',
 '49',
 '50',
 '51',
 '52',
 '53',
 '54',
 '55',
 '56',
 '57',
 '58',
 '59',
 '60',
 '61',
 '62',
 '63',
 '64',
 '65',
 '67',
 '69',
 '70',
 '73',
 '143',
 '144',
 '145',
 'teamwork',
 'nurse',
 'doctor',
 'environment',
 'pain',
 'medication',
 'discharge',
 'response',
 'overall']

In [62]:
df_score=df_overall.withColumn('care',avg_row(struct([df_teamwork['143'],df_teamwork['145'],df_teamwork['144']])))

In [63]:
df_respect=df_score.withColumn('respect',4.75 -  df_teamwork['67']*0.75)

In [64]:
df_safety=df_respect.withColumn('safety',4.75 -  df_teamwork['70']*0.75)

In [65]:
df_age2=df_safety.withColumn('age2',df_teamwork['age']**2)

In [66]:
df_age2.columns

['age',
 'BXNT_GENDER',
 'BXTN_RACE',
 'BXTN_RELIGION',
 'BXTN_LANGUAGE',
 'BXTN_FACILITY',
 'BXTN_LGTH_OF_STAY',
 'BXTN_PAYOR_GRP',
 'BXTN_INCOME_DESC',
 'BXTN_EDUCATION_LEVEL',
 'PRMRY_DX_CD',
 'PRMRY_PROC_ID',
 'DRG_MPI_CD',
 'BNFT_PLAN_ID',
 '13',
 '14',
 '15',
 '16',
 '17',
 '18',
 '19',
 '20',
 '21',
 '22',
 '23',
 '24',
 '25',
 '26',
 '27',
 '28',
 '29',
 '30',
 '31',
 '32',
 '33',
 '34',
 '45',
 '46',
 '47',
 '48',
 '49',
 '50',
 '51',
 '52',
 '53',
 '54',
 '55',
 '56',
 '57',
 '58',
 '59',
 '60',
 '61',
 '62',
 '63',
 '64',
 '65',
 '67',
 '69',
 '70',
 '73',
 '143',
 '144',
 '145',
 'teamwork',
 'nurse',
 'doctor',
 'environment',
 'pain',
 'medication',
 'discharge',
 'response',
 'overall',
 'care',
 'respect',
 'safety',
 'age2']

In [67]:
len(df_age2.columns)

77

In [68]:
colIdSelected = [x for x in range(10)]+[x for x in range(64,77)]

In [69]:
colIdSelected

[0,
 1,
 2,
 3,
 4,
 5,
 6,
 7,
 8,
 9,
 64,
 65,
 66,
 67,
 68,
 69,
 70,
 71,
 72,
 73,
 74,
 75,
 76]

In [70]:
colNameSelected = [df_age2.columns[x] for x in colIdSelected]

In [71]:
colNameSelected

['age',
 'BXNT_GENDER',
 'BXTN_RACE',
 'BXTN_RELIGION',
 'BXTN_LANGUAGE',
 'BXTN_FACILITY',
 'BXTN_LGTH_OF_STAY',
 'BXTN_PAYOR_GRP',
 'BXTN_INCOME_DESC',
 'BXTN_EDUCATION_LEVEL',
 'teamwork',
 'nurse',
 'doctor',
 'environment',
 'pain',
 'medication',
 'discharge',
 'response',
 'overall',
 'care',
 'respect',
 'safety',
 'age2']

In [72]:
df_age2.where(df_age2['medication'].isNull()).count()

5177

In [73]:
df_age2.count()

11469

In [74]:
df_data4=df_age2.na.drop(subset=colNameSelected)

In [75]:
df_data4.count()

3531

In [76]:
len(df_data4.columns)

77

In [77]:
df_age2.where(df_age2['pain'].isNull()).count()

2741

#convert string to factor

In [78]:
from pyspark.ml.feature import StringIndexer

In [79]:
df = sqlcontext.createDataFrame(
    [(0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")],
    ["id", "category"])
print df.dtypes
indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
indexed = indexer.fit(df).transform(df)
indexed.show()

[('id', 'bigint'), ('category', 'string')]
+---+--------+-------------+
| id|category|categoryIndex|
+---+--------+-------------+
|  0|       a|          0.0|
|  1|       b|          2.0|
|  2|       c|          1.0|
|  3|       a|          0.0|
|  4|       a|          0.0|
|  5|       c|          1.0|
+---+--------+-------------+



In [80]:
df_data4.dtypes

[('age', 'int'),
 ('BXNT_GENDER', 'string'),
 ('BXTN_RACE', 'string'),
 ('BXTN_RELIGION', 'string'),
 ('BXTN_LANGUAGE', 'string'),
 ('BXTN_FACILITY', 'string'),
 ('BXTN_LGTH_OF_STAY', 'int'),
 ('BXTN_PAYOR_GRP', 'string'),
 ('BXTN_INCOME_DESC', 'string'),
 ('BXTN_EDUCATION_LEVEL', 'string'),
 ('PRMRY_DX_CD', 'string'),
 ('PRMRY_PROC_ID', 'string'),
 ('DRG_MPI_CD', 'int'),
 ('BNFT_PLAN_ID', 'int'),
 ('13', 'double'),
 ('14', 'double'),
 ('15', 'double'),
 ('16', 'double'),
 ('17', 'double'),
 ('18', 'double'),
 ('19', 'double'),
 ('20', 'double'),
 ('21', 'double'),
 ('22', 'double'),
 ('23', 'double'),
 ('24', 'double'),
 ('25', 'double'),
 ('26', 'double'),
 ('27', 'double'),
 ('28', 'double'),
 ('29', 'double'),
 ('30', 'double'),
 ('31', 'double'),
 ('32', 'double'),
 ('33', 'double'),
 ('34', 'double'),
 ('45', 'double'),
 ('46', 'double'),
 ('47', 'double'),
 ('48', 'double'),
 ('49', 'double'),
 ('50', 'double'),
 ('51', 'double'),
 ('52', 'double'),
 ('53', 'double'),
 ('54', 'd

In [81]:
df_data4.dtypes[0][1]

'int'

In [82]:
colCat=[df_data4.dtypes[x][0] for x in range(len(df_data4.dtypes)) if df_data4.dtypes[x][1]=='string' ]

In [83]:
colCat

['BXNT_GENDER',
 'BXTN_RACE',
 'BXTN_RELIGION',
 'BXTN_LANGUAGE',
 'BXTN_FACILITY',
 'BXTN_PAYOR_GRP',
 'BXTN_INCOME_DESC',
 'BXTN_EDUCATION_LEVEL',
 'PRMRY_DX_CD',
 'PRMRY_PROC_ID']

colCat.append('BNFT_PLAN_ID')

colCat.append('DRG_MPI_CD')

colCat

In [84]:
df_data4.toPandas()['BXNT_GENDER'].unique()

array([u'F', u'M'], dtype=object)

In [85]:
indexer = StringIndexer(inputCol='BXNT_GENDER', outputCol='BXNT_GENDERIndex')
indexed2 = indexer.fit(df_data4).transform(df_data4)

In [86]:
for col in colCat:
    if col != "BXNT_GENDER":
        print col,col+"Index"
        colIndex=col+"Index"
        indexer = StringIndexer(inputCol=col, outputCol=colIndex)
        indexed2 = indexer.fit(indexed2).transform(indexed2)

    

BXTN_RACE BXTN_RACEIndex
BXTN_RELIGION BXTN_RELIGIONIndex
BXTN_LANGUAGE BXTN_LANGUAGEIndex
BXTN_FACILITY BXTN_FACILITYIndex
BXTN_PAYOR_GRP BXTN_PAYOR_GRPIndex
BXTN_INCOME_DESC BXTN_INCOME_DESCIndex
BXTN_EDUCATION_LEVEL BXTN_EDUCATION_LEVELIndex
PRMRY_DX_CD PRMRY_DX_CDIndex
PRMRY_PROC_ID PRMRY_PROC_IDIndex


In [87]:
indexed2.first() 

Row(age=68, BXNT_GENDER=u'F', BXTN_RACE=u'Caucasian', BXTN_RELIGION=u'CATHOLIC', BXTN_LANGUAGE=u'English', BXTN_FACILITY=u'PARENT MERCY HOSPITAL ST LOUIS', BXTN_LGTH_OF_STAY=4, BXTN_PAYOR_GRP=u'SELF PAY', BXTN_INCOME_DESC=u'$25,000-$34,999', BXTN_EDUCATION_LEVEL=u'HIGH SCHOOL DIPLOMA', PRMRY_DX_CD=u'174.9', PRMRY_PROC_ID=u'85.41', DRG_MPI_CD=581, BNFT_PLAN_ID=3775, 13=2.0, 14=3.0, 15=2.0, 16=2.0, 17=4.0, 18=4.0, 19=4.0, 20=2.0, 21=4.0, 22=2.0, 23=None, 24=1.0, 25=4.0, 26=4.0, 27=1.0, 28=4.0, 29=2.0, 30=1.0, 31=1.0, 32=1.0, 33=5.0, 34=2.0, 45=4.0, 46=2.0, 47=None, 48=None, 49=None, 50=None, 51=None, 52=None, 53=None, 54=None, 55=None, 56=None, 57=None, 58=None, 59=None, 60=None, 61=None, 62=None, 63=None, 64=None, 65=None, 67=3.0, 69=3.0, 70=3.0, 73=4.0, 143=3.0, 144=1.0, 145=3.0, teamwork=1.75, nurse=2.3333333333333335, doctor=4.0, environment=3.0, pain=4.0, medication=3.0, discharge=4.0, response=2.0, overall=1.9166666666666667, care=2.3333333333333335, respect=2.5, safety=2.5, age2=4

In [88]:
from pyspark.ml.regression import LinearRegression

In [130]:
lr = LinearRegression(maxIter=10)#, regParam=0.3, elasticNetParam=0.8)

In [90]:
from pyspark.mllib.regression import LabeledPoint

In [109]:
training=indexed2.select("teamwork", "nurse","discharge")\
  .map(lambda r: LabeledPoint(r[0], r[1:]))


In [110]:
training.cache()

PythonRDD[2016] at RDD at PythonRDD.scala:43

In [111]:
indexed2.cache()

DataFrame[age: int, BXNT_GENDER: string, BXTN_RACE: string, BXTN_RELIGION: string, BXTN_LANGUAGE: string, BXTN_FACILITY: string, BXTN_LGTH_OF_STAY: int, BXTN_PAYOR_GRP: string, BXTN_INCOME_DESC: string, BXTN_EDUCATION_LEVEL: string, PRMRY_DX_CD: string, PRMRY_PROC_ID: string, DRG_MPI_CD: int, BNFT_PLAN_ID: int, 13: double, 14: double, 15: double, 16: double, 17: double, 18: double, 19: double, 20: double, 21: double, 22: double, 23: double, 24: double, 25: double, 26: double, 27: double, 28: double, 29: double, 30: double, 31: double, 32: double, 33: double, 34: double, 45: double, 46: double, 47: double, 48: double, 49: double, 50: double, 51: double, 52: double, 53: double, 54: double, 55: double, 56: double, 57: double, 58: double, 59: double, 60: double, 61: double, 62: double, 63: double, 64: double, 65: double, 67: double, 69: double, 70: double, 73: double, 143: double, 144: double, 145: double, teamwork: double, nurse: double, doctor: double, environment: double, pain: double, 

In [112]:
training.first()

LabeledPoint(1.75, [2.33333333333,4.0])

In [136]:
modelA = lr.fit(training.toDF(),{lr.regParam:0.0})

In [137]:
from pyspark.mllib.regression import LinearRegressionWithSGD

In [139]:
modelB=LinearRegressionWithSGD.train(training, iterations=10, step=0.1)

In [125]:
pred=modelA.transform(training.toDF())

In [126]:
pred.first()

Row(features=DenseVector([2.3333, 4.0]), label=1.75, prediction=2.4369306310862187)

In [144]:
modelB.save(sc,"/Users/Oldjun/Desktop/webapp/hospital/sparkModel.test")

In [127]:
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(metricName="rmse")
RMSE = evaluator.evaluate(pred)
print("ModelA: Root Mean Squared Error = " + str(RMSE))

ModelA: Root Mean Squared Error = 0.51900147363


In [128]:
modelA.save(sc,"/Users/Oldjun/Desktop/webapp/hospital/sparkModel.test")

AttributeError: 'LinearRegressionModel' object has no attribute 'save'