# PySpark Model of Drug review rate prediction

The drug dataset contains patient reviews on different drugs with related conditions and 10-star patient rating reflecting overall patient satisfaction. The goal is to predict the rating score using the review data. A detailed explanation of the data can be found here. We want the model to be able to predict the score for new drug that is not involved in the training data. Here is link to the source data: https://archive.ics.uci.edu/ml/datasets/Drug+Review+Dataset+(Drugs.com)

## Model Development Steps on PySpark
* [Connect to PySpark](#Connect-to-PySpark)
    * Initialize the spark environment
    
* [Importing Modules](#Importing-Modules)

* [Data Ingestion](#Data-Ingestion)
    * Connect to HDP1 to collect the training/testing data
* [Data Analysis](#Data-Analysis)

* [Feature Engineering](#Feature-Engineering)
    * Column - 'condition'
    * Column - 'date'
* [Prepare Modeling Data](#Prepare-Modeling-Data)
    * Pipeline

* [Model Training](#Model-Training)
    * Logistic Regression
    * More models (TBD)

* [Cross Validation and Model Selection](#Cross-Validation-and-Model-Selection)
    * TBD

* [Model Testing](#Model-Testing)
    * Features calculations on Test Data
    * Model Performance on Test Data

* [Saving Artifacts](#Saving-Artifacts)

### Connect to PySpark

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import HiveContext
from pyspark.conf import SparkConf
spark = SparkSession.builder.config(conf=SparkConf()\
                                    .setMaster("yarn")\
                                    .setAppName("Assignment1")\
                                    .set("spark.executor.memory", '4g')\
                                    .set('spark.executor.cores', '6')).getOrCreate()

In [2]:
spark

### Importing Modules

In [3]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import HashingTF, IDF
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoderEstimator

from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

from pyspark.sql.functions import datediff, to_date, lit, col, to_timestamp, regexp_extract, udf, lower, concat
from pyspark.sql.dataframe import DataFrame
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

import pandas as pd

### Data Ingestion
#### Training Data

In [4]:
df_train = spark.sql('select * from training_path.training_drugreview')
df_train = df_train.cache()
show_rows = 4
df_train.show(show_rows)

+------+-----------+--------------------+--------------------+------+------------------+-----------+
|    id|   drugname|           condition|              review|rating|             dates|usefulcount|
+------+-----------+--------------------+--------------------+------+------------------+-----------+
|206461|  Valsartan|Left Ventricular ...|"It has no side e...|   9.0|      May 20  2012|         27|
| 95260|Guanfacine |               ADHD |My son is halfway...|   8.0|   April 27  2010 |        192|
| 92703|    Lybrel |      Birth Control |I used to take an...|   5.0|December 14  2009 |         17|
|138000|Ortho Evra |      Birth Control |This is my first ...|   8.0| November 3  2015 |         10|
+------+-----------+--------------------+--------------------+------+------------------+-----------+
only showing top 4 rows



### Data Analysis

In [5]:
df_train.printSchema()

root
 |-- id: string (nullable = true)
 |-- drugname: string (nullable = true)
 |-- condition: string (nullable = true)
 |-- review: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- dates: string (nullable = true)
 |-- usefulcount: string (nullable = true)



In [6]:
df_train.select('drugname','condition','rating','dates','usefulcount').describe().show()

+-------+--------------------+------------+-----------------+------------------+-----------------+
|summary|            drugname|   condition|           rating|             dates|      usefulcount|
+-------+--------------------+------------+-----------------+------------------+-----------------+
|  count|              161297|      161297|           161297|            161297|           161297|
|   mean|                null|        null|6.994376832799122|              null|28.00475520313459|
| stddev|                null|        null|3.272329209020409|              null|36.40374243129974|
|    min|A + D Cracked Ski...|            |              1.0|    April 1  2008 |                0|
|    max|             femhrt |zen Shoulde |              9.0|September 9  2017 |               99|
+-------+--------------------+------------+-----------------+------------------+-----------------+



Number of samples 161297. Review data available from April 1, 2008 to September 9, 2017. usefulcount of reviews ranges from 0 to 99. Next, to work with dates and usefulcount we need to change the datatypes. we will do this in later steps.

In [7]:
# Shape of the data
print(df_train.count(), len(df_train.columns))

(161297, 7)


In [8]:
# Rating Analysis
df_train.groupBy('rating').count().orderBy('count', ascending=False).show()

+------+-----+
|rating|count|
+------+-----+
|  10.0|50989|
|   9.0|27531|
|   1.0|21619|
|   8.0|18890|
|   7.0| 9456|
|   5.0| 8013|
|   2.0| 6931|
|   3.0| 6513|
|   6.0| 6343|
|   4.0| 5012|
+------+-----+



Number of samples (rating 10) ~ 10 * Number of samples (rating 4). For multiclass classification model training, we need to address this class imbalance by introducing class weight.

In [9]:
# Cast 'rating' datatype as Numeric
df_train = df_train.withColumn('label', df_train.rating.cast('float'))

In [10]:
# number of distinct conditions
df_train.select('condition').distinct().count()

886

We can either consider this column as categorical or simple text. We will consider simple text in our analysis.

In [11]:
# Patient Condition
#df_train.select('condition').distinct().show(truncate=False)
df_train.groupBy('condition').count().orderBy('count', ascending=False).show(show_rows)

+--------------+-----+
|     condition|count|
+--------------+-----+
|Birth Control |28788|
|   Depression | 9069|
|         Pain | 6145|
|      Anxiety | 5904|
+--------------+-----+
only showing top 4 rows



Majority of patients are with condition of birth control

In [12]:
# Statistics based on condition
# Cast 'usefulcount' datatype as Numeric
df_train = df_train.withColumn('usefulcount', df_train.usefulcount.cast('float'))

group_condition = df_train.groupBy('condition')

group_condition.agg({'condition':'count','usefulcount':'sum',}).orderBy('sum(usefulcount)', ascending=False).show()

+--------------------+----------------+----------------+
|           condition|count(condition)|sum(usefulcount)|
+--------------------+----------------+----------------+
|         Depression |            9069|        458918.0|
|            Anxiety |            5904|        300272.0|
|      Birth Control |           28788|        224326.0|
|               Pain |            6145|        218605.0|
|    Bipolar Disorde |            4224|        152603.0|
|        Weight Loss |            3609|        139854.0|
|            Obesity |            3568|        135174.0|
|           Insomnia |            3673|        130801.0|
|               ADHD |            3383|        122385.0|
|High Blood Pressure |            2321|        105743.0|
| Anxiety and Stress |            1663|        105168.0|
|               Acne |            5588|         88562.0|
|        ibromyalgia |            1791|         84180.0|
|   Diabetes  Type 2 |            2554|         73108.0|
|      Panic Disorde |         

Depression, Anxiety, Birth Control, Pain are top conditions from their number of samples and usefulcount.

### Feature Engineering
#### Column - 'condition'

In [13]:
# regular expression tokenizer
regexTokenizer = RegexTokenizer(inputCol="condition", outputCol="condition_words")

# bag of words count
countVectors = CountVectorizer(inputCol="condition_words", outputCol="bof_feature", vocabSize=df_train.count())

# TF-IDF # I might use this feature
hashingTF = HashingTF(inputCol="condition_words", outputCol="rawFeatures", numFeatures=10000)
idf = IDF(inputCol="rawFeatures", outputCol="tfidf_feature", minDocFreq=5) 

#### Column - 'date'

In [14]:
# DataFrame object with a transform method
def transform(self, f):
    return f(self)

DataFrame.transform = transform

# Convert month column to a numeric string
def month_number(x):
    if x == 'january':
        return '01'
    elif x == 'february':
        return '02'
    elif x == 'march':
        return '03'
    elif x == 'april':
        return '04'
    elif x == 'may':
        return '05'
    elif x == 'june':
        return '06'
    elif x == 'july':
        return '07'
    elif x == 'august':
        return '08'
    elif x == 'september':
        return '09'
    elif x == 'october':
        return '10'
    elif x == 'november':
        return '11'
    else:
        return '12'

def calculate_date_feature(df):
    # Extract day, month and year from dates column
    df = df.withColumn('month', regexp_extract(col('dates'), '(.*?)(\s+)(\d*)(\s+)(\d*)', 1))
    df = df.withColumn('day', regexp_extract(col('dates'), '(.*?)(\s+)(\d*?)(\s+)(\d*)', 3))
    df = df.withColumn('year', regexp_extract(col('dates'), '(.*?)(\s+)(\d*?)(\s+)(\d*)', 5))
    
    # Convert month column to a numeric string
    month_udf = udf(lambda x: month_number(x))
    df = df.withColumn('month_number', month_udf(lower(df.month)))
    
    # new dates column in standard format
    df = df.withColumn('dates_temp',concat(df.year,lit('-'),\
                                                        df.month_number,lit('-'),df.day))
    df = df.withColumn('dates_converted', df.dates_temp.cast("timestamp"))
    
    # date feature
    df = df.withColumn("days_from_review", 
              datediff(to_date(lit("2020-03-09")),
                       to_date("dates_converted","YYYY/MM/dd")).cast('float'))
    
    return df

In [15]:
df_train = df_train.transform(calculate_date_feature)

#### Class Imbalance
As we saw classes are highly imbalanced so need to introduce class weight for model training.

In [16]:
df_train.groupBy('rating').count().orderBy('count', ascending=False).collect()

[Row(rating=u'10.0', count=50989),
 Row(rating=u'9.0', count=27531),
 Row(rating=u'1.0', count=21619),
 Row(rating=u'8.0', count=18890),
 Row(rating=u'7.0', count=9456),
 Row(rating=u'5.0', count=8013),
 Row(rating=u'2.0', count=6931),
 Row(rating=u'3.0', count=6513),
 Row(rating=u'6.0', count=6343),
 Row(rating=u'4.0', count=5012)]

In [17]:
# calculating weights in pandas dataframe
rating_count = df_train.groupBy('label').count().select('label','count').toPandas()
# class weights
count_sum = rating_count['count'].sum()
rating_count['class_weight_temp'] = rating_count['count'].apply(lambda x: count_sum/x)
# normalize the weights
class_weight_temp_sum = rating_count['class_weight_temp'].sum()
rating_count['class_weight'] = rating_count['class_weight_temp'].apply(lambda x: float(x)/class_weight_temp_sum)
rating_count

Unnamed: 0,label,count,class_weight_temp,class_weight
0,9.0,27531,5,0.030488
1,5.0,8013,20,0.121951
2,7.0,9456,17,0.103659
3,2.0,6931,23,0.140244
4,3.0,6513,24,0.146341
5,10.0,50989,3,0.018293
6,1.0,21619,7,0.042683
7,6.0,6343,25,0.152439
8,8.0,18890,8,0.04878
9,4.0,5012,32,0.195122


In [18]:
def class_weights1(x):
    if x == 10:
        return 0.0188
    elif x == 9:
        return 0.034
    elif x == 1:
        return 0.044
    elif x == 8:
        return 0.05
    elif x == 7:
        return 0.1
    elif x == 5:
        return 0.119
    elif x == 2:
        return 0.138
    elif x == 3:
        return 0.147
    elif x == 6:
        return 0.151
    elif x == 4:
        return 0.191

# this did not work
def class_weights(x):
    return rating_count.loc[rating_count['label'] == x,'class_weight'].values[0]

    
classWeight_udf = udf(lambda x: class_weights1(x))
df_train = df_train.withColumn('classWeight', classWeight_udf(df_train.label).cast('float'))

In [19]:
df_train.show(show_rows)

+------+-----------+--------------------+--------------------+------+------------------+-----------+-----+--------+---+----+------------+----------+-------------------+----------------+-----------+
|    id|   drugname|           condition|              review|rating|             dates|usefulcount|label|   month|day|year|month_number|dates_temp|    dates_converted|days_from_review|classWeight|
+------+-----------+--------------------+--------------------+------+------------------+-----------+-----+--------+---+----+------------+----------+-------------------+----------------+-----------+
|206461|  Valsartan|Left Ventricular ...|"It has no side e...|   9.0|      May 20  2012|       27.0|  9.0|     May| 20|2012|          05|2012-05-20|2012-05-20 00:00:00|          2850.0|      0.034|
| 95260|Guanfacine |               ADHD |My son is halfway...|   8.0|   April 27  2010 |      192.0|  8.0|   April| 27|2010|          04|2010-04-27|2010-04-27 00:00:00|          3604.0|       0.05|
| 92703|  

### Prepare Modeling Data
#### Pipeline

In [20]:
# assemble data for modeling
#vec_assembler = VectorAssembler(inputCols=["bof_feature", "rawFeatures", "tfidf_feature", "days_from_review", "usefulcount"], outputCol="features")
vec_assembler = VectorAssembler(inputCols=["tfidf_feature", "days_from_review", "usefulcount"], outputCol="features")

# pipeline of different stages
pipeline = Pipeline(stages=[regexTokenizer, countVectors, hashingTF, idf, vec_assembler])
pipelineModel = pipeline.fit(df_train)
training_data = pipelineModel.transform(df_train)#.select('condition','condition_words','bof_feature','rawFeatures','tfidf_feature').show()
training_data.show(show_rows)

+------+-----------+--------------------+--------------------+------+------------------+-----------+-----+--------+---+----+------------+----------+-------------------+----------------+-----------+--------------------+--------------------+--------------------+--------------------+--------------------+
|    id|   drugname|           condition|              review|rating|             dates|usefulcount|label|   month|day|year|month_number|dates_temp|    dates_converted|days_from_review|classWeight|     condition_words|         bof_feature|         rawFeatures|       tfidf_feature|            features|
+------+-----------+--------------------+--------------------+------+------------------+-----------+-----+--------+---+----+------------+----------+-------------------+----------------+-----------+--------------------+--------------------+--------------------+--------------------+--------------------+
|206461|  Valsartan|Left Ventricular ...|"It has no side e...|   9.0|      May 20  2012|   

### Model Training

In [21]:
def training_and_evaluation(model_string, model_object, training_data):
    print('{}:'.format(model_string))
    # model training
    model = model_object.fit(training_data)
    
    trainingSummary = model.summary
    print('Training data accuracy: ' + str(trainingSummary.accuracy))
    
    training_predictions = model.transform(training_data)
    
    # Another way to check multiclass prediction accuracy
    # evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol = "prediction")
    # print('Training data accuracy: ', evaluator.evaluate(training_predictions, {evaluator.metricName: "accuracy"}))
    
    pred_result = training_predictions.select('rating','prediction','probability').toPandas()

    y_actu = pd.Series(pred_result['rating'], name='Actual')
    y_pred = pd.Series(pred_result['prediction'], name='Predicted')
    df_confusion = pd.crosstab(y_actu, y_pred)

    print("Confusion matrix:")
    
    return model, df_confusion

#### Logistic Regression

In [22]:
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(featuresCol = 'features', labelCol = 'label', maxIter=50) # weightCol="classWeights"
# weight class was tried but did not help.
model_lr, confusion_mat_lr = training_and_evaluation('Logistic Regression', lr, training_data)
confusion_mat_lr

Logistic Regression:
Training data accuracy: 0.343019398997
Confusion matrix:


Predicted,1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0,9.0,10.0
Actual,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1
1.0,8884,3,1,4,7,1,5,107,248,12359
10.0,4972,4,8,11,18,6,16,181,587,45186
2.0,2298,14,3,1,3,1,2,60,100,4449
3.0,1954,3,22,1,5,0,8,62,135,4323
4.0,1439,1,3,12,3,2,5,36,126,3385
5.0,2182,3,3,2,31,2,1,85,214,5490
6.0,1408,2,3,0,9,12,4,76,188,4641
7.0,1727,1,3,2,9,5,26,99,300,7284
8.0,2799,2,6,3,13,4,8,311,492,15252
9.0,3086,2,8,9,11,1,5,185,830,23394


#### Random Forest

#### Decision Tree

#### Naive Bayes

### Cross Validation and Model Selection
optimal parameters not found

### Model Testing

In [23]:
df_test = spark.sql('select * from test_path.test_drug_review')
df_test = df_test.cache()
df_test.show(show_rows)

+---+-----------+--------------------+--------------------+------------------+-----------+
| id|   drugname|           condition|              review|             dates|usefulcount|
+---+-----------+--------------------+--------------------+------------------+-----------+
|  0|Mirtazapine|          Depression|"I&#039;ve tried ...| February 28  2012|         22|
|  1| Mesalamine|Crohn's Disease  ...|My son has Crohn&...|      May 17  2009|         17|
|  2|    Bactrim|Urinary Tract Inf...|Quick reduction o...|September 29  2017|          3|
|  3|   Contrave|         Weight Loss|Contrave combines...|     March 5  2017|         35|
+---+-----------+--------------------+--------------------+------------------+-----------+
only showing top 4 rows



#### Features calculations on Test Data

In [24]:
df_test = df_test.transform(calculate_date_feature)
df_test = df_test.withColumn('usefulcount', df_test.usefulcount.cast('float'))

#### Model Performance on Test Data

In [25]:
model = model_lr
testing_data = pipelineModel.transform(df_test)

In [26]:
test_predictions = model.transform(testing_data)
test_predictions.show(show_rows)


+---+-----------+--------------------+--------------------+------------------+-----------+---------+---+----+------------+----------+-------------------+----------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
| id|   drugname|           condition|              review|             dates|usefulcount|    month|day|year|month_number|dates_temp|    dates_converted|days_from_review|     condition_words|         bof_feature|         rawFeatures|       tfidf_feature|            features|       rawPrediction|         probability|prediction|
+---+-----------+--------------------+--------------------+------------------+-----------+---------+---+----+------------+----------+-------------------+----------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|  0|Mirtazap

### Close Spark Session

In [33]:
spark.stop()