# COMP7095 - Big Data Management

## Spark Lab 3: Spark SQL

### Introduction
Spark SQL is a Spark module for structured data processing. It provides a programming abstraction called DataFrames, which can also act as a distributed SQL query engine. In this lab, we learn how to manipulate the data using the functions provided by the dataframe and SQL queries.

### Preparation
It is assumed that you have installed Python 3.9.x and created a virtual environment on your computer. Next, we need to perform the following steps for this lab:

1. Download the `ipnb version` of this lab and `movie_reviews.tsv` and save them.
2. Launch Terminal/Command prompt.
3. Start your Spark with Jupyter Notebook:

   


### Creating DataFrame from RDD

Everything now is ready, we can go ahead to work on this lab. 

First, we import the required packages.

In [1]:
import pyspark
from pyspark.sql import SQLContext
from pyspark.sql.types import *

We define a function named "preprocess" that will be used to split the values (review and sentiment) of each line.

In [2]:
def preprocess(line):
    values = line.split('\t')
    return values[1], values[0]

Then, we get the instance of the Spark context and load the data file to create a resilient distributed data (RDD) object. 

We use the `filter` function to ignore the header row and pass the data to the `preprocess` function. Then, a new RDD object will be created.

In [None]:
sc = pyspark.SparkContext.getOrCreate()

rdd = sc.textFile('data/movie_reviews.tsv')
reviews = rdd.filter(lambda x: x != 'review\tsentiment').map(preprocess)

We can check the content of the RDD object using the `take` function.

In [None]:
reviews.take(1)

With SQLContext, we can create a dataframe from a RDD object. \
<mark>DataFrame = RDD + Schema</mark>

To define a schema, we need the `StructField` function to describe each column. The syntax of the `StructField` function is: \

`StructField(col_name, col_type, nullable)`

In [None]:
schema = StructType([
    StructField('review', StringType()),
    StructField('sentiment', StringType())
])

# sqlContext = SQLContext(sc)
# df = sqlContext.createDataFrame(reviews, schema)

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(reviews, schema)

You review the schema attribute of the dataframe.

In [None]:
df.schema

To view the content of the dataframe, we use the `show` function.

In [None]:
df.toPandas()

### Creating DataFrame from Data File

Import the required packages and create a SQL context.

In [None]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *

spark = SparkSession.builder.getOrCreate()

Create a schema and use the SQL context to load the data from the file - `movie_reviews.tsv`.

In [None]:
schema = StructType([
    StructField('review', StringType()),
    StructField('sentiment', StringType())
])

df2 = spark.read.csv('data/movie_reviews.tsv', header=True, schema=schema, sep='\t')
df2.toPandas()

User Define Function (UDF) allows us to create new columns based on the existing columns. 
For example, we want new columns to: \
- present the length of the review
- use Boolean (True/False) to present the sentiment of the review
- present how many "funny" included in the review
- present how many "terrible" include in the review

In [None]:
from pyspark.sql.functions import udf

length = udf(lambda x: len(x))
pos = udf(lambda x: x == 'positive')
funny = udf(lambda r, s: r.count('funny') if s == 'positive' else 0)
terrible = udf(lambda r, s: r.count('terrible') if s == 'negative' else 0)

Use the UDFs to create new columns.

In [None]:
df2 = df2.withColumn('length', length('review'))
df2 = df2.withColumn('positive', pos('sentiment'))
df2 = df2.withColumn('funny', funny('review', 'sentiment'))
df2 = df2.withColumn('terrible', terrible('review', 'sentiment'))

We can also delete the unwanted column by using the `drop` function.

In [None]:
df2 = df2.drop('sentiment')

Let's see what is the result!

In [None]:
df2.toPandas()

### Caching
Spark provides an important feature to cache intermediate data and provide significant performance improvement while running multiple queries on the same data.

By default, the dataframe is not cached. We can check its status through the `is_cached` attribute.

In [None]:
df2.is_cached

To cache the data, we simply call the cache function of the dataframe.

In [None]:
df2.cache()

We can verify it by checking the `is_cached` attribute again.

In [None]:
df2.is_cached

We can remove the cache by using the `unpersist` function.

`df2.unpersist()`

Of course, we want to keep using the caching for the following parts.

### Data Exploring

DataFrame provides different functions for retrieving data.

#### Ordering
We change the display order using the `orderBy` function. For example, sort by the "positive" column in ascending order.

Note that `ascending=True` means sort in ascending order; and `ascending=False` means sort in decending order.

In [None]:
df2.orderBy('positive', ascending=True).toPandas()

#### Your task 1: Please sort df2 according to 'negative' in descending order.

#### Ordering by Multiple Columns
We can sort the data by multiple columns too.

In [None]:
df2.orderBy(['positive','terrible'], ascending=[True, False]).toPandas()

#### Selecting Columns
The `select` function allows us to select which columns we want to display. For example, we want to have "review", "positive", and "terrible" columns only.

In [None]:
df2.select(['review', 'positive', 'terrible']).toPandas()

#### Your task 2: Please list df2 by 'review', 'length' and 'funny'.

#### Adding Conditions
With the `where` function, we can specify the condition for data retrieval. For example, we want the negative reviews with more than 3000 characters.

In [None]:
df2.select('review', 'positive', 'length').where('positive = false and length > 3000').toPandas()

#### Aggregate
With the `agg` (aggregate) functions, we can find the `min`, `max`, `avg`, `stddev`, and `count` from the dataframe.

For example, we want to find the maximum number of "funny" words in a single review.

In [None]:
df2.agg({'funny':'max'}).show()

#### Your task 3: Please find the maximum number of "terrible" words in a single review.

#### Grouping
Combining with `groupBy` function, we can find the aggregates of different groups. 

For example, we want to find the average lengths of positive reviews and negative reviews respectively.

In [None]:
df2.groupBy('positive').agg({'length':'avg'}).toPandas()

#### Summary
We can also show the simple statistic by using the `summary` function.

In [None]:
df2.summary().toPandas()

### Using SQL Query
We can create a temp view from the dataframe and the view can be used with SQL queries to retrieve the data.

In [None]:
df2.createOrReplaceTempView('movie_reviews')

Now, let's have a try to get something with a simple SQL query. For example, we want to show the columns including `review` and `length`. So, we use:

In [None]:
spark.sql('select review, length from movie_reviews').toPandas()

### Your task 4
Please show the reviews that contains at least 6 "funny" words or at least 6 "terrible" words. Also, we need them reviews are sorted by the number of the "funny" words in decending order and then the number of the "terrible" words in decending order.

We can also get the average lengths of the positive and negative reviews respectively.

In [None]:
spark.sql('select positive, avg(length) as avg_len from movie_reviews group by positive').toPandas()

### After using Spark
In the end, we should stop the Spark by using the `stop` function.

In [None]:
sc.stop()
spark.stop()