# PySpark notebook (Python 3)

Example commands for running this notebook on Google Cloud Dataproc (https://cloud.google.com/dataproc/):

```
gcloud dataproc clusters create cchio-persistent-1 \
    --metadata "JUPYTER_PORT=8123,JUPYTER_CONDA_PACKAGES=numpy:pandas:scipy:scikit-learn" \
    --initialization-actions \
        gs://dataproc-initialization-actions/jupyter/jupyter.sh \
    --zone us-central1-c \
    --num-workers 4 \
    --properties spark:spark.executorEnv.PYTHONHASHSEED=0,spark:spark.yarn.am.memory=1024m \
    --worker-machine-type=n1-standard-8 \
    --master-machine-type=n1-standard-8
    
gcloud compute ssh --zone=us-central1-c \
  --ssh-flag="-D" --ssh-flag="10000" --ssh-flag="-N" --ssh-flag="-n" "cchio-persistent-1-m" &
  
/Applications/Google\ Chrome.app/Contents/MacOS/Google\ Chrome \
    "http://cchio-persistent-1-m:8123" \
    --proxy-server="socks5://localhost:10000" \
    --host-resolver-rules="MAP * 0.0.0.0 , EXCLUDE localhost" \
    --user-data-dir=/tmp/
```

The cluster defaults to Python 3. To use Python 2 instead, add `MINICONDA_VARIANT=2` to the cluster metadata, for example by appending it to the existing `--metadata` list.

In [None]:
import os
import sys
sys.path.append('../chapter1')
import email_read_util

from pyspark.sql.types import *
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, CountVectorizer
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator

## Download 2007 TREC Public Spam Corpus
1. Read the "Agreement for use"
   https://plg.uwaterloo.ca/~gvcormac/treccorpus07/

2. Download the 255 MB corpus (trec07p.tgz) and untar it into the 'chapter1/datasets' directory

3. Check that the paths assigned to 'DATA_DIR' and 'LABELS_FILE' below exist
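
Step 3 can be checked programmatically before running the rest of the notebook. The following is a minimal sketch: the helper name `missing_paths` is hypothetical (it is not part of the notebook), and the two paths simply repeat the values this notebook uses.

```python
import os

# These mirror the DATA_DIR and LABELS_FILE values used in this notebook.
DATA_DIR = '../chapter1/datasets/trec07p/data/'
LABELS_FILE = '../chapter1/datasets/trec07p/full/index'


def missing_paths(data_dir, labels_file):
    """Return the expected corpus paths that do not exist (hypothetical helper)."""
    missing = []
    if not os.path.isdir(data_dir):
        missing.append(data_dir)
    if not os.path.isfile(labels_file):
        missing.append(labels_file)
    return missing


# Report anything that still needs to be downloaded or untarred.
for path in missing_paths(DATA_DIR, LABELS_FILE):
    print('Missing: ' + path)
```

If nothing is printed, both paths were found.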

In [None]:
DATA_DIR = '../chapter1/datasets/trec07p/data/'
LABELS_FILE = '../chapter1/datasets/trec07p/full/index'
TRAINING_SET_RATIO = 0.7

In [None]:
labels = {}
# Read the labels, keyed by email filename: 1 for ham, 0 for spam.
with open(LABELS_FILE) as f:
    for line in f:
        line = line.strip()
        label, key = line.split()
        labels[key.split('/')[-1]] = 1 if label.lower() == 'ham' else 0
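
To make the label parsing concrete, here is a small sketch of the same logic applied to individual lines. It assumes each index line has the form `<label> <relative path>`. The sample lines are hypothetical, and `parse_index_line` is an illustrative helper, not part of the notebook.

```python
def parse_index_line(line):
    """Split one index line into (filename, label), with ham=1 and spam=0."""
    label, key = line.strip().split()
    # Keep only the final path component, e.g. 'inmail.1'.
    filename = key.split('/')[-1]
    return filename, 1 if label.lower() == 'ham' else 0


# Hypothetical sample lines in the assumed "<label> <relative path>" layout.
sample = ['spam ../data/inmail.1', 'ham ../data/inmail.2']
print(dict(parse_index_line(line) for line in sample))
# {'inmail.1': 0, 'inmail.2': 1}
```

Note the convention: ham is the positive class (1) and spam is 0, so the evaluation scores later in the notebook are measured with respect to ham.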

In [None]:
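def read_email_files():
    X = []
    y = []
    # Files are named inmail.1 ... inmail.N, one per entry in the index.
    for i in range(len(labels)):
        filename = 'inmail.' + str(i+1)
        email_str = email_read_util.extract_email_text(
            os.path.join(DATA_DIR, filename))
        X.append(email_str)
        # Cast to float so the value matches the DoubleType 'label' column.
        y.append(float(labels[filename]))
    return X, y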

In [None]:
X, y = read_email_files()

schema = StructType([
            StructField('id', IntegerType(), nullable=False),
            StructField('email', StringType(), nullable=False),
            StructField('label', DoubleType(), nullable=False)])

df = spark.createDataFrame(zip(range(len(y)), X, y), schema)

In [None]:
df.printSchema()

root
 |-- id: integer (nullable = false)
 |-- email: string (nullable = false)
 |-- label: double (nullable = false)

In [None]:
train, test = df.randomSplit([TRAINING_SET_RATIO, 1-TRAINING_SET_RATIO], seed=123)

In [None]:
tokenizer = Tokenizer()
vectorizer = CountVectorizer()
rfc = RandomForestClassifier()

pipeline = Pipeline(stages=[tokenizer, vectorizer, rfc])

In [None]:
paramMap = {
    tokenizer.inputCol: 'email',
    tokenizer.outputCol: 'tokens',

    vectorizer.inputCol: 'tokens',
    vectorizer.outputCol: 'vectors',

    rfc.featuresCol: 'vectors',
    rfc.labelCol: 'label',
    rfc.numTrees: 500
}

In [None]:
model = pipeline.fit(train, params=paramMap)

In [None]:
prediction = model.transform(test)

In [None]:
evaluator = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction')
pr_score = evaluator.evaluate(prediction, {evaluator.metricName: 'areaUnderPR'})
roc_score = evaluator.evaluate(prediction, {evaluator.metricName: 'areaUnderROC'})

print("Area under ROC curve score: {:.3f}".format(roc_score))
print("Area under precision/recall curve score: {:.3f}".format(pr_score))

Area under ROC curve score: 0.971
Area under precision/recall curve score: 0.958
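
As an aid to reading the ROC score, the sketch below computes area under the ROC curve on toy data using the pairwise (Mann-Whitney) formulation: the fraction of (positive, negative) pairs in which the positive example gets the higher score. This is a swapped-in illustration, not how `BinaryClassificationEvaluator` computes the metric internally. The function name and the toy labels and scores are hypothetical.

```python
def area_under_roc(labels, scores):
    """Probability that a random positive outscores a random negative.

    Ties count as half a win. Written for clarity rather than speed:
    the cost is O(n_pos * n_neg).
    """
    positives = [s for l, s in zip(labels, scores) if l == 1]
    negatives = [s for l, s in zip(labels, scores) if l == 0]
    if not positives or not negatives:
        raise ValueError('need at least one positive and one negative')
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))


# Hypothetical toy data: 1 = ham, 0 = spam, scores = predicted P(ham).
toy_labels = [1, 1, 0, 1, 0]
toy_scores = [0.9, 0.8, 0.7, 0.6, 0.2]
print('{:.3f}'.format(area_under_roc(toy_labels, toy_scores)))
# 0.833
```

Here five of the six (ham, spam) pairs are ranked correctly, giving 5/6. A score of 1.0 means every ham email is ranked above every spam email, and 0.5 corresponds to random ranking.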
