```python
# Copyright 2022 Bloomberg Finance L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
```

# Market Value Item

### What is market value?

All of the previously discussed items have focused on different collections of securities (positions, accounts, and portfolios). Now we're going to take another step forward and discuss market value. Market value is generally the price that an asset of any type (e.g., equity, fixed income, derivatives, FX) would fetch in the marketplace. Applied to your holdings, it is also the total value of the securities in which you hold a position. Depending on the security type, market value can be simple (e.g., for equities, the value of a unit is well known) or more complex (e.g., for derivatives, there are several ways to determine and state the market value of your position).

For our purposes, we'll stick with simple market value calculations, where market value equals the unit market value (price) multiplied by the size of your position.

For example, you could have a position and a current share price (unit market value) like the following:

My position is ***2000*** shares of ***MSFT USD***. The current price of ***MSFT USD*** is ***19.00***. This gives my position a market value of **38,000 USD**!
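
The arithmetic above can be written as a one-line helper. This is only a minimal sketch; the name `market_value` is hypothetical and is not part of the provided interfaces.

```python
def market_value(quantity: int, unit_price: float) -> float:
    """Simple market value: units held times the price of one unit."""
    return quantity * unit_price


# 2000 shares of MSFT USD at a price of 19.00
msft_value = market_value(2000, 19.00)
print(f"{msft_value:,.0f} USD")  # 38,000 USD
```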

### Problem Definition

We want to extend our classes to allow querying of market values for various securities at different levels of aggregation. The classes should provide the functionality below:

***Security***
- Query the current market value of the security

***Positions*** 
- Query the current market value for the current position

***Account***
- Query the market value for a given set of security names or security objects. These act as filters on what is included when calculating the account's net market value. Any securities present in the filter but not in the account can be ignored.
- Query the market value for the entire account. This should return a net market value of all positions in the account.

***Portfolio***
- Query the market value for the entire portfolio. This requires that each security's market value be atomic within a query: a security held in several accounts should be valued at a single price per query.
- Query the market value for a given combination of securities and accounts. If securities are given, they act as a filter on what is counted within the accounts being queried. If accounts are given, they act as a filter on the accounts under query. Handle securities or accounts that aren't present without raising exceptions.
- **Create tests for the above portfolio features**
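
One way to read the atomic requirement for the portfolio is: net the holdings per security first, then ask for each security's price exactly once. The sketch below illustrates that idea under stated assumptions; `portfolio_market_value` and `moving_price` are hypothetical names, and the price feed is a stand-in whose quote changes on every call.

```python
from typing import Callable, Dict, Iterable, List, Tuple


def portfolio_market_value(
    holdings: Iterable[Tuple[str, int]],
    get_price: Callable[[str], float],
) -> float:
    """Net holdings per security first, then price each security once."""
    net_quantity: Dict[str, int] = {}
    for name, quantity in holdings:
        net_quantity[name] = net_quantity.get(name, 0) + quantity
    return sum(qty * get_price(name) for name, qty in net_quantity.items())


# Hypothetical price feed whose quote moves on every call.
calls: List[str] = []


def moving_price(name: str) -> float:
    calls.append(name)
    return 100.0 + len(calls)


# The same security held in two accounts is priced once, at one quote.
holdings = [("IBM US Equity", 201), ("IBM US Equity", 530)]
total = portfolio_market_value(holdings, moving_price)
```

Had each account priced its own position separately, the two IBM positions would have been valued at two different quotes within the same query.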

### Provided Tools

#### *Data Source*

To simulate prices, you can use the provided class `priceData`, which can be imported from `generators.priceDataGenerator`. This class is a singleton and generates a new price each time it is queried. The prices are stored internally in the class so that tests can replay them. If a security's name contains "Equity" or "eqty" (case-insensitive), its market value will always be positive. To query the price of a security, use the object's `getCurrentPrice` method, as in this example:

```python
#Data Source
from generators.priceDataGenerator import priceData

pD = priceData()

curPrice = pD.getCurrentPrice("IBM US Equity")
```
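
To make the singleton and replay behaviour concrete, here is a small stand-in written only from the description above. `FakePriceData` is hypothetical, and its internals (the random range, the history dictionary) are assumptions; the real `priceData` may be implemented differently. The method names mirror the ones used elsewhere in this notebook.

```python
import random
from typing import Dict, List


class FakePriceData:
    """Hypothetical stand-in; the real priceData internals may differ."""

    _instance = None

    def __new__(cls):
        # Singleton: every construction returns the same shared object.
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._history = {}  # security name -> list of prices
        return cls._instance

    def getCurrentPrice(self, name: str) -> float:
        price = random.uniform(-100.0, 100.0)
        if "equity" in name.lower() or "eqty" in name.lower():
            price = abs(price) + 0.01  # equities are always priced positive
        # Record the generated price so a test can replay it later.
        self._history.setdefault(name, []).append(price)
        return price

    def getSecurityPriceDataList(self, name: str) -> List[float]:
        return list(self._history.get(name, []))

    def clearPriceHistory(self) -> None:
        self._history.clear()


source = FakePriceData()
source.clearPriceHistory()
price = source.getCurrentPrice("IBM US Equity")

# A second "instance" is the same object, so it sees the recorded price.
same_source = FakePriceData()
last_recorded = same_source.getSecurityPriceDataList("IBM US Equity")[-1]
```

Because the history is shared, a test can ask the data source for the last recorded price and compare it with what the class under test returned.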

#### *Solution Interface*

The interfaces below show the new functionality you should add to your existing classes.

```python
#filename securityInterface.py
#Security Class Interface
class securityInterface():
    #...
    #Previous implementation above
    def getCurrentMarketValue(self) -> float:
        pass
```

```python
#filename positionInterface.py
#Position Class Interface
class positionInterface():
    #...
    #Previous implementation above
    def getCurrentMarketValue(self) -> float:
        pass
```

```python
#filename accountInterface.py
#Account Class Interface
class accountInterface():
    #...
    #Previous implementation above
    
    def getCurrentMarketValue(self) -> float:
        pass

    def getCurrentFilteredMarketValue(self, securities: Set) -> float:
        pass
```
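
The filter semantics expected of `getCurrentFilteredMarketValue` on an account can be sketched with plain dictionaries. This is an illustration under assumptions, not the reference solution: `NamedSecurity` and `filtered_market_value` are hypothetical, and the prices are fixed numbers rather than values from `priceData`.

```python
from math import fsum
from typing import Dict, Iterable, Union


class NamedSecurity:
    """Hypothetical minimal security: just a name."""

    def __init__(self, name: str) -> None:
        self.name = name


def filtered_market_value(
    positions: Dict[str, int],
    prices: Dict[str, float],
    securities: Iterable[Union[str, NamedSecurity]],
) -> float:
    # Normalise the filter to names; the set drops duplicates so that
    # a name and an object for the same security are counted once.
    wanted = {s.name if isinstance(s, NamedSecurity) else s for s in securities}
    # Names in the filter but not in the account are skipped silently.
    return fsum(positions[n] * prices[n] for n in wanted if n in positions)


positions = {"IBM US Equity": 530, "TSLA US Equity": 1120}
prices = {"IBM US Equity": 10.0, "TSLA US Equity": 20.0}
value = filtered_market_value(
    positions,
    prices,
    ["IBM US Equity", NamedSecurity("IBM US Equity"), "MSFT US Equity"],
)
print(value)  # 5300.0
```

Here "MSFT US Equity" is in the filter but not in the account, so it contributes nothing and raises no exception.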

```python
#filename portfolioInterface.py
#Portfolio Class Interface
class portfolioInterface():
    #...
    #Previous implementation above
    
    def getCurrentMarketValue(self) -> dict:
        pass

    def getCurrentFilteredMarketValue(self, securities: Set, accountNames: Set[str]) -> float:
        pass
```

#### *Testing*

Once you have completed and saved your solution, you can run the tests to validate that your solution works as expected. For the tests to run, the following must be true:
- Your code is saved to the respective .py files in the `implementations` folder. **Remember to copy your current implementation first, as saving will overwrite your previous implementation!**

In [1]:
import os
import sys
module_path = os.path.abspath('..')
if module_path not in sys.path:
    sys.path.append(module_path)

In [2]:
%%writefile ../implementations/securitySolution.py 
#Uncomment line above & run cell to save solution
#TODO Define class that implements securityInterface & allows for the management of a security
from interfaces.securityInterface import securityInterface
from generators.priceDataGenerator import priceData

class security(securityInterface):
    def __init__(self, name: str) -> None:
        super().__init__(name)
        self.m_name = name
        self.m_priceData = priceData()

    def getName(self) -> str:
        return self.m_name

    def getCurrentMarketValue(self) -> float:
        return self.m_priceData.getCurrentPrice(self.m_name)

Overwriting ../implementations/securitySolution.py


In [3]:
%%writefile ../implementations/positionSolution.py 
#Uncomment line above & run cell to save solution
#TODO Define class that implements positionInterface & allows for the management of a position
from interfaces.securityInterface import securityInterface
from interfaces.positionInterface import positionInterface

from implementations.securitySolution import security

class position(positionInterface):
    def __init__(self, positionType, initialPosition: int) -> None:
        self.positionType = positionType
        self.m_initialPosition = initialPosition
        
        if isinstance(positionType, securityInterface):
            self.m_security = positionType
        else:
            self.m_security = security(positionType)
            
    def getSecurity(self) -> securityInterface:
        return self.m_security

    def getPosition(self) -> int:
        return self.m_initialPosition
    
    def setPosition(self, inputValue: int) -> None:
        if inputValue < 0:
            raise Exception('Short position', inputValue)
        self.m_initialPosition = inputValue
    
    def addPosition(self, inputValue: int) -> None:
        newPosition = self.m_initialPosition + inputValue
        if newPosition < 0:
            raise Exception('Short position', newPosition)
        self.m_initialPosition = newPosition

    def getCurrentMarketValue(self) -> float:
        return self.m_security.getCurrentMarketValue() * self.m_initialPosition

Overwriting ../implementations/positionSolution.py


In [1]:
%%writefile ../implementations/accountSolution.py 
#Uncomment line above & run cell to save solution
#TODO Define class that implements accountInterface & allows for the management of an account
from interfaces.positionInterface import positionInterface
from interfaces.accountInterface import accountInterface
from interfaces.securityInterface import securityInterface
from typing import Any, Dict, Set, Iterable
from math import fsum

class account(accountInterface):
    def __init__(self, position: Set[positionInterface], accountName: str) -> None:
        self.m_accountName = accountName
        self.m_positionMap = {posItem.getSecurity().getName() : posItem for posItem in position}
        
    def getName(self) -> str:
        return self.m_accountName

    def getAllPositions(self) -> Iterable[positionInterface]: #  is iterable of type positionInterface
        return list(self.m_positionMap.values())

    def getPositions(self, securities: Set) -> Dict[Any, positionInterface]:
        returnPositionMap = dict()

        for securityKey in securities:
            if isinstance(securityKey, securityInterface) and securityKey.getName() in self.m_positionMap:
                returnPositionMap[securityKey] = self.m_positionMap[securityKey.getName()]
            elif securityKey in self.m_positionMap:
                returnPositionMap[securityKey] = self.m_positionMap[securityKey]
        
        return returnPositionMap

    def addPositions(self, positions: Set[positionInterface]) -> None:
        for posItem in positions:
            if posItem.getSecurity().getName() in self.m_positionMap:
                self.m_positionMap[posItem.getSecurity().getName()].setPosition(posItem.getPosition())
            else:
                self.m_positionMap[posItem.getSecurity().getName()] = posItem
    
    def removePositions(self, securities: Set) -> None:
        for securityKey in securities:
            if isinstance(securityKey, securityInterface):
                self.m_positionMap.pop(securityKey.getName(), None)
            else: 
                self.m_positionMap.pop(securityKey, None)

    def getCurrentMarketValue(self) -> float:
        mv = float(0)
        for posItem in self.getAllPositions():
            mv += posItem.getCurrentMarketValue()
        return mv

    def getCurrentFilteredMarketValue(self, securities: Set) -> float:
        return fsum([pos.getCurrentMarketValue() for pos in self.getPositions(securities).values()])
        

Overwriting ../implementations/accountSolution.py
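
The account solution above sums market values in two ways: a running `+=` in `getCurrentMarketValue` and `math.fsum` in `getCurrentFilteredMarketValue`. The two can differ in the last bits of a float, which matters when tests compare with `==`. A minimal illustration, using values unrelated to the price generator:

```python
from math import fsum

values = [0.1] * 10

# Running total, as in a += loop: rounding error accumulates.
running = 0.0
for v in values:
    running += v

print(running == 1.0)       # False
print(fsum(values) == 1.0)  # True
```

If an expected value is built with one method and the result with the other, a tolerance-based comparison such as `math.isclose` is a safer check than exact equality.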


In [5]:
%%writefile ../implementations/portfolioSolution.py 
#Uncomment line above & run cell to save solution
#TODO Define class that implements portfolioInterface & allows for the management of a portfolio
from interfaces.portfolioInterface import portfolioInterface
from interfaces.accountInterface import accountInterface
from implementations.accountSolution import account
from typing import Set, Iterable

class portfolio(portfolioInterface):
    def __init__(self, portfolioName: str, accounts: Set[accountInterface]) -> None:
        super().__init__(portfolioName, accounts)
        self.m_name = portfolioName
        self.m_accountsMap = {accItem.getName() : accItem for accItem in accounts}

    def getAllAccounts(self) -> Iterable[accountInterface]:
        return list(self.m_accountsMap.values())

    def getAccounts(self, accountNamesFilter: Set[str], securitiesFilter: Set) -> Iterable[accountInterface]:
        if len(accountNamesFilter) == 0 and len(securitiesFilter) == 0:
            return self.getAllAccounts()

        if len(accountNamesFilter) != 0:
            filteredAcc = set()

            for acc in accountNamesFilter:
                if acc in self.m_accountsMap:
                    filteredAcc.add(self.m_accountsMap[acc])
        else:
            filteredAcc = set(self.m_accountsMap.values())

        finalSet = set()
        if len(securitiesFilter) != 0:
            for acc in filteredAcc:
                if len(acc.getPositions(securitiesFilter)) != 0:
                    finalSet.add(acc)
        else:
            finalSet = filteredAcc

        return finalSet
                
    def addAccounts(self, accounts: Set[accountInterface]) -> None:
        for acc in accounts:
            # An account whose name is already present replaces the stored one (assumed intent).
            self.m_accountsMap[acc.getName()] = acc

    def removeAccounts(self, accountNames: Set[str]) -> None:
        for acc in accountNames:
            if acc in self.m_accountsMap:
                self.m_accountsMap.pop(acc, None)

    def __aggregateAccountMV(self, accounts: Iterable[accountInterface]):
        #Aggregate positions at this level & query their security value.
        aggregatePosMap = {}
        
        for account in accounts:
            for position in account.getAllPositions():
                if position.getSecurity().getName() in aggregatePosMap:
                    aggregatePosMap[position.getSecurity().getName()][0] += position.getPosition()
                else:
                    #List with reference to underlying security
                    aggregatePosMap[position.getSecurity().getName()] = [position.getPosition(), position.getSecurity()]
                    
        summedMarketValue = 0
        for posTuple in aggregatePosMap.values():
            summedMarketValue += posTuple[0] * posTuple[1].getCurrentMarketValue()

        return summedMarketValue

    def getCurrentMarketValue(self) -> float:
        return self.__aggregateAccountMV(self.m_accountsMap.values())

    def getCurrentFilteredMarketValue(self, securities: Set, accountNames: Set[str]) -> float:
        return self.__aggregateAccountMV(self.trimAccountPositions(self.getAccounts(accountNames, securities), securities))

    def trimAccountPositions(self, accounts: Iterable[accountInterface], securities: Set) -> Iterable[accountInterface]:
        if len(securities) == 0:
            return accounts

        trimmedAccounts = set()
        for acc in accounts:
            trimmedAccounts.add(account(acc.getPositions(securities).values(), "trimmed"))    

        return trimmedAccounts

Overwriting ../implementations/portfolioSolution.py


In [6]:
#%conda install ipytest
#%pip install ipytest
import ipytest

ipytest.autoconfig()

In [7]:
%%ipytest -qq
import pytest
# import implementations.securitySolution
# import implementations.positionSolution
# import implementations.accountSolution

from implementations.securitySolution import security
from implementations.positionSolution import position
from implementations.accountSolution import account
from implementations.portfolioSolution import portfolio
from generators.priceDataGenerator import priceData
# import importlib
# importlib.reload(implementations.securitySolution)
# importlib.reload(implementations.positionSolution)
# importlib.reload(implementations.accountSolution)

def test_securityValueGather():
    #GIVEN
    SECURITY_NAME = "TSLA US Equity"
    DATA_SOURCE = priceData()
    DATA_SOURCE.clearPriceHistory()

    #WHEN
    testObj = security(SECURITY_NAME) # implementations.securitySolution.security
    currentPrice = testObj.getCurrentMarketValue()

    #EXPECT
    assert currentPrice == DATA_SOURCE.getSecurityPriceDataList(SECURITY_NAME)[-1]

def test_PositionMarketValue():
    #GIVEN
    EXPECTED_NAME = "IBM US Equity"
    EXPECTED_POSITION_AMOUNT = 1000
    DATA_SOURCE = priceData()
    DATA_SOURCE.clearPriceHistory()

    #WHEN
    testObj = position(EXPECTED_NAME, EXPECTED_POSITION_AMOUNT) # implementations.positionSolution
    MV = testObj.getCurrentMarketValue()
    LATEST_EXPECTED_MV = EXPECTED_POSITION_AMOUNT * DATA_SOURCE.getSecurityPriceDataList(EXPECTED_NAME)[-1]

    #EXPECT
    assert (LATEST_EXPECTED_MV == MV)

def test_SecuritySearchAccountMV():
    #GIVEN
    DATA_SOURCE = priceData()
    DATA_SOURCE.clearPriceHistory()
    EXPECTED_ACCOUNT_POSITIONS = [
        position("IBM US Equity", 530),
        position("TSLA US Equity", 1120),
        position("NVDA US Equity", 7421)
    ]
    SEARCH_SECURITIES_LIST =  ["IBM US Equity", security("NVDA US Equity"), "MSFT US Equity"] # implementations.securitySolution
    SEARCH_SECURITIES_TUPLE = [["IBM US Equity", 530], ["NVDA US Equity", 7421]]
    testObj = account(EXPECTED_ACCOUNT_POSITIONS, "Test Account") # implementations.accountSolution

    #WHEN
    MV = testObj.getCurrentFilteredMarketValue(SEARCH_SECURITIES_LIST)
    EXPECTED_MV = 0
    for secTuple in SEARCH_SECURITIES_TUPLE:
        if secTuple[1] != 0:
            EXPECTED_MV +=  secTuple[1] * DATA_SOURCE.getSecurityPriceDataList(secTuple[0])[-1]
    #EXPECT
    assert (EXPECTED_MV == MV)

def test_TotalAccountMV():
    #GIVEN
    EXPECTED_ACCOUNT_POSITIONS = [
        position("IBM US Equity", 530), # implementations.positionSolution
        position("TSLA US Equity", 1120), 
        position("NVDA US Equity", 7421)
    ]
    DATA_SOURCE = priceData()
    DATA_SOURCE.clearPriceHistory()
    testObj = account(EXPECTED_ACCOUNT_POSITIONS, "Test Account") # implementations.accountSolution

    #WHEN
    MV = testObj.getCurrentMarketValue()
    EXPECTED_MV = 0
    for pos in EXPECTED_ACCOUNT_POSITIONS:
        EXPECTED_MV += pos.getPosition() * DATA_SOURCE.getSecurityPriceDataList(pos.getSecurity().getName())[-1]
    #EXPECT
    assert (EXPECTED_MV == MV)

#TODO Portfolio MV Tests
def test_PortfolioMV():
    #GIVEN
    # DATA_SOURCE = priceData()
    # DATA_SOURCE.clearPriceHistory()
    PORTFOLIO_NAME = 'TestPortfolio'
    ACCOUNT_A_POSITIONS = [
        position("MSFT US Equity", 400),
        position("TSLA US Equity", 1120),
        position("IBM US Equity", 201)
    ]
    ACCOUNT_B_POSITIONS = [
        position("APPL US Equity", 500),
        position("DLTA US Equity", 623),
        position("NVDA US Equity", 7421)
    ]
    ACCOUNT_A = account(ACCOUNT_A_POSITIONS, 'Account A')
    ACCOUNT_B = account(ACCOUNT_B_POSITIONS, 'Account B')
    ACCOUNTS = set([ACCOUNT_A, ACCOUNT_B])
    
    DATA_SOURCE = priceData()
    DATA_SOURCE.clearPriceHistory()
    testObj = portfolio(PORTFOLIO_NAME, ACCOUNTS)

    #WHEN
    MV = testObj.getCurrentMarketValue()
    EXPECTED_MV = 0

    POSITION_MAP_TOTAL = {}
    for acc in ACCOUNTS:
        for pos in acc.getAllPositions():
            if pos.getSecurity().getName() in POSITION_MAP_TOTAL:
                POSITION_MAP_TOTAL[pos.getSecurity().getName()].addPosition(pos.getPosition())
            else:
                POSITION_MAP_TOTAL[pos.getSecurity().getName()] = position(pos.getSecurity(), pos.getPosition())

    for pos in POSITION_MAP_TOTAL.values():
        EXPECTED_MV += pos.getPosition() * DATA_SOURCE.getSecurityPriceDataList(pos.getSecurity().getName())[-1]

    #EXPECT
    assert (EXPECTED_MV == MV)

def test_PortfolioSearchMV():
    #GIVEN
    DATA_SOURCE = priceData()
    DATA_SOURCE.clearPriceHistory()
    PORTFOLIO_NAME = 'TestPortfolio'
    ACCOUNT_A_POSITIONS = [
        position("MSFT US Equity", 400),
        position("TSLA US Equity", 1120),
        position("IBM US Equity", 201)
    ]
    ACCOUNT_B_POSITIONS = [
        position("APPL US Equity", 500),
        position("DLTA US Equity", 623),
        position("NVDA US Equity", 7421),
        position("IBM US Equity", 530)
    ]
    ACCOUNT_A = account(ACCOUNT_A_POSITIONS, 'Account A')
    ACCOUNT_B = account(ACCOUNT_B_POSITIONS, 'Account B')
    ACCOUNTS = set([ACCOUNT_A, ACCOUNT_B])

    DATA_SOURCE = priceData()
    DATA_SOURCE.clearPriceHistory()
    testObj = portfolio(PORTFOLIO_NAME, ACCOUNTS)

    #WHEN
    POSITION_MAP_TOTAL_ACC = {}
    POSITION_MAP_TOTAL_SECURITY = {}
    POSITION_MAP_TOTAL_SECURITY_ACC = {}
    SECURITY_FILTER = "IBM US Equity"
    ACC_FILTER = "Account B"

    for acc in ACCOUNTS:
        for pos in acc.getAllPositions():
            if pos.getSecurity().getName() == SECURITY_FILTER:
                if pos.getSecurity().getName() in POSITION_MAP_TOTAL_SECURITY:
                    POSITION_MAP_TOTAL_SECURITY[pos.getSecurity().getName()].addPosition(pos.getPosition())
                else:
                    POSITION_MAP_TOTAL_SECURITY[pos.getSecurity().getName()] = position(pos.getSecurity().getName(), pos.getPosition())
                    
            if acc.getName() == ACC_FILTER:
                if pos.getSecurity().getName() in POSITION_MAP_TOTAL_ACC:
                    POSITION_MAP_TOTAL_ACC[pos.getSecurity().getName()].addPosition(pos.getPosition())
                else:
                    POSITION_MAP_TOTAL_ACC[pos.getSecurity().getName()] = position(pos.getSecurity().getName(), pos.getPosition())

                if pos.getSecurity().getName() == SECURITY_FILTER:
                    if pos.getSecurity().getName() in POSITION_MAP_TOTAL_SECURITY_ACC:
                        POSITION_MAP_TOTAL_SECURITY_ACC[pos.getSecurity().getName()].addPosition(pos.getPosition())
                    else:
                        POSITION_MAP_TOTAL_SECURITY_ACC[pos.getSecurity().getName()] = position(pos.getSecurity().getName(), pos.getPosition())
    
    MV_ACC = testObj.getCurrentFilteredMarketValue([], [ACC_FILTER])
    EXPECTED_MV_ACC = 0
    for pos in POSITION_MAP_TOTAL_ACC.values():
        EXPECTED_MV_ACC += pos.getPosition() * DATA_SOURCE.getSecurityPriceDataList(pos.getSecurity().getName())[-1]
    DATA_SOURCE = priceData()
    DATA_SOURCE.clearPriceHistory()

    MV_SECURITY = testObj.getCurrentFilteredMarketValue([SECURITY_FILTER], [])
    EXPECTED_MV_SECURITY = 0
    for pos in POSITION_MAP_TOTAL_SECURITY.values():
        EXPECTED_MV_SECURITY += pos.getPosition() * DATA_SOURCE.getSecurityPriceDataList(pos.getSecurity().getName())[-1]
    DATA_SOURCE = priceData()
    DATA_SOURCE.clearPriceHistory()

    MV_ACC_SECURITY = testObj.getCurrentFilteredMarketValue([SECURITY_FILTER], [ACC_FILTER])
    EXPECTED_MV_ACC_SECURITY = 0
    for pos in POSITION_MAP_TOTAL_SECURITY_ACC.values():
        EXPECTED_MV_ACC_SECURITY += pos.getPosition() * DATA_SOURCE.getSecurityPriceDataList(pos.getSecurity().getName())[-1]
    DATA_SOURCE = priceData()
    DATA_SOURCE.clearPriceHistory()

    #EXPECT
    assert (MV_ACC == EXPECTED_MV_ACC) 
    assert (MV_ACC_SECURITY == EXPECTED_MV_ACC_SECURITY) 
    assert (MV_SECURITY == EXPECTED_MV_SECURITY) 

......                                                                                       [100%]
