#### Names of people in the group

Please write the names of the people in your group in the next cell.

Nick Askari

Simen Peder Stang

In [None]:
# Deleting tables left from previous runs in case they still exist after deleting an inactive cluster
dbutils.fs.rm("/user", recurse=True)

Out[16]: True

In [None]:
# We need to install 'ipython_unittest' to run unit tests in a Jupyter notebook
!pip install -q ipython_unittest



In [None]:
# Loading PySpark modules that we need
import unittest
from collections import Counter
from pyspark.sql import DataFrame
from pyspark.sql.types import *

#### Subtask 1: defining the schema for the data
Typically, the first thing to do before loading the data into a Spark cluster is to define the schema for the data. Look at the schema for 'badges' and try to define the schema for other tables similarly.

In [None]:
# Defining a schema for 'badges' table
badges_schema = StructType([StructField('UserId', IntegerType(), False),
                            StructField('Name', StringType(), False),
                            StructField('Date', TimestampType(), False),
                            StructField('Class', IntegerType(), False)])

# Defining a schema for 'posts' table
posts_schema = StructType([StructField('Id', IntegerType(), False),
                           StructField('ParentId', IntegerType(), True), 
                           StructField('PostTypeId', IntegerType(), False),
                           StructField('CreationDate', TimestampType(), False), 
                           StructField('Score', IntegerType(), False),
                           StructField('ViewCount', IntegerType(), False),
                           StructField('Body', StringType(), False), 
                           StructField('OwnerUserId', IntegerType(), False),
                           StructField('LastActivityDate', TimestampType(), False),
                           StructField('Title', StringType(), True),
                           StructField('Tags', StringType(), True), 
                           StructField('AnswerCount', IntegerType(), True), 
                           StructField('CommentCount', IntegerType(), False),
                           StructField('FavoriteCount', IntegerType(), True),
                           StructField('CloseDate', TimestampType(), True)])
## To-do!

# Defining a schema for 'users' table
users_schema = StructType([
    StructField('Id', IntegerType(), False),  
    StructField('Reputation', IntegerType(), False),
    StructField('CreationDate', TimestampType(), False),
    StructField('DisplayName', StringType(), False),
    StructField('LastAccessDate', TimestampType(), False),
    StructField('AboutMe', StringType(), True),
    StructField('Views', IntegerType(), False),
    StructField('UpVotes', IntegerType(), False),
    StructField('DownVotes', IntegerType(), False)
])
## To-do!

# Defining a schema for 'comments' table
comments_schema = StructType([
    StructField('PostId', IntegerType(), False),
    StructField('Score', IntegerType(), False),
    StructField('Text', StringType(), False),  
    StructField('CreationDate', TimestampType(), False),
    StructField('UserId', IntegerType(), False)
])
## To-do!

#### Subtask 2: implementing two helper functions
Next, we need to implement two helper functions:
1. 'load_csv', which receives the path of a CSV file and a schema as input arguments, loads the CSV file at that path into a Spark DataFrame, and returns the DataFrame;
2. 'save_df', which receives a Spark DataFrame and a table name, and saves the DataFrame as a Parquet file on DBFS.

Note that the column separator in the CSV files is the TAB character ('\t') and the first row contains the column names.

BTW, DBFS is the name of the distributed filesystem used by Databricks Community Edition to store and access data.
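To make the options `sep='\t'` and `header=True` concrete, the plain-Python sketch below does by hand what `load_csv` asks Spark to do for one file: decompress gzip data, split each line on TAB characters, take the column names from the first row, and cast each field according to a schema. This is an illustration only, not PySpark code: the schema is a hypothetical dictionary of converter functions standing in for a `StructType`, `load_tsv_gz` is a hypothetical name, and the sample row is made up. Spark itself decompresses the `.gz` files transparently based on their extension and parses them in parallel.

```python
import csv
import gzip
import io
from datetime import datetime
from typing import Any, Callable, Dict, List

# Hypothetical toy schema: column name -> converter for the raw string value.
# It stands in for a Spark StructType and is not PySpark code.
TOY_BADGES_SCHEMA: Dict[str, Callable[[str], Any]] = {
    "UserId": int,
    "Name": str,
    "Date": datetime.fromisoformat,
    "Class": int,
}


def load_tsv_gz(raw_bytes: bytes, schema: Dict[str, Callable[[str], Any]]) -> List[Dict[str, Any]]:
    """Decompress gzip bytes, parse TAB-separated rows and cast every column."""
    text = gzip.decompress(raw_bytes).decode("utf-8")
    # DictReader takes the column names from the first row (like header=True)
    reader = csv.DictReader(io.StringIO(text), delimiter="\t")
    rows = []
    for record in reader:
        # Empty fields become None, the way nullable columns hold nulls
        rows.append({col: (cast(record[col]) if record[col] != "" else None)
                     for col, cast in schema.items()})
    return rows


# A made-up sample file, compressed in memory instead of read from DBFS
sample = "UserId\tName\tDate\tClass\n1\tTeacher\t2014-05-13 23:42:00\t3\n"
rows = load_tsv_gz(gzip.compress(sample.encode("utf-8")), TOY_BADGES_SCHEMA)
print(rows)
```

Passing an explicit schema, as `load_csv` does, fixes the column types up front instead of letting the reader guess them from the data.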

In [None]:
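from pyspark.sql import SparkSession
from pyspark.sql.types import StructType
from pyspark.sql import DataFrame

def load_csv(source_file: "path for the CSV file to load", schema: "schema for the CSV file being loaded as a DataFrame") -> DataFrame:
    # Reuse the active SparkSession (in a Databricks notebook this is the predefined `spark`)
    spark = SparkSession.builder.getOrCreate()
    # The files are TAB-separated and the first row holds the column names;
    # the explicit schema sets the column types and avoids a schema-inference pass
    return spark.read.csv(path=source_file, sep='\t', schema=schema, header=True)

def save_df(df: "DataFrame to be saved", table_name: "name under which the DataFrame will be saved") -> None:
    # Write the DataFrame as Parquet under the Hive warehouse directory on DBFS,
    # overwriting any output left over from a previous run
    df.write.mode('overwrite').parquet(f"dbfs:/user/hive/warehouse/{table_name}")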

In [None]:
# Loading 'ipython_unittest' so we can use '%%unittest_main' magic command
%load_ext ipython_unittest

The ipython_unittest extension is already loaded. To reload it, use:
  %reload_ext ipython_unittest


#### Subtask 3: validating the implementation by running the tests

Run the cell below and make sure that all the tests run successfully. Moreover, at the end there should be four Parquet files named 'badges', 'comments', 'posts', and 'users' in '/user/hive/warehouse'.

Note that we assume the data for the project has already been stored on DBFS under the '/FileStore/tables/' path, as 'badges_csv.gz', 'comments_csv.gz', 'posts_csv.gz', and 'users_csv.gz'.

In [None]:
%%unittest_main
class TestTask1(unittest.TestCase):
   
    # test 1
    def test_load_badges(self):
        result = load_csv(source_file="/FileStore/tables/badges_csv.gz", schema=badges_schema)
        self.assertIsNotNone(result, "Badges dataframe did not load successfully")
        self.assertIsInstance(result, DataFrame, "Result type is not of spark.sql.DataFrame")
        self.assertEqual(result.count(), 105640, "Number of records is not correct")

        column_names = Counter(map(str.lower, ['UserId', 'Name', 'Date', 'Class']))
        self.assertCountEqual(column_names, Counter(map(str.lower, result.columns)),
                              "Missing column(s) or column name mismatch")
    
    # test 2
    def test_load_posts(self):
        result = load_csv(source_file="/FileStore/tables/posts_csv.gz", schema=posts_schema)
        self.assertIsNotNone(result, "Posts dataframe did not load successfully")
        self.assertIsInstance(result, DataFrame, "Result type is not of spark.sql.DataFrame")
        self.assertEqual(result.count(), 61432, "Number of records is not correct")

        column_names = Counter(map(str.lower,
                                   ['Id', 'ParentId', 'PostTypeId', 'CreationDate', 'Score', 'ViewCount', 'Body', 'OwnerUserId',
                                    'LastActivityDate', 'Title', 'Tags', 'AnswerCount', 'CommentCount', 'FavoriteCount',
                                    'CloseDate']))
        self.assertCountEqual(column_names, Counter(map(str.lower, result.columns)),
                              "Missing column(s) or column name mismatch")
    
    # test 3
    def test_load_comments(self):
        result = load_csv(source_file="/FileStore/tables/comments_csv.gz", schema=comments_schema)
        self.assertIsNotNone(result, "Comments dataframe did not load successfully")
        self.assertIsInstance(result, DataFrame, "Result type is not of spark.sql.DataFrame")
        self.assertEqual(result.count(), 58735, "Number of records is not correct")

        column_names = Counter(map(str.lower, ['PostId', 'Score', 'Text', 'CreationDate', 'UserId']))
        self.assertCountEqual(column_names, Counter(map(str.lower, result.columns)),
                              "Missing column(s) or column name mismatch")
    
    # test 4
    def test_load_users(self):
        result = load_csv(source_file="/FileStore/tables/users_csv.gz", schema=users_schema)
        self.assertIsNotNone(result, "Users dataframe did not load successfully")
        self.assertIsInstance(result, DataFrame, "Result type is not of spark.sql.DataFrame")
        self.assertEqual(result.count(), 91616, "Number of records is not correct")

        column_names = Counter(map(str.lower,
                                   ['Id', 'Reputation', 'CreationDate', 'DisplayName', 'LastAccessDate', 'AboutMe',
                                    'Views', 'UpVotes', 'DownVotes']))
        self.assertCountEqual(column_names, Counter(map(str.lower, result.columns)),
                              "Missing column(s) or column name mismatch")
    # test 5
    def test_save_dfs(self):
        dfs = [("/FileStore/tables/users_csv.gz", users_schema, "users"),
               ("/FileStore/tables/badges_csv.gz", badges_schema, "badges"),
               ("/FileStore/tables/comments_csv.gz", comments_schema, "comments"),
               ("/FileStore/tables/posts_csv.gz", posts_schema, "posts")
               ]

        for i in dfs:
            df = load_csv(source_file=i[0], schema=i[1])
            save_df(df, i[2])



Success

.....
----------------------------------------------------------------------
Ran 5 tests in 25.960s

OK
Out[22]: <unittest.runner.TextTestResult run=5 errors=0 failures=0>

#### Subtask 4: answering questions about Spark-related concepts

Please write a short description of each term below (one to two short paragraphs per term). Don't copy-paste; instead, write your own understanding.

1. What do the terms 'Spark Application', 'SparkSession', 'Transformations', 'Action', and 'Lazy Evaluation' mean in the context of Spark?

Write your descriptions in the next cell.

Your descriptions...

### Spark Application

A Spark Application is a program that describes how the Spark system should process data. To explain what exactly a Spark Application is, it is convenient to briefly mention what Apache Spark is. Spark is an engine for processing and analysing huge amounts of data. Instead of keeping all the data on one machine, Spark splits the data into partitions that are spread across the machines (nodes) of a cluster, so the computation can be distributed across these machines as well. In other words, Spark is capable of parallel processing, where different nodes in the cluster work on different parts of the data at the same time. Hence, it is well suited for big data.

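The Spark Application is simply the user-built program that describes the data processing. At runtime it consists of a driver process, which runs the user program and splits the work into tasks, and a set of executor processes on the cluster nodes, which carry out those tasks. In such an application you could, for example, start by creating a SparkContext (or, in newer versions, a SparkSession, which wraps a SparkContext), which represents the connection to the cluster. Then one can write code that performs transformations and actions (more on this later). These applications can be written in Java, Scala, Python, or R.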
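The split between a driver and executors can be pictured with a small single-machine analogy. In the sketch below, which is only a toy model and not how Spark is implemented, a "driver" function splits a list into partitions, a thread pool plays the role of the executors that each process one partition, and the driver combines the partial results. The function names and the sum-of-squares job are hypothetical, and Python threads stand in for executor processes on separate machines: they show the structure of a job, not real parallel speed-up.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import List


def split_into_partitions(data: List[int], num_partitions: int) -> List[List[int]]:
    """Deal the records round-robin into num_partitions roughly equal partitions."""
    return [data[i::num_partitions] for i in range(num_partitions)]


def process_partition(partition: List[int]) -> int:
    """Work done by one 'executor': sum the squares of its own partition."""
    return sum(x * x for x in partition)


def run_job(data: List[int], num_workers: int = 4) -> int:
    """The 'driver': split the data, hand out partitions, combine partial results."""
    partitions = split_into_partitions(data, num_workers)
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        partial_results = list(pool.map(process_partition, partitions))
    return sum(partial_results)


print(run_job(list(range(1, 11))))  # 385
```

Because each partition is processed independently, adding more workers (or, in Spark, more nodes) lets more partitions be handled at the same time.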

### SparkSession

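SparkSession is "the entry point to programming Spark with the Dataset and DataFrame API", as stated in the documentation. It is essentially an access point to all Spark functionality. A SparkSession can be used to create DataFrames (as in this task, where `load_csv` calls `spark.read.csv`), register DataFrames as tables, execute SQL queries, cache tables, and read data from various sources. In a Databricks notebook a session named `spark` already exists, so `SparkSession.builder.getOrCreate()` returns that session instead of creating a new one.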
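The "get or create" behaviour that `load_csv` relies on can be sketched with a toy class. `ToySession` and `get_or_create` are hypothetical names, not PySpark's API; the point is only that asking for a session while one is already active returns the existing one instead of starting a second one. The toy is simplified: according to the PySpark documentation, the real `SparkSession.builder.getOrCreate()` also applies config options set on the builder to the existing session, which this sketch ignores.

```python
from typing import Dict, Optional


class ToySession:
    """Hypothetical stand-in for SparkSession; not PySpark's class."""

    _active: Optional["ToySession"] = None  # the session shared by the whole program

    def __init__(self, app_name: str, config: Dict[str, str]) -> None:
        self.app_name = app_name
        self.config = dict(config)

    @classmethod
    def get_or_create(cls, app_name: str = "demo", **config: str) -> "ToySession":
        # Only build a new session when none is active yet
        if cls._active is None:
            cls._active = cls(app_name, config)
        return cls._active


first = ToySession.get_or_create("task1")
second = ToySession.get_or_create("another-name")
print(first is second, second.app_name)  # True task1
```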

### Transformations

Transformations create a new RDD (Resilient Distributed Dataset) from an existing one; DataFrame operations such as 'select' and 'filter' are transformations in the same sense. Examples of transformations are 'filter', 'map', and 'join'. One important fact about transformations is that they are computed lazily, which we will come back to.

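Another key fact is that as you apply transformations and create new RDDs, Spark keeps track of the dependencies between them in a *lineage graph*. This way it knows how to compute each RDD from its parents. In addition, if a partition of an RDD is lost (for example because a node fails), Spark can recompute just that partition by replaying the lineage from the original data, rather than relying on replicated copies.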
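Lineage tracking and recovery can be illustrated with a toy dataset class that, like an RDD, remembers its parent and the function that derives it from that parent. `ToyRDD`, `lineage`, and `compute_partition` are hypothetical names, not Spark's API, and the sketch ignores the distinction Spark makes between narrow and wide dependencies; it only shows that a lost partition can be rebuilt by replaying the recorded transformations on the source data.

```python
from typing import Callable, List, Optional


class ToyRDD:
    """Hypothetical toy dataset that records its lineage; not Spark's RDD class."""

    def __init__(self, partitions: Optional[List[List[int]]] = None,
                 parent: Optional["ToyRDD"] = None,
                 op_name: str = "source",
                 func: Optional[Callable[[List[int]], List[int]]] = None) -> None:
        self._source = partitions  # only set for the source dataset
        self.parent = parent       # dependency on the previous dataset
        self.op_name = op_name
        self.func = func           # how to derive one partition from the parent's

    # Transformations: return a new ToyRDD and compute nothing yet
    def map(self, f: Callable[[int], int]) -> "ToyRDD":
        return ToyRDD(parent=self, op_name="map",
                      func=lambda part: [f(x) for x in part])

    def filter(self, pred: Callable[[int], bool]) -> "ToyRDD":
        return ToyRDD(parent=self, op_name="filter",
                      func=lambda part: [x for x in part if pred(x)])

    def lineage(self) -> List[str]:
        """Walk back through the parents to the source."""
        node, chain = self, []
        while node is not None:
            chain.append(node.op_name)
            node = node.parent
        return list(reversed(chain))

    def compute_partition(self, index: int) -> List[int]:
        """(Re)compute a single partition by replaying the lineage."""
        if self.parent is None:
            return list(self._source[index])
        return self.func(self.parent.compute_partition(index))


source = ToyRDD(partitions=[[1, 2, 3], [4, 5, 6]])
evens_squared = source.map(lambda x: x * x).filter(lambda x: x % 2 == 0)
print(evens_squared.lineage())              # ['source', 'map', 'filter']
# If partition 1 were lost, it can be rebuilt from the source partition alone
print(evens_squared.compute_partition(1))   # [16, 36]
```

Only the lost partition is recomputed; the other partitions are not touched.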

### Actions

Actions, as opposed to transformations, are operations that actually trigger computation. An action either returns a value to the driver program or writes data to external storage; in other words, it has to produce output. Typical actions are count() (as the tests above use with result.count()), collect(), and writing a DataFrame to a file, as save_df does with df.write.parquet(...). Also bear in mind that an action is usually called on an RDD produced by a chain of transformations, so when the action runs, those transformations must be evaluated as well.
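The two kinds of actions, and the fact that running an action forces the transformations before it, can be shown with a toy lazy dataset. `ToyDataset`, `save_as_json`, and the call counter are hypothetical and only model Spark's behaviour: the `map` transformation merely wraps a recipe, while `count`, `collect`, and `save_as_json` execute it. The counter also shows that, without caching, every action re-runs the transformations, which is why Spark offers `cache()` for results that several actions reuse.

```python
import json
import os
import tempfile
from typing import Callable, Iterator, List


class ToyDataset:
    """Hypothetical toy dataset: transformations are lazy, actions run the work."""

    def __init__(self, make_iter: Callable[[], Iterator[int]]) -> None:
        self._make_iter = make_iter  # recipe that produces the records on demand

    # Transformation: wrap the recipe in a new dataset, run nothing
    def map(self, f: Callable[[int], int]) -> "ToyDataset":
        parent = self._make_iter
        return ToyDataset(lambda: (f(x) for x in parent()))

    # Actions that return a value to the caller (the 'driver')
    def count(self) -> int:
        return sum(1 for _ in self._make_iter())

    def collect(self) -> List[int]:
        return list(self._make_iter())

    # Action that writes to external storage
    def save_as_json(self, path: str) -> None:
        with open(path, "w", encoding="utf-8") as fh:
            json.dump(self.collect(), fh)


calls = {"double": 0}


def double(x: int) -> int:
    calls["double"] += 1
    return 2 * x


data = ToyDataset(lambda: iter([1, 2, 3])).map(double)
print(calls["double"])  # 0: map is a transformation, nothing has run yet
print(data.count())     # 3: an action returning a value
out_path = os.path.join(tempfile.mkdtemp(), "out.json")
data.save_as_json(out_path)  # an action writing to storage
print(calls["double"])  # 6: each of the two actions re-ran the map
```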

### Lazy Evaluation

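As mentioned, transformations are evaluated lazily: when you write a line of code that applies a transformation, Spark only records it and does not compute anything until an action needs the result. Delaying computation this way makes the system more efficient, because Spark avoids heavy computation until it is really needed and can see the whole chain of operations before running it. This also helps it minimize data movement, which can be costly. Spark accomplishes this by constructing a DAG (Directed Acyclic Graph) of the operations, which it then optimizes and splits into stages before execution. For example, consecutive narrow transformations such as map and filter are pipelined within one stage, so each record passes through both without an intermediate dataset being materialized.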
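Python generators behave lazily in a similar way, so they make a convenient analogy (this is not Spark code, and the function names are hypothetical). In the sketch below, building the pipeline reads nothing; a `take(2)`-style action then pulls records through both steps one at a time and stops as soon as it has two results, so only four of the million source records are ever read. This mirrors, in a much simplified form, how Spark pipelines narrow transformations within a stage and only does the work an action requires.

```python
from itertools import islice
from typing import Iterator, List

log: List[str] = []  # records the work actually performed


def read_source() -> Iterator[int]:
    """A 'large' source of one million records, read lazily."""
    for x in range(1, 1_000_001):
        log.append(f"read {x}")
        yield x


def squares(xs: Iterator[int]) -> Iterator[int]:
    for x in xs:
        log.append(f"square {x}")
        yield x * x


def evens(xs: Iterator[int]) -> Iterator[int]:
    for x in xs:
        if x % 2 == 0:
            yield x


# Building the pipeline only chains generators together; nothing is read yet
pipeline = evens(squares(read_source()))
print(log)  # []

# A take(2)-style action pulls just enough records to produce two results
first_two = list(islice(pipeline, 2))
print(first_two)  # [4, 16]
print(len(log))   # 8: records 1-4 were read and squared, nothing more
```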