# Setup

In [None]:
import os

# Pixiv account credentials. Prefer the PIXIV_USERNAME / PIXIV_PASSWORD
# environment variables so that real credentials never have to be saved in the
# notebook; the literals below are only placeholder fallbacks.
PIXIV_USERNAME = os.environ.get("PIXIV_USERNAME", "userbay")
PIXIV_PASSWORD = os.environ.get("PIXIV_PASSWORD", "UserPay")

In [None]:
from pixivpy3 import *

# Create the pixivpy App-API client that the crawler cells below share.
api = AppPixivAPI()
# api = ByPassSniApi()  # bypass the GFW
# api.require_appapi_hosts()
api.set_accept_language("zh-cn")  # have tags translated into Chinese

# Log in with the credentials from the previous cell and keep the id found in
# the login response; later cells pass it to the bookmark/following queries.
# NOTE(review): newer pixivpy releases reportedly no longer support password
# login and expect a refresh token instead — confirm against the installed version.
token = api.login(PIXIV_USERNAME, PIXIV_PASSWORD)
user_id = token.response.user.id
print(token.response.user)

# PixivCrawler (with pixivpy)

In [None]:
import json
import os
import random
import sqlite3 as lite
import time

import numpy as np
import pandas as pd
from sqlalchemy import create_engine

# Pick the notebook-aware progress bar: newer tqdm exposes it as
# ``tqdm.notebook.tqdm``; older releases only provide ``tqdm.tqdm_notebook``.
try:
    from tqdm.notebook import tqdm  # new tqdm
except ImportError:
    # Only a failed import should trigger the fallback; a bare ``except`` would
    # also swallow KeyboardInterrupt and unrelated errors.
    from tqdm import tqdm_notebook as tqdm


class PixivCrawler:
    """Crawl illust metadata through a pixivpy API client and cache it in SQLite.

    The ``Get*`` methods page through API results and return them as pandas
    objects; ``UpdateIllusts`` / ``DBIllusts`` write them to and read them from
    the ``illusts`` table of the SQLite file ``illust_db``.
    """

    # Columns whose cells hold nested dict/list values; they are stored in
    # SQLite as JSON text and decoded again when read back.
    _JSON_COLUMNS = (
        "image_urls",
        "meta_pages",
        "meta_single_page",
        "series",
        "tags",
        "tools",
        "user",
    )

    def __init__(self, api, illust_db="pixiv_illusts.db"):
        """Store the API client and the path of the SQLite file to use."""
        self.api = api
        self.illust_db = illust_db
        self.user_info = None  # not used by this class; kept for existing callers
        self.last_user = None  # result of the most recent GetUserDetail() call

    def randSleep(self, base=0.1, rand=0.5):
        """Sleep for ``base`` seconds plus a random extra in ``[0, rand)``."""
        time.sleep(base + rand * random.random())

    def GetUserDetail(self, user_id):
        """Fetch a user's detail record, remember it in ``last_user`` and return it."""
        self.last_user = self.api.user_detail(user_id)
        return self.last_user

    def _fetch_illust_pages(self, fetch_page, first_qs, total, desc, pause):
        """Follow ``next_url`` pagination of an illust endpoint into one DataFrame.

        ``fetch_page`` is called with ``first_qs`` and then with whatever
        ``api.parse_qs(next_url)`` returns, until that is ``None``.  ``total``
        and ``desc`` only configure the progress bar; ``pause`` is the base
        delay passed to ``randSleep`` after every request.

        Returns a DataFrame indexed by ``illust_id`` with an added ``user_id``
        column, or an empty DataFrame (index named ``illust_id``) when no page
        contained any illust.
        """
        pages = []
        next_qs = first_qs
        with tqdm(total=total, desc=desc) as pbar:
            while next_qs is not None:
                json_result = fetch_page(**next_qs)
                page_df = pd.DataFrame.from_dict(json_result.illusts)
                pbar.update(page_df.shape[0])
                if not page_df.empty:
                    pages.append(page_df)
                next_qs = self.api.parse_qs(json_result.next_url)
                self.randSleep(pause)

        # Without any rows there is no "id"/"user" column to work with, so
        # return an empty frame instead of failing on the lookups below.
        if not pages:
            return pd.DataFrame(index=pd.Index([], name="illust_id"))

        df = pd.concat(pages).rename(columns={"id": "illust_id"})
        df["user_id"] = df.user.apply(lambda d: d["id"])
        return df.set_index("illust_id")

    def GetUserBookmarks(self, user_id, restrict="public"):
        """Return the bookmarked illusts of ``user_id`` as a DataFrame.

        ``restrict`` is forwarded to ``api.user_bookmarks_illust``.  The result
        is indexed by ``illust_id`` and has an extra ``user_id`` column.
        """
        first_qs = {"user_id": user_id, "restrict": restrict}

        user = self.GetUserDetail(user_id)
        self.randSleep(0.1)

        # The profile only supplies a count for public bookmarks; for any other
        # ``restrict`` value the progress bar runs without a known total.
        total = None
        if restrict == "public":
            total = user.profile.total_illust_bookmarks_public

        return self._fetch_illust_pages(
            self.api.user_bookmarks_illust,
            first_qs,
            total=total,
            desc="api.user_bookmarks_illust",
            pause=0.1,
        )

    def GetUserIllusts(self, user_id, type="illust"):
        """Return the works of ``user_id`` as a DataFrame.

        ``type`` must be ``"illust"`` or ``"manga"``; anything else raises
        ``ValueError`` before any request is made.  The result is indexed by
        ``illust_id`` and has an extra ``user_id`` column.
        """
        if type not in ("illust", "manga"):
            raise ValueError(f"Unsupported type={type}")
        first_qs = {"user_id": user_id, "type": type, "filter": "for_ios"}

        user = self.GetUserDetail(user_id)
        if type == "illust":
            total = user.profile.total_illusts
        else:
            total = user.profile.total_manga
        self.randSleep(0.1)

        return self._fetch_illust_pages(
            self.api.user_illusts,
            first_qs,
            total=total,
            desc="api.user_illusts",
            pause=0.1,
        )

    def GetIllustRanking(self, mode, date, total=100):
        """Return the illust ranking for ``mode`` and ``date`` as a DataFrame.

        ``total`` is only the expected size shown by the progress bar; paging
        continues until the API reports no further page.  The result is indexed
        by ``illust_id`` and has an extra ``user_id`` column.
        """
        first_qs = {"mode": mode, "date": date, "filter": "for_ios"}
        return self._fetch_illust_pages(
            self.api.illust_ranking,
            first_qs,
            total=total,
            desc="api.illust_ranking",
            pause=0.3,
        )

    def GetFollowingUsers(self, user_id, restrict="public"):
        """Return the ids of the users followed by ``user_id`` as a numpy array.

        ``restrict`` is forwarded to ``api.user_following``.
        """
        user_ids = []
        next_qs = {"user_id": user_id, "restrict": restrict}

        user = self.GetUserDetail(user_id)
        with tqdm(total=user.profile.total_follow_users, desc="api.user_following") as pbar:
            while next_qs is not None:
                json_result = self.api.user_following(**next_qs)
                user_ids.extend(preview.user.id for preview in json_result.user_previews)
                pbar.update(len(json_result.user_previews))
                next_qs = self.api.parse_qs(json_result.next_url)
                self.randSleep(0.3, 0.8)
        return np.array(user_ids)

    def UpdateIllusts(self, df_illusts):
        """Merge ``df_illusts`` into the ``illusts`` table and return the merged frame.

        Rows already stored under the same ``illust_id`` are replaced by the new
        ones.  Nested columns are serialised to JSON text first, so the returned
        frame holds JSON strings in those columns.  The table is rewritten as a
        whole (``if_exists="replace"``).

        An empty ``df_illusts`` leaves the database untouched; the stored table
        (or the empty frame, if no database file exists yet) is returned.
        """
        sql_df = df_illusts.copy()

        if sql_df.empty:
            if os.path.isfile(self.illust_db):
                return self.DBIllusts(ensure_json=False)
            return sql_df

        # SQLite cannot store dict/list cells, so keep them as JSON text.
        for column in self._JSON_COLUMNS:
            if column in sql_df.columns:
                sql_df[column] = sql_df[column].apply(json.dumps)

        if os.path.isfile(self.illust_db):
            # Drop stored rows that are being refreshed, keep all the others.
            db_df = self.DBIllusts(ensure_json=False)
            db_df = db_df[~db_df.index.isin(sql_df.index)]
            merged_df = pd.concat([sql_df, db_df], sort=False)
        else:
            merged_df = sql_df

        engine = create_engine("sqlite:///" + self.illust_db, echo=False)
        try:
            merged_df.to_sql("illusts", con=engine, if_exists="replace")
        finally:
            # Release the pooled connection so the database file is not kept open.
            engine.dispose()
        return merged_df

    @staticmethod
    def _load_json_cell(value):
        """Decode one stored JSON cell; a NULL/NaN cell becomes ``None``."""
        if isinstance(value, (str, bytes, bytearray)):
            return json.loads(value)
        if pd.isna(value):
            return None
        return json.loads(value)  # unexpected type: let json raise as before

    def DBIllusts(self, sql="SELECT * FROM illusts WHERE illust_id > 0", ensure_json=True):
        """Run ``sql`` against the illust database and return a DataFrame.

        The result is indexed by ``illust_id``.  With ``ensure_json=True`` the
        JSON text columns present in the result are decoded back into Python
        objects; with ``False`` they are returned as stored.
        """
        from contextlib import closing

        # ``with sqlite3.connect(...)`` only manages the transaction and leaves
        # the connection open, so close it explicitly.
        with closing(lite.connect(self.illust_db)) as conn:
            sql_df = pd.read_sql_query(sql, conn, index_col="illust_id")

        if ensure_json:
            for column in self._JSON_COLUMNS:
                if column in sql_df.columns:
                    sql_df[column] = sql_df[column].apply(self._load_json_cell)
        return sql_df


# Shared crawler instance for the cells below; with no second argument it uses
# the default database file "pixiv_illusts.db".
crawl = PixivCrawler(api)

## GetUserBookmarks(public)

In [None]:
# Fetch the bookmarks of the logged-in account (default restrict="public") and
# merge them into the illust database; the merged frame it returns is discarded.
df_bookmarks = crawl.GetUserBookmarks(user_id)
_ = crawl.UpdateIllusts(df_bookmarks)

## GetFollowingUsers(public)

In [None]:
# Ids of the users followed by the logged-in account (default restrict="public").
user_ids = crawl.GetFollowingUsers(user_id)

In [None]:
# Visit the followed users in random order and store each one's works.
random.shuffle(user_ids)  # shuffles in place
for uid in tqdm(user_ids, desc="GetFollowingUsers"):
    df = crawl.GetUserIllusts(uid)  # default type="illust"
    _ = crawl.UpdateIllusts(df)
    crawl.randSleep(1.1, 5.0)  # pause 1.1 s plus up to 5 s between users

## GetIllustRanking

In [None]:
# Fetch one ranking list and merge it into the illust database.
# mode: [day, week, month, day_male, day_female, week_original, week_rookie, day_manga]
# date: '2016-08-01'
# mode (Past): [day, week, month, day_male, day_female, week_original, week_rookie,
#               day_r18, day_male_r18, day_female_r18, week_r18, week_r18g]
df_ranking = crawl.GetIllustRanking("week", "2019-11-01")
_ = crawl.UpdateIllusts(df_ranking)