In [None]:
# MSM VM config prep
# Locate the local Spark install and start (or reuse) the SparkSession used by every later cell.
import os

import findspark

# Prefer SPARK_HOME so the notebook also runs outside the original VM;
# fall back to the install path this notebook was written against.
SPARK_HOME = os.environ.get('SPARK_HOME', '/home/mitch/spark-3.3.0-bin-hadoop2')
findspark.init(SPARK_HOME)  # must run before pyspark is imported

import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('BApredsV2').getOrCreate()

# --- suppress future spark warnings/error/etc output ---
# NOTE(review): "OFF" also hides genuine Spark errors; "ERROR" may be safer while debugging.
spark.sparkContext.setLogLevel("OFF")

In [None]:
import pandas as pd
def load_data_and_merge(csv_path='bioavailability_data_final.csv',
                        frags_path='bioavailabilityData_w_Frags_simpler.pkl',
                        spark_session=None):
    """Load the label/descriptor CSV and left-join the pickled fragment data onto it.

    Parameters
    ----------
    csv_path : str
        CSV read with Spark (header row, inferred schema, comma-separated). Its ``_c0``
        column is the join key (presumably the unnamed index column of the exported
        CSV -- confirm).
    frags_path : str
        Pickled pandas DataFrame whose index is matched against ``_c0``. Its
        ``drug_smiles`` and ``ba_pct`` columns are dropped before joining.
    spark_session : SparkSession, optional
        Session used to read the CSV and build the result. Defaults to the
        notebook-level ``spark``.

    Returns
    -------
    A DataFrame created by ``spark_session.createDataFrame``. It holds every CSV row
    (left join) plus the remaining fragment columns. Rows with no fragment match get
    missing values there.
    """
    session = spark if spark_session is None else spark_session

    labels_and_calcs = session.read.csv(csv_path, inferSchema=True, sep=',', header=True)
    labels_pdf = labels_and_calcs.toPandas()  # collects the whole CSV onto the driver

    # NOTE: read_pickle can execute arbitrary code -- only load trusted files.
    frags_pdf = pd.read_pickle(frags_path).drop(columns=['drug_smiles', 'ba_pct'])

    merged = pd.merge(labels_pdf, frags_pdf, how='left', left_on='_c0', right_on=frags_pdf.index)
    return session.createDataFrame(merged)

# Build the merged Spark DataFrame (labels CSV + fragment pickle) used by the cells below.
data = load_data_and_merge()

In [None]:
# INITIAL LABELS:
# --- Data has 1 continuous label column and 4 categorical label columns (discretized variants of the continuous label).
# ------ the categorical labels were made by dividing the continuous label values into 3-5 categories
# ------ the value range of each group was chosen from the histogram distribution / mean / stdev
# --- One more discretization variant is added here, using Spark's built-in QuantileDiscretizer.
import pandas as pd
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, QuantileDiscretizer, StringIndexer

# -- Add QuantileDiscretizer labels: 5 quantile buckets of the continuous label -> 'label_QD5'
label_quant0 = 'BA_pct'
qd5 = QuantileDiscretizer(numBuckets=5, inputCol=label_quant0, outputCol='label_QD5')
data_wLabels = qd5.fit(data).transform(data)


# -- INDEX / ENCODE LABELS
def _index_then_encode(source_col, slot):
    """Return (StringIndexer, OneHotEncoder) writing label_cat{slot}_index / label_cat{slot}_vector."""
    index_col = f'label_cat{slot}_index'
    indexer = StringIndexer(inputCol=source_col, outputCol=index_col)
    encoder = OneHotEncoder(inputCol=index_col, outputCol=f'label_cat{slot}_vector')
    return indexer, encoder


# The QD5 buckets are already numeric bucket indices, so they are one-hot encoded directly.
label_cat0_vector = OneHotEncoder(inputCol='label_QD5', outputCol='label_cat0_vector')
label_cat1_index, label_cat1_vector = _index_then_encode('label1', 1)
label_cat2_index, label_cat2_vector = _index_then_encode('label2', 2)
label_cat3_index, label_cat3_vector = _index_then_encode('label3a', 3)
label_cat4_index, label_cat4_vector = _index_then_encode('label3b', 4)

label_pipeline = Pipeline(stages=[
    label_cat0_vector,
    label_cat1_index, label_cat1_vector,
    label_cat2_index, label_cat2_vector,
    label_cat3_index, label_cat3_vector,
    label_cat4_index, label_cat4_vector,
])
data_wLabels = label_pipeline.fit(data_wLabels).transform(data_wLabels)

## Test predictions using fragment data alone

In [None]:
# Keep the identifiers, the continuous label, the calculated MolWt/MolLogP/TPSA columns,
# every label variant (raw and one-hot encoded) and the five raw fragment columns.
subset_columns = (
    ['Name', 'BA_pct', 'MolWt', 'MolLogP', 'TPSA']
    + ['label_QD5', 'label1', 'label2', 'label3a', 'label3b']
    + [f'label_cat{slot}_vector' for slot in range(5)]
    + ['frags_all', 'frags_better', 'frags_best', 'frags_efgs', 'frags_brics']
)
test_subset = data_wLabels.select(subset_columns)

In [None]:
from pyspark.ml.feature import NGram,Word2Vec,CountVectorizer,HashingTF,IDF

In [None]:
# Fragments NLP processing  - NEW -
# For every fragment column, add NLP feature columns whose prefix is the fragment type
# without 'frags_' (e.g. frags_all -> all_*):
#   <type>_cv          CountVectorizer counts (a term must occur in >= 2 rows, minDF=2.0)
#   <type>_cv_idf      IDF weighting of <type>_cv
#   <type>_w2v         Word2Vec embedding (default parameters, no explicit seed)
#   <type>_n2g         bigrams (NGram n=2) of the fragment sequence
#   <type>_n2g_cv      CountVectorizer counts of the bigrams (minDF=2.0)
#   <type>_n2g_cv_idf  IDF weighting of <type>_n2g_cv
# NOTE(review): these vocabularies, IDF weights and embeddings are fit on ALL rows, before
# the train/test split made later, so test rows leak into the features.
from pyspark.ml import Pipeline
from pyspark.ml.feature import NGram,Word2Vec,CountVectorizer,HashingTF,IDF

fragment_types = ['frags_all', 'frags_better', 'frags_best', 'frags_efgs', 'frags_brics']


def _fragment_nlp_pipeline(frag_col):
    """Return the CV / IDF / Word2Vec / bigram pipeline for a single fragment column."""
    prefix = frag_col.replace('frags_', '')
    cv_col = f"{prefix}_cv"
    n2g_col = f"{prefix}_n2g"
    n2g_cv_col = f"{prefix}_n2g_cv"
    return Pipeline(stages=[
        CountVectorizer(inputCol=frag_col, outputCol=cv_col, minDF=2.0),
        IDF(inputCol=cv_col, outputCol=f"{prefix}_cv_idf"),
        Word2Vec(inputCol=frag_col, outputCol=f"{prefix}_w2v"),
        NGram(n=2, inputCol=frag_col, outputCol=n2g_col),
        CountVectorizer(inputCol=n2g_col, outputCol=n2g_cv_col, minDF=2.0),
        IDF(inputCol=n2g_cv_col, outputCol=f"{prefix}_n2g_cv_idf"),
    ])


# Fit/transform one fragment type at a time, accumulating the new columns.
test_subset_nlp = test_subset
for frag_col in fragment_types:
    nlp_pipeline = _fragment_nlp_pipeline(frag_col)
    test_subset_nlp = nlp_pipeline.fit(test_subset_nlp).transform(test_subset_nlp)
    

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.linalg import Vector
from pyspark.ml.feature import (VectorAssembler,VectorIndexer)

# Every fragment type contributes the same five NLP feature columns.
alternative_features = [
    f"{frag}_{suffix}"
    for frag in ('all', 'better', 'best', 'efgs', 'brics')
    for suffix in ('cv', 'cv_idf', 'w2v', 'n2g_cv', 'n2g_cv_idf')
]

# Keep identifiers, the label, the calculated MolWt/MolLogP/TPSA columns and the NLP
# features; the categorical label columns are dropped here.
test_subset_nlp = test_subset_nlp.select(['Name', 'BA_pct', 'MolWt', 'MolLogP', 'TPSA', *alternative_features])

# One single-input VectorAssembler per feature column, writing FEAT_<column>.
# NOTE(review): as currently written, later cells select and train on the raw feature
# columns, not these FEAT_* outputs.
vector_assemblers = [
    VectorAssembler(inputCols=[feat], outputCol=f"FEAT_{feat}")
    for feat in alternative_features
]

output_features = ", ".join(f"'FEAT_{feat}'" for feat in alternative_features)
print(output_features)

feature_pipeline = Pipeline(stages=list(vector_assemblers))

test_subset_nlpFeatures = feature_pipeline.fit(test_subset_nlp).transform(test_subset_nlp)

In [None]:
# Final modelling frame: identifier, label and the 25 NLP feature columns
# (5 representations x 5 fragment types); MolWt/MolLogP/TPSA are not kept.
final_feature_columns = [
    f"{frag}_{suffix}"
    for frag in ('all', 'better', 'best', 'efgs', 'brics')
    for suffix in ('cv', 'cv_idf', 'w2v', 'n2g_cv', 'n2g_cv_idf')
]
test_subset_final = test_subset_nlpFeatures.select(['Name', 'BA_pct', *final_feature_columns])

# Fixed seed: without one, Spark draws a new 70/30 split on every run, so metrics from
# different model iterations would not be measured on the same test rows.
# NOTE(review): a seeded randomSplit is only repeatable if the input's partitioning and
# row order are also the same between runs.
SPLIT_SEED = 42
(training, testing) = test_subset_final.randomSplit([0.7, 0.3], seed=SPLIT_SEED)

In [None]:
# Train one RandomForestRegressor per NLP feature column and record its test-set metrics,
# both in `eval_df` (for display) and in the pickled evaluation history.
import pickle
from pathlib import Path

import pandas as pd
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import RandomForestRegressor

# Feature columns to benchmark: 5 NLP representations x 5 fragment types.
allFeatures = ['all_cv', 'all_cv_idf', 'all_w2v', 'all_n2g_cv', 'all_n2g_cv_idf',
               'better_cv', 'better_cv_idf', 'better_w2v', 'better_n2g_cv', 'better_n2g_cv_idf',
               'best_cv', 'best_cv_idf', 'best_w2v', 'best_n2g_cv', 'best_n2g_cv_idf',
               'efgs_cv', 'efgs_cv_idf', 'efgs_w2v', 'efgs_n2g_cv', 'efgs_n2g_cv_idf',
               'brics_cv', 'brics_cv_idf', 'brics_w2v', 'brics_n2g_cv', 'brics_n2g_cv_idf']

labelName = 'BA_pct'  # SPECIFY
modelname_short = 'rfr'
iteration = 4
MODEL_SEED = 42  # explicit seed so re-running on the same split reproduces the forests
EVAL_METRICS = ['rmse', 'mse', 'mae', 'r2', 'var']
HISTORY_PATH = Path('evaluation_history.pickle')

# Load Evaluation History records file; start with an empty history on the very first run.
# NOTE: pickle.load can execute arbitrary code -- only load a history file this notebook wrote.
if HISTORY_PATH.exists():
    with HISTORY_PATH.open('rb') as handle:
        evaluation_history = pickle.load(handle)
else:
    evaluation_history = {}

# Keep earlier iterations of this model; only (re)initialise the current iteration.
model_history = evaluation_history.setdefault(modelname_short, {})
model_history[iteration] = {
    # NOTE(review): the loop below evaluates every fragment type, not only frags_all.
    'source': 'frags_all',
    'label': labelName,
}

regEvaluator = RegressionEvaluator(labelCol=labelName, predictionCol='prediction')
eval_columns = {}  # model name -> metric values in EVAL_METRICS order

for features in allFeatures:
    # SPECIFY MODEL (lr,dtr,rfr,gbtr,glr,ir)
    modeltype = RandomForestRegressor(featuresCol=features, labelCol=labelName, seed=MODEL_SEED)
    modelname = f"{modelname_short}{iteration}_{features}"

    # FIT/TRAIN MODEL & TRANSFORM DATA
    mymodel = modeltype.fit(training)
    myresults = mymodel.transform(testing)

    # CALCULATE KEY EVALS on the held-out predictions
    feature_scores = {
        metric: regEvaluator.evaluate(myresults, {regEvaluator.metricName: metric})
        for metric in EVAL_METRICS
    }
    model_history[iteration][features] = feature_scores
    eval_columns[modelname] = list(feature_scores.values())

# One row per metric, one column per model (empty frame if nothing was evaluated).
eval_df = pd.DataFrame({'metric': EVAL_METRICS, **eval_columns}) if eval_columns else pd.DataFrame()

# BACKUP EVALUATION HISTORY
with HISTORY_PATH.open('wb') as handle:
    pickle.dump(evaluation_history, handle, protocol=pickle.HIGHEST_PROTOCOL)

In [None]:
# iteration 4
# Display the metric table with no column truncation (one row per metric, one column per model).
# pd.set_option changes this display setting for the rest of the kernel session.
pd.set_option('display.max_columns',None)
eval_df

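### Results:
**Findings** (RandomForestRegressor predicting `BA_pct`, evaluated on a single 70/30 train/test split):
1. The most effective fragment representation is `frags_all`.
2. The most effective NLP representation is **CountVectorizer** (raw counts or CV-IDF), followed by bigram CountVectorizer, then Word2Vec.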