Install dependency


In [8]:
!pip install schedule




In [6]:
import pandas as pd
from sqlalchemy import create_engine
import schedule
import time
import logging
from tenacity import retry, stop_after_attempt, wait_exponential

In [11]:
# ----------------------
logging.basicConfig(filename='etl_ar_log.log', level=logging.INFO,
                    format='%(asctime)s:%(levelname)s:%(message)s')


In [57]:
def extract(file_path):
    logging.info("Starting AR data extraction")
    cleaned_lines = []
    skipped_lines = []

    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            for i, line in enumerate(file):
                if line.count(',') < 6:  # crude check for malformed lines
                    skipped_lines.append((i + 1, line.strip()))
                else:
                    cleaned_lines.append(line)
    except UnicodeDecodeError:
        with open(file_path, 'r', encoding='latin1') as file:
            for i, line in enumerate(file):
                if line.count(',') < 6:
                    skipped_lines.append((i + 1, line.strip()))
                else:
                    cleaned_lines.append(line)

    with open('cleaned_account_receivables.csv', 'w', encoding='utf-8') as clean_file:
        clean_file.writelines(cleaned_lines)

    if skipped_lines:
        logging.warning(f"Skipped {len(skipped_lines)} malformed lines during extraction.")
        with open('skipped_lines.log', 'w') as skipped_log:
            for lineno, content in skipped_lines:
                skipped_log.write(f"Line {lineno}: {content}\n")

    df = pd.read_csv('cleaned_account_receivables.csv', skiprows=1, header=0)
    logging.info(f"Extracted {len(df)} rows from AR data")

    expected_cols = ['invoice_id', 'customer_id', 'invoice_date', 'due_date', 'amount_due', 'currency', 'status']
    if not all(col in df.columns for col in expected_cols):
        raise ValueError("Missing required columns in AR data")

    return df

In [59]:
def transform(df):
    logging.info("Starting AR data transformation")
    try:
        conversion_rates = {'EUR': 1.1, 'USD': 1.0}
        df['amount_due'] = pd.to_numeric(df['amount_due'], errors='coerce')
        df['amount_usd'] = df.apply(lambda row: row['amount_due'] * conversion_rates.get(row['currency'], 1.0), axis=1)

        # Flag overdue invoices
        df['invoice_date'] = pd.to_datetime(df['invoice_date'])
        df['due_date'] = pd.to_datetime(df['due_date'])
        df['is_overdue'] = df['due_date'] < pd.Timestamp.now()

        if df.isnull().any().any():
            logging.warning("Null values found in AR data")

        df.drop_duplicates(subset='invoice_id', inplace=True)
        df.to_csv('checkpoint_ar_transformed.csv', index=False)

        logging.info("AR data transformation complete")
        return df
    except Exception as e:
        logging.error(f"Transformation failed: {e}")
        raise

In [61]:
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10))
def load(df, db_uri='sqlite:///account_receivables.db'):
    logging.info("Starting AR data load")
    try:
        engine = create_engine(db_uri)
        df.to_sql('account_receivables', engine, index=False, if_exists='replace')
        logging.info("AR data load successful")
    except Exception as e:
        logging.error(f"Load failed: {e}")
        raise

In [47]:
def etl_job():
    logging.info("AR ETL job started")
    try:
        df = extract('account_receivables.csv')
        df_transformed = transform(df)
        load(df_transformed)
        logging.info("AR ETL job completed successfully")
    except Exception as e:
        logging.error(f"AR ETL job failed: {e}")
        # Optionally: Send alert/notification here


In [49]:
# ----------------------
# Schedule Daily Run
# ----------------------
schedule.every().day.at("18:30").do(etl_job)

print("AR ETL Scheduler started. Waiting for scheduled time...")
while True:
    schedule.run_pending()
    time.sleep(60)

AR ETL Scheduler started. Waiting for scheduled time...


KeyboardInterrupt: 

In [63]:
etl_job()