In [44]:
import random
import pymongo
from datetime import datetime, timedelta
import pandas as pd
from sklearn.preprocessing import StandardScaler
import numpy as np
from sklearn.preprocessing import LabelEncoder
from sklearn.preprocessing import OneHotEncoder
import time
import datetime

class StockDataProcessor:
    def __init__(self, mongodb_url='mongodb://localhost:27017', days=60, num_records=1000, isFlatten=False  ):
        self.client = pymongo.MongoClient(mongodb_url)
        self.db = self.client['cleanData']
        self.days = days
        self.chaeckdays = 0 # 
        self.num_records = num_records
        self.column=["Open","High","Low","Close","Adj Close","Volume","SMA5","SMA10","SMA20","SMA60","HighestHigh","LowestLow","RSV","KDJ_K","KDJ_D","KDJ_J","dif","dea","macd_hist"]
        self.leable = ["Up_3days","Down_3days","Stable_3days","Other","Volume_5avg","Volume_flag"]
        self.label_tables = ['lst_labels_train', 'lst_labels_validation', 'lst_labels_test']
        self.data_label = {table: [x['label'] for x in self.db[table].find({}, {'label': 1})] for table in self.label_tables}
        self.data_label_count = {table:0 for table in self.label_tables  }
            

    def get_data(self, input_data, afterday=0):
        id, date = input_data.split('_')
        
        date_obj = pd.to_datetime(date)
        modified_date = date_obj + pd.DateOffset(days=afterday)
        modified_date_str = modified_date.strftime('%Y-%m-%d')
        
        df = pd.DataFrame(list(self.db[id].find({'Date': {'$lte':modified_date_str}}).sort('Date', pymongo.DESCENDING).limit(self.days+afterday)))
        # df = pd.DataFrame(list(collection.find({'Date': {'$lte':date}})))
        data = df.sort_values('Date').reset_index(drop=True)
        data =  df[self.column]
        
        data_label = df[self.leable].iloc[-1]
        return data,data_label

    def normalization(self, stock_data, isFlatten):
        stock_data = pd.DataFrame(stock_data)
        stock_data['Volume'] = stock_data['Volume'].apply(lambda x: np.log(x) if x > 0 else 0)
        stock_data['Volume'] = stock_data['Volume'].apply(lambda x: np.log(x) if x > 0 else 0)
        stock_data['Volume'] = stock_data['Volume'].apply(lambda x: np.log(x) if x > 0 else 0)

        columns_to_normalize = ["Open","High","Low","Close","Adj Close","Volume","SMA5","SMA10","SMA20","SMA60","HighestHigh","LowestLow"]
        scaler = StandardScaler()
        stock_data[columns_to_normalize] = scaler.fit_transform(stock_data[columns_to_normalize])
        stock_data[["RSV","KDJ_K","KDJ_D","KDJ_J"]] = stock_data[["RSV","KDJ_K","KDJ_D","KDJ_J"]]*0.01
        

        return stock_data.values.reshape(-1) if self.isFlatten else stock_data
    
    def process_data(self, label, count_local):
        '''
        輔助 process_total 重複執行的項目
        '''
        x_datas = []
        y_datas = []
        alldata_size = len(self.data_label[label])
        while self.data_label_count[label] < alldata_size and len(y_datas) < self.num_records :
            x_data, y_data = self.get_data(self.data_label[label][self.data_label_count[label]])
            x_datas.append(self.normalization(x_data))
            y_datas.append(y_data)
            count_local += 1
            self.data_label_count[label] += 1
            
        return x_datas, y_datas
    
    def process_total(self, label):
        count_local = 0
        x_datas = []
        y_datas = []
        x_datas , y_datas = self.process_data(label,count_local)
            
        if  count_local < self.num_records:
            self.data_label_count[label]  = 0
            random.shuffle(self.data_label[label])
            x_datas2 , y_datas2 = self.process_data(label,count_local)
            x_datas.extend(x_datas2)
            y_datas.extend(y_datas2)
            

        return np.array(x_datas), np.array(y_datas)

    
s = StockDataProcessor(num_records=10)
print(s.data_label['lst_labels_train'][:3])
_,yy=s.get_data('2371.TW.csv_2013-05-20',7)

yy.to_dict()
def clean_label(s,name) :
    check = {"Up_3days": 0, "Down_3days": 0, "Stable_3days": 0, "Other": 0}
    train_list = s.data_label[name]
    
    start_time = time.time()
    not_nan_list = []
    for x in train_list:
        train, y = s.get_data(x)
        if train.isna:
            continue
        dct_y = y.to_dict()
        for key in check.keys() :
            check[key] += dct_y[key]
        not_nan_list.append(x)

    end_time = time.time()

    # 计算经过的时间
    elapsed_time = end_time - start_time

    # 打印结果
    delta_time = datetime.timedelta(seconds=elapsed_time)

    # 打印结果
    print("Elapsed time: {}".format(str(delta_time)))
    print(check)
    not_nan_list = list(set(not_nan_list))
    collection = s.db[name]
    collection.drop()
    # 將列表中的每一個元素轉為字典並插入到MongoDB
    for value in not_nan_list:
        collection.insert_one({"label": value})

s = StockDataProcessor(num_records=10)
for name in ['lst_labels_train', 'lst_labels_validation', 'lst_labels_test']:
    print(f"\n\n>>{name}")
    clean_label(s,name) 

[]


>>lst_labels_train
Elapsed time: 0:00:00
{'Up_3days': 0, 'Down_3days': 0, 'Stable_3days': 0, 'Other': 0}


>>lst_labels_validation
Elapsed time: 0:05:26.501796
{'Up_3days': 0, 'Down_3days': 0, 'Stable_3days': 0, 'Other': 0}


>>lst_labels_test
Elapsed time: 0:00:00
{'Up_3days': 0, 'Down_3days': 0, 'Stable_3days': 0, 'Other': 0}


In [None]:
import tensorflow as tf

gpus = tf.config.experimental.list_physical_devices(device_type='GPU')
cpus = tf.config.experimental.list_physical_devices(device_type='CPU')
print(gpus, cpus)
physical_devices = tf.config.list_physical_devices('GPU')
tf.config.list_physical_devices('GPU')
tf.config.experimental.set_memory_growth(physical_devices[0], True)

In [None]:
import numpy as np
import pandas as pd
import pymongo
import keras
import keras.backend as K
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout,Activation,Input,Flatten,BatchNormalization
from tensorflow.keras.losses import CategoricalCrossentropy
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam,Adamax
from tensorflow.keras.optimizers import SGD
from tensorflow.keras.layers import LeakyReLU
from tensorflow.keras.optimizers import SGD
from sklearn.metrics import accuracy_score
from sklearn.metrics import precision_score
from sklearn.metrics import recall_score
from sklearn.metrics import f1_score
from sklearn.metrics import roc_auc_score
from sklearn.metrics import confusion_matrix
from sklearn.preprocessing import StandardScaler
from tensorflow.keras.utils import plot_model
import sklearn
from sklearn.utils import shuffle
import matplotlib.pyplot as plt
import random
from datetime import datetime, timedelta
import time 
import sys
from datetime import datetime
from tensorflow.keras.callbacks import EarlyStopping

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from keras.callbacks import Callback

# 設置 GPU 可見性
physical_devices = tf.config.list_physical_devices('GPU')
tf.config.experimental.set_memory_growth(physical_devices[0], True)

from keras.callbacks import Callback

# three_high_df size is 95450
# five_high_df size is 57607
# one_low_df size is 1572160
# normal_df size is 11277551

# 條件終止
class StopTraining(Callback):
    def __init__(self, monitor='val_accuracy', value=0.7, consecutive_times=3):
        super(StopTraining, self).__init__()
        self.monitor = monitor
        self.value = value
        self.consecutive_times = consecutive_times
        self.count = 0
        
    def on_epoch_end(self, epoch, logs=None):
        current = logs.get(self.monitor)
        
        # Check for NaN loss
        loss = logs.get('loss')
        if loss is not None and np.isnan(loss):
            self.model.stop_training = True
            print(f"\nLoss is NaN, training stopped.")
            return

        print('check ')
        if current is None:
            return
        if current > self.value:
            self.count += 1
            if self.count >= self.consecutive_times:
                self.model.stop_training = True
                print(f"\nReached {self.consecutive_times} consecutive times with {self.monitor} > {self.value}, training stopped.")
        # else:
        #     self.count = 0



class DataGenerator:
    def __init__(self, days=30, num_records=1000, num_epochs=20, labels=[], value=0.7):
        self.days = days
        self.num_records = num_records
        self.num_epochs = num_epochs
        self.trainData = StockDataProcessor_weight( days=days-1, num_records=num_records*3, begin='2011-01-01', end='2019-01-01', ischeckReset=True)
        self.testData = StockDataProcessor_weight( days=days-1, num_records=num_records, begin='2019-01-01', end='2022-12-31', ischeckReset=False)
        self.value_size = self.trainData.get_value_size()
        self.model = None
        self.num_steps_per_epoch = self.trainData.countNum
        
        self.count_reload = 0
        self.num_epochs = num_epochs
        self.analysis_result = {}
        self.isTest = False
        self.labels = labels
        self.train_history = None
        self.value_size = self.testData.get_value_size()
        self.stop_training_callback = StopTraining(value=value)

        
    def get_data(self,isTrain=True):
        if isTrain:
            return self.trainData.get_uneven_date()
        return self.testData.get_uneven_date()
		
    
    def batch_generator(self, num_records=None ):
        while True:
            yield self.get_data()
                    
    def batch_generator_test(self,num_records=None ):
        return self.get_data(isTrain=False )    
    
    def create_keras_model(self,print_model=True ):
        K.clear_session()
        
        input_num = 2 
        while input_num < (self.days*self.value_size):
            input_num = input_num* 2
            
        self.model = Sequential()
        self.model.add(Input(shape=(self.days* self.value_size)))
    
        self.model.add(Dense((input_num), activation=LeakyReLU(0.1)))
        self.model.add(Dense((input_num/2), activation=LeakyReLU(0.1)))
        # self.model.add(BatchNormalization())
        self.model.add(Dense((input_num/4), activation=LeakyReLU(0.1)))
        self.model.add(Dropout(0.6))
        self.model.add(Dense(128, activation=LeakyReLU(0.1)))
        self.model.add(Dense(len(self.labels),activation='softmax'))
        # self.model.add(Activation('softmax'))
                       
        if(print_model):
            print(self.model.summary())
            plot_model( self.model, to_file="model.png", show_shapes=True, show_layer_names=True,expand_nested=True,)
            img = plt.imread('model.png')
            plt.imshow(img)
            plt.show()

        # 创建 Adamax 优化器实例
        optimizer = Adamax(learning_rate=0.001, beta_1=0.9, beta_2=0.999, epsilon=1e-7)

        # 将优化器应用于模型编译
        self.model.compile(optimizer=optimizer, loss='categorical_crossentropy', metrics=['accuracy'])

  
        # self.model.compile(loss='binary_crossentropy', optimizer=Adam(learning_rate=0.001), metrics=['accuracy'])
 

    def reshapeResult(self, x = []):
        num = 0 
        for i in range(len(x)):
            num += x[i]*(2**i)
        return num         

    def build_model(self):

        # 建立Keras模型
        self.create_keras_model()

        self.train_data()
        
        self.model_test()

    def train_data(self):

        train_history = self.model.fit(
            x=self.batch_generator(),
            steps_per_epoch=self.num_steps_per_epoch,
            epochs=self.num_epochs,
            validation_data=self.batch_generator_test(),
            # callbacks=[self.stop_training_callback]
        )

        print(f"self.count_reload:{self.count_reload}")
        print(train_history.history.keys())

        self.show_train_history(train_history, 'accuracy', 'val_accuracy')
        self.show_train_history(train_history, 'loss', 'val_loss')

        
    def show_train_history(self, train_history,train,validation):

      plt.plot(train_history.history[train])
      plt.plot(train_history.history[validation])
      plt.title('Train history')
      plt.ylabel('train')
      plt.xlabel('epoch')

      # 設置圖例在左上角
      plt.legend(['train','validation'],loc='upper left')
      plt.show()       
        
    def model_test(self):
        test_data,y_true = self.batch_generator_test()
        loss ,acc = self.model.evaluate(test_data,y_true)
        print(loss ,acc)
        predictions = self.model.predict(test_data)
        y_pred = (predictions > 0.5).astype(int)
        y_pred = np.array( [self.reshapeResult(x) for x in y_pred ]).reshape(-1)
        y_true =np.array( [self.reshapeResult(x) for x in y_true ]).reshape(-1)
        m = confusion_matrix(y_true,y_pred)
        print(f'confusion_matrix:\n{m}')
        self.analysis_result['confusion_matrix'] = m
        
        
# test = DataGenerator(date_size=30,pick=2000,num_epochs=40)  
# test = DataGenerator(date_size=60,pick=1000,num_epochs=40)     
# three_high_normal = DataGenerator(days=120,num_records=2000,num_epochs=30,value=0.7, labels= ['three_high','normal'])
three_high_normal = DataGenerator(days=60,num_records=2000,num_epochs=30,value=0.7, labels=  ['three_days_up','three_days_down','normal_up_down'])
# three_high_normal.batch_generator_test()
three_high_normal.build_model()
# three_high_normal.model.save(f'./model/{datetime.now().strftime("%Y-%m-%d_")}three_high_normal_90')
# three_high_normal.model.save(f'./model/{datetime.now().strftime("%Y-%m-%d_")}three_high_normal_120')

# batch_gen = three_high_normal.batch_generator(num_records=10)  # 假設每批次生成 10 條記錄
# 從生成器中提取指定數量的批次並打印內容
# for i in range(3):
#     print(f"Batch {i + 1}:")
#     batch = next(batch_gen)
#     print(batch)