In [0]:
## Initializations

In [0]:
# When True, the next cells build the Spark session through Databricks Connect
# using connection details read from environment variables.
DEBUG = False

In [0]:
from pyspark.sql.functions import col
from pyspark.sql.types import StringType,IntegerType,BooleanType,DateType

In [0]:
def retrieve_workspace_instance_name():
  """Return the bare workspace hostname taken from the DATABRICKS_HOST env var.

  Accepts the value with or without a scheme (``https://adb-1.azuredatabricks.net/``
  or ``adb-1.azuredatabricks.net``); without this normalization ``urlparse`` would
  treat a scheme-less host as a path and return ``None`` as the hostname.

  Raises:
    KeyError: if DATABRICKS_HOST is not set.
  """
  import os
  from urllib.parse import urlparse
  url = os.environ['DATABRICKS_HOST'].strip()
  if "://" not in url:
    # urlparse only fills in .hostname when a network location is present.
    url = "https://" + url
  return urlparse(url).hostname
def retrieve_spark_remote():
  """Return the SPARK_REMOTE environment variable (raises KeyError when unset)."""
  from os import environ
  remote = environ['SPARK_REMOTE']
  return remote
def retrieve_token():
  """Return the DATABRICKS_TOKEN environment variable (raises KeyError when unset)."""
  from os import environ
  token = environ['DATABRICKS_TOKEN']
  return token
def retrieve_cluster_id():
  """Return the DATABRICKS_CLUSTER_ID environment variable (raises KeyError when unset)."""
  from os import environ
  cluster_id = environ['DATABRICKS_CLUSTER_ID']
  return cluster_id

In [0]:
if DEBUG:
  # Databricks Connect: assemble the Spark Connect URL from the workspace host,
  # token and cluster id environment-variable helpers defined above.
  from databricks.connect import DatabricksSession

  connect_url = (
    f"sc://{retrieve_workspace_instance_name()}:443/"
    f";token={retrieve_token()}"
    f";x-databricks-cluster-id={retrieve_cluster_id()}"
  )
  spark = DatabricksSession.builder.remote(connect_url).getOrCreate()

In [0]:
# Notebook parameters: target Unity Catalog, plus the Oracle schema/table to copy.
for _widget_name, _widget_default, _widget_label in (
  ("catalog", "mazda_bi20_nonprd_catalog", "Unity catalog name"),
  ("schema", "LZ_MUM", "Oracle schema"),
  ("table", "TUSER", "Oracle table"),
):
  dbutils.widgets.text(_widget_name, _widget_default, _widget_label)

In [0]:
# Current values of the three widgets, in declaration order.
catalog, schema, table = (
  dbutils.widgets.get(widget_name) for widget_name in ("catalog", "schema", "table")
)

In [0]:
# Oracle credentials from the "ACC" Databricks secret scope.
username, password = (
  dbutils.secrets.get(scope = "ACC", key = secret_key)
  for secret_key in ("DWH_BI1__JDBC_USERNAME", "DWH_BI1__JDBC_PASSWORD")
)

assert username, "secret username not retrieved"
assert password, "secret password not retrieved"

# Oracle thin-driver URL in host:port/service form.
# Alternative host (IP) previously used here: 10.230.2.32
hostName = "accdw-scan.mle.mazdaeur.com"
port = "1521"
databaseName = "ACC_DWH"

jdbcUrl = "jdbc:oracle:thin:@//" + hostName + ":" + port + "/" + databaseName
print(jdbcUrl)

In [0]:
# Inventory of landing-zone (LZ_*) tables with at least one row, largest first.
# In LIKE, "_" is a single-character wildcard; it is escaped ('!_' with ESCAPE '!')
# so only owners that literally start with "LZ_" match.
sql = """
SELECT OWNER, TABLE_NAME, NUM_ROWS, AVG_ROW_LEN
FROM all_tables
WHERE owner LIKE 'LZ!_%' ESCAPE '!'
AND NUM_ROWS > 0
ORDER BY NUM_ROWS DESC
"""
df_tables = (spark.read
  .format("jdbc")
  .option("driver", 'oracle.jdbc.driver.OracleDriver')
  .option("url", jdbcUrl)
  .option("query", sql)
  .option("user", username)
  .option("password", password)
  .load()
  # Row counts of the biggest tables can exceed 2^31-1, which would overflow an
  # int cast; bigint holds any realistic row count.
  .withColumn("NUM_ROWS", col("NUM_ROWS").cast("bigint"))
)

display(df_tables)

In [0]:
# Column metadata (type, nullability, optimizer statistics) for every table of the
# selected Oracle schema, plus a comma-separated list of constraints per column.
# The owner comes from the "schema" widget instead of a hard-coded value. Unquoted
# Oracle identifiers are stored upper-case (the unquoted "{schema}.{table}" read
# further down resolves the same way), so the value is upper-cased; single quotes
# are doubled so it remains a valid SQL string literal.
owner_literal = schema.upper().replace("'", "''")
sql = f"""
SELECT 
  tc.TABLE_NAME, tc.COLUMN_NAME, tc.DATA_TYPE, tc.NULLABLE, tc.NUM_NULLS, tc.NUM_DISTINCT, tc.DATA_DEFAULT, tc.AVG_COL_LEN, tc.CHAR_LENGTH,
  con.cons
FROM DBA_TAB_COLUMNS tc
left join
  ( select  listagg( cc.constraint_name, ',') within group (order by cc.constraint_name)  cons, 
         table_name, owner , column_name 
         from  DBA_CONS_COLUMNS cc 
         where cc.owner = '{owner_literal}'
          group by  table_name, owner , column_name ) con
  on con.table_name = tc.table_name and 
     con.owner = tc.owner and
     con.column_name = tc.column_name
where  tc.owner = '{owner_literal}'
order by 1 ,2 
"""
df_schema = (spark.read
  .format("jdbc")
  .option("driver", 'oracle.jdbc.driver.OracleDriver')
  .option("url", jdbcUrl)
  .option("query", sql)
  .option("user", username)
  .option("password", password)
  .load()
  # Null / distinct counts scale with table size and can exceed int32; use bigint.
  .withColumn("NUM_NULLS",col("NUM_NULLS").cast("bigint"))
  .withColumn("NUM_DISTINCT",col("NUM_DISTINCT").cast("bigint"))
  .withColumn("AVG_COL_LEN",col("AVG_COL_LEN").cast(IntegerType()))
  .withColumn("CHAR_LENGTH",col("CHAR_LENGTH").cast(IntegerType()))
)
display(df_schema)

In [0]:
# Full extract of the selected Oracle table.
# Oracle's JDBC driver fetches only 10 rows per round trip by default; a larger
# fetch size greatly reduces network round trips on bulk reads.
JDBC_FETCH_SIZE = 10000
df = (spark.read
  .format("jdbc")
  .option("driver", 'oracle.jdbc.driver.OracleDriver')
  .option("url", jdbcUrl)
  .option("dbtable", f"{schema}.{table}")
  .option("user", username)
  .option("password", password)
  .option("fetchsize", JDBC_FETCH_SIZE)
  .load()
)

In [0]:
# Preview the rows extracted from the Oracle table.
display(df)

In [0]:
# Inspect the Spark types mapped from the Oracle columns before writing to Delta.
df.printSchema()

In [0]:
# Land the extract as a managed Delta table <catalog>.<schema>.<table>, replacing
# any previous contents; the target schema is created on first run.
target_table = f"{catalog}.{schema}.{table}"
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog}.{schema}")
(df.write
  .format("delta")
  .mode("overwrite")
  .saveAsTable(target_table))