In [None]:
import os
import pandas as pd
from fredapi import Fred

# 1. API 키 설정 (변수로!)
my_api_key = '40e75c608c24193ed3c91fa450715041'
fred = Fred(api_key=my_api_key)

# 2. 가져올 시리즈 ID 설정 (여기선 연방금리)
series_id = 'FEDFUNDS'
interest_rate = fred.get_series(series_id)

# 3. pandas DataFrame으로 변환 (날짜 포함)
df = pd.DataFrame(interest_rate)
df.columns = ['interest_rate']
df.index.name = 'date'

# 4. 저장 디렉토리 및 파일명 설정
save_dir = '../data/macro'
os.makedirs(save_dir, exist_ok=True)
save_path = os.path.join(save_dir, f'{series_id.lower()}.csv')

# 5. CSV로 저장
df.to_csv(save_path)
print(f'Data saved to: {save_path}')
print(df.head())

Data saved to: ./data/macro\fedfunds.csv
            interest_rate
date                     
1954-07-01           0.80
1954-08-01           1.22
1954-09-01           1.07
1954-10-01           0.85
1954-11-01           0.83


In [9]:
from financetoolkit import Toolkit
import os

# 📌 여기에 너의 FMP API 키 입력
api_key = "fAtjTuyk4qp6hk3rxKMbshOQKSlc6rZh"  # 예: "c1f5e2aefed14714ac12c0dd08e774a9"

# 🔍 수집할 기업 티커 (여러 개도 가능)
companies = "AAPL"

toolkit = Toolkit(
    companies,
    api_key=api_key,
    start_date="2018-01-01",  # 원하는 시작 날짜
    quarterly=False           # True: 분기 / False: 연간
)

# 📁 저장 폴더 생성
save_dir = f"../data/firm/{companies}"
os.makedirs(save_dir, exist_ok=True)


# 📊 재무제표 수집
income_statement = toolkit.get_income_statement()
balance_sheet = toolkit.get_balance_sheet_statement()
cashflow = toolkit.get_cash_flow_statement()

# 💾 CSV로 저장
balance_sheet.to_csv(f"{save_dir}/balance_sheet.csv")
income_statement.to_csv(f"{save_dir}/income_statement.csv")
cashflow.to_csv(f"{save_dir}/cashflow.csv")

print(f"{companies} 재무제표 저장 완료.")


Obtaining income data: 100%|██████████| 1/1 [00:00<00:00,  9.67it/s]
Obtaining balance data: 100%|██████████| 1/1 [00:00<00:00,  9.77it/s]
Obtaining cashflow data: 100%|██████████| 1/1 [00:00<00:00,  9.79it/s]


AAPL 재무제표 저장 완료.


In [14]:
import requests
import pandas as pd
import os

api_key = "2b42d24318ea75778896dfaafd4f9b35"
query = "Apple Inc"
url = f"https://gnews.io/api/v4/search?q={query}&lang=en&max=100&token={api_key}"

response = requests.get(url)
data = response.json()

if "articles" in data:
    articles = data["articles"]
    df = pd.DataFrame(articles)
    df["publishedAt"] = pd.to_datetime(df["publishedAt"]).dt.date  # 날짜만 추출
    save_dir = "../data/firm/AAPL"
    os.makedirs(save_dir, exist_ok=True)
    df.to_csv(f"{save_dir}/news_raw.csv", index=False)
    print(f"✅ 뉴스 {len(df)}건 저장 완료.")
else:
    print("❌ 기사 없음 or API 인증 문제:", data)


✅ 뉴스 10건 저장 완료.


In [16]:
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from transformers import pipeline
import pandas as pd
import os

# 모델 로드
model_name = "ProsusAI/finbert"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForSequenceClassification.from_pretrained(model_name)

# 감정 분석 파이프라인 생성
nlp = pipeline("sentiment-analysis", model=model, tokenizer=tokenizer)

# 뉴스 로드 (news_raw.csv에서)
df = pd.read_csv("../data/firm/AAPL/news_raw.csv")
df["publishedAt"] = pd.to_datetime(df["publishedAt"]).dt.date

# 감정 분석 실행 (제목 기준)
df["finbert_sentiment"] = df["title"].apply(lambda x: nlp(x)[0]["label"])

# 날짜별 감정 분포 요약
daily_sentiment = df.groupby(["publishedAt", "finbert_sentiment"]).size().unstack(fill_value=0)
daily_sentiment = daily_sentiment.reset_index()
daily_sentiment.columns.name = None
daily_sentiment.rename(columns={"publishedAt": "date"}, inplace=True)

# 저장
save_path = "../data/firm/AAPL/news_sentiment_finbert.csv"
daily_sentiment.to_csv(save_path, index=False)
print("✅ FinBERT 감정 분석 및 저장 완료.")

✅ FinBERT 감정 분석 및 저장 완료.


In [17]:
import pandas as pd

# 금리 데이터 불러오기
macro = pd.read_csv("../data/macro/fedfunds.csv")
macro["date"] = pd.to_datetime(macro["date"])

# 기업 뉴스 감정 데이터
firm_sentiment = pd.read_csv("../data/firm/AAPL/news_sentiment_finbert.csv")
firm_sentiment["date"] = pd.to_datetime(firm_sentiment["date"])

# 날짜 기준 병합 (inner 또는 outer join 가능)
df = pd.merge(macro, firm_sentiment, on="date", how="outer")

# 결측값 처리 (예: 0으로 대체)
df.fillna(0, inplace=True)

# 확인
print(df.head())

        date  interest_rate  negative  neutral
0 1954-07-01            0.8       0.0      0.0
1 1954-08-01           1.22       0.0      0.0
2 1954-09-01           1.07       0.0      0.0
3 1954-10-01           0.85       0.0      0.0
4 1954-11-01           0.83       0.0      0.0


In [25]:
import wget
import os
import zstandard as zstd
import json
import pandas as pd
from tqdm import tqdm
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer

# 📌 Step 1: Reddit 아카이브 다운로드 설정
month = "2023-01"  # 원하는 월 선택
file_name = f"RS_{month}.zst"
url = f"https://files.pushshift.io/reddit/submissions/{file_name}"
save_path = f"../data/individual/{file_name}"
output_path = f"../data/individual/wsb_sentiment_{month}.csv"

# 📥 Step 2: 파일 다운로드
if not os.path.exists(save_path):
    print(f"📥 다운로드 중: {file_name} ...")
    wget.download(url, out=save_path)

# 📌 Step 3: 필요한 데이터만 필터링
TARGET_SUBREDDIT = "wallstreetbets"
KEYWORDS = ["TSLA", "Tesla", "AAPL", "Apple"]
DATE_RANGE = ("2023-01-10", "2023-01-20")  # 원하는 날짜 범위

records = []

with open(save_path, 'rb') as f:
    dctx = zstd.ZstdDecompressor()
    stream_reader = dctx.stream_reader(f)

    for line in tqdm(stream_reader):
        try:
            post = json.loads(line)
            # 서브레딧 필터
            if post.get("subreddit") != TARGET_SUBREDDIT:
                continue

            # 키워드 필터 (제목 + 본문)
            title = post.get("title", "")
            selftext = post.get("selftext", "")
            if not any(keyword.lower() in (title + selftext).lower() for keyword in KEYWORDS):
                continue

            # 날짜 필터
            timestamp = pd.to_datetime(post.get("created_utc", 0), unit='s')
            if not (DATE_RANGE[0] <= str(timestamp.date()) <= DATE_RANGE[1]):
                continue

            # 필요한 데이터만 저장
            records.append({
                "date": timestamp.date(),
                "title": title,
                "text": selftext
            })

        except:
            continue

# 📌 Step 4: DataFrame으로 변환
df = pd.DataFrame(records)

# 🧠 Step 5: 감정 분석 (VADER)
analyzer = SentimentIntensityAnalyzer()
df["sentiment"] = df["title"].apply(lambda x: analyzer.polarity_scores(str(x))["compound"])

# 📊 Step 6: 날짜별 평균 감정 점수 계산
daily_sentiment = df.groupby("date")["sentiment"].mean().reset_index()
daily_sentiment.columns = ["date", "avg_reddit_sentiment"]

# 💾 Step 7: 최종 저장
os.makedirs(os.path.dirname(output_path), exist_ok=True)
daily_sentiment.to_csv(output_path, index=False)

print(f"✅ 필터링된 게시글 {len(df)}개 수집 완료 → 감정 분석 완료 → {output_path} 저장!")


📥 다운로드 중: RS_2023-01.zst ...


HTTPError: HTTP Error 521: 

In [None]:
import pandas as pd

# 경로는 프로젝트 구조에 맞게 수정
macro = pd.read_csv("../data/macro/fedfunds.csv")  # 국가 에이전트
macro["date"] = pd.to_datetime(macro["date"])

news = pd.read_csv("../data/firm/AAPL/news_sentiment_finbert.csv")  # 기업
news["date"] = pd.to_datetime(news["date"])

reddit = pd.read_csv("../data/individual/wsb_sentiment_2023-01.csv")  # 개인
reddit["date"] = pd.to_datetime(reddit["date"])

# 타겟 생성용 주가
import yfinance as yf
import numpy as np

price_df = yf.download("AAPL", start="2022-12-31", end="2023-01-25", progress=False)
price_df = price_df[["Close"]].reset_index()
price_df.columns = ["date", "close"]
price_df["date"] = pd.to_datetime(price_df["date"])
price_df["log_return"] = np.log(price_df["close"].shift(-1) / price_df["close"])
price_df["target_binary"] = (price_df["log_return"] > 0).astype(int)
target = price_df[["date", "log_return", "target_binary"]]

# 병합
merged = pd.merge(macro, news, on="date", how="outer")
merged = pd.merge(merged, reddit, on="date", how="outer")
merged = pd.merge(merged, target, on="date", how="inner")

# 결측값 처리
merged.fillna(0, inplace=True)
merged = merged.sort_values("date")

# 저장
os.makedirs("./data/merged", exist_ok=True)
merged.to_csv("./data/merged/final_model_input.csv", index=False)

print("✅ final_model_input.csv 생성 완료!")


In [None]:
import pandas as pd
import torch
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split

# 1. 데이터 불러오기
df = pd.read_csv("./data/merged/final_model_input.csv")
df["date"] = pd.to_datetime(df["date"])

# 2. 입력 & 타겟 분리
X = df.drop(columns=["date", "log_return", "target_binary"])
y = df["target_binary"].values

# 3. 에이전트별 컬럼 정의 (수동 설정)
gov_cols = ["interest_rate"]
firm_cols = ["positive", "negative", "neutral"]
indiv_cols = ["avg_reddit_sentiment"]

# 4. 각각 분리
X_gov = X[gov_cols].values
X_firm = X[firm_cols].values
X_indiv = X[indiv_cols].values

# 5. 정규화
scaler_gov = StandardScaler()
scaler_firm = StandardScaler()
scaler_indiv = StandardScaler()

X_gov = scaler_gov.fit_transform(X_gov)
X_firm = scaler_firm.fit_transform(X_firm)
X_indiv = scaler_indiv.fit_transform(X_indiv)

# 6. train/test split
X_gov_train, X_gov_test, y_train, y_test = train_test_split(X_gov, y, test_size=0.2, shuffle=False)
X_firm_train, X_firm_test = train_test_split(X_firm, test_size=0.2, shuffle=False)
X_indiv_train, X_indiv_test = train_test_split(X_indiv, test_size=0.2, shuffle=False)


In [None]:
import torch.utils.data as data

def to_tensor(arr):
    return torch.tensor(arr, dtype=torch.float32)

# (batch, agents, features)
X_train = torch.stack([
    to_tensor(X_gov_train),
    to_tensor(X_firm_train),
    to_tensor(X_indiv_train)
], dim=1)

X_test = torch.stack([
    to_tensor(X_gov_test),
    to_tensor(X_firm_test),
    to_tensor(X_indiv_test)
], dim=1)

y_train = torch.tensor(y_train, dtype=torch.long)
y_test = torch.tensor(y_test, dtype=torch.long)

train_dataset = data.TensorDataset(X_train, y_train)
test_dataset = data.TensorDataset(X_test, y_test)

train_loader = data.DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = data.DataLoader(test_dataset, batch_size=32)

# 에이전트 타입 인덱스
agent_types = torch.tensor([0, 1, 2])


In [None]:
import torch.nn as nn
import torch.nn.functional as F

class AgentInteractionLayer(nn.Module):
    def __init__(self, input_dim, num_agent_types):
        super().__init__()
        self.relation_matrix = nn.Parameter(torch.randn(num_agent_types, num_agent_types))
        self.linear = nn.Linear(input_dim, input_dim)

    def forward(self, x, agent_types):
        batch_size, num_agents, input_dim = x.size()
        interaction = torch.zeros_like(x)

        for i in range(num_agents):
            for j in range(num_agents):
                w_ij = self.relation_matrix[agent_types[i], agent_types[j]]
                interaction[:, i, :] += w_ij * x[:, j, :]

        return F.relu(self.linear(x + interaction))

class AgentBasedPredictor(nn.Module):
    def __init__(self, input_dim, num_agent_types, hidden_dim):
        super().__init__()
        self.agent_layer = AgentInteractionLayer(input_dim, num_agent_types)
        self.fc = nn.Sequential(
            nn.Flatten(),
            nn.Linear(input_dim * num_agent_types, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, 2)
        )

    def forward(self, x, agent_types):
        x = self.agent_layer(x, agent_types)
        return self.fc(x)


In [None]:
import torch.optim as optim

input_dim = X_train.shape[-1]
model = AgentBasedPredictor(input_dim=input_dim, num_agent_types=3, hidden_dim=64)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# 학습
for epoch in range(10):
    model.train()
    total_loss = 0
    for X_batch, y_batch in train_loader:
        optimizer.zero_grad()
        outputs = model(X_batch, agent_types)
        loss = criterion(outputs, y_batch)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    print(f"[Epoch {epoch+1}] Loss: {total_loss / len(train_loader):.4f}")


In [None]:
model.eval()
correct, total = 0, 0

with torch.no_grad():
    for X_batch, y_batch in test_loader:
        outputs = model(X_batch, agent_types)
        preds = torch.argmax(outputs, dim=1)
        correct += (preds == y_batch).sum().item()
        total += y_batch.size(0)

acc = correct / total
print(f"✅ Test Accuracy: {acc * 100:.2f}%")


In [None]:
import matplotlib.pyplot as plt

# 예측 vs 실제 저장
preds_list = []
model.eval()
with torch.no_grad():
    for X_batch, y_batch in test_loader:
        outputs = model(X_batch, agent_types)
        preds = torch.argmax(outputs, dim=1)
        preds_list.extend(preds.tolist())

# 날짜 가져오기
dates = df["date"].iloc[-len(preds_list):].reset_index(drop=True)
actual = y_test.numpy()

# 시각화
plt.figure(figsize=(12, 4))
plt.plot(dates, actual, label="Actual", marker='o')
plt.plot(dates, preds_list, label="Predicted", marker='x')
plt.legend()
plt.title("📈 Actual vs Predicted (Up/Down)")
plt.xlabel("Date")
plt.ylabel("Direction (0=down, 1=up)")
plt.xticks(rotation=45)
plt.grid(True)
plt.tight_layout()
plt.show()
