In [1]:


import os
from datetime import datetime

import boto3
import pandas as pd

# Logging setup: every run writes to its own log file, named after the
# local time at which the script started.
timestamp = f'{datetime.now():%Y-%m-%d_%H-%M-%S}'
log_file = '/home/ubuntu/DMML_project/Logs/Data_Validation/' + f'validation_{timestamp}.log'

def log_message(message):
    """Append one timestamped line containing *message* to the run's log file.

    The line has the form ``YYYY-MM-DD HH:MM:SS - <message>`` and is written
    to the module-level ``log_file`` path, opened in append mode.
    """
    with open(log_file, 'a') as log_handle:
        stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        log_handle.write(f'{stamp} - {message}\n')

# SECURITY: AWS credentials must never be committed to source. The key pair
# that used to be hard-coded here should be treated as compromised and
# revoked/rotated in IAM.
#
# Credentials are now taken from the standard AWS environment variables.
# When both are unset, ``None`` is passed and boto3 falls back to its default
# credential chain (shared credentials file, instance/role credentials, ...).
ACCESS_KEY = os.environ.get('AWS_ACCESS_KEY_ID')
SECRET_KEY = os.environ.get('AWS_SECRET_ACCESS_KEY')

s3 = boto3.client('s3', aws_access_key_id=ACCESS_KEY, aws_secret_access_key=SECRET_KEY)

def get_latest_timestamp_folder(bucket_name, prefix):
    """Return the newest ``timestamp=YYYY-MM-DD/`` sub-prefix under *prefix*.

    Lists the immediate "sub-folders" (common prefixes) of *prefix* in
    *bucket_name*, keeps those whose name contains ``timestamp=``, and picks
    the one with the latest date.

    Args:
        bucket_name: Name of the S3 bucket to list.
        prefix: Key prefix (ending in ``/``) whose sub-folders are inspected.

    Returns:
        The full prefix of the latest timestamp folder, including the
        trailing ``/``.

    Raises:
        ValueError: If no ``timestamp=`` folder exists under *prefix*, or if
            a folder name does not match ``timestamp=%Y-%m-%d``.
    """
    log_message(f'Getting latest timestamp folder for {prefix}')

    # Paginate so that prefixes holding more folders than fit in a single
    # listing response are not silently truncated.
    paginator = s3.get_paginator('list_objects_v2')
    timestamp_folders = []
    for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix, Delimiter='/'):
        # 'CommonPrefixes' is absent from the response when nothing matches.
        for entry in page.get('CommonPrefixes', []):
            if 'timestamp=' in entry['Prefix']:
                timestamp_folders.append(entry['Prefix'])

    if not timestamp_folders:
        log_message(f'No timestamp folders found for {prefix}')
        raise ValueError(f'No timestamp= folders found under s3://{bucket_name}/{prefix}')

    # Folder prefixes end with '/', so the folder name is the second-to-last
    # path component.
    latest_timestamp_folder = max(
        timestamp_folders,
        key=lambda folder: datetime.strptime(folder.split('/')[-2], 'timestamp=%Y-%m-%d'),
    )
    log_message(f'Latest timestamp folder: {latest_timestamp_folder}')

    return latest_timestamp_folder

def copy_latest_csv_to_s3(bucket_name, prefix, target_key):
    """Copy ``customer_churn.csv`` from the newest timestamp folder to *target_key*.

    The newest folder under *prefix* is resolved with
    ``get_latest_timestamp_folder``; the copy happens server-side within
    *bucket_name* (source and destination are the same bucket).
    """
    source_key = get_latest_timestamp_folder(bucket_name, prefix) + 'customer_churn.csv'
    log_message(f'Copying {source_key} to {target_key}')

    copy_source = {'Bucket': bucket_name, 'Key': source_key}
    s3.copy_object(Bucket=bucket_name, CopySource=copy_source, Key=target_key)

def validate_data(df, file_name):
    """Build missing-value and duplicate-row reports for *df*.

    Both findings are also written to the log via ``log_message``.

    Args:
        df: The DataFrame to inspect.
        file_name: Label identifying the data source; stored in the
            ``File Name`` column of both reports.

    Returns:
        A ``(null_counts, duplicate_report)`` tuple of DataFrames:
        ``null_counts`` has one row per column of *df* with columns
        ``Column``, ``Null Count`` and ``File Name``; ``duplicate_report``
        has a single row with columns ``File Name`` and ``Duplicate Rows``.
    """
    log_message(f'Validating data for {file_name}')

    # Missing values: one row per input column.
    missing_per_column = df.isnull().sum()
    null_counts = missing_per_column.reset_index()
    null_counts.columns = ['Column', 'Null Count']
    null_counts['File Name'] = file_name
    log_message(f'Null counts for {file_name}: \n{null_counts}')

    # Duplicates: number of rows flagged by DataFrame.duplicated().
    duplicate_flags = df.duplicated()
    duplicate_count = duplicate_flags.sum()
    log_message(f'Duplicate rows for {file_name}: {duplicate_count}')
    report_columns = {'File Name': [file_name], 'Duplicate Rows': [duplicate_count]}
    duplicate_report = pd.DataFrame(report_columns)

    return null_counts, duplicate_report

log_message('Validation started')

bucket_name = 'dmml-storage-bits'

# Raw-data prefixes, one per ingestion source.
prefix_postgres = 'raw-data/source=postgres_rds/type=customer_churn/'
prefix_google = 'raw-data/source=google_drive/type=customer_churn/'

# Resolve the newest timestamp= folder for each source.
latest_folder_postgres = get_latest_timestamp_folder(bucket_name, prefix_postgres)
latest_folder_google = get_latest_timestamp_folder(bucket_name, prefix_google)

print("Latest Postgres folder:", latest_folder_postgres)
print("Latest Google Drive folder:", latest_folder_google)

# Object keys of the CSV inside each resolved folder.
postgres_csv_file = f'{latest_folder_postgres}customer_churn.csv'
google_csv_file = f'{latest_folder_google}customer_churn.csv'

# Load each CSV straight from the S3 response body.
postgres_object = s3.get_object(Bucket=bucket_name, Key=postgres_csv_file)
postgres_df = pd.read_csv(postgres_object['Body'])
google_object = s3.get_object(Bucket=bucket_name, Key=google_csv_file)
google_df = pd.read_csv(google_object['Body'])

# Run the null/duplicate checks on both sources.
postgres_null_counts, postgres_duplicate_report = validate_data(postgres_df, 'Postgres')
google_null_counts, google_duplicate_report = validate_data(google_df, 'Google Drive')

# Stack the per-source reports and write them to the working directory.
null_count_frames = [postgres_null_counts, google_null_counts]
duplicate_frames = [postgres_duplicate_report, google_duplicate_report]
null_counts_report = pd.concat(null_count_frames)
duplicate_report = pd.concat(duplicate_frames)

null_counts_report.to_csv('null_counts_report.csv', index=False)
duplicate_report.to_csv('duplicate_report.csv', index=False)

# Publish the latest CSV of each source under pre_processed/source/.
# Note: copy_latest_csv_to_s3 resolves the latest folder again on its own
# rather than reusing the folders validated above.
for source_prefix, target_key in (
    (prefix_postgres, 'pre_processed/source/customer_churn_postgres.csv'),
    (prefix_google, 'pre_processed/source/customer_churn_gdrive.csv'),
):
    copy_latest_csv_to_s3(bucket_name, source_prefix, target_key)

log_message('Validation complete')


Latest Postgres folder: raw-data/source=postgres_rds/type=customer_churn/timestamp=2025-02-26/
Latest Google Drive folder: raw-data/source=google_drive/type=customer_churn/timestamp=2025-02-26/
