# TEXT MINING DRUG REVIEWS DATASET

### Questions to Think About

1. Can we create a way for people to find the best medication for their illness? 
In other words, can we use this dataset to implement a recommendation system?
2. Is this problem better suited for classification or regression? 
In other words, what feature (or derived feature) should be used as the outcome variable?
3. Can we determine what features or words are most important for predicting review rating or usefulness count?

## STEP 1: IMPORT SPARK AND CREATE A SPARK SESSION

In [None]:
#use findspak library to automatically locate spark installation for us to import it
#install findspark
#!pip install findspark
import findspark
findspark.init()

In [None]:
#import pyspark and SparkSession
import pyspark
from pyspark.sql import SparkSession

In [None]:
#create spark session
spark = (SparkSession.builder
                    .appName('medicineModel1')
                    .getOrCreate())

## STEP 2: IMPORT DATA

In [None]:
#read training data
#header, delimiter are required options in this case
train_df = spark.read.options(header=True, inferSchema=True, delimiter='\t') \
  .csv('drugsComTrain_raw.tsv')

In [None]:
#view the first 10 lines of data
train_df.show(10)


In [None]:
#use head()
train_df.head()

In [None]:
#use take()
train_df.take(5)

In [None]:
#using limit(n).toPandas()
train_df.limit(5).toPandas()

In [None]:
#read test data
test_df = spark.read.options(header=True, inferSchema=True, delimiter='\t') \
  .csv('drugsComTest_raw.tsv')

In [None]:
#show some rows
test_df.limit(5).toPandas()

In [None]:
#print comlumn types
train_df.printSchema()

In [None]:
#humber of rows in train_df
train_df.count()

In [None]:
#number of rows in test df
test_df.count()

In [None]:
#total number of rows of combined data
tot_num = train_df.count() + test_df.count()
print(tot_num)

## STEP 3: PROCESS THE DATA

In [None]:
#combine train_df and test_df
from functools import reduce  # For Python 3.x
from pyspark.sql import DataFrame

#define function (takes a variable number of arguments)
def unionAll(*dfs):
    return reduce(DataFrame.unionAll, dfs)

#call the function
combined_df = unionAll(train_df,test_df)
combined_df.count()

In [None]:
#count number of missing values in each column of the dataframe

from pyspark.sql.functions import col,sum
combined_df.select(*(sum(col(c).isNull().cast("int")).alias(c) for c in combined_df.columns)).show()

In [None]:
#drop rows with missing values based on the usefulCount column
#use filter() method to return only rows for which usefulCount is not null
combined_df1 = combined_df.filter(combined_df.usefulCount.isNotNull())

In [None]:
#Again, count number of missing values in each column of the dataframe
combined_df1.select(*(sum(col(c).isNull().cast("int")).alias(c) for c in combined_df1.columns)).show()

In [None]:
#remove rows with missing values in the condition column and check if there are still any missing values
combined_df2 = combined_df1.filter(combined_df1.condition.isNotNull())
combined_df2.select(*(sum(col(c).isNull().cast("int")).alias(c) for c in combined_df2.columns)).show()

In [None]:
#number of rows in cleaned df
combined_df2.count()

In [None]:
#view the first 5 rows of cleaned data
combined_df2.limit(5).toPandas()

### Process Date column data

In [None]:
#function to parse date into desired format
def proc_date(date_str):
    """Rewrite a 'Month D, YYYY' date string as 'Month,D,YYYY'.

    The result matches the '%B,%d,%Y' format parsed by date_formatter,
    e.g. 'November 3, 2015' -> 'November,3,2015'.

    Args:
        date_str: date text such as 'November 3, 2015', or None (Spark passes
            None to a UDF for null cells).

    Returns:
        The comma-separated date string, or None when date_str is None.

    Raises:
        ValueError: if date_str does not split into exactly month, day and year.
    """
    if date_str is None:
        return None
    # Treat the comma as a separator too, so repeated or stray spaces cannot
    # produce empty fields the way fixed-offset slicing did.
    month, day, year = date_str.replace(',', ' ').split()
    return ','.join((month, day, year))
   

In [None]:
#test the proc_date function
my_date = 'November 3, 2015'
my_date1 = proc_date(my_date)
print(my_date1)

In [None]:
#Apply the proc_date function to the data
from pyspark.sql.types import IntegerType, StringType, DateType
from pyspark.sql.functions import udf

#lambda helper function
prop_date_udf = udf(lambda date: proc_date(date), StringType())

#transform the data using the above and save the result in new prop_date column as a string
combined_df3 = combined_df2.withColumn("prop_date", prop_date_udf(combined_df2.date))
    

In [None]:
combined_df3.show(5)

In [None]:
#define date formatter function
from datetime import datetime
def date_formatter(bad_date):
    """Convert a 'Month,D,YYYY' string (as produced by proc_date) to 'YYYY-MM-DD'.

    Args:
        bad_date: e.g. 'November,3,2015', or None (Spark passes None to a UDF
            for null cells). The month name is parsed with %B, which depends
            on the process locale.

    Returns:
        The ISO-formatted date string, or None when bad_date is None.

    Raises:
        ValueError: if bad_date does not match '%B,%d,%Y'.
    """
    if bad_date is None:
        return None
    return datetime.strptime(bad_date, '%B,%d,%Y').strftime('%Y-%m-%d')

In [None]:
#apply the date_formatter function to the data
from pyspark.sql.types import IntegerType, StringType, DateType
from pyspark.sql.functions import udf

#define lambda function
sys_date_udf = udf(lambda p_date: date_formatter(p_date), StringType())

combined_df4 = combined_df3.withColumn("sys_date", sys_date_udf(combined_df3.prop_date))

In [None]:
combined_df4.show(2)

In [None]:
#compute number of days
#use pyspark's datediff library to compute number of days between two dates
#problem: what will be the reference date?
#Here using the bdate the dateset was donated to UCI MLL as reference date (i.e., October 4, 2018)
from pyspark.sql.functions import datediff, to_date, lit

combined_df5 = combined_df4.withColumn("num_days", 
              datediff(to_date(lit("2018-10-04")),
                       to_date("sys_date","yyyy-MM-dd")))

In [None]:
combined_df5.limit(2).toPandas()

In [None]:
#drop columns
drop_list = ['_c0','date', 'prop_date']

#new df after dropping the columns
combined_df6 = combined_df5.select([column for column in combined_df5.columns if column not in drop_list])
combined_df6.show(2)

In [None]:
#Remove quotation marks

from pyspark.sql.functions import *
combined_df6 = combined_df6.withColumn('review', regexp_replace('review', '\"', ''))

In [None]:
#new df after dropping the columns
review1 = ['review']
reviews = combined_df6.select([column for column in combined_df6.columns if column in review1])
reviews.show(2)

In [None]:
from bs4 import BeautifulSoup
html_entities_udf = udf(lambda txt: BeautifulSoup(txt).text, StringType())
combined_df6 = combined_df6.withColumn("review", html_entities_udf(combined_df6.review))

In [None]:
#drop rows containing "</span>"
combined_df7 = combined_df6.filter("condition not like '%</span>%'")

In [None]:
combined_df6.count(), combined_df7.count()

In [None]:
combined_df7.limit(5).toPandas()

In [None]:
#drop rows with no usefulCount
combined_df7a = combined_df7.filter("usefulCount > 0")

combined_df7.count(), combined_df7a.count()

In [None]:
#Construct a reverse map of indices and condition names
import pandas as pd
from pyspark.sql.functions import col
cond_name = combined_df7a.select('condition')

In [None]:
cond_name.count()

In [None]:
cond_name1 = [i.condition for i in combined_df7a.select('condition').distinct().collect()]

In [None]:
len(cond_name1)

In [None]:
for cond in cond_name1:
    print(cond)

In [None]:
cond_index = []
j =0;
for j in range(len(cond_name1)):
    cond_index.append(j)
    j +=1

In [None]:
len(cond_index)

In [None]:
for id in cond_index:
    print(id)

In [None]:
col_names = ['condition', 'conditionID']

condition_df = pd.DataFrame(zip(cond_name1,cond_index),columns=col_names)

In [None]:
condition_df.head(), condition_df.tail()

In [None]:
cond_mapping = dict(condition_df[['condition', 'conditionID']].values)
#print(cond_mapping)

In [None]:
drug_name = combined_df7a.select('drugName')
drug_name.count()

In [None]:
drug_name1 = [i.drugName for i in combined_df7a.select('drugName').distinct().collect()]
len(drug_name1)

In [None]:
drug_index = []
k =0;
for k in range(len(drug_name1)):
    drug_index.append(k)
    k +=1
len(drug_index)

In [None]:
col_names1 = ['drugName', 'drugID']

drug_df = pd.DataFrame(zip(drug_name1,drug_index),columns=col_names1)

drug_df.head()

In [None]:
drug_mapping = dict(drug_df[['drugName', 'drugID']].values)
#print(drug_mapping)

In [None]:
combined_df8 = combined_df7a.toPandas()
combined_df8['drugID'] = combined_df8.drugName.map(drug_mapping)
combined_df8['conditionID'] = combined_df8.condition.map(cond_mapping)

In [None]:
combined_df8.head()

In [None]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(spark)
combined_df9 = sqlContext.createDataFrame(combined_df8)
combined_df9.limit(5).toPandas()

In [None]:
cond_705 = combined_df9.filter("conditionID = 705").limit(5).toPandas()

In [None]:
result = combined_df9.groupBy('conditionID').count().orderBy('count', asc=True)
result.show()

In [None]:
combined_df9.filter("conditionID = 705").orderBy("rating").distinct().show()


In [None]:
combined_df9_pd = combined_df9.select("*").toPandas()

In [None]:
combined_df9_pd.head()

#Helpfulness: Extremely helpful, Very helpful, Moderately helpful, Slightly helpful, Not at all helpful
#Helpfulness_1: Extremely helpful, Very helpful, Helpful, Moderately helpful, Slightly helpful

In [None]:
cat_labels = ["Slightly helpful","Moderately helpful","Helpful","Very helpful","Extremely helpful"]
combined_df9_pd['cat_usefulCount'] = pd.qcut(combined_df9_pd['usefulCount'], q=5, precision=0,
                                             labels = cat_labels)

In [None]:
combined_df9_pd.head()

In [None]:
combined_df9_pd['usefulCount_bins'] = pd.qcut(combined_df9_pd['usefulCount'], q=5, precision=0)

combined_df9_pd.head()

In [None]:
#combined_df9_pd.drop(['usefulCount_bins'], axis=1, inplace=True)
combined_df10 = sqlContext.createDataFrame(combined_df9_pd)
combined_df10.limit(5).toPandas()

In [None]:
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol="cat_usefulCount", outputCol="usefulCountIndex")
indexed_df = indexer.fit(combined_df10).transform(combined_df10)


In [None]:
indexed_df.limit(5).toPandas()

In [None]:
y = indexed.select('usefulCountIndex')
y.show(5)

In [None]:
nb_classes = y.distinct().count()

In [None]:
print(nb_classes)

In [None]:
#extract only numerical columns
numeric_cols = ['rating','num_days', 'usefulCount']
numeric_df = indexed_df.select([column for column in combined_df9.columns if column in numeric_cols])
numeric_df.show(5)

In [None]:
numeric_df.printSchema()

In [None]:
from pyspark.sql.functions import col
indexed_df = indexed_df.withColumn("rating",col("rating").cast("int"))

indexed_df.printSchema()

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import VectorAssembler

columns_to_scale = ["rating", "usefulCount", "num_days"]
assemblers = [VectorAssembler(inputCols=[col], outputCol=col + "_vec") for col in columns_to_scale]
scalers = [MinMaxScaler(inputCol=col + "_vec", outputCol=col + "_scaled") for col in columns_to_scale]
pipeline1 = Pipeline(stages=assemblers + scalers)
scalerModel = pipeline1.fit(indexed_df)
scaledData = scalerModel.transform(indexed_df)

In [None]:
scaledData.limit(5).toPandas()

In [None]:
numeric_features = scaledData.select('rating_scaled', 'num_days_scaled')

numeric_features.show(5)

In [None]:
#extract text column
#txt_col = ['review']
#text_df = combined_df10.select([column for column in combined_df9.columns if column in txt_col])
text_df = scaledData.select('review')
text_df.limit(5).toPandas()

### Data Transformation

In [None]:
#Pineline for text processing
from pyspark.ml.feature import CountVectorizer, StopWordsRemover, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.feature import CountVectorizer, IDF

tokenizer = Tokenizer(inputCol="review", outputCol="words")

tokenized_df = tokenizer.transform(scaledData)


In [None]:
tokenized_df.limit(5).toPandas()

In [None]:
remover = StopWordsRemover(inputCol="words", outputCol="textFeatures")

transformed_df = remover.transform(tokenized_df)

transformed_df.limit(5).toPandas()

In [None]:
# fit a CountVectorizerModel from the corpus.
cv = CountVectorizer(inputCol="textFeatures", outputCol="rawFeatures", vocabSize=50000, minDF=2.0)

cv_model = cv.fit(transformed_df)

transformed_df2 = cv_model.transform(transformed_df)

transformed_df2.limit(5).toPandas()


In [None]:
idf = IDF(inputCol="rawFeatures", outputCol="features")

idfModel = idf.fit(transformed_df2)
transformed_df3 = idfModel.transform(transformed_df2)

transformed_df3.limit(5).toPandas()

In [None]:
#number of classes
nb_classes = transformed_df3.select('usefulCountIndex').distinct().count()

#number of inputs or input dimensions
input_dim = len(transformed_df3.select('features').first()[0])


In [None]:
print(nb_classes, ',', input_dim)

In [None]:
#split the data
#shuffle the data
final_df = transformed_df3.orderBy(rand())

#split the data
train_data, test_data = final_df.randomSplit((0.75, 0.25), seed=1234)

In [None]:
train_data.limit(5).toPandas()

In [None]:
#select the rows to use
train_df = train_data.select('rating_scaled','num_days_scaled', 'features', 'usefulCountIndex')
test_df = test_data.select('rating_scaled','num_days_scaled', 'features', 'usefulCountIndex')

In [None]:
print("Training Dataset Count: " + str(train_df.count()))
print("Test Dataset Count: " + str(test_df.count()))

In [None]:
train_df.limit(5).toPandas()

In [None]:
#keras deep learning
#Keras building blocks used by the model cells below; several (Activation,
#Dropout, GlobalMaxPooling1D, RMSprop, EarlyStopping, ...) are not used in this section.
from keras.models import Sequential
from keras.layers import LSTM, Activation, Dense, Dropout, Input, Embedding
from keras.layers import Bidirectional, GlobalMaxPooling1D
from keras.optimizers import RMSprop, Adam
from keras.callbacks import EarlyStopping
from keras.losses import SparseCategoricalCrossentropy
from keras.preprocessing import sequence


In [None]:
#combined_df9['use_count_bucks'] = pd.qcut(combined_df9['usefulCount'], q=4)

In [None]:
#data samples
#NOTE(review): everything below is a Spark DataFrame, not a numpy array; Keras
#cannot fit on these directly; they must be collected/converted first.
X_train = train_df.select("rating_scaled", "num_days_scaled", "features")
y_train = train_df.select("usefulCountIndex")

X_test = test_df.select("rating_scaled", "num_days_scaled", "features")
y_test = test_df.select("usefulCountIndex")

#NOTE(review): nlp_input / numeric_input are rebound to Keras Input tensors by the
#model-building cell, so these Spark selections are overwritten there.
nlp_input = final_df.select("features")
numeric_input = final_df.select("rating_scaled", "num_days_scaled")


In [None]:
from keras.models import Model
from keras import regularizers
from keras.layers import Bidirectional, Dense, Embedding, Input, LSTM, concatenate
from keras.preprocessing import sequence
import numpy as np

#Functional API model (it allows multiple inputs): the review's word sequence
#goes through an embedding + bidirectional LSTM and is then combined with the
#two scaled numeric features (rating, num_days).
embedding_size = 64
seq_length = 100
classifier_neurons = 64  #width of the hidden dense layer (previously referenced but never defined)

#Token ids are CountVectorizer vocabulary positions shifted by +1 so that 0 is
#free to serve as the padding value; words outside the vocabulary are skipped.
vocab_index = {term: i + 1 for i, term in enumerate(cv_model.vocabulary)}
vocab_size = len(vocab_index) + 1


def _to_model_arrays(df):
    """Collect one Spark split into numpy arrays that Keras can train on.

    Returns (token-id sequences padded/truncated to seq_length,
    float32 array of [rating_scaled, num_days_scaled], int32 class labels).
    """
    rows = df.select('textFeatures', 'rating_scaled', 'num_days_scaled',
                     'usefulCountIndex').collect()
    token_ids = [[vocab_index[t] for t in row.textFeatures if t in vocab_index]
                 for row in rows]
    x_nlp = sequence.pad_sequences(token_ids, maxlen=seq_length)
    #the *_scaled columns are one-element vectors produced by VectorAssembler + MinMaxScaler
    x_num = np.array([[row.rating_scaled[0], row.num_days_scaled[0]] for row in rows],
                     dtype='float32')
    y = np.array([int(row.usefulCountIndex) for row in rows], dtype='int32')
    return x_nlp, x_num, y


X_train_nlp, X_train_num, y_train_arr = _to_model_arrays(train_data)
X_test_nlp, X_test_num, y_test_arr = _to_model_arrays(test_data)

nlp_input = Input(shape=(seq_length,), name='nlp_input')
numeric_input = Input(shape=(2,), name='numeric_input')
emb = Embedding(output_dim=embedding_size, input_dim=vocab_size, input_length=seq_length)(nlp_input)
nlp_out = Bidirectional(LSTM(128, dropout=0.3, recurrent_dropout=0.3, kernel_regularizer=regularizers.l2(0.01)))(emb)
x = concatenate([nlp_out, numeric_input])
x = Dense(classifier_neurons, activation='relu')(x)
#one softmax unit per class, as sparse_categorical_crossentropy over integer labels expects
x = Dense(nb_classes, activation='softmax')(x)
model = Model(inputs=[nlp_input , numeric_input], outputs=[x])

model.compile(loss='sparse_categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
model.summary()
model.fit([X_train_nlp, X_train_num], y_train_arr,
          validation_data=([X_test_nlp, X_test_num], y_test_arr),
          epochs=3, batch_size=64)



In [None]:
#Standalone Bidirectional-LSTM example: input sequences of 4 timesteps x 8
#features, 10 output units; it does not use the drug-review data.
#NOTE: this rebinds `model`, replacing the model built in the previous cell.
import keras
from keras import layers

model = keras.Sequential()

model.add(
    layers.Bidirectional(layers.LSTM(64, return_sequences=True), input_shape=(4, 8))
)
model.add(layers.Bidirectional(layers.LSTM(32)))
model.add(layers.Dense(10))

model.summary()