In [1]:
from pyspark.sql.types import *
from pyspark.sql.functions import udf
from datetime import datetime
from pyspark.sql import functions

In [2]:
# Load the inbound-order detail export (header row, column types inferred by Spark).
# TODO(review): absolute, user-specific path — consider a configurable data dir.
good_details = spark.read.csv(
    "file:///Users/lijianwei/Desktop/明细.csv",
    header="true",
    inferSchema="true",
)

# Keep only the columns the rest of the notebook uses.
columns_list = [
    '客户代码', '货物状态', '交货方式', '仓库名字', '创建时间',
    '宽CM', '长CM', '高CM', '预报数量', '收货数量',
]
good_details = good_details.select(*columns_list)

In [3]:
# Preview the first rows of the trimmed detail table.
good_details.show(n=5)

+--------+--------+--------+--------+--------------+----+-----+----+--------+--------+
|客户代码|货物状态|交货方式|仓库名字|      创建时间|宽CM| 长CM|高CM|预报数量|收货数量|
+--------+--------+--------+--------+--------------+----+-----+----+--------+--------+
|    G666|    在途|海运整柜|美东仓库|2020/4/3 11:43|68.0|102.0|12.0|       1|       0|
|    G666|    在途|海运整柜|美东仓库|2020/4/3 11:43|68.0|102.0|12.0|       1|       0|
|    G666|    在途|海运整柜|美东仓库|2020/4/3 11:43|68.0|102.0|12.0|       1|       0|
|    G666|    在途|海运整柜|美东仓库|2020/4/3 11:43|68.0|102.0|12.0|       1|       0|
|    G666|    在途|海运整柜|美东仓库|2020/4/3 11:43|68.0|102.0|12.0|       1|       0|
+--------+--------+--------+--------+--------------+----+-----+----+--------+--------+
only showing top 5 rows



In [4]:
def onway_volume(width, heigh, length, predict_number, get_number):
    """Return the in-transit volume of one detail row.

    The dimensions come from the 宽CM/长CM/高CM columns, so (presumably
    centimetres) their product is divided by 1,000,000 to give cubic metres.
    The per-unit volume is multiplied by the number of units still
    outstanding: forecast quantity minus received quantity.

    Note: the caller passes (宽CM, 长CM, 高CM) positionally, so ``heigh``
    actually receives the length and ``length`` the height; the product is
    the same either way.

    Returns None when any argument is None (a null CSV cell), so the Spark
    UDF produces a null instead of raising TypeError and failing the job.
    """
    if any(value is None for value in (width, heigh, length, predict_number, get_number)):
        return None
    return width * heigh * length * (predict_number - get_number) / 1000000

def changeformat(create_time):
    """Convert a timestamp string such as '2020/4/3 11:43' into '2020-04-03'.

    The input is parsed with '%Y/%m/%d %H:%M' and only the date part is
    re-emitted as '%Y-%m-%d'.

    Returns None for a None input (a null CSV cell), so the Spark UDF yields
    null instead of failing the job. Raises ValueError for a string that does
    not match the expected format.
    """
    if create_time is None:
        return None
    return datetime.strptime(create_time, '%Y/%m/%d %H:%M').strftime('%Y-%m-%d')

In [5]:
# Wrap the plain-Python helpers as Spark UDFs with explicit result types:
# the volume as a double, the reformatted date as a string.
udf_onway_volume = udf(onway_volume, returnType=DoubleType())
udf_changeformat = udf(changeformat, returnType=StringType())

In [6]:
# Add the per-row in-transit volume, then rewrite 创建时间 as a 'yyyy-MM-dd' string.
# Dimension order passed to the UDF: width, length, height, forecast qty, received qty.
volume_col = udf_onway_volume(
    good_details["宽CM"],
    good_details["长CM"],
    good_details["高CM"],
    good_details["预报数量"],
    good_details["收货数量"],
)
good_details = good_details.withColumn('在途体积', volume_col)
good_details = good_details.withColumn('创建时间', udf_changeformat(good_details["创建时间"]))

In [8]:
# Transit-time lookup table (tab-separated) with columns 仓库 / 货运方式 / 时效.
# TODO(review): absolute, user-specific path — consider a configurable data dir.
way_time = spark.read.csv(
    'file:///Users/lijianwei/Desktop/way_time.txt',
    sep='\t',
    header='true',
    inferSchema='true',
)

In [9]:
# Preview the transit-time lookup table.
way_time.show(n=5)

+--------+--------+----+
|    仓库|货运方式|时效|
+--------+--------+----+
|法国仓库|海运散货|  51|
|捷克仓库|海运散货|  46|
|美东仓库|海运散货|  53|
|美南仓库|海运散货|  43|
|美西仓库|海运散货|  36|
+--------+--------+----+
only showing top 5 rows



In [10]:
# Left-join the transit times on warehouse + shipping method. Every detail row is
# kept; rows without a matching lookup entry get null in the way_time columns.
join_on = (
    (good_details["仓库名字"] == way_time["仓库"])
    & (good_details["交货方式"] == way_time["货运方式"])
)
good_details = good_details.join(way_time, on=join_on, how="left")

In [17]:
# Row count after the left join (a left join never drops detail rows).
joined_row_count = good_details.count()
joined_row_count

884011

In [12]:
# Drop rows whose cargo status is "其他" ("other") and rows for which the left
# join found no transit time (`时效` is null there), because no predicted
# arrival date can be computed for them.
# The previous condition OR-ed the two checks and compared `时效` with "":
# `null != ""` evaluates to null, so unmatched rows were never removed by it,
# and with OR a "其他" row still passed whenever the `时效` check was true.
# NOTE(review): the original comment also mentions removing already-received
# goods; no received-status filter is applied here — confirm whether one is
# needed (rows with 预报数量 == 收货数量 already have zero in-transit volume).
good_details = good_details.where(
    (good_details["货物状态"] != "其他") & good_details["时效"].isNotNull()
)

In [13]:
# Predicted arrival date = creation date + transit days, via Spark SQL date_add.
arrival_expr = functions.expr("date_add(`创建时间`,`时效`)")
good_details = good_details.withColumn("预测到达日期", arrival_expr)

In [19]:
# Inspect the enriched rows (Spark's default preview size of 20).
good_details.show(n=20)

+--------+--------+--------+--------+----------+----+-----+----+--------+--------+---------+--------+--------+----+------------+
|客户代码|货物状态|交货方式|仓库名字|  创建时间|宽CM| 长CM|高CM|预报数量|收货数量| 在途体积|    仓库|货运方式|时效|预测到达日期|
+--------+--------+--------+--------+----------+----+-----+----+--------+--------+---------+--------+--------+----+------------+
|    G666|    在途|海运整柜|美东仓库|2020-04-03|68.0|102.0|12.0|       1|       0| 0.083232|美东仓库|海运整柜|  51|  2020-05-24|
|    G666|    在途|海运整柜|美东仓库|2020-04-03|68.0|102.0|12.0|       1|       0| 0.083232|美东仓库|海运整柜|  51|  2020-05-24|
|    G666|    在途|海运整柜|美东仓库|2020-04-03|68.0|102.0|12.0|       1|       0| 0.083232|美东仓库|海运整柜|  51|  2020-05-24|
|    G666|    在途|海运整柜|美东仓库|2020-04-03|68.0|102.0|12.0|       1|       0| 0.083232|美东仓库|海运整柜|  51|  2020-05-24|
|    G666|    在途|海运整柜|美东仓库|2020-04-03|68.0|102.0|12.0|       1|       0| 0.083232|美东仓库|海运整柜|  51|  2020-05-24|
|    G666|    在途|海运整柜|美东仓库|2020-04-03|68.0|102.0|12.0|       1|       0| 0.083232|美东仓库|海运整柜|  51|  2020-05-

In [20]:
# Keep only rows whose predicted arrival falls in May (the year is not checked).
is_may_arrival = functions.month(good_details["预测到达日期"]) == 5
good_details = good_details.filter(is_may_arrival)

In [25]:
# Expose the prepared rows to Spark SQL as the temp view "details".
good_details.createOrReplaceTempView(name="details")

In [30]:
# Total in-transit volume per (warehouse, customer), registered as view "details2".
details2_query = """select `客户代码`,`仓库名字`,
          sum(`在途体积`) as `客户体积` 
          from details 
          group by `仓库名字`,`客户代码`"""
details2_df = spark.sql(details2_query)
details2_df.createOrReplaceTempView("details2")

In [44]:
# Per warehouse: number customers by descending volume (column `a`) and attach
# the warehouse's total volume to every row; registered as view "details3".
# NOTE(review): `a` is not selected by the later queries visible in this notebook.
details3_query = """select `客户代码`,`仓库名字`,`客户体积`,
          row_number() over(partition by `仓库名字` order by `客户体积` desc) a ,
          sum(`客户体积`) over(distribute by `仓库名字`) as `仓库总体积` 
          from details2 """
details3_df = spark.sql(details3_query)
details3_df.createOrReplaceTempView("details3")

In [53]:
# Each customer's share of its warehouse's total in-transit volume -> view "details4".
share_query = "select `客户代码`,`仓库名字`,`客户体积`,`客户体积`/`仓库总体积` as `百分比`  from details3"
share_df = spark.sql(share_query)
share_df.createOrReplaceTempView('details4')

In [56]:
# Running (cumulative) share per warehouse, largest customers first.
# NOTE(review): with an ORDER BY and no explicit frame, Spark's documented default
# is a RANGE frame, so customers with equal `百分比` share one cumulative `占比`
# value — confirm that tie behaviour is acceptable.
cumulative_query = "select `客户代码`,`仓库名字`,`客户体积`,`百分比`,sum(`百分比`) over(distribute by `仓库名字` sort by `百分比` desc) as `占比`  from details4"
data = spark.sql(cumulative_query)

In [57]:
# Keep customers whose cumulative share is still under the 80% cut-off.
# NOTE(review): strict `<` drops the customer whose share pushes the running
# total past 0.8 — confirm that is the intended definition of "top 80%".
share_cutoff = 0.8
data = data.filter(data["占比"] < share_cutoff)

In [58]:
# Export the result as CSV.
# - mode('overwrite'): the default save mode ("errorifexists") makes this cell
#   fail on every re-run once the output directory exists.
# - header: without it the written part files carry no column names.
data.write.format('csv').mode('overwrite').option('header', 'true').save("file:///opt/software/data/cangku")