#DNPV CODE 
#pip install numpy-financial
#pip install numpy

In [None]:
%pip install numpy_financial
%pip install pandas
%pip install openpyxl
%pip install matplotlib

import os
import glob
#import xlrd
import scipy.stats as st
from scipy.stats import norm
import numpy as np
import numpy_financial as npf
import pandas as pd
import openpyxl as xl 
import matplotlib.pyplot as plt
from matplotlib.dates import DateFormatter
from openpyxl import Workbook

from scipy.integrate import quad

In [13]:
def int_ab(Fx, xc, a, b, ns=1000):  #xc y ns no los uso
  '''
  Calculates the area (Aab)and center of gravity (cg) by numerical integration from a to b  
  of a 1D function Fx with respect to xc where ns is the discretization parameter.
  Returns the area and center of gravity 
  
  :param Fx: F(x) function to integrate  
  :type a:float (lower limit)
  :type b:float (upper limit)
  :type xc:float reference point
  :type ns:int discritization parameter
  
  delta = (b - a)/ns
  x2= a  
  Aab = 0
  momX = 0
  for i in range(ns):
    x1 = x2 
    x2 = x1 + delta
    xm = (x1+x2)/2 
    dA = Fx(xm)*delta
    Aab = Aab + dA
    momX = momX + dA*(xm-xc)
  cg = momX/Aab
  ''' 
 
  Aab, _ = quad(Fx, a, b) # Calcular el área bajo la curva A
  M, _ = quad(lambda x: x * Fx(x), a, b) # Calcular el momento M

  # Calcular el centro de gravedad
  if Aab != 0:
      cg = M / Aab
  else:
      raise ValueError("El área bajo la curva es cero. No se puede calcular el centro de gravedad.")

  return Aab, cg

####  Cambiaría la siguiente función a tener solo 2 parámetros: P50 y sig

In [3]:
def Risk_normal(p50=0, px = -3.0, x=0.9986, sig=1.0, sigf=True):
  '''
  Calculates the cost of risk of a random variable described by a normal probability density function (PDF).
  The PDF can be defined by:
   (1) the average (p50) and standard deviation (sig); or
   (2) p50, px and the associated cumulative probability x

  :type p50:float PDF average expected to be exceeded with 50% probability
  :type px:float represents the value expected to be exceeded with x% probability (default -3.0)
  :type x:float  represents the probability of exceedence (default 99.86%)
  :type sig:float standard deviation (default 1.0)
  :type sigf: bool indicates if the PDF will be assigned (False) or calculated from p50, px and x (True)
  '''
  if sigf:
    sig = (px-p50)/st.norm.ppf(1-x)

  #calculate the area of the downside and the center of gravity of the downside (cgd)
  #Because the PDF is symmetric, the distance from p50 to the cgd is the same for both sides of the curve 
  area, cgd = int_ab(st.norm.pdf,0,0,6)

  CostofRisk = cgd*sig*area
  if abs(p50)> 0: CostofRisk = CostofRisk/p50  
  return CostofRisk

In [5]:
print(1-norm.cdf(-3, loc=0, scale=1))

0.9986501019683699


In [4]:
def Risk_multinomial(p, x,ex = True, jf = 0, revenue=True):
  '''
  Calculates the cost of risk for a PDF defined by a multinomial distribution 
  p is a vector 
  x is a vector containing the corresponding events
  
  :type p: List containing the probabilities for each event x
  :type x: List containing the events
  ;type ex: bool flags if the base is the best (i.e., most likely) estimate (ex=False) or the average (ex=True)
  :type jf: indicates which event will be used as the based (default 0). Used when ex=False
  :type revenue: bool indicates is the PDF represents revenues or cost. Needed to evaluate the downside. 
  '''  
  ex_p = 0  
  risk = 0 
  j = len(p)
  for i in range(j):
    ex_p = ex_p + p[i]*x[i] 

  if ex: 
    base = ex_p  
  else: 
    base = x[jf]
    
  for i in range(j):
    if revenue: 
      risk = risk + p[i]*max(base-x[i],0) 
    else: 
      risk = risk + p[i]*max(x[i]-base,0) 

  if abs(base) > 0:
    costofrisk = risk/base
  else: costofrisk = risk
    
  return costofrisk

In [5]:
def getcashflowitems(xcl_file, srows=0, scols=0, sheet ='DNPV Input', fmt=''):
  '''
  Retrieves rows of data from an excel file stored in worksheet 'DNPV Input'
  Each line represents revenues or costs that are considered random
  Date is retrieved and stored it in a dictionary (df_dict) and panda dataframe (dfT) 
  Typical panda data structure have labels in the 1st row and data in columns
  To be used for data in xcelfile formatted in rows with id labes located in the 1st column. 
  The first row of data could be located srows below
  '''  

  if fmt=='':
    fmt = '${:,.0f}'     

  pd.options.display.float_format = fmt.format
  
  df = pd.read_excel(xcl_file, sheet, skiprows = srows, header=None)

  nrows = len(df)
  ncolumns = len(df.columns) 
  df_dict = {}

  for i in range(nrows):
    values=[]
    keys = df[0][i]
    for j in range(1,ncolumns):
      values.append(df.iloc[i][j])
    df_dict[keys] = values
  dfT = pd.DataFrame(df_dict).T

  return df_dict, dfT  
  

In [6]:
    #Testing kernel to be removed
    CFfilename = 'Solar Project 2024.xlsx'
    input_sht = 'DNPV Input' 
    output_sht = 'DNPV Output' 
    rowloc = 10

    #get cashflow items in a dictionary and panda dataframe formats
    #cashflow, dfT = getcashflowitems(CFfilename, sheet = input_sht, srows = rowloc)
    #cashflow
    #CFfilename, input_sht, rowloc    

In [7]:
def riskparameters():
    #This is a placeholder for later implementation. 
    #At the moment, they are simply assigned and stored in a list call rp
    #Each risk should be interactively assigned via pulldown menus
    #{Risk Key,[CF item, time independent risk, normalized risk cost]}
    #Also sets up the risk allocation factors for each run
    
    rp={}

    #Solar risk (revenue reduction)
    rp['Solar Risk']=['Revenues', True, Risk_normal(1164, 1088, 0.9)]
            
    #Maintence Risk (cost increase)
    p=[0.8,0.2]
    x=[1.0,1.25]
    rp['Maintenance Risk']=['Maintenance', True, Risk_multinomial(p,x, ex = False,revenue=False)]

    #Political Risk: FiT reduction (revenue reduction)
    p=[0.865,0.12,0.015]
    x=[0.43,0.28,0.06]
    rp['Political Risk']=['Revenues', False, Risk_multinomial(p,x, ex = False)]
 
    return rp



In [23]:
def risk_alloc(riskparam, nruns=1):
#This fucntion needs to be written to set the risk allocation factors for nruns
#for now the are all set = 1 for nruns = 1 else it should be read from excel

  rsk_alloc = {}
  run1 = [1, 1, 0.5]
  run2 = [1, 1, 0.0]

  for key in list(riskparam.keys()):
    rsk_alloc[key]=[1]

  if nruns>=2:
    for key in list(riskparam.keys()):
      ind = list(riskparam).index(key)  
      rsk_alloc[key].append(run1[ind])

  if nruns==3:
    for key in list(riskparam.keys()):
      ind = list(riskparam).index(key)  
      rsk_alloc[key].append(run2[ind])
        
  return rsk_alloc

In [10]:
class Risk:
    
  def __init__(self, label, vector, fctr=1, keyparameters={}, time = 'Year', d=0):
    self.dict ={}                 #initialize dictionario of risks
    self.l = len(vector)          #vector length
    self.d = d                    #rounding decimal for printing
    self.keyp = keyparameters     #dictionary containing all risk parameters used
    
    #create a list with time period & assign it to dict
    period = [i+1 for i in range(len(vector))]
    self.dict[time] = period
    
    if fctr != 1.0:
      vector = [x*fctr for x in vector]
    self.dict[label] = vector

  def assign(self,label,vector,fctr=1):
    if fctr != 1.0:
      vector = [x*fctr for x in vector]
    self.dict[label] = vector

  def add(self, label='', keys=[]):
    keyparam = list(self.keyp.keys())
    arrayt=np.array([0 for x in range(self.l)])
        
    for key in keyparam: 
      arrayt = arrayt + np.array(self.dict[key])

    self.dict[label] = arrayt.tolist()

  def sub(self, label, a, b, fa = 1, fb = 1):
    array_c = np.array(a)*fa - np.array(b)*fb
    self.dict[label] = array_c.tolist()
   
  def __str__(self):
    keys = list(self.dict.keys())
    keys.pop(0) #removes the year list
    txt = '{'
    for key in keys:
        vector = self.dict[key] 
        prn = [round(val,self.d) for val in vector]
        txt = txt + f"'{key}'" +':'+ str(prn) + '\n\n'  
    txt = txt[:-3]+'}'    
        
    return txt



In [12]:
def cost_of_risk_vectors(cashflow, riskparameters, rsk_alloc, sumkey, rcfkey, r_f=0, tax=0, run=1):

  risk = None
 
  for key in list(riskparameters.keys()):
        cfitem, tind, nriskcost = riskparameters[key]
        raf = rsk_alloc[key][run-1]
        vector = cashflow[cfitem]

        if tind == False:    
            dim = len(vector)  
            tvector=[]
            for i in range(dim):
              sum = 0
              for j in range(i,dim):
                sum = sum + vector[j]/pow(1+r_f+nriskcost,j-i+1)
              tvector.append(sum*nriskcost)
            vector = tvector
            nriskcost = 1

        if risk == None: 
            #initiates the 1st vector of the dictionary and setup risk parameters
            risk = Risk(key,vector, nriskcost*raf, d=1)
            risk.keyp = riskparameters
        else: 
            risk.assign(key, vector, nriskcost*raf)

  #Add all the calculated cost of risks and assign it to risk.dict[sumkey]  
  risk.add(sumkey)

  #calculate the riskless cashflows vector and assign it to risk.dict[rcfkey]  
  key0 = list(cashflow.keys())[-1]
  risk.sub(rcfkey, cashflow[key0],risk.dict[sumkey],fb = 1-tax)
   
  return risk
    

In [13]:
def write_pd_toxcl(xcl_file, output_sheet = '', data={},fmt=''):
  #writes the results from a pd dataframe to excel
    
  if output_sheet in list(pd.ExcelFile(xcl_file).sheet_names):
    sheet_status='overlay'
  else:
    sheet_status='new'

  if fmt=='':
    fmt = '${:,.0f}'     

  pd.options.display.float_format = fmt.format

  risk_pdmatrix = pd.DataFrame(data).T

  with pd.ExcelWriter(xcl_file, mode='a', if_sheet_exists=sheet_status) as writer:     
    risk_pdmatrix.to_excel(writer, sheet_name = output_sheet, header=False)  
  
  


In [14]:
def write_dict_to_xl(xcl_file, output_sheet='', sheet_loc = 4, data = {}):
  #writes the results from a dict to excel
    
  wb = xl.load_workbook(xcl_file)
  sheetnames = list(wb.sheetnames)

  if output_sheet in sheetnames:
    ws = wb[output_sheet]
    wb.remove(ws)

  risk_pd = pd.DataFrame(data).T

  with pd.ExcelWriter(xcl_file, mode='a') as writer:     
    risk_pd.to_excel(writer, sheet_name = output_sheet, header=False)  

  ws = wb[output_sheet]
  ws.delete_col[0]


In [38]:
    #Obtain the items needed to estimate the cost of risks
    #Excel tab should have the rows with data affecred by risk
    #All the data below should be provided 

    dnpv_cases = []
    dnpv_results = []
    npv_results = []

    #Parameters assigned that need to be input
    cotax = 0.333     #corporate tax rate
    rate_f = 0.028  #risk-free rate
    runtot = 3                   #Number of cases
    wacc = [0.10, 0.1, 0.08]     #investor's capital cost
    cost = 14e6                  #project cost
    CFfilename = 'Solar Project 2024.xlsx'
    input_ws = 'DNPV Input' 
    output_ws = 'DNPV Output' 
    dnpv_key = 'DNPV-CF'
    sum_key = 'Sum Risk'
    rowloc = 10
    time0 = False #Cash flow data does not start from t = 0

    #get cashflow items in a dictionary and panda dataframe formats
    cashflow, dfT = getcashflowitems(CFfilename, sheet = input_ws, srows = rowloc)

    #get the risk parameters in a dictionary format to calculate DNPV cash flows
    rp = riskparameters()
    
    #gets the risk allocation factors for each DNPV run to be performed 
    rsk_alloc = risk_alloc(rp, runtot)
 
    #calculate cost of risk items & DNPV cash flows and store them in risk dict 
    
    if not time0:
      init = [0]  # to be included as period 0 if cashflows do not include t=0 data (initial costs)
      cost0 = cost
    else:
      init = []
      cost0 = 0
    
    for i in range(runtot):
      risk = cost_of_risk_vectors(cashflow, rp, rsk_alloc, sum_key, dnpv_key, r_f =rate_f, tax=cotax, run = i+1)    

      #using the DNPV key, read riskless cash flows 
      dnpvvec = risk.dict[dnpv_key]
       
      dnpv = npf.npv(rate_f,init+dnpvvec)-cost0
      npv  = npf.npv(wacc[i],init+cashflow[keycf])-cost0
        
      dnpv_cases.append(risk.dict) 
      dnpv_results.append(dnpv)
      npv_results.append(npv)
    
      print(f"Case{i:2d} DNPV: ${dnpv_results[i]:,.0f} & NPV: ${npv_results[i]:,.0f} ")   

      pd.DataFrame(risk.dict)
        
    #Get the last key 
    keycf = list(cashflow.keys())[-1]

    #print(f"NPV: ${npv:,.0f} " )   


    #write_pd_toxcl(CFfilename, output_ws, risk.dict)        

    #pd.DataFrame(risk.dict)
 


Case 0 DNPV: $1,265,231 & NPV: $-241,190 
Case 1 DNPV: $5,976,366 & NPV: $-241,190 
Case 2 DNPV: $10,687,501 & NPV: $1,992,973 
