### COMP5349 Week 5 Spark Lab Sample Code

This is a sample notebook showing basic Spark RDD operations.
The program has two input data sources: *ratings.csv* and *movies.csv*.
The *movies.csv* file contains movie information. Each row represents one movie and has the following format:
```
movieId,title,genres
```

The *ratings.csv* file contains rating information. Each row represents one rating of one movie by one user, and has the following format:

```
userId,movieId,rating,timestamp
```
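
The functions that parse these lines, `extractRating` and `pairMovieToGenre`, come from the lab's `ml_utils` module, which is not shown here. The sketch below is only a guess at what such parsers could look like. It assumes the comma-separated layouts above, genres separated by `|` (e.g. `Comedy|Drama`), and titles containing commas wrapped in double quotes, as in the MovieLens files. The names `extract_rating` and `pair_movie_to_genre` are hypothetical stand-ins, not the lab's actual code.

```python
import csv

# Hypothetical stand-ins for ml_utils.extractRating / ml_utils.pairMovieToGenre.
# Assumes genres are separated by "|" and titles containing commas are quoted.

def extract_rating(line):
    """'userId,movieId,rating,timestamp' -> (movieId, rating)."""
    user_id, movie_id, rating, timestamp = line.split(",")
    return (int(movie_id), float(rating))


def pair_movie_to_genre(line):
    """'movieId,title,genres' -> [(movieId, genre), ...], one pair per genre."""
    # csv.reader handles quoted titles such as "American President, The (1995)"
    movie_id, title, genres = next(csv.reader([line]))
    return [(int(movie_id), genre) for genre in genres.split("|")]


print(extract_rating("1,31,2.5,1260759144"))  # (31, 2.5)
print(pair_movie_to_genre('11,"American President, The (1995)",Comedy|Drama|Romance'))
# [(11, 'Comedy'), (11, 'Drama'), (11, 'Romance')]
```

Because `pair_movie_to_genre` returns a list, it pairs with `flatMap` rather than `map`. If your copy of the files starts with a header row, filter it out before parsing, for example with `ratings.filter(lambda line: not line.startswith("userId"))`.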

Spark can read data from various data sources. This example reads its input from an external cluster and writes the output to your own HDFS.
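
Whether a path points at the external cluster or at your own HDFS depends on how it is written. As a simplified model (an assumption that ignores Hadoop settings such as `fs.defaultFS`), a full `hdfs://` URI or an absolute path is used as given, while a relative path is resolved against your HDFS home directory `/user/<yourUserName>`. The helper `resolve_hdfs_path` and the user name `alice` are hypothetical.

```python
import posixpath

def resolve_hdfs_path(path, user):
    """Simplified model of where an HDFS path points (hypothetical helper)."""
    # Full URIs (hdfs://host/...) and absolute paths are used as given
    if "://" in path or path.startswith("/"):
        return path
    # Relative paths are resolved against the home directory /user/<user>
    return posixpath.normpath(posixpath.join("/user", user, path))


print(resolve_hdfs_path("ratingOut", "alice"))  # /user/alice/ratingOut
print(resolve_hdfs_path("hdfs://soit-hdp-pro-1.ucc.usyd.edu.au/share/movie/small/", "alice"))
# hdfs://soit-hdp-pro-1.ucc.usyd.edu.au/share/movie/small/
```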

Click the *run cell* button in the menu to run each of the following two cells.

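```python
import findspark
findspark.init()  # locate the Spark installation before importing pyspark

from pyspark import SparkContext
# Lab helpers: extractRating, pairMovieToGenre, mergeRating, mergeCombiners, mapAverageRating
from ml_utils import *
```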

```python
sc = SparkContext(appName="Average Rating per Genre")

# You can change the input path to point to your own HDFS.
# If Spark is able to read the Hadoop configuration, you can use a relative path.
input_path = 'hdfs://soit-hdp-pro-1.ucc.usyd.edu.au/share/movie/small/'

# A relative path is used to specify the output directory.
# Relative paths are resolved against your HDFS home directory: /user/<yourUserName>
output_path = 'ratingOut'

# Each RDD element is one line of text from the CSV file
ratings = sc.textFile(input_path + "ratings.csv")
movieData = sc.textFile(input_path + "movies.csv")

# (movieId, rating) pairs, one per rating line
movieRatings = ratings.map(extractRating)
# (movieId, genre) pairs; flatMap because one movie can have several genres
movieGenre = movieData.flatMap(pairMovieToGenre)

# join on movieId gives (movieId, (genre, rating)); values() keeps (genre, rating)
genreRatings = movieGenre.join(movieRatings).values()
# Accumulate a (sum, count) per genre starting from (0.0, 0) into 1 partition,
# then turn each accumulator into an average
genreRatingsAverage = genreRatings.aggregateByKey((0.0, 0), mergeRating, mergeCombiners, 1).map(mapAverageRating)

# Writes one part-xxxxx file per partition into output_path
genreRatingsAverage.saveAsTextFile(output_path)
```
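
To see what the join and `aggregateByKey` steps compute, the sketch below replays them on a few made-up pairs in plain Python, with no Spark needed. The functions `merge_rating`, `merge_combiners` and `map_average_rating` are hypothetical stand-ins for `mergeRating`, `mergeCombiners` and `mapAverageRating` from `ml_utils`, assuming the `(0.0, 0)` zero value is a `(sum, count)` accumulator. The split into two "partitions" only illustrates why `aggregateByKey` takes two functions: one folds single ratings into an accumulator inside a partition, the other merges accumulators from different partitions. In the lab code, the trailing `1` is the `numPartitions` argument, which is why the result lands in a single `part-00000` file.

```python
# Hypothetical stand-ins for the ml_utils helpers, assuming a (sum, count) accumulator.
def merge_rating(acc, rating):
    """seqFunc: fold one rating into a (sum, count) accumulator inside a partition."""
    return (acc[0] + rating, acc[1] + 1)


def merge_combiners(a, b):
    """combFunc: merge two (sum, count) accumulators from different partitions."""
    return (a[0] + b[0], a[1] + b[1])


def map_average_rating(pair):
    """(genre, (sum, count)) -> (genre, average)."""
    genre, (total, count) = pair
    return (genre, total / count)


def aggregate_partition(partition, zero=(0.0, 0)):
    """Apply merge_rating per key within one partition, starting from the zero value."""
    acc = {}
    for genre, rating in partition:
        acc[genre] = merge_rating(acc.get(genre, zero), rating)
    return acc


def average_rating_per_genre(movie_genre, movie_ratings):
    """Plain-Python model of join -> values -> aggregateByKey -> map."""
    # join on movieId, then values(): keep (genre, rating) for every matching pair
    ratings_by_movie = {}
    for movie_id, rating in movie_ratings:
        ratings_by_movie.setdefault(movie_id, []).append(rating)
    genre_ratings = [(genre, rating)
                     for movie_id, genre in movie_genre
                     for rating in ratings_by_movie.get(movie_id, [])]

    # Pretend the data sits in two partitions and aggregate each one separately...
    half = len(genre_ratings) // 2
    partials = [aggregate_partition(genre_ratings[:half]),
                aggregate_partition(genre_ratings[half:])]

    # ...then merge the per-partition accumulators with merge_combiners
    combined = {}
    for partial in partials:
        for genre, acc in partial.items():
            combined[genre] = merge_combiners(combined[genre], acc) if genre in combined else acc

    return sorted(map_average_rating(item) for item in combined.items())


movie_genre = [(1, "Comedy"), (1, "Drama"), (2, "Drama"), (3, "Horror")]
movie_ratings = [(1, 4.0), (2, 2.0), (1, 3.0)]
print(average_rating_per_genre(movie_genre, movie_ratings))
# [('Comedy', 3.5), ('Drama', 3.0)]; Horror has no ratings, so the inner join drops it
```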

#### What to expect
If successful, you will not see any output in the notebook; the result is written to HDFS. Run ```hdfs dfs -cat ratingOut/part-00000``` on the command line to read the content of the output. As with MapReduce, Spark names its output files ```part-xxxxx```.
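
In PySpark, `saveAsTextFile` writes each RDD element as one line using its Python string form, so a `(genre, average)` tuple appears as a line such as `('Comedy', 3.5)`. Assuming `mapAverageRating` produces such tuples, the hypothetical helper below turns the lines of a `part-xxxxx` file back into a dictionary, for example after copying the file locally with `hdfs dfs -get ratingOut/part-00000`.

```python
import ast

def parse_part_file(lines):
    """Hypothetical helper: turn lines like "('Comedy', 3.5)" into {genre: average}."""
    averages = {}
    for line in lines:
        line = line.strip()
        if not line:  # skip blank lines
            continue
        # literal_eval safely parses the tuple's Python string form
        genre, average = ast.literal_eval(line)
        averages[genre] = average
    return averages


sample = ["('Comedy', 3.5)\n", "('Drama', 3.0)\n"]
print(parse_part_file(sample))  # {'Comedy': 3.5, 'Drama': 3.0}

# With a local copy of the output file:
# with open("part-00000") as f:
#     print(parse_part_file(f))
```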

If not successful, an error message is printed as output. The most likely causes are that an input file does not exist or that the output path already exists. Spark creates the output directory itself and refuses to write into one that already exists, so remove the existing directory or change ```output_path``` before each rerun.
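
One way to clear the old output between runs is `hdfs dfs -rm -r -f`, where `-r` deletes the directory recursively and `-f` suppresses the error if the path does not exist. The hypothetical helper below builds that command and only executes it when `dry_run=False`, assuming the `hdfs` command is on your `PATH`.

```python
import subprocess

def clear_output_dir(path, dry_run=True):
    """Hypothetical helper: delete an HDFS output directory before rerunning a job.

    -r deletes recursively; -f stays quiet if the path does not exist.
    With dry_run=True the command is only returned, not executed.
    """
    cmd = ["hdfs", "dfs", "-rm", "-r", "-f", path]
    if not dry_run:
        # check=True raises CalledProcessError if the command fails
        subprocess.run(cmd, check=True)
    return cmd


print(" ".join(clear_output_dir("ratingOut")))  # hdfs dfs -rm -r -f ratingOut
```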

**Remember to run the following cell to close the SparkContext, whether the Spark program ran successfully or failed with an error.**

```python
# Always run this to close the SparkContext, whether your Spark program ran successfully or failed with an error.
sc.stop()
```
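
When a cell fails, the code after the error in that cell does not run, which is why `sc.stop()` sits in a separate cell. A `try`/`finally` block is one way to guarantee the context is stopped in the same cell. The sketch below wraps that pattern in a hypothetical `run_job` helper that takes a function creating the context and a function doing the work; in this lab the first could be `lambda: SparkContext(appName="Average Rating per Genre")`.

```python
def run_job(make_context, job):
    """Hypothetical helper: create a context, run job(sc), and always stop the context."""
    sc = make_context()
    try:
        return job(sc)
    finally:
        # Runs whether job returned normally or raised an exception
        sc.stop()


# Example with the lab's objects (requires Spark):
# from pyspark import SparkContext
# run_job(lambda: SparkContext(appName="Average Rating per Genre"),
#         lambda sc: sc.textFile(input_path + "ratings.csv").count())
```

PySpark's `SparkContext` can also be used in a `with` statement, which calls `stop()` on exit; check that your installed version supports it before relying on it.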