In [0]:
%sh
# Install Microsoft ODBC Driver 18 for SQL Server (the driver named in the
# pyodbc connection strings used later), plus the optional mssql-tools18
# (bcp/sqlcmd) and the unixODBC development headers.
UBUNTU_RELEASE="$(lsb_release -rs)"
if ! [[ "18.04 20.04 22.04 23.04 24.04" == *"$UBUNTU_RELEASE"* ]];
then
    echo "Ubuntu $UBUNTU_RELEASE is not currently supported.";
    exit;
fi

# Add the signature to trust the Microsoft repo.
# Microsoft's instructions present these two key locations as alternatives
# (Ubuntu < 24.04 vs >= 24.04); this cell installs BOTH regardless of release.
# NOTE(review): keeping both is presumably so apt finds the key whichever
# location prod.list refers to -- confirm before dropping one.
# curl -f: fail on HTTP errors instead of writing an error page into the file.
# gpg --batch --yes: overwrite an existing keyring on re-runs instead of
# trying to prompt (there is no tty here).
curl -fsSL https://packages.microsoft.com/keys/microsoft.asc | sudo tee /etc/apt/trusted.gpg.d/microsoft.asc
curl -fsSL https://packages.microsoft.com/keys/microsoft.asc | sudo gpg --batch --yes --dearmor -o /usr/share/keyrings/microsoft-prod.gpg

# Add repo to apt sources
curl -fsSL "https://packages.microsoft.com/config/ubuntu/$UBUNTU_RELEASE/prod.list" | sudo tee /etc/apt/sources.list.d/mssql-release.list

# Install the driver
sudo apt-get update
sudo ACCEPT_EULA=Y apt-get install -y msodbcsql18
# optional: for bcp and sqlcmd
sudo ACCEPT_EULA=Y apt-get install -y mssql-tools18
# Append the PATH entry only once so re-running the cell does not duplicate it.
grep -qxF 'export PATH="$PATH:/opt/mssql-tools18/bin"' ~/.bashrc 2>/dev/null || echo 'export PATH="$PATH:/opt/mssql-tools18/bin"' >> ~/.bashrc
# Only affects this cell's shell process.
source ~/.bashrc
# optional: for unixODBC development headers
sudo apt-get install -y unixodbc-dev

In [0]:
%run /Workspace/EDP-Engineering/Setup_Environment

In [0]:
# DatabaseAdapter class
import re
import pandas as pd
import pyodbc
# Matches any single character that is not an ASCII letter, digit or underscore
# (re.ASCII restricts \W to exactly the complement of [A-Za-z0-9_]).
name_regex = re.compile(r"\W", re.ASCII)
class DatabaseAdapter(object):
    """
    Handles both generic and feature specific database interactions.

    Wraps a single pyodbc connection that is opened lazily (autocommit=True)
    on first use and cached on ``self.conn``. When a single-statement query
    fails, the connection is closed and discarded so the next call reconnects.
    """

    # Batch separator for execute_query_batch: a line holding only "GO"
    # (case-insensitive, optional surrounding spaces/tabs, LF or CRLF ending).
    _GO_SEPARATOR = re.compile(r"^[ \t]*GO[ \t]*\r?$", re.IGNORECASE | re.MULTILINE)

    @staticmethod
    def _split_conn_string(conn_string: str) -> dict:
        """
        Parse an ODBC connection string into a dict.

        Keys are lower-cased and stripped, values are stripped. Only the first
        "=" of a segment separates key from value, so values may contain "=".
        Segments without "=" (e.g. stray blanks between ";") are ignored.
        Returns an empty dict unless the string contains both ";" and "=".

        NOTE: brace-quoted values that contain ";" (e.g. Pwd={a;b}) are not
        supported and will be split at the ";".
        """
        if not conn_string or ";" not in conn_string or "=" not in conn_string:
            return {}
        parsed = {}
        for segment in conn_string.split(";"):
            key, sep, value = segment.partition("=")
            if not sep:
                continue
            parsed[key.lower().strip()] = value.strip()
        return parsed

    def __init__(self, dbConnString: str) -> None:
        """
        Initialize an instance of the adapter with an ODBC connection string.

        No connection is opened here; it is opened on first use.

        params:
        dbConnString: str - An ODBC compliant connection string containing at
            least Driver, Server (or Data Source), Database and Uid.

        raises:
        ValueError - if dbConnString is None or blank.
        KeyError - if a required entry is missing.
        """
        # Assigned first so closeDBConn()/__del__ work even if parsing fails.
        self.conn: "pyodbc.Connection | None" = None
        if dbConnString is None or not dbConnString.strip():
            raise ValueError(
                "dbConnString was empty but must contain a valid database connection string")
        self.connString = dbConnString
        connDict = DatabaseAdapter._split_conn_string(dbConnString)
        self.driver = connDict['driver']

        server_value = connDict['server'] if 'server' in connDict else connDict['data source']
        server_parts = server_value.split(',')
        self.server = server_parts[0].replace('tcp:', '')
        self.database = connDict['database']
        self.uid = connDict['uid']

        # Port precedence: explicit "Port=" entry, then the "host,port" suffix
        # of Server/Data Source, then the SQL Server default 1433 (int).
        if connDict.get('port'):
            self.port = connDict['port']
        elif len(server_parts) > 1 and server_parts[1].strip():
            self.port = server_parts[1].strip()
        else:
            self.port = 1433

    def __openDBConn(self):
        """Return the cached connection, opening it (autocommit=True) if needed.

        Raises ConnectionError (chained to the driver error) on failure.
        """
        if self.conn is not None:
            return self.conn
        try:
            self.conn = pyodbc.connect(self.connString, autocommit=True)
        except Exception as e:
            logger.error("Couldn't connect to %s:%s with user %s", self.server, self.database, self.uid, exc_info=e)
            raise ConnectionError(
                "Database connection has failed. Please review!") from e
        return self.conn

    def getConn(self):
        """Return the open connection, opening it first if necessary."""
        return self.__openDBConn()

    def closeDBConn(self):
        """Close the cached connection, if any, and forget it."""
        conn = getattr(self, 'conn', None)
        # Forget it before closing so a failing close() cannot leave a
        # half-closed connection cached for reuse.
        self.conn = None
        if conn is not None:
            conn.close()

    def _discard_conn(self):
        """Close and forget the connection after a failure.

        Errors from close() are suppressed so they cannot mask the error that
        is being handled.
        """
        try:
            self.closeDBConn()
        except Exception:
            pass

    def __del__(self):
        # Best-effort cleanup: exceptions raised in __del__ would only be
        # reported as "Exception ignored" warnings.
        try:
            self.closeDBConn()
        except Exception:
            pass

    def _run_on_cursor(self, queryStatement: str, work, closeConn: bool):
        """
        Shared execution path for the single-statement query methods.

        Validates queryStatement, opens the connection if needed, and returns
        ``work(cursor)``. The cursor is closed on success. On any error the
        statement is logged, the connection is discarded and the error is
        re-raised. If closeConn is true the connection is closed afterwards
        whatever the outcome.
        """
        if queryStatement is None or len(queryStatement) == 0:
            raise ValueError("queryStatement was empty but is required")

        try:
            curs = self.__openDBConn().cursor()
            result = work(curs)
            curs.close()
            return result
        except Exception as e:
            errmsg = 'An error occured while attempting query: {}.'.format(
                queryStatement)
            logger.error(errmsg, exc_info=e)
            self._discard_conn()
            raise
        finally:
            if closeConn:
                self.closeDBConn()

    def query_results_fetchall(self, queryStatement: str, closeConn=False):
        """
        Execute the provided query and return every result row.
        :param queryStatement: the sql query/statement to be executed
        :param closeConn: if True, close the connection after the call
        :return: the list returned by cursor.fetchall()
        :raises ValueError: if queryStatement is None or empty
        :raises ConnectionError: if the connection cannot be opened
        """
        def work(curs):
            curs.execute(queryStatement)
            return curs.fetchall()
        return self._run_on_cursor(queryStatement, work, closeConn)

    def query_results_fetchone(self, queryStatement: str, closeConn=False):
        """
        Execute the provided query and return its first row.
        :param queryStatement: the sql query/statement to be executed
        :param closeConn: if True, close the connection after the call
        :return: the value of cursor.fetchone() (None when there are no rows)
        :raises ValueError: if queryStatement is None or empty
        :raises ConnectionError: if the connection cannot be opened
        """
        def work(curs):
            curs.execute(queryStatement)
            return curs.fetchone()
        return self._run_on_cursor(queryStatement, work, closeConn)

    def execute_query(self, queryStatement: str, closeConn=False):
        """
        Execute a statement (or multi-statement text) whose results are not needed.
        :param queryStatement: the sql statement(s) to be executed; whitespace-only
                               input is accepted and does nothing
        :param closeConn: if True, close the connection after the call
        :raises ValueError: if queryStatement is None or empty
        :raises ConnectionError: if the connection cannot be opened
        """
        def work(curs):
            if queryStatement.strip():  # whitespace-only text is a no-op
                curs.execute(queryStatement)
                # Walk every remaining result set (originally a single
                # nextset(), Bug#40624). With SQL Server, errors raised by later
                # statements of a multi-statement text typically only surface
                # when the cursor reaches their result set.
                while curs.nextset():
                    pass
        self._run_on_cursor(queryStatement, work, closeConn)

    @staticmethod
    def _split_batches(queryStatements: str) -> list:
        """
        Split a T-SQL script into batches on lines that contain only "GO".

        Non-ASCII characters are removed first, as before. Blank batches are
        dropped.
        """
        # NOTE(review): dropping non-ASCII characters also alters string
        # literals (e.g. N'é'); confirm this is intended.
        ascii_text = queryStatements.encode("ascii", errors="ignore").decode()
        return [batch for batch in DatabaseAdapter._GO_SEPARATOR.split(ascii_text)
                if batch.strip()]

    def execute_query_batch(self, queryStatements: str, closeConn=False):
        """
        Execute a T-SQL script made of batches separated by "GO" lines.

        Batches run in order on one cursor; the first failing batch is logged
        and its error re-raised. The cursor is always closed, and the
        connection too when closeConn is true.
        """
        if queryStatements is None or len(queryStatements.strip()) == 0:
            raise ValueError("queryStatement was empty but is required")

        curs = self.__openDBConn().cursor()
        try:
            for sqlstmt in DatabaseAdapter._split_batches(queryStatements):
                try:
                    curs.execute(sqlstmt)
                    # Drain result sets so errors from later statements surface.
                    while curs.nextset():
                        pass
                except Exception as e:
                    errmsg = 'An error occured while attempting query: {}.'.format(
                        sqlstmt)
                    logger.error(errmsg, exc_info=e)
                    raise
        finally:
            curs.close()
            if closeConn:
                self.closeDBConn()

    def getDF(self, selectSQL):
        """Run selectSQL on the cached connection and return a pandas DataFrame."""
        # NOTE: pandas only officially supports SQLAlchemy connectables and
        # sqlite3 here and may warn for a raw pyodbc connection ("to be removed").
        return pd.read_sql(selectSQL, self.__openDBConn())


In [0]:
%python
# Install the required package
%pip install opencensus

In [0]:
def create_select_statement(source_name: str, table_name: str, adapter=None):
    """
    Build the SELECT column list used to export [<source_name>_base].[<table_name>].

    Columns are read from INFORMATION_SCHEMA.COLUMNS in ordinal order, so the
    exported data keeps the source table's column order. Every name is
    bracket-quoted (so reserved words or spaces are valid), and
    uniqueidentifier columns are cast to VARCHAR(36) under their original name.

    :param source_name: source system name; the schema queried is "<source_name>_base"
    :param table_name: table whose columns are listed
    :param adapter: object with getDF(sql) used for the lookup; defaults to the
                    notebook-level ``synapse_adapter``
    :return: comma-separated select list, e.g. "[a], CAST([b] AS VARCHAR(36)) AS [b]"
    :raises ValueError: if no columns are found (missing table or no access)
    """
    if adapter is None:
        adapter = synapse_adapter

    # Double single quotes so the names stay inside the SQL string literals.
    schema_literal = f"{source_name}_base".replace("'", "''")
    table_literal = table_name.replace("'", "''")
    # ORDER BY fixes column order (the previous DISTINCT left it arbitrary);
    # each column appears once per table, so DISTINCT is not needed.
    sql = f"""SELECT COLUMN_NAME, DATA_TYPE
              FROM INFORMATION_SCHEMA.COLUMNS
              WHERE TABLE_NAME = '{table_literal}'
              AND TABLE_SCHEMA = '{schema_literal}'
              ORDER BY ORDINAL_POSITION"""
    df = adapter.getDF(sql)
    if df.empty:
        raise ValueError(f"No columns found for [{source_name}_base].[{table_name}]")

    def quote(name):
        # T-SQL delimited identifier: escape "]" by doubling it.
        return "[" + name.replace("]", "]]") + "]"

    casts = []
    for row in df.itertuples():
        column = quote(row.COLUMN_NAME)
        if row.DATA_TYPE.lower() == "uniqueidentifier":
            casts.append(f"CAST({column} AS VARCHAR(36)) AS {column}")
        else:
            casts.append(column)
    return ", ".join(casts)

def create_external_synapse_table(source_name: str, table: str):
    """
    Export [<source_name>_base].[<table>] from Synapse as an external table (CETAS).

    Steps: ensure schema [migration_<source_name>_ext] exists, drop any previous
    external table of the same name, delete the files under
    /mnt/edp/migration/<source>/<table>/ with dbutils, then CREATE EXTERNAL
    TABLE ... AS SELECT into that location.

    Relies on notebook globals: synapse_adapter, dbutils, logger.
    """
    location = f'edp/migration/{source_name.lower()}/{table.lower()}/'
    # Build the column list before any destructive step, so a missing table or
    # a metadata failure leaves the previous external table and files intact.
    select_clause = create_select_statement(source_name, table)

    create_schema_statement = f"""if schema_id('migration_{source_name}_ext') is NULL
    	    exec sp_executesql N'create schema [migration_{source_name}_ext]'"""
    synapse_adapter.execute_query(create_schema_statement)

    drop_statement = f"""if object_id('[migration_{source_name}_ext].[{table}]') is not null
    	drop external table [migration_{source_name}_ext].[{table}]"""
    synapse_adapter.execute_query(drop_statement)

    # Remove previously exported files before CETAS writes to the same location.
    dbutils.fs.rm(f"/mnt/{location}", recurse=True)

    logger.info("Creating external table for %s_base.%s", source_name, table)
    create_statement = f"""CREATE EXTERNAL TABLE [migration_{source_name}_ext].[{table}] WITH (
                LOCATION = '{location}',
                DATA_SOURCE = [AzureDataLakeStore_mainstore],
                FILE_FORMAT = [parquetfile_snappy]
        ) AS
        SELECT
            {select_clause}
        FROM [{source_name}_base].[{table}]"""

    # execute sql on Synapse
    synapse_adapter.execute_query(create_statement)

#%restart_python
import os
import logging
from ELTFramework.Library.globals import environment
logger = logging.getLogger("migration")
logger.setLevel(logging.INFO)
print(environment)

# Per-environment Synapse endpoint/database, Unity Catalog catalog and Key Vault name.
if environment == "dev":
    edpHostname = "sen-edp-dev-sdw-01.database.windows.net"
    edpDatabase = "sen-edp-sdw-sqlpool-01"
    catalog = "edp_dev"
    # NOTE(review): only defined for dev; confirm no other cell needs it in test/prod.
    adls_storage_account = "sedpdevedpadls01"
    keyVaultName = "seedpdevedpadmkv01"
elif environment == "test":
    edpHostname = "edpsdwqa.database.windows.net"
    edpDatabase = "edpsdwqa"
    catalog = "edp_test"
    keyVaultName = "seedptestedpadmkv01"
elif environment == "prod":
    edpHostname = "edpsdwprod.database.windows.net"
    edpDatabase = "edpsdwprod"
    catalog = "edp"
    keyVaultName = "seedpprodedpadmkv01"
else:
    # Fail fast; otherwise the connection strings below die with a NameError.
    raise ValueError(f"Unsupported environment {environment!r}; expected 'dev', 'test' or 'prod'.")
secrets_scope = "primary-kv"

# The same ETL credentials are used for the Synapse pool and EDPProcMgmt.
edpsdw_etl_user = dbutils.secrets.get(scope=secrets_scope, key="etl-user-name")
edpsdw_etl_password = dbutils.secrets.get(scope=secrets_scope, key="etl-user-pass")
edpsdw_conn_string = f"Driver={{ODBC Driver 18 for SQL Server}};Server=tcp:{edpHostname},1433;Database={edpDatabase};Uid={edpsdw_etl_user};Pwd={edpsdw_etl_password};Encrypt=yes;TrustServerCertificate=no;Connection Timeout=30;"
synapse_adapter = DatabaseAdapter(edpsdw_conn_string)
edpprocmgmt_conn_string = f"Driver={{ODBC Driver 18 for SQL Server}};Server=tcp:{edpHostname},1433;Database=EDPProcMgmt;Uid={edpsdw_etl_user};Pwd={edpsdw_etl_password};Encrypt=yes;TrustServerCertificate=no;Connection Timeout=30;"
edpprocmgmt_adapter = DatabaseAdapter(edpprocmgmt_conn_string)

In [0]:
from datetime import datetime
from pyspark.sql.functions import current_user, col, to_utc_timestamp
from pyspark.sql.types import StringType,DateType,TimestampType
import json
# NOTE(review): this rebinds `current_user`, shadowing the pyspark
# `current_user` function imported above; kept under the same name in case
# other cells read it.
current_user = dbutils.notebook.entry_point.getDbutils().notebook().getContext().tags().apply('user')

# Resolve the tables to migrate: by event trigger when one is supplied,
# otherwise from the JSON array passed in the "objects_to_process" widget.
event_trigger_name = dbutils.widgets.get("event_trigger_name")
if len(event_trigger_name) > 0 and event_trigger_name != 'None':
    # Double single quotes so the widget value cannot break out of the SQL literal.
    trigger_literal = event_trigger_name.replace("'", "''")
    tables_to_migrate = edpprocmgmt_adapter.query_results_fetchall(f"select object_metadata_id, source_name, table_name from ETL.vActiveTablesToLoad where Event_Trigger_Name='{trigger_literal}' and Sdw_Load_Strategy <> 'truncate'")
    keys = ['object_metadata_id', 'source_name', 'table_name']
    tables_to_migrate = [dict(zip(keys, item)) for item in tables_to_migrate]
else:
    tables_to_migrate = dbutils.widgets.get("objects_to_process")
    logger.info("trying to parse %s from data type %s", tables_to_migrate, type(tables_to_migrate))
    if len(tables_to_migrate) > 0 and tables_to_migrate != 'None':
        tables_to_migrate = json.loads(tables_to_migrate)
    else:
        raise ValueError("Expecting either event_trigger_name or objects_to_process to be pass to this notebook.")

# Per table: export from Synapse (CETAS) -> load the parquet into Unity Catalog
# -> record the outcome in the migration tracker. Failures are logged and the
# loop continues with the next table.
for table in tables_to_migrate:
    table_exported = False
    table_loaded = False
    rows_written_parquet = 0
    object_metadata_id = table['object_metadata_id']
    source_name = table['source_name'].lower()
    table_name = table['table_name'].lower()

    # Step 1: create the external table in Synapse.
    try:
        create_external_synapse_table(source_name, table_name)
        table_exported = True
    except Exception as e:
        logger.error("Error creating external table for %s.%s", source_name, table_name, exc_info=e)

    # Step 2: load the exported parquet into Unity Catalog.
    try:
        if table_exported:
            # Original Synapse column types drive the per-column fix-ups below.
            # Queried inside the try so a metadata failure is logged and tracked
            # like any other load failure instead of aborting the whole loop.
            query = f"""
                SELECT column_name, data_type
                FROM information_schema.columns
                WHERE table_schema = '{source_name}_base' AND table_name = '{table_name}'
            """
            columns = synapse_adapter.query_results_fetchall(query)
            uniqueidentifier_columns = {column_name.lower() for column_name, data_type in columns if data_type.lower() == "uniqueidentifier"}
            date_columns = {column_name.lower() for column_name, data_type in columns if data_type.lower() == "date"}

            df = spark.read.parquet(f"/mnt/edp/migration/{source_name}/{table_name}/")
            transformed_columns = []
            for field in df.schema.fields:
                field_name = field.name.lower()
                field_type = field.dataType.simpleString()

                if field_name in date_columns:
                    if field_type == "timestamp":
                        # Source column is DATE but arrived as a timestamp: cast back to DATE.
                        transformed_columns.append(col(field_name).cast(DateType()).alias(field_name))
                    else:
                        transformed_columns.append(col(field_name))
                elif field_type == "timestamp":
                    # Convert from Eastern Time to UTC.
                    transformed_columns.append(
                        to_utc_timestamp(col(field_name), "America/New_York").alias(field_name)
                    )
                elif field_name in uniqueidentifier_columns:
                    transformed_columns.append(col(field_name).cast(StringType()).alias(field_name))
                else:
                    transformed_columns.append(col(field_name))

            # Apply all column transformations in a single select.
            df = df.select(*transformed_columns)
            spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog}.{source_name}_base")
            df.write.format("delta").mode("overwrite").option("overwriteSchema", "True").saveAsTable(f"{catalog}.{source_name}_base.{table_name}")
            table_loaded = True
            rows_written_parquet = df.count()
            logger.info("wrote %d rows to %s.%s_base.%s", rows_written_parquet, catalog, source_name, table_name)
    except Exception as e:
        logger.error("Error creating table %s.%s_base.%s", catalog, source_name, table_name, exc_info=e)

    # Step 3: record the outcome in the migration tracker.
    test_status = 'initial extract from synapse complete' if table_loaded else 'initial extract from synapse failed'
    try:
        edpprocmgmt_adapter.execute_query(f"insert into dataops.Databricks_Bronze_Migration_Tracker (Object_Metadata_Id,Source_Row_Count,Target_Row_Count,Schema_Match,Data_Match,Row_Count_Match,Test_Status) values({object_metadata_id}, null, {rows_written_parquet}, 0, 0, 0, '{test_status}')")
    except Exception as e:
        logger.error("Error writing to tracker for table %s.%s_base.%s", catalog, source_name, table_name, exc_info=e)