## Builder Data Loading App

This script loads Excel or CSV files from the Advana External Data Load (EDL) site into a specified table. The data is then available within the Advana platform, where it can be accessed via SQL queries or a Python notebook in Databricks for further processing. Only builder data is supported; enterprise data is not supported by this script.

The application validates its inputs and reports any errors it finds. If an issue prevents the data load from completing, an error is displayed and execution stops.

***

### How to Use

1.  **Upload** your CSV or Excel file to the Advana External Data Load (EDL) site under the `Builder Data` location.
    * Only `.xlsx` (Excel) or `.csv` files are supported.

2.  **Wait** until `COMPLETED` appears in the `Status` field on the Advana EDL site. This may take a while. You may need to refresh the page to see the status update.

3.  **Copy** the entire `Key` field for the file you uploaded to Advana EDL and paste it into the `Key (from Advana EDL)` field.
    * Can't see the `Key (from Advana EDL)` field? This means the widgets haven't loaded. Press the `Run all` button at the top right. You will then see widgets at the top of the script where you can enter the key.

4.  **Select** either `Overwrite` or `Append` based on your needs.
    * **Warning:** Selecting `Overwrite` will delete all previous data in the table.

5.  **Enter** the number of rows you want to skip. This is useful for skipping empty or title rows in your file.

6.  **Enter** the desired table name for your data.
    * Ensure you enter the correct table name before running the application.

7.  **Click** `Run all`.
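
To illustrate step 5, here is a minimal sketch of what skipping rows does to a file that starts with two title rows. It uses only Python's standard `csv` module and made-up sample data. The loader itself reads files with pandas, so treat `read_with_skip` as a hypothetical helper rather than the script's implementation.

```python
import csv
import io
from itertools import islice

# Made-up sample file: two title rows precede the real header row.
SAMPLE = "Quarterly Report\nInternal use only\nName,Amount\nAlpha,10\nBeta,20\n"

def read_with_skip(text: str, rows_to_skip: int = 0) -> list:
    """Drop the first `rows_to_skip` lines, then treat the next line as the header."""
    lines = io.StringIO(text)
    remaining = islice(lines, rows_to_skip, None)
    return list(csv.DictReader(remaining))

# Skipping two rows makes "Name,Amount" the header row.
rows = read_with_skip(SAMPLE, rows_to_skip=2)
print(rows)

# Without skipping, the title row is mistaken for the header.
print(list(read_with_skip(SAMPLE)[0].keys()))
```

If the column names printed after a load look like a title or are mostly blank, the number of skipped rows is probably too low.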

***

### Results

1.  **Success:** Every code cell runs, and the last cell shows a ✅ green check mark.
    * The key, target schema, and table name are listed.
    * The data is now ready for further processing.
2.  **Failure:** One of the cells fails.
    * A failed cell is marked by a red rectangle on the right side. Click the red rectangle to jump to the cell and view the error message.
    * You can also find the failed cell by scrolling to it.
    * Follow the instructions in the error message to resolve the issue.
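
As an illustration of the kind of failure message to expect, the sketch below mirrors two of the checks the script applies to the key: the file extension must be `.xlsx` or `.csv`, and the key must contain the word `builder`. The function name `check_key` and both sample keys are hypothetical; real keys come from the Advana EDL site, and the script's own messages may be worded differently.

```python
import os

def check_key(key: str) -> str:
    """Return the part of the key starting at 'builder', or raise ValueError."""
    extension = os.path.splitext(key)[1].lower()
    if extension not in (".xlsx", ".csv"):
        raise ValueError(f"Invalid key: {key}. Key must end in .csv or .xlsx.")
    start = key.find("builder")
    if start < 0:
        raise ValueError(f"Invalid key: {key}. Please check the key field.")
    return key[start:]

# Hypothetical keys, for illustration only.
print(check_key("some-prefix/builder/example/report.csv"))

try:
    check_key("some-prefix/builder/example/report.pdf")
except ValueError as err:
    # The message names the offending key and what to fix.
    print(err)
```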

***

### Notes

* This program only reads the _first sheet_ of an Excel file. Make sure your data is on the first sheet.
* The program automatically removes any empty columns.
* If `Skip Rows` is not specified, the script will not skip any rows.
* Column names are automatically renamed to a Databricks-compatible format. Specifically, the program:
    * Changes the name to all uppercase.
    * Replaces spaces, slashes, and periods with underscores.
    * Removes other invalid characters as defined by [Databricks](https://docs.databricks.com/aws/en/sql/language-manual/sql-ref-names).
* Both Excel and CSV files are read into memory with pandas and then converted to a Spark DataFrame for writing.
    * Under normal use, processing files in memory should not be an issue.
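
The column renaming rules listed in the notes above can be sketched as follows. The function approximates the `clean_names` helper defined later in this notebook (whose pattern also turns hyphens into underscores). The name `clean_column_name` and the sample headers are made up for illustration.

```python
import re

def clean_column_name(name: str) -> str:
    """Trim, cap at 255 characters, drop invalid characters, underscore, uppercase."""
    name = name.strip()[:255]
    # Remove characters that are not allowed in column names.
    name = re.sub(r"[,;{}()\n\t=\x00-\x1F\x7F]", "", name)
    # Replace spaces, periods, slashes, and hyphens with underscores.
    name = re.sub(r"[ ./-]", "_", name)
    return name.upper()

# Made-up headers showing each rule in turn.
for header in ["Unit Cost", "start/end date", "Qty (each)", " v1.2 "]:
    print(f"{header!r} -> {clean_column_name(header)!r}")
```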

***

Created by Noel Mrowiec  
noel.j.mrowiec2.ctr@us.navy.mil  
10/1/202

In [0]:
%pip install openpyxl

Python interpreter will be restarted.
Looking in indexes: http://mirror.vdms.uot.local/simple/, https://nexus.advana.data.mil/repository/pypi-all/simple/
Python interpreter will be restarted.


In [0]:
def install_widgets():
  dbutils.widgets.text('Key (from Advana EDL)', "")
  dbutils.widgets.text('Target Table Name','')
  dbutils.widgets.dropdown('Target Schema', 'jup_fmc_restricted_workspace', ['jup_fmc_restricted_workspace'])
  dbutils.widgets.dropdown('Overwrite or Append', "Append", ['Overwrite', 'Append'])
  dbutils.widgets.text('Skip Rows', '0')
  dbutils.widgets.dropdown('Region','us-gov-west-1', ['us-gov-west-1'])

install_widgets()

In [0]:
import os

def is_valid_extension(filename: str) -> bool:
    # Only .xlsx and .csv files are supported
    _, extension = os.path.splitext(filename)
    return extension.lower() in ('.xlsx', '.csv')

def is_correct_cluster(key: str) -> bool:
    # Keys containing "non-sensitive" belong to a different cluster
    return "non-sensitive" not in key

def get_s3_path_from_key(key : str) -> str:
    # Check that the key is an Excel or CSV file and is not on the incorrect cluster
    if not is_valid_extension(key):
        raise Exception(f"Invalid key: {key}.\nKey must have a valid extension (.csv or .xlsx).")
    elif not is_correct_cluster(key):
        raise Exception(f"Invalid cluster: {key}.\nData is located on the incorrect cluster. Make sure you upload data to jup-fmc-r.")

    idx_path_start = key.find("builder")
    if idx_path_start < 0:
        # The key doesn't contain the "builder" keyword, so ask the user to check it
        raise Exception(f"Invalid key: {key}.\nPlease check the key field.")
    
    s3_path = key[idx_path_start:]
    print(f's3 path: {s3_path}')

    return s3_path

In [0]:
import pandas as pd
import re

def clean_names(column_name:str, to_upper:bool=True):
    """
    Cleans a string to be used as a valid and standardized column name.
    It removes invalid characters, replaces spaces with underscores,
    truncates to a maximum length, and converts the name to uppercase.
    
    Args:
        column_name (str): The original column name as a string.
        to_upper (bool): Makes the name uppercase, if true.

    Returns:
        str: The cleaned and standardized column name.
    """
    # Headers read by pandas may be non-string (e.g. numbers), so convert first
    column_name = str(column_name).strip()

    if len(column_name) > 255: 
        # max of 255 characters
        column_name = column_name[:255]

    cleaned_name = re.sub(r'[,;{}()\n\t=\x00-\x1F\x7F]', '', column_name)
    cleaned_name = re.sub(r'[ ./-]', '_', cleaned_name)
    
    cleaned_name = cleaned_name.upper() if to_upper else cleaned_name
    
    return cleaned_name



In [0]:
import io  # needed for io.BytesIO when reading the file bytes

import boto3
from pyspark.sql import SparkSession
from botocore.exceptions import ClientError

def get_data_from_s3(s3_key:str, region:str, target_schema:str, rows_to_skip:int=0, file_type:str="Excel", s3_bucket='advana-restricted-data-zone'):
    """
    Reads a file (Excel or CSV) from an S3 bucket into a Pandas DataFrame.

    This function is designed for scenarios where cluster libraries cannot be
    installed, such as when dealing with small to medium-sized files that can be
    read into the driver's memory. The process involves:
    1. Reading the file directly from S3 into memory using `boto3`.
    2. Using `pandas` to load the in-memory byte stream into a DataFrame.

    Cleaning (dropping empty columns, standardizing column names) and the
    conversion to a PySpark DataFrame happen afterwards in
    `clean_and_prep_dataframe`.

    Args:
        s3_key (str): The full path to the file within the S3 bucket.
        region (str): The AWS region where the S3 bucket is located.
        target_schema (str): The target schema name. Used only in the error
            message shown when the file cannot be found.
        rows_to_skip (int): The number of rows to skip at the beginning of the file.
            This is useful for files with titles or metadata before the actual data.
        file_type (str): The type of file to be processed. Supports "Excel" or "CSV".
        s3_bucket (str): The name of the S3 bucket. Defaults to 'advana-restricted-data-zone'.

    Returns:
        pandas.DataFrame: The data read from the file, before any cleaning.

    Raises:
        Exception: If the file type is not 'Excel' or 'CSV', if S3 reports
            'NoSuchKey' (the file was not found), or if any other error occurs.
    """

    print(f"Starting job for S3 file: s3://{s3_bucket}/{s3_key}")
    
    try:
        # Connect to the S3 service
        s3 = boto3.client('s3', region_name=region)
        
        # Get the file object from S3
        obj = s3.get_object(Bucket=s3_bucket, Key=s3_key)
        
        # Read the entire file's byte stream into memory.
        file_content = obj['Body'].read()
        
        # Now, pass the in-memory bytes object to pandas.

        pandas_df = None
        if file_type.lower() == "excel":
            pandas_df = pd.read_excel(file_content, header=rows_to_skip, sheet_name=0, engine='openpyxl') 
        elif file_type.lower() == "csv":
            pandas_df = pd.read_csv(io.BytesIO(file_content), skiprows=rows_to_skip, engine="python")
        else:  
            raise Exception("Wrong file type. Only Excel or CSV files can be loaded")
        
        #print("Successfully read Excel file into a Pandas DataFrame.")
        
        return pandas_df

    except ClientError as e:
        if e.response['Error']['Code'] == 'NoSuchKey':
            raise Exception(f'''This error can be caused by one of the following issues:\n
    Incorrect Key: The provided key is incorrect. Please verify that it exactly matches the key from the Advana EDL site.
    Incorrect Cluster: The data was uploaded to the wrong cluster. Ensure the cluster selected during the upload contains the {target_schema} schema.
    Server Overload: The Advana EDL server may be temporarily overloaded due to high traffic. Please wait a few minutes and try again. (Note: during high traffic, it can take hours for the data to be loaded.)''')

        else:
            # For any other client error, raise with the original error attached
            raise Exception(f"An unexpected error occurred: {e}.") from e
                
    except Exception as e:
        raise Exception(f"An unexpected error occurred: {e}.") from e

def clean_and_prep_dataframe(pandas_df):
    try:
        # Perform cleaning in Pandas.
        # Drop null/blank columns and rename columns
        pandas_df_cleaned = pandas_df.dropna(axis='columns', how='all')
        pandas_df_cleaned = pandas_df_cleaned.rename(columns=clean_names)
        #print("Cleaned Pandas DataFrame by dropping empty columns.")
        print(f'Columns of loaded data:\n{pandas_df_cleaned.columns}')

        # Convert the cleaned Pandas DataFrame to a PySpark DataFrame.
        # This step parallelizes the data for distributed writing.
        spark_df = spark.createDataFrame(pandas_df_cleaned)
    except TypeError as e:
        # Keep the original error chained so the root cause stays visible
        raise TypeError("Data type is incorrect. Recheck your data. Check that your data has column names. Ensure you are skipping the correct number of rows.") from e


    return spark_df



In [0]:
def to_database(spark_df, mode: str, target_schema: str, target_table: str):
    # Overwrite or append the Delta table using Spark.
    if mode == "Append":
        (spark_df.write
            .format('delta')
            .mode("append")
            .option("delta.columnMapping.mode","name")
            .saveAsTable(f"{target_schema}.{target_table}")
        )
        print(f"Successfully appended Delta Table: {target_schema}.{target_table}")
    elif mode == "Overwrite":
        (spark_df.write
            .format('delta')
            .mode("overwrite")
            .option("overwriteSchema", "true")
            .saveAsTable(f"{target_schema}.{target_table}")
        )
        print(f"Successfully overwrote Delta Table: {target_schema}.{target_table}")
    else:
        raise Exception("Selection error: Only overwrite or append allowed.")

    

In [0]:
key=dbutils.widgets.get('Key (from Advana EDL)').strip()
if not key: raise Exception("Error: `Key (from Advana EDL)` must not be blank")
target_tablename=dbutils.widgets.get('Target Table Name')
if not target_tablename: raise Exception("Error: `Target Table Name` must not be blank")
target_tablename=clean_names(target_tablename, to_upper=False)
target_schema=dbutils.widgets.get('Target Schema')
ua=dbutils.widgets.get('Overwrite or Append')
skiprows=dbutils.widgets.get('Skip Rows')
skiprows = 0 if not skiprows else int(skiprows)
region=dbutils.widgets.get('Region')

path = get_s3_path_from_key(key)
print(f'key={key}, target_tablename={target_tablename}, target_schema={target_schema}, ua={ua}, region={region}, skiprows={skiprows}')

---------------------------------------------------------------------------
Exception                                 Traceback (most recent call last)
<command-3432481934381189> in <cell line: 2>()
      1 key=dbutils.widgets.get('Key (from Advana EDL)').strip()
----> 2 if not key: raise Exception("Error: `Key (from Advana EDL)` must not be blank")
      3 target_tablename=dbutils.widgets.get('Target Table Name')
      4 if not target_tablename:

In [0]:

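# Choose the reader from the file extension instead of always assuming Excel
file_type = "CSV" if path.lower().endswith(".csv") else "Excel"
pandas_df = get_data_from_s3(path, region, target_schema, rows_to_skip=skiprows, file_type=file_type)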
spark_df = clean_and_prep_dataframe(pandas_df)
to_database(spark_df, mode=ua, target_schema=target_schema, target_table=target_tablename)



In [0]:
dbutils.widgets.removeAll()



In [0]:
install_widgets()



In [0]:
df=sql(f"select * from {target_schema}.{target_tablename}")
display(df)



In [0]:
print(f"""✅ File was successfully loaded to table:
      Key: {key} 
      Schema and table name:  {target_schema}.{target_tablename}""")

