# Load JSON

We've collected event data from the GitHub Archive that we need to load into Spark.
For now, we're working only with data from January 2015.

In [1]:
import pyspark
sc = pyspark.SparkContext('local[*]')

In [2]:
from pyspark.sql import SQLContext
sql = SQLContext(sc)

In [3]:
janData = sql.read.json("../data/january.json")

In [4]:
janData.printSchema()

root
 |-- actor: struct (nullable = true)
 |    |-- avatar_url: string (nullable = true)
 |    |-- gravatar_id: string (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- login: string (nullable = true)
 |    |-- url: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- id: string (nullable = true)
 |-- org: struct (nullable = true)
 |    |-- avatar_url: string (nullable = true)
 |    |-- gravatar_id: string (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- login: string (nullable = true)
 |    |-- url: string (nullable = true)
 |-- payload: struct (nullable = true)
 |    |-- action: string (nullable = true)
 |    |-- before: string (nullable = true)
 |    |-- comment: struct (nullable = true)
 |    |    |-- _links: struct (nullable = true)
 |    |    |    |-- html: struct (nullable = true)
 |    |    |    |    |-- href: string (nullable = true)
 |    |    |    |-- pull_request: struct (nullable = true)
 |    |    |    |    |-- href: strin

# Filter Data
We have a lot of extra information in our data set right now.

Let's keep only the event type, the user who generated the event, and the repository involved. We then narrow the events down to `WatchEvent`, which GitHub records when a user stars a repository.

In [5]:
jan = janData.select(janData['actor'],janData['repo'],janData['type'])

In [6]:
jan = jan.filter(jan['type']=='WatchEvent')

In [7]:
jan.first()

Row(actor=Row(avatar_url='https://avatars.githubusercontent.com/u/642929?', gravatar_id='', id=642929, login='jchristi', url='https://api.github.com/users/jchristi'), repo=Row(id=18297319, name='LinuxStandardBase/lsb', url='https://api.github.com/repos/LinuxStandardBase/lsb'), type='WatchEvent')

# Organize Data

Our data is currently a DataFrame of `Row` objects. Let's create an RDD of key-value pairs instead. This drops the fields we don't need and makes the structure of the data easier to see.

**Key:** User(name, id)  
**Value:** Repo(name, id)

In [8]:
from collections import namedtuple

In [9]:
User = namedtuple('User', 'name id')
Repo = namedtuple('Repo', 'name id')

In [10]:
def create_user(row):
    actor = row['actor']
    return User(actor.login,actor.id)

def create_repo(row):
    repo = row['repo']
    return Repo(repo.name,repo.id)

In [11]:
# Go through .rdd explicitly: DataFrame.map is not available in newer Spark versions.
jan = jan.rdd.map(lambda row: (create_user(row), create_repo(row)))

In [12]:
jan.first()

(User(name='jchristi', id=642929),
 Repo(name='LinuxStandardBase/lsb', id=18297319))

# Create Ratings

We've organized our data into an RDD. Now let's create an RDD of type `Rating` from the [Spark MLlib Recommendation module](https://spark.apache.org/docs/latest/mllib-collaborative-filtering.html).

**Rating(user, repo, rating)**

In [13]:
from pyspark.mllib.recommendation import Rating 

def create_rating(user_repo):
    user = user_repo[0]
    repo = user_repo[1]
    return Rating(user.id, repo.id, 1)

In [14]:
ratings = jan.map(create_rating)

In [15]:
ratings.first()

Rating(user=642929, product=18297319, rating=1.0)

# Train/Load a Model

We've built an RDD of type Rating. We'll use this collection to create a model. If one already exists, we'll load it instead.

This model will be built using a technique called [collaborative filtering](https://spark.apache.org/docs/latest/mllib-collaborative-filtering.html). The specific algorithm that Spark uses is known as [alternating least squares](https://bugra.github.io/work/notes/2014-04-19/alternating-least-squares-method-for-collaborative-filtering/).

Our data does not contain explicit feedback from users, such as ratings.
It does contain implicit feedback: the repositories each user has starred. That is why we train with `ALS.trainImplicit` below rather than `ALS.train`.
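
As a rough illustration of how implicit feedback is commonly treated, here is a minimal sketch of the preference/confidence idea from the implicit-feedback formulation that the Spark MLlib documentation cites (Hu, Koren, and Volinsky). The function name `preference_and_confidence` and the `alpha` value are assumptions made for this example; this is not Spark's internal code.

```python
def preference_and_confidence(count, alpha=0.01):
    """Map a raw interaction count to a (preference, confidence) pair.

    preference: 1.0 if the user interacted at all, otherwise 0.0.
    confidence: 1 + alpha * count, so repeated interactions weigh more.
    """
    preference = 1.0 if count > 0 else 0.0
    confidence = 1.0 + alpha * count
    return preference, confidence


# A star is a single interaction, like the rating of 1 we assigned earlier.
print(preference_and_confidence(1))
# A repo the user never interacted with still gets a baseline confidence.
print(preference_and_confidence(0))
```

The point of the sketch is that a missing star is treated as a weak "no preference" signal rather than as an unknown value, which is what separates the implicit setting from explicit ratings.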

In [88]:
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel
import os.path

In [89]:
rank = 5

In [97]:
model = None
model_path = '../models/january-implicit.mdl'
if os.path.exists(model_path):
    print('Loading model...')
    model = MatrixFactorizationModel.load(sc, model_path)
    print('Model loaded successfully!')
else:
    print('Training model...')
    model = ALS.trainImplicit(ratings, rank, seed=0)
    print('Model trained successfully!')

# Evaluate the Model

We need to evaluate how reliable our model is. We can use cross-validation to estimate how accurately our model would predict ratings for data it has not seen. This is done by dividing our data set into distinct groups, generally a [training set, validation set, and test set](https://stackoverflow.com/questions/2976452/whats-is-the-difference-between-train-validation-and-test-set-in-neural-networ).

Scoring the model on data it was not trained on lets us detect overfitting, where the model fits the training data closely but generalizes poorly.

In [22]:
# TODO:: Evaluate the model.
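
Until the evaluation above is filled in, the following is one possible sketch of the idea in plain Python, not the notebook's actual evaluation. It splits (user, repo) pairs into three disjoint groups and scores a list of recommendations against held-out stars using precision at k. The helper names `split_pairs` and `precision_at_k`, the 60/20/20 weights, and the toy data are all assumptions made for illustration. On the real RDD, a split like this could presumably be done with `randomSplit` instead.

```python
import random


def split_pairs(pairs, weights=(0.6, 0.2, 0.2), seed=0):
    """Shuffle the pairs and split them into train, validation, and test lists."""
    shuffled = list(pairs)
    random.Random(seed).shuffle(shuffled)
    n = len(shuffled)
    train_end = int(n * weights[0])
    valid_end = train_end + int(n * weights[1])
    return shuffled[:train_end], shuffled[train_end:valid_end], shuffled[valid_end:]


def precision_at_k(recommended, held_out, k=5):
    """Fraction of the top-k recommended repo ids that appear in held_out."""
    top_k = recommended[:k]
    if not top_k:
        return 0.0
    hits = sum(1 for repo_id in top_k if repo_id in held_out)
    return hits / len(top_k)


# Toy data: ten made-up (user_id, repo_id) pairs, not taken from the data set.
pairs = [(user_id, repo_id) for user_id in range(2) for repo_id in range(5)]
train, validation, test = split_pairs(pairs)
print(len(train), len(validation), len(test))

# Suppose a model recommended repos 1, 2, 3 and the held-out stars are 2 and 9.
print(precision_at_k([1, 2, 3], {2, 9}, k=3))
```

A ranking metric such as precision at k suits implicit data better than rating error, because the model's scores are preference strengths rather than predictions of a real rating scale.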

# Save the Model
Training a model can take a *long* time. Let's save what we've done so we don't lose it later.

In [98]:
if not os.path.exists(model_path):
    model.save(sc, model_path)

# Generate Results

Let's generate some results with our recommendation engine!

In [99]:
# Some helper RDDs/functions to find users and repos.
repos = jan.map(lambda x: x[1]).distinct().cache()
users = jan.map(lambda x: x[0]).distinct().cache()

def find_repo(name=None, id=None):
    if name is not None:
        return repos.filter(lambda repo: repo.name==name).first()
    if id is not None:
        return repos.filter(lambda repo: repo.id==id).first()

def find_user(name=None, id=None):
    if name is not None:
        return users.filter(lambda user: user.name==name).first()
    if id is not None:
        return users.filter(lambda user: user.id==id).first()

In [132]:
# Find the user 'nathanph'.
user = find_user('nathanph')

In [133]:
# Generate some recommendations for our user.
recommendations = model.recommendProducts(user.id, 5)
recommendations = [(rec, find_repo(id=rec.product)) for rec in recommendations]

In [134]:
recommendations

[(Rating(user=6075916, product=14194174, rating=1.3045169899444473),
  Repo(name='alex/what-happens-when', id=14194174)),
 (Rating(user=6075916, product=29247444, rating=0.7145092051705337),
  Repo(name='yaronn/blessed-contrib', id=29247444)),
 (Rating(user=6075916, product=29370487, rating=0.22522412526986302),
  Repo(name='thoughtbot/til', id=29370487)),
 (Rating(user=6075916, product=18708860, rating=0.1596436791884901),
  Repo(name='tiimgreen/github-cheat-sheet', id=18708860)),
 (Rating(user=6075916, product=28751632, rating=0.13833889059264798),
  Repo(name='0xAX/linux-insides', id=28751632))]

In [135]:
# Find what repos the user has starred.
stars = jan.filter(lambda star: star[0].name==user.name)
stars = stars.map(lambda star: star[1]).collect()

In [136]:
stars

[Repo(name='ianks/octodown', id=28029170),
 Repo(name='MaciejCzyzewski/retter', id=16251363),
 Repo(name='jbenet/ipfs', id=16722900),
 Repo(name='jonjonsonjr/dotfiles', id=21444162),
 Repo(name='cleebp/asubus-ios', id=18974837),
 Repo(name='alex/what-happens-when', id=14194174),
 Repo(name='alex/what-happens-when', id=14194174),
 Repo(name='tiimgreen/github-cheat-sheet', id=18708860),
 Repo(name='namecoin/namecoin', id=2459724),
 Repo(name='mbcrawfo/undergrad-thesis', id=29548556),
 Repo(name='jmoon018/PacVim', id=26992831)]

In [137]:
# Only display the recommended repos that the user has not already starred.
relevant_recommendations = [repo for _, repo in recommendations if repo not in stars]
relevant_recommendations

[Repo(name='yaronn/blessed-contrib', id=29247444),
 Repo(name='thoughtbot/til', id=29370487),
 Repo(name='0xAX/linux-insides', id=28751632)]
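
The filtering step above can also be sketched with a set of repo ids. This collapses duplicate stars (such as the repeated `alex/what-happens-when` entry) and makes each membership check constant-time. The helper name `exclude_starred` is an assumption for this example, and `Repo` is redefined only so the snippet is self-contained.

```python
from collections import namedtuple

Repo = namedtuple('Repo', 'name id')


def exclude_starred(recommended, starred):
    """Return the recommended repos whose id is not among the starred repos."""
    starred_ids = {repo.id for repo in starred}
    return [repo for repo in recommended if repo.id not in starred_ids]


recommended = [
    Repo('alex/what-happens-when', 14194174),
    Repo('thoughtbot/til', 29370487),
]
# The same repo can show up more than once in the star events.
starred = [
    Repo('alex/what-happens-when', 14194174),
    Repo('alex/what-happens-when', 14194174),
]
print(exclude_starred(recommended, starred))
```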

In [138]:
# Print some hyperlinks.
for repo in relevant_recommendations:
    print('http://github.com/' + repo.name)

http://github.com/yaronn/blessed-contrib
http://github.com/thoughtbot/til
http://github.com/0xAX/linux-insides
