# Criando ABT com apenas 1 safra

Vamos criar uma Analytical Base Table que irá nos ajudar a treinar um modelo de machine learning para prever a propensão de um seller a não revender nos próximos 6 meses.

Para tal, iremos utilizar um histórico de 1 ano (os 12 meses anteriores à data de referência) para construir as features, e os 6 meses seguintes à data de referência para construir o target, que indica se o seller não revendeu nesse período.

In [0]:
# Load the three source tables used throughout the notebook
order_items, orders, sellers = (
  spark.table('olist.order_items'),
  spark.table('olist.orders'),
  spark.table('olist.sellers'),
)

In [0]:
# Importando todas as funções necessárias para realizar operações nos dados
# Exemplo:
# max(), min(), lit(), col()
from pyspark.sql.functions import *

# Window of data used to build the ABT
# Reference date: 2018-01-01
# Historical period: the 12 months before the reference date (from 2017-01-01, inclusive)
# Future period: the 6 months starting at the reference date (up to 2018-07-01, exclusive)
df_historico_abt = (
  order_items
  .join(orders, on=order_items['order_id'] == orders['order_id'], how='left')
  # Keep only delivered orders approved inside [2017-01-01, 2018-07-01).
  # Items without a matching order have null order columns and are dropped by this filter.
  .filter(
    (orders['order_status'] == "delivered")
    & (orders['order_approved_at'] >= "2017-01-01")
    & (orders['order_approved_at'] < "2018-07-01")
  )
  .join(sellers, on=order_items['seller_id'] == sellers['seller_id'], how='left')
  .select(
    # Item-level columns (order_id is taken from order_items to avoid ambiguity)
    order_items['order_id'],
    'order_item_id',
    'product_id',
    # seller_id is taken from the sellers table
    sellers['seller_id'],
    'shipping_limit_date',
    'price',
    'freight_value',
    # Order-level columns
    'customer_id',
    'order_status',
    'order_purchase_timestamp',
    'order_approved_at',
    'order_delivered_carrier_date',
    'order_delivered_customer_date',
    'order_estimated_delivery_date',
    # Seller-level columns
    'seller_zip_code_prefix',
    'seller_city',
    'seller_state',
  )
)

In [0]:
# Preview the order items selected for the ABT window
display(df_historico_abt)

order_id,order_item_id,product_id,seller_id,shipping_limit_date,price,freight_value,customer_id,order_status,order_purchase_timestamp,order_approved_at,order_delivered_carrier_date,order_delivered_customer_date,order_estimated_delivery_date,seller_zip_code_prefix,seller_city,seller_state
118045506e1c1dda060171af43fe11b4,5,2c4930c4b284c7b99db2a4c52071a45e,2a261b5b644fa05f4f2700eb93544f2c,2018-03-15T19:08:26.000+0000,45.0,27.08,638c6674418fc58283a73c078bcb076f,delivered,2018-03-08T19:06:05.000+0000,2018-03-09T19:08:26.000+0000,2018-03-13T21:24:28.000+0000,2018-04-11T12:53:50.000+0000,2018-04-04T00:00:00.000+0000,13660,porto ferreira,SP
118045506e1c1dda060171af43fe11b4,4,2c4930c4b284c7b99db2a4c52071a45e,2a261b5b644fa05f4f2700eb93544f2c,2018-03-15T19:08:26.000+0000,45.0,27.08,638c6674418fc58283a73c078bcb076f,delivered,2018-03-08T19:06:05.000+0000,2018-03-09T19:08:26.000+0000,2018-03-13T21:24:28.000+0000,2018-04-11T12:53:50.000+0000,2018-04-04T00:00:00.000+0000,13660,porto ferreira,SP
118045506e1c1dda060171af43fe11b4,3,2c4930c4b284c7b99db2a4c52071a45e,2a261b5b644fa05f4f2700eb93544f2c,2018-03-15T19:08:26.000+0000,45.0,27.08,638c6674418fc58283a73c078bcb076f,delivered,2018-03-08T19:06:05.000+0000,2018-03-09T19:08:26.000+0000,2018-03-13T21:24:28.000+0000,2018-04-11T12:53:50.000+0000,2018-04-04T00:00:00.000+0000,13660,porto ferreira,SP
118045506e1c1dda060171af43fe11b4,2,2c4930c4b284c7b99db2a4c52071a45e,2a261b5b644fa05f4f2700eb93544f2c,2018-03-15T19:08:26.000+0000,45.0,27.08,638c6674418fc58283a73c078bcb076f,delivered,2018-03-08T19:06:05.000+0000,2018-03-09T19:08:26.000+0000,2018-03-13T21:24:28.000+0000,2018-04-11T12:53:50.000+0000,2018-04-04T00:00:00.000+0000,13660,porto ferreira,SP
118045506e1c1dda060171af43fe11b4,1,2c4930c4b284c7b99db2a4c52071a45e,2a261b5b644fa05f4f2700eb93544f2c,2018-03-15T19:08:26.000+0000,45.0,27.08,638c6674418fc58283a73c078bcb076f,delivered,2018-03-08T19:06:05.000+0000,2018-03-09T19:08:26.000+0000,2018-03-13T21:24:28.000+0000,2018-04-11T12:53:50.000+0000,2018-04-04T00:00:00.000+0000,13660,porto ferreira,SP
cc66dee6fbc18bb79903c3a2cc14ff52,1,cc5447118c174dcc6456c84ccb29e6f7,ef0ace09169ac090589d85746e3e036f,2018-04-18T15:15:27.000+0000,117.7,18.7,19d3b3a2d4756af17603e2c35c7c2815,delivered,2018-04-12T14:37:29.000+0000,2018-04-12T15:15:27.000+0000,2018-04-16T16:23:53.000+0000,2018-04-20T17:28:56.000+0000,2018-05-07T00:00:00.000+0000,24451,sao goncalo,RJ
edcc6b79e8394346ba3ba21b00b4055e,1,c6e474e4e7e08a02bd7153cfe6d68dfc,5cf13accae3222c70a9cac40818ae839,2018-05-04T16:15:25.000+0000,99.9,62.73,08aea10c40f606e52597486db2b56a81,delivered,2018-04-29T16:03:47.000+0000,2018-04-29T16:15:25.000+0000,2018-05-02T08:25:00.000+0000,2018-05-11T23:12:12.000+0000,2018-05-25T00:00:00.000+0000,38700,patos de minas,MG
9f98d6530155e3b3869f47e53834b562,1,7ab21257ee876b3897bd55046d3d81d3,b2ba3715d723d245138f291a6fe42594,2017-09-26T15:44:19.000+0000,299.9,16.86,0f1749055e31c2093f2392e8a639e1be,delivered,2017-09-20T15:26:33.000+0000,2017-09-20T15:44:19.000+0000,2017-09-22T18:46:47.000+0000,2017-10-03T23:18:18.000+0000,2017-10-13T00:00:00.000+0000,3470,sao paulo,SP
5e57ff5e1c008db89fac24b76655dbe1,2,2ff995aead9c63a1f37a07b3664ead37,8b9d6eec4a7eb7d0f9d579ce0b38324d,2018-07-03T18:50:23.000+0000,69.99,17.12,4c756c39c13545719a2953df112e4e90,delivered,2018-06-29T18:39:49.000+0000,2018-06-29T18:50:23.000+0000,2018-07-02T11:57:00.000+0000,2018-07-03T22:16:38.000+0000,2018-07-30T00:00:00.000+0000,26562,mesquita,RJ
5e57ff5e1c008db89fac24b76655dbe1,1,2ff995aead9c63a1f37a07b3664ead37,8b9d6eec4a7eb7d0f9d579ce0b38324d,2018-07-03T18:50:23.000+0000,69.99,17.12,4c756c39c13545719a2953df112e4e90,delivered,2018-06-29T18:39:49.000+0000,2018-06-29T18:50:23.000+0000,2018-07-02T11:57:00.000+0000,2018-07-03T22:16:38.000+0000,2018-07-30T00:00:00.000+0000,26562,mesquita,RJ


In [0]:
# Sanity check: earliest and latest approval timestamps present in the selected window
df_historico_abt.agg(min('order_approved_at'), max('order_approved_at')).show()

#### Criando as features

In [0]:
from datetime import datetime

# Features are computed per seller, using only orders approved before the reference date
df_features = (
  df_historico_abt
  .filter(col('order_approved_at') < "2018-01-01")
  .groupBy('seller_id')
  .agg(
    first('seller_state').alias('uf'),                        # seller's state
    countDistinct('order_id').alias('tot_orders_12m'),        # distinct orders
    count('product_id').alias('tot_items_12m'),               # items sold (rows)
    countDistinct('product_id').alias('tot_items_dist_12m'),  # distinct products
    sum('price').alias('receita_12m'),                        # revenue (sum of item prices)
    max('order_approved_at').alias('data_ult_vnd'),           # last approved sale
  )
  # Reference date of the safra, stored as a timestamp literal
  .withColumn('data_ref_safra', lit(datetime.strptime('2018-01-01', "%Y-%m-%d")))
  # Recency: days between the reference date and the seller's last sale
  .withColumn('recencia', datediff('data_ref_safra', 'data_ult_vnd'))
  .select(
    'data_ref_safra',
    'seller_id',
    'uf',
    'tot_orders_12m',
    'tot_items_12m',
    'tot_items_dist_12m',
    'receita_12m',
    'recencia',
  )
)

In [0]:
# Preview the per-seller features
display(df_features)

data_ref_safra,seller_id,uf,tot_orders_12m,tot_items_12m,tot_items_dist_12m,receita_12m,recencia
2018-01-01T00:00:00.000+0000,0015a82c2db000af6aaaf3ae2ecb0532,SP,3,3,1,2685.0,75
2018-01-01T00:00:00.000+0000,001cca7ae9ae17fb1caed9dfb1094831,ES,171,207,9,21275.23,3
2018-01-01T00:00:00.000+0000,002100f778ceb8431b7a1020ff7ab48f,SP,38,42,15,781.8000000000001,3
2018-01-01T00:00:00.000+0000,003554e2dce176b5555353e4f3555ac8,GO,1,1,1,120.0,17
2018-01-01T00:00:00.000+0000,004c9cd9d87a3c30c522c48c4fc07416,SP,130,141,75,16228.87999999999,9
2018-01-01T00:00:00.000+0000,00ab3eff1b5192e5f1a63bcecfee11c8,SP,1,1,1,98.0,289
2018-01-01T00:00:00.000+0000,00ee68308b45bc5e2660cd833c3f81cc,SP,40,49,12,4641.0,3
2018-01-01T00:00:00.000+0000,010543a62bd80aa422851e79a3bc7540,SP,2,2,1,1416.0,152
2018-01-01T00:00:00.000+0000,013900e863eace745d3ec7614cab5b1a,PR,10,13,10,768.7,10
2018-01-01T00:00:00.000+0000,014c0679dd340a0e338872e7ec85666a,MG,7,7,4,2572.0,18


#### Criando o Target

In [0]:
# Target source: the distinct sellers with at least one order approved in the
# future period [2018-01-01, 2018-07-01)
df_target = (
  df_historico_abt
  .filter(
    (col('order_approved_at') >= "2018-01-01")
    & (col('order_approved_at') < "2018-07-01")
  )
  .select('seller_id')
  .dropDuplicates()
)

In [0]:
# Preview the sellers that sold in the future period
display(df_target)

seller_id
da7039f29f90ce5b4846ffc0fcc93beb
0ea22c1cfbdc755f86b9b54b39c16043
a49928bcdf77c55c6d6e05e09a9b4ca5
ec8879960bd2221d5c32f8e12f7da711
ff063b022a9a0aab91bad2c9088760b7
e63e8bfa530fb16910dd6956e592bb81
0b64bcdb0784abc139af04077d49a20e
9803a40e82e45418ab7fb84091af5231
e38db885400cd35c71dfd162f2c1dbcf
a72c2f59332659e361d647515c698798


#### Juntando as Features e o Target para criar a ABT de fato

In [0]:
# Join features and target, and derive the target flag nao_revendeu_next_6m:
# 1 when the seller has no sale in the future period, 0 otherwise.
#
# The join condition is written against the aliases "a" and "b". df_features and
# df_target both derive from df_historico_abt, so df_features["seller_id"] and
# df_target["seller_id"] point at the same underlying column; comparing them directly
# depends on Spark's self-join disambiguation instead of stating which side is which.
df_abt = (
  df_features.alias("a")
  .join(df_target.alias("b"), col("a.seller_id") == col("b.seller_id"), how="left")
  # A null on the right side means the seller did not appear in the future period
  .withColumn("nao_revendeu_next_6m", when(col("b.seller_id").isNull(), 1).otherwise(0))
  .select("data_ref_safra",
          "a.seller_id",
          "uf",
          "tot_orders_12m",
          "tot_items_12m",
          "tot_items_dist_12m",
          "receita_12m",
          "recencia",
          "nao_revendeu_next_6m")
)

In [0]:
# Preview the single-safra ABT (features plus target flag)
display(df_abt)

data_ref_safra,seller_id,uf,tot_orders_12m,tot_items_12m,tot_items_dist_12m,receita_12m,recencia,nao_revendeu_next_6m
2018-01-01T00:00:00.000+0000,0015a82c2db000af6aaaf3ae2ecb0532,SP,3,3,1,2685.0,75,1
2018-01-01T00:00:00.000+0000,001cca7ae9ae17fb1caed9dfb1094831,ES,171,207,9,21275.23,3,0
2018-01-01T00:00:00.000+0000,002100f778ceb8431b7a1020ff7ab48f,SP,38,42,15,781.8000000000001,3,0
2018-01-01T00:00:00.000+0000,003554e2dce176b5555353e4f3555ac8,GO,1,1,1,120.0,17,1
2018-01-01T00:00:00.000+0000,004c9cd9d87a3c30c522c48c4fc07416,SP,130,141,75,16228.87999999999,9,0
2018-01-01T00:00:00.000+0000,00ab3eff1b5192e5f1a63bcecfee11c8,SP,1,1,1,98.0,289,1
2018-01-01T00:00:00.000+0000,00ee68308b45bc5e2660cd833c3f81cc,SP,40,49,12,4641.0,3,0
2018-01-01T00:00:00.000+0000,010543a62bd80aa422851e79a3bc7540,SP,2,2,1,1416.0,152,1
2018-01-01T00:00:00.000+0000,013900e863eace745d3ec7614cab5b1a,PR,10,13,10,768.7,10,0
2018-01-01T00:00:00.000+0000,014c0679dd340a0e338872e7ec85666a,MG,7,7,4,2572.0,18,0


# Criando ABT com Várias Safras

In [0]:
# Empty, explicitly typed table that receives one ABT slice per safra
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, LongType, DoubleType

# Column order must match the select() of each per-safra ABT, because the slices are
# appended with a positional union. All columns are nullable.
# NOTE(review): recencia and nao_revendeu_next_6m are declared as double although they
# are integer-valued in the single-safra ABT; the types are kept unchanged so the schema
# of the saved table stays the same.
fields = [
  StructField("data_ref_safra", StringType(), True),
  StructField("seller_id", StringType(), True),
  StructField("uf", StringType(), True),
  StructField("tot_orders_12m", LongType(), True),
  StructField("tot_items_12m", LongType(), True),
  StructField("tot_items_dist_12m", LongType(), True),
  StructField("receita_12m", DoubleType(), True),
  StructField("recencia", DoubleType(), True),
  StructField("nao_revendeu_next_6m", DoubleType(), True)
]

schema = StructType(fields)

# Build the empty frame from an empty list instead of spark.sparkContext.emptyRDD():
# the result is the same, but it does not need direct access to the SparkContext,
# which Spark Connect sessions do not provide.
df_abt_safras = spark.createDataFrame([], schema=schema)

In [0]:
# Build one ABT slice per safra (reference date) and stack them into a single table
from datetime import datetime

# Import the submodule explicitly: a bare `import dateutil` does not load
# `dateutil.relativedelta`, so attribute access on it only works if some other
# library happened to import it first.
from dateutil.relativedelta import relativedelta
from pyspark.sql import functions as F

SAFRAS = ["2018-01-01", "2018-02-01", "2018-03-01"]


def build_abt_safra(data_ref_safra):
  """Build the ABT slice (features + target) for a single safra.

  Reads the notebook-level DataFrames `order_items`, `orders` and `sellers`.

  Args:
    data_ref_safra: reference date of the safra (datetime.date). Features use
      delivered orders approved in the 12 months before it (inclusive lower bound);
      the target uses the 6 months starting at it (exclusive upper bound).

  Returns:
    DataFrame with one row per seller that sold in the historical period, with columns
    data_ref_safra, seller_id, uf, tot_orders_12m, tot_items_12m, tot_items_dist_12m,
    receita_12m, recencia and nao_revendeu_next_6m, in that order.
  """
  data_inf_inclusiva = data_ref_safra - relativedelta(months=12)
  data_sup_exclusiva = data_ref_safra + relativedelta(months=6)

  # Delivered order items approved inside [data_inf_inclusiva, data_sup_exclusiva).
  # Items without a matching order have null order columns and are dropped by the filters.
  df_historico = (
    order_items
    .join(orders, order_items['order_id'] == orders['order_id'], how='left')
    .where(orders['order_status'] == "delivered")
    .where(orders['order_approved_at'] >= f"{data_inf_inclusiva}")
    .where(orders['order_approved_at'] < f"{data_sup_exclusiva}")
    .join(sellers, order_items['seller_id'] == sellers['seller_id'], how='left')
    .select(order_items['order_id'],
            'order_item_id',
            'product_id',
            sellers['seller_id'],
            'shipping_limit_date',
            'price',
            'freight_value',
            'customer_id',
            'order_status',
            'order_purchase_timestamp',
            'order_approved_at',
            'order_delivered_carrier_date',
            'order_delivered_customer_date',
            'order_estimated_delivery_date',
            'seller_zip_code_prefix',
            'seller_city',
            'seller_state')
  )

  # Features: per-seller aggregates over the historical period only
  df_feat = (
    df_historico
    .where(F.col('order_approved_at') < f"{data_ref_safra}")
    .groupBy('seller_id')
    .agg(F.first('seller_state').alias('uf'),
         F.countDistinct('order_id').alias('tot_orders_12m'),
         F.count('product_id').alias('tot_items_12m'),
         F.countDistinct('product_id').alias('tot_items_dist_12m'),
         F.sum('price').alias('receita_12m'),
         F.max('order_approved_at').alias('data_ult_vnd'))
    .withColumn('data_ref_safra', F.lit(f"{data_ref_safra}"))
    # Recency: days between the reference date and the seller's last sale
    .withColumn('recencia', F.datediff('data_ref_safra', 'data_ult_vnd'))
    .select('data_ref_safra', 'seller_id', 'uf', 'tot_orders_12m', 'tot_items_12m',
            'tot_items_dist_12m', 'receita_12m', 'recencia')
  )

  # Target source: distinct sellers with at least one sale in the future period
  df_tgt = (
    df_historico
    .where(F.col('order_approved_at') >= f"{data_ref_safra}")
    .where(F.col('order_approved_at') < f"{data_sup_exclusiva}")
    .select('seller_id')
    .distinct()
  )

  # Both sides derive from df_historico, so the join condition goes through the
  # aliases "a" and "b" to state unambiguously which side each seller_id belongs to.
  return (
    df_feat.alias("a")
    .join(df_tgt.alias("b"), F.col("a.seller_id") == F.col("b.seller_id"), how="left")
    # A null on the right side means the seller did not sell in the future period
    .withColumn("nao_revendeu_next_6m", F.when(F.col("b.seller_id").isNull(), 1).otherwise(0))
    .select("data_ref_safra",
            "a.seller_id",
            "uf",
            "tot_orders_12m",
            "tot_items_12m",
            "tot_items_dist_12m",
            "receita_12m",
            "recencia",
            "nao_revendeu_next_6m")
  )


# Start from a fresh empty frame that carries the accumulator's schema, instead of
# appending to the accumulator itself, so re-running this cell does not duplicate safras.
df_acumulado = spark.createDataFrame([], schema=df_abt_safras.schema)
for safra in SAFRAS:
  data_ref_safra = datetime.strptime(safra, "%Y-%m-%d").date()
  # union() matches columns by position; build_abt_safra selects them in schema order
  df_acumulado = df_acumulado.union(build_abt_safra(data_ref_safra))

df_abt_safras = df_acumulado

In [0]:
# Preview the stacked multi-safra ABT
display(df_abt_safras)

data_ref_safra,seller_id,uf,tot_orders_12m,tot_items_12m,tot_items_dist_12m,receita_12m,recencia,nao_revendeu_next_6m
2018-01-01,062ce95fa2ad4dfaedfc79260130565f,RS,54,59,14,7479.100000000001,113.0,1.0
2018-01-01,0ea22c1cfbdc755f86b9b54b39c16043,MG,170,197,119,8078.900000000007,1.0,0.0
2018-01-01,2009a095de2a2a41626f6c6d7722678d,SP,8,12,4,388.89,62.0,1.0
2018-01-01,4d600e08ecbe08258c79e536c5a42fee,PR,5,5,5,2534.0,292.0,1.0
2018-01-01,791cfcfe22fe4a771ece27f90017da92,SP,1,1,1,298.13,201.0,1.0
2018-01-01,8e6cc767478edae941d9bd9eb778d77a,MG,94,102,55,5224.0,37.0,0.0
2018-01-01,9c068d10aca38e85c50202e17b4a7e88,MS,8,8,7,1374.6,38.0,0.0
2018-01-01,a3082f442524a1be452e3189e003b361,SP,1,1,1,59.9,175.0,0.0
2018-01-01,a49928bcdf77c55c6d6e05e09a9b4ca5,SP,81,87,3,7280.400000000002,3.0,0.0
2018-01-01,b3f19518fcec265b2e97af287725f981,SP,8,8,4,444.92,131.0,1.0


In [0]:
# Row count per safra and target value
df_abt_safras.groupBy('data_ref_safra', 'nao_revendeu_next_6m').count().show()

# Salvando a ABT como uma tabela

In [0]:
# Persist the multi-safra ABT as a table, replacing any previous contents.
# DataFrameWriter.option() takes a key and a value; the previous call passed only the
# key "path" and raised a TypeError before anything was written. No explicit location
# is set here; add .option("path", "<location>") if a specific one is required.
df_abt_safras.write.mode("overwrite").saveAsTable("olist.abt_propensao_revenda")

In [0]:
%sql
-- Read back the persisted ABT table
SELECT *
FROM olist.abt_propensao_revenda

data_ref_safra,seller_id,uf,tot_orders_12m,tot_items_12m,tot_items_dist_12m,receita_12m,recencia,nao_revendeu_next_6m
2018-01-01,070d165398b553f3b4b851c216b8a358,SP,5,6,3,515.94,16.0,0.0
2018-01-01,128f9bfbe4c7d5185033914b1de3d39a,SP,7,7,4,859.0,13.0,0.0
2018-01-01,18a349e75d307f4b4cc646a691ed4216,SP,14,15,9,878.05,14.0,0.0
2018-01-01,1d29dfba02015238dfbe2449a5eaa361,MG,5,5,3,247.5,5.0,0.0
2018-01-01,34056b8b55c1775a22af2331670a799c,SP,5,5,4,729.6999999999999,34.0,0.0
2018-01-01,4869f7a5dfa277a7dca6462dcf3b52b2,SP,407,418,34,90524.03000000004,1.0,0.0
2018-01-01,4a3ca9315b744ce9f8e9374361493884,SP,1096,1212,282,125040.97000000048,3.0,0.0
2018-01-01,62760d278921b5f352461620d68a9cee,SP,3,3,2,397.0,318.0,1.0
2018-01-01,6d7f8d9d594e28796b6b3205b00f459c,PR,3,3,2,417.9,171.0,1.0
2018-01-01,78a4d36f65a019172c6154468831a69f,SP,7,7,1,1545.5000000000002,24.0,1.0


In [0]:
%sql
-- Row count per safra and target value in the persisted ABT
SELECT data_ref_safra,
       nao_revendeu_next_6m,
       count(1)
FROM olist.abt_propensao_revenda
GROUP BY data_ref_safra,
         nao_revendeu_next_6m

data_ref_safra,nao_revendeu_next_6m,count(1)
2018-03-01,0.0,1153
2018-02-01,0.0,1109
2018-02-01,1.0,696
2018-03-01,1.0,721
2018-01-01,0.0,1054
2018-01-01,1.0,636
