## 初始化变量（运行模式、日期、日期区间天数等）

In [2]:
import pandas as pd
import time
import arrow

# 
#####################################################################################
# debug 模式会减少数据量
DEBUG = False
# 计算的截至日期
base_dt = "20170423"
# 截至日期向前推多少天
days = 14
# 计算过程中间临时文件存放的hdfs目录位置（文件名按城市自动拼接）
hdfs_dump_base_path = "/user/lujin"
# spark的并行度
PARALLEL = 512
#####################################################################################

print "MODE ->", DEBUG

base_date = arrow.get(base_dt, "YYYYMMDD")
offset = -days + 1
base_start = arrow.get(base_dt, "YYYYMMDD").to("Asia/Shanghai").floor('day').timestamp
base_end = arrow.get(base_dt, "YYYYMMDD").to("Asia/Shanghai").ceil('day').timestamp
print "base_dt", base_dt, "base_start", base_start, "base_end", base_end


DATES = pd.date_range(base_date.replace(days=offset).format("YYYY-MM-DD"), base_date.format("YYYY-MM-DD"))
DATES_STR = ','.join(map(lambda t: t.strftime("%Y%m%d") , DATES))
print "SIZE:", len(DATES), "range:", DATES
print DATES_STR
dates_start = arrow.get(DATES[0].strftime("%Y-%m-%d"), "YYYY-MM-DD").to("Asia/Shanghai").floor('day').timestamp
dates_end = arrow.get(DATES[-1].strftime("%Y-%m-%d"), "YYYY-MM-DD").to("Asia/Shanghai").ceil('day').timestamp
print "DATES[0]", DATES[0], "DATES[-1]", DATES[-1], "dates_start", dates_start, "dates_end", dates_end


CITY = 'bj'
print "city:", CITY

order_dump_path = "{hdfs_dump_base_path}/order_{dt}_{n}days.parquet".format(dt=base_dt, n=days, hdfs_dump_base_path=hdfs_dump_base_path)
print "order dump:", order_dump_path

dispatch_info_dump_path = "{hdfs_dump_base_path}/dispatch_info_{dt}_{n}days.parquet".format(dt=base_dt, n=days, hdfs_dump_base_path=hdfs_dump_base_path)
print "dispatch info dump:", dispatch_info_dump_path

dispatch_dump_path = "{hdfs_dump_base_path}/dispatch_{dt}_{n}days.parquet".format(dt=base_dt, n=days, hdfs_dump_base_path=hdfs_dump_base_path)
print "dispatch merged dump:", dispatch_dump_path

bidding_dump_path = "{hdfs_dump_base_path}/bidding_{dt}_{n}days.parquet".format(dt=base_dt, n=days, hdfs_dump_base_path=hdfs_dump_base_path)
print "bidding merged dump:", bidding_dump_path

order_allInOne_path = "{hdfs_dump_base_path}/order_all_in_one_{dt}_{n}days.parquet".format(dt=base_dt, n=days, hdfs_dump_base_path=hdfs_dump_base_path)
print "order+dispatch dump:", order_allInOne_path

order_allInOne_with_supply_path = "{hdfs_dump_base_path}/order_all_in_one_supply_{dt}_{n}days.parquet".format(dt=base_dt, n=days, hdfs_dump_base_path=hdfs_dump_base_path)
print "order_merged + supply dump:", order_allInOne_with_supply_path

MODE -> False
base_dt 20170423 base_start 1492876800 base_end 1492963199
SIZE: 14 range: DatetimeIndex(['2017-04-10', '2017-04-11', '2017-04-12', '2017-04-13',
               '2017-04-14', '2017-04-15', '2017-04-16', '2017-04-17',
               '2017-04-18', '2017-04-19', '2017-04-20', '2017-04-21',
               '2017-04-22', '2017-04-23'],
              dtype='datetime64[ns]', freq='D')
20170410,20170411,20170412,20170413,20170414,20170415,20170416,20170417,20170418,20170419,20170420,20170421,20170422,20170423
DATES[0] 2017-04-10 00:00:00 DATES[-1] 2017-04-23 00:00:00 dates_start 1491753600 dates_end 1492963199
city: bj
order dump: /user/lujin/order_20170423_14days.parquet
dispatch info dump: /user/lujin/dispatch_info_20170423_14days.parquet
dispatch merged dump: /user/lujin/dispatch_20170423_14days.parquet
bidding merged dump: /user/lujin/bidding_20170423_14days.parquet
order+dispatch dump: /user/lujin/order_all_in_one_20170423_14days.parquet
order_merged + supply dump: /user/luji

# 构造spark session

In [3]:
# -*- coding: utf-8 -*-
from pyspark.sql import SparkSession
from pyspark.sql.functions import asc, desc
from pyspark.sql import Row

hdfs_python_lib_path = "hdfs:///libs/pyspark"

yarn_dist_files = [
    "pyspark.zip",
    "py4j-0.10.3-src.zip",
]

spark = SparkSession.builder \
        .master("yarn-client") \
        .appName("orderBase_bidding_clean") \
        .config("spark.ui.port", "4200") \
        .config("spark.driver.memory", "2g") \
        .config("spark.executor.instances", "7") \
        .config("spark.executor.cores", "4") \
        .config("spark.executor.memory", "8g") \
        .config("spark.default.parallelism", str(PARALLEL)) \
        .config("spark.sql.shuffle.partitions", str(PARALLEL)) \
        .config("spark.sql.crossJoin.enabled", "true") \
        .config("spark.yarn.dist.files", ','.join(map(lambda x: hdfs_python_lib_path+"/"+x, yarn_dist_files))) \
        .config("spark.yarn.appMasterEnv.PYTHONPATH", ':'.join(yarn_dist_files)) \
        .config("spark.yarn.appMasterEnv.PYSPARK_PYTHON", "/usr/local/bin/python") \
        .config("spark.executorEnv.PYTHONPATH", ':'.join(yarn_dist_files)) \
        .config("spark.executorEnv.PYSPARK_PYTHON", "/usr/local/bin/python") \
        .enableHiveSupport() \
        .getOrCreate()

spark.sparkContext.addPyFile("hdfs:///libs/pyspark/Geohash.zip")        

spark.sql("use ods").show()
spark.sql("show tables").show()

++
||
++
++

+--------------------+-----------+
|           tableName|isTemporary|
+--------------------+-----------+
|      bidding_access|      false|
|     dispatch_detail|      false|
|       dispatch_info|      false|
|driver_api_access...|      false|
|         order_track|      false|
|   personal_dispatch|      false|
|        pre_dispatch|      false|
|       service_order|      false|
|   service_order_ext|      false|
|     system_dispatch|      false|
+--------------------+-----------+



# step 1. 订单数据源 

## 1-1. 订单源-service_order（仅终结状态订单）

In [None]:
if DEBUG:
    where = " where dt={dt} and city='bj' and status in (7,8) and create_time between {start} and {end}".format(dt=base_dt, start=base_start, end=base_end)
else:
    where = "where dt in ({dates}) and city='{city}' and status in (7,8) and create_time between {start} and {end}".format(dates=DATES_STR, city=CITY, start=dates_start, end=dates_end)

sql = """
select 
dt,
city,  -- 这是下单所在城市，和dispatch的司机所在城市可能匹配不上
service_order_id,
product_type_id,
is_asap,
flag as order_flag,
account_id,
user_id,
driver_id,
corporate_id,
car_type_id,
car_type,
car_brand,
status as order_status,
rc_status,
reason_id as order_reason_id,
create_time as order_create_time,
arrival_time,
confirm_time,
start_time,
end_time,
start_position,
start_address,
end_position,
end_address,
expect_start_latitude,
expect_start_longitude,
expect_end_latitude,
expect_end_longitude,
start_latitude,
start_longitude,
end_latitude,
end_longitude,
coupon_name,
coupon_facevalue,
dependable_distance, --调整后里程
actual_time_length --调整后时长
from service_order {where}
""".format(where=where)

# 执行sparkSQL
orderDF = spark.sql(sql)

# 创建临时表并cache
orderDF.createOrReplaceTempView("v_order")
spark.catalog.cacheTable("v_order")

print "original:", orderDF.count()

# 按时间戳去重（partition的分割不同日期可能会有重复的）
orderDF = orderDF.orderBy("service_order_id", desc("update_time")).dropDuplicates(['service_order_id']).drop('update_time')
print "dropDuplicates:", orderDF.count()


orderDF.limit(5).toPandas().head()



## 1-2. 订单扩展源-service_order_ext

In [None]:
if DEBUG:
    where = " where a.dt={dt} and a.service_order_id in (select service_order_id from v_order)".format(dt=base_dt)
else:
    where = "where a.dt in ({dates}) and a.service_order_id in (select service_order_id from v_order)".format(dates=DATES_STR)

sql = """
select 
a.service_order_id,
a.update_time,
a.user_type,
a.predict_amount,
a.deadhead_distance,
b.distance as estimate_distance,
b.estimate_price
from service_order_ext a
lateral view json_tuple(a.estimate_snap, 'time_length', 'distance', 'estimate_price') b as time_length, distance, estimate_price
{where}
""".format(where=where)

print sql

# 执行sparkSQL
orderExtDF = spark.sql(sql)

# 创建临时表并cache
orderExtDF.createOrReplaceTempView("v_order_ext")
spark.catalog.cacheTable("v_order_ext")

print "original:", orderExtDF.count()

# 按时间戳去重（partition的分割不同日期可能会有重复的）
orderExtDF = orderExtDF.orderBy("service_order_id", desc("update_time")).dropDuplicates(['service_order_id']).drop('update_time')
print "dropDuplicates:", orderExtDF.count()

orderExtDF.limit(5).toPandas().head()

## 1-3 合并订单数据 （service_order left join service_order_ext）

In [None]:
sql = """
select 
a.*,
b.service_order_id as service_order_id_ext,
b.user_type,
b.predict_amount,
b.deadhead_distance,
b.estimate_distance,
b.estimate_price
from v_order a left join v_order_ext b 
on a.service_order_id = b.service_order_id
"""

# 执行sparkSQL
orderMergeDF = spark.sql(sql).drop('service_order_id_ext')

# 创建临时表并cache
orderMergeDF.createOrReplaceTempView("v_order_all")
spark.catalog.cacheTable("v_order_all")

print "orginal:", orderMergeDF.count()

orderMergeDF = orderMergeDF.dropDuplicates(['service_order_id'])
print "dropDuplicates:", orderMergeDF.count()


print orderMergeDF.count()
orderMergeDF.limit(5).toPandas().head() 

In [None]:
# 导出为parquet文件
orderMergeDF.filter("estimate_distance is not null").write.mode('overwrite').parquet(order_dump_path)

In [None]:
# 导出确认
spark.sql("select * from `parquet`.`{infile}`".format(infile=order_dump_path)).limit(5).toPandas().head()

In [None]:
# 清理spark的缓存
spark.catalog.clearCache()

## step 2. 派单源

## 2-1 dispatch_info 

In [None]:
if DEBUG:
    where = " where dt={dt} and c.city='bj'".format(dt=base_dt)
else:
    where = "where dt in ({dates}) and c.city='{city}' ".format(dates=DATES_STR, city=CITY)

# 这个在detail才对，这里是错的，因为info的日志是按round记录（每round记录最后1个batch的累计量）
# row_number() over (distribute by service_order_id sort by round, batch) as acc_batch, 
sql = """
select 
c.city,
a.service_order_id,
a.round,
a.batch,
ROW_NUMBER() over (distribute by service_order_id sort by round, batch) as acc_batch,
unix_timestamp(a.datetime,'yyyy-MM-dd HH:mm:ss') as datetime,
a.dispatch_time,
a.decision_time,
a.create_time,
a.update_time,
a.status,
a.bidding_id,
a.dispatch_type,
a.decision_type,
a.dispatch_count,
a.response_count,
a.accept_count,
a.flag,
a.user_level,
a.user_gender,
a.can_dispatch_count,
a.decision_driver_id,
a.decision_car_type_id,
b.driver_bidding_rate,
b.user_bidding_rate,
b.driver_estimate_price,
c.code,
c.isRushHour,
c.remark,
d.assign_max_range, 
d.batch_driver_count, 
d.batch_interval, 
d.contribution_pct, 
d.distanct_pct,
d.evaluation_pct,
d.max_accept_count,
d.max_driver_count,
d.max_range
from dispatch_info a
lateral view json_tuple(a.add_price_info, 'dsb', 'sb', 'dep') b as driver_bidding_rate, user_bidding_rate, driver_estimate_price
lateral view json_tuple(a.template_snapshot, 'city', 'code', 'dispatchParams', 'isRushHour', 'remark') c as city, code, dispatchParams, isRushHour,remark
lateral view json_tuple(c.dispatchParams, 'aSSIGN_MAX_RANGE', 'bATCH_DRIVER_COUNT', 'bATCH_INTERVAL', 'cONTRIBUTION','dISTANCE','eVALUATION','mAX_ACCEPT_COUNT','mAX_DRIVER_COUNT','mAX_RANGE') d as assign_max_range, batch_driver_count, batch_interval, contribution_pct, distanct_pct,evaluation_pct,max_accept_count,max_driver_count,max_range
{where}
""".format(where=where)

# 执行sparkSQL
spark.sql(sql).createOrReplaceTempView("v_dispatch_info")
spark.catalog.cacheTable("v_dispatch_info")

print "original:", spark.sql("select * from v_dispatch_info").count()
spark.sql("select * from v_dispatch_info").limit(5).toPandas().head()



### dispatch_info cleansing  （按订单获取统计量：首轮响应、首轮撮合成功、末轮）

In [None]:
# 派单基准
sql = """
select
city,
service_order_id,
bidding_id,
acc_batch as sum_batch,
create_time as dispatch_create_time,
status as dispatch_status,
flag as dispatch_flag,
-- add_price_redispatch,
dispatch_type,
decision_type,
user_level,
user_gender,
isRushHour,
remark,
batch_driver_count, 
batch_interval, 
contribution_pct, 
distanct_pct,
evaluation_pct,
max_range
from v_dispatch_info
"""
baseDF = spark.sql(sql).orderBy("service_order_id", desc("decision_time")).dropDuplicates(['service_order_id']).drop('decision_time')
print "base:", baseDF.count()
# print baseDF.limit(3).show()

# 订单累计值
sql = """
select 
service_order_id,
max(round) as max_round,
round(avg(can_dispatch_count), 0) as avg_can_dispatch,   --平均每轮可派司机数
sum(can_dispatch_count) as sum_can_dispatch,             --累计可派司机数
round(avg(dispatch_count), 0) as avg_dispatch_count,
sum(dispatch_count) as sum_dispatch,
round(avg(response_count), 0) as avg_response_count,
sum(response_count) as sum_response,
sum(accept_count) as sum_accept
from v_dispatch_info
group by service_order_id
"""
totalDF = spark.sql(sql)
# print totalDF.limit(3).show()

outDF = baseDF.join(totalDF, baseDF.service_order_id == totalDF.service_order_id, 'left').drop(totalDF.service_order_id)
# print "merge:",outDF.count()
# outDF.limit(10).toPandas().head()


# 首轮司机响应（但用户不一定决策成功）
sql = """
select
service_order_id,
round as first_accept_round,
acc_batch as first_accept_acc_batch,
decision_time as first_accept_time,
driver_bidding_rate as first_accept_driver_bidding_rate,
user_bidding_rate as first_accept_user_bidding_rate,
driver_estimate_price as first_accept_driver_estimate_price
from v_dispatch_info
where accept_count>0 
"""
# @Deprecated 未决策成功的加价倍率没有记录，改为后面从dispatch_detail取
firstAcceptDF = spark.sql(sql).orderBy("service_order_id", asc("first_accept_time")).dropDuplicates(['service_order_id'])
outDF = outDF.join(firstAcceptDF, outDF.service_order_id == firstAcceptDF.service_order_id, 'left').drop(firstAcceptDF.service_order_id)
# print "merge:",outDF.count()
# print outDF.filter("first_accept_time is not null").limit(10).toPandas().head()


# 首轮决策成功
sql = """
select
service_order_id,
round as first_decision_round,
acc_batch as first_decision_acc_batch,
decision_time as first_decision_time,
decision_driver_id as first_decision_driver_id,
decision_car_type_id as first_decision_car_type_id, 
driver_bidding_rate as first_decision_driver_bidding_rate,
user_bidding_rate as first_decision_user_bidding_rate,
driver_estimate_price as first_decision_driver_estimate_price
from v_dispatch_info
where decision_driver_id>0
"""

# @Deprecated 未决策成功的加价倍率没有记录，改为后面从dispatch_detail取

firstDecisionDF = spark.sql(sql).orderBy("service_order_id", asc("first_decision_time")).dropDuplicates(['service_order_id'])
outDF = outDF.join(firstDecisionDF, outDF.service_order_id == firstDecisionDF.service_order_id, 'left').drop(firstDecisionDF.service_order_id)
# print "merge:",outDF.count()
# print outDF.filter("first_decision_driver_id is not null").limit(10).toPandas().head()


# # 最后1轮
sql = """
select 
service_order_id,
round as final_round,
case when decision_time>0 then decision_time else datetime end as final_decision_time,
decision_driver_id as final_decision_driver_id,
decision_car_type_id as final_decision_car_type_id,
status as final_dispatch_status,
driver_bidding_rate as final_driver_bidding_rate,
user_bidding_rate as final_user_bidding_rate,
driver_estimate_price as final_driver_estimate_price
from v_dispatch_info
"""

# @Deprecated 未决策成功的加价倍率没有记录，改为后面从dispatch_detail取

finalDF = spark.sql(sql).orderBy("service_order_id", desc("final_decision_time")).dropDuplicates(['service_order_id'])
outDF = outDF.join(finalDF, outDF.service_order_id == finalDF.service_order_id, 'left').drop(finalDF.service_order_id)

print "merge:",outDF.count()
outDF.filter("final_decision_driver_id>0").limit(10).toPandas().head()

In [None]:
# 导出为parquet文件
outDF.repartition(PARALLEL).write.mode('overwrite').parquet(dispatch_info_dump_path)

print "dump:", dispatch_info_dump_path

In [None]:
# 确认导出结果
spark.sql("select * from `parquet`.`{path}`".format(path=dispatch_info_dump_path)).limit(10).toPandas().head()

In [None]:
# 清理spark的缓存
spark.catalog.clearCache()

## 2-2 dispatch detail 

In [None]:
if DEBUG:
    where = " where dt={dt} and city='bj'".format(dt=base_dt)
else:
    where = "where dt in ({dates}) and city='{city}' ".format(dates=DATES_STR, city=CITY)

sql = """
select
unix_timestamp(a.datetime,'yyyy-MM-dd HH:mm:ss') as datetime,
a.service_order_id,
a.round,
a.batch,
a.driver_id,
a.dispatch_time,
a.response_time,
a.accept_status,
a.driver_bidding_rate,
a.driver_estimate_price,
a.decision_time,
a.decision_result,
a.is_assigned,
a.route_distance,
a.route_time_length,
a.distance,
a.distance_time_length
-- b.car_type_id,
-- b.brand,
-- b.driver_level
from dispatch_detail a
-- lateral view json_tuple(a.dispatch_snapshot, 'car_type_id', 'brand', 'driver_level', 'distance_rate', 'contribution_rate', 'evaluation_rate', 'base_score', 'evaluation', 'contribution') b as car_type_id, brand, driver_level, distance_rate, contribution_rate, evaluation_rate, base_score, evaluation, contribution
{where}
""".format(where=where)

# print sql

# 执行sparkSQL
spark.sql(sql).createOrReplaceTempView("v_dispatch_detail")
spark.catalog.cacheTable("v_dispatch_detail")

# 结果核对
print "original:", spark.sql("select * from v_dispatch_detail").count()
spark.sql("select * from v_dispatch_detail").limit(5).toPandas().head()


### dispatch_detail cleansing （按订单获取统计量：首轮响应、首轮撮合成功、末轮）

In [None]:
# 按batch聚合每一batch的派单情况，形成和dispath_info类似的格式

sql = """
select 
service_order_id, 
round, 
batch,
max(datetime) as datetime,
ROW_NUMBER() over (distribute by service_order_id sort by round, batch) as acc_batch,
count(driver_id) as dispatch_count,
max(dispatch_time) as dispatch_time,
-- 本来响应司机数，时间用最大时间近似
sum(case when accept_status=1 then 1 else 0 end) as accept_count,  
max(response_time) as response_time,
-- 本轮有决策成功的，时间用最大时间近似
max(case when decision_result=2 then 1 else 0 end) as decision_complete, 
max(decision_time) as decision_time, 
-- 加价相关
min(driver_bidding_rate) as min_bidding_rate,
--mean(driver_bidding_rate) as avg_bidding_rate,
max(driver_bidding_rate) as max_bidding_rate,
min(driver_estimate_price) as min_price,
--mean(driver_estimate_price) as avg_price,
max(driver_estimate_price) as max_price,
-- 距离及时间相关
min(route_distance) as min_route_distance,
mean(route_distance) as avg_route_distance,
max(route_distance) as max_route_distance,
min(route_time_length) as min_route_time_length,
mean(route_time_length) as avg_route_time_length,
max(route_time_length) as max_route_time_length,
min(distance) as min_distance,
mean(distance) as avg_distance,
max(distance) as max_distance,
min(distance_time_length) as min_distance_time_length,
mean(distance_time_length) as avg_distance_time_length,
max(distance_time_length) as max_distance_time_length
from v_dispatch_detail 
group by service_order_id, round, batch
""".format()

# 执行sparkSQL
spark.sql(sql).createOrReplaceTempView("v_dispatch_detail_grouped")
spark.catalog.cacheTable("v_dispatch_detail_grouped")

# 释放原始表的缓存，只保留groupby之后的即可
spark.catalog.uncacheTable("v_dispatch_detail")

print "service_order_id in dispatch_detail:", spark.sql("select * from v_dispatch_detail_grouped").count()

spark.sql("select * from v_dispatch_detail_grouped").limit(5).toPandas().head()

### 以 2-1 计算的dispatch_info为基准，做连接dispatch_detail统计的加价相关数据

##### dispatch_info 在 步骤 2-1计算完成后写入了parquet文件，现在从文件中读取出来

In [None]:
# 读取dispatch_info，按订单聚合的结果

dispatchInfoDF = spark.sql("select * from `parquet`.`{path}`".format(path=dispatch_info_dump_path))
print "dispatchInfoDF:", dispatchInfoDF.count()

# print dispatchInfoDF.printSchema()


In [None]:
# 首轮司机响应（但用户不一定决策成功）
sql = """
select
service_order_id,
round as first_accept_round_ext,
acc_batch as first_accept_acc_batch_ext,
response_time as first_accept_time_ext,  
-- 加价相关
min_bidding_rate as first_accept_min_bidding_rate,
max_bidding_rate as first_accept_max_bidding_rate,
min_price as first_accept_min_price,
max_price as first_accept_max_price,
-- 距离及时间相关
min_route_distance as first_accept_min_route_distance,
max_route_distance as first_accept_max_route_distance,
min_route_time_length as first_accept_min_route_time_length,
avg_route_time_length as first_accept_avg_route_time_length,
max_route_time_length as first_accept_max_route_time_length,
min_distance as first_accept_min_distance,
avg_distance as first_accept_avg_distance,
max_distance as first_accept_max_distance,
min_distance_time_length as first_accept_min_distance_time_length,
avg_distance_time_length as first_accept_avg_distance_time_length,
max_distance_time_length as first_accept_max_distance_time_length
from v_dispatch_detail_grouped
where accept_count>0 
"""

firstAcceptDF = spark.sql(sql).orderBy("service_order_id", asc("first_accept_time_ext")).dropDuplicates(['service_order_id'])
print "firstAcceptDF uniq:", firstAcceptDF.count()
outDF = dispatchInfoDF.join(firstAcceptDF, dispatchInfoDF.service_order_id == firstAcceptDF.service_order_id, 'left').drop(firstAcceptDF.service_order_id)
print "merge:",outDF.count()
print "first_accept_time_ext is not null", outDF.filter("first_accept_time_ext is not null").count()


# 首轮决策成功
sql = """
select
service_order_id,
round as first_decision_round_ext,
acc_batch as first_decision_acc_batch_ext,
decision_time as first_decision_time_ext,  
-- 加价相关
min_bidding_rate as first_decision_min_bidding_rate,
max_bidding_rate as first_decision_max_bidding_rate,
min_price as first_decision_min_price,
max_price as first_decision_max_price,
-- 距离及时间相关
min_route_distance as first_decision_min_route_distance,
avg_route_distance as first_decision_avg_route_distance,
max_route_distance as first_decision_max_route_distance,
min_route_time_length as first_decision_min_route_time_length,
avg_route_time_length as first_decision_avg_route_time_length,
max_route_time_length as first_decision_max_route_time_length,
min_distance as first_decision_min_distance,
avg_distance as first_decision_avg_distance,
max_distance as first_decision_max_distance,
min_distance_time_length as first_decision_min_distance_time_length,
avg_distance_time_length as first_decision_avg_distance_time_length,
max_distance_time_length as first_decision_max_distance_time_length
from v_dispatch_detail_grouped
where decision_complete>0
"""

# @Deprecated 未决策成功的加价倍率没有记录，改为后面从dispatch_detail取

firstDecisionDF = spark.sql(sql) \
                    .orderBy("service_order_id", asc("first_decision_time_ext")) \
                    .dropDuplicates(['service_order_id'])
print "firstDecisionDF uniq:", firstDecisionDF.count()
outDF = outDF.join(firstDecisionDF, outDF.service_order_id == firstDecisionDF.service_order_id, 'left') \
                .drop(firstDecisionDF.service_order_id)
print "merge:",outDF.count()
print "first_decision_time_ext is not null", outDF.filter("first_decision_time_ext is not null").count()


# # 最后1轮
sql = """
select 
service_order_id,
round as final_round_ext,
acc_batch as final_acc_batch_ext,
datetime as final_time_ext,  
-- 加价相关
min_bidding_rate as final_min_bidding_rate,
max_bidding_rate as final_max_bidding_rate,
min_price as final_min_price,
max_price as final_max_price,
-- 距离及时间相关
min_route_distance as final_min_route_distance,
avg_route_distance as final_avg_route_distance,
max_route_distance as final_max_route_distance,
min_route_time_length as final_min_route_time_length,
avg_route_time_length as final_avg_route_time_length,
max_route_time_length as final_max_route_time_length,
min_distance as final_min_distance,
avg_distance as final_avg_distance,
max_distance as final_max_distance,
min_distance_time_length as final_min_distance_time_length,
avg_distance_time_length as final_avg_distance_time_length,
max_distance_time_length as final_max_distance_time_length
from v_dispatch_detail_grouped
"""

# @Deprecated 未决策成功的加价倍率没有记录，改为后面从dispatch_detail取

finalDF = spark.sql(sql).orderBy("service_order_id", desc("final_time_ext")).dropDuplicates(['service_order_id'])
print "finalDF uniq:", finalDF.count()
outDF = outDF.join(finalDF, outDF.service_order_id == finalDF.service_order_id, 'left').drop(finalDF.service_order_id)
print "merge:",outDF.count()
print "final_time_ext is not null", outDF.filter("final_time_ext is not null").count()

outDF.filter("final_decision_driver_id>0").limit(10).toPandas().head()

In [None]:
print dispatch_dump_path

# 导出为parquet文件
outDF.repartition(PARALLEL).write.mode('overwrite').parquet(dispatch_dump_path)

spark.sql("select * from `parquet`.`{path}`".format(path=dispatch_dump_path)).count()

In [None]:
# 清理spark的缓存
spark.catalog.clearCache()

##  step 3. 加价源

## 3-1 加价前-pre_dispatch

In [None]:
# 从dispatch_info提取加价id，存为临时表
if DEBUG:
    dep_where = "where dt={dt} and city='bj'".format(dt=base_dt)
else:
    dep_where = "where dt in ({dates}) and city='{city}' ".format(dates=DATES_STR, city=CITY)


# 根据bidding_id提取pre_dispatch记录
if DEBUG:
    where = "where dt={dt}".format(dt=base_dt)
else:
    where = "where dt in ({dates}) ".format(dates=DATES_STR)
# 按bidding_id过滤
where += " and bidding_id in (select distinct bidding_id from ods.dispatch_info {dep_where})".format(dep_where=dep_where)

sql = """
select
unix_timestamp(a.datetime,'yyyy-MM-dd_HH:mm:ss') as datetime,
a.bidding_id,
b.productTypeId as pre_productTypeId,
b.estimatePrice as pre_estimatePrice,
b.availableDriverNum as pre_availableDriverNum,
b.M as pre_M
from (
    select
    datetime,
    bidding_id,
    regexp_replace(data, '\\\\\\\\\"', '"') as data
    from pre_dispatch
    {where}
) a
lateral view json_tuple(a.data, 'productTypeId', 'estimatePrice', 'availableDriverNum', 'M', 'M1', 'M2', 'M3', 'a', 'b', 'c', 'T', 'N', 'L') b as productTypeId, estimatePrice, availableDriverNum, M, M1, M2, M3, a, b, c, T, N, L
""".format(where=where)

bidding_base_df = spark.sql(sql).dropDuplicates(['bidding_id'])
# bidding_base_df.cache()
print  bidding_base_df.count()

spark.sql(sql).limit(5).toPandas().head(5)

## 3-2 系统决策加价-system_dispatch

In [None]:
# 从dispatch_info提取加价id，存为临时表
if DEBUG:
    dep_where = "where dt={dt} and city='bj' and decision_type=1".format(dt=base_dt)
else:
    dep_where = "where dt in ({dates}) and city='{city}'and decision_type=1".format(dates=DATES_STR, city=CITY)


# 根据bidding_id提取pre_dispatch记录
if DEBUG:
    where = "where dt={dt}".format(dt=base_dt)
else:
    where = "where dt in ({dates}) ".format(dates=DATES_STR)
# 按bidding_id过滤
where += " and bidding_id in (select distinct bidding_id from ods.dispatch_info {dep_where})".format(dep_where=dep_where)


sql = """
select
a.bidding_id,
a.round as end_bidding_round,
b.totalMagnification as end_min_bidding_rate,
b.totalMagnification as end_max_bidding_rate
--(b.totalMagnification * (1 - b.commission_rate)) as end_min_bidding_rate,
--(b.totalMagnification * (1 - b.commission_rate)) as end_max_bidding_rate
from (
    select
    datetime,
    order_id,
    bidding_id,
    round,
    regexp_replace(data, '\\\\\\\\\"', '"') as json
    from system_dispatch
    {where} and order_id is not null
) a
lateral view json_tuple(a.json, 'startBiddingRound', 'commission_rate', 'totalMagnification', 'M', 'N', 'L', 'O', 'D') b as startBiddingRound, commission_rate, totalMagnification, M, N, L, O, D
""".format(where=where)

# print sql
# print sql

sys_bidding_df = spark.sql(sql).orderBy("bidding_id", desc('end_bidding_round')).dropDuplicates(['bidding_id'])
# sys_bidding_df.cache()
print sys_bidding_df.count()

spark.sql(sql).filter("bidding_id=6409357301886353828").orderBy("bidding_id", desc('end_bidding_round')).dropDuplicates(['bidding_id']).limit(5).toPandas().head(5)

## 3-3 人工计策加价-personal_dispatch

In [None]:
# 从dispatch_info提取加价id，存为临时表
if DEBUG:
    dep_where = "where dt={dt} and city='bj' and decision_type=2".format(dt=base_dt)
else:
    dep_where = "where dt in ({dates}) and city='{city}'and decision_type=2".format(dates=DATES_STR, city=CITY)


# 根据bidding_id提取pre_dispatch记录
if DEBUG:
    where = "where dt={dt}".format(dt=base_dt)
else:
    where = "where dt in ({dates}) ".format(dates=DATES_STR)
# 按bidding_id过滤
where += " and bidding_id in (select distinct bidding_id from ods.dispatch_info {dep_where})".format(dep_where=dep_where)

import json
def extract_driver_bidding_rate(row):
    commission_rate = row.commission_rate
    try:
        dirvers = json.loads(row.driverData)
        bidding_rates = [val['totalMagnification'] for val in dirvers.values()]
    except:
        bidding_rates = [0.0]
    return Row(bidding_id=row.bidding_id,
              end_bidding_round=row.end_bidding_round,
              end_min_bidding_rate=min(bidding_rates),
              end_max_bidding_rate=max(bidding_rates))

sql = """
select
a.bidding_id,
a.round as end_bidding_round,
b.commission_rate,
b.driverData
from (
    select
    datetime,
    order_id,
    bidding_id,
    round,
    regexp_replace(data, '\\\\\\\\\"', '"') as json
    from personal_dispatch
    {where} and order_id is not null
) a
lateral view json_tuple(a.json, 'commission_rate', 'estimateDist', 'M', 'O', 'D', 'driverData') b as commission_rate, estimateDist, M, O, D, driverData
""".format(where=where)

# print sql
# print sql

personal_bidding_df = spark.sql(sql) \
                        .orderBy("bidding_id", desc('end_bidding_round')) \
                        .dropDuplicates(['bidding_id']) \
                        .rdd.map(extract_driver_bidding_rate).toDF()

# personal_bidding_df.cache()
print personal_bidding_df.count()

personal_bidding_df.toPandas().head(5)

## 加价倍率join （以pre_dispatch中的订单为基准，left join 系统决策加价 和 人工决策加价）

In [None]:
print "sys_bidding_df:", sys_bidding_df.count()
print "personal_bidding_df:", personal_bidding_df.count()

bidding_df = sys_bidding_df.union(personal_bidding_df)
print "bidding_df:", bidding_df.count()

print "bidding_base_df:", bidding_base_df.count()
outDF = bidding_base_df.join(bidding_df, bidding_base_df.bidding_id == bidding_df.bidding_id, 'left') \
                    .drop(bidding_df.bidding_id)
# outDF.cache()
# bidding_base_df.unpersist()
# sys_bidding_df.unpersist()
# personal_bidding_df.unpersist()
# bidding_df.unpersist()

print "add sys_biding and personal_biding:", outDF.count()
print "valid bidding count:", outDF.filter("end_max_bidding_rate > 0.0").count()
print "zero bidding count:", outDF.filter("end_max_bidding_rate=0 or end_max_bidding_rate is null").count()

outDF.limit(3).toPandas().head()

In [None]:
print bidding_dump_path

# 导出为parquet文件
outDF.repartition(PARALLEL).write.mode('overwrite').parquet(bidding_dump_path)

print spark.sql("select * from `parquet`.`{path}`".format(path=bidding_dump_path)).count()

# outDF.unpersist()

## Step 4. 基础数据merge （订单 left join 派单、加价）

### 4-1 从文件中加载预处理过的订单、派单、加价数据

In [4]:
dispatchDF = spark.sql("select * from `parquet`.`{path}`".format(path=dispatch_dump_path))
print "dispatchInfoDF:", dispatchDF.count()

orderDF = spark.sql("select * from `parquet`.`{path}`".format(path=order_dump_path))
print "orderDF:", orderDF.count()

biddingDF = spark.sql("select * from `parquet`.`{path}`".format(path=bidding_dump_path))
print "biddingDF:", biddingDF.count()

dispatchInfoDF: 1694829
orderDF: 1589863
biddingDF: 1753761


### 4-2 以订单为基 left joint 派单、加价

此步骤容易OOM，若失败请重试

In [5]:
# 先释放资源
spark.catalog.clearCache()

allInOneDF = orderDF.join(dispatchDF, orderDF.service_order_id == dispatchDF.service_order_id, 'left') \
                    .drop(dispatchDF.service_order_id) \
                    .drop(dispatchDF.city)
# print "order + dispatch:", allInOneDF.count()

allInOneDF = allInOneDF.join(biddingDF, allInOneDF.bidding_id == biddingDF.bidding_id, 'left') \
                    .drop(biddingDF.bidding_id)
# print "order + bidding", allInOneDF.count()


allInOneDF.createOrReplaceTempView("v_allInOneDF")
# allInOneDF.printSchema()

### 4-3 过滤逻辑上存在矛盾的订单

In [6]:
from pyspark.sql.functions import asc, desc, expr

# 过滤加价日志和派单明细对不上（加价日志缺失）
df = spark.sql("select * from v_allInOneDF where not (final_max_bidding_rate>0 and end_bidding_round is null)")

# 决策类型改为从flag提取
df = df.withColumn("decision_type_flag", expr("""
case 
    when (dispatch_flag & 1024) = 1024 then 'system_decision'
    when (dispatch_flag & 512) = 512 then 'manual_decision'
    else 'unkown'
end
"""))

# .createOrReplaceTempView("v_training")

# spark.catalog.cacheTable("v_training")

# spark.catalog.uncacheTable("v_miss_bidding")
# print "orders after filter:", spark.sql("select * from v_training").count()

# spark.sql("select dt, count(1) from v_training group by dt order by dt").show(20)

### 4-4 导出中间结果至parquet

In [7]:
print order_allInOne_path

# 导出为parquet文件
df.repartition(PARALLEL).write.mode('overwrite').parquet(order_allInOne_path)

spark.sql("select count(1) from `parquet`.`{path}`".format(path=order_allInOne_path)).show()

/user/lujin/order_all_in_one_20170423_14days.parquet
+--------+
|count(1)|
+--------+
| 1289903|
+--------+



In [8]:
spark.catalog.clearCache()

## step 5. 离线计算高阶特征


### 5-1 供需数据

需求：group by (期望上车点映射5位geohash + 下单时间按5分钟分箱) -> count(service_order_id)

相对供给：group by (期望上车点映射5位geohash + 下单时间按5分钟分箱) -> sum(avg_can_dispatch) / count(service_order_id)

In [10]:
# 从parquet加载计算源
sql ="""
select 
service_order_id,
order_create_time,
expect_start_latitude,
expect_start_longitude,
pre_availableDriverNum
from `parquet`.`{path}`
""".format(path=order_allInOne_path)

supply_demand_df = spark.sql(sql)

print supply_demand_df.count()

supply_demand_df.limit(5).show(truncate=False)

# 部分预约单没有可派司机数？
print supply_demand_df.filter("pre_availableDriverNum > 0").count()

1289903
+-------------------+-----------------+---------------------+----------------------+----------------------+
|service_order_id   |order_create_time|expect_start_latitude|expect_start_longitude|pre_availableDriverNum|
+-------------------+-----------------+---------------------+----------------------+----------------------+
|6407619001781635345|1491890056       |39.873821            |116.367985            |151                   |
|6408477982352736592|1492090053       |39.964474753926      |116.45419409935       |104                   |
|6409239905257511846|1492267452       |40.099064            |116.542989            |33                    |
|6410298323350028924|1492513884       |39.998682            |116.489786            |null                  |
|6411736892188327215|1492848827       |39.893406            |116.469003            |null                  |
+-------------------+-----------------+---------------------+----------------------+----------------------+

887710


### 5-1-1 新增key （时间+空间）

In [12]:
import Geohash
import datetime
import time

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType


def create_geo_time_key(timestamp, lat, lng):
    # geohash映射
    geohash = Geohash.encode(lat, lng, precision=5)
    # 时间round至5分钟整数（floor方式）
    tm = datetime.datetime.utcfromtimestamp(timestamp)
    tm = tm - datetime.timedelta(minutes=tm.minute % 5, seconds=tm.second, microseconds=tm.microsecond)
    ts_5mins = str(time.mktime(tm.timetuple()))
    # 组合key (时间+空间)
    key = geohash + "_" + ts_5mins[:-2]
    return key
    #return Row(service_order_id=service_order_id, ts=order_create_time, lat=expect_start_latitude, lng=expect_start_longitude, avg_can_dispatch=avg_can_dispatch, key=key)

geo_time_key_udf = udf(create_geo_time_key, StringType())
    

supply_demand_df.withColumn("geo_time_key", geo_time_key_udf(supply_demand_df['order_create_time'], supply_demand_df['expect_start_latitude'], supply_demand_df['expect_start_longitude'])).createOrReplaceTempView("v_geo_time")

spark.sql("select * from v_geo_time").limit(3).toPandas().head()
    
    

Unnamed: 0,service_order_id,order_create_time,expect_start_latitude,expect_start_longitude,pre_availableDriverNum,geo_time_key
0,6407619001781635345,1491890056,39.873821,116.367985,151,wx4fb_1491861000
1,6408477982352736592,1492090053,39.964475,116.454194,104,wx4g3_1492061100
2,6409239905257511846,1492267452,40.099064,116.542989,33,wx4uh_1492238400


### 5-1-2 按时间+空间，聚合数据下单数量和周边司机数量

In [14]:
sql = """
select 
a.service_order_id,
b.abs_demand,
b.relative_supply
from v_geo_time a left join (
    select
    geo_time_key,
    count(1) as abs_demand,
    mean(pre_availableDriverNum) / (count(1) + 1) as relative_supply
    from v_geo_time
    group by geo_time_key
) b
on a.geo_time_key = b.geo_time_key
"""

supply_demand_df = spark.sql(sql)

print supply_demand_df.count()

supply_demand_df.limit(5).show()

1289903
+-------------------+----------+------------------+
|   service_order_id|abs_demand|   relative_supply|
+-------------------+----------+------------------+
|6409807271155392452|         1|               6.0|
|6408007898860058120|         2|              null|
|6408007537411718059|         2|              null|
|6409477777729598208|         1|              null|
|6411287273915704821|         2|10.333333333333334|
+-------------------+----------+------------------+



### 5-2 用户加价承受度简单画像
1. 总体加价接受率：sum(加价且成单) / sum(加价订单)，float，建议分箱
2. 平均成功加价倍率：avg(加价且成单的倍率)
3. 25%成功加价倍率（近似下界）：percentile_approx(0.25)
4. 50%成功加价倍率（近似均值）：percentile_approx(0.25)
5. 75%成功加价倍率（近似上界）：percentile_approx(0.25)
6. 拒绝所有加价flag：1 -> 拒绝所有加价, 0 -> 接受过任意加价，2 -> 无记录

In [15]:
sql ="""
select 
a.user_id,
a.user_order_count,
a.user_bidding_count,
a.user_bidding_ok_count,
a.user_bidding_accept_flag,
case when a.user_bidding_accept_pct is null then -1.0 else a.user_bidding_accept_pct end as user_bidding_accept_pct,
case when b.user_accept_avg_bidding_rate is null then -1.0 else b.user_accept_avg_bidding_rate end as user_accept_avg_bidding_rate,
case when b.user_accept_25pct_bidding_rate is null then -1.0 else b.user_accept_25pct_bidding_rate end as user_accept_25pct_bidding_rate,
case when b.user_accept_50pct_bidding_rate is null then -1.0 else b.user_accept_50pct_bidding_rate end as user_accept_50pct_bidding_rate,
case when b.user_accept_75pct_bidding_rate is null then -1.0 else b.user_accept_75pct_bidding_rate end as user_accept_75pct_bidding_rate
from (
    -- 用户总体订单情况
    select 
    user_id, 
    sum(user_order_num) as user_order_count,
    sum(is_bidding) as user_bidding_count,
    sum(is_bidding_ok) as user_bidding_ok_count,
    sum(is_bidding_ok) / sum(is_bidding) as user_bidding_accept_pct, -- 用户接受了的加价订单的占比
    case
        when sum(is_bidding)=0 then -1 -- 没有加价记录
        when sum(is_bidding)>0 and sum(is_bidding_ok)=0 then 0 -- 拒绝了所有加价
        when sum(is_bidding)>0 and sum(is_bidding_ok)>0 and sum(is_bidding)>sum(is_bidding_ok) then 1 -- 接受了部分加价
        when sum(is_bidding)>0 and sum(is_bidding_ok)>0 and sum(is_bidding)=sum(is_bidding_ok) then 2 -- 接受了所有加价
        else -2 -- 未知的case
    end as user_bidding_accept_flag
    from (
        select 
        user_id, 
        1 as user_order_num,
        case when final_driver_bidding_rate>0 or final_max_bidding_rate>0 then 1 else 0 end as is_bidding,
        case when final_decision_driver_id>0 and final_driver_bidding_rate>0 then 1 else 0 end as is_bidding_ok
        from `parquet`.`{path}`
    ) tmp_a
    group by user_id
) a 
left join (
    -- 用户加价成功的倍率分布
    select 
    user_id,
    avg(final_driver_bidding_rate) as user_accept_avg_bidding_rate,  -- 接受的平均加价倍率
    percentile_approx(final_driver_bidding_rate, 0.25) as user_accept_25pct_bidding_rate, -- 25%接受加价倍率
    percentile_approx(final_driver_bidding_rate, 0.50) as user_accept_50pct_bidding_rate, -- 50%（中位数）接受加价倍率
    percentile_approx(final_driver_bidding_rate, 0.75) as user_accept_75pct_bidding_rate -- 75% 接受加价倍率
    from (
        select 
        user_id, 
        cast(final_driver_bidding_rate as float) as final_driver_bidding_rate
        from `parquet`.`{path}`
        where final_driver_bidding_rate>0 and final_decision_driver_id>0 -- 加价且撮合成功的订单
    ) tmp_b
    group by user_id
) b on a.user_id=b.user_id
""".format(path=order_allInOne_path)

# print sql

user_profile_df = spark.sql(sql)

print "user_count: ", spark.sql("select count(1) from `parquet`.`{path}` group by user_id".format(path=order_allInOne_path)).count()

print "merged count: ", user_profile_df.count()

user_profile_df.limit(5).toPandas().head()

user_count:  165062
merged count:  165062


Unnamed: 0,user_id,user_order_count,user_bidding_count,user_bidding_ok_count,user_bidding_accept_flag,user_bidding_accept_pct,user_accept_avg_bidding_rate,user_accept_25pct_bidding_rate,user_accept_50pct_bidding_rate,user_accept_75pct_bidding_rate
0,62809,3,0,0,-1,-1.0,-1.0,-1.0,-1.0,-1.0
1,198084,6,1,0,0,0.0,-1.0,-1.0,-1.0,-1.0
2,206344,4,3,2,1,0.666667,0.65,0.6,0.6,0.65
3,209755,10,5,1,1,0.2,0.2,0.2,0.2,0.2
4,277315,8,4,4,2,1.0,0.275,0.1,0.1,0.3


## 5-3 地块加价成功比率
>  按5位geohash映射，做区域内统计
1. 区域内订单量（ ~ 是否热门区域）
2. 区域内加价订单量
3. 区域内加价成单量
1. 区域内总体加价占比：count(加价订单) / count(order_id)2. 区域内加价成单占比：count(加价且成单) / count(加价)

In [16]:
sql ="""
select 
service_order_id,
expect_start_latitude,
expect_start_longitude,
-- 提取星期几
case date_format(cast(start_time as timestamp), 'EEEE')
    WHEN 'Monday' THEN 1
    WHEN 'Tuesday' THEN 2
    WHEN 'Wednesday' THEN 3
    WHEN 'Thursday' THEN 4
    WHEN 'Friday' THEN 5
    WHEN 'Saturday' THEN 6
    WHEN 'Sunday' THEN 7
    ELSE null
END as day_of_week,
-- 提取小时
cast(date_format(cast(start_time as timestamp), 'HH') as int) as hour_of_day,
1 as order_num,
case when final_driver_bidding_rate>0 or final_max_bidding_rate>0 then 1 else 0 end as is_bidding,
case when final_decision_driver_id>0 and final_driver_bidding_rate>0 then 1 else 0 end as is_bidding_ok
from `parquet`.`{path}`
""".format(path=order_allInOne_path)

# print sql

geo_stat_df = spark.sql(sql)

print geo_stat_df.count()

import Geohash
import datetime
import time

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

geohash5_udf = udf(lambda lat, lng: Geohash.encode(lat, lng, precision=5), StringType())
    

geo_stat_df.withColumn("expect_geohash", geohash5_udf(geo_stat_df['expect_start_latitude'], geo_stat_df['expect_start_longitude'])).createOrReplaceTempView("v_geo")

spark.sql("select * from v_geo").limit(3).toPandas().head()

1289903


Unnamed: 0,service_order_id,expect_start_latitude,expect_start_longitude,day_of_week,hour_of_day,order_num,is_bidding,is_bidding_ok,expect_geohash
0,6407619001781635345,39.873821,116.367985,2,13,1,0,0,wx4fb
1,6408477982352736592,39.964475,116.454194,4,21,1,1,0,wx4g3
2,6409239905257511846,40.099064,116.542989,6,23,1,1,1,wx4uh


### 按geohash 5位聚合区域内的订单量、加价订单、加价成单

TODO：按星期、小时映射为map，再做细化的连接 (目前数据稀疏，暂不这么弄)

In [None]:
sql = """
select 
expect_geohash,
geo_order_count,
geo_bidding_count,
geo_bidding_ok_count,
case when geo_bidding_pct is null then -1 else geo_bidding_pct end  as geo_bidding_pct,
case when geo_bidding_ok_pct is null then -1 else geo_bidding_ok_pct end as geo_bidding_ok_pct
from (
    select
    expect_geohash,
    sum(order_num) as geo_order_count,
    sum(is_bidding) as geo_bidding_count,
    sum(is_bidding_ok) as geo_bidding_ok_count,
    sum(is_bidding) / sum(order_num) as geo_bidding_pct,
    sum(is_bidding_ok) / sum(is_bidding) as geo_bidding_ok_pct
    from v_geo
    group by expect_geohash
) tmp
"""

geo_stat_df = spark.sql(sql)

geo_stat_df.limit(5).show(truncate=False)

+--------------+---------------+-----------------+--------------------+-------------------+-------------------+
|expect_geohash|geo_order_count|geo_bidding_count|geo_bidding_ok_count|geo_bidding_pct    |geo_bidding_ok_pct |
+--------------+---------------+-----------------+--------------------+-------------------+-------------------+
|wx4dv         |10375          |1923             |598                 |0.18534939759036145|0.3109724388975559 |
|wx49s         |1              |0                |0                   |0.0                |-1.0               |
|wx4u9         |356            |33               |17                  |0.09269662921348315|0.5151515151515151 |
|wx4gd         |38966          |8042             |2659                |0.2063850536365036 |0.33063914449142007|
|wx49q         |122            |15               |3                   |0.12295081967213115|0.2                |
+--------------+---------------+-----------------+--------------------+-------------------+-------------

## 5-4  计算出的特征合并至订单基准数据


In [None]:
orderDF = spark.sql("select * from `parquet`.`{path}`".format(path=order_allInOne_path))

print "order:", orderDF.count()

# 供需数据
dumpDF = orderDF.join(supply_demand_df, orderDF.service_order_id == supply_demand_df.service_order_id, "left").drop(supply_demand_df.service_order_id)
print "left join SupplyDemand:", dumpDF.count()

# 用户画像数据
dumpDF = dumpDF.join(user_profile_df, dumpDF.user_id == user_profile_df.user_id, "left").drop(user_profile_df.user_id)
print "left join UserProfile:", dumpDF.count()

# 上车位置统计指标
dumpDF = dumpDF.withColumn("expect_geohash", geohash5_udf(dumpDF['expect_start_latitude'], dumpDF['expect_start_longitude']))
dumpDF = dumpDF.join(geo_stat_df, dumpDF.expect_geohash == geo_stat_df.expect_geohash, "left").drop(geo_stat_df.expect_geohash)
print "left join GeoStatistics:", dumpDF.count()

dumpDF = dumpDF.filter("avg_can_dispatch>=0 and abs_demand>=0 and relative_supply>=0")
print "filter no match:", dumpDF.count()

dumpDF.createOrReplaceTempView("v_training")


order: 1289903
left join SupplyDemand: 1289903
left join UserProfile: 1289903
left join GeoStatistics: 1289903
filter no match:

In [None]:
spark.sql("select * from v_training").limit(3).toPandas().head()

# 5-4 导出清洗完成的数据到hive

In [None]:
# 最终加价倍率（来自加价日志，用户不一定接受）
bidding_rate_gt0 = "end_max_bidding_rate>0"
bidding_rate_eq0 = "(end_max_bidding_rate=0 or end_max_bidding_rate is null)"

# 最终决策情况
decision_ok = "final_decision_driver_id>0"
decision_failed = "(final_decision_driver_id=0 or final_decision_driver_id is null)"

# 是否有司机接单
accept_ok = "first_accept_time>0"
accept_failed = "(first_accept_time>0 is null or first_accept_time=0)"

# 最终决策成功的加价倍率
bidding_decision_ok = "final_driver_bidding_rate>0"

spark.sql("drop table if exists tmp.bidding_training")

sql = """
create table tmp.bidding_training stored as orc as
select 
*,
case 
    -- 决策成功，取决策倍率
    when final_decision_driver_id >0 then final_driver_bidding_rate
    -- 决策不成功，触发了加价, 取加价系统末轮倍率
    when {decision_failed} and final_max_bidding_rate>0 then end_max_bidding_rate
    -- 决策不成功，触发了加价，但该加价倍率没有派发（用户看到就关掉了）
    when {decision_failed} and end_max_bidding_rate>0 then end_max_bidding_rate
    else 0
end as bidding_rate_adjust, -- 末轮加价倍率，根据决策情况区分
case 
    when {decision_ok} and {bidding_decision_ok} then 'bidding_decision_ok'
    when {bidding_rate_gt0} and {decision_failed} then 'bidding_decision_failed'
    when {bidding_rate_eq0} and {decision_ok} then 'no_bidding_decision_ok'
    when {bidding_rate_eq0} and {decision_failed} then 'no_bidding_decision_failed'
    else 'unkown'
end as label
from v_training
""".format(decision_ok=decision_ok, decision_failed=decision_failed, bidding_decision_ok=bidding_decision_ok,
           bidding_rate_gt0=bidding_rate_gt0, bidding_rate_eq0=bidding_rate_eq0)

# print sql

spark.sql(sql)

print "dump: ", spark.sql("select service_order_id from tmp.bidding_training").count()

print "dump complete"

In [32]:
spark.catalog.clearCache()