# Importing Libraries

In [1]:
SparkContext.setSystemProperty('spark.executor.pyspark.memory', '8g')
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from pyspark import SparkFiles
from pyspark.sql.functions import regexp_replace
import seaborn as sns
from sqlalchemy import create_engine, inspect
sns.set_style('whitegrid')
import plotly.express as px
from pyspark import SparkFiles, SparkContext
import os
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
import pandas as pd
from pyspark.mllib.util import MLUtils
from config import rds_user, rds_password

# Reading data from AWS S3


In [3]:
%%time
url1= "https://icdrive1.s3.amazonaws.com/drugsComTrain_raw.csv"
spark.sparkContext.addFile(url1)
df_train = spark.read.option('header', 'true').csv(SparkFiles.get("drugsComTrain_raw.csv"), inferSchema=True, quote='\"', escape='\"',multiLine=True, timestampFormat="mm/dd/yyyy HH:mm:ss")
url2 = "https://icdrive1.s3.amazonaws.com/drugsComTest_raw.csv"
spark.sparkContext.addFile(url2)
df_test = spark.read.option('header', 'true').csv(SparkFiles.get("drugsComTest_raw.csv"), inferSchema=True, quote='\"', escape='\"',multiLine=True, timestampFormat="mm/dd/yyyy HH:MM:SS")

Wall time: 1min 52s


In [37]:
%%time
# Concatenating the train and test data for cleaning
df = df_train.union(df_test)

Wall time: 1.99 ms


## Printing the schema

In [38]:
df.printSchema()

root
 |-- uniqueID: integer (nullable = true)
 |-- drugName: string (nullable = true)
 |-- condition: string (nullable = true)
 |-- review: string (nullable = true)
 |-- rating: integer (nullable = true)
 |-- date: string (nullable = true)
 |-- usefulCount: integer (nullable = true)



## Viewing the data

In [39]:
df.show()

+--------+--------------------+--------------------+--------------------+------+---------+-----------+
|uniqueID|            drugName|           condition|              review|rating|     date|usefulCount|
+--------+--------------------+--------------------+--------------------+------+---------+-----------+
|  206461|           Valsartan|Left Ventricular ...|"It has no side e...|     9|20-May-12|         27|
|   95260|          Guanfacine|                ADHD|"My son is halfwa...|     8|27-Apr-10|        192|
|   92703|              Lybrel|       Birth Control|"I used to take a...|     5|14-Dec-09|         17|
|  138000|          Ortho Evra|       Birth Control|"This is my first...|     8| 3-Nov-15|         10|
|   35696|Buprenorphine / n...|   Opiate Dependence|"Suboxone has com...|     9|27-Nov-16|         37|
|  155963|              Cialis|Benign Prostatic ...|"2nd day on 5mg s...|     2|28-Nov-15|         43|
|  165907|      Levonorgestrel|Emergency Contrac...|"He pulled out, b...|

## Data Cleaning

In [40]:
# Removing &#039; from review column and replacing with apostrophe(')
df = df.withColumn("review", regexp_replace("review", "&#039;", "'"))
# Verifying the column
df.select("review").show()

+--------------------+
|              review|
+--------------------+
|"It has no side e...|
|"My son is halfwa...|
|"I used to take a...|
|"This is my first...|
|"Suboxone has com...|
|"2nd day on 5mg s...|
|"He pulled out, b...|
|"Abilify changed ...|
|" I Ve had  nothi...|
|"I had been on th...|
|"I have been on t...|
|"I have taken ant...|
|"I had Crohn's wi...|
|"Have a little bi...|
|"Started Nexplano...|
|"I have been taki...|
|"This drug worked...|
|"I've been taking...|
|"I've been on eve...|
|"I have been on T...|
+--------------------+
only showing top 20 rows



In [41]:
# replacing quotes 
df = df.withColumn("review", regexp_replace("review", '\"', ""))
# Verifying the change
df.select("review").show(1, truncate=False)


+-----------------------------------------------------------------------------+
|review                                                                       |
+-----------------------------------------------------------------------------+
|It has no side effect, I take it in combination of Bystolic 5 Mg and Fish Oil|
+-----------------------------------------------------------------------------+
only showing top 1 row



In [42]:
# Cleaning condition field and replacing <span> strings with 9999
df = df.withColumn('condition', regexp_replace('condition', '\d+</span> users found this comment helpful.', '9999'))

In [43]:
# Converting date field from string type to date type
from pyspark.sql.functions import unix_timestamp, from_unixtime
df = df.select('uniqueID', 'drugName','condition','review','rating','usefulCount', from_unixtime(unix_timestamp('date', 'dd-MMM-yy')).alias('review_date'))
df.show()

+--------+--------------------+--------------------+--------------------+------+-----------+-------------------+
|uniqueID|            drugName|           condition|              review|rating|usefulCount|        review_date|
+--------+--------------------+--------------------+--------------------+------+-----------+-------------------+
|  206461|           Valsartan|Left Ventricular ...|It has no side ef...|     9|         27|2012-05-20 00:00:00|
|   95260|          Guanfacine|                ADHD|My son is halfway...|     8|        192|2010-04-27 00:00:00|
|   92703|              Lybrel|       Birth Control|I used to take an...|     5|         17|2009-12-14 00:00:00|
|  138000|          Ortho Evra|       Birth Control|This is my first ...|     8|         10|2015-11-03 00:00:00|
|   35696|Buprenorphine / n...|   Opiate Dependence|Suboxone has comp...|     9|         37|2016-11-27 00:00:00|
|  155963|              Cialis|Benign Prostatic ...|2nd day on 5mg st...|     2|         43|2015

In [49]:
# Verifying if condition starting "<span>" are removed
df.filter(df["condition"].rlike("<span>")).count()

0

## Checking for null values

In [50]:
df.describe().show()

+-------+------------------+--------------------+-----------+--------------------+------------------+-----------------+-------------------+
|summary|          uniqueID|            drugName|  condition|              review|            rating|      usefulCount|        review_date|
+-------+------------------+--------------------+-----------+--------------------+------------------+-----------------+-------------------+
|  count|            215063|              215063|     213869|              215063|            215063|           215063|             215063|
|   mean|116039.36481403124|                null|     9999.0|                null| 6.990007579174474|28.00100435686287|               null|
| stddev| 67007.91336634514|                null|        0.0|                null|3.2755544975903432| 36.3460688835081|               null|
|    min|                 0|A + D Cracked Ski...|       9999|

 please tell th...|                 1|                0|2008-02-24 00:00:00|
|    max|           

In [51]:
# Finding null valu count for condition column
from pyspark.sql.functions import isnan

In [52]:
df.filter(df["condition"].isNull()).count()


1194

In [53]:
# Dropping all NA rows
#df_after_na = df.dropna()
from pyspark.sql.functions import when, lit
df_after_na = df.withColumn("condition", \
              when(df["condition"].isNull(), lit('9999')).otherwise(df["condition"]))
df_after_na.show()

+--------+--------------------+--------------------+--------------------+------+-----------+-------------------+
|uniqueID|            drugName|           condition|              review|rating|usefulCount|        review_date|
+--------+--------------------+--------------------+--------------------+------+-----------+-------------------+
|  206461|           Valsartan|Left Ventricular ...|It has no side ef...|     9|         27|2012-05-20 00:00:00|
|   95260|          Guanfacine|                ADHD|My son is halfway...|     8|        192|2010-04-27 00:00:00|
|   92703|              Lybrel|       Birth Control|I used to take an...|     5|         17|2009-12-14 00:00:00|
|  138000|          Ortho Evra|       Birth Control|This is my first ...|     8|         10|2015-11-03 00:00:00|
|   35696|Buprenorphine / n...|   Opiate Dependence|Suboxone has comp...|     9|         37|2016-11-27 00:00:00|
|  155963|              Cialis|Benign Prostatic ...|2nd day on 5mg st...|     2|         43|2015

In [54]:
# Verify
#print(f'Null records {df_after_na.filter(df["condition"]='9999').count()}')
df_after_na.describe().show()

+-------+------------------+--------------------+-----------+--------------------+------------------+-----------------+-------------------+
|summary|          uniqueID|            drugName|  condition|              review|            rating|      usefulCount|        review_date|
+-------+------------------+--------------------+-----------+--------------------+------------------+-----------------+-------------------+
|  count|            215063|              215063|     215063|              215063|            215063|           215063|             215063|
|   mean|116039.36481403124|                null|     9999.0|                null| 6.990007579174474|28.00100435686287|               null|
| stddev| 67007.91336634514|                null|        0.0|                null|3.2755544975903432| 36.3460688835081|               null|
|    min|                 0|A + D Cracked Ski...|       9999|

 please tell th...|                 1|                0|2008-02-24 00:00:00|
|    max|           

## Feature Engineering

In [55]:
## Adding a new column for prediction purpose
## If rating is > 5, we will mark as 'Postive' else 'Negative'
from pyspark.sql import functions as f
df_cleaned = df_after_na.withColumn('review_outcome', f.when(f.col('rating') > 5, "Positive").otherwise("Negative"))
df_cleaned = df_cleaned.withColumnRenamed('date','review_date')
df_cleaned.show()

+--------+--------------------+--------------------+--------------------+------+-----------+-------------------+--------------+
|uniqueID|            drugName|           condition|              review|rating|usefulCount|        review_date|review_outcome|
+--------+--------------------+--------------------+--------------------+------+-----------+-------------------+--------------+
|  206461|           Valsartan|Left Ventricular ...|It has no side ef...|     9|         27|2012-05-20 00:00:00|      Positive|
|   95260|          Guanfacine|                ADHD|My son is halfway...|     8|        192|2010-04-27 00:00:00|      Positive|
|   92703|              Lybrel|       Birth Control|I used to take an...|     5|         17|2009-12-14 00:00:00|      Negative|
|  138000|          Ortho Evra|       Birth Control|This is my first ...|     8|         10|2015-11-03 00:00:00|      Positive|
|   35696|Buprenorphine / n...|   Opiate Dependence|Suboxone has comp...|     9|         37|2016-11-27 0

## EDA

## Top 10 used drugs

In [56]:
from pyspark.sql.functions import desc, avg,sum
df_bydrugName = df_cleaned.groupBy(["drugName","condition"]).agg({'drugName': 'count', "rating": "avg" })
df_bydrugName.show()

+--------------------+--------------------+---------------+-----------------+
|            drugName|           condition|count(drugName)|      avg(rating)|
+--------------------+--------------------+---------------+-----------------+
|      Levonorgestrel|       Birth Control|           2884|7.038834951456311|
|    Ortho Tri-Cyclen|       Birth Control|            116|             5.75|
|           Methadone|        Chronic Pain|            108|8.898148148148149|
|          Paroxetine|  Anxiety and Stress|             94|6.787234042553192|
|             Desoxyn|                ADHD|             19|9.631578947368421|
|              Zofran|Nausea/Vomiting, ...|              7|7.285714285714286|
|      Levocetirizine|           Allergies|             97|5.278350515463917|
|      Insulin lispro|    Diabetes, Type 2|              8|              6.0|
|             Emetrol|     Nausea/Vomiting|             21|9.095238095238095|
|          Diflunisal|                Pain|              2|     

In [57]:
top10_drugs = df_bydrugName.orderBy(desc("count(drugName)")).select("*").toPandas().drop_duplicates(subset='condition',keep='first')



In [58]:
top10_drugs.head(10)

Unnamed: 0,drugName,condition,count(drugName),avg(rating)
0,Etonogestrel,Birth Control,4394,5.829768
6,Levonorgestrel,Emergency Contraception,1651,8.472441
7,Phentermine,Weight Loss,1650,8.769697
9,Miconazole,Vaginal Yeast Infection,1338,2.988042
11,Varenicline,Smoking Cessation,1079,8.803522
15,Bupropion / naltrexone,Obesity,888,6.970721
20,Magnesium sulfate / potassium sulfate / sodium...,Bowel Preparation,821,7.431181
24,Bupropion,Depression,747,7.386881
26,Isotretinoin,Acne,682,8.353372
27,Escitalopram,Anxiety,669,7.825112


In [1]:
#sns.barplot(x="drugName", y="count", data=top10_drugs.head(10))
#fig = px.bar(top10_drugs.head(20), x='drugName', y='count(drugName)',
#            hover_data=['avg(rating)','condition'], color='avg(rating)')
#fig.show()

## Top 10 conditions

In [61]:
from pyspark.sql.functions import desc
df_byCondition = df_cleaned.groupBy("condition").count()
print((df_byCondition.count(), len(df_byCondition.columns)))

(837, 2)


In [62]:
top10_Conditions = df_byCondition.orderBy(desc("count")).select("*").toPandas()
top10_Conditions.head()


Unnamed: 0,condition,count
0,Birth Control,38436
1,Depression,12164
2,Pain,8245
3,Anxiety,7812
4,Acne,7435


In [2]:
#fig = px.bar(top10_Conditions.head(20), x='condition', y='count')
#fig.show()

## Distribution of rating

In [64]:
df_rating = df_cleaned.select("rating").toPandas()
fig = px.histogram(df_rating, x="rating", histnorm='probability density')
fig.show()

## Writing the dataframes to PostgreSQL in AWS RDS

In [65]:
#import os
os.environ["SPARK_CLASSPATH"] = 'C:\Spark\spark-2.3.3-bin-hadoop2.7\jars\postgresql-42.2.8.jar'

In [66]:
# Configure settings for RDS
#mode = "append"
# Setting mode to overite as currently the data is static
mode = "overwrite"
jdbc_url = "jdbc:postgresql://mypostgresdb2.cb2yuf2tmjzo.us-east-1.rds.amazonaws.com:5432/drugDB"
config = {"user": rds_user,
         "password": rds_password,
         "driver": "org.postgresql.Driver"}

In [67]:
df_cleaned.printSchema()

root
 |-- uniqueID: integer (nullable = true)
 |-- drugName: string (nullable = true)
 |-- condition: string (nullable = true)
 |-- review: string (nullable = true)
 |-- rating: integer (nullable = true)
 |-- usefulCount: integer (nullable = true)
 |-- review_date: string (nullable = true)
 |-- review_outcome: string (nullable = false)



## Writing the combined data to RDS

In [68]:
# Append drug cleaned data to RDS instance
df_cleaned.write.jdbc(url=jdbc_url, table='drug_review_data', mode=mode, properties=config)

## Creating CSV locally for Tableau

In [71]:
# Writing the Spark dataframe locally as csv file
df_cleaned.toPandas().to_csv(os.path.join('../static/db/','drug_review_data_combined.csv'))


## Reading the Drug usage across the states of US - CSV from Amazon S3

In [73]:
%%time
url1= "https://icdrive1.s3.amazonaws.com/State_Drug_Utilization_Data_2019.csv"
spark.sparkContext.addFile(url1)
df_drug_usage = spark.read.option('header', 'true').csv(SparkFiles.get("State_Drug_Utilization_Data_2019.csv"), inferSchema=True, sep= ",", timestampFormat="mm/dd/yyyy HH:mm:ss")

Wall time: 2min 22s


In [74]:
df_drug_usage_cleaned = df_drug_usage.select("State","Product Name","Number of Prescriptions","Quarter Begin Date","Latitude","Longitude").dropna()
df_drug_usage_cleaned.show(5)

+-----+------------+-----------------------+------------------+--------+---------+
|State|Product Name|Number of Prescriptions|Quarter Begin Date|Latitude|Longitude|
+-----+------------+-----------------------+------------------+--------+---------+
|   AK|  CARVEDILOL|                     55|        01/01/2019|  61.385|-152.2683|
|   AR|   GLIPIZIDE|                     25|        01/01/2019| 34.9513| -92.3809|
|   AR|  CLOTRIMAZO|                    108|        01/01/2019| 34.9513| -92.3809|
|   AZ|  OXYCODONE-|                     40|        01/01/2019| 14.2417|-170.7197|
|   CA|  OXYBUTYNIN|                     20|        01/01/2019| 33.7712|-111.3877|
|   CA|  HYDROXIZIN|                     16|        01/01/2019| 33.7712|-111.3877|
|   CA|  GLIMEPIRID|                    102|        01/01/2019| 33.7712|-111.3877|
|   CA|  DULOXETINE|                     20|        01/01/2019| 33.7712|-111.3877|
|   CA|  HYDROXYCHL|                     19|        01/01/2019| 33.7712|-111.3877|
|   

## Writing the cleaned drug usage data to RDS

In [75]:
df_drug_usage_cleaned .write.jdbc(url=jdbc_url, table='drug_usage_2019', mode=mode, properties=config)

## Writing the cleaned drug usage data locally for Tableau

In [76]:
df_drug_usage_cleaned.toPandas().to_csv(os.path.join("../static/db","drug_usage_2019.csv"))

## Reading the cleaned data from S3

In [None]:
%%time
## Testing the combined CSV from S3 bucket.
url= "https://icdrive1.s3.amazonaws.com/drug_review_data_combined.csv"
spark.sparkContext.addFile(url)
df_combined = spark.read.option('header', 'true').csv(SparkFiles.get("drug_review_data_combined.csv"), inferSchema=True, sep=",", multiLine=True)

**Build Pandas Dataframe for performing Sentimental Analysis**

In [None]:
train_df = df_combined.toPandas()
train_df["pos"] = ""
train_df["neg"] = ""

In [None]:
train_df['label'] = train_df['rating'].apply(lambda x: 1 if x>5 else 0)

In [None]:
train_df.head()

## Perform Sentiment Analysis on Training Data

In [None]:
analyzer = SentimentIntensityAnalyzer()

**Polarity scores calculated**

In [None]:
res = analyzer.polarity_scores(train_df["review"][0])
res

In [None]:
train_df_df["review"][0]

In [None]:
%%time
#review_df = review_df.withColumnRenamed("review_outcome", "label")
train_df["pos"] = train_df["review"].apply(lambda x: analyzer.polarity_scores(x)['pos'])

In [None]:
%%time
train_df["neg"] = train_df["review"].apply(lambda x: analyzer.polarity_scores(x)['neg'])

In [None]:
train_df.head()

In [None]:
#train_df.to_csv('https://icdrive1.s3.amazonaws.com/drug_review_data_cleaned.csv', index=False)
train_df.to_csv(os.path.join("../static/db/",'drug_review_data_cleaned.csv'))

Also this data is uploaded to S3 bucket.