(a)
Create an ‘Agent’ class in ‘Agent.py’ with ‘name’, ‘credit’, ‘holdings’, and ‘stock_data’ as
instance variables.


(b)
Write a ‘trading’ method with the following rule: if the price rises for two consecutive days, buy as
many shares as the agent can afford; if the price goes down after the purchase, sell all of them. No
action is taken in other cases. Both ‘credit’ and ‘holdings’ should change accordingly. Whenever the
agent receives daily stock prices, it should save them to ‘stock_data’.

In [None]:
import numpy as np
import pandas as pd

In [None]:
class Agent:
    '''A trading agent that tracks its cash, its positions and the prices it has received.

    State kept on the instance
    _name [str]: display name used in printed messages
    _credit [float|int]: cash available for buying (starts at 1000)
    _holdings [pd.DataFrame]: one column per held ticker, rows "avg_price" and "num"
    _stock_data [dict]: ticker -> list of every price received so far
    _today_stock_data [dict]: ticker -> most recently received price
    '''

    def __init__ (self, name="The agent"):
        '''create a agent with given name having $1000

        Arguments
        name [str]: name of the agent (default: The agent)
        '''
        self._name = name
        self._credit = 1000
        self._holdings = pd.DataFrame(index=["avg_price", "num"], dtype="float")
        self._stock_data = {}
        self._today_stock_data = {}

        print(f"{self._name} is created.")

    def check(self):
        '''print current state(credit and holdings) of the agent
        '''
        print(f"credit : {self._credit}")
        print(f"holding : {self.df_to_list(self._holdings)}")

    def recieve_stock_data(self, new_stock_data: list):
        '''record today's prices: append them to the price history (self._stock_data)
        and replace self._today_stock_data with them

        Arguments
        new_stock_data [list(list(str, float))]: i.e., [["APPL", 143.1], ["GOOGL", 500.2]]
        '''
        self.update_stock_data(new_stock_data)
        self._today_stock_data = dict(new_stock_data)

    # Correctly spelled alias; the original (misspelled) name is kept for existing callers.
    receive_stock_data = recieve_stock_data

    def buy(self, buy_inform : list):
        '''buy stocks according to buy_inform

        Nothing happens when quantity is not positive. When the total cost exceeds
        the available credit a message is printed and nothing is bought.

        Arguments
        buy_inform [list]: information of the stock to buy [ticker : str, price : float|int, quantity : int]
        '''
        ticker, price, quantity = buy_inform

        if not quantity > 0:
            return

        cost = price * quantity
        if cost > self._credit:
            print(f"{self._name} can't buy.")
            return

        if ticker in self._holdings.columns:
            # Write through .at so the update lands in the DataFrame itself;
            # modifying the Series returned by self._holdings[ticker] may only
            # change a temporary copy.
            prev_avg_price = self._holdings.at["avg_price", ticker]
            prev_num = self._holdings.at["num", ticker]
            new_num = prev_num + quantity
            self._holdings.at["avg_price", ticker] = (prev_avg_price * prev_num + cost) / new_num
            self._holdings.at["num", ticker] = new_num
        else:
            # Store floats so a later fractional average price fits the column dtype.
            self._holdings[ticker] = [float(price), float(quantity)]

        self._credit -= cost

        print(f"{self._name} buys {int(quantity)} {ticker} for {price}.")
        return

    def sell(self, sell_inform : list):
        '''sell stocks according to sell_inform

        Nothing happens when quantity is not positive. When the ticker is not held,
        or fewer than quantity shares are held, a message is printed and nothing is sold.
        A ticker whose share count reaches zero is removed from the holdings.

        Arguments
        sell_inform [list]: information of the stock to sell [ticker : str, price : float|int, quantity : int]
        '''
        ticker, price, quantity = sell_inform

        if not quantity > 0:
            return

        if ticker not in self._holdings.columns or quantity > self._holdings.at["num", ticker]:
            print(f"{self._name} can't sell.")
            return

        remaining = self._holdings.at["num", ticker] - quantity
        if remaining == 0:
            self._holdings.drop(columns=[ticker], inplace=True)
        else:
            # Write through .at so the DataFrame itself is updated.
            self._holdings.at["num", ticker] = remaining
        self._credit += price * quantity

        print(f"{self._name} sells {int(quantity)} {ticker} for {price}.")
        return

    def trading(self, strategy, **kwargs):
        '''trade w.r.t given strategy
        strategy can get arguments by kwargs

        The strategy is called as strategy(today_stock_data, stock_data, **kwargs) and
        must return one order per ticker in today's data, in the same order:
        1 = buy as many as the credit allows, -1 = sell everything held, anything else = no action.

        NOTE: the method returns right after the first order that is actually executed,
        so at most one trade happens per call.

        Parameters
        strategy [func]: function that return a normalized vector that indicates
        to buy or sell each stock

        Returns
        (ticker, price, quantity) of the executed trade, or None when no trade happened

        Raises
        ValueError: when the strategy result does not have one entry per ticker of today
        '''
        # Orders are aligned with today's tickers (that is what the strategy iterates over),
        # not with the full price history, whose key set and ordering may differ.
        tickers = list(self._today_stock_data)
        strategy_result = strategy(self._today_stock_data, self._stock_data, **kwargs)
        if len(strategy_result) != len(tickers):
            raise ValueError(
                f"strategy returned {len(strategy_result)} orders for {len(tickers)} tickers"
            )

        for ticker, order in zip(tickers, strategy_result):
            if order == 1:
                price = self._today_stock_data[ticker]
                max_quantity = self._credit // price
                if max_quantity > 0:
                    self.buy([ticker, price, max_quantity])
                    return ticker, price, max_quantity
            elif order == -1 and ticker in self._holdings.columns:
                price = self._today_stock_data[ticker]
                max_quantity = self._holdings.at["num", ticker]
                if max_quantity > 0:
                    self.sell([ticker, price, max_quantity])
                    return ticker, price, max_quantity

        return None

    def update_stock_data(self, new_stock_data):
        '''update _stock_data [dict] w.r.t new_stock_data

        Parameters
        new_stock_data [list]: [list(list(str, float))]: i.e., [["APPL", 143.1], ["GOOGL", 500.2]]
        '''
        for ticker, price in new_stock_data:
            # Start a new history list the first time a ticker is seen.
            self._stock_data.setdefault(ticker, []).append(price)

    def df_to_list(self, df: pd.DataFrame):
        '''convert dataframe object to list

        Arguments
        df [pd.DataFrame]: DataFrame of holdings (index: avg_price, num; columns: tickers)

        Returns
        list of [ticker, avg_price, num] lists, one per column of df
        '''
        return [[ticker] + df[ticker].tolist() for ticker in df.columns]

In [None]:
def consecutive_strategy_single(historical_stock_data: list, consec_inc: int, consec_dec: int):
    '''decide the order for one ticker from its price history

    Returns 1 (buy) when each of the last consec_inc day-to-day changes is a rise,
    otherwise -1 (sell) when each of the last consec_dec day-to-day changes is a fall,
    otherwise 0 (no action). A rule is only evaluated when the history holds
    more prices than the number of days it looks back.

    Parameter
    historical_stock_data [list]: historical stock price data including today
    consec_inc [int]: consecutive increasing days
    consec_dec [int]: consecutive decreasing days
    '''
    history_len = len(historical_stock_data)

    def tail_changes(days):
        # day-to-day differences over the last `days` intervals (days + 1 prices)
        return np.diff(np.array(historical_stock_data[-(days + 1):]))

    # The buy rule is checked first, so it wins if both rules would match.
    if history_len > consec_inc and np.all(tail_changes(consec_inc) > 0):
        return 1

    if history_len > consec_dec and np.all(tail_changes(consec_dec) < 0):
        return -1

    return 0

def consecutive_strategy(today_stock_data: dict, historical_stock_data: dict, **kwargs):
    '''build the order vector for every ticker that has a price today

    Each entry is the result of consecutive_strategy_single for that ticker's
    price history; entries follow the key order of today_stock_data.

    Parameter
    today_stock_data [dict]: today stock data to get tradable stocks
    historical_stock_data [dict]: historical price data
    kwargs: forwarded unchanged to consecutive_strategy_single
    '''
    return [
        consecutive_strategy_single(historical_stock_data[ticker], **kwargs)
        for ticker in today_stock_data
    ]

In [None]:
# Demo run: one agent with the default name, starting with 1000 credit.
agent1 = Agent()

In [None]:
# Day 1: the first APPL price is only recorded; no trading call is made.
agent1.recieve_stock_data([["APPL", 123.2]])

In [None]:
# Day 2: a single rise (123.2 -> 125.2). With consec_inc=2 two rises are needed,
# so no buy order is produced yet.
agent1.recieve_stock_data([["APPL", 125.2]])
agent1.trading(consecutive_strategy, consec_inc=2, consec_dec=1)

In [None]:
# Day 3: second consecutive rise -> buy order; the agent buys as many shares
# as its credit allows at 126.2.
agent1.recieve_stock_data([["APPL", 126.2]])
agent1.trading(consecutive_strategy, consec_inc=2, consec_dec=1)

In [None]:
# Day 4: the price rises again, so a buy order is produced, but the remaining
# credit should be too small for another share, so no trade is expected.
agent1.recieve_stock_data([["APPL", 127.2]])
agent1.trading(consecutive_strategy, consec_inc=2, consec_dec=1)

In [None]:
# Day 5: the price falls once; with consec_dec=1 that is a sell order,
# so every held share is sold at 124.2.
agent1.recieve_stock_data([["APPL", 124.2]])
agent1.trading(consecutive_strategy, consec_inc=2, consec_dec=1)

In [None]:
# Day 6: the price falls again, but nothing is held any more, so no trade happens.
agent1.recieve_stock_data([["APPL", 121.2]])
agent1.trading(consecutive_strategy, consec_inc=2, consec_dec=1)

In [None]:
# Print the final credit and holdings.
agent1.check()