In [0]:
'''
notebook : retail_data_ingestion
author   : sharwari
Description : 
                  1. connect to the Gen2 data lake through a mount point or the Databricks connector
                  2. create a dataframe from the raw data
                  3. add the ETL date column (etldata) to the dataframe and overwrite the bronze table
                  4. apply the transformation from bronze to silver: 
                         reviews --> append 
                         products --> merge 
                         categories --> overwrite
                  5. move the files from the raw folder to the archive folder. 

'''

In [0]:
# COMMAND ----------

# Read the storage account key from the 'ak_scope_11' secret scope.
stgactkey = dbutils.secrets.get(scope="ak_scope_11", key="storagegen2key")
mnt_point = "/mnt/data"

# COMMAND ----------

# Mount the container only when nothing is mounted at mnt_point yet.
mounted_points = [m.mountPoint for m in dbutils.fs.mounts()]
if mnt_point not in mounted_points:
    dbutils.fs.mount(
        source="wasbs://akcontainer1@akstorageadls1.blob.core.windows.net",
        mount_point=mnt_point,
        extra_configs={
            "fs.azure.account.key.akstorageadls1.blob.core.windows.net": stgactkey
        },
    )


True

In [0]:
# Name of the table to ingest, read from the 'tablename' notebook widget.
# For an interactive run without the widget, assign a literal instead, e.g. tbl_nm = 'reviews'.
tbl_nm = dbutils.widgets.get("tablename")

In [0]:

# --- raw layer: input folder for this table under the BV folder ---
raw_path = '/mnt/data/raw/'
bv_raw_path = f'{raw_path}BV/'
tbl_raw_path = f'{bv_raw_path}{tbl_nm}'

# --- bronze layer: staging table named <table>_stg ---
# tbl_bronze_path is the mount-based path used for writes;
# bronze_tbl_path is the abfss URI used as the table LOCATION.
bronze_tbl = f'{tbl_nm}_stg'
bronze_path = '/mnt/data/bronze'
tbl_bronze_path = f'{bronze_path}/{bronze_tbl}'
bronze_tbl_path = f'abfss://akcontainer1@akstorageadls1.dfs.core.windows.net/bronze/{bronze_tbl}'

# --- silver layer: table carries the plain table name ---
silver_tbl = tbl_nm
silver_path = '/mnt/data/silver'
tbl_silver_path = f'{silver_path}/{silver_tbl}'
silver_tbl_path = f'abfss://akcontainer1@akstorageadls1.dfs.core.windows.net/silver/{silver_tbl}'

# --- archive: destination for raw files once they have been processed ---
archive_path = '/mnt/data/archive/'
bv_archive_path = f'{archive_path}BV/'
tbl_archive_path = f'{bv_archive_path}{tbl_nm}'

# --- catalog and schema names used to build fully-qualified table names ---
catalog_nm = 'bvcatalog'
b_schema = 'bronzeschema'
s_schema = 'silverschema'


In [0]:
# Load every .json file found directly under the table's raw folder.
raw_json_glob = f'{tbl_raw_path}/*.json'
df = spark.read.format("json").load(raw_json_glob)

In [0]:
# Print a sample of the raw dataframe as a quick check of the load.
df.show()

+-------+-------+------+---------+-------------------+----------+
|cust_id|prod_id|rating|review_id|        review_text|reviewdate|
+-------+-------+------+---------+-------------------+----------+
|     C1|PROD001|     5|  REVW001|product is not good|2024-09-10|
|     C2|PROD002|     4|  REVW002|       Nice product|2024-09-10|
|     C3|PROD003|     3|  REVW003|  worthable product|2024-09-10|
|     C4|PROD006|     2|  REVW004|   use less product|2024-09-10|
|     C1|PROD006|     1|  REVW005|product is not good|2024-09-10|
|     C2|PROD001|     5|  REVW006|       Nice product|2024-09-10|
+-------+-------+------+---------+-------------------+----------+



In [0]:
from datetime import datetime

# Stamp for this run: current local date of the driver, formatted as DDMMYYYY.
run_ts = datetime.now()
dt = run_ts.strftime("%d%m%Y")

print(dt)

23092024


In [0]:
from pyspark.sql.functions import lit

In [0]:
# Tag every row with the run's date stamp in a constant 'etldata' column.
etl_stamp = lit(dt)
df = df.withColumn("etldata", etl_stamp)

df.show()

+-------+-------+------+---------+-------------------+----------+--------+
|cust_id|prod_id|rating|review_id|        review_text|reviewdate| etldata|
+-------+-------+------+---------+-------------------+----------+--------+
|     C1|PROD001|     5|  REVW001|product is not good|2024-09-10|23092024|
|     C2|PROD002|     4|  REVW002|       Nice product|2024-09-10|23092024|
|     C3|PROD003|     3|  REVW003|  worthable product|2024-09-10|23092024|
|     C4|PROD006|     2|  REVW004|   use less product|2024-09-10|23092024|
|     C1|PROD006|     1|  REVW005|product is not good|2024-09-10|23092024|
|     C2|PROD001|     5|  REVW006|       Nice product|2024-09-10|23092024|
+-------+-------+------+---------+-------------------+----------+--------+



<h3>raw to bronze</h3>

In [0]:
# Replace the bronze staging data with the current batch.
(
    df.write
    .format("Delta")
    .mode("overwrite")
    .option("mergeschema", True)
    .save(tbl_bronze_path)
)

In [0]:
#%sql 
#sqlstatement1

#spark.sql(sqlstatement1)

In [0]:
# Register the bronze Delta location as a catalog table (no-op when it already exists).
bronze_fqn = f'{catalog_nm}.{b_schema}.{bronze_tbl}'
qry = f'''
    create table if not exists {bronze_fqn}
    using delta
    location '{bronze_tbl_path}'
      '''
spark.sql(qry)

DataFrame[]

In [0]:
# Preview the bronze table of the table being ingested.
# The name is built from the notebook's own variables instead of a hard-coded
# 'reviews_stg', so the check stays valid for every value of the 'tablename' widget.
spark.sql(f"select * from {catalog_nm}.{b_schema}.{bronze_tbl}").show()

cust_id,prod_id,rating,review_id,review_text,reviewdate,etldata
C1,PROD001,5,REVW001,product is not good,2024-09-10,23092024
C2,PROD002,4,REVW002,Nice product,2024-09-10,23092024
C3,PROD003,3,REVW003,worthable product,2024-09-10,23092024
C4,PROD006,2,REVW004,use less product,2024-09-10,23092024
C1,PROD006,1,REVW005,product is not good,2024-09-10,23092024
C2,PROD001,5,REVW006,Nice product,2024-09-10,23092024


<h3>bronze to silver</h3>

In [0]:
# Register the silver Delta location as a catalog table (no-op when it already exists).
silver_fqn = f'{catalog_nm}.{s_schema}.{silver_tbl}'
qry = f'''
    create table if not exists {silver_fqn}
    using delta
    location '{silver_tbl_path}'
      '''
spark.sql(qry)

DataFrame[]

In [0]:
# Load the current batch (df) into silver with a per-table strategy:
#   reviews    -> append the batch
#   categories -> overwrite with the batch
#   products   -> merge (upsert) bronze into silver on productid
# Any other table name falls through without a silver load.
if tbl_nm == 'reviews':
    df.write.format("delta").mode("append").option("mergeschema", True).save(tbl_silver_path)
elif tbl_nm == 'categories':
    df.write.format("delta").mode("overwrite").option("mergeschema", True).save(tbl_silver_path)
elif tbl_nm == 'products':
    # Seed an empty Delta table with the batch schema only when silver has no data yet.
    # mode("ignore") leaves an existing table untouched. The previous mode("overwrite")
    # emptied silver on every run, so the merge below never had rows to update and
    # products missing from the current batch were dropped.
    df1 = spark.createDataFrame([], schema=df.schema)
    df1.write.format("Delta").mode("ignore").option("mergeschema", True).save(tbl_silver_path)
    qry = f'''
            merge into {catalog_nm}.{s_schema}.{silver_tbl} p 
            using {catalog_nm}.{b_schema}.{bronze_tbl} ps
            on ps.productid = p.productid
            when matched then update set * 
            when not matched then insert *
            '''
    spark.sql(qry)


In [0]:
# Preview the silver table of the table being processed.
# The name is built from the notebook's own variables instead of a hard-coded
# 'products', so the check matches the current 'tablename' widget value.
spark.sql(f"select * from {catalog_nm}.{s_schema}.{silver_tbl}").show()

cust_id,prod_id,rating,review_id,review_text,reviewdate,etldata
C1,PROD001,5,REVW001,product is not good,2024-09-10,23092024
C2,PROD002,4,REVW002,Nice product,2024-09-10,23092024
C3,PROD003,3,REVW003,worthable product,2024-09-10,23092024
C4,PROD006,2,REVW004,use less product,2024-09-10,23092024
C1,PROD006,1,REVW005,product is not good,2024-09-10,23092024
C2,PROD001,5,REVW006,Nice product,2024-09-10,23092024


In [0]:
# Snapshot the raw folder, make sure the archive folder exists,
# then move each listed entry into the archive folder.
files_lst = dbutils.fs.ls(tbl_raw_path)
dbutils.fs.mkdirs(tbl_archive_path)

for src_path in (entry.path for entry in files_lst):
    dbutils.fs.mv(src_path, tbl_archive_path)

In [0]:
# List the archive folder of the table that was just processed
# (previously hard-coded to the 'reviews' archive folder).
dbutils.fs.ls(tbl_archive_path)

[FileInfo(path='dbfs:/mnt/data/archive/BV/reviews/reviews_10092024.json', name='reviews_10092024.json', size=782, modificationTime=1727060833000)]

In [0]:
# Release the mount created at the top of the notebook; reuse mnt_point so the
# mounted and unmounted paths cannot drift apart.
dbutils.fs.unmount(mnt_point)

/mnt/data has been unmounted.


True