In [1]:
import time
# Epoch-seconds timestamp captured in the first cell of the notebook.
# NOTE(review): presumably compared with time.time() in a later cell to report total runtime.
start = time.time()

# Latest_SFTP_file

In [2]:
from google.cloud import storage
from datetime import datetime
import pandas as pd

class Latest_SFTP_file:
    """Locates the newest SFTP drop in the GCS bucket and normalises date strings."""

    # Object-name prefix ("folder") under which the SFTP downloads are stored.
    DOWNLOADED_FILES_PREFIX = "Downloaded Files/"

    # Textual date formats accepted by the converters: str(pd.Timestamp) yields
    # "yyyy-mm-dd HH:MM:SS", while hand-typed dates are usually "yyyy-mm-dd".
    _INPUT_FORMATS = ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d")

    def __init__(self):
        self.bucket_name = "miag-m360-test-bucket"

    def get_latest_file(self):
        """
        Return the name of the most recently updated object under 'Downloaded Files/'.

        The bucket is taken from ``self.bucket_name``.

        :return: The object name of the latest file, or None if the folder is empty
        """
        client = storage.Client()
        bucket = client.get_bucket(self.bucket_name)
        # Filter server-side by prefix instead of listing the entire bucket.
        downloaded_files = list(bucket.list_blobs(prefix=self.DOWNLOADED_FILES_PREFIX))

        if not downloaded_files:
            print("No files in the 'Downloaded Files' folder.")
            return None

        # Pick the blob with the most recent `updated` timestamp.
        latest_blob = max(downloaded_files, key=lambda blob: blob.updated)

        print(f"The latest file in 'Downloaded Files' is: {latest_blob.name}")
        return latest_blob.name

    def lowest_document_date(self, sftp_df):
        """
        Return the earliest 'Document date' in the SFTP extract.

        :param sftp_df: DataFrame whose 'Document date' column holds 'dd.mm.yyyy' strings
        :return: pandas Timestamp of the minimum date
        """
        lowest_date = pd.to_datetime(sftp_df['Document date'], format='%d.%m.%Y').min()
        return lowest_date

    @classmethod
    def _parse_date(cls, date_str):
        """Parse a datetime/Timestamp or a string in one of ``_INPUT_FORMATS``."""
        if isinstance(date_str, datetime):  # also covers pd.Timestamp
            return date_str
        for fmt in cls._INPUT_FORMATS:
            try:
                return datetime.strptime(date_str, fmt)
            except ValueError:
                continue
        raise ValueError("Invalid date format, expected 'yyyy-mm-dd' or 'yyyy-mm-dd HH:MM:SS'")

    def convert_to_yyyymmdd(self, date_str):
        """
        Convert a date to a 'yyyymmdd' string.

        :param date_str: 'yyyy-mm-dd', 'yyyy-mm-dd HH:MM:SS' (e.g. str(pd.Timestamp)),
                         or a datetime / pd.Timestamp
        :return: Date string in 'yyyymmdd' format
        :raises ValueError: if a string input matches neither accepted format
        """
        return self._parse_date(date_str).strftime("%Y%m%d")

    def convert_to_yyyy_mm_dd(self, date_str):
        """
        Convert a date to a 'yyyy-mm-dd' string (dropping any time component).

        :param date_str: 'yyyy-mm-dd', 'yyyy-mm-dd HH:MM:SS' (e.g. str(pd.Timestamp)),
                         or a datetime / pd.Timestamp
        :return: Date string in 'yyyy-mm-dd' format
        :raises ValueError: if a string input matches neither accepted format
        """
        return self._parse_date(date_str).strftime("%Y-%m-%d")


# Storage_Bucket_Operations

In [3]:
import pandas as pd
from google.cloud import storage
import io


class Storage_Bucket_Operations:
    """Reads SFTP CSV extracts out of the M360 GCS bucket."""

    def __init__(self):
        # Bucket that holds the SFTP extracts, and the folder they land in.
        self.bucket_name = "miag-m360-test-bucket"
        self.download_files_path = "Downloaded Files"

    def readFromBucket(self, sftp_file):
        """
        Download ``sftp_file`` from the bucket and parse it as a CSV.

        The identifier-like columns listed below are read with dtype ``str``
        so their text (including leading zeros) is kept as-is.

        :param sftp_file: object name (path inside the bucket) of the CSV file
        :return: pandas DataFrame with the file contents
        """
        string_columns = (
            "Store",
            "Supplier number (MIAG)",
            "Remittance advice number",
            "Supplier number (Sales Line)",
            "Document number",
            "Invoice number",
        )
        gcs_client = storage.Client(project='cf-hada-bsc-mcctk-mia-kg')
        source_blob = gcs_client.get_bucket(self.bucket_name).blob(f"{sftp_file}")
        text_buffer = io.StringIO(source_blob.download_as_text())
        return pd.read_csv(
            text_buffer,
            index_col=False,
            dtype={column: str for column in string_columns},
        )

# DB_Instance_Operations

In [4]:
import pandas as pd
from sqlalchemy import create_engine, text
import pandasql as ps
import tempfile
import os


class DB_Instance_Operations:
    """Read/write access to the MIAG-M360 PostgreSQL instance.

    The constructor downloads the client TLS certificates from GCS into
    temporary files and builds a SQLAlchemy engine that points at them;
    ``__del__`` deletes those temporary files again.
    """

    # Non-secret connection settings.
    DB_USER = "postgres"
    DB_HOST = "10.32.111.54"
    DB_PORT = 5432
    DB_NAME = "MIAG-M360_UAT"
    # The password is read from this environment variable instead of being
    # hard-coded in the notebook. NOTE(review): the password that used to be
    # embedded here should be treated as leaked and rotated.
    DB_PASSWORD_ENV_VAR = "M360_DB_PASSWORD"

    def __init__(self):
        # Local import: only the constructor needs URL.
        from sqlalchemy.engine import URL

        # Resolve the secret first so a missing value fails before any
        # private-key material is written to disk.
        password = os.environ.get(self.DB_PASSWORD_ENV_VAR)
        if not password:
            raise RuntimeError(
                f"Set the {self.DB_PASSWORD_ENV_VAR} environment variable to the database password."
            )

        # GCS bucket details
        bucket_name = "miag-m360-test-bucket"
        cert_files = {
            "sslrootcert": "hada-bsc-miag-m360-psql-pp-server-ca.pem",
            "sslcert": "hada-bsc-miag-m360-psql-pp-client-cert.pem",
            "sslkey": "hada-bsc-miag-m360-psql-pp-client-key.pem"
        }

        # Download certificate files into temporary files
        self.cert_temp_paths = self.get_certificates_from_gcs(bucket_name, cert_files)

        # URL.create escapes special characters in the credentials, which
        # concatenating them into a URL string does not.
        self.db_url = URL.create(
            drivername="postgresql+psycopg2",
            username=self.DB_USER,
            password=password,
            host=self.DB_HOST,
            port=self.DB_PORT,
            database=self.DB_NAME,
            query={
                "sslmode": "require",
                "sslrootcert": self.cert_temp_paths["sslrootcert"],
                "sslcert": self.cert_temp_paths["sslcert"],
                "sslkey": self.cert_temp_paths["sslkey"],
            },
        )

        # Create the database engine
        self.engine = create_engine(self.db_url)

    def get_certificates_from_gcs(self, bucket_name, cert_files):
        """Download certificate objects from GCS into temporary files.

        :param bucket_name: GCS bucket holding the certificate objects
        :param cert_files: mapping of connection option name -> object name in the bucket
        :return: mapping of the same keys -> local temporary file path
        :raises Exception: re-raises any download error after deleting the files
            already written by this call
        """
        storage_client = storage.Client()
        bucket = storage_client.bucket(bucket_name)
        temp_paths = {}
        try:
            for key, gcs_path in cert_files.items():
                # delete=False: the files must outlive this call (removed in __del__).
                # Close the handle before downloading so the path can be reopened.
                with tempfile.NamedTemporaryFile(delete=False) as temp_file:
                    temp_paths[key] = temp_file.name
                bucket.blob(gcs_path).download_to_filename(temp_paths[key])
                print(f"{key} downloaded and stored temporarily at {temp_paths[key]}")
        except Exception:
            # Don't leave (possibly partial) key files behind on failure.
            self._remove_files(temp_paths.values())
            raise
        return temp_paths

    @staticmethod
    def _remove_files(paths):
        """Delete every path in ``paths`` that still exists, reporting each removal."""
        for path in paths:
            if os.path.exists(path):
                os.remove(path)
                print(f"Deleted temporary file: {path}")

    def __del__(self):
        """Clean up temporary certificate files."""
        # cert_temp_paths is absent when __init__ raised before assigning it.
        self._remove_files(getattr(self, "cert_temp_paths", {}).values())

    def _replace_table(self, table_name, df):
        """Replace all rows of ``table_name`` with ``df`` in a single transaction.

        The DELETE and the INSERT share one connection inside ``engine.begin()``,
        so a failed insert rolls the delete back instead of leaving the table empty.
        """
        with self.engine.begin() as connection:
            connection.execute(text(f"Delete from {table_name}"))
            df.to_sql(table_name, connection, if_exists='append', index=False)

    def readSDPTable(self):
        """Return the full contents of the ``sdp_pool`` table as a DataFrame."""
        query = "select * from sdp_pool"
        return pd.read_sql_query(query, self.engine)

    def updateSDP(self, sdp, sftp_df):
        """Add suppliers from the SFTP extract that are not yet in the supplier pool.

        :param sdp: current pool (as returned by readSDPTable, DB column names)
        :param sftp_df: SFTP extract with the original CSV column names
        :return: the updated pool; its columns use the SFTP naming
            ('Supplier number (Sales Line)', ...), which writeSDPTable maps back
        """
        if sdp.shape[0] == 0:
            # Empty pool: seed it with the distinct suppliers of this extract.
            seed_query = (
                "SELECT DISTINCT `Supplier number (Sales Line)`, `Supplier number (MIAG)`, "
                "`Supplier name`, `Contract area` FROM sftp_df"
            )
            return ps.sqldf(seed_query, {"sftp_df": sftp_df})

        # NULLs are excluded from the subquery: `x NOT IN (..., NULL)` is never
        # true in SQL, so a single NULL key would otherwise block all new suppliers.
        new_supp_in_sftp_query = (
            "SELECT * from sftp_df where `Supplier number (Sales Line)` not in "
            "(SELECT `Supplier_Number_Sales` FROM sdp WHERE `Supplier_Number_Sales` IS NOT NULL)"
        )
        new_supp_df = ps.sqldf(new_supp_in_sftp_query, {"sftp_df": sftp_df, "sdp": sdp})
        push_to_sdp_query = (
            "Select `Supplier number (Sales Line)`, `Supplier number (MIAG)`, `Supplier name`, `Contract area` "
            "from new_supp_df union "
            "Select `Supplier_Number_Sales`, `Supplier_Number_MIAG`, `Supplier_Name`, `Contract_Area` from sdp"
        )
        return ps.sqldf(push_to_sdp_query, {"new_supp_df": new_supp_df, "sdp": sdp})

    def writeSDPTable(self, sdp_df):
        """Rename SFTP-style columns to DB names (in place) and replace ``sdp_pool``."""
        column_mapping = {
            'Supplier number (Sales Line)': 'Supplier_Number_Sales',
            'Supplier number (MIAG)': 'Supplier_Number_MIAG',
            'Supplier name': 'Supplier_Name',
            'Contract area': 'Contract_Area'
        }
        sdp_df.rename(columns=column_mapping, inplace=True)
        self._replace_table('sdp_pool', sdp_df)
        print("Written back to SDP Table of DB Instance...")

    def _sales_supplier_numbers(self):
        """Return ``Supplier_Number_Sales`` from ``sdp_pool`` as a list."""
        return self.readSDPTable()['Supplier_Number_Sales'].to_list()

    def getSupplierNumberForMMSIC(self):
        """Pool supplier numbers with the first character replaced by '0' (MMSIC LIFNR filter)."""
        return ["0" + number[1:] for number in self._sales_supplier_numbers()]

    def getSupplierNumberForSISIC(self):
        """Pool supplier numbers with the first character replaced by '1' (SIS LIFNR filter)."""
        return ["1" + number[1:] for number in self._sales_supplier_numbers()]

    def getSupplierNumberForFI(self):
        """Pool supplier numbers without their first five characters (FI supplier filter)."""
        return [number[5:] for number in self._sales_supplier_numbers()]

    def writeICTable(self, extracted_ic_df):
        """Lower-case the IC column names (in place) and replace ``intermediate_ic``."""
        ic_columns = ['LIFNR', 'BELNR', 'RENR', 'REDAT', 'LFSNR', 'GEBRF', 'GSMWB', 'GSMWF', 'WAERS',
                      'WENUM', 'RGDAT', 'ABGST', 'AUFNR', 'VORGN', 'GJAHR', 'WEDAT', 'DEBNOTNO']
        extracted_ic_df.rename(columns={column: column.lower() for column in ic_columns}, inplace=True)
        self._replace_table('intermediate_ic', extracted_ic_df)
        print("Written to Intermediate IC Table of DB Instance...")

    def writeFITable(self, extracted_fi_df):
        """Lower-case the FI column names (in place) and replace ``intermediate_fi``."""
        fi_columns = ['MANDT', 'Document_type', 'document_type_desc', 'GJAHR', 'BUKRS', 'GSBER', 'PRCTR',
                      'store_or_dc', 'KOSTL', 'month_in_fin_year', 'BELNR', 'XBLNR', 'AUGBL', 'AUGDT',
                      'ZFBDT', 'ZBD1T', 'ZBD2T', 'NETDT', 'BUZEI', 'altkt', 'hkont', 'suppl_no', 'BLDAT',
                      'BUDAT', 'CPUDT', 'partition_date', 'dana_ingestion_date', 'shkzg',
                      'Amount_in_local_currency', 'Amount_in_document_currency', 'Tax_in_local_currency',
                      'Tax_in_document_currency', 'WAERS', 'Batch_Input_session_name', 'sgtxt']
        extracted_fi_df.rename(columns={column: column.lower() for column in fi_columns}, inplace=True)
        self._replace_table('intermediate_fi', extracted_fi_df)
        print("Written to Intermediate FI Table of DB Instance...")

    def readICTable(self):
        """Return the full contents of ``intermediate_ic``."""
        query = "select * from intermediate_ic"
        return pd.read_sql_query(query, self.engine)

    def readFITable(self):
        """Return the full contents of ``intermediate_fi``."""
        query = "select * from intermediate_fi"
        return pd.read_sql_query(query, self.engine)

    def writeMergedTable(self, loadfile_df_copy):
        """Cast, reformat and rename the merged frame (in place) and replace ``tbl_merged_data``.

        Date columns are parsed from 'dd.mm.yyyy' and written as 'mm-dd-yyyy' strings.
        """
        text_columns = ['COMPANY_CODE', 'SUPPLIER_NO', 'MIAG_SUPPLIER_NO', 'ORDER_NO', 'DOC_TYPE',
                        'INVOICE_NO', 'INVOICE_DATE', 'DELIVERY_NOTE_NO', 'TOTAL_VAT_DC', 'CURRENCY',
                        'GOODS_RECEIPT_NO', 'INVOICE_STATUS', 'INVOICE_STATUS_INTERNAL', 'NET_DUE_DATE',
                        'DEBIT_NOTE_NO', 'REMITTANCE_ADVICE_NO', 'DOCUMENT_NO', 'STORE_NO',
                        'MATCH_STATUS', 'SYNC_STATUS', 'ARKTX']
        for column in text_columns:
            loadfile_df_copy[column] = loadfile_df_copy[column].astype(str)
        loadfile_df_copy['TOTAL_AMT_DC'] = pd.to_numeric(loadfile_df_copy['TOTAL_AMT_DC'])

        # Each date column is parsed exactly once (parsing an already-datetime
        # column again was a no-op) and rendered in the DB's mm-dd-yyyy form.
        date_columns = ['INVOICE_DATE', 'PRE_FINANCE_DATE', 'INVOICE_ENTRY_DATE', 'NET_DUE_DATE', 'CLEARING_DATE',
                        'GOODS_RECEIPT_DATE', 'MATCHING_DATE', 'SYNC_DATE']
        for column in date_columns:
            loadfile_df_copy[column] = pd.to_datetime(loadfile_df_copy[column], format='%d.%m.%Y').dt.strftime('%m-%d-%Y')

        column_mapping = {
            'COMPANY_CODE': 'company_code',
            'SUPPLIER_NO': 'supplier_no',
            'MIAG_SUPPLIER_NO': 'miag_supplier',
            'ORDER_NO': 'order_no',
            'DOC_TYPE': 'doc_type',
            'INVOICE_NO': 'invoice_no',
            'INVOICE_DATE': 'invoice_date',
            'DELIVERY_NOTE_NO': 'delivery_note_no',
            'TOTAL_AMT_DC': 'total_amt_dc',
            'TOTAL_VAT_DC': 'total_vat_dc',
            'CURRENCY': 'currency',
            'PRE_FINANCE_DATE': 'pre_finance_date',
            'GOODS_RECEIPT_NO': 'goods_receipt_no',
            'INVOICE_ENTRY_DATE': 'invoice_entry_date',
            'INVOICE_STATUS': 'invoice_status',
            'INVOICE_STATUS_INTERNAL': 'invoice_status_internal',
            'NET_DUE_DATE': 'net_due_date',
            'DEBIT_NOTE_NO': 'debit_note_no',
            'REMITTANCE_ADVICE_NO': 'remittance_advice_no',
            'DOCUMENT_NO': 'document_no',
            'STORE_NO': 'store_no',
            'ARKTX': 'arktx',
            'CLEARING_DATE': 'clearing_date',
            'GOODS_RECEIPT_DATE': 'goods_receipt_date',
            'MATCHING_DATE': 'matching_date',
            'MATCH_STATUS': 'match_status',
            'SYNC_DATE': 'sync_date',
            'SYNC_STATUS': 'sync_status'
        }
        loadfile_df_copy.rename(columns=column_mapping, inplace=True)
        self._replace_table('tbl_merged_data', loadfile_df_copy)
        print("Written to Final 360 Table of DB Instance...")


# db_instance_ops = DB_Instance_Operations()
# sdp_df = db_instance_ops.readSDPTable()
# sdp_df = db_instance_ops.updateSDP(sdp_df, sftp_df)
# db_instance_ops.writeSDPTable(sdp_df)
# sdp_supplier_list_for_mmsic = db_instance_ops.getSupplierNumberForMMSIC()
# sdp_supplier_list_for_sisic = db_instance_ops.getSupplierNumberForSISIC()
# sdp_supplier_list_for_fi = db_instance_ops.getSupplierNumberForFI()


# BigQuery_Operations

In [5]:
from google.cloud import bigquery


class BigQuery_Operations:
    """Extracts IC (MMSIC / SIS) invoice headers and FI documents from BigQuery."""

    # Dataset holding both IC sources; the MMSIC and SIS tables differ only by prefix.
    IC_DATASET = "metro-bi-dl-tur-prod.ingest_fgtf_mmsic"
    # Table (re)created by the FI script and read back by extract_FI.
    FI_RESULT_TABLE = "metro-bi-wb-mag-figov-s00.data_integrity_proj.sap_tur_360data_BELNR_testdoc"

    def __init__(self):
        self.client = bigquery.Client()

    def _extract_ic(self, table_prefix, supplier_numbers):
        """Latest invoice-header version per (LIFNR, RENR) for the given suppliers.

        Supplier numbers are passed as a query parameter rather than pasted into
        the SQL text, so they cannot alter the query, and an empty list simply
        yields no rows instead of an `IN ()` syntax error.

        :param table_prefix: 'mmsic' or 'sis' (selects the *_to_dana_gr_* tables)
        :param supplier_numbers: LIFNR values to keep
        :return: DataFrame of matching header rows
        """
        header_table = f"{self.IC_DATASET}.{table_prefix}_to_dana_gr_invoice_header"
        gr_table = f"{self.IC_DATASET}.{table_prefix}_to_dana_gr_table_header"
        query = f"""
WITH LatestRecords AS (
    SELECT
        LIFNR,
        RENR,
        MAX(dana_ingestion_timestamp) AS latest_timestamp
    FROM
        `{header_table}`
    GROUP BY
        LIFNR, RENR
)
SELECT
    T1.LIFNR,
    T1.BELNR,
    T1.RENR,
    T1.REDAT,
    T1.LFSNR,
    T1.GEBRF,
    T1.GSMWB,
    T1.GSMWF,
    T1.WAERS,
    T1.WENUM,
    T1.RGDAT,
    T1.ABGST,
    T1.AUFNR,
    T1.VORGN,
    T1.GJAHR,
    T2.WEDAT,
    T1.DEBNOTNO
FROM
    `{header_table}` AS T1
LEFT JOIN (
    SELECT DISTINCT
        VORGN,
        WEDAT,
        GJAHR
    FROM
        `{gr_table}`
) AS T2
ON
    T1.VORGN = T2.VORGN
JOIN
    LatestRecords LR
ON
    T1.LIFNR = LR.LIFNR
    AND T1.RENR = LR.RENR
    AND T1.dana_ingestion_timestamp = LR.latest_timestamp
WHERE
    T1.LIFNR IN UNNEST(@supplier_numbers)
    AND T1.REDAT >= '20230501'
ORDER BY
    T1.dana_ingestion_timestamp;
"""
        job_config = bigquery.QueryJobConfig(
            query_parameters=[
                bigquery.ArrayQueryParameter(
                    "supplier_numbers", "STRING", [str(number) for number in supplier_numbers]
                )
            ]
        )
        return self.client.query(query, job_config=job_config).to_dataframe()

    def extract_MMSIC(self, sdp_supplier_list_for_mmsic):
        """Extract MMSIC invoice headers for the given LIFNR values."""
        return self._extract_ic("mmsic", sdp_supplier_list_for_mmsic)

    def extract_SISIC(self, sdp_supplier_list_for_sisic):
        """Extract SIS invoice headers for the given LIFNR values."""
        return self._extract_ic("sis", sdp_supplier_list_for_sisic)

    def extract_FI(self, sdp_supplier_list_for_fi):
        """Rebuild FI_RESULT_TABLE for the given suppliers and return its contents.

        :param sdp_supplier_list_for_fi: supplier numbers compared against
            MOD(lifnr, 100000); each must be convertible with ``int()``
        :return: DataFrame with the rows of FI_RESULT_TABLE
        :raises ValueError: if a supplier number is not an integer
        """
        # int() both validates and normalises each value, so only integer
        # literals ever reach the script text. A typed array literal also keeps
        # the SQL valid when the list is empty.
        supplier_ids = ", ".join(str(int(number)) for number in sdp_supplier_list_for_fi)
        query = f"""
        DECLARE country STRING DEFAULT 'tur';
        DECLARE current_fiscal_year INT64 DEFAULT 2024;
        DECLARE store_flag STRING DEFAULT 'prctr';-- or 'gsber';
        DECLARE end_month_id INT64 DEFAULT EXTRACT(YEAR FROM DATE_SUB(CURRENT_DATE(), INTERVAL 1 MONTH)) * 100 + EXTRACT(MONTH FROM DATE_SUB(CURRENT_DATE(), INTERVAL 1 MONTH));
        DECLARE start_year INT64 DEFAULT 2024;
        DECLARE end_year INT64 DEFAULT 2025;
        CREATE OR REPLACE TABLE {self.FI_RESULT_TABLE}
        AS(
        WITH fidoc AS (
        SELECT * ,
        MAX  (dana_ingestion_timestamp) over (PARTITION BY MANDT, BELNR, GJAHR, BUKRS, bseg.BUZEI) as max_timestamp,
        ROW_NUMBER () over (PARTITION BY MANDT, BELNR, GJAHR, BUKRS, bseg.BUZEI order by dana_ingestion_timestamp DESC) as rn
        FROM metro-bi-dl-tur-prod.ingest_fgtf_sap.fidoc fi,
        UNNEST (zbseg) AS bseg
        WHERE 1=1
        AND gjahr BETWEEN start_year AND end_year
        ),
        fidoc_unique AS (
        SELECT *
        , CASE  WHEN store_flag = 'gsber' THEN fi.gsber
            WHEN store_flag = 'prctr' THEN fi.prctr
        END AS business_area
        FROM fidoc fi
        WHERE fi.dana_ingestion_timestamp  = max_timestamp
        AND rn = 1
        )
        SELECT
        fi.MANDT
        , zbkpf.blart AS Document_type
        , doc_type.ltext as document_type_desc
        , GJAHR
        , BUKRS
        , GSBER
        , PRCTR
        , cast(prctr as int64)-cast(bukrs as int64)*10000 as store_or_dc
        , KOSTL
        , zbkpf.monat as month_in_fin_year
        , BELNR
        , zbkpf.XBLNR
        --, AUFNR
        , AUGBL
        , AUGDT
        , ZFBDT
        , ZBD1T
        , ZBD2T
        , NETDT
        , BUZEI
        , altkt
        , hkont
        , MOD(SAFE_CAST(fi.z_dana_lfa1.lifnr AS int64), 100000) as suppl_no
        , zbkpf.BLDAT
        , zbkpf.BUDAT
        , zbkpf.CPUDT
        , date(fi.PARTITIONTIME) partition_date
        , date(fi.dana_ingestion_timestamp) dana_ingestion_date
        , shkzg
        ,      CASE WHEN shkzg = 'H' THEN (-1) * fi.dmbtr
                    ELSE fi.dmbtr
                    END                 as Amount_in_local_currency
        ,      CASE WHEN shkzg = 'H' THEN (-1) * fi.wrbtr
                    ELSE fi.wrbtr
                    END                 as Amount_in_document_currency
        ,      CASE WHEN shkzg = 'H' THEN (-1) * fi.mwsts
                    ELSE fi.mwsts
                    END                 as Tax_in_local_currency
        ,      CASE WHEN shkzg = 'H' THEN (-1) * fi.wmwst
                    ELSE fi.wmwst
                    END                 as Tax_in_document_currency
        , zbkpf.WAERS
        , ZBKPF.GRPID AS Batch_Input_session_name
        , sgtxt
        -- *
        FROM fidoc_unique fi
        LEFT JOIN
        ( select * from metro-bi-dl-tur-prod.ingest_fgtf_sap.t003t AS doc_type
            WHERE 1=1
            AND doc_type.spras = 'EN'
            qualify dana_ingestion_timestamp = max(dana_ingestion_timestamp) over (partition by BLART, MANDT, SPRAS, SYSID)
            order by doc_type.blart
        ) AS doc_type
            ON zbkpf.blart = doc_type.blart
        where 1=1
        and MOD(SAFE_CAST(fi.z_dana_lfa1.lifnr AS int64), 100000) IN UNNEST(ARRAY<INT64>[{supplier_ids}])
         and zbkpf.BLDAT >= '2023-05-01'
        )
                """
        # The script only (re)creates the table; wait for it, then read it back.
        self.client.query(query).result()
        extracted_fi_df = self.client.query(f"select * from `{self.FI_RESULT_TABLE}`").to_dataframe()
        return extracted_fi_df

# if __name__ == "__main__":
#     bq_ops = BigQuery_Operations()

# Clean_FI

In [6]:
class Clean_FI:
    """Reduces FI line items to one selected row per document.

    Rows are grouped by (suppl_no, BELNR, XBLNR, GJAHR). The group keys seen in
    each selection case are appended to the lists on the instance.
    """

    # Columns that identify one FI document.
    _KEY_COLUMNS = ('suppl_no', 'BELNR', 'XBLNR', 'GJAHR')

    def __init__(self):
        # Group-key tuples (string-cast key values), bucketed by how many rows had ZFBDT.
        self.doc_nos_with_no_ZFBDT = []
        self.doc_nos_with_1_ZFBDT = []
        self.doc_nos_with_more_ZFBDT = []

    def clean_fi(self, df_fi):
        """
        Keep one row per (suppl_no, BELNR, XBLNR, GJAHR) group, chosen by select_final_row.

        The key columns of ``df_fi`` are cast to str in place before grouping.

        :param df_fi: FI line items with at least the key columns plus ZFBDT, ZBD1T, ZBD2T, BUZEI
        :return: new DataFrame with one row per group; suppl_no/GJAHR as int64,
                 BELNR/XBLNR as object with 'nan' strings turned back into NaN
        """
        # Imported here because this cell otherwise relied on `np` from a later cell.
        import numpy as np

        # Casting keys to str keeps NaN keys as a 'nan' group instead of groupby dropping them.
        for column in self._KEY_COLUMNS:
            df_fi[column] = df_fi[column].astype(str)

        df_fi = (
            df_fi
            .groupby(list(self._KEY_COLUMNS), group_keys=False)
            .apply(self.select_final_row)
            .reset_index(drop=True)
        )

        df_fi['BELNR'] = df_fi['BELNR'].replace('nan', np.nan)
        df_fi['XBLNR'] = df_fi['XBLNR'].replace('nan', np.nan)

        df_fi['suppl_no'] = df_fi['suppl_no'].astype('int64')
        df_fi['GJAHR'] = df_fi['GJAHR'].astype('int64')
        df_fi['BELNR'] = df_fi['BELNR'].astype('object')
        df_fi['XBLNR'] = df_fi['XBLNR'].astype('object')

        print("FI shape after cleaning : ", df_fi.shape)
        return df_fi

    def _group_key(self, group):
        """Return the key-column values of the group's first row as a tuple."""
        return tuple(group[column].iloc[0] for column in self._KEY_COLUMNS)

    def select_final_row(self, group):
        """
        Pick the representative row of one document group.

        - several rows with ZFBDT: the first of them with ZBD1T or ZBD2T set,
          otherwise the lowest ZFBDT (ties broken by lowest BUZEI);
        - exactly one row with ZFBDT: that row;
        - no row with ZFBDT: the row with the lowest BUZEI.

        :param group: rows of one (suppl_no, BELNR, XBLNR, GJAHR) group
        :return: the selected row as a Series
        """
        group_with_ZFBDT = group[group['ZFBDT'].notna()]
        number_ZFBDT_present = len(group_with_ZFBDT)

        if number_ZFBDT_present > 1:
            self.doc_nos_with_more_ZFBDT.append(self._group_key(group))
            rows_with_ZBD = group_with_ZFBDT[group_with_ZFBDT['ZBD1T'].notna() | group_with_ZFBDT['ZBD2T'].notna()]
            if not rows_with_ZBD.empty:
                return rows_with_ZBD.iloc[0]
            min_ZFBDT_rows = group_with_ZFBDT[group_with_ZFBDT['ZFBDT'] == group_with_ZFBDT['ZFBDT'].min()]
            return min_ZFBDT_rows.loc[min_ZFBDT_rows['BUZEI'].idxmin()]

        if number_ZFBDT_present == 1:
            self.doc_nos_with_1_ZFBDT.append(self._group_key(group))
            return group_with_ZFBDT.iloc[0]

        self.doc_nos_with_no_ZFBDT.append(self._group_key(group))
        return group.loc[group['BUZEI'].idxmin()]


# Typecast_FI

In [7]:
import pandas as pd


class Typecast_FI:
    """Applies the storage dtypes expected for the cleaned FI extract."""

    def __init__(self):
        pass

    def typecast_fi(self, df_fi):
        """
        Cast every listed FI column to its storage dtype, in place.

        Columns marked "datetime64[ns]" are parsed with
        ``pd.to_datetime(errors="coerce")`` (unparseable values become NaT);
        every other column goes through ``astype`` ("Int64" is pandas'
        nullable integer).

        :param df_fi: FI DataFrame containing every column listed below
        :return: the same DataFrame object with converted columns
        :raises KeyError: if a listed column is missing
        """
        datetime_marker = "datetime64[ns]"
        target_dtypes = {
            "MANDT": "object",
            "Document_type": "object",
            "document_type_desc": "object",
            "GJAHR": "Int64",
            "BUKRS": "object",
            "GSBER": "object",
            "PRCTR": "object",
            "store_or_dc": "Int64",
            "KOSTL": "object",
            "month_in_fin_year": "Int64",
            "BELNR": "object",
            "XBLNR": "object",
            "AUGBL": "object",
            "AUGDT": datetime_marker,
            "ZFBDT": datetime_marker,
            "ZBD1T": "float64",
            "ZBD2T": "float64",
            "NETDT": datetime_marker,
            "BUZEI": "Int64",
            "altkt": "object",
            "hkont": "object",
            "suppl_no": "Int64",
            "BLDAT": datetime_marker,
            "BUDAT": datetime_marker,
            "CPUDT": datetime_marker,
            "partition_date": datetime_marker,
            "dana_ingestion_date": datetime_marker,
            "shkzg": "object",
            "Amount_in_local_currency": "float64",
            "Amount_in_document_currency": "float64",
            "Tax_in_local_currency": "float64",
            "Tax_in_document_currency": "float64",
            "WAERS": "object",
            "Batch_Input_session_name": "object",
            "sgtxt": "object",
        }

        for column_name, target in target_dtypes.items():
            if target == datetime_marker:
                df_fi[column_name] = pd.to_datetime(df_fi[column_name], errors="coerce")
            else:
                # Covers both the nullable "Int64" and plain numpy/object dtypes.
                df_fi[column_name] = df_fi[column_name].astype(target)

        print("Type casted cleaned FI for storage")
        return df_fi

# Clean_IC

In [8]:
class Clean_IC:
    """Reduces IC invoice headers to one row per (LIFNR, BELNR, RENR, GEBRF, GJAHR)."""

    # Columns that identify one IC document.
    _KEY_COLUMNS = ['LIFNR', 'BELNR', 'RENR', 'GEBRF', 'GJAHR']

    def __init__(self):
        pass

    def clean_ic(self, df_ic):
        """
        Keep, per key group, the row with the most populated (non-null) fields.

        The key columns of ``df_ic`` are cast to str in place before grouping.

        :param df_ic: IC headers containing at least the key columns
        :return: new DataFrame, one row per group; key columns are object dtype
                 with 'nan' strings turned back into NaN
        """
        # Imported here because this cell otherwise relied on `np` from a later cell.
        import numpy as np

        key_columns = list(self._KEY_COLUMNS)
        # Casting keys to str keeps NaN keys as a 'nan' group instead of groupby dropping them.
        df_ic[key_columns] = df_ic[key_columns].astype(str)

        df_ic = (
            df_ic
            .groupby(key_columns)
            .apply(self.get_max_populated_row)
            .reset_index(drop=True)
        )

        for column in key_columns:
            df_ic[column] = df_ic[column].replace('nan', np.nan).astype('object')

        print("IC shape after cleaning : ", df_ic.shape)
        return df_ic

    def get_max_populated_row(self, group):
        """Return the row of ``group`` with the most non-null values (first one on ties)."""
        non_null_counts = group.notnull().sum(axis=1)
        return group.loc[non_null_counts.idxmax()]


# ReverseCast

In [9]:
class ReverseCast:
    """Restores the source-system column names on FI and IC frames.

    Columns are relabelled positionally, so the incoming column order must match
    the name lists below. The frames are modified in place and also returned.
    """

    def __init__(self):
        pass

    def reverse_cast_fi(self, df_fi):
        """Relabel the 35 FI columns positionally; returns the same frame."""
        fi_names = (
            'MANDT', 'Document_type', 'document_type_desc', 'GJAHR', 'BUKRS',
            'GSBER', 'PRCTR', 'store_or_dc', 'KOSTL', 'month_in_fin_year', 'BELNR',
            'XBLNR', 'AUGBL', 'AUGDT', 'ZFBDT', 'ZBD1T', 'ZBD2T', 'NETDT', 'BUZEI',
            'altkt', 'hkont', 'suppl_no', 'BLDAT', 'BUDAT', 'CPUDT',
            'partition_date', 'dana_ingestion_date', 'shkzg',
            'Amount_in_local_currency', 'Amount_in_document_currency',
            'Tax_in_local_currency', 'Tax_in_document_currency', 'WAERS',
            'Batch_Input_session_name', 'sgtxt',
        )
        df_fi.columns = list(fi_names)
        return df_fi

    def reverse_cast_ic(self, df_ic):
        """Relabel the 17 IC columns positionally; returns the same frame."""
        ic_names = (
            'LIFNR', 'BELNR', 'RENR', 'REDAT', 'LFSNR', 'GEBRF', 'GSMWB', 'GSMWF',
            'WAERS', 'WENUM', 'RGDAT', 'ABGST', 'AUFNR', 'VORGN', 'GJAHR', 'WEDAT',
            'DEBNOTNO',
        )
        df_ic.columns = list(ic_names)
        return df_ic

# Merge

In [10]:
import pandas as pd
import numpy as np


class Merge:
    """Outer-join the FI, IC and SFTP frames into a single frame."""

    # Stage 1 keys: FI (left) <-> IC (right).
    _FI_KEYS = ["suppl_no", "BELNR", "XBLNR", "fin_year_FI"]
    _IC_KEYS = ["LIFNR", "BELNR", "RENR", "fin_year_IC"]
    # Stage 2 keys: FI/IC result (left) <-> SFTP (right). 'GJAHR_x' is the left-hand (FI)
    # GJAHR column produced by the stage-1 join, since both FI and IC carry a GJAHR column.
    _FI_IC_KEYS = ["suppl_no", "BELNR", "XBLNR", "GJAHR_x"]
    _SFTP_KEYS = ["Supplier number (Sales Line)", "Document number", "Invoice number",
                  "Business Year new"]

    def __init__(self):
        pass

    def merge(self, df_fi, df_ic, sftp_df):
        """Join FI with IC, then the result with SFTP (both outer joins).

        Before the second join, GJAHR_x is normalised to a nullable string
        (invalid values -> <NA>) so it can be matched against 'Business Year new'.

        :return: the merged DataFrame.
        """
        fi_ic = df_fi.merge(df_ic, how="outer", left_on=self._FI_KEYS, right_on=self._IC_KEYS)
        print("1st merge shape : ", fi_ic.shape[0])

        year_as_int = pd.to_numeric(fi_ic['GJAHR_x'], errors='coerce').astype('Int64')
        fi_ic['GJAHR_x'] = year_as_int.astype('string')

        return fi_ic.merge(sftp_df, how="outer", left_on=self._FI_IC_KEYS, right_on=self._SFTP_KEYS)


# IC_Transaction_File

In [11]:
from google.cloud import storage
import pandas as pd
import io


class IC_Transaction_File:
    """Reader for the IC transaction-status CSV stored in the GCS bucket.

    The three ABGST code columns and the 360-status label column are parsed as
    strings, so codes such as '0001' keep their leading zeros.
    """

    def __init__(self):
        self.bucket_name = "miag-m360-test-bucket"
        self.file_path = "ic_transaction_status_R.csv"
        self.encoding = 'windows-1252'
        string_columns = (
            'TRANSACTION STATUS (ABGST)',
            'LBL - \nTRANSACTION STATUS (ABGST)',
            'VIPA \nTRANSACTION STATUS (ABGST)',
            "360 invoice status - MVP- Display on External 360",
        )
        self.dtype = {column: 'str' for column in string_columns}

    def read_csv_from_gcs(self):
        """Download ``self.file_path`` from ``self.bucket_name`` and parse it.

        :return: DataFrame decoded with ``self.encoding`` and typed per ``self.dtype``.
        """
        bucket = storage.Client().get_bucket(self.bucket_name)
        payload = bucket.blob(self.file_path).download_as_bytes()
        return pd.read_csv(io.BytesIO(payload), encoding=self.encoding, dtype=self.dtype)




# Postprocess

In [12]:
import pandas as pd
import numpy as np
# from IC_Transaction_File import *

class Postprocess:
    """Derive reporting columns on the merged FI/IC/SFTP frame and assign INVOICE_STATUS.

    ``postprocess`` modifies and returns ``merged_df``:
    - adds 'Business Year SFTP', 'Line Item No. SFTP', 'Doc no. combined', 'year',
      'month', 'day', 'NET_DUE_DATE' and 'INVOICE_STATUS';
    - fills missing 'ARKTX' values;
    - re-types 'BUZEI', 'ZFBDT', 'ZBD1T', 'ZBD2T', 'ABGST' and 'GJAHR_x'.
    """

    # ABGST code columns of the IC transaction-status CSV, in fallback order.
    _STATUS_CODE_COLUMNS = (
        "TRANSACTION STATUS (ABGST)",
        "LBL - \nTRANSACTION STATUS (ABGST)",
        "VIPA \nTRANSACTION STATUS (ABGST)",
    )
    # Label column of the same CSV.
    _STATUS_LABEL_COLUMN = "360 invoice status - MVP- Display on External 360"

    def __init__(self):
        self.ic_transaction_file = IC_Transaction_File()

    def postprocess(self, merged_df):
        """Enrich ``merged_df`` in place and return it.

        :param merged_df: output of ``Merge.merge``. It must contain ARKTX, BELNR,
            Document number, BLDAT, BUZEI, Document_type, Document type, ZFBDT,
            ZBD1T, ZBD2T, ABGST, Remittance advice number, AUGBL, GJAHR_x,
            Amount_in_local_currency and Gross amount.
        :return: the same frame.
        """
        arktx_parts = self._split_arktx(merged_df['ARKTX'])
        merged_df['Business Year SFTP'] = self._digits_or_na(arktx_parts[3])
        merged_df['Line Item No. SFTP'] = self._digits_or_na(arktx_parts[4])

        merged_df['Doc no. combined'] = merged_df['BELNR'].fillna(merged_df['Document number'])

        self._fill_missing_arktx(merged_df)
        self._add_net_due_date(merged_df)

        abgst_status_dict = self._load_abgst_status_map()
        merged_df['ABGST'] = merged_df['ABGST'].astype('string')
        self._assign_invoice_status(merged_df, abgst_status_dict)

        # Strip leading zeros before the Int64 cast. An all-zero item number becomes 0
        # instead of '', which cannot be cast.
        merged_df['Line Item No. SFTP'] = (
            merged_df['Line Item No. SFTP']
            .apply(lambda x: (str(x).lstrip('0') or '0') if pd.notna(x) else x)
            .astype('Int64')
        )
        merged_df['GJAHR_x'] = (
            pd.to_numeric(merged_df['GJAHR_x'], errors='coerce')  # invalid -> NaN
            .astype('Int64')                                      # nullable integers
            .astype('string')                                     # strings, keeping <NA>
        )

        print("Datatypes before GR Invoice Implementation : ", merged_df[['Document_type', 'Document type', 'GJAHR_x', 'Business Year SFTP', 'BUZEI', 'Line Item No. SFTP', 'Amount_in_local_currency', 'Gross amount']].dtypes)
        return merged_df

    @staticmethod
    def _split_arktx(arktx):
        """Split ARKTX on '#' into columns 0..4.

        Segments that no row has are filled with NaN, so indexing [3]/[4] never raises KeyError.
        """
        return arktx.str.split('#', expand=True).reindex(columns=range(5))

    @staticmethod
    def _digits_or_na(segment):
        """Keep all-digit values as strings; everything else becomes <NA> (nullable string dtype)."""
        return segment.apply(
            lambda x: str(x) if pd.notnull(x) and x.isdigit() else pd.NA
        ).astype('string')

    @staticmethod
    def _compose_arktx(row):
        """Build 'Document_type#BELNR#YYYYMMDD#YYYY#00<BUZEI>' from FI fields.

        Returns NaN if any part is missing.
        """
        parts = [row['Document_type'], row['BELNR'], row['year'], row['month'], row['day'], row['BUZEI']]
        if not all(pd.notna(parts)):
            return np.nan
        # NOTE(review): "00" is prepended verbatim, so only single-digit BUZEI values give a
        # three-digit item number; confirm against the SFTP ARKTX format.
        return (f"{row['Document_type']}#{row['BELNR']}#{row['year']}{row['month']}{row['day']}"
                f"#{row['year']}#00{row['BUZEI']}")

    def _fill_missing_arktx(self, merged_df):
        """Split BLDAT ('dd.mm.YYYY') into year/month/day, cast BUZEI to Int64 and fill empty ARKTX."""
        def bldat_part(position):
            return merged_df.BLDAT.apply(
                lambda x: x.split('.')[position]
                if isinstance(x, str) and len(x.split('.')) > position else None)

        merged_df["year"] = bldat_part(2)
        merged_df["month"] = bldat_part(1)
        merged_df["day"] = bldat_part(0)
        merged_df['BUZEI'] = merged_df['BUZEI'].apply(lambda x: int(x) if pd.notna(x) else x).astype('Int64')

        missing = merged_df["ARKTX"].isna()
        # Guard: a row-wise apply on an empty selection returns a DataFrame, not a Series.
        if missing.any():
            merged_df.loc[missing, "ARKTX"] = merged_df[missing].apply(self._compose_arktx, axis=1)

    @staticmethod
    def _add_net_due_date(merged_df):
        """Compute NET_DUE_DATE = ZFBDT + (ZBD1T, else ZBD2T, else 0) days, formatted 'dd.mm.YYYY'."""
        merged_df['ZFBDT'] = pd.to_datetime(merged_df['ZFBDT'], format='%d.%m.%Y')
        merged_df['ZBD1T'] = pd.to_numeric(merged_df['ZBD1T'], errors='coerce')
        merged_df['ZBD2T'] = pd.to_numeric(merged_df['ZBD2T'], errors='coerce')
        offset_days = merged_df['ZBD1T'].fillna(merged_df['ZBD2T']).fillna(0)
        net_due = merged_df['ZFBDT'] + pd.to_timedelta(offset_days, unit='D')
        merged_df['NET_DUE_DATE'] = net_due.dt.strftime('%d.%m.%Y')  # NaT -> NaN

    def _load_abgst_status_map(self):
        """Read the IC status CSV and return {4-digit ABGST code: 360 status label}.

        Each row's code is taken from the first non-null code column. On duplicate
        codes, the last row wins.
        """
        raw = self.ic_transaction_file.read_csv_from_gcs()
        primary, lbl, vipa = self._STATUS_CODE_COLUMNS
        codes = raw[primary].fillna(raw[lbl]).fillna(raw[vipa]).str.zfill(4).astype('string')
        # Build a fresh frame rather than assigning into a column slice (avoids SettingWithCopy).
        lookup = pd.DataFrame({'ABGST': codes, self._STATUS_LABEL_COLUMN: raw[self._STATUS_LABEL_COLUMN]})
        return lookup.set_index('ABGST')[self._STATUS_LABEL_COLUMN].to_dict()

    @staticmethod
    def _assign_invoice_status(merged_df, abgst_status_dict):
        """Fill INVOICE_STATUS by priority.

        Each step only touches rows that still have no status.
        """
        # 1. Rows with a Remittance advice number.
        merged_df.loc[merged_df['Remittance advice number'].notna(), 'INVOICE_STATUS'] = "cleared-MIAG"
        # 2. Rows with a clearing document (AUGBL).
        merged_df.loc[merged_df['INVOICE_STATUS'].isna() & merged_df['AUGBL'].notna(), 'INVOICE_STATUS'] = "cleared-FI"
        # 3. Rows with a document date (BLDAT).
        merged_df.loc[merged_df['INVOICE_STATUS'].isna() & merged_df[
            'BLDAT'].notna(), 'INVOICE_STATUS'] = "Invoice approval completed"
        # 4. Label looked up from the ABGST code (unknown codes stay empty).
        merged_df.loc[merged_df['INVOICE_STATUS'].isna() & merged_df['ABGST'].notna(), 'INVOICE_STATUS'] = merged_df[
            'ABGST'].map(abgst_status_dict)
        # 5. Everything else. Use plain assignment: a chained column `fillna(inplace=True)`
        #    does not update the frame under pandas Copy-on-Write.
        merged_df['INVOICE_STATUS'] = merged_df['INVOICE_STATUS'].fillna("Direct Entry to FI / New Status")

# GR_Invoice

In [13]:
import time

import pandas as pd
import numpy as np

class GR_Invoice:
    """Reconcile SFTP 'cleared-MIAG' rows with FI/IC rows that share a document number.

    Rows are partitioned by how many rows share 'Doc no. combined' (1, 2, more than 2),
    plus the all-zero document number '0000000000'. Within 2-row and >2-row groups, a
    'cleared-MIAG' row that matches an FI/IC row on line item, document type, business
    year and amount:
    - takes that row's NET_DUE_DATE;
    - takes its store_or_dc when the cleared row has no Store;
    - and the FI/IC row is removed.
    Every other row is passed through unchanged.
    """

    _CLEARED_MIAG = 'cleared-MIAG'
    # (cleared-row column, FI/IC-row column) pairs that must be non-null and equal for a match.
    _MATCH_COLUMNS = (
        ('Line Item No. SFTP', 'BUZEI'),
        ('Document type', 'Document_type'),
        ('Business Year SFTP', 'GJAHR_x'),
        ('Gross amount', 'Amount_in_local_currency'),
    )

    def __init__(self):
        self.df_all_zeros = pd.DataFrame()
        self.df_non_zero = pd.DataFrame()

        self.df_group_len_1 = pd.DataFrame()
        self.df_group_len_2 = pd.DataFrame()
        self.df_group_len_3 = pd.DataFrame()

        # Legacy attribute names, kept for backward compatibility; gr_length_2 fills the
        # un-suffixed names below.
        self.both_cleared_or_one_progress_groups = pd.DataFrame()
        self.gr_invoice_records_groups = pd.DataFrame()
        self.not_cleared_miag_records_groups = pd.DataFrame()

        self.both_cleared_or_one_progress = pd.DataFrame()
        self.gr_invoice_records = pd.DataFrame()
        self.not_cleared_miag_records = pd.DataFrame()
        self.unclassified_records = pd.DataFrame()

        self.cleared_df = pd.DataFrame()
        self.other_df = pd.DataFrame()
        self.failed_doc_nos = []

        self.len_3_cleared_df = pd.DataFrame()
        self.len_3_other_df = pd.DataFrame()

    @staticmethod
    def _concat_or_empty(frames, columns):
        """Concatenate the non-empty ``frames`` (fresh index).

        Returns an empty frame with ``columns`` when there are none (``pd.concat([])`` raises).
        """
        non_empty = [frame for frame in frames if len(frame)]
        if not non_empty:
            return pd.DataFrame(columns=columns)
        return pd.concat(non_empty, ignore_index=True)

    def _same_line_item(self, cleared_row, other_row):
        """True when every column pair in _MATCH_COLUMNS is non-null on both rows and equal."""
        return all(
            pd.notnull(cleared_row[left]) and pd.notnull(other_row[right])
            and cleared_row[left] == other_row[right]
            for left, right in self._MATCH_COLUMNS
        )

    def gr_invoice(self, merged_df):
        """Run the 2-row and >2-row reconciliation and return all rows in one frame.

        :param merged_df: output of ``Postprocess.postprocess``.
        :return: new frame (fresh index) made of the reconciled 2-row groups, the reconciled
            >2-row groups, the single-row groups and the all-zero rows, in that order.
        """
        print("Merged_df shape : ", merged_df.shape)
        self.df_all_zeros = merged_df[merged_df['Doc no. combined'] == '0000000000']
        self.df_non_zero = merged_df[merged_df['Doc no. combined'] != '0000000000']
        grouped = self.df_non_zero.groupby('Doc no. combined')
        self.df_group_len_2 = grouped.filter(lambda x: len(x) == 2)
        self.df_group_len_1 = grouped.filter(lambda x: len(x) == 1)
        self.df_group_len_3 = grouped.filter(lambda x: len(x) > 2)
        print("\n")
        print("All zeros : ", len(self.df_all_zeros))
        print("Length 1 : ", len(self.df_group_len_1))
        print("Length 2 : ", len(self.df_group_len_2))
        print("Length 3 : ", len(self.df_group_len_3))
        print("\n")

        gr_len2_start = time.time()
        gr_len2_concat = self.gr_length_2()
        gr_len2_end = time.time()
        print("GR Inv Len2 Time : ", gr_len2_end - gr_len2_start)
        gr_len3_concat = self.gr_length_3()
        gr_len3_end = time.time()
        print("GR Inv Len3 Time : ", gr_len3_end - gr_len2_end)

        merged_df = self._concat_or_empty(
            [gr_len2_concat, gr_len3_concat, self.df_group_len_1, self.df_all_zeros],
            merged_df.columns)
        print(merged_df.shape)
        return merged_df

    def gr_length_2(self):
        """Reconcile document numbers that occur in exactly two rows.

        Groups are bucketed by their INVOICE_STATUS values:
        - both 'cleared-MIAG', or 'cleared-MIAG' + 'In progress' -> passed through;
        - 'cleared-MIAG' + 'cleared-FI' / 'Invoice approval completed' -> reconciled pair;
        - no 'cleared-MIAG' -> passed through;
        - any other combination (e.g. 'cleared-MIAG' + an ABGST-derived label) -> passed
          through unchanged, so no rows are lost.

        Pairs whose line items do not match are listed in ``self.failed_doc_nos``, which is
        reset on every call.

        :return: new frame of cleared rows, unmatched counterpart rows and passed-through rows.
        """
        columns = self.df_group_len_2.columns
        self.failed_doc_nos = []

        both_groups, gr_groups, not_cleared_groups, unclassified_groups = [], [], [], []
        for _, group in self.df_group_len_2.groupby('Doc no. combined'):
            statuses = group['INVOICE_STATUS'].tolist()
            if statuses == ['cleared-MIAG', 'cleared-MIAG'] or \
                    ('cleared-MIAG' in statuses and 'In progress' in statuses):
                both_groups.append(group)
            elif 'cleared-MIAG' in statuses and \
                    any(status in ['cleared-FI', 'Invoice approval completed'] for status in statuses):
                gr_groups.append(group)
            elif all(status != 'cleared-MIAG' for status in statuses):
                not_cleared_groups.append(group)
            else:
                unclassified_groups.append(group)

        # Empty buckets become empty frames instead of making pd.concat([]) raise.
        self.both_cleared_or_one_progress = self._concat_or_empty(both_groups, columns)
        self.gr_invoice_records = self._concat_or_empty(gr_groups, columns)
        self.not_cleared_miag_records = self._concat_or_empty(not_cleared_groups, columns)
        self.unclassified_records = self._concat_or_empty(unclassified_groups, columns)

        print("\n")
        print("Length 2 : ", len(self.df_group_len_2))
        print("Both cleared or one progress : ", len(self.both_cleared_or_one_progress))
        print("GR Invoice Records : ", len(self.gr_invoice_records))
        print("not_cleared_miag_records ", len(self.not_cleared_miag_records))
        print("Unclassified (passed through) : ", len(self.unclassified_records))
        print("\n")

        # Each reconciled group holds one 'cleared-MIAG' row and one other row; take the
        # first of each.
        cleared_records, other_records = [], []
        for _, group in self.gr_invoice_records.groupby('Doc no. combined'):
            cleared_record = group[group['INVOICE_STATUS'] == self._CLEARED_MIAG]
            other_record = group[group['INVOICE_STATUS'] != self._CLEARED_MIAG]
            if not cleared_record.empty:
                cleared_records.append(cleared_record.iloc[0])
            if not other_record.empty:
                other_records.append(other_record.iloc[0])

        record_columns = self.gr_invoice_records.columns
        self.cleared_df = pd.DataFrame(cleared_records) if cleared_records else pd.DataFrame(columns=record_columns)
        self.other_df = pd.DataFrame(other_records) if other_records else pd.DataFrame(columns=record_columns)

        print("\n")
        print("GR Invoice Records : ", len(self.gr_invoice_records))
        print("Cleared df : ", len(self.cleared_df))
        print("Non Cleared df : ", len(self.other_df))
        print("\n")

        for index, cleared_row in self.cleared_df.iterrows():
            candidates = self.other_df[self.other_df['Doc no. combined'] == cleared_row['Doc no. combined']]
            # Only an unambiguous single counterpart is reconciled.
            if len(candidates) != 1:
                continue
            other_row = candidates.iloc[0]
            if self._same_line_item(cleared_row, other_row):
                self.cleared_df.at[index, 'NET_DUE_DATE'] = other_row['NET_DUE_DATE']
                store = cleared_row['Store']
                if not (pd.notnull(store) and store != ''):
                    self.cleared_df.at[index, 'Store'] = other_row['store_or_dc']
                self.other_df = self.other_df.drop(other_row.name)
            else:
                self.failed_doc_nos.append(cleared_row['Doc no. combined'])

        self.cleared_df.reset_index(drop=True, inplace=True)
        self.other_df.reset_index(drop=True, inplace=True)

        print("Cleared df : ", len(self.cleared_df))
        print("Non Cleared df : ", len(self.other_df))
        print("Failed docs : ", len(self.failed_doc_nos))
        print("Examples of Failed docs : ", self.failed_doc_nos[0:20])

        return self._concat_or_empty(
            [self.cleared_df, self.other_df, self.not_cleared_miag_records,
             self.both_cleared_or_one_progress, self.unclassified_records],
            columns)

    def gr_length_3(self):
        """Reconcile document numbers that occur in more than two rows.

        Each non-'cleared-MIAG' row is matched against the cleared rows of the same document
        on line item, document type, business year and amount. On a match:
        - the first matching cleared row takes its NET_DUE_DATE;
        - it also takes store_or_dc when its Store is missing;
        - the non-cleared row is dropped.

        :return: new frame of the (updated) cleared rows followed by the unmatched other rows.
        """
        self.len_3_cleared_df = self.df_group_len_3[self.df_group_len_3['INVOICE_STATUS'] == 'cleared-MIAG'].copy()
        self.len_3_other_df = self.df_group_len_3[self.df_group_len_3['INVOICE_STATUS'] != 'cleared-MIAG'].copy()

        rows_to_delete = []
        for index_other, row2 in self.len_3_other_df.iterrows():
            match = self.len_3_cleared_df[
                (self.len_3_cleared_df['Line Item No. SFTP'] == row2['BUZEI']) &
                (self.len_3_cleared_df['Document type'] == row2['Document_type']) &
                (self.len_3_cleared_df['Business Year SFTP'] == row2['GJAHR_x']) &
                (self.len_3_cleared_df['Gross amount'] == row2['Amount_in_local_currency']) &
                (self.len_3_cleared_df['Doc no. combined'] == row2['Doc no. combined'])
                ]

            if not match.empty:
                # Only the first matching cleared row is updated.
                row1 = match.iloc[0]
                self.len_3_cleared_df.loc[match.index[0], 'NET_DUE_DATE'] = row2['NET_DUE_DATE']
                if pd.isna(row1['Store']) and not pd.isna(row2.get('store_or_dc')):
                    self.len_3_cleared_df.loc[match.index[0], 'Store'] = row2['store_or_dc']
                rows_to_delete.append(index_other)

        self.len_3_other_df.drop(index=rows_to_delete, inplace=True)

        print("Length 3 : ", len(self.df_group_len_3))
        print("len_3_cleared_df : ", len(self.len_3_cleared_df))
        print("len_3_other_df : ", len(self.len_3_other_df))

        gr_len3_concat = pd.concat([self.len_3_cleared_df, self.len_3_other_df], ignore_index=True)
        gr_len3_concat.reset_index(drop=True, inplace=True)
        return gr_len3_concat

# Standardize

In [14]:
import time

import pandas as pd
import numpy as np
# from Merge import *
# from Postprocess import *
# from GR_Invoice import *


class Standardize:
    """Normalise FI, IC and SFTP key columns, then merge, post-process and reconcile them.

    ``standardize`` runs, in order:
    key padding -> IC re-clean -> date reformatting -> financial-year keys
    -> ``Merge.merge`` -> ``Postprocess.postprocess`` -> ``GR_Invoice.gr_invoice``.
    """

    def __init__(self):
        self.merge = Merge()
        self.postprocess = Postprocess()
        self.gr_invoice = GR_Invoice()
        self.clean_ic = Clean_IC()

    @staticmethod
    def _pad_supplier_no(value):
        """Prefix supplier numbers shorter than 10 characters with '10000'.

        Longer values are returned as str; missing values become '0000000000'.
        """
        if pd.isna(value):
            return '0' * 10
        text = str(value)
        return '10000' + text if len(text) < 10 else text

    @staticmethod
    def _pad_document_no(value):
        """Left-pad a document number with '0' to 10 characters (longer values unchanged).

        Missing values become '0000000000'.
        """
        if pd.isna(value):
            return '0' * 10
        return str(value).rjust(10, '0')

    @staticmethod
    def _sftp_business_year(arktx):
        """Return ARKTX segment 3 (split on '#') as a string when it is all digits, else <NA>.

        Rows with fewer than four segments give <NA> instead of raising KeyError.
        """
        segments = arktx.str.split('#', expand=True).reindex(columns=range(4))
        return segments[3].apply(
            lambda x: str(x) if pd.notnull(x) and x.isdigit() else pd.NA
        ).astype('string')

    def standardize(self, df_fi, df_ic, sftp_df):
        """Normalise keys and dates, merge FI/IC/SFTP, post-process and reconcile.

        The padding steps assign into ``df_fi``, ``df_ic`` and ``sftp_df`` in place.

        :param df_fi: FI frame with the column names set by ``ReverseCast.reverse_cast_fi``.
        :param df_ic: IC frame with the column names set by ``ReverseCast.reverse_cast_ic``.
        :param sftp_df: SFTP frame; needs 'Document number' and 'ARKTX'.
        :return: the frame returned by ``GR_Invoice.gr_invoice``.
        """
        df_ic['LIFNR'] = df_ic['LIFNR'].apply(self._pad_supplier_no)
        df_fi['suppl_no'] = df_fi['suppl_no'].apply(self._pad_supplier_no)
        df_ic['BELNR'] = df_ic['BELNR'].apply(self._pad_document_no)
        df_fi['BELNR'] = df_fi['BELNR'].apply(self._pad_document_no)
        sftp_df['Document number'] = sftp_df['Document number'].apply(self._pad_document_no)

        df_ic['LIFNR'] = df_ic['LIFNR'].apply(self.transform_number)

        new_ic_unique_count = df_ic[['LIFNR', 'BELNR', 'RENR', 'GEBRF', 'GJAHR']].drop_duplicates().shape[0]
        print("Count of unique records in IC after cleaning : ", new_ic_unique_count)

        df_ic = self.clean_ic.clean_ic(df_ic)

        # IC dates arrive as 'YYYYMMDD' and FI dates as 'YYYY-MM-DD'; both become 'dd.mm.YYYY'.
        for col in ['REDAT', 'RGDAT', 'WEDAT']:
            df_ic[col] = pd.to_datetime(df_ic[col], format='%Y%m%d').dt.strftime('%d.%m.%Y')
        for col in ['AUGDT', 'ZFBDT', 'NETDT', 'BLDAT', 'BUDAT', 'CPUDT', 'partition_date', 'dana_ingestion_date']:
            df_fi[col] = pd.to_datetime(df_fi[col], format='%Y-%m-%d').dt.strftime('%d.%m.%Y')

        df_fi['Amount_in_local_currency'] = df_fi['Amount_in_local_currency'] * -1

        # Calendar year of REDAT / BLDAT, used as an FI-IC join key. Nullable Int64: a missing
        # date yields <NA> instead of failing a plain int64 cast.
        df_ic['fin_year_IC'] = pd.to_datetime(df_ic['REDAT'], format='%d.%m.%Y').dt.year.astype('Int64')
        df_fi['fin_year_FI'] = pd.to_datetime(df_fi['BLDAT'], format='%d.%m.%Y').dt.year.astype('Int64')

        sftp_df['Business Year new'] = self._sftp_business_year(sftp_df['ARKTX'])

        merge_start = time.time()
        merged_df = self.merge.merge(df_fi, df_ic, sftp_df)
        merge_end = time.time()
        print("Merge time : ", merge_end - merge_start)

        postprocess_start = time.time()
        merged_df = self.postprocess.postprocess(merged_df)
        postprocess_end = time.time()
        print("Postprocess time : ", postprocess_end - postprocess_start)

        return self.gr_invoice.gr_invoice(merged_df)

    def transform_number(self, num):
        """Normalise an IC supplier number.

        NaN stays NaN, and values already starting with '10000' are returned unchanged.
        Otherwise:
        - leading zeros are stripped;
        - the remainder is zero-filled to at least 6 digits;
        - the result is prefixed with '1000' (e.g. '123' -> '1000000123').
        """
        if pd.isna(num):
            return np.nan
        num_str = str(num)
        if num_str.startswith('10000'):
            return num_str
        return '1000' + num_str.lstrip('0').zfill(6)



# Preprocess

In [15]:
import time
import pandas as pd
import numpy as np
# from Clean_FI import *
# from Clean_IC import *
# from Typecast_FI import *
# from DB_Instance_Operations import *
# from ReverseCast import *
# from Standardize import *

class Preprocess:
    """Clean, type-cast and persist the FI and IC extracts, then hand them to Standardize.

    FI is cleaned, type-cast and written via ``DB_Instance_Operations.writeFITable``;
    IC (MMSIC + SISIC) is cleaned and written via ``writeICTable``. Both then get their
    working column names back from ``ReverseCast`` before ``Standardize.standardize``.
    """

    def __init__(self):
        self.clean_fi = Clean_FI()
        self.clean_ic = Clean_IC()
        self.typecast_fi = Typecast_FI()
        self.db_instance_operations_2 = DB_Instance_Operations()
        self.reverse_cast = ReverseCast()
        self.standardize = Standardize()

    @staticmethod
    def _strip_strings(df):
        """Return a new frame with leading/trailing whitespace removed from every str cell."""
        def strip(x):
            return x.strip() if isinstance(x, str) else x
        # DataFrame.applymap was renamed DataFrame.map in pandas 2.1 (applymap now warns).
        if hasattr(pd.DataFrame, 'map'):
            return df.map(strip)
        return df.applymap(strip)

    def preprocess(self, df_fi, sisic_df, mmsic_df, sftp_df):
        """Run the FI/IC preparation pipeline and return the standardised merged frame.

        :param df_fi: raw FI extract.
        :param sisic_df: raw SISIC extract (IC).
        :param mmsic_df: raw MMSIC extract (IC).
        :param sftp_df: SFTP frame, passed through to ``Standardize.standardize``.
        :return: the frame returned by ``Standardize.standardize``.
        """
        df_fi = self._strip_strings(df_fi)
        sisic_df = self._strip_strings(sisic_df)
        mmsic_df = self._strip_strings(mmsic_df)

        df_ic = pd.concat([mmsic_df, sisic_df], ignore_index=True)
        # Drop FI document type 'PM'. Copy because the filtered frame is assigned into below.
        df_fi = df_fi[df_fi['Document_type'] != 'PM'].copy()

        # Cross-fill references:
        # - FI XBLNR from IC (BELNR -> RENR);
        # - then IC BELNR from FI (XBLNR -> BELNR).
        ic_belnr_to_renr = dict(zip(df_ic['BELNR'], df_ic['RENR']))
        df_fi['XBLNR'] = df_fi['XBLNR'].fillna(df_fi['BELNR'].map(ic_belnr_to_renr))
        fi_xblnr_to_belnr = dict(zip(df_fi['XBLNR'], df_fi['BELNR']))
        df_ic['BELNR'] = df_ic['BELNR'].str.strip().replace('', np.nan)
        df_ic['BELNR'] = df_ic['BELNR'].fillna(df_ic['RENR'].map(fi_xblnr_to_belnr))

        fi_unique_count = df_fi[['suppl_no', 'BELNR', 'XBLNR', 'GJAHR']].drop_duplicates().shape[0]
        print("Count of unique records in FI before cleaning : ", fi_unique_count)

        clean_fi_start = time.time()
        df_fi = self.clean_fi.clean_fi(df_fi)
        clean_fi_end = time.time()
        print("Cleaning FI time : ", clean_fi_end - clean_fi_start)
        df_fi = self.typecast_fi.typecast_fi(df_fi)
        typecast_fi_end = time.time()
        print("Typecasting FI time : ", typecast_fi_end - clean_fi_end)
        self.db_instance_operations_2.writeFITable(df_fi)
        write_fi_end = time.time()
        print("Writing FI time : ", write_fi_end - typecast_fi_end)

        ic_unique_count = df_ic[['LIFNR', 'BELNR', 'RENR', 'GEBRF', 'GJAHR']].drop_duplicates().shape[0]
        print("Count of unique records in IC before cleaning : ", ic_unique_count)

        clean_ic_start = time.time()
        df_ic = self.clean_ic.clean_ic(df_ic)
        clean_ic_end = time.time()
        print("Cleaning IC time : ", clean_ic_end - clean_ic_start)
        self.db_instance_operations_2.writeICTable(df_ic)
        write_ic_end = time.time()
        print("Writing IC time : ", write_ic_end - clean_ic_end)

        df_fi = self.reverse_cast.reverse_cast_fi(df_fi)
        df_ic = self.reverse_cast.reverse_cast_ic(df_ic)

        return self.standardize.standardize(df_fi, df_ic, sftp_df)

# Loadfile

In [16]:
import pandas as pd
import numpy as np
import datetime

class Loadfile:
    """Build the M360 load-file frame from the reconciled merged frame."""

    def __init__(self):
        # Frame produced by the most recent ``loadfile`` call.
        self.loadfile_df = pd.DataFrame([])

    def loadfile(self, merged_df):
        """Map ``merged_df`` onto the fixed load-file column layout.

        Several columns take the SFTP field when present and fall back to the FI field,
        then the IC field (e.g. SUPPLIER_NO, INVOICE_NO, TOTAL_AMT_DC). A fresh frame
        aligned to ``merged_df`` is built on every call, so no state from an earlier run
        leaks in.

        :param merged_df: output of ``GR_Invoice.gr_invoice``.
        :return: the new frame (also stored on ``self.loadfile_df``), indexed like
            ``merged_df``, with missing values replaced by '' and string cells stripped.
        """
        # Local import: notebook cells rebind the global name `datetime` (module vs. class).
        import datetime as _dt

        sales_col, miag_col = 'Supplier number (Sales Line)', 'Supplier number (MIAG)'
        # First MIAG supplier number seen for each Sales Line supplier number.
        supplier_pairs = (
            merged_df.drop_duplicates(subset=[sales_col, miag_col])
            .drop_duplicates(subset=[sales_col], keep='first')
        )
        sales_to_miag = dict(zip(supplier_pairs[sales_col], supplier_pairs[miag_col]))

        company_code = 3142
        country_currency_dict = {3142: 'TRY'}

        out = pd.DataFrame(index=merged_df.index)
        out['COMPANY_CODE'] = company_code
        out['SUPPLIER_NO'] = merged_df[sales_col].fillna(merged_df['suppl_no']).fillna(merged_df['LIFNR'])
        out['MIAG_SUPPLIER_NO'] = merged_df[miag_col].fillna(out['SUPPLIER_NO'].map(sales_to_miag))
        out['ORDER_NO'] = merged_df['AUFNR']
        out['DOC_TYPE'] = merged_df['Document type'].fillna(merged_df['Document_type'])
        out['INVOICE_NO'] = merged_df['Invoice number'].fillna(merged_df['XBLNR']).fillna(merged_df['RENR'])
        out['INVOICE_DATE'] = merged_df['Document date'].fillna(merged_df['REDAT'])
        out['DELIVERY_NOTE_NO'] = merged_df['LFSNR']
        out['TOTAL_AMT_DC'] = (merged_df['Gross amount']
                               .fillna(merged_df['Amount_in_local_currency'])
                               .fillna(merged_df['GEBRF']))
        out['TOTAL_VAT_DC'] = merged_df['GSMWF']
        out['CURRENCY'] = out['COMPANY_CODE'].map(country_currency_dict)
        # Pre-finance date is only reported for document type 'MV'.
        out['PRE_FINANCE_DATE'] = np.where(out['DOC_TYPE'] == 'MV', merged_df['Value date'], '')
        out['GOODS_RECEIPT_NO'] = merged_df['WENUM']
        out['GOODS_RECEIPT_DATE'] = merged_df['WEDAT']
        out['INVOICE_ENTRY_DATE'] = merged_df['RGDAT'].fillna(merged_df['BLDAT'])
        out['INVOICE_STATUS'] = merged_df['INVOICE_STATUS']
        out['INVOICE_STATUS_INTERNAL'] = merged_df['ABGST']
        out['NET_DUE_DATE'] = merged_df['NET_DUE_DATE']
        out['DEBIT_NOTE_NO'] = merged_df['DEBNOTNO']
        # Remittance advice number is only reported for 'cleared-MIAG' rows.
        out['REMITTANCE_ADVICE_NO'] = np.where(
            merged_df['INVOICE_STATUS'] == 'cleared-MIAG',
            merged_df['Remittance advice number'],
            '')
        out['CLEARING_DATE'] = merged_df['Value date'].fillna(merged_df['AUGDT'])
        out['DOCUMENT_NO'] = merged_df['BELNR'].fillna(merged_df['Document number'])
        out['STORE_NO'] = merged_df['Store'].fillna(merged_df['store_or_dc'])
        out['ARKTX'] = merged_df['ARKTX']

        formatted_current_date = _dt.date.today().strftime("%d.%m.%Y")
        out['MATCHING_DATE'] = formatted_current_date
        out['MATCH_STATUS'] = 'No Matching Requested'
        out['SYNC_DATE'] = formatted_current_date
        out['SYNC_STATUS'] = '1'

        out = out.fillna('')
        # Plain assignment: chained `replace(inplace=True)` on a column is a no-op under Copy-on-Write.
        out['INVOICE_NO'] = out['INVOICE_NO'].replace('nan', '')
        # '0000000000' is the padded placeholder for a missing document number.
        out['DOCUMENT_NO'] = out['DOCUMENT_NO'].replace('0000000000', '')
        out['DEBIT_NOTE_NO'] = out['DEBIT_NOTE_NO'].apply(lambda x: str(x).strip())

        def strip(x):
            return x.strip() if isinstance(x, str) else x
        # DataFrame.applymap was renamed DataFrame.map in pandas 2.1.
        out = out.map(strip) if hasattr(pd.DataFrame, 'map') else out.applymap(strip)

        self.loadfile_df = out
        return self.loadfile_df




# Loadfile_upload

In [17]:
import io
import pandas as pd
from google.cloud import storage
import datetime
import tempfile

class Loadfile_upload:
    """Upload DataFrames as delimited text files to the project's GCS bucket."""

    def __init__(self):
        self.bucket_name = "miag-m360-test-bucket"

    def upload_dataframe_to_gcs(self, df, destination_blob_name, separator):
        """Write ``df`` to a temporary CSV and upload it to ``destination_blob_name``.

        The CSV is UTF-8 with no index column. The temporary file is always deleted,
        even if the upload fails.

        :param df: DataFrame to upload.
        :param destination_blob_name: object path inside ``self.bucket_name``.
        :param separator: field delimiter passed to ``DataFrame.to_csv``.
        """
        import os  # local import: this cell does not import os at module level

        storage_client = storage.Client()
        # Reserve a temp path and close our handle at once. pandas reopens the file by name,
        # which fails on Windows while another handle is still open.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".csv") as handle:
            temp_path = handle.name
        try:
            df.to_csv(temp_path, index=False, sep=separator, encoding='utf-8')
            bucket = storage_client.bucket(self.bucket_name)
            blob = bucket.blob(destination_blob_name)
            blob.upload_from_filename(temp_path)
            print(f"CSV uploaded to {destination_blob_name} in bucket {self.bucket_name}.")
        finally:
            if os.path.exists(temp_path):
                os.remove(temp_path)

# Process

In [18]:
# from Storage_Bucket_Operations import *
# from DB_Instance_Operations import *
# from BigQuery_Operations import *
# from Preprocess import *
# from Loadfile import *
# from Latest_SFTP_file import *
# import time
# from Loadfile_upload import *


class Process:
    """End-to-end driver for one load-file run.

    Stages, in order:
      1. read the newest SFTP extract from the bucket;
      2. update the SDP table from it and write it back;
      3. extract FI / SISIC / MMSIC data from BigQuery for the SDP supplier lists;
      4. preprocess and build the load file;
      5. upload the load file twice (',' copy under ``analysis/``,
         ';' copy under ``share/``);
      6. write the load file to the merged table.

    The DB instance handle is released in a ``finally`` block, so a failure in
    any stage no longer leaves it open.
    """

    def __init__(self):
        self.storage_bucket_operations = Storage_Bucket_Operations()
        self.db_instance_operations = DB_Instance_Operations()
        self.bigquery_operations = BigQuery_Operations()
        self.preprocess = Preprocess()
        self.loadfile = Loadfile()
        self.latest_sftp_file = Latest_SFTP_file()
        self.loadfile_upload = Loadfile_upload()

    def _fetch_sftp(self):
        """Return the newest SFTP extract as a DataFrame, or None if there is none."""
        started = time.perf_counter()
        sftp_file = self.latest_sftp_file.get_latest_file()
        print("SFTP file : ", sftp_file)
        # get_latest_file() returns None when 'Downloaded Files/' is empty.
        if sftp_file is None:
            return None
        sftp_df = self.storage_bucket_operations.readFromBucket(sftp_file)
        print("SFTP shape : ", sftp_df.shape)
        print("SFTP fetch time : ", time.perf_counter() - started)
        return sftp_df

    def _sync_sdp(self, sftp_df):
        """Merge the SFTP extract into the SDP table and persist the result."""
        started = time.perf_counter()
        sdp_df = self.db_instance_operations.readSDPTable()
        sdp_df = self.db_instance_operations.updateSDP(sdp_df, sftp_df)
        self.db_instance_operations.writeSDPTable(sdp_df)
        print("SDP rxw time : ", time.perf_counter() - started)

    def _extract_bigquery(self):
        """Fetch the FI, SISIC and MMSIC extracts; return them as a 3-tuple."""
        db = self.db_instance_operations
        # Supplier lists are read after the SDP write so they see the update.
        mmsic_suppliers = db.getSupplierNumberForMMSIC()
        sisic_suppliers = db.getSupplierNumberForSISIC()
        fi_suppliers = db.getSupplierNumberForFI()

        started = time.perf_counter()
        df_fi = self.bigquery_operations.extract_FI(fi_suppliers)
        fi_done = time.perf_counter()
        print("FI fetch time : ", fi_done - started)
        sisic_df = self.bigquery_operations.extract_SISIC(sisic_suppliers)
        sis_done = time.perf_counter()
        print("SIS fetch time : ", sis_done - fi_done)
        mmsic_df = self.bigquery_operations.extract_MMSIC(mmsic_suppliers)
        print("MMS fetch time : ", time.perf_counter() - sis_done)
        return df_fi, sisic_df, mmsic_df

    def _build_loadfile(self, merged_df):
        """Turn the preprocessed frame into the load-file DataFrame."""
        started = time.perf_counter()
        loadfile_df = self.loadfile.loadfile(merged_df)
        print("Loadfile Creation time : ", time.perf_counter() - started)
        return loadfile_df

    def _upload_loadfile(self, loadfile_df):
        """Upload today's load file: ',' copy to analysis/, ';' copy to share/."""
        # Local import: the notebook's global `datetime` is rebound between the
        # module and the class by different cells, so don't depend on it here.
        from datetime import date

        run_date = date.today().strftime("%Y%m%d")
        self.loadfile_upload.upload_dataframe_to_gcs(
            loadfile_df, f"analysis/load.360.35.{run_date}.001_test_internal.csv", separator=',')
        self.loadfile_upload.upload_dataframe_to_gcs(
            loadfile_df, f"share/load.360.35.{run_date}.001.csv", separator=';')

    def process(self):
        """Run the full pipeline once.

        Progress and stage timings are reported via ``print``.

        :return: None (also when there is no SFTP file to process).
        """
        print("Process started...")
        try:
            sftp_df = self._fetch_sftp()
            if sftp_df is None:
                print("No SFTP file to process; nothing was loaded.")
                return None
            self._sync_sdp(sftp_df)
            df_fi, sisic_df, mmsic_df = self._extract_bigquery()
            merged_df = self.preprocess.preprocess(df_fi, sisic_df, mmsic_df, sftp_df)
            loadfile_df = self._build_loadfile(merged_df)
            self._upload_loadfile(loadfile_df)
            self.db_instance_operations.writeMergedTable(loadfile_df)
        finally:
            # NOTE(review): explicit __del__ kept from the original teardown; a
            # dedicated close() would avoid a second teardown at garbage collection.
            self.db_instance_operations.__del__()
        print("Process completed...")

if __name__ == "__main__":
    # Process.process() reports progress via print and returns None, so call it
    # for its side effects rather than printing its (always-None) return value.
    process = Process()
    process.process()


sslrootcert downloaded and stored temporarily at /var/tmp/tmphdn2lkvi
sslcert downloaded and stored temporarily at /var/tmp/tmp81q7aodx
sslkey downloaded and stored temporarily at /var/tmp/tmp5720vvhj
sslrootcert downloaded and stored temporarily at /var/tmp/tmps5qjdav9
sslcert downloaded and stored temporarily at /var/tmp/tmp2w3gkm4r
sslkey downloaded and stored temporarily at /var/tmp/tmpf9z2gvne
Process started...
The latest file in 'Downloaded Files' is: Downloaded Files/miag.35.288560.20241114.959.csv
SFTP file :  Downloaded Files/miag.35.288560.20241114.959.csv
SFTP shape :  (110841, 18)
SFTP fetch time :  1.5190658569335938
Written back to SDP Table of DB Instance...
SDP rxw time :  2.4581332206726074
FI fetch time :  23.59712266921997
SIS fetch time :  3.1702136993408203
MMS fetch time :  11.371078491210938
Count of unique records in FI before cleaning :  180274
FI shape after cleaning :  (180274, 35)
Cleaning FI time :  124.51346182823181
Type casted cleaned FI for storage
Typ

In [19]:
# Wall-clock timestamp at the end of the run; paired with `start` (set by
# `time.time()` in the first cell) to measure total notebook runtime.
end = time.time()

In [20]:
# Total elapsed wall-clock seconds between the first cell (`start`) and the
# previous cell (`end`), i.e. the whole notebook run including the pipeline.
print(end-start)

921.6354794502258
