# **INTRODUCTIONS**

---

ECDC (European Centre for Disease Prevention and Control) is the agency that records COVID-19 data for the whole EU.
This workflow gathers all COVID-19 virus variants recorded throughout the EU.

2022-04: A fix is currently needed because the format of the source has changed.

1. **[Stored Procedures](#stored-procedures)**


# **DEPENDENCIES**
---

```json
{
    "depends-on": [
        "src/dataflows/rivm_mutations.ipynb",
        "src/statics/variants_mapping.ipynb",
        "src/utils/schemas.ipynb",
        "src/utils/conversions.ipynb"
    ]
}
```

# **TABLES**
---

### STAGINGS

In [None]:
-- Copyright (c) 2020 De Staat der Nederlanden, Ministerie van Volksgezondheid, Welzijn en Sport.
-- Licensed under the EUROPEAN UNION PUBLIC LICENCE v. 1.2 - see https://github.com/minvws/nl-contact-tracing-app-coordination for more information.

-- Sequence that supplies the surrogate key of VWSSTAGE.ECDC_VARIANTS.
-- It is created explicitly in [dbo]: both the existence check and the table's
-- DEFAULT constraint refer to [dbo].[SEQ_VWSSTAGE_ECDC_VARIANTS], whereas an
-- unqualified CREATE SEQUENCE would land in the deploying user's default schema.
IF NOT EXISTS(SELECT * FROM sys.sequences WHERE object_id = OBJECT_ID(N'[dbo].[SEQ_VWSSTAGE_ECDC_VARIANTS]') AND type = 'SO')
CREATE SEQUENCE [dbo].[SEQ_VWSSTAGE_ECDC_VARIANTS]
  START WITH 1
  INCREMENT BY 1;
GO

-- Staging table: every source column is kept as text (VARCHAR) exactly as loaded;
-- typing happens in the stage-to-inter procedure.
IF NOT EXISTS(SELECT * FROM SYS.TABLES WHERE [OBJECT_ID] = OBJECT_ID(N'[VWSSTAGE].[ECDC_VARIANTS]'))
BEGIN
  CREATE TABLE VWSSTAGE.ECDC_VARIANTS(
    [ID] INT PRIMARY KEY NOT NULL DEFAULT (NEXT VALUE FOR [dbo].[SEQ_VWSSTAGE_ECDC_VARIANTS]),
    [COUNTRY] VARCHAR(100) NULL,
    [COUNTRY_CODE] VARCHAR(100) NULL,
    [YEAR_WEEK] VARCHAR(100) NULL,
    [SOURCE] VARCHAR(100) NULL,
    [NEW_CASES] VARCHAR(100) NULL,
    [NUMBER_SEQUENCED] VARCHAR(100) NULL,
    [PERCENT_CASES_SEQUENCED] VARCHAR(100) NULL,
    [VALID_DENOMINATOR] VARCHAR(100) NULL,
    [VARIANT] VARCHAR(100) NULL,
    [NUMBER_DETECTIONS_VARIANT] VARCHAR(100) NULL,
    [PERCENT_VARIANT] VARCHAR(100) NULL,
    [DATE_LAST_INSERTED]  DATETIME DEFAULT GETDATE()
  );
END

-- Index on the load timestamp, which is the column used to select the latest batch.
-- The existence check is scoped to this table: index names are only unique per
-- table, so a same-named index elsewhere must not suppress the creation here.
IF NOT EXISTS(
    SELECT *
    FROM sys.indexes
    WHERE [name] = 'IX_STAGE_ECDC_VARIANTS'
      AND [object_id] = OBJECT_ID(N'[VWSSTAGE].[ECDC_VARIANTS]')
)
    CREATE NONCLUSTERED INDEX IX_STAGE_ECDC_VARIANTS
    ON VWSSTAGE.ECDC_VARIANTS(DATE_LAST_INSERTED);

GO

### INTERMEDIATES

In [None]:
-- Copyright (c) 2020 De Staat der Nederlanden, Ministerie van Volksgezondheid, Welzijn en Sport.
-- Licensed under the EUROPEAN UNION PUBLIC LICENCE v. 1.2 - see https://github.com/minvws/nl-contact-tracing-app-coordination for more information.
-- Sequence that supplies the surrogate key of VWSINTER.ECDC_VARIANTS.
-- It is created explicitly in [dbo]: both the existence check and the table's
-- DEFAULT constraint refer to [dbo].[SEQ_VWSINTER_ECDC_VARIANTS], whereas an
-- unqualified CREATE SEQUENCE would land in the deploying user's default schema.
IF NOT EXISTS(SELECT * FROM sys.sequences WHERE object_id = OBJECT_ID(N'[dbo].[SEQ_VWSINTER_ECDC_VARIANTS]') AND type = 'SO')
CREATE SEQUENCE [dbo].[SEQ_VWSINTER_ECDC_VARIANTS]
  START WITH 1
  INCREMENT BY 1;
GO

-- Intermediate table: same columns as the staging table, but counts, percentages
-- and the valid-denominator flag are stored as int / decimal(8,3) / bit.
IF NOT EXISTS(SELECT * FROM SYS.TABLES WHERE [OBJECT_ID] = OBJECT_ID(N'[VWSINTER].[ECDC_VARIANTS]'))
BEGIN
  CREATE TABLE VWSINTER.ECDC_VARIANTS(
    [ID] INT PRIMARY KEY NOT NULL DEFAULT (NEXT VALUE FOR [dbo].[SEQ_VWSINTER_ECDC_VARIANTS]),
    [COUNTRY] VARCHAR(100) NULL,
    [COUNTRY_CODE] VARCHAR(100) NULL,
    [YEAR_WEEK] VARCHAR(100) NULL,
    [SOURCE] VARCHAR(100) NULL,
    [NEW_CASES] int NULL,
    [NUMBER_SEQUENCED] int NULL,
    [PERCENT_CASES_SEQUENCED] decimal(8,3) NULL,
    [VALID_DENOMINATOR] bit NULL,
    [VARIANT] VARCHAR(100) NULL,
    [NUMBER_DETECTIONS_VARIANT] int NULL,
    [PERCENT_VARIANT] decimal(8,3) NULL,
    [DATE_LAST_INSERTED]  DATETIME DEFAULT GETDATE()
  );
END

-- Index on the load timestamp, which is the column used to select the latest batch.
-- The existence check is scoped to this table: index names are only unique per
-- table, so a same-named index elsewhere must not suppress the creation here.
IF NOT EXISTS(
    SELECT *
    FROM sys.indexes
    WHERE [name] = 'IX_INTER_ECDC_VARIANTS'
      AND [object_id] = OBJECT_ID(N'[VWSINTER].[ECDC_VARIANTS]')
)
    CREATE NONCLUSTERED INDEX IX_INTER_ECDC_VARIANTS
    ON VWSINTER.ECDC_VARIANTS(DATE_LAST_INSERTED);

GO

### DESTINATIONS

In [None]:
-- Copyright (c) 2021 De Staat der Nederlanden, Ministerie van Volksgezondheid, Welzijn en Sport.
-- Licensed under the EUROPEAN UNION PUBLIC LICENCE v. 1.2 - see https://github.com/minvws/nl-contact-tracing-app-coordination for more information.
-- Sequence that supplies the surrogate key of VWSDEST.ECDC_VARIANTS.
-- It is created explicitly in [dbo]: both the existence check and the table's
-- DEFAULT constraint refer to [dbo].[SEQ_VWSDEST_ECDC_VARIANTS], whereas an
-- unqualified CREATE SEQUENCE would land in the deploying user's default schema.
IF NOT EXISTS(SELECT * FROM sys.sequences WHERE object_id = OBJECT_ID(N'[dbo].[SEQ_VWSDEST_ECDC_VARIANTS]') AND type = 'SO')
CREATE SEQUENCE [dbo].[SEQ_VWSDEST_ECDC_VARIANTS]
  START WITH 1
  INCREMENT BY 1;
GO

-- Destination table: one row per country, week, selected source and mapped variant.
-- VARIANT_CODE is wide (2048) because it holds a comma-separated list of all source
-- variant codes that were aggregated into the same mapped variant.
IF NOT EXISTS(SELECT * FROM SYS.TABLES WHERE [OBJECT_ID] = OBJECT_ID(N'[VWSDEST].[ECDC_VARIANTS]'))
BEGIN
	CREATE TABLE VWSDEST.ECDC_VARIANTS(
		[ID] INT PRIMARY KEY NOT NULL DEFAULT (NEXT VALUE FOR [dbo].[SEQ_VWSDEST_ECDC_VARIANTS]),
		[COUNTRY] [varchar](100) NULL,
		[COUNTRY_CODE] [varchar](3) NULL,
		[YEAR_WEEK] [varchar](100) NULL,
		[WEEK_START] [date] NULL,
		[WEEK_END] [date] NULL,
		[SOURCE] [varchar](100) NULL,
		[NEW_CASES] [int] NULL,
		[SAMPLE_SIZE] [int] NULL,
		[ECDC_CATEGORY] [varchar](100) NULL,
		[PERCENT_CASES_SAMPLED] [decimal](8, 1) NULL,
		[VARIANT_CODE] [varchar](2048) NULL,
		[VARIANT] [varchar](100) NULL,
		[VARIANT_OF_CONCERN] [bit] NULL,
		[OCCURRENCE] [int] NULL,
		[PERCENT_VARIANT] [decimal](8, 1) NULL,
		[DATE_LAST_INSERTED]  DATETIME DEFAULT GETDATE()
	);
END

-- Index on the load timestamp, which is the column used to select the latest batch.
-- The existence check is scoped to this table: index names are only unique per
-- table, so a same-named index elsewhere must not suppress the creation here.
IF NOT EXISTS(
    SELECT *
    FROM sys.indexes
    WHERE [name] = 'IX_DEST_ECDC_VARIANTS'
      AND [object_id] = OBJECT_ID(N'[VWSDEST].[ECDC_VARIANTS]')
)
    CREATE NONCLUSTERED INDEX IX_DEST_ECDC_VARIANTS
    ON VWSDEST.ECDC_VARIANTS(DATE_LAST_INSERTED);
GO

# **VIEWS**
---

In [None]:
-- Copyright (c) 2020 De Staat der Nederlanden, Ministerie van   Volksgezondheid, Welzijn en Sport. 
-- Licensed under the EUROPEAN UNION PUBLIC LICENCE v. 1.2 - see https://github.com/minvws/nl-contact-tracing-app-coordinationfor more information.

-- View VWSDEST.V_ECDC_VARIANTS
-- Combines the latest Dutch variant figures (VWSDEST.RIVM_MUTATIONS) with the latest
-- international figures (VWSDEST.ECDC_VARIANTS, excluding 'NLD') and returns one row
-- per country, week and variant. Every variant from the latest variants mapping (plus
-- 'Other') is emitted for every country/week present in the combined data; the
-- occurrence and percentage come from a LEFT JOIN on the combined data.
-- Only weeks ending on or before the latest week available in BOTH the Dutch and the
-- international data are returned.
CREATE OR ALTER  VIEW [VWSDEST].[V_ECDC_VARIANTS] AS
  WITH NL_CTE AS (
      /*
        Dutch figures, taken from the latest load of VWSDEST.RIVM_MUTATIONS.

        Why GROUP BY?
            In most cases no grouping is needed. It is there for the case where several
            variant codes (B1XX, B2xx) map to the same WHO variant ('alpha'). Based on
            the variant mapping table those codes are concatenated with STRING_AGG
            (B1XX,B2xx) and the sum of their occurrences is used to calculate the
            percentage.
        Note when adding to / modifying the SELECT:
            Copy every changed non-aggregated expression to the GROUP BY as well, so
            that the SELECT and the GROUP BY stay consistent.
      */
      SELECT 
          'Netherlands'                                                   AS [COUNTRY]
          ,'NLD'                                                          AS [COUNTRY_CODE]
          ,CAST([DATE_OF_STATISTICS_WEEK_START] as date)                  AS [WEEK_START]
          ,DATEADD(DAY,6,CAST([DATE_OF_STATISTICS_WEEK_START] as date))   AS [WEEK_END]
          ,STRING_AGG([RIVM_MUTATIONS].[VARIANT_CODE], ',')               AS [VARIANT_CODE]
          -- Variants without a WHO name in the mapping are bucketed as 'Other'.
          -- NOTE(review): the label is taken from VARIANT_NAME while the condition (and
          -- UNIQUE_WHO_VARIANTS further down, which the final join matches against) use
          -- WHO_VARIANT -- confirm both columns carry the same values.
          ,CASE
            WHEN LEN(VARIANTS_MAPPING.[WHO_VARIANT])>0 THEN VARIANT_NAME
            ELSE 'Other'
          END                                                             AS [VARIANT]
          ,[RIVM_MUTATIONS].[DATE_LAST_INSERTED]
          ,CASE -- 1 only when the variant has a WHO name and its classification is 'VOC'.
            WHEN LEN([VARIANTS_MAPPING].[WHO_VARIANT])>0 AND [VARIANTS_MAPPING].CLASSIFICATION = 'VOC' THEN CAST(1 as bit) 
            ELSE CAST(0 as bit)
          END                                                             AS [VARIANT_OF_CONCERN]
          ,MAX([SAMPLE_SIZE])                                             AS [SAMPLE_SIZE]
          ,SUM([VARIANT_CASES])                                           AS [OCCURRENCE]
          -- Division is safe: rows with SAMPLE_SIZE <= 0 are removed in the WHERE clause.
          ,CAST(100.0 * SUM([VARIANT_CASES]) / MAX([SAMPLE_SIZE]) as decimal(9,1))
                                                                          AS [PERCENT_VARIANT]   
      FROM [VWSDEST].[RIVM_MUTATIONS]
      LEFT JOIN -- Latest variants mapping, so the Dutch data uses the same variant scope as the ECDC data.
        VWSSTATIC.VARIANTS_MAPPING ON
        VARIANTS_MAPPING.DATE_LAST_INSERTED = (SELECT MAX(DATE_LAST_INSERTED) FROM VWSSTATIC.VARIANTS_MAPPING) AND
        VARIANTS_MAPPING.VARIANT_CODE = RIVM_MUTATIONS.VARIANT_CODE
      WHERE 
        [RIVM_MUTATIONS].[DATE_LAST_INSERTED] = (SELECT MAX([DATE_LAST_INSERTED]) FROM [VWSDEST].[RIVM_MUTATIONS]) AND
        [RIVM_MUTATIONS].[SAMPLE_SIZE] > 0
      GROUP BY
          CAST([DATE_OF_STATISTICS_WEEK_START] as date)                  
          ,DATEADD(DAY,6,CAST([DATE_OF_STATISTICS_WEEK_START] as date))
          ,CASE
            WHEN LEN(VARIANTS_MAPPING.[WHO_VARIANT])>0 THEN VARIANT_NAME
            ELSE 'Other'
          END                       
          ,[RIVM_MUTATIONS].[DATE_LAST_INSERTED]
          ,CASE -- 1 only when the variant has a WHO name and its classification is 'VOC'.
            WHEN LEN([VARIANTS_MAPPING].[WHO_VARIANT])>0 AND [VARIANTS_MAPPING].CLASSIFICATION = 'VOC' THEN CAST(1 as bit) 
            ELSE CAST(0 as bit)
          END                      
  )
  ,
  -- International figures: latest load of VWSDEST.ECDC_VARIANTS, without the Netherlands
  -- (the Dutch rows come from NL_CTE instead).
  INTERNATIONAL_CTE AS (
    SELECT 
        [COUNTRY]
        ,[COUNTRY_CODE]
        ,[WEEK_START]
        ,[WEEK_END]
        ,[VARIANT_CODE]
        ,[VARIANT]
        ,[DATE_LAST_INSERTED]
        ,[VARIANT_OF_CONCERN]
        ,[SAMPLE_SIZE]
        ,[OCCURRENCE]
        ,[PERCENT_VARIANT]
    FROM 
      [VWSDEST].[ECDC_VARIANTS]
    WHERE 
      [DATE_LAST_INSERTED] = (SELECT MAX([DATE_LAST_INSERTED]) FROM [VWSDEST].[ECDC_VARIANTS])
      AND COUNTRY_CODE != 'NLD'
  ),
  -- All rows: Dutch and international figures stacked in one column layout.
  COMBINED_CTE AS (
    SELECT 
      [COUNTRY]
      ,[COUNTRY_CODE]
      ,[WEEK_START]
      ,[WEEK_END]
      ,[VARIANT_CODE]
      ,[VARIANT]
      ,[DATE_LAST_INSERTED]
      ,[VARIANT_OF_CONCERN]
      ,[SAMPLE_SIZE]
      ,[OCCURRENCE]
      ,[PERCENT_VARIANT]
       FROM NL_CTE
    UNION ALL 
    SELECT 
       [COUNTRY]
      ,[COUNTRY_CODE]
      ,[WEEK_START]
      ,[WEEK_END]
      ,[VARIANT_CODE]
      ,[VARIANT]
      ,[DATE_LAST_INSERTED]
      ,[VARIANT_OF_CONCERN]
      ,[SAMPLE_SIZE]
      ,[OCCURRENCE] 
      ,[PERCENT_VARIANT]
      FROM INTERNATIONAL_CTE
  ),
  -- The distinct WHO-named variants of the latest variants mapping with their
  -- variant-of-concern flag, plus the catch-all 'Other' (never a variant of concern).
  UNIQUE_WHO_VARIANTS AS (
    SELECT DISTINCT
      WHO_VARIANT AS [VARIANT],
      CASE -- 1 only when the variant has a WHO name and its classification is 'VOC'.
            WHEN LEN([VARIANTS_MAPPING].[WHO_VARIANT])>0 AND [VARIANTS_MAPPING].CLASSIFICATION = 'VOC' THEN CAST(1 as bit) 
            ELSE CAST(0 as bit)
          END                                                             AS [VARIANT_OF_CONCERN]
    FROM
      VWSSTATIC.VARIANTS_MAPPING
    WHERE
        VARIANTS_MAPPING.DATE_LAST_INSERTED = (SELECT MAX(DATE_LAST_INSERTED) FROM VWSSTATIC.VARIANTS_MAPPING) AND
        LEN(WHO_VARIANT) > 0
    UNION
    SELECT 'Other', CAST(0 as bit)
  ),
  -- Every variant of UNIQUE_WHO_VARIANTS for every country/week found in the combined
  -- data (CROSS JOIN), carrying the largest sample size seen for that country/week.
  ALL_VARIANTS_WEEKS_PER_COUNTRY AS (
    SELECT 
      [COUNTRY_CODE]
      ,[WEEK_START]
      ,[WEEK_END]
      ,UNIQUE_WHO_VARIANTS.[VARIANT]
      ,UNIQUE_WHO_VARIANTS.[VARIANT_OF_CONCERN]
      ,MAX([SAMPLE_SIZE])                         AS [SAMPLE_SIZE]
    FROM
      COMBINED_CTE
    CROSS JOIN
      UNIQUE_WHO_VARIANTS
    GROUP BY
       [COUNTRY_CODE]
      ,[WEEK_START]
      ,[WEEK_END]
      ,UNIQUE_WHO_VARIANTS.[VARIANT]
      ,UNIQUE_WHO_VARIANTS.[VARIANT_OF_CONCERN]
  )
    


SELECT 
      ALL_VARIANTS_WEEKS_PER_COUNTRY.[COUNTRY_CODE]
      ,LOWER(ALL_VARIANTS_WEEKS_PER_COUNTRY.[VARIANT])                                                  AS [NAME]
      ,dbo.CONVERT_DATETIME_TO_UNIX(ALL_VARIANTS_WEEKS_PER_COUNTRY.[WEEK_START])                        AS [date_start_unix]
      ,dbo.CONVERT_DATETIME_TO_UNIX(ALL_VARIANTS_WEEKS_PER_COUNTRY.[WEEK_END])                          AS [date_end_unix]
      
      -- The insertion date reported for every row is that of the latest ECDC load.
      ,dbo.CONVERT_DATETIME_TO_UNIX((SELECT MAX([DATE_LAST_INSERTED]) FROM [VWSDEST].[ECDC_VARIANTS]))  AS [date_of_insertion_unix]
      ,CASE
        WHEN ALL_VARIANTS_WEEKS_PER_COUNTRY.[VARIANT_OF_CONCERN] = 1 
          THEN 'true' 
        ELSE 'false' 
        END                                                                                             AS [is_variant_of_concern]
      ,ALL_VARIANTS_WEEKS_PER_COUNTRY.[SAMPLE_SIZE]                                                     AS [sample_size]
      
      -- Reliability indicator: 'true' when the sample size is 500 or more, otherwise 'false'.
      -- The lower branches all return 'false'; they are spelled out per ECDC level.
      ,CASE
            WHEN ALL_VARIANTS_WEEKS_PER_COUNTRY.SAMPLE_SIZE >=  500 -- ECDC level L1
                THEN 'true' 
            WHEN ALL_VARIANTS_WEEKS_PER_COUNTRY.SAMPLE_SIZE >=  60 -- ECDC level L2
                THEN 'false'
            WHEN ALL_VARIANTS_WEEKS_PER_COUNTRY.SAMPLE_SIZE >=  1 -- ECDC level L3
                THEN 'false' 
            ELSE -- ECDC level L4
                'false'
        END                                                                                             AS [is_reliable]

      -- Both columns come from the LEFT JOIN below, so they are NULL for a variant that
      -- has no row in the combined data for that country/week.
      -- NOTE(review): confirm that consumers expect NULL rather than 0 here.
      ,[OCCURRENCE]                                                                                     AS [occurrence]
      ,[PERCENT_VARIANT]                                                                                AS [percentage]

FROM 
  ALL_VARIANTS_WEEKS_PER_COUNTRY
LEFT JOIN
  COMBINED_CTE ON
  COMBINED_CTE.[COUNTRY_CODE] = ALL_VARIANTS_WEEKS_PER_COUNTRY.[COUNTRY_CODE] AND
  COMBINED_CTE.[WEEK_START] = ALL_VARIANTS_WEEKS_PER_COUNTRY.[WEEK_START] AND
  COMBINED_CTE.[VARIANT] = ALL_VARIANTS_WEEKS_PER_COUNTRY.[VARIANT]
-- Keep the Dutch and international series aligned: only weeks that end on or before
-- the latest week available in both data sets are returned.
WHERE 
  ALL_VARIANTS_WEEKS_PER_COUNTRY.WEEK_END <= (SELECT MAX([WEEK_END]) FROM NL_CTE) AND
  ALL_VARIANTS_WEEKS_PER_COUNTRY.WEEK_END <= (SELECT MAX([WEEK_END]) FROM INTERNATIONAL_CTE)
GO


# **Stored Procedures**
---

### STAGING TO INTER

In [None]:
-- Copyright (c) 2020 De Staat der Nederlanden, Ministerie van   Volksgezondheid, Welzijn en Sport. 
-- Licensed under the EUROPEAN UNION PUBLIC LICENCE v. 1.2 - see https://github.com/minvws/nl-contact-tracing-app-coordinationfor more information.

-- dbo.SP_ECDC_VARIANTS_INTER
-- Copies the most recent batch (highest DATE_LAST_INSERTED) from VWSSTAGE.ECDC_VARIANTS
-- to VWSINTER.ECDC_VARIANTS, converting the text columns to their typed counterparts.
--
-- Placeholder handling per column (the source uses 'NA' and '' for missing values):
--   NEW_CASES                  'NA' -> 0,    '' -> NULL
--   NUMBER_SEQUENCED           'NA' -> 0,    '' -> 0     (CAST('' AS INT) yields 0)
--   PERCENT_CASES_SEQUENCED    'NA' -> 0,    '' -> NULL
--   VALID_DENOMINATOR          'Yes' -> 1, anything else -> 0
--   NUMBER_DETECTIONS_VARIANT  'NA' -> NULL, '' -> NULL
--   PERCENT_VARIANT            'NA' -> 0,    '' -> NULL
-- 'NA' in NUMBER_SEQUENCED or NUMBER_DETECTIONS_VARIANT used to abort the whole load
-- with a conversion error; it is now treated like that column's empty value.
CREATE OR ALTER PROCEDURE [dbo].[SP_ECDC_VARIANTS_INTER]
AS
BEGIN
    INSERT INTO VWSINTER.ECDC_VARIANTS
    (
        [COUNTRY],
        [COUNTRY_CODE],
        [YEAR_WEEK],
        [SOURCE],
        [NEW_CASES],
        [NUMBER_SEQUENCED],
        [PERCENT_CASES_SEQUENCED],
        [VALID_DENOMINATOR],
        [VARIANT],
        [NUMBER_DETECTIONS_VARIANT],
        [PERCENT_VARIANT]
    )
    SELECT
        [COUNTRY],
        [COUNTRY_CODE],
        [YEAR_WEEK],
        [SOURCE],
        CASE
            WHEN [NEW_CASES] = 'NA'
            THEN CAST(0 AS INT) -- same value the former CAST('' AS INT) produced, stated explicitly
            ELSE CAST(NULLIF([NEW_CASES],'') AS INT)
        END AS NEW_CASES,
        CASE
            WHEN [NUMBER_SEQUENCED] = 'NA'
            THEN CAST(0 AS INT) -- such rows are dropped downstream by the NUMBER_SEQUENCED > 0 filter
            ELSE CAST([NUMBER_SEQUENCED] AS INT)
        END AS NUMBER_SEQUENCED,
        CASE
            WHEN [PERCENT_CASES_SEQUENCED] = 'NA'
            THEN CAST(0 AS decimal(8,3))
            ELSE CAST(NULLIF([PERCENT_CASES_SEQUENCED],'') AS decimal(8,3))
        END AS PERCENT_CASES_SEQUENCED,
        CASE
            WHEN [VALID_DENOMINATOR] = 'Yes'
            THEN CAST(1 as bit)
            ELSE CAST(0 as bit)
        END AS VALID_DENOMINATOR,
        [VARIANT],
        -- Both placeholders become NULL before the cast.
        CAST(NULLIF(NULLIF([NUMBER_DETECTIONS_VARIANT],''),'NA') AS INT) AS NUMBER_DETECTIONS_VARIANT,
        CASE
            WHEN [PERCENT_VARIANT] = 'NA'
            THEN CAST(0 AS decimal(8,3))
            ELSE CAST(NULLIF([PERCENT_VARIANT],'') AS decimal(8,3))
        END AS PERCENT_VARIANT
    FROM
        VWSSTAGE.ECDC_VARIANTS
    -- Only the latest staging batch is transferred.
    WHERE DATE_LAST_INSERTED = (SELECT MAX(DATE_LAST_INSERTED) FROM VWSSTAGE.ECDC_VARIANTS)
END;
GO


### INTER TO DEST

In [None]:
-- Copyright (c) 2020 De Staat der Nederlanden, Ministerie van   Volksgezondheid, Welzijn en Sport. 
-- Licensed under the EUROPEAN UNION PUBLIC LICENCE v. 1.2 - see https://github.com/minvws/nl-contact-tracing-app-coordinationfor more information.

-- DBO.SP_ECDC_VARIANTS_DEST
-- Moves the latest batch of VWSINTER.ECDC_VARIANTS into VWSDEST.ECDC_VARIANTS:
--   1. keep rows from GISAID/TESSy with a valid denominator and a positive sample size,
--      translate the ISO2 country code to ISO3 and attach the WHO variant mapping;
--   2. per country and week pick the source with the largest sample size
--      (GISAID wins a tie);
--   3. aggregate the variant codes that share a mapped variant and insert the result.
CREATE OR ALTER PROCEDURE DBO.SP_ECDC_VARIANTS_DEST
AS
BEGIN
    WITH FILTERED_ROWS AS
    (
        -- Step 1: filtered and mapped rows of the latest intermediate batch.
        SELECT
            inter.[COUNTRY]
            ,iso.[COUNTRY_CODE_ISO3]                 AS [COUNTRY_CODE]
            ,inter.[YEAR_WEEK]
            ,inter.[SOURCE]
            ,inter.[NEW_CASES]
            ,inter.[NUMBER_SEQUENCED]                AS [SAMPLE_SIZE]
            ,inter.[PERCENT_CASES_SEQUENCED]         AS [PERCENT_CASES_SAMPLED]
            ,inter.[VALID_DENOMINATOR]
            ,inter.[VARIANT]                         AS [VARIANT_CODE]
            ,who.[WHO_VARIANT]                       AS [VARIANT]
            -- Variant of concern: WHO-named and classified as 'VOC'.
            ,CASE
                WHEN LEN(who.[WHO_VARIANT]) > 0 AND who.[CLASSIFICATION] = 'VOC'
                    THEN CAST(1 AS bit)
                ELSE CAST(0 AS bit)
            END                                      AS [VARIANT_OF_CONCERN]
            ,inter.[NUMBER_DETECTIONS_VARIANT]       AS [OCCURRENCE]
        FROM
            VWSINTER.ECDC_VARIANTS AS inter
        -- The country code table (all countries) supplies the ISO3 code.
        INNER JOIN VWSSTATIC.COUNTRY_CODES AS iso
            ON  iso.DATE_LAST_INSERTED = (SELECT MAX(DATE_LAST_INSERTED) FROM VWSSTATIC.COUNTRY_CODES)
            AND iso.COUNTRY_CODE_ISO2 =
                    CASE
                        -- Greece exception, see https://publications.europa.eu/code/pdf/370000en.htm
                        WHEN inter.[COUNTRY_CODE] = 'EL' THEN 'GR'
                        ELSE inter.[COUNTRY_CODE]
                    END
        -- WHO naming of the variant codes; unmapped codes keep NULL here.
        LEFT JOIN VWSSTATIC.VARIANTS_MAPPING AS who
            ON  who.DATE_LAST_INSERTED = (SELECT MAX(DATE_LAST_INSERTED) FROM VWSSTATIC.VARIANTS_MAPPING)
            AND who.VARIANT_CODE = inter.[VARIANT]
        WHERE
                inter.DATE_LAST_INSERTED = (SELECT MAX(DATE_LAST_INSERTED) FROM VWSINTER.ECDC_VARIANTS)
            AND inter.SOURCE IN ('GISAID', 'TESSy')
            AND inter.VALID_DENOMINATOR = 1
            AND inter.[NUMBER_SEQUENCED] > 0
    ),
    SOURCE_SAMPLES AS
    (
        -- The ECDC data repeats the header values (country, country code, year week,
        -- source, sample size) on every variant line; DISTINCT collapses them to one
        -- row per header.
        SELECT DISTINCT
            [COUNTRY]
            ,[COUNTRY_CODE]
            ,[YEAR_WEEK]
            ,[SOURCE]
            ,[SAMPLE_SIZE]
        FROM
            FILTERED_ROWS
    ),
    PREFERRED_SOURCE AS
    (
        -- Step 2: TESSy is chosen only when it has strictly more samples than GISAID.
        SELECT
            [COUNTRY]
            ,[COUNTRY_CODE]
            ,[YEAR_WEEK]
            ,CASE
                WHEN SUM(CASE WHEN [SOURCE] = 'TESSy' THEN [SAMPLE_SIZE] ELSE 0 END)
                     > SUM(CASE WHEN [SOURCE] = 'GISAID' THEN [SAMPLE_SIZE] ELSE 0 END)
                    THEN 'TESSy'
                ELSE 'GISAID'
            END                                      AS [SELECTED_SOURCE]
        FROM
            SOURCE_SAMPLES
        GROUP BY
            [COUNTRY]
            ,[COUNTRY_CODE]
            ,[YEAR_WEEK]
    )

    /*
        Step 3: insert into the DEST table.

        The GROUP BY matters when several variant codes (B1XX, B2xx) map to the same
        WHO variant ('alpha'): their codes are concatenated with STRING_AGG and their
        occurrences summed before the percentage is calculated.

        When the SELECT list changes, mirror every non-aggregated expression in the
        GROUP BY so that both stay consistent.
    */
    INSERT INTO VWSDEST.ECDC_VARIANTS
    (
        COUNTRY,
        COUNTRY_CODE,
        YEAR_WEEK,
        WEEK_START,
        WEEK_END,
        SOURCE,
        NEW_CASES,
        SAMPLE_SIZE,
        ECDC_CATEGORY,
        PERCENT_CASES_SAMPLED,
        VARIANT_CODE,
        VARIANT,
        VARIANT_OF_CONCERN,
        OCCURRENCE,
        PERCENT_VARIANT
    )
    SELECT
        f.COUNTRY
        ,f.COUNTRY_CODE
        ,f.YEAR_WEEK
        -- First day of the ISO week encoded in YEAR_WEEK (first 4 chars = year, last 2 = week).
        ,CAST(
            dbo.CONVERT_ISO_WEEK_TO_DATETIME(CAST(LEFT(f.YEAR_WEEK,4) AS int), CAST(RIGHT(f.YEAR_WEEK,2) AS int))
            AS date)                                 AS [WEEK_START]
        -- Six days after the week start.
        ,DATEADD(DAY, 6, CAST(
            dbo.CONVERT_ISO_WEEK_TO_DATETIME(CAST(LEFT(f.YEAR_WEEK,4) AS int), CAST(RIGHT(f.YEAR_WEEK,2) AS int))
            AS date))                                AS [WEEK_END]
        ,f.SOURCE
        ,f.NEW_CASES
        ,f.SAMPLE_SIZE
        -- ECDC reliability level derived from the sample size, for reference only.
        ,CASE
            WHEN f.SAMPLE_SIZE >= 500 THEN 'L1'
            WHEN f.SAMPLE_SIZE >= 60  THEN 'L2'
            WHEN f.SAMPLE_SIZE >= 1   THEN 'L3'
            ELSE 'L4'
        END                                          AS [ECDC_CATEGORY]
        ,f.PERCENT_CASES_SAMPLED
        -- n source codes can map to 1 variant (see the variant mapping table).
        ,STRING_AGG(f.VARIANT_CODE, ',')             AS [VARIANT_CODE]
        ,CASE
            WHEN LEN(f.[VARIANT]) > 0 THEN f.[VARIANT]
            ELSE 'Other'
        END                                          AS [VARIANT]
        ,f.[VARIANT_OF_CONCERN]
        ,ISNULL(SUM(f.OCCURRENCE), 0)                AS [OCCURRENCE]
        ,ISNULL(
            CAST(100.0 * CAST(SUM(f.OCCURRENCE) AS [decimal](8, 3)) / f.SAMPLE_SIZE AS [decimal](8, 1)),
            0)                                       AS [PERCENT_VARIANT]
    FROM
        FILTERED_ROWS AS f
    -- Keep only the rows of the source selected for that country and week.
    INNER JOIN PREFERRED_SOURCE AS pick
        ON  pick.SELECTED_SOURCE = f.SOURCE
        AND pick.COUNTRY_CODE = f.COUNTRY_CODE
        AND pick.YEAR_WEEK = f.YEAR_WEEK
    GROUP BY
        f.COUNTRY
        ,f.COUNTRY_CODE
        ,f.YEAR_WEEK
        ,f.SOURCE
        ,f.NEW_CASES
        ,f.SAMPLE_SIZE
        ,f.PERCENT_CASES_SAMPLED
        ,f.[VARIANT_OF_CONCERN]
        ,CASE
            WHEN LEN(f.[VARIANT]) > 0 THEN f.[VARIANT]
            ELSE 'Other'
        END

END;
GO




# **DATATINO CONFIGURATIONS**
---

In [None]:
-- COPYRIGHT (C) 2020 DE STAAT DER NEDERLANDEN, MINISTERIE VAN   VOLKSGEZONDHEID, WELZIJN EN SPORT.
-- LICENSED UNDER THE EUROPEAN UNION PUBLIC LICENCE V. 1.2 - SEE HTTPS://GITHUB.COM/MINVWS/NL-CONTACT-TRACING-APP-COORDINATIONFOR MORE INFORMATION.

-- 1) UPSERT WORKFLOW(S).....
-- Registers the ECDC_VARIANTS workflow in the Datatino orchestrator: one workflow,
-- three sources (CSV load + two stored procedures), three processes and two
-- dependencies that chain the processes (load -> stage-to-inter -> inter-to-dest).
-- The whole registration is skipped when a dataflow with this name already exists.
--
-- Every id lookup uses SET @x = (SELECT TOP(1) ...) so that the variable becomes NULL
-- when nothing matches. With SELECT @x = ... a variable keeps its previous value when
-- no row is returned, which would make a later UPSERT target the id found by an
-- earlier lookup.
DECLARE @workflow_name NVARCHAR(50) = 'ECDC_VARIANTS',
        @workflow_id INT,
        @workflow_description VARCHAR(256),
        @is_active INT;

-- Exact match on purpose: with LIKE the underscore in the name is a single-character
-- wildcard, so a differently named dataflow could have suppressed the registration.
IF NOT EXISTS ( SELECT 1 FROM [DATATINO_ORCHESTRATOR_1].[DATAFLOWS] WHERE NAME = @workflow_name )
BEGIN
    -- 1.1) SET ENVIRONMENTAL VARIABLES.....
    -- The workflow is active in every environment.
    SET @is_active = CASE LOWER('#{ Environment }#')
        WHEN 'production' THEN 1
        WHEN 'acceptance' THEN 1
        ELSE 1
    END;

    SET @workflow_id = (
        SELECT TOP(1)
            workflows.[ID]
        FROM [DATATINO_ORCHESTRATOR_1].[WORKFLOWS] workflows
        INNER JOIN [DATATINO_ORCHESTRATOR_1].[V_WORKFLOWS] v_workflows ON v_workflows.[DATAFLOW_ID] = workflows.[DATAFLOW_ID] AND v_workflows.[ID] = workflows.[ID]
        WHERE v_workflows.[NAME] = @workflow_name
    );

    SET @workflow_description = CONCAT('WORKFLOW: ', @workflow_name);

    EXECUTE [DATATINO_ORCHESTRATOR_1].[UPSERT_WORKFLOW]
        @id = @workflow_id, 
        @workflow_name = @workflow_name,
        @description = @workflow_description,
        @schedule = '0 13 * * THU', -- At 01:00 PM, only on Thursday
        @active = @is_active;

    -- 2) UPSERT SOURCE(S).....
    DECLARE @source NVARCHAR(256),
            @source_id INT,
            @source_name NVARCHAR(256),
            @source_description NVARCHAR(256),
            @target_name VARCHAR(256),
            @location_type VARCHAR(50),
            @security_profile VARCHAR(50);

    -- 2.1) SET ENVIRONMENTAL VARIABLES.....
    -- The same public ECDC file, location type and security profile are used in
    -- every environment.
    SET @source = CASE LOWER('#{ Environment }#')
        WHEN 'production' THEN 'https://opendata.ecdc.europa.eu/covid19/virusvariant/csv/data.csv'
        WHEN 'acceptance' THEN 'https://opendata.ecdc.europa.eu/covid19/virusvariant/csv/data.csv'
        ELSE 'https://opendata.ecdc.europa.eu/covid19/virusvariant/csv/data.csv'
    END;

    SET @location_type = CASE LOWER('#{ Environment }#')
        WHEN 'production' THEN 'Web'
        WHEN 'acceptance' THEN 'Web'
        ELSE 'Web'
    END;

    SET @security_profile = CASE LOWER('#{ Environment }#')
        WHEN 'production' THEN 'N/A'
        WHEN 'acceptance' THEN 'N/A'
        ELSE 'N/A'
    END;

    -- 2.2) UPSERT TABLE SOURCE(S): STAGING......
    SET @source_name = CONCAT('SOURCE_', @workflow_name);
    SET @source_description = CONCAT('LOAD: ', @workflow_name);
    SET @target_name = CONCAT('VWSSTAGE.', @workflow_name);

    SET @source_id = (
        SELECT TOP(1)
            [ID]
        FROM [DATATINO_ORCHESTRATOR_1].[SOURCES]
        WHERE [NAME] = @source_name
    );

    -- NOTE(review): @delimiter_type is 'Colon' although the source is a .csv file --
    -- confirm this is the delimiter value the orchestrator expects for this feed.
    EXECUTE [DATATINO_ORCHESTRATOR_1].[UPSERT_SOURCE]
        @id = @source_id,
        @source_name = @source_name,
        @description = @source_description,
        @source = @source,
        @source_columns = 'country|country_code|year_week|source|new_cases|number_sequenced|percent_cases_sequenced|valid_denominator|variant|number_detections_variant|percent_variant',
        @target_columns = 'COUNTRY|COUNTRY_CODE|YEAR_WEEK|SOURCE|NEW_CASES|NUMBER_SEQUENCED|PERCENT_CASES_SEQUENCED|VALID_DENOMINATOR|VARIANT|NUMBER_DETECTIONS_VARIANT|PERCENT_VARIANT|DATE_LAST_INSERTED=GETDATE',
        @target_name = @target_name,
        @source_type = 'CsvFile',
        @location_type = @location_type,
        @delimiter_type = 'Colon',
        @security_profile= @security_profile;

    -- 2.3) UPSERT STORED PROCEDURE SOURCE(S): STAGE TO INTER......
    -- The stage-to-inter procedure of this workflow is created as
    -- dbo.SP_ECDC_VARIANTS_INTER, so that is the name registered as the source.
    -- The orchestrator-side source name keeps the _STAGE_TO_INTER suffix.
    SET @source = CONCAT('dbo.SP_', @workflow_name, '_INTER');
    SET @source_name = CONCAT('SOURCE_SP_', @workflow_name, '_STAGE_TO_INTER');
    SET @source_description = CONCAT('MAP: ', @workflow_name, ' FROM STAGE TO INTER');

    SET @source_id = (
        SELECT TOP(1)
            [ID]
        FROM [DATATINO_ORCHESTRATOR_1].[SOURCES]
        WHERE [NAME] = @source_name
    );

    EXECUTE [DATATINO_ORCHESTRATOR_1].[UPSERT_SOURCE]
        @id = @source_id,
        @source_name = @source_name,
        @description = @source_description,
        @source = @source,
        @source_columns = null,
        @target_columns = null,
        @target_name = null,
        @source_type = 'StoredProcedure',
        @location_type = 'N/A',
        @delimiter_type = 'N/A',
        @security_profile= @security_profile;

    -- 2.4) UPSERT STORED PROCEDURE SOURCE(S): INTER TO DEST.....
    -- The inter-to-dest procedure of this workflow is created as
    -- dbo.SP_ECDC_VARIANTS_DEST, so that is the name registered as the source.
    -- The orchestrator-side source name keeps the _INTER_TO_DEST suffix.
    SET @source = CONCAT('dbo.SP_', @workflow_name, '_DEST');
    SET @source_name = CONCAT('SOURCE_SP_', @workflow_name, '_INTER_TO_DEST');
    SET @source_description = CONCAT('MAP: ', @workflow_name, ' FROM INTER TO DEST');

    SET @source_id = (
        SELECT TOP(1)
            [ID]
        FROM [DATATINO_ORCHESTRATOR_1].[SOURCES]
        WHERE [NAME] = @source_name
    );

    EXECUTE [DATATINO_ORCHESTRATOR_1].[UPSERT_SOURCE]
        @id = @source_id,
        @source_name = @source_name,
        @description = @source_description,
        @source = @source,
        @source_columns = null,
        @target_columns = null,
        @target_name = null,
        @source_type = 'StoredProcedure',
        @location_type = 'N/A',
        @delimiter_type = 'N/A',
        @security_profile= @security_profile;

    -- 3) UPSERT PROCESS(ES).....
    DECLARE @process_id INT,
            @process_name NVARCHAR(256),
            @process_description NVARCHAR(256),
            @process_source_name VARCHAR(256);

    -- 3.1) UPSERT TABLE PROCESS(ES): STAGING......
    SET @process_name = CONCAT('PROCESS_', @workflow_name);
    SET @process_description = CONCAT('LOAD: ', @workflow_name);
    SET @process_source_name = CONCAT('SOURCE_', @workflow_name);

    SET @process_id = (
        SELECT TOP(1)
            processes.[ID]
        FROM [DATATINO_ORCHESTRATOR_1].[PROCESSES] processes
        INNER JOIN [DATATINO_ORCHESTRATOR_1].[V_WORKFLOWS] v_workflows ON v_workflows.[ID] = processes.[WORKFLOW_ID]
        INNER JOIN [DATATINO_ORCHESTRATOR_1].[V_PROCESSES] v_processes ON v_processes.[PROCESS_ID] = processes.[ID]
        WHERE v_processes.[PROCESS_NAME] = @process_name 
            AND v_workflows.[NAME] = @workflow_name
    );

    EXECUTE [DATATINO_ORCHESTRATOR_1].[UPSERT_PROCESS]
        @id = @process_id,
        @process_name = @process_name,
        @description = @process_description,
        @source_name = @process_source_name,
        @schedule = '* * * * *',
        @workflow_name = @workflow_name,
        @active = 1;

    -- 3.2) UPSERT STORED PROCEDURE PROCESS(S): STAGE TO INTER......
    SET @process_name = CONCAT('PROCESS_SP_', @workflow_name, '_STAGE_TO_INTER');
    SET @process_description = CONCAT('MAP: ', @workflow_name, ' FROM STAGE TO INTER');
    SET @process_source_name = CONCAT('SOURCE_SP_', @workflow_name, '_STAGE_TO_INTER');

    SET @process_id = (
        SELECT TOP(1)
            processes.[ID]
        FROM [DATATINO_ORCHESTRATOR_1].[PROCESSES] processes
        INNER JOIN [DATATINO_ORCHESTRATOR_1].[V_WORKFLOWS] v_workflows ON v_workflows.[ID] = processes.[WORKFLOW_ID]
        INNER JOIN [DATATINO_ORCHESTRATOR_1].[V_PROCESSES] v_processes ON v_processes.[PROCESS_ID] = processes.[ID]
        WHERE v_processes.[PROCESS_NAME] = @process_name 
            AND v_workflows.[NAME] = @workflow_name
    );

    EXECUTE [DATATINO_ORCHESTRATOR_1].[UPSERT_PROCESS]
        @id = @process_id,
        @process_name = @process_name,
        @description = @process_description,
        @source_name = @process_source_name,
        @schedule = '* * * * *',
        @workflow_name = @workflow_name,
        @active = 1;

    -- 3.3) UPSERT STORED PROCEDURE PROCESS(S): INTER TO DEST......
    SET @process_name = CONCAT('PROCESS_SP_', @workflow_name, '_INTER_TO_DEST');
    SET @process_description = CONCAT('MAP: ', @workflow_name, ' FROM INTER TO DEST');
    SET @process_source_name = CONCAT('SOURCE_SP_', @workflow_name, '_INTER_TO_DEST');

    SET @process_id = (
        SELECT TOP(1)
            processes.[ID]
        FROM [DATATINO_ORCHESTRATOR_1].[PROCESSES] processes
        INNER JOIN [DATATINO_ORCHESTRATOR_1].[V_WORKFLOWS] v_workflows ON v_workflows.[ID] = processes.[WORKFLOW_ID]
        INNER JOIN [DATATINO_ORCHESTRATOR_1].[V_PROCESSES] v_processes ON v_processes.[PROCESS_ID] = processes.[ID]
        WHERE v_processes.[PROCESS_NAME] = @process_name 
            AND v_workflows.[NAME] = @workflow_name
    );

    EXECUTE [DATATINO_ORCHESTRATOR_1].[UPSERT_PROCESS]
        @id = @process_id,
        @process_name = @process_name,
        @description = @process_description,
        @source_name = @process_source_name,
        @schedule = '* * * * *',
        @workflow_name = @workflow_name,
        @active = 1;

    -- 4) UPSERT DEPENDENC(Y)(IES).....
    DECLARE @dependency_id INT,
            @dependency_name NVARCHAR(256),
            @dependency_description NVARCHAR(256),
            @dependency_dataflow_name NVARCHAR(256),
            @dependency_process_name NVARCHAR(256);

    -- 4.1) UPSERT STAGING TO STORE PROCEDURE DEPENDENC(Y)(IES).....
    -- The stage-to-inter process is triggered after the staging load has finished.
    SET @dependency_name = CONCAT('DEPENDENCY_', @workflow_name,'_STAGE_TO_INTER');
    SET @dependency_dataflow_name = CONCAT('PROCESS_SP_', @workflow_name, '_STAGE_TO_INTER');
    SET @dependency_process_name = CONCAT('PROCESS_', @workflow_name);
    SET @dependency_description = CONCAT('TRIGGER ', @dependency_dataflow_name,' AFTER ', @dependency_process_name ,' HAS FINISHED');

    SET @dependency_id = (
        SELECT TOP(1)
            dependencies.[ID]
        FROM [DATATINO_ORCHESTRATOR_1].[DEPENDENCIES] dependencies
        INNER JOIN [DATATINO_ORCHESTRATOR_1].[V_DEPENDENCIES] v_dependencies ON v_dependencies.[ID] = dependencies.[ID]
        WHERE dependencies.[NAME] = @dependency_name
            AND v_dependencies.[WORKFLOW_NAME] = @workflow_name
    );

    EXECUTE [DATATINO_ORCHESTRATOR_1].[UPSERT_DEPENDENCY]
        @id = @dependency_id,
        @dataflow_name = @dependency_dataflow_name,
        @dataflowtype_id = 2,
        @dependency_name = @dependency_process_name,
        @dependencytype_id = 2,
        @workflow_name = @workflow_name,
        @name = @dependency_name,
        @description = @dependency_description,
        @active = 1;

    -- 4.2) UPSERT STORED PROCEDURE TO STORED PROCEDURE DEPENDENC(Y)(IES)......
    -- The inter-to-dest process is triggered after the stage-to-inter process has finished.
    SET @dependency_name = CONCAT('DEPENDENCY_', @workflow_name,'_INTER_TO_DEST');
    SET @dependency_dataflow_name = CONCAT('PROCESS_SP_', @workflow_name, '_INTER_TO_DEST');
    SET @dependency_process_name = CONCAT('PROCESS_SP_', @workflow_name, '_STAGE_TO_INTER');
    SET @dependency_description = CONCAT('TRIGGER ', @dependency_dataflow_name,' AFTER ', @dependency_process_name ,' HAS FINISHED');

    SET @dependency_id = (
        SELECT TOP(1)
            dependencies.[ID]
        FROM [DATATINO_ORCHESTRATOR_1].[DEPENDENCIES] dependencies
        INNER JOIN [DATATINO_ORCHESTRATOR_1].[V_DEPENDENCIES] v_dependencies ON v_dependencies.[ID] = dependencies.[ID]
        WHERE dependencies.[NAME] = @dependency_name
            AND v_dependencies.[WORKFLOW_NAME] = @workflow_name
    );

    EXECUTE [DATATINO_ORCHESTRATOR_1].[UPSERT_DEPENDENCY]
        @id = @dependency_id,
        @dataflow_name = @dependency_dataflow_name,
        @dataflowtype_id = 2,
        @dependency_name = @dependency_process_name,
        @dependencytype_id = 2,
        @workflow_name = @workflow_name,
        @name = @dependency_name,
        @description = @dependency_description,
        @active = 1;

END