# Statistical Tests for Time Series Analysis on NIFTY50 5 min OHLC Data 📈

## 1. Augmented Dickey-Fuller (ADF) Test 🎯

### Purpose
> Tests for stationarity in time series data (Critical for prediction)

### Why Important
* **Model Prerequisites**
  - Essential for ARIMA modeling
  - Foundation for reliable predictions

### Interpretation
| Result | Meaning | Action |
|--------|---------|--------|
| p < 0.05 | Reject the unit-root null; treat the series as stationary | Ready for modeling |
| p ≥ 0.05 | Cannot reject a unit root; series may be non-stationary | Difference and re-test |

### Impact on Models
* **ARIMA**: Determines 'd' parameter directly
* **Transformers**: Guides data preprocessing strategy

---

## 2. ACF (Autocorrelation Function) 📊

### Purpose
> Reveals relationship between current and past values

### Key Insights
* **Pattern Detection**
  - Series memory depth
  - Seasonal patterns
  - Trend strength

### Pattern Interpretation
* **Slow Decay**: Strong trend present
* **Periodic Spikes**: Seasonal patterns exist
* **Quick Decay**: Weak autocorrelation

### Model Implications
* **ARIMA**: Helps set 'q' (MA) parameter
* **Transformers**: Guides sequence length choice
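
To make the decay patterns concrete, the sketch below computes a sample ACF with NumPy for two simulated processes: an MA(1), whose ACF should cut off after lag 1, and an AR(1), whose ACF should decay gradually. The series, coefficients, and the helper `sample_acf` are all assumptions for illustration; `statsmodels.tsa.stattools.acf` would normally be used instead.

```python
import numpy as np

def sample_acf(x, nlags):
    """Sample autocorrelation for lags 0..nlags."""
    x = np.asarray(x, dtype=float)
    x = x - x.mean()
    denom = np.dot(x, x)
    return np.array([1.0] + [np.dot(x[:-k], x[k:]) / denom
                             for k in range(1, nlags + 1)])

rng = np.random.default_rng(1)
n = 5000
eps = rng.normal(size=n + 1)

ma1 = eps[1:] + 0.8 * eps[:-1]     # MA(1): quick cut-off after lag 1

ar1 = np.zeros(n)                  # AR(1): gradual, geometric decay
for t in range(1, n):
    ar1[t] = 0.8 * ar1[t - 1] + eps[t]

band = 1.96 / np.sqrt(n)           # approximate 95% band under white noise
acf_ma1 = sample_acf(ma1, 5)
acf_ar1 = sample_acf(ar1, 5)
print("band   :", round(band, 4))
print("MA(1)  :", np.round(acf_ma1, 3))
print("AR(1)  :", np.round(acf_ar1, 3))
```

The last lag that clearly exceeds the band in a cut-off pattern is the usual starting guess for `q`.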

---

## 3. PACF (Partial Autocorrelation Function) 🔍

### Purpose
> Shows direct relationships between time points

### Key Benefits
* **Direct Dependencies**
  - Pure lag relationships
  - Optimal lookback period
  - AR order identification

### Pattern Reading
* **Significant Spikes**: Direct relationships
* **Cut-off Point**: Suggests AR order
* **Decay Pattern**: Complexity indicator

### Model Applications
* **ARIMA**: Sets 'p' (AR) parameter
* **Transformers**: Guides attention window
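
One way to see what "direct relationship" means: the PACF at lag k equals the coefficient on the k-th lag when the series is regressed on lags 1 through k. The sketch below implements that definition with ordinary least squares on a simulated AR(2) series. The helper `pacf_ols` and the simulated coefficients are assumptions for illustration; `statsmodels.tsa.stattools.pacf` is the practical choice.

```python
import numpy as np

def pacf_ols(x, nlags):
    """PACF at lag k = OLS coefficient on x[t-k] when regressing x[t] on lags 1..k."""
    x = np.asarray(x, dtype=float)
    x = x - x.mean()
    out = [1.0]
    for k in range(1, nlags + 1):
        # Columns are x[t-1], ..., x[t-k] for t = k .. n-1
        lags = np.column_stack([x[k - j: len(x) - j] for j in range(1, k + 1)])
        coef, *_ = np.linalg.lstsq(lags, x[k:], rcond=None)
        out.append(coef[-1])
    return np.array(out)

rng = np.random.default_rng(2)
n = 5000
eps = rng.normal(size=n)
x = np.zeros(n)
for t in range(2, n):              # AR(2) with coefficients 0.5 and 0.3
    x[t] = 0.5 * x[t - 1] + 0.3 * x[t - 2] + eps[t]

pacf_vals = pacf_ols(x, 6)
band = 1.96 / np.sqrt(n)           # approximate 95% band
significant = [k for k in range(1, 7) if abs(pacf_vals[k]) > band]
print("PACF:", np.round(pacf_vals, 3))
print("lags outside the band:", significant)
```

For an AR(2) process the PACF should be clearly non-zero at lags 1 and 2 and close to zero afterwards, which is the cut-off that suggests `p = 2`.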

---

## 4. Ljung-Box Test ⚖️

### Purpose
> Validates residual randomness

### Importance
* **Model Validation**
  - Tests assumptions
  - Checks pattern capture
  - Quality assurance

### Results Guide
| p-value | Interpretation | Next Steps |
|---------|----------------|------------|
| > 0.05 | No significant autocorrelation left in the residuals | Proceed with model |
| < 0.05 | Residuals are still autocorrelated (patterns missed) | Model refinement needed |
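
The statistic behind the table is Q = n(n+2) Σ ρ_k² / (n − k), compared against a chi-square distribution. The sketch below computes it by hand so the mechanics are visible; the helper `ljung_box`, the `model_df` argument (number of fitted ARMA parameters to subtract from the degrees of freedom), and the simulated residuals are assumptions for illustration. In the notebook itself, `acorr_ljungbox` from statsmodels does this job.

```python
import numpy as np
from scipy.stats import chi2

def ljung_box(resid, lags=10, model_df=0):
    """Return (Q statistic, p-value) for the Ljung-Box test up to `lags`."""
    x = np.asarray(resid, dtype=float)
    x = x - x.mean()
    n = len(x)
    denom = np.dot(x, x)
    ks = np.arange(1, lags + 1)
    rho = np.array([np.dot(x[:-k], x[k:]) / denom for k in ks])
    q_stat = n * (n + 2) * np.sum(rho ** 2 / (n - ks))
    p_value = chi2.sf(q_stat, df=lags - model_df)
    return q_stat, p_value

rng = np.random.default_rng(4)
white = rng.normal(size=2000)      # what well-behaved residuals look like

leftover = np.zeros(2000)          # residuals with AR(1) structure left in
for t in range(1, 2000):
    leftover[t] = 0.5 * leftover[t - 1] + white[t]

q_white, p_white = ljung_box(white)
q_left, p_left = ljung_box(leftover)
print(f"white noise residuals : Q={q_white:.1f}, p={p_white:.4f}")
print(f"autocorrelated resid. : Q={q_left:.1f}, p={p_left:.4g}")
```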

---

## Trading Strategy Implementation 💹

### Data Properties Analysis
* **Pattern Understanding**
  - Price movement characteristics
  - Key time intervals
  - Preprocessing requirements

### Model Selection Framework
| Pattern | Suggested Model | Why |
|---------|----------------|-----|
| Strong seasonality | Seasonal ARIMA (SARIMA) | Plain ARIMA has no seasonal terms; SARIMA handles regular patterns |
| Complex patterns | Transformers | Better with non-linear relationships |
| High randomness | Feature enrichment | Need more predictive signals |

### Parameter Optimization
1. **ARIMA Parameters**
   - p: From PACF analysis
   - d: From ADF test
   - q: From ACF analysis

2. **Transformer Settings**
   - Sequence length
   - Attention window
   - Feature engineering scope

---


In [8]:
# !pip install mlflow
import pandas as pd
import numpy as np
from statsmodels.tsa.stattools import adfuller, kpss, acf, pacf
from statsmodels.stats.diagnostic import acorr_ljungbox
import matplotlib.pyplot as plt
import seaborn as sns
import gc
import os
# import mlflow

In [9]:
# from google.colab import drive
# drive.mount('/content/drive')

In [10]:
# Input file: 1-min OHLC data, 2015 to 2024 (per the CSV filename)
# Six periods of 5-min OHLC: the last 6 months, the 6 months before that, and likewise for 1 year and 2 years.
# Creates series df1 to df6

In [30]:
# Data Extraction
def resample_and_get_periods(csv_path, interval='5min'):
   """
   Load data, resample to 5 min OHLC, perform quality checks, and split into periods

   Parameters:
   - csv_path: path to CSV file
   - interval: resampling interval (default '5min'; the older '5T' alias is deprecated in recent pandas)

   Returns: Dictionary of quality-checked, resampled dataframes for different periods
   """
   # Load data
   def load_and_clean(csv_path):
       df = pd.read_csv(csv_path)
       df['date'] = pd.to_datetime(df['date'])
       return df.set_index('date').sort_index()

   # Quality checks
   def quality_checks(df, period_name):
       issues = []

       # Check for missing values
       missing = df.isnull().sum()
       if missing.any():
           issues.append(f"Missing values found: {missing[missing > 0]}")

       # Check for duplicates
       duplicates = df.index.duplicated().sum()
       if duplicates:
           issues.append(f"Found {duplicates} duplicate timestamps")

       # Check for price anomalies
       price_std = df['close'].std()
       price_mean = df['close'].mean()
       outliers = df[abs(df['close'] - price_mean) > 3 * price_std]
       if not outliers.empty:
           issues.append(f"Found {len(outliers)} potential price outliers")

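       # Check for gaps in time series
       # Note: overnight and weekend breaks also exceed 6 minutes, so any
       # multi-day period is flagged here even when intraday data is complete
       time_diff = df.index.to_series().diff()
       gaps = time_diff[time_diff > pd.Timedelta(minutes=6)]  # More than 6 min gap
       if not gaps.empty:
           issues.append(f"Found {len(gaps)} time gaps > 6 minutes")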

       # Print issues for this period
       if issues:
           print(f"\nQuality issues in {period_name}:")
           for issue in issues:
               print(f"- {issue}")

       return len(issues) == 0

   # Resample to 5 min OHLC
   def resample_ohlc(df, interval):
       resampled = df.resample(interval).agg({
           'open': 'first',
           'high': 'max',
           'low': 'min',
           'close': 'last',
           'volume': 'sum'
       })
       return resampled.dropna()  # Remove any incomplete periods

   print("Loading and preprocessing data...")
   df = load_and_clean(csv_path)

   # Resample full dataset
   df_resampled = resample_ohlc(df, interval)

   # Get end date
   end_date = df_resampled.index.max()

   # Define periods and create slices
   periods = {
       'last_6m': (end_date - pd.DateOffset(months=6), end_date),
       'last_to_last_6m': (end_date - pd.DateOffset(months=12),
                          end_date - pd.DateOffset(months=6)),
       'last_1y': (end_date - pd.DateOffset(years=1), end_date),
       'last_to_last_1y': (end_date - pd.DateOffset(years=2),
                          end_date - pd.DateOffset(years=1)),
       'last_2y': (end_date - pd.DateOffset(years=2), end_date),
       'last_to_last_2y': (end_date - pd.DateOffset(years=4),
                          end_date - pd.DateOffset(years=2))
   }

   datasets = {}
   print("\nCreating and validating period datasets...")

   for name, (start, end) in periods.items():
       # Slice data
       period_data = df_resampled[start:end].copy()

       # Perform quality checks
       is_clean = quality_checks(period_data, name)

       # Store data and metadata
       datasets[name] = {
           'data': period_data,
           'metadata': {
               'start_date': period_data.index.min(),
               'end_date': period_data.index.max(),
               'records': len(period_data),
               'trading_days': len(set(period_data.index.date)),
               'quality_passed': is_clean
           }
       }

   # Print summary
   print("\nPeriod Summaries:")
   print("="*70)
   for name, dataset in datasets.items():
       meta = dataset['metadata']
       print(f"\n{name}:")
       print(f"Date Range: {meta['start_date']} to {meta['end_date']}")
       print(f"Records: {meta['records']}, Trading Days: {meta['trading_days']}")
       print(f"Quality Check: {'✓' if meta['quality_passed'] else '✗'}")

   return datasets

# Usage
REPO_PATH = "/content/drive/MyDrive/Colab Notebooks/plusEV-"
csv_path = os.path.join(REPO_PATH, "nifty50_1min_2015_to_2024.csv")
datasets = resample_and_get_periods(csv_path)

# # Access data for a period
# df0 = datasets['last_6m']['data']
# last_6m_metadata = datasets['last_6m']['metadata']
# df0 = datasets['last_to_last_6m']['data']
# df0 = datasets['last_1y']['data']
# df0 = datasets['last_to_last_1y']['data']
# df0 = datasets['last_2y']['data']
# df0 = datasets['last_to_last_2y']['data']
# df0["Return"] = df0["close"].pct_change()
# df0 = df0["Return"].dropna()
# df2["Return"] = df2["close"].pct_change()
# df2 = df2["Return"].dropna()
# df3["Return"] = df3["close"].pct_change()
# df3 = df3["Return"].dropna()
# df4["Return"] = df4["close"].pct_change()
# df4 = df4["Return"].dropna()
# df5["Return"] = df5["close"].pct_change()
# df5 = df5["Return"].dropna()
# df6["Return"] = df6["close"].pct_change()
# df6 = df6["Return"].dropna()

# print(df1)

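# Build the full-history 5-min return series used in the cells below
df_0 = pd.read_csv(csv_path)
df_0['date'] = pd.to_datetime(df_0['date'])
df_0 = df_0.set_index('date').sort_index()
# Caution: resample() also creates bins for nights and weekends. Forward-filling
# them repeats the last close, which yields zero returns outside market hours.
# Use .dropna() instead (as in resample_ohlc above) to keep only traded bins.
df_0 = df_0.resample('5min').agg({
    'open': 'first',
    'high': 'max',
    'low': 'min',
    'close': 'last',
    'volume': 'sum'
}).ffill()  # replaces the deprecated fillna(method='ffill')
df_0['Return'] = df_0['close'].pct_change()
df0 = df_0["Return"].dropna()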

Loading and preprocessing data...



Creating and validating period datasets...

Quality issues in last_6m:
- Found 124 time gaps > 6 minutes

Quality issues in last_to_last_6m:
- Found 126 time gaps > 6 minutes

Quality issues in last_1y:
- Found 250 time gaps > 6 minutes

Quality issues in last_to_last_1y:
- Found 247 time gaps > 6 minutes

Quality issues in last_2y:
- Found 497 time gaps > 6 minutes

Quality issues in last_to_last_2y:
- Found 496 time gaps > 6 minutes

Period Summaries:

last_6m:
Date Range: 2024-02-28 15:25:00+05:30 to 2024-08-28 15:25:00+05:30
Records: 9043, Trading Days: 123
Quality Check: ✗

last_to_last_6m:
Date Range: 2023-08-28 15:25:00+05:30 to 2024-02-28 15:25:00+05:30
Records: 9388, Trading Days: 127
Quality Check: ✗

last_1y:
Date Range: 2023-08-28 15:25:00+05:30 to 2024-08-28 15:25:00+05:30
Records: 18430, Trading Days: 249
Quality Check: ✗

last_to_last_1y:
Date Range: 2022-08-29 09:15:00+05:30 to 2023-08-28 15:25:00+05:30
Records: 18537, Trading Days: 248
Quality Check: ✗

last_2y:
Date 
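
Every period above fails the gap check with roughly one gap per trading day (124 gaps over 123 trading days for `last_6m`), which suggests the check is mostly counting overnight breaks. A possible refinement, sketched below, only compares consecutive bars that share a calendar date. The helper `intraday_gaps` and the two-day synthetic index are assumptions for illustration.

```python
import pandas as pd

def intraday_gaps(index, max_gap="6min"):
    """Return gaps larger than max_gap between consecutive bars on the same date."""
    ts = index.to_series()
    diff = ts.diff()
    same_day = ts.dt.normalize() == ts.shift().dt.normalize()
    return diff[same_day & (diff > pd.Timedelta(max_gap))]

# Two synthetic sessions of 5-min bars; two bars are removed from the second day
day1 = pd.date_range("2024-08-27 09:15", "2024-08-27 15:25", freq="5min", tz="Asia/Kolkata")
day2 = pd.date_range("2024-08-28 09:15", "2024-08-28 15:25", freq="5min", tz="Asia/Kolkata")
day2 = day2.delete([10, 11])       # drops the 10:05 and 10:10 bars
idx = day1.append(day2)

naive = idx.to_series().diff()
naive_count = int((naive > pd.Timedelta("6min")).sum())   # overnight + intraday
gaps = intraday_gaps(idx)

print("naive gap count   :", naive_count)
print("intraday gaps only:")
print(gaps)
```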



In [31]:
midpoint = len(df0) // 2
df0 = df0[:midpoint]
gc.collect()

43917

In [32]:
df0

date,Return
2015-01-09 09:20:00+05:30,-0.000024
2015-01-09 09:25:00+05:30,-0.000825
2015-01-09 09:30:00+05:30,-0.000681
2015-01-09 09:35:00+05:30,-0.000609
2015-01-09 09:40:00+05:30,0.000254
...,...
2019-11-04 00:00:00+05:30,0.000000
2019-11-04 00:05:00+05:30,0.000000
2019-11-04 00:10:00+05:30,0.000000
2019-11-04 00:15:00+05:30,0.000000
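
The rows stamped 00:00 to 00:15 with a return of exactly 0.000000 are consistent with forward-filled bins outside market hours. The sketch below compares forward-filling with dropping empty bins on synthetic 1-minute closes; the price path, session times, and variable names are assumptions for illustration and stand in for the real CSV.

```python
import numpy as np
import pandas as pd

# Hypothetical 1-minute closes for two sessions (09:15 to 15:29 IST)
idx = pd.date_range("2024-08-27 09:15", "2024-08-27 15:29", freq="1min", tz="Asia/Kolkata").append(
    pd.date_range("2024-08-28 09:15", "2024-08-28 15:29", freq="1min", tz="Asia/Kolkata"))
rng = np.random.default_rng(0)
close = pd.Series(25000 + rng.normal(size=len(idx)).cumsum(), index=idx, name="close")

bars = close.resample("5min").last()   # also creates empty overnight bins
filled = bars.ffill()                  # overnight bins inherit the last close
dropped = bars.dropna()                # keep only bins that contain trades

ret_filled = filled.pct_change().dropna()
ret_dropped = dropped.pct_change().dropna()

print("bars after ffill :", len(filled), "| zero returns:", int((ret_filled == 0).sum()))
print("bars after dropna:", len(dropped), "| zero returns:", int((ret_dropped == 0).sum()))
```

Even with `dropna()`, the first return of each session still spans the overnight move, so it may deserve separate treatment before running the stationarity tests.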


In [33]:
# Add this at the start of your notebook
def setup_wandb():
    try:
        import wandb
        # Never hard-code the API key in a notebook. Read it from the environment
        # (set WANDB_API_KEY before launching) or fall back to an interactive login.
        if not os.environ.get("WANDB_API_KEY"):
            wandb.login()

        print("WandB setup complete!")
        return True
    except Exception as e:
        print(f"Error setting up WandB: {str(e)}")
        return False

# Use it in your notebook
setup_wandb()

WandB setup complete!


True

In [34]:
import pandas as pd
import numpy as np
from statsmodels.tsa.stattools import adfuller
from pmdarima.arima.utils import ndiffs  # ndiffs is from pmdarima
import matplotlib.pyplot as plt
import seaborn as sns
import wandb
from scipy import stats
from datetime import datetime

plt.style.use('default')

def analyze_stationarity(returns_series, title="Returns Analysis", sample_size=10000):
    try:
        wandb.init(
            project="Time Series Analysis of Nifty50 5 min ohlc",
            name=f"first_half_data_{datetime.now().strftime('%Y%m%d_%H%M')}",
            group="Stationarity Analysis",
            config={
                "sample_size": sample_size,
                "data_points": len(returns_series),
                "analysis_type": "stationarity",
                "data_range": f"{returns_series.index[0]} to {returns_series.index[-1]}"
            },
            tags=["stationarity_test", "adf_analysis", "basic_statistics", "ARIMA_parameter_calculation", "nDiff_method"]
        )

        # Efficient sampling for large datasets
        if len(returns_series) > sample_size:
            plot_data = returns_series.sample(n=sample_size, random_state=42)
        else:
            plot_data = returns_series

        # Run ADF Test on chunks if data is too large
        if len(returns_series) > 100000:
            chunk_size = 100000
            chunks = [returns_series[i:i+chunk_size] for i in range(0, len(returns_series), chunk_size)]
            adf_results = []
            for chunk in chunks[:3]:
                adf_results.append(adfuller(chunk))
            # Summarize with medians over the first three chunks only
            # (a heuristic, not a formal pooled test; index 2 is the lag count used)
            adf_result = (
                np.median([r[0] for r in adf_results]),
                np.median([r[1] for r in adf_results]),
                np.median([r[2] for r in adf_results]),
                None,
                {k: np.median([r[4][k] for r in adf_results]) for k in adf_results[0][4].keys()}
            )
        else:
            adf_result = adfuller(returns_series)

        additional_stats = {
            'quartiles': {
                'q1': float(returns_series.quantile(0.25)),
                'q2': float(returns_series.quantile(0.50)),  # median
                'q3': float(returns_series.quantile(0.75))
            },
            'percentiles': {
                'p1': float(returns_series.quantile(0.01)),
                'p5': float(returns_series.quantile(0.05)),
                'p95': float(returns_series.quantile(0.95)),
                'p99': float(returns_series.quantile(0.99))
            },
            'range': {
                'min': float(returns_series.min()),
                'max': float(returns_series.max()),
                'range': float(returns_series.max() - returns_series.min())
            },
            'distribution': {
                'jarque_bera': stats.jarque_bera(returns_series),
                'shapiro': stats.shapiro(returns_series.sample(min(5000, len(returns_series)))),
                'variation_coefficient': float(returns_series.std() / returns_series.mean() if returns_series.mean() != 0 else np.nan)
            }
        }

        ndiffs_results = {
            'kpss_diffs': ndiffs(returns_series, test='kpss', max_d=2),
            'adf_diffs': ndiffs(returns_series, test='adf', max_d=2),
            'pp_diffs': ndiffs(returns_series, test='pp', max_d=2)
        }

        # Memory-efficient plotting
        fig = plt.figure(figsize=(15, 10))

        # Plot 1: Returns Time Series
        plt.subplot(221)
        plt.plot(plot_data.index, plot_data.values, linewidth=0.5)
        plt.title(f"{title} - Time Series Plot")
        plt.xlabel("Time")
        plt.ylabel("Returns")

        # Plot 2: Returns Distribution
        plt.subplot(222)
        sns.histplot(data=plot_data.values, kde=True)
        plt.title("Returns Distribution with KDE")
        plt.xlabel("Return Value")

        # Plot 3: QQ Plot
        plt.subplot(223)
        stats.probplot(plot_data.values, dist="norm", plot=plt)
        plt.title("Q-Q Plot")

        # Plot 4: Box Plot
        plt.subplot(224)
        sns.boxplot(y=plot_data.values)
        plt.title("Box Plot of Returns")

        plt.tight_layout()

        # Calculate statistics
        results = {
            'test_statistic': float(adf_result[0]),
            'p_value': float(adf_result[1]),
            'critical_values': adf_result[4],
            'is_stationary': adf_result[1] < 0.05,
            'statistics': {
                'mean': float(returns_series.mean()),
                'std': float(returns_series.std()),
                'skew': float(returns_series.skew()),
                'kurtosis': float(returns_series.kurtosis())
            }
        }

        statistics_summary = pd.DataFrame({
            'Metric': [
                'ADF Test Statistic', 'ADF P-value', 'Is Stationary (1=Yes, 0=No)',
                'Mean', 'Standard Deviation', 'Skewness', 'Kurtosis',
                'Q1', 'Median', 'Q3',
                'P1', 'P5', 'P95', 'P99',
                'Min', 'Max', 'Range',
                'Jarque-Bera Statistic', 'Jarque-Bera P-value',
                'Shapiro Statistic', 'Shapiro P-value',
                'Variation Coefficient',
                'KPSS Diffs Required', 'ADF Diffs Required', 'PP Diffs Required'
            ],
            'Value': [
                float(results['test_statistic']),
                float(results['p_value']),
                float(results['is_stationary']),  # Convert boolean to float
                float(results['statistics']['mean']),
                float(results['statistics']['std']),
                float(results['statistics']['skew']),
                float(results['statistics']['kurtosis']),
                float(additional_stats['quartiles']['q1']),
                float(additional_stats['quartiles']['q2']),
                float(additional_stats['quartiles']['q3']),
                float(additional_stats['percentiles']['p1']),
                float(additional_stats['percentiles']['p5']),
                float(additional_stats['percentiles']['p95']),
                float(additional_stats['percentiles']['p99']),
                float(additional_stats['range']['min']),
                float(additional_stats['range']['max']),
                float(additional_stats['range']['range']),
                float(additional_stats['distribution']['jarque_bera'][0]),
                float(additional_stats['distribution']['jarque_bera'][1]),
                float(additional_stats['distribution']['shapiro'][0]),
                float(additional_stats['distribution']['shapiro'][1]),
                float(additional_stats['distribution']['variation_coefficient']),
                float(ndiffs_results['kpss_diffs']),
                float(ndiffs_results['adf_diffs']),
                float(ndiffs_results['pp_diffs'])
            ]
        })

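        # Note: each plt.subplot(22x) call returns an Axes on the same figure, so the
        # four per-panel images below likely duplicate the combined figure.
        wandb.log({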
            "time_series_plot": wandb.Image(plt.subplot(221)),
            "distribution_plot": wandb.Image(plt.subplot(222)),
            "qq_plot": wandb.Image(plt.subplot(223)),
            "box_plot": wandb.Image(plt.subplot(224)),
            "combined_plots": wandb.Image(fig),
            "statistics_table": wandb.Table(dataframe=statistics_summary)
        })

        # Enhanced printing
        print("\nStationarity Tests:")
        print(f"ADF Test: {'Stationary' if results['is_stationary'] else 'Non-stationary'}")
        print(f"P-value: {results['p_value']:.4f}")
        print("\nDifferencing Required:")
        print(f"KPSS Test: {ndiffs_results['kpss_diffs']} differences")
        print(f"ADF Test: {ndiffs_results['adf_diffs']} differences")
        print(f"PP Test: {ndiffs_results['pp_diffs']} differences")

        wandb.finish()
        return results, additional_stats, ndiffs_results, fig

    except Exception as e:
        print(f"Error in analysis: {str(e)}")
        if wandb.run is not None:
            wandb.finish()
        return None, None, None, None

# Usage
results, additional_stats, ndiffs_results, fig = analyze_stationarity(df0)
plt.close(fig)




Stationarity Tests:
ADF Test: Stationary
P-value: 0.0000

Differencing Required:
KPSS Test: 0 differences
ADF Test: 0 differences
PP Test: 0 differences


In [None]:
from google.colab import drive
drive.mount('/content/drive')