<a href="https://colab.research.google.com/github/rabbitxyt/stock/blob/main/multiple_stock.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [105]:
# [1] Install libraries and check the environment

# Try to import every dependency listed below; if any of them is missing,
# install the required packages through the IPython `!pip` shell escape.
# NOTE(review): the imports are not retried after the install, so this cell
# presumably has to be re-run once pip finishes — confirm.
try:
    import google.auth
    import gspread
    import pandas as pd
    import numpy as np
    import pickle
    import requests
    import openpyxl
    import pytz
    from datetime import datetime, timedelta
    from googleapiclient.discovery import build
except ImportError:
    !pip install google-auth gspread gspread-formatting google-api-python-client pandas requests openpyxl pytz

In [106]:
# Detect whether the notebook is running inside Google Colab.
try:
    # Used to mount Google Drive and authenticate the user in Colab.
    from google.colab import drive, auth
    IN_COLAB = True
except ImportError:
    IN_COLAB = False

if IN_COLAB:
    # Colab: mount Google Drive, then authenticate as the signed-in user.
    drive.mount('/content/drive')
    auth.authenticate_user()
    from google.auth import default
    creds, _ = default()
else:
    # Local run: authenticate with a service-account key file.
    from google.oauth2.service_account import Credentials
    # Service-account credentials carry no OAuth scopes by default; without
    # them the token cannot be used for the Sheets and Drive APIs that
    # gspread calls, so request both explicitly.
    creds = Credentials.from_service_account_file(
        'path/to/your/service_account.json',  # replace with your key file path
        scopes=[
            'https://www.googleapis.com/auth/spreadsheets',
            'https://www.googleapis.com/auth/drive',
        ],
    )

# Log in to Google Sheets with the credentials obtained above.
gc = gspread.authorize(creds)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [112]:
# List of stock codes
# stock_codes = ["AAPL", "MSFT", "GOOGL"]  # add more stock codes here as needed
# Read the list of stock codes from a TXT file
def read_stock_codes_from_txt(file_path):
    """Return the stock codes listed one per line in the text file *file_path*.

    Surrounding whitespace is stripped from every line and blank lines are
    skipped, so a trailing newline or an empty separator line never produces
    an empty ticker symbol. The file is decoded as UTF-8 and a leading
    byte-order mark, if present, is dropped.

    Args:
        file_path: Path of the text file to read.

    Returns:
        A list of non-empty stock code strings in file order.

    Raises:
        OSError: If the file cannot be opened.
    """
    # 'utf-8-sig' behaves like UTF-8 but also swallows a BOM, which would
    # otherwise be glued to the first stock code.
    with open(file_path, 'r', encoding='utf-8-sig') as file:
        stripped_lines = (line.strip() for line in file)
        return [code for code in stripped_lines if code]

# Read the stock codes from the text file stored on the mounted Google Drive
txt_file_path = '/content/drive/My Drive/stock_codes.txt'
stock_codes = read_stock_codes_from_txt(txt_file_path)

# Function that downloads the data for one stock
def get_stock_data(stock_code, api_key, timeout=30):
    """Download the weekly time series of *stock_code* from Alpha Vantage.

    Args:
        stock_code: Ticker symbol sent as the ``symbol`` query parameter.
        api_key: Alpha Vantage API key sent as the ``apikey`` parameter.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` waits indefinitely on a stalled connection.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.RequestException: On connection errors, timeouts or an HTTP
            error status.
        ValueError: If the response body is not valid JSON.
    """
    query = {
        "function": "TIME_SERIES_WEEKLY",
        "symbol": stock_code,
        "apikey": api_key,
    }
    response = requests.get(
        "https://www.alphavantage.co/query",
        params=query,
        timeout=timeout,
    )
    # Surface 4xx/5xx answers as exceptions instead of trying to parse an
    # error page as JSON.
    response.raise_for_status()
    return response.json()

# Set the API key
# NOTE(review): the fallback key below is committed in the notebook and is
# therefore public; prefer setting ALPHAVANTAGE_API_KEY in the environment
# and consider rotating this key.
import os
import pickle

api_key = os.environ.get("ALPHAVANTAGE_API_KEY", "VAVS6TS3HVTPGWQ9")

# Download and collect the data of every stock
stock_data_dict = {}
for stock_code in stock_codes:
    try:
        stock_data = get_stock_data(stock_code, api_key)
    except (requests.RequestException, ValueError) as error:
        # One failing symbol must not discard everything downloaded so far.
        print(f"Failed to download data for {stock_code}: {error}")
        continue

    if "Weekly Time Series" not in stock_data:
        # The service answered, but without a series; report whatever
        # explanation the response carries so the gap is visible now rather
        # than only when the data is processed.
        reason = next(
            (stock_data[key] for key in ("Error Message", "Note", "Information")
             if key in stock_data),
            "unexpected response",
        )
        print(f"No weekly series returned for {stock_code}: {reason}")

    stock_data_dict[stock_code] = stock_data

# Save the collected responses to a file on Google Drive
with open('/content/drive/My Drive/stock_data_dict.pkl', 'wb') as f:
    pickle.dump(stock_data_dict, f)

In [None]:
# Load the stock data from the cached file (about 5s)
import pickle
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
import pytz

# NOTE: pickle.load can execute arbitrary code stored in the file; only load
# a cache file that this notebook wrote itself.
with open('/content/drive/My Drive/stock_data_dict.pkl', 'rb') as f:
    stock_data_dict = pickle.load(f)

# Check whether a spreadsheet named "Stock" already exists in Google Drive
try:
    spreadsheet = gc.open("Stock")
    print("Existing 'Stock' spreadsheet found and will be overwritten.")
    # Remove the leading placeholder sheet. `sheet1` is simply the first tab,
    # whatever its title, so skip the deletion when that tab already belongs
    # to one of the stocks, and never try to delete the only remaining sheet
    # (a spreadsheet must keep at least one).
    worksheet = spreadsheet.sheet1
    if worksheet.title not in stock_data_dict and len(spreadsheet.worksheets()) > 1:
        spreadsheet.del_worksheet(worksheet)
except gspread.exceptions.SpreadsheetNotFound:
    spreadsheet = gc.create("Stock")
    print("Creating new 'Stock' spreadsheet.")


# Functions that process the weekly data and store the result
def _weekly_frame(data):
    """Build a newest-first DataFrame (Date, Close, Volume) from the API response."""
    series = data.get("Weekly Time Series", {})
    frame = pd.DataFrame({
        "Date": pd.to_datetime(list(series)),
        "Close": [float(metrics["4. close"]) for metrics in series.values()],
        "Volume": [int(metrics["5. volume"]) for metrics in series.values()],
    })
    # Every later step treats row 0 as the most recent week, so enforce that
    # order instead of relying on the order of the response.
    return frame.sort_values("Date", ascending=False, kind="stable").reset_index(drop=True)


def _last_completed_week_end(now):
    """Return the naive Sunday datetime that closes the last finished trading week."""
    weekday = now.weekday()  # Monday=0 ... Friday=4, Saturday=5, Sunday=6
    friday_close_time = now.replace(hour=17, minute=0, second=0, microsecond=0)
    week_is_finished = weekday >= 5 or (weekday == 4 and now > friday_close_time)
    if week_is_finished:
        end_date = now + timedelta(days=6 - weekday)   # this week's Sunday
    else:
        end_date = now - timedelta(days=weekday + 1)   # last week's Sunday
    # The parsed dates are timezone-naive, so drop the tzinfo before comparing.
    return end_date.replace(tzinfo=None)


def _on_balance_volume(closes, volumes):
    """Return the OBV of each row (newest first); the oldest row starts at 0."""
    obv = [0] * len(closes)
    for i in range(len(closes) - 2, -1, -1):
        if closes[i] > closes[i + 1]:
            obv[i] = obv[i + 1] + volumes[i]
        elif closes[i] < closes[i + 1]:
            obv[i] = obv[i + 1] - volumes[i]
        else:
            # Unchanged close: carry the running total forward instead of
            # resetting it to zero.
            obv[i] = obv[i + 1]
    return obv


def _turning_points(closes):
    """Label each row 'High', 'Low', '' (neither) or 'NA' (cannot be decided)."""
    count = len(closes)
    marks = [''] * count
    for i in range(1, count - 1):
        if closes[i] > closes[i - 1] and closes[i] > closes[i + 1]:
            marks[i] = 'High'
        elif closes[i] < closes[i - 1] and closes[i] < closes[i + 1]:
            marks[i] = 'Low'
    # The oldest row has no earlier week to compare against.
    marks[-1] = 'NA'
    # The newest row has no later week, so it is classified only by its move
    # relative to the week before it.
    if count > 1 and closes[0] > closes[1]:
        marks[0] = 'High'
    elif count > 1 and closes[0] < closes[1]:
        marks[0] = 'Low'
    else:
        marks[0] = 'NA'
    return marks


def _divergence_rows(dates, closes, obv, marks):
    """Return the report rows for every turning point that diverges from OBV."""
    scale_factor = 1e7  # OBV values are divided by this and rounded for display
    nearest_older = {'High': None, 'Low': None}
    rows = []
    # Walk from the oldest row to the newest, remembering the most recently
    # seen High and Low; for row i that is the nearest older one of its kind.
    for i in range(len(marks) - 1, -1, -1):
        kind = marks[i]
        if kind not in nearest_older:
            continue
        j = nearest_older[kind]
        nearest_older[kind] = i
        if j is None:
            continue

        price_up_obv_down = closes[i] > closes[j] and obv[i] < obv[j]
        price_down_obv_up = closes[i] < closes[j] and obv[i] > obv[j]
        if kind == 'High':
            code = 1 if price_up_obv_down else 2 if price_down_obv_up else 0
        else:
            code = 3 if price_down_obv_up else 4 if price_up_obv_down else 0

        # Only codes 1, 2 and 3 are reported; code 4 is computed but left out.
        if code in (1, 2, 3):
            rows.append([
                dates[j],
                '{:.2f}'.format(closes[j]),
                int(round(obv[j] / scale_factor)),
                dates[i],
                '{:.2f}'.format(closes[i]),
                int(round(obv[i] / scale_factor)),
                code,
            ])
    rows.reverse()  # back to newest-first order
    return rows


def process_and_store_stock_data(stock_code, data, spreadsheet, now=None):
    """Find price/OBV divergences in weekly data and write them to a worksheet.

    The weekly closes are reduced to completed weeks, each row is labelled as
    a local high or low, an on-balance volume (OBV) is accumulated from the
    oldest week, and every high/low is compared with the nearest older
    high/low. Rows whose close and OBV moved in opposite directions get an
    index code:

    * 1 - High: close above the older high, OBV below it
    * 2 - High: close below the older high, OBV above it
    * 3 - Low: close below the older low, OBV above it
    * 4 - Low: close above the older low, OBV below it (not written out)

    The worksheet named *stock_code* is cleared (or created) and filled with
    a header row followed by one row per reported divergence, newest first.

    Args:
        stock_code: Ticker symbol; also used as the worksheet title.
        data: Decoded API response holding a "Weekly Time Series" mapping of
            date string to a mapping with "4. close" and "5. volume".
        spreadsheet: gspread spreadsheet that receives the worksheet.
        now: Current time used to decide which week is complete. Defaults to
            the current time in the US/Eastern timezone; pass a fixed value
            to make the result reproducible.

    Returns:
        None. When there is no usable data a message is printed and the
        spreadsheet is left untouched.
    """
    frame = _weekly_frame(data)
    if frame.empty:
        print(f"No data found for {stock_code}. Skipping...")
        return

    if now is None:
        now = datetime.now(pytz.timezone('US/Eastern'))

    # Keep only weeks that had already closed.
    frame = frame[frame['Date'] <= _last_completed_week_end(now)]
    if frame.empty:
        print(f"No data found for {stock_code}. Skipping...")
        return

    dates = frame['Date'].dt.strftime('%Y-%m-%d').tolist()
    closes = frame['Close'].tolist()
    volumes = frame['Volume'].tolist()

    obv = _on_balance_volume(closes, volumes)
    marks = _turning_points(closes)
    rows = _divergence_rows(dates, closes, obv, marks)
    header = ["Found_Date", "Found_Close", "Found_OBV", "Date", "Close", "OBV", "Index"]

    # Create the worksheet, or clear it if it already exists
    try:
        worksheet = spreadsheet.worksheet(stock_code)
        worksheet.clear()
    except gspread.exceptions.WorksheetNotFound:
        worksheet = spreadsheet.add_worksheet(title=stock_code, rows="1000", cols="20")

    # Write the header and the report rows into Google Sheets
    worksheet.update([header] + rows)

    print(f"Data for {stock_code} successfully added to the Google Sheets.")

# Process and store the data of every stock
# Run the divergence report for each cached stock, one worksheet per code.
for stock_code, stock_data in stock_data_dict.items():
    process_and_store_stock_data(stock_code, stock_data, spreadsheet)

In [None]:
''' # Set highlight colours with gspread-formatting. Very slow: three sheets took 1m25s, and running it in Apps Script takes about the same time.

from gspread_formatting import CellFormat, Color, format_cell_range

# 定义颜色
def get_color(index):
    if index == 3:
        return Color(0.56, 0.93, 0.56)  # lightgreen
    elif index == 2:
        return Color(1.0, 0.65, 0.0)    # orange
    elif index == 1:
        return Color(1.0, 0.63, 0.48)  # lightcoral
    return Color(1, 1, 1)  # default to white

# 遍历所有的 tab 应用格式化
for stock_code in stock_data_dict:
    worksheet = spreadsheet.worksheet(stock_code)
    filtered_data = original_data[original_data['HighOrLow'] != '']
    filtered_data = filtered_data[filtered_data['Index'].isin([1, 2, 3])]

    # 获取所有行数
    rows = len(filtered_data)

    # 创建格式化请求
    for i in range(1, rows + 1):
        color = get_color(filtered_data['Index'].iloc[i - 1])
        cell_format = CellFormat(backgroundColor=color)
        try:
            format_cell_range(worksheet, f"A{i+1}:G{i+1}", cell_format)
        except Exception as e:
            print(f"Error formatting range A{i+1}:G{i+1} - {e}")

print("Styled data successfully added to the Google Sheets.") '''