In [2]:
from pyspark import SparkContext
from pyspark import SparkConf

import pandas as pd
import numpy as np

sc = SparkContext.getOrCreate()
sc

In [4]:
# load data
inputRDD = sc.textFile("data/case_transpose.csv")
inputRDD2 = sc.textFile("data/ctrl_transpose.csv")

adRDD = inputRDD.map(lambda line: line.split(","))
addCtrl = inputRDD2.map(lambda line: line.split(","))

genes = adRDD.first()
genes_test = addCtrl.first()

# combine the two csv files into 1 rdd

adRDD = adRDD.union(addCtrl)


# each row corresponds to 1 person & all their genes
# first we want to separate gene names from rdd
adRDD = adRDD.filter(lambda line: line != genes).cache()
# each column represents 1 gene
# making sure they're in the same order
#print(genes_test==genes)

#adRDD = adRDD.map(lambda line: array())

In [5]:
from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors

# put into familiar pandas dataframe
df = adRDD.toDF(genes).toPandas()

# binary classification, we want to have 1 vs 0
df.Classes = df.Classes.replace('1', '0')

df.Classes = df.Classes.replace('2', '1')

df = df.replace('?', np.nan)

# need to change the dtypes from object to float
df = df.apply(pd.to_numeric, errors='coerce')

df.head(10)

# our dependent var is Classes; 1:case, 0:ctrl


Unnamed: 0,Classes,GI_10047091-S,GI_10047093-S,GI_10047103-S,GI_10047133-A,GI_10092596-S,GI_10092600-S,GI_10092616-S,GI_10092618-S,GI_10092672-S,...,GI_9257221-S,GI_9257224-S,GI_9257226-S,GI_9257237-S,GI_9257239-A,GI_9257241-S,GI_9257243-S,GI_9257244-A,GI_9257245-I,GI_9257247-S
0,1,0.4591279,0.03779276,-0.005020818,0.39648322,0.294007184,0.264441563,0.435411052,-0.535313981,0.160455637,...,-0.704773321,-0.375528027,-0.512365812,0.240620669,-0.030797817,0.222405276,-0.003962327,0.357551153,0.229803689,-0.022660789
1,1,-1.98483532,?,0.076412305,?,-0.165881642,0.422382366,-0.248214585,1.018654669,-0.348808229,...,1.432850886,1.489628701,0.039887188,-0.444672848,-0.648193713,?,0.097789709,0.434896894,0.520601613,-0.003773875
2,1,1.004901865,0.425738476,-0.099454249,-0.170768956,-0.731452529,-0.253671409,-0.32073863,-0.520905463,0.544893005,...,-0.665362661,-0.709329618,0.202727616,0.147759253,0.366218698,-0.256029268,-0.09144285,0.111413713,-0.2340353,-0.631228873
3,1,0.362305918,0.170206404,-0.120559105,-0.103302417,0.411346037,-0.304337136,-0.326256636,-0.914468647,0.18123125,...,-0.722824874,-0.984979541,-0.160732003,-0.066951252,0.096362738,0.278873015,0.026118062,-0.002167299,-0.123767053,-0.103868843
4,1,-1.503367735,-0.474718088,0.193405902,-0.469004315,-0.345926056,0.610510871,-0.066067217,0.44140508,-0.357782042,...,0.824124899,1.240138925,1.094078751,-0.156186396,-0.416933013,-1.045042931,-0.023954806,0.419680012,-0.109642522,0.438731386


In [7]:
# Deal with NaN values:

from sklearn.preprocessing import Imputer

imputer = Imputer(missing_values="NaN", strategy="mean", axis=1)

imputer = imputer.fit(df)

imputed_df = imputer.transform(df.values)

imputed_df = pd.DataFrame(imputed_df)

imputed_df.columns = genes
imputed_df.head(10)


In [9]:
from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)

# back to spark df
imputed_df = sqlContext.createDataFrame(imputed_df)

In [24]:
from pyspark.mllib.regression import LabeledPoint

# put into LabeledPoints for mllib classification models
temp = imputed_df.rdd.map(lambda line: LabeledPoint(line[0], line[1:]))

# divide into training and testing sets
train, test = temp.randomSplit([0.7, 0.3], seed=666)

train.cache()
test.cache()


#test.take(1)



[LabeledPoint(1.0, [0.4591279,0.03779276,-0.005020818,0.39648322,0.294007184,0.264441563,0.435411052,-0.535313981,0.160455637,-0.192908817,0.415754573,0.000660222,0.252676602,0.098847857,-0.233628792,0.0231720118334,-0.537014762,-0.220556177,-0.239079073,0.356816538,0.316680614,0.404142304,-0.240919808,0.085540082,0.378998373,0.568005602,1.119834484,0.0231720118334,-0.210537934,0.357650506,-0.129227145,0.579174397,-0.220001247,-0.281574347,-0.172654165,0.252403981,0.313719871,-0.07930719,-0.056770609,0.133663413,-0.654972028,0.106016457,0.100912458,-0.015925726,-0.086447307,0.0231720118334,-0.491874924,-0.292020065,0.0231720118334,-0.72252348,0.061387728,-0.292313234,0.055905274,1.192639253,0.760487903,0.043691714,0.123676794,-0.029003942,-0.588053153,0.071157799,0.100638508,0.070832014,-0.206530953,-0.310559717,0.257341194,0.106168643,0.276507059,-0.215949792,0.431566804,0.220750263,0.197015981,-0.187315185,0.577897681,-0.030945051,0.434461705,0.381954223,0.103255191,0.184093336,-0.21

In [38]:
############# DECISION TREE - Classification #################


from pyspark.mllib.tree import DecisionTree, DecisionTreeModel

model = DecisionTree.trainClassifier(train, numClasses=2, categoricalFeaturesInfo={}, impurity='gini', maxDepth=30, maxBins=32)

predictions = model.predict(test.map(lambda x: x.features))

labels_preds = test.map(lambda lp: lp.label).zip(predictions)

test_err = labels_preds.filter(lambda lp: lp[0] != lp[1]).count() / float(test.count())
print("Decision Tree Classification Model Accuracy: %0.3f" % (1 - test_err))



Decision Tree Classification Model Accuracy: 0.640


In [37]:
############# DECISION TREE - Regression #################

model2 = DecisionTree.trainRegressor(train, categoricalFeaturesInfo={}, impurity='variance', maxDepth=30, maxBins=32)

preds2 = model2.predict(test.map(lambda x: x.features))

labels_preds2 = test.map(lambda lp: lp.label).zip(preds2)

mse = labels_preds2.map(lambda lp: (lp[0] - lp[1]) * (lp[0] - lp[1])).sum() / float(test.count())

print("Decision Tree Regression Model Accuracy: %0.3f" % (1 - mse))


Decision Tree Regression Model Accuracy: 0.640


In [42]:
from pyspark.mllib.tree import RandomForest, RandomForestModel


model3 = RandomForest.trainClassifier(train, numClasses=2, categoricalFeaturesInfo={}, numTrees=15, featureSubsetStrategy="auto", impurity="gini", maxDepth=5, maxBins=32)

preds3 = model3.predict(test.map(lambda x: x.features))

labels_preds3 = test.map(lambda lp: lp.label).zip(preds3)

mse2 = labels_preds3.map(lambda lp: (lp[0] - lp[1]) * (lp[0] - lp[1])).sum() / float(test.count())

print("Random Forest Classification Model Accuracy: %0.3f" % (1 - mse2))

Random Forest Classification Model Accuracy: 0.658


In [44]:
##################### PCA ############################
# put the data (excluding labels) into vectors
features = imputed_df.rdd.map(lambda row: (Vectors.dense(row[1:]),))


# features.count()
#features.take(3)

vec_df = features.toDF(["features"])

vec_df.take(1)

pca = PCA(k=3, inputCol="features", outputCol="pcaFeatures")
model = pca.fit(vec_df)

# java.lang.OutOfMemoryError: Java heap space


Py4JJavaError: An error occurred while calling o1568.fit.
: java.lang.OutOfMemoryError: Java heap space
	at breeze.linalg.svd$.breeze$linalg$svd$$doSVD_Double(svd.scala:93)
	at breeze.linalg.svd$Svd_DM_Impl$.apply(svd.scala:40)
	at breeze.linalg.svd$Svd_DM_Impl$.apply(svd.scala:39)
	at breeze.generic.UFunc$class.apply(UFunc.scala:48)
	at breeze.linalg.svd$.apply(svd.scala:23)
	at org.apache.spark.mllib.linalg.distributed.RowMatrix.computePrincipalComponentsAndExplainedVariance(RowMatrix.scala:389)
	at org.apache.spark.mllib.feature.PCA.fit(PCA.scala:48)
	at org.apache.spark.ml.feature.PCA.fit(PCA.scala:99)
	at org.apache.spark.ml.feature.PCA.fit(PCA.scala:70)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:745)


In [16]:
from numpy import array
from numpy import sqrt

from pyspark.mllib.clustering import KMeans, KMeansModel

parsed = imputed_df.rdd.map(lambda line: array(line[1:]))

clusters = KMeans.train(parsed, 2, maxIterations=10, initializationMode="random")

def error(point):
    center = clusters.centers[clusters.predict(point)]
    return sqrt(sum([x**2 for x in (point - center)]))

WSSSE = parsed.map(lambda point: error(point)).reduce(lambda x, y: x + y)

print(WSSSE)


13308.8773496
