In [0]:
%sql
-- Resolve the unqualified view/table names used in the cells below against data_science.default.
USE SCHEMA data_science.default

尝试从现有数据中提取尽可能多的特征（所有时间窗口均从安装时刻起算，且为累计窗口）：
1. 安装后 24 小时内的付费次数、付费总金额、最大单次付费金额
2. 安装后 48 小时内的付费次数、付费总金额、最大单次付费金额
3. 安装后 72 小时内的付费次数、付费总金额、最大单次付费金额
以及预测目标：
安装后 168 小时（7 天）内的付费总金额（revenue_d7）

创建一个 view，方便后续快速获取数据：`lw_20250820_aos_gpir_uid_revenue_view3_by_j`


In [0]:
%sql
-- Per-user revenue feature view for app 502 / package com.fun.lastwar.gp.
-- Output columns:
--   * uid, install_day ('yyyyMMdd' string), country_group, bucketed mediasource, campaign_id
--   * payment count, total revenue and largest single payment within 24h / 48h / 72h
--     of install (cumulative windows: the 24h window is contained in 48h, 48h in 72h)
--   * revenue_d7: total revenue within 168h of install (the prediction target)
CREATE OR REPLACE VIEW lw_20250820_aos_gpir_uid_revenue_view3_by_j AS
select
	uid,
	install_day,
	-- country,
	country_group,
	-- Bucket media sources. applovin_int is split by campaign name. The first WHEN that
	-- matches wins, so a name containing both D7 and D28 becomes applovin_int_d7.
	-- An applovin_int campaign whose name matches neither, or has no campaign_name,
	-- falls through to 'other'. The listed networks keep their own name; every
	-- other source becomes 'other'.
	CASE
		WHEN mediasource = 'applovin_int'
		AND UPPER(campaign_name) LIKE '%D7%' THEN 'applovin_int_d7'
		WHEN mediasource = 'applovin_int'
		AND UPPER(campaign_name) LIKE '%D28%' THEN 'applovin_int_d28'
		WHEN mediasource IN (
			'googleadwords_int',
			'Facebook Ads',
			'bytedanceglobal_int',
			'snapchat_int',
			'moloco_int'
		) THEN mediasource
		ELSE 'other'
	END as mediasource,
	campaign_id,
  -- campaign_name,
	payment_count_24h,
	revenue_24h as revenue_24h,
	max_payment_24h,
	payment_count_48h,
	revenue_48h,
	max_payment_48h,
	payment_count_72h,
	revenue_72h as revenue_72h,
	max_payment_72h,
	revenue_168h as revenue_d7
from
	(
		-- Inner query: join each attributed user to all of its revenue rows for the same
		-- app (LEFT JOIN on app + uid), then aggregate them into time-window features.
		select
			t1.uid,
			t1.install_timestamp,
			-- from_unixtime takes epoch seconds; the day string is rendered in the Spark
			-- session time zone.
			date_format(from_unixtime(t1.install_timestamp), 'yyyyMMdd') as install_day,
			t1.country,
			-- Countries missing from the mapping table are grouped as 'other'.
			COALESCE(cg.country_group, 'other') AS country_group,
			t1.mediasource,
			t1.campaign_id,
      pub.campaign_name as campaign_name,
			-- (t2.event_time / 1000 - t1.install_timestamp) is the seconds elapsed since install.
			-- NOTE(review): assumes event_time is epoch milliseconds and install_timestamp is
			-- epoch seconds. TODO confirm.
			-- BETWEEN 0 AND N is inclusive and excludes events logged before install.
			-- count(...) counts only in-window rows whose revenue_value_usd is non-NULL.
			-- sum/max use ELSE 0, so users with no in-window revenue (including users
			-- with no revenue rows at all, via the LEFT JOIN) get 0 instead of NULL.
			count(case when (t2.event_time / 1000 - t1.install_timestamp) between 0 and 24 * 60 * 60 then t2.revenue_value_usd end) as payment_count_24h,
			sum(case when (t2.event_time / 1000 - t1.install_timestamp) between 0 and 24 * 60 * 60 then t2.revenue_value_usd else 0 end) as revenue_24h,
			max(case when (t2.event_time / 1000 - t1.install_timestamp) between 0 and 24 * 60 * 60 then t2.revenue_value_usd else 0 end) as max_payment_24h,
			count(case when (t2.event_time / 1000 - t1.install_timestamp) between 0 and 48 * 60 * 60 then t2.revenue_value_usd end) as payment_count_48h,
			sum(case when (t2.event_time / 1000 - t1.install_timestamp) between 0 and 48 * 60 * 60 then t2.revenue_value_usd else 0 end) as revenue_48h,
			max(case when (t2.event_time / 1000 - t1.install_timestamp) between 0 and 48 * 60 * 60 then t2.revenue_value_usd else 0 end) as max_payment_48h,
			count(case when (t2.event_time / 1000 - t1.install_timestamp) between 0 and 72 * 60 * 60 then t2.revenue_value_usd end) as payment_count_72h,
			sum(case when (t2.event_time / 1000 - t1.install_timestamp) between 0 and 72 * 60 * 60 then t2.revenue_value_usd else 0 end) as revenue_72h,
			max(case when (t2.event_time / 1000 - t1.install_timestamp) between 0 and 72 * 60 * 60 then t2.revenue_value_usd else 0 end) as max_payment_72h,
			-- 168h = 7 days after install: the regression label (exposed as revenue_d7).
			sum(case when (t2.event_time / 1000 - t1.install_timestamp) between 0 and 168 * 60 * 60 then t2.revenue_value_usd else 0 end) as revenue_168h
		from
			marketing.attribution.dws_overseas_gpir_unique_uid t1
			left join marketing.attribution.dwd_overseas_revenue_allproject t2 on t1.app = t2.app
			and t1.uid = t2.uid
			-- NOTE(review): if this mapping table has more than one row per country, the
			-- join would multiply revenue rows. Presumably country is unique here; verify.
			LEFT JOIN lw_country_group_table_by_j_20250703 cg ON t1.country = cg.country
			-- Collapse to one campaign_name per campaign_id (MAX) so that this join
			-- cannot multiply rows.
			LEFT JOIN (
				SELECT
					campaign_id,
					MAX(campaign_name) AS campaign_name
				FROM
					prodb.public.applovin_campaign_info_new
				GROUP BY
					campaign_id
			) pub ON t1.campaign_id = pub.campaign_id
		where
			t1.app = 502
			and t1.app_package = 'com.fun.lastwar.gp'
		group by
			t1.uid,
			t1.install_timestamp,
			t1.country,
			COALESCE(cg.country_group, 'other'),
			t1.mediasource,
			t1.campaign_id,
			pub.campaign_name
	)
-- Disabled ad-hoc filter/sort, kept for manual inspection of paying users.
-- where
-- 	revenue_168h > 0
-- order by
-- 	revenue_168h desc
;

In [0]:
%sql
-- Build the GBT training table. This cell is run on serverless compute for speed.
--
-- Rows are pre-aggregated: there is one row per distinct combination of country_group
-- and the (rounded) feature/label values. `users` is the number of uids in that
-- combination, and downstream training uses it as the sample weight.

-- Remove a VIEW with the same name (if one exists) so that the table can take the name.
DROP VIEW IF EXISTS lw_20250827_traindata_for_3p7_gbt_by_j;

-- Materialize the data as a Delta table, partitioned by country_group because the
-- models are trained one country_group at a time.
CREATE OR REPLACE TABLE lw_20250827_traindata_for_3p7_gbt_by_j
USING DELTA
PARTITIONED BY (country_group)
AS
select
  count(uid) as users,
  country_group,
  -- ROUND(x) rounds to 0 decimal places (whole USD, presumably). This collapses
  -- near-identical users into the same row. Note that the label revenue_d7 is rounded too.
  payment_count_24h,
  ROUND(revenue_24h) as revenue_24h,
  ROUND(max_payment_24h) as max_payment_24h,
  payment_count_48h,
  ROUND(revenue_48h) as revenue_48h,
  ROUND(max_payment_48h) as max_payment_48h,
  payment_count_72h,
  ROUND(revenue_72h) as revenue_72h,
  ROUND(max_payment_72h) as max_payment_72h,
  ROUND(revenue_d7) as revenue_d7
from lw_20250820_aos_gpir_uid_revenue_view3_by_j
-- install_day is a 'yyyyMMdd' STRING in the source view (date_format output).
-- Compare it with string literals of the same fixed-width format, which order the
-- same way as the numeric values, instead of relying on an implicit string-to-int cast.
-- NOTE(review): revenue_d7 needs 7 days of revenue after install_day. Confirm that the
-- upper bound leaves enough time for the label to mature.
where install_day between '20250101' and '20250615'
GROUP BY
  country_group,
  payment_count_24h,
  ROUND(revenue_24h),
  ROUND(max_payment_24h),
  payment_count_48h,
  ROUND(revenue_48h),
  ROUND(max_payment_48h),
  payment_count_72h,
  ROUND(revenue_72h),
  ROUND(max_payment_72h),
  ROUND(revenue_d7);

-- Refresh table statistics for the query optimizer.
ANALYZE TABLE lw_20250827_traindata_for_3p7_gbt_by_j COMPUTE STATISTICS;

In [0]:
%sql
-- Sanity check: preview a handful of rows from the materialized training table.
SELECT
  users,
  country_group,
  payment_count_24h,
  revenue_24h,
  max_payment_24h,
  payment_count_48h,
  revenue_48h,
  max_payment_48h,
  payment_count_72h,
  revenue_72h,
  max_payment_72h,
  revenue_d7
FROM lw_20250827_traindata_for_3p7_gbt_by_j
LIMIT 10;

数据准备和探索

In [0]:
# Load the aggregated training table.
# Each row is a distinct (country_group, feature values, revenue_d7) combination,
# and `users` is the number of uids that share it.
from pyspark.sql import functions as F
from pyspark.sql.functions import count, avg, stddev  # `avg` is also used by later cells

sql = '''
select 
  users,
  country_group,
  payment_count_24h,
  revenue_24h,
  max_payment_24h,
  payment_count_48h,
  revenue_48h,
  max_payment_48h,
  payment_count_72h,
  revenue_72h,
  max_payment_72h,
  revenue_d7
from lw_20250827_traindata_for_3p7_gbt_by_j
where revenue_d7 is not null  -- 确保目标变量不为空
'''

df = spark.sql(sql)

# Basic shape. Note that this is the number of aggregated rows, not the number of users.
print("=== 数据基本信息 ===")
print(f"总行数: {df.count()}")
print(f"总列数: {len(df.columns)}")

# Per-country_group distribution.
print("\n=== Country Group 分布 ===")

# Rows are pre-aggregated, so plain avg/stddev over rows would give every feature
# combination equal weight. Weight each row by `users`. This is equivalent to
# expanding each row `users` times and then taking the per-user mean and sample stddev.
weight = F.col("users")
label = F.col("revenue_d7").cast("double")  # double avoids decimal precision overflow when squaring
sum_w = F.sum(weight)
sum_wy = F.sum(weight * label)
sum_wy2 = F.sum(weight * label * label)

country_stats = df.groupBy("country_group").agg(
    count("*").alias("sample_count"),      # aggregated rows
    sum_w.alias("user_count"),             # underlying users
    F.when(sum_w > 0, sum_wy / sum_w).alias("avg_revenue_d7"),
    # Frequency-weighted sample variance: (Σw·y² − (Σw·y)²/Σw) / (Σw − 1).
    # It is clamped at 0 against floating-point cancellation and is NULL when Σw <= 1,
    # matching stddev() on a single value.
    F.when(
        sum_w > 1,
        F.sqrt(F.greatest(F.lit(0.0), (sum_wy2 - sum_wy * sum_wy / sum_w) / (sum_w - 1))),
    ).alias("std_revenue_d7"),
).orderBy("user_count", ascending=False)

display(country_stats)

数据质量检查

In [0]:
# Data-quality checks: missing values, summary statistics and per-group row counts.
from pyspark.sql import functions as F
from pyspark.sql.functions import col, count, when, isnan, isnull
from pyspark.sql.types import DoubleType, FloatType

print("=== 数据质量检查 ===")

# NaN can only occur in floating-point columns. Calling isnan() on a string column
# (e.g. country_group) forces an implicit cast to double, which fails under ANSI mode,
# so the NaN test is applied only where it is meaningful.
float_cols = {
    field.name
    for field in df.schema.fields
    if isinstance(field.dataType, (DoubleType, FloatType))
}


def _is_missing(column_name):
    """Boolean Column: NULL, or NaN for float/double columns."""
    missing = col(column_name).isNull()
    return (missing | isnan(col(column_name))) if column_name in float_cols else missing


null_counts = df.select([
    count(when(_is_missing(c), c)).alias(c)
    for c in df.columns
])
print("空值统计:")
display(null_counts)

print("\n基本统计信息:")
display(df.describe())

# Check that each country_group has enough aggregated rows to train on. The number of
# underlying users (sum of `users`) is shown alongside the row count.
# NOTE(review): the training loop skips groups with fewer than 50 rows, while this
# check uses 100. Align the two thresholds.
min_samples_required = 100  # minimum aggregated rows per country_group
valid_countries = (
    df.groupBy("country_group")
    .agg(count("*").alias("count"), F.sum("users").alias("user_count"))
    .filter(col("count") >= min_samples_required)
)
print(f"\n样本数 >= {min_samples_required} 的country_group:")
display(valid_countries)

定义训练函数

In [0]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml import Pipeline
import time

def train_gbt_model_for_country(country_data, country_name, label_col="revenue_d7", weight_col="users", seed=42):
    """
    Train and evaluate a GBT regressor for a single country_group.

    The training table is pre-aggregated: each row is a distinct combination of
    feature values and label, and ``weight_col`` holds how many users share it.
    That column is therefore used as the sample weight for training AND for
    evaluation, so the test metrics describe per-user error rather than per-row
    error.

    Args:
        country_data: Spark DataFrame for one country_group, containing the
            feature columns listed below, ``label_col`` and ``weight_col``.
        country_name: the country_group value; used for logging and in the result.
        label_col: regression target column (default ``"revenue_d7"``).
        weight_col: per-row weight column (default ``"users"``).
        seed: seed for the 80/20 train/test split and for the GBT model.

    Returns:
        A dict with keys ``country_group``, ``model`` (the fitted PipelineModel),
        ``rmse``, ``mae`` and ``r2`` (weighted, on the test split),
        ``training_time`` (seconds), ``train_samples`` and ``test_samples``
        (row counts), and ``predictions`` (test-split DataFrame).
        Returns ``None`` when a feature column is missing or either split is empty.
    """
    print(f"\n{'='*50}")
    print(f"开始训练 Country Group: {country_name}")
    print(f"样本数量: {country_data.count()}")

    # Payment count, total revenue and largest single payment within 24h / 48h / 72h of install.
    feature_cols = [
        'payment_count_24h', 'revenue_24h', 'max_payment_24h',
        'payment_count_48h', 'revenue_48h', 'max_payment_48h',
        'payment_count_72h', 'revenue_72h', 'max_payment_72h'
    ]

    missing_cols = [c for c in feature_cols if c not in country_data.columns]
    if missing_cols:
        print(f"警告: 缺少特征列 {missing_cols}")
        return None

    # NOTE(review): the split is over aggregated rows, not users. A row that stands for
    # many users lands entirely in one split, which can make the metrics swing.
    train_data, test_data = country_data.randomSplit([0.8, 0.2], seed=seed)

    # Count each split once and reuse the numbers in the result dict below.
    train_samples = train_data.count()
    test_samples = test_data.count()
    print(f"训练集样本数: {train_samples}")
    print(f"测试集样本数: {test_samples}")
    if train_samples == 0 or test_samples == 0:
        # Do not fit or evaluate on an empty split.
        print(f"警告: 训练集或测试集为空，跳过 {country_name}")
        return None

    vector_assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
    gbt = GBTRegressor(
        labelCol=label_col,
        featuresCol="features",
        weightCol=weight_col,  # each row stands for `weight_col` users
        maxIter=20,
        maxDepth=5,
        seed=seed,
    )
    pipeline = Pipeline(stages=[vector_assembler, gbt])

    start_time = time.time()
    model = pipeline.fit(train_data)
    training_time = time.time() - start_time
    print(f"训练完成，耗时: {training_time:.2f} 秒")

    predictions = model.transform(test_data)

    # Evaluate with the same weights used for training. An unweighted evaluator would
    # count a row standing for one user the same as a row standing for thousands.
    metrics = {
        metric_name: RegressionEvaluator(
            labelCol=label_col,
            predictionCol="prediction",
            metricName=metric_name,
            weightCol=weight_col,
        ).evaluate(predictions)
        for metric_name in ("rmse", "mae", "r2")
    }
    rmse, mae, r2 = metrics["rmse"], metrics["mae"], metrics["r2"]

    print(f"模型评估结果:")
    print(f"  RMSE: {rmse:.4f}")
    print(f"  MAE: {mae:.4f}")
    print(f"  R²: {r2:.4f}")

    return {
        'country_group': country_name,
        'model': model,
        'rmse': rmse,
        'mae': mae,
        'r2': r2,
        'training_time': training_time,
        'train_samples': train_samples,
        'test_samples': test_samples,
        'predictions': predictions
    }

批量训练模型

In [0]:
# Train one GBT model per country_group and collect the models and their metrics.
from pyspark.sql.functions import col

# Minimum number of aggregated rows a country_group needs before a model is trained.
# NOTE(review): the data-quality cell uses min_samples_required = 100. Confirm which
# threshold is intended.
MIN_TRAIN_ROWS = 50

# Collect the distinct groups through the DataFrame API rather than `.rdd`, because the
# RDD API is not available on Spark Connect-based compute. Sort them so the training
# order is reproducible (key=str tolerates a NULL group).
countries = sorted(
    (row["country_group"] for row in df.select("country_group").distinct().collect()),
    key=str,
)
print(f"需要训练的country_group数量: {len(countries)}")
print(f"Country groups: {countries}")

model_results = {}
training_summary = []

for i, country in enumerate(countries, 1):
    print(f"\n进度: {i}/{len(countries)}")

    try:
        # eqNullSafe so that a NULL country_group (if any) still selects its own rows.
        country_data = df.filter(col("country_group").eqNullSafe(country))

        sample_count = country_data.count()
        if sample_count < MIN_TRAIN_ROWS:
            print(f"跳过 {country}: 样本数太少 ({sample_count})")
        else:
            result = train_gbt_model_for_country(country_data, country)

            if result:
                model_results[country] = result
                training_summary.append({
                    'country_group': country,
                    'rmse': result['rmse'],
                    'mae': result['mae'],
                    'r2': result['r2'],
                    'training_time': result['training_time'],
                    'train_samples': result['train_samples'],
                    'test_samples': result['test_samples']
                })
                print(f"✅ {country} 训练成功")
            else:
                print(f"❌ {country} 训练失败")

    except Exception as e:
        # Best-effort: log the error and move on to the next group.
        print(f"❌ {country} 训练出错: {str(e)}")

    # Periodic summary every 5 groups. There is no `continue` above, so this point is
    # reached even when the current group was skipped or failed.
    if i % 5 == 0:
        print(f"\n--- 进度摘要 (已完成 {len(model_results)}/{i}) ---")
        if training_summary:
            avg_rmse = sum(r['rmse'] for r in training_summary) / len(training_summary)
            avg_r2 = sum(r['r2'] for r in training_summary) / len(training_summary)
            print(f"平均 RMSE: {avg_rmse:.4f}")
            print(f"平均 R²: {avg_r2:.4f}")

print(f"\n🎉 所有训练完成! 成功训练了 {len(model_results)} 个模型")

训练结果汇总

In [0]:
# Summarize the per-country training metrics collected by the training loop.
from pyspark.sql import functions as F

if training_summary:
    summary_df = spark.createDataFrame(training_summary)

    # R² can come back NaN (presumably when a test split's labels are all equal).
    # Spark orders NaN above every other number, and avg() over a NaN yields NaN,
    # so treat NaN R² as NULL when ranking and averaging.
    r2_valid = F.when(~F.isnan("r2"), F.col("r2"))

    print("=== 训练结果汇总 ===")
    display(summary_df.orderBy(r2_valid.desc_nulls_last()))

    # Overall statistics across models (unweighted mean over country_groups).
    print("\n=== 整体统计 ===")
    summary_stats = summary_df.agg(
        F.avg("rmse").alias("avg_rmse"),
        F.avg("mae").alias("avg_mae"),
        F.avg(r2_valid).alias("avg_r2"),
        F.avg("training_time").alias("avg_training_time"),
    )
    display(summary_stats)

    # Best and worst models, by R², among those with a numeric R².
    ranked = summary_df.filter(r2_valid.isNotNull())
    best_model = ranked.orderBy(F.col("r2").desc()).first()
    worst_model = ranked.orderBy(F.col("r2").asc()).first()

    if best_model is not None:
        print(f"\n表现最好的模型: {best_model['country_group']} (R² = {best_model['r2']:.4f})")
        print(f"表现最差的模型: {worst_model['country_group']} (R² = {worst_model['r2']:.4f})")
    else:
        print("\n没有 R² 有效的模型，无法比较最好/最差模型")
else:
    print("没有训练成功的模型，跳过结果汇总")