In [2]:
#Importing pyspark session
import pyspark

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [3]:
#Importing pyspark package
from pyspark.sql import SparkSession
from pyspark.sql.functions import isnan, when, count, col,trim,round, length
from pyspark import SparkContext

import pyspark.sql.functions as F
from pyspark.sql.types import *
spark=SparkSession.builder.appName('drug_dataset').getOrCreate()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Clean and Prepare the Data

In [4]:
#Importing train data from S3
df_train = spark.read.csv('s3://capstone-drug-dataset/captsone-drug-dataset/train_raw.csv',inferSchema=True, header=True,quote='"',escape= "\"",multiLine=True)
columnmap = {}
for column in df_train.columns:
  if column.endswith("\r"):
    columnmap[column] = column.rstrip()
for c in columnmap.keys():
  df_train = df_train.withColumn(columnmap[c], F.col(c))
  df_train = df_train.drop(c)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [5]:
#Importing test data from S3
df_test = spark.read.csv('s3://capstone-drug-dataset/captsone-drug-dataset/test_raw.csv',inferSchema=True, header=True,quote='"',escape= "\"",multiLine=True)
for column in df_test.columns:
  if column.endswith("\r"):
    columnmap[column] = column.rstrip()
for c in columnmap.keys():
  df_test = df_test.withColumn(columnmap[c], F.col(c))
  df_test = df_test.drop(c)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:
#Test data samples
df_test.show(10)
df_train.printSchema
df_train = df_train.withColumn("usefulCount",round(df_train["usefulCount"]).cast('integer'))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------+---------------+--------------------+--------------------+------+---------+-----------+
|uniqueID|       drugName|           condition|              review|rating|     date|usefulCount|
+--------+---------------+--------------------+--------------------+------+---------+-----------+
|  163740|    Mirtazapine|          Depression|"I&#039;ve tried ...|    10|28-Feb-12|       22.0|
|  206473|     Mesalamine|Crohn's Disease, ...|"My son has Crohn...|     8|17-May-09|       17.0|
|  159672|        Bactrim|Urinary Tract Inf...|"Quick reduction ...|     9|29-Sep-17|        3.0|
|   39293|       Contrave|         Weight Loss|"Contrave combine...|     9| 5-Mar-17|       35.0|
|   97768|Cyclafem 1 / 35|       Birth Control|"I have been on t...|     9|22-Oct-15|        4.0|
|  208087|        Zyclara|           Keratosis|"4 days in on fir...|     4| 3-Jul-14|       13.0|
|  215892|         Copper|       Birth Control|"I&#039;ve had th...|     6| 6-Jun-16|        1.0|
|  169852|  Amitript

In [6]:
#Joining train and test data set
df = df_train.join(df_test, on=['uniqueID', 'drugName', 'condition','review','rating','date','usefulCount'], how='left_outer')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [7]:
#Computing setniment column based on rating
sentiment = when(col("rating")<=5, 0).otherwise(1)

df = df.withColumn("sentiment",sentiment)
df = df.withColumn('length',length(df['review']))



VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Feature Transformation

In [8]:
from pyspark.ml.feature import Tokenizer,StopWordsRemover, CountVectorizer,IDF,StringIndexer

tokenizer = Tokenizer(inputCol="review", outputCol="token_text")
stopremove = StopWordsRemover(inputCol='token_text',outputCol='stop_tokens')
count_vec = CountVectorizer(inputCol='stop_tokens',outputCol='c_vec')
idf = IDF(inputCol="c_vec", outputCol="tf_idf")
pos_neg = StringIndexer(inputCol='sentiment',outputCol='label')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [9]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [10]:
clean_up = VectorAssembler(inputCols=['tf_idf','length'],outputCol='features')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [11]:
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.classification import LinearSVC
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.classification import RandomForestClassifier

from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.classification import OneVsRest


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Building pipeline and fit model

In [12]:
from pyspark.ml import Pipeline
data_prep_pipe = Pipeline(stages=[pos_neg,tokenizer,stopremove,count_vec,idf,clean_up])
cleaner = data_prep_pipe.fit(df)
cleaner

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

PipelineModel_439b06e22cca

In [13]:
clean_data = cleaner.transform(df)
clean_data = clean_data.select(['label','features'])
clean_data.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  1.0|(169991,[31,100,1...|
|  0.0|(169991,[3,9,26,2...|
|  0.0|(169991,[0,1,2,6,...|
|  0.0|(169991,[0,2,3,6,...|
|  0.0|(169991,[0,2,3,14...|
|  0.0|(169991,[1,2,6,9,...|
|  0.0|(169991,[1,3,16,1...|
|  0.0|(169991,[1,2,4,7,...|
|  1.0|(169991,[0,13,14,...|
|  0.0|(169991,[0,2,3,7,...|
|  0.0|(169991,[3,6,17,2...|
|  0.0|(169991,[2,10,27,...|
|  0.0|(169991,[1,3,15,3...|
|  0.0|(169991,[6,13,25,...|
|  0.0|(169991,[1,7,10,1...|
|  1.0|(169991,[24,28,32...|
|  1.0|(169991,[1,4,7,8,...|
|  1.0|(169991,[9,17,70,...|
|  0.0|(169991,[4,8,12,2...|
|  0.0|(169991,[1,2,3,28...|
+-----+--------------------+
only showing top 20 rows

## Naive Bayes classification

### Split data into training and test sets

## Build cross-validation model
### Estimator

In [14]:
(training,testing) = clean_data.randomSplit([0.7,0.3])
from pyspark.ml.classification import NaiveBayes
naivebayes = NaiveBayes(featuresCol="features", labelCol="label")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Parameter grid

In [15]:
from pyspark.ml.tuning import ParamGridBuilder
param_grid = ParamGridBuilder().\
    addGrid(naivebayes.smoothing, [0, 1, 2, 4, 8]).\
    build()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Evaluator

In [19]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Build cross-validation model

In [20]:
from pyspark.ml.tuning import CrossValidator
crossvalidator = CrossValidator(estimator=naivebayes, estimatorParamMaps=param_grid, evaluator=evaluator)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Fit cross-validation model

In [21]:
crossvalidation_mode = crossvalidator.fit(training)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Prediction on training and test sets

In [22]:
pred_train = crossvalidation_mode.transform(training)
pred_train.show(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|(169991,[0,1,2,3,...|[-3702.9514106716...|[1.0,2.9769783098...|       0.0|
|  0.0|(169991,[0,1,2,3,...|[-3094.4982360209...|[1.0,7.6998320511...|       0.0|
|  0.0|(169991,[0,1,2,3,...|[-1682.7879118113...|[1.0,3.1860324449...|       0.0|
|  0.0|(169991,[0,1,2,3,...|[-2462.0445886490...|[1.0,2.6826684634...|       0.0|
|  0.0|(169991,[0,1,2,3,...|[-1915.0911247414...|[1.0,2.2375957346...|       0.0|
+-----+--------------------+--------------------+--------------------+----------+
only showing top 5 rows

In [24]:
pred_test = crossvalidation_mode.transform(testing)
pred_test.show(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|(169991,[0,1,2,3,...|[-2226.8796828961...|[1.0,9.6953329886...|       0.0|
|  0.0|(169991,[0,1,2,4,...|[-2697.5553014949...|[0.99999999983640...|       0.0|
|  0.0|(169991,[0,1,2,6,...|[-3325.7979559638...|[0.99999999999982...|       0.0|
|  0.0|(169991,[0,1,2,6,...|[-2330.4906943526...|[3.17980632939569...|       1.0|
|  0.0|(169991,[0,1,2,15...|[-1787.3689802939...|[0.99999999999998...|       0.0|
+-----+--------------------+--------------------+--------------------+----------+
only showing top 5 rows

## Best model from cross validation

In [25]:
print("The parameter smoothing has best value:",
      crossvalidation_mode.bestModel._java_obj.getSmoothing())

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

The parameter smoothing has best value: 4.0

### Prediction accuracy on train data

In [26]:
print('training data (f1):', evaluator.setMetricName('f1').evaluate(pred_train), "\n",
     'training data (weightedPrecision): ', evaluator.setMetricName('weightedPrecision').evaluate(pred_train),"\n",
     'training data (weightedRecall): ', evaluator.setMetricName('weightedRecall').evaluate(pred_train),"\n",
     'training data (accuracy): ', evaluator.setMetricName('accuracy').evaluate(pred_train))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

training data (f1): 0.9118135373576837 
 training data (weightedPrecision):  0.9138090639589193 
 training data (weightedRecall):  0.9108978509406015 
 training data (accuracy):  0.9108978509406015

### Prediction accuracy on test data

In [27]:
print('test data (f1):', evaluator.setMetricName('f1').evaluate(pred_test), "\n",
     'test data (weightedPrecision): ', evaluator.setMetricName('weightedPrecision').evaluate(pred_test),"\n",
     'test data (weightedRecall): ', evaluator.setMetricName('weightedRecall').evaluate(pred_test),"\n",
     'test data (accuracy): ', evaluator.setMetricName('accuracy').evaluate(pred_test))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

test data (f1): 0.8137093066854626 
 test data (weightedPrecision):  0.8253850746342872 
 test data (weightedRecall):  0.8089598352214212 
 test data (accuracy):  0.8089598352214212

## Confusion matrix

### Confusion matrix on training data

In [28]:
train_conf_mat = pred_train.select('label', 'prediction')
train_conf_mat.rdd.zipWithIndex().countByKey()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

defaultdict(<class 'int'>, {Row(label=0.0, prediction=0.0): 72790, Row(label=0.0, prediction=1.0): 6326, Row(label=1.0, prediction=1.0): 29911, Row(label=1.0, prediction=0.0): 3720})

### Confusion matrix on testing data

In [29]:
test_conf_mat = pred_test.select('label', 'prediction')
test_conf_mat.rdd.zipWithIndex().countByKey()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

defaultdict(<class 'int'>, {Row(label=0.0, prediction=0.0): 27884, Row(label=0.0, prediction=1.0): 6209, Row(label=1.0, prediction=1.0): 11391, Row(label=1.0, prediction=0.0): 3066})