In [12]:
import pandas as pd # type: ignore
import numpy as np # type: ignore
from sklearn.model_selection import train_test_split # type: ignore
from sklearn.preprocessing import StandardScaler # type: ignore

In [13]:
# Đặt seed để tái lập ngẫu nhiên
np.random.seed(2904)

In [14]:
path = './data'

In [15]:
class MeanSquaredError:
    def forward(self, y_pred, y_true):
        self.pred = y_pred
        self.true = y_true
        return np.mean((y_pred - y_true) ** 2)

    def backward(self):
        return 2 * (self.pred - self.true) / self.true.size

In [16]:
class Linear:
    def __init__(self, input_dim, output_dim, regularization, learning_rate):
        self.W = np.random.randn(input_dim, output_dim) * 0.01  
        self.b = np.zeros((1, output_dim)) 
        self.reg = regularization  
        self.learning_rate = learning_rate

    def forward(self, X):
        self.X = X  
        return X @ self.W + self.b

    def backward(self, dLoss):
        m = self.X.shape[0]  
        dW = (self.X.T @ dLoss) / m + self.reg * self.W  
        db = np.sum(dLoss, axis=0, keepdims=True) / m 
        dX = dLoss @ self.W.T  
        self.W -= self.learning_rate * dW
        self.b -= self.learning_rate * db
        return dX, dW, db

In [17]:
class ReLU:
    def forward(self, X):
        self.X = X
        return np.maximum(0, X)

    def backward(self, dLoss):
        return dLoss * (self.X > 0)

In [18]:
class LinearAndReLU:
    def __init__(self, input_dim, output_dim, regularization, learning_rate):
        self.linear = Linear(input_dim, output_dim, regularization, learning_rate)
        self.relu = ReLU()

    def forward(self, X):
        self.X = X
        return self.relu.forward(self.linear.forward(X))

    def backward(self, dLoss):
        dLoss = self.relu.backward(dLoss)
        return self.linear.backward(dLoss)

In [19]:
def normalize(X):
    X = np.array(X, dtype=np.float64)  
    mean = np.mean(X, axis=0) 
    std = np.std(X, axis=0)   
    return (X - mean) / std

def divide_by_100(X):
    return np.array(X, dtype=np.float64) / 100

X_train = pd.read_csv(f'{path}/X_train.csv', low_memory=False)
y_train = pd.read_csv(f'{path}/Y_train.csv', low_memory=False)
if 'tradeTime' in X_train.columns:
    X_train['tradeTime'] = pd.to_datetime(X_train['tradeTime'], errors='coerce')
    X_train['construction_year'] = X_train['tradeTime'].dt.year
    X_train['construction_month'] = X_train['tradeTime'].dt.month
    X_train['construction_day'] = X_train['tradeTime'].dt.day
    X_train.drop(columns=['tradeTime'], inplace=True)
def extract_value_after_space(x):
    if isinstance(x, str):
        parts = x.split(' ')
        return parts[-1] if len(parts) > 1 else x
    return x
X_train = X_train.applymap(extract_value_after_space)
y_train = y_train.applymap(extract_value_after_space)
invalid_values = ['#NAME?', '未知', '混合结构', '钢混结构']
X_train.replace(invalid_values, np.nan, inplace=True)
y_train.replace(invalid_values, np.nan, inplace=True)

X_train = X_train.apply(pd.to_numeric, errors='coerce')
y_train = y_train.apply(pd.to_numeric, errors='coerce')

X_train.fillna(X_train.mean(), inplace=True)
y_train.fillna(y_train.mean(), inplace=True)

train_data = X_train.merge(y_train, on='ID', how='inner')
X = train_data.drop(columns=['ID', 'TARGET'])
y = train_data[['TARGET']]

X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=42)

scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_val = scaler.transform(X_val)

X_train = np.array(X_train)  
y_train = np.array(y_train)  
X_val = np.array(X_val)      
y_val = np.array(y_val)  

# Bật cái bên dưới để làm cho dữ liệu được chia dạng bình quân nha ní
# X_train = normalize(X_train)
# X_val = normalize(X_val)  
y_train = divide_by_100(y_train)
y_val = divide_by_100(y_val)   

X_test = pd.read_csv(f'{path}/X_test.csv', low_memory=False)

if 'tradeTime' in X_test.columns:
    X_test['tradeTime'] = pd.to_datetime(X_test['tradeTime'], errors='coerce')
    X_test['construction_year'] = X_test['tradeTime'].dt.year
    X_test['construction_month'] = X_test['tradeTime'].dt.month
    X_test['construction_day'] = X_test['tradeTime'].dt.day
    X_test.drop(columns=['tradeTime'], inplace=True)
    
X_test = X_test.applymap(extract_value_after_space)
X_test.replace(invalid_values, np.nan, inplace=True)
X_test = X_test.apply(pd.to_numeric, errors='coerce')
X_test.fillna(X_test.mean(), inplace=True)

X_test = np.array(X_test)
X_test = normalize(X_test)

print('Xong rồi nè bạn!!!')

  X_train = X_train.applymap(extract_value_after_space)
  y_train = y_train.applymap(extract_value_after_space)
  X_test = X_test.applymap(extract_value_after_space)


Xong rồi nè bạn!!!


In [20]:
class NeuralNetwork:
    def __init__(self, input_dim, hidden_dims, output_dim, regularization=0.01, learning_rate=1e-3):
        dimensions = [input_dim] + hidden_dims
        self.learning_rate = learning_rate;
        self.layers = []
        for i in range(1, len(dimensions)):
            self.layers.append(LinearAndReLU(dimensions[i-1], dimensions[i], regularization, learning_rate))
        self.final_layer = Linear(dimensions[-1], output_dim, regularization, learning_rate)
        self.fun_loss = MeanSquaredError()
        self.best_weights = None
        self.best_cost = float('inf')

    def forward(self, X, y=None, training=True):
        hidden = X
        for layer in self.layers:
            hidden = layer.forward(hidden)
        scores = self.final_layer.forward(hidden)
        cost = self.fun_loss.forward(scores, y) if y is not None else None
        
        if not training:
            return scores
        
        dLoss = self.fun_loss.backward() if y is not None else None
        dLoss, _, _ = self.final_layer.backward(dLoss) if y is not None else (None, None, None)
        
        for i in range(len(self.layers) - 1, -1, -1):
            dLoss, _, _ = self.layers[i].backward(dLoss)
        
        return cost

    def predict(self, X):
        return self.forward(X, training=False)
    
    def update_learning_rate(self, learning_rate):
        for layer in self.layers:
            layer.learning_rate = learning_rate
        self.final_layer.learning_rate = learning_rate

    def train(self, X_train, y_train, epochs, batch_size):
        for epoch in range(epochs):
            indices = np.arange(X_train.shape[0])
            np.random.shuffle(indices)
            X_train = X_train[indices]
            y_train = y_train[indices]

            for i in range(0, X_train.shape[0], batch_size):
                X_batch = X_train[i:i + batch_size]
                y_batch = y_train[i:i + batch_size]
                self.forward(X_batch, y_batch)

            if (epoch + 1) % 10 == 0:
                train_cost = self.forward(X_train, y_train)
                print(f"Epoch {epoch + 1}/{epochs}, Training Cost: {train_cost:.4f}")

                if train_cost < self.best_cost:
                    self.best_cost = train_cost
                    self.best_weights = {
                        'W': [layer.linear.W.copy() for layer in self.layers] + [self.final_layer.W.copy()],
                        'b': [layer.linear.b.copy() for layer in self.layers] + [self.final_layer.b.copy()]
                    }
                
            if (epoch + 1) % 30 == 0:
                self.learning_rate = self.learning_rate * 0.99
                self.update_learning_rate(self.learning_rate)

    def load_best_weights(self):
        if self.best_weights is not None:
            for i, layer in enumerate(self.layers):
                layer.linear.W = self.best_weights['W'][i].copy()
                layer.linear.b = self.best_weights['b'][i].copy()
            self.final_layer.W = self.best_weights['W'][-1].copy()
            self.final_layer.b = self.best_weights['b'][-1].copy()   

    def save_best_weights_to_csv(self, url):
        data = {
            'Layer': [],
            'Weights': [],
            'Bias': []
        }

        for i, layer in enumerate(self.layers):
            data['Layer'].append(f'Layer {i + 1}')
            data['Weights'].append(layer.linear.W.flatten())
            data['Bias'].append(layer.linear.b.flatten())

        data['Layer'].append('Final Layer')
        data['Weights'].append(self.final_layer.W.flatten())
        data['Bias'].append(self.final_layer.b.flatten())

        df = pd.DataFrame(data)
        df.to_csv(url, index=False)
print('Xong rồi nè bạn!!!')

Xong rồi nè bạn!!!


In [21]:
# Sử dụng mạng
net = NeuralNetwork(input_dim=22, hidden_dims=[60, 60], output_dim=1, regularization=0.01, learning_rate=1e-3)

# Huấn luyện mạng
net.train(X_train, y_train, epochs=200, batch_size=2)

# Dự đoán trên dữ liệu kiểm tra
y_test_predicted = net.predict(X_test)

results = pd.DataFrame({'TARGET': y_test_predicted.flatten()})
results['ID'] = range(0, len(results)) 
results = results[['ID', 'TARGET']]
results['TARGET'] = (results['TARGET'] * 100).round().astype(int)

path_out = './out'
name = 'tognoek_60_60_'

results.to_csv(f'{path_out}/{name}{(net.best_cost * 100000):.0f}.csv', index=False)
net.save_best_weights_to_csv(f'{path_out}/{name}{(net.best_cost * 100000):.0f}_w.csv')

print(f'{path_out}/{name}{(net.best_cost * 100000):.0f}')
print('Đã xong rồi !!!')

Epoch 10/200, Training Cost: 0.3719
Epoch 20/200, Training Cost: 0.3713
Epoch 30/200, Training Cost: 0.4174
Epoch 40/200, Training Cost: 0.3913
Epoch 50/200, Training Cost: 0.3793
Epoch 60/200, Training Cost: 0.3874
Epoch 70/200, Training Cost: 0.3998


KeyboardInterrupt: 