# Inventory Collector
Collects data on database objects (tables and views) as well as grants on those objects.
Saves all data to a Delta table.

# Initialization
Run both code cells in this section each time you reconnect to the cluster.

Note: the `from ... import` statement below expects a single .py file, as provided by the GitHub repo.
If you are not using GitHub repos, create a notebook containing the contents of the DbInventoryCollector.py file, and change the import line to read:

```%run ./DB-Inventory-Collector```

In [None]:
from DbInventoryCollector import InventoryCollector

In [None]:
from pyspark.sql.functions import *

# Create the notebook's input widgets. reset=False presumably leaves existing widget
# values untouched -- confirm in DbInventoryCollector.CreateWidgets.
InventoryCollector.CreateWidgets(dbutils, spark, reset=False)

# Point the collector at the inventory catalog/database chosen in the widgets, then initialize it.
inventoryCatalog = dbutils.widgets.get("Inventory_Catalog")
inventoryDatabase = dbutils.widgets.get("Inventory_Database")
collector = InventoryCollector(spark, inventoryCatalog, inventoryDatabase)
collector.initialize()

# Copy the migration/scan widget values into Python variables (read in this order).
# Paste these lines into a cell to enable automatic execution on widget change.
destCatalog, whichCatalog, sourceDatabase = (
    dbutils.widgets.get(widgetName)
    for widgetName in ("Migration_Catalog", "Scan_Catalog", "Scan_Database")
)

# Scanning Databases
Generally you will first run these scan functions to record what objects exist.

## Scan of a single database
Note that there are two types of scans: objects and grants.
Each returns a pair: the execution id and the DataFrame holding the scanned results.
All past scans are saved to an append-only table. The execution id lets you retrieve the results of a scan taken at a certain time.

Because these cells read the widget values, they are re-run automatically when you change a dropdown at the top.

In [None]:
# Record which objects exist in the selected database; the scan returns (execution id, DataFrame).
scanTarget = f"{whichCatalog}.{sourceDatabase}"
exec_id_objects, objectDF = collector.scan_database_objects(whichCatalog, sourceDatabase)
print(f"Finished scanning objects for {scanTarget}. ObjectExId: {exec_id_objects} ")
display(objectDF)

In [None]:
# Record the grants in the selected database; the scan returns (execution id, DataFrame).
scanTarget = f"{whichCatalog}.{sourceDatabase}"
exec_id_grants, grantDF = collector.scan_database_grants(whichCatalog, sourceDatabase)
print(f"Finished scanning grants for {scanTarget}. GrantExId: {exec_id_grants}")
display(grantDF)

## Scan All Catalog Functions
Note: this is still a work in progress (WIP).

In [None]:
# Scan the functions in the selected catalog (work in progress). Left as a bare expression so
# that any return value is shown as the cell output.
collector.scan_catalog_functions(whichCatalog)

## Scan All Databases in Catalog
Automatically list and scan all databases.

**Parameters:**
*rescan* -- If true, re-scans a database even if inventory data already exists for it. If false, databases that already have inventory data are skipped. Default: False

In [None]:
# Disabled by default: uncomment to list and scan every database in whichCatalog.
# With rescan=False, databases that already have inventory data are skipped.
# collector.scan_all_databases(whichCatalog, rescan = False)

# Results Inspection

## Summary of past executions

In [None]:
# Summary of all past scan executions recorded by the collector.
executionHistory = collector.get_execution_history()
display(executionHistory)

## Summary of all databases

In [None]:
# Per-database inventory summary for the scanned catalog; also kept in dbSummary for reuse.
display(dbSummary := collector.get_database_inventory_summary(whichCatalog))

## Inspect Single Database Results
There are two types of results stored. "grants" and "objects".

In [None]:
# "grants" result type: each non-inherited grant on the database and on its tables and views.
# Also kept in db_grants for reuse.
display(db_grants := collector.get_last_results('grants', whichCatalog, sourceDatabase))

In [None]:
# "objects" result type: each table and view with its type (managed, external, or view).
# If retrieving an object's details failed, the error is stored instead; for views the DDL
# is saved too. Also kept in db_objects for reuse.
display(db_objects := collector.get_last_results('objects', whichCatalog, sourceDatabase))

In [None]:
# The results can be aggregated further, e.g. the number of objects per objectType.
objectTypeCounts = db_objects.groupBy('objectType').count()
display(objectTypeCounts)

## Look at most recent collected grants using SQL

In [None]:
%sql
-- Most recent grant scan for every (catalog, database) pair.
-- Ranking must be partitioned by catalog as well as database: otherwise a scan of a database
-- with the same name in another catalog (e.g. hive_metastore vs. the migration catalog)
-- outranks this one and its grants disappear from the result.
-- NOTE(review): the inventory table location is hard-coded; it presumably must match the
-- Inventory_Catalog / Inventory_Database widgets -- confirm.
WITH ranked_grants AS (
  SELECT *,
    RANK() OVER (PARTITION BY source_catalog, source_database ORDER BY execution_time DESC) as rank
  FROM hive_metastore.databricks_inventory.grant_statements
)
SELECT source_catalog, source_database, ObjectType, ActionType, ObjectKey, Principal, grant_statement
FROM ranked_grants
WHERE rank = 1
order by source_catalog, source_database, ObjectType, ObjectKey

## Compare DEV and HIVE catalogs for missing tables

In [None]:
# Run summary of both catalogs and save to variables
# Summarize both catalogs being compared (hive_metastore first, then dev).
dbSummary_hive, dbSummary_dev = (
    collector.get_database_inventory_summary(catalogName)
    for catalogName in ("hive_metastore", "dev")
)

In [None]:
# Prefix every summary column with its catalog so both sides stay distinguishable after the join.
dbSummary_hive_prefixed = dbSummary_hive.select(*[col(name).alias("hive_" + name) for name in dbSummary_hive.columns])
dbSummary_dev_prefixed = dbSummary_dev.select(*[col(name).alias("dev_" + name) for name in dbSummary_dev.columns])

# Full outer join on database name, so databases present in only one catalog are kept
# (with the other catalog's columns NULL).
summary_compare = dbSummary_hive_prefixed.join(
    dbSummary_dev_prefixed,
    dbSummary_hive_prefixed["hive_database"] == dbSummary_dev_prefixed["dev_database"],
    how="full_outer",
)

# Uncomment to inspect the raw joined summary:
# display(summary_compare)

In [None]:
# Side-by-side object counts per database for hive_metastore vs. dev, plus their differences.
# summary_compare comes from a full outer join, so for a database that exists in only one
# catalog the other catalog's counts are NULL (and a count may presumably also be NULL when a
# database has no objects of that type). NULL - x is NULL and `NULL != 0` is not true, so
# those databases -- exactly the "missing" ones -- used to drop out of the mismatch filter.
# Coalescing every count to 0 keeps them.
compareCounts = []
for typeName in ["ERROR", "EXTERNAL", "MANAGED", "VIEW"]:
    hiveCount = coalesce(col(f"HIVE_{typeName}"), lit(0))
    devCount = coalesce(col(f"DEV_{typeName}"), lit(0))
    compareCounts += [
        hiveCount.alias(f"HIVE_{typeName}"),
        devCount.alias(f"DEV_{typeName}"),
        (hiveCount - devCount).alias(f"DIFF_{typeName}"),
    ]

compare_short = summary_compare.select(
    "hive_database",
    "dev_database",  # identifies databases that exist only in dev (hive_database is NULL there)
    "hive_object_last_execution_id", "dev_object_last_execution_id",
    *compareCounts,
)

display(compare_short.filter("DIFF_ERROR != 0 OR DIFF_EXTERNAL != 0 OR DIFF_MANAGED != 0 OR DIFF_VIEW != 0"))

## Pull list of Source Tables with Scan Errors
Migration will not be attempted for these tables.
Two common errors have been seen so far:
1. External location does not exist
2. _(not yet documented)_

In [None]:
import functools

# Collect the ERROR rows from the latest hive_metastore object scan of every database that
# reported scan errors. Migration will not be attempted for these tables.
errorExecIds = [r.hive_object_last_execution_id for r in compare_short.filter("HIVE_ERROR > 0").collect()]
errorFrames = [
    collector.get_results_by_execution_id(exec_id)
    # Filter before projecting: objectType is not part of the selected columns.
    .filter('objectType == "ERROR"')
    .select(["source_catalog", "source_database", "table", "errMsg"])
    for exec_id in errorExecIds
]
allErrors = functools.reduce(lambda left, right: left.union(right), errorFrames) if errorFrames else None

# display(None) would fail, so report the empty case explicitly.
if allErrors is None:
    print("No scan errors found in the latest hive_metastore object scans.")
else:
    display(allErrors)

In [None]:
import functools

# Databases where hive_metastore has more objects of some type than dev.
# Collected once and reused for both sides.
mismatchRows = compare_short.filter("DIFF_EXTERNAL > 0 OR DIFF_MANAGED > 0 OR DIFF_VIEW > 0").collect()
objectColumns = ["source_catalog", "source_database", "objectType", "table"]


def _collect_objects(exec_ids):
    """Union the non-ERROR object rows of the given scan executions; None if there are none."""
    frames = [
        collector.get_results_by_execution_id(exec_id)
        # Filter before projecting: objectType must still be resolvable.
        .filter('objectType != "ERROR"')
        .select(objectColumns)
        for exec_id in exec_ids
        # A database missing from one catalog has no execution id on that side.
        if exec_id is not None
    ]
    return functools.reduce(lambda left, right: left.union(right), frames) if frames else None


# Objects from the latest hive_metastore scans of the mismatched databases.
allHiveObjects = _collect_objects([r.hive_object_last_execution_id for r in mismatchRows])
# Objects from the latest dev scans of the same databases. The previous code reused the hive
# execution ids here, so the "dest" side silently repeated the hive data.
allDestObjects = _collect_objects([r.dev_object_last_execution_id for r in mismatchRows])

for label, objectsDF in [("hive_metastore", allHiveObjects), ("dev", allDestObjects)]:
    if objectsDF is None:
        print(f"No {label} objects found for the mismatched databases.")
    else:
        display(objectsDF)

In [None]:
collector.setStorageCatalog()

# Latest grant scan per (catalog, database), restricted to one catalog: RANK over execution_time
# within each (source_catalog, source_database) partition and keep rank 1.
_latestGrantsQuery = """
WITH ranked_grants AS (
  SELECT *,
    RANK() OVER (PARTITION BY source_catalog, source_database ORDER BY execution_time DESC) as rank
  FROM {grantsDb}.grant_statements)
SELECT source_database, ObjectType, ActionType, ObjectKey, Principal, grant_statement
FROM ranked_grants
WHERE source_catalog = "{catalog}" AND rank = 1
order by source_database, ObjectType, ObjectKey
"""


def _latest_grants(catalog):
    """Return the most recent recorded grant rows for `catalog` as a DataFrame."""
    return spark.sql(_latestGrantsQuery.format(grantsDb=collector.inventory_database, catalog=catalog))


allGrants_hive = _latest_grants("hive_metastore")
allGrants_dest = _latest_grants(destCatalog)

joinColumns = ["source_database", "Principal", "ActionType", "ObjectType", "ObjectKey"]

# Rename hive privileges to the destination's naming (USAGE -> USE SCHEMA), then drop the
# READ_METADATA and OWN entries from the comparison.
actionTypeMapped = (
    when(col("ActionType") == "USAGE", "USE SCHEMA")
    # To map more privileges, chain further .when(...) calls here, e.g.
    # .when(col("ActionType") == "OWN", "ownership")
    .otherwise(col("ActionType"))
    .alias("ActionType")
)
allGrants_hive = (
    allGrants_hive
    .select("source_database", "Principal", "ObjectType", "ObjectKey", actionTypeMapped)
    .filter('ActionType != "READ_METADATA" AND ActionType != "OWN"')
)

# Grants on both sides, only in hive_metastore, and only in the destination catalog.
grantCompare_both = allGrants_hive.join(allGrants_dest, joinColumns, how="inner")
grantCompare_hiveOnly = allGrants_hive.join(allGrants_dest, joinColumns, how="left_anti")
grantCompare_destOnly = allGrants_dest.join(allGrants_hive, joinColumns, how="left_anti")


In [None]:
# Grants matching on (source_database, Principal, ActionType, ObjectType, ObjectKey) in both
# hive_metastore and the destination catalog.
display(grantCompare_both)

In [None]:
# Grants present in hive_metastore but with no matching grant in the destination catalog.
display(grantCompare_hiveOnly)

In [None]:
# Breakdown of the hive-only grants by privilege type.
hiveOnlyByAction = grantCompare_hiveOnly.groupBy("ActionType").count()
display(hiveOnlyByAction)
# Triage notes per ActionType:
#   usage         -- rename
#   select        -- investigate
#   own           -- check table properties
#   read metadata -- ignore

In [None]:
# Grants present in the destination catalog but with no matching grant in hive_metastore.
# (The variable is grantCompare_destOnly; the former name grantCompare_devOnly was never
# defined and raised NameError.)
display(grantCompare_destOnly)

In [None]:

# Latest grant scans for hive_metastore and the destination catalog, returning only the grant
# columns (no source_database). Fixes relative to the earlier draft of this cell:
#   * the WITH clause was never closed with ")" -- a SQL parse error;
#   * `inventory_catdb` was never defined (NameError); the inventory database is resolved the
#     same way as the grant comparison cell, via setStorageCatalog() + collector.inventory_database;
#   * ranking per database alone mixed catalogs before the catalog filter, so one catalog's
#     newer scan could hide the other's -- partition by catalog and database instead.
# NOTE: this overwrites allGrants_hive / allGrants_dest from the grant comparison cell
# (including its USAGE -> USE SCHEMA mapping).
collector.setStorageCatalog()

allGrants_hive = spark.sql(f"""
WITH ranked_grants AS (
  SELECT *,
    RANK() OVER (PARTITION BY source_catalog, source_database ORDER BY execution_time DESC) as rank
  FROM {collector.inventory_database}.grant_statements)
SELECT ObjectType, ActionType, ObjectKey, Principal, grant_statement
FROM ranked_grants
WHERE source_catalog = "hive_metastore" AND rank = 1
order by source_database, ObjectType, ObjectKey
""")

allGrants_dest = spark.sql(f"""
WITH ranked_grants AS (
  SELECT *,
    RANK() OVER (PARTITION BY source_catalog, source_database ORDER BY execution_time DESC) as rank
  FROM {collector.inventory_database}.grant_statements)
SELECT ObjectType, ActionType, ObjectKey, Principal, grant_statement
FROM ranked_grants
WHERE source_catalog = "{destCatalog}" AND rank = 1
order by source_database, ObjectType, ObjectKey
""")