In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import datetime
from pyspark.sql import Window
from pyspark.sql.types import *
import sys
import os
# Descriptive job label; note that the session below registers itself as "test".
app_name = 'selection bu batch100'

# Point both the driver and the workers at the system Python 3 interpreter.
os.environ.update(
    PYSPARK_PYTHON="/usr/bin/python3",
    PYSPARK_DRIVER_PYTHON="/usr/bin/python3",
)

# Hive-enabled session with explicit executor/driver sizing.
spark = (
    SparkSession.builder
    .appName("test")
    .enableHiveSupport()
    # Cluster sizing.
    .config("spark.executor.instances", "150")
    .config("spark.executor.memory", "32g")
    .config("spark.executor.cores", "4")
    .config("spark.driver.memory", "32g")
    # Parallelism for shuffles and RDD operations.
    .config("spark.sql.shuffle.partitions", "8000")
    .config("spark.default.parallelism", "8000")
    .config("spark.driver.maxResultSize", "32g")
    .config("spark.pyspark.python", "/usr/bin/python3")
    # -1 turns automatic broadcast joins off.
    .config("spark.sql.autoBroadcastJoinThreshold", "-1")
    # Container executor class and Docker image for the app master and the executors.
    .config("spark.yarn.appMasterEnv.yarn.nodemanager.container-executor.class", "DockerLinuxContainer")
    .config("spark.executorEnv.yarn.nodemanager.container-executor.class", "DockerLinuxContainer")
    .config("spark.yarn.appMasterEnv.yarn.nodemanager.docker-container-executor.image-name", "bdp-docker.jd.com:5000/wise_mart_rmb_py36:latest")
    .config("spark.executorEnv.yarn.nodemanager.docker-container-executor.image-name", "bdp-docker.jd.com:5000/wise_mart_rmb_py36:latest")
    .getOrCreate()
)


# Enable non-strict dynamic partitioning for Hive inserts.
spark.conf.set("hive.exec.dynamic.partition", "true")
spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")

import datetime
import sys
import pandas as pd
import os
import numpy as np
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
import pyspark.sql.types as sql_type
import spa_utils
# Handle on the current namespace dict (at cell top level locals() is the
# module namespace). Not referenced in the visible cells -- presumably kept
# for creating variables dynamically by name; TODO confirm before removing.
name = locals()

## 指定更新的日期

In [2]:
import datetime
import time
# Yesterday's local calendar date as 'YYYY-MM-DD'; the inclusive upper bound of the run.
yesterday = (datetime.datetime.now() + datetime.timedelta(days=-1)).strftime('%Y-%m-%d')

# Date window of the run, kept as strings so they can be spliced into SQL.
# Source tables are read from 'update_origin'; later cells keep only rows
# dated 'update_start' or later.
params = dict(
    update_origin='2016-10-01',
    update_start='2017-01-01',
    update_end=yesterday,
)

# update_start stays a string; the other two bounds are parsed to datetime objects.
update_start = params['update_start']
update_origin = datetime.datetime.strptime(params['update_origin'], '%Y-%m-%d')
update_end = datetime.datetime.strptime(params['update_end'], '%Y-%m-%d')

# Not referenced in the visible cells.
percent = 0.97

# 读取成交价与红价，已经处理为(sku, date)粒度，包含全部日期
# 读取sku每天销量

In [3]:
# Transaction lines within [update_origin, update_end].
deal = spark.sql('''
select item_sku_id as item_sku_id, after_prefr_amount,dq_pay_amount,sale_qtty, dt from %s where dt>='%s' and dt<='%s'
''' % ('app.app_pa_transactions_d_self', params['update_origin'],params['update_end']))

# Collapse to one row per (sku, day): unit deal price and total units sold.
deal = deal.groupby('item_sku_id', 'dt').agg(
    ((F.sum('after_prefr_amount') - F.sum('dq_pay_amount')) / F.sum('sale_qtty')).alias('price'),
    F.sum('sale_qtty').alias('sale_qtty'),
)

# Negative prices are rounded to whole units first, so values that round to
# zero survive the non-negative filter while clearly negative ones are dropped.
deal = (
    deal
    .withColumn('price', F.when(F.col('price') < 0, F.round(F.col('price'), 0)).otherwise(F.col('price')))
    .filter(F.col('price') >= 0)
)

# Red (scraped) price per sku and day over the same date range.
redprice = spark.sql('''
select sku_id as item_sku_id, max_price as scrapedprice, dt from %s where dt>='%s' and dt<='%s'
''' % ('dev.self_sku_redprice_group', params['update_origin'],params['update_end']))

# SKU listing (on-shelf / off-shelf) status per day.
df_status = spark.sql('''
select sku_id as item_sku_id, sku_status_cd, dt from %s where sku_type=1 and dt>='%s' and dt<='%s'
''' % ('dev.self_sku_det_da', params['update_origin'],params['update_end'])).distinct()
# Keep only the SKUs that were listed on that day (status code 3001).
df_status = df_status.filter(F.col('sku_status_cd') == 3001)

# Listed SKUs that have a red price, with deal data attached where it exists.
df = (
    df_status
    .join(redprice, ['item_sku_id', 'dt'], 'inner')
    .join(deal, ['item_sku_id', 'dt'], 'left')
)
# Days without a deal price fall back to the red price.
df = (
    df
    .withColumn('price', F.when(F.col('price').isNotNull(), F.col('price')).otherwise(F.col('scrapedprice')))
    .select('item_sku_id', 'dt', 'price', 'sale_qtty')
)
# Materialise the cache; the count is the cell's displayed output.
df.cache()
df.count()

1777774738

# 读取成交价计算的基线价
left join 到上面的df获得每日销量、红价，基线价的表记录数要少于df（红价为基础的表）  
会有部分有红价但没有基线价的数据

In [4]:
# Baseline-price table, read in full.
promo_df1 = spark.sql('''select * from app.app_pa_price_baseprice_self_deal_price_60_5 ''')
# Restrict df to update_start onward and attach the baseline columns; the
# left join keeps (sku, day) rows that have no baseline match.
promo_df2 = (
    df
    .where(F.col('dt') >= update_start)
    .join(promo_df1, ['item_sku_id', 'dt'], 'left')
)


# 再把处理后的left join到sku上柜表，该表口径是最大的
会有上柜但没有红价的情况，更会有上柜有红价但没有基线价的情况

In [5]:
# Start from every listed (sku, day) since update_start and attach price,
# sales and baseline data where available; missing sales count as zero.
c = (
    df_status
    .where(F.col('dt') >= update_start)
    .drop('sku_status_cd')
    .join(promo_df2, ['item_sku_id', 'dt'], 'left')
    .na.fill(0, subset=['sale_qtty'])
)
# Materialise the cache; the count is the cell's displayed output.
c.cache()
c.count()


1782084064

# 把空缺的红价和基线价在全部历史数据中先向前填充再向后填充

In [6]:
# Schema of the gap-filled per-SKU records; field order matches the tuples
# returned by format_result_sku.
SCHEMA_OUTPUT_SKU = StructType([
    StructField(field_name, field_type)
    for field_name, field_type in (
        ("item_sku_id", sql_type.StringType()),
        ("sale_qtty", sql_type.IntegerType()),
        ("price", sql_type.FloatType()),
        ("baseprice", sql_type.FloatType()),
        ("dt", sql_type.StringType()),
    )
])
def format_result_sku(row):
    """Coerce one record into a plain typed tuple.

    Parameters
    ----------
    row : mapping
        Must expose the keys 'item_sku_id', 'sale_qtty', 'price',
        'baseprice' and 'dt'.

    Returns
    -------
    tuple
        ``(str, int, float, float, str)`` in the order
        (item_sku_id, sale_qtty, price, baseprice, dt).
    """
    sku_id = str(row['item_sku_id'])
    quantity = int(row['sale_qtty'])
    price = float(row['price'])
    baseprice = float(row['baseprice'])
    day = str(row['dt'])
    return (sku_id, quantity, price, baseprice, day)


In [7]:
# def yichang(x,df):
#     df['index2'] = abs(df['index'] - xiloc[0]['index'])
#     index_list = list(df.sort_values(by=['index2','index'],ascending=[True,True]).head(6)['index'])
#     if x['']
def calculate(row):
    """Fill the gaps in one SKU's daily history.

    Parameters
    ----------
    row : tuple
        ``(key, records)`` pair; only ``row[1]`` is read. ``records`` is an
        iterable of 5-field rows that are mapped positionally to
        (item_sku_id, dt, price, sale_qtty, baseprice).

    Returns
    -------
    list of dict
        One dict per input record, ordered by ascending ``dt``, with keys
        'item_sku_id', 'sale_qtty', 'price', 'baseprice', 'dt'. Missing values
        are forward-filled, then backward-filled; anything still missing
        (a column with no value at all) becomes 0.
    """
    data = pd.DataFrame(list(row[1]), columns=['item_sku_id','dt','price','sale_qtty','baseprice'])
    # ffill()/bfill() replace the deprecated fillna(method='pad'/'bfill') spelling
    # and behave the same on the older pandas releases as well.
    data = data.sort_values('dt', ascending=True).ffill().bfill().reset_index(drop=True)

    data2 = data[['item_sku_id','sale_qtty','price','baseprice','dt']].fillna(0)
    return data2.to_dict(orient = 'records')

In [8]:
# Group every record by SKU, then let calculate() gap-fill each SKU's history
# and flatten the per-SKU lists back into one stream of records.
c_result = (
    c.rdd
    .keyBy(lambda record: record['item_sku_id'])
    .groupByKey()
    .flatMap(calculate)
)

# Back to a DataFrame with explicit types.
c2 = spark.createDataFrame(c_result.map(format_result_sku), schema=SCHEMA_OUTPUT_SKU)

# Materialise the cache; the count is the cell's displayed output.
c2.cache()
c2.count()
# spark.sql("set hive.exec.dynamic.partition=true")
# spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")
# df2.repartition('dt').write.insertInto('app.app_pa_baseline_AR_zou', overwrite=True)

1782084064

# 去掉异常，用前后三天的平均值代替
这里没有只对第4天之后的数据做处理，所以第1天的话就是与2-4天的平均值比较

In [9]:
# Per-SKU window covering up to three rows before and three rows after the
# current row (ordered by dt); it is truncated at the ends of a SKU's history.
baseprice_window = (
    Window.partitionBy(F.col('item_sku_id'))
    .orderBy(F.col('dt'))
    .rowsBetween(-3, 3)
)

c2 = c2.withColumn('sum_baseprice', F.sum(F.col('baseprice')).over(baseprice_window))
c2 = c2.withColumn('count_baseprice', F.count(F.col('baseprice')).over(baseprice_window))
# Mean of the neighbouring rows only: remove the current row from both the sum
# and the count. (Fix: the "- 1" used to sit outside the divisor, which
# computed sum/count - 1 instead of the neighbour mean.)
# For a SKU with a single row the divisor is 0; under Spark's default
# (non-ANSI) arithmetic that yields NULL, so the comparisons below are NULL
# and the original baseprice is kept.
c2 = c2.withColumn(
    'avg_baseprice',
    (F.col('sum_baseprice') - F.col('baseprice')) / (F.col('count_baseprice') - 1),
)

# A baseline more than 10% away from its neighbour mean is treated as an
# outlier and replaced by that mean.
c2 = c2.withColumn('baseprice2',F.when((F.col('baseprice')<0.9*F.col('avg_baseprice'))|
                                       (F.col('baseprice')>1.1*F.col('avg_baseprice')),F.col('avg_baseprice'))\
                                 .otherwise(F.col('baseprice')))

# non_promo_flag is 0 when the price is below the smoothed baseline
# (difference rounded to 2 decimals), otherwise 1.
c2 = c2.withColumn('non_promo_flag',F.when(F.round(F.col('price')-F.col('baseprice2'),2)<0,0).otherwise(F.lit(1)))
c2_2 = c2.select('item_sku_id','sale_qtty', 'price', 'baseprice2','non_promo_flag', 'dt')\
         .withColumnRenamed('baseprice2','baseprice')

# join进计算基线销量所需的流量和库存状态字段并存表

In [10]:
yesterday = (datetime.datetime.now()-datetime.timedelta(days=1)).strftime('%Y-%m-%d')
# Only days up to four days ago are written -- presumably so that every
# inserted day has its full +/-3-day smoothing window; TODO confirm.
insert_day = (datetime.datetime.now()-datetime.timedelta(days=4)).strftime('%Y-%m-%d')

# Traffic (uv) per sku and day.
uv = spark.sql('''SELECT
sku_id as item_sku_id,
uv,
dt
from dev.all_sku_traffic
where dt >='%s' and dt<='%s'
'''%(params['update_start'],params['update_end'])).distinct()

# Stock status per sku and day.
stock = spark.sql('''select sku_id as item_sku_id,dt,stock_status from dev.dp_pl_es_ext_v2 where dt >='%s' and dt<='%s'
'''%(params['update_start'],params['update_end'])).distinct()

# Latest partition already present in the target table; None when it is empty.
origin_day = spark.sql('''select max(dt) from app.app_pa_price_baseprice_self_nonpromo_flag_60_9''').collect()[0][0]

# Missing traffic is filled with 0 before the stock join, so the later
# fillna(1) only affects nulls introduced by that join (missing stock_status).
c3 = c2_2.join(uv,['item_sku_id','dt'],'left').fillna(0).join(stock,['item_sku_id','dt'],'left').fillna(1)
c3 = c3.select('item_sku_id','sale_qtty', 'price', 'baseprice','uv','stock_status' ,'non_promo_flag', 'dt')\
       .filter(F.col('dt')<=insert_day)
# Incremental load: keep only days newer than what the target already holds.
# With an empty target, origin_day is None and a "dt > NULL" filter would drop
# every row, so the lower bound is skipped and the full history is written.
if origin_day is not None:
    c3 = c3.filter(F.col('dt')>origin_day)


spark.sql("set hive.exec.dynamic.partition=true")
spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")
spark.sql('''set hive.exec.max.dynamic.partitions=200551''')
spark.sql('''set hive.exec.max.dynamic.partitions.pernode=200551''')
table_params = {'author':'zoushuhan'}
spa_utils.save_hive_result(c3,'app.app_pa_price_baseprice_self_nonpromo_flag_60_9',partitioning_columns=['dt'],write_mode='insert',spark=spark,params=table_params)