#### Names of people in the group

Thomas Bjerke

Trym Grande

In [None]:
# Delete the DataFrames saved by previous runs, in case they still exist after an inactive cluster was deleted
dbutils.fs.rm("dbfs:/FileStore/dataframes", recurse=True)

Out[98]: True

In [None]:
# We need to install 'ipython_unittest' to run unittests in a Jupyter notebook
!pip install -q ipython_unittest


In [None]:
# Importing the Python and PySpark modules we need
import unittest
from collections import Counter
from pyspark.sql import DataFrame
from pyspark.sql.types import *
import pandas as pd

#### Subtask 1: defining the schema for the data
Typically, the first thing to do before loading the data into a Spark cluster is to define the schema for the data. Look at the schema for 'badges' and try to define the schema for other tables similarly.

In [None]:
# Defining a schema for 'badges' table
badges_schema = StructType([StructField('UserId', IntegerType(), False),
                            StructField('Name', StringType(), False),
                            StructField('Date', TimestampType(), False),
                            StructField('Class', IntegerType(), False)])

# Defining a schema for 'posts' table
posts_schema = StructType([StructField('Id', IntegerType(), False),
                           StructField('ParentId', IntegerType(), True),
                           StructField('PostTypeId', IntegerType(), False),
                           StructField('CreationDate', TimestampType(), False),
                           StructField('Score', IntegerType(), False),
                           StructField('ViewCount', IntegerType(), False),
                           StructField('Body', StringType(), False),
                           StructField('OwnerUserId', IntegerType(), False),
                           StructField('LastActivityDate', TimestampType(), False),
                           StructField('Title', StringType(), True),
                           StructField('Tags', StringType(), True),
                           StructField('AnswerCount', IntegerType(), True),
                           StructField('CommentCount', IntegerType(), False),
                           StructField('FavoriteCount', IntegerType(), True),
                           StructField('CloseDate', TimestampType(), True)])

# Defining a schema for 'users' table
users_schema = StructType([StructField('Id', IntegerType(), False),
                           StructField('Reputation', IntegerType(), False),
                           StructField('CreationDate', TimestampType(), False),
                           StructField('DisplayName', StringType(), False),
                           StructField('LastAccessDate', TimestampType(), False),
                           StructField('AboutMe', StringType(), True),
                           StructField('Views', IntegerType(), False),
                           StructField('UpVotes', IntegerType(), False),
                           StructField('DownVotes', IntegerType(), False)])

# Defining a schema for 'comments' table
comments_schema = StructType([StructField('PostId', IntegerType(), False),
                              StructField('Score', IntegerType(), False),
                              StructField('Text', StringType(), False),
                              StructField('CreationDate', TimestampType(), False),
                              StructField('UserId', IntegerType(), False)])


#### Subtask 2: implementing two helper functions
Next, we need to implement two helper functions:
1. 'load_csv' receives the path of a CSV file and a schema, loads the CSV file pointed to by the path into a Spark DataFrame, and returns the DataFrame;
2. 'save_df' receives a Spark DataFrame and saves it as a Parquet file on DBFS.

Note that the column separator in the CSV files is the TAB character ('\t') and that the first row contains the column names.

BTW, DBFS is the name of the distributed filesystem used by Databricks Community Edition to store and access data.
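To make the file format concrete, here is a minimal standard-library sketch of what reading one of these files involves: decompressing gzip, splitting each line on TAB, taking the column names from the first row, and casting every field according to a schema. The `parse_tsv` helper, the dictionary-based `BADGES_SCHEMA`, the made-up sample row, and the timestamp format are all hypothetical, for illustration only. Raising an error on a missing non-nullable value is a choice of this sketch, not necessarily what Spark's CSV reader does. In the notebook, `spark.read` does this work for us, distributed over the cluster.

```python
import csv
import gzip
import io
from datetime import datetime

# Hypothetical schema for this sketch: column name -> (cast function, nullable)
BADGES_SCHEMA = {
    "UserId": (int, False),
    "Name": (str, False),
    "Date": (datetime.fromisoformat, False),
    "Class": (int, False),
}


def parse_tsv(raw_gz: bytes, schema: dict) -> list:
    """Decompress gzip bytes, parse TAB-separated rows with a header row, and cast fields."""
    text = gzip.decompress(raw_gz).decode("utf-8")
    reader = csv.DictReader(io.StringIO(text), delimiter="\t")  # first row = column names
    rows = []
    for record in reader:
        row = {}
        for name, (cast, nullable) in schema.items():
            value = record.get(name)
            if value in (None, ""):
                # This sketch rejects missing values in non-nullable columns
                if not nullable:
                    raise ValueError(f"column {name!r} must not be null")
                row[name] = None
            else:
                row[name] = cast(value)
        rows.append(row)
    return rows


# A tiny, made-up sample in the same layout: header row first, TAB-separated, gzipped
sample = "UserId\tName\tDate\tClass\n7\tTeacher\t2014-05-13 23:58:30\t3\n"
rows = parse_tsv(gzip.compress(sample.encode("utf-8")), BADGES_SCHEMA)
print(rows[0]["UserId"], rows[0]["Name"], rows[0]["Date"].year)  # 7 Teacher 2014
```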

In [None]:
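def load_csv(source_file: "path for the CSV file to load", schema: "schema for the CSV file being loaded as a DataFrame") -> DataFrame:
    # TAB-separated files whose first row holds the column names; Spark decompresses .gz files automatically
    df_with_schema = (spark.read.format("csv")
                      .option("header", True)
                      .option("delimiter", "\t")
                      .schema(schema)
                      .load(source_file))
    # Print the DataFrame's column names and types (this produces the output shown after the tests)
    print(df_with_schema)
    return df_with_schema


def file_exists(path: str) -> bool:
    # dbutils.fs.ls raises an exception if the path does not exist
    try:
        dbutils.fs.ls(path)
        return True
    except Exception:
        return False


def save_df(df: DataFrame, table_name: str) -> bool:
    # Save the DataFrame as Parquet under dbfs:/FileStore/dataframes/<table_name>,
    # skipping the write (and returning False) if it has already been saved
    path = f"dbfs:/FileStore/dataframes/{table_name}"
    if file_exists(path):
        return False
    df.write.parquet(path)
    return True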

In [None]:
# Loading 'ipython_unittest' so we can use '%%unittest_main' magic command
%load_ext ipython_unittest

The ipython_unittest extension is already loaded. To reload it, use:
  %reload_ext ipython_unittest


#### Subtask 3: validating the implementation by running the tests

Run the cell below and make sure that all the tests run successfully. Moreover, at the end there should be four Parquet files named 'badges', 'comments', 'posts', and 'users' in '/user/hive/warehouse' (the `save_df` implementation above writes them to 'dbfs:/FileStore/dataframes' instead).

Note that we assume the data for the project has already been stored on DBFS under the '/FileStore/tables/' path, namely as 'badges_csv.gz', 'comments_csv.gz', 'posts_csv.gz', and 'users_csv.gz'.

In [None]:
%%unittest_main
class TestTask1(unittest.TestCase):
   
    # test 1
    def test_load_badges(self):
        result = load_csv(source_file="/FileStore/tables/badges_csv.gz", schema=badges_schema)
        self.assertIsNotNone(result, "Badges dataframe did not load successfully")
        self.assertIsInstance(result, DataFrame, "Result type is not of spark.sql.DataFrame")
        self.assertEqual(result.count(), 105640, "Number of records is not correct")

        column_names = Counter(map(str.lower, ['UserId', 'Name', 'Date', 'Class']))
        self.assertCountEqual(column_names, Counter(map(str.lower, result.columns)),
                              "Missing column(s) or column name mismatch")
    
    # test 2
    def test_load_posts(self):
        result = load_csv(source_file="/FileStore/tables/posts_csv.gz", schema=posts_schema)
        self.assertIsNotNone(result, "Posts dataframe did not load successfully")
        self.assertIsInstance(result, DataFrame, "Result type is not of spark.sql.DataFrame")
        self.assertEqual(result.count(), 61432, "Number of records is not correct")

        column_names = Counter(map(str.lower,
                                   ['Id', 'ParentId', 'PostTypeId', 'CreationDate', 'Score', 'ViewCount', 'Body', 'OwnerUserId',
                                    'LastActivityDate', 'Title', 'Tags', 'AnswerCount', 'CommentCount', 'FavoriteCount',
                                    'CloseDate']))
        self.assertCountEqual(column_names, Counter(map(str.lower, result.columns)),
                              "Missing column(s) or column name mismatch")
    
    # test 3
    def test_load_comments(self):
        result = load_csv(source_file="/FileStore/tables/comments_csv.gz", schema=comments_schema)
        self.assertIsNotNone(result, "Comments dataframe did not load successfully")
        self.assertIsInstance(result, DataFrame, "Result type is not of spark.sql.DataFrame")
        self.assertEqual(result.count(), 58735, "Number of records is not correct")

        column_names = Counter(map(str.lower, ['PostId', 'Score', 'Text', 'CreationDate', 'UserId']))
        self.assertCountEqual(column_names, Counter(map(str.lower, result.columns)),
                              "Missing column(s) or column name mismatch")
    
    # test 4
    def test_load_users(self):
        result = load_csv(source_file="/FileStore/tables/users_csv.gz", schema=users_schema)
        self.assertIsNotNone(result, "Users dataframe did not load successfully")
        self.assertIsInstance(result, DataFrame, "Result type is not of spark.sql.DataFrame")
        self.assertEqual(result.count(), 91616, "Number of records is not correct")

        column_names = Counter(map(str.lower,
                                   ['Id', 'Reputation', 'CreationDate', 'DisplayName', 'LastAccessDate', 'AboutMe',
                                    'Views', 'UpVotes', 'DownVotes']))
        self.assertCountEqual(column_names, Counter(map(str.lower, result.columns)),
                              "Missing column(s) or column name mismatch")
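
    # test 5
    def test_save_dfs(self):
        dfs = [("/FileStore/tables/users_csv.gz", users_schema, "users"),
               ("/FileStore/tables/badges_csv.gz", badges_schema, "badges"),
               ("/FileStore/tables/comments_csv.gz", comments_schema, "comments"),
               ("/FileStore/tables/posts_csv.gz", posts_schema, "posts")]

        for source_file, schema, table_name in dfs:
            df = load_csv(source_file=source_file, schema=schema)
            save_df(df, table_name)
            # save_df returns False when the table already exists, so check the path itself
            self.assertTrue(file_exists(f"dbfs:/FileStore/dataframes/{table_name}"),
                            f"Parquet output for '{table_name}' was not written")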

DataFrame[UserId: int, Name: string, Date: timestamp, Class: int]
DataFrame[PostId: int, Score: int, Text: string, CreationDate: timestamp, UserId: int]
DataFrame[Id: int, ParentId: int, PostTypeId: int, CreationDate: timestamp, Score: int, ViewCount: int, Body: string, OwnerUserId: int, LastActivityDate: timestamp, Title: string, Tags: string, AnswerCount: int, CommentCount: int, FavoriteCount: int, CloseDate: timestamp]
DataFrame[Id: int, Reputation: int, CreationDate: timestamp, DisplayName: string, LastAccessDate: timestamp, AboutMe: string, Views: int, UpVotes: int, DownVotes: int]
DataFrame[Id: int, Reputation: int, CreationDate: timestamp, DisplayName: string, LastAccessDate: timestamp, AboutMe: string, Views: int, UpVotes: int, DownVotes: int]
DataFrame[UserId: int, Name: string, Date: timestamp, Class: int]
DataFrame[PostId: int, Score: int, Text: string, CreationDate: timestamp, UserId: int]
DataFrame[Id: int, ParentId: int, PostTypeId: int, CreationDate: timestamp, Score: in

#### Subtask 4: answering questions about Spark-related concepts

Please write a short description for each of the terms below, one to two short paragraphs per term. Don't copy-paste; instead, write your own understanding.

1. What do the terms 'Spark Application', 'SparkSession', 'Transformations', 'Action', and 'Lazy Evaluation' mean in the context of Spark?

Write your descriptions in the next cell.

# Descriptions

## Spark Application
A Spark application consists of a driver process and a set of executor processes, which run on the nodes of a cluster. The driver process is the administrative part of the application: it runs the user's main program, keeps track of information about the application, responds to the user's program or input, and analyzes, distributes, and schedules work across the executors. The executor processes do the actual computations: each one runs the tasks the driver assigns to it and reports its state and results back to the driver, which can then report back to the user program.
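The division of labour can be illustrated with a rough single-machine analogy, where threads from Python's `concurrent.futures` stand in for executors. The function names here (`split_into_partitions`, `task`, `run_application`) are hypothetical, and this is not how Spark is implemented; it only mirrors the roles: the driver splits the work, executors compute on their piece, and the driver combines what they report back.

```python
from concurrent.futures import ThreadPoolExecutor


def split_into_partitions(data, num_partitions):
    """Driver-side step: cut the input into roughly equal chunks (partitions)."""
    return [data[i::num_partitions] for i in range(num_partitions)]


def task(partition):
    """Executor-side step: the actual computation on one partition."""
    return sum(x * x for x in partition)


def run_application(data, num_executors=4):
    # The "driver" plans the work ...
    partitions = split_into_partitions(data, num_executors)
    # ... hands one task per partition to the "executors" ...
    with ThreadPoolExecutor(max_workers=num_executors) as executors:
        partial_results = list(executors.map(task, partitions))
    # ... and combines what the executors report back into a single answer.
    return sum(partial_results)


print(run_application(list(range(10))))  # 285
```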

## SparkSession
A SparkSession is the single, unified entry point to Spark's functionality. Before it was introduced in Spark 2.0, developers had to create and keep track of several separate entry points, such as a SparkContext, an SQLContext, and a HiveContext, to run a Spark application. A SparkSession gathers these into one object, which makes Spark much easier to work with. The SparkSession object lives in the driver process, and the operations we issue through it (for example `spark.read` in `load_csv`) are turned into work that the driver distributes to the executors.
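In a Databricks notebook the session already exists as `spark`, and in PySpark `SparkSession.builder.getOrCreate()` returns the existing session instead of creating a second one. The toy below sketches that "one shared entry point" idea in plain Python. `ToySession`, `get_or_create`, and `register_table` are hypothetical names, not PySpark's API; the toy only shows that every caller gets the same object, which holds state that would otherwise be spread over several context objects.

```python
class ToySession:
    """Hypothetical single entry point; a toy, not PySpark's SparkSession."""

    _active = None  # the one session shared by the whole application

    def __init__(self, app_name: str):
        self.app_name = app_name
        # Table registry kept on the same object instead of a separate "context"
        self.tables = {}

    def register_table(self, name, rows):
        self.tables[name] = rows

    @classmethod
    def get_or_create(cls, app_name: str = "app") -> "ToySession":
        # Reuse the existing session if there is one; otherwise create it
        if cls._active is None:
            cls._active = cls(app_name)
        return cls._active


first = ToySession.get_or_create("task1")
first.register_table("badges", [1, 2, 3])
second = ToySession.get_or_create("another-name")
print(first is second, second.app_name, len(second.tables["badges"]))  # True task1 3
```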

## Transformations
To understand this concept, one first has to know about RDDs. An RDD (Resilient Distributed Dataset) is Spark's fundamental data structure: an immutable collection of objects of any type. Its records are partitioned across multiple nodes, and it is resilient (fault tolerant) because Spark remembers the lineage of operations used to build it, so a lost partition can be recomputed from its parents. A transformation is one of the two kinds of operations that can be performed on RDDs (the other being actions). A transformation takes an RDD as input, does something with it, and stores the result in a new RDD. Because RDDs are immutable, every transformation creates a new RDD, and the input RDD stays exactly as it was before the transformation.
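Immutability can be sketched with a small toy class. `ToyRDD` is hypothetical and, unlike a real RDD, it computes its results immediately (eagerly); it only illustrates that `map` and `filter` return a new dataset and never modify the one they are called on.

```python
class ToyRDD:
    """Hypothetical, eager toy of an immutable dataset (not PySpark's RDD class)."""

    def __init__(self, records):
        self._records = tuple(records)  # a tuple cannot be modified in place

    def map(self, f):
        # A transformation never touches self; it builds and returns a new dataset
        return ToyRDD(f(r) for r in self._records)

    def filter(self, pred):
        return ToyRDD(r for r in self._records if pred(r))

    def records(self):
        return list(self._records)


numbers = ToyRDD([1, 2, 3, 4])
evens_squared = numbers.filter(lambda x: x % 2 == 0).map(lambda x: x * x)
print(numbers.records())        # [1, 2, 3, 4]
print(evens_squared.records())  # [4, 16]
```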

There are two types of transformations. The first is narrow transformations, where all the data needed to compute the records in an output partition resides in a single partition of the parent RDD. The second is wide transformations, where the data needed to compute the records in a single output partition may reside in several different partitions of the parent RDD, so Spark has to move data between partitions (a shuffle). Examples of narrow transformations are map() and filter(), and examples of wide transformations are groupByKey() and reduceByKey().
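The difference can be sketched on hand-made partitions of (key, value) pairs. The helpers `narrow_map`, `shuffle`, and `reduce_by_key` are hypothetical and only model the data movement: the narrow map works on each partition independently, while the reduce-by-key first has to regroup records so that all values for a key end up in the same partition. The sketch partitions with `zlib.crc32` rather than Python's built-in `hash()`, because string hashes change between Python processes.

```python
import zlib

# Hypothetical data already split into two partitions of (key, value) pairs
partitions = [[("a", 1), ("b", 2), ("a", 3)], [("b", 4), ("c", 5)]]


def narrow_map(parts, f):
    # Narrow: each output partition comes from exactly one input partition
    return [[f(rec) for rec in part] for part in parts]


def shuffle(parts, num_out):
    # Wide: records with the same key go to the same output partition,
    # whichever input partition they came from
    out = [[] for _ in range(num_out)]
    for part in parts:
        for key, value in part:
            out[zlib.crc32(key.encode()) % num_out].append((key, value))
    return out


def reduce_by_key(parts, f, num_out=2):
    result = []
    for part in shuffle(parts, num_out):
        acc = {}
        for key, value in part:
            acc[key] = f(acc[key], value) if key in acc else value
        result.append(list(acc.items()))
    return result


doubled = narrow_map(partitions, lambda kv: (kv[0], kv[1] * 2))
totals = reduce_by_key(partitions, lambda x, y: x + y)
print(sorted(kv for part in totals for kv in part))  # [('a', 4), ('b', 6), ('c', 5)]
```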

## Action
Actions are operations on an RDD (Resilient Distributed Dataset) that return a value to the driver program or write data to storage, instead of producing a new RDD. Calling an action starts a job that executes on the cluster. Actions that bring results back to the driver, such as collect(), should only be used when the result is small enough to fit in the driver's memory; otherwise, the data can be written to storage such as DBFS instead. Whenever an action is called, the transformations it depends on are executed as well. Common examples are reduce(), collect(), takeSample(), take(), first(), saveAsTextFile(), saveAsSequenceFile(), countByKey(), and foreach().
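The memory point can be illustrated with a toy dataset whose partitions are only computed when an action asks for them. `ToyDataset` and `make_partitions` are hypothetical; the `take` method here loosely mirrors the idea that take() can stop after scanning just enough partitions, whereas collect() must compute every partition and ship all records to the driver. Spark's real take() scheduling is more elaborate than this one-partition-at-a-time loop.

```python
def make_partitions(num_partitions, size):
    # Each partition is a function that produces its records only when called
    return [lambda start=p * size: list(range(start, start + size))
            for p in range(num_partitions)]


class ToyDataset:
    """Hypothetical toy: partitions are computed only when an action needs them."""

    def __init__(self, partitions):
        self._partitions = partitions
        self.partitions_computed = 0

    def _compute(self, i):
        self.partitions_computed += 1
        return self._partitions[i]()

    def collect(self):
        # Action: compute every partition and bring all records back to the "driver"
        return [rec for i in range(len(self._partitions)) for rec in self._compute(i)]

    def take(self, n):
        # Action: scan partitions one at a time and stop once n records are found
        out = []
        for i in range(len(self._partitions)):
            if len(out) >= n:
                break
            out.extend(self._compute(i))
        return out[:n]


ds = ToyDataset(make_partitions(5, 10))
print(ds.take(3), ds.partitions_computed)          # [0, 1, 2] 1
print(len(ds.collect()), ds.partitions_computed)   # 50 6
```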

## Lazy Evaluation
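Lazy evaluation is a core concept in Apache Spark. Here, the RDD can be considered the data. When a transformation is called, Spark does not compute anything; it only records the transformation in the RDD's lineage, which Spark represents as a DAG (directed acyclic graph) of operations. This allows many transformations to be chained without actually being executed. The transformations are only applied, all at once, when an action is called. Waiting until then lets Spark plan the whole computation together, for example by pipelining consecutive narrow transformations into a single pass over the data.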
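A minimal sketch of this behaviour, assuming a toy class rather than Spark itself: `LazyToyRDD` (hypothetical) records each transformation as a step, a simplified stand-in for the lineage/DAG, and only `collect()` walks the steps over the data. The traced function shows that nothing runs until the action, and that both steps are applied to each record in one pass.

```python
class LazyToyRDD:
    """Hypothetical toy of lazy evaluation (not PySpark's API)."""

    def __init__(self, source, steps=()):
        self._source = source
        self._steps = tuple(steps)  # recorded transformations: the toy "lineage"

    def map(self, f):
        # Transformation: only record the step and return a new object
        return LazyToyRDD(self._source, self._steps + (("map", f),))

    def filter(self, pred):
        return LazyToyRDD(self._source, self._steps + (("filter", pred),))

    def collect(self):
        # Action: walk the recorded steps, one record at a time, and touch the data
        out = []
        for rec in self._source:
            keep = True
            for kind, fn in self._steps:
                if kind == "map":
                    rec = fn(rec)
                elif not fn(rec):
                    keep = False
                    break
            if keep:
                out.append(rec)
        return out


calls = []


def traced_square(x):
    calls.append(x)
    return x * x


rdd = LazyToyRDD(range(5)).map(traced_square).filter(lambda x: x > 4)
print(calls)          # [] (nothing has run yet)
print(rdd.collect())  # [9, 16]
print(calls)          # [0, 1, 2, 3, 4]
```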