# 数据预处理

热门商品交互: Movies_and_TV_5core.csv.gz
- wget https://mcauleylab.ucsd.edu/public_datasets/data/amazon_2023/benchmark/5core/rating_only/Movies_and_TV.csv.gz

全部交互: Movies_and_TV_0core.csv.gz
- wget https://mcauleylab.ucsd.edu/public_datasets/data/amazon_2023/benchmark/0core/rating_only/Movies_and_TV.csv.gz

全部商品
- wget https://mcauleylab.ucsd.edu/public_datasets/data/amazon_2023/raw/meta_categories/meta_Movies_and_TV.jsonl.gz

In [67]:
import polars as pl
from pathlib import Path

In [68]:
CATEGORY = "Movies_and_TV"
DATA_DIR = Path("/home/zihao/llm/llm4rec/data")

## 定义工具函数

In [3]:
def k_core_range_filter(
    df: pl.DataFrame,
    user_col: str = "user_id",
    item_col: str = "parent_asin",
    k_user_min: int = 1,
    k_user_max: int = 10**9,
    k_item_min: int = 1,
    k_item_max: int = 10**9,
    max_iter: int = 100,
    verbose: bool = True,
):
    """
    范围版 k-core 过滤：
    - user 交互数 ∈ [k_user_min, k_user_max]
    - item 交互数 ∈ [k_item_min, k_item_max]
    """

    # 去重
    df = df.unique(subset=[user_col, item_col])

    if verbose:
        print(
            f"初始: rows={df.height:,}, "
            f"users={df[user_col].n_unique():,}, "
            f"items={df[item_col].n_unique():,}"
        )
        print(
            f"约束: user ∈ [{k_user_min}, {k_user_max}], "
            f"item ∈ [{k_item_min}, {k_item_max}]"
        )

    for i in range(1, max_iter + 1):
        before_rows = df.height
        before_users = df[user_col].n_unique()
        before_items = df[item_col].n_unique()

        user_counts = df.group_by(user_col).len()
        item_counts = df.group_by(item_col).len()

        valid_users = (
            user_counts
            .filter(
                (pl.col("len") >= k_user_min) &
                (pl.col("len") <= k_user_max)
            )
            .select(user_col)
        )

        valid_items = (
            item_counts
            .filter(
                (pl.col("len") >= k_item_min) &
                (pl.col("len") <= k_item_max)
            )
            .select(item_col)
        )

        df = (
            df
            .join(valid_users, on=user_col, how="inner")
            .join(valid_items, on=item_col, how="inner")
        )

        after_rows = df.height
        after_users = df[user_col].n_unique()
        after_items = df[item_col].n_unique()

        if verbose:
            print(
                f"[iter {i:02d}] "
                f"rows: {before_rows:,} → {after_rows:,} "
                f"(Δ {before_rows - after_rows:+,}), "
                f"users: {before_users:,} → {after_users:,}, "
                f"items: {before_items:,} → {after_items:,}"
            )

        if before_rows == after_rows:
            if verbose:
                print(f"收敛于第 {i} 轮")
            break

    if verbose:
        print(
            f"\n最终结果: rows={df.height:,}, "
            f"users={df[user_col].n_unique():,}, "
            f"items={df[item_col].n_unique():,}"
        )

    return df

## 5-core和0-core rating区分热门和冷门商品

In [4]:
df_rating_5core = pl.read_csv(f"../data/{CATEGORY}_5core.csv.gz")
df_rating_0core = pl.read_csv(f"../data/{CATEGORY}_0core.csv.gz")

In [5]:
# 热门商品和热门用户
df_rating_hot = k_core_range_filter(
    df_rating_5core,
    k_user_min=5,
    k_item_min=5,
    verbose=True
)

初始: rows=7,441,129, users=657,203, items=197,943
约束: user ∈ [5, 1000000000], item ∈ [5, 1000000000]
[iter 01] rows: 7,441,129 → 7,441,129 (Δ +0), users: 657,203 → 657,203, items: 197,943 → 197,943
收敛于第 1 轮

最终结果: rows=7,441,129, users=657,203, items=197,943


In [6]:
# 冷门商品
df_rating_cold = k_core_range_filter(
    df_rating_0core,
    k_user_min=5,
    k_item_min=1,
    k_item_max=2,
    verbose=True
)

初始: rows=17,158,519, users=6,503,429, items=747,764
约束: user ∈ [5, 1000000000], item ∈ [1, 2]
[iter 01] rows: 17,158,519 → 252,877 (Δ +16,905,642), users: 6,503,429 → 123,459, items: 747,764 → 217,337
[iter 02] rows: 252,877 → 88,364 (Δ +164,513), users: 123,459 → 7,771, items: 217,337 → 82,833
[iter 03] rows: 88,364 → 88,364 (Δ +0), users: 7,771 → 7,771, items: 82,833 → 82,833
收敛于第 3 轮

最终结果: rows=88,364, users=7,771, items=82,833


## meta数据处理

### 数据加载

In [7]:
schema = {
    "main_category": pl.Utf8,
    "title": pl.Utf8,
    "average_rating": pl.Float64,
    "rating_number": pl.Int64,
    "features": pl.List(pl.Utf8),
    "description": pl.List(pl.Utf8),
    "store": pl.Utf8,
    "categories": pl.List(pl.Utf8),
    "details": pl.Utf8,
    "parent_asin": pl.Utf8,
}

item_df = pl.read_ndjson(DATA_DIR / f"meta_{CATEGORY}.jsonl.gz", schema=schema, ignore_errors=True)

# Check what columns are available
print(f"Item metadata columns: {item_df.columns}")
print(f"Total items in metadata: {len(item_df):,}")

Item metadata columns: ['main_category', 'title', 'average_rating', 'rating_number', 'features', 'description', 'store', 'categories', 'details', 'parent_asin']
Total items in metadata: 748,224


### 基于热门和冷门商品的item过滤

In [10]:
# 基于df_rating_hot中的parent_asin过滤item_df
item_df_hot = item_df.join(
    df_rating_hot.select("parent_asin"),
    on="parent_asin",
    how="semi"
)
print(f"过滤后剩余 item: {item_df_hot.height:,}")

# 基于df_rating_cold中的parent_asin过滤item_df
item_df_cold = item_df.join(
    df_rating_cold.select("parent_asin"),
    on="parent_asin",
    how="semi"
)
print(f"过滤后剩余 item: {item_df_cold.height:,}")

过滤后剩余 item: 197,943
过滤后剩余 item: 82,833


### 缺失值和空值过滤

In [13]:
item_df_hot.null_count()

main_category,title,average_rating,rating_number,features,description,store,categories,details,parent_asin
u32,u32,u32,u32,u32,u32,u32,u32,u32,u32
2526,82840,1,1,82840,82840,93673,82840,0,0


In [12]:
item_df_cold.null_count()

main_category,title,average_rating,rating_number,features,description,store,categories,details,parent_asin
u32,u32,u32,u32,u32,u32,u32,u32,u32,u32
4959,26362,175,175,26362,26362,30408,26362,0,0


In [14]:
# 统计 features、description 和 categories 中空列表的行数
cols = ["features", "description", "categories"]

for name, df in [("hot", item_df_hot), ("cold", item_df_cold)]:
    print(f"\n{name} items:")
    for col in cols:
        empty_count = df.filter(pl.col(col).list.len() == 0).height
        print(f"  {col} 中空列表的行数: {empty_count:,}")


hot items:
  features 中空列表的行数: 104,240
  description 中空列表的行数: 8,813
  categories 中空列表的行数: 111

cold items:
  features 中空列表的行数: 53,300
  description 中空列表的行数: 15,379
  categories 中空列表的行数: 70


In [16]:
# 获取前两个非空例子
for col in item_df_hot.columns:
    print(f"Column: {col}")
    values = item_df_hot[col].drop_nulls().limit(2).to_list()
    for i, v in enumerate(values, 1):
        print(f"  Example {i}: {v}")
    print()

Column: main_category
  Example 1: Movies & TV
  Example 2: Movies & TV

Column: title
  Example 1: Pink Cadillac [DVD]
  Example 2: The Returned- Complete First Season

Column: average_rating
  Example 1: 4.6
  Example 2: 4.5

Column: rating_number
  Example 1: 972
  Example 2: 488

Column: features
  Example 1: []
  Example 2: []

Column: description
  Example 1: ['A comic action-adventure about a modern-day bounty hunter chasing down a bail-jumping woman fleeing her ex-con husband in his prized 1959 pink cadillac.']
  Example 2: ['In an idyllic French Alpine village, a seemingly random collection of people find themselves in a state of confusion as they attempt to return to their homes. What they do not yet know is that they have been dead for several years, and no one is expecting them back. Buried secrets emerge as they grapple with this miraculous and sinister new reality, struggling to reintegrate with their families and past lovers. But it seems they are not the only ones back 

In [17]:
def clean_item_df_strict(df: pl.DataFrame, name: str):
    print(f"\n开始处理 {name} items")

    before = df.height

    # title：必须非 null
    if "title" in df.columns:
        title_null_cnt = df.select(pl.col("title").is_null().sum()).item()
        print(f"title 为 null 的行数: {title_null_cnt:,}")
        df = df.filter(pl.col("title").is_not_null())

    # description / categories：既不能是 null，也不能是 []
    list_cols = ["description", "categories"]
    for col in list_cols:
        if col in df.columns:
            null_cnt = df.select(pl.col(col).is_null().sum()).item()
            empty_cnt = df.select((pl.col(col).list.len() == 0).sum()).item()
            print(f"{col}: null={null_cnt:,}, 空列表={empty_cnt:,}")

            df = df.filter(
                pl.col(col).is_not_null() &
                (pl.col(col).list.len() > 0)
            )

    after = df.height
    print(f"{name} items 清洗前: {before:,}, 清洗后: {after:,}")

    return df

item_df_hot_clean = clean_item_df_strict(item_df_hot, "hot")
item_df_cold_clean = clean_item_df_strict(item_df_cold, "cold")


开始处理 hot items
title 为 null 的行数: 82,840
description: null=0, 空列表=8,813
categories: null=0, 空列表=109
hot items 清洗前: 197,943, 清洗后: 106,181

开始处理 cold items
title 为 null 的行数: 26,362
description: null=0, 空列表=15,379
categories: null=0, 空列表=60
cold items 清洗前: 82,833, 清洗后: 41,032


### minHash title重复值过滤

In [18]:
from datasketch import MinHash, MinHashLSH


def minhash_dedup_titles(
    item_df,
    title_col="title",
    threshold=0.8,
    num_perm=64,
    max_examples=5,
    max_per_cluster=5,
):
    """
    对 item_df 的 title 列做 MinHash 去重

    返回：
    - dedup_titles: set，去重后的 title
    - examples: [(rep, [titles...])]，部分合并示例
    """

    # 准备 title 列表
    titles = [
        t for t in item_df[title_col].unique()
        if t is not None and len(t.strip()) > 0
    ]

    # 构建 MinHash
    def get_minhash(text):
        m = MinHash(num_perm=num_perm)
        for token in set(text.lower().split()):
            m.update(token.encode("utf8"))
        return m

    minhashes = {t: get_minhash(t) for t in titles}

    # 建立 LSH
    lsh = MinHashLSH(threshold=threshold, num_perm=num_perm)
    for t, mh in minhashes.items():
        lsh.insert(t, mh)

    # 聚类去重
    seen = set()
    dedup_titles = set()
    examples = []

    for title in titles:
        if title in seen:
            continue

        candidates = lsh.query(minhashes[title])
        cluster = []

        for other in candidates:
            if other in seen:
                continue
            if minhashes[title].jaccard(minhashes[other]) >= threshold:
                cluster.append(other)

        if not cluster:
            continue

        rep = min(cluster, key=len)
        dedup_titles.add(rep)

        for t in cluster:
            seen.add(t)

        if len(cluster) > 1 and len(examples) < max_examples:
            examples.append((rep, cluster[:max_per_cluster]))

    return dedup_titles, examples

In [19]:
# hot items
hot_dedup_titles, hot_examples = minhash_dedup_titles(
    item_df_hot_clean,
    threshold=0.8,
)

print("\n[HOT] MinHash title 去重示例")
for rep, cluster in hot_examples:
    print(f"\n代表: {rep}\n合并: {cluster}")

print(
    f"\n[HOT] 原始 title 数: {item_df_hot_clean['title'].n_unique():,}\n"
    f"[HOT] 去重后 title 数: {len(hot_dedup_titles):,}"
)


# cold items
cold_dedup_titles, cold_examples = minhash_dedup_titles(
    item_df_cold_clean,
    threshold=0.8,
)

print("\n[COLD] MinHash title 去重示例")
for rep, cluster in cold_examples:
    print(f"\n代表: {rep}\n合并: {cluster}")

print(
    f"\n[COLD] 原始 title 数: {item_df_cold_clean['title'].n_unique():,}\n"
    f"[COLD] 去重后 title 数: {len(cold_dedup_titles):,}"
)


[HOT] MinHash title 去重示例

代表: Dawson's Creek - The Complete Fifth Season
合并: ["Dawson's Creek - The Complete Fifth Season", "Dawson's Creek - The Complete First Season"]

代表: Northern Exposure: The Complete Series
合并: ['Northern Exposure: The Complete Series', 'Northern Exposure: The Complete Series [DVD]']

代表: Fireflies in the Garden
合并: ['Fireflies in the Garden', 'Fireflies In The Garden']

代表: Cowboy Bebop The Movie
合并: ['Cowboy Bebop The Movie', 'Cowboy Bebop - The Movie']

代表: Little House on the Prairie - The Complete Season 4
合并: ['Little House on the Prairie - The Complete Season 4', 'Little House on the Prairie - The Complete Season 1']

[HOT] 原始 title 数: 94,538
[HOT] 去重后 title 数: 91,861

[COLD] MinHash title 去重示例

代表: Priest [ NON-USA FORMAT, PAL, Reg.2 Import - France ]
合并: ['Priest [ NON-USA FORMAT, PAL, Reg.2 Import - France ]', 'Ceiling Zero [ NON-USA FORMAT, PAL, Reg.2 Import - France ]']

代表: Miramax Horror Collection
合并: ['17-Film Miramax Horror Collection', 'Mirama

In [21]:
# 基于去重的title过滤item_df
item_df_hot_dedup_1 = (
    item_df_hot_clean
    .join(
        pl.DataFrame({"title": list(hot_dedup_titles)}),
        on="title",
        how="semi"
    )
    .unique(subset=["title"])
)

print(
    f"[HOT] 去重前 item: {item_df_hot_clean.height:,}, "
    f"去重后 item(1/title): {item_df_hot_dedup_1.height:,}"
)


item_df_cold_dedup_1 = (
    item_df_cold_clean
    .join(
        pl.DataFrame({"title": list(cold_dedup_titles)}),
        on="title",
        how="semi"
    )
    .unique(subset=["title"])
)

print(
    f"[COLD] 去重前 item: {item_df_cold_clean.height:,}, "
    f"去重后 item(1/title): {item_df_cold_dedup_1.height:,}"
)

[HOT] 去重前 item: 106,181, 去重后 item(1/title): 91,861
[COLD] 去重前 item: 41,032, 去重后 item(1/title): 39,730


### 基于列长度值的过滤

In [23]:
item_df_hot_dedup_1 = (item_df_hot_dedup_1.with_columns(pl.col("categories").list.join(" > ").fill_null("").alias("categories")))
item_df_hot_dedup_1 = (
    item_df_hot_dedup_1
    .with_columns(
        pl.col("features").list.join(" ").alias("features"),
        pl.col("description").list.join(" ").alias("description"),
    )
)

item_df_cold_dedup_1 = (item_df_cold_dedup_1.with_columns(pl.col("categories").list.join(" > ").fill_null("").alias("categories")))
item_df_cold_dedup_1 = (
    item_df_cold_dedup_1
    .with_columns(
        pl.col("features").list.join(" ").alias("features"),
        pl.col("description").list.join(" ").alias("description"),
    )
)

In [25]:
item_df_hot_dedup_1.head()

main_category,title,average_rating,rating_number,features,description,store,categories,details,parent_asin
str,str,f64,i64,str,str,str,str,str,str
"""Movies & TV""","""The Office: Season 5""",4.9,4139,"""""","""Product Description Scranton’s…","""Steve Carell (Actor), Rai…","""Movies & TV > Studio Specials …","""{""Genre"": ""Television, DVD Mov…","""B0024FAD9W"""
"""Movies & TV""","""Van Damme Triple Feature (Kick…",4.6,17,"""""","""3 discs, book style shellcase,…","""Jean-Claude Van Damme (Actor)…","""Movies & TV > Movies""","""{""MPAA rating"": ""R (Restricted…","""B003ZJWXCQ"""
"""Movies & TV""","""The Paradise Virus [DVD]""",3.6,45,"""""","""A picturesque island resort be…","""Lorenzo Lamas (Actor), Me…","""Movies & TV > Featured Categor…","""{""Genre"": ""Mystery & Thrillers…","""B000FZEU24"""
"""Movies & TV""","""I'm Alan Partridge, Series 2 […",4.6,148,"""""","""[Non-U.S. format (PAL) region …","""Rated: Unrated Format: D…","""Movies & TV > Featured Categor…","""{""Aspect Ratio"": ""1.78:1"", ""Is…","""B0000CGD3A"""
"""Movies & TV""","""Norma Rae: 35th Anniversary""",4.7,732,"""""","""A union organizer from up Nort…","""Sally Field (Actor), Beau…","""Movies & TV > Featured Categor…","""{""Aspect Ratio"": ""2.40:1"", ""Is…","""B00HVTRDZ8"""


In [26]:
item_df_cold_dedup_1.head()

main_category,title,average_rating,rating_number,features,description,store,categories,details,parent_asin
str,str,f64,i64,str,str,str,str,str,str
"""Movies & TV""","""Diary of a Hitman / Assassinat…",2.7,3,"""""","""Diary of a Hitman / Assassinat…","""Roy London (Director), Ro…","""Movies & TV > Featured Categor…","""{""Genre"": ""Action & Adventure""…","""B00BJCAD14"""
"""Movies & TV""","""Gate Keepers - Discovery! (Vol…",4.2,7,"""""","""Product Description Spitzilla …","""Wendee Lee (Actor), Sherr…","""Movies & TV > Featured Categor…","""{""Format"": ""Animated, Color, D…","""B000066C6T"""
"""Movies & TV""","""Matrimony (Blu-ray)""",3.6,13,"""""","""Bing Bing stars in this ghostl…","""Fan Bing Bing (Actor), Hu…","""Movies & TV > Blu-ray > Movies""","""{""Genre"": ""Horror/Supernatural…","""B004XC5LS2"""
"""Movies & TV""","""Laser Mission""",4.3,142,"""""","""Science fiction at its best""","""Brandon Lee (Actor), Debi…","""Movies & TV > Genre for Featur…","""{""MPAA rating"": ""R (Restricted…","""B0001GH7PY"""
"""Movies & TV""","""The Complete Dramatic Works of…",3.8,91,"""""","""VHS video""","""Format: VHS Tape""","""Movies & TV > Movies""","""{""Run time"": ""141 minutes"", ""D…","""B000MB4TSQ"""


In [28]:
def string_length_stats(df: pl.DataFrame):
    """
    统计 DataFrame 中所有 Utf8 列的字符串长度统计：
    max / min / mean
    """
    for col in df.columns:
        if df.schema[col] == pl.Utf8:
            lengths = (
                df
                .select(pl.col(col).str.len_chars().alias("len"))
                .get_column("len")
            )

            # 如果全是 null，跳过
            if lengths.null_count() == lengths.len():
                print(f"{col}: 全为 null，跳过")
                continue

            max_len = lengths.max()
            min_len = lengths.min()
            mean_len = lengths.mean()

            print(
                f"{col}: max={max_len}, "
                f"min={min_len}, "
                f"mean={mean_len:.2f}"
            )

In [30]:
string_length_stats(item_df_hot_dedup_1)
print("--------------------------------")
string_length_stats(item_df_cold_dedup_1)
print("--------------------------------")


main_category: max=25, min=5, mean=11.01
title: max=420, min=1, mean=33.10
features: max=1330, min=0, mean=3.07
description: max=38716, min=1, mean=873.75
store: max=414, min=2, mean=119.66
categories: max=118, min=4, mean=51.17
details: max=7616, min=2, mean=715.69
parent_asin: max=10, min=10, mean=10.00
--------------------------------
main_category: max=25, min=8, mean=11.01
title: max=503, min=1, mean=35.86
features: max=794, min=0, mean=2.03
description: max=29612, min=1, mean=621.49
store: max=397, min=3, mean=94.94
categories: max=118, min=4, mean=46.74
details: max=3263, min=2, mean=461.06
parent_asin: max=10, min=10, mean=10.00
--------------------------------


In [31]:
from typing import Iterable, Optional


def filter_string_outliers(
    df: pl.DataFrame,
    col_names: Optional[Iterable[str]] = None,
    quantile: float = 0.80,
    feature_col: str = "features",
    interpolation: str = "higher",
    verbose: bool = True,
) -> pl.DataFrame:
    """
    基于字符串长度分位数，过滤异常值行（一次性 AND 过滤）

    参数
    ----
    df : Polars DataFrame
        原始数据
    col_names : Iterable[str] | None
        需要处理的列名；None 表示自动使用所有 Utf8 列
    quantile : float
        分位数阈值（如 0.8）
    feature_col : str
        需要特殊处理（允许空字符串）的列名
    interpolation : str
        quantile 的插值方法
    verbose : bool
        是否打印每列阈值

    返回
    ----
    Polars DataFrame
        过滤异常值后的 DataFrame
    """

    # 确定要处理的列
    if col_names is None:
        col_names = [
            c for c in df.columns
            if df.schema[c] == pl.Utf8
        ]
    else:
        col_names = [
            c for c in col_names
            if c in df.columns and df.schema[c] == pl.Utf8
        ]

    thresholds = {}

    # 基于原始 df 计算每列阈值
    for col in col_names:
        len_expr = pl.col(col).str.len_chars()

        if col == feature_col:
            series = (
                df
                .filter(pl.col(col).is_not_null() & (len_expr > 0))
                .select(len_expr.alias("len"))
                .get_column("len")
            )
        else:
            series = (
                df
                .select(len_expr.alias("len"))
                .get_column("len")
            )

        if series.len() == 0:
            if verbose:
                print(f"{col}: 无有效值，跳过")
            continue

        q = series.quantile(quantile, interpolation=interpolation)
        thresholds[col] = q

        if verbose:
            print(f"{col}: q{int(quantile * 100)}={q}")

    # 构造过滤条件（一次性 AND）
    conds = []
    for col, q in thresholds.items():
        len_expr = pl.col(col).str.len_chars()

        if col == feature_col:
            # 保留空字符串 / 空值，只过滤有效值里的超长
            conds.append(
                len_expr.is_null() | (len_expr == 0) | (len_expr <= q)
            )
        else:
            conds.append(
                len_expr.is_null() | (len_expr <= q)
            )

    # 应用过滤
    if conds:
        df_filtered = df.filter(pl.all_horizontal(conds))
    else:
        df_filtered = df

    if verbose:
        print(
            f"过滤前行数: {df.height:,}, "
            f"过滤后行数: {df_filtered.height:,}"
        )

    return df_filtered

In [36]:
df_no_outliers_hot = filter_string_outliers(
    item_df_hot_dedup_1,
    col_names=["title", "description", "features", "store", "details"],
    quantile=0.8,
    feature_col="features",
    verbose=True
)

title: q80=46.0
description: q80=1280.0
features: q80=34.0
store: q80=166.0
details: q80=1080.0
过滤前行数: 91,861, 过滤后行数: 43,057


In [38]:
df_no_outliers_cold = filter_string_outliers(
    item_df_cold_dedup_1,
    col_names=["title", "description", "features", "store", "details"],
    quantile=0.8,
    feature_col="features",
    verbose=True
)

title: q80=52.0
description: q80=995.0
features: q80=29.0
store: q80=164.0
details: q80=622.0
过滤前行数: 39,730, 过滤后行数: 18,899


In [40]:
for col in df_no_outliers_hot.columns:
    if df_no_outliers_hot.schema[col] == pl.Utf8:
        lengths = df_no_outliers_hot.select(
            pl.col(col).str.len_chars().alias("len")
        )["len"]
        max_len = lengths.max()
        min_len = lengths.min()
        mean_len = lengths.mean()
        print(f"{col}: max={max_len}, min={min_len}, mean={mean_len:.2f}")

print("--------------------------------")

for col in df_no_outliers_cold.columns:
    if df_no_outliers_cold.schema[col] == pl.Utf8:
        lengths = df_no_outliers_cold.select(
            pl.col(col).str.len_chars().alias("len")
        )["len"]
        max_len = lengths.max()
        min_len = lengths.min()
        mean_len = lengths.mean()
        print(f"{col}: max={max_len}, min={min_len}, mean={mean_len:.2f}")

main_category: max=25, min=11, mean=11.00
title: max=46, min=1, mean=24.31
features: max=34, min=0, mean=4.38
description: max=1280, min=1, mean=492.38
store: max=166, min=2, mean=100.74
categories: max=118, min=4, mean=48.93
details: max=1080, min=23, mean=533.34
parent_asin: max=10, min=10, mean=10.00
--------------------------------
main_category: max=25, min=10, mean=11.00
title: max=52, min=1, mean=25.96
features: max=29, min=0, mean=2.86
description: max=995, min=1, mean=357.67
store: max=164, min=3, mean=65.34
categories: max=118, min=4, mean=44.70
details: max=622, min=2, mean=325.25
parent_asin: max=10, min=10, mean=10.00


### 基于rating的过滤

In [42]:
df_high_quality_hot = df_no_outliers_hot.filter(
    (pl.col("average_rating") > 3.0) &
    (pl.col("rating_number") > 50)
)
print(f"过滤average_rating和rating_number后剩余个数: {df_high_quality_hot.height}")

df_high_quality_cold = df_no_outliers_cold.filter(
    (pl.col("average_rating") > 3.0) &
    (pl.col("rating_number") > 20)
)
print(f"过滤average_rating和rating_number后剩余个数: {df_high_quality_cold.height}")

过滤average_rating和rating_number后剩余个数: 33978
过滤average_rating和rating_number后剩余个数: 6602


In [61]:
df_high_quality_hot.null_count()

main_category,title,average_rating,rating_number,features,description,store,categories,details,parent_asin
u32,u32,u32,u32,u32,u32,u32,u32,u32,u32
629,0,0,0,0,0,5445,0,0,0


In [62]:
df_high_quality_cold.null_count()

main_category,title,average_rating,rating_number,features,description,store,categories,details,parent_asin
u32,u32,u32,u32,u32,u32,u32,u32,u32,u32
439,0,0,0,0,0,1051,0,0,0


In [63]:
df_high_quality_hot = df_high_quality_hot.fill_null("")
df_high_quality_cold = df_high_quality_cold.fill_null("")

In [70]:
df_high_quality_hot = df_high_quality_hot.with_columns([
    # 对字符串类型列做两步处理
    pl.col(col)
      .str.replace_all(r"\s+", " ")   # 连续空白合并成一个空格
      .str.strip_chars()              # 去掉首尾空白
      if df_high_quality_hot[col].dtype == pl.Utf8 else pl.col(col)
    for col in df_high_quality_hot.columns
])

df_high_quality_cold = df_high_quality_cold.with_columns([
    # 对字符串类型列做两步处理
    pl.col(col)
      .str.replace_all(r"\s+", " ")   # 连续空白合并成一个空格
      .str.strip_chars()              # 去掉首尾空白
      if df_high_quality_cold[col].dtype == pl.Utf8 else pl.col(col)
    for col in df_high_quality_cold.columns
])

## 基于item的Rating kcore过滤

In [50]:
hot_asins = df_rating_hot["parent_asin"].unique()
cold_asins = df_rating_cold["parent_asin"].unique()

overlap_asins = (
    hot_asins.to_frame("parent_asin")
    .join(
        cold_asins.to_frame("parent_asin"),
        on="parent_asin",
        how="inner"
    )
)

print(f"hot / cold 重叠的 parent_asin 数量: {overlap_asins.height:,}")

hot / cold 重叠的 parent_asin 数量: 0


In [45]:
# 先过滤parent_asin在df_high_quality中的行
before_count = df_rating_hot.height
df_rating_hot = df_rating_hot.join(
    df_high_quality_hot.select("parent_asin"),
    on="parent_asin",
    how="semi"
)
after_count = df_rating_hot.height
print(f"过滤前: {before_count}, 过滤后: {after_count}")

print("--------------------------------")

before_count = df_rating_cold.height
df_rating_cold = df_rating_cold.join(
    df_high_quality_cold.select("parent_asin"),
    on="parent_asin",
    how="semi"
)
after_count = df_rating_cold.height
print(f"过滤前: {before_count}, 过滤后: {after_count}")

过滤前: 7441129, 过滤后: 942346
--------------------------------
过滤前: 88364, 过滤后: 7095


In [47]:
df_rating_hot_final = k_core_range_filter(
    df_rating_hot,
    k_user_min=5,
    k_item_min=5,
    verbose=True
)

初始: rows=942,346, users=333,026, items=33,978
约束: user ∈ [5, 1000000000], item ∈ [5, 1000000000]
[iter 01] rows: 942,346 → 441,500 (Δ +500,846), users: 333,026 → 43,563, items: 33,978 → 32,991
[iter 02] rows: 441,500 → 412,523 (Δ +28,977), users: 43,563 → 43,557, items: 32,991 → 22,285
[iter 03] rows: 412,523 → 396,297 (Δ +16,226), users: 43,557 → 39,127, items: 22,285 → 22,285
[iter 04] rows: 396,297 → 392,417 (Δ +3,880), users: 39,127 → 39,127, items: 22,285 → 21,255
[iter 05] rows: 392,417 → 389,702 (Δ +2,715), users: 39,127 → 38,428, items: 21,255 → 21,255
[iter 06] rows: 389,702 → 388,801 (Δ +901), users: 38,428 → 38,428, items: 21,255 → 21,024
[iter 07] rows: 388,801 → 388,144 (Δ +657), users: 38,428 → 38,263, items: 21,024 → 21,024
[iter 08] rows: 388,144 → 387,883 (Δ +261), users: 38,263 → 38,263, items: 21,024 → 20,958
[iter 09] rows: 387,883 → 387,667 (Δ +216), users: 38,263 → 38,209, items: 20,958 → 20,958
[iter 10] rows: 387,667 → 387,599 (Δ +68), users: 38,209 → 38,209, it

In [48]:
df_rating_cold_final = k_core_range_filter(
    df_rating_cold,
    k_user_min=5,
    k_item_min=1,
    k_item_max=2,
    verbose=True
)

初始: rows=7,095, users=3,349, items=6,602
约束: user ∈ [5, 1000000000], item ∈ [1, 2]
[iter 01] rows: 7,095 → 2,163 (Δ +4,932), users: 3,349 → 204, items: 6,602 → 2,101
[iter 02] rows: 2,163 → 2,163 (Δ +0), users: 204 → 204, items: 2,101 → 2,101
收敛于第 2 轮

最终结果: rows=2,163, users=204, items=2,101


现在的item有三部分：
kcore过滤后: users=38,189, items=20,932
kcore过滤后: users=204, items=2,101

全部热门: 33978
全部冷门: 6602

### 构建用户序列

In [51]:
df_rating_hot_final

user_id,parent_asin,rating,timestamp
str,str,f64,i64
"""AGSLTGCZ7Z4IUM5WICJRSJB35FKA""","""B001TKUXVQ""",5.0,1486519875000
"""AE3RRMXCWU6LF5PXGSF5F3MKGTIQ""","""B000OCY7IK""",5.0,1455409895000
"""AFVIMSBSCUAVPH3RGOMCKTJHN4DA""","""B00BGARG14""",5.0,1468745883000
"""AG7DH74UOAJIB2NWCI7VGSVF3KCQ""","""B00A4Y61ZU""",4.0,1630804739866
"""AFGDTFXT66FT63DIFUK4JXYJCG2A""","""B0015RRN8E""",4.0,1374717653000
…,…,…,…
"""AEYJL523OZ6BVFY4PEYRZ7XZFRCA""","""B004FUPK6U""",3.0,1472436673000
"""AGEYQXCU3T4MAXFRPXTFVZDYIBVQ""","""B08BDPG546""",5.0,1674160349673
"""AGDQO4VIXXMHZMFRFHAHO2PNCYRQ""","""B01D3RP1R8""",5.0,1483217507000
"""AFRYHSCOOEMPBUIHU22TLEXY2SRA""","""B00125WAYQ""",5.0,1222269691000


In [57]:
# 按 user_id / timestamp 排序
df_sorted_hot = df_rating_hot_final.sort(["user_id", "timestamp"])
# 每个 user 只保留一行，sequence 是完整历史
user_sequences_hot = (
    df_sorted_hot
    .group_by("user_id")
    .agg(
        pl.col("parent_asin").implode().alias("sequence")
    )
)

# 按 user_id / timestamp 排序
df_sorted_cold = df_rating_cold_final.sort(["user_id", "timestamp"])
# 每个 user 只保留一行，sequence 是完整历史
user_sequences_cold = (
    df_sorted_cold
    .group_by("user_id")
    .agg(
        pl.col("parent_asin").implode().alias("sequence")
    )
)


In [58]:
user_sequences_hot

user_id,sequence
str,list[str]
"""AFEHQPDE556UFMO22XXXVG5FUHFQ""","[""B000FGGE7C"", ""B004EXWGHY"", … ""B008412UQ4""]"
"""AFLN3R2VBCWI722ZQJ6Z7DT2NNUA""","[""B00108FMFO"", ""B0055CMM0O"", … ""B00R5TAWIO""]"
"""AHFD6M6A3YXZNQ3K4UHZGFD5ND4Q""","[""B000BT98XK"", ""6305622914"", … ""B004U33NHA""]"
"""AEE6A4W5SBHI4WENUBHFRBQ5ZFLQ""","[""B001EBWITK"", ""B003M9ZABU"", … ""B00AKB8PWE""]"
"""AF57VD3QGK7H4SJZXRJLKZ2DHW7A""","[""B00L3VKDAW"", ""B0007TKHH0"", … ""B00HCSPHNS""]"
…,…
"""AGKCPEGMJIKH2K6RVONCBLXZGEJA""","[""B00GD7UMDO"", ""B005SH65S6"", … ""B005ETAL5Q""]"
"""AGXFPHMKB6WCOWF6J3YAQYS7CJDQ""","[""B00AEFYGJW"", ""B004JPJHL0"", … ""B01LTIBWTE""]"
"""AEY6GBLOQXRVLVKH7RE3OI55OBCQ""","[""B0014DZ2XC"", ""B000067D3K"", … ""B0013LRKQC""]"
"""AGP2YSSQVE3P2CVHVBH77R7VIBKA""","[""B00AF85TTY"", ""B00KITEI3S"", … ""B00MO21WAY""]"


In [59]:
user_sequences_cold

user_id,sequence
str,list[str]
"""AG3WUOBVLQCMMR7U4SQFENYVK6JQ""","[""6302443245"", ""6305183244"", … ""B0001G0HFQ""]"
"""AFXXHEHPTALPTJASXHRGON4PSZQA""","[""6304753365"", ""B0011ATJFK"", … ""B003RCJR3O""]"
"""AGWIU2DVVALEVN4DNKJ5CIBFU4JQ""","[""B07KK4P7S6"", ""B07MCM3WZC"", … ""B08CVSR32D""]"
"""AGFQF3FULPGUD2FQSCF27HA2C76Q""","[""6305010625"", ""6301978331"", … ""6301802012""]"
"""AFBTN4HATLZZKAAWRTEZMPLAWIXA""","[""B00176VK64"", ""6302069106"", … ""B003RY601Y""]"
…,…
"""AF7KRCZAXKNMMFUCMNCZYG644I2A""","[""B08LBW6KFP"", ""B07RMZN73D"", … ""B08DMD8SC6""]"
"""AEFBONZJECG4IWBGNBIM7WXV4AXA""","[""B0076D06DK"", ""B003EGGD6W"", … ""B07RS9274V""]"
"""AHRUV3HZFPZEPRWEMSYMVBAJXFFQ""","[""B001CGV3HU"", ""B001E9672Q"", … ""B09P4DBWLB""]"
"""AFGFSRZEXXOQYENUZTAVCUKW4TJQ""","[""B000094J5W"", ""B00007L4OG"", … ""B0006SNP4O""]"


In [60]:
hot_seq_len = user_sequences_hot.select(
    pl.col("sequence").list.len().alias("seq_len")
)
print(
    "[HOT]",
    "users:", user_sequences_hot.height,
    "min:", hot_seq_len['seq_len'].min(),
    "mean:", f"{hot_seq_len['seq_len'].mean():.2f}",
    "max:", hot_seq_len['seq_len'].max(),
)

cold_seq_len = user_sequences_cold.select(
    pl.col("sequence").list.len().alias("seq_len")
)
print(
    "[COLD]",
    "users:", user_sequences_cold.height,
    "min:", cold_seq_len['seq_len'].min(),
    "mean:", f"{cold_seq_len['seq_len'].mean():.2f}",
    "max:", cold_seq_len['seq_len'].max(),
)


[HOT] users: 38189 min: 5 mean: 10.15 max: 642
[COLD] users: 204 min: 5 mean: 10.60 max: 70


## 保存数据

In [77]:
# =======================
# Save HOT / COLD outputs
# =======================

# HOT user sequences
hot_seq_output_path = DATA_DIR / "output" / f"{CATEGORY}_hot_user_sequences.parquet"
user_sequences_hot.write_parquet(hot_seq_output_path)
print(
    f"Saved HOT user sequences to: {hot_seq_output_path} "
    f"(rows = {user_sequences_hot.height:,})"
)

# COLD user sequences
cold_seq_output_path = DATA_DIR / "output" / f"{CATEGORY}_cold_user_sequences.parquet"
user_sequences_cold.write_parquet(cold_seq_output_path)
print(
    f"Saved COLD user sequences to: {cold_seq_output_path} "
    f"(rows = {user_sequences_cold.height:,})"
)

# HOT item metadata
hot_item_output_path = DATA_DIR / "output" / f"{CATEGORY}_hot_items.parquet"
df_high_quality_hot.write_parquet(hot_item_output_path)
print(
    f"Saved HOT item metadata to: {hot_item_output_path} "
    f"(rows = {df_high_quality_hot.height:,})"
)

# COLD item metadata
cold_item_output_path = DATA_DIR / "output" / f"{CATEGORY}_cold_items.parquet"
df_high_quality_cold.write_parquet(cold_item_output_path)
print(
    f"Saved COLD item metadata to: {cold_item_output_path} "
    f"(rows = {df_high_quality_cold.height:,})"
)

Saved HOT user sequences to: /home/zihao/llm/llm4rec/data/output/Movies_and_TV_hot_user_sequences.parquet (rows = 38,189)
Saved COLD user sequences to: /home/zihao/llm/llm4rec/data/output/Movies_and_TV_cold_user_sequences.parquet (rows = 204)
Saved HOT item metadata to: /home/zihao/llm/llm4rec/data/output/Movies_and_TV_hot_items.parquet (rows = 33,978)
Saved COLD item metadata to: /home/zihao/llm/llm4rec/data/output/Movies_and_TV_cold_items.parquet (rows = 6,602)
