In [6]:
# Welcome to your new notebook
# Type here in the cell editor to add code!
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
from tqdm.auto import tqdm
from concurrent.futures import ThreadPoolExecutor
from typing import Union, List, Tuple, Any
# from tabulate import tabulate

class AuditLog:
    
    def __init__(self, columns: Union[List[str], Tuple[str]], WS_ID: str, TABLE_NAME_to_check:str, AUDIT_TABLE_NAME:str, LH_ID_to_check: str, LH_ID_audit: str = None, schema: str = None):
        '''
        - if `LH_ID_audit` is not given, it is  LH_ID_to_check automatically, i.e. audit table is in the same lakehouse as that of
        - if using lakehouse with Schema, please provide `schema` parameter
        '''
        self.WS_ID = WS_ID
        self.TABLE_NAME_to_check = TABLE_NAME_to_check
        self.AUDIT_TABLE_NAME = AUDIT_TABLE_NAME
        self.LH_ID_to_check = LH_ID_to_check
        self.LH_ID_audit = LH_ID_audit if LH_ID_audit else LH_ID_to_check
        self.schema = schema
        self.fixColumns = {'STARTTIME','ENDTIME','AUDITKEY','STATUS_ACTIVITY'}
        self.columns = tuple(set(columns).union(self.fixColumns))
        
        if self.schema:    
            self.PATH_TO_AUDIT_TABLE = f'abfss://{self.WS_ID}@onelake.dfs.fabric.microsoft.com/{self.LH_ID_audit}/Tables/{self.schema}/{self.AUDIT_TABLE_NAME}'
            self.PATH_TO_CHECKED_TABLE = f'abfss://{self.WS_ID}@onelake.dfs.fabric.microsoft.com/{self.LH_ID_to_check}/Tables/{self.schema}/{self.TABLE_NAME_to_check}'
        else:
            self.PATH_TO_AUDIT_TABLE = f'abfss://{self.WS_ID}@onelake.dfs.fabric.microsoft.com/{self.LH_ID_audit}/Tables/{self.AUDIT_TABLE_NAME}'
            self.PATH_TO_CHECKED_TABLE = f'abfss://{self.WS_ID}@onelake.dfs.fabric.microsoft.com/{self.LH_ID_to_check}/Tables/{self.TABLE_NAME_to_check}'
    
        self.log = {column: None for column in self.columns}
        self.log['STARTTIME'] = datetime.now() + timedelta(hours=7)
        self.log['STATUS_ACTIVITY'] = 'Not start'
        
    def initialDetail(self, initConfig: dict[str, Any]):
        assert set(initConfig.keys()).issubset(set(self.columns).difference()), f'initConfig must have the columns in {self.columns}'
        for column in initConfig:
            self.log[column] = initConfig[column]
        
    def __str__(self):
        return str(self.log)
    
class AuditLog_SPC(AuditLog):
    
    def __init__(self, WS_ID: str, TABLE_NAME_to_check:str, AUDIT_TABLE_NAME:str, LH_ID_to_check: str, LH_ID_audit: str = None, schema: str = None):
        '''
        - if `LH_ID_audit` is not given, it is  LH_ID_to_check automatically, i.e. audit table is in the same lakehouse as that of
        - if using lakehouse with Schema, please provide `schema` parameter
        '''
        super().__init__(['PIPELINENAME', 'PIPELINERUNID', 'TRIGGERTYPE', 'TABLE_NAME', 'FUNCTION_NAME','COUNTROWSBEFORE', 'COUNTROWSAFTER', 'ERRORCODE', 'ERRORMESSAGE'] ,WS_ID, TABLE_NAME_to_check, AUDIT_TABLE_NAME, LH_ID_to_check, LH_ID_audit, schema)

    def initialDetail(self,  pipelineName: str, pipelineId, TriggerType, TableName, functionName, ):
        super().initialDetail({'PIPELINENAME': pipelineName, 'PIPELINERUNID': pipelineId, 'TRIGGERTYPE': TriggerType, 'TABLE_NAME': TableName, 'FUNCTION_NAME': functionName})

In [None]:
ad = AuditLog_SPC('SPC_UAT', 'factTest', 'auditTable', 'SilverLH', 'AuditLH')
ad.initialDetail('testPL', '123', 'manual', 'factTest', 'testFunction')

{'STATUS_ACTIVITY': 'Not start', 'TRIGGERTYPE': None, 'ERRORCODE': None, 'STARTTIME': datetime.datetime(2025, 1, 18, 4, 50, 58, 114075), 'ENDTIME': None, 'PIPELINERUNID': None, 'TABLE_NAME': None, 'ERRORMESSAGE': None, 'PIPELINENAME': None, 'AUDITKEY': None, 'COUNTROWSBEFORE': None, 'COUNTROWSAFTER': None, 'FUNCTION_NAME': None}


In [14]:
class FrozenKeysDict:
    def __init__(self, **kwargs):
        self._data = kwargs

    def __getitem__(self, key):
        return self._data[key]

    def __setitem__(self, key, value):
        if key not in self._data:
            raise KeyError(f"Cannot add new key: {key}")
        self._data[key] = value

    def __delitem__(self, key):
        raise KeyError(f"Cannot delete key: {key}")

    def __iter__(self):
        return iter(self._data)

    def __len__(self):
        return len(self._data)

    def __repr__(self):
        return repr(self._data)

In [18]:
log = FrozenKeysDict(**{column: None for column in ['PIPELINENAME', 'PIPELINERUNID', 'TRIGGERTYPE', 'TABLE_NAME', 'FUNCTION_NAME','COUNTROWSBEFORE', 'COUNTROWSAFTER', 'ERRORCODE', 'ERRORMESSAGE']})

In [21]:
log['PIPELINENAME'] = 'xxx'

In [22]:
log

{'PIPELINENAME': 'xxx', 'PIPELINERUNID': None, 'TRIGGERTYPE': None, 'TABLE_NAME': None, 'FUNCTION_NAME': None, 'COUNTROWSBEFORE': None, 'COUNTROWSAFTER': None, 'ERRORCODE': None, 'ERRORMESSAGE': None}

In [23]:
log['new']='yyy'

KeyError: 'Cannot add new key: new'

In [48]:
class logger:
    def __init__(self, **kwargs):
        self._data = kwargs

    def __getitem__(self, key):
        return self._data[key]

    def __setitem__(self, key, value):
        if key not in self._data:
            raise KeyError(f"Cannot add new key: {key}")
        self._data[key] = value

    def __delitem__(self, key):
        raise KeyError(f"Cannot delete key: {key}")

    def __iter__(self):
        return iter(self._data)

    def __len__(self):
        return len(self._data)

    def __repr__(self):
        return repr(self._data)

class AuditLog:
    
    def __init__(self, columns: Union[List[str], Tuple[str, ...]], WS_ID: str, TABLE_NAME_to_check:str, AUDIT_TABLE_NAME:str, LH_ID_to_check: str, LH_ID_audit: str = None, schema: str = None):
        '''
        - if `LH_ID_audit` is not given, it is  LH_ID_to_check automatically, i.e. audit table is in the same lakehouse as that of
        - if using lakehouse with Schema, please provide `schema` parameter
        '''
        self.WS_ID = WS_ID
        self.TABLE_NAME_to_check = TABLE_NAME_to_check
        self.AUDIT_TABLE_NAME = AUDIT_TABLE_NAME
        self.LH_ID_to_check = LH_ID_to_check
        self.LH_ID_audit = LH_ID_audit if LH_ID_audit else LH_ID_to_check
        self.schema = schema
        self.fixColumns = {'STARTTIME','ENDTIME','AUDITKEY','STATUS_ACTIVITY'}
        self.columns = tuple(set(columns).union(self.fixColumns))
        
        if self.schema:    
            self.PATH_TO_AUDIT_TABLE = f'abfss://{self.WS_ID}@onelake.dfs.fabric.microsoft.com/{self.LH_ID_audit}/Tables/{self.schema}/{self.AUDIT_TABLE_NAME}'
            self.PATH_TO_CHECKED_TABLE = f'abfss://{self.WS_ID}@onelake.dfs.fabric.microsoft.com/{self.LH_ID_to_check}/Tables/{self.schema}/{self.TABLE_NAME_to_check}'
        else:
            self.PATH_TO_AUDIT_TABLE = f'abfss://{self.WS_ID}@onelake.dfs.fabric.microsoft.com/{self.LH_ID_audit}/Tables/{self.AUDIT_TABLE_NAME}'
            self.PATH_TO_CHECKED_TABLE = f'abfss://{self.WS_ID}@onelake.dfs.fabric.microsoft.com/{self.LH_ID_to_check}/Tables/{self.TABLE_NAME_to_check}'
    
        self.log = logger(**{column: None for column in self.columns})
        self.log['STARTTIME'] = datetime.now() + timedelta(hours=7)
        self.log['STATUS_ACTIVITY'] = 'Not start'

    def setKey(self, initConfig: dict[str, Any]):
        assert set(initConfig.keys()).issubset(set(self.columns).difference()), f'initConfig must have the columns in {self.columns}'
        for column in initConfig:
            self.log[column] = initConfig[column]
        
    def initialDetail(self, initConfig: dict[str, Any]):
        self.setKey(initConfig)

    def getKey(self):
        return self.columns
    
    def getLog(self):
        return self.log
        
    def __str__(self):
        out = ''
        for key in self.columns:
            out += f'{key}: {self.log[key]}\n'
        return out
    
    def __repr__(self):
        return str(self.log)


class AuditLog_SPC(AuditLog):
    
    def __init__(self, WS_ID: str, TABLE_NAME_to_check:str, AUDIT_TABLE_NAME:str, LH_ID_to_check: str, LH_ID_audit: str = None, schema: str = None):
        '''
        - if `LH_ID_audit` is not given, it is  LH_ID_to_check automatically, i.e. audit table is in the same lakehouse as that of
        - if using lakehouse with Schema, please provide `schema` parameter
        '''
        super().__init__(['PIPELINENAME', 'PIPELINERUNID', 'TRIGGERTYPE', 'TABLE_NAME', 'FUNCTION_NAME','COUNTROWSBEFORE', 'COUNTROWSAFTER', 'ERRORCODE', 'ERRORMESSAGE'] ,WS_ID, TABLE_NAME_to_check, AUDIT_TABLE_NAME, LH_ID_to_check, LH_ID_audit, schema)

    def initialDetail(self,  pipelineName: str, pipelineId, TriggerType, TableName, functionName, ):
        super().initialDetail({
            'PIPELINENAME': pipelineName, 
            'PIPELINERUNID': pipelineId, 
            'TRIGGERTYPE': TriggerType, 
            'TABLE_NAME': TableName, 
            'FUNCTION_NAME': functionName
        })

{'STATUS_ACTIVITY': 'Not start', 'TRIGGERTYPE': 'manual', 'ERRORCODE': None, 'STARTTIME': datetime.datetime(2025, 1, 18, 20, 19, 37, 815088), 'ENDTIME': None, 'PIPELINERUNID': '123', 'TABLE_NAME': 'factTest', 'ERRORMESSAGE': None, 'PIPELINENAME': 'testPL', 'AUDITKEY': None, 'COUNTROWSBEFORE': None, 'COUNTROWSAFTER': None, 'FUNCTION_NAME': 'testFunction'}

In [51]:
print(ad)

STATUS_ACTIVITY: Not start
TRIGGERTYPE: manual
ERRORCODE: None
STARTTIME: 2025-01-18 20:19:37.815088
ENDTIME: None
PIPELINERUNID: 123
TABLE_NAME: factTest
ERRORMESSAGE: None
PIPELINENAME: testPL
AUDITKEY: None
COUNTROWSBEFORE: None
COUNTROWSAFTER: None
FUNCTION_NAME: testFunction



In [79]:
import numpy as np
from pyspark.sql.functions import col, when, concat, lit, format_string,sum, upper, substring, expr, current_date, current_timestamp,to_timestamp,concat_ws, isnull, date_format, asc, trim, trunc, date_sub, year,coalesce, count, countDistinct, min, max
from pyspark.sql.types import IntegerType, DecimalType, StringType, LongType, TimestampType, StructType, StructField, DoubleType, FloatType
import pandas as pd
from pyspark.sql import SparkSession
from datetime import datetime, timedelta
from tqdm.auto import tqdm
from concurrent.futures import ThreadPoolExecutor
from typing import Union, List, Tuple, Any
# from tabulate import tabulate

spark = SparkSession.builder\
        .appName("utils")\
        .getOrCreate()

class logger:
    def __init__(self, **kwargs):
        self._data = kwargs

    def __getitem__(self, key):
        return self._data[key]

    def __setitem__(self, key, value):
        if key not in self._data:
            raise KeyError(f"Cannot add new key: {key}")
        self._data[key] = value

    def __delitem__(self, key):
        raise KeyError(f"Cannot delete key: {key}")

    def __iter__(self):
        return iter(self._data)

    def __len__(self):
        return len(self._data)

    def __repr__(self):
        return repr(self._data)
    
# class Audit:
#     def __init__(self):
#         raise NotImplementedError('This is an abstract class')
    
#     def setKeys(self, initConfig: dict[str, Any]):
#         assert set(initConfig.keys()).issubset(set(self.columns).difference()), f'initConfig must have the columns in {self.columns}'
#         for column in initConfig:
#             self.log[column] = initConfig[column]

        

class AuditLog_Fusion:
    
    def __init__(self, columns: Union[List[str], Tuple[str, ...]], WS_ID: str, TABLE_NAME_to_check:str, AUDIT_TABLE_NAME:str, LH_ID_to_check: str, LH_ID_audit: str = None, schema: str = None):
        '''
        - if `LH_ID_audit` is not given, it is  LH_ID_to_check automatically, i.e. audit table is in the same lakehouse as that of
        - if using lakehouse with Schema, please provide `schema` parameter
        '''
        self.WS_ID = WS_ID
        self.TABLE_NAME_to_check = TABLE_NAME_to_check
        self.AUDIT_TABLE_NAME = AUDIT_TABLE_NAME
        self.LH_ID_to_check = LH_ID_to_check
        self.LH_ID_audit = LH_ID_audit if LH_ID_audit else LH_ID_to_check
        self.schema = schema
        self.fixColumns = {'STARTTIME','ENDTIME','AUDITKEY','STATUS_ACTIVITY'}
        self.columns = tuple(set(columns).union(self.fixColumns))
        
        if self.schema:    
            self.PATH_TO_AUDIT_TABLE = f'abfss://{self.WS_ID}@onelake.dfs.fabric.microsoft.com/{self.LH_ID_audit}/Tables/{self.schema}/{self.AUDIT_TABLE_NAME}'
            self.PATH_TO_CHECKED_TABLE = f'abfss://{self.WS_ID}@onelake.dfs.fabric.microsoft.com/{self.LH_ID_to_check}/Tables/{self.schema}/{self.TABLE_NAME_to_check}'
        else:
            self.PATH_TO_AUDIT_TABLE = f'abfss://{self.WS_ID}@onelake.dfs.fabric.microsoft.com/{self.LH_ID_audit}/Tables/{self.AUDIT_TABLE_NAME}'
            self.PATH_TO_CHECKED_TABLE = f'abfss://{self.WS_ID}@onelake.dfs.fabric.microsoft.com/{self.LH_ID_to_check}/Tables/{self.TABLE_NAME_to_check}'
    
        self.log = logger(**{column: None for column in self.columns})
        self.log['STARTTIME'] = datetime.now()
        self.log['STATUS_ACTIVITY'] = 'Not start'

    def setKeys(self, initConfig: dict[str, Any]):
        assert set(initConfig.keys()).issubset(set(self.columns).difference()), f'initConfig must have the columns in {self.columns}'
        for column in initConfig:
            self.log[column] = initConfig[column]

    def setKey(self, key: str, value: Any):
        assert key in self.columns, f'key must be in {self.columns}'
        self.log[key] = value
        
    def initialDetail(self, initConfig: dict[str, Any]):
        self.setKeys(initConfig)

    def getKey(self):
        return self.columns
    
    def getLog(self):
        return self.log
        
    def __str__(self):
        out = ''
        for key in self.columns:
            out += f'{key}: {self.log[key]}\n'
        return out
    
    def __repr__(self):
        return str(self.log)
    
    def endSuccess(self):
        self.log['STATUS_ACTIVITY'] = 'Fail'
        self._endAuditLog()
        print(self)
        
    def endFail(self, errorCode: str, errorMessage: str):
        self.log['STATUS_ACTIVITY'] = 'Fail'
        self.log['ERRORCODE'] = errorCode
        self.log['ERRORMESSAGE'] = errorMessage
        self._endAuditLog()
        print(self)

    def _endAuditLog(self):
        # write to audit table
        self.log['ENDTIME'] = datetime.now()
        df = spark.createDataFrame([self.log])
        df.write.mode('append').save(self.PATH_TO_AUDIT_TABLE)

    def getAuditLogTable(self):
        return spark.read.load(self.PATH_TO_AUDIT_TABLE)
    
    def countBefore(self, df):
        self.log['COUNTROWSBEFORE'] = df.count()

    def countAfter(self, df):
        self.log['COUNTROWSAFTER'] = df.count()

    def getAllPath(self):
        return {'PATH_TO_AUDIT_TABLE':self.PATH_TO_AUDIT_TABLE, 'PATH_TO_CHECKED_TABLE':self.PATH_TO_CHECKED_TABLE}


class AuditLog_SPC(AuditLog_Fusion):
    
    def __init__(self, WS_ID: str, TABLE_NAME_to_check:str, AUDIT_TABLE_NAME:str, LH_ID_to_check: str, LH_ID_audit: str = None, schema: str = None):
        '''
        - if `LH_ID_audit` is not given, it is  LH_ID_to_check automatically, i.e. audit table is in the same lakehouse as that of
        - if using lakehouse with Schema, please provide `schema` parameter
        '''
        super().__init__(['PIPELINENAME', 'PIPELINERUNID', 'TRIGGERTYPE', 'TABLE_NAME', 'FUNCTION_NAME','COUNTROWSBEFORE', 'COUNTROWSAFTER', 'ERRORCODE', 'ERRORMESSAGE'] ,WS_ID, TABLE_NAME_to_check, AUDIT_TABLE_NAME, LH_ID_to_check, LH_ID_audit, schema)

    def initialDetail(self,  pipelineName: str, pipelineId: str, TriggerType: str, TableName: str, functionName: str):
        super().initialDetail({
            'PIPELINENAME': pipelineName, 
            'PIPELINERUNID': pipelineId, 
            'TRIGGERTYPE': TriggerType, 
            'TABLE_NAME': TableName, 
            'FUNCTION_NAME': functionName
        })


In [80]:
ad = AuditLog_SPC(WS_ID = 'SPC_UAT', TABLE_NAME_to_check = 'factTest', AUDIT_TABLE_NAME='auditTable', LH_ID_to_check='SilverLH', LH_ID_audit='AuditLH')
ad.initialDetail(pipelineName = 'testPL', pipelineId = '123', TriggerType = 'manual', TableName = 'factTest', functionName = 'testFunction')
print(ad)

raise FileExistsError('Create you audit table first')

STATUS_ACTIVITY: Not start
TRIGGERTYPE: manual
ERRORCODE: None
STARTTIME: 2025-01-18 13:40:43.660532
ENDTIME: None
PIPELINERUNID: 123
TABLE_NAME: factTest
ERRORMESSAGE: None
PIPELINENAME: testPL
AUDITKEY: None
COUNTROWSBEFORE: None
COUNTROWSAFTER: None
FUNCTION_NAME: testFunction



In [82]:
demoDf = spark.createDataFrame([{'a':1, 'b':2}, {'a':3, 'b':4}])
demoDf.count()

DataFrame[a: bigint, b: bigint]

In [77]:
notebookutils.fs.exists(ad.PATH_TO_AUDIT_TABLE)

STATUS_ACTIVITY: Not start
TRIGGERTYPE: manual
ERRORCODE: None
STARTTIME: 2025-01-18 13:39:16.665795
ENDTIME: None
PIPELINERUNID: 123
TABLE_NAME: factTest
ERRORMESSAGE: None
PIPELINENAME: testPL
AUDITKEY: None
COUNTROWSBEFORE: None
COUNTROWSAFTER: None
FUNCTION_NAME: testFunction



In [71]:
ad.

SyntaxError: invalid syntax (2940658051.py, line 1)