# Task 3

In [0]:
# We need to install 'ipython_unittest' to run unittests in a Jupyter notebook
!pip install -q ipython_unittest


In [0]:
# Loading modules that we need
from pyspark.sql.dataframe import DataFrame
from collections import Counter
from pyspark.sql.functions import desc
from pyspark.sql.functions import stddev
from pyspark.sql.functions import split
from pyspark.sql.functions import regexp_replace
from pyspark.sql.functions import * 
from pyspark.sql import SparkSession
from itertools import combinations
from pyspark.sql.types import * 
import math

In [0]:
# A helper function to load a table (stored in Delta format) from DBFS as a Spark DataFrame
def load_df(table_name: "name of the table to load") -> DataFrame:
    return spark.read.format("delta").load(table_name)

users_df = load_df("/user/hive/warehouse/users")
posts_df = load_df("/user/hive/warehouse/posts")

#### Subtask 1: implementing two functions
Implement these two functions:
1. 'compute_pearsons_r' that receives a DataFrame and two column names and returns the [Pearson correlation coefficient](https://en.wikipedia.org/wiki/Pearson_correlation_coefficient) between values of two columns;
2. 'make_tag_graph' that receives a DataFrame containing the records related to 'questions' and returns a DataFrame with two columns 'u' and 'v'; row i of the resulting DataFrame is a tuple (u_i, v_i), where u_i and v_i are distinct tags that have appeared together on a question.
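
The pairing described in item 2 can be illustrated for a single question in plain Python. This is only a sketch, not the Spark solution: the helper name `tag_pairs` is hypothetical, and it assumes the 'Tags' value is a string of the form `<a><b><c>`.

```python
from itertools import combinations
from typing import List, Tuple


def tag_pairs(tags: str) -> List[Tuple[str, str]]:
    """Return every unordered pair of distinct tags in a string like '<a><b><c>'."""
    # Assumption: tags are wrapped in angle brackets and concatenated.
    # Strip the outer brackets, split on the '><' separators, drop empty entries.
    names = [t for t in tags.strip("<>").split("><") if t]
    # Remove repeated tags while keeping their original order.
    unique = list(dict.fromkeys(names))
    # combinations() yields each pair once, so u and v are always different.
    return list(combinations(unique, 2))


print(tag_pairs("<bigdata><scalability><efficiency>"))
```

A question with a single tag produces no pairs, so it contributes no rows to the tag graph.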

Please note that you should implement the 'compute_pearsons_r' yourself, so you should not use the 'DataFrame.stat.corr' method. Nevertheless, you can use 'DataFrame.stat.corr' to verify the correctness of your implementation.
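
As a reference point for the definition, here is a minimal plain-Python sketch of Pearson's r on two small lists. The function name `pearsons_r` and the sample data are made up for illustration; the Spark implementation works on DataFrame columns instead.

```python
import math
from typing import Sequence


def pearsons_r(xs: Sequence[float], ys: Sequence[float]) -> float:
    """Pearson correlation coefficient of two equally long sequences."""
    n = len(xs)
    if n != len(ys) or n < 2:
        raise ValueError("need two sequences of equal length with at least 2 values")
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    # Sum of products of deviations (covariance without the 1/(n-1) factor).
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    # Sums of squared deviations (variances without the 1/(n-1) factor).
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    if var_x == 0 or var_y == 0:
        raise ValueError("correlation is undefined for a constant sequence")
    return cov / math.sqrt(var_x * var_y)


print(pearsons_r([1, 2, 3, 4], [1, 3, 2, 4]))
```

The 1/(n-1) factors of the sample covariance and the two sample standard deviations cancel, which is why dividing the covariance by the product of the standard deviations gives the same coefficient.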

In [0]:
def make_combinations(l):
  return [c for c in combinations(l, 2)]


def compute_pearsons_r(df: "a DataFrame", col1: "name of column A", col2: "name of column B") -> float:
    cov = df.stat.cov(col1, col2)
    stv1 = df.select(stddev(col1))
    stv2 = df.select(stddev(col2))
    result = (cov /(stv1.collect()[0][0] * stv2.collect()[0][0]))
    return result
  
def make_tag_graph(df: "DataFrame containing question data") -> DataFrame:
    # Wrap the pair generator as a UDF that returns an array of [u, v] arrays
    combination_udf = udf(make_combinations, ArrayType(ArrayType(StringType())))
    
    # Splitting on ">" leaves a "<" prefix on each tag and one empty entry; both are cleaned up below
    split_df = df.select('*', split("Tags", ">").alias("split_tags"))
    exploded_df = split_df.select("*", explode("split_tags").alias("tag"))
    grouped = exploded_df.groupBy("Id").agg(collect_list("tag").alias("tags"))
    
    # One row per pair of tags that appear on the same question
    edges = grouped.select(combination_udf("tags").alias("tagcombinations"))
    edges = edges.select(explode("tagcombinations").alias("tagscombined"))
    edges = edges.selectExpr("tagscombined[0] as u", "tagscombined[1] as v")
    edges = edges.withColumn('u', regexp_replace(col('u'), "<", ""))
    edges = edges.withColumn('v', regexp_replace(col('v'), "<", ""))
    # Drop pairs that contain the empty entry on either side (this also drops nulls)
    edges = edges.filter((edges["u"] != "") & (edges["v"] != ""))
    
    # Add the reverse of every edge so that the graph is symmetric
    reversed_edges = edges.selectExpr("v as u", "u as v")
    final_edges = edges.union(reversed_edges)
    
    final_edges.show()
    
    return final_edges
  

In [0]:
# Importing GraphFrames graph library; make sure you have GraphFrames installed on the cluster
from graphframes import *

#### Subtask 2: implementing three functions
Implement these three functions:
1. 'get_nodes' that, given the result from execution of 'make_tag_graph', returns a DataFrame with one column named 'id' that includes the tags that have appeared in the tag graph;
2. 'get_edges' that, given the result from execution of 'make_tag_graph', returns a DataFrame with two columns 'src' and 'dst' where 'src' is the source node and 'dst' is the destination node.
3. 'compute_pagerank' that receives a GraphFrames graph object, computes the PageRank of the nodes in the graph, and returns the result as a DataFrame with two columns named 'id' and 'pagerank'; the rows in the resulting DataFrame should be sorted by the values of the 'pagerank' column.

Note that the term 'tag graph' in this context refers to the DataFrame returned by executing 'make_tag_graph'. Furthermore, 'src' and 'dst' are distinct, so 'src' != 'dst'.
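
To make the idea behind PageRank concrete, here is a small power-iteration sketch in plain Python over a list of ('src', 'dst') pairs. It is an illustration under stated assumptions, not the GraphFrames implementation: the function name `pagerank`, the dangling-node handling, and the toy edges are all chosen for this example, and the sketch normalizes ranks to sum to 1, so its values are on a different scale from the GraphFrames results in this notebook.

```python
from typing import Dict, List, Tuple


def pagerank(edges: List[Tuple[str, str]],
             reset_probability: float = 0.15,
             tol: float = 1e-6,
             max_iter: int = 100) -> Dict[str, float]:
    """Iterate rank propagation until the total change drops below tol."""
    nodes = sorted({node for edge in edges for node in edge})
    out_links: Dict[str, List[str]] = {node: [] for node in nodes}
    for src, dst in edges:
        out_links[src].append(dst)

    count = len(nodes)
    ranks = {node: 1.0 / count for node in nodes}
    damping = 1.0 - reset_probability

    for _ in range(max_iter):
        # Rank held by nodes without outgoing edges is spread over all nodes.
        dangling = sum(ranks[v] for v in nodes if not out_links[v])
        new_ranks = {v: (reset_probability + damping * dangling) / count for v in nodes}
        # Every node passes an equal share of its rank along each outgoing edge.
        for src in nodes:
            if out_links[src]:
                share = damping * ranks[src] / len(out_links[src])
                for dst in out_links[src]:
                    new_ranks[dst] += share
        change = sum(abs(new_ranks[v] - ranks[v]) for v in nodes)
        ranks = new_ranks
        if change < tol:
            break
    return ranks


# Toy symmetric tag graph: 'python' co-occurs with both other tags.
toy_edges = [("python", "pandas"), ("pandas", "python"),
             ("python", "keras"), ("keras", "python")]
for tag, rank in sorted(pagerank(toy_edges).items(), key=lambda kv: -kv[1]):
    print(tag, round(rank, 4))
```

In this toy graph the tag that is connected to both others receives the highest rank, which mirrors why heavily co-occurring tags rise to the top of the real tag graph.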

In [0]:
def get_nodes(df: "DataFrame of the tag graph") -> DataFrame:
  unique_tags_df = df.select(explode(array(col("u"), col("v")))).distinct()
  unique_tags_df = unique_tags_df.select("col").toDF("id")
  unique_tags_df = unique_tags_df.select("id")
  df = unique_tags_df.filter((~(col("id") == "")))
  return df

def get_edges(df: "DataFrame of the tag graph") -> DataFrame:
    edges = df.select(col("u").alias("src"), col("v").alias("dst"))
    return edges

def compute_pagerank(graph: "a Graphframes graph") -> DataFrame:
  results = graph.pageRank(resetProbability=0.15, tol=0.01)
  display(results.vertices)
  result = results.vertices.select("id", "pagerank").sort("pagerank", ascending=False)
  return result


In [0]:
# Loading 'ipython_unittest' so we can use '%%unittest_main' magic command
%load_ext ipython_unittest

#### Subtask 3: validating the implementation by running the tests

Run the cell below and make sure that all the tests run successfully.

In [0]:
%%unittest_main
class TestTask3(unittest.TestCase):
  
  error_threshold = 0.03
  
  def test_corr1(self):
    # Pearson correlation coefficient between 'user reputation' and 'upvotes' received by users
    result = compute_pearsons_r(users_df, "Reputation", "UpVotes")
    self.assertLessEqual(math.fabs(result-0.5218138310114108), self.error_threshold)
    print(result)
  
  def test_corr2(self):
    # Pearson correlation coefficient between 'user reputation' and 'downvotes' received by users
    result = compute_pearsons_r(users_df, "Reputation", "DownVotes")
    self.assertLessEqual(math.fabs(result-0.1473558141546844), self.error_threshold)
    print(result)

  def test_corr3(self):
    # Pearson correlation coefficient between 'question score' and the 'number of answers' it received
    result = compute_pearsons_r(posts_df[posts_df["PostTypeId"] == 1], "Score", "AnswerCount")
    self.assertLessEqual(math.fabs(result-0.47855272641249674), self.error_threshold)
    print(result)
    
  def test_make_tag_graph(self):
    result = make_tag_graph(df=posts_df[posts_df["PostTypeId"] == 1])
    self.assertIsInstance(result, DataFrame)
    
    column_names = Counter(map(str.lower, ['u', 'v']))
    self.assertCountEqual(column_names, Counter(map(str.lower, result.columns)), "Missing column(s) or column name mismatch")
    
    display(result)
    
    self.assertEqual(result.count(), 225292)
    
  def test_get_nodes(self):
    result = make_tag_graph(df=posts_df[posts_df["PostTypeId"] == 1])
    n = get_nodes(result)
    self.assertEqual(n.count(), 638)
    n.show()

  def test_get_edges(self):
    result = make_tag_graph(df=posts_df[posts_df["PostTypeId"] == 1])
    e = get_edges(result)
    
    column_names = Counter(map(str.lower, ['src', 'dst']))
    self.assertCountEqual(column_names, Counter(map(str.lower, e.columns)), "Missing column(s) or column name mismatch")
    
    self.assertEqual(e.count(), 225292)
    e.show()
    
  def test_compute_pagerank(self):
    result = make_tag_graph(df=posts_df[posts_df["PostTypeId"] == 1])
    n = get_nodes(result)
    e = get_edges(result)
    g = GraphFrame(n, e)
    ranks = compute_pagerank(g)
    self.assertEqual(ranks.first()[0], 'machine-learning')
    ranks.show()



+----------------+----------------+
|               u|               v|
+----------------+----------------+
|       education|     open-source|
|     data-mining|     definitions|
|machine-learning|         bigdata|
|machine-learning|          libsvm|
|         bigdata|          libsvm|
|         bigdata|     scalability|
|         bigdata|      efficiency|
|         bigdata|     performance|
|     scalability|      efficiency|
|     scalability|     performance|
|      efficiency|     performance|
|           nosql| relational-dbms|
|     data-mining|      clustering|
|     data-mining|          octave|
|     data-mining|         k-means|
|     data-mining|categorical-data|
|      clustering|          octave|
|      clustering|         k-means|
|      clustering|categorical-data|
|          octave|         k-means|
+----------------+----------------+
only showing top 20 rows



id,pagerank
similar-documents,0.5529163717743941
feature-extraction,2.338228658076921
graphs,1.3052156699365418
ensemble-modeling,1.0228283124584572
theano,0.4300044029218328
evaluation,1.6616731760406982
multi-output,0.3667330876819561
mlops,0.2951482149991089
finite-precision,0.2128372053146443
policy-gradients,0.5776891046022012


+-------------------+------------------+
|                 id|          pagerank|
+-------------------+------------------+
|   machine-learning|  53.1479857835248|
|             python| 32.06835033450795|
|      deep-learning|24.042373096165225|
|     neural-network|21.255768081750155|
|     classification|15.318045144552991|
|              keras|14.072024884265128|
|                nlp|11.430372243657702|
|       scikit-learn|10.970562767137379|
|         tensorflow|10.714361569387375|
|        time-series| 7.725044537959054|
|         regression| 7.217694808632236|
|            dataset| 7.011398546911266|
|                cnn| 6.921492245972301|
|                  r| 6.857501599786331|
|        data-mining| 6.562685631055276|
|         clustering| 6.341043099185564|
|predictive-modeling| 5.973874233475867|
|               lstm| 5.790255335632053|
|         statistics| 5.664164310072055|
|             pandas| 5.601571613162543|
+-------------------+------------------+
only showing top 20 rows

u,v
education,open-source
data-mining,definitions
machine-learning,bigdata
machine-learning,libsvm
bigdata,libsvm
bigdata,scalability
bigdata,efficiency
bigdata,performance
scalability,efficiency
scalability,performance


Fail

......F
FAIL: test_make_tag_graph (__main__.TestTask3)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "Cell Tests", line 32, in test_make_tag_graph
AssertionError: 225292 != 228830

----------------------------------------------------------------------
Ran 7 tests in 2250.198s

FAILED (failures=1)
Out[11]: <unittest.runner.TextTestResult run=7 errors=0 failures=1>

#### Subtask 4: answering questions about Spark-related concepts

Please write a short description for the terms below---one to two short paragraphs for each term. Don't copy-paste; instead, write your own understanding.

1. What do the terms 'User-Defined Functions (UDFs)', 'Data Locality', 'Bucketing', 'Distributed Filesystem' mean in the context of Spark?

Write your descriptions in the next cell.


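User-Defined Functions (UDFs)
User-defined functions let you write custom functions for use in Spark SQL and the DataFrame API, extending Spark beyond what is available with its built-in functions. For example, in Scala we can define: val add = udf((x: Int) => x + 1). UDFs are available in Scala, Java and Python (PySpark); this notebook itself uses a Python UDF in 'make_tag_graph'. Python UDFs are usually slower than built-in functions, because rows have to be serialized between the JVM and the Python worker and Spark's optimizer cannot look inside the function.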

Data Locality
In Spark, computation is executed as close to the data as possible: the scheduler prefers to run each task on an executor that already holds, or is near, the data the task reads. Locality also influences when tasks are executed, because the scheduler waits a short time for a slot on a preferred node before falling back to a less local one. The best case is reached when a task processes an HDFS block in the memory of the same node on which that block is stored, so that no data has to travel over the network.

Bucketing
Bucketing is an optimization technique in Spark that splits a dataset into a fixed number of smaller chunks (buckets). For each record, the value of the bucketing column is hashed, and the hash determines the bucket in which the record is stored. Buckets are not the same as partitions: partition values are stored as part of the file system paths, whereas the bucket column stays inside the data files and rows are assigned to a fixed number of files by hash. Because rows with the same key always land in the same bucket, joins and aggregations on the bucketing column can avoid a shuffle.
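
The hash-to-bucket assignment can be sketched in a few lines of plain Python. This is only an illustration: the helper names `bucket_for` and `bucketize` are hypothetical, and `zlib.crc32` is used as a stand-in hash, not the hash function Spark itself applies.

```python
import zlib
from collections import defaultdict
from typing import Dict, List


def bucket_for(key: str, num_buckets: int) -> int:
    """Map a key to a bucket number in the range [0, num_buckets)."""
    # crc32 is deterministic across runs, unlike Python's built-in hash() for strings.
    return zlib.crc32(key.encode("utf-8")) % num_buckets


def bucketize(rows: List[dict], key_column: str, num_buckets: int) -> Dict[int, List[dict]]:
    """Group rows by the bucket number of their key column."""
    buckets: Dict[int, List[dict]] = defaultdict(list)
    for row in rows:
        buckets[bucket_for(str(row[key_column]), num_buckets)].append(row)
    return dict(buckets)


# Made-up sample rows for illustration.
rows = [{"tag": "python", "post": 1}, {"tag": "keras", "post": 2},
        {"tag": "python", "post": 3}, {"tag": "nlp", "post": 4}]
for bucket, members in sorted(bucketize(rows, "tag", 4).items()):
    print(bucket, members)
```

Both 'python' rows end up in the same bucket, which is the property that lets two datasets bucketed the same way be joined bucket by bucket.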

Distributed Filesystem
Spark is often used with HDFS, the Hadoop Distributed File System (this notebook reads its tables from DBFS). A distributed filesystem stores data across multiple server nodes instead of on a single machine. In HDFS, data is stored as files that are divided into smaller blocks, and each block is replicated across several nodes in the cluster so that losing one node does not lose data. When a file is read, its blocks are retrieved from the nodes that hold them and combined.
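
The block-and-replica idea can be sketched as a toy model in plain Python. Everything here is an assumption made for illustration: the function names, the tiny block size, and the round-robin placement, which is far simpler than the placement policy a real distributed filesystem uses.

```python
from typing import Dict, List


def split_into_blocks(data: bytes, block_size: int) -> List[bytes]:
    """Cut a file's bytes into fixed-size blocks (the last one may be shorter)."""
    return [data[i:i + block_size] for i in range(0, len(data), block_size)]


def place_replicas(num_blocks: int, nodes: List[str], replication: int) -> Dict[int, List[str]]:
    """Assign each block to `replication` different nodes, round-robin."""
    if replication > len(nodes):
        raise ValueError("replication factor cannot exceed the number of nodes")
    return {
        block: [nodes[(block + r) % len(nodes)] for r in range(replication)]
        for block in range(num_blocks)
    }


# Made-up file contents and node names.
data = b"posts and users stored across the cluster"
blocks = split_into_blocks(data, block_size=8)
placement = place_replicas(len(blocks), ["node-1", "node-2", "node-3", "node-4"], replication=3)

for index, holders in placement.items():
    print(index, blocks[index], holders)

# Reading the file back means fetching every block and joining them in order.
assert b"".join(blocks) == data
```

Because every block lives on several nodes, a reader can still fetch each block from a surviving replica if one node fails.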

