In [1]:
import pandas as pd
from pathlib import Path
from abc import ABC, abstractmethod

In [2]:
# create an interface
class DiscrepanciesInterface(ABC):
    @abstractmethod
    def uploading_discrepancies(self, path_raw_csv: Path, path_database_A: Path) -> pd.DataFrame:
        pass
    @abstractmethod
    def streaming_discrepancies(self, path_database_A: Path, path_database_B: Path) -> pd.DataFrame:
        pass

In [3]:
class Discrepancies(DiscrepanciesInterface):
    def uploading_discrepancies(self, path_raw_csv: Path, path_database_A: Path) -> pd.DataFrame:
        try:
            df_raw = pd.read_csv(path_raw_csv, index_col=0)
            df_db_a = pd.read_csv(path_database_A)
            
            """Transform raw csv data from wide to longitude and attach col headers similar to db A """
            
            # Fill missing values with 0
            df_raw = df_raw.fillna(0) 
            # We use Stack to transform the DataFrame and reset the index
            melted_df = df_raw.stack().reset_index()
            
            # Rename the columns
            melted_df.columns = ['deliveryDate', 'farmerId', 'quantity']
            # Reorder the columns to match db A
            melted_df = melted_df[['farmerId', 'deliveryDate', 'quantity']]
            
            """Compare Transformed raw csv and db A involved in the Upload process"""
            
            #merge two dfs and return those that are not on both
            result = melted_df.merge(df_db_a, indicator=True, how='outer').loc[lambda v: v['_merge'] != 'both']
            return result
        
        except Exception as e:
            return f"-Error "+ f"{type(e).__name__} {str(e)}"   
        
    def streaming_discrepancies(self, path_database_A: Path, path_database_B: Path) -> pd.DataFrame:
        try:
            #compare db A and db B involved in the streaming process
            df_db_a = pd.read_csv(path_database_A)
            df_db_b = pd.read_csv(path_database_B)
            
            result = df_db_a.merge(df_db_b, indicator=True, how='outer').loc[lambda v: v['_merge'] != 'both']
            return result
        except Exception as e:
            return f"-Error "+ f"{type(e).__name__} {str(e)}" 
            

In [4]:
disc = Discrepancies() #instantiate class

In [5]:
disc.uploading_discrepancies('Raw_Data.csv', 'Database_A.csv') #check discrepancies in uploading data

Unnamed: 0,farmerId,deliveryDate,quantity,_merge
2,19fb0be7-b2b2-4038-80ba-42baabc21c0a,2019-01-01,149.0,left_only
7,584c4094-a231-4c6d-9c49-63b7192bf8a1,2019-01-01,29.0,left_only
15,82b77709-85cc-4a20-9893-cec85ff6d7bd,2019-01-01,0.0,left_only
20,9eb16e61-5d1b-43c7-91bd-7849280e260f,2019-01-01,0.0,left_only
22,573f09bd-c649-448a-b6c9-68f55cf8aa25,2019-01-01,0.0,left_only
24,e1b85e0e-bbe8-42fc-a564-8d296e35af50,2019-01-01,59.0,left_only
25,ff367fff-2f38-4146-bc0e-d5ca7230caf7,2019-01-01,84.0,left_only
27,5633d279-21ce-46e9-b8f9-80db6b071904,2019-01-01,504.0,left_only
41,64eca1f3-b712-467b-976b-107be26d8b18,2019-01-01,0.0,left_only
44,3584558d-d6f2-4b7d-9769-1f8a751782ab,2019-01-01,141.0,left_only


In [6]:
"""
    Uploading Discrepancies:
    The discrepancies in uploading data shows that raw csv file has extra rows and values not present in Database A indicated by
    left_only merge. The left only merge col shows rows available in raw csv data that we merging to database A. There are no right_only
    merges for this discrepancy.
    
    Action Points for correction:
    1. Implement data integrity checks, such as checksums or hashing to ensure data isnot corrupted during the transmission process due to network issues
    2. Implement a retry mechanism in your data transfer process to handle incomplete data transfer that occurs due to network interruptions or timeouts.
"""

'\n    Uploading Discrepancies:\n    The discrepancies in uploading data shows that raw csv file has extra rows and values not present in Database A indicated by\n    left_only merge. The left only merge col shows rows available in raw csv data that we merging to database A. There are no right_only\n    merges for this discrepancy.\n    \n    Action Points for correction:\n    1. Implement data integrity checks, such as checksums or hashing to ensure data isnot corrupted during the transmission process due to network issues\n    2. Implement a retry mechanism in your data transfer process to handle incomplete data transfer that occurs due to network interruptions or timeouts.\n'

In [7]:
disc.streaming_discrepancies('Database_A.csv', 'Database_B.csv') #check discrepancies in streaming data

Unnamed: 0,farmerId,deliveryDate,quantity,_merge
1,dc7a4468-9eba-4f0d-aedd-c57189467f18,2021-08-04,91,left_only
10,f05d1876-8002-4c38-a7a8-bbf94bcc0159,2021-07-15,8,left_only
14,dc7a4468-9eba-4f0d-aedd-c57189467f18,2021-10-19,808,left_only
25,58f674c2-662e-4d6e-84de-ed0c486a2a63,2020-05-15,54,left_only
29,ad81ce4b-5686-4a6e-9d7a-d9a112834706,2021-02-15,31,left_only
37,dc7a4468-9eba-4f0d-aedd-c57189467f18,2021-07-15,822,left_only
44,34da3784-d47d-4459-97b5-4f6fc800a503,2019-06-21,107,left_only
47,20a2cdfc-e506-4d2a-80f4-957299729790,2020-09-18,716,left_only
52,9f78b129-d618-4d13-8e68-3adc922e8176,2020-06-15,178,left_only
55,621a6f43-431c-4a08-aca2-2ec4553d82d3,2020-10-07,388,left_only


In [8]:
"""
    Streaming Discrepancies:
    The discrepancies in streaming data from db A to db B indicates that there are values present in db A that werenot transfered to
    db B indicated by left_only. There are also values transfered to db B from db A that changed and so not similar to db A indicated
    by right_only.
    Some ways to handle streaming discrepancies
    1. Thoroughly test data pipelines and processing logic to catch potential issues.
    2. Data Quality Monitoring: Continuously monitor data quality and set up alerts for data anomalies
"""

'\n    Streaming Discrepancies:\n'