In [1]:
import pandas as pd
from vnstock import Vnstock
import datetime
import numpy as np
from collections import defaultdict
from scipy.optimize import minimize
import matplotlib.pyplot as plt

In [2]:


class Stock_data:
    def __init__(self,stock:Vnstock.stock):
        self.stock = stock
        #calculate date
        
        today = datetime.date.today()
        start = today.replace(year=today.year - 5)
        end = today.replace(day=today.day - 1)
        #modify history by adding shift 1 day then remove the first row
        self.history = self.stock.quote.history(start = str(start),end = str(end),interval = "1D")
        self.history["close_shift_1"] = self.history["close"].shift(1)
        self.history = self.history.iloc[1:,:]
    def hpr(self,display_option = False):

        #calculate hpr day
        self.history["hpr"] = (self.history["close"]- self.history["close_shift_1"]) / self.history["close_shift_1"]
        if display_option:
            display(self.history)
    def arithmethic_mean(self,display_option = False):
        if "hpr" not in self.history.columns:
            self.hpr(display_option=display_option)
        self.a_mean = self.history["hpr"].mean()
        return self.a_mean
    def geometric_mean(self,display_option = False):
        if "hpr" not in self.history.columns:
            self.hpr(display_option=display_option)
        growth_factors = 1 + self.history["hpr"]
        self.geo_mean = np.prod(growth_factors) ** (1 / len(growth_factors)) - 1
        return self.geo_mean
    def standard_deviation(self,display_option = False):
        if "hpr" not in self.history.columns:
            self.hpr(display_option=display_option)
        self.std_dev = self.history["hpr"].std()  
        return self.std_dev
    def annualize(self):
        if hasattr(self, 'std_dev') and hasattr(self, 'a_mean') and hasattr(self, 'geo_mean'):
            trading_days = 252  # typical number of trading days in a year
            annualized_mean = (1 + self.geo_mean) ** trading_days - 1
            annualized_std = self.std_dev * np.sqrt(trading_days)
            annualized_amean = self.a_mean * trading_days
            return {
                    "Annualized_arithmethic_mean": annualized_amean,
                    "Annualized_geometric_mean": annualized_mean,
                    "Annualized_std": annualized_std,
                    
            }
        else:
            raise AttributeError("Please calculate mean, geometric mean, and standard deviation before annualizing.")



In [3]:
from collections import defaultdict
stocks = ["ACB","VCB","VIC","HPG","FPT"]
results = defaultdict(dict)
for stock in stocks:
    stock_obj = Vnstock().stock(symbol= stock,source="VCI")
    stock_data = Stock_data(stock=stock_obj)
    results["open"] = stock_data.history["open"].copy()
    results["close"] = stock_data.history["close"].copy()
    results["symbol"] = stock



In [4]:
today = datetime.date.today()
start = today.replace(year=today.year - 5)
end = today.replace(day=today.day - 1)
stock_obj = Vnstock().stock(symbol= "ACB",source="VCI")
stock_data = stock_obj.quote.history(start = str(start),end = str(end),interval = "1D")
stock_data

Unnamed: 0,time,open,high,low,close,volume
0,2020-09-03,7.92,8.03,7.88,7.95,6080561
1,2020-09-04,7.88,7.95,7.80,7.92,4665691
2,2020-09-07,7.92,7.95,7.80,7.80,6067374
3,2020-09-08,7.80,7.84,7.73,7.73,12273484
4,2020-09-09,7.73,7.84,7.62,7.84,5021412
...,...,...,...,...,...,...
1237,2025-08-25,28.20,28.40,25.95,26.20,37410300
1238,2025-08-26,26.25,27.40,26.10,27.35,20812100
1239,2025-08-27,27.80,28.60,27.55,27.65,18831800
1240,2025-08-28,27.80,28.15,27.10,27.55,13927700


In [3]:
import pandas as pd
from vnstock import Vnstock
import datetime
import numpy as np
from collections import defaultdict
from scipy.optimize import minimize
import matplotlib.pyplot as plt
class Portfolio():
    def __init__(self, stocks: list, weight_list: list, start_year_previous=5):
        if not weight_list:
            weight_list = [1/len(stocks)]*len(stocks)
            print(weight_list) 
        if len(weight_list) != len(stocks):
            raise ValueError("Length of weight_list must equal length of stocks")
        if np.sum(weight_list) != 1:
            raise ValueError("Sum of weights must equal 1")
        self.stocks = stocks
        self.weight_list = weight_list
        self.history = defaultdict(list)
        today = datetime.date.today()  # 02:16 PM +07, Aug 29, 2025
        start = today.replace(year=today.year - start_year_previous)
        end = today.replace(day=today.day - 1)
        for i, stock in enumerate(self.stocks):
            stock_obj = Vnstock().stock(symbol=stock, source="VCI")
            stock_data = stock_obj.quote.history(start=str(start), end=str(end), interval="1D")
            if stock_data.empty:
                print(f"No data for {stock}")
                continue
            self.history["time"].append(stock_data["time"].copy())
            self.history["open"].append(stock_data["open"].copy())
            self.history["close"].append(stock_data["close"].copy())
            self.history["symbol"].append(stock)
            self.history["weight"].append(weight_list[i])
    def export_history(self):
        history_df = self.history
        return history_df
    def calculate_hpr(self, display_option=False):
        self.hpr_df = pd.DataFrame(columns=["stocks", "annualized_return"])
        for i in range(len(self.stocks)):
            data = pd.DataFrame({
                "time": self.history["time"][i],
                "close": self.history["close"][i]
            })
            data["return"] = data["close"].pct_change().dropna()
            if data["return"].empty:
                print(f"No valid returns for {self.stocks[i]}")
                continue
            overall_return = data["return"].mean() * 252  # Annualized return
            self.hpr_df = pd.concat([self.hpr_df, pd.DataFrame({"stocks": [self.stocks[i]], "annualized_return": [overall_return]})], ignore_index=True)
        if display_option:
            print(self.hpr_df)
        return self.hpr_df

    def portfolio_return(self, weights):
        hpr = pd.concat([pd.Series(self.history["close"][i]).pct_change().dropna() for i in range(len(self.stocks))], axis=1)
        if hpr.empty or hpr.isna().all().any():
            print("Warning: Empty or NaN data in returns")
            return 0.0
        hpr.columns = self.stocks
        hpr_annualized = hpr.mean() * 252  # Annualized daily returns
        return np.dot(weights, hpr_annualized)

    def portfolio_variance(self, weights):
        hpr = pd.concat([pd.Series(self.history["close"][i]).pct_change().dropna() for i in range(len(self.stocks))], axis=1)
        if hpr.empty or hpr.isna().all().any():
            print("Warning: Empty or NaN data in returns")
            return 0.0
        hpr.columns = self.stocks
        cov_matrix = hpr.cov() * 252  # Annualize covariance
        if np.any(np.isnan(cov_matrix)):
            print("Warning: NaN in covariance matrix")
            return 0.0
        return np.dot(weights.T, np.dot(cov_matrix, weights))

    def optimize_portfolio(self, target_return):
        constraints = (
            {'type': 'eq', 'fun': lambda w: np.sum(w) - 1},
            {'type': 'eq', 'fun': lambda w: self.portfolio_return(w) - target_return}
        )
        bounds = tuple((0, 1) for _ in range(len(self.stocks)))
        initial_guess = np.array(len(self.stocks) * [1. / len(self.stocks)])
        
        result = minimize(self.portfolio_variance, initial_guess, method='SLSQP', bounds=bounds, constraints=constraints)
        if result.success:
            return result.fun**0.5, target_return, result.x  # Return volatility, target return, weights
        else:
            print(f"Optimization failed for target_return={target_return}. Message: {result.message}")
            return None
    def draw_efficient_frontier(self,):
        min_return = self.hpr_df["annualized_return"].min()
        max_return = self.hpr_df["annualized_return"].max()
        if pd.isna(min_return) or pd.isna(max_return):
            print("Invalid return range. Check data.")
        else:
            target_returns = np.linspace(min_return, max_return, 50)
            frontier = []
            for tr in target_returns:
                res = self.optimize_portfolio(tr)
                if res:
                    frontier.append(res)

            if frontier:
                volatilities, returns, _ = zip(*frontier)
                plt.figure(figsize=(10, 6))
                plt.plot(volatilities, returns, 'b-', label='Effgiticient Frontier')
                plt.scatter(volatilities, returns, c='blue', s=20)
                plt.xlabel('Volatility (Standard Deviation)')
                plt.ylabel('Expected Return')
                plt.title('Efficient Frontier for Portfolio')
                plt.grid(True)
                plt.legend()
                plt.show()
            else:
                print("No valid points on the efficient frontier. Check data or target returns.")
        # Initialize portfolio
    def display_result(self):
        # Check if results exist, if not, calculate
        if not hasattr(self, "hpr_df"):
            self.calculate_hpr()
        if not hasattr(self, "history") or not self.history["close"]:
            print("No portfolio history available.")
            return

        weights = np.array(self.weight_list)
        port_return = self.portfolio_return(weights)
        port_variance = self.portfolio_variance(weights)
        port_volatility = np.sqrt(port_variance)

        # Portfolio statistics in one DataFrame
        stats_df = pd.DataFrame({
            "Expected Annualized Return": [port_return],
            "Annualized Variance": [port_variance],
            "Annualized Stdev": [port_volatility]
        })
        print("Portfolio Statistics:")
        display(stats_df)

        # Prepare returns for covariance/correlation
        returns = pd.concat([pd.Series(self.history["close"][i]).pct_change().dropna() for i in range(len(self.stocks))], axis=1)
        returns.columns = self.stocks

        cov_matrix = returns.cov() * 252
        cor_matrix = returns.corr()

        print("Covariance Matrix (annualized):")
        display(cov_matrix)
        print("Correlation Matrix:")
        display(cor_matrix)
portfolio = Portfolio(stocks=["ACB", "VCB", "VIC","HPG","VIX"], weight_list=[0.1, 0.3, 0.3,0.1,0.2], start_year_previous=25)
portfolio.display_result()



Portfolio Statistics:


The behavior of DataFrame concatenation with empty or all-NA entries is deprecated. In a future version, this will no longer exclude empty or all-NA columns when determining the result dtypes. To retain the old behavior, exclude the relevant entries before the concat operation.


Unnamed: 0,Expected Annualized Return,Annualized Variance,Annualized Stdev
0,0.22276,0.033866,0.184028


Covariance Matrix (annualized):


Unnamed: 0,ACB,VCB,VIC,HPG,VIX
ACB,0.107778,0.000533,0.001422,-0.00073,-0.000807
VCB,0.000533,0.089689,-0.00087,0.000569,0.002763
VIC,0.001422,-0.00087,0.111294,-7.6e-05,-0.000613
HPG,-0.00073,0.000569,-7.6e-05,0.126309,-0.003571
VIX,-0.000807,0.002763,-0.000613,-0.003571,0.334453


Correlation Matrix:


Unnamed: 0,ACB,VCB,VIC,HPG,VIX
ACB,1.0,0.005189,0.012879,-0.006182,-0.004089
VCB,0.005189,1.0,-0.008718,0.005225,0.01594
VIC,0.012879,-0.008718,1.0,-0.000643,-0.003207
HPG,-0.006182,0.005225,-0.000643,1.0,-0.016895
VIX,-0.004089,0.01594,-0.003207,-0.016895,1.0
