![](../images/rivacon_frontmark_combined_header.png)

# Equity Volatility Surfaces

In [1]:
import matplotlib
matplotlib.use('nbagg')
import rivapy
from rivapy import marketdata as mkt_data
from rivapy import enums as enums
# import pyvacon
import datetime as dt
import math
import numpy as np
from mpl_toolkits.mplot3d import Axes3D
from matplotlib import cm
import plotly.graph_objects as go
#the next line is a Jupyter-internal command to show the matplotlib graphs within the notebook
#%matplotlib inline

## General Remarks
An equity volatility surface provides implied volatilities for arbitrary expiries and strikes. The volatility surfaces provided by the analytics library are parametrized w.r.t. the so-called X-strikes, i.e. a strike must be given w.r.t. the variable $X$, which is the driving process of the spot $S$:
$$ S_t=(F_t-D_t)X_t+D_t$$
where $F_t$ is the risky forward and $D_t$ the cash dividends, see [Buehler](https://papers.ssrn.com/sol3/papers.cfm?abstract_id=1141877) for a more detailed discussion. 
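
The relation above can be inverted to translate an absolute strike into an X-strike. The following is a minimal sketch of that conversion; the helper names `x_strike` and `strike_from_x` are hypothetical and not part of rivapy, and `forward` and `cash_divs` stand for $F_t$ and $D_t$ at the expiry of interest.

```python
def x_strike(strike: float, forward: float, cash_divs: float) -> float:
    """Solve strike = (F - D) * X + D for X (hypothetical helper)."""
    denom = forward - cash_divs
    if denom <= 0.0:
        raise ValueError("forward must exceed the cash dividends")
    return (strike - cash_divs) / denom


def strike_from_x(x: float, forward: float, cash_divs: float) -> float:
    """Map an X-strike back to an absolute strike."""
    return (forward - cash_divs) * x + cash_divs


# A strike equal to the forward corresponds to X = 1 (forward at-the-money).
atm_x = x_strike(100.0, forward=100.0, cash_divs=3.0)
otm_x = x_strike(120.0, forward=100.0, cash_divs=3.0)
```

A strike below the cash dividends yields a negative X-strike, which is why such strikes are considered implausible.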

To create a volatility surface one needs two components:
- A reference forward curve (typically the forward curve which was used to compute the implieds from quoted prices)
- A volatility parametrization



## Creating Forward Curve
We create a dummy forward curve as shown in the [forward_curve](equity_forwardcurve.ipynb) notebook; it is used in all subsequent volatility surface constructions.

In [2]:
refdate = dt.datetime(2017,1,1)

#dividend table needed for forward curve
object_id = "TEST_DIV" 
ex_dates = [dt.datetime(2018,3,29), dt.datetime(2019,3,29), dt.datetime(2020,3,29), dt.datetime(2021,3,29)]
pay_dates = [dt.datetime(2018,4,1), dt.datetime(2019,4,1), dt.datetime(2020,4,1), dt.datetime(2021,4,1)]
tax_factors = [1.0, 1.0, 1.0, 1.0]
div_yield = [0, 0.005, 0.01, 0.01]
div_cash = [3.0, 2.0, 1.0, 0.0]
div_table=rivapy.marketdata.DividendTable(object_id, refdate, ex_dates, pay_dates, div_yield, div_cash, tax_factors)

#discount- and borrowing curve needed for forward curve
dates = [refdate + dt.timedelta(days=x) for x in [0,10]]
df = [1.0,1.0]
dc = mkt_data.DiscountCurve(object_id, refdate, dates, df, 
                             enums.InterpolationType.HAGAN_DF, enums.ExtrapolationType.NONE, enums.DayCounterType.Act365Fixed)
bc = mkt_data.DiscountCurve(object_id, refdate, dates, df, 
                             enums.InterpolationType.HAGAN_DF, enums.ExtrapolationType.NONE, enums.DayCounterType.Act365Fixed)
spot = 100.0

#forward curve
forward_curve = mkt_data.EquityForwardCurve(spot, dc, bc, div_table)

## Volatility Parametrizations

The volatility parametrization provides the method *calc_implied_vol*, which returns the implied volatility for a given X-strike and time-to-maturity (as a year fraction). The volatility surface uses this method internally wherever an implied volatility is computed.
The different available parametrizations are discussed in this subsection.

### Flat volatility
To set up a flat volatility, one may use the VolatilityParametrizationFlat:

In [3]:
flat_param = mkt_data.VolatilityParametrizationFlat(0.3)

### Term structure volatility
To create a volatility which has only a term structure and no strike dependency, one may use this parametrization type. It needs a vector of expiry times (given as year fractions w.r.t. the day counter specified in the volatility surface) and the forward at-the-money volatilities, i.e. the volatilities at X-strike=1.0.

In [4]:
ttm = [1.0/12.0, 1.0, 2.0, 3.0]
fwd_atm_vols =  [0.3, 0.28, 0.25, 0.24]
term_param = mkt_data.VolatilityParametrizationTerm(ttm,fwd_atm_vols)
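
How the library interpolates between the given expiries is not stated here. As an illustration only, the sketch below interpolates linearly in total variance $\sigma^2 t$ between the nodes and keeps the volatility flat outside of them; both choices are assumptions, and the function name `atm_vol_linear_in_total_variance` is hypothetical.

```python
import numpy as np


def atm_vol_linear_in_total_variance(t, ttm, atm_vols):
    """Interpolate ATM vols linearly in total variance (assumed scheme)."""
    ttm = np.asarray(ttm, dtype=float)
    atm_vols = np.asarray(atm_vols, dtype=float)
    # Flat extrapolation of the volatility outside the given expiries.
    if t <= ttm[0]:
        return float(atm_vols[0])
    if t >= ttm[-1]:
        return float(atm_vols[-1])
    total_var = atm_vols**2 * ttm          # sigma^2 * t at each node
    w = np.interp(t, ttm, total_var)       # linear in total variance
    return float(np.sqrt(w / t))


example_ttm = [1.0 / 12.0, 1.0, 2.0, 3.0]
example_vols = [0.3, 0.28, 0.25, 0.24]
vol_18m = atm_vol_linear_in_total_variance(1.5, example_ttm, example_vols)
```

Interpolating in total variance rather than in volatility keeps the interpolated total variance non-decreasing in time whenever the node values are non-decreasing.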

### SSVI
This parametrization is inspired by the volatility structure provided by stochastic volatility models. The total variance $w(k,t)$ for a log strike $k$ and time-to-maturity $t$ is given by
$$ w(k,t) = \frac{\theta_t}{2}\left( 1+\rho\phi(\theta_t)k+\sqrt{(\phi(\theta_t)k+\rho)^2+(1-\rho^2)}  \right) $$ 
and 
$$ \phi(\theta_t) = \frac{\eta}{\theta_t^\gamma(1+\theta_t)^{1-\gamma}} $$
for parameters $\rho$, $\eta$, $\gamma$ and given atm implied total variances $\theta_t:=\sigma^2(t)t$. The term structure of implied total variances is internally approximated by interpolation from given atm volatilities. The nice property of this surface is that there are very simple conditions on the parameters to guarantee that the surface is free of arbitrage, see [gatheral_jacquier_svi](https://papers.ssrn.com/sol3/papers.cfm?abstract_id=2033323).

In [5]:
gamma = 0.5 # responsible for the "rate of decay"
rho = -0.7 # responsible for the skewness of the vol_surface
eta = 1.0 # responsible for the curvature
ssvi_param = mkt_data.VolatilityParametrizationSSVI(ttm, fwd_atm_vols, rho, eta, gamma)

In [6]:
ssvi_param.calc_implied_vol(1,.9)

0.3151536735681129
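
The SSVI formulas above can be evaluated directly with NumPy. The sketch below is not the library's implementation: it assumes that $k$ is the logarithm of the X-strike and takes the ATM volatility at the requested expiry as an input instead of interpolating it. The function names are hypothetical.

```python
import numpy as np


def ssvi_total_variance(k, theta, rho, eta, gamma):
    """Total variance w(k, t) for log strike k and ATM total variance theta."""
    phi = eta / (theta**gamma * (1.0 + theta) ** (1.0 - gamma))
    return 0.5 * theta * (
        1.0 + rho * phi * k + np.sqrt((phi * k + rho) ** 2 + (1.0 - rho**2))
    )


def ssvi_implied_vol(x_strike, t, atm_vol, rho, eta, gamma):
    """Implied vol from SSVI, assuming k = log(x_strike)."""
    theta = atm_vol**2 * t                 # ATM implied total variance
    k = np.log(x_strike)
    w = ssvi_total_variance(k, theta, rho, eta, gamma)
    return float(np.sqrt(w / t))


# Parameters from the notebook; 0.28 is the one-year ATM volatility.
vol = ssvi_implied_vol(0.9, 1.0, 0.28, rho=-0.7, eta=1.0, gamma=0.5)
```

At $k=0$ the formula reduces to $w=\theta_t$, so the surface reproduces the ATM term structure. For the one-year expiry and X-strike 0.9 the sketch gives roughly 0.315, in line with the output of the cell above.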

### SVI

In [7]:
svi_param = mkt_data.VolatilityParametrizationSVI(expiries=np.array([1.0/365.0, 1.0]), svi_params=[
        (0.0001, 0.1, -0.5, 0.0, 0.0001),
        (0.0002, 0.1, -0.5, 0.0, 0.00004),
])

### SABR

In [31]:
ttm = [1.0/12.0, 1.0, 2.0, 3.0]
# one row per expiry
sabr_params = [[.1, .1, .9, -.8],
               [.3, .1, .7, -.9],
               [.5, .3, .9, -.75],
               [.5, .3, .9, -.75]]

In [9]:
sabr_param = mkt_data.VolatilityParametrizationSABR(ttm, sabr_params)
sabr_param.calc_implied_vol(2,.9)

0.48967013729911385

In [15]:
hasattr(mkt_data.VolatilityParametrizationSABR,'expiries')

False

In [42]:
expiries =[1]
ttm =[1]
class abx:
    def __init__(self,expiries):
    
        self.expiries = [1,2]
    
    def de(self):
        return 2
    
    a = 5

ggg = abx(ttm)
hasattr(abx,'expiries')

class bbb():
    @staticmethod
    def _b(abx):
        if hasattr(abx,'expiries'):
            return abx.de()
        else:
            return 5
        
    def __init__(self,abx):
        
        self.abx = abx
        
bbb.mro

<__main__.bbb at 0x1de7f7e22b0>

In [43]:
ggg = abx([1,2])

In [45]:
a = bbb(ggg)

In [46]:
a.abx

<__main__.abx at 0x1de7fec0220>

In [34]:
import numpy as np
from rivapy import enums
from typing import List, Union, Tuple
from rivapy.marketdata.curves import *

import pyvacon.finance.marketdata as _mkt_data
import pyvacon.finance.utils as _utils
import pyvacon.finance.pricing as _pricing

class VolatilityParametrizationSABR:
    def __init__(self, expiries: List[float], sabr_params: List[Tuple]):
        
        self.expiries = np.array(expiries)
        self._x = self._get_x(sabr_params)
        
    def get_params_at_expiry(self, expiry: int)->np.array:
        return self._x[4*expiry:4*(expiry+1)]
    
    def calc_implied_vol(self, ttm, strike):
        # NOTE: ttm is used both as time-to-maturity and as index of the expiry slice
        K = strike
        alpha, ny, beta, rho = self.get_params_at_expiry(ttm)
        f = 1  # forward in X-strike units

        # time-dependent correction term of the Hagan et al. expansion
        time_corr = 1+((1-beta)**2/24*alpha**2/(f*K)**(1-beta)+1/4*rho*beta*ny*alpha/(f*K)**((1-beta)/2)+(2-3*rho**2)/24*ny**2)*ttm

        if f == K:
            # at-the-money limit: zeta/chi(zeta) -> 1 and log(f/K) = 0
            sigma = alpha*time_corr/f**(1-beta)
        else:
            log_fk = np.log(f/K)
            zeta = ny/alpha*(f*K)**((1-beta)/2)*log_fk
            chi_zeta = np.log((np.sqrt(1-2*rho*zeta+zeta**2)+zeta-rho)/(1-rho))
            # the log-moneyness expansion belongs in the denominator
            denom = (f*K)**((1-beta)/2)*(1+(1-beta)**2/24*log_fk**2+(1-beta)**4/1920*log_fk**4)
            sigma = alpha/denom*zeta/chi_zeta*time_corr

        return sigma

    def _get_x(self, sabr_params)->np.array:
        x = np.empty(len(sabr_params)*4)
        j = 0
        for i in range(len(sabr_params)):
            for k in range(4):
                x[j] = sabr_params[i][k]
                j += 1
        return x
    
    def _set_param(self, x)->np.array:
        self._x = x

class VolatilityGridParametrization:
    def __init__(self, expiries: np.array, strikes: np.ndarray, vols: np.ndarray):
        """Grid parametrization
        This parametrization stores a set of strike-vol grids for a given list of expiries and computes a volatility by
        - search for the neighboring expiries
        - apply a spline interpolation in each expiry to get the respective volatility
        - apply a linear interpolation (in total variance)

        Args:
            expiries (np.array): An array of the expiries.
            strikes (np.ndarray): Strikes, either a one-dimensional array shared by all expiries or one row of strikes per expiry.
            vols (np.ndarray): Two dimensional array of volatilities where each row i contains the values for expiry i
        """
        self.expiries = expiries
        if len(strikes.shape)==1:
            strikes = [strikes]*expiries.shape[0]
        self.strikes = strikes
        self.vols = vols
        self._pyvacon_obj = None

    def calc_implied_vol(self, ttm, strike):
        return self._get_pyvacon_obj().calcImpliedVol(ttm, strike)

    def _get_pyvacon_obj(self):
        if self._pyvacon_obj is None:
            vol_params = []
            self._pyvacon_obj = _mkt_data.VolatilityParametrizationTimeSlice(self.expiries, self.strikes, self.vols)  
        return self._pyvacon_obj
    
class VolatilitySurface:
    @staticmethod
    def _create_param_pyvacon_obj(vol_param):
        if hasattr(vol_param, '_get_pyvacon_obj'):
            print('pyvacon')
            return vol_param._get_pyvacon_obj()
            
        if hasattr(vol_param, 'expiries'):
            print('expiries')
            expiries = vol_param.expiries
        else:
            expiries = np.linspace(0.0, 4.0, 13, endpoint=True)
        strikes = np.linspace(0.4, 1.6, num=100)
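        # np.empty expects the shape as a single tuple
        vols = np.empty((expiries.shape[0], strikes.shape[0]))
        for i in range(expiries.shape[0]):
            for j in range(strikes.shape[0]):
                # sample the parametrization on the (expiry, strike) grid
                vols[i,j] = vol_param.calc_implied_vol(expiries[i], strikes[j])
        # return the pyvacon object, consistent with the first branch above
        return VolatilityGridParametrization(expiries, strikes, vols)._get_pyvacon_obj()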

    def __init__(self, id: str, refdate: datetime, forward_curve, daycounter, vol_param):
        """Volatility surface

        Args:
            id (str): Identifier (name) of the volatility surface.
            refdate (datetime): Valuation date.
            forward_curve (rivapy.market_data.EquityForwardCurve): Forward curve.
            daycounter (enums.DayCounterType): Day counter used to convert dates into year fractions.
            vol_param ([VolatilityParametrizationFlat,VolatilityParametrizationTerm,VolatilityParametrizationSSVI]): Volatility parametrization.
        """
        self.id = id
        self.refdate = refdate
        self.forward_curve = forward_curve
        self.daycounter = daycounter
        self.vol_param = vol_param
        self._pyvacon_obj = None  
           
    def _get_pyvacon_obj(self, fwd_curve=None):
        if self._pyvacon_obj is None:
            if fwd_curve is None:
                fwd_curve = self.forward_curve
            self._pyvacon_obj = _mkt_data.VolatilitySurface(self.id, self.refdate,
                fwd_curve._get_pyvacon_obj(),self.daycounter.name, 
                VolatilitySurface._create_param_pyvacon_obj(self.vol_param))
        return self._pyvacon_obj
    
    def calc_implied_vol(self,  expiry: datetime, strike: float, refdate: datetime = None, forward_curve=None)->float:
        """Calculate implied volatility

        Args:
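            expiry (datetime): Expiration date.
            strike (float): Strike price.
            refdate (datetime, optional): Valuation date. Defaults to the reference date of the surface's forward curve.
            forward_curve (optional): Forward curve used to convert the strike into an X-strike. Defaults to the surface's own forward curve.

        Raises:
            Exception: If no forward curve is available or the resulting X-strike is negative.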

        Returns:
            float: Implied volatility.
        """
        # convert strike into x_strike 
        if refdate is None:
            refdate = self.forward_curve.refdate
        if forward_curve is None and self.forward_curve is None:
            raise Exception('Please specify a forward curve')
        vol = self._get_pyvacon_obj()
        if forward_curve is None:
            forward_curve = self.forward_curve
        elif self.forward_curve is not None:
            vol = _mkt_data.VolatilitySurface.createVolatilitySurfaceShiftedFwd(vol, forward_curve._get_pyvacon_obj())
        forward_curve_obj = forward_curve._get_pyvacon_obj() 
        x_strike = _utils.computeXStrike(strike, forward_curve_obj.value(refdate, expiry), forward_curve_obj.discountedFutureCashDivs(refdate, expiry))
        if x_strike < 0:
            raise Exception(f'The given strike value seems implausible compared to the discounted future cash dividends\
                ({forward_curve_obj.discountedFutureCashDivs(refdate, expiry)}).')
        return vol.calcImpliedVol(refdate, expiry, x_strike)
      
    @staticmethod        
    def set_stickyness(vol_stickyness: enums.VolatilityStickyness):
        if vol_stickyness is enums.VolatilityStickyness.StickyXStrike:
            _pricing.GlobalSettings.setVolatilitySurfaceFwdStickyness(_pricing.VolatilitySurfaceFwdStickyness.Type.StickyXStrike)
        elif vol_stickyness is enums.VolatilityStickyness.StickyStrike:
            _pricing.GlobalSettings.setVolatilitySurfaceFwdStickyness(_pricing.VolatilitySurfaceFwdStickyness.Type.StickyStrike)
        elif vol_stickyness is enums.VolatilityStickyness.StickyFwdMoneyness:
            _pricing.GlobalSettings.setVolatilitySurfaceFwdStickyness(_pricing.VolatilitySurfaceFwdStickyness.Type.StickyFwdMoneyness)
        elif vol_stickyness is enums.VolatilityStickyness.NONE:
            _pricing.GlobalSettings.setVolatilitySurfaceFwdStickyness(_pricing.VolatilitySurfaceFwdStickyness.Type.NONE)
        else:
            raise Exception(f'Unknown volatility stickyness: {vol_stickyness}')


In [35]:
sabr_param = VolatilityParametrizationSABR(ttm, sabr_params)
sabr_param.calc_implied_vol(2,.9)
hasattr(sabr_param,'expiries')

True

In [36]:
obj_id = 'TEST_SURFACE'
vol_surf = VolatilitySurface(obj_id, refdate, forward_curve, enums.DayCounterType.Act365Fixed, sabr_param)

## Volatility Surface
The forward curve and the parametrization can now be combined into a VolatilitySurface:

In [10]:
obj_id = 'TEST_SURFACE'
vol_surf = mkt_data.VolatilitySurface(obj_id, refdate, forward_curve, enums.DayCounterType.Act365Fixed, svi_param)

To compute an implied volatility, one may use the method *calc_implied_vol*. Note that this method applies a sticky-strike handling of volatilities, i.e. it assumes that the implied volatility for a given strike is independent of the current forward values, which may differ from the forwards at the time the volatility surface was calibrated.

In [11]:
# vol = vol_surf.calc_implied_vol(refdate,refdate + dt.timedelta(days=180),120)
vol = vol_surf.calc_implied_vol(refdate + dt.timedelta(days=180),120,refdate)
print(vol)

AttributeError: 'VolatilityParametrizationSVI' object has no attribute '_pyvacon_obj'

In [None]:
# we do not have SABR in pyvacon....

Executing the following cell plots the volatility surface.

In [None]:
# 
refdate = dt.datetime(2017,1,1,0,0,0)
expiries = [dt.datetime(2017,2,1,0,0,0), dt.datetime(2018,1,1,0,0,0), dt.datetime(2019,1,1,0,0,0), dt.datetime(2020,1,1,0,0,0)]
def s_range(x, y, count):
  jump = (y-x)/count
  while x <= y:
    yield x
    x += jump

# strikes = list(s_range(80, 120, 100))
moneyness = list(s_range(0.5, 1.5,100))

y = moneyness
x = ttm

term_structure = []
for i in moneyness: 
    temp = []
    for j in expiries:
      strike = i*forward_curve.value(refdate,j)
      # argument order is (expiry, strike, refdate)
      temp.append(vol_surf.calc_implied_vol(j, strike, refdate))
    term_structure.append(temp)

fig = go.Figure(data=[go.Surface(x=x, y=y,z=term_structure
                      ,hovertemplate = 
                        'Moneyness:  %{y: .2%}' +\
                        '<br>Maturity (yrs): %{x: .1f}' +\
                        '<br>Volatility: %{z: .2f}<extra></extra>'
                     ,colorscale = 'temps')
                     ])

fig.update_layout(title={
                      'text': "<b>Volatility Surface</b>",
                      'y':0.95,
                      'x':0.5,
                      'xanchor': 'center',
                      'yanchor': 'top'
                        }
                # ,autosize=True
                ,width=1000
                ,height=500
                ,scene = dict(
                  xaxis_title='Maturity (yrs)'
                  ,xaxis_tickformat = '.1f'
                  ,xaxis_autorange = 'reversed'
                  ,yaxis_title='Moneyness'
                  ,yaxis_tickformat = '.2%'
                  ,zaxis_title='Volatility'
                  ,zaxis_tickformat = '.2%'
                  )
                ,font=dict(
                  family="Courier New, monospace"
                  ,size=10
                  )
                ,margin=dict(l=65, r=50, b=65, t=90)
)

fig.show()

In [None]:
# 
refdate = dt.datetime(2017,1,1,0,0,0)
expiries = [dt.datetime(2017,2,1,0,0,0), dt.datetime(2018,1,1,0,0,0), dt.datetime(2019,1,1,0,0,0), dt.datetime(2020,1,1,0,0,0)]
def s_range(x, y, count):
  jump = (y-x)/count
  while x <= y:
    yield x
    x += jump

moneyness = list(s_range(0.5, 1.5, 100))

x = moneyness
y = ttm

term_structure = []
for i in moneyness: 
    temp = []
    for j in expiries:
        strike = i*forward_curve.value(refdate,j)
        # argument order is (expiry, strike, refdate)
        temp.append(vol_surf.calc_implied_vol(j, strike, refdate))
    term_structure.append(temp)

yv, xv = np.meshgrid(y, x)
xv = np.array(xv)  #strikes 1x no. strikes
yv = np.array(yv)  #ttm     1x no. ttms
z = np.array(term_structure)           #vols    no. strikes x no. ttms

# pyplot must be imported explicitly; "import matplotlib" alone does not load it
import matplotlib.pyplot as plt
fig = plt.figure()
ax4 = fig.add_subplot(111, projection='3d')
ax4.set_title('Volatility Surface')
ax4.set_xlabel('Moneyness')
ax4.set_ylabel('TTM')
ax4.set_zlabel('Volatility')
ax4.plot_surface(xv, yv, z, rstride=2, cstride=1, alpha=0.58, cmap=cm.coolwarm, linewidth=0, antialiased=False)
# ax4.plot_surface(xv, yv, z, rstride=8, cstride=1, alpha=0.3)

#ax4.contour(xv, yv, z, zdir='z', offset=ax4.get_zlim()[0], cmap=cm.coolwarm)
ax4.contour(xv, yv, z, zdir='x', offset=ax4.get_xlim()[1]-0.08, cmap=cm.coolwarm)
ax4.contour(xv, yv, z, zdir='y', offset=ax4.get_ylim()[1], cmap=cm.coolwarm)
ax4.contour(xv, yv, z, zdir='y', offset=ax4.get_ylim()[1], cmap=cm.coolwarm, levels = [0,0.2,0.3,0.5,0.7,0.8])
fig.show()

### Stickiness assumptions
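The stickiness setting determines how implied volatilities react when the forward curve differs from the one the surface was calibrated with. The toy sketch below contrasts the StickyStrike and StickyXStrike conventions; it is not the pyvacon implementation, and `toy_smile` and `implied_vol` are hypothetical helpers with a made-up linear smile.

```python
def toy_smile(x: float) -> float:
    """Made-up linear smile in X-strike, for illustration only."""
    return 0.3 - 0.2 * (x - 1.0)


def implied_vol(strike, ref_forward, new_forward, sticky="strike", cash_divs=0.0):
    """Look up the toy smile under a stickiness assumption.

    sticky="strike":   X-strike w.r.t. the calibration forward, so the vol
                       of a fixed strike ignores moves of the forward.
    sticky="x_strike": X-strike w.r.t. the current forward, so the smile
                       moves along with the forward.
    """
    fwd = ref_forward if sticky == "strike" else new_forward
    x = (strike - cash_divs) / (fwd - cash_divs)
    return toy_smile(x)


vol_sticky_strike = implied_vol(120.0, 100.0, 110.0, sticky="strike")
vol_sticky_x = implied_vol(120.0, 100.0, 110.0, sticky="x_strike")
```

With a downward-sloping smile and a forward shifted upwards, the sticky-X-strike volatility of a fixed strike is higher than the sticky-strike one, because the strike is now closer to the forward.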

In [None]:
obj_id_shifted = "TEST_SURFACE_FWD_SHIFTED"
forward_curve_shifted = mkt_data.EquityForwardCurve(1.0*spot, dc, bc, div_table)
shiftet_spot = 100
# vol_surf_shifted = vol_surf.shift_fwd_curve(obj_id_shifted, forward_curve_shifted)
vol_surf_shifted = vol_surf.shift_fwd_spot(obj_id_shifted, shiftet_spot)

In [None]:
vol_surf_shifted.calc_implied_vol(refdate,refdate + dt.timedelta(days=180),120)

In [None]:
pyvacon.finance.pricing.GlobalSettings.setVolatilitySurfaceFwdStickyness(pyvacon.finance.pricing.VolatilitySurfaceFwdStickyness.Type.StickyXStrike)
print(vol_surf_shifted.calcImpliedVol(refdate,refdate + dt.timedelta(days=180), 1.0))
pyvacon.finance.pricing.GlobalSettings.setVolatilitySurfaceFwdStickyness(pyvacon.finance.pricing.VolatilitySurfaceFwdStickyness.Type.StickyStrike)
print(vol_surf_shifted.calcImpliedVol(refdate,refdate + dt.timedelta(days=180), 1.0))

In [None]:
# 0.29221908667483804

In [None]:
mkt_data.VolatilitySurface.set_stickyness(rivapy.enums.VolatilityStickyness.StickyXStrike)
print(vol_surf_shifted.calc_implied_vol(refdate,refdate + dt.timedelta(days=180), 120))

In [None]:
y = pyvacon.finance.marketdata.VolatilitySurface("y", refdate, forward_curve._get_pyvacon_obj(), enums.DayCounterType.Act365Fixed.name, ssvi_param._get_pyvacon_obj())
print(y.calcImpliedVol(refdate,refdate + dt.timedelta(days=180), 120))

z = pyvacon.finance.marketdata.VolatilitySurface.createVolatilitySurfaceShiftedFwd(vol_surf._get_pyvacon_obj(), forward_curve._get_pyvacon_obj())
z.calcImpliedVol(refdate,refdate + dt.timedelta(days=180), 120)

In [None]:
import pyvacon
a = pyvacon.finance.marketdata.VolatilitySurface.createVolatilitySurfaceShiftedFwd(vol_surf._get_pyvacon_obj(), forward_curve_shifted._get_pyvacon_obj())

In [None]:
a.calcImpliedVol(refdate,refdate + dt.timedelta(days=180), 120)

In [None]:
b = vol_surf.shift_fwd_spot(obj_id_shifted, shiftet_spot)

In [None]:
b.calc_implied_vol(refdate,refdate + dt.timedelta(days=180), 120)

In [None]:
vol_surf.calc_implied_vol(refdate + dt.timedelta(days=180), 120, refdate)