# COMP7095 - Big Data Management

## Spark Lab 1: Introduction to Spark

### Introduction
Spark is a general framework for distributed computing that offers high performance for both batch and interactive processing. It supports higher-level tools for SQL and structured data processing, machine learning, graph processing, and stream processing. It exposes APIs for Java, Python, and Scala. In our labs, we mainly use it with Python.

## Hands-on
We will use PySpark to load data from a TSV file and perform some simple analysis.

The file, named "movie_reviews.tsv", can be downloaded from the course Moodle.

Use the following code segments to understand how PySpark works.

The data file is around 65 MB. The following is a partial view of its layout:
```
review\tsentiment
One of the other reviewer...\tpositive
A wonderful little produc...\tpositive
I thought this was a wond...\tpositive
Basically there's a famil...\tnegative
...
```

Note that values are separated by a tab ('\t').

Import the required packages and get the instance of the Spark context:

In [1]:
# Install pyspark
!pip install pyspark

from pyspark import SparkContext
from operator import add

sc = SparkContext.getOrCreate()

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
  Preparing metadata (setup.py) ... done
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845512 sha256=28e97763a829d1d4ec59a7419feabaef1e632a04f204201e9c84c87af93dd8f2
  Stored in directory: /root/.cache/pip/wheels/43/dc/11/ec201cd671da62fa9c5cc77078235e40722170ceba231d7598
Successfully built pyspark
Installing collected packages: py4j, pyspa

Define a function to split the values (review and sentiment) of each line.

In [2]:
def preprocess(line):
    review, sentiment = line.split('\t')
    return sentiment, review
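
As a quick check of what `preprocess` returns, the sketch below applies it to two made-up lines in plain Python (no Spark needed). The sample lines and the `preprocess_safe` variant are illustrative assumptions, not part of the lab code; the variant uses `rsplit` so that a stray tab inside the review text would not break the two-value unpacking.

```python
def preprocess(line):
    # Same logic as the lab's function: split "review<TAB>sentiment" and swap the order.
    review, sentiment = line.split('\t')
    return sentiment, review


def preprocess_safe(line):
    # Hypothetical variant: split only on the LAST tab, so tabs inside the review survive.
    review, sentiment = line.rsplit('\t', 1)
    return sentiment, review


# Made-up sample lines in the same layout as the data file.
sample_lines = [
    "A wonderful little production\tpositive",
    "Bad plot,\tbad acting\tnegative",  # extra tab inside the review text
]

first = preprocess(sample_lines[0])
second = preprocess_safe(sample_lines[1])
print(first)
print(second)
```

Calling the original `preprocess` on the second sample line would raise a `ValueError`, because `split('\t')` yields three values there.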

Upload your local file to Colab.

This step requires you to first import the files module from the google.colab library:

In [3]:
from google.colab import files

Upload the file from your local file system by calling the `upload` method of the `files` module:

In [5]:
uploaded = files.upload()

Once the upload is complete, you can read the file directly in Colab.

#### Task 1: Load the data file and create a resilient distributed dataset (RDD) object. Please complete your code as follows.

In [6]:
rdd = sc.textFile('movie_reviews.tsv')

Use the `filter` function to drop the header row, then use `map` to pass each remaining line to the `preprocess` function. This creates a new RDD object of (sentiment, review) pairs.
The `count` function returns the number of rows stored in the RDD object.

In [7]:
reviews = rdd.filter(lambda x: x != 'review\tsentiment').map(preprocess)
reviews.count()

50000
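
To make the semantics of the cell above concrete, here is a plain-Python analog of the `filter`, `map`, and `count` steps on a small in-memory list. It is only a sketch: the sample lines are made up, and ordinary lists stand in for the RDD, so nothing here is distributed.

```python
def preprocess(line):
    # Same logic as the lab's function.
    review, sentiment = line.split('\t')
    return sentiment, review


HEADER = 'review\tsentiment'

# Made-up lines standing in for the contents of the data file.
lines = [
    HEADER,
    "Great film\tpositive",
    "Dull and slow\tnegative",
    "Loved it\tpositive",
]

# Analog of rdd.filter(...): keep only the lines that are not the header.
data_lines = [line for line in lines if line != HEADER]

# Analog of .map(preprocess): transform every remaining line into a pair.
pairs = [preprocess(line) for line in data_lines]

# Analog of .count(): number of items left.
n_rows = len(pairs)

print(pairs)
print(n_rows)
```

One difference worth keeping in mind: in Spark, `filter` and `map` are lazy transformations, and no work is done until an action such as `count` is called, whereas the list comprehensions here run immediately.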

We can also inspect what is stored in an RDD object with the `take` function. Its parameter specifies how many items to retrieve from the RDD object; for example, `take(1)` returns the first item.

Next, we use the `filter` function to retrieve all rows with positive sentiment and create a new RDD object, which stores the reviews without the sentiment labels.
Let's also check how many positive reviews there are!

In [8]:
posReviews = reviews.filter(lambda x: x[0] == 'positive').map(lambda x: x[1])
posReviews.count()

25000

#### Task 2: Please take the first row of the `reviews` RDD, which happens to be a positive review. Please complete your code as follows.

In [9]:
reviews.take(1)

[('positive',
  "One of the other reviewers has mentioned that after watching just 1 Oz episode you'll be hooked. They are right, as this is exactly what happened with me. The first thing that struck me about Oz was its brutality and unflinching scenes of violence, which set in right from the word GO. Trust me, this is not a show for the faint hearted or timid. This show pulls no punches with regards to drugs, sex or violence. Its is hardcore, in the classic use of the word. It is called OZ as that is the nickname given to the Oswald Maximum Security State Penitentary. It focuses mainly on Emerald City, an experimental section of the prison where all the cells have glass fronts and face inwards, so privacy is not high on the agenda. Em City is home to many..Aryans, Muslims, gangstas, Latinos, Christians, Italians, Irish and more....so scuffles, death stares, dodgy dealings and shady agreements are never far away. I would say the main appeal of the show is due to the fact that it goes w

#### Task 3: Please create a new RDD object for negative reviews. Please complete your code as follows.

In [11]:
negReviews = reviews.filter(lambda x: x[0] == 'negative').map(lambda x: x[1])
negReviews.count()

25000
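
The positive and negative selections follow the same pattern: filter on the label at index 0, then keep only the text at index 1. The plain-Python sketch below shows that pattern and the behaviour of `take(1)` on made-up data; `reviews_with` is a hypothetical helper name introduced for illustration, and lists stand in for RDDs.

```python
from itertools import islice

# Made-up (sentiment, review) pairs, shaped like the items in the `reviews` RDD.
pairs = [
    ('positive', 'Great film'),
    ('negative', 'Dull and slow'),
    ('positive', 'Loved it'),
]


def reviews_with(label, pairs):
    # Analog of .filter(lambda x: x[0] == label).map(lambda x: x[1])
    return [review for sentiment, review in pairs if sentiment == label]


pos_reviews = reviews_with('positive', pairs)
neg_reviews = reviews_with('negative', pairs)

# Analog of take(1): at most the first item, always returned as a list.
first_pos = list(islice(pos_reviews, 1))

print(len(pos_reviews), len(neg_reviews))
print(first_pos)
```

Like `take`, the `islice` call returns a list even when only one item is requested, so the result needs indexing (`first_pos[0]`) to get the review string itself.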

We can also compute word frequencies with the following simple steps:
1. Define a function that splits each line and returns a list of words.

In [None]:
def splitWords(line):
    values = line.replace(',', ' ').replace('.', ' ').replace('"', '').split(' ')
    data = []
    for v in values:
        if len(v) > 0:
            data.append(v)
    return data
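
To see what `splitWords` does and does not handle, the sketch below runs it on a made-up sentence. The `split_words_regex` function is a hypothetical alternative added for comparison only; it lowercases the text and keeps runs of letters and apostrophes, which is one possible tokenization rather than the lab's method.

```python
import re


def splitWords(line):
    # Same logic as the lab's function: commas and periods become spaces,
    # double quotes are removed, and empty strings are dropped.
    values = line.replace(',', ' ').replace('.', ' ').replace('"', '').split(' ')
    data = []
    for v in values:
        if len(v) > 0:
            data.append(v)
    return data


def split_words_regex(line):
    # Hypothetical alternative: lowercase, then keep runs of letters/apostrophes.
    return re.findall(r"[a-z']+", line.lower())


sample = 'Hello, world. "Nice" film!'

simple_tokens = splitWords(sample)
regex_tokens = split_words_regex(sample)
print(simple_tokens)
print(regex_tokens)
```

Note that `splitWords` is case-sensitive and leaves punctuation other than commas, periods, and double quotes attached to the word, so `film!` and `film` would be counted separately.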

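2. Create a new RDD object from the original RDD object: `filter` drops the header row, `map` keeps only the review text, `flatMap` with `splitWords` turns each review into individual words, and `map` followed by `reduceByKey` sums a count of 1 per word.

In [None]:
# Drop the header and the trailing sentiment label (so it is not counted as a word), then count the words
wordcounts = rdd.filter(lambda x: x != 'review\tsentiment').map(lambda x: x.rsplit('\t', 1)[0]).flatMap(splitWords).map(lambda w: (w, 1)).reduceByKey(add)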

3. Sort the data by frequency (x[1], the count in each (word, count) pair) in descending order and retrieve the top 10 items.

In [None]:
wordcounts.takeOrdered(10, key=lambda x: -x[1])
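
The three steps above can be mirrored in plain Python to show what each stage computes. This is a sketch on made-up lines, not the Spark implementation: a `Counter` plays the role of `reduceByKey(add)`, and `heapq.nsmallest` with a negated key plays the role of `takeOrdered(n, key=lambda x: -x[1])`.

```python
import heapq
from collections import Counter


def splitWords(line):
    # Same logic as the lab's function.
    values = line.replace(',', ' ').replace('.', ' ').replace('"', '').split(' ')
    return [v for v in values if len(v) > 0]


# Made-up review texts standing in for the data.
lines = ["the cat sat", "the dog sat", "the end"]

# Analog of flatMap(splitWords): one flat list of words across all lines.
words = [w for line in lines for w in splitWords(line)]

# Analog of map(lambda w: (w, 1)).reduceByKey(add): sum the 1s per word.
counts = Counter()
for word, one in ((w, 1) for w in words):
    counts[word] += one

# Analog of takeOrdered(2, key=lambda x: -x[1]):
# the smallest negated counts are the largest counts.
top2 = heapq.nsmallest(2, counts.items(), key=lambda x: -x[1])

print(top2)
```

Negating the count is what turns "take the smallest" into "take the most frequent"; without the minus sign, both `takeOrdered` and `nsmallest` would return the rarest words instead.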

## After using Spark
When you are finished, stop the Spark context by calling its `stop` function.

In [None]:
sc.stop()