# Dynamic Time Warping (DTW) Analysis Demo

This notebook demonstrates the capabilities of the DTW module for pattern matching in financial time series.

In [None]:
# Import required libraries
import sys
import time
import warnings
from pathlib import Path

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

warnings.filterwarnings('ignore')

# Import DTW modules
sys.path.append('..')

from src.dtw import (
    DTWCalculator, FastDTW, ConstrainedDTW,
    SimilarityEngine, PatternClusterer, DTWVisualizer
)

# Set style
plt.style.use('seaborn-v0_8-darkgrid')
%matplotlib inline

## 1. Basic DTW Example

Let's start with a simple example comparing two time series.

In [None]:
# Two noisy sine waves over the same interval [0, 4*pi], sampled with
# different lengths (100 vs. 120 points); the second is shifted by 0.5 rad.
t1 = np.linspace(0, 4*np.pi, 100)
t2 = np.linspace(0, 4*np.pi, 120)

# Noise for pattern1 is drawn before noise for pattern2, so the sequence of
# draws from the global RNG is the same as drawing inline.
noise1 = 0.1 * np.random.randn(t1.size)
noise2 = 0.1 * np.random.randn(t2.size)

pattern1 = np.sin(t1) + noise1
pattern2 = np.sin(t2 + 0.5) + noise2

# Overlay both patterns on a single axis; the x-axis is the sample index.
fig, ax = plt.subplots(figsize=(12, 4))
for series, series_name in ((pattern1, 'Pattern 1'), (pattern2, 'Pattern 2')):
    ax.plot(series, label=series_name, linewidth=2)
ax.set_xlabel('Time')
ax.set_ylabel('Value')
ax.set_title('Two Time Series Patterns')
ax.legend()
plt.show()

In [None]:
# Align the two patterns. The calculator is built with
# return_cost_matrix=True because a later cell plots result.cost_matrix.
dtw_calc = DTWCalculator(return_cost_matrix=True)
result = dtw_calc.compute(pattern1, pattern2)

# Report the raw distance, its normalized form and the warping-path length.
dtw_summary = (
    f"DTW Distance: {result.distance:.4f}",
    f"Normalized Distance: {result.normalized_distance:.4f}",
    f"Optimal Path Length: {len(result.path)}",
)
print(*dtw_summary, sep="\n")

In [None]:
# Shared visualizer instance; later cells reuse `visualizer` for their plots.
visualizer = DTWVisualizer(figsize=(12, 8))

# Show how the warping path pairs up points of the two patterns.
alignment_title = "DTW Alignment Visualization"
visualizer.plot_alignment(pattern1, pattern2, result.path, title=alignment_title)

In [None]:
# The cost matrix may be absent (None), so only plot it when it was returned.
cost_matrix = result.cost_matrix
if cost_matrix is not None:
    visualizer.plot_cost_matrix(
        cost_matrix,
        result.path,
        title="DTW Cost Matrix with Optimal Path",
    )

## 2. Comparing DTW Algorithms

Let's compare the performance and results of different DTW variants.

In [None]:
# Two independent random walks of equal length, each with the same
# five-period sine component added on top.
n_points = 500
seasonal = np.sin(np.linspace(0, 10*np.pi, n_points))
x = np.cumsum(np.random.randn(n_points)) + seasonal
y = np.cumsum(np.random.randn(n_points)) + seasonal

# Z-score each series (zero mean, unit standard deviation).
x, y = ((series - series.mean()) / series.std() for series in (x, y))

# Overlay the two normalized series; alpha keeps overlapping regions readable.
fig, ax = plt.subplots(figsize=(12, 4))
for series, series_name in ((x, 'Series X'), (y, 'Series Y')):
    ax.plot(series, alpha=0.7, label=series_name)
ax.set_xlabel('Time')
ax.set_ylabel('Normalized Value')
ax.set_title('Long Time Series for DTW Comparison')
ax.legend()
plt.show()

In [None]:
# Compare different DTW methods on the same pair of series (x, y).
# `time` is imported in the notebook's top import cell.
methods = {
    'Standard DTW': DTWCalculator(),
    'FastDTW (r=1)': FastDTW(radius=1),
    'FastDTW (r=5)': FastDTW(radius=5),
    'Sakoe-Chiba (w=20)': ConstrainedDTW(constraint_type='sakoe_chiba', constraint_param=20),
    'Itakura': ConstrainedDTW(constraint_type='itakura', constraint_param=2.0)
}

# results[name] -> {'distance', 'normalized_distance', 'time'}; read by the
# comparison plot in the next cell.
results = {}
for name, calculator in methods.items():
    # perf_counter() is a monotonic, high-resolution clock meant for timing
    # short intervals; time.time() can be coarse or jump with clock changes.
    start_time = time.perf_counter()
    # Use a loop-local name here: the global `result` still holds the
    # Section 1 alignment that the alignment / cost-matrix cells read, and
    # overwriting it would make those cells plot the wrong path on re-run.
    method_result = calculator.compute(x, y)
    comp_time = time.perf_counter() - start_time

    results[name] = {
        'distance': method_result.distance,
        'normalized_distance': method_result.normalized_distance,
        'time': comp_time
    }

    print(f"{name}:")
    print(f"  Distance: {method_result.distance:.4f}")
    print(f"  Normalized: {method_result.normalized_distance:.4f}")
    print(f"  Time: {comp_time:.4f}s")
    print()

In [None]:
# Side-by-side bar charts: normalized distance (left) and runtime (right).
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 5))

methods_list = list(results.keys())
bar_positions = range(len(methods_list))
distances = [results[m]['normalized_distance'] for m in methods_list]
times = [results[m]['time'] for m in methods_list]

panels = (
    (ax1, distances, 'Normalized DTW Distance', 'Distance Comparison'),
    (ax2, times, 'Computation Time (seconds)', 'Speed Comparison'),
)
for ax, heights, y_label, panel_title in panels:
    ax.bar(bar_positions, heights)
    ax.set_xticks(bar_positions)
    ax.set_xticklabels(methods_list, rotation=45, ha='right')
    ax.set_ylabel(y_label)
    ax.set_title(panel_title)

# Runtimes are drawn on a log axis.
ax2.set_yscale('log')

plt.tight_layout()
plt.show()

## 3. Pattern Similarity Analysis

Now let's analyze similarities between multiple patterns.

In [None]:
# Build a library of 12 z-scored patterns in three families of four:
#   Sine_*   - sine waves whose frequency factor grows with the index
#   Trend_*  - linear ramp plus a sine whose amplitude grows with the index
#   Random_* - scaled random walks
n_patterns = 12
patterns, labels = [], []

# The time grid is the same for every pattern.
t = np.linspace(0, 4*np.pi, 100)

for i in range(n_patterns):
    # Exactly one 100-sample draw per pattern, whatever the family.
    noise = np.random.randn(100)

    if i >= 8:  # Random walk
        pattern = np.cumsum(noise) * 0.1
        labels.append(f"Random_{i-7}")
    elif i >= 4:  # Trend patterns
        pattern = 0.1 * t + np.sin(t) * (0.5 + (i-4)*0.2) + 0.1*noise
        labels.append(f"Trend_{i-3}")
    else:  # Sine variations
        pattern = np.sin(t * (1 + i*0.2)) + 0.1*noise
        labels.append(f"Sine_{i+1}")

    # Z-score; the small epsilon guards against a zero standard deviation.
    pattern = (pattern - pattern.mean()) / (pattern.std() + 1e-8)
    patterns.append(pattern)

# One small panel per pattern on a 3x4 grid, all sharing the same y-range
# so the shapes are directly comparable.
fig, axes = plt.subplots(3, 4, figsize=(15, 8))
axes = axes.flatten()

for ax, pattern, label in zip(axes, patterns, labels):
    ax.plot(pattern)
    ax.set_title(label)
    ax.set_ylim(-3, 3)

plt.suptitle('Pattern Library', fontsize=16)
plt.tight_layout()
plt.show()

In [None]:
# Pairwise similarities for the whole pattern library.
engine = SimilarityEngine(dtw_type='fast', radius=2, n_jobs=4)
sim_results = engine.compute_similarity_matrix(patterns, labels)

# Shape plus two summary statistics of the resulting matrix.
similarity_report = (
    f"Similarity matrix shape: {sim_results['similarity_matrix'].shape}",
    f"Mean similarity: {np.mean(sim_results['similarity_matrix']):.3f}",
    f"Std similarity: {np.std(sim_results['similarity_matrix']):.3f}",
)
print(*similarity_report, sep="\n")

In [None]:
# Plot the pattern-library similarity matrix with per-cell annotations.
similarity_plot_options = {
    'labels': labels,
    'title': "Pattern Similarity Matrix",
    'annotate': True,
}
visualizer.plot_similarity_matrix(sim_results['similarity_matrix'], **similarity_plot_options)

In [None]:
# Summary statistics derived from the similarity matrix.
stats = engine.compute_pattern_statistics(sim_results['similarity_matrix'], labels)

# Top five pattern pairs.
print("Most Similar Pattern Pairs:")
top_pairs = stats['most_similar_pairs'][:5]
for i, pair in enumerate(top_pairs):
    print(f"{i+1}. {pair['labels'][0]} <-> {pair['labels'][1]}: {pair['similarity']:.3f}")

# Top five patterns by average similarity.
print("\nMost Connected Patterns (highest average similarity):")
top_connected = stats['pattern_connectivity'][:5]
for i, conn in enumerate(top_connected):
    print(f"{i+1}. {conn['label']}: {conn['avg_similarity']:.3f}")

## 4. Pattern Clustering

Let's cluster the patterns based on their DTW similarities.

In [None]:
# Hierarchical clustering with average linkage into three clusters,
# reusing the similarity matrix computed earlier.
clusterer_config = {
    'clustering_method': 'hierarchical',
    'linkage_method': 'average',
    'n_clusters': 3,
}
clusterer = PatternClusterer(**clusterer_config)

precomputed_similarity = sim_results['similarity_matrix']
cluster_results = clusterer.fit_predict(patterns, similarity_matrix=precomputed_similarity)

# Headline numbers, then the cluster assigned to each pattern.
print(f"Number of clusters: {cluster_results['n_clusters']}")
print(f"Silhouette score: {cluster_results['silhouette_score']:.3f}")
print("\nCluster assignments:")
for label, cluster in zip(labels, cluster_results['labels']):
    print(f"  {label}: Cluster {cluster}")

In [None]:
# Dendrogram of the hierarchical clustering, with pattern names as leaf labels.
linkage_matrix = cluster_results['linkage_matrix']
clusterer.plot_dendrogram(linkage_matrix, labels=labels, figsize=(12, 6))

In [None]:
# Show example patterns per cluster (n_examples=3) next to the cluster centers.
cluster_assignments = cluster_results['labels']
cluster_centers = cluster_results['cluster_centers']
visualizer.plot_cluster_patterns(patterns, cluster_assignments, cluster_centers, n_examples=3)

In [None]:
# The heatmap takes distances, so convert similarity to distance as 1 - s.
# NOTE(review): this presumes similarities lie in [0, 1] - confirm against SimilarityEngine.
distance_matrix = 1 - sim_results['similarity_matrix']
clusterer.plot_cluster_heatmap(
    distance_matrix,
    cluster_results['labels'],
    pattern_labels=labels,
    figsize=(10, 10),
)

## 5. Real Financial Data Analysis

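Let's apply DTW to real financial time series data. If the processed data files are not found, the next cell falls back to synthetic return patterns so that the rest of the notebook still runs.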

In [None]:
# Load financial data: one z-scored return pattern per ticker whose
# processed HDF5 file exists under data_dir.
data_dir = Path('../data/processed')
tickers = ['AAPL', 'MSFT', 'GOOGL', 'AMZN', 'META']
window = 100  # number of most recent valid observations kept per ticker

price_patterns = []
pattern_labels = []

for ticker in tickers:
    file_path = data_dir / f"{ticker}_processed.h5"
    if not file_path.exists():
        # Missing files are tolerated; the code below falls back to
        # synthetic data when nothing at all could be loaded.
        continue

    with pd.HDFStore(file_path, 'r') as store:
        df = store['data']

    # Remove NaN/inf before slicing: a single non-finite value would turn
    # mean() and std() into NaN and with them the whole normalized pattern.
    returns = np.asarray(df['returns'], dtype=float)
    returns = returns[np.isfinite(returns)][-window:]

    if returns.size < 2:
        # Nothing meaningful to normalize or align.
        print(f"Skipping {ticker}: not enough valid return observations")
        continue

    # Normalize (epsilon guards against a zero standard deviation)
    returns_norm = (returns - returns.mean()) / (returns.std() + 1e-8)

    price_patterns.append(returns_norm)
    pattern_labels.append(ticker)

if not price_patterns:
    # Nothing was loaded: build five synthetic return series instead.
    print("No financial data found. Using synthetic data instead.")
    pattern_labels = ['Stock_A', 'Stock_B', 'Stock_C', 'Stock_D', 'Stock_E']
    price_patterns = []
    days = np.arange(100)

    for i, label in enumerate(pattern_labels):
        # Drift slope and noise level both grow with the stock index.
        drift = 0.0001 * i * days
        sigma = 0.01 * (1 + i * 0.2)
        raw_returns = drift + sigma * np.random.randn(100)
        price_patterns.append(
            (raw_returns - raw_returns.mean()) / (raw_returns.std() + 1e-8)
        )

    print("Generated synthetic financial patterns")
else:
    print(f"Loaded {len(price_patterns)} financial patterns")

    # Overlay every loaded return pattern on one axis.
    fig, ax = plt.subplots(figsize=(12, 6))
    for pattern, label in zip(price_patterns, pattern_labels):
        ax.plot(pattern, label=label, linewidth=2)
    ax.set_xlabel('Days')
    ax.set_ylabel('Normalized Returns')
    ax.set_title('Financial Return Patterns (Last 100 Days)')
    ax.legend()
    ax.grid(True, alpha=0.3)
    plt.show()

In [None]:
# Pairwise similarities between the stock return patterns.
fin_engine = SimilarityEngine(dtw_type='fast', radius=2)
fin_sim_results = fin_engine.compute_similarity_matrix(price_patterns, pattern_labels)

# Annotated plot of the stock similarity matrix.
fin_plot_options = {
    'labels': pattern_labels,
    'title': "Stock Return Pattern Similarities",
    'annotate': True,
}
visualizer.plot_similarity_matrix(fin_sim_results['similarity_matrix'], **fin_plot_options)

In [None]:
# Summary statistics for the stock similarity matrix.
fin_similarity = fin_sim_results['similarity_matrix']
fin_stats = fin_engine.compute_pattern_statistics(fin_similarity, pattern_labels)

# Top three stock pairs.
print("Most Similar Stock Pairs:")
top_stock_pairs = fin_stats['most_similar_pairs'][:3]
for i, pair in enumerate(top_stock_pairs):
    print(f"{i+1}. {pair['labels'][0]} <-> {pair['labels'][1]}: {pair['similarity']:.3f}")

## 6. Interactive Visualizations

Create interactive visualizations using Plotly.

In [None]:
# Create an interactive DTW alignment for the first two financial patterns.
# An alignment needs two series, hence the length guard.
if len(price_patterns) >= 2:
    series_a, series_b = price_patterns[0], price_patterns[1]
    label_a, label_b = pattern_labels[0], pattern_labels[1]

    # Use section-specific names rather than rebinding the global
    # `dtw_calc` / `result` from Section 1: the alignment and cost-matrix
    # cells there read `result`, and re-running them after this cell would
    # otherwise plot this alignment against the wrong series.
    fin_dtw_calc = DTWCalculator()
    fin_result = fin_dtw_calc.compute(series_a, series_b)

    fig = visualizer.create_interactive_alignment(
        series_a,
        series_b,
        fin_result.path,
        title=f"DTW Alignment: {label_a} vs {label_b}",
        labels=(label_a, label_b)
    )
    fig.show()

In [None]:
# Interactive version of the stock similarity matrix.
interactive_title = "Interactive Stock Similarity Matrix"
fin_similarity_matrix = fin_sim_results['similarity_matrix']
fig = visualizer.create_interactive_similarity_matrix(
    fin_similarity_matrix, pattern_labels, title=interactive_title
)
fig.show()

## 7. Performance Analysis

Let's analyze the performance characteristics of different DTW methods.

In [None]:
# Performance comparison on different series lengths.
lengths = [50, 100, 200, 500]
n_repeats = 5  # timed runs averaged per (method, length) pair
methods_perf = {
    'Standard': DTWCalculator(),
    'FastDTW': FastDTW(radius=2),
    'Constrained': ConstrainedDTW(constraint_param=10)
}

# performance_data[name] -> {'time': [mean seconds per length], 'lengths': lengths}
performance_data = {name: {'time': [], 'lengths': lengths} for name in methods_perf}

for length in lengths:
    print(f"Testing length: {length}")

    # Generate test data under benchmark-specific names so the normalized
    # Section 2 series `x` / `y` (still read by that section's cells on
    # re-run) are not replaced by white noise.
    x_bench = np.random.randn(length)
    y_bench = np.random.randn(length)

    for name, calculator in methods_perf.items():
        # Time multiple runs with perf_counter(), a monotonic high-resolution
        # clock; time.time() can be too coarse for the short runs here.
        run_times = []
        for _ in range(n_repeats):
            start = time.perf_counter()
            calculator.compute(x_bench, y_bench)
            run_times.append(time.perf_counter() - start)

        avg_time = np.mean(run_times)
        performance_data[name]['time'].append(avg_time)
        print(f"  {name}: {avg_time:.4f}s")

In [None]:
# Mean runtime against series length, one line per method, on a log y-axis.
fig, ax = plt.subplots(figsize=(10, 6))

for name, data in performance_data.items():
    ax.plot(data['lengths'], data['time'], marker='o', label=name, linewidth=2)

ax.set_xlabel('Time Series Length')
ax.set_ylabel('Computation Time (seconds)')
ax.set_title('DTW Performance Comparison')
ax.legend()
ax.grid(True, alpha=0.3)
ax.set_yscale('log')
plt.show()

# Calculate speedup of FastDTW relative to standard DTW for each length.
standard_times = performance_data['Standard']['time']
fast_times = performance_data['FastDTW']['time']

# A measured time of 0.0 (run shorter than the clock resolution) would raise
# ZeroDivisionError or yield a meaningless ratio, so mark it as NaN instead.
speedups = [
    s / f if s > 0 and f > 0 else float('nan')
    for s, f in zip(standard_times, fast_times)
]

print("\nFastDTW Speedup over Standard DTW:")
for length, speedup in zip(lengths, speedups):
    if np.isnan(speedup):
        print(f"  Length {length}: n/a (timing below clock resolution)")
    elif speedup >= 1:
        print(f"  Length {length}: {speedup:.2f}x faster")
    else:
        # A ratio below 1 means FastDTW was slower; report it as such
        # rather than as e.g. "0.50x faster".
        print(f"  Length {length}: {1 / speedup:.2f}x slower")

## Summary

This notebook demonstrated:

1. **Basic DTW computation** and alignment visualization
2. **Comparison of DTW algorithms** (Standard, FastDTW, Constrained)
3. **Pattern similarity analysis** using DTW-based similarity matrices
4. **Hierarchical clustering** of time series patterns
5. **Real financial data analysis** with stock return patterns
6. **Interactive visualizations** for exploring DTW results
7. **Performance analysis** showing the trade-offs between methods

The DTW module provides powerful tools for:
- Finding similar patterns in financial time series
- Clustering related market behaviors
- Building pattern libraries for prediction
- Efficient computation with FastDTW for large-scale analysis