## Internal Documentation

- First, install the EPFL VPN by following the instructions at https://epnet.epfl.ch/AnyConnect-VPN-Clients

- How to run jobs in the cluster: https://drive.google.com/open?id=1n9tIfMkDPW6RDLFPvhhetOIcFCBNMnpfESzHc20MM4w

- To install PyTorch (the ML library), run the following commands on the cluster:

```curl https://bootstrap.pypa.io/get-pip.py -o get-pip.py```

```python get-pip.py --user```

```pip install --user torch torchvision```

## Get the Code from the Notebook to the Cluster

We can write our Python script in this notebook, but to run it we need to copy it to the server with ```scp``` and launch it with ```spark-submit```:

```export GASPARID=<your gaspar>```

```scp script.py $GASPARID@iccluster028.iccluster.epfl.ch:/home/$GASPARID/script.py```

```ssh $GASPARID@iccluster028.iccluster.epfl.ch```

```spark-submit --master yarn --deploy-mode client --driver-memory 4G --num-executors 5 --executor-memory 4G --executor-cores 5 script.py```

## Script

The following script fails after 2 hours of execution: YARN kills an executor container for exceeding its memory limit.

Error:

```
org.apache.spark.SparkException: Job aborted due to stage failure: Task 24 in stage 110.0 failed 4 times, most recent failure: Lost task 24.3 in stage 110.0 (TID 2831, iccluster039.iccluster.epfl.ch, executor 42): ExecutorLostFailure (executor 42 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 5.3 GB of 5 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
```
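The 5 GB limit in this message is consistent with how Spark sizes executor containers on YARN: each executor asks for its `--executor-memory` plus a memory overhead, which in recent Spark versions defaults to 10% of the executor memory with a minimum of 384 MB, and YARN rounds the request up. The sketch below reproduces that arithmetic; `yarn_container_mb` is a hypothetical helper, and the 1024 MB rounding increment is an assumption (it is YARN's default `yarn.scheduler.minimum-allocation-mb`, not a value we checked on the cluster).

```python
import math


def yarn_container_mb(executor_memory_mb, overhead_factor=0.10,
                      min_overhead_mb=384, allocation_increment_mb=1024):
    """Estimate the YARN container size (MB) requested for one Spark executor.

    Assumptions: Spark's default overhead of max(384 MB, 10% of executor
    memory), and a YARN allocation increment of 1024 MB (YARN's default
    yarn.scheduler.minimum-allocation-mb; not checked on our cluster).
    """
    overhead_mb = max(min_overhead_mb, int(executor_memory_mb * overhead_factor))
    requested_mb = executor_memory_mb + overhead_mb
    # YARN rounds each request up to a multiple of its allocation increment
    return math.ceil(requested_mb / allocation_increment_mb) * allocation_increment_mb


# --executor-memory 4G, as in the spark-submit command above
print(yarn_container_mb(4 * 1024))   # 4096 + 409 = 4505 MB, rounded up to 5120
# --executor-memory 20G
print(yarn_container_mb(20 * 1024))  # 20480 + 2048 = 22528 MB, already a multiple of 1024
```

Under these assumptions a 4 GB executor lands in a 5120 MB (5 GB) container, matching the error. Instead of raising `--executor-memory`, the error message suggests raising the overhead, which can be passed to `spark-submit` with `--conf spark.yarn.executor.memoryOverhead=...` (the property is named `spark.executor.memoryOverhead` in newer Spark versions).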

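It works if we increase the memory to 20 GB. The output is shown below; note that its "test data" RMSE is actually computed on the validation split (`val_df`):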

`Root Mean Squared Error (RMSE) on train data = 21.1488 and on test data = 21.2182`

```python
#----------------------------------------------------------
# Detect whether we're running inside the dev notebook
#----------------------------------------------------------
try:
    get_ipython  # only defined inside IPython / Jupyter
    notebook = True
except NameError:
    notebook = False

if notebook:
    import findspark
    findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.types import (StructType, StructField, StringType,
                               IntegerType, DateType)
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

#----------------------------------------------------------
# Load data
#----------------------------------------------------------

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

if notebook:
    dataFile = 'sample_us.tsv'
else:
    dataFile = 'hdfs:///datasets/amazon_multiling/tsv/amazon_reviews_us*tsv.gz'

schema = StructType([
    StructField('marketplace', StringType()),
    StructField('customer_id', IntegerType()),
    StructField('review_id', StringType()),
    StructField('product_id', StringType()),
    StructField('product_parent', IntegerType()),
    StructField('product_title', StringType()),
    StructField('product_category', StringType()),
    StructField('star_rating', IntegerType()),
    StructField('helpful_votes', IntegerType()),
    StructField('total_votes', IntegerType()),
    StructField('vine', StringType()),
    StructField('verified_purchase', StringType()),
    StructField('review_headline', StringType()),
    StructField('review_body', StringType()),
    StructField('review_date', DateType()),
])

df = spark.read.csv(dataFile, sep="\t", header=True, schema=schema)
df = df.na.drop()
# The target to predict is the number of helpful votes of a review
df = df.selectExpr("helpful_votes as label", "*")
df.createOrReplaceTempView("df")

#----------------------------------------------------------
# Reduce dataset size
#----------------------------------------------------------

# We want to work on a subset of the data.
# "x-core" here means that every product kept has at least x reviews.
x_core = 5

# This query returns the IDs of the products with at least x reviews
query1 = '''
    SELECT product_id
    FROM df
    GROUP BY product_id
    HAVING COUNT(*) >= %s
''' % x_core

# This query returns all reviews of the products with at least x reviews
query2 = '''
    SELECT *
    FROM df
    WHERE product_id IN
        (
            SELECT product_id
            FROM df
            GROUP BY product_id
            HAVING COUNT(*) >= %s
        )
''' % x_core

df = spark.sql(query2)
# Results for all files that start with amazon_reviews_us (number of rows returned by query1):
#  Number of 1-core reviews: 21390118
#  Number of 2-core reviews: 10213901
#  Number of 3-core reviews:  6931152
#  Number of 4-core reviews:  5318037
#  Number of 5-core reviews:  4342875

#----------------------------------------------------------
# Do machine learning
#----------------------------------------------------------

# Split into train / validation / test sets
(train_set, val_set, test_set) = df.randomSplit([0.90, 0.05, 0.05], seed=0)

# Preprocess: tokenize the review text, hash words into 2^16 term-frequency buckets, weight by IDF
tokenizer = Tokenizer(inputCol="review_body", outputCol="words")
hashtf = HashingTF(numFeatures=2**16, inputCol="words", outputCol="tf")
idf = IDF(inputCol="tf", outputCol="features", minDocFreq=5)  # minDocFreq: terms in fewer than 5 documents get IDF 0

# Select the model and build the pipeline
lr = LinearRegression(maxIter=100, regParam=0.3, elasticNetParam=0.8)
pipeline = Pipeline(stages=[tokenizer, hashtf, idf, lr])

# Train and predict
pipeline_fit = pipeline.fit(train_set)
train_df = pipeline_fit.transform(train_set)
val_df = pipeline_fit.transform(val_set)
test_df = pipeline_fit.transform(test_set)

# Evaluate on the training and validation splits
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
train_rmse = evaluator.evaluate(train_df)
val_rmse = evaluator.evaluate(val_df)

print("Root Mean Squared Error (RMSE) on train data = %g and on validation data = %g" % (train_rmse, val_rmse))
```

```
Root Mean Squared Error (RMSE) on train data = 0.664671 and on validation data = 0.278657
```


```python
# We want to work on a subset of the data.
# "x-core" here means that every product kept has at least x reviews.
x_core = 5

# This query returns the IDs of the products with at least x reviews
query1 = '''
    SELECT product_id
    FROM df
    GROUP BY product_id
    HAVING COUNT(*) >= %s
''' % x_core

# This query returns all reviews of the products with at least x reviews
query2 = '''
    SELECT *
    FROM df
    WHERE product_id IN
        (
            SELECT product_id
            FROM df
            GROUP BY product_id
            HAVING COUNT(*) >= %s
        )
''' % x_core

df2 = spark.sql(query2)
# Results for all files that start with amazon_reviews_us (number of rows returned by query1):
#  Number of 1-core reviews: 21390118
#  Number of 2-core reviews: 10213901
#  Number of 3-core reviews:  6931152
#  Number of 4-core reviews:  5318037
#  Number of 5-core reviews:  4342875

# Index categorical StringType columns as numbers, to help with ML later on.
# We might want to make more columns numerical later; these are just the obvious ones.
#from pyspark.ml.feature import StringIndexer
#categoricalColumns = ["marketplace", "product_category", "vine", "verified_purchase"]
#for col_name in categoricalColumns:
#    indexer = StringIndexer(inputCol=col_name, outputCol=col_name + "Index")
#    df = indexer.fit(df).transform(df)

print("********** FINISHED **********")
```

```
********** FINISHED **********
```
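The x-core filter expressed by `query2` can be illustrated in plain Python: count the reviews per product, keep the products whose count reaches `x_core`, then keep only the reviews of those products. This is a sketch on made-up toy records (the `x_core_filter` helper and the product IDs are hypothetical), not a replacement for the Spark SQL query.

```python
from collections import Counter


def x_core_filter(reviews, x_core):
    """Keep only the reviews of products that have at least x_core reviews."""
    # Same idea as: SELECT product_id FROM df GROUP BY product_id HAVING COUNT(*) >= x_core
    counts = Counter(review["product_id"] for review in reviews)
    kept_products = {pid for pid, n in counts.items() if n >= x_core}
    # Same idea as: SELECT * FROM df WHERE product_id IN (...)
    return [review for review in reviews if review["product_id"] in kept_products]


# Toy data (hypothetical IDs): product "A" has 3 reviews, product "B" has 1
reviews = [
    {"product_id": "A", "review_id": "r1"},
    {"product_id": "A", "review_id": "r2"},
    {"product_id": "B", "review_id": "r3"},
    {"product_id": "A", "review_id": "r4"},
]
print([r["review_id"] for r in x_core_filter(reviews, x_core=2)])  # ['r1', 'r2', 'r4']
```

Raising `x_core` only ever removes whole products, which is why the row counts listed in the comments shrink monotonically from 1-core to 5-core.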


```python
(train_set, val_set, test_set) = df.randomSplit([0.90, 0.05, 0.05], seed=0)
train_set.show(5)
```

```
+-----+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|label|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|    0|         US|     125518|R2MW3TEBPWKENS|B00MZ6BR3Q|     145562057|Monster High Haun...|            Toys|          5|            0|          0|   N|                Y|          Five Stars|Love it great add...| 2015-08-31|
|    0|         US|     128540|R24VKWVWUMV3M3|B004S8F7QM|     829220659|Cards Against Hum...|
```
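`randomSplit` assigns each row to a split at random, with probabilities given by the weights (normalised if they do not sum to 1), so the 90/5/5 split is approximate rather than exact. A plain-Python sketch of that idea, assuming one uniform draw per row; `weighted_split` is a hypothetical helper, and Spark's own implementation samples per partition, so it will not reproduce Spark's assignment for the same seed.

```python
import random


def weighted_split(rows, weights, seed=0):
    """Randomly assign each row to one split, with probability proportional to its weight."""
    total = sum(weights)
    # Cumulative upper bounds of each split in [0, 1), e.g. roughly [0.90, 0.95, 1.0]
    bounds = []
    acc = 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        # First split whose upper bound exceeds u; the last split also
        # catches any floating-point rounding left just below 1.0
        for i, bound in enumerate(bounds):
            if u < bound or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits


train, val, test = weighted_split(range(10_000), [0.90, 0.05, 0.05], seed=0)
print(len(train), len(val), len(test))  # close to 9000, 500, 500, but not exact
```

Fixing the seed (as `seed=0` does in the notebook) makes the split reproducible between runs on the same data.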

```python
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml import Pipeline

# Feature pipeline: tokenize the review text, hash words into 2^16 TF buckets, weight by IDF
tokenizer = Tokenizer(inputCol="review_body", outputCol="words")
hashtf = HashingTF(numFeatures=2**16, inputCol="words", outputCol="tf")
idf = IDF(inputCol="tf", outputCol="features", minDocFreq=5)  # minDocFreq: terms in fewer than 5 documents get IDF 0
pipeline = Pipeline(stages=[tokenizer, hashtf, idf])

pipelineFit = pipeline.fit(train_set)
train_df = pipelineFit.transform(train_set)
val_df = pipelineFit.transform(val_set)

train_df.select("review_id", "words", "tf", "features", "label").show(5)
```

```
+--------------+--------------------+--------------------+--------------------+-----+
|     review_id|               words|                  tf|            features|label|
+--------------+--------------------+--------------------+--------------------+-----+
|R2MW3TEBPWKENS|[love, it, great,...|(65536,[7284,8436...|(65536,[7284,8436...|    0|
|R24VKWVWUMV3M3|     [tons, of, fun]|(65536,[8443,9639...|(65536,[8443,9639...|    0|
|R2B8VBEPB4YEZ7|[children, like, it]|(65536,[11650,206...|(65536,[11650,206...|    0|
|R1CB783I7B0U52|[showed, up, not,...|(65536,[1536,4200...|(65536,[1536,4200...|    0|
|R23JRQR6VMY4TV|[absolutely, one,...|(65536,[4427,5660...|(65536,[4427,5660...|    0|
+--------------+--------------------+--------------------+--------------------+-----+
only showing top 5 rows
```
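The three stages above can be sketched in plain Python to show what ends up in the `tf` and `features` columns. Assumptions: the tokenizer splits on whitespace after lowercasing (roughly what Spark's `Tokenizer` does), `zlib.crc32` stands in for the MurmurHash3 that Spark's `HashingTF` uses (so bucket indices will differ from the table above), and the IDF weight follows the formula in the Spark ML docs, `log((m + 1) / (df + 1))`, with weight 0 for buckets seen in fewer than `minDocFreq` documents. All function names are hypothetical.

```python
import math
import zlib
from collections import Counter


def tokenize(text):
    # Roughly like Spark's Tokenizer: lowercase, then split on whitespace
    return text.lower().split()


def hashing_tf(words, num_features=2**16):
    # Bucket index = hash(word) mod num_features; colliding words share a bucket
    return dict(Counter(zlib.crc32(w.encode("utf-8")) % num_features for w in words))


def fit_idf(tf_vectors, min_doc_freq=0):
    # m documents; df = number of documents in which the bucket is non-zero
    m = len(tf_vectors)
    doc_freq = Counter(idx for vec in tf_vectors for idx in vec)
    return {idx: math.log((m + 1) / (df + 1)) if df >= min_doc_freq else 0.0
            for idx, df in doc_freq.items()}


def tf_idf(tf_vector, idf):
    # Scale each term frequency by its IDF weight
    return {idx: count * idf.get(idx, 0.0) for idx, count in tf_vector.items()}


docs = ["Tons of fun", "Children like it", "Tons of love"]
tfs = [hashing_tf(tokenize(d)) for d in docs]
idf_weights = fit_idf(tfs, min_doc_freq=1)
features = [tf_idf(tf, idf_weights) for tf in tfs]
```

Hashing avoids building a vocabulary, at the cost of occasional collisions; with 2^16 buckets, `minDocFreq=5` then zeroes out buckets that appear in very few reviews.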



```python
from pyspark.ml.regression import LinearRegression

# Linear regression with elastic-net regularisation on the TF-IDF features
lr = LinearRegression(maxIter=100, regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(train_df)
```
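With `regParam=0.3` and `elasticNetParam=0.8`, the Spark ML documentation describes the regularisation term as a mix of L1 and L2 penalties: `regParam * (elasticNetParam * ||w||_1 + (1 - elasticNetParam) / 2 * ||w||_2^2)`. The sketch computes that penalty for a hypothetical toy weight vector; it leaves out the squared-error term and Spark's feature standardisation, so it illustrates the two parameters rather than reproducing the optimiser.

```python
def elastic_net_penalty(weights, reg_param, elastic_net_param):
    """reg_param * (alpha * ||w||_1 + (1 - alpha) / 2 * ||w||_2^2), alpha = elastic_net_param."""
    l1 = sum(abs(w) for w in weights)
    l2_squared = sum(w * w for w in weights)
    return reg_param * (elastic_net_param * l1
                        + (1 - elastic_net_param) / 2 * l2_squared)


# Toy weight vector (hypothetical), with the regParam used in the notebook
w = [1.0, -2.0, 0.0]
print(elastic_net_penalty(w, reg_param=0.3, elastic_net_param=0.8))  # ≈ 0.3 * (0.8*3 + 0.1*5) = 0.87
print(elastic_net_penalty(w, reg_param=0.3, elastic_net_param=1.0))  # ≈ 0.9 (pure L1, lasso)
print(elastic_net_penalty(w, reg_param=0.3, elastic_net_param=0.0))  # ≈ 0.75 (pure L2, ridge)
```

With `elasticNetParam=0.8` the penalty is mostly L1, which pushes many of the 65536 feature weights to exactly zero.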

```python
# Statistics of the fitted model on the training data
trainingSummary = lr_model.summary
print("numIterations: %d" % trainingSummary.totalIterations)
#print("objectiveHistory: %s" % str(trainingSummary.objectiveHistory))
#trainingSummary.residuals.show()
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)
```

```
numIterations: 34
RMSE: 0.664671
r2: 0.267563
```


```python
from pyspark.ml.evaluation import RegressionEvaluator

predictions = lr_model.transform(val_df)

# Select (prediction, true label) and compute the validation error
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on validation data = %g" % rmse)
```

```
Root Mean Squared Error (RMSE) on validation data = 0.278657
```
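The RMSE reported by `RegressionEvaluator` and the `r2` from the training summary follow the standard definitions, sketched here in plain Python on made-up toy numbers (the helper names are hypothetical). RMSE is the square root of the mean squared difference between label and prediction, in the same unit as `helpful_votes`; R² compares the model's squared error with that of always predicting the mean label, so the training r2 of 0.267563 means the model explains roughly 27% of the variance of the label on the training data.

```python
import math


def rmse(labels, predictions):
    # Square root of the mean squared difference between label and prediction
    n = len(labels)
    return math.sqrt(sum((y - p) ** 2 for y, p in zip(labels, predictions)) / n)


def r2(labels, predictions):
    # 1 - (residual sum of squares) / (total sum of squares around the mean label)
    mean = sum(labels) / len(labels)
    ss_res = sum((y - p) ** 2 for y, p in zip(labels, predictions))
    ss_tot = sum((y - mean) ** 2 for y in labels)
    return 1 - ss_res / ss_tot


# Toy helpful_votes labels and predictions (made up for illustration)
labels = [0, 0, 1, 3]
predictions = [0.2, 0.1, 1.5, 2.0]
print(rmse(labels, predictions), r2(labels, predictions))
```

A predictor that always outputs the mean label has r2 = 0, which makes R² a convenient baseline check alongside RMSE.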
