# Data Engineering Pipeline Verification

This notebook verifies the output of the data engineering pipeline by:

1. Loading and examining processed files
2. Performing data quality checks
3. Verifying asset class categorization
4. Validating currency normalization
5. Verifying return calculations
6. Testing data consistency


In [None]:
# Import required libraries
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from pathlib import Path
import warnings

# Ignore FutureWarnings
warnings.filterwarnings('ignore', category=FutureWarning)

# Set display options
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', 20)

# Set plotting style
plt.style.use('seaborn')
sns.set_palette("husl")

# Helper function to load pickle files
def load_pickle(file_path):
    try:
        return pd.read_pickle(file_path)
    except Exception as e:
        print(f"Error loading {file_path}: {e}")
        return None

# Helper function to format file size
def format_size(size_in_bytes):
    for unit in ['B', 'KB', 'MB', 'GB']:
        if size_in_bytes < 1024:
            return f"{size_in_bytes:.2f} {unit}"
        size_in_bytes /= 1024
    return f"{size_in_bytes:.2f} GB"

## 1. Load and Examine Processed Files

First, we'll load all the processed files and examine their basic properties:


In [None]:
# Load all processed data files
data_dir = Path("../data/processed")

# Dictionary to store our loaded data
data = {}

# Load key files
data['daily_prices'] = load_pickle(data_dir / 'daily_prices.pkl')
data['daily_returns'] = load_pickle(data_dir / 'daily_returns.pkl')
data['monthly_returns'] = load_pickle(data_dir / 'monthly_returns.pkl')
data['data_dictionary'] = pd.read_excel(data_dir / 'data_dictionary.xlsx')

# Display basic information about each dataset
for name, df in data.items():
    if df is not None:
        print(f"\n{name}:")
        print("-" * 50)
        print(f"Shape: {df.shape}")
        if isinstance(df.index, pd.DatetimeIndex):
            print(f"Date range: {df.index.min()} to {df.index.max()}")
        print(f"Number of columns: {len(df.columns)}")
        print("First few columns:", ", ".join(df.columns[:5]))

## 2. Data Quality Checks

Now we'll perform various data quality checks:

- Missing values analysis
- Completeness verification (should match 84.57% from pipeline logs)
- Check for duplicates
- Identify potential outliers


In [None]:
# Check data completeness
daily_prices = data['daily_prices']
total_points = daily_prices.size
missing_points = daily_prices.isna().sum().sum()
completeness = (1 - missing_points / total_points) * 100

print(f"Data Completeness: {completeness:.2f}%")
print(f"Total missing values: {missing_points}")

# Check for duplicated dates
duplicates = daily_prices.index.duplicated().sum()
print(f"\nDuplicated dates: {duplicates}")

# Missing values by column
missing_by_column = daily_prices.isna().sum().sort_values(ascending=False)
print("\nTop 5 columns with most missing values:")
print(missing_by_column.head())

# Calculate basic statistics for outlier detection
stats = daily_prices.describe()
print("\nBasic statistics:")
print(stats)

In [None]:
# Create box plots to visualize outliers
plt.figure(figsize=(15, 6))
sns.boxplot(data=daily_prices)
plt.xticks(rotation=90)
plt.title('Distribution of Values by Asset (with Outliers)')
plt.tight_layout()
plt.show()

# Calculate and display extreme outliers (beyond 5 standard deviations)
def find_extreme_outliers(df, n_std=5):
    outliers = {}
    for col in df.columns:
        mean = df[col].mean()
        std = df[col].std()
        mask = abs(df[col] - mean) > n_std * std
        if mask.any():
            outliers[col] = df[mask][col]
    return outliers

extreme_outliers = find_extreme_outliers(daily_prices)
if extreme_outliers:
    print("\nExtreme outliers (>5 std dev):")
    for col, values in extreme_outliers.items():
        print(f"\n{col}:")
        print(values)

## 3. Asset Class Verification

Let's verify that each asset is correctly categorized according to the ASSET_MAPPING:


In [None]:
# Import asset mapping
import sys
sys.path.append('..')
from src.asset_class_mapping import ASSET_MAPPING, create_ticker_to_asset_class_map

# Get asset mapping
ticker_map = create_ticker_to_asset_class_map()

# Compare data dictionary with asset mapping
data_dict = data['data_dictionary']
print("Asset Class Distribution:")
print("-" * 50)
asset_class_counts = data_dict['Asset Class'].value_counts()
print(asset_class_counts)

# Verify each asset's classification
print("\nVerifying individual asset classifications:")
print("-" * 50)
for _, row in data_dict.iterrows():
    ticker = row['Ticker']
    asset_info = ticker_map.get(ticker, {})
    mapped_class = asset_info.get('asset_class', 'Unknown')
    dict_class = row['Asset Class'].lower().replace(' ', '_')
    
    if mapped_class != dict_class:
        print(f"Mismatch for {ticker}:")
        print(f"  Data Dictionary: {row['Asset Class']}")
        print(f"  Asset Mapping: {mapped_class}")

## 4. Currency Normalization Validation

Let's verify that the currency normalization was performed correctly by checking a few sample assets:


In [None]:
# Sample FX conversion check
fx_mapping = {
    'NKY Index': 'USDJPY Curncy',
    'PCOMP Index': 'USDPHP Index'
}

# Load original data for comparison
raw_data = pd.read_excel("../data/raw/MSCI_Comps.xlsx", skiprows=7, index_col=0)

# Check currency conversions
for asset, fx in fx_mapping.items():
    if asset in daily_prices.columns and asset in raw_data.columns and fx in raw_data.columns:
        print(f"\nChecking currency conversion for {asset}")
        print("-" * 50)
        
        # Get sample dates (first and last available)
        sample_dates = [
            max(raw_data.index[0], daily_prices.index[0]),
            min(raw_data.index[-1], daily_prices.index[-1])
        ]
        
        for date in sample_dates:
            raw_price = raw_data.loc[date, asset]
            converted_price = daily_prices.loc[date, asset]
            fx_rate = raw_data.loc[date, fx]
            
            # Calculate expected USD price
            if fx.startswith('USD'):
                expected_price = raw_price / fx_rate
            else:
                expected_price = raw_price * fx_rate
            
            print(f"\nDate: {date}")
            print(f"Original price: {raw_price:.2f}")
            print(f"FX rate ({fx}): {fx_rate:.2f}")
            print(f"Expected USD price: {expected_price:.2f}")
            print(f"Actual USD price: {converted_price:.2f}")
            print(f"Difference (%): {((converted_price - expected_price) / expected_price * 100):.4f}%")

## 5. Returns Calculation Verification

Let's verify the return calculations by manually computing returns for a sample of assets and comparing with the processed data:


In [None]:
# Select a sample asset for verification
sample_asset = daily_prices.columns[0]

# Calculate daily returns manually
manual_daily_returns = daily_prices[sample_asset].pct_change()

# Calculate monthly returns manually
manual_monthly_returns = daily_prices[sample_asset].resample('ME').last().pct_change()

print(f"Returns verification for {sample_asset}")
print("-" * 50)

# Compare daily returns
daily_diff = manual_daily_returns - data['daily_returns'][sample_asset]
print("\nDaily Returns Verification:")
print(f"Max difference: {daily_diff.abs().max():.10f}")
print(f"Mean difference: {daily_diff.abs().mean():.10f}")

# Compare monthly returns
monthly_diff = manual_monthly_returns - data['monthly_returns'][sample_asset]
print("\nMonthly Returns Verification:")
print(f"Max difference: {monthly_diff.abs().max():.10f}")
print(f"Mean difference: {monthly_diff.abs().mean():.10f}")

# Visualize the comparison
plt.figure(figsize=(15, 6))
plt.plot(manual_daily_returns.index[-50:], manual_daily_returns[-50:], 
         label='Manual Calculation', alpha=0.7)
plt.plot(data['daily_returns'].index[-50:], data['daily_returns'][sample_asset][-50:], 
         label='Pipeline Output', alpha=0.7)
plt.title(f'Daily Returns Comparison - Last 50 Days\n{sample_asset}')
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.show()

## 6. Data Consistency Tests

Finally, let's perform consistency checks across the processed data:


In [None]:
# Check business day frequency
print("Checking business day frequency...")
print("-" * 50)

date_diffs = pd.Series(daily_prices.index[1:]) - pd.Series(daily_prices.index[:-1])
irregular_intervals = date_diffs[date_diffs.dt.days > 3]

if len(irregular_intervals) > 0:
    print("\nFound irregular intervals (gaps > 3 days):")
    for date in irregular_intervals.index:
        print(f"Gap between {daily_prices.index[date]} and {daily_prices.index[date + 1]}")
else:
    print("All intervals are within expected business day frequency")

# Check for consistency between daily and monthly returns
print("\nChecking daily vs monthly returns consistency...")
print("-" * 50)

# Calculate monthly returns from daily returns
monthly_from_daily = (1 + data['daily_returns']).resample('ME').prod() - 1

# Compare with processed monthly returns
monthly_diff = monthly_from_daily - data['monthly_returns']
max_diff = monthly_diff.abs().max().max()
mean_diff = monthly_diff.abs().mean().mean()

print(f"Maximum difference: {max_diff:.10f}")
print(f"Mean difference: {mean_diff:.10f}")

# Summary statistics comparison
print("\nSummary Statistics Comparison:")
print("-" * 50)
print("\nDaily Returns Statistics:")
print(data['daily_returns'].describe())
print("\nMonthly Returns Statistics:")
print(data['monthly_returns'].describe())