# SQL分析

## 搭建MySQL分析环境

步骤：
1. 创建数据库和表
2. 导入数据
3. 建立索引（重要！面试必问；本笔记在第 1 步的建表语句中已一并定义索引）
4. 开始分析

In [None]:
-- Create the analysis database and the user_behavior fact table.
-- IF NOT EXISTS keeps the script re-runnable; the original "IF EXISTS" form
-- is not valid in CREATE DATABASE and fails with a syntax error.
CREATE DATABASE IF NOT EXISTS taobao_analysis;
USE taobao_analysis;

CREATE TABLE IF NOT EXISTS user_behavior (
    id INT AUTO_INCREMENT PRIMARY KEY,
    user_id BIGINT NOT NULL,
    item_id BIGINT NOT NULL,
    category_id INT NOT NULL,
    behavior_type ENUM('pv', 'fav', 'cart', 'buy') NOT NULL,
    timestamp BIGINT NOT NULL,          -- Unix epoch seconds (the import script parses it with unit='s')
    event_date DATE,                    -- calendar date derived from timestamp by the import script
    -- NOTE(review): idx_user_id is a leftmost prefix of idx_user_behavior and
    -- may be redundant; kept so existing index names remain available.
    INDEX idx_user_id (user_id),        -- frequent per-user lookups
    INDEX idx_item_id (item_id),        -- per-item lookups
    INDEX idx_behavior (behavior_type), -- filtering by behavior type
    INDEX idx_date (event_date),        -- filtering by date
    INDEX idx_user_behavior (user_id, behavior_type),  -- composite index: user + behavior type
    INDEX idx_timestamp (timestamp)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- Inspect the resulting table definition.
DESCRIBE user_behavior;

### 使用 Python 导入数据

In [11]:
import pandas as pd
import pymysql
from sqlalchemy import create_engine
import time
import config

def import_data_to_mysql(csv_file,table_name='user_behavior',batch_size=50000):
    """Stream a CSV file into a MySQL table in fixed-size chunks.

    Connection settings are read from ``config.MYSQL_CONFIG`` (keys ``user``,
    ``password``, ``host``, ``port`` and ``database``). Each chunk gets an
    ``event_date`` column derived from its ``timestamp`` column (interpreted
    as Unix epoch seconds) before being appended to the target table.

    Args:
        csv_file: Path of the CSV file to load; it must contain a
            ``timestamp`` column.
        table_name: Name of the destination table; rows are appended.
        batch_size: Number of CSV rows read and inserted per chunk.

    Returns:
        The total number of rows written.
    """
    # Local imports keep this block self-contained.
    from contextlib import closing
    from urllib.parse import quote

    print(f"开始导入数据到MySQL表 {table_name}...")
    config_dict = config.MYSQL_CONFIG
    # Percent-encode the credentials so characters such as '@', ':' or '/'
    # in the user name or password cannot corrupt the connection URL.
    user = quote(str(config_dict['user']), safe='')
    password = quote(str(config_dict['password']), safe='')
    connection_str = (
        f"mysql+pymysql://{user}:{password}@"
        f"{config_dict['host']}:{config_dict['port']}/"
        f"{config_dict['database']}")
    engine = create_engine(connection_str)

    start_time = time.time()
    total_rows = 0  # running count of rows written so far
    try:
        # chunksize alone already yields an iterator of DataFrames; closing()
        # releases the file handle even if an insert fails part-way through.
        with closing(pd.read_csv(csv_file, chunksize=batch_size)) as chunk_iterator:
            for i, chunk in enumerate(chunk_iterator):
                # Derive the calendar date from the epoch-seconds timestamp.
                # NOTE(review): the conversion is done in UTC; if events should
                # be bucketed by a local time zone, shift before taking the
                # date - confirm against the data source.
                chunk['event_date'] = pd.to_datetime(chunk['timestamp'], unit='s').dt.date
                # Append the chunk using multi-row INSERT statements.
                chunk.to_sql(table_name,
                             engine,
                             if_exists='append',
                             index=False,
                             method='multi')
                total_rows += len(chunk)
                # Report progress on every 10th chunk (including the first).
                if i % 10 == 0:
                    print(f"已导入 {total_rows:,} 行数据...")
    finally:
        # Always return pooled connections, on success or failure.
        engine.dispose()
    end_time = time.time()
    print("✅ 数据导入完成！")
    print(f"总行数：{total_rows:,}")
    print(f"耗时：{end_time - start_time:.2f} 秒")

    return total_rows
# Run the import for taobao100w.csv using the default table name and batch size.
import_data_to_mysql('taobao100w.csv')

开始导入数据到MySQL表 user_behavior...
已导入 50,000 行数据...
已导入 550,000 行数据...
✅ 数据导入完成！
总行数：1,000,000
耗时：124.86 秒


1000000

## 排名问题详解

核心区别：
1. ROW_NUMBER(): 连续唯一排名（1,2,3,4）
2. RANK(): 并列排名会跳过名次（1,2,2,4）
3. DENSE_RANK(): 并列排名不跳名次（1,2,2,3）

使用场景：
- ROW_NUMBER: 取Top N，需要唯一排名
- RANK: 比赛排名，允许并列
- DENSE_RANK: 分级，如成绩等级

In [None]:
-- Scenario 1: rank users by number of purchases, showing the three ranking
-- functions side by side on the same ordering.
SELECT
    user_id,
    COUNT(*) AS purchase_count,
    -- user_id breaks ties so the unique row number is reproducible across runs.
    ROW_NUMBER() OVER (ORDER BY COUNT(*) DESC, user_id) AS row_num,
    -- RANK / DENSE_RANK deliberately order by the count only, so tied users
    -- share a rank (with and without gaps respectively).
    RANK() OVER (ORDER BY COUNT(*) DESC) AS rank_num,
    DENSE_RANK() OVER (ORDER BY COUNT(*) DESC) AS dense_rank_num
FROM user_behavior
WHERE behavior_type = 'buy'
GROUP BY user_id
-- The tie-breaker makes the five returned rows deterministic.
ORDER BY purchase_count DESC, user_id
LIMIT 5;

-- Scenario 2: rank items by page-view count within each category
-- (partitioned ranking).
-- Own solution: DENSE_RANK keeps tied items on the same rank without gaps.
SELECT
    category_id,
    item_id,
    COUNT(*) AS pv_sum,
    DENSE_RANK() OVER (
        PARTITION BY category_id
        ORDER BY COUNT(*) DESC
    ) AS item_rank
FROM user_behavior
WHERE behavior_type = 'pv'
GROUP BY category_id, item_id
-- item_id breaks ties so the 30 rows returned are stable across runs.
ORDER BY category_id, pv_sum DESC, item_id
LIMIT 30;
-- Scenario 2, AI-suggested variant: adds a global rank next to the
-- per-category rank.
WITH item_clicks AS (
    -- Page-view count per (category, item) pair.
    SELECT
        category_id,
        item_id,
        COUNT(*) AS click_count
    FROM user_behavior
    WHERE behavior_type = 'pv'
    GROUP BY category_id, item_id
)
SELECT
    category_id,
    item_id,
    click_count,
    -- Rank within the category; item_id makes the numbering reproducible
    -- when several items have the same click count.
    ROW_NUMBER() OVER (
        PARTITION BY category_id
        ORDER BY click_count DESC, item_id
    ) AS category_rank,
    -- Rank across all categories; (category_id, item_id) is unique per row,
    -- so it is a complete tie-breaker.
    ROW_NUMBER() OVER (
        ORDER BY click_count DESC, category_id, item_id
    ) AS global_rank
FROM item_clicks
ORDER BY category_id, click_count DESC, item_id
LIMIT 30;

-- Scenario 3: find the item(s) each user bought most often (a common
-- business question).
-- RANK is used instead of ROW_NUMBER: a user can have several items tied for
-- the highest purchase count, and ROW_NUMBER would keep only one of them,
-- chosen arbitrarily.
WITH user_item_purchases AS (
    -- Purchase count per (user, item), ranked within each user.
    SELECT
        user_id,
        item_id,
        COUNT(*) AS buy_sum,
        RANK() OVER (
            PARTITION BY user_id
            ORDER BY COUNT(*) DESC
        ) AS purchase_rank
    FROM user_behavior
    WHERE behavior_type = 'buy'
    GROUP BY user_id, item_id
)
SELECT
    user_id,
    item_id,
    buy_sum
FROM user_item_purchases
WHERE purchase_rank = 1
-- Without an ORDER BY the 10 rows picked by LIMIT are arbitrary.
ORDER BY user_id, item_id
LIMIT 10;

## 留存率计算详解

留存率是互联网公司最重要的指标之一。
- 计算逻辑：第 N 日留存率 = 新增当天的用户中在第 N 天仍活跃的用户数 / 新增当天的用户数

两种实现方法：
1. 自连接：传统方法，逻辑清晰但性能差
2. 窗口函数：现代方法，性能好但逻辑复杂

在实习中：通常用窗口函数，但面试两种都要会。

In [None]:
-- Method 1: retention via a self-join (shows the underlying principle).
--   Step 1: find each user's first active date.
--   Step 2: list every date on which the user was active.
--   Step 3: join the two and count users active exactly N days later.
WITH user_first_dates AS (
    -- Earliest date with a page view per user; this is the user's cohort date.
    SELECT
        user_id,
        MIN(event_date) AS first_date
    FROM user_behavior
    WHERE behavior_type = 'pv'
    GROUP BY user_id
),
user_active_dates AS (
    -- One row per user per day with at least one page view.
    SELECT DISTINCT
        user_id,
        event_date
    FROM user_behavior
    WHERE behavior_type = 'pv'
),
retention_calc AS (
    -- Pair every cohort user with each of their active dates and the number
    -- of days elapsed since their first date.
    SELECT
        f.user_id,
        f.first_date,
        a.event_date,
        DATEDIFF(a.event_date, f.first_date) AS days_diff
    FROM user_first_dates AS f
    LEFT JOIN user_active_dates AS a
        ON f.user_id = a.user_id
        AND a.event_date >= f.first_date
),
cohort_counts AS (
    -- Aggregate once per cohort date so the rate formulas below do not have
    -- to repeat the conditional counts.
    SELECT
        first_date,
        COUNT(DISTINCT user_id) AS new_users,
        COUNT(DISTINCT CASE WHEN days_diff = 1 THEN user_id END) AS day1_users,
        COUNT(DISTINCT CASE WHEN days_diff = 7 THEN user_id END) AS day7_users
    FROM retention_calc
    GROUP BY first_date
)
SELECT
    -- Aliases are backtick-quoted: two of them start with a digit, which is
    -- easy to misparse when left unquoted.
    first_date AS `新增日期`,
    new_users AS `新增用户`,
    day1_users AS `一日留存的客户数`,
    ROUND(day1_users * 100.0 / new_users, 2) AS `1日`,   -- day-1 retention, percent
    day7_users AS `7日留存的客户数`,
    ROUND(day7_users * 100.0 / new_users, 2) AS `7日`    -- day-7 retention, percent
FROM cohort_counts
-- first_date breaks ties between cohorts with equal day-1 retention so the
-- 10 rows returned are deterministic.
ORDER BY `1日` DESC, first_date
LIMIT 10;