<a href="https://colab.research.google.com/github/jtwang1027/pyspark_aws/blob/master/colab_pyspark_bace.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
#Install pyspark
#check that the spark version is avail at the wget URL, may be updated

!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://www-us.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark


import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"
import findspark
findspark.init()

references to setup pyspark:
- [(link1)](https://towardsdatascience.com/a-neanderthals-guide-to-apache-spark-in-python-9ef1f156d427)
- [(link2)](https://www.youtube.com/watch?v=QUiAc3rWtMA)
~41min



In [0]:
#python packages
'''
import pandas as pd
import numpy as np
from sklearn.linear_model import LinearRegression
from sklearn.preprocessing import StandardScaler
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.metrics import mean_squared_error 
'''
#pyspark packages
import pyspark.sql.functions as F
import pyspark.sql.types as T
#from pyspark import SparkFiles

from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

from pyspark.ml.feature import StandardScaler
from pyspark.sql.functions import col, expr



from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

import pandas as pd

In [0]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("spark_instance") \
    .getOrCreate()

In [0]:
pd_df= pd.read_csv('https://github.com/jtwang1027/pyspark_aws/raw/master/bace.csv')
pd_df.drop('canvasUID',axis=1,inplace=True)
pd_mol2vec=pd.read_csv('https://github.com/jtwang1027/pyspark_aws/raw/master/mol2vec.csv',header=None)

#concatenating two df (adding columsn) is a pain in Spark, we'll just do it pandas
pd_comb=pd.concat([pd_df,pd_mol2vec],axis=1)
print(pd_comb.shape) #594 cols from w2v + 300 col from w2v

(1513, 894)


In [0]:
#going from URL directly to spark df example
#url2='https://github.com/jtwang1027/pyspark_aws/raw/master/mol2vec.csv'
#spark.sparkContext.addFile(url2)
#temp = spark.read.csv("file://"+SparkFiles.get("mol2vec.csv"), header=False, inferSchema= True)



In [0]:
#checking distribution of pIC50 across train/test validation groups
sns.violinplot(pd_df.Model, pd_df['pIC50'])

In [0]:
df=spark.createDataFrame(pd_df) 
df_m2v=spark.createDataFrame(pd_comb) #df + mol2vec

In [0]:
import re

#rename columns to replace periods . and parentheses, which would trigger syntax error in Pyspark
df=df.toDF(*(re.sub(r'[\.\s]+', '', c) for c in df.columns))
df=df.toDF(*(re.sub(r'\([^)]*\)', '', c) for c in df.columns))

df_m2v=df_m2v.toDF(*(re.sub(r'[\.\s]+', '_', c) for c in df_m2v.columns))
df_m2v=df_m2v.toDF(*(re.sub(r'\([^)]*\)', '_', c) for c in df_m2v.columns))

In [0]:
#defining Pipeline for: df
assembler=VectorAssembler(inputCols= df.columns[5:], outputCol= 'pre-scaled')
scaling= StandardScaler(inputCol='pre-scaled', outputCol='features', withStd=True, withMean=False)
pipeline1= Pipeline( stages=[assembler, scaling])

#second assembler for m2v with additional columns (has more columns that has to be assembled)
assembler2=VectorAssembler(inputCols= df_m2v.columns[5:], outputCol= 'pre-scaled')
pipeline2= Pipeline( stages=[assembler2, scaling])

#fitting pipeline on both datasets
df_scaled=pipeline1.fit(df).transform(df)
df_m2v_scaled=pipeline2.fit(df_m2v).transform(df_m2v)



In [0]:
#splitting into train/test/validation, predefined for this dataset based on Model column
train= df_scaled.filter(col('Model')=='Test') #"Test" has more rows, use for training
test= df_scaled.filter(col('Model')=='Train') #203 rows
validation= df_scaled.filter(col('Model')=='Valid') #45 rows

#train/test/split the second dataframe 
train_m2v= df_m2v_scaled.filter(col('Model')=='Test') #"Test" has more rows, use for training
test_m2v= df_m2v_scaled.filter(col('Model')=='Train') #203 rows
validation_m2v= df_m2v_scaled.filter(col('Model')=='Valid') #45 rows

In [0]:
train_m2v.select('Model').show(3)

+-----+
|Model|
+-----+
| Test|
| Test|
| Test|
+-----+
only showing top 3 rows



## Linear Models

In [0]:
lr=LinearRegression(labelCol='pIC50').fit(train)
lr_model=lr.transform(test)

In [0]:
lr_model.select('prediction','pIC50').show(5)

+-----------------+---------+
|       prediction|    pIC50|
+-----------------+---------+
|7.413925713777616|9.1549015|
|8.003503019102709|8.8538723|
|8.213082908073588|8.6989698|
|7.835343812156019|8.6989698|
|8.206335882401547|8.6989698|
+-----------------+---------+
only showing top 5 rows



In [0]:
#regparam= lambda, weight of regularization
#elasticNetParam= alpha, weighting of L1 vs L2 norm
enet=LinearRegression(featuresCol='features', labelCol='pIC50')
paramGrid=(ParamGridBuilder()
             .addGrid(enet.regParam, [0.01, 0.01, 0.5, 0.9])
             .addGrid(enet.elasticNetParam, [0.0,0.25, 0.5,0.75, 1.0])
             .addGrid(enet.maxIter, [10])
             .build())

evaluator=RegressionEvaluator( predictionCol='prediction',labelCol='pIC50',metricName='rmse')

cv = CrossValidator(estimator=enet, estimatorParamMaps=paramGrid,
                    evaluator=evaluator, numFolds=5)

cvMod=cv.fit(train)



In [0]:
enet_best=cvMod.bestModel
enet_pred=enet_best.transform(test)
#cvMod.bestModel.getParam('regParam')
#cvMod.bestModel.getParam('elasticNetParam')

In [0]:
#2 ways to evaluate linear regression output
#1) using the regressionevaluator
evaluator.evaluate(enet_pred)

#evaluate method
enet_best=cvMod.bestModel
enet_summ=enet_best.evaluate(test)

27.587484403720904

In [0]:
enet_pred.select('prediction','pIC50','mol').show()

## Gradient Boosted Trees

In [0]:
from pyspark.ml.regression import GBTRegressor

from pyspark.ml.evaluation import RegressionEvaluator

In [0]:
# Specify a GBT model
gbt_model = GBTRegressor(featuresCol="features", maxIter=10,labelCol='pIC50',  predictionCol='prediction')
# GBT training
gbt = gbt_model.fit(train)

gbt_m2v = gbt_model.fit(train_m2v)



In [0]:
gbt_pred_m2v.select('prediction').show(5)

In [0]:

# Make predictions.
#gbt_pred = gbt.transform(validation)
gbt_pred_m2v = gbt_m2v.transform(validation_m2v)

# Select example rows to display.
#gbt_pred.select("prediction", "pIC50").show(5)

evaluator=RegressionEvaluator( predictionCol='prediction', labelCol='pIC50',metricName='rmse')

# Select (prediction, true label) and compute test error
#rmse=evaluator.evaluate(gbt_pred)
rmse_m2v=evaluator.evaluate(gbt_pred_m2v)



#gbtModel = model.stages[1]
#print(gbtModel)  # summary only

In [0]:
#gbt_m2v.select('features','pIC50').show(5)
#gbt_pred_m2v.select('prediction','pIC50').show(5)
gbt_pred_m2v.count()

45

In [0]:
print("Root Mean Squared Error (RMSE) on est data = %g" % rmse)

In [0]:
rmse_m2v

NameError: ignored

pretty good pandas_udf tutorial
[(link)](https://towardsdatascience.com/pyspark-forecasting-with-pandas-udf-and-fb-prophet-e9d70f86d802)

In [0]:
df.schema.fieldNames()

In [0]:
from rdkit import Chem

In [0]:
#install word2vec 
!pip install git+https://github.com/samoturk/mol2vec

#install rdkit dependencies, and deepchem if needed
!wget -c https://repo.continuum.io/miniconda/Miniconda3-latest-Linux-x86_64.sh
!chmod +x Miniconda3-latest-Linux-x86_64.sh
!bash ./Miniconda3-latest-Linux-x86_64.sh -b -f -p /usr/local
!conda install -q -y -c conda-forge rdkit 

In [0]:
#install rdkit dependencies, and deepchem if needed
!wget -c https://repo.continuum.io/miniconda/Miniconda3-latest-Linux-x86_64.sh
!chmod +x Miniconda3-latest-Linux-x86_64.sh
!bash ./Miniconda3-latest-Linux-x86_64.sh -b -f -p /usr/local
!conda install -q -y -c conda-forge rdkit 

Preparing transaction: ...working... done
Verifying transaction: ...working... done
Executing transaction: ...working... done


In [0]:
import rdkit