## Notebook Setup

In [0]:
## Package Imports

import requests
import os
import json
import base64

In [0]:
# --- Provider configuration: fill in every value below before running the notebook ---

# Contact email written into each listing's provider info on upload
business_contact_email = ""

# Terms-of-service and privacy-policy links written into each listing's provider info
terms_of_service_link = ""
privacy_policy_link = ""

# Provider name written into each listing's provider info
provider_name = ""

# Databricks account hash: used as the host prefix of the
# https://<hash>.cloud.databricks.com API URLs and as the deployment name
db_account_hash = ""

In [0]:
# Fetch the notebook context object once; every value below is read from it
_nb_context = dbutils.notebook.entry_point.getDbutils().notebook().getContext()

# API URL and API token exposed directly by the context (None when absent)
host = _nb_context.apiUrl().getOrElse(None)
token = _nb_context.apiToken().getOrElse(None)

# Current user and workspace (org) ID, read from the context tags (None when absent)
_context_tags = _nb_context.tags()
user = _context_tags.get("user").getOrElse(None)
workspace_id = _context_tags.get("orgId").getOrElse(None)

# The same context serialised to JSON and parsed into a plain dictionary
ctx = json.loads(_nb_context.toJson())

# API URL, API token and this notebook's path, taken from the 'extraContext' entry
_extra_context = ctx['extraContext']
host_name = _extra_context['api_url']
host_token = _extra_context['api_token']
notebook_path = _extra_context['notebook_path']

# The deployment name is the account hash entered in the configuration cell
deployment_name = db_account_hash

In [0]:
## Get all Marketplace listings
get_listings_url = f"https://{db_account_hash}.cloud.databricks.com/api/2.0/marketplace-provider/listings"
response = requests.get(
    get_listings_url,
    headers={"Authorization": "Bearer " + token})

# Stop here with the HTTP error instead of failing later on an unexpected body
response.raise_for_status()

# Parse the body once; a missing or null 'listings' entry is treated as "no listings"
r_json = list(response.json().get('listings') or [])

print(f"API returned {len(r_json)} listing(s)")

In [0]:
## Count the objects in each share and store that count in the listing metadata, to help decide what type of Pandas Profiling to use
def get_num_tables_in_share(share_name):
  """Return the number of rows produced by `SHOW ALL IN SHARE <share_name>`.

  Args:
  - share_name: name of the share, interpolated as-is into the SQL statement.

  Returns:
  - The row count of the resulting Spark DataFrame.
  """
  return spark.sql(f"SHOW ALL IN SHARE {share_name}").count()

# Record the share's object count on every listing that has a share attached
for listing in r_json:
  summary = listing.get('summary', {})
  share = summary.get('share')

  # Listings without a share (or without a share name) cannot be profiled
  if not share or 'name' not in share:
    print(f"No share name: {summary.get('name')}")
    continue

  try:
    share['num_tables'] = get_num_tables_in_share(share['name'])
  except Exception as exc:
    # Report query failures for what they are instead of as a missing share name
    print(f"Could not count tables in share {share['name']}: {exc}")

## Notebook Options
This notebook can generate four kinds of notebooks. Choose one for each listing based on how many tables the share contains and how large they are:
- **Pandas Profiling [Full]** - Applies full Pandas Profiling. Good for most shares.
- **Pandas Profiling [Minimal]** - Applies a smaller version of Pandas Profiling. Good for shares with many tables, or large datasets with many columns/rows.
- **Time-Series** - Gives a time-series plot for each numeric column, along with a scatter-matrix in some cases. Great for time-series datasets.
- **Simple** - Only provides sample data and summary statistics for each table. Good for shares with very many tables.

In [0]:
def pandas_profiling_notebook(listing_name, listing_subtitle, share_name, table_names=[]):
  """Build the source of a Databricks Python notebook that runs a full ydata-profiling report per table.

  Args:
  - listing_name: listing title, rendered as the notebook's markdown heading.
  - listing_subtitle: listing subtitle, rendered under the heading.
  - share_name: share name, embedded in the generated `SHOW ALL IN SHARE` query.
  - table_names: names passed to the generated `get_table_report` calls, one cell each.
    The default list is only read, never mutated.

  Returns:
  - The notebook source as a string, with cells separated by `# COMMAND ----------`.
  """

  def get_table_commands(table_name):
    # One titled cell per table that calls the report helper defined in the template
    return (
      "  \n\n"
      "# COMMAND ----------\n"
      f"# DBTITLE 1, Table Name: {table_name}\n"
      "\n"
      f'get_table_report("{table_name}")'
    )

  table_block = "".join(get_table_commands(table_name) for table_name in table_names)

  # The cell holding get_table_report is introduced by a full '# COMMAND ----------'
  # delimiter, like every other cell in the template
  model_notebook = f"""
# Databricks notebook source
# MAGIC %md
# MAGIC # {listing_name}
# MAGIC {listing_subtitle}

# COMMAND ----------

# DBTITLE 1,Tables Available in this Share
share_name = "{share_name}"
df_tables = spark.sql(f"SHOW ALL IN SHARE {share_name}")
print(f"This share contains {len(table_names)} table(s)")

# COMMAND ----------

# DBTITLE 1,Show all Tables available in this Share
display(df_tables.select("name"))

# COMMAND ----------
# MAGIC %md
# MAGIC # Data Exploration

# COMMAND ----------
def get_table_report(table_name):

  from ydata_profiling import ProfileReport
  df = sqlContext.sql(f"select * from {{table_name}}").toPandas()
  displayHTML(ProfileReport(df).html)

{table_block}

# COMMAND ----------
# Ignore if notebook is not being run as a job
import json
dbutils.notebook.exit(json.loads(dbutils.notebook.entry_point.getDbutils().notebook().getContext().toJson())['currentRunId']['id'])"""
  return model_notebook

In [0]:
def pandas_minimal_notebook(listing_name, listing_subtitle, share_name, table_names=[]):
  """Build the source of a Databricks Python notebook that runs a minimal ydata-profiling report per table.

  The generated report helper keeps the first 10 columns, samples at most 10000 rows,
  and profiles with `minimal=True`.

  Args:
  - listing_name: listing title, rendered as the notebook's markdown heading.
  - listing_subtitle: listing subtitle, rendered under the heading.
  - share_name: share name, embedded in the generated `SHOW ALL IN SHARE` query.
  - table_names: names passed to the generated `get_table_report` calls, one cell each.
    The default list is only read, never mutated.

  Returns:
  - The notebook source as a string, with cells separated by `# COMMAND ----------`.
  """

  def get_table_commands(table_name):
    # One titled cell per table that calls the report helper defined in the template
    return (
      "  \n\n"
      "# COMMAND ----------\n"
      f"# DBTITLE 1, Table Name: {table_name}\n"
      "\n"
      f'get_table_report("{table_name}")'
    )

  table_block = "".join(get_table_commands(table_name) for table_name in table_names)

  # The cell holding get_table_report is introduced by a full '# COMMAND ----------'
  # delimiter, like every other cell in the template
  model_notebook = f"""
# Databricks notebook source
# MAGIC %md
# MAGIC # {listing_name}
# MAGIC {listing_subtitle}

# COMMAND ----------

# DBTITLE 1,Tables Available in this Share
share_name = "{share_name}"
df_tables = spark.sql(f"SHOW ALL IN SHARE {share_name}")
print(f"This share contains {len(table_names)} table(s)")

# COMMAND ----------

# DBTITLE 1,Show all Tables available in this Share
display(df_tables.select("name"))

# COMMAND ----------
# MAGIC %md
# MAGIC # Data Exploration

# COMMAND ----------
def get_table_report(table_name):

  from ydata_profiling import ProfileReport
  df = sqlContext.sql(f"select * from {{table_name}}").toPandas()
  df = df.iloc[:,:10].sample(min(len(df),10000))
  displayHTML(ProfileReport(df,minimal=True).html)

{table_block}

# COMMAND ----------
# Ignore if notebook is not being run as a job
import json
dbutils.notebook.exit(json.loads(dbutils.notebook.entry_point.getDbutils().notebook().getContext().toJson())['currentRunId']['id'])"""
  return model_notebook

In [0]:
def time_series_notebook(listing_name, listing_subtitle, share_name, table_names=[]):
  """Build the source of a Databricks Python notebook that plots time-series charts per table.

  The generated report helper shows sample rows, picks date-like columns by name
  (date/year/day/week/time), coerces the remaining columns to numeric, then draws a
  scatter matrix and one area chart per column against the first date-like column.

  Args:
  - listing_name: listing title, rendered as the notebook's markdown heading.
  - listing_subtitle: listing subtitle, rendered under the heading.
  - share_name: share name, embedded in the generated `SHOW ALL IN SHARE` query.
  - table_names: names passed to the generated `get_table_report` calls, one cell each.
    The default list is only read, never mutated.

  Returns:
  - The notebook source as a string, with cells separated by `# COMMAND ----------`.
  """

  def get_table_commands(table_name):
    # One titled cell per table that calls the report helper defined in the template
    return (
      "  \n\n"
      "# COMMAND ----------\n"
      f"# DBTITLE 1, Table Name: {table_name}\n"
      "\n"
      f'get_table_report("{table_name}")'
    )

  table_block = "".join(get_table_commands(table_name) for table_name in table_names)

  # The cell holding the plotting helpers is introduced by a full '# COMMAND ----------'
  # delimiter, like every other cell in the template
  model_notebook = f"""
# Databricks notebook source
# MAGIC %md
# MAGIC # {listing_name}
# MAGIC {listing_subtitle}

# COMMAND ----------

# DBTITLE 1,Tables Available in this Share
share_name = "{share_name}"
df_tables = spark.sql(f"SHOW ALL IN SHARE {share_name}")
print(f"This share contains {len(table_names)} table(s)")

# COMMAND ----------

# DBTITLE 1,Show all Tables available in this Share
display(df_tables.select("name"))

# COMMAND ----------
# MAGIC %md
# MAGIC # Data Exploration

# COMMAND ----------

import pandas as pd
import re
import plotly.express as px

def get_time_series_plots(df,date_columns,num_columns):
  if len(date_columns) >= 1:
    date_column = date_columns[0]
    df[date_column] = pd.to_datetime(df[date_column])
    for col in num_columns:
        if df[col].isna().mean() < 0.5:
          fig = px.area(df, x=date_column, y=col)
          fig.update_layout(title='Time Series Plot for ' + col, xaxis_title=date_column, yaxis_title=col, 
                            font=dict(size=12))
          fig.update_layout(yaxis_range=[0,1.5*df[col].max()])
          fig.show()
  else:
    print('No datetime column found')

def get_scatter_matrix(df,num_columns):
  if len(num_columns) > 1:
    sm = px.scatter_matrix(df[num_columns])
    sm.update_layout(title='Scatter Matrix Plot',
                        font=dict(size=12))
    sm.show()

def get_table_report(table_name):
  df = sqlContext.sql(f"select * from {{table_name}}").toPandas()
  try:
    print('Sample Data')
    display(df.head(5))
  except:
    print('Sample Data cannot be displayed')
  date_columns = df.filter(regex=re.compile(r'.*(date|year|day|week|time).*', re.IGNORECASE)).columns
  num_columns = [col for col in df.columns if col not in date_columns]
  for col in num_columns:
    df[col] = pd.to_numeric(df[col],errors='coerce')
  get_scatter_matrix(df,num_columns)
  get_time_series_plots(df,date_columns,num_columns)

{table_block}

# COMMAND ----------
# Ignore if notebook is not being run as a job
import json
dbutils.notebook.exit(json.loads(dbutils.notebook.entry_point.getDbutils().notebook().getContext().toJson())['currentRunId']['id'])"""
  return model_notebook

In [0]:
def simple_notebook(listing_name, listing_subtitle, share_name, table_names=[]):
  """Build the source of a Databricks Python notebook that shows sample rows and summary statistics per table.

  Args:
  - listing_name: listing title, rendered as the notebook's markdown heading.
  - listing_subtitle: listing subtitle, rendered under the heading.
  - share_name: share name, embedded in the generated `SHOW ALL IN SHARE` query.
  - table_names: names passed to the generated `get_table_report` calls, one cell each.
    The default list is only read, never mutated.

  Returns:
  - The notebook source as a string, with cells separated by `# COMMAND ----------`.
  """

  def get_table_commands(table_name):
    # One titled cell per table that calls the report helper defined in the template
    return (
      "  \n\n"
      "# COMMAND ----------\n"
      f"# DBTITLE 1, Table Name: {table_name}\n"
      "\n"
      f'get_table_report("{table_name}")'
    )

  table_block = "".join(get_table_commands(table_name) for table_name in table_names)

  # The cell holding get_table_report is introduced by a full '# COMMAND ----------'
  # delimiter, like every other cell in the template
  model_notebook = f"""
# Databricks notebook source
# MAGIC %md
# MAGIC # {listing_name}
# MAGIC {listing_subtitle}

# COMMAND ----------

# DBTITLE 1,Tables Available in this Share
share_name = "{share_name}"
df_tables = spark.sql(f"SHOW ALL IN SHARE {share_name}")
print(f"This share contains {len(table_names)} table(s)")

# COMMAND ----------

# DBTITLE 1,Show all Tables available in this Share
display(df_tables.select("name"))

# COMMAND ----------
# MAGIC %md
# MAGIC # Data Exploration

# COMMAND ----------
def get_table_report(table_name):

  df = sqlContext.sql(f"select * from {{table_name}}").toPandas()

  try:
    print('Sample Data')
    display(df.head(5))
  except:
    print('Sample Data cannot be displayed')

  try:
    print('\\n\\nSummary Data')
    display(df.describe().reset_index())
  except:
    print('Summary Data cannot be displayed')

{table_block}

# COMMAND ----------
# Ignore if notebook is not being run as a job
import json
dbutils.notebook.exit(json.loads(dbutils.notebook.entry_point.getDbutils().notebook().getContext().toJson())['currentRunId']['id'])"""
  return model_notebook

In [0]:
def generate_and_run_notebook(listing,notebook_type):
  """
  Generate and run a new Databricks notebook based on the specified Marketplace listing and notebook type.

  The notebook is imported next to the current notebook (same workspace folder) under the
  name '<listing id>_<share name>', overwriting any notebook already at that path, and is
  then run with a 600 second timeout.

  Args:
  - listing: A JSON object representing a Marketplace listing.
  - notebook_type: A string indicating the type of notebook to generate. Supported values are:
      - 'Pandas Profiling [Full]'
      - 'Pandas Profiling [Minimal]'
      - 'Time-Series'
      - 'Simple'

  Returns:
  - The notebook_run_id of the generated notebook to be attached to the listing,
    or None when the listing has no share (nothing is generated in that case).

  Raises:
  - ValueError: if notebook_type is not one of the supported values.
  - An HTTP error from `raise_for_status()` if the workspace import request is rejected.
  """

  summary = listing['summary']

  # GET ALL THE TABLES ASSOCIATED TO THIS PARTICULAR LISTING/SHARE
  listing_id = listing['id']
  listing_name = summary['name']
  listing_subtitle = summary.get('subtitle', '')  # a missing subtitle renders as an empty line
  if 'share' not in summary:
    print(f"Skipping - No Share: {listing_name}")
    return None
  share_name = summary['share']['name']

  # Reject unknown types up front, before any Spark or workspace work is done
  supported_types = ('Pandas Profiling [Full]', 'Pandas Profiling [Minimal]', 'Time-Series', 'Simple')
  if notebook_type not in supported_types:
    raise ValueError(
      f"Unsupported notebook_type {notebook_type!r}; expected one of {', '.join(supported_types)}")

  print(f"Working on id={listing_id} name={listing_name}")

  df_tables = spark.sql(f"SHOW ALL IN SHARE {share_name}")
  t_names = [row.shared_object for row in df_tables.select("shared_object").collect()]

  notebook_name = f'{listing_id}_{share_name}'
  new_path = os.path.join(os.path.dirname(notebook_path), notebook_name)

  if notebook_type == 'Pandas Profiling [Full]':
    build_notebook = pandas_profiling_notebook
  elif notebook_type == 'Pandas Profiling [Minimal]':
    build_notebook = pandas_minimal_notebook
  elif notebook_type == 'Time-Series':
    build_notebook = time_series_notebook
  else:
    build_notebook = simple_notebook
  content = build_notebook(listing_name, listing_subtitle, share_name, t_names)

  data = {
    "content": base64.b64encode(content.encode("utf-8")).decode('ascii'),
    "path": new_path,
    "language": "PYTHON",
    "overwrite": True,
    "format": "SOURCE"
  }

  import_response = requests.post(
      f'{host_name}/api/2.0/workspace/import',
      headers={'Authorization': f'Bearer {host_token}'},
      json = data
    )

  # Do not go on to run a notebook whose import was rejected
  if not import_response.ok:
    print(f"Import failed for {new_path}: {import_response.text}")
    import_response.raise_for_status()

  response_created_notebook = import_response.json()
  print(f"{response_created_notebook} - Created for {new_path}.. Running {notebook_name}")

  # Relative path: the notebook was imported into the same folder as this one
  ex = dbutils.notebook.run(notebook_name,timeout_seconds=600)
  return ex

In [0]:
def upload_notebook_to_marketplace(listing_id,notebook_run_id,handle_existing_notebook='replace'):
  """
  Uploads a notebook to a Databricks Marketplace listing.

  Steps: fetch the listing, export the run's HTML, create a Marketplace file entry,
  PUT the HTML to the returned pre-signed URL, then update the listing so that it
  references the new file. The HTML is uploaded straight from memory.

  Args:
  - listing_id: the ID of the listing to upload the notebook to
  - notebook_run_id: the ID of the notebook run that generated the HTML content to upload (from generate_and_run_notebook)
  - handle_existing_notebook: what to do if the listing already has a notebook attached to it
      - 'replace': replace the existing notebook with the new one (default)
      - 'skip': skip the upload and do nothing
      - 'append': add as a second notebook
    Any other value behaves like 'replace'.

  Returns: None. Attaches given notebook to given listing.

  Raises:
  - An HTTP error from `raise_for_status()` if fetching the listing, exporting the run,
    or creating the file entry fails.
  - RuntimeError: if the PUT of the HTML to the pre-signed URL does not return 200;
    the listing is left untouched in that case.
  """
  auth_headers = {'Authorization': f'Bearer {host_token}'}
  api_root = f"https://{db_account_hash}.cloud.databricks.com/api/2.0"

  individual_listing_url = f"{api_root}/marketplace-provider/listings/{listing_id}"
  get_listing_r = requests.get(individual_listing_url, headers=auth_headers)
  get_listing_r.raise_for_status()
  payload = get_listing_r.json()
  cur_files = payload["listing"]["detail"].get("embedded_notebook_file_infos", [])

  if cur_files and handle_existing_notebook == 'skip':
    print(f"Skipping {listing_id} bc it has a notebook already attached to it")
    return

  # Export the finished run as HTML
  run_id = notebook_run_id
  run_url = f"{api_root}/jobs/runs/export?run_id={run_id}"
  run_response = requests.get(run_url, headers=auth_headers)
  run_response.raise_for_status()
  html_content = run_response.json()['views'][0]['content']
  print(f"Successfully extracted HTML content for run_id={run_id} and listing_id={listing_id}")

  # Register a file entry for the listing; the response carries the pre-signed upload URL
  file_url = f"{api_root}/marketplace-provider/files"
  file_body = {
      "marketplace_file_type": "EMBEDDED_NOTEBOOK",
      "file_parent": {
          "parent_id": listing_id,
          "file_parent_type": "LISTING"
      },
      "mime_type": "text/html"
  }
  file_r = requests.post(file_url, headers=auth_headers, json=file_body)
  file_r.raise_for_status()
  file_r_json = file_r.json()
  file_upload_url = file_r_json['upload_url']
  file_info_id = file_r_json['file_info']['id']
  print(f"Successfully created a file for listing_id={listing_id} file_info_id={file_info_id}")

  # Upload the HTML bytes directly. Re-reading a scratch file while its write handle
  # is still open can send an empty or truncated body.
  r_put_html = requests.put(
      file_upload_url,
      data=html_content.encode('utf-8'),
      headers={"x-amz-server-side-encryption": "AES256", 'Content-Type': 'text/html'})
  print(f"Received {r_put_html} from PUT to S3 pre-signed URL")
  if r_put_html.status_code != 200:
    print(r_put_html.text)
    # Never attach a file whose content failed to upload
    raise RuntimeError(
      f"Upload of notebook HTML for listing_id={listing_id} failed with status {r_put_html.status_code}")

  # Final step: attach the uploaded file to the listing
  if handle_existing_notebook == 'append':
    cur_files.append({"id": file_info_id})
  else:
    cur_files = [{"id": file_info_id}]
  payload["listing"]["detail"]["embedded_notebook_file_infos"] = cur_files

  # The update also rewrites the provider details from the notebook's configuration values
  provider_info = payload["listing"]["summary"]["provider_info"]
  provider_info["name"] = provider_name
  payload["listing"]["deployment_name"] = deployment_name
  provider_info['business_contact_email'] = business_contact_email
  provider_info['term_of_service_link'] = terms_of_service_link
  provider_info['privacy_policy_link'] = privacy_policy_link
  new_l = len(payload["listing"]["detail"]["embedded_notebook_file_infos"])
  print(f"New length of embedded_notebook_file_infos -> {new_l}")
  j_payload = json.dumps(payload)
  r = requests.put(individual_listing_url, data=j_payload, headers=auth_headers)
  print(f"Received {r} from PUT to update listing_id={listing_id}")
  print('-------------------------------------------')
  if r.status_code != 200:
    print(r.text)

In [0]:
def create_notebook_for_listing(listing,notebook_type,handle_existing_notebook='replace'):
  """
  Generates, runs, and attaches a notebook to a listing.

  Failures are reported with print() rather than raised, so a loop over many
  listings keeps going. The upload step is attempted only when the run step
  returned a run id.

  Args:
  - listing: A JSON object representing a Marketplace listing.
  - notebook_type: A string indicating the type of notebook to generate. Supported values are:
      - 'Pandas Profiling [Full]'
      - 'Pandas Profiling [Minimal]'
      - 'Time-Series'
      - 'Simple'
  - handle_existing_notebook: what to do if the listing already has a notebook attached to it
      - 'replace': replace the existing notebook with the new one (default)
      - 'skip': skip the upload and do nothing
      - 'append': add as an additional notebook

  Returns:
  - None. Generates, runs, and attaches notebook to DBM listing.
  """

  listing_id = listing['id']

  try:
    notebook_run_id = generate_and_run_notebook(listing, notebook_type)
  except Exception as exc:
    print(f"Notebook run failed for listing_id={listing_id}: {exc}")
    return

  # generate_and_run_notebook returns None when it skipped the listing (no share attached)
  if notebook_run_id is None:
    print(f"No notebook run for listing_id={listing_id}; nothing to upload")
    return

  try:
    # Forward the caller's choice instead of always falling back to 'replace'
    upload_notebook_to_marketplace(listing_id, notebook_run_id, handle_existing_notebook)
  except Exception as exc:
    print(f"Notebook upload failed for listing_id={listing_id}: {exc}")

## Create and Attach Notebooks
###### To create, run, and attach notebooks to listings:
- Run `create_notebook_for_listing` for each listing in `r_json`
- Decide for each listing which notebook type you would like to use
  - 'Pandas Profiling [Full]' 
  - 'Pandas Profiling [Minimal]'
  - 'Time-Series'
  - 'Simple'
- Decide for each listing how you would like to handle listings with existing notebooks
  - 'replace'
  - 'skip'
  - 'append'

In [0]:
# NOTE(review): `notebook_type` and `handle_existing_notebook` are not assigned anywhere in this
# notebook; set them to one of the options listed above before running this cell.
# `listing` is whatever the earlier `for listing in r_json` loop left bound, i.e. a single listing.
create_notebook_for_listing(listing,notebook_type,handle_existing_notebook)