## Load the delta table dimStore from stage.Store
####-----------------------------------------------------------------------
1. Connect to Azure Sql DB
2. Load stage.Store as temporary view
3. Create the delta table dimStore in the Curated database
4. Use SCD Type 1 to load dimStore using stage.Store
5. Review dimStore results

####-----------------------------------------------------------------------

### Import relevant libraries 

In [0]:
from pyspark.sql.functions import *

### Connect to Azure Sql DB
### Replace Sql Config values with the relevant config values. 
### 
### <sql username> 	  replace with your sql username
### <sql servername>	replace with your sql server name
### <sql dbname>	    replace with the sql database name
### <password>		    replace with the password for the sql database

In [0]:

jdbcusername = "vinoworldadm@vinoworld-dev-sq"
jdbchostname = "vinoworld-dev-sq.database.windows.net"
jdbcdatabase = "vinoworld"

jdbcport = 1433
user = "vinoworldadm@vinoworld-dev-sq"
jdbcpassword= "Prerna@321"

jdbcurl = "jdbc:sqlserver://{0}:{1};database={2}".format(jdbchostname, jdbcport, jdbcdatabase) 
connectionProperties = {
    "user": jdbcusername,
    "password": jdbcpassword,
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}


### Read the stage.Store and Create a temporary View in databricks

In [0]:
query = "(SELECT * FROM stage.Store) AS stageStore"

# Read data into DataFrame
dfSource = spark.read \
    .format("jdbc") \
    .option("url", jdbcurl) \
    .option("dbtable", query) \
    .option("user", connectionProperties["user"]) \
    .option("password", connectionProperties["password"]) \
    .option("driver", connectionProperties["driver"]) \
    .load()

# Create or replace temp view
dfSource.createOrReplaceTempView("dbr_stageStore")


### Browse the temporary view of the stage.Store table

In [0]:
%sql
SELECT * FROM dbr_stageStore


StoreName,StoreType,Description
Arancione,Online,Store sells wines with a score >= 95
Verde,Online,Store sells wines with a score between 90 and 94
Celeste,Online,Store sells wines with a score between 85 and 89


### Create the database Curated if it doesn't exist

In [0]:
%sql
CREATE DATABASE IF NOT EXISTS curated


### Drop the dimStore Table if it exists 

In [0]:
%sql 
DROP TABLE IF EXISTS curated.dimStore

In [0]:
%sql
CREATE TABLE dimStore
USING DELTA
LOCATION '/mnt/adlsvinoworlddev/curated/dimStore';

### Create the delta table dimStore in database Curated  

In [0]:
%sql
CREATE OR REPLACE TABLE dimStore (
  StoreId BIGINT GENERATED ALWAYS AS IDENTITY, 
  StoreName STRING NOT NULL,
  StoreType STRING NOT NULL,
  Description STRING,
  InsertedDate DATE NOT NULL,
  UpdatedDate DATE NOT NULL
  ) 
USING DELTA
LOCATION  '/mnt/adlsvinoworlddev/curated/dimStore'


In [0]:
%sql
DROP TABLE IF EXISTS dimStore;

### Upsert to the dimStore delta Table from the stage.Source table in Azure Sql

In [0]:
%sql
MERGE INTO dimStore AS Target
        USING ( SELECT  StoreName, 
                        StoreType, 
                        Description 
                FROM    dbr_stageStore
              ) AS Source 
    ON  Target.StoreName = Source.StoreName --- specifies the condition
    WHEN MATCHED THEN
        UPDATE SET 
                Target.StoreType = Source.StoreType, 
                Target.Description = Source.Description,
                Target.UpdatedDate = getdate()
    WHEN NOT MATCHED THEN
        INSERT (StoreName, StoreType, Description, InsertedDate, UpdatedDate ) 
        VALUES (Source.StoreName, Source.StoreType, Source.Description, getdate(), getdate() ); --INSERT STATEMENT

num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
3,3,0,0


### Describe the history of the dimStore Table

In [0]:
%sql
DESCRIBE HISTORY dimStore;

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
2,2024-08-06T07:52:30Z,6804765256642546,singhprerna455@gmail.com,MERGE,"Map(predicate -> [""(StoreName#814 = StoreName#2)""], matchedPredicates -> [{""actionType"":""update""}], statsOnLoad -> false, notMatchedBySourcePredicates -> [], notMatchedPredicates -> [{""actionType"":""insert""}])",,List(3060972189698367),0806-070934-1zngisr7,1.0,WriteSerializable,False,"Map(numTargetRowsCopied -> 0, numTargetRowsDeleted -> 0, numTargetFilesAdded -> 1, numTargetBytesAdded -> 2228, numTargetBytesRemoved -> 2149, numTargetDeletionVectorsAdded -> 0, numTargetRowsMatchedUpdated -> 3, executionTimeMs -> 19671, numTargetRowsInserted -> 0, numTargetRowsMatchedDeleted -> 0, scanTimeMs -> 11084, numTargetRowsUpdated -> 3, numOutputRows -> 3, numTargetDeletionVectorsRemoved -> 0, numTargetRowsNotMatchedBySourceUpdated -> 0, numTargetChangeFilesAdded -> 0, numSourceRows -> 3, numTargetFilesRemoved -> 1, numTargetRowsNotMatchedBySourceDeleted -> 0, rewriteTimeMs -> 6411)",,Databricks-Runtime/13.3.x-scala2.12
1,2024-08-06T04:23:53Z,6804765256642546,singhprerna455@gmail.com,MERGE,"Map(predicate -> [""(StoreName#716 = StoreName#4)""], matchedPredicates -> [{""actionType"":""update""}], statsOnLoad -> false, notMatchedBySourcePredicates -> [], notMatchedPredicates -> [{""actionType"":""insert""}])",,List(3060972189698367),0806-034541-ovux8wox,0.0,WriteSerializable,False,"Map(numTargetRowsCopied -> 0, numTargetRowsDeleted -> 0, numTargetFilesAdded -> 1, numTargetBytesAdded -> 2149, numTargetBytesRemoved -> 0, numTargetDeletionVectorsAdded -> 0, numTargetRowsMatchedUpdated -> 0, executionTimeMs -> 15215, numTargetRowsInserted -> 3, numTargetRowsMatchedDeleted -> 0, numTargetDeletionVectorsUpdated -> 0, scanTimeMs -> 6278, numTargetRowsUpdated -> 0, numOutputRows -> 3, numTargetDeletionVectorsRemoved -> 0, numTargetRowsNotMatchedBySourceUpdated -> 0, numTargetChangeFilesAdded -> 0, numSourceRows -> 3, numTargetFilesRemoved -> 0, numTargetRowsNotMatchedBySourceDeleted -> 0, rewriteTimeMs -> 5979)",,Databricks-Runtime/14.3.x-scala2.12
0,2024-08-06T04:22:57Z,6804765256642546,singhprerna455@gmail.com,CREATE OR REPLACE TABLE,"Map(partitionBy -> [], description -> null, isManaged -> false, properties -> {}, statsOnLoad -> false)",,List(3060972189698367),0806-034541-ovux8wox,,WriteSerializable,True,Map(),,Databricks-Runtime/14.3.x-scala2.12


### Time Travel
### Review the different versions of the dimStore Table

In [0]:
%sql
SELECT * FROM dimStore VERSION AS OF 2

StoreId,StoreName,StoreType,Description,InsertedDate,UpdatedDate
1,Arancione,Online,Store sells wines with a score >= 95,2024-08-06,2024-08-06
2,Verde,Online,Store sells wines with a score between 90 and 94,2024-08-06,2024-08-06
3,Celeste,Online,Store sells wines with a score between 85 and 89,2024-08-06,2024-08-06


### Optimize a Table
### Several changes implies several files in ADLS. Use OPTIMIZE to collapse small files into larger ones to improve query speed

In [0]:
%sql
OPTIMIZE dimStore

path,metrics
dbfs:/mnt/adlsvinoworlddev/curated/dimStore,"List(0, 0, List(null, null, 0.0, 0, 0), List(null, null, 0.0, 0, 0), 0, null, 0, 1, 1, true, 0, 0, 1722930782241, 1722930787179, 8, 0, null, List(0, 0), 6, 6, 0, 0, null)"


### Z-Order by Columns
### Read performance within a file can be improved using Z-Order. Specify the columns to order in the Z-Order clause

In [0]:
%sql
OPTIMIZE dimStore
ZORDER BY (StoreName)

path,metrics
dbfs:/mnt/adlsvinoworlddev/curated/dimStore,"List(0, 0, List(null, null, 0.0, 0, 0), List(null, null, 0.0, 0, 0), 0, List(minCubeSize(107374182400), List(0, 0), List(1, 2228), 0, List(0, 0), 0, null), 0, 1, 1, false, 0, 0, 1722930795210, 1722930798856, 8, 0, null, List(0, 0), 6, 6, 0, 0, null)"


### Cleanup Snapshots with VACUUM
### Important to cleanup old snapshots. Can be done by running the VACUUM command

In [0]:
%sql
VACUUM dimStore

path
dbfs:/mnt/adlsvinoworlddev/curated/dimStore
