In [1]:
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www-us.apache.org/dist/spark/spark-2.4.7/spark-2.4.7-bin-hadoop2.7.tgz
!tar xf spark-2.4.7-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.7-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Ign:1 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
0% [Connecting to archive.ubuntu.com] [Connecting to security.ubuntu.com] [Wait                                                                               Get:2 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
0% [Connecting to archive.ubuntu.com] [Connecting to security.ubuntu.com] [2 In0% [Connecting to archive.ubuntu.com] [Connecting to security.ubuntu.com] [Conn                                                                               Ign:3 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
0% [Connecting to archive.ubuntu.com (91.189.88.142)] [Connecting to security.u0% [2 InRelease gpgv 3,626 B] [Connecting to archive.ubuntu.com (91.189.88.142)                                                                               Get:4 https://developer.download.nvidia.com/comp

In [2]:
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [3]:
# Start a SparkSession
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("word2vec").getOrCreate()

In [4]:
# Import needed libs
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.ml.feature import Word2Vec, Word2VecModel
import string


In [5]:
# Read in data from S3 Buckets
from pyspark import SparkFiles
url ="/content/gdrive/MyDrive/Project3_BeerQuality/Resources/reviews_beer_brewery.csv"
spark.sparkContext.addFile(url)
df = spark.read.csv(SparkFiles.get("reviews_beer_brewery.csv"), header=True)

# Show DataFrame
df.show()

+-------+----------------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------+--------------------+--------------------+----+----------+--------------------+---------------+-------------+-----------------+
|beer_id|        username|      date|                text|                look|               smell|               taste|                feel|             overall|               score|           beer_name|review_state|               style|        availability| abv|brewery_id|        brewery_name|   brewery_city|brewery_state|    brewery_types|
+-------+----------------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------+--------------------+--------------------+----+----------+--------------------+-----------

In [6]:
# Check the schema for numeric conversion
df.printSchema

<bound method DataFrame.printSchema of DataFrame[beer_id: string, username: string, date: string, text: string, look: string, smell: string, taste: string, feel: string, overall: string, score: string, beer_name: string, review_state: string, style: string, availability: string, abv: string, brewery_id: string, brewery_name: string, brewery_city: string, brewery_state: string, brewery_types: string]>

In [7]:
df.count()

2127677

In [8]:
# remove alpha characters from data set on review numerics
from pyspark.sql.functions import regexp_extract, col
pattern = r'^\d+.\d+$'
df = df.filter(regexp_extract(col('look'), pattern, 0) != '')

In [9]:
# confirm look, smell, taste, overall, and score
df.show()

+-------+----------------+----------+--------------------+----+-----+-----+----+-------+-----+--------------------+------------+--------------------+--------------------+----+----------+--------------------+------------+-------------+-------------+
|beer_id|        username|      date|                text|look|smell|taste|feel|overall|score|           beer_name|review_state|               style|        availability| abv|brewery_id|        brewery_name|brewery_city|brewery_state|brewery_types|
+-------+----------------+----------+--------------------+----+-----+-----+----+-------+-----+--------------------+------------+--------------------+--------------------+----+----------+--------------------+------------+-------------+-------------+
| 271781|    bluejacket74|2017-03-17|   750 ml bottle,...| 4.0|  4.0|  4.0|4.25|    4.0| 4.03|Motorbreath Imper...|          OH|American Imperial...|Limited (brewed o...|10.8|     28094|Four String Brewi...|    Columbus|           OH| Brewery, Bar|
| 18

In [10]:
df.count()

1973277

In [11]:
# Next 6 lines casts to floats all the review ratings
df = df.withColumn('look_numeric', df['look'].cast('float'))

In [12]:
df = df.withColumn('smell_numeric', df['smell'].cast('float'))

In [13]:
df = df.withColumn('taste_numeric', df['taste'].cast('float'))

In [14]:
df = df.withColumn('feel_numeric', df['feel'].cast('float'))

In [15]:
df = df.withColumn('overall_numeric', df['overall'].cast('float'))

In [16]:
df = df.withColumn('score_numeric', df['score'].cast('float'))

In [17]:
# Confirm data types
df.printSchema

<bound method DataFrame.printSchema of DataFrame[beer_id: string, username: string, date: string, text: string, look: string, smell: string, taste: string, feel: string, overall: string, score: string, beer_name: string, review_state: string, style: string, availability: string, abv: string, brewery_id: string, brewery_name: string, brewery_city: string, brewery_state: string, brewery_types: string, look_numeric: float, smell_numeric: float, taste_numeric: float, feel_numeric: float, overall_numeric: float, score_numeric: float]>

In [18]:
# functions for manipulating review text into a format we can tokenize
def remove_punctuation(txt):
  txt = txt.strip('\xa0\xa0 ')
  return "".join(l if l not in string.punctuation else "" for l in txt)

def remove_space (text):
  if text:
    return text.replace(' ', '')
  else:
    return 'Unknown'
  

In [19]:
# define the udfs for the data
from pyspark.sql.types import StringType
from pyspark.sql.functions import col, udf

remove_punctuation_udf = udf(remove_punctuation, StringType())
remove_punctuation_udf

remove_space_udf = udf(remove_space, StringType())
remove_space_udf

<function __main__.remove_space>

In [20]:
# clean the beer name by removing spaces (will be combined with text)
beer_df = df.withColumn('beer_clean', remove_space_udf(col('beer_name')))
beer_df.show()

+-------+----------------+----------+--------------------+----+-----+-----+----+-------+-----+--------------------+------------+--------------------+--------------------+----+----------+--------------------+------------+-------------+-------------+------------+-------------+-------------+------------+---------------+-------------+--------------------+
|beer_id|        username|      date|                text|look|smell|taste|feel|overall|score|           beer_name|review_state|               style|        availability| abv|brewery_id|        brewery_name|brewery_city|brewery_state|brewery_types|look_numeric|smell_numeric|taste_numeric|feel_numeric|overall_numeric|score_numeric|          beer_clean|
+-------+----------------+----------+--------------------+----+-----+-----+----+-------+-----+--------------------+------------+--------------------+--------------------+----+----------+--------------------+------------+-------------+-------------+------------+-------------+-------------+---

In [21]:
beer_df.count()

1973277

In [22]:
# clean the style name by removing spaces (will be combined with text)
style_df = beer_df.withColumn('style_clean', remove_space_udf(col('style')))
style_df.show()

+-------+----------------+----------+--------------------+----+-----+-----+----+-------+-----+--------------------+------------+--------------------+--------------------+----+----------+--------------------+------------+-------------+-------------+------------+-------------+-------------+------------+---------------+-------------+--------------------+--------------------+
|beer_id|        username|      date|                text|look|smell|taste|feel|overall|score|           beer_name|review_state|               style|        availability| abv|brewery_id|        brewery_name|brewery_city|brewery_state|brewery_types|look_numeric|smell_numeric|taste_numeric|feel_numeric|overall_numeric|score_numeric|          beer_clean|         style_clean|
+-------+----------------+----------+--------------------+----+-----+-----+----+-------+-----+--------------------+------------+--------------------+--------------------+----+----------+--------------------+------------+-------------+-------------+--

In [23]:
style_df.count()

1973277

In [24]:
# clean the text by removing punctuation
text_df = style_df.withColumn("clean_text", remove_punctuation_udf(col("text")))
text_df.show()

+-------+----------------+----------+--------------------+----+-----+-----+----+-------+-----+--------------------+------------+--------------------+--------------------+----+----------+--------------------+------------+-------------+-------------+------------+-------------+-------------+------------+---------------+-------------+--------------------+--------------------+--------------------+
|beer_id|        username|      date|                text|look|smell|taste|feel|overall|score|           beer_name|review_state|               style|        availability| abv|brewery_id|        brewery_name|brewery_city|brewery_state|brewery_types|look_numeric|smell_numeric|taste_numeric|feel_numeric|overall_numeric|score_numeric|          beer_clean|         style_clean|          clean_text|
+-------+----------------+----------+--------------------+----+-----+-----+----+-------+-----+--------------------+------------+--------------------+--------------------+----+----------+--------------------+-

In [25]:
text_df.count()

1973277

In [26]:
# Combine beer_name, style, and review text into a Final_text column
from pyspark.sql.functions import concat_ws

combined_df = text_df.withColumn('Final_text', concat_ws(' ',text_df.beer_clean, text_df.style_clean, text_df.clean_text))
combined_df.select('text', 'Final_text').show(truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [27]:
combined_df.count()

1973277

In [28]:
# Setup and execute Tokenizer and Stop words pipeline
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from pyspark.ml import Pipeline

# "Creating pipeline..."
tokenizer = Tokenizer(inputCol="Final_text", outputCol="token_text")
stopremove = StopWordsRemover(inputCol='token_text', outputCol='features')

pipeline = Pipeline(stages=[tokenizer, stopremove])

# "Training model..."
pipeline_stg = pipeline.fit(combined_df)
final_df = pipeline_stg.transform(combined_df)
# final_df.show()

In [29]:
final_df.show()

+-------+----------------+----------+--------------------+----+-----+-----+----+-------+-----+--------------------+------------+--------------------+--------------------+----+----------+--------------------+------------+-------------+-------------+------------+-------------+-------------+------------+---------------+-------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|beer_id|        username|      date|                text|look|smell|taste|feel|overall|score|           beer_name|review_state|               style|        availability| abv|brewery_id|        brewery_name|brewery_city|brewery_state|brewery_types|look_numeric|smell_numeric|taste_numeric|feel_numeric|overall_numeric|score_numeric|          beer_clean|         style_clean|          clean_text|          Final_text|          token_text|            features|
+-------+----------------+----------+--------------------+----+-----+-----+----+--

In [30]:
final_df.count()

1973277

In [31]:
# Create w2v model hyperparameters
#word2vec = Word2Vec(
#    vectorSize=65,
#    seed=42,
#    inputCol="features",
#    outputCol="model"
#).setMaxIter(2)

In [32]:
# Fit the model, this takes time
#w2v_review_model = word2vec.fit(final_df)

In [33]:
# show some model vectors
#w2v_review_model.getVectors().show()

In [34]:
# check the model accuracy by checking synonyms 
#w2v_review_model.findSynonymsArray("light", 5)

In [35]:
# write the model to disk
#w2v_review_model.write().overwrite().save("/content/gdrive/MyDrive/Project3_BeerQuality/review.md")

In [36]:
# Read the model from disk to confirm
w2v_loaded_model = Word2VecModel.load('/content/gdrive/MyDrive/Project3_BeerQuality/review.md')

In [37]:
# check to see if synonym is consistent with loaded model
w2v_loaded_model.findSynonymsArray("light", 5)

[('medium', 0.6914238929748535),
 ('mostly', 0.6894241571426392),
 ('ligh', 0.6799790859222412),
 ('slight', 0.6678677201271057),
 ('mild', 0.6617092490196228)]

In [38]:
# Transform the mode
w2v_model_df = w2v_loaded_model.transform(final_df)

In [39]:
w2v_model_df.show()

+-------+----------------+----------+--------------------+----+-----+-----+----+-------+-----+--------------------+------------+--------------------+--------------------+----+----------+--------------------+------------+-------------+-------------+------------+-------------+-------------+------------+---------------+-------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|beer_id|        username|      date|                text|look|smell|taste|feel|overall|score|           beer_name|review_state|               style|        availability| abv|brewery_id|        brewery_name|brewery_city|brewery_state|brewery_types|look_numeric|smell_numeric|taste_numeric|feel_numeric|overall_numeric|score_numeric|          beer_clean|         style_clean|          clean_text|          Final_text|          token_text|            features|               model|
+-------+----------------+----------+---

In [40]:
# Prepare the hyperparameters
from pyspark.ml.regression import LinearRegression, LinearRegressionModel
lin_model = LinearRegression(
   maxIter=5,
   regParam=0.3,
   featuresCol="model",
   labelCol="taste_numeric"
)

In [41]:
# split training and testing for the model (for temporary make the training set small as processing takes time)
training, testing = w2v_model_df.randomSplit([0.01, 0.99])

In [42]:
# Fit the look review to a model (takes time)
lin_look_train_model = lin_model.fit(training)

In [43]:
# Save the model for later use
lin_look_train_model.write().overwrite().save("/content/gdrive/MyDrive/Project3_BeerQuality/lin_taste_train.md")

In [44]:
# Read the model
loaded_lin_look_train_model = LinearRegressionModel.load("/content/gdrive/MyDrive/Project3_BeerQuality/lin_taste_train.md")

In [45]:
# Transform the training data to peek at the predictions
lin_look_df = loaded_lin_look_train_model.transform(training)

In [46]:
# show a sample of the predictions
lin_look_df.select('look_numeric', 'prediction', 'Final_text').show(truncate=False)

+------------+------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [47]:
#Experiment
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

#evaluator = MulticlassClassificationEvaluator(
#    labelCol="look_numeric", 
#    predictionCol="prediction", 
#    metricName="accuracy"
#)

#accuracy = evaluator.evaluate(lin_look_df)

#print("Accuracy of model at predicting reviews was: %f" % accuracy)

In [48]:
print(training.rdd.getNumPartitions())

14


In [49]:
partitioned_df = training.repartition(28)

In [50]:
print(partitioned_df.rdd.getNumPartitions())

28


In [None]:
print(partitioned_df.count())

19703


In [None]:
print(spark.sparkContext.defaultParallelism)

2


In [None]:
s = spark.sparkContext._jsc.sc().getExecutorMemoryStatus().keys()
print(s)
l = str(s).replace("Set(","").replace(")","").split(", ")
print(l)

d = set()
for i in l:
    d.add(i.split(":")[0])
print(len(d))

print(spark.sparkContext._jsc.sc().getExecutorMemoryStatus().size())

Set(49b8789ee58c:46509)
['49b8789ee58c:46509']
1
1


In [None]:
spark.sparkContext._conf.getAll() 

[('spark.driver.port', '33263'),
 ('spark.app.id', 'local-1615414060868'),
 ('spark.driver.host', '49b8789ee58c'),
 ('spark.rdd.compress', 'True'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.master', 'local[*]'),
 ('spark.executor.id', 'driver'),
 ('spark.submit.deployMode', 'client'),
 ('spark.app.name', 'word2vec'),
 ('spark.ui.showConsoleProgress', 'true')]

In [None]:
spark.sparkContext._conf.get('spark.driver.maxResultSize')

In [None]:
!cat /proc/cpuinfo

processor	: 0
vendor_id	: GenuineIntel
cpu family	: 6
model		: 85
model name	: Intel(R) Xeon(R) CPU @ 2.00GHz
stepping	: 3
microcode	: 0x1
cpu MHz		: 1999.999
cache size	: 39424 KB
physical id	: 0
siblings	: 2
core id		: 0
cpu cores	: 1
apicid		: 0
initial apicid	: 0
fpu		: yes
fpu_exception	: yes
cpuid level	: 13
wp		: yes
flags		: fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush mmx fxsr sse sse2 ss ht syscall nx pdpe1gb rdtscp lm constant_tsc rep_good nopl xtopology nonstop_tsc cpuid tsc_known_freq pni pclmulqdq ssse3 fma cx16 pcid sse4_1 sse4_2 x2apic movbe popcnt aes xsave avx f16c rdrand hypervisor lahf_lm abm 3dnowprefetch invpcid_single ssbd ibrs ibpb stibp fsgsbase tsc_adjust bmi1 hle avx2 smep bmi2 erms invpcid rtm mpx avx512f avx512dq rdseed adx smap clflushopt clwb avx512cd avx512bw avx512vl xsaveopt xsavec xgetbv1 xsaves arat md_clear arch_capabilities
bugs		: cpu_meltdown spectre_v1 spectre_v2 spec_store_bypass l1tf mds swapgs taa
bogomips	:

In [None]:
!cat /proc/meminfo

MemTotal:       13333568 kB
MemFree:          600144 kB
MemAvailable:   10967716 kB
Buffers:           47948 kB
Cached:         10174692 kB
SwapCached:            0 kB
Active:          4653800 kB
Inactive:        7552124 kB
Active(anon):    1897480 kB
Inactive(anon):      360 kB
Active(file):    2756320 kB
Inactive(file):  7551764 kB
Unevictable:           0 kB
Mlocked:               0 kB
SwapTotal:             0 kB
SwapFree:              0 kB
Dirty:               772 kB
Writeback:             0 kB
AnonPages:       1983292 kB
Mapped:           190108 kB
Shmem:              1032 kB
Slab:             427684 kB
SReclaimable:     385244 kB
SUnreclaim:        42440 kB
KernelStack:        5728 kB
PageTables:        10388 kB
NFS_Unstable:          0 kB
Bounce:                0 kB
WritebackTmp:          0 kB
CommitLimit:     6666784 kB
Committed_AS:    5414356 kB
VmallocTotal:   34359738367 kB
VmallocUsed:           0 kB
VmallocChunk:          0 kB
Percpu:             1032 kB
AnonHugePages:   

In [None]:
!ps -ef | grep java

root        2709      60 71 Mar10 ?        01:21:24 /usr/lib/jvm/java-8-openjdk-amd64/bin/java -cp /content/spark-2.4.7-bin-hadoop2.7/conf/:/content/spark-2.4.7-bin-hadoop2.7/jars/* -Xmx1g org.apache.spark.deploy.SparkSubmit --conf spark.app.name=word2vec pyspark-shell
root        4354      60  0 00:01 ?        00:00:00 /bin/bash -c ps -ef | grep java
root        4356    4354  0 00:01 ?        00:00:00 grep java
