Copyright Scott Jensen, San Jose State University

<a rel="license" href="http://creativecommons.org/licenses/by-sa/4.0/"><img alt="Creative Commons License" style="border-width:0" src="https://i.creativecommons.org/l/by-sa/4.0/88x31.png" /></a><br /><span xmlns:dct="http://purl.org/dc/terms/" property="dct:title">This notebook</span> by <span xmlns:cc="http://creativecommons.org/ns#" property="cc:attributionName">Scott Jensen,Ph.D.</span> is licensed under a <a rel="license" href="http://creativecommons.org/licenses/by-sa/4.0/">Creative Commons Attribution-ShareAlike 4.0 International License</a>.

# Working With Files Part 1: Loading The Yelp Data

**<span style="color:red">This version of the notebook includes only Steps 2, 3, and 5 of Part 1 which loads the Yelp data.</span>** This version of Step 5 should be run if you have already completed Parts 2 and 3 in the original version of the notebook.

When running this notebook, first complete steps 2 & 3 to create the widget for the manifest URL and add that URL to the widget before running step 5.

After running this notebook, return to your original notebook and complete the three code cells in steps 6 and 7 of Part 1 before submitting your notebook for grading.

### Step 2: Using a Databricks widget to enter the path to the data manifest

Since some of the data files are too large to manually upload even when compressed, you will be importing the data from where we temporarily staged it on AWS, but **you *MUST* complete the dataset agreement assignment in order to earn credit for *ANY* of the exercises or the team assignments which use the Yelp data**.  To bring the data from where we staged it on AWS to your Databricks account hosted on AWS (for free by Databricks! Yay!), the code below needs to know where it can download the data from.

When you run the next cell, a "widget" will appear **at the top of the notebook** that prompts you for the path to a data manifest.  
On the right-hand side of that widget bar you will see a pushpin (a.k.a. thumbtack) icon that will allow you to pin that to the top of the code window even when you scroll down.  Once you have entered the manifest's path, you can "unpin" the widget bar to make more screen real estate available.

We will be importing three files to your Databricks account, and eventually we will put the compressed files they contain in a directory named `/yelp` on the Databricks File System (DBFS). The manifest uses a JSON format and contains a JSON array with an object for each file.  Each JSON object provides three properties: the name of the file, an MD5 sum for the file, and a flag indicating whether the file should be unzipped. The MD5 sum is a one-way hash that lets us confirm the download did not encounter any errors and that the file is exactly the same as the one we originally staged on AWS.  The unzip flag is needed because, although we compress the data files using the bzip2 format, the review and user data files have been split by year (the year of the review or the year a user joined Yelp), so we zipped up the directories containing those files.  We need to unzip those directories before moving the files to the `/yelp` directory on DBFS. You may be wondering why a manifest file is used: it allows us to easily move or update the files while only needing to provide you with the URL of the current manifest.
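
To make the MD5 check described above concrete, here is a minimal sketch that runs anywhere Python runs (it does not need Databricks). The helper name `md5_of_file` and the sample contents are assumptions made up for this illustration; the notebook's own download code appears in Step 5.

```python
import hashlib
import os
import tempfile

def md5_of_file(path, chunk_size=4096):
    """Return the hex MD5 digest of a file, reading it in small chunks."""
    digest = hashlib.md5()
    with open(path, "rb") as data_file:
        for chunk in iter(lambda: data_file.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()

# Made-up contents standing in for a staged data file
payload = b"example contents standing in for a staged data file"
# Plays the role of the MD5 sum listed in the manifest
expected_md5 = hashlib.md5(payload).hexdigest()

# A complete "download"
with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(payload)
    good_path = tmp.name

# A truncated "download" that is missing its last byte
with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(payload[:-1])
    bad_path = tmp.name

good_matches = md5_of_file(good_path) == expected_md5
bad_matches = md5_of_file(bad_path) == expected_md5
print("complete file matches:", good_matches)
print("truncated file matches:", bad_matches)

# Clean up the temporary files
os.remove(good_path)
os.remove(bad_path)
```

Even a single missing byte changes the hash, which is why comparing the two strings is enough to detect an incomplete download.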

To see the widget (if you just imported the notebook), click on the arrow in the upper 
right-hand corner of the next code cell and select `Run Cell`.

#### The following cell is Step 2

In [0]:
dbutils.widgets.text("manifest_url","xxxxxxxxxxxxxxx","Enter the manifest URL:")

#### Step 3: Entering the file manifest URL

In the input box for the widget with the prompt "Enter the manifest URL", enter the URL we provide in class.

Once you have entered the URL in the widget, we can get started.
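
If you want to double-check what you typed, the following sketch shows one way to test whether a string at least looks like a web address. The helper `looks_like_url` is hypothetical and is not part of the notebook's code; in Databricks the value to check would come from `dbutils.widgets.get("manifest_url")`, and `https://example.com/manifest.json` is only a placeholder address.

```python
from urllib.parse import urlparse

def looks_like_url(value):
    """Return True if value has an http(s) scheme and a host name."""
    parsed = urlparse(value.strip())
    return parsed.scheme in ("http", "https") and bool(parsed.netloc)

# The widget's default text is not a usable URL
placeholder_ok = looks_like_url("xxxxxxxxxxxxxxx")
# A placeholder address with the right shape
example_ok = looks_like_url("https://example.com/manifest.json")

print("default text looks like a URL:", placeholder_ok)
print("example address looks like a URL:", example_ok)
```

This only checks the shape of the text. Whether the manifest can actually be downloaded is checked later by the code in Step 5.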

In [0]:
dbutils.fs.help()

### Step 5: Importing the data

The code in the following cell will import the data files to your yelp directory.  While that cell is running (**it will probably take around 5 minutes**), let's talk about what is happening in that cell.
<ol>
  <li style="padding-bottom:5px;">First, we are importing the library named `requests` which allows us to retrieve data over the web.  This is not built into the core of Python, so it's a separate library.  However, since it's one that's commonly used, Databricks has it in your cluster already.  If this was a less commonly used library, you would need to first load the library on your cluster.</li>
  
  <li style="padding-bottom:5px;">We define a few variables, such as the part of the URL that's the same for all files located on AWS, where we will be downloading the data from.</li>

  <li style="padding-bottom:5px;">To make the process easier to understand, we break it into steps that can be defined as separate functions.  Each function is defined starting with `def` and the indenting in Python tells it when each function has ended.  The main code near the end of the cell is less than 10 lines long and calls the functions defined above.</li>

  <li style="padding-bottom:5px;">One of the first functions called is `get_manifest`, which reads the URL you entered in the widget at the top of the notebook and tries to download the manifest from that URL.  The manifest says which files are being downloaded.  It is a tiny JSON file that we store at a public URL, and it is a JSON object containing a name:value pair with the id of the bucket where the data is stored, and two name:value pairs with arrays as the values:
    <ul>
      <li style="padding-bottom:3px;">An array of the files or directories that will be created in the directory where we download the data.</li>
      <li style="padding-bottom:3px;">The second array contains a JSON object for each download file with:
        <ol>
          <li>The name of the file being downloaded</li>
          <li>The MD5 sum of the file. The MD5 sum is a hash of the file contents that returns a string - this is used to make sure the file downloaded is complete.</li>
          <li>A flag indicating if the downloaded file should be unzipped.  The review and user downloads are zipped files containing a directory of files.</li>
        </ol>
      </li>
    </ul>
  </li>
  <li style="padding-bottom:5px;">The code cycles through downloading and unzipping the files (as needed), and in our case the user and review files contain a directory of files compressed using the bzip2 format which Spark can load.  As each file is downloaded, the MD5 sum of the file is calculated and compared to the MD5 sum in the manifest to make sure the download is complete and data is not missing.</li>
</ol>  
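
As a rough illustration of the manifest structure described in the list above, the sketch below parses a made-up manifest and checks for the three expected names. The key names (`id`, `data_list`, `download_list`, `name`, `md5`, `zipped`) match the ones the code cell reads, but the bucket id, file names, and MD5 values are invented for this example, and `parse_manifest` is a hypothetical helper rather than the notebook's `get_manifest`.

```python
import json

# Hypothetical manifest text: the bucket id, file names, and MD5 values are made up
manifest_text = """
{
  "id": "example-bucket",
  "data_list": ["business.bz2", "review/"],
  "download_list": [
    {"name": "business.bz2", "md5": "d41d8cd98f00b204e9800998ecf8427e", "zipped": "false"},
    {"name": "review.zip", "md5": "d41d8cd98f00b204e9800998ecf8427e", "zipped": "true"}
  ]
}
"""

REQUIRED_KEYS = ("id", "data_list", "download_list")

def parse_manifest(text):
    """Parse the manifest text and confirm the expected names are present."""
    manifest = json.loads(text)  # raises an error if the text is not valid JSON
    missing = [key for key in REQUIRED_KEYS if key not in manifest]
    if missing:
        raise ValueError("The manifest is missing: " + ", ".join(missing))
    return manifest

manifest = parse_manifest(manifest_text)
# The unzip flag is stored as the string "true" or "false"
to_unzip = [item["name"] for item in manifest["download_list"] if item["zipped"] == "true"]
print("bucket:", manifest["id"])
print("files to unzip:", to_unzip)
```

Note that the test for a missing name is written as `key not in manifest`. Writing `key in manifest == False` looks similar, but Python treats it as a chained comparison and it never reports a missing name.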

When downloading the data, we cannot work directly in DBFS, so we download and unzip the data on the driver node and then move the data to DBFS.  Keep in mind that the file system local to the driver node of the cluster, where we download the files, disappears when the cluster terminates, but the files we move to DBFS are permanent and will be there again when you start a new cluster.
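
The unzip-then-move pattern can be tried outside Databricks with the sketch below. It is only an analogy: ordinary temporary directories stand in for the driver node's local disk and for DBFS, `shutil.move` stands in for `dbutils.fs.mv`, and the archive name and its contents are made up.

```python
import os
import shutil
import tempfile
import zipfile

# Temporary working area standing in for the driver node and DBFS
work_dir = tempfile.mkdtemp()
zipped_dir = os.path.join(work_dir, "zipped")
unzipped_dir = os.path.join(work_dir, "unzipped")
data_dir = os.path.join(work_dir, "data")  # stand-in for the /yelp directory
for directory in (zipped_dir, unzipped_dir, data_dir):
    os.makedirs(directory)

# Build a small zip file containing a directory of files (made-up contents)
archive_path = os.path.join(zipped_dir, "review.zip")
with zipfile.ZipFile(archive_path, "w") as zip_ref:
    zip_ref.writestr("review/2019.txt", "first file")
    zip_ref.writestr("review/2020.txt", "second file")

# Unzip into the scratch directory
with zipfile.ZipFile(archive_path, "r") as zip_ref:
    zip_ref.extractall(unzipped_dir)

# Move everything that was unzipped into the data directory
for name in os.listdir(unzipped_dir):
    shutil.move(os.path.join(unzipped_dir, name), os.path.join(data_dir, name))

moved_files = sorted(os.listdir(os.path.join(data_dir, "review")))
print("files in the data directory:", moved_files)

# Remove the whole working area, since this is only a demonstration
shutil.rmtree(work_dir)
```

In the notebook itself only the temporary directories are removed at the end, while the files moved to `/yelp` on DBFS stay in place.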

In [0]:
import pyspark.sql.functions as f
import requests
import re
import tarfile
import zipfile
import json
import hashlib

FORCE_DOWNLOAD = False

URL_HOST = ".s3-us-west-2.amazonaws.com/"
TEMP_DIR = "/yelptemp/"
ZIPPED_DIR = "/yelptemp/zipped/"
UNZIPPED_DIR = "/yelptemp/unzipped/"
DATA_DIR = "/yelp"

def clean_all():
  ' Removes the data directory if it exists on DBFS'
  try:
    dbutils.fs.rm(DATA_DIR,recurse=True)
  except Exception:
    pass
  

def get_manifest():
  ''' Returns the dictionary that's the download manifest based on the URL
      entered in the URL widget.
      If it's not a valid URL or returns a status code other than 200, an exception is raised.
      If the manifest is not valid JSON, or does not contain name:value pairs named
      id, data_list, and download_list, an exception is raised.  
  ''' 
  manifest_url = dbutils.widgets.get("manifest_url")
  response = requests.get(manifest_url)
  if response.status_code != 200:
    raise Exception(f"The manifest URL {manifest_url} returned a status of " + str(response.status_code))
  manifest = response.text
  try:
    manifest_dict = json.loads(manifest)
    # The manifest should have a data_list element and a download_list element.
    # Use "not in": writing `"x" in d == False` is a chained comparison that never fires.
    if "data_list" not in manifest_dict:
      raise Exception("The manifest does not contain a data_list.")
    if "download_list" not in manifest_dict:
      raise Exception("The manifest does not contain a download_list.")
    return(manifest_dict)
  except json.JSONDecodeError as err:
    raise Exception("The manifest is not a valid JSON document.", err)
    

def check_data(manifest):
  ''' Function used to check if the data directory contains valid
      copies of all of the files in the download. The manifest dictionary
      is passed as a parameter and is expected to contain a data_list containing
      the names of each file expected in the data directory.
      If a file is missing, this method returns False.
      If all of the files exist, it returns True.
  '''
  try:
    existing_list = dbutils.fs.ls(DATA_DIR)
    if len(existing_list) == 0:
      return(False)
    # The manifest's data_list names each file or directory expected in DATA_DIR
    file_list = manifest["data_list"]
    for file_name in file_list:
      found = False
      for info in existing_list:
        if info.name == file_name:
          found = True
          break
      if found == False:
        return(False)
    # looped through all of the required files and they are there
    return(True)
  except Exception:
    # The directory does not exist or the manifest does not contain a data_list
    return(False)
  
def get_bucket_id(manifest):
  ''' The manifest is expected to contain a name:value pair named id
      where the value is the bucket name on S3 where the files are
      staged.  If the id is missing or is a blank string, then an
      exception is raised, otherwise the bucket id is returned.
  '''
  try:
    bucket = manifest['id'].strip()
    if len(bucket) == 0:
      raise Exception("The id provided in the manifest was an empty string, but should be the name of the bucket being downloaded from.")
    else:
      return(bucket)
  except Exception as e:
    raise Exception("An error occurred in retrieving the bucket id from the manifest", e)
      
  
def download_file(manifest_item, bucket_id):
  ''' Given a dictionary from the download list, download the file to the
      temporary directory for downloading the file and check the
      MD5 sum to make sure it matches.
      If the MD5 sum does not match, an exception is raised, otherwise it prints
      that the file was successfully downloaded.
  '''
  file_name = manifest_item["name"]
  item_md5sum = manifest_item["md5"]
  request_url = "https://" + bucket_id + URL_HOST + file_name
  local_name = ZIPPED_DIR + file_name 
  print("requesting file from:", request_url)
  r = requests.get(request_url, stream=True)
  status_code = r.status_code
  # If the status code is 200, then we successfully retrieved the file
  if status_code != 200:
    raise Exception(f"The {file_name} download failed. A status code of {str(status_code)} was returned from the URL:{request_url}.")
  else: # write the file in chunks; the with block closes it when done
    with open(local_name, 'wb') as file:
      for chunk in r.iter_content(chunk_size=4096):
        file.write(chunk)
        file.flush()
  #check if the hash of the file downloaded matches the md5 sum in the manifest
  with open(local_name, 'rb') as data_file:
    md5sum = hashlib.md5( data_file.read() ).hexdigest()
    if md5sum.lower() != item_md5sum.lower():
      raise Exception(f"The file {file_name} downloaded from AWS generated a MD5 sum of {md5sum} instead of the MD5 sum in the manifest ({item_md5sum}) so it may be corrupted and the processing was terminated.")
    else:
      print ("successfully downloaded:", file_name)

      
def process_file(manifest_item):
    ''' The file is now downloaded.  If the file is zipped,
        it first needs to be unzipped, and either way, moved
        to the DBFS data directory.
    '''
    local_name = ZIPPED_DIR + manifest_item["name"]
    local_path = "file:" + local_name
    is_zipped = manifest_item["zipped"] == "true" # This is either True or False
    if is_zipped:
      with zipfile.ZipFile(local_name,"r") as zip_ref:
        zip_ref.extractall(UNZIPPED_DIR)
      untar_info = dbutils.fs.ls("file:" + UNZIPPED_DIR)
      # The zip file could contain a directory, a file, or more than 1 file,
      # so we loop through the file list, moving all of them to DBFS
      for info in untar_info:
        destination = DATA_DIR + "/" + info.name
        dbutils.fs.mv(info.path, destination, recurse=True)  
      dbutils.fs.rm(local_path)
    else: # file was not zipped (or should remain zipped), so just move it
      destination = DATA_DIR + "/" + manifest_item["name"]
      dbutils.fs.mv(local_path, destination)  
    print ("processed:", local_name)
    
                      
def load_data(manifest_list, bucket_id):
  ''' Loops through the files in the download list from the manifest and 
      downloads the file, verifies the MD5 sum is correct, unzips it if needed,  
      and moves the file or folder that was in it to the data directory.'''
  # Create the empty temporary directories
  try:
    dbutils.fs.rm("file:" + TEMP_DIR,recurse=True)
  except Exception:
    pass
  # Create the temporary local directory and sub-directories
  dbutils.fs.mkdirs("file:" + TEMP_DIR)
  dbutils.fs.mkdirs("file:" + ZIPPED_DIR)
  dbutils.fs.mkdirs("file:" + UNZIPPED_DIR)
  # Loop through the files to download
  for item in manifest_list:
    download_file(item, bucket_id)
    process_file(item)
  # Remove the temp directory used to unzip the files
  dbutils.fs.rm("file:" + TEMP_DIR, recurse=True)
  
  
# *******************************************  
# Run the Actual Routine to Load the Data
# This code uses the above defined functions
# *******************************************
if FORCE_DOWNLOAD == True:
  clean_all()
manifest_dict = get_manifest()
if check_data(manifest_dict) == False:
  bucket_id = get_bucket_id(manifest_dict)
  download_list = manifest_dict["download_list"]
  load_data(download_list, bucket_id)
else:
  print("All of the required files exist in the data directory already, so the download was not processed.")
print("Done")
  