In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pandas as pd
import numpy as np
import pdb
import matplotlib.pyplot as plt
from pyspark.sql import Window
from pyspark.sql.types import DoubleType
from datetime import datetime, timedelta
from pyspark.sql.types import StructType, StructField
import pyspark.sql.types as sql_type
import pdb
plt.style.use('ggplot')
plt.rcParams['font.sans-serif'] = ['SimHei']
%matplotlib inline


import os
os.environ['PYSPARK_PYTHON']="/usr/local/anaconda3/bin/python3.6"

spark = (SparkSession
    .builder
    .appName("be")
    .enableHiveSupport()
    .config("spark.executor.instances", "300")
    .config("spark.executor.memory","16g")
    .config("spark.executor.cores","1")
    .config("spark.driver.cores", "8")
    .config("spark.driver.memory","8g")
    .config("spark.driver.maxResultSize", "8g")
    .config("spark.sql.broadcastTimeout", "36000")
    .config("spark.sql.shuffle.partitions","3000")
    .config("spark.yarn.appMasterEnv.yarn.nodemanager.container-executor.class","DockerLinuxContainer")
    .config("spark.executorEnv.yarn.nodemanager.container-executor.class","DockerLinuxContainer")
    .config("spark.yarn.appMasterEnv.yarn.nodemanager.docker-container-executor.image-name","bdp-docker.jd.com:5000/wise_mart_rmb_py36:latest")\
    .config("spark.executorEnv.yarn.nodemanager.docker-container-executor.image-name","bdp-docker.jd.com:5000/wise_mart_rmb_py36:latest")\
    .getOrCreate())

spark.sql("set hive.exec.orc.split.strategy=ETL")
spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")
spark.sql("set hive.exec.max.dynamic.partitions=3000")

DataFrame[key: string, value: string]

# 预售预约数据

In [2]:
def extract_max_dt(df_name):
    """Return the latest date-like ``dt`` partition value of a table.

    Runs ``show partitions <df_name>`` and scans every partition spec, which
    is split on ``/`` into ``key=value`` segments.

    Args:
        df_name: Table name, formatted verbatim into ``show partitions``.

    Returns:
        The greatest (string comparison) ``dt`` value whose first character is
        numeric, or ``None`` when no partition carries such a value.
    """
    partitions = spark.sql("show partitions {0}".format(df_name)).toPandas()
    dt_values = []
    for spec in partitions.partition:
        for segment in spec.split('/'):
            if not segment.startswith('dt='):
                continue
            value = segment[3:]
            # Skip empty / non-numeric values (and specs without any dt= segment)
            # per partition, instead of letting one odd partition turn the whole
            # result into None.
            if value and value[0].isnumeric():
                dt_values.append(value)
            # Only the first dt= segment of a spec is considered.
            break
    return max(dt_values) if dt_values else None

In [3]:
# ============================= Presale data ============================= #
# Presale data comes from the order table.

# Sales quantity aggregated by order date (per the original author's note):
# sums sale_qtty per (item_sku_id, dt) over the rows that pass the WHERE
# filters in the query below. No dt range is applied in this query.
qtty_by_order_dt = spark.sql('''
                                select 
                                    item_sku_id, 
                                    dt, 
                                    sum(sale_qtty) as qtty_by_order_dt 
                                from 
                                    dev.dev_pathway_salessource_skudt_det
                                where 
                                    dt_ord is not null 
                                    and sale_qtty between 1 and 5 
                                    and business_type = 0 
                                    and sale_ord_type_cd = 0  
                                group by 
                                    item_sku_id, 
                                    dt
                             ''')

# Date window: fixed start, end = latest dt partition of the sales-source table.
start_dt, end_dt = '2018-01-01', extract_max_dt('dev.dev_pathway_salessource_skudt_det')
if end_dt is None:
    # Without this guard the queries below would be built with dt = 'None',
    # silently return nothing, and the empty result would later be written
    # with insertInto(..., overwrite=True).
    raise ValueError(
        "No date-like dt partition found for dev.dev_pathway_salessource_skudt_det")

# Only keep SKUs that are on shelf (status = 1) on the last day.
sku_range = spark.sql('''
                          select 
                              sku_id as item_sku_id 
                          from 
                              dev.self_sku_det_da 
                          where 
                              dt = '{0}'
                              and status = 1
                      '''.format(end_dt)
                     )


# On-shelf dates per SKU: one row per (sku, dt) with status = 1 inside the window.
sku_status = spark.sql('''
                           select 
                               sku_id as item_sku_id, 
                               dt 
                           from 
                               dev.self_sku_det_da 
                           where 
                               (dt between '{start_sql}' and '{end_sql}')
                               and sku_type = 1 
                               and status = 1
                       '''.format(start_sql = start_dt, end_sql = end_dt))

# Presale status taken from the order table: order date, checkout date and the
# single character at position 44 of ord_flag.
presale_raw_df = spark.sql('''
                                 select distinct 
                                     item_sku_id, 
                                     sale_ord_dt, 
                                     substr(check_account_tm, 1, 10) as checkout_dt, 
                                     substr(ord_flag, 44, 1) as ord_flag
                                 from 
                                     dev.all_sku_order_det
                                 where 
                                     (dt between '{0}' and '{1}') 
                                     and sku_type = 1 
                                     and sale_ord_valid_flag = 1 
                                     and sale_ord_type_cd = 0 
                           '''.format(start_dt, end_dt))

In [4]:
# Restrict the SKU universe to SKUs that also appear in the sales aggregate.
sku_range = sku_range.join(
    qtty_by_order_dt.select('item_sku_id').distinct(),
    on='item_sku_id',
    how='inner',
)
# Keep on-shelf rows only for SKUs in that universe.
sku_status = sku_status.join(sku_range, on='item_sku_id', how='inner')
# Keep order rows for those SKUs whose extracted ord_flag is not 0.
presale_raw_df = (
    presale_raw_df
    .join(sku_range, on='item_sku_id', how='left_semi')
    .where(F.col('ord_flag') != 0)
)

In [5]:
# Presale marker keyed by the order date (sale_ord_dt -> dt).
presale_flag = (
    presale_raw_df
    .where(F.col('ord_flag') == 1)
    .select('item_sku_id', F.col('sale_ord_dt').alias('dt'))
    .withColumn('presale_flag', F.lit(1))
)

# Presale payment marker keyed by the checkout date (checkout_dt -> dt);
# built from the same ord_flag == 1 rows.
presale_pay_flag = (
    presale_raw_df
    .where(F.col('ord_flag') == 1)
    .select('item_sku_id', F.col('checkout_dt').alias('dt'))
    .withColumn('presale_pay_flag', F.lit(1))
)

In [6]:
# One row per on-shelf (sku, dt), left-joined with the sales quantity and the
# two de-duplicated presale markers; cached for reuse by later cells.
_sku_day_key = ['item_sku_id', 'dt']
presale_df = (
    sku_status
    .join(qtty_by_order_dt, on=_sku_day_key, how='left')
    .join(presale_flag.distinct(), on=_sku_day_key, how='left')
    .join(presale_pay_flag.distinct(), on=_sku_day_key, how='left')
    .cache()
)

In [7]:
# Action on `presale_df` (marked .cache() when it was built): evaluates it and
# displays its row count.
presale_df.count()

8222330

In [8]:
# ============================= Booking data ============================= #
# Booking (reservation) data comes from the low-level fdm table.

start_date = '2017-12-15'
# Yesterday, formatted as YYYY-MM-DD.
last_date = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d')
# One string per calendar day from start_date through last_date.
date_list = pd.date_range(start=start_date, end=last_date).astype(str).tolist()
date_df = spark.createDataFrame(pd.DataFrame({'dt': date_list}))

# SKU attributes from the last_date partition, restricted by the WHERE clause below.
sku_cid = spark.sql('''
                        select 
                            sku_id as item_sku_id, 
                            cid1, 
                            cid2, 
                            cid3, 
                            brand_code 
                        from 
                            dev.self_sku_det_da 
                        where 
                            dt = '{0}' and 
                            (cid1 = 737) 
                            and sku_status_cd = 3001 and sku_type =1
                    '''.format(last_date)
                    )

# Booking window and booking-payment (panic-buying) window per SKU, each
# truncated to its first 10 characters.
booking_range = spark.sql('''
                            select skuid as item_sku_id, 
                                substr(start_time, 1, 10) as start_time, 
                                substr(end_time, 1, 10) as end_time, 
                                substr(panicbuying_start_time, 1, 10) as panicbuying_start_time,
                                substr(panicbuying_end_time, 1, 10) as panicbuying_end_time 
                            from 
                                fdm.fdm_pre_sell_schema_pre_sell_info_chain 
                            where 
                                dp = 'ACTIVE'
                                and substr(end_time,1,10) >= '{}'
                           '''.format(start_date))

In [9]:
# Keep only booking windows of SKUs present in `sku_cid`.
booking_range = booking_range.join(sku_cid.select('item_sku_id'), 'item_sku_id', 'inner')

# Expand each [start_time, end_time] booking window into one row per calendar
# day in `date_df`, de-duplicated per (sku, dt).
booking_flag = booking_range.join(date_df.select('dt'),
                                  on=date_df.dt.between(booking_range.start_time, booking_range.end_time)
                                 ).select('item_sku_id', 'dt').distinct()
booking_flag = booking_flag.withColumn('booking_flag', F.lit(1)) \
                           .withColumn('booking_pay_flag', F.lit(0))

# Same expansion for the [panicbuying_start_time, panicbuying_end_time] window.
booking_pay_flag = booking_range.join(date_df.select('dt'),
                                      on=date_df.dt.between(booking_range.panicbuying_start_time,
                                                            booking_range.panicbuying_end_time)
                                     ).select('item_sku_id', 'dt').distinct()
# Column names must match `booking_flag`'s. The original spelled them
# 'bookinging_*', which only worked because the later union is positional.
booking_pay_flag = booking_pay_flag.withColumn('booking_flag', F.lit(0)) \
                                   .withColumn('booking_pay_flag', F.lit(1))

In [10]:
# Stack both marker frames (positional union) and collapse to one row per
# (sku, dt) by summing each flag; cached for reuse by later cells.
booking_df = (
    booking_flag
    .unionAll(booking_pay_flag)
    .groupBy('item_sku_id', 'dt')
    .agg(
        F.sum('booking_flag').alias('booking_flag'),
        F.sum('booking_pay_flag').alias('booking_pay_flag'),
    )
    .cache()
)

In [11]:
# Action on `booking_df` (marked .cache() when it was built): evaluates it and
# displays its row count.
booking_df.count()

865646

In [12]:
# Missing presale markers mean "no presale that day".
presale_df = presale_df.na.fill(0, subset=['presale_flag', 'presale_pay_flag'])
# Attach booking markers; (sku, dt) pairs without a booking row get 0.
presale_booking_df = (
    presale_df
    .join(booking_df, on=['item_sku_id', 'dt'], how='left')
    .na.fill(0, subset=['booking_flag', 'booking_pay_flag'])
)

In [13]:
# Overwrite app.cid2_794_daily_sale with the daily table. insertInto matches
# columns by position, so the select order below is significant.
(
    presale_booking_df
    .select([
        'item_sku_id', 'qtty_by_order_dt', 'presale_flag', 'presale_pay_flag',
        'booking_flag', 'booking_pay_flag', 'dt',
    ])
    .repartition('dt')
    .write
    .insertInto('app.cid2_794_daily_sale', overwrite=True)
)

# 建模数据

In [15]:
def extract_max_dt(df_name):
    """Return the latest date-like ``dt`` partition value of a table.

    Note: a function with this name is also defined earlier in this notebook;
    this definition replaces it from here on.

    Runs ``show partitions <df_name>`` and scans every partition spec, which
    is split on ``/`` into ``key=value`` segments.

    Args:
        df_name: Table name, formatted verbatim into ``show partitions``.

    Returns:
        The greatest (string comparison) ``dt`` value whose first character is
        numeric, or ``None`` when no partition carries such a value.
    """
    partitions = spark.sql("show partitions {0}".format(df_name)).toPandas()
    dt_values = []
    for spec in partitions.partition:
        for segment in spec.split('/'):
            if not segment.startswith('dt='):
                continue
            value = segment[3:]
            # Skip empty / non-numeric values (and specs without any dt= segment)
            # per partition, instead of letting one odd partition turn the whole
            # result into None.
            if value and value[0].isnumeric():
                dt_values.append(value)
            # Only the first dt= segment of a spec is considered.
            break
    return max(dt_values) if dt_values else None

In [16]:
# Daily sales table (app.cid2_794_daily_sale): quantity plus booking/presale
# flags per (sku, date). Every row read here is tagged on_shelf = 1 and
# is_padded = 0; `dt` is exposed as `date`.
sales_df = spark.sql('''
                         select
                             item_sku_id,
                             qtty_by_order_dt as sale_qtty,
                             booking_flag,
                             booking_pay_flag,
                             presale_flag,
                             presale_pay_flag,
                             1 as on_shelf,
                             0 as is_padded,
                             dt as date
                         from
                             app.cid2_794_daily_sale
                     ''')

# Instant-promo rows for instant_channel = 2 with uv >= 50. Each row is tagged
# instant_flag = 1 and expose_flag = 1; instant_hour is cast to int.
instant_df = spark.sql('''
                             select
                                 item_sku_id,
                                 1 as instant_flag,
                                 1 as expose_flag,
                                 cast(instant_hour as int) as instant_hour,
                                 instant_price,
                                 dt as date
                             from
                                 app.jingpin_instant_price_uv_clean_test
                             where
                                 instant_channel = 2
                                 and uv >= 50
                         ''')

# Red price per (sku, date): max_time_price exposed as `redprice`.
price_df = spark.sql('''
                         select 
                             sku_id as item_sku_id,
                             max_time_price as redprice,
                             dt as date
                         from
                             dev.self_sku_redprice_group
                     ''')

# Nominal net price per (sku, date): column `dsj` for channel = 0.
nominal_price_df = spark.sql('''
                                 select
                                     sku_id as item_sku_id,
                                     dsj as nominal_netprice,
                                     dt as date
                                 from
                                     dev.dev_jingpin_sku_chan_dt_cc_stais_sh
                                 where
                                     channel = 0
                             ''')

# Transaction net price per (sku, date) from 2018-01-01 onward:
# sum(after_prefr_amount - dq_pay_amount) divided by sum(sale_qtty).
trans_df = spark.sql('''
                         select
                             item_sku_id,
                             sum(after_prefr_amount - dq_pay_amount) * 1.0 / sum(sale_qtty) as netprice,
                             dt as date
                         from
                             app.app_pa_transactions_d_self
                         where
                             dt >= '2018-01-01'
                         group by 
                             item_sku_id, dt
                     ''')

# Latest dt partition of the daily sales table; selects the SKU snapshot below.
max_dt = extract_max_dt('app.cid2_794_daily_sale')
if max_dt is None:
    # Without this guard the query below would be built with dt = 'None' and
    # silently return no SKUs, emptying every downstream inner join.
    raise ValueError("No date-like dt partition found for app.cid2_794_daily_sale")

# sku-brand-cid3 mapping for valid SKUs with cid1 = 737 and cid2 = 794.
sku_info = spark.sql('''
                         select
                             sku_id as item_sku_id,
                             brand_code,
                             cid3
                         from
                             dev.self_sku_det_da
                         where
                             dt = '{0}'
                             and cid1 = 737
                             and cid2 = 794
                             and item_valid_flag = 1 
                             and sku_valid_flag = 1 
                     '''.format(max_dt))

In [17]:
# NOTE(review): this cell rebinds `instant_df` from itself, so it is not
# idempotent: re-running it without re-running the query cell applies
# `24 - instant_hour` a second time.

# Type of the `date` join key. F.date_add returns a DateType column, so the
# shifted date is cast back to this type to keep both sides of the USING join
# (and the merged key of the outer join) on one type.
instant_date_type = instant_df.schema['date'].dataType

# instant_hour becomes 24 - instant_hour (presumably the hours left in the day
# from the promo's start hour; TODO confirm against the source table).
instant_df = instant_df.withColumn('instant_hour', 24 - F.col('instant_hour'))

# Rows whose transformed hour is not 24 are carried over to the following day:
# the date is shifted by one and the next-day hour is 24 minus the transformed
# value, i.e. the hour value as it came from the query.
instant_df_nextday = instant_df.where((F.col('instant_flag') == 1) & (F.col('instant_hour') != 24)) \
                               .withColumn('date', F.date_add(F.col('date'), 1).cast(instant_date_type)) \
                               .withColumn('instant_hour_nextday', 24 - F.col('instant_hour')) \
                               .withColumnRenamed('instant_price', 'instant_price_nextday') \
                               .select('item_sku_id', 'instant_hour_nextday', 'instant_price_nextday', 'date')

# Outer join so days that only have a carried-over promo are kept. Where a
# next-day row exists, its hour and price take precedence over the same-day
# values; expose_flag is 0 for days that exist only through the carry-over.
instant_df = instant_df.join(instant_df_nextday, on=['item_sku_id', 'date'], how='outer') \
                       .withColumn('instant_hour', F.when(F.isnull(F.col('instant_hour_nextday')),
                                                          F.col('instant_hour')).otherwise(F.col('instant_hour_nextday'))) \
                       .withColumn('instant_flag', F.when(F.col('instant_hour') > 0, 1).otherwise(0)) \
                       .withColumn('instant_price', F.when(F.isnull(F.col('instant_hour_nextday')),
                                                           F.col('instant_price')).otherwise(F.col('instant_price_nextday'))) \
                       .fillna({'expose_flag': 0}) \
                       .select('item_sku_id', 'instant_flag', 'expose_flag', 'instant_hour', 'instant_price', 'date')

In [18]:
# Assemble the feature table: sales rows for SKUs in `sku_info`, left-joined
# with promo and price sources on (sku, date); missing flags become 0.
df = (
    sales_df
    .join(sku_info, on='item_sku_id', how='inner')
    .join(instant_df, on=['item_sku_id', 'date'], how='left')
    .join(price_df, on=['item_sku_id', 'date'], how='left')
    .join(nominal_price_df, on=['item_sku_id', 'date'], how='left')
    .join(trans_df, on=['item_sku_id', 'date'], how='left')
    .fillna({'instant_flag': 0, 'expose_flag': 0, 'instant_hour': 0,
             'booking_pay_flag': 0, 'presale_flag': 0, 'presale_pay_flag': 0})
)

# Each price column falls back to redprice when it is null.
for price_col in ('instant_price', 'nominal_netprice', 'netprice'):
    df = df.withColumn(
        price_col,
        F.when(F.col(price_col).isNull(), F.col('redprice')).otherwise(F.col(price_col)),
    )

In [19]:
# Collect the joined Spark DataFrame to the driver as a pandas DataFrame.
data = df.toPandas()

In [20]:
# Fill off-shelf dates: build, per SKU, every calendar day between its first
# and last observed date, then right-merge so the missing days appear as rows.
sales_date = (
    data.groupby('item_sku_id')
        .apply(lambda sku_rows: pd.DataFrame(
            {'date': pd.date_range(sku_rows.date.min(), sku_rows.date.max()).astype(str)}))
        .reset_index(level=0)
)
data = data.merge(sales_date, on=['item_sku_id', 'date'], how='right')

# Rows created by the merge have no flags: default every flag column to 0.
data = data.fillna({
    'booking_flag': 0, 'booking_pay_flag': 0, 'presale_flag': 0, 'presale_pay_flag': 0,
    'on_shelf': 0, 'is_padded': 0, 'instant_flag': 0, 'expose_flag': 0, 'instant_hour': 0,
})

# transform instant_hour into one-hot
# data['instant_hour'] = data.instant_hour.astype(int).astype(str).str.zfill(2)
# data = pd.concat([data, pd.get_dummies(data.instant_hour, prefix='instant_hour', drop_first=True)], axis=1)

In [21]:
def fillna_forward_backward(series):
    """Fill missing values forward, then backward.

    Args:
        series: pandas Series (or DataFrame) to fill.

    Returns:
        A new object of the same shape: each gap takes the previous non-null
        value, and leading gaps take the first non-null value that follows.
        Values stay missing only when there is no non-null value at all.
    """
    # .ffill()/.bfill() replace the deprecated fillna(method=...) spelling.
    return series.ffill().bfill()

# Fill SKU attributes and redprice within each SKU, in date order.
# drop=True keeps the old row index from leaking in as an extra 'index' column.
data = data.sort_values(['item_sku_id', 'date']).reset_index(drop=True)
cols_to_fill = ['brand_code', 'cid3', 'redprice']

# groupby.transform returns a frame aligned on `data`'s own index in every
# pandas version. The previous groupby.apply form relied on how pandas handles
# group keys for same-indexed results, which differs between versions and can
# break the column-wise concat below.
filled_df = data.groupby('item_sku_id')[cols_to_fill].transform(fillna_forward_backward)
data = pd.concat([data.drop(cols_to_fill, axis=1), filled_df], axis=1)

# Rows added for off-shelf dates have no prices: fall back to the filled redprice.
data['instant_price'] = data.instant_price.fillna(data.redprice)
data['nominal_netprice'] = data.nominal_netprice.fillna(data.redprice)
data['netprice'] = data.netprice.fillna(data.redprice)

# Final column selection and order for the exported table.
data = data[
    ['item_sku_id', 'date', 'sale_qtty', 'booking_flag', 'booking_pay_flag', 'presale_flag', 'presale_pay_flag',
     'on_shelf', 'is_padded', 'brand_code', 'cid3', 'instant_flag', 'expose_flag', 'instant_hour', 'instant_price',
     'redprice', 'nominal_netprice', 'netprice']]

In [22]:
# Write the final table to 'cid2_794_sales.csv' (relative path), without the index column.
data.to_csv('cid2_794_sales.csv', index=False)