In [2]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
scrape_pastec.py
基于用户提供的选择器 JSON 自动抓取页面并保存为 CSV。
依赖: requests, beautifulsoup4, pandas
如果页面大量 JS 渲染，请使用脚本底部的提示改用 Selenium。
"""

import json
import csv
import time
from typing import Dict, List
import requests
from bs4 import BeautifulSoup

# --------- 把用户提供的选择器 JSON 放在这里（直接拷贝你的 JSON） ----------
selectors_json = {
  "_id":"shop11",
  "startUrl":["https://pastec.net/iphone"],
  "selectors":[
    {"elementLimit":0,"id":"record_wrapper","multiple":True,"parentSelectors":["_root"],"scroll":False,"selector":"table","type":"SelectorElement"},
    {"id":"data","multiple":False,"multipleType":"singleColumn","parentSelectors":["record_wrapper"],"regex":"","selector":"h2","type":"SelectorText"},
    {"id":"iPhone17 Pro Max","multiple":False,"multipleType":"singleColumn","parentSelectors":["record_wrapper"],"regex":"","selector":"button[data-device-id='1523']","type":"SelectorText"},
    {"id":"iPhone17 Pro Max2","multiple":False,"multipleType":"singleColumn","parentSelectors":["record_wrapper"],"regex":"","selector":"[data-storage-id='11'] span:nth-of-type(4)","type":"SelectorText"},
    {"id":"iPhone17 Pro Max3","multiple":False,"multipleType":"singleColumn","parentSelectors":["record_wrapper"],"regex":"","selector":"[data-storage-id='11'] span:nth-of-type(2)","type":"SelectorText"},
    {"id":"iPhone17 Pro Max4","multiple":False,"multipleType":"singleColumn","parentSelectors":["record_wrapper"],"regex":"","selector":"[data-storage-id='11'] span:nth-of-type(3)","type":"SelectorText"},
    {"id":"iPhone17 Pro Max5","multiple":False,"multipleType":"singleColumn","parentSelectors":["record_wrapper"],"regex":"","selector":"[data-storage-id='11'] span:nth-of-type(1)","type":"SelectorText"},
    {"id":"iPhone17 Pro Max6","multiple":False,"multipleType":"singleColumn","parentSelectors":["record_wrapper"],"regex":"","selector":"[data-storage-id='11'] a","type":"SelectorText"},
    {"id":"iPhone17 Pro Max7","multiple":False,"multipleType":"singleColumn","parentSelectors":["record_wrapper"],"regex":"","selector":"[data-storage-id='11'] div.p-priceTable__name","type":"SelectorText"},
    {"id":"iPhone17 Pro Max8","multiple":False,"multipleType":"singleColumn","parentSelectors":["record_wrapper"],"regex":"","selector":"button[data-device-id='1522']","type":"SelectorText"},
    {"id":"iPhone17 Pro Max9","multiple":False,"multipleType":"singleColumn","parentSelectors":["record_wrapper"],"regex":"","selector":"[data-storage-id='10'] span:nth-of-type(4)","type":"SelectorText"},
    {"id":"iPhone17 Pro Max10","multiple":False,"multipleType":"singleColumn","parentSelectors":["record_wrapper"],"regex":"","selector":"[data-storage-id='10'] span:nth-of-type(2)","type":"SelectorText"},
    {"id":"iPhone17 Pro Max11","multiple":False,"multipleType":"singleColumn","parentSelectors":["record_wrapper"],"regex":"","selector":"[data-storage-id='9'] span:nth-of-type(4)","type":"SelectorText"},
    {"id":"iPhone17 Pro Max12","multiple":False,"multipleType":"singleColumn","parentSelectors":["record_wrapper"],"regex":"","selector":"[data-storage-id='10'] span:nth-of-type(3)","type":"SelectorText"},
    {"id":"iPhone17 Pro Max13","multiple":False,"multipleType":"singleColumn","parentSelectors":["record_wrapper"],"regex":"","selector":"[data-storage-id='10'] span:nth-of-type(1)","type":"SelectorText"},
    {"id":"iPhone17 Pro Max14","multiple":False,"multipleType":"singleColumn","parentSelectors":["record_wrapper"],"regex":"","selector":"[data-storage-id='9'] span:nth-of-type(3)","type":"SelectorText"},
    {"id":"iPhone17 Pro Max15","multiple":False,"multipleType":"singleColumn","parentSelectors":["record_wrapper"],"regex":"","selector":"[data-storage-id='10'] a","type":"SelectorText"},
    {"id":"iPhone17 Pro Max16","multiple":False,"multipleType":"singleColumn","parentSelectors":["record_wrapper"],"regex":"","selector":"[data-storage-id='10'] div.p-priceTable__name","type":"SelectorText"},
    {"id":"iPhone17 Pro","multiple":False,"multipleType":"singleColumn","parentSelectors":["record_wrapper"],"regex":"","selector":"button[data-device-id='1518']","type":"SelectorText"},
    {"id":"iPhone17 Pro2","multiple":False,"multipleType":"singleColumn","parentSelectors":["record_wrapper"],"regex":"","selector":"[data-storage-id='9'] span:nth-of-type(2)","type":"SelectorText"},
    {"id":"iPhone17 Pro3","multiple":False,"multipleType":"singleColumn","parentSelectors":["record_wrapper"],"regex":"","selector":"[data-storage-id='9'] span:nth-of-type(1)","type":"SelectorText"},
    {"id":"iPhone17 Pro4","multiple":False,"multipleType":"singleColumn","parentSelectors":["record_wrapper"],"regex":"","selector":"[data-storage-id='9'] a","type":"SelectorText"},
    {"id":"iPhone17 Pro5","multiple":False,"multipleType":"singleColumn","parentSelectors":["record_wrapper"],"regex":"","selector":"[data-storage-id='9'] div.p-priceTable__name","type":"SelectorText"}
  ]
}

# ----------------- 配置 -----------------
START_URLS = selectors_json.get("startUrl", [])
OUTPUT_CSV = "localdata/shop11.csv"
HEADERS = {
    "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0 Safari/537.36"
}
REQUEST_TIMEOUT = 15
RETRIES = 2
DELAY_BETWEEN_REQUESTS = 1.0

# ----------------- 工具函数 -----------------
def fetch(url: str) -> str:
    last_exc = None
    for attempt in range(1, RETRIES + 1):
        try:
            r = requests.get(url, headers=HEADERS, timeout=REQUEST_TIMEOUT)
            r.raise_for_status()
            return r.text
        except Exception as e:
            last_exc = e
            time.sleep(1.0 * attempt)
    raise last_exc

def text_or_empty(elem):
    if elem is None:
        return ""
    # get_text with stripping
    return " ".join(elem.stripped_strings)

# ----------------- 解析选择器结构，按 parentSelectors 分组 -----------------
# 我们把针对 record_wrapper 的字段找出来（parentSelectors 包含 'record_wrapper'）
field_selectors = []
for s in selectors_json.get("selectors", []):
    if "record_wrapper" in s.get("parentSelectors", []):
        field_selectors.append({
            "id": s.get("id"),
            "selector": s.get("selector"),
            "type": s.get("type"),
            "multiple": s.get("multiple", False)
        })

# 把所有字段 id 作为 CSV header（保持顺序）
csv_headers = [fs["id"] for fs in field_selectors]

def scrape_page(html: str) -> List[Dict[str, str]]:
    soup = BeautifulSoup(html, "html.parser")
    # 找到 record_wrapper：用户定义 selector 是 "table"
    wrappers = soup.select("table")
    results = []
    for idx, wrapper in enumerate(wrappers):
        row = {}
        for fs in field_selectors:
            sel = fs["selector"]
            try:
                # 在 wrapper 范围内进行查询
                if fs["multiple"]:
                    elems = wrapper.select(sel)
                    # 多个元素则把文本用 '|' 连接
                    texts = [text_or_empty(e) for e in elems]
                    row[fs["id"]] = "|".join(texts) if texts else ""
                else:
                    elem = wrapper.select_one(sel)
                    row[fs["id"]] = text_or_empty(elem)
            except Exception as e:
                # 任何异常都写空字符串（并继续）
                row[fs["id"]] = ""
        results.append(row)
    return results

def main():
    all_rows = []
    for url in START_URLS:
        print(f"Fetching: {url}")
        html = fetch(url)
        rows = scrape_page(html)
        print(f"Found {len(rows)} record_wrapper(s).")
        all_rows.extend(rows)
        time.sleep(DELAY_BETWEEN_REQUESTS)

    if not all_rows:
        print("Warning: 没有抓到任何数据。请确认页面是否需要 JavaScript 渲染（如果是，请使用 Selenium）。")
    # 写 CSV
    with open(OUTPUT_CSV, "w", newline="", encoding="utf-8-sig") as f:
        writer = csv.DictWriter(f, fieldnames=csv_headers)
        writer.writeheader()
        for r in all_rows:
            writer.writerow(r)
    print(f"Saved {len(all_rows)} rows to {OUTPUT_CSV}")

if __name__ == "__main__":
    main()


Fetching: https://pastec.net/iphone
Found 38 record_wrapper(s).
Saved 38 rows to localdata/pastec_iphone.csv


In [6]:
from datetime import datetime
import pytz

# 获取日本当前时间 (JST)
jst_tz = pytz.timezone('Asia/Tokyo')
jst_time = datetime.now(jst_tz)

# 转换为 UTC 时间
utc_time = jst_time.astimezone(pytz.UTC)

# 格式化输出
print("日本时间 (JST):", jst_time.strftime('%Y-%m-%d %H:%M:%S %Z'))
print("UTC 时间:", utc_time.strftime('%Y-%m-%d %H:%M:%S %Z'))

日本时间 (JST): 2025-10-24 14:39:19 JST
UTC 时间: 2025-10-24 05:39:19 UTC


In [20]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
scrape_pastec_simple.py
针对你提供的 HTML 结构：从每个 table.p-priceTable 下的 tr.js-targetStorage
提取 model / storage_name / 2 个 caution / 未開封的 condition / 未開封的 price，
并保存为 CSV。
依赖: requests, beautifulsoup4, pandas
"""

from bs4 import BeautifulSoup
import requests
import pandas as pd
import os, json, glob, time, subprocess, shlex, pathlib


import re

# URL 列表（可以改成你要抓取的页面）
START_URLS = ["https://pastec.net/iphone"]

HEADERS = {
    "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
                  "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0 Safari/537.36"
}

OUTPUT_CSV = "localdata/shop11.csv"

def clean_text(s):
    if s is None:
        return ""
    return " ".join(s.split()).strip()

def fetch_html(url):
    r = requests.get(url, headers=HEADERS, timeout=20)
    r.raise_for_status()
    return r.text

def extract_records_from_html(html):
    soup = BeautifulSoup(html, "html.parser")
    tables = soup.select("table.p-priceTable")
    records = []

    for tbl in tables:
        # model = table header h2 text
        h2 = tbl.select_one("thead h2")
        model = clean_text(h2.get_text()) if h2 else ""

        # 每个存储行
        rows = tbl.select("tr.js-targetStorage")
        for tr in rows:
            # 找到 .p-priceTable__name div
            name_div = tr.select_one(".p-priceTable__name")
            if name_div:
                # 取直接的文本节点（不包含内嵌 div 的文本）
                # 方法：在 name_div.contents 中找第一个 NavigableString
                storage_name = ""
                for c in name_div.contents:
                    # 如果是字符串（直接文本节点）
                    if isinstance(c, str):
                        txt = c.strip()
                        if txt:
                            storage_name = txt
                            break
                # 退而求其次：如果没找到直接文本，就用整块文本去掉子 div 文本
                if not storage_name:
                    # remove child div texts
                    children_texts = []
                    for ch in name_div.find_all(recursive=False):
                        # if child is a Tag and not the first text node, skip
                        pass
                    storage_name = clean_text(name_div.get_text(separator=" "))

                # caution 列表（按出现顺序）
                cautions = name_div.select(".p-priceTable__caution")
                caution_empty = clean_text(cautions[0].get_text()) if len(cautions) >= 1 else ""
                caution_notice = clean_text(cautions[1].get_text()) if len(cautions) >= 2 else ""
            else:
                storage_name = ""
                caution_empty = ""
                caution_notice = ""

            # 价格区块：找 .p-priceTable__price_inner
            price_inner = tr.select_one(".p-priceTable__price_inner")
            condition_unopened = ""
            price_unopened = ""
            if price_inner:
                # 找到所有 condition 和 price 成对出现的情形。我们只取第一个 pair（通常为未開封）
                # strategy: 遍历 price_inner.children, when find condition, look for next .price sibling
                children = list(price_inner.find_all(["span"], recursive=False))
                # But in given markup, condition and price may be siblings (span tags)
                # Simpler: find all span.condition, take the first, then find the next span.price after it
                first_condition = price_inner.select_one("span.condition")
                if first_condition:
                    condition_unopened = clean_text(first_condition.get_text())
                    # find next sibling span.price
                    price_tag = None
                    # walk siblings
                    sib = first_condition.next_sibling
                    while sib is not None:
                        # skip whitespace strings
                        if getattr(sib, "name", None) == "span" and "price" in sib.get("class", []):
                            price_tag = sib
                            break
                        # if it's a Tag but not span, try find inside
                        if getattr(sib, "name", None):
                            found = sib.select_one("span.price")
                            if found:
                                price_tag = found
                                break
                        sib = sib.next_sibling
                    # fallback: if not found by sibling-walk, pick the first span.price in price_inner
                    if price_tag is None:
                        price_tag = price_inner.select_one("span.price")
                    if price_tag:
                        price_unopened = clean_text(price_tag.get_text())
                        # normalize spacing/newlines inside price (e.g., "156,500円 〜 136,000円")
                        price_unopened = " ".join(price_unopened.split())
                else:
                    # no condition found: fallback get first price only
                    p = price_inner.select_one("span.price")
                    if p:
                        price_unopened = clean_text(p.get_text())


            # 最终记录
            jst_tz = pytz.timezone('Asia/Tokyo')
            jst_time = datetime.now(jst_tz)

            # 转换为 UTC 时间
            utc_time = jst_time.astimezone(pytz.UTC)
            rec = {
                "model": model,
                "storage_name": storage_name,
                "caution_empty": caution_empty,
                "caution_notice": caution_notice,
                "condition_unopened": condition_unopened,
                "price_unopened": price_unopened,
                "time-scraped": utc_time,
            }
            records.append(rec)
    return records

def main():
    all_records = []
    for url in START_URLS:
        print("Fetching:", url)
        html = fetch_html(url)
        recs = extract_records_from_html(html)
        print(f"Found {len(recs)} records on {url}")
        all_records.extend(recs)

    if not all_records:
        print("No records found — check if the page is JS-rendered. If so, use Selenium/Playwright.")
    else:
        df = pd.DataFrame(all_records, columns=[
            "model", "storage_name", "caution_empty", "caution_notice",
            "condition_unopened", "price_unopened", "time-scraped"
        ])
        df.to_csv(OUTPUT_CSV, index=False, encoding="utf-8-sig")
        print(f"Saved {len(df)} rows -> {OUTPUT_CSV}")


ACCESS = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ0b2tlbl90eXBlIjoiYWNjZXNzIiwiZXhwIjoxNzYxMjk0MDQ4LCJpYXQiOjE3NjEyOTIyNDgsImp0aSI6IjBmYTVmOTkyMWI4OTRiODdhNzA0MjhjNjlkNWM3YzMyIiwidXNlcl9pZCI6IjEifQ.5gtpM4H482XvBHJ9uyP8oAgSvaYcpb_qFwPmy7wh_Uk"  # 或用 %env 先设
URL = "http://47.111.136.177/AppleStockChecker/purchasing-price-records/import-tradein-xlsx/?dry_run=0"

ROOT = "localdata/"
# 递归找 excel
# patterns = ["**/*.xlsx","**/*.xls","**/*.xlsm","**/*.xlsb"]
patterns = ["**/*.csv"]
files = []
for pat in patterns:
    files += glob.glob(os.path.join(ROOT, pat), recursive=True)
# 过滤 ~$.xlsx 临时文件并排序
files = sorted(f for f in files if not pathlib.Path(f).name.startswith("~$"))

print(f"Found {len(files)} excel files")

def post_one(path, retries=2, backoff=2.0):
    cmd = [
        "curl","-sS","-i","-X","POST", URL,
        "-H", f"Authorization: Bearer {ACCESS}",
        "-F", f"files=@{path}",
    ]
    for attempt in range(retries+1):
        try:
            out = subprocess.check_output(cmd, stderr=subprocess.STDOUT).decode("utf-8", "replace")
            # 可选：简单取 HTTP 状态行
            status_line = out.splitlines()[0] if out else ""
            return status_line, out
        except subprocess.CalledProcessError as e:
            if attempt >= retries:
                return f"ERROR after {retries} retries", e.output.decode("utf-8", "replace")
            time.sleep(backoff * (attempt+1))

for i in range(1000):
    main()

    for i, f in enumerate(files, 1):
        status, resp = post_one(f)
        print(f"[{i}/{len(files)}] {f}\n{status}\n")
        time.sleep(20)
    print(f"Finished {i}")

    time.sleep(10*60)




Found 1 excel files
Fetching: https://pastec.net/iphone
Found 119 records on https://pastec.net/iphone
Saved 119 rows -> localdata/shop11.csv
[1/1] localdata/shop11.csv
HTTP/1.1 202 Accepted

Finished 1
Fetching: https://pastec.net/iphone
Found 119 records on https://pastec.net/iphone
Saved 119 rows -> localdata/shop11.csv
[1/1] localdata/shop11.csv
HTTP/1.1 202 Accepted

Finished 1


KeyboardInterrupt: 