In [38]:
# 导包
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window
from datetime import datetime,timedelta
import requests 
import json   

StatementMeta(aasparkpool001, 359, 36, Finished, Available)

In [39]:
"""todo1: 读取数据"""
reimburse_detail_path = 'abfss://data-warehouse-dwd@dlsaaddpnorth3001.dfs.core.chinacloudapi.cn/dwd_fi_te_reimburse_exp_detail.csv'
reimburse_header_path = 'abfss://data-warehouse-dwd@dlsaaddpnorth3001.dfs.core.chinacloudapi.cn/dwd_fi_te_reimburse_header.parquet'
travel_header_path = 'abfss://data-warehouse-dwd@dlsaaddpnorth3001.dfs.core.chinacloudapi.cn/dwd_fi_te_travel_app_header_cn.parquet'
travel_item_path = 'abfss://data-warehouse-dwd@dlsaaddpnorth3001.dfs.core.chinacloudapi.cn/dwd_fi_te_travel_app_item_cn.csv'
air_ticket_path = 'abfss://data-warehouse-dwd@dlsaaddpnorth3001.dfs.core.chinacloudapi.cn/dwd_fi_te_air_ticket_cn.csv'
aviation_code_path = 'abfss://data-warehouse-dim@dlsaaddpnorth3001.dfs.core.chinacloudapi.cn/dim_aviation_city_code_mf.csv'


StatementMeta(aasparkpool001, 359, 37, Finished, Available)

In [41]:
"""todo2: 报销维度"""

df1 = spark.read.csv(reimburse_detail_path,header=True,inferSchema=True)
df2 = spark.read.parquet(reimburse_header_path,header=True)

# 费用明细类型 金额聚合 
df1 = df1.select('reimbursement_key'
                , 'expense_details_type'
                ,'expense_details_checked_amount') \
        .groupby('reimbursement_key', 'expense_details_type') \
        .agg(
            F.sum('expense_details_checked_amount').alias('expense_details_checked_amount')
            ) 

# 实际结束时间
df2 = df2.select(
        'TA_key'
        ,'actual_start_date_time'
        ,'actual_end_date_time'
        ,'reimbursement_key'
        ,'charge_to_cost_center'
        ) \
        .withColumn('actual_start_date_time',F.date_format(F.col('actual_start_date_time').cast('timestamp'),'yyyyMMdd')) \
        .withColumn('actual_end_date_time',F.date_format(F.col('actual_end_date_time').cast('timestamp'),'yyyyMMdd')) \
        .withColumn('actual_start_date',F.to_date(F.first('actual_start_date_time').over(Window.partitionBy('TA_key').orderBy(F.col('actual_start_date_time'))),'yyyyMMdd')) \
        .withColumn('actual_end_date',F.to_date(F.first('actual_end_date_time').over(Window.partitionBy('TA_key').orderBy(F.col('actual_end_date_time').desc())),'yyyyMMdd')) \
        .withColumn('actual_duration_days',F.abs(F.datediff('actual_start_date','actual_end_date')) +1) \
        .withColumn('actual_duration_nights',F.abs(F.datediff('actual_start_date','actual_end_date'))) \
        .withColumnRenamed('charge_to_cost_center','charged_to_cost_center_in_reimbursement') \
        .drop('actual_start_date_time','actual_end_date_time') 

# 报销明细
df_rst = df2.join(df1,on='reimbursement_key',how='left') \
            .select('TA_key'
                ,'actual_start_date'
                ,'actual_end_date'
                ,'actual_duration_days'
                ,'actual_duration_nights'
                ,'reimbursement_key'
                ,'charged_to_cost_center_in_reimbursement'
                ,'expense_details_type'
                ,'expense_details_checked_amount') \
                .withColumn('reimbursement_key',F.concat_ws('/',F.array_distinct(F.collect_list('reimbursement_key').over(Window.partitionBy('TA_key'))))) \
                .withColumn('multiple_reimbursement', F.when(F.col('reimbursement_key').contains('/'), 'Yes').otherwise('No')) \
                .withColumn('expense_details_checked_amount',F.sum('expense_details_checked_amount').over(Window.partitionBy('TA_key','expense_details_type'))) \
                .dropDuplicates()
display(df_rst)
print(df_rst)


StatementMeta(aasparkpool001, 359, 39, Finished, Available)

SynapseWidget(Synapse.DataFrame, a7de699a-bbc0-4650-b43c-9c8b2a81e7f1)

DataFrame[TA_key: string, actual_start_date: date, actual_end_date: date, actual_duration_days: int, actual_duration_nights: int, reimbursement_key: string, charged_to_cost_center_in_reimbursement: string, expense_details_type: string, expense_details_checked_amount: double, multiple_reimbursement: string]


In [42]:
"""todo3: 差旅维度"""
df5 = spark.read.csv(travel_item_path,header=True)

df_travel = df5.withColumn('from_date',F.col('from_date').cast('timestamp')) \
         .withColumn('to_date',F.col('from_date').cast('timestamp')) \
         .withColumn('first_from_date',F.to_date(F.first('from_date').over(Window.partitionBy('TA_key').orderBy('from_date')),'yyyyMMdd')) \
         .withColumn('end_to_date',F.to_date(F.first('from_date').over(Window.partitionBy('TA_key').orderBy(F.col('to_date').desc())),'yyyyMMdd')) \
         .withColumn('first_from_place', F.first('from_place').over(Window.partitionBy('TA_key').orderBy('from_date'))) \
         .withColumn('end_to_place', F.element_at(F.collect_list('to_place').over(Window.partitionBy('TA_key')),-2)) \
         .withColumn('first_transportation_tools', F.first('transportation_tools').over(Window.partitionBy('TA_key').orderBy(F.col('from_date'))))

print(df_travel.schema)
display(df_travel)



StatementMeta(aasparkpool001, 359, 40, Finished, Available)

StructType([StructField('TA_key', StringType(), True), StructField('from_place', StringType(), True), StructField('from_date', TimestampType(), True), StructField('to_place', StringType(), True), StructField('to_date', TimestampType(), True), StructField('transportation_tools', StringType(), True), StructField('etl_load_time', StringType(), True), StructField('first_from_date', DateType(), True), StructField('end_to_date', DateType(), True), StructField('first_from_place', StringType(), True), StructField('end_to_place', StringType(), True), StructField('first_transportation_tools', StringType(), True)])


SynapseWidget(Synapse.DataFrame, 71b09272-100b-436f-93c6-2f1e45aab291)

In [43]:
"""todo4: 机票维度"""
df3 = spark.read.csv(air_ticket_path,header=True)

df_air = df3.select('TA_key','cost_center','fee_tag','gross_amount_local_currency','travel_type') \
            .withColumn('TA_key',F.concat(F.lit('TRAVEL-'),F.col('TA_key').cast(T.IntegerType()))) \
            .withColumnRenamed('cost_center','charged_to_cost_center_in_air_ticket') \
            .withColumn('fee_tag',F.concat_ws('/', F.array_distinct(F.collect_list('fee_tag').over(Window.partitionBy('TA_key'))))) \
            .withColumn('expense_type'
                ,F.when((F.lower(F.col('travel_type')) == 'overseas') |
                (F.lower(F.col('travel_type')) == 'lta'),F.lit('air_tickets_oversea')) \
                .otherwise(F.lit('air_tickets_domestic'))) \
            .withColumn('expense_total_amount',F.sum('gross_amount_local_currency').over(Window.partitionBy('TA_key','travel_type'))) \
            .drop('gross_amount_local_currency','travel_type').dropDuplicates()

display(df_air)


StatementMeta(aasparkpool001, 359, 41, Finished, Available)

SynapseWidget(Synapse.DataFrame, 2b2dad4a-15ad-4af0-b9f1-3815f76db484)

In [44]:
"""todo5: 费用类型"""

df_type = df_rst.select('TA_key'
                    ,F.col('expense_details_type').alias('expense_type')
                    ,F.col('expense_details_checked_amount').alias('expense_total_amount')) \
                .unionAll(
                    df_air.select(
                        'TA_key'
                        ,'expense_type'
                        ,'expense_total_amount'
                    ).dropDuplicates()
                )
display(df_type)

StatementMeta(aasparkpool001, 359, 42, Finished, Available)

SynapseWidget(Synapse.DataFrame, 8b570182-7211-46b4-96f4-893f871ecc7e)

In [45]:
"""todo6: 航空代码"""
df_aviation = spark.read.csv(aviation_code_path,header=True).select('aviation_city_code','city_name_local_language')
display(df_aviation)

df_from = df_aviation.withColumnRenamed('city_name_local_language','city_from')
df_to = df_aviation.withColumnRenamed('city_name_local_language','city_to')

StatementMeta(aasparkpool001, 359, 43, Finished, Available)

SynapseWidget(Synapse.DataFrame, a7384585-5933-44cf-a007-2c7b6ef26d06)

In [46]:
"""todo7: 主数据维度"""
df4 = spark.read.parquet(travel_header_path,header=True)
df4 = df4.select('TA_key'
                ,'resolution'
                ,'updated_date_time'
                ,'planned_start_date'
                ,'planned_end_date'
                ,'company_code'
                ,'cost_center'
                ,'charged_to_cost_center'
                ,'personnel_number'
                ,'from_place'
                ,'to_place'
                ,'book_air_ticket'
                ,'application_reason'
                ,'training_related'
                ,'travel_type'
            ).withColumnRenamed('charged_to_cost_center','charged_to_cost_center_in_travel_application') \
            .withColumn('planned_start_date',F.to_date('planned_start_date','yyyyMMdd')) \
            .withColumn('planned_end_date',F.to_date('planned_end_date','yyyyMMdd')) \
            .withColumn('from_place'
                    ,F.when(F.col('from_place') == '',F.lit(None)) # 判断place预处理
                    .otherwise(F.col('from_place')).alias('from_place')) \
            .withColumn('to_place'
                    ,F.when(F.col('to_place') == '',F.lit(None))
                    .otherwise(F.col('to_place')).alias('to_place'))      

df_rst = df_rst.withColumnRenamed('TA_key','rst_key')
df_air = df_air.withColumnRenamed('TA_key','air_key')

StatementMeta(aasparkpool001, 359, 44, Finished, Available)

In [47]:
"""todo8: 地理识别"""
def formatted_addr(addr):
    addr_key = '1f206129ec240f1cc12be917b9615c0b'
    addr_data = '' + "&" + '' + "&" + '' + "&" + '' + "&" + '' + "&" + '0.0,0.0'
    _url = 'https://restapi.amap.com/v3/geocode/geo?address={}&output=JSON&key={}'.format(addr, addr_key)
    respon = requests.get(_url)
    response = json.loads(respon.content)
    status = response['status']
    info = response['info']
    if status == '1' and info == 'OK':
        count = response['count']
        if count == '1':
            data = response["geocodes"][0]
            formatted_address = data["formatted_address"]
            country = data["country"]
            province = data["province"]
            city = data["city"]
            district = data["district"]
            location = data["location"]
            addr_data = str(formatted_address) + "&" + str(country) + "&" + str(province) + "&" + str(
                city) + "&" + str(district) + "&" + str(location)
        elif count == '0':
            addr_data = '' + "&" + '' + "&" + '' + "&" + '' + "&" + '' + "&" + '0.0,0.0'
    elif status == '0' and info != 'OK':
        addr_data = '' + "&" + '' + "&" + '' + "&" + '' + "&" + '' + "&" + '0.0,0.0'

    r0 = addr_data.split('&')[0] # address
    r1 = addr_data.split('&')[1] # country
    r2 = addr_data.split('&')[2] # province
    r3 = addr_data.split('&')[3] # city
    r4 = addr_data.split('&')[4] # district
    r5 = addr_data.split('&')[5] # Latitude and longitude
    print(r0)
    if r3 == '' or r3 == "[]":
        r3 = '其他城市'
    return r3
# formatted_addr('Beijing and SHENZHEN')
gd_udf = F.udf(formatted_addr,T.StringType()) # 地理识别udf

data0 = df4.join(df_travel,df_travel['TA_key'] == df4['TA_key'],how='left') \
            .select(F.coalesce(df4['from_place'],df_travel['first_from_place']).alias('from_place')
                  ,F.coalesce(df4['to_place'],df_travel['end_to_place']).alias('to_place')) # 预处理地址
print(data0.count())

data1 = data0.select('from_place')
data2 = data0.select('to_place')
data = data1.unionAll(data2) \
            .withColumnRenamed('from_place','dws_place') \
            .where((F.col('dws_place') != '') & (F.col('dws_place').isNotNull())) \
            .dropDuplicates() \
            .withColumn('place'
                ,F.when(F.col("dws_place").contains("-"), F.split(F.col("dws_place"), " - ")[1])
                .otherwise(F.col("dws_place"))) \
            .withColumn('gd_city',gd_udf(F.col('place')))
print(data.count())
display(data)

df_gfrom = data.withColumnRenamed('gd_city','gd_from')
df_gto = data.withColumnRenamed('gd_city','gd_to').withColumnRenamed('dws_place','dws_place1')


StatementMeta(aasparkpool001, 359, 45, Finished, Available)

9018
267


SynapseWidget(Synapse.DataFrame, fae97e7f-20fe-4114-abd8-ea46706caace)

In [48]:
"""todo9: 维度聚合"""

formatted_endtime = datetime.now()
formatted_endtime += timedelta(hours=8)
etl_load_time = formatted_endtime.strftime(f"%Y-%m-%d %H:%M:%S")

df_master = df4.join(df_type,df_type['TA_key'] == df4['TA_key'],how='left') \
               .join(df_rst,df_rst['rst_key'] == df4['TA_key'],how='left') \
               .join(df_travel,df_travel['TA_key'] == df4['TA_key'],how='left') \
               .join(df_air,df_air['air_key'] == df4['TA_key'],how='left') \
               .join(df_from,df_from['aviation_city_code'] == F.split(F.coalesce(df4['from_place'],df_travel['first_from_place']),' - ')[0],how='left') \
               .join(df_to,df_to['aviation_city_code'] == F.split(F.coalesce(df4['to_place'],df_travel['end_to_place']),' - ')[0],how='left') \
               .join(df_gfrom,df_gfrom['dws_place'] == F.coalesce(df4['from_place'],df_travel['first_from_place']),how='left')  \
               .join(df_gto,df_gto['dws_place1'] == F.coalesce(df4['to_place'],df_travel['end_to_place']),how='left') \
               .select(
                  df4['TA_key']
                  ,df_rst['rst_key']
                  ,df_air['air_key']
                  ,df_type['expense_type']
                  ,df4['resolution'].alias('TA_resolution')  
                  ,F.to_date(F.col('updated_date_time').cast('timestamp'),'yyyyMMdd').alias('Travel_application_updated_date')
                  ,F.coalesce(df4['planned_start_date'],df_travel['first_from_date']).alias('planned_travel_start_date')
                  ,F.coalesce(df4['planned_end_date'],df_travel['end_to_date']).alias('planned_travel_end_date')
                  ,df_rst['actual_start_date']
                  ,df_rst['actual_end_date']
                  ,df_rst['actual_duration_days']
                  ,df_rst['actual_duration_nights']
                  ,df4['company_code']
                  ,df4['cost_center']
                  # ,df4['charged_to_cost_center_in_travel_application']
                  # ,df_rst['charged_to_cost_center_in_reimbursement']
                  # ,df_air['charged_to_cost_center_in_air_ticket']
                  ,F.regexp_replace(df4['personnel_number'],'\'','').alias('personnel_number')
                  ,F.coalesce(df4['from_place'],df_travel['first_from_place']).alias('from_place')
                  ,F.coalesce(F.col('city_from'),F.col('gd_from')).alias('from_city_standardized')
                  ,F.coalesce(df4['to_place'],df_travel['end_to_place']).alias('to_place')
                  ,F.coalesce(F.col('city_to'),F.col('gd_to')).alias('to_city_standardized')
                  ,df_travel['first_transportation_tools'].alias('transportation_tools')
                  ,df4['book_air_ticket']
                  ,df4['application_reason']
                  ,df4['training_related']
                  ,df4['travel_type']
                  ,F.when(df4['travel_type'] == 'Business - Domestic',F.lit("国内")) 
                    .otherwise(F.lit('国外')) 
                    .alias('domestic_or_oversea')
                  ,df_rst['reimbursement_key']
                #   ,df_air['fee_tag']
                  ,df_rst['multiple_reimbursement']
                  ,df_type['expense_total_amount']
                ).withColumn('TA_full_cycle_status'
                    ,F.when((F.col('book_air_ticket') != 'No') 
                            & (F.col('book_air_ticket').isNotNull())
                            & (F.col('rst_key').isNotNull())
                            & (F.col('air_key').isNotNull()),'complete')

                    .when((F.col('book_air_ticket') == 'No') 
                            & (F.col('rst_key').isNotNull()) ,'complete')

                    .otherwise('incomplete')) \
                .drop('rst_key','air_key').dropDuplicates() \
                .withColumn('etl_load_time', F.lit(etl_load_time))
                
display(df_master)

StatementMeta(aasparkpool001, 359, 46, Finished, Available)

SynapseWidget(Synapse.DataFrame, a108e955-f894-4e41-833f-cc5f25ce9c26)

In [49]:
"""todo10: 保存"""
save_path = 'abfss://data-warehouse-dws@dlsaaddpnorth3001.dfs.core.chinacloudapi.cn/dws_fi_te_travel_expense.csv'
df = df_master.toPandas()
df.to_csv(save_path,index=False,header=True)

StatementMeta(aasparkpool001, 359, 47, Finished, Available)