## Assignment 5: Recommendation System with Big Data

### Overview
I LOVE MOVIE corp has grown more and more popular since their last collaboration with you. To make more money, just like every other company, they plan to build a movie recommendation system for their customers. Because they are so popular (and have plenty of money), they have decided to use Big Data engines and build the system on Google Cloud. As their technology counselor, you will fulfill this requirement in this assignment.

### GOAL
1. Set up the Google Cloud environment and cloud MongoDB
2. Interact with HDFS using shell commands
3. Process batch data with Spark
4. Use NoSQL to interact with MongoDB
5. Manipulate stream data with Spark Streaming
6. Generate message queue via Kafka

### Philosophy of this system
We will build our movie recommendation system based on the [Amazon.com recommendation system](https://www.cs.umd.edu/~samir/498/Amazon-Recommendations.pdf). This is a good way to recommend items on top of massive data, because you can easily separate offline training from online processing. The following figure shows the main architecture of your product:
 <img src="document/bigdata.jpg" width = "400" height = "600" alt="what" align=center />

Raw data is stored in HDFS, the Hadoop Distributed File System. In the batch training part, Spark loads the raw data from HDFS and calculates each movie's similarity to the others. All the necessary information is stored in MongoDB, including the movie id-title pairs and the movie similarities. In the streaming part, we use Apache Kafka to manage a message queue that contains users' real-time ratings. The Spark Streaming component processes this rating stream, generates immediate recommendations, and updates each user's watch list. As you can see, offline training and stream processing are maintained independently, in code/batch_pipeline.py and code/streaming.py respectively.

You can find a more detailed guide in the Amazon.com [US patent](https://www.google.com/patents/US6266649). We will roughly follow its Figure 3 for the batch pipeline and its Figure 2 for the streaming pipeline.

We will build the whole system on a Google Cloud cluster, because the cloud makes it easy to build a highly scalable system. Imagine that the I LOVE MOVIE company gains more and more customers: we can simply add more nodes with a few clicks on the Google Cloud dashboard. Likewise, if the company loses a lot of users, we can decrease the number of nodes in the cluster. In this assignment, however, we will use this powerful cloud cluster to process only several megabytes of data, because Google charges a lot for heavy use of computing resources.

**Development Cycle** It is inconvenient to develop code that runs on the cloud, especially when we don't set up a local environment and we use pyspark, which does not run like a normal Python program. To simplify your life, repeat the following steps while developing your code:

1. Use the mapreduce class introduced in Lab2 to tune your mapper and reducer locally, and make sure each function performs correctly.
2. Follow the steps in Lab1 to submit your code to the cloud. Remember to append '> output.txt' to the command, which separates the program output from the log info.
3. If any bug occurs, analyze the error info in output.txt and update your local code.

### Lab1: Setup Cloud Environment


In this lab, we will follow the scripts to set up a cluster, and you will get to know how each shell command works. Finally, we will execute testcase1.py on your cluster, which uses all the tools you need for the batch data pipeline.

#### Create cluster 

We will use Google Dataproc to establish your cluster. Since you already set up the [gcloud SDK](https://cloud.google.com/sdk/gcloud/) in assignment 3, let's start by running the shell script shell/init_cluster.sh:
~~~~
gcloud dataproc clusters create recommend --initialization-actions \
"gs://dataproc-initialization-actions/jupyter/jupyter.sh,gs://dataproc-initialization-actions/kafka/kafka.sh" \
--master-machine-type n1-standard-1 --num-masters 3 --master-boot-disk-size 50GB \
--worker-machine-type n1-standard-2 --num-workers 2 --worker-boot-disk-size 50GB
~~~~
In this command, we create a cluster named **recommend** in Dataproc. --initialization-actions specifies how we wish to initialize the cluster, in other words, which software to install and run on it. Here we initialize it with two actions, jupyter.sh and kafka.sh. [jupyter.sh](https://github.com/GoogleCloudPlatform/dataproc-initialization-actions/tree/master/jupyter) installs Miniconda, a minimal version of Anaconda, and launches Jupyter Notebook with PySpark kernel support on port 8123 (however, we won't use Jupyter Notebook to write Spark code, since it is really inconvenient). [kafka.sh](https://github.com/GoogleCloudPlatform/dataproc-initialization-actions/tree/master/kafka) installs Kafka on the cluster.

The last two lines of the command specify the master and worker configuration. Our cluster has three masters of machine type n1-standard-1 with a 50GB boot disk each, and two workers of machine type n1-standard-2 with a 50GB boot disk each. An n1-standard-1 machine has one core, while an n1-standard-2 machine has two. Therefore, there are 7 cores in total in your cluster (3 × 1 + 2 × 2), which just fits within the Google Cloud limit of 8 cores. You may wonder why we have three masters. This is because kafka.sh needs either ZooKeeper to be installed in advance or a high-availability cluster (three master nodes satisfy this requirement); either way would be fine.

#### Setup proxy access 

Next, we will use shell/connect.sh to set up a proxy that allows you to connect to the cluster via your browser. You need to install Chrome in advance.
~~~~
gcloud compute ssh --zone=us-central1-c --ssh-flag="-D 1080" --ssh-flag="-N" --ssh-flag="-n" recommend-m-0 &

/Applications/Google\ Chrome.app/Contents/MacOS/Google\ Chrome --proxy-server="socks5://localhost:1080" \
--host-resolver-rules="MAP * 0.0.0.0 , EXCLUDE localhost" --user-data-dir=/tmp/
~~~~

There are two commands in this .sh file. The first one creates a proxy to the master-0 node of your recommend cluster; --zone specifies the zone, and the "-D 1080" SSH flag sets the local proxy port. The & at the end runs the command in the background. The second command launches your Chrome browser and sends all its requests through the proxy port. You can find a detailed explanation [here](https://cloud.google.com/dataproc/docs/concepts/cluster-web-interfaces).

#### Setup MongoDB Cloud 

This step is relatively easy: visit the [mlab website](https://mlab.com/welcome/?gclid=EAIaIQobChMI4ezHnojm1QIVExuBCh2v6QbhEAAYASAAEgKbqPD_BwE) to sign up for an account. The 500MB free plan is sufficient. Create a database named **netflix**; all of our future work will be done in it. You will see your MongoDB URI at the upper left of the mlab website, which will be used later for connecting to MongoDB from Python.

#### Run your program on Cloud
Now that all the tools we need have been set up, let's run our first task on the cloud! This program creates the collection **watch** in your netflix database, which contains, for each movie, the list of users who have rated it. In order to at least check the syntax of our code locally, let's install the following two libraries on your computer:

In [1]:
!pip install pymongo
!pip install pyspark



The following cell contains the code in the code/batch_test.py file. It uses Spark and SparkSQL together. You may refer to this code when you create your own recommendation system later.

**Note:** this code cannot run locally; it sits in this Jupyter Notebook just for introduction.

In [None]:
from pyspark.sql import SparkSession, SQLContext
from pprint import pprint
import argparse

def parse_args():
    parser = argparse.ArgumentParser(description='test cloud setup')
    parser.add_argument('-mongo', help='MongoDB database URI')
    parser.add_argument('-r', help='path to input ratings.csv', default='../data/ratings.csv')
    return parser.parse_args()


def mapper1(record):
    """
    :param record: (user_id, movie_id)
    :return:       (movie_id, user_id)
    """
    return (record[1], [record[0]])

def reducer(a, b):
    return a + b

if __name__ == '__main__':
    
    # parse the arguments
    args = parse_args()
    
    # Spark initialization
    # sc is Spark context, while ssc is SparkSQL context. You may simply regard them as connector to spark.
    spark = SparkSession \
        .builder \
        .appName("data1030") \
        .config("spark.mongodb.input.uri", args.mongo) \
        .config("spark.mongodb.output.uri", args.mongo) \
        .getOrCreate()
    sc = spark.sparkContext
    ssc = SQLContext(sc)
    
    # load ratings.csv into dataframe rating_df
    rating_df = ssc.read.format('csv') \
        .option('header', 'true') \
        .option('inferschema', 'true').option('mode', 'DROPMALFORMED').load(args.r)
    # register rating_df as temporary table, only then can you use SparkSQL to query it
    ssc.registerDataFrameAsTable(rating_df, 'watch')
    
    # use SparkSQL to get user_id and movie_id, this function returns a dataframe
    tmp_df = ssc.sql('''SELECT userId AS user_id, movieId AS movie_id
                        FROM watch''')
    
    # use mapreduce to generate (movie_id, user_list) pair
    tmp_rdd = tmp_df.rdd.map(tuple) \
        .map(mapper1) \
        .reduceByKey(reducer)
        
    table_df = spark.createDataFrame(tmp_rdd, ['movie_id', 'user_list'])
    
    # print out schema and 5 rows
    table_df.printSchema()
    pprint(table_df.take(5))
    
    # load data into mongodb database
    table_df.write.format("com.mongodb.spark.sql.DefaultSource") \
        .option("collection", "watch") \
        .mode("overwrite").save()
    
    # drop temp table 
    ssc.dropTempTable('watch')

Here we mix plain [Spark](https://spark.apache.org/docs/0.9.0/python-programming-guide.html) and [SparkSQL](https://spark.apache.org/docs/latest/sql-programming-guide.html), which makes our coding easier. A DataFrame in Spark is really similar to a DataFrame in pandas, and it is really powerful and convenient. Using pprint to print structured data makes it clearer. Pay close attention to how we write and use the mapper and reducer here.
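To see what the mapper and reducer in batch_test.py do without a cluster, here is a minimal pure-Python sketch. The `reduce_by_key` helper is a hypothetical local stand-in for Spark's `reduceByKey`, and the rating rows are made up for illustration.

```python
def mapper1(record):
    # (user_id, movie_id) -> (movie_id, [user_id])
    return (record[1], [record[0]])


def reducer(a, b):
    # concatenate two user lists that share the same movie_id
    return a + b


def reduce_by_key(pairs, func):
    """Hypothetical local stand-in for RDD.reduceByKey (single machine, no shuffle)."""
    merged = {}
    for key, value in pairs:
        merged[key] = func(merged[key], value) if key in merged else value
    return merged


# made-up (user_id, movie_id) rows
ratings = [(1, 10), (2, 10), (1, 20), (3, 10)]
movie_to_users = reduce_by_key(map(mapper1, ratings), reducer)
print(movie_to_users)
```

Wrapping each user_id in a one-element list is what lets the reducer stay a plain list concatenation, whether a movie has one rating or many.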

Next, let's load it into Google Cloud and run! Use the following command to load all assignment 5 files into your cluster:
~~~~
gcloud compute copy-files [LOCAL_FILE_PATH]  [INSTANCE_NAME]:~/
~~~~
And ssh to your cluster with:
~~~~
gcloud compute ssh recommend-m-0
~~~~
Before running the code, we need to install some libraries. Start a new terminal via Jupyter Notebook and run the following command on your cluster:
~~~~
pip install -r requirement.txt
~~~~
Now you can run your code with:
~~~~
spark-submit --packages org.mongodb.spark:mongo-spark-connector_2.11:2.2.0 \
batch_test.py -mongo MongoDB_URI > output.txt
~~~~
spark-submit is the shell command we use to submit a PySpark job. --packages specifies which third-party Java packages we will use; in this case, the mongo-spark-connector package. '> output.txt' at the end of the command tells the shell to write all standard output into the output.txt file. This is a good way to separate Spark's log output from what your program prints (otherwise it's a nightmare to find your own output in the massive log). After you run the code, you should see something like this in your output.txt file:
 <img src="document/printout.jpeg" width = "500" height = "700" alt="what" align=center />


The first one is the nice printing of your table_df schema, and the second one is some rows from table_df. Alright, seems you are on your way to create your recommendation system! Also, you don't need to hand in anything for this lab!
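The reason '> output.txt' separates the two kinds of output can be sketched locally. The example below assumes Spark's default logging configuration, which writes log messages to standard error, and uses a tiny made-up child program in place of a Spark job.

```python
import subprocess
import sys

# A tiny child program standing in for a Spark job: one line of program
# output on stdout and one log-like line on stderr.
child = (
    "import sys\n"
    "print('program output')\n"
    "print('log message', file=sys.stderr)\n"
)

result = subprocess.run(
    [sys.executable, "-c", child],
    stdout=subprocess.PIPE,   # the stream that '> output.txt' would capture
    stderr=subprocess.PIPE,   # the stream that would still reach the terminal
    universal_newlines=True,
)
print(repr(result.stdout))
print(repr(result.stderr))
```

Only the stdout stream is redirected by '>', so anything your program prints lands in the file while the log stays on the terminal.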

**Quick reference**

Here are some really useful addresses for monitoring what happens in your cluster:

1. http://recommend-m-0:50070 hadoop
2. http://recommend-m-0:4040 spark UI
3. http://recommend-m-0:8123 jupyter

### Lab2: Write Your First MapReduce and NoSQL
In this lab, you will implement [MapReduce](https://en.wikipedia.org/wiki/MapReduce) and [NoSQL](https://en.wikipedia.org/wiki/NoSQL) queries. We provide a mapreduce class that simulates how Spark runs. It runs all your MapReduce jobs locally on a single machine, with no parallelism. To start, let's look at a simple example:

In [2]:
from mapreduce import mapreduce
from pprint import pprint
def mapper1(record):
    words = record.split()
    return [(word, 1) for word in words]
    
def reducer(a, b):
    return a + b

with open('data/hadoop.txt', 'r') as infile:
    data = [line.strip() for line in infile]
sc = mapreduce()
word_count_result = sc.parallelize(data, 4)\
    .flatMap(mapper1)\
    .reduceByKey(reducer)\
    .sortByKey(True)\
    .collect()
sc.stop()
pprint(list(word_count_result))

['Hadoop is the Elephant King', 'A yellow and elegant thing', 'He never forgets', 'Useful data, or lets', 'An extraneous element cling', 'A wonderful king is Hadoop', 'The elephant plays well with Sqoop', 'But what helps him to thrive', 'Are Impala, and Hive', 'And HDFS in the group']
[('A', 2),
 ('An', 1),
 ('And', 1),
 ('Are', 1),
 ('But', 1),
 ('Elephant', 1),
 ('HDFS', 1),
 ('Hadoop', 2),
 ('He', 1),
 ('Hive', 1),
 ('Impala,', 1),
 ('King', 1),
 ('Sqoop', 1),
 ('The', 1),
 ('Useful', 1),
 ('and', 2),
 ('cling', 1),
 ('data,', 1),
 ('elegant', 1),
 ('element', 1),
 ('elephant', 1),
 ('extraneous', 1),
 ('forgets', 1),
 ('group', 1),
 ('helps', 1),
 ('him', 1),
 ('in', 1),
 ('is', 2),
 ('king', 1),
 ('lets', 1),
 ('never', 1),
 ('or', 1),
 ('plays', 1),
 ('the', 2),
 ('thing', 1),
 ('thrive', 1),
 ('to', 1),
 ('well', 1),
 ('what', 1),
 ('with', 1),
 ('wonderful', 1),
 ('yellow', 1)]


In this example, mapper1 returns a list of tuples, and reducer adds the values together to calculate the count for each word. You may also use lambda functions to run this MapReduce task.
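Because mapper1 returns a list for every line, the pipeline uses flatMap rather than map. The sketch below illustrates the difference in plain Python with lambda functions; it uses itertools instead of the course's mapreduce class, and the two input lines are taken from the sample data above.

```python
from itertools import chain

lines = ["Hadoop is the Elephant King", "A wonderful king is Hadoop"]

# lambda version of mapper1: one line -> list of (word, 1) tuples
to_pairs = lambda line: [(word, 1) for word in line.split()]

# map keeps one list per input line
mapped = list(map(to_pairs, lines))
# flatMap flattens those lists into a single sequence of pairs
flat_mapped = list(chain.from_iterable(map(to_pairs, lines)))

# summing per key does the same job as reducer = lambda a, b: a + b
counts = {}
for word, one in flat_mapped:
    counts[word] = counts.get(word, 0) + one

print(len(mapped), len(flat_mapped))
print(sorted(counts.items()))
```

With map, reduceByKey would receive whole lists instead of (word, 1) pairs, which is why the word count needs the flattened form.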

**Task 1** inverted_index, which creates an inverted index of a given file. Given a set of documents, an inverted index is a dictionary where each word is associated with a list of the ids of the documents in which that word appears. You need to fill in mapper1, reducer, and mapper2.

In [3]:
import json
def mapper1(record):
    """
    The document text may have words in various cases (i.e., upper and lowercase, or mixed) or elements of punctuation.
    Do not modify the string, and treat each token as if it was a valid word. (That is, just use value.split())
    :param record: [document_id, text]
                    document_id: document identifier formatted as a string
                    text: text of the document formatted as a string
    :return:       [(word, [document_id]), (word, [document_id]), ...]
    """
    tmpresult = []
    tmptext = record[1].split()
    for item in tmptext:
        tmpresult.append((item,[record[0]]))
    return tmpresult

# a: a group of document_ids
# b: a group of document_ids
def reducer(a, b):
    """
    The input variables, a and b, are each a group of document ids. You will want to join them and return that result.
    The group may consist of one or multiple documents, make sure that your code works for both cases.
    :param a: a group of document_ids
           b: a group of document_ids
    :return: joined document_id list
    Hint: in Mapper 1, you may want to wrap the document_id in a list, making joining groups of them easier 
    (especially as it simplifies the cases above!). It is easiest to let a group of document_ids be a list of document_ids.
    """
    return a+b

def mapper2(record):
    """
    :param record: (word, [document_id1, document_id2, ...])
                    word: a word
                    [document_id1, document_id2, ...]: unsorted list of document ids
    :return:       (word, sorted list of unique document ids)
    """

    tmp = list(set(record[1]))
    tmp.sort()
    return (record[0], tmp)

# Do not modify
with open('data/books.json', 'r') as infile:
    data = [json.loads(line) for line in infile]

sc = mapreduce()
inverted_index_result = sc.parallelize(data, 128) \
    .flatMap(mapper1) \
    .reduceByKey(reducer) \
    .sortByKey(True) \
    .map(mapper2) \
    .collect()

sc.stop()
pprint(inverted_index_result)

[['milton-paradise.txt', "[ Paradise Lost by John Milton 1667 ] Book I Of Man ' s first disobedience , and the fruit Of that forbidden tree whose mortal taste Brought death into the World , and all our woe , With loss of Eden , till one greater Man Restore us , and regain the blissful seat , Sing , Heavenly Muse , that , on the secret top Of Oreb , or of Sinai , didst inspire That shepherd who first taught the chosen seed In the beginning how the heavens and earth Rose out of Chaos : or , if Sion hill Delight thee more , and Siloa ' s brook that flowed Fast by the oracle of God , I thence Invoke thy aid to my adventurous song , That with no middle flight intends to soar Above th ' Aonian mount , while it pursues Things unattempted yet in prose or rhyme ."], ['edgeworth-parents.txt', "[ The Parent ' s Assistant , by Maria Edgeworth ] THE ORPHANS . Near the ruins of the castle of Rossmore , in Ireland , is a small cabin , in which there once lived a widow and her four children . As long 

**Task 2** matrix multiplication, which calculates the product of two sparse matrices.
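The idea behind a MapReduce matrix product can be sketched on a tiny example first. The 2×2 matrices below are made up, and plain dictionaries stand in for the map and reduce phases: every entry is sent to each output cell it contributes to, and each cell then multiplies the entries whose inner indices match.

```python
from collections import defaultdict

N = 2  # assumed size of the (square) result matrix

# Sparse records in (matrix, i, j, value) form; zero entries are simply absent.
records = [
    ("a", 0, 0, 1), ("a", 0, 1, 2), ("a", 1, 1, 3),
    ("b", 0, 0, 4), ("b", 1, 0, 5), ("b", 1, 1, 6),
]

# Map step: send every record to each output cell it contributes to.
cells = defaultdict(list)
for rec in records:
    name, i, j, _ = rec
    if name == "a":
        for col in range(N):      # a[i][j] is needed by the whole output row i
            cells[(i, col)].append(rec)
    else:
        for row in range(N):      # b[i][j] is needed by the whole output column j
            cells[(row, j)].append(rec)

# Reduce step: inside one cell, multiply entries whose inner indices match.
product = {}
for (row, col), recs in cells.items():
    a_vals = {j: v for name, _, j, v in recs if name == "a"}
    b_vals = {i: v for name, i, _, v in recs if name == "b"}
    product[(row, col)] = sum(v * b_vals[k] for k, v in a_vals.items() if k in b_vals)

print(sorted(product.items()))
```

Here a = [[1, 2], [0, 3]] and b = [[4, 0], [5, 6]], so cell (0, 0) is 1·4 + 2·5 = 14.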

In [33]:
i_range = 5
j_range = 5

def mapper1(record):
    """
    :param record: (matrix, i, j, value)
                    matrix: which matrix this record belongs to ('a' or 'b')
                    i: row number
                    j: column number
                    value: value
    :return:       [((k, f), [record]), ...]
                    k: row index of the corresponding cell in the new matrix
                    f: column index of the corresponding cell in the new matrix
                    record: the input record, wrapped in a list so the reducer can concatenate
    """
    result = []
    if record[0] == 'a':
        # an element of matrix a contributes to every column of its row in the product
        for j in range(j_range):
            result.append(((record[1], j), [record]))
    elif record[0] == 'b':
        # an element of matrix b contributes to every row of its column in the product
        for i in range(i_range):
            result.append(((i, record[2]), [record]))
    return result

# same as task 1
def reducer(a, b):
    return a+b

def mapper2(record):
    """
    :param record: ((k, f), [(matrix, i, j, value),...])
    :return         (k, f, value)
                    value: new matrix value at position (k, f)
    """
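    # split the records of this cell by the matrix they come from
    lista = [item for item in record[1] if item[0] == 'a']
    listb = [item for item in record[1] if item[0] != 'a']
    value = 0
    for itema in lista:
        for itemb in listb:
            # multiply only when the column of a matches the row of b
            if itema[2] == itemb[1]:
                value += itema[3] * itemb[3]
    return (record[0][0], record[0][1], value)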


with open('data/matrix.json', 'r') as infile:
    data = [json.loads(line) for line in infile]

sc = mapreduce()
multiply_result = sc.parallelize(data, 128) \
    .flatMap(mapper1) \
    .reduceByKey(reducer) \
    .map(mapper2) \
    .collect()
sc.stop()
pprint(multiply_result)

[(0, 0, 11878),
 (0, 1, 14044),
 (0, 2, 16031),
 (0, 3, 5964),
 (0, 4, 15874),
 (1, 0, 4081),
 (1, 1, 6914),
 (1, 2, 8282),
 (1, 3, 7479),
 (1, 4, 9647),
 (2, 0, 6844),
 (2, 1, 9880),
 (2, 2, 10636),
 (2, 3, 6973),
 (2, 4, 8873),
 (3, 0, 10512),
 (3, 1, 12037),
 (3, 2, 10587),
 (3, 3, 2934),
 (3, 4, 5274),
 (4, 0, 11182),
 (4, 1, 14591),
 (4, 2, 10954),
 (4, 3, 1660),
 (4, 4, 9981)]


***
That's all for MapReduce; let's now move on to NoSQL! We use MongoDB as our NoSQL database in this assignment. MongoDB provides a tool named Mongo Shell to interact with it; however, it requires you to install MongoDB locally. Python provides a library named pymongo that performs the same function, and we will use it in this lab (again you see how great Python is: it's the glue for everything).

First of all, we need to connect to your MongoDB. Fill in your database URI inside the MongoClient parentheses.

**Note:** try to use methods that don't raise a DeprecationWarning. Also, keep in mind that the find function returns a cursor, so you need to loop over it to print the query results.

To set up the connection, run the following cell:

In [70]:
from pymongo import MongoClient

client = MongoClient("mongodb://jinyan:12345678@ds151973.mlab.com:51973/netflix")
db = client.netflix
cl = db.watch

As you can see, we can directly use .database_name and .collection_name to access the corresponding records. All of the NoSQL operators can be found on the [pymongo website](https://api.mongodb.com/python/current/); you can use any query operators you like to finish the tasks. Remember to use pprint to print the result of each NoSQL query.

NoSQL 1: find the document whose movie_id is 4993.

In [71]:
for item in cl.find({'movie_id': 4993}):
    pprint(item)

{'_id': ObjectId('59a2c83ff787c87416ca577d'),
 'movie_id': 4993,
 'user_list': [8,
               13,
               15,
               17,
               20,
               22,
               23,
               30,
               31,
               38,
               40,
               42,
               46,
               48,
               56,
               59,
               61,
               63,
               68,
               69,
               72,
               73,
               75,
               77,
               78,
               79,
               84,
               88,
               91,
               93,
               94,
               95,
               99,
               101,
               104,
               105,
               109,
               111,
               116,
               124,
               125,
               128,
               130,
               133,
               134,
               136,
               138,
               148,
         

You may notice that there is an additional field called '\_id' holding an ObjectId. This is the unique key MongoDB assigns to each document within a collection.

NoSQL 2: find the movies that both user 13 and user 14 have rated.

In [72]:
for item in cl.find({'user_list': {'$all':[13, 14]}}, {'user_list':0, '_id':0}):
    pprint(item)

{'movie_id': 2355}
{'movie_id': 3114}
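What the `$all` operator and the `0`-valued projection do in the query above can be mimicked in plain Python. The documents and the `find_all` helper below are hypothetical and only illustrate the semantics; they are not part of pymongo.

```python
# Made-up documents shaped like the ones in the watch collection.
docs = [
    {"_id": 1, "movie_id": 100, "user_list": [13, 14, 20]},
    {"_id": 2, "movie_id": 200, "user_list": [13, 21]},
    {"_id": 3, "movie_id": 300, "user_list": [5, 14, 13]},
]


def find_all(documents, field, required, hidden):
    """Hypothetical helper: keep documents whose array field contains every
    required value ('$all'), and drop the hidden fields (projection with 0)."""
    for doc in documents:
        if set(required) <= set(doc[field]):
            yield {k: v for k, v in doc.items() if k not in hidden}


matches = list(find_all(docs, "user_list", [13, 14], hidden={"user_list", "_id"}))
print(matches)
```

The order of values inside user_list does not matter for `$all`; only membership does.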


NoSQL 3: find every movie_id that has been rated by 200 or more users, sorted by movie_id.

In [73]:
for item in cl.find({'user_list.199': {'$exists': True}}, {'user_list':0, '_id':0}).sort('movie_id'):
    pprint(item)

{'movie_id': 1}
{'movie_id': 47}
{'movie_id': 50}
{'movie_id': 110}
{'movie_id': 150}
{'movie_id': 260}
{'movie_id': 296}
{'movie_id': 318}
{'movie_id': 356}
{'movie_id': 364}
{'movie_id': 457}
{'movie_id': 480}
{'movie_id': 527}
{'movie_id': 588}
{'movie_id': 589}
{'movie_id': 590}
{'movie_id': 593}
{'movie_id': 608}
{'movie_id': 780}
{'movie_id': 858}
{'movie_id': 1196}
{'movie_id': 1198}
{'movie_id': 1210}
{'movie_id': 1270}
{'movie_id': 2571}
{'movie_id': 2858}
{'movie_id': 2959}
{'movie_id': 4993}
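The query relies on a small trick: the element at position 199 of user_list exists only when the list holds at least 200 users. A minimal Python sketch of the same check, using a hypothetical `has_at_least` helper:

```python
def has_at_least(items, n):
    """Return True when items holds n or more elements, tested by looking
    for the element at index n - 1, as 'user_list.199' does for n = 200."""
    try:
        items[n - 1]
        return True
    except IndexError:
        return False


print(has_at_least(list(range(199)), 200))
print(has_at_least(list(range(200)), 200))
```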


### Lab3: Get Familiar with Apache Kafka & Spark Streaming

This lab helps you get Kafka and Spark Streaming working on the cluster. You will first configure Kafka and then run a simple Spark Streaming program that consumes the messages in Kafka.

**Run Kafka**

[Apache Kafka](https://kafka.apache.org/quickstart) manages the message queue for Spark Streaming. Since we created our cluster with the Kafka initialization action, Kafka is already installed on it. First, ssh to recommend-m-0. All Kafka messages are organized into topics: if you wish to send a message, you send it to a specific topic, and if you wish to read a message, you read it from a specific topic. Use the following command to create a topic named ratings, and keep in mind that each .sh command must be prefixed with the directory '/usr/lib/kafka/bin/':
~~~~
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic ratings
~~~~
'--create' means we want to create a topic, while '--topic' specifies the topic name. This command talks to the local ZooKeeper server to create the topic; by default, ZooKeeper listens on port 2181. Since our data is relatively small, you don't need to worry about '--replication-factor' and '--partitions'; just keep them at 1. You can view your current topics with:
~~~~
kafka-topics.sh --list --zookeeper localhost:2181
~~~~
In Kafka, there are two roles: producer and consumer. Producers push messages into a Kafka topic, while consumers pull messages off it. Since Kafka is a distributed system that runs on a cluster, each node in the cluster that runs Kafka is called a Kafka broker. With --broker-list, you specify which broker the producer sends its messages to:
~~~~
kafka-console-producer.sh --broker-list recommend-w-0:9092 --topic ratings
~~~~
This command starts a console for feeding the ratings topic and uses recommend-w-0 as the broker. Open another terminal and type in the following consumer command; you will receive the messages sent through recommend-w-0 from the queue one by one. --from-beginning ensures that you get all the messages in the history. Now you can type any words into the producer terminal, and magically, they will show up in your consumer terminal one by one.
~~~~
kafka-console-consumer.sh --bootstrap-server recommend-w-0:9092 --topic ratings --from-beginning
~~~~
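The relationship between a topic, its producers and consumers, and the --from-beginning flag can be pictured with a toy in-memory model. The `Topic` class below is purely illustrative (it is not the Kafka API), and the rating lines are made up.

```python
class Topic:
    """Toy in-memory model of one Kafka topic partition: an append-only log."""

    def __init__(self):
        self.log = []

    def produce(self, message):
        # a producer appends to the end of the log
        self.log.append(message)

    def consumer(self, from_beginning=False):
        # a new consumer starts at offset 0 or at the current end of the log
        offset = 0 if from_beginning else len(self.log)

        def poll():
            nonlocal offset
            batch = self.log[offset:]
            offset = len(self.log)
            return batch

        return poll


ratings = Topic()
ratings.produce("1,10,4.0,1000")

poll_all = ratings.consumer(from_beginning=True)
poll_new = ratings.consumer()

ratings.produce("2,20,3.5,1001")

first_all = poll_all()    # sees the whole history
first_new = poll_new()    # sees only what arrived after it started
second_all = poll_all()   # nothing new since the last poll
print(first_all, first_new, second_all)
```

Messages are not removed when they are read; each consumer only tracks its own position in the log, which is why a second consumer can replay the history.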
Trust me, you don't want to type in ratings manually for debugging. With the shell operator '<', we can feed a file to the producer instead. Change into the data/ directory and type in the following command; you will see all rows of streaming_1.csv show up in the consumer terminal.
~~~~
kafka-console-producer.sh --broker-list recommend-w-0:9092 --topic ratings < streaming_1.csv
~~~~

**Commands to run a Kafka server locally**
~~~~
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic ratings
~~~~
**Spark Streaming**

For Spark Streaming, refer to [here](https://spark.apache.org/docs/latest/streaming-programming-guide.html). For connecting Kafka and Spark Streaming, refer to [here](https://spark.apache.org/docs/latest/streaming-kafka-0-8-integration.html). There are two ways to connect them; we use the direct approach (no receivers). We do so not only because of the advantages mentioned on the official website, but also because of a timeout error when Spark Streaming connects to ZooKeeper, since we initialized our cluster with 3 masters rather than preinstalling ZooKeeper on it.

Now let's take a look at streaming_test.py to understand exactly how we should build the streaming pipeline. This code only reports new ratings of popular movies, that is, movies that already have at least 200 ratings.

In [None]:
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql import SparkSession, SQLContext
import argparse

#### util ##########################################################################################
def getSparkSessionInstance(sparkConf):
    if ('sparkSessionSingletonInstance' not in globals()):
        globals()['sparkSessionSingletonInstance'] = SparkSession \
            .builder \
            .appName("data1030") \
            .config("spark.mongodb.input.uri", "mongodb://jinyan:12345678@ds151973.mlab.com:51973/netflix") \
            .config("spark.mongodb.output.uri", "mongodb://jinyan:12345678@ds151973.mlab.com:51973/netflix") \
            .getOrCreate()

    return globals()['sparkSessionSingletonInstance']

#####################################################################################################

def parse_args():
    parser = argparse.ArgumentParser(description='test cloud setup')
    parser.add_argument('-mongo', help='MongoDB database URI')
    parser.add_argument('-b', help='broker list', default='recommend-w-0:9092')
    return parser.parse_args()

def mapper1(record):
    key = record.movie_id
    value = len(record.user_list)
    return (key, value)

def mapper2(record):
    if (record[1] >= 200):
        return [(record[0],)]
    else:
        return []

def mapper3(record):
    """
    :param record: (key, message) tuple from Kafka,
                   where message is "user_id,movie_id,rating,timestamp"
    :return: (user_id, movie_id), both converted to int
    """
    mylist = record[1].split(",")
    return (int(mylist[0]), int(mylist[1]))


def process(rdd):
    # this empty check is necessary
    if rdd.isEmpty():
        return rdd

    # get the Spark session and SparkSQL context; this is necessary for you to use SparkSQL
    spark = getSparkSessionInstance(rdd.context.getConf())
    ssc = SQLContext(spark.sparkContext)
    
    tmp_df = ssc.createDataFrame(rdd, ['user_id', 'movie_id'])
    ssc.registerDataFrameAsTable(tmp_df, 'ratings')

    return ssc.sql('''SELECT user_id, r.movie_id
                   FROM ratings r, popular p
                   WHERE r.movie_id == p.movie_id''').rdd

if __name__ == '__main__':

    args = parse_args()
    
    # init spark session
    spark = SparkSession \
        .builder \
        .appName("data1030") \
        .config("spark.mongodb.input.uri", args.mongo) \
        .config("spark.mongodb.output.uri", args.mongo) \
        .getOrCreate()
    sc = spark.sparkContext
    
    # init spark streaming with batch interval of 10 seconds
    stream = StreamingContext(sc, 10)
    ssc = SQLContext(sc)
    
    # load data from MongoDB
    watch_df = spark.read.format("com.mongodb.spark.sql.DefaultSource")\
        .option("collection", "watch")\
        .load()
    watch_rdd = watch_df.rdd.map(mapper1).flatMap(mapper2)
    watch_df_filtered = ssc.createDataFrame(watch_rdd, ['movie_id'])
    ssc.registerDataFrameAsTable(watch_df_filtered, 'popular')

    # streaming
    kafka_stream = KafkaUtils.createDirectStream(stream, ['ratings'], {"metadata.broker.list": args.b})

    tmp_rdd = kafka_stream.map(mapper3).transform(process)
    tmp_rdd.pprint(10)

    stream.start()
    stream.awaitTermination()

You may be familiar with most of the code above, so let's focus on the streaming part. As you may notice, we create not only a SparkContext and a SparkSQL context, but also a streaming context. We use KafkaUtils.createDirectStream() to connect to Kafka. The first parameter is the streaming context, the second is the list of topics, and the third holds the Kafka parameters, here the broker list that this code listens to. kafka_stream is a DStream, the streaming version of a normal RDD. Since some operations cannot be applied to it directly, we put most of our operations in the transform method, within which we can apply any RDD operation. Keep in mind that you must register an output operation, such as pprint(), in order to make Spark Streaming run. Finally, we call stream.start() and stream.awaitTermination() so that the program starts processing the streaming data.

Now you can upload this code to your cluster and run it with
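A DStream can be pictured as a sequence of small batches, one per batch interval, with transform applying an ordinary function to each batch. The sketch below imitates that in plain Python with made-up messages and a made-up set of popular movie ids; it does not use Spark.

```python
popular = {1, 50}   # made-up ids of popular movies


def mapper3(record):
    # (key, "user_id,movie_id,rating,timestamp") -> (user_id, movie_id)
    fields = record[1].split(",")
    return (int(fields[0]), int(fields[1]))


def process(batch):
    # plays the role of transform(process): runs once per micro-batch
    if not batch:
        return batch
    return [(user, movie) for user, movie in batch if movie in popular]


# Each inner list stands for the messages that arrived during one batch interval.
micro_batches = [
    [(None, "7,1,4.0,1000"), (None, "7,2,3.0,1001")],
    [],
    [(None, "8,50,5.0,1010")],
]

results = [process([mapper3(r) for r in batch]) for batch in micro_batches]
print(results)
```

The empty middle batch shows why process checks for emptiness first: an interval with no new messages still produces a batch.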
~~~~
spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0,org.mongodb.spark:mongo-spark-connector_2.11:2.2.0 streaming_test.py -mongo MongoDB_URI -b recommend-w-0:9092 > output
~~~~
Note: the package names use the Java Maven coordinate format, groupId:artifactId:version. The first package is the Spark-Kafka connector, while the second is the Spark-MongoDB connector. When you need to include multiple packages, separate them with commas, without any spaces between them. If you use the provided data streaming_1.csv, you will find output like this in your output file:
 <img src="document/streaming.jpeg" width = "400" height = "600" alt="what" align=center />
It's beautiful, isn't it?

### Part 1: Batch Data Pipeline by Spark

In this assignment, you will build a movie recommendation system based on the Amazon.com recommendation system. Below are the workflows that we will implement. Figure 2 shows the algorithm for streaming, while Figure 3 shows the algorithm for batch. The following instructions will help you understand each step of these two workflows and how you should implement them in MapReduce. We will use the number beside each rectangle to denote a step. Feel free to reuse the code we provided before; it is really useful.

 <img src="document/workflow.jpeg" width = "1000" height = "800" alt="what" align=center />

Let's start from implementing figure 3! 

** before 100 **

You need to initialize the Spark context and the SparkSQL context with the MongoDB connection information; refer to the [MongoDB-Spark Connector](https://docs.mongodb.com/spark-connector/master/python-api/).

** 100 **

In our case, purchase history refers to user ratings for each movie. 

1. Load data. First use the [HDFS shell](https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/FileSystemShell.html#put) to put data/ratings.csv and data/movies.csv into the Hadoop Distributed File System on the cluster. We suggest putting them into the directory 'hdfs:///data/'. Note that 'hdfs://' denotes HDFS, while /data/ is the directory in it. Use the Spark [read function](https://docs.databricks.com/spark/latest/data-sources/read-csv.html) to load the data into your Spark code.
2. Create the movie collection in MongoDB. In this step, use the data loaded above to create a table that stores the average rating and movie title for each movie. We recommend using SparkSQL for this step: first register two temporary tables based on the loaded data, use standard SQL to generate the movie table, and then write the data into MongoDB with the [write function](https://docs.mongodb.com/spark-connector/master/python/write-to-mongodb/).
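
As a rough illustration of the SQL in step 2, the sketch below runs a join-and-average query locally. Note that `sqlite3` is only a stand-in here so the query can be tried without a cluster; the column names `movieId`, `title`, `userId`, and `rating` are assumptions based on the usual MovieLens CSV headers, and the rows are toy data. In Spark, a query of this shape would normally be passed to `spark.sql(...)` after registering the two DataFrames as temporary tables.

```python
import sqlite3

# Query that builds one row per movie with its title and average rating.
# Table and column names are assumptions, not taken from the assignment data.
MOVIE_SQL = """
SELECT m.movieId AS movie_id, m.title AS title, AVG(r.rating) AS rating_avg
FROM movies m JOIN ratings r ON m.movieId = r.movieId
GROUP BY m.movieId, m.title
ORDER BY m.movieId
"""

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE movies (movieId INTEGER, title TEXT)")
conn.execute("CREATE TABLE ratings (userId INTEGER, movieId INTEGER, rating REAL)")
conn.executemany("INSERT INTO movies VALUES (?, ?)",
                 [(1, "Movie A"), (2, "Movie B")])
conn.executemany("INSERT INTO ratings VALUES (?, ?, ?)",
                 [(10, 1, 4.0), (11, 1, 5.0), (10, 2, 3.0)])

rows = conn.execute(MOVIE_SQL).fetchall()
print(rows)
```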

** 102 **
Our algorithm doesn't need this step.

** 104 **

You can easily create an RDD that maps each purchased item to its users. The format of this RDD should be:

~~~~
(key, value)
 key: movie_id
 value: list of (user_id, rating) tuples
~~~~
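
A minimal sketch of this grouping, written in plain Python so it can be checked without a cluster. The helper names `parse_rating` and `group_by_movie` are hypothetical, and the column order `userId,movieId,rating,timestamp` is an assumption about ratings.csv. In Spark, the same shape would typically come from mapping each line to a pair and then grouping by key.

```python
from collections import defaultdict

def parse_rating(line):
    """Turn one CSV line into (movie_id, (user_id, rating)).
    Assumes the column order userId,movieId,rating,timestamp."""
    user_id, movie_id, rating = line.split(",")[:3]
    return int(movie_id), (int(user_id), float(rating))

def group_by_movie(pairs):
    """Local stand-in for a group-by-key: collect (user_id, rating) per movie_id."""
    grouped = defaultdict(list)
    for movie_id, user_rating in pairs:
        grouped[movie_id].append(user_rating)
    return dict(grouped)

# Toy lines, not real assignment data.
lines = ["1,31,2.5,1260759144", "2,31,4.0,835355493", "2,10,3.0,835355681"]
movie_users = group_by_movie(parse_rating(line) for line in lines)
print(movie_users)
```
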
** 106 **

We set the global value POPULAR_THRESHOLD (200) as the threshold for popular movies: a movie rated by at least POPULAR_THRESHOLD users is considered popular. Fill the popular_item list with the IDs of the popular movies.
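
One possible way to express the popularity test, shown on toy records with a lowered threshold. The function name `find_popular` is hypothetical; the input follows the `(movie_id, list of (user_id, rating))` format from step 104.

```python
POPULAR_THRESHOLD = 200  # value given in the assignment

def find_popular(records, threshold=POPULAR_THRESHOLD):
    """Return the ids of movies rated by at least `threshold` users."""
    return sorted(movie_id for movie_id, user_ratings in records
                  if len(user_ratings) >= threshold)

# Toy records; threshold lowered to 2 so the example is easy to follow.
records = [(1, [(10, 4.0), (11, 5.0)]), (2, [(10, 3.0)]), (3, [(11, 2.0), (12, 1.0)])]
popular_item = find_popular(records, threshold=2)
print(popular_item)
```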

** 108 **

This one is a little tricky. We can use two map functions and two reduce functions. Mapper1 simply generates key-value pairs where user_id is the key and all the other information is the value. Reducing by key gives, for each user, the list of all movies that user has rated. Mapper2 generates a tuple for each (popular movie, other movie) pair, with the number of raters of each movie as its value. After reducing by key, each item in the value list corresponds to one common user who rated both movies in the key tuple.
~~~~
mapper1(record):
    input: (key, value)
            key: movie_id
            value: list of (user_id, rating) tuples
    output: list of (key, value) pair
            key: user_id
            value: [(movie_id, rating, number of rater)]
mapper2(record):
    input: (key, value)
            key: user_id
            value: list of (movie_id, rating, number of rater)
    output: list of (key, value) tuple
            key: (movie_id1, movie_id2)
            value: ([(num_rater1, num_rater2)])
~~~~
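
The two mappers can be sketched in plain Python as below. This is only one reading of the specification above: `reduce_by_key` is a hypothetical local stand-in for the reduce step (concatenating value lists), and passing `popular_item` as an argument is a choice made for this sketch. On an RDD, functions like these would typically be applied with a flat-map followed by a reduce by key.

```python
def mapper1(record):
    """(movie_id, [(user_id, rating)]) -> [(user_id, [(movie_id, rating, num_raters)])]"""
    movie_id, user_ratings = record
    num_raters = len(user_ratings)
    return [(user_id, [(movie_id, rating, num_raters)])
            for user_id, rating in user_ratings]

def mapper2(record, popular_item):
    """(user_id, [(movie_id, rating, num_raters)]) ->
    [((popular_id, other_id), [(num_rater1, num_rater2)])]"""
    _user_id, movies = record
    pairs = []
    for pop_id, _rating, pop_raters in movies:
        if pop_id not in popular_item:
            continue
        for other_id, _other_rating, other_raters in movies:
            if other_id != pop_id:
                pairs.append(((pop_id, other_id), [(pop_raters, other_raters)]))
    return pairs

def reduce_by_key(pairs):
    """Local stand-in for a reduce by key that concatenates value lists."""
    out = {}
    for key, value in pairs:
        out[key] = out.get(key, []) + value
    return out

# Toy data: movie 1 is the only popular movie.
movie_users = [(1, [(10, 4.0), (11, 5.0)]), (2, [(10, 3.0)]), (3, [(11, 2.0), (10, 1.0)])]
user_movies = reduce_by_key(p for rec in movie_users for p in mapper1(rec))
pair_counts = reduce_by_key(p for rec in user_movies.items() for p in mapper2(rec, {1}))
print(pair_counts)
```
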
** 110 & 114 **

We will swap steps 112 and 114. Our algorithm uses Jaccard similarity to measure the similarity between movies and returns only movie pairs that share **more than** RATING_NUM (3) common users.
~~~~
mapper3(record):
    input: (key, value)
            key: (movie_id1, movie_id2)
            value: list of (num_rater1, num_rater2) pairs, where the length of this list equals the number of common users
    output: [(key, value)] or []
            key: movie_id1
            value: [(movie_id2, jaccard)]
            if the pair doesn't satisfy the RATING_NUM constraint, return []
~~~~
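
A sketch of mapper3, assuming the standard set definition of Jaccard similarity: the number of common raters divided by the number of users who rated either movie, that is `common / (num_rater1 + num_rater2 - common)`. The inputs are toy values.

```python
RATING_NUM = 3  # value given in the assignment

def mapper3(record, rating_num=RATING_NUM):
    """((movie_id1, movie_id2), [(num_rater1, num_rater2), ...]) ->
    [(movie_id1, [(movie_id2, jaccard)])], or [] if too few common users."""
    (movie_id1, movie_id2), rater_counts = record
    common = len(rater_counts)  # one list entry per common user
    if common <= rating_num:    # keep only pairs with MORE than rating_num common users
        return []
    num_rater1, num_rater2 = rater_counts[0]
    jaccard = common / (num_rater1 + num_rater2 - common)
    return [(movie_id1, [(movie_id2, jaccard)])]

kept = mapper3(((1, 3), [(10, 6)] * 4))     # 4 common users: kept
dropped = mapper3(((1, 2), [(10, 5)] * 3))  # exactly 3 common users: dropped
print(kept, dropped)
```
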
** 112 & 116 **

Now we need to sort the list of other items for each popular item by similarity, highest first, and truncate it to n (3), a global variable already defined at the top of our code.
~~~~
mapper4(record):
    input: (key, value)
            key: movie_id1
            value: [(movie_id2, jaccard), ...]
    output:(movie_id1, (movie_id, jaccard), (movie_id, jaccard), ...)
            sorted tuples with n other items
~~~~
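
A sketch of mapper4 on toy values, assuming the list is ordered by Jaccard similarity from highest to lowest before truncation.

```python
n = 3  # value given in the assignment

def mapper4(record, top_n=n):
    """(movie_id1, [(movie_id2, jaccard), ...]) ->
    (movie_id1, (movie_id, jaccard), ...) with at most top_n items, most similar first."""
    movie_id1, candidates = record
    ranked = sorted(candidates, key=lambda pair: pair[1], reverse=True)
    return (movie_id1,) + tuple(ranked[:top_n])

result = mapper4((1, [(2, 0.1), (3, 0.5), (4, 0.3), (5, 0.2)]))
print(result)
```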

** after 116 **

By now, you have completed all the steps in the batch workflow. We now need to upload the similar table into MongoDB. Use the same write function as in step 100, and name your collection similar. Make sure each document in your collection has the following schema:
~~~~
{'_id': ObjectId, 'popular_id': int, 'item_1': {'_1': int, '_2': float}, ..., 'item_n':{...}}
~~~~
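
To make the target shape concrete, the sketch below converts one mapper4-style tuple into a dictionary with these fields. The helper name `to_document` is hypothetical and this is not necessarily how the write function produces the documents; `_id` is left out because MongoDB adds it on insert.

```python
def to_document(row):
    """(popular_id, (movie_id, jaccard), ...) -> dict in the shape of the similar schema."""
    popular_id, *items = row
    doc = {"popular_id": int(popular_id)}
    for i, (movie_id, jaccard) in enumerate(items, start=1):
        doc["item_%d" % i] = {"_1": int(movie_id), "_2": float(jaccard)}
    return doc

doc = to_document((1, (3, 0.5), (4, 0.25)))
print(doc)
```
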
Now you can check your result with the following NoSQL query. The result should be the same as in problem 3 of NoSQL lab 3.


~~~~
cl = db.similar
for item in cl.find({}, {'popular_id': 1, '_id': 0}).sort('popular_id'):
    print(item)
~~~~

~~~~
{'popular_id': 1}
{'popular_id': 47}
{'popular_id': 50}
{'popular_id': 110}
{'popular_id': 150}
{'popular_id': 260}
{'popular_id': 296}
{'popular_id': 318}
{'popular_id': 356}
{'popular_id': 364}
{'popular_id': 457}
{'popular_id': 480}
{'popular_id': 527}
{'popular_id': 588}
{'popular_id': 589}
{'popular_id': 590}
{'popular_id': 593}
{'popular_id': 608}
{'popular_id': 780}
{'popular_id': 858}
{'popular_id': 1196}
{'popular_id': 1198}
{'popular_id': 1210}
{'popular_id': 1270}
{'popular_id': 2571}
{'popular_id': 2858}
{'popular_id': 2959}
{'popular_id': 4993}
~~~~


** Tips for Debugging **
1. Set POPULAR_THRESHOLD to a large value.
2. Set n to a large value.
3. When you submit your task, use '... > output' to separate system INFO from program output.
4. Use rdd.take(number) to show the first `number` items in an RDD.
5. Feel free to optimize any part of the code, and write up your optimization in this notebook.

### Part 2: Real-time Data Pipeline by Spark Streaming

The structure of Spark Streaming was introduced in lab3. In this part, we will use it to implement figure 2. Feel free to copy and paste lab3 code into your streaming program.

** pre 80 **

1. Establish connections to Spark, Spark Streaming (with a 10-second interval), and SparkSQL. Initialize the pymongo client again; note that we declare the variable client at the top of the code because we may use it in the process function.
2. We need the similar and movies collections generated in the batch pipeline, so use the spark.read function to load these two collections from MongoDB and register them as tables with the same names in SparkSQL.
3. Initialize the recommend collection in MongoDB with the following schema:
~~~~
['user_id': int, 'movie_list': array of integer, 'recommend_list': array of integer]
~~~~
and also register a table in SparkSQL with the same name and schema. user_id should be in the range [0, 600], while movie_list and recommend_list should be empty.

***All of the following steps apply only to users that appear in the newly arrived rating data.***

** 80 **

In our recommendation system, we simply regard all items that a user has rated as items known to be of interest to that user.
1. Update the recommend collection using pymongo. Insert each newly rated movie id into the movie_list of the corresponding user. You need to check for duplicates, i.e., movie_list shouldn't contain the same id twice.
2. Retrieve user_id and movie_list from the recommend collection. Register the result as a table in SparkSQL with the same name.
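
One way to avoid duplicates in item 1 is MongoDB's `$addToSet` update operator, which appends a value to an array only if it is not already present. The sketch below just builds the filter and update documents; the helper name `build_movie_list_update` is hypothetical, and the values are toy data.

```python
def build_movie_list_update(user_id, movie_id):
    """Return the (filter, update) pair that adds movie_id to a user's movie_list
    without creating duplicates."""
    query = {"user_id": user_id}
    update = {"$addToSet": {"movie_list": movie_id}}
    return query, update

query, update = build_movie_list_update(6, 480)
print(query, update)
# With a live pymongo connection this pair would be applied as, for example:
# db.recommend.update_one(query, update)
```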

** 82 **

Retrieve similar movies for all movies known to be of interest to each user.
1. Create a table named input_rating with schema ['user_id': int, 'movie_id': int], where each row corresponds to a user and one movie of interest to that user.
2. Join the input_rating table with the similar table to find all similar items. The output should be:

~~~~
(user_id, list of similar movie_id)
~~~~
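
The logic of this step can be sketched locally as below. A plain dictionary stands in for the similar table, and the loop mimics an inner join, so movies without an entry in similar contribute nothing. The function names and all values are hypothetical.

```python
def explode_movie_list(user_doc):
    """One recommend document -> rows of input_rating: (user_id, movie_id)."""
    return [(user_doc["user_id"], movie_id) for movie_id in user_doc["movie_list"]]

def similar_for_users(input_rows, similar):
    """Join input_rating rows with `similar` (popular_id -> list of similar movie ids)."""
    out = {}
    for user_id, movie_id in input_rows:
        if movie_id in similar:  # inner-join behaviour
            out.setdefault(user_id, []).extend(similar[movie_id])
    return sorted(out.items())

# Toy data, not taken from the assignment collections.
similar = {1: [20, 30, 40], 2: [30, 50, 60]}
rows = explode_movie_list({"user_id": 6, "movie_list": [1, 2, 999]})
user_similar = similar_for_users(rows, similar)
print(user_similar)
```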

** 84 ** Our algorithm doesn't need this step

** 86 **

Use MapReduce functions to merge the item lists. Note that you should deduplicate the merged list. The output of this step should be an RDD of

~~~~
(user_id, movie_id).
~~~~
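
A small sketch of the merge and deduplication on toy values; `merge_similar` is a hypothetical name. A function like this could be applied with a flat-map, or the same effect could be had by flattening first and then removing duplicate pairs.

```python
def merge_similar(record):
    """(user_id, list of similar movie ids) -> deduplicated [(user_id, movie_id)]."""
    user_id, similar_ids = record
    return [(user_id, movie_id) for movie_id in sorted(set(similar_ids))]

pairs = merge_similar((6, [20, 30, 40, 30, 50, 60]))
print(pairs)
```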

** 88 **

We will sort similar items by their average rating according to the movies table. To simplify later operations, you need to do some additional work here:
1. Register the previous RDD as a table named user_new in SparkSQL.
2. Join user_new and movies to get a DataFrame of (user_id, movie_id, rating_avg, title).
3. Use MapReduce functions to sort the similar movies for each user in descending order. The output should be an RDD with the format:

~~~~
(user_id, sorted list of (movie_id, avg_rating, movie_title))
~~~~
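
The per-user sort in item 3 might look like the following once the rows have been grouped by user. The function name `sort_by_rating` and the sample values are hypothetical.

```python
def sort_by_rating(record):
    """(user_id, [(movie_id, avg_rating, title)]) -> same record, highest avg_rating first."""
    user_id, movies = record
    return user_id, sorted(movies, key=lambda movie: movie[1], reverse=True)

ranked = sort_by_rating((6, [(20, 3.5, "A"), (30, 4.2, "B"), (40, 3.9, "C")]))
print(ranked)
```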

** 90 **

Filter and truncate the recommend list. Filter out movies that the user has already rated, i.e., movies in movie_list, and then truncate the recommend list to RECOMMEND_LENGTH. The output should be:

~~~~
(user_id, filtered and truncated list of (movie_id, movie_title))
~~~~
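
A sketch of the filter and truncation on toy values. The value 3 for RECOMMEND_LENGTH is an assumption (the sample outputs in this part list three movies per user), and `filter_and_truncate` is a hypothetical name.

```python
RECOMMEND_LENGTH = 3  # assumed value; use the constant defined in your own code

def filter_and_truncate(record, movie_list, length=RECOMMEND_LENGTH):
    """(user_id, sorted [(movie_id, avg_rating, title)]) ->
    (user_id, [(movie_id, title)]) without already-rated movies, at most `length` long."""
    user_id, ranked = record
    seen = set(movie_list)
    kept = [(movie_id, title) for movie_id, _avg, title in ranked if movie_id not in seen]
    return user_id, kept[:length]

ranked = (6, [(30, 4.2, "B"), (40, 3.9, "C"), (20, 3.5, "A"), (50, 3.1, "D"), (60, 2.0, "E")])
recommendation = filter_and_truncate(ranked, movie_list=[40])
print(recommendation)
```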

** 92 **

Simply set recommend_list in recommend collection to your current recommendation list generated in step 90, with (movie_id, movie_title) for each movie.

** 94 **

You need to return the final RDD from the process function back to the main function, and use the stream.pprint() function to print it out. Printing 10 items is sufficient for pprint.

** test your code **

We provide two incoming rating files, named streaming_1.csv and streaming_2.csv. Test your streaming code by feeding these two files to the Kafka producer. After you feed in streaming_1.csv, your recommend collection should look like this:
 <img src="document/streaming_1.jpeg" width = "800" height = "700" alt="what" align=center />
and your recommendation system should print out the following recommendation information:

~~~~
(6, [(595, 'Beauty and the Beast (1991)'), (592, 'Batman (1989)'), (597, 'Pretty Woman (1990)')])
(13, [(7153, 'Lord of the Rings: The Return of the King, The (2003)'), (5952, 'Lord of the Rings: The Two Towers, The (2002)'), (6539, 'Pirates of the Caribbean: The Curse of the Black Pearl (2003)')])
~~~~
 
After you feed in streaming_2.csv, the recommend collection should look like this:
 <img src="document/streaming_2.jpeg" width = "800" height = "700" alt="what" align=center />
with the following output:

~~~~
(6, [(595, 'Beauty and the Beast (1991)'), (480, 'Jurassic Park (1993)'), (380, 'True Lies (1994)')])
(14, [(1196, 'Star Wars: Episode V - The Empire Strikes Back (1980)'), (1198, 'Raiders of the Lost Ark (Indiana Jones and the Raiders of the Lost Ark) (1981)'), (1210, 'Star Wars: Episode VI - Return of the Jedi (1983)')])
~~~~

** Tips for Debugging **
1. When using DStream.transform(func), you need to return an RDD from func; otherwise all the subsequent RDDs will be empty!
2. Sometimes the program will print duplicate rows; that is fine as long as your code is generic.
3. Removing temporary tables is a good habit.