# weather prediction

## 1.Start from the simple markov chain

In [4]:
import random

class LYMarkovChain:
    def __init__(self) -> None:
        self.state = ["sunny", "rain", "cloudy", "snow"]
        self.trainsition_matrix = {
            "sunny":{"sunny":0.6, "rain":0.12, "cloudy":0.26, "snow":0.02},
            "rain":{"sunny":0.22, "rain":0.6, "cloudy":0.16, "snow":0.02},
            "cloudy":{"sunny":0.18, "rain":0.2, "cloudy":0.5, "snow":0.12},
            "snow":{"sunny":0.3, "rain":0.12, "cloudy":0.18, "snow":0.4},
        }

    def predict(self, current_weather):
        transition_prob = self.trainsition_matrix[current_weather]
        next_weather = random.choices(self.state, weights=transition_prob.values())[0]
        return next_weather
    
weather_chain = LYMarkovChain()
curr_weather = "sunny"
print(f"current weather: {curr_weather}")
pred_days = 7

for day in range(pred_days):
    print(f"day {day+1}")
    print(curr_weather)
    curr_weather = weather_chain.predict(curr_weather)

current weather: sunny
day 1
sunny
day 2
sunny
day 3
sunny
day 4
sunny
day 5
rain
day 6
rain
day 7
sunny


In [1]:
import numpy as np

class HiddenMarkovModel:
    def __init__(self, n_states, n_observations):
        self.n_states = n_states
        self.n_observations = n_observations
        self.transition_matrix = None
        self.emission_matrix = None
        self.initial_state_probs = None
    
    def fit(self, training_data, n_iter=100):
        # Initialize parameters randomly
        self.transition_matrix = np.random.rand(self.n_states, self.n_states)
        self.transition_matrix /= np.sum(self.transition_matrix, axis=1, keepdims=True)
        
        self.emission_matrix = np.random.rand(self.n_states, self.n_observations)
        self.emission_matrix /= np.sum(self.emission_matrix, axis=1, keepdims=True)
        
        self.initial_state_probs = np.ones(self.n_states) / self.n_states
        
        # Train parameters using the Baum-Welch algorithm (EM)
        for _ in range(n_iter):
            alpha = self.forward(training_data)
            beta = self.backward(training_data)
            gamma = alpha * beta
            xi = self.compute_xi(alpha, beta, training_data)
            
            # Update parameters
            self.update_parameters(gamma, xi, training_data)
    
    def forward(self, observations):
        T = len(observations)
        alpha = np.zeros((T, self.n_states))  
        alpha[0] = self.initial_state_probs * self.emission_matrix[:, observations[0]]
        
        for t in range(1, T):
            alpha[t] = np.dot(alpha[t-1], self.transition_matrix) * self.emission_matrix[:, observations[t]]
        
        return alpha
    
    def backward(self, observations):
        T = len(observations)
        beta = np.zeros((T, self.n_states))
        beta[-1] = 1
        
        for t in range(T - 2, -1, -1):
            beta[t] = np.dot(self.transition_matrix, self.emission_matrix[:, observations[t+1]] * beta[t+1])
        
        return beta
    
    def compute_xi(self, alpha, beta, observations):
        T = len(observations)
        xi = np.zeros((T - 1, self.n_states, self.n_states))

        for t in range(T - 1):
            alpha_t = alpha[t].reshape(1, -1)
            emission = self.emission_matrix[:, observations[t + 1]].reshape(1, -1)
            transition = self.transition_matrix * emission
            numerator = np.dot(alpha_t, transition)
            denominator = np.sum(numerator)
            xi[t] = numerator / denominator


        return xi
    
    def update_parameters(self, gamma, xi, observations):
        T = len(observations)
        self.initial_state_probs = gamma[0] / np.sum(gamma[0])
        
        for i in range(self.n_states):
            for j in range(self.n_states):
                self.transition_matrix[i, j] = np.sum(xi[:, i, j]) / np.sum(gamma[:, i])
        
        for j in range(self.n_states):
            for k in range(self.n_observations):
                filter_mask = (observations == k)
                self.emission_matrix[j, k] = np.sum(gamma[filter_mask, j]) / np.sum(gamma[:, j])

    def predict(self, observations):
        # Use the Viterbi algorithm to find the most likely hidden states
        T = len(observations)
        delta = np.zeros((T, self.n_states))
        psi = np.zeros((T, self.n_states), dtype=int)
        delta[0] = self.initial_state_probs * self.emission_matrix[:, observations[0]]
        
        for t in range(1, T):
            for j in range(self.n_states):
                delta[t, j] = np.max(delta[t-1] * self.transition_matrix[:, j]) * self.emission_matrix[j, observations[t]]
                psi[t, j] = np.argmax(delta[t-1] * self.transition_matrix[:, j])
        
        # Backtrack to find the most likely sequence of hidden states
        states = [np.argmax(delta[-1])]
        for t in range(T - 2, -1, -1):
            states.append(psi[t+1, states[-1]])
        states.reverse()
        
        return states

# Example usage
# Define the number of hidden states and observable states
n_hidden_states = 2  # Bullish and Bearish
n_observable_states = 3  # Positive, Neutral, and Negative

# Initialize the HMM
hmm_model = HiddenMarkovModel(n_hidden_states, n_observable_states)

# Training data (sequence of observable states, e.g., daily stock price changes)
training_data = np.array([0, 1, 2, 0, 1, 2, 2, 1, 0])  # Example data

# Train the model
hmm_model.fit(training_data)

# Predict future market conditions
observed_data = np.array([2,1,0])  # Example observed data
print(observed_data)
predicted_states = hmm_model.predict(observed_data)
print("Predicted states:", predicted_states)


[2 1 0]
Predicted states: [0, 0, 0]


  xi[t] = numerator / denominator
  self.initial_state_probs = gamma[0] / np.sum(gamma[0])
  self.emission_matrix[j, k] = np.sum(gamma[filter_mask, j]) / np.sum(gamma[:, j])
