In [1]:
# import libraries

import pandas as pd
import numpy as np
import re
import os
import glob
import random
import pickle

import nltk
from nltk.stem import WordNetLemmatizer 

from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score, confusion_matrix
from sklearn.preprocessing import LabelEncoder
from sklearn.preprocessing import StandardScaler
import xgboost as xgb

import torch
from transformers import AutoTokenizer, AutoModel

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
import random

# 進行 10000 次試驗
num_trials = 100000

# 計數器
count_a = 0
count_b = 0 
count_c = 0

for i in range(num_trials):
    # 生成一個 0-1 之間的隨機數
    rand_num = random.random()
    
    # 根據概率判斷事件
    if rand_num < 0.2:
        count_a += 1
    elif rand_num < 0.4:
        count_b += 1
    else:
        count_c += 1

# 輸出結果
print(f"事件 A 出現了 {count_a} 次")
print(f"事件 B 出現了 {count_b} 次") 
print(f"事件 C 出現了 {count_c} 次")

事件 A 出現了 19855 次
事件 B 出現了 20065 次
事件 C 出現了 60080 次


In [3]:
nltk.download("wordnet")

[nltk_data] Downloading package wordnet to
[nltk_data]     C:\Users\f8210\AppData\Roaming\nltk_data...
[nltk_data]   Package wordnet is already up-to-date!


True

In [4]:
grade_dict = {
'AAA': 0,
'AA+': 1,
'AA': 2,
'AA-': 3,
'A+': 4,
'A': 5,
'A-': 6,
'B+': 7,
'B': 8,
'B-': 9,
'BB+': 10,
'BB': 11,
'BB-': 12,
'BBB+': 13,
'BBB': 14,
'BBB-': 15,
'CCC+': 16,
'CCC': 17,
'CCC-': 18,
'D': 19
}

inv_grade_dict = {v: k for k, v in grade_dict.items()}

In [5]:
def clean_text(text):
    
    # 去除 HTML 標籤
    text = re.sub(r'<[^>]+>', '', text)
    
    # 去除數字
    text = re.sub(r'\d+', '', text)
    
    # 去除標點符號
    text = re.sub(r'[^\w\s]', '', text)
    
    # 去除非英文單字
    text = re.sub(r'[^a-zA-Z\s]', '', text)
    
    # 去除換行符號
    text = re.sub(r'\n', ' ', text)
    
    # 統一為小寫
    text = text.lower()
    
    # 詞性還原
    lemmatized_text = ' '.join([WordNetLemmatizer().lemmatize(w) for w in nltk.word_tokenize(text)])
    
    return lemmatized_text

In [6]:
def data_processing(year):

    def judge(row):

        rand_num = random.random()
        
        if row["year"] > year:
            
            grade_num = row["grade_num"]
            if rand_num < 0.2:
                grade_num += 1
            elif rand_num < 0.4:
                grade_num -= 1
        else:
            grade_num = row["grade_num"]
        
        if grade_num < 0:
            grade_num = 0
        elif grade_num > 19:
            grade_num = 19
            
        return grade_num
        
    df_rate = pd.read_excel("標準普爾最新信用評級.xls", header=7)
    df_rate["year"] = df_rate["S&P Entity Credit Rating Date - Issuer Credit Rating - Local Currency LT [Latest] (Rating Date)"].dt.year
    feature_names = [col.replace('[', '').replace(']', '').replace('<', '') for col in pd.read_excel(f"NEW財務數據/財務數據/2019財務數據.xls", header=7).columns]    
    df_rate["grade_num"] = df_rate["S&P Entity Credit Rating - Issuer Credit Rating - Local Currency LT [Latest] (Rating)"].map(grade_dict)
    df_rate["grade_num"] = df_rate.apply(judge, axis=1)
    df_rate["S&P Entity Credit Rating - Issuer Credit Rating - Local Currency LT [Latest] (Rating)"] = df_rate["grade_num"].map(inv_grade_dict)
    dict_rate = dict(zip(df_rate['Exchange:Ticker'], df_rate['S&P Entity Credit Rating - Issuer Credit Rating - Local Currency LT [Latest] (Rating)']))

    df = pd.read_excel(f"NEW財務數據/財務數據/{year}財務數據.xls", header=7)
    
    # replace columns as 2019 columns
    df.columns = feature_names
    
    # map Exchange:Ticker to credit rating
    df["rate"] = df["Exchange:Ticker"].map(dict_rate)
    df['Exchange:Ticker'] = df['Exchange:Ticker'].str.split(':').str[-1]
    
    # For each ticker in the 'Exchange:Ticker' column, search for a matching text file
    for ticker in df['Exchange:Ticker']:
        txt_files = glob.glob(os.path.join(f'NEW文字檔/10-K文字檔/{year}txt/', f"{ticker}_*.txt"))
        if txt_files:
            with open(txt_files[0], 'r') as f:
                content = clean_text(f.read())
            df.loc[df['Exchange:Ticker'] == ticker, 'text'] = content
        else:
            df.loc[df['Exchange:Ticker'] == ticker, 'text'] = np.nan    

    # Create a rating map dictionary
    rating_map = {
        'AAA': 1, 'AA+': 1, 'AA': 1, 'AA-': 1, 'A+': 1, 'A': 1, 'A-': 1,
        'BBB+': 2, 'BBB': 2, 'BBB-': 2,
        'BB+': 3, 'BB': 3, 'BB-': 3,
        'B+': 4, 'B': 4, 'B-': 4, 'CCC+': 4, 'CCC': 4, 'CCC-': 4, 'D': 4,
        'NR': np.nan
    }

    # Map the credit rating values to numerical values 
    df['rate'] = df['rate'].map(lambda x: rating_map.get(x, x))
    
    # drop the nan in rate column
    df = df.dropna(subset=['rate'])

    # Replace '-' with NaN values in all columns
    for col in df.columns:
        df[col] = df[col].replace('-', np.nan)

    # Replace 'NM' with NaN values
    df = df.replace('NM', np.nan)

    # Fill NaN values with the mean
    for col in df.columns:
        if df[col].dtype != 'object':
            df[col] = df[col].fillna(df[col].mean())
    
    df = df.dropna(axis=1, how='all')
    df = df.dropna()

    # company & index mapping
    df_company = df[["Company Name", "Security Tickers","Exchange:Ticker"]]
    
    # Drop the following columns
    df = df.drop(columns=["Company Name", "Security Tickers","Exchange:Ticker"])    
    
    return df, df_company

In [7]:
df_old = pd.DataFrame()
df_old_company = pd.DataFrame()

for year in [2019,2020]:
    # Concatenate the DataFrames
    df_year, df_company = data_processing(year)
    df_old = pd.concat([df_old, df_year], ignore_index=True)
    df_old_company = pd.concat([df_old_company, df_company], ignore_index=True)

  df[col] = df[col].replace('-', np.nan)
  df = df.replace('NM', np.nan)
  df[col] = df[col].replace('-', np.nan)
  df = df.replace('NM', np.nan)


In [8]:
df_new = pd.DataFrame()
df_new_company = pd.DataFrame()

for year in [2021,2022]:
    # Concatenate the DataFrames
    df_year, df_company = data_processing(year)
    df_new = pd.concat([df_new, df_year], ignore_index=True)
    df_new_company = pd.concat([df_new_company, df_company], ignore_index=True)

  df[col] = df[col].replace('-', np.nan)
  df = df.replace('NM', np.nan)
  df[col] = df[col].replace('-', np.nan)
  df = df.replace('NM', np.nan)


In [9]:
df_old.to_excel("df_old.xlsx")
df_new.to_excel("df_new.xlsx")
df_old_company.to_excel("df_old_company.xlsx")
df_new_company.to_excel("df_new_company.xlsx")

In [10]:
for data in ["old", "new"]:

    if data == "old":
        df = df_old
    else:
        df = df_new
        
    # Sample 500 rows with a fixed random state
    df = df.sample(n=100, random_state=42)
    df.to_excel(f"df_{data}_sample.xlsx")
    df = df.reset_index(drop=True)

    # 分成文本和數值型
    df_numeric = df.drop("text", axis=1)
    df_text = df[["text","rate"]]

    ## 純數值
    
    le = LabelEncoder()

    X = df_numeric.drop('rate', axis=1)
    y = le.fit_transform(df_numeric['rate'])

    # 對 X 資料標準化
    scaler = StandardScaler().fit(X)
    X = scaler.transform(X)

    indices = range(100)
    X_train, X_test, y_train, y_test, indices_train, indices_test = train_test_split(X, y, indices, test_size=0.2, random_state=42)
    
    ## 純文本
    
    # Split the DataFrame into training and test sets
    df_train, df_test = train_test_split(df_text, test_size=0.2, random_state=42)

    # Set the device
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    # Load the DistilBERT tokenizer and model
    tokenizer = AutoTokenizer.from_pretrained("distilbert-base-uncased")
    model = AutoModel.from_pretrained("distilbert-base-uncased").to(device)

    # Tokenize the text data for the training and validation sets
    tokenized_train = tokenizer(df_train["text"].values.tolist(), padding=True, truncation=True, return_tensors="pt")
    tokenized_test = tokenizer(df_test["text"].values.tolist(), padding=True, truncation=True, return_tensors="pt")

    # Pass the tokenized text through the DistilBERT model to get the hidden states
    with torch.no_grad():
        hidden_train = model(**tokenized_train)
        hidden_test = model(**tokenized_test)

    # Get only the [CLS] token hidden states
    cls_train = hidden_train.last_hidden_state[:,0,:]
    cls_test = hidden_test.last_hidden_state[:,0,:]

    ## 數值 + 文本

    # Concatenate the [CLS] token hidden states and the general features for the training set
    x_train = torch.cat((cls_train, torch.from_numpy(X_train)), 1)
    x_test = torch.cat((cls_test, torch.from_numpy(X_test)), 1)

    # Print the shapes of the input and target tensors
    print(f"x_train shape: {x_train.shape}")
    print(f"y_train shape: {y_train.shape}")
    print(f"x_test shape: {x_test.shape}")
    print(f"y_test shape: {y_test.shape}")

    # data & index storing
    with open(f'x_train_{data}.pickle', 'wb') as handle:
        pickle.dump(x_train, handle)
    with open(f'y_train_{data}.pickle', 'wb') as handle:
        pickle.dump(y_train, handle)
    with open(f'x_test_{data}.pickle', 'wb') as handle:
        pickle.dump(x_test, handle)
    with open(f'y_test_{data}.pickle', 'wb') as handle:
        pickle.dump(y_test, handle)     
    with open(f"indices_train_{data}", "wb") as handle:
        pickle.dump(indices_train, handle)        
    with open(f"indices_test_{data}", "wb") as handle:
        pickle.dump(indices_test, handle)          

x_train shape: torch.Size([80, 791])
y_train shape: (80,)
x_test shape: torch.Size([20, 791])
y_test shape: (20,)
x_train shape: torch.Size([80, 791])
y_train shape: (80,)
x_test shape: torch.Size([20, 791])
y_test shape: (20,)
