Two options for making the raw files available:

1. Upload the files to an S3 bucket folder and create an external volume in Databricks that points to the same location
    - To create an external volume:
    `CREATE EXTERNAL VOLUME if not exists bronze.datavault.ccm LOCATION 'S3 path'`
2. Create a managed volume and upload the files using the Databricks UI

In [0]:
# variables
# Widgets let the target catalog and schema be overridden per run; the defaults are
# "bronze_sb" and "test3".
dbutils.widgets.text("bronze_catalog", "bronze_sb")
dbutils.widgets.text("bronze_schema", "test3")
catalog_name = dbutils.widgets.get("bronze_catalog")
schema_name = dbutils.widgets.get("bronze_schema")
# Build the volume path from the widget values instead of hard-coding it, so that it
# keeps pointing at {catalog_name}.{schema_name}.raw_data when a widget is changed.
raw_volume_path = f"/Volumes/{catalog_name}/{schema_name}/raw_data/"

In [0]:
# Make sure the volume that stores the raw files exists in the selected catalog/schema
create_volume_stmt = f"create volume if not exists {catalog_name}.{schema_name}.raw_data"
spark.sql(create_volume_stmt)

DataFrame[]

In [0]:
# List the entries under the raw volume path; each entry carries path, name, size
# and modification time
dataset_list = dbutils.fs.ls(
    raw_volume_path
)
print(dataset_list)

[FileInfo(path='dbfs:/Volumes/bronze_sb/test3/raw_data/fact.averagecosts.dlm', name='fact.averagecosts.dlm', size=29142959, modificationTime=1741510042000), FileInfo(path='dbfs:/Volumes/bronze_sb/test3/raw_data/fact.transactions.dlm', name='fact.transactions.dlm', size=380631122, modificationTime=1741510141000), FileInfo(path='dbfs:/Volumes/bronze_sb/test3/raw_data/hier.clnd.dlm', name='hier.clnd.dlm', size=276119, modificationTime=1741510035000), FileInfo(path='dbfs:/Volumes/bronze_sb/test3/raw_data/hier.hldy.dlm', name='hier.hldy.dlm', size=545, modificationTime=1741510035000), FileInfo(path='dbfs:/Volumes/bronze_sb/test3/raw_data/hier.invloc.dlm', name='hier.invloc.dlm', size=2702, modificationTime=1741510035000), FileInfo(path='dbfs:/Volumes/bronze_sb/test3/raw_data/hier.invstatus.dlm', name='hier.invstatus.dlm', size=17004, modificationTime=1741510035000), FileInfo(path='dbfs:/Volumes/bronze_sb/test3/raw_data/hier.possite.dlm', name='hier.possite.dlm', size=5079, modificationTime=

In [0]:
# extracting the filepaths from the metadata collected
# The listing reports paths with a "dbfs:" scheme prefix. Strip it only when it is
# actually present, rather than blindly dropping the first five characters.
DBFS_SCHEME = "dbfs:"
file_paths = [
    x.path[len(DBFS_SCHEME):] if x.path.startswith(DBFS_SCHEME) else x.path
    for x in dataset_list
]
print(file_paths)

['/Volumes/bronze_sb/test3/raw_data/fact.averagecosts.dlm', '/Volumes/bronze_sb/test3/raw_data/fact.transactions.dlm', '/Volumes/bronze_sb/test3/raw_data/hier.clnd.dlm', '/Volumes/bronze_sb/test3/raw_data/hier.hldy.dlm', '/Volumes/bronze_sb/test3/raw_data/hier.invloc.dlm', '/Volumes/bronze_sb/test3/raw_data/hier.invstatus.dlm', '/Volumes/bronze_sb/test3/raw_data/hier.possite.dlm', '/Volumes/bronze_sb/test3/raw_data/hier.pricestate.dlm', '/Volumes/bronze_sb/test3/raw_data/hier.prod.dlm', '/Volumes/bronze_sb/test3/raw_data/hier.rtlloc.dlm']


In [0]:
'''
Data files have been uploaded using UI due to available resources but the following piece of code can be used to directly read from S3 cloud and proceed further.
1. Install the Hadoop-aws library on the compute
2. Set the AWS credentials in the Spark configuration
3. Read the files by providing full s3 paths
'''

# Set AWS credentials in Spark configuration
# spark.conf.set("spark.hadoop.fs.s3a.access.key", "YOUR_ACCESS_KEY")
# spark.conf.set("spark.hadoop.fs.s3a.secret.key", "YOUR_SECRET_KEY")
# spark.conf.set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
# spark.conf.set("spark.hadoop.fs.s3a.endpoint", "s3.amazonaws.com")

# Now df_from_filepath can be used to load the data, but in this case the full S3 path must be passed, e.g. "s3a://raw_data_bucket/folders/file.dlm"

In [0]:
# function to load the raw data into lakehouse schema(bronze_sb) with the given dataframe
def df_load(target_catalog,schema_name,table_name,df,mode="overwrite"):
    """Save ``df`` as a Delta table and print the number of rows in ``df``.

    Args:
        target_catalog: Catalog part of the three-level table name.
        schema_name: Schema part of the three-level table name.
        table_name: Raw name for the table (for example a file name). Every
            character that is not an ASCII letter, digit or underscore is
            replaced with ``_`` before the name is used.
        df: DataFrame to persist; it must provide ``write`` and ``count()``.
        mode: Save mode passed to the writer. Defaults to ``"overwrite"``.

    Returns:
        None. The outcome is reported with ``print``.
    """
    import re

    # to avoid the table name containing any special characters: "-" and "." are
    # still mapped to "_", and so is anything else (spaces, brackets, ...) that
    # would break the unquoted identifier
    table_name = re.sub(r"[^0-9A-Za-z_]", "_", table_name)
    table = f"{target_catalog}.{schema_name}.{table_name}"
    # to create the managed table in the lakehouse with overwrite mode and blocksize of 64mb
    writer = (
        df.write.format("delta")
        .mode(mode)
        .option("overwriteSchema", "true")
        .option("maxRecordsPerFile", 1000000)
        .option("parquet.block.size", 64 * 1024 * 1024)
    )
    writer.saveAsTable(table)
    # NOTE(review): count() is a separate action from the write above and reports
    # the rows of ``df``, not the total rows in the table (relevant for append mode).
    row_count = df.count()
    print(f"Table: {table} has been loaded with {row_count} rows")

# function to form the dataframe from the filepaths
def df_from_filepath(file_path, file_name):
    """Read one pipe-delimited file and hand the resulting DataFrame to ``df_load``.

    Args:
        file_path: Location of the delimited file to read.
        file_name: Name passed to ``df_load`` as the table name.

    The target catalog and schema come from the notebook-level ``catalog_name``
    and ``schema_name`` variables.
    """
    # header row present, "|" as the separator, column types inferred from the data
    read_options = (
        ("header", "true"),
        ("delimiter", "|"),
        ("inferSchema", "true"),
    )
    reader = spark.read
    for option_name, option_value in read_options:
        reader = reader.option(option_name, option_value)
    raw_df = reader.csv(file_path)
    # create the table in the staging schema
    df_load(catalog_name, schema_name, file_name, raw_df)


In [0]:
# Ingest every raw file into its own table; the table name is derived from the file name
for file in file_paths:
    # last path segment, i.e. the bare file name
    file_name = file.rsplit("/", 1)[-1]
    print(f"ingesting {file_name}")
    df_from_filepath(file, file_name)

ingesting fact.averagecosts.dlm
Table: bronze_sb.test3.fact_averagecosts_dlm has been loaded with 740805 rows
ingesting fact.transactions.dlm
Table: bronze_sb.test3.fact_transactions_dlm has been loaded with 4503108 rows
ingesting hier.clnd.dlm
Table: bronze_sb.test3.hier_clnd_dlm has been loaded with 1820 rows
ingesting hier.hldy.dlm
Table: bronze_sb.test3.hier_hldy_dlm has been loaded with 19 rows
ingesting hier.invloc.dlm
Table: bronze_sb.test3.hier_invloc_dlm has been loaded with 85 rows
ingesting hier.invstatus.dlm
Table: bronze_sb.test3.hier_invstatus_dlm has been loaded with 245 rows
ingesting hier.possite.dlm
Table: bronze_sb.test3.hier_possite_dlm has been loaded with 90 rows
ingesting hier.pricestate.dlm
Table: bronze_sb.test3.hier_pricestate_dlm has been loaded with 4 rows
ingesting hier.prod.dlm
Table: bronze_sb.test3.hier_prod_dlm has been loaded with 1235 rows
ingesting hier.rtlloc.dlm
Table: bronze_sb.test3.hier_rtlloc_dlm has been loaded with 84 rows


In [0]:
def primary_key_check(table_name, primary_keys):
    """Check that ``primary_keys`` form a valid primary key for ``table_name``.

    The key is valid when no key column contains a NULL and no combination of
    key values occurs more than once.

    Args:
        table_name: Name of the table to query (interpolated into the SQL text).
        primary_keys: Iterable of column names making up the key. Surrounding
            whitespace on each name is ignored.

    Returns:
        bool: True only when the key is both non-null and unique.

    Raises:
        ValueError: If ``primary_keys`` contains no column names.
    """
    keys = [pk.strip() for pk in primary_keys if pk.strip()]
    if not keys:
        raise ValueError("primary_keys must contain at least one column name")

    # One query covers every key column: a row counts as soon as any key is NULL.
    null_filter = " or ".join(f"{pk} is null" for pk in keys)
    nulls = spark.sql(
        f"select count(*) from {table_name} where {null_filter}"
    ).collect()[0][0]
    non_null = nulls == 0

    # A single duplicated key group is enough to fail, hence the limit.
    key_list = ", ".join(keys)
    duplicate_groups = spark.sql(
        f"select count(*) from {table_name} group by {key_list} having count(*) > 1 limit 1"
    ).collect()
    # Keep the result a real boolean: a non-empty list of duplicate rows is truthy
    # and must not be mistaken for "unique".
    is_unique = len(duplicate_groups) == 0

    return non_null and is_unique

def foreign_key_check(table_name1, table_name2, foreign_key):
    """Check that every non-NULL ``foreign_key`` value in ``table_name1`` exists in ``table_name2``.

    Args:
        table_name1: Referencing table (interpolated into the SQL text).
        table_name2: Referenced table (interpolated into the SQL text).
        foreign_key: Column name, expected to exist under the same name in both tables.

    Returns:
        bool: True when no row of ``table_name1`` holds a non-NULL key value that
        is missing from ``table_name2``. The verdict is also printed.
    """
    # A LEFT ANTI JOIN keeps the rows of the referencing table that have no match.
    # Unlike "NOT IN (subquery)", it does not collapse to "no failures" when the
    # referenced column contains a NULL. NULL keys in the referencing table are
    # excluded, which matches what the NOT IN form did for them.
    orphan_query = (
        f"select count(*) from {table_name1} src "
        f"left anti join {table_name2} ref "
        f"on src.{foreign_key} = ref.{foreign_key} "
        f"where src.{foreign_key} is not null"
    )
    fails = spark.sql(orphan_query).collect()[0][0]
    is_foreign = fails == 0
    if is_foreign:
        print(f"Foreign keys {foreign_key} for {table_name1} and {table_name2} are satisfied")
    else:
        print(f"Foreign keys {foreign_key} for {table_name1} and {table_name2} are not satisfied")
    return is_foreign

In [0]:
# Primary keys per table, taken from the ERD in step 1 (comma-separated for composite keys)
primary_keys_dict = {
    "hier_clnd_dlm" : "fscldt_id",
    "hier_hldy_dlm" : "hldy_id",
    "hier_invloc_dlm" : "loc",
    "hier_invstatus_dlm" : "code_id",
    "hier_possite_dlm" : "site_id",
    "hier_pricestate_dlm" : "state_id, substate_id",
    "hier_prod_dlm" : "sku_id",
    "hier_rtlloc_dlm" : "str",
    "fact_averagecosts_dlm" : "fscldt_id, sku_id",
}
# Validate each table's key and report the verdict; failures are framed with asterisks
for table_name, primary_keys in primary_keys_dict.items():
    print(f"Checking primary keys for {table_name}")
    qualified_table = f"{catalog_name}.{schema_name}.{table_name}"
    is_primary = primary_key_check(qualified_table, primary_keys.split(","))
    if is_primary:
        print(f"Primary keys {primary_keys} are satisfied for {table_name}")
    else:
        print("**************")
        print(f"Primary keys {primary_keys} are not satisfied for {table_name}")
        print("**************")

Checking primary keys for hier_clnd_dlm
Primary keys fscldt_id are satisfied for hier_clnd_dlm
Checking primary keys for hier_hldy_dlm
Primary keys hldy_id are satisfied for hier_hldy_dlm
Checking primary keys for hier_invloc_dlm
Primary keys loc are satisfied for hier_invloc_dlm
Checking primary keys for hier_invstatus_dlm
Primary keys code_id are satisfied for hier_invstatus_dlm
Checking primary keys for hier_possite_dlm
Primary keys site_id are satisfied for hier_possite_dlm
Checking primary keys for hier_pricestate_dlm
Primary keys state_id, substate_id are satisfied for hier_pricestate_dlm
Checking primary keys for hier_prod_dlm
Primary keys sku_id are satisfied for hier_prod_dlm
Checking primary keys for hier_rtlloc_dlm
Primary keys str are satisfied for hier_rtlloc_dlm
Checking primary keys for fact_averagecosts_dlm
Primary keys fscldt_id, sku_id are satisfied for fact_averagecosts_dlm


In [0]:
# # Foreign key check for fact_averagecosts_dlm against hier_clnd_dlm and hier_prod_dlm
# print(foreign_key_check(f"{catalog_name}.{schema_name}.fact_averagecosts_dlm", f"{catalog_name}.{schema_name}.hier_clnd_dlm", "fscldt_id"))
# print(foreign_key_check(f"{catalog_name}.{schema_name}.fact_averagecosts_dlm", f"{catalog_name}.{schema_name}.hier_prod_dlm", "sku_id"))
# # Hence sku_id is a foreign key; fscldt_id is not (some of its values are missing from hier_clnd_dlm), although it is part of the primary key

Foreign keys fscldt_id for bronze_sb.test3.fact_averagecosts_dlm and bronze_sb.test3.hier_clnd_dlm are not satisfied
False
Foreign keys sku_id for bronze_sb.test3.fact_averagecosts_dlm and bronze_sb.test3.hier_prod_dlm are satisfied
True


In [0]:
# %sql
# select * from silver_sb.information_schema.tables where table_name like 'b%'