In [36]:
import os
import pandas as pd
from datetime import datetime
import hashlib
import psycopg2
from config import read_config_file # the read_config_file is a function name which reads params from database.ini file

In [37]:
def load_data_and_add_metadata(file_path: str, schema: dict = None) -> pd.DataFrame:
    """
    Read an Excel workbook into a DataFrame and stamp every row with load metadata.

    The workbook is parsed with pandas' `read_excel`; `schema`, when given, is passed
    through as the `dtype` argument. Two columns are then appended:
    '_Record_Source' (the `file_path` argument as received) and '_Load_Date'
    (the local time of the load, formatted as 'YYYY-MM-DD HH:MM:SS' text).

    Input:
    - file_path (str): The path to the Excel file.
    - schema (dict, optional): Column name -> dtype mapping applied while reading.

    Output:
    - pd.DataFrame: The loaded data plus the '_Record_Source' and '_Load_Date' columns.
      If anything goes wrong while loading, the error is printed and None is returned.
    """
    try:
        frame = pd.read_excel(file_path, dtype=schema)

        # One timestamp is taken per load so every row carries the same value.
        metadata = {
            '_Record_Source': file_path,
            '_Load_Date': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        }
        for column, value in metadata.items():
            frame[column] = value

        return frame

    except Exception as error:
        # Best-effort contract: report the problem and signal failure with None.
        print(f"Error loading data: {error}")
        return None

In [40]:
# Column dtypes enforced while reading the workbook. Code-like columns are read as
# text so that values such as '0000123456' keep their leading zeros.
mu_schema = {
    'Controlling_Area': 'str',
    'Company_Code': 'str',
    'Cost_Object': 'str',
    'Cost_Object_Type': 'str',
    'Cost_Element': 'str',
    'Year': 'int',
    'Period': 'int',
    'Currency': 'str',
    'Amount': 'float'
}

excel_file_path = "Manual Upload.xlsx"

manual_upload_df = load_data_and_add_metadata(excel_file_path, schema = mu_schema)

# DataFrame.info() prints its report itself and returns None, so it is called
# directly; wrapping it in print() emitted a stray "None" after the summary.
manual_upload_df.info()
manual_upload_df.head()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 10 entries, 0 to 9
Data columns (total 11 columns):
 #   Column            Non-Null Count  Dtype  
---  ------            --------------  -----  
 0   Controlling_Area  10 non-null     object 
 1   Company_Code      10 non-null     object 
 2   Cost_Object       10 non-null     object 
 3   Cost_Object_Type  10 non-null     object 
 4   Cost_Element      10 non-null     object 
 5   Year              10 non-null     int32  
 6   Period            10 non-null     int32  
 7   Currency          10 non-null     object 
 8   Amount            10 non-null     float64
 9   _Record_Source    10 non-null     object 
 10  _Load_Date        10 non-null     object 
dtypes: float64(1), int32(2), object(8)
memory usage: 932.0+ bytes
None


Unnamed: 0,Controlling_Area,Company_Code,Cost_Object,Cost_Object_Type,Cost_Element,Year,Period,Currency,Amount,_Record_Source,_Load_Date
0,WXYZ,1234,0000123456,Cost Center,9876543210,1999,1,USD,1500.5,Manual Upload.xlsx,2024-02-05 15:22:24
1,4567,5678,0000234567,Cost Center,1234567890,2023,2,EUR,2000.75,Manual Upload.xlsx,2024-02-05 15:22:24
2,WXYZ,WXYZ,A12.345.678,WBS,4567890123,2022,3,CHF,1800.2,Manual Upload.xlsx,2024-02-05 15:22:24
3,9876,WXYZ,B23.456.789,WBS,7890123456,2023,4,PLN,1200.8,Manual Upload.xlsx,2024-02-05 15:22:24
4,ABCD,6789,9876543210,COPA,9876543210,2024,12,USD,2200.25,Manual Upload.xlsx,2024-02-05 15:22:24


In [41]:
# List the distinct Cost_Object_Type values as loaded, before any trimming,
# to reveal variants that differ only by surrounding whitespace.
manual_upload_df["Cost_Object_Type"].unique()

array(['    Cost Center', 'Cost Center', 'WBS    ', 'WBS   ', 'COPA',
       '    COPA'], dtype=object)

In [26]:
def trim_all_spaces(df: pd.DataFrame) -> pd.DataFrame:
    """
    Strip leading and trailing white-space from the text columns of a DataFrame.

    Every column with object dtype (or a pandas string extension dtype) is processed;
    all other columns are left alone. Within an object column only real `str` values
    are stripped, so missing values (None / NaN) and any non-string objects pass
    through unchanged instead of raising. One progress line is printed per column.

    The DataFrame is modified IN PLACE and the same object is returned, so callers
    may either use the return value or keep using the frame they passed in.

    Input:
    - df (pd.DataFrame): The DataFrame to be processed.

    Output:
    - pd.DataFrame: The same DataFrame with its text columns trimmed, or None
      (after printing an error message) when `df` is not a DataFrame.
    """
    # Explicit check instead of `assert`, which is removed when Python runs with -O.
    if not isinstance(df, pd.DataFrame):
        print("Error: Input must be a pandas DataFrame")
        return None

    def strip_if_text(value):
        # Only strings have surrounding white-space to remove.
        return value.strip() if isinstance(value, str) else value

    for column_name in df.columns:
        column = df[column_name]
        if pd.api.types.is_object_dtype(column.dtype):
            df[column_name] = column.apply(strip_if_text)
            print(f'Whitespaces removed for: {column_name}')
        elif isinstance(column.dtype, pd.StringDtype):
            # String-extension columns hold only text / NA, so the vectorised
            # accessor is safe and keeps the column's dtype.
            df[column_name] = column.str.strip()
            print(f'Whitespaces removed for: {column_name}')
        else:
            print(f'{column_name} is not of object type, moved to the next column')

    return df

In [42]:
# The return value is not reassigned here: this cell relies on trim_all_spaces
# updating manual_upload_df in place. The returned frame is only displayed.
trim_all_spaces(manual_upload_df)

Whitespaces removed for: Controlling_Area
Whitespaces removed for: Company_Code
Whitespaces removed for: Cost_Object
Whitespaces removed for: Cost_Object_Type
Whitespaces removed for: Cost_Element
Year is not of object type, moved to the next column
Period is not of object type, moved to the next column
Whitespaces removed for: Currency
Amount is not of object type, moved to the next column
Whitespaces removed for: _Record_Source
Whitespaces removed for: _Load_Date


Unnamed: 0,Controlling_Area,Company_Code,Cost_Object,Cost_Object_Type,Cost_Element,Year,Period,Currency,Amount,_Record_Source,_Load_Date
0,WXYZ,1234,0000123456,Cost Center,9876543210,1999,1,USD,1500.5,Manual Upload.xlsx,2024-02-05 15:22:24
1,4567,5678,0000234567,Cost Center,1234567890,2023,2,EUR,2000.75,Manual Upload.xlsx,2024-02-05 15:22:24
2,WXYZ,WXYZ,A12.345.678,WBS,4567890123,2022,3,CHF,1800.2,Manual Upload.xlsx,2024-02-05 15:22:24
3,9876,WXYZ,B23.456.789,WBS,7890123456,2023,4,PLN,1200.8,Manual Upload.xlsx,2024-02-05 15:22:24
4,ABCD,6789,9876543210,COPA,9876543210,2024,12,USD,2200.25,Manual Upload.xlsx,2024-02-05 15:22:24
5,1234,2345,1234567890,COPA,1234567890,2022,1,EUR,1600.9,Manual Upload.xlsx,2024-02-05 15:22:24
6,WXYZ,7890,000007001,Cost Center,4567890123,2023,7,CHF,1750.3,Manual Upload.xlsx,2024-02-05 15:22:24
7,9876,4321,000008001,Cost Center,7890123456,2024,13,PLN,1950.6,Manual Upload.xlsx,2024-02-05 15:22:24
8,1234,WXYZ,000009001,WBS,9876543210,2022,14,USD,2100.7,Manual Upload.xlsx,2024-02-05 15:22:24
9,WXYZ,2109,000001001,WBS,1234567890,2023,10,EUR,1850.4,Manual Upload.xlsx,2024-02-05 15:22:24


In [43]:
# Re-check the distinct Cost_Object_Type values after trimming; the padded
# variants should have collapsed into the clean category names.
manual_upload_df["Cost_Object_Type"].unique()

array(['Cost Center', 'WBS', 'COPA'], dtype=object)

In [29]:
def data_validation(df: pd.DataFrame, save_invalid: bool = False) -> pd.DataFrame:
    """
    Validates a DataFrame against the business rules below and returns only the rows that pass.

    For every rule the violating rows are printed (with an extra 'Error_Type' column naming
    the rule). A row that breaks several rules is reported once per rule. The input
    DataFrame itself is never modified and no global pandas options are changed.

    Input:
    - df (pd.DataFrame): The DataFrame to validate. Must contain the columns
      'Cost_Object_Type', 'Year' and 'Period'.
    - save_invalid (bool, optional): If True and at least one row is invalid, the invalid rows
      (including their original index and 'Error_Type') are written to
      'Error Output/invalid_data_<current_date_time>.csv'. Default is False.

    Output:
    - pd.DataFrame: A new DataFrame holding the valid rows, re-indexed from 0.

    Business Rules:
    1. Cost Object Type must be one of ('Cost Center', 'WBS', 'COPA').
    2. Year must be greater than or equal to 2000.
    3. Period must be in the range [1, 12].
    A missing value (NaN / <NA>) cannot satisfy a rule and is therefore treated as invalid.
    """

    def violations(passes: pd.Series):
        # Turn "row satisfies the rule" into a plain positional boolean array of failures.
        # Missing comparison results count as "not satisfied". Working positionally
        # (instead of by index label) keeps rows apart even if index labels repeat.
        return ~passes.fillna(False).astype(bool).to_numpy()

    valid_cost_object_types = {'Cost Center', 'WBS', 'COPA'}

    # (error label, boolean Series that is True where the row SATISFIES the rule)
    rules = [
        ('Invalid Cost Object Type', df['Cost_Object_Type'].isin(valid_cost_object_types)),
        ('Invalid Year Format', df['Year'] >= 2000),
        ('Invalid Period Format', df['Period'].between(1, 12)),
    ]

    invalid_parts = []
    is_invalid = None
    for rule_number, (error_type, passes) in enumerate(rules, start=1):
        failed = violations(passes)

        # .assign works on a fresh copy of the selected rows, so there is no
        # chained-assignment warning to silence and `df` stays untouched.
        invalid_rows = df.loc[failed].assign(Error_Type=error_type)

        print(f"Invalid Rows Rule {rule_number}:")
        print(invalid_rows)

        invalid_parts.append(invalid_rows)
        is_invalid = failed if is_invalid is None else (is_invalid | failed)

    invalid_df = pd.concat(invalid_parts)

    if save_invalid and not invalid_df.empty:
        # Create the Error Output folder if it does not exist yet
        error_output_folder = "Error Output"
        os.makedirs(error_output_folder, exist_ok=True)

        # File name carries the current date and time (one-second resolution)
        error_file_path = os.path.join(error_output_folder, f'invalid_data_{datetime.now().strftime("%Y-%m-%d_%H-%M-%S")}.csv')

        invalid_df.to_csv(error_file_path, index=True)

    # Keep the rows that passed every rule and renumber them from 0
    return df.loc[~is_invalid].reset_index(drop=True)

In [44]:
# Rebind the name to the validated frame: data_validation returns only the rows
# that passed every rule (re-indexed from 0) and, because save_invalid=True,
# writes any rejected rows to a CSV in the 'Error Output' folder.
manual_upload_df = data_validation(manual_upload_df, save_invalid=True)
manual_upload_df

Invalid Rows Rule 1:
Empty DataFrame
Columns: [Controlling_Area, Company_Code, Cost_Object, Cost_Object_Type, Cost_Element, Year, Period, Currency, Amount, _Record_Source, _Load_Date, Error_Type]
Index: []
Invalid Rows Rule 2:
  Controlling_Area Company_Code Cost_Object Cost_Object_Type Cost_Element  \
0             WXYZ         1234  0000123456      Cost Center   9876543210   

   Year  Period Currency  Amount      _Record_Source           _Load_Date  \
0  1999       1      USD  1500.5  Manual Upload.xlsx  2024-02-05 15:22:24   

            Error_Type  
0  Invalid Year Format  
Invalid Rows Rule 3:
  Controlling_Area Company_Code Cost_Object Cost_Object_Type Cost_Element  \
7             9876         4321   000008001      Cost Center   7890123456   
8             1234         WXYZ   000009001              WBS   9876543210   

   Year  Period Currency  Amount      _Record_Source           _Load_Date  \
7  2024      13      PLN  1950.6  Manual Upload.xlsx  2024-02-05 15:22:24   
8  202

Unnamed: 0,Controlling_Area,Company_Code,Cost_Object,Cost_Object_Type,Cost_Element,Year,Period,Currency,Amount,_Record_Source,_Load_Date
0,4567,5678,0000234567,Cost Center,1234567890,2023,2,EUR,2000.75,Manual Upload.xlsx,2024-02-05 15:22:24
1,WXYZ,WXYZ,A12.345.678,WBS,4567890123,2022,3,CHF,1800.2,Manual Upload.xlsx,2024-02-05 15:22:24
2,9876,WXYZ,B23.456.789,WBS,7890123456,2023,4,PLN,1200.8,Manual Upload.xlsx,2024-02-05 15:22:24
3,ABCD,6789,9876543210,COPA,9876543210,2024,12,USD,2200.25,Manual Upload.xlsx,2024-02-05 15:22:24
4,1234,2345,1234567890,COPA,1234567890,2022,1,EUR,1600.9,Manual Upload.xlsx,2024-02-05 15:22:24
5,WXYZ,7890,000007001,Cost Center,4567890123,2023,7,CHF,1750.3,Manual Upload.xlsx,2024-02-05 15:22:24
6,WXYZ,2109,000001001,WBS,1234567890,2023,10,EUR,1850.4,Manual Upload.xlsx,2024-02-05 15:22:24


In [45]:
def hash_transformation(df):
    """
    Add MD5 hash-key columns and a fiscal-period column to a copy of a DataFrame.

    Columns added (each hash is the hex MD5 digest of the UTF-8 encoded text):
    - CONTROLLING_AREA_HSK: hash of 'Controlling_Area'.
    - COMPANY_CODE_HSK: hash of 'Company_Code'.
    - COST_OBJECT_HSK: hash of '<Cost_Object>|<Controlling_Area>' when
      'Cost_Object_Type' is 'Cost Center', otherwise hash of 'Cost_Object' alone.
    - COST_ELEMENT_CTR_AREA_HSK: hash of 'Cost_Element' immediately followed by
      'Controlling_Area' (no separator).
    - CURRENCY_HSK: hash of 'Currency'.
    - FISCAL_PERIOD: 'Year' + two-digit zero-padded 'Period' + '01', as text.

    Missing values (None, NaN, <NA>) are hashed as the placeholder '_'. A combined key
    with any missing part is treated as missing as a whole. An empty DataFrame is
    handled and simply gains the (empty) new columns.

    Input:
    - df (pd.DataFrame): The input DataFrame. It is not modified.

    Output:
    - pd.DataFrame: A new DataFrame with the additional columns.
    """

    def is_missing(value) -> bool:
        # pandas stores absent cells as NaN / <NA> rather than None, so a plain
        # `is None` test would never fire for data read from a file.
        if value is None:
            return True
        try:
            return bool(pd.isna(value))
        except (TypeError, ValueError):
            # Non-scalar values cannot be "missing" in this sense.
            return False

    def md5_hash(value):
        # Missing values share one placeholder so they always hash identically.
        text = '_' if is_missing(value) else str(value)
        return hashlib.md5(text.encode('utf-8')).hexdigest()

    def hash_column(values):
        # A plain list assigns cleanly even when the frame has zero rows.
        return [md5_hash(value) for value in values]

    def cost_object_key(cost_object, controlling_area, cost_object_type):
        # Cost centers are keyed together with their controlling area;
        # every other object type is keyed by the cost object alone.
        if cost_object_type != 'Cost Center':
            return cost_object
        if is_missing(cost_object) or is_missing(controlling_area):
            return None
        return f'{cost_object}|{controlling_area}'

    # Work on a copy so the caller's frame is left as it was.
    result = df.copy()

    # Creating _HSKs and additional columns
    result['CONTROLLING_AREA_HSK'] = hash_column(result['Controlling_Area'])

    result['COMPANY_CODE_HSK'] = hash_column(result['Company_Code'])

    result['COST_OBJECT_HSK'] = hash_column(
        cost_object_key(cost_object, controlling_area, cost_object_type)
        for cost_object, controlling_area, cost_object_type in zip(
            result['Cost_Object'], result['Controlling_Area'], result['Cost_Object_Type']
        )
    )

    # NOTE(review): the two parts are joined without a separator (unlike COST_OBJECT_HSK),
    # so e.g. ('12', '3') and ('1', '23') hash identically. Left unchanged because
    # changing it would alter keys that may already be stored downstream; confirm intent.
    result['COST_ELEMENT_CTR_AREA_HSK'] = hash_column(result['Cost_Element'] + result['Controlling_Area'])

    result['CURRENCY_HSK'] = hash_column(result['Currency'])

    result['FISCAL_PERIOD'] = result['Year'].astype(str) + result['Period'].astype(str).str.zfill(2) + '01'

    return result

In [46]:
# Add the *_HSK hash-key columns and FISCAL_PERIOD. The returned frame is the one
# the following cells work with, so the name is rebound to it.
manual_upload_df = hash_transformation(manual_upload_df)
manual_upload_df

Unnamed: 0,Controlling_Area,Company_Code,Cost_Object,Cost_Object_Type,Cost_Element,Year,Period,Currency,Amount,_Record_Source,_Load_Date,CONTROLLING_AREA_HSK,COMPANY_CODE_HSK,COST_OBJECT_HSK,COST_ELEMENT_CTR_AREA_HSK,CURRENCY_HSK,FISCAL_PERIOD
0,4567,5678,0000234567,Cost Center,1234567890,2023,2,EUR,2000.75,Manual Upload.xlsx,2024-02-05 15:22:24,6562c5c1f33db6e05a082a88cddab5ea,674f3c2c1a8a6f90461e8a66fb5550ba,8361a1295e408a5abd007dea0d7844a9,cb19fc806bfcd6ba5907a15975a4be03,a055562bdb59ad8ba9cc680367308118,20230201
1,WXYZ,WXYZ,A12.345.678,WBS,4567890123,2022,3,CHF,1800.2,Manual Upload.xlsx,2024-02-05 15:22:24,e89eb41d980d68d6b1f22c893dc53514,e89eb41d980d68d6b1f22c893dc53514,a756e1d62a5ce9a0862424f3b96e3ef2,9eb7bd0ebb22f2484903758111545271,2fc2052eaa45739a9fd0854d4ed60178,20220301
2,9876,WXYZ,B23.456.789,WBS,7890123456,2023,4,PLN,1200.8,Manual Upload.xlsx,2024-02-05 15:22:24,912e79cd13c64069d91da65d62fbb78c,e89eb41d980d68d6b1f22c893dc53514,15d946ba6bcd9f72b6efcfc444b40231,59728b3cb60a02eb776f661c3b138866,446687ea2db1ada75be5ed053be77f59,20230401
3,ABCD,6789,9876543210,COPA,9876543210,2024,12,USD,2200.25,Manual Upload.xlsx,2024-02-05 15:22:24,cb08ca4a7bb5f9683c19133a84872ca7,46d045ff5190f6ea93739da6c0aa19bc,e388c1c5df4933fa01f6da9f92595589,9dc3a6ba621f7cb5e6673aac907afa1c,3518f8944d42212dd37daffe097d216e,20241201
4,1234,2345,1234567890,COPA,1234567890,2022,1,EUR,1600.9,Manual Upload.xlsx,2024-02-05 15:22:24,81dc9bdb52d04dc20036dbd8313ed055,81b073de9370ea873f548e31b8adc081,e807f1fcf82d132f9bb018ca6738a19f,100416b93d34d3482c47a7f06ca50f29,a055562bdb59ad8ba9cc680367308118,20220101
5,WXYZ,7890,000007001,Cost Center,4567890123,2023,7,CHF,1750.3,Manual Upload.xlsx,2024-02-05 15:22:24,e89eb41d980d68d6b1f22c893dc53514,cdaeb1282d614772beb1e74c192bebda,be99c59a2802976c66083139bd45b40a,9eb7bd0ebb22f2484903758111545271,2fc2052eaa45739a9fd0854d4ed60178,20230701
6,WXYZ,2109,000001001,WBS,1234567890,2023,10,EUR,1850.4,Manual Upload.xlsx,2024-02-05 15:22:24,e89eb41d980d68d6b1f22c893dc53514,05546b0e38ab9175cd905eebcc6ebb76,04b522d0c541274891f8aef7955633a5,8a3a8399c382b5b00a7edc3292bc5089,a055562bdb59ad8ba9cc680367308118,20231001


In [48]:
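# Reference only: connecting to an existing database with inline parameters
# (write_to_postgres below obtains its parameters from read_config_file() instead)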
# connection = psycopg2.connect(
#                             user="postgres",
#                             password="XXXXXXXXXX",
#                             host="127.0.0.1",
#                             port="5432",
#                             database="postgres_db"
#                         )


def write_to_postgres(df):
    """
    Insert the rows of a DataFrame into the PostgreSQL table `manual_upload_tlnk`.

    Values are taken from the DataFrame BY COLUMN NAME (see `insert_columns`), so the
    load does not depend on the order of the DataFrame's columns and ignores any extra
    columns. Each table column is the lower-cased name of its DataFrame column.
    Connection parameters come from `read_config_file()` and are passed to
    `psycopg2.connect` as keyword arguments.

    All rows are inserted in a single transaction that is committed at the end.
    If anything fails, the error is printed and nothing is committed; the connection
    is always closed.

    Input:
    - df (pd.DataFrame): Data to load; must contain every column in `insert_columns`.

    Output:
    - None. Progress and errors are reported via print.
    """
    # DataFrame columns to load, in the order they are bound to the INSERT statement.
    insert_columns = (
        'Controlling_Area',
        'Company_Code',
        'Cost_Object',
        'Cost_Object_Type',
        'Cost_Element',
        'Year',
        'Period',
        'Currency',
        'Amount',
        '_Record_Source',
        '_Load_Date',
        'CONTROLLING_AREA_HSK',
        'COMPANY_CODE_HSK',
        'COST_OBJECT_HSK',
        'COST_ELEMENT_CTR_AREA_HSK',
        'CURRENCY_HSK',
        'FISCAL_PERIOD',
    )

    connection = None
    try:
        # Fail fast, before opening a connection, if the frame cannot supply a column.
        missing_columns = [column for column in insert_columns if column not in df.columns]
        if missing_columns:
            raise ValueError(f'DataFrame is missing required columns: {missing_columns}')

        # One plain tuple per row, with the values in `insert_columns` order.
        rows = list(df[list(insert_columns)].itertuples(index=False, name=None))

        params = read_config_file()
        print('Connecting to the postgreSQL database ...')
        connection = psycopg2.connect(**params) # extracting everything from params to the .connect() method

        # The column list and the placeholders are both derived from `insert_columns`
        # (hard-coded identifiers), so they cannot drift apart. Row values are still
        # passed as bound parameters.
        column_list = ', '.join(column.lower() for column in insert_columns)
        placeholders = ', '.join(['%s'] * len(insert_columns))
        insert_query = f'INSERT INTO manual_upload_tlnk ({column_list}) VALUES ({placeholders});'

        # The context manager closes the cursor even when an insert raises.
        with connection.cursor() as cursor:
            cursor.executemany(insert_query, rows)

        # Commit changes
        connection.commit()
        print(f'Successfully inserted {len(rows)} rows into the MANUAL_UPLOAD_TLNK table.')
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
    finally:
        if connection is not None:
            connection.close()
            print('Database connection terminated.')

# Load the transformed rows into PostgreSQL.
# NOTE(review): the function issues plain INSERTs, so re-running this cell presumably
# appends the same rows again unless the table enforces uniqueness — verify.
write_to_postgres(manual_upload_df)

Connecting to the postgreSQL database ...
Successfully inserted 7 rows into the MANUAL_UPLOAD_TLNK table.
Database connection terminated.


In [49]:
class DataProcessingPipeline:
    """
    Fluent wrapper that runs the load -> clean -> validate -> hash -> write steps on one DataFrame.

    The pipeline keeps its working DataFrame in `self.df` and exposes one method per step.
    Every method returns the pipeline itself, so the steps can be chained:

    - `load_data(file_path, schema=None)`: Loads an Excel file (with metadata columns) into `self.df`.
    - `trim_spaces()`: Trims leading and trailing white-space in the text columns.
    - `validate_data(save_invalid=False)`: Applies the business-rule validation, optionally
      saving the invalid records to a CSV file.
    - `hash_transform()`: Adds the MD5 hash columns.
    - `write_to_postgres()`: Writes the DataFrame to the PostgreSQL table.

    Usage:
    ```
    pipeline = DataProcessingPipeline()
    pipeline.load_data(file_path='example.xlsx', schema={'Year': 'int'})\
           .trim_spaces()\
           .validate_data(save_invalid = False)\
           .hash_transform()\
           .write_to_postgres()
    ```

    While `self.df` is None (nothing loaded yet, or loading failed) every step after
    `load_data` does nothing and just returns the pipeline.
    """

    def __init__(self):
        # Working DataFrame; stays None until load_data succeeds.
        self.df = None

    def load_data(self, file_path, schema=None):
        """Load the Excel file at `file_path` into `self.df` (None if loading fails)."""
        self.df = load_data_and_add_metadata(file_path, schema)
        return self

    def trim_spaces(self):
        """Trim white-space in the text columns of `self.df`, if data is loaded."""
        if self.df is None:
            return self
        self.df = trim_all_spaces(self.df)
        return self

    def validate_data(self, save_invalid=False):
        """Replace `self.df` with its valid rows, if data is loaded."""
        if self.df is None:
            return self
        self.df = data_validation(self.df, save_invalid)
        return self

    def hash_transform(self):
        """Add the hash columns to `self.df`, if data is loaded."""
        if self.df is None:
            return self
        self.df = hash_transformation(self.df)
        return self

    def write_to_postgres(self):
        """Write `self.df` to PostgreSQL via the module-level function, if data is loaded."""
        if self.df is None:
            return self
        write_to_postgres(self.df)
        return self

In [50]:
# Inputs for the end-to-end run: source workbook, column dtypes, and whether
# rejected rows should be written to a CSV.
file_path = "Manual Upload.xlsx"
schema = {
    "Controlling_Area": "str",
    "Company_Code": "str",
    "Cost_Object": "str",
    "Cost_Object_Type": "str",
    "Cost_Element": "str",
    "Year": "int",
    "Period": "int",
    "Currency": "str",
    "Amount": "float",
}
save_invalid = True

# Run every step in order; each method returns the pipeline, so the chain's value
# (and therefore the cell's displayed result) is the pipeline object itself.
pipeline = DataProcessingPipeline()
(
    pipeline
    .load_data(file_path=file_path, schema=schema)
    .trim_spaces()
    .validate_data(save_invalid=save_invalid)
    .hash_transform()
    .write_to_postgres()
)

Whitespaces removed for: Controlling_Area
Whitespaces removed for: Company_Code
Whitespaces removed for: Cost_Object
Whitespaces removed for: Cost_Object_Type
Whitespaces removed for: Cost_Element
Year is not of object type, moved to the next column
Period is not of object type, moved to the next column
Whitespaces removed for: Currency
Amount is not of object type, moved to the next column
Whitespaces removed for: _Record_Source
Whitespaces removed for: _Load_Date
Invalid Rows Rule 1:
Empty DataFrame
Columns: [Controlling_Area, Company_Code, Cost_Object, Cost_Object_Type, Cost_Element, Year, Period, Currency, Amount, _Record_Source, _Load_Date, Error_Type]
Index: []
Invalid Rows Rule 2:
  Controlling_Area Company_Code Cost_Object Cost_Object_Type Cost_Element  \
0             WXYZ         1234  0000123456      Cost Center   9876543210   

   Year  Period Currency  Amount      _Record_Source           _Load_Date  \
0  1999       1      USD  1500.5  Manual Upload.xlsx  2024-02-05 15:37:

<__main__.DataProcessingPipeline at 0x1e8429e46e0>