#### Names of people in the group

Please write the names of the people in your group in the next cell.

Magnus Sagmo

Elizabeth Pan

In [0]:
# Deleting tables left from previous runs in case they still exist after deleting an inactive cluster
dbutils.fs.rm("/user", recurse=True)

In [0]:
# We need to install 'ipython_unittest' to run unittests in a Jupyter notebook
!pip install -q ipython_unittest

In [0]:
# Loading PySpark modules that we need
import unittest
from collections import Counter
from pyspark.sql import DataFrame
from pyspark.sql.types import *

#### Subtask 1: defining the schema for the data
Typically, the first thing to do before loading the data into a Spark cluster is to define the schema for the data. Look at the schema for 'badges' and try to define the schema for other tables similarly.

In [0]:
# Defining a schema for 'badges' table
badges_schema = StructType([StructField('UserId', IntegerType(), False),
                            StructField('Name', StringType(), False),
                            StructField('Date', TimestampType(), False),
                            StructField('Class', IntegerType(), False)])

# Defining a schema for 'posts' table
posts_schema = StructType([StructField('Id', IntegerType(), False),
                          StructField('ParentId', IntegerType(), False),
                          StructField('PostTypeId', IntegerType(), False),
                          StructField('CreationDate', TimestampType(), False),
                          StructField('Score', IntegerType(), False),
                          StructField('ViewCount', IntegerType(), False),
                          StructField('Body', StringType(), False),
                           StructField('OwnerUserId', IntegerType(), False),
                           StructField('LastActivityDate', TimestampType(), False),
                           StructField('Title', StringType(), False),
                           StructField('Tags', StringType(), False),
                           StructField('AnswerCount', IntegerType(), False),
                           StructField('CommentCount', IntegerType(), False),
                           StructField('FavoriteCount', IntegerType(), False),
                           StructField('ClosedDate', TimestampType(), False)
                          ])
## YOUR IMPLEMENTATION ##

# Defining a schema for 'users' table
users_schema = StructType([StructField('Id', IntegerType(), False),
                          StructField('Reputation', IntegerType(), False),
                          StructField('CreationDate', TimestampType(), False),
                          StructField('DisplayName', StringType(), False),
                          StructField('LastAccessDate', TimestampType(), False),
                          StructField('AboutMe', StringType(), False),
                          StructField('Views', IntegerType(), False),
                          StructField('UpVotes', IntegerType(), False),
                          StructField('DownVotes', IntegerType(), False)])
## YOUR IMPLEMENTATION ##

# Defining a schema for 'comments' table
comments_schema = StructType([StructField('PostId', IntegerType(), False),
                             StructField('Score', IntegerType(), False),
                             StructField('Text', StringType(), False),
                             StructField('CreationDate', TimestampType(), False),
                             StructField('UserId', IntegerType(), False)])
## YOUR IMPLEMENTATION ##

#### Subtask 2: implementing two helper functions
Next, we need to implement two helper functions:
1. 'load_csv' that as input argument receives path for a CSV file and a schema and loads the CSV pointed by the path into a Spark DataFrame and returns the DataFrame;
2. 'save_df' receives a Spark DataFrame and saves it as a Parquet file on DBFS.

Note that the column separator in CSV files is TAB character ('\t') and the first row includes the name of the columns. 

BTW, DBFS is the name of the distributed filesystem used by Databricks Community Edition to store and access data.

In [0]:
def load_csv(source_file: "path for the CSV file to load", schema: "schema for the CSV file being loaded as a DataFrame") -> DataFrame:
    return spark.read.csv(source_file, sep="\t", header=True, schema=schema)

def save_df(df: "DataFrame to be saved", table_name: "name under which the DataFrame will be saved") -> None:
    df.write.format("parquet").option("parquet.enable.dictionary", "true") \
        .option("parquet.page.write-checksum.enabled", "false").mode('overwrite') \
        .saveAsTable(table_name)

In [0]:
# Loading 'ipython_unittest' so we can use '%%unittest_main' magic command
%load_ext ipython_unittest

#### Subtask 3: validating the implementation by running the tests

Run the cell below and make sure that all the tests run successfully. Moreover, at the end there should be four Parquet files named 'badges', 'comments', 'posts', and 'users' in '/user/hive/warehouse'.

Note that we assumed that the data for the project has already been stored on DBFS on the '/FileStore/tables/' path. (I mean as 'badges_csv.gz', 'comments_csv.gz', 'posts_csv.gz', and 'users_csv.gz'.)

In [0]:
%%unittest_main
class TestTask1(unittest.TestCase):
   
    # test 1
    def test_load_badges(self):
        result = load_csv(source_file="/FileStore/tables/badges_csv.gz", schema=badges_schema)
        self.assertIsNotNone(result, "Badges dataframe did not load successfully")
        self.assertIsInstance(result, DataFrame, "Result type is not of spark.sql.DataFrame")
        self.assertEqual(result.count(), 105640, "Number of records is not correct")

        coulmn_names = Counter(map(str.lower, ['UserId', 'Name', 'Date', 'Class']))
        self.assertCountEqual(coulmn_names, Counter(map(str.lower, result.columns)),
                              "Missing column(s) or column name mismatch")
    
    # test 2
    def test_load_posts(self):
        result = load_csv(source_file="/FileStore/tables/posts_csv.gz", schema=posts_schema)
        self.assertIsNotNone(result, "Posts dataframe did not load successfully")
        self.assertIsInstance(result, DataFrame, "Result type is not of spark.sql.DataFrame")
        self.assertEqual(result.count(), 61432, "Number of records is not correct")

        coulmn_names = Counter(map(str.lower,
                                   ['Id', 'ParentId', 'PostTypeId', 'CreationDate', 'Score', 'ViewCount', 'Body', 'OwnerUserId',
                                    'LastActivityDate', 'Title', 'Tags', 'AnswerCount', 'CommentCount', 'FavoriteCount',
                                    'ClosedDate']))
        self.assertCountEqual(coulmn_names, Counter(map(str.lower, result.columns)),
                              "Missing column(s) or column name mismatch")
    
    # test 3
    def test_load_comments(self):
        result = load_csv(source_file="/FileStore/tables/comments_csv.gz", schema=comments_schema)
        self.assertIsNotNone(result, "Comments dataframe did not load successfully")
        self.assertIsInstance(result, DataFrame, "Result type is not of spark.sql.DataFrame")
        self.assertEqual(result.count(), 58735, "Number of records is not correct")

        coulmn_names = Counter(map(str.lower, ['PostId', 'Score', 'Text', 'CreationDate', 'UserId']))
        self.assertCountEqual(coulmn_names, Counter(map(str.lower, result.columns)),
                              "Missing column(s) or column name mismatch")
    
    # test 4
    def test_load_users(self):
        result = load_csv(source_file="/FileStore/tables/users_csv.gz", schema=users_schema)
        self.assertIsNotNone(result, "Users dataframe did not load successfully")
        self.assertIsInstance(result, DataFrame, "Result type is not of spark.sql.DataFrame")
        self.assertEqual(result.count(), 91616, "Number of records is not correct")

        coulmn_names = Counter(map(str.lower,
                                   ['Id', 'Reputation', 'CreationDate', 'DisplayName', 'LastAccessDate', 'AboutMe',
                                    'Views', 'UpVotes', 'DownVotes']))
        self.assertCountEqual(coulmn_names, Counter(map(str.lower, result.columns)),
                              "Missing column(s) or column name mismatch")
    # test 5
    def test_save_dfs(self):
        dfs = [("/FileStore/tables/users_csv.gz", users_schema, "users"),
               ("/FileStore/tables/badges_csv.gz", badges_schema, "badges"),
               ("/FileStore/tables/comments_csv.gz", comments_schema, "comments"),
               ("/FileStore/tables/posts_csv.gz", posts_schema, "posts")
               ]

        for i in dfs:
            df = load_csv(source_file=i[0], schema=i[1])
            save_df(df, i[2])

#### Subtask 4: answering to questions about Spark related concepts

Please write a short description for the terms below---one to two short paragraphs for each term. Don't copy-paste; instead, write your own understanding.

1. What do the terms 'Spark Application', 'SparkSession', 'Transformations', 'Action', and 'Lazy Evaluation' mean in the context of Spark?

Write your descriptions in the next cell.

##### Spark Application
A Spark Application consists of one driver process and several executor processes.

The driver process is responsible for 
- maintaining information about the Spark Application.
- responding to a user’s program or input.
- analyzing, distributing, and scheduling work across the executors.

The executor processes are responsible of actually executing the tasks assigned to them by the driver process.

[databricks.com](https://www.databricks.com/glossary/what-are-spark-applications)

##### SparkSession
A SparkSession is what allows you to programatically create RDDs, DataFrames and DataSets, and is essentially the entrypoint to use Spark.

[databricks.com](https://www.databricks.com/glossary/what-are-spark-applications)

##### Transformations
A Spark Tranformation is a function that takes an RDD as input, and produces one or more RDDs as output. It is essential that it always produces new RDDs, because of the immutable nature of RDDs. 

[sparkbyexamples.com](https://sparkbyexamples.com/spark/spark-rdd-transformations-2/)

##### Action
Spark Actions are the other type og Spark functions. They do not return RDDs like tranformations, but return raw values instead. One example is the ```count()``` method, which returns the number of elements in the RDD.

[sparkbyexamples.com](https://sparkbyexamples.com/spark/spark-rdd-actions/)

##### Lazy Evaluation
Lazy evaluation is the feature that the evaluation of an expression is not performed until the result of it is needed. In the Spark Context, this menas that a transformation will not be executed until an action requires the result of the transformation. 

This is useful due to the (possible) vast size of the data evaluated, and the following costly nature of transformations. 

[towardsdatascience.com](https://towardsdatascience.com/3-reasons-why-sparks-lazy-evaluation-is-useful-ed06e27360c4)