# Introduction to PySpark
__-Shubham Agarwal__

This notebook is an experiment with PySpark in a Jupyter notebook.
It assumes that Jupyter (IPython) and Spark are already installed.
To use PySpark from a Jupyter notebook, you may have to update `~/.bash_profile` or `~/.bashrc` as follows:

```
export SPARK_HOME="/Users/username/spark-version"
export PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/build:$PYTHONPATH
# replace py4j_version with the version in the zip file name under $SPARK_HOME/python/lib
export PYTHONPATH=$SPARK_HOME/python/lib/py4j-py4j_version-src.zip:$PYTHONPATH
```
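If editing the shell profile is not convenient, the same two paths can be added from Python at the top of the notebook. The sketch below is an alternative under stated assumptions, not part of the original setup: it assumes the usual Spark layout (`$SPARK_HOME/python` and `$SPARK_HOME/python/lib/py4j-*-src.zip`), and `add_pyspark_to_path` is a hypothetical helper name. It looks up the py4j zip with a glob instead of hard-coding its version.

```python
import glob
import os
import sys


def add_pyspark_to_path(spark_home=None):
    """Hypothetical helper: put PySpark and its bundled py4j on sys.path.

    Assumes the layout $SPARK_HOME/python and
    $SPARK_HOME/python/lib/py4j-<version>-src.zip.
    Returns the paths that were added, in priority order.
    """
    spark_home = spark_home or os.environ["SPARK_HOME"]
    python_dir = os.path.join(spark_home, "python")
    # The py4j version changes between Spark releases, so search for it
    py4j_zips = sorted(glob.glob(os.path.join(python_dir, "lib", "py4j-*-src.zip")))
    if not py4j_zips:
        raise FileNotFoundError("no py4j-*-src.zip found under " + python_dir)
    paths = [python_dir, py4j_zips[-1]]
    # Insert in reverse so that python_dir ends up first on sys.path
    for path in reversed(paths):
        if path not in sys.path:
            sys.path.insert(0, path)
    return paths
```

Calling `add_pyspark_to_path("/Users/username/spark-version")` before `from pyspark import SparkContext` would play the role of the two `PYTHONPATH` exports above, but only for the current Python process.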

Exporting the `PYSPARK_SUBMIT_ARGS` variable can cause issues with Jupyter, so I would not recommend it.

Let us start by creating a SparkContext and then run a basic example that counts words.

In [4]:
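# Set up the Spark environment
from pyspark import SparkContext
# Run Spark locally with one worker thread per CPU core
sc = SparkContext("local[*]", "IntroductionToPySpark")
# sc = SparkContext("local[2]")  # alternatively, use exactly 2 worker threads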

In [5]:
# Basic example
words = sc.parallelize(["scala","java","hadoop","spark","python"])
print("Number of words: " + str(words.count()))

Number of words: 5
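The cell above only counts the elements of an RDD. The classic word count goes one step further and counts how often each word occurs; in PySpark it is usually written as `lines.flatMap(lambda l: l.split()).map(lambda w: (w, 1)).reduceByKey(add)`. To keep it runnable without a cluster, the sketch below mirrors those three steps with plain Python lists (a local stand-in, not Spark itself); `local_word_count` is a hypothetical name.

```python
from collections import defaultdict
from operator import add


def local_word_count(lines):
    """Plain-Python stand-in for flatMap -> map -> reduceByKey(add)."""
    # flatMap: split every line into words and flatten the result
    words = [word for line in lines for word in line.split()]
    # map: pair each word with a count of 1
    pairs = [(word, 1) for word in words]
    # reduceByKey(add): combine the counts that share the same key
    counts = defaultdict(int)
    for word, one in pairs:
        counts[word] = add(counts[word], one)
    return dict(counts)


print(local_word_count(["spark python", "spark scala"]))
# {'spark': 2, 'python': 1, 'scala': 1}
```

In Spark the `reduceByKey` step combines counts within each partition before shuffling, which is why it is preferred over grouping all the `(word, 1)` pairs first.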


While the SparkContext is running, we can monitor its jobs in the Spark web UI by opening http://localhost:4040 in a web browser.

In this notebook, we will be experimenting with the [MovieLens](https://movielens.org) dataset. We assume the data is in the `../data/movielens/medium/` directory.

We will use two files from this dataset: `ratings.dat` and `movies.dat`. All ratings are contained in the file `ratings.dat` and are in the following format:
```
UserID::MovieID::Rating::Timestamp
```
The movie information is in the file `movies.dat` and is in the following format:

```
MovieID::Title::Genres
```

Let's start with the data. We use the following dataset:
- [MovieLens 1M Dataset](http://grouplens.org/datasets/movielens/1m/ml-1m.zip) - about 1 million ratings from 6,000 users on 4,000 movies.


In [1]:
# Path to the MovieLens dataset
movieLensHomeDir = "../data/movielens/medium/"
# Uncomment to list the parent directory if the path above is wrong:
# from subprocess import check_output
# print(check_output(["ls", "../"]).decode("utf8"))
moviesPath = movieLensHomeDir + "movies.dat"
ratingsPath = movieLensHomeDir + "ratings.dat"
usersPath = movieLensHomeDir + "users.dat"  # not used below

We define two functions, `parseRating` and `parseMovie`, that parse a rating record and a movie record respectively.

In [2]:
def parseRating(line):
    """ Parse a rating record in MovieLens format UserID::MovieID::Rating::Timestamp
    Args:
        line (str): a line in the ratings dataset in the form of UserID::MovieID::Rating::Timestamp
    Returns:
        tuple: (UserID, MovieID, Rating)
    """
    fields = line.split('::')
    return int(fields[0]), int(fields[1]), float(fields[2])


def parseMovie(line):
    """ Parse a movie record in MovieLens format MovieID::Title::Genres
    Args:
        line (str): a line in the movies dataset in the form of MovieID::Title::Genres
    Returns:
        tuple: (MovieID, Title, Genres)
    """
    fields = line.split("::")
    return int(fields[0]), fields[1], fields[2]
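Before running the parsers on the full files, it can help to call them on single lines in the documented formats. The block repeats the two parsers so that it runs on its own; the sample lines are written to match the formats above and the first records printed later in this notebook.

```python
def parseRating(line):
    """UserID::MovieID::Rating::Timestamp -> (UserID, MovieID, Rating)"""
    fields = line.split("::")
    return int(fields[0]), int(fields[1]), float(fields[2])


def parseMovie(line):
    """MovieID::Title::Genres -> (MovieID, Title, Genres)"""
    fields = line.split("::")
    return int(fields[0]), fields[1], fields[2]


# Sample lines in the documented formats (the timestamp is dropped by parseRating)
print(parseRating("1::1193::5::978300760"))  # (1, 1193, 5.0)
print(parseMovie("1::Toy Story (1995)::Animation|Children's|Comedy"))
```

`sc.textFile` strips line endings, so the parsers do not need to; if the files were read with Python's `open()` instead, each line would need `.rstrip("\n")` first, otherwise the genres field would keep a trailing newline.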

In [6]:
# moviesRDD is an RDD of (movieID, title, genres)
moviesRDD = sc.textFile(moviesPath).map(parseMovie).setName("movies")
# ratingsRDD is an RDD of (userID, movieID, rating)
ratingsRDD = sc.textFile(ratingsPath).map(parseRating).setName("ratings")

__Note__ - These lines create the `moviesRDD` and `ratingsRDD` variables (technically RDDs), which point to files on your local machine. Because Spark evaluates lazily, nothing is read or parsed at this point: Spark only records the transformations (`textFile` followed by `map`) and waits until an action, such as `count()` or `take()`, requires a result.
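Python 3's built-in `map` offers a rough local analogy for this laziness (it is not how Spark schedules work): creating the map object runs nothing, and the function is only called when a result is requested. The lines below are made up, and `parse_logged` is a hypothetical helper that records each call.

```python
calls = []


def parse_logged(line):
    """Split a MovieLens-style line and remember that we were called."""
    calls.append(line)
    return line.split("::")


# Made-up lines in the ratings format
lines = ["1::10::4.0::0", "2::20::3.0::0"]

parsed = map(parse_logged, lines)  # like a transformation: only describes the work
print(len(calls))                  # 0, nothing has been parsed yet

results = list(parsed)             # like an action: forces the computation
print(len(calls))                  # 2
```

One difference is worth keeping in mind: a Python `map` iterator can be consumed only once, whereas an RDD is recomputed from its source by every action unless it is kept in memory with `cache()` or `persist()`.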

In [7]:
numRatings = ratingsRDD.count()
numMovies = moviesRDD.count()
# numUsers = ratingsRDD.map(lambda r: r[0]).distinct().count()  # distinct users (not run here)
print(numRatings)
print(numMovies)
print(ratingsRDD.first())
print(moviesRDD.take(5))

1000209
3883
(1, 1193, 5.0)
[(1, u'Toy Story (1995)', u"Animation|Children's|Comedy"), (2, u'Jumanji (1995)', u"Adventure|Children's|Fantasy"), (3, u'Grumpier Old Men (1995)', u'Comedy|Romance'), (4, u'Waiting to Exhale (1995)', u'Comedy|Drama'), (5, u'Father of the Bride Part II (1995)', u'Comedy')]


>Define two new RDDs: one containing the movies whose only genre is _Comedy_, and one containing all movies that have _Comedy_ among their genres.<br/>
>Use the <a href="http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.filter">`filter`</a> function, which returns a new RDD containing only the elements that satisfy a predicate.<br/>
>Use the <a href="http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.subtract">`subtract`</a> function to retrieve the movies that have _Comedy_ among their genres but not as their only genre (that is, the elements of the second RDD minus those of the first). Count them and display a few of them.



In [14]:
def hasComedy(movie):
    """True if 'Comedy' is one of the movie's genres."""
    return "Comedy" in movie[2].split("|")


def onlyComedy(movie):
    """True if 'Comedy' is the movie's only genre."""
    return movie[2].split("|") == ["Comedy"]


onlyComedyRDD = moviesRDD.filter(onlyComedy)
comedyRDD = moviesRDD.filter(hasComedy)
print(onlyComedyRDD.count())
print(comedyRDD.count())
# Movies that have Comedy plus at least one other genre
notOnlyComedyRDD = comedyRDD.subtract(onlyComedyRDD)
print(notOnlyComedyRDD.count())
print(comedyRDD.take(5))

521
1200
679
[(1, u'Toy Story (1995)', u"Animation|Children's|Comedy"), (3, u'Grumpier Old Men (1995)', u'Comedy|Romance'), (4, u'Waiting to Exhale (1995)', u'Comedy|Drama'), (5, u'Father of the Bride Part II (1995)', u'Comedy'), (7, u'Sabrina (1995)', u'Comedy|Romance')]
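Because `subtract` compares whole records between two RDDs, which involves a shuffle, the same "Comedy plus other genres" set can also be obtained in a single pass, e.g. `moviesRDD.filter(lambda m: hasComedy(m) and not onlyComedy(m))`. The plain-Python sketch below uses local lists as stand-ins for RDDs (not Spark) and checks on the sample records printed in this notebook that both formulations agree.

```python
# Sample records printed above by moviesRDD.take(5) and ComedyRDD.take(5)
movies = [
    (1, "Toy Story (1995)", "Animation|Children's|Comedy"),
    (2, "Jumanji (1995)", "Adventure|Children's|Fantasy"),
    (3, "Grumpier Old Men (1995)", "Comedy|Romance"),
    (4, "Waiting to Exhale (1995)", "Comedy|Drama"),
    (5, "Father of the Bride Part II (1995)", "Comedy"),
    (7, "Sabrina (1995)", "Comedy|Romance"),
]


def has_comedy(movie):
    return "Comedy" in movie[2].split("|")


def only_comedy(movie):
    return movie[2].split("|") == ["Comedy"]


comedy = [m for m in movies if has_comedy(m)]
only = [m for m in movies if only_comedy(m)]

# subtract-style (records are unique here, so a set difference is equivalent)
via_subtract = sorted(set(comedy) - set(only))
# single-filter style: one pass over the movies, no second dataset involved
via_filter = sorted(m for m in movies if has_comedy(m) and not only_comedy(m))

print([m[0] for m in via_filter])  # [1, 3, 4, 7]
```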


>Investigate the different movie genres. Warning: a combination of genres such as `Comedy|Romance` is not a new genre! To do this:
* split the genre strings on the delimiter '|' using <a href="http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.flatMap">`flatMap`</a>
* use the <a href="http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.distinct">`distinct`</a> function, which returns a new RDD containing the distinct elements of this RDD.

>Count the number of different genres and print them.



In [10]:
genresRDD = moviesRDD.flatMap(lambda movie : movie[2].split('|')).distinct()
print(genresRDD.count())

18
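The reason for `flatMap` is the warning above: with `moviesRDD.map(lambda m: m[2]).distinct()`, each combination such as `Comedy|Romance` would be counted as a genre of its own, whereas `flatMap` emits one element per individual genre. The plain-Python sketch below (sets standing in for `distinct`, not Spark) shows the difference on the five genre strings printed earlier. In the notebook itself, `print(genresRDD.collect())` would print the 18 genre names.

```python
# Genre strings of the five movies printed by moviesRDD.take(5) above
genre_strings = [
    "Animation|Children's|Comedy",
    "Adventure|Children's|Fantasy",
    "Comedy|Romance",
    "Comedy|Drama",
    "Comedy",
]

# map-like: one value per movie, so combinations count as separate "genres"
combinations = set(genre_strings)
# flatMap-like: one value per individual genre, then distinct
genres = sorted({g for s in genre_strings for g in s.split("|")})

print(len(combinations), len(genres))  # 5 7
print(genres)
```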


>Compute the average of all the ratings using the RDD's built-in `mean()` action.

In [20]:
avgRating = ratingsRDD.map(lambda r: r[2]).mean()
print(avgRating)

3.58156445303
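`mean()` is an action that gathers the sum and the count in a single pass over the data (PySpark computes it through a `StatCounter`). Written by hand, the same idea is a reduce over `(sum, count)` pairs, e.g. `ratingsRDD.map(lambda r: (r[2], 1)).reduce(lambda a, b: (a[0] + b[0], a[1] + b[1]))`. The plain-Python sketch below applies that reduce to a few made-up rating tuples.

```python
from functools import reduce

# Made-up (UserID, MovieID, Rating) tuples
ratings = [(1, 1193, 5.0), (1, 661, 3.0), (2, 1357, 4.0), (3, 1, 2.0)]

# Pair every rating with a count of 1, then add the pairs component-wise
total, count = reduce(
    lambda a, b: (a[0] + b[0], a[1] + b[1]),
    ((r[2], 1) for r in ratings),
)
mean_rating = total / count
print(mean_rating)  # 3.5
```

The reducing function is associative and commutative, which is what lets Spark apply it to each partition independently and then merge the partial results.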


>Get the rating distribution, i.e. the number of ratings for each rating value.

In [15]:
from operator import add

frequencyRDD = ratingsRDD.map(lambda r : (r[2], 1)).reduceByKey(add)
print(frequencyRDD.take(5))


[(2.0, 107557), (4.0, 348971), (1.0, 56174), (3.0, 261197), (5.0, 226310)]
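The distribution also serves as a sanity check on the earlier results: its counts should add up to `numRatings`, and the count-weighted average of the rating values should reproduce the result of `mean()`. The short check below uses only the numbers printed in this notebook. Note that `take(5)` happens to return every rating value here; with more distinct keys, `collect()` or `collectAsMap()` would be needed, and `sortByKey()` would put the output in rating order.

```python
# (rating, count) pairs printed by frequencyRDD.take(5) above
distribution = [(2.0, 107557), (4.0, 348971), (1.0, 56174), (3.0, 261197), (5.0, 226310)]

num_ratings = sum(count for _, count in distribution)
weighted_mean = sum(rating * count for rating, count in distribution) / num_ratings

print(num_ratings)              # 1000209, the value of numRatings above
print(round(weighted_mean, 6))  # 3.581564, matching the mean() result
```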


<small><i>Examples taken from the course Convex and Distributed Optimization (2016).</i></small>