In [1]:
import numpy as np
import pandas as pd
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer,IndexToString
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# from pyspark.ml.feature import IDF
# from pyspark.ml.feature import DCT
# from pyspark.ml.feature import PolynomialExpansion
# from pyspark.ml.feature import ChiSqSelector
from pyspark.ml import Pipeline
import itertools as it
import pyspark.sql.functions as f
import boto3

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1543193125837_0001,pyspark3,idle,Link,Link,✔


SparkSession available as 'spark'.


In [2]:
# TODO 1. rename column names containing '.' (GS1-279B7.1, GS1-600G8.3 CAND1.11, HY.1)
#      2. read csv sep = ',' for cpv final matrix

In [2]:
spark = SparkSession.builder.appName("PrimaryBreastApp").getOrCreate()

In [3]:
%%info

ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1543193125837_0001,pyspark,idle,Link,Link,✔


In [4]:
SparkContext.setSystemProperty('spark.driver.memory', '15g')

In [5]:
seed = 0

In [6]:
SparkContext.setSystemProperty('spark.executor.memory', '15g')
#sc._conf.getAll()

In [7]:
# helper function to get all stored variables
def list_dataframes():
    from pyspark.sql import DataFrame
    return [k for (k, v) in globals().items() if isinstance(v, DataFrame)]

In [8]:
from botocore.exceptions import ClientError

def check(s3, bucket, key):
    try:
        s3.head_object(Bucket=bucket, Key=key)
    except ClientError as e:
        return int(e.response['Error']['Code']) != 404
    return True

In [9]:
def getTrainingMetrics(trainingSummary,printout=True):
    # for multiclass, we can inspect metrics on a per-label basis
#     print("False positive rate by label:")
#     for i, rate in enumerate(trainingSummary.falsePositiveRateByLabel):
#         print("label %d: %s" % (i, rate))

#     print("True positive rate by label:")
#     for i, rate in enumerate(trainingSummary.truePositiveRateByLabel):
#         print("label %d: %s" % (i, rate))

#     print("Precision by label:")
#     for i, prec in enumerate(trainingSummary.precisionByLabel):
#         print("label %d: %s" % (i, prec))

#     print("Recall by label:")
#     for i, rec in enumerate(trainingSummary.recallByLabel):
#         print("label %d: %s" % (i, rec))

    print("F-measure by label:")
    for i, f in enumerate(trainingSummary.fMeasureByLabel()):
        print("label %d: %s" % (i, f))

    accuracy = trainingSummary.accuracy
    falsePositiveRate = trainingSummary.weightedFalsePositiveRate
    truePositiveRate = trainingSummary.weightedTruePositiveRate
    fMeasure = trainingSummary.weightedFMeasure()
    precision = trainingSummary.weightedPrecision
    recall = trainingSummary.weightedRecall
    if printout is True:
        print("Accuracy: %s\nFPR: %s\nTPR: %s\nF-measure: %s\nPrecision: %s\nRecall: %s"
          % (accuracy, falsePositiveRate, truePositiveRate, fMeasure, precision, recall))
    return {"accuracy": accuracy, "fpr": falsePositiveRate, "tpr": truePositiveRate, "fmeasure": fMeasure, \
            "precision": precision, "recall": recall}

In [10]:
# read data file
mirna_path = 's3://gdc-emr0/mirna_filtered_matrix.csv'
cpv_path = 's3://gdc-emr0/cpv_filtered_matrix.csv'
# mrna_path = 's3://gdc-emr0/mrna_filtered_matrix.csv'
# read mirna
df_mirna = spark.read.option("maxColumns", 22400).csv(
    mirna_path, header=True, sep = ',',mode="DROPMALFORMED",inferSchema=True)
# read cpv
df_cpv = spark.read.option("maxColumns", 22400).csv(
    cpv_path, header=True, sep = ',',mode="DROPMALFORMED",inferSchema=True)
# # read mrna
# df_mrna = spark.read.option("maxColumns", 22400).csv(
#     mrna_path, header=True, sep = ',',mode="DROPMALFORMED",inferSchema=True)

In [11]:
df_cpv = df_cpv.toDF(*(c.replace('.', '_') for c in df_cpv.columns))
# df_mrna = df_mrna.toDF(*(c.replace('.', '_') for c in df_mrna.columns))

In [12]:
# group columns by label, identifier and feature
label_columns = ['sample_type', 'disease_type', 'primary_diagnosis']
mirna_identifier_columns = ['sample_id','case_id']
mirna_feature_columns = [x for x in df_mirna.columns if x not in (label_columns+mirna_identifier_columns)]

In [13]:
# group columns by label, identifier and feature
cpv_identifier_columns= ['_c0','sample_id','case_id']
cpv_feature_columns = [x for x in df_cpv.columns if x not in (label_columns+cpv_identifier_columns)]
df_cpv = df_cpv.withColumnRenamed("sample_type", "sample_type_cpv").withColumnRenamed("disease_type", "disease_type_cpv").withColumnRenamed("primary_diagnosis","primary_diagnosis_cpv").withColumnRenamed("case_id", "case_id_cpv")
cpv_label_columns = ['sample_type_cpv', 'disease_type_cpv', 'primary_diagnosis_cpv']
cpv_identifier_columns=['_c0','sample_id','case_id_cpv']

In [None]:
# # group columns by label, identifier and feature
# # cpv_label_columns = ['sample_type', 'disease_type', 'primary_diagnosis']
# mrna_identifier_columns= ['_c0','sample_id','case_id']
# mrna_feature_columns = [x for x in df_mrna.columns if x not in (label_columns+mrna_identifier_columns)]

In [14]:
# convert features into (sparse) vectors
# mirna
assembler = VectorAssembler(inputCols=mirna_feature_columns, outputCol='features_mirna')
df_mirna = assembler.transform(df_mirna)
df_mirna=df_mirna.drop(*mirna_feature_columns)
# eventually we should store/load from HDFS

In [15]:
# cpv
assembler = VectorAssembler(inputCols=cpv_feature_columns, outputCol='features_cpv')
df_cpv = assembler.transform(df_cpv)
df_cpv = df_cpv.drop(*cpv_feature_columns)

In [16]:
df = df_cpv.join(df_mirna, on=['sample_id'], how='left_outer')

In [17]:
df_breast = df.where(df.disease_type_cpv == 'Breast Invasive Carcinoma')

In [19]:
df_breast.count()

1075

In [21]:
# convert string labels to numerical labels
# keep track of the column names of numerical labels
label_idx_columns = [s + '_idx' for s in cpv_label_columns]

# declare indexers for 3 columns
labelIndexer = [StringIndexer(inputCol=column, outputCol=column+'_idx',handleInvalid="error",
                              stringOrderType="frequencyDesc") for column in cpv_label_columns ]
# pipeline is needed to process a list of 3 labels
pipeline = Pipeline(stages=labelIndexer)
# transform 3 label columns from string to number catagoies 
df_breast = pipeline.fit(df_breast).transform(df_breast)

# create dictionary containing 3 label lists
label_dict = {c.name: c.metadata["ml_attr"]["vals"]
for c in df.schema.fields if c.name.endswith("_idx")}

In [22]:
# full-feature
full_feature_columns=['features_mirna','features_cpv']
assembler = VectorAssembler(inputCols=full_feature_columns, outputCol='full_features')
df_breast = assembler.transform(df_breast)
# df = df.drop(*cpv_feature_columns)

In [23]:
df_breast.select('primary_diagnosis_cpv','disease_type_cpv').show()

DataFrame[sample_id: string, sample_type_cpv: string, disease_type_cpv: string, primary_diagnosis_cpv: string, case_id_cpv: string, features_cpv: vector, sample_type: string, disease_type: string, primary_diagnosis: string, case_id: string, features_mirna: vector, sample_type_cpv_idx: double, disease_type_cpv_idx: double, primary_diagnosis_cpv_idx: double, full_features: vector]

In [28]:
# create dictionary containing 3 label lists
label_dict = {c.name: c.metadata["ml_attr"]["vals"]
for c in df_breast.schema.fields if c.name.endswith("_idx")}

In [30]:
df_breast

DataFrame[sample_id: string, sample_type_cpv: string, disease_type_cpv: string, primary_diagnosis_cpv: string, case_id_cpv: string, features_cpv: vector, sample_type: string, disease_type: string, primary_diagnosis: string, case_id: string, features_mirna: vector, sample_type_cpv_idx: double, disease_type_cpv_idx: double, primary_diagnosis_cpv_idx: double, full_features: vector]

In [31]:
# Standardization 
scaler = StandardScaler(inputCol='full_features', outputCol='scaledFeatures', withStd=True, withMean=True)

# # # Convert indexed labels back to original labels
# labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
#                                labels=label_dict['disease_type_cpv_idx'])

In [32]:
# Evaluators
f1_evaluator=MulticlassClassificationEvaluator(predictionCol="prediction",labelCol='primary_diagnosis_cpv_idx', metricName='f1')
acc_evaluator=MulticlassClassificationEvaluator(predictionCol="prediction",labelCol='primary_diagnosis_cpv_idx', metricName='accuracy')
precision_evaluator=MulticlassClassificationEvaluator(predictionCol="prediction",labelCol='primary_diagnosis_cpv_idx', metricName='weightedPrecision')
recall_evaluator=MulticlassClassificationEvaluator(predictionCol="prediction",labelCol='primary_diagnosis_cpv_idx', metricName='weightedRecall')


In [35]:
# test/train split 
Xtest,Xtrain = df_breast.randomSplit([0.3, 0.7], seed)

In [36]:
s3 = boto3.client('s3')
stdmodelPath = 'breast_std_model/data/_SUCCESS'
if check(s3, 'gdc-emr0', stdmodelPath) == False:
    print("saving StandardScalar model...")
    stdmodel = scaler.fit(Xtrain)
    stdmodel.save('s3://gdc-emr0/breast_std_model')
else:
    from pyspark.ml.feature import StandardScalerModel
    print("loading StandardScalar model...")
    stdmodel = StandardScalerModel.load("s3://gdc-emr0/breast_std_model")

saving StandardScalar model...

In [37]:
Xtrain = stdmodel.transform(Xtrain)
Xtest = stdmodel.transform(Xtest)

In [None]:
# to save
# df.rdd.saveAsPickleFile(filename)
# to load
#pickleRdd = sc.pickleFile(filename).collect()
# df2 = spark.createDataFrame(pickleRdd)

# logistic

In [38]:
# Logistic Regression model 
lr = LogisticRegression(aggregationDepth= 3,maxIter=1000, regParam=0.4, elasticNetParam=0.5,
                        featuresCol='scaledFeatures',labelCol ='primary_diagnosis_cpv_idx',
                       family='multinomial',tol=1e-06)

# Hyperparameters to test
paramGrid = ParamGridBuilder().addGrid(lr.elasticNetParam, [0.0,0.5,1.0])\
            .build()

# K-fold cross validation 
crossval = CrossValidator(estimator=lr,
                          estimatorParamMaps=paramGrid,
                          evaluator=f1_evaluator,
                          numFolds=2,seed=seed)  # use 3+ folds in practice

# Put steps in a pipeline
pipeline = Pipeline(stages=[crossval])


In [39]:
# train model
pipModel = pipeline.fit(Xtrain)

In [40]:
# predict
predictions = pipModel.transform(Xtest)

In [41]:
# get hyperparameters for best model
cvModel = pipModel.stages[-1]
bestParams = cvModel.extractParamMap()
# print ('Best Param (regParam): ', bestModel._java_obj.getRegParam())
bestModel = cvModel.bestModel
# feature_importance = bestModel.featureImportances
# num_trees = bestModel.getNumTrees
# tree_weights = bestModel.treeWeights
# trees =  bestModel.trees
# # save model
# bestModel.save('cpv1600_rf')

In [42]:
s3 = boto3.client('s3')
modelPath = 'breast_logistic_model/data/_SUCCESS'
if check(s3, 'gdc-emr0', modelPath) == False:
    print("saving Logistic Regression model...")
    bestModel.save('s3://gdc-emr0/breast_logistic_model')
else:
    print(modelPath+" already exists...")

saving Logistic Regression model...

In [43]:
f1_score = f1_evaluator.evaluate(predictions)
acc_score = acc_evaluator.evaluate(predictions)
precision_score = precision_evaluator.evaluate(predictions)
recall_score = recall_evaluator.evaluate(predictions)

In [44]:
print(f1_score)
print(acc_score)
print(precision_score)
print(recall_score)

0.9111061450141634
0.9204892966360856
0.9192558486985007
0.9204892966360857

In [45]:
coeff = bestModel.coefficientMatrix
intercepts = bestModel.interceptVector

In [46]:
coeff_np = coeff.toArray()
intercepts_np = intercepts.values

In [47]:
full_feature_columns = mirna_feature_columns+cpv_feature_columns
coeff_df = pd.DataFrame(data=coeff_np,
                        index=label_dict['primary_diagnosis_cpv_idx'],
                        columns=full_feature_columns)  

In [48]:

from io import StringIO

csv_buffer = StringIO()
coeff_df.to_csv(csv_buffer)
s3_resource = boto3.resource('s3')
s3_resource.Object('gdc-emr0', 'primary_diagonisis_coeff_matrix.csv').put(Body=csv_buffer.getvalue())

{'ETag': '"6b0e4935bf8cd797e26e544cc7c1486d"', 'ResponseMetadata': {'HTTPStatusCode': 200, 'RetryAttempts': 0, 'HostId': 'lIrjd39KVR6B/w38DGooXqPdGqBgFEkIaYDPch2wpjhmPgD4ELXu632k0PRLf8O81qn/iCrI2RQ=', 'HTTPHeaders': {'etag': '"6b0e4935bf8cd797e26e544cc7c1486d"', 'x-amz-id-2': 'lIrjd39KVR6B/w38DGooXqPdGqBgFEkIaYDPch2wpjhmPgD4ELXu632k0PRLf8O81qn/iCrI2RQ=', 'date': 'Mon, 26 Nov 2018 19:03:40 GMT', 'content-length': '0', 'x-amz-request-id': 'B2C35B7B2D7DAFA3', 'server': 'AmazonS3'}, 'RequestId': 'B2C35B7B2D7DAFA3'}}

In [51]:
# Convert indexed labels back to original labels.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                               labels=label_dict['primary_diagnosis_cpv_idx'])
predictions = labelConverter.transform(predictions)

In [57]:
prediction_results = predictions.select('primary_diagnosis_cpv_idx','prediction','primary_diagnosis_cpv','predictedLabel','rawPrediction','probability')

In [58]:
prediction_results

DataFrame[primary_diagnosis_cpv_idx: double, prediction: double, primary_diagnosis_cpv: string, predictedLabel: string, rawPrediction: vector, probability: vector]

In [60]:
prediction_results_pd = prediction_results.toPandas()

In [61]:
from io import StringIO

csv_buffer = StringIO()
prediction_results_pd.to_csv(csv_buffer)
s3_resource = boto3.resource('s3')
s3_resource.Object('gdc-emr0', 'primary_diagonisis_prediction_results.csv').put(Body=csv_buffer.getvalue())

{'ETag': '"a7fffccef33f6e44317580b3d0009a21"', 'ResponseMetadata': {'HTTPStatusCode': 200, 'RetryAttempts': 0, 'HostId': 'grrfP711L4XByWcovi3E2aXQFsLFwd+ctsfXnWKfe60SEfWHCsbEfKgPFB1u2KP7hSSOm0+rccQ=', 'HTTPHeaders': {'etag': '"a7fffccef33f6e44317580b3d0009a21"', 'x-amz-id-2': 'grrfP711L4XByWcovi3E2aXQFsLFwd+ctsfXnWKfe60SEfWHCsbEfKgPFB1u2KP7hSSOm0+rccQ=', 'date': 'Mon, 26 Nov 2018 19:22:33 GMT', 'content-length': '0', 'x-amz-request-id': 'E80D69F34AD145CE', 'server': 'AmazonS3'}, 'RequestId': 'E80D69F34AD145CE'}}

### Random Forest 

In [31]:
from pyspark.ml.classification import RandomForestClassifier
# Random Forest Classifier
rf = RandomForestClassifier(cacheNodeIds=True, featuresCol='scaledFeatures',labelCol ='disease_type_idx',\
                           seed=seed,maxDepth=3)


# # Hyperparameters to test
paramGrid = ParamGridBuilder().addGrid(rf.numTrees, [200,500])\
            .build()

# # K-fold cross validation 
crossval = CrossValidator(estimator=rf,
                          estimatorParamMaps=paramGrid,
                          evaluator=f1_evaluator,
                          numFolds=2,seed=seed)  # use 3+ folds in practice

# # Put steps in a pipeline
pipeline = Pipeline(stages=[crossval])

In [32]:
# train model
pipModel = pipeline.fit(Xtrain)

In [33]:
# predict
predictions = pipModel.transform(Xtest)

In [34]:
# get hyperparameters for best model
cvModel = pipModel.stages[-1]
bestParams = cvModel.extractParamMap()
# print ('Best Param (regParam): ', bestModel._java_obj.getRegParam())
bestModel = cvModel.bestModel
feature_importance = bestModel.featureImportances
num_trees = bestModel.getNumTrees
tree_weights = bestModel.treeWeights
trees =  bestModel.trees
# # save model
# bestModel.save('cpv1600_rf')

In [36]:
s3 = boto3.client('s3')
modelPath = 'mirna_rf_model/data/_SUCCESS'
if check(s3, 'gdc-emr0', modelPath) == False:
    print("saving Random Forest model...")
    bestModel.save('s3://gdc-emr0/mirna_rf_model')
else:
    print(modelPath+" already exists...")

saving Random Forest model...

In [37]:
f1_score = f1_evaluator.evaluate(predictions)
acc_score = acc_evaluator.evaluate(predictions)
precision_score = precision_evaluator.evaluate(predictions)
recall_score = recall_evaluator.evaluate(predictions)

In [38]:
print(f1_score)
print(acc_score)
print(precision_score)
print(recall_score)

0.2977658488782247
0.3849871134020619
0.46018467656490436
0.3849871134020618

In [None]:
# Convert indexed labels back to original labels.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                               labels=label_dict['disease_type_cpv_idx'])
predictions = labelConverter.transform(predictions)

In [49]:
feature_rd_df = pd.DataFrame(feature_importance, columns=['feature_importance'])
# cpv_feature_columns+mirna_feature_columns
from io import StringIO

csv_buffer = StringIO()
feature_rd_df.to_csv(csv_buffer)
s3_resource = boto3.resource('s3')
s3_resource.Object('gdc-emr0', 'rf_feature_impt.csv').put(Body=csv_buffer.getvalue())

DataFrame constructor not properly called!
Traceback (most recent call last):
  File "/usr/local/lib64/python3.4/site-packages/pandas/core/frame.py", line 404, in __init__
    raise ValueError('DataFrame constructor not properly called!')
ValueError: DataFrame constructor not properly called!



In [None]:
spark.stop()