Copyright &copy; 2022, 2024, 2025 Scott Jensen, San Jose State University

<a rel="license" href="http://creativecommons.org/licenses/by-sa/4.0/"><img alt="Creative Commons License" style="border-width:0" src="https://i.creativecommons.org/l/by-sa/4.0/88x31.png" /></a><br /><span xmlns:dct="http://purl.org/dc/terms/" property="dct:title">This notebook</span> by <span xmlns:cc="http://creativecommons.org/ns#" property="cc:attributionName">Scott Jensen,Ph.D.</span> is licensed under a <a rel="license" href="http://creativecommons.org/licenses/by-sa/4.0/">Creative Commons Attribution-ShareAlike 4.0 International License</a>.

# Overview
This notebook is designed to accompany the notebook titled "Loading Yelp Data - Serverless" and should be run ***after*** that notebook has been successfully run. That earlier notebook will have stored the files for the Yelp dataset from Kaggle in a `kaggle` directory in your Databricks catalog. In the Yelp dataset, the review and user data are large JSON files and take a long time to read as DataFrames. This notebook contains code to create tables for the review data (Step 1) and user data (Step 2).

Step 0 defines functions used in building both the review and user tables. These functions could also be used to build tables for other datasets.

**If you are using this notebook with the Yelp dataset from Kaggle:**
* If you left the path settings at their defaults in the `Loading Yelp Data - Serverless` notebook, you can use this notebook as is.
* If you modified the paths in the Databricks Catalog where you stored the data, then you will need to update the constants in Steps 1 and 2 accordingly.
* When building the table for the user data, this notebook makes the following changes:
  * A data error in the `elite` field is fixed (discussed in Step 2)
  * The `friends` field is dropped

**If you are using the notebook with other datasets:**
* The two functions used are:
  * `process_table`
  * `verify_count`

* The `verify_count` function takes the expected record count as a parameter, so you will need to provide that count if you are building tables for files other than the Yelp data. Alternatively, skip the call to `verify_count` after calling `process_table`.

* The `process_table` function builds the table. Most of the parameters are self-explanatory. The `json_read_function` parameter is a function you define that takes one parameter, the path to the file, and returns a DataFrame for that file; `process_table` calls it to load the data. If every table included all of the columns and rows in its file, the read could have been built into `process_table` itself, but passing a function lets you edit the data as part of building the table. To include all of the columns and rows, see the code in Step 1 for building the review table. To edit the data while building the initial table, see the code in Step 2, where an error in the Yelp user data is fixed and a field is dropped.
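
Because `process_table` passes only the data path to the reader, any other settings (such as which columns to keep) have to be fixed before the function is handed over. One possible approach, sketched below, is a small factory that returns a one-parameter reader. The names `make_json_reader`, `session`, and `columns` are hypothetical and are not part of this notebook; in Databricks, `session` would be the built-in `spark` object.

```python
def make_json_reader(session, columns):
    """Return a one-parameter reader that could be passed as json_read_function.

    session: a SparkSession (in Databricks, the built-in `spark` object)
    columns: list of column names to keep (hypothetical; use the names in your file)
    """
    def read_json(data_path):
        # Read the JSON file at data_path and keep only the requested columns
        return session.read.json(data_path).select(*columns)

    return read_json
```

For example, `make_json_reader(spark, ["business_id", "name"])` would return a function suitable for the `json_read_function` parameter; the column names here are placeholders for whatever your file contains.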

**A Note on syntax errors:**

You may notice that some of the Spark SQL queries that use an f-string have syntax errors highlighted. These are not actual errors; the serverless editor appears to be trying to interpret placeholders whose values are not set until the query is run.
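
To see why the highlighted queries still run, note that Python fills in the f-string placeholders before Spark ever receives the query. A minimal sketch using the same constant values as Step 1 (the variable name `query` is only for illustration):

```python
REVIEW_TBL_PATH = "workspace.default"
REVIEW_TBL_NAME = "review_table"

# Python substitutes the placeholders first, producing an ordinary SQL string
query = f"SELECT * FROM {REVIEW_TBL_PATH}.{REVIEW_TBL_NAME} LIMIT 5"
print(query)
# prints: SELECT * FROM workspace.default.review_table LIMIT 5
```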



# Building Review and User Tables
The review and user data files are the largest Yelp files you will be using in class and in your project.  Loading a DataFrame and selecting the desired columns from larger JSON files is slow, so this notebook instead loads the data and creates Spark Delta tables named `review_table` and `user_table`.  In future sessions, the tables can then be used instead of the JSON files. 

After the tables are built, if the record count is correct, the original JSON files can be deleted.

<span style="color:red;">Be sure to run this notebook after first running the notebook that loads the data files into a volume in the Databricks Catalog as bzip2 files.</span>

The first time this notebook is run, it will take up to 10 minutes to build both tables from the JSON files (this can vary a bit).

Once the tables are built, they can be used in Spark SQL queries in the same way as temporary views.

### Step 0: Function Definitions
The following cell includes the imports and function definitions needed by the cells further down that build the `review_table` and `user_table`. Run the cells in order from top to bottom, or use the `Run all` button or menu option.

In [0]:
import pyspark.sql.functions as f

def file_exists(path):
  """Checks whether the path provided exists in the Databricks catalog.

    Parameters:
     path (str): Path to a file or directory in the Databricks catalog

     Returns True if the path exists and contains at least one file,
     otherwise False
  """
  try:
    file_list = dbutils.fs.ls(path)
    return len(file_list) > 0
  except Exception:
    print(f'The specified path ({path}) does not exist or is not valid.')
    return False  # the path does not exist


def process_table(table_name, table_path, data_path, force_rebuild, json_read_function):
  '''Creates a table based on the JSON file (can be compressed using bzip2).
  
  Parameters:
     table_name (str): Name of the table being created
     table_path (str): Dot-separated path to the catalog and schema - generally workspace.default
     data_path (str): Volume path of the compressed JSON data file
     force_rebuild (bool): Flag as to whether the table should be rebuilt (True) from the JSON files even if the table exists
     json_read_function (function): Name of a function that takes one parameter, the path to the JSON data, and returns the DataFrame that was created
     Returns None
     Raises an exception if the path to the data file is not valid
  '''
  spark.sql(f'USE {table_path}')
  if spark.catalog.tableExists(table_name) and not force_rebuild:
    print(f"{table_name} table already exists and the option to force the rebuild was false")
    return
  
  data_exists = file_exists(data_path)
  if force_rebuild:
    if not data_exists:
      raise Exception(f"The path to the compressed JSON data file: {data_path} does not exist. Any existing table was not deleted.")
    spark.sql(f'DROP TABLE IF EXISTS {table_name}')
  elif not data_exists:
    raise Exception(f"The path to the compressed JSON data file: {data_path} does not exist. Be sure you loaded the data to the correct catalog schema.")

  print(f"building {table_name} from JSON file")
  df_temp = json_read_function(data_path)
  # Does not already exist as a table, so write it out
  df_temp.write.mode("overwrite").saveAsTable(f'{table_path}.{table_name}')
  return
  

def verify_count(table_name, table_path, target_count):
  """Verifies that the table has the expected number of records.

  Parameters:
     table_name (str): Name of the table that was created
     table_path (str): Dot-separated path to the catalog and schema - generally workspace.default
     target_count (int): Number of records expected in the table
     Returns None
  """
  table = table_path + '.' + table_name
  result = spark.sql(f"""
  SELECT COUNT(*) AS record_count
  FROM {table}
  """)
  # result is a DataFrame with one row and one column (the COUNT).
  # The collect() method returns a list of Spark Row objects,
  # so in the following line we:
  # 1. Get the first Row from the list returned by collect()
  # 2. Convert that Row to a list
  # 3. Get the first element in the list (the int with the count)
  record_count = list(result.collect()[0])[0]

  if record_count == target_count:
    print(f"Table {table_name} was created and had the expected record count of {target_count:,d}. "
           "The data file can be deleted.")
  else:
    print(f"Table {table_name} was created, "
          f"but instead of the expected record count of {target_count:,d}, "
          f"it had a record count of {record_count:,d}. "
          f"The data file should NOT be deleted "
          "and this needs to be debugged.")

### Step 1: Build the table for the review data
The following cell uses the functions defined in Step 0 above, so that must be run first.  

For the review data, we want all the columns and rows from the review data file included in the table, so the function passed as a parameter to the `process_table` function simply reads the file and returns a DataFrame.

In [0]:
# If you want to force the building of the table from the JSON files, set FORCE_REVIEW_REBUILD to True.
# Rebuilding is time consuming, so only do this if records are missing from the table, the table could
# not be created, or you want to change the fields included in the table.
# ************************************************************************************************
# AFTER RUNNING IT TO REBUILD FROM THE JSON DATA BE SURE THAT FORCE_REVIEW_REBUILD IS SET TO FALSE
# ************************************************************************************************
FORCE_REVIEW_REBUILD = False 
REVIEW_DATA_PATH = "/Volumes/workspace/default/kaggle/review.bz2"
REVIEW_TBL_PATH = "workspace.default"
REVIEW_TBL_NAME = "review_table"
REVIEW_TARGET = 6990280

# If any fields should be added or excluded, such transformations should be included in this function.
# The function defined must take one parameter - the path to the JSON data file.
def create_review_dataframe(data_path):
  df_reviews = spark.read.json(data_path)
  return df_reviews

process_table(REVIEW_TBL_NAME, REVIEW_TBL_PATH, REVIEW_DATA_PATH, FORCE_REVIEW_REBUILD, create_review_dataframe)
verify_count(REVIEW_TBL_NAME, REVIEW_TBL_PATH, REVIEW_TARGET)

spark.sql(f"SELECT * FROM {REVIEW_TBL_PATH}.{REVIEW_TBL_NAME} LIMIT 5").show()

### Step 2: Build the table for the user data
The following cell uses the functions defined in Step 0 above, so that must be run first.  

**NOTE ON TRANSFORMATIONS IN LOADING:** 

The `process_table` function takes the name of a function that will be used to load the data.  That function takes one parameter - the name of the file used to create the table. Any transformations you want to make to the raw data can be done in that function.

Due to the way Yelp created the academic dataset, we need to fix a data error (this is not uncommon when working with data). It appears the years in which a user was elite may originally have been stored as a single string of 4-digit years, such as "2017201820202021". Since Yelp did not exist during the prior century (and it's over 70 years until the next century), someone may have decided to split that string into an array (like a list in Python) based on "20". This would work for every year this century except 2020, which unfortunately falls within our data range. As a result, the user with the elite string "2017201820202021" appears in the JSON user file as [2017,2018,20,20,2021], so we need to fix that.
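
The repair itself is a plain text substitution. The sketch below shows the idea in ordinary Python, assuming the elite value arrives as a comma-separated string; the helper name `fix_elite` is hypothetical, and the cell below applies the same substitution in Spark with `regexp_replace`.

```python
import re

def fix_elite(elite):
    """Rejoin a year 2020 that was split into "20,20" (hypothetical helper)."""
    return re.sub("20,20", "2020", elite)

print(fix_elite("2017,2018,20,20,2021"))
# prints: 2017,2018,2020,2021
```

Values that do not contain a split 2020, such as "2015,2016", pass through unchanged.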

In [0]:
# If you want to force the building of the table from the JSON files, set FORCE_USER_REBUILD to True.
# Rebuilding is time consuming, so only do this if records are missing from the table, the table could
# not be created, or you want to change the fields included in the table.
# **********************************************************************************************
# AFTER RUNNING IT TO REBUILD FROM THE JSON DATA BE SURE THAT FORCE_USER_REBUILD IS SET TO FALSE
# **********************************************************************************************
FORCE_USER_REBUILD = False 
USER_DATA_PATH = "/Volumes/workspace/default/kaggle/user.bz2"
USER_TBL_PATH = "workspace.default"
USER_TBL_NAME = "user_table"
USER_TARGET = 1987897

# If any fields should be added or excluded, such transformations should be included in this function.
# Here we are fixing a data error in the elite field and dropping the "friends" field. The friends field 
# contains a list of the user IDs for everyone on Yelp that has friended this user (regardless of whether that
# other user is in the dataset). For some popular users, this can run into the thousands. 
def create_user_dataframe(data_path):
  # Replace the broken "20,20" with "2020" in the elite field, then drop the friends field
  df_users = (
    spark.read.json(data_path)
    .withColumnRenamed("elite", "original_elite")
    .withColumn("elite", f.regexp_replace(f.col("original_elite"), "20,20", "2020"))
    .drop("original_elite")
    .drop("friends")
  )
  return df_users
  
process_table(USER_TBL_NAME, USER_TBL_PATH, USER_DATA_PATH, FORCE_USER_REBUILD, create_user_dataframe)
verify_count(USER_TBL_NAME, USER_TBL_PATH, USER_TARGET)

spark.sql(f"SELECT * FROM {USER_TBL_PATH}.{USER_TBL_NAME} LIMIT 5").show()

### Step 3: Are all of the files and tables present?

**This step is used for grading the assignment**

For most of the exercises, grading will be based on multiple cells, and while you will know the output you are looking for, you will not know beforehand which cells are graded or how they are weighted. The goal of this exercise is to make sure you have all the data loaded so you can participate in the exercises and your team project.

The first of the following two cells lists the contents of your `kaggle` directory in the Databricks Catalog. The second shows sample rows and the schema for each of the tables you have created.

In [0]:
dbutils.fs.ls('/Volumes/workspace/default/kaggle/')

In [0]:
spark.sql(f"SELECT * FROM {REVIEW_TBL_PATH}.{REVIEW_TBL_NAME} LIMIT 5").show()
spark.sql(f"DESCRIBE TABLE {REVIEW_TBL_PATH}.{REVIEW_TBL_NAME}").show()
spark.sql(f"SELECT * FROM {USER_TBL_PATH}.{USER_TBL_NAME} LIMIT 5").show()
spark.sql(f"DESCRIBE TABLE {USER_TBL_PATH}.{USER_TBL_NAME}").show()