In [28]:
import org.apache.spark.sql.types.DoubleType
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.DecimalType
import org.apache.spark.sql.types.LongType
import org.apache.spark.sql.functions.regexp_replace
import org.apache.spark.sql.functions.monotonically_increasing_id
import org.apache.spark.ml.feature.PCA
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.sql.functions.lit
import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.ml.feature.Normalizer

In [42]:
def setHadoopConfig(name: String) = {
    val prefix = "fs.swift2d.service." + name
    sc.hadoopConfiguration.set(prefix + ".auth.url", "https://identity.open.softlayer.com" + "/v3/auth/tokens")
    sc.hadoopConfiguration.set(prefix + ".auth.endpoint.prefix","endpoints")
    sc.hadoopConfiguration.set(prefix + ".tenant", "a9fb4d478e3d40a8bbd54c5a2ecf25a3")
    sc.hadoopConfiguration.set(prefix + ".username", "6a4cc8251c1940179a6cccc9098a15e0")
    sc.hadoopConfiguration.set(prefix + ".password", "kDTcKA2H(3eo5.G0")
    sc.hadoopConfiguration.setInt(prefix + ".http.port", 8080)
    sc.hadoopConfiguration.set(prefix + ".region", "dallas")
    sc.hadoopConfiguration.setBoolean(prefix + ".public", false)
    
    sc.hadoopConfiguration.setBoolean("parquet.enable.summary-metadata", true)
}

val name = "keystone"
setHadoopConfig(name)

In [30]:
/*val caseNumbers = Array(2, 3, 4, 5, 8, 9, 10, 11, 12)*/
/*don't work:
3,4,5
IllegalArgumentException: requirement failed: Decimal precision 6 exceeds max precision 5
do work:
1,2,8,9,10,11,12
*/
/* load in the first file to append (union) to */
var cases = spark.read.option("header",false).
    option("inferSchema",true).
    option("nanValue","0").
    option("nullValue","0").
    option("positiveInf","0").
    option("negativeInf","0").
    /*schema(schema).*/
    csv("swift2d://MGH." + name + "/csv/csv_case1.csv").
    withColumn("timeId", monotonically_increasing_id.cast(LongType)).
    withColumn("patId", lit(1).cast(LongType));
cases.cache()

1


[_c0: double, _c1: double ... 576 more fields]

In [6]:
val caseNumbers = Array(2,8,9,10,11,12)
/*loop through the rest of the files and append them to the first one*/
for(num <- caseNumbers){
    var caseTemp = spark.read.option("header",false).
            option("inferSchema",true).
            option("nanValue","0").
            option("nullValue","0").
            option("positiveInf","0").
            option("negativeInf","0").
            /*schema(schema).*/
            csv("swift2d://MGH." + name + "/csv/csv_case"+num.toString+".csv").
            withColumn("timeId", monotonically_increasing_id.cast(LongType)).
            withColumn("patId", lit(num).cast(LongType))
    cases = cases.union(caseTemp)
}
cases = cases.repartition(200)

In [31]:
/* files that had Inf need to have the inf changed to 0 and made into a decimal type */
for(col <- cases.schema){
    if(col.dataType == StringType){
        cases = cases.withColumn(col.name + "temp", 
                                 regexp_replace(
                                     regexp_replace(cases(col.name),"Inf","0")
                                     ,"NaN","0").cast(DoubleType))
        cases = cases.drop(col.name).withColumnRenamed(col.name + "temp",col.name)
    }
}
/* replace NaN with 0*/
cases = cases.na.fill(0)

In [32]:
println(cases.count())

43489


In [33]:
/*features are all columns exept patient id and timeid*/
val feature_cols = cases.columns.filter(x => (x != "timeId" && x != "patId"))
val assembler = new VectorAssembler().setInputCols(feature_cols).setOutputCol("features")
cases = assembler.transform(cases)

In [34]:
/*Normalize the features*/
val normalizer = new Normalizer().setInputCol("features").setOutputCol("normFeatures")
cases = normalizer.transform(cases)

In [35]:
/*pca with k = 100*/
val pcaModel = new PCA().setInputCol("normFeatures").setOutputCol("pcaFeatures").setK(100).fit(cases)

cases = pcaModel.transform(cases)
cases = cases.cache()

In [36]:
cases.limit(2).collect().foreach(println)

[2.2543,2.2802,3.2058,2.7831,2.2414,2.3961,3.1904,2.8007,2.2199,2.3412,3.0924,2.7539,1.973,2.2811,3.0617,2.9103,23.097,3.1503,3.0308,3.4753,2.9078,3.1089,3.6284,4.1932,3.0422,3.6411,254330.0,226880.0,341440.0,284210.0,109680.0,113230.0,125590.0,124770.0,53409.0,56575.0,60883.0,60030.0,12915.0,13836.0,13558.0,15694.0,62.59,56.045,118.16,97.662,49.497,53.792,87.745,83.527,47.886,54.141,82.052,77.989,42.201,45.07,73.033,81.49,230.36,149.73,464.07,249.64,74.399,79.464,128.04,125.21,72.525,80.443,116.56,114.27,60.959,59.875,98.354,111.16,0.8756,0.8617,0.8544,0.88318,0.85872,0.84645,0.80648,0.85755,0.85065,0.84798,0.79293,0.84303,0.8352,0.83428,0.768,0.83579,0.15363,0.17806,0.18379,0.15705,0.16583,0.18037,0.22301,0.17244,0.17223,0.18817,0.24149,0.18405,0.1838,0.19848,0.28407,0.2061,0.029691,0.031772,0.032978,0.026176,0.037034,0.038164,0.04558,0.036112,0.0382,0.039156,0.049234,0.043254,0.043432,0.046448,0.055345,0.046391,0.0059702,0.006583,0.0086975,0.0060033,0.0072352,0.0079132,0.01156,0.007

In [9]:
/*saving to parquet to be labeled with t-sne coordinates*/
/*cases.write.parquet("swift2d://MGH." + name + "/tempParq/7cases.parquet")*/

### 15 < k < 25

k: 2
Within Set Sum of Squared Errors = 2.9112884730548816E16
k: 4
Within Set Sum of Squared Errors = 1.7992007329050206E16
k: 6
Within Set Sum of Squared Errors = 1.4551705278862694E16
k: 8
Within Set Sum of Squared Errors = 1.2885932983311728E16
k: 10
Within Set Sum of Squared Errors = 1.11832826573762E16
k: 12
Within Set Sum of Squared Errors = 9.081206983606092E15
k: 14
Within Set Sum of Squared Errors = 8.522317069652645E15
k: 16
Within Set Sum of Squared Errors = 7.432428413580828E15
k: 18
Within Set Sum of Squared Errors = 7.576234664949629E15
k: 20
Within Set Sum of Squared Errors = 6.743778287620577E15
k: 22
Within Set Sum of Squared Errors = 6.43607511491788E15
k: 24
Within Set Sum of Squared Errors = 5.954476368201517E15
k: 26
Within Set Sum of Squared Errors = 5.743758395644838E15
k: 28
Within Set Sum of Squared Errors = 5.479222949405443E15
k: 30
Within Set Sum of Squared Errors = 5.348950188771225E15
k: 32
Within Set Sum of Squared Errors = 4.954711625236001E15
k: 34
Within Set Sum of Squared Errors = 4.835574388374279E15
k: 36
Within Set Sum of Squared Errors = 4.815777105637914E15
k: 38
Within Set Sum of Squared Errors = 4.724896615554858E15
k: 40
Within Set Sum of Squared Errors = 4.611871574389138E15

In [20]:
/*kmeans with wsse.  A good k could be between 15 and 25*/
/*for(a <- 10 to 30){
    println( "k: " + a )
    var kmeans = new KMeans().setFeaturesCol("pcaFeatures").setK(a)
    var model = kmeans.fit(cases)

    var WSSSE = model.computeCost(cases)
    println(s"Within Set Sum of Squared Errors = $WSSSE")
}*/

k: 10
Within Set Sum of Squared Errors = 9.541751238917884E15
k: 11
Within Set Sum of Squared Errors = 9.405817615503538E15
k: 12
Within Set Sum of Squared Errors = 8.851807434347111E15
k: 13
Within Set Sum of Squared Errors = 8.802021290934255E15
k: 14
Within Set Sum of Squared Errors = 8.004543550440997E15
k: 15
Within Set Sum of Squared Errors = 8.693538279624738E15
k: 16
Within Set Sum of Squared Errors = 7.525560733484382E15
k: 17
Within Set Sum of Squared Errors = 7.402581031959187E15
k: 18
Within Set Sum of Squared Errors = 7.085525522699249E15
k: 19
Within Set Sum of Squared Errors = 6.946290508784244E15
k: 20
Within Set Sum of Squared Errors = 6.930587732736565E15
k: 21
Within Set Sum of Squared Errors = 6.649059351596769E15
k: 22
Within Set Sum of Squared Errors = 6.354151882042983E15
k: 23
Within Set Sum of Squared Errors = 6.179099280347436E15
k: 24
Within Set Sum of Squared Errors = 5.931319774884071E15
k: 25
Within Set Sum of Squared Errors = 5.633257710434543E15
k: 26
Wi

In [37]:
/*with 18 clusters, label all points*/
var kmeans = new KMeans().setFeaturesCol("pcaFeatures").setK(18)
var kmeansModel = kmeans.fit(cases)
cases = kmeansModel.transform(cases)

In [49]:
cases.where(cases("timeId") === 500).select("prediction","timeId","patId","pcaFeatures").limit(1).collect().foreach(println)

[13,500,1,[-0.04685002433859238,-0.7935471840317845,0.19989431722432696,-0.026035551309354452,0.5664650466672222,0.06485877813358862,0.008468982141301963,0.04293673545742587,0.005450003304402173,-0.006369962485789484,0.0023203553471608214,0.011859290628000134,-0.011182237849702818,-0.005515858544289832,-0.0031218237402375155,0.0019322493686246011,0.0027406603995360343,6.492429475409141E-4,0.003243301199501925,4.398310025842497E-5,-9.810990872155996E-4,0.0013569169519016542,2.865243300826866E-4,0.005529237196960912,-0.0024721306784581502,-9.580693655523808E-4,1.1556483005478381E-4,-4.698395211770148E-4,-0.001276741858660869,3.3375167419360564E-4,-1.9644571276423482E-4,-4.296207678401921E-4,-1.0853774722169394E-4,2.0247225703949636E-4,1.6112891660482537E-4,1.8952069068533086E-5,-2.0349702270293992E-4,8.723933697613574E-4,2.2118318724955378E-5,-7.206233565904178E-5,1.899151532771148E-4,5.951371918588293E-5,1.4388268777451005E-5,4.5621541477901785E-5,1.9049242552235702E-5,2.931571500209268

In [50]:
cases.write.option("compression","none").parquet("swift2d://MGH." + name + "/tempParq_2/1cases_2.parquet")

In [48]:
cases.write.csv("swift2d://MGH." + name + "/tempParq_2/case.csv")

Name: java.lang.UnsupportedOperationException
Message: CSV data source does not support struct<type:tinyint,size:int,indices:array<int>,values:array<double>> data type.
StackTrace:   at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.org$apache$spark$sql$execution$datasources$csv$CSVFileFormat$$verifyType$1(CSVFileFormat.scala:194)
  at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anonfun$verifySchema$1.apply(CSVFileFormat.scala:198)
  at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anonfun$verifySchema$1.apply(CSVFileFormat.scala:198)
  at scala.collection.Iterator$class.foreach(Iterator.scala:893)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
  at org.apache.spark.sql.types.StructType.foreach(StructType.scala:95)
  at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.verifySchema(CSVFileFormat.scala:198)
  at org.apache.spark.sql.exec

In [40]:
/*if we pce with k = 2, we can use coordinates to visualize*/
/*this is run seperately from the above code*/
val twoPcaModel = new PCA().setInputCol("pcaFeatures").setOutputCol("twoFeat").setK(2).fit(cases)
var kmeansVis = twoPcaModel.transform(cases).select("twoFeat","prediction","timeId","patId")

In [41]:
kmeansVis.limit(2).collect().foreach(println)

[[0.04157707571065332,0.7632631161788315],13,0,1]
[[0.007653574020720221,0.7793599009085888],13,1,1]


In [21]:
var badData = spark.read.option("header",false).
    option("inferSchema",true).
    option("nanValue","0").
    option("nullValue","0").
    option("positiveInf","0").
    option("negativeInf","0").
    /*schema(schema).*/
    csv("swift2d://MGH." + name + "/csv/csv_case3.csv").
    withColumn("timeId", monotonically_increasing_id.cast(LongType)).
    withColumn("patId", lit(1).cast(LongType));
badData.printSchema

1
root
 |-- _c0: double (nullable = true)
 |-- _c1: double (nullable = true)
 |-- _c2: double (nullable = true)
 |-- _c3: double (nullable = true)
 |-- _c4: double (nullable = true)
 |-- _c5: double (nullable = true)
 |-- _c6: double (nullable = true)
 |-- _c7: double (nullable = true)
 |-- _c8: double (nullable = true)
 |-- _c9: double (nullable = true)
 |-- _c10: double (nullable = true)
 |-- _c11: double (nullable = true)
 |-- _c12: double (nullable = true)
 |-- _c13: double (nullable = true)
 |-- _c14: double (nullable = true)
 |-- _c15: double (nullable = true)
 |-- _c16: double (nullable = true)
 |-- _c17: double (nullable = true)
 |-- _c18: double (nullable = true)
 |-- _c19: double (nullable = true)
 |-- _c20: double (nullable = true)
 |-- _c21: double (nullable = true)
 |-- _c22: double (nullable = true)
 |-- _c23: double (nullable = true)
 |-- _c24: double (nullable = true)
 |-- _c25: double (nullable = true)
 |-- _c26: double (nullable = true)
 |-- _c27: double (nullable = t

In [23]:
/*create schema*/
import scala.collection.mutable.ArrayBuffer
var fields = ArrayBuffer[StructField]()
for(num <- 0 to 575){
    fields += StructField("_c" + num.toString,DecimalType(10,0))
}
var schema = StructType(fields)
schema.fields

Array(StructField(_c0,DecimalType(10,0),true), StructField(_c1,DecimalType(10,0),true), StructField(_c2,DecimalType(10,0),true), StructField(_c3,DecimalType(10,0),true), StructField(_c4,DecimalType(10,0),true), StructField(_c5,DecimalType(10,0),true), StructField(_c6,DecimalType(10,0),true), StructField(_c7,DecimalType(10,0),true), StructField(_c8,DecimalType(10,0),true), StructField(_c9,DecimalType(10,0),true), StructField(_c10,DecimalType(10,0),true), StructField(_c11,DecimalType(10,0),true), StructField(_c12,DecimalType(10,0),true), StructField(_c13,DecimalType(10,0),true), StructField(_c14,DecimalType(10,0),true), StructField(_c15,DecimalType(10,0),true), StructField(_c16,DecimalType(10,0),true), StructField(_c17,DecimalType(10,...

In [10]:
/* all double columns need to be made to decimal*/
import java.util.Calendar
println(Calendar.getInstance().getTime())
println(now)
for(col <- cases.schema){
    println(col.name)
    if(col.dataType == DoubleType){
        cases = cases.withColumn(col.name + "temp", cases(col.name).cast(DecimalType(10,0))).drop(col.name).withColumnRenamed(col.name + "temp",col.name)
    }
}
println(Calendar.getInstance().getTime())

Wed May 31 21:05:43 CDT 2017
Wed May 31 21:04:38 CDT 2017
_c0
_c1
_c2
_c3
_c4
_c5
_c6
_c7
_c8
_c9
_c10
_c11
_c12
_c13
_c14
_c15
_c30
_c31
_c32
_c33
_c34
_c35
_c36
_c37
_c38
_c39
_c40
_c41
_c42
_c43
_c44
_c45
_c46
_c47
_c48
_c49
_c50
_c51
_c52
_c53
_c54
_c55
_c56
_c57
_c58
_c59
_c60
_c61
_c62
_c63
_c64
_c65
_c66
_c67
_c68
_c69
_c70
_c71
_c72
_c73
_c74
_c75
_c76
_c77
_c78
_c79
_c80
_c81
_c82
_c83
_c84
_c85
_c86
_c87
_c88
_c89
_c90
_c91
_c92
_c93
_c94
_c95
_c96
_c97
_c98
_c99
_c100
_c101
_c102
_c103
_c104
_c105
_c106
_c107
_c108
_c109
_c110
_c111
_c112
_c113
_c114
_c115
_c116
_c117
_c118
_c119
_c120
_c121
_c122
_c123
_c124
_c125
_c126
_c127
_c128
_c129
_c130
_c131
_c132
_c133
_c134
_c135
_c136
_c137
_c138
_c139
_c140
_c141
_c142
_c143
_c144
_c145
_c146
_c147
_c148
_c149
_c150
_c151
_c152
_c153
_c154
_c155
_c156
_c157
_c158
_c159
_c160
_c161
_c162
_c163
_c164
_c165
_c166
_c167
_c168
_c169
_c170
_c171
_c172
_c173
_c174
_c175
_c176
_c177
_c178
_c179
_c180
_c181
_c182
_c183
_c184
_c185
_c186


Adding a schema didn't work for nan/inf values