# Create Tables

Create the tables for the stage, core and "lake" layers.

In [None]:
-- Lake layer: raw rows of every delivery. hist_updated holds the load date of each
-- row and is what the delta selection into the stage layer filters on.
CREATE TABLE #lake_countries (
    country_code VARCHAR(2),   -- business key the country merge matches on
    country_name VARCHAR(25),
    hist_updated DATETIME2     -- load date of this row
);
GO

CREATE TABLE #lake_busi_part (
    bp_number    INT,          -- business key the business-partner merge matches on
    bp_name      VARCHAR(25),
    bp_country   VARCHAR(2),   -- country_code of the partner's country
    hist_updated DATETIME2     -- load date of this row
);

In [None]:
-- Stage layer: holds only the delta of the current load; both tables are
-- truncated before being refilled from the lake.
CREATE TABLE #stage_countries (
    country_code VARCHAR(2),
    country_name VARCHAR(25)
);

CREATE TABLE #stage_busi_part (
    bp_number  INT,
    bp_name    VARCHAR(25),
    bp_country VARCHAR(2)      -- country_code; resolved to a surrogate id during the merge
);

In [None]:
-- Core layer: every entity gets a surrogate key from an IDENTITY column seeded at 1000.
-- The merge procedures match on the natural key (bp_number / country_code), so it is
-- declared NOT NULL UNIQUE. This keeps a MERGE or the country lookup join from
-- matching more than one core row.
-- country_id is deliberately nullable: a business partner can arrive before its
-- country and is re-staged once the country exists. No FOREIGN KEY is declared
-- because SQL Server does not enforce FOREIGN KEY constraints on temporary tables.
CREATE TABLE #core_busi_part (
    busi_part_id INT IDENTITY(1000, 1) PRIMARY KEY,
    bp_number    INT NOT NULL UNIQUE,
    bp_name      VARCHAR(25),
    country_id   INT,          -- #core_countries.country_id; NULL while the country is unknown
    date_updated DATETIME2
);

CREATE TABLE #core_countries (
    country_id   INT IDENTITY(1000, 1) PRIMARY KEY,
    country_code VARCHAR(2) NOT NULL UNIQUE,
    country_name VARCHAR(25),
    date_updated DATETIME2
);

# Create Stored Procedures

In [None]:
-- Upsert the staged countries into the core layer, matching on country_code.
-- Existing codes get the staged name; unknown codes are inserted and receive a new
-- surrogate country_id from the IDENTITY column. @date is written to date_updated
-- in both cases.
-- NOTE(review): the body references session temp tables (#...), so it can only be
-- executed on a connection where #core_countries and #stage_countries exist.
CREATE PROCEDURE uspMergeCountries @date DATETIME2 AS
BEGIN
    MERGE INTO #core_countries AS cc
    USING #stage_countries AS sc
        ON cc.country_code = sc.country_code
    WHEN MATCHED THEN
        UPDATE SET
            cc.country_name = sc.country_name,
            cc.date_updated = @date
    WHEN NOT MATCHED BY TARGET THEN
        INSERT (country_code, country_name, date_updated)
        VALUES (sc.country_code, sc.country_name, @date);
END


In [None]:
-- Upsert the staged business partners into the core layer, matching on bp_number.
-- The staged country code is translated into the core surrogate country_id with a
-- LEFT JOIN. Partners whose country is not in #core_countries yet are still loaded,
-- with country_id = NULL. Note that a matched partner whose country is still missing
-- also gets country_id overwritten with NULL. @date is written to date_updated.
-- Run uspMergeCountries first so the lookup can see the newest countries.
CREATE PROCEDURE uspMergeBusiPart @date DATETIME2 AS
BEGIN
    WITH staged_with_country AS (
        -- join with core countries to get the surrogate id
        SELECT
            sbp.bp_number,
            sbp.bp_name,
            cc.country_id
        FROM #stage_busi_part AS sbp
        LEFT JOIN #core_countries AS cc
            ON cc.country_code = sbp.bp_country
    )
    MERGE INTO #core_busi_part AS cbp
    USING staged_with_country AS swc
        ON cbp.bp_number = swc.bp_number
    WHEN MATCHED THEN
        UPDATE SET
            cbp.bp_name      = swc.bp_name,
            cbp.country_id   = swc.country_id,
            cbp.date_updated = @date
    WHEN NOT MATCHED BY TARGET THEN
        INSERT (bp_number, bp_name, country_id, date_updated)
        VALUES (swc.bp_number, swc.bp_name, swc.country_id, @date);
END


## First ingestion

This cell simulates the ingestion of data on 16.08.2023.

In [None]:
-- Simulate the lake delivery of 2023-08-16.
-- Business partner 4711 references country 'FR', which is not part of this delivery,
-- so its country surrogate key cannot be resolved in core on the first load.
DECLARE @load_date DATETIME2 = '2023-08-16';

INSERT INTO #lake_countries (country_code, country_name, hist_updated)
VALUES
    ('DE', 'Deutschland', @load_date),
    ('IT', 'Italien',     @load_date);

INSERT INTO #lake_busi_part (bp_number, bp_name, bp_country, hist_updated)
VALUES
    (4711, 'drfalk', 'FR', @load_date),
    (4712, 'falkdr', 'DE', @load_date);


Ingest the delta from the lake into the stage layer

In [None]:
-- Stage the country delta for load date 2023-08-16.
-- Countries have no foreign keys in core, so only rows inserted/updated on or after
-- the load date need to be staged.
DECLARE @load_date DATETIME2 = '2023-08-16';

TRUNCATE TABLE #stage_countries;

INSERT INTO #stage_countries (country_code, country_name)
SELECT
    lc.country_code,
    lc.country_name
FROM #lake_countries AS lc
WHERE lc.hist_updated >= @load_date;

SELECT country_code, country_name FROM #stage_countries;

In [None]:
-- Stage the business-partner delta for load date 2023-08-16. Two kinds of rows are staged:
--   1. rows inserted/updated on or after the load date;
--   2. older rows whose core record still has no country surrogate key (country_id IS NULL),
--      so the merge can retry the country lookup.
-- No join to #core_countries is needed here; the merge procedure resolves country_id.
DECLARE @load_date DATETIME2 = '2023-08-16';

TRUNCATE TABLE #stage_busi_part;

INSERT INTO #stage_busi_part (bp_number, bp_name, bp_country)
SELECT
    lb.bp_number,
    lb.bp_name,
    lb.bp_country
FROM #lake_busi_part AS lb
WHERE lb.hist_updated >= @load_date
   OR (
        lb.hist_updated < @load_date
        AND EXISTS (
            SELECT 1
            FROM #core_busi_part AS cb
            WHERE cb.bp_number = lb.bp_number
              AND cb.country_id IS NULL
        )
   );

SELECT bp_number, bp_name, bp_country FROM #stage_busi_part;


# Upsert

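Merge the core entities.

In this step the order in which the merges execute matters: countries must be merged before business partners, because the business-partner merge looks up the surrogate `country_id` in `#core_countries`.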

In [None]:

-- Merge countries first so the business-partner merge can resolve country_id.
EXECUTE uspMergeCountries @date = '2023-08-16';

SELECT country_id, country_code, country_name, date_updated
FROM #core_countries


In [None]:
-- Merge business partners; 4711 stays without country_id because 'FR' is not in core yet.
EXECUTE uspMergeBusiPart @date = '2023-08-16';

SELECT busi_part_id, bp_number, bp_name, country_id, date_updated
FROM #core_busi_part


# Next day

Insert into lake

In [None]:
-- Simulate the lake delivery of 2023-08-17: the previously missing country 'FR'
-- arrives, together with a new business partner.
DECLARE @load_date DATETIME2 = '2023-08-17';

INSERT INTO #lake_countries (country_code, country_name, hist_updated)
VALUES ('FR', 'Frankreich', @load_date);

INSERT INTO #lake_busi_part (bp_number, bp_name, bp_country, hist_updated)
VALUES (4713, 'prof_falk', 'DE', @load_date);
GO

# Ingest delta into stage


In [None]:
-- Stage the country delta for load date 2023-08-17.
-- Countries have no foreign keys in core, so only rows inserted/updated on or after
-- the load date need to be staged.
DECLARE @load_date DATETIME2 = '2023-08-17';

TRUNCATE TABLE #stage_countries;

INSERT INTO #stage_countries (country_code, country_name)
SELECT
    lc.country_code,
    lc.country_name
FROM #lake_countries AS lc
WHERE lc.hist_updated >= @load_date;

SELECT country_code, country_name FROM #stage_countries;

In [None]:
-- Stage the business-partner delta for load date 2023-08-17 (the previous cell version
-- wrongly filtered on 2023-08-16, which re-staged every partner and never exercised
-- the "missing foreign key" branch). Two kinds of rows are staged:
--   1. rows inserted/updated on or after the load date;
--   2. older rows whose core record still has no country surrogate key (country_id IS NULL),
--      so the merge can retry the country lookup now that new countries may exist.
-- No join to #core_countries is needed here; the merge procedure resolves country_id.
DECLARE @load_date DATETIME2 = '2023-08-17';

TRUNCATE TABLE #stage_busi_part;

INSERT INTO #stage_busi_part (bp_number, bp_name, bp_country)
SELECT
    lb.bp_number,
    lb.bp_name,
    lb.bp_country
FROM #lake_busi_part AS lb
WHERE lb.hist_updated >= @load_date
   OR (
        lb.hist_updated < @load_date
        AND EXISTS (
            SELECT 1
            FROM #core_busi_part AS cb
            WHERE cb.bp_number = lb.bp_number
              AND cb.country_id IS NULL
        )
   );

SELECT bp_number, bp_name, bp_country FROM #stage_busi_part;


# Upsert

In [None]:
-- Merge countries first so the business-partner merge can resolve country_id.
EXECUTE uspMergeCountries @date = '2023-08-17';

SELECT country_id, country_code, country_name, date_updated
FROM #core_countries

In [None]:
-- Merge business partners after the countries of the same load date.
EXECUTE uspMergeBusiPart @date = '2023-08-17';

SELECT busi_part_id, bp_number, bp_name, country_id, date_updated
FROM #core_busi_part


# Cleanup


In [None]:
-- Drop every object this notebook creates. Each drop is guarded with OBJECT_ID so the
-- cell can be re-run, or run after a partially failed session, without
-- "object does not exist" errors.
IF OBJECT_ID('tempdb..#stage_countries', 'U') IS NOT NULL DROP TABLE #stage_countries;
IF OBJECT_ID('tempdb..#stage_busi_part', 'U') IS NOT NULL DROP TABLE #stage_busi_part;
IF OBJECT_ID('tempdb..#core_countries', 'U') IS NOT NULL DROP TABLE #core_countries;
IF OBJECT_ID('tempdb..#core_busi_part', 'U') IS NOT NULL DROP TABLE #core_busi_part;
IF OBJECT_ID('tempdb..#lake_countries', 'U') IS NOT NULL DROP TABLE #lake_countries;
IF OBJECT_ID('tempdb..#lake_busi_part', 'U') IS NOT NULL DROP TABLE #lake_busi_part;
IF OBJECT_ID('uspMergeCountries', 'P') IS NOT NULL DROP PROCEDURE uspMergeCountries;
IF OBJECT_ID('uspMergeBusiPart', 'P') IS NOT NULL DROP PROCEDURE uspMergeBusiPart;