<img src="http://hilpisch.com/tpq_logo.png" alt="The Python Quants" width="35%" align="right" border="0"><br>

# Python for Financial Data Science

Dr Yves J Hilpisch | The Python Quants GmbH

http://tpq.io | <a href="mailto:training@tpq.io">training@tpq.io</a>


<img src="http://hilpisch.com/images/py4fi_2nd.png" width="35%" align="left">

# Statistics (a)

## Normality Tests

### Benchmark Case

In [None]:
!git clone https://github.com/tpq-classes/financial_data_science_.git
import sys
sys.path.append('financial_data_science_')


In [None]:
from pylab import mpl, plt
plt.style.use('seaborn-v0_8')
mpl.rcParams['font.family'] = 'serif'
import warnings; warnings.simplefilter('ignore')
%matplotlib inline

In [None]:
import math
import numpy as np
import scipy.stats as scs
import statsmodels.api as sm
import matplotlib as mpl
import matplotlib.pyplot as plt

In [None]:
def gen_paths(S0, r, sigma, T, M, I):
    ''' Generate Monte Carlo paths for geometric Brownian motion.
    
    Parameters
    ==========
    S0: float
        initial stock/index value
    r: float
        constant short rate
    sigma: float
        constant volatility
    T: float
        final time horizon
    M: int
        number of time steps/intervals
    I: int
        number of paths to be simulated
        
    Returns
    =======
    paths: ndarray, shape (M + 1, I)
        simulated paths given the parameters
    '''
    dt = T / M
    paths = np.zeros((M + 1, I))
    paths[0] = S0
    for t in range(1, M + 1):
        rand = np.random.standard_normal(I)
        rand = (rand - rand.mean()) / rand.std()
        paths[t] = paths[t - 1] * np.exp((r - 0.5 * sigma ** 2) * dt +
                                         sigma * math.sqrt(dt) * rand)
    return paths

In [None]:
S0 = 100.
r = 0.05
sigma = 0.2
T = 1.0 
M = 50
I = 250000
np.random.seed(1000)

In [None]:
paths = gen_paths(S0, r, sigma, T, M, I)

In [None]:
S0 * math.exp(r * T)

In [None]:
paths[-1].mean()

In [None]:
plt.figure(figsize=(10, 6))
plt.plot(paths[:, :10])
plt.xlabel('time steps')
plt.ylabel('index level');

In [None]:
paths[:, 0].round(4)

In [None]:
log_returns = np.log(paths[1:] / paths[:-1]) 

In [None]:
log_returns[:, 0].round(4)

In [None]:
def print_statistics(array):
    ''' Prints selected statistics.
    
    Parameters
    ==========
    array: ndarray
        object to generate statistics on
    '''
    sta = scs.describe(array)
    print('%14s %15s' % ('statistic', 'value'))
    print(30 * '-')
    print('%14s %15.5f' % ('size', sta[0]))
    print('%14s %15.5f' % ('min', sta[1][0]))
    print('%14s %15.5f' % ('max', sta[1][1]))
    print('%14s %15.5f' % ('mean', sta[2]))
    print('%14s %15.5f' % ('std', np.sqrt(sta[3])))
    print('%14s %15.5f' % ('skew', sta[4]))
    print('%14s %15.5f' % ('kurtosis', sta[5]))

In [None]:
print_statistics(log_returns.flatten())

In [None]:
log_returns.mean() * M + 0.5 * sigma ** 2

In [None]:
log_returns.std() * math.sqrt(M)

In [None]:
plt.figure(figsize=(10, 6))
plt.hist(log_returns.flatten(), bins=70, density=True,
         label='frequency', color='b')
plt.xlabel('log return')
plt.ylabel('frequency')
x = np.linspace(plt.axis()[0], plt.axis()[1])
plt.plot(x, scs.norm.pdf(x, loc=r / M, scale=sigma / np.sqrt(M)),
         'r', lw=2.0, label='pdf')
plt.legend();

In [None]:
sm.qqplot(log_returns.flatten()[::500], line='s')
plt.xlabel('theoretical quantiles')
plt.ylabel('sample quantiles');

In [None]:
def normality_tests(arr):
    ''' Tests for normality distribution of given data set.
    
    Parameters
    ==========
    array: ndarray
        object to generate statistics on
    '''
    print('Skew of data set  %14.3f' % scs.skew(arr))
    print('Skew test p-value %14.3f' % scs.skewtest(arr)[1])
    print('Kurt of data set  %14.3f' % scs.kurtosis(arr))
    print('Kurt test p-value %14.3f' % scs.kurtosistest(arr)[1])
    print('Norm test p-value %14.3f' % scs.normaltest(arr)[1])

In [None]:
normality_tests(log_returns.flatten())

In [None]:
f, (ax1, ax2) = plt.subplots(1, 2, figsize=(10, 6))
ax1.hist(paths[-1], bins=30)
ax1.set_xlabel('index level')
ax1.set_ylabel('frequency')
ax1.set_title('regular data')
ax2.hist(np.log(paths[-1]), bins=30)
ax2.set_xlabel('log index level')
ax2.set_title('log data')

In [None]:
print_statistics(paths[-1])

In [None]:
print_statistics(np.log(paths[-1]))

In [None]:
normality_tests(np.log(paths[-1]))

In [None]:
plt.figure(figsize=(10, 6))
log_data = np.log(paths[-1])
plt.hist(log_data, bins=70, density=True,
         label='observed', color='b')
plt.xlabel('index levels')
plt.ylabel('frequency')
x = np.linspace(plt.axis()[0], plt.axis()[1])
plt.plot(x, scs.norm.pdf(x, log_data.mean(), log_data.std()),
         'r', lw=2.0, label='pdf')
plt.legend();

In [None]:
sm.qqplot(log_data, line='s')
plt.xlabel('theoretical quantiles')
plt.ylabel('sample quantiles');

### Real World Data

In [None]:
import pandas as pd

In [None]:
raw = pd.read_csv('http://hilpisch.com/tr_eikon_eod_data.csv',
                 index_col=0, parse_dates=True)

In [None]:
symbols = ['SPY', 'GLD', 'AAPL.O', 'MSFT.O']

In [None]:
data = raw[symbols]
data = data.dropna()

In [None]:
data.info()

In [None]:
data.head()

In [None]:
(data / data.iloc[0] * 100).plot(figsize=(10, 6));

In [None]:
log_returns = np.log(data / data.shift(1))
log_returns.head()

In [None]:
log_returns.hist(bins=50, figsize=(10, 8));

In [None]:
for sym in symbols:
    print('\nResults for symbol {}'.format(sym))
    print(30 * '-')
    log_data = np.array(log_returns[sym].dropna())
    print_statistics(log_data)

In [None]:
sm.qqplot(log_returns['SPY'].dropna(), line='s')
plt.title('SPY')
plt.xlabel('theoretical quantiles')
plt.ylabel('sample quantiles');

In [None]:
sm.qqplot(log_returns['MSFT.O'].dropna(), line='s')
plt.title('MSFT.O')
plt.xlabel('theoretical quantiles')
plt.ylabel('sample quantiles');

In [None]:
for sym in symbols:
    print('\nResults for symbol {}'.format(sym))
    print(32 * '-')
    log_data = np.array(log_returns[sym].dropna())
    normality_tests(log_data)

## Portfolio Optimization

### The Data

In [None]:
symbols = ['AAPL.O', 'MSFT.O', 'SPY', 'GLD']

In [None]:
noa = len(symbols)

In [None]:
data = raw[symbols]

In [None]:
rets = np.log(data / data.shift(1))

In [None]:
rets.hist(bins=40, figsize=(10, 8));

In [None]:
rets.mean() * 252

In [None]:
rets.cov() * 252

### The Basic Theory

In [None]:
weights = np.random.random(noa)
weights /= np.sum(weights)

In [None]:
weights

In [None]:
weights.sum()

In [None]:
np.sum(rets.mean() * weights) * 252

In [None]:
np.dot(weights.T, np.dot(rets.cov() * 252, weights))

In [None]:
math.sqrt(np.dot(weights.T, np.dot(rets.cov() * 252, weights)))

In [None]:
def port_ret(weights):
    return np.sum(rets.mean() * weights) * 252

In [None]:
def port_vol(weights):
    return np.sqrt(np.dot(weights.T, np.dot(rets.cov() * 252, weights)))

In [None]:
prets = []
pvols = []
for p in range (2500):
    weights = np.random.random(noa)
    weights /= np.sum(weights)
    prets.append(port_ret(weights))
    pvols.append(port_vol(weights))
prets = np.array(prets)
pvols = np.array(pvols)

In [None]:
plt.figure(figsize=(10, 6))
plt.scatter(pvols, prets, c=prets / pvols,
            marker='o', cmap='coolwarm')
plt.xlabel('expected volatility')
plt.ylabel('expected return')
plt.colorbar(label='Sharpe ratio');

### Portfolio Optimizations

In [None]:
import scipy.optimize as sco

In [None]:
def min_func_sharpe(weights):
    return -port_ret(weights) / port_vol(weights)

In [None]:
cons = ({'type': 'eq', 'fun': lambda x:  np.sum(x) - 1})

In [None]:
bnds = tuple((0, 1) for x in range(noa))

In [None]:
eweights = np.array(noa * [1. / noa,])
eweights

In [None]:
min_func_sharpe(eweights)

In [None]:
%%time
opts = sco.minimize(min_func_sharpe, eweights,
                    method='SLSQP', bounds=bnds,
                    constraints=cons)

In [None]:
opts

In [None]:
opts['x'].round(3)

In [None]:
port_ret(opts['x']).round(3)

In [None]:
port_vol(opts['x']).round(3)

In [None]:
port_ret(opts['x']) / port_vol(opts['x'])

In [None]:
optv = sco.minimize(port_vol, eweights,
                    method='SLSQP', bounds=bnds,
                    constraints=cons)

In [None]:
optv

In [None]:
optv['x'].round(3)

In [None]:
port_vol(optv['x']).round(3)

In [None]:
port_ret(optv['x']).round(3)

In [None]:
port_ret(optv['x']) / port_vol(optv['x'])

### Efficient Frontier

In [None]:
cons = ({'type': 'eq', 'fun': lambda x:  port_ret(x) - tret},
        {'type': 'eq', 'fun': lambda x:  np.sum(x) - 1})

In [None]:
bnds = tuple((0, 1) for x in weights)

In [None]:
%%time
trets = np.linspace(0.05, 0.2, 50)
tvols = []

for tret in trets:
    res = sco.minimize(port_vol, eweights, method='SLSQP',
                       bounds=bnds, constraints=cons)
    tvols.append(res['fun'])

tvols = np.array(tvols, dtype=np.double)

In [None]:
plt.figure(figsize=(10, 6))
plt.scatter(pvols, prets, c=prets / pvols,
            marker='.', alpha=0.8, cmap='coolwarm')
plt.plot(tvols, trets, 'xb')
plt.plot(port_vol(opts['x']), port_ret(opts['x']),
         'r*', markersize=15.0)
plt.plot(port_vol(optv['x']), port_ret(optv['x']),
         'y*', markersize=15.0)
plt.xlabel('expected volatility')
plt.ylabel('expected return')
plt.colorbar(label='Sharpe ratio');

### Capital Market Line

In [None]:
import scipy.interpolate as sci

In [None]:
ind = np.argmin(tvols)
evols = tvols[ind:44]
erets = trets[ind:44]

In [None]:
ind

In [None]:
evols

In [None]:
erets

In [None]:
tck = sci.splrep(evols, erets)

In [None]:
def f(x):
    ''' Efficient frontier function (splines approximation). '''
    return sci.splev(x, tck, der=0)
def df(x):
    ''' First derivative of efficient frontier function. '''
    return sci.splev(x, tck, der=1)

In [None]:
def equations(p, rf=0.01):
    eq1 = rf - p[0]
    eq2 = rf + p[1] * p[2] - f(p[2])
    eq3 = p[1] - df(p[2])
    return eq1, eq2, eq3

In [None]:
opt = sco.fsolve(equations, [0.01, 0.5, 0.15])

In [None]:
opt

In [None]:
np.round(equations(opt), 6)

In [None]:
plt.figure(figsize=(10, 6))
plt.scatter(pvols, prets, c=(prets - 0.01) / pvols,
            marker='.', cmap='coolwarm')
plt.plot(evols, erets, 'g', lw=4.0)
cx = np.linspace(0.0, 0.3)
plt.plot(cx, opt[0] + opt[1] * cx, lw=1.5)
plt.plot(opt[2], f(opt[2]), 'r*', markersize=15.0) 
plt.grid(True)
plt.axhline(0, color='k', ls='--', lw=2.0)
plt.axvline(0, color='k', ls='--', lw=2.0)
plt.xlabel('expected volatility')
plt.ylabel('expected return')
plt.colorbar(label='Sharpe ratio');

In [None]:
cons = ({'type': 'eq', 'fun': lambda x:  port_ret(x) - f(opt[2])},
        {'type': 'eq', 'fun': lambda x:  np.sum(x) - 1})
res = sco.minimize(port_vol, eweights, method='SLSQP',
                   bounds=bnds, constraints=cons)

In [None]:
res['x'].round(3)

In [None]:
port_ret(res['x'])

In [None]:
port_vol(res['x'])

In [None]:
port_ret(res['x']) / port_vol(res['x'])

<img src="http://hilpisch.com/tpq_logo.png" alt="The Python Quants" width="35%" align="right" border="0"><br>

<a href="http://tpq.io" target="_blank">http://tpq.io</a> | <a href="http://twitter.com/dyjh" target="_blank">@dyjh</a> | <a href="mailto:training@tpq.io">training@tpq.io</a>