In [2]:
import pep8
import pandas as pd
import numpy as np
import unittest

In [3]:
class Portfolio:
    """Stores price, weight, currency and exchange-rate tables and
    computes per-period asset, currency and total returns.

    All four inputs must be pandas DataFrames. As used by the methods of
    this class, ``prices`` and ``exchanges`` are sliced by row label up to
    ``end_date``, and ``currencies`` must have a ``currency`` column whose
    index labels name the assets and whose values name columns of
    ``exchanges`` (or ``"USD"``, which is treated as a constant rate of 1).
    """

    def __init__(self, prices, weights, currencies, exchanges):
        """
        Class initialization with all input data,
        except start_date and end_date

        Raises:
            TypeError: if any argument is not a pandas DataFrame.
        """
        self.set_weights(weights)
        self.set_prices(prices)
        self.set_currencies(currencies)
        self.set_exchanges(exchanges)

    @staticmethod
    def _require_dataframe(data):
        """Return ``data`` unchanged, or raise TypeError if it is not a
        pandas DataFrame (subclasses of DataFrame are accepted)."""
        if not isinstance(data, pd.DataFrame):
            raise TypeError("Wrong type of data. Need DataFrame")
        return data

    def set_weights(self, weights):
        """Setter for weights data; raises TypeError for non-DataFrames"""
        self._weights = self._require_dataframe(weights)

    def set_prices(self, prices):
        """Setter for prices data; raises TypeError for non-DataFrames"""
        self._prices = self._require_dataframe(prices)

    def set_currencies(self, currencies):
        """Setter for currencies of assets; raises TypeError for
        non-DataFrames"""
        self._currencies = self._require_dataframe(currencies)

    def set_exchanges(self, exchanges):
        """Setter for exchange rates data; raises TypeError for
        non-DataFrames"""
        self._exchanges = self._require_dataframe(exchanges)

    @staticmethod
    def shape_equalizator(df1, df2):
        """Equalize shapes of two data frames

        Both frames are put on the sorted union of their row labels and
        every gap is filled with the value of the previous row of the same
        column. A gap in the very first row stays NaN: there is no earlier
        observation, and borrowing one from a later row would leak future
        data into the past.

        Columns are paired by position, and both returned frames carry the
        column labels of ``df1``.

        Returns:
            tuple: ``(filled_df1, filled_df2)`` sharing one index and one
            set of column labels.

        Raises:
            ValueError: if the frames have a different number of columns.
        """
        n_columns = df1.shape[1]
        if df2.shape[1] != n_columns:
            raise ValueError(
                "Data frames must have the same number of columns "
                "({} != {})".format(n_columns, df2.shape[1]))

        # The prefixes keep the two column sets distinct while the frames
        # share one table; they are dropped again below.
        joined = pd.concat(
            [df1.add_prefix('d1_'), df2.add_prefix('d2_')],
            axis=1, sort=True)
        filled = joined.ffill()

        filled_df_1 = filled.iloc[:, :n_columns].copy()
        filled_df_2 = filled.iloc[:, n_columns:].copy()
        filled_df_1.columns = df1.columns
        filled_df_2.columns = df1.columns

        return filled_df_1, filled_df_2

    def __calculate_changing_of_data(self, data):
        """Calculation of total, assets and currency return.

        Returns the relative change of every column between consecutive
        rows, ``(current - previous) / previous``. The first row has no
        predecessor and is dropped, so the result is one row shorter than
        ``data``.
        """
        previous = data.shift(1)
        return ((data - previous) / previous).iloc[1:]

    def _exchange_rates_for_assets(self, end_date):
        """Build a frame with one column per asset holding the exchange
        rate of that asset's currency, for rows up to ``end_date``."""
        rates = self._exchanges.loc[:end_date].copy()
        # USD is given a constant rate of 1.
        rates["USD"] = np.ones(rates.shape[0])

        asset_currency = self._currencies["currency"].to_dict()

        # Selecting by currency name may pick the same column several
        # times; the columns are then relabelled with the asset names.
        rates_per_asset = rates[list(asset_currency.values())].copy()
        rates_per_asset.columns = list(asset_currency.keys())
        return rates_per_asset

    def calculate_portfolio_return(self, end_date):
        """Per-period return of every asset price for rows up to
        ``end_date``"""
        data = self._prices.loc[:end_date]
        return self.__calculate_changing_of_data(data)

    def calculate_currency_return(self, end_date):
        """Per-period return of each asset's exchange rate for rows up to
        ``end_date``

        Accordance of assets and their exchange rates
        """
        return self.__calculate_changing_of_data(
            self._exchange_rates_for_assets(end_date))

    def calculate_total_return(self, end_date):
        """Per-period return of price times exchange rate for rows up to
        ``end_date``

        Accordance of assets and their exchange rates
        Multiplying prices and currencies data to get total
        value of assets

        Raises:
            ValueError: if the number of price columns differs from the
                number of assets listed in the currencies table.
        """
        rates, prices = self.shape_equalizator(
            self._exchange_rates_for_assets(end_date),
            self._prices.loc[:end_date])

        return self.__calculate_changing_of_data(rates * prices)


class WeightedReturn(Portfolio):
    """Portfolio that can collapse per-asset returns into one weighted
    return per row"""

    def calculate_weighted_return(self, end_date, portfolio_function):
        """Weighted sum of per-asset returns for rows up to ``end_date``

        ``portfolio_function`` is called with ``end_date`` and must return
        a DataFrame of per-asset returns (for example one of the
        ``calculate_*_return`` methods). The returns and the stored
        weights are put on a common index with
        ``Portfolio.shape_equalizator``, multiplied element by element and
        summed across the assets of each row.

        Returns:
            pandas.Series: one weighted return per row label.
        """
        asset_returns = portfolio_function(end_date)

        aligned_returns, aligned_weights = Portfolio.shape_equalizator(
            asset_returns.loc[:end_date], self._weights.copy()[:end_date])

        # Plain ndarray product: both frames already share index and
        # column labels after the alignment step.
        weighted_values = aligned_returns.values * aligned_weights.values
        weighted_frame = pd.DataFrame(
            weighted_values,
            index=aligned_returns.index,
            columns=aligned_weights.columns)

        return weighted_frame.sum(axis=1)


class Perfomance(WeightedReturn):
    """Includes methods for calculating the cumulative performance of the
    weighted portfolio"""

    @staticmethod
    def calculation_of_perfomance(data, start_date):
        """Method to calculate perfomance

        Compounds the per-period returns: element ``k`` of the result is
        the product of ``(1 + r)`` over all returns up to and including
        ``k``. Compounding starts at the first row of ``data``; only
        afterwards is the result cut to the rows from ``start_date`` on.

        Args:
            data: one-dimensional pandas Series of per-period returns.
            start_date: first row label to keep in the result.

        Returns:
            pandas.Series: cumulative growth factors indexed like ``data``.
        """
        growth = np.cumprod(np.asarray(data) + 1)
        return pd.Series(growth, index=data.index).loc[start_date:]

    def _calculate_perfomance(self, return_function, start_date, end_date):
        """Validate the date range, then compound the weighted returns
        produced by ``return_function``."""
        # Written as the negation of ``>=`` so that values which cannot be
        # compared with each other (e.g. int vs str) still raise TypeError
        # from the comparison itself.
        if not (end_date >= start_date):
            raise ValueError("End date less start date")

        weighted_return = self.calculate_weighted_return(
            end_date, return_function)
        return self.calculation_of_perfomance(weighted_return, start_date)

    def calculate_asset_perfomance(self, start_date, end_date):
        """Call methods to calculate perfomance, depending of assets prices

        Raises:
            ValueError: if ``end_date`` is before ``start_date``.
        """
        return self._calculate_perfomance(
            self.calculate_portfolio_return, start_date, end_date)

    def calculate_currency_perfomance(self, start_date, end_date):
        """Call methods to calculate perfomance, depending of
        exchange rate of assets currencies

        Raises:
            ValueError: if ``end_date`` is before ``start_date``.
        """
        return self._calculate_perfomance(
            self.calculate_currency_return, start_date, end_date)

    def calculate_total_perfomance(self, start_date, end_date):
        """Call methods to calculate total perfomance of assets

        Raises:
            ValueError: if ``end_date`` is before ``start_date``.
        """
        return self._calculate_perfomance(
            self.calculate_total_return, start_date, end_date)


In [4]:
import unittest

class TestNotebook(unittest.TestCase):
    """Tests for ``Perfomance`` driven by the CSV files next to the notebook.

    Individual tests are described with comments rather than docstrings so
    that the verbose unittest output keeps printing the plain test names.
    """

    def setUp(self):
        weights = pd.read_csv('weights.csv', index_col='Unnamed: 0')
        prices = pd.read_csv('prices.csv', index_col='date')
        exchanges = pd.read_csv('exchanges.csv', index_col='Unnamed: 0')
        currencies = pd.read_csv('currencies.csv', index_col='Unnamed: 0')
        exchanges.fillna(1, inplace=True)
        self.portfolio = Perfomance(prices, weights, currencies, exchanges)

    def _perfomance_methods(self):
        # The three public entry points, in the order the tests call them.
        return (self.portfolio.calculate_asset_perfomance,
                self.portfolio.calculate_total_perfomance,
                self.portfolio.calculate_currency_perfomance)

    def _call_all_perfomances(self, start_date, end_date):
        # Smoke check: every entry point must run without raising.
        for calculate in self._perfomance_methods():
            calculate(start_date, end_date)

    @staticmethod
    def _synthetic_inputs():
        # Two-day tables in which every price and every listed exchange
        # rate doubles, plus equal weights on the second day only. Labels
        # are borrowed from weights.csv, which is assumed to have five
        # asset columns.
        template = pd.read_csv('weights.csv', index_col='Unnamed: 0')
        prices = pd.DataFrame(
            [[1, 1, 1, 1, 1], [2, 2, 2, 2, 2]],
            index=template.index[:2], columns=template.columns)
        exchanges = pd.DataFrame(
            [[1, 1], [2, 2]], index=template.index[:2],
            columns=['CHF', 'EUR'])
        weights = pd.DataFrame(
            [[0.2, 0.2, 0.2, 0.2, 0.2]], index=template.index[1:2],
            columns=template.columns)
        return prices, exchanges, weights

    def test_bad_data_format(self):
        # Each call gets its own assertRaises: inside a shared context only
        # the first call would ever run.
        test_end_data = '2018-03-05'
        bad_ranges = ((1, test_end_data), (test_end_data, 0))
        for calculate in self._perfomance_methods():
            for start, end in bad_ranges:
                with self.subTest(method=calculate.__name__,
                                  start=start, end=end):
                    with self.assertRaises(TypeError):
                        calculate(start, end)

    def test_date_start_after_end(self):
        test_start_data = '2018-03-05'
        test_end_data = '2014-01-13'
        for calculate in self._perfomance_methods():
            with self.subTest(method=calculate.__name__):
                with self.assertRaises(ValueError):
                    calculate(test_start_data, test_end_data)

    def test_date_start_equal_end(self):

        """Return one-shape Series"""

        test_end_data = '2018-03-05'
        test_start_data = '2018-03-05'

        for calculate in self._perfomance_methods():
            with self.subTest(method=calculate.__name__):
                self.assertEqual(
                    calculate(test_start_data, test_end_data).shape, (1,))

    def test_correct_calculation_asset_perfomance(self):
        prices, _, weights = self._synthetic_inputs()
        day = weights.index[0]

        self.portfolio.set_prices(prices)
        self.portfolio.set_weights(weights)

        # Floating point result, so compare with a tolerance.
        self.assertAlmostEqual(
            self.portfolio.calculate_asset_perfomance(day, day).iloc[0], 2)

    def test_correct_calculation_currency_perfomance(self):
        _, exchanges, weights = self._synthetic_inputs()
        day = weights.index[0]

        self.portfolio.set_exchanges(exchanges)
        self.portfolio.set_weights(weights)

        self.assertAlmostEqual(
            self.portfolio.calculate_currency_perfomance(day, day).iloc[0],
            1.6)

    def test_correct_calculation_total_perfomance(self):
        prices, exchanges, weights = self._synthetic_inputs()
        day = weights.index[0]

        self.portfolio.set_exchanges(exchanges)
        self.portfolio.set_weights(weights)
        self.portfolio.set_prices(prices)

        self.assertAlmostEqual(
            self.portfolio.calculate_total_perfomance(day, day).iloc[0],
            3.2)

    def test_wrong_types_dataframes(self):
        bad_calls = (
            (self.portfolio.set_exchanges, [1, 2, 3]),
            (self.portfolio.set_weights, 'weights'),
            (self.portfolio.set_prices, 'prices'),
            (self.portfolio.set_currencies, 'currencies'),
        )
        for setter, bad_value in bad_calls:
            with self.subTest(setter=setter.__name__):
                with self.assertRaises(TypeError):
                    setter(bad_value)

    def test_different_quantity_of_assets(self):
        prices, exchanges, weights = self._synthetic_inputs()
        day = weights.index[0]

        self.portfolio.set_exchanges(exchanges)
        # One weight column fewer than there are price columns.
        self.portfolio.set_weights(weights.iloc[:, :-1])
        self.portfolio.set_prices(prices)

        with self.assertRaises(ValueError):
            self.portfolio.calculate_total_perfomance(day, day)

    def test_one_and_zero_size_of_input_dataset(self):
        # A single price/exchange row leaves zero return rows; the
        # calculations must still complete.
        exchanges = pd.read_csv('exchanges.csv', index_col='Unnamed: 0')
        prices = pd.read_csv('prices.csv', index_col='date')

        self.portfolio.set_prices(prices=prices.iloc[:1])
        self.portfolio.set_exchanges(exchanges.iloc[:1])

        self._call_all_perfomances('2014-01-13', '2018-03-05')

    def test_include_zeros_or_nan_in_prices_and_exchanges_data(self):
        exchanges = pd.read_csv('exchanges.csv', index_col='Unnamed: 0')
        prices = pd.read_csv('prices.csv', index_col='date')

        test_start_data = '2014-01-13'
        test_end_data = '2018-03-05'

        # First with missing values, then with zeros, in the first five
        # rows; neither may make the calculations raise.
        for filler in (None, 0):
            prices.iloc[:5] = filler
            exchanges.iloc[:5] = filler
            self.portfolio.set_prices(prices)
            self.portfolio.set_exchanges(exchanges)

            self._call_all_perfomances(test_start_data, test_end_data)


In [5]:
# Run every TestCase defined in the notebook. argv=[''] keeps unittest from
# parsing the kernel's own command-line arguments, exit=False stops it from
# calling sys.exit() when the run finishes, and the trailing ';' hides the
# repr of the returned object.
unittest.main(argv=[''], verbosity=2, exit=False);

test_bad_data_format (__main__.TestNotebook) ... ok
test_correct_calculation_asset_perfomance (__main__.TestNotebook) ... ok
test_correct_calculation_currency_perfomance (__main__.TestNotebook) ... ok
test_correct_calculation_total_perfomance (__main__.TestNotebook) ... ok
test_date_start_after_end (__main__.TestNotebook) ... ok
test_date_start_equal_end (__main__.TestNotebook)
Return one-shape Series ... ok
test_different_quantity_of_assets (__main__.TestNotebook) ... ok
test_include_zeros_or_nan_in_prices_and_exchanges_data (__main__.TestNotebook) ... ok
test_one_and_zero_size_of_input_dataset (__main__.TestNotebook) ... ok
test_wrong_types_dataframes (__main__.TestNotebook) ... ok

----------------------------------------------------------------------
Ran 10 tests in 1.137s

OK


In [6]:
# Load the four input tables from CSV files in the working directory.
# prices.csv is indexed by its 'date' column; the other three use the
# column pandas labels 'Unnamed: 0' (presumably an index written without a
# header name - verify against the files).
weights = pd.read_csv('weights.csv', index_col='Unnamed: 0')
prices = pd.read_csv('prices.csv', index_col='date')
exchanges = pd.read_csv('exchanges.csv', index_col='Unnamed: 0')
currencies = pd.read_csv('currencies.csv', index_col='Unnamed: 0')

In [7]:
# Replace missing exchange rates with 1 *before* building the portfolio, so
# the object is constructed from the final table instead of relying on a
# later in-place change to a frame it already holds.
exchanges = exchanges.fillna(1)
portfolio = Perfomance(prices, weights, currencies, exchanges)

# Requested window; the following cells reuse these two names.
test_start_data = '2010-01-13'
test_end_data = '2020-03-05'
portfolio.calculate_asset_perfomance(test_start_data, test_end_data).head(7)

2014-01-14    1.001019
2014-01-15    1.009386
2014-01-16    1.010742
2014-01-17    1.005987
2014-01-18    1.001750
2014-01-20    1.003562
2014-01-21    0.996108
dtype: float64

In [8]:
# Cumulative performance from exchange-rate changes only, for the portfolio
# and date window defined in the previous cell.
portfolio.calculate_currency_perfomance(test_start_data, test_end_data).head()

2014-01-14    1.019166
2014-01-15    1.016787
2014-01-16    1.016925
2014-01-17    1.020005
2014-01-18    1.020356
dtype: float64

In [9]:
# Cumulative performance of price times exchange rate, for the same
# portfolio and date window as the cells above.
portfolio.calculate_total_perfomance(test_start_data, test_end_data).head()

2014-01-14    1.020183
2014-01-15    1.026308
2014-01-16    1.027826
2014-01-17    1.026098
2014-01-18    1.026451
dtype: float64