# Framework

In [2]:
debug = True

## Grabbing the "framework" branch from GitHub and use the "hermes" folder as a library

Step 1: Install necessary libraries.

In [1]:
import importlib
import pip

def _install(package):
    pip.main(['install', package])

def _import(package):
    importlib.import_module(package)
    
def install_and_import(package):
    try:
        _import(package)
    except ImportError:
        _install(package)

In [2]:
install_and_import("GitPython")
install_and_import("click")

Collecting GitPython
  Downloading GitPython-1.0.1.tar.gz (355kB)
Collecting gitdb>=0.6.4 (from GitPython)
  Downloading gitdb-0.6.4.tar.gz (400kB)
Collecting smmap>=0.8.5 (from gitdb>=0.6.4->GitPython)
  Downloading smmap-0.9.0.tar.gz
Building wheels for collected packages: GitPython, gitdb, smmap
  Running setup.py bdist_wheel for GitPython
  Stored in directory: /Users/tiffanyj/Library/Caches/pip/wheels/23/f4/31/1d0570ae6ecccca26eafb087788483f614cd740281fd842660
  Running setup.py bdist_wheel for gitdb
  Stored in directory: /Users/tiffanyj/Library/Caches/pip/wheels/63/1b/54/87cf226ccefad0e5fdc78e3c8c65180ac77ed2a04d1dec3a56
  Running setup.py bdist_wheel for smmap
  Stored in directory: /Users/tiffanyj/Library/Caches/pip/wheels/47/75/63/333cdcb6d3e6e8eb1ec6869564b84f7f1e6a875d87541a0ae9
Successfully built GitPython gitdb smmap
Installing collected packages: smmap, gitdb, GitPython
Successfully installed GitPython-1.0.1 gitdb-0.6.4 smmap-0.9.0


You are using pip version 7.1.2, however version 8.0.2 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.


Collecting click
  Downloading click-6.2-py2.py3-none-any.whl (70kB)
Installing collected packages: click
Successfully installed click-6.2


You are using pip version 7.1.2, however version 8.0.2 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.


Step 2: Create a temporary directory.

Step 3: Git clone the "framework" branch from GitHub to the temporary directory.

Step 4: Zip the hermes source files.

Step 5: Add zip to SparkContext.

Step 6: Remove temporary directory once it is no longer needed.

In [3]:
remote_url = "https://github.com/tiffanyj41/hermes.git"
remote_branch = "framework"
source_dir = "hermes"

In [7]:
# helper functions
import os
import functools

def _list_all_in_dir(dir_path):
    for path, subdirs, files in os.walk(dir_path):
        for filename in files:
            print os.path.join(path, filename)
            
def _zip_dir(srcdir_path, zipfile_handler):
    try:
        zipfile_handler.writepy(srcdir_path)
    finally:
        zipfile_handler.close()
            
def trackcalls(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        wrapper.has_been_called = True
        return func(*args, **kwargs)
    wrapper.has_been_called = False
    return wrapper

@trackcalls
def _add_zipfile_to_sc(zipfile_path):
    sc.addPyFile(zipfile_path) 

In [8]:
import git
import os
import tempfile
import shutil
import zipfile    

# create a temporary directory
tmpdir_path = tempfile.mkdtemp()
if debug: print "temporary directory: %s\n" % tmpdir_path

# ensure file is read/write by creator only
saved_umask = os.umask(0077)

# create a zipfile handler to zip the necessary files
ziptmpdir_path = tempfile.mkdtemp()
if debug: print "temporary directory for zip file: %s\n" % ziptmpdir_path
zipfile_path = ziptmpdir_path + "/hermes_src.zip"
if debug: print "zip file's path: %s\n" % zipfile_path
zipfile_handler = zipfile.PyZipFile(zipfile_path, "w")

# make zipfile handler verbose for debugging
zipfile_handler.debug = 3

try:
    # clone "framework" branch from GitHub into temporary directory
    local_branch = git.Repo.clone_from(remote_url, tmpdir_path, branch=remote_branch)
    if debug: print "current branch: %s\n" % local_branch.head.ref
    if debug: print "list all in %s:" % tmpdir_path; _list_all_in_dir(tmpdir_path); print "\n"
        
    # zip "hermes" directory
    if debug: print "zipping: %s\n" % os.path.join(tmpdir_path, source_dir)
    _zip_dir(os.path.join(tmpdir_path, source_dir), zipfile_handler)
    
    # check zip file
    if debug: print "Is zip file %s valid? %s\n" % (zipfile_path, zipfile.is_zipfile(zipfile_path))
    
    # add zip to SparkContext 
    # note: you can only add zip to SparkContext one time
    if not _add_zipfile_to_sc.has_been_called:
        if debug: print "add zip file %s into spark context\n" % zipfile_path
        _add_zipfile_to_sc(zipfile_path)
    else:
        if debug: print "zip file %s is already added into spark context; will not re-add\n" % zipfile_path
    
except IOError as e:
    raise e
else:
    os.remove(zipfile_path)
finally:
    os.umask(saved_umask)
    shutil.rmtree(tmpdir_path)
    shutil.rmtree(ziptmpdir_path)

NameError: name 'sc' is not defined

## Example 1
* Run movielens_10m_ratings with **ratings** vector transformation
* Implement **ALS** recommender system algorithms
* Implement **RMSE, MAE** metrics

Framework is based on a state machine. Since you are using a notebook, it is unlikely that you will use a state machine to automate the process, but you can use parts of the state machine to do what you need to do.

### Step 1: __start()
**For those who use [MovieLens 1M CF test src code](http://l41-srv-mcdh32.b.internal:8880/notebooks/Hermes/MovieLens%201M%20CF%20test%20src%20code.ipynb#) as guidance, this is executing the pre-requisites when the HDFS directory and the input data are not defined yet.**

Function: 
* __start() creates the HDFS directory and uploads the input data. 
* __start() implements the start_state of the state machine.

```bash

def __start(cargo):
    """start_state without the state machine."""

    if Globals.verbose: Globals.logger.debug("In start_state:")

    if Globals.verbose: Globals.logger.debug("Creating the hdfs directory " + cargo.hdfs_dir)
    os.system("hdfs dfs -mkdir " + cargo.hdfs_dir)

    def load_json_files(datas):
        for i in range(0, len(datas)):
            json_path = datas[i].datapath
            if Globals.verbose: Globals.logger.debug("Loading JSON file " + json_path + " into hdfs directory " + cargo.hdfs_dir)
            os.system("hdfs dfs -put " + json_path + " " + cargo.hdfs_dir + "/" + os.path.basename(json_path))

    load_json_files(cargo.datas)
```

In [7]:
import os
hdfs_dir = "/datasets/movielens/1m"
movies_json_path = "/home/tiffanyj/datasets/movielens/movielens_1m_movies.json.gz"
movies_json_path_in_hdfs = hdfs_dir + "/" + os.path.basename(movies_json_path)
ratings_json_path = "/home/tiffanyj/datasets/movielens/movielens_1m_ratings.json.gz"
ratings_json_path_in_hdfs = hdfs_dir + "/" + os.path.basename(ratings_json_path)

print movies_json_path
print movies_json_path_in_hdfs
print ratings_json_path 
print ratings_json_path_in_hdfs

/home/tiffanyj/datasets/movielens/movielens_1m_movies.json.gz
/datasets/movielens/1m/movielens_1m_movies.json.gz
/home/tiffanyj/datasets/movielens/movielens_1m_ratings.json.gz
/datasets/movielens/1m/movielens_1m_ratings.json.gz


#### Option 1: You implement what is already in __start() manually yourself

In [8]:
import os
# create hdfs_dir 
os.system("hdfs dfs -mkdir " + hdfs_dir)
# put json located at json_path into hdfs_dir
os.system("hdfs dfs -put " + ratings_json_path + " " + ratings_json_path_in_hdfs)

256

In [9]:
from hermes import *
import modules.data

# define Data (ie. UserVectorData) which is a class wrapper of the json 
# and will be used to create a Vector (ie. UserVector)
datapath = ratings_json_path
vector_transformation = "ratings"
schemapath = None
dataname = "movielens"

uservectordata = modules.data.UserVectorData(datapath, vector_transformation, schemapath, dataname)

ImportError: No module named hermes

#### Option 2: You execute using the __start() function

In [34]:
# import hermes where __start() function is defined
from hermes import *
# import cargo where Cargo class is defined
import modules.cargo
# import data where configuration is defined
import modules.data

# create cargo
cargo = modules.cargo.Cargo()

# add items to cargo
cargo.hdfs_dir = hdfs_dir

# define Data and put it in cargo
dataname = "movielens"
datapath = ratings_json_path
vector_transformation = "ratings"
schemapath = None
uservectordata = modules.data.UserVectorData(datapath, vector_transformation, schemapath, dataname)
cargo.datas.append(uservectordata)

# call the start function
hermes.__start(cargo)

uservectordata = cargo.datas[0]
uservectordata.cache()

### Step 2: __json_to_rdd()
**For those who use [MovieLens 1M CF test src code](http://l41-srv-mcdh32.b.internal:8880/notebooks/Hermes/MovieLens%201M%20CF%20test%20src%20code.ipynb#) as guidance, this is accomplishing cell # 5, 6, 7.**

Function: 
* __json_to_rdd() parses JSON to RDD. 
* __json_to_rdd() implements the json_to_rdd state of the state machine.

```bash
    """json_to_rdd_state without the state macine."""

    if Globals.verbose: Globals.logger.debug("In json_to_rdd_state:")

    # create RDD for each JSON file and store it in Cargo's vectors list
    for i in range(0, len(cargo.datas)):
        data = cargo.datas[i]
        if Globals.verbose: Globals.logger.debug("Working with json file %s" % data.datapath)

        if Globals.verbose: Globals.logger.debug("Creating dataframe based on the content of the json file")
        datapath_in_hdfs = "hdfs://" + cargo.fs_default_ip_addr + "/" + cargo.hdfs_dir + "/" + os.path.basename(data.datapath)
        data.set_dataframe(Globals.scsingleton.sc, Globals.scsingleton.sqlCtx, datapath_in_hdfs)

        if Globals.verbose: Globals.logger.debug("Creating RDD based on the computed dataframe and configuration provided by the user")
        cargo.vectors.append( vg.VectorFactory().create_obj_vector(data, cargo.support_files) ) 

```

#### Option 1: You implement what is already in __json_to_rdd() manually yourself


In [13]:
from hermes import *
import modules.data
import modules.vectorgenerator

# convert JSON to Dataframe
uservectordata.set_dataframe(sc, sqlCtx, ratings_json_path_in_hdfs) 
ratings = uservectordata.dataframe # extracting dataframe variable from UserVectorData class

# this is the same thing as 
# ratings = sqlCtx.read.json("hdfs://" + ratings_json_path_in_hdfs)
# ratings.repartition(sc.defaultParallelism * 3)

In [14]:
from hermes import *
import modules.vectorgenerator
import modules.vg

# support_files is a dictionary that you can pass in during vector creation 
support_files = {}

# convert DataFrame to RDD
mv = modules.vectorgenerator.VectorFactory().create_obj_vector(uservectordata, None, True) 
all_user_ratings = mv.vector

# this is the same thing as 
# mv = movieLens_vectorize.movieLens_vectorize(ratings, None, "ratings", "none")
# all_user_ratings = mv.get_user_vector()

In [None]:
print type(all_user_ratings)
all_user_ratings.take(5)

#### Option 2: You execute using the __json_to_rdd() function

In [None]:
from hermes import *

cargo.fs_default_ip_addr = ""
cargo.hdfs_dir = hdfs_dir[1:]
cargo.support_files = {}

# call json_to_rdd function
hermes.__json_to_rdd(cargo)

In [None]:
mv = cargo.vectors[0]
all_user_ratings = mv.vector
print type(all_user_ratings)
all_user_ratings.take(5)

### Step 3: __split_data()
**For those who use [MovieLens 1M CF test src code](http://l41-srv-mcdh32.b.internal:8880/notebooks/Hermes/MovieLens%201M%20CF%20test%20src%20code.ipynb#) as guidance, this is accomplishing cell # 8, 9.**

Function: 
* __split_data() splits data to train, test, and (optional) validate. 
* __split_data() implements the split_data_state of the state machine.

```bash
def __split_data(cargo):
    """split_data_state without the state machine."""

    if Globals.verbose: Globals.logger.debug("In split_data_state:")

    for i in range(0, len(cargo.vectors)):
        vector = cargo.vectors[i]
        weights, seed = hermesui._ask_user_for_split_percentage(vector.data.datapath)
        vector.split_data(weights, seed)

```

In [15]:
trainingPercentage = 60/100.
testPercentage = 40/100.
validationPercentage = 0/100.
seed = 11

#### Option 1: You implement what is already in __split_data() manually yourself

In [16]:
uservector = mv

uservector.split_data([trainingPercentage, testPercentage, validationPercentage], seed)
train_ratings = uservector.training_vector
test_ratings = uservector.test_vector
validation_ratings = uservector.validation_vector

# this is the same thing as
# train_ratings, test_ratings = uservector.vector.randomSplit([0.6, 0.4], 11)

In [None]:
train_ratings.cache()
test_ratings.cache()
validation_ratings.cache()

print train_ratings.count(), test_ratings.count()

#### Option 2: you execute using the __split_data() function

In [22]:
from hermes import *

# call split_data function
hermes.__split_data(cargo)

'\n# TODO: will implement later\n'

In [None]:
mv = cargo.vectors[0]
train_ratings = mv.training_vector
test_ratings = mv.test_vector
validation_ratings = mv.validation_vector
print train_ratings.count(), test_ratings.count()

### Step 4: __make_prediction()
**For those who use [MovieLens 1M CF test src code](http://l41-srv-mcdh32.b.internal:8880/notebooks/Hermes/MovieLens%201M%20CF%20test%20src%20code.ipynb#) as guidance, this is accomplishing cell # 10.**

Function: 
* __make_prediction() develop model based on the train data and make prediction based on this model. 
* __make_prediction() implements the make_prediction_state of the state machine.

```bash
def __make_prediction(cargo):
    """make_prediction_state without the state machine."""

    if Globals.verbose: Globals.logger.debug("In make_prediction_state:")   

    for i in range(0, len(cargo.vectors)):
        thisvector = cargo.vectors[i]

        # select which recommenders based on the vector type
        recommenders = None
        thisvector_uservector = None
        thisvector_contentvector = None
        if helper.is_direct_subclass(thisvector, vg.UserVector):
            if Globals.verbose: Globals.logger.debug("Iterating through recommenders for user vector on data %s", thisvector.data.datapath)
            thisvector_uservector = thisvector
            recommenders = cargo.user_recommenders
        elif helper.is_direct_subclass(thisvector, vg.ContentVector):
            if Globals.verbose: Globals.logger.debug("Iterating through recommenders for content vector on data %s", thisvector.data.datapath)
            thisvector_contentvector = thisvector
            thisvector_uservector = thisvector.uservector
            recommenders = cargo.content_recommenders

        # run all recommenders on the vector
        for r in recommenders:
            if Globals.verbose: Globals.logger.debug("Making recommendation %s on data %s", r, thisvector.data.datapath)
            # TODO: implement other use case, ie. WithTfidf(), etc.
            recommender = rg.RecommenderFactory().create_obj_recommender(r, thisvector_uservector, thisvector_contentvector)
            # default use case
            # recommender = RecommenderFactory().create_obj_recommender(r, vector, Default())
            # with tf-idf use case 
            # recommender = RecommenderFactory().create_obj_recommender(r, vector, WithTfidf())
            # without tf-idf use case
            # recommender = RecommenderFactory().create_obj_recommender(r, vector, WithoutTfidf())
            # etc.
            with Timer() as t:
                prediction_vector = recommender.make_prediction()
            if Globals.verbose: Globals.logger.debug("Making prediction takes %s seconds" % t.secs)

```

#### Option 1: You implement what is already in __make_prediciton() manually yourself

In [17]:
from hermes import *
import modules.recommendergenerator

# create recommender object with the default use case
recommender_str = "ALS"
recommender = modules.recommendergenerator.RecommenderFactory().create_obj_recommender(recommender_str, uservector)
# or
# modules.recommendergenerator.RecommenderFactory().create_obj_recommender(recommender, uservector, Default())

# get the prediction vector
prediction_vector = recommender.make_prediction()
# or
# prediction_vector = uservector.prediction
predicted1 = prediction_vector

In [None]:
prediction_vector.cache()
predicted1.cache()

print type(predicted1)
predicted1.take(5)

<class 'pyspark.rdd.RDD'>


[Rating(user=36455, product=12, rating=3.1620100630939234),
 Rating(user=13019, product=12, rating=3.009068937170033),
 Rating(user=1199, product=12, rating=1.889880680902047),
 Rating(user=56039, product=12, rating=1.8340114917394583),
 Rating(user=68279, product=12, rating=2.575869762437719)]

In [18]:
from hermes import *
import algorithms.cf

# instead of doing the step above, you can also call the function directly
prediction_vector = algorithms.cf.calc_cf_mllib(uservector.training_vector)
predicted2 = prediction_vector

In [19]:
print type(predicted2)
predicted2.take(5)

<class 'pyspark.rdd.RDD'>


[Rating(user=22502, product=12, rating=2.145246574980865),
 Rating(user=22514, product=12, rating=1.8239622809024438),
 Rating(user=22526, product=12, rating=1.6218700820020784),
 Rating(user=22538, product=12, rating=3.22630662094852),
 Rating(user=22550, product=12, rating=2.568704193724831)]

In [None]:
# both ways are the same thing as
# predicted = algorithms.cf.calc_cf_mllib(uservector.training_vector)

#### Option 2: you execute using the __make_prediction() function

In [24]:
from hermes import *

cargo.user_recommenders = ["ALS"]
cargo.content_recommenders = []

# call make_prediction function
hermes.__make_prediction(cargo)

'\n# TODO: will implement later\n'

In [None]:
mv = cargo.vectors[0]
prediction_vector = mv.prediction_vector
print type(prediction_vector)
prediction_vector.take(5)

### Step 5: __calculate_metrics()
**For those who use [MovieLens 1M CF test src code](http://l41-srv-mcdh32.b.internal:8880/notebooks/Hermes/MovieLens%201M%20CF%20test%20src%20code.ipynb#) as guidance, this is accomplishing cell # 11.**

Function: 
* __calculate_metrics() tests the metrics specified by the user. 
* __calculate_metrics() implements the calculate_metrics_state of the state machine.

```bash
def __calculate_metrics(cargo):
    """calculate_metrics_state without the state machine."""

    if Globals.verbose: Globals.logger.debug("In calculate_metrics_state:")

    # create a metric executor
    executor = mg.MetricExecutor(mg.Metric())

    for i in range(0, len(cargo.vectors)):
        Globals.logger.info("-" * 80)
        Globals.logger.info("Data: %s" % cargo.vectors[i].data.datapath)
        for m in cargo.metrics:
            # check if metric exists
            metric = mg.MetricFactory().create_obj_metric(m)
            # set metric in executor
            executor.change_metric(metric)
            # execute the metric
            with Timer() as t:
                Globals.logger.info("Metric: %s = %f" % (m, executor.execute(cargo.vectors[i])))
            if Globals.verbose: Globals.logger.debug("Calculating metric takes %s seconds" % t.secs)
        Globals.logger.info("-" * 80)

```

#### Option 1: You implement what is already in __calculate_metrics() manually yourself

In [None]:
from hermes import *
import modules.metricgenerator 

# create metric executor
executor = modules.metricgenerator.MetricExecutor(modules.metricgenerator.Metric())

# create metric object
metric_str = "RMSE"
rmse_metric = modules.metricgenerator.MetricFactory().create_obj_metric(metric_str)

# set metric in executor 
executor.change_metric(rmse_metric)

# calculate metric
rmse = executor.execute(uservector)

print "RMSE: ", rmse

# switch metric object
metric_str = "MAE"
mae_metric = modules.metricgenerator.MetricFactory().create_obj_metric(metric_str)
executor.change_metric(mae_metric)

# calculate metric
mae = executor.execute(uservector)

print "MAE: ", mae


In [None]:
from hermes import *
import algorithms.performance_metrics

# instead of doing the step above, you can also call the function directly
rmse = algorithms.performance_metrics.calculate_rmse(uservector.test_vector, uservector.prediction_vector)
print "RMSE: ", rmse

mae = algorithms.performance_metrics.calculate_mae(uservector.test_vector, uservector.prediction_vector)
print "MAE: ", mae

In [None]:
# both ways are the same thing as
# rmse = algorithms.performance_metrics.calculate_rmse(uservector.test_vector, uservector.prediction_vector)
# mae = algorithms.performance_metrics.calculate_mae(uservector.test_vector, uservector.prediction_vector)

#### Option 2: you execute using the __calculate_metrics() function

In [None]:
from hermes import *

cargo.metrics = ["ALS"]

# call calculate_metrics function
hermes.__calculate_metrics(cargo)