In [1]:
import oracledb
from google.cloud.bigquery import Client
import os, json
from google.cloud import secretmanager
import pandas as pd

In [None]:
def set_secrets_as_envs():
  """Copy the team secret's key/value pairs into ``os.environ``.

  The secret's resource name is read from the ``KNADA_TEAM_SECRET``
  environment variable; its ``latest`` version is fetched from Secret
  Manager, decoded as UTF-8, parsed as JSON and merged into the process
  environment.

  Raises:
    KeyError: if ``KNADA_TEAM_SECRET`` is not set.
  """
  client = secretmanager.SecretManagerServiceClient()
  latest_version = f"{os.environ['KNADA_TEAM_SECRET']}/versions/latest"
  response = client.access_secret_version(name=latest_version)
  payload_text = response.payload.data.decode('UTF-8')
  # The payload is a JSON object; every entry becomes an environment variable.
  os.environ.update(json.loads(payload_text))

In [None]:
def oracle_secrets():
  """Return the connection settings used by the load as a dict.

  ``set_secrets_as_envs()`` is called first, then each setting is read
  from the environment with ``os.getenv`` (so a missing variable yields
  ``None``). The ``encoding`` and ``nencoding`` entries are always
  "UTF-8".
  """
  set_secrets_as_envs()

  # Setting name -> environment variable it is read from.
  env_var_by_setting = {
    'user': 'DB_USER',
    'password': 'DB_PASSWORD',
    'host': 'DBT_ORCL_HOST',
    'service': 'DBT_ORCL_SERVICE',
    'project_id': 'BS_PROJECT_ID',
    'table_uri': 'BS_TABLE_URI',
  }
  settings = {name: os.getenv(env_var) for name, env_var in env_var_by_setting.items()}
  settings['encoding'] = "UTF-8"
  settings['nencoding'] = "UTF-8"
  return settings

# Evaluated once when this cell runs. Note: this rebinds the name
# `oracle_secrets` from the function to the dict it returned, so the rest of
# the notebook indexes it (oracle_secrets['user']) instead of calling it.
oracle_secrets = oracle_secrets()

In [None]:
def get_data_from_table(max_opprettet_dato):
    """Read rows from the configured BigQuery table into a DataFrame.

    Args:
        max_opprettet_dato: cutoff for the ``OPPRETTET`` column. When
            ``None`` every row is selected; otherwise the value is
            converted with ``pd.to_datetime(..., utc=True)`` and only rows
            with ``OPPRETTET`` greater than it are selected.

    Returns:
        The query result as a DataFrame, with '≥' and '≤' in the
        ``sats_beskrivelse`` column replaced by '>=' and '<='.
    """
    project_id = oracle_secrets['project_id']
    table_uri = oracle_secrets['table_uri']
    bq_client = Client(project=project_id)

    if max_opprettet_dato is not None:
        cutoff = pd.to_datetime(max_opprettet_dato, utc=True)
        query_job = bq_client.query(f"SELECT * FROM `{table_uri}` WHERE OPPRETTET > '{cutoff}'")
    else:
        query_job = bq_client.query(f"SELECT * FROM `{table_uri}`")

    frame = query_job.to_dataframe()

    # Swap the comparison glyphs for their two-character ASCII spellings.
    for glyph, ascii_form in {'≥' : '>=', '≤' : '<='}.items():
        frame['sats_beskrivelse'] = frame['sats_beskrivelse'].str.replace(glyph, ascii_form)

    return frame

In [None]:
def send_data():
    """Load new brillestonad rows from BigQuery into Oracle and resolve person keys.

    Everything runs on a single Oracle connection:

    1. ``max(OPPRETTET)`` is read from ``brillestonad`` as the high-water mark
       (``None`` when the table holds no rows).
    2. Rows newer than the mark are fetched with ``get_data_from_table`` and
       inserted.
    3. A MERGE fills the FK_PERSON1_* / FK_DIM_PERSON_* columns for rows newer
       than the mark and sets the FNR columns to NULL where a key was found.
    4. The "k76" MERGE is executed.

    Steps 2 and 3 are committed together: if the MERGE fails, the insert is
    not committed either, so the next run sees the same high-water mark and
    retries both instead of leaving rows whose keys are never resolved.

    Raises:
        Whatever ``oracledb`` / the BigQuery client raise; nothing is caught
        here.
    """
    # Floor bound used when the target table is empty. str(None) would bind the
    # text 'None' into TO_TIMESTAMP, which is not a valid timestamp.
    empty_table_cutoff = '0001-01-01 00:00:00.000000'

    user = oracle_secrets['user'] + '[DVH_FAM_HM]'
    dsn_tns = oracledb.makedsn(oracle_secrets['host'], 1521, service_name = oracle_secrets['service'])

    with oracledb.connect(user=user, password = oracle_secrets['password'], dsn=dsn_tns) as conn:
        with conn.cursor() as cursor:
            sql = """select max(OPPRETTET) from brillestonad"""
            cursor.execute(sql)
            # An aggregate without GROUP BY always yields exactly one row.
            max_opprettet_dato = cursor.fetchone()[0]

            df = get_data_from_table(max_opprettet_dato)

            # Timestamps are sent as text and parsed by TO_TIMESTAMP in the INSERT;
            # the numeric columns are converted in SQL via to_number().
            df['opprettet'] = pd.to_datetime(df.opprettet).dt.tz_localize(None).astype(str)
            df['periode'] = pd.to_datetime(df['bestillingsdato']).dt.strftime('%Y%m').astype(str)

            if len(df) > 0:
                # Binds are positional (:1..:15): the DataFrame's column order
                # (the SELECT * order, with 'periode' appended last) has to line
                # up with the column list of the INSERT.
                rows = [tuple(x) for x in df.values]
                cursor.executemany('''INSERT INTO brillestonad (ID,FNR_BARN,FNR_INNSENDER,ORGNR,BESTILLINGSDATO,BRILLEPRIS,BESTILLINGSREFERANSE,
                                        BEHANDLINGSRESULTAT,SATS,SATS_BELOP,SATS_BESKRIVELSE,BELOP,OPPRETTET,KILDE,PERIODE) 
                                         VALUES (:1,:2,:3,:4,:5,nvl(round(to_number(:6),2),0),:7,:8,:9,nvl(round(to_number(:10),2),0),:11,nvl(round(to_number(:12),2),0),TO_TIMESTAMP(:13, 'yyyy-mm-dd HH24:MI:SS.FF6'),:14,:15)''',rows)
                # No commit yet: the insert is committed together with the MERGE below.

            # NOTE(review): in the two FNR CASE expressions at the end of this
            # statement, `IS NOT NULL OR ... != -1` is true for every non-NULL key
            # (including -1), so the `!= -1` arm has no effect. Confirm whether
            # AND was intended; the SQL is left as it was.
            sql_fk_person1 = """
                MERGE INTO DVH_FAM_HM.brillestonad T
                USING (
                WITH brille as (
                SELECT 
                A.FNR_BARN,
                A.FNR_INNSENDER,
                A.pk_fam_brillestonad,
                A.opprettet,
                to_char(a.BESTILLINGSDATO,'YYYYMM') periode_NY,
                case when barn.skjermet_kode = 0 then BARN.FK_PERSON1
                    when barn.skjermet_kode != 0 then -1
                    else null
                    end FK_PERSON1_BARN_NY,
                case when INNSENDER.skjermet_kode = 0 then INNSENDER.FK_PERSON1
                    when INNSENDER.skjermet_kode != 0 then -1
                    else null
                    end FK_PERSON1_INNSENDER_NY,
                case when dim_person_innsender.skjermet_kode = 0 then dim_person_innsender.pk_dim_person
                    when dim_person_innsender.skjermet_kode != 0 then -1
                    else null 
                    end FK_DIM_PERSON_INNSENDER_NY,
                case when dim_person_barn.skjermet_kode = 0 then dim_person_barn.pk_dim_person
                    when dim_person_barn.skjermet_kode != 0 then -1
                    else null 
                    end FK_DIM_PERSON_barn_NY

                FROM brillestonad A

                left outer join dt_person.ident_off_id_til_fk_person1 BARN ON
                BARN.OFF_ID=FNR_BARN AND
                BARN.GYLDIG_FRA_DATO<OPPRETTET AND
                BARN.GYLDIG_TIL_DATO>=OPPRETTET
                left outer join dt_person.ident_off_id_til_fk_person1 INNSENDER ON
                INNSENDER.OFF_ID=FNR_INNSENDER AND
                INNSENDER.GYLDIG_FRA_DATO<OPPRETTET AND
                INNSENDER.GYLDIG_TIL_DATO>=OPPRETTET
                left outer join dt_person.dim_person dim_person_innsender ON
                dim_person_innsender.fk_person1 = innsender.fk_person1 AND
                dim_person_innsender.gyldig_fra_dato < OPPRETTET AND
                dim_person_innsender.gyldig_til_dato >= OPPRETTET
                left outer join dt_person.dim_person dim_person_barn ON
                dim_person_barn.fk_person1 = barn.fk_person1 AND
                dim_person_barn.gyldig_fra_dato < OPPRETTET AND
                dim_person_barn.gyldig_til_dato >= OPPRETTET
                
                WHERE OPPRETTET > TO_TIMESTAMP(:max_opprettet_dato, 'YYYY-MM-DD HH24:MI:SS.FF6')
                
                ) 
                select * from brille)  S
                 
                ON ( S.PK_FAM_BRILLESTONAD=T.PK_FAM_BRILLESTONAD)    
                WHEN MATCHED
                THEN UPDATE
                     SET 
                        T.FK_PERSON1_BARN=S.FK_PERSON1_BARN_NY,
                        T.FK_PERSON1_INNSENDER=S.FK_PERSON1_INNSENDER_NY,
                        T.FK_DIM_PERSON_INNSENDER=s.FK_DIM_PERSON_INNSENDER_NY,
                        T.FK_DIM_PERSON_BARN=s.FK_DIM_PERSON_BARN_NY,
                        T.FNR_BARN=CASE WHEN S.FK_PERSON1_BARN_NY IS NOT NULL OR S.FK_PERSON1_BARN_NY != -1 THEN NULL ELSE T.FNR_BARN END,
                        T.FNR_innsender=CASE WHEN S.FK_PERSON1_innsender_NY IS NOT NULL OR S.FK_PERSON1_innsender_NY != -1 THEN NULL ELSE T.FNR_innsender END,
                        T.oppdatert_dato=localtimestamp
            """

            # The cutoff stays a string so fractional seconds survive the bind.
            # With an empty target table every freshly inserted row is in scope.
            if max_opprettet_dato is None:
                merge_cutoff = empty_table_cutoff
            else:
                merge_cutoff = str(max_opprettet_dato)

            cursor.execute(sql_fk_person1, max_opprettet_dato=merge_cutoff)
            # Single commit for INSERT + FK MERGE (see docstring).
            conn.commit()

            # NOTE(review): this statement looks unfinished. It repeats the INTO
            # keyword ("merge into INTO") and its ON / UPDATE clauses reference
            # test_merge_fk, which is not the MERGE target, so Oracle is expected
            # to reject it. The intended target table cannot be determined from
            # this notebook and the statement contains a DELETE clause, so the
            # text is deliberately left unchanged -- confirm with the author.
            sql_delete_k76 = """
                            merge into INTO DVH_FAM_HM.brillestonad
                            using
                            (select fk.aktor_id, max(ident.fk_person1) fk_person1
                            from test_merge_fk fk
                            left join dt_person.ident_aktor_til_fk_person1 ident
                            on fk.aktor_id = ident.aktor_id
                            group by fk.aktor_id) fk_ident
                            on (test_merge_fk.aktor_id = fk_ident.aktor_id)
                            when matched then
                            update set test_merge_fk.fk_person1 = fk_ident.fk_person1, test_merge_fk.oppdatert_dato = sysdate
                            delete where fk_ident.fk_person1 is null
                        """
            cursor.execute(sql_delete_k76)
            conn.commit()
            



In [None]:
# Entry point of the notebook: executing this cell runs the whole load.
send_data()