#### Names of people in the group

Please write the names of the people in your group in the next cell.

Name of person A: Vegard Vaeng Bernhardsen

Name of person B: None

In [4]:
# import dbutils
# # Deleting tables left from previous runs in case they still exist after deleting an inactive cluster
# dbutils.fs.rm("/user", recurse=True)

In [5]:
# We need to install 'ipython_unittest' to run unittests in a Jupyter notebook
!pip install -q ipython_unittest

In [6]:
# Loading PySpark modules that we need
import unittest
from collections import Counter
from pyspark.sql import DataFrame
from pyspark.sql.types import *
from pyspark.sql import SparkSession  # Make sure to import SparkSession


#### Subtask 1: defining the schema for the data
Typically, the first thing to do before loading the data into a Spark cluster is to define the schema for the data. Look at the schema for 'badges' and try to define the schema for other tables similarly.

In [7]:
# Defining a schema for 'badges' table
badges_schema = StructType([StructField('UserId', IntegerType(), False),
                            StructField('Name', StringType(), False),
                            StructField('Date', TimestampType(), False),
                            StructField('Class', IntegerType(), False)])

# Defining a schema for 'posts' table
posts_schema = StructType([
    StructField('Id', IntegerType(), False),
    StructField('ParentId', IntegerType(), True),  # Nullable because only answers have a ParentId
    StructField('PostTypeId', IntegerType(), False),
    StructField('CreationDate', TimestampType(), False),
    StructField('Score', IntegerType(), False),
    StructField('ViewCount', IntegerType(), True),  # Nullable because it might not be relevant for answers
    StructField('Body', StringType(), False), # Note: This will be base64 encoded
    StructField('OwnerUserId', IntegerType(), True),  # Nullable because a post might not have an owner (rare cases)
    StructField('LastActivityDate', TimestampType(), False),
    StructField('Title', StringType(), True),  # Nullable because only questions have a title
    StructField('Tags', StringType(), True),  # Nullable because only questions have tags
    StructField('AnswerCount', IntegerType(), True),  # Nullable because only questions have answers
    StructField('CommentCount', IntegerType(), False),
    StructField('FavoriteCount', IntegerType(), True),  # Nullable because only questions might be favorited
    StructField('CloseDate', TimestampType(), True)  # Nullable because only questions might be closed
])

# Defining a schema for 'users' table
users_schema = StructType([
    StructField('Id', IntegerType(), False),
    StructField('Reputation', IntegerType(), False),
    StructField('CreationDate', TimestampType(), False),
    StructField('DisplayName', StringType(), False),
    StructField('LastAccessDate', TimestampType(), False),
    StructField('AboutMe', StringType(), True),  # Nullable because a user might not have filled this out
    StructField('Views', IntegerType(), False),
    StructField('UpVotes', IntegerType(), False),
    StructField('DownVotes', IntegerType(), False)
])


# Defining a schema for 'comments' table
comments_schema = StructType([
    StructField('PostId', IntegerType(), False),
    StructField('Score', IntegerType(), False),
    StructField('Text', StringType(), False),  # Note: This will be base64 encoded
    StructField('CreationDate', TimestampType(), False),
    StructField('UserId', IntegerType(), True)  # Nullable because a comment might not be linked to a user (anonymous comments)
])


#### Subtask 2: implementing two helper functions
Next, we need to implement two helper functions:
1. 'load_csv' receives the path of a CSV file and a schema as input arguments, loads the CSV file at that path into a Spark DataFrame, and returns the DataFrame;
2. 'save_df' receives a Spark DataFrame and saves it as a Parquet file on DBFS.

Note that the column separator in the CSV files is the TAB character ('\t') and the first row contains the column names.

BTW, DBFS is the name of the distributed filesystem used by Databricks Community Edition to store and access data.

In [8]:
def load_csv(source_file: str, schema: StructType) -> DataFrame:
    """
    Loads a CSV file into a Spark DataFrame using the provided schema.
    
    :param source_file: Path for the CSV file to load.
    :param schema: Schema for the CSV file being loaded as a DataFrame.
    :return: Spark DataFrame containing the loaded data.
    """
    # Reuse the active SparkSession if one exists, otherwise create a new one
    spark = SparkSession.builder.appName("Big Data Application").getOrCreate()
    df = spark.read.csv(path=source_file, schema=schema, sep='\t', header=True)
    return df

def save_df(df: DataFrame, table_name: str) -> None:
    """
    Saves a Spark DataFrame as a Parquet file on DBFS.
    
    :param df: DataFrame to be saved.
    :param table_name: Name under which the DataFrame will be saved inside '/user/hive/warehouse'.
    """
    df.write.mode('overwrite').parquet(f"/user/hive/warehouse/{table_name}")



In [9]:
# Loading 'ipython_unittest' so we can use '%%unittest_main' magic command
%load_ext ipython_unittest

#### Subtask 3: validating the implementation by running the tests

Run the cell below and make sure that all the tests run successfully. Moreover, at the end there should be four Parquet files named 'badges', 'comments', 'posts', and 'users' in '/user/hive/warehouse'.

Note that we assume the data for the project has already been stored on DBFS under the '/FileStore/tables/' path as 'badges_csv.gz', 'comments_csv.gz', 'posts_csv.gz', and 'users_csv.gz'.

In [10]:
%%unittest_main
class TestTask1(unittest.TestCase):
   
    # test 1
    def test_load_badges(self):
        result = load_csv(source_file="/FileStore/tables/badges_csv.gz", schema=badges_schema)
        self.assertIsNotNone(result, "Badges dataframe did not load successfully")
        self.assertIsInstance(result, DataFrame, "Result type is not of spark.sql.DataFrame")
        self.assertEqual(result.count(), 105640, "Number of records is not correct")

        column_names = Counter(map(str.lower, ['UserId', 'Name', 'Date', 'Class']))
        self.assertCountEqual(column_names, Counter(map(str.lower, result.columns)),
                              "Missing column(s) or column name mismatch")
    
    # test 2
    def test_load_posts(self):
        result = load_csv(source_file="/FileStore/tables/posts_csv.gz", schema=posts_schema)
        self.assertIsNotNone(result, "Posts dataframe did not load successfully")
        self.assertIsInstance(result, DataFrame, "Result type is not of spark.sql.DataFrame")
        self.assertEqual(result.count(), 61432, "Number of records is not correct")

        column_names = Counter(map(str.lower,
                                   ['Id', 'ParentId', 'PostTypeId', 'CreationDate', 'Score', 'ViewCount', 'Body', 'OwnerUserId',
                                    'LastActivityDate', 'Title', 'Tags', 'AnswerCount', 'CommentCount', 'FavoriteCount',
                                    'CloseDate']))
        self.assertCountEqual(column_names, Counter(map(str.lower, result.columns)),
                              "Missing column(s) or column name mismatch")
    
    # test 3
    def test_load_comments(self):
        result = load_csv(source_file="/FileStore/tables/comments_csv.gz", schema=comments_schema)
        self.assertIsNotNone(result, "Comments dataframe did not load successfully")
        self.assertIsInstance(result, DataFrame, "Result type is not of spark.sql.DataFrame")
        self.assertEqual(result.count(), 58735, "Number of records is not correct")

        column_names = Counter(map(str.lower, ['PostId', 'Score', 'Text', 'CreationDate', 'UserId']))
        self.assertCountEqual(column_names, Counter(map(str.lower, result.columns)),
                              "Missing column(s) or column name mismatch")
    
    # test 4
    def test_load_users(self):
        result = load_csv(source_file="/FileStore/tables/users_csv.gz", schema=users_schema)
        self.assertIsNotNone(result, "Users dataframe did not load successfully")
        self.assertIsInstance(result, DataFrame, "Result type is not of spark.sql.DataFrame")
        self.assertEqual(result.count(), 91616, "Number of records is not correct")

        column_names = Counter(map(str.lower,
                                   ['Id', 'Reputation', 'CreationDate', 'DisplayName', 'LastAccessDate', 'AboutMe',
                                    'Views', 'UpVotes', 'DownVotes']))
        self.assertCountEqual(column_names, Counter(map(str.lower, result.columns)),
                              "Missing column(s) or column name mismatch")
    # test 5
    def test_save_dfs(self):
        dfs = [("/FileStore/tables/users_csv.gz", users_schema, "users"),
               ("/FileStore/tables/badges_csv.gz", badges_schema, "badges"),
               ("/FileStore/tables/comments_csv.gz", comments_schema, "comments"),
               ("/FileStore/tables/posts_csv.gz", posts_schema, "posts")
               ]

        for source_file, schema, table_name in dfs:
            df = load_csv(source_file=source_file, schema=schema)
            save_df(df, table_name)



Fail

EEEEE
ERROR: test_load_badges (__main__.TestTask1.test_load_badges)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "Cell Tests", line 5, in test_load_badges
  File "C:\Users\vegar\AppData\Local\Temp\ipykernel_12540\3795138152.py", line 11, in load_csv
    df = spark.read.csv(path=source_file, schema=schema, sep='\t', header=True)
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "c:\Users\vegar\OneDrive - NTNU\Documents\Universitetet\12. Semester\Big Data Arkitektur\BigData\venv\Lib\site-packages\pyspark\sql\readwriter.py", line 740, in csv
    return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "c:\Users\vegar\OneDrive - NTNU\Documents\Universitetet\12. Semester\Big Data Arkitektur\BigData\venv\Lib\site-packages\py4j\java_gateway.py", line 1322, in __call__
    retu

<unittest.runner.TextTestResult run=5 errors=5 failures=0>

#### Subtask 4: answering questions about Spark-related concepts

Please write a short description for the terms below---one to two short paragraphs for each term. Don't copy-paste; instead, write your own understanding.

1. What do the terms 'Spark Application', 'SparkSession', 'Transformations', 'Action', and 'Lazy Evaluation' mean in the context of Spark?

Write your descriptions in the next cell.


Spark Application: A Spark application is a distributed computing program written to perform complex data processing tasks. It consists of a driver process and a set of executors, and it usually runs on a cluster, which lets it process large datasets efficiently across multiple nodes.
Core features include in-memory data storage and optimized execution plans, which help achieve high performance for both batch and near-real-time processing tasks.


SparkSession: SparkSession is the unified entry point for Spark applications. It replaced the earlier SQLContext and HiveContext entry points and wraps the underlying SparkContext. SparkSession gives a more streamlined way to interact with Spark functionality, including SQL queries, streaming data, and the DataFrame and Dataset APIs.
By encapsulating multiple contexts in a single object, SparkSession makes it easier for a developer to write and manage Spark code. Through this single interface, a user can read and write data, execute SQL queries, and perform analytics.


Transformation: Transformations are one of the core concepts in Spark. They are operations on RDDs (Resilient Distributed Datasets) or DataFrames that create a new dataset from an existing one. Examples include filter, map, join, and groupBy, which allow data to be manipulated across the Spark cluster.
Transformations are lazily evaluated. This means they are not executed immediately; instead, they build up a lineage of operations to be executed later. With this approach, Spark can optimize the overall data processing pipeline, which improves performance and resource utilization. Because they produce new datasets instead of modifying existing ones, transformations are fundamental to Spark's ability to handle big data processing tasks quickly and efficiently.



Action: Actions in Spark are what trigger the execution of the transformations applied to RDDs. In contrast to transformations, which are lazily evaluated, actions force the computation of the results. Examples of actions include count, collect, take, and save operations such as saveAsTextFile.
When an action is called, Spark evaluates the entire chain of transformations that has been applied up to that point. It then returns a result to the Spark driver or writes data to storage. Actions are essential for obtaining results from Spark applications, whether for data output, triggering computations, or aggregating results.



Lazy Evaluation: Lazy evaluation is a key performance optimization feature in Spark. It delays the execution of transformations until an action is called. Instead of executing each transformation as soon as it is defined, Spark builds an execution plan, a DAG (Directed Acyclic Graph) of transformations. This lets Spark optimize the overall data processing workflow, for example by rearranging operations for efficiency or by minimizing the amount of data that has to be read or moved across the cluster. Lazy evaluation therefore reduces unnecessary computation and improves the efficiency of Spark applications by executing transformations in an optimized order, only when an action requires them.

