## Imports

In [0]:
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, StringType, DateType, DoubleType

In [0]:
# Obtain the Spark session for this notebook: `getOrCreate` returns the
# session that is already active if there is one, otherwise it builds a new
# one registered under the application name below.
spark = (
    SparkSession.builder
    .appName("ifood_case")
    .getOrCreate()
)

In [0]:
# Root folder of the raw input files. Keeping it in one place means that
# relocating the dataset only requires editing this line.
path_raw_dir = 'dbfs:/FileStore/ifood_case/data/raw'

# One JSON file per source entity.
path_offers = f'{path_raw_dir}/offers.json'
path_profile = f'{path_raw_dir}/profile.json'
path_transactions = f'{path_raw_dir}/transactions.json'

## Extract

In [0]:
# Raw offer catalogue; no schema is supplied, so Spark infers it from the file.
df_offers = spark.read.format('json').load(path_offers)

In [0]:
# Raw customer profiles; no schema is supplied, so Spark infers it from the file.
df_profile = spark.read.format('json').load(path_profile)

In [0]:
# Raw event log; no schema is supplied, so Spark infers it from the file.
df_transactions = spark.read.format('json').load(path_transactions)

## Transform

In [0]:
# Offer catalogue: expose `id` as a string `offer_id`, keep only the offer
# attributes used further down, and remove exact duplicate rows.
df_offers_transformed = (
    df_offers
    .select(
        F.col('id').cast(StringType()).alias('offer_id'),
        'offer_type',
        'min_value',
        'duration',
        'discount_value',
        'channels',
    )
    .distinct()
)

In [0]:
# Customer profile features: string account key, age, one-hot gender flags and
# tenure in days, with one row per account.
df_profile_transformed = (
    df_profile
    .withColumn('account_id', F.col('id').cast(StringType()))
    # `to_date` already returns a date column, so no format/cast round trip is
    # needed. The explicit string cast keeps the 'yyyyMMdd' parse independent of
    # the type Spark inferred for `registered_on`.
    .withColumn('date_client_was_registered', F.to_date(F.col('registered_on').cast(StringType()), 'yyyyMMdd'))
    .withColumn('age_at_registration', F.col('age').cast(IntegerType()))
    # Rows whose age is null are removed here as well, because a comparison
    # with null is never true.
    .filter(F.col('age_at_registration') <= 100)
    .withColumn('is_male', F.when(F.col('gender') == 'M', 1).otherwise(0))
    .withColumn('is_female', F.when(F.col('gender') == 'F', 1).otherwise(0))
    .withColumn('is_other', F.when(F.col('gender') == 'O', 1).otherwise(0))
    # Days between registration and the day the notebook runs; the value
    # therefore changes from one run date to the next.
    .withColumn('customer_tenure', F.datediff(F.current_date(), F.col('date_client_was_registered')))
    .select('account_id', 'customer_tenure', 'age_at_registration', 'is_male', 'is_female', 'is_other', 'credit_card_limit')
    # Which of several rows sharing an account_id survives is not specified.
    .dropDuplicates(['account_id'])
)

In [0]:
# Flatten the nested `value` struct of the event log into top-level columns.
# The offer reference is stored under two different field names ('offer_id'
# and 'offer id'); the first non-null of the two becomes `offer_id`.
df_transactions_transformed = (
    df_transactions
    .select(
        F.col('account_id').cast(StringType()).alias('account_id'),
        F.coalesce(F.col('value.offer_id'), F.col('value.offer id')).alias('offer_id'),
        'event',
        'time_since_test_start',
        F.col('value.amount').alias('amount'),
        F.col('value.reward').alias('reward'),
    )
    .distinct()
)

In [0]:
# Attach the offer attributes to every event. Being a left join, events whose
# offer_id has no match in the offer table are kept with null offer columns.
df_offers_transactions = df_transactions_transformed.join(
    df_offers_transformed,
    on='offer_id',
    how='left',
)

# Purchase events only, stripped of every offer-related column.
df_customers_transactions = (
    df_offers_transactions
    .where(F.col('event') == 'transaction')
    .drop(
        'offer_id',
        'reward',
        'offer_type',
        'min_value',
        'duration',
        'discount_value',
        'channels',
    )
)

# Every event that is not a purchase (rows with a null event are excluded too,
# since the comparison is not true for them).
df_customers_offers = df_offers_transactions.where(F.col('event') != 'transaction')

# Offer events selected for matching against purchases:
#   * any 'offer completed' event, and
#   * 'offer viewed' events of informational offers.
offer_completed = F.col('event') == 'offer completed'
information_viewed = (F.col('offer_type') == 'informational') & (F.col('event') == 'offer viewed')

df_customers_offers_informations_completed = (
    df_customers_offers
    .where(offer_completed | information_viewed)
    .drop('amount')
)

In [0]:
# Link each purchase to the offer event recorded for the same account at the
# same time, then relabel the purchase according to the kind of offer matched.
#
# Both sides of the join derive from the same parent DataFrame, so the join
# condition is written against the 'd' / 'o' aliases. Referencing the columns
# through the DataFrame variables would point both sides of each equality at
# the same underlying attribute and leave the outcome to Spark's self-join
# disambiguation.
#
# NOTE(review): a purchase that matches several offer events on
# (account_id, time_since_test_start) yields one output row per match.
df_customers_transactions_consolidated = (
    df_customers_transactions.alias('d')
    .join(
        df_customers_offers_informations_completed.alias('o'),
        on=[
            F.col('d.account_id') == F.col('o.account_id'),
            F.col('d.time_since_test_start') == F.col('o.time_since_test_start'),
        ],
        how='left',
    )
    .select('d.*', 'o.offer_id', 'o.reward', 'o.offer_type', 'o.min_value', 'o.duration', 'o.discount_value', 'o.channels')
    # Purchases without a matched offer keep their original 'transaction' label.
    .withColumn('event', F.when((F.col('offer_id').isNotNull() & (F.col('offer_type') != 'informational')), 'transaction with offer')
                          .when(F.col('offer_type') == 'informational', 'transaction with information')
                          .otherwise(F.col('event')))
)

# Offer events with no consolidated purchase row sharing the same
# (account_id, time_since_test_start, offer_id) key.
df_customers_others_offers = df_customers_offers.join(
    df_customers_transactions_consolidated.select('account_id', 'time_since_test_start', 'offer_id'),
    on=['account_id', 'time_since_test_start', 'offer_id'],
    how='left_anti',
)

# Inclusive upper bounds, in `time_since_test_start` units, of the train and
# test windows; everything later goes to validation.
train_until = 21
test_until = 23.5

# Stack purchases and the remaining offer events, then assign each row to a
# time-based split. The `when` branches are evaluated in order, so the second
# branch is only reached by rows past the train bound. Rows with a null time
# match neither branch and end up in 'validation'.
df_customers_transactions_consolidated = (
    df_customers_transactions_consolidated
    .unionByName(df_customers_others_offers, allowMissingColumns=False)
    .withColumn(
        'split',
        F.when(F.col('time_since_test_start') <= train_until, 'train')
         .when(F.col('time_since_test_start') <= test_until, 'test')
         .otherwise('validation'),
    )
)

In [0]:
# Row-level predicates shared by the aggregates below.
is_purchase = F.col('event').contains('transaction')
has_offer = F.col('offer_id').isNotNull()


def _safe_ratio(numerator, denominator):
    """Build the Column `numerator / denominator`, null when the denominator is 0.

    Args:
        numerator: name of the column holding the dividend.
        denominator: name of the column holding the divisor.

    Returns:
        A Column with the ratio, or null for rows whose divisor is 0. Guarding
        the division keeps that null result when ANSI mode is enabled, where an
        unguarded division by zero raises an error instead.
    """
    return F.when(F.col(denominator) != 0, F.col(numerator) / F.col(denominator))


# Per-customer, per-split behaviour metrics: ticket statistics and offer
# acceptance rates.
df_customers_transactions_metrics = (
    df_customers_transactions_consolidated
    .groupBy('account_id', 'split').agg(
        F.count(F.when(is_purchase, F.col('event'))).alias('freq_shopping'),
        F.count(F.when(is_purchase & F.col('offer_id').isNull(), F.col('event'))).alias('freq_shopping_without_offer'),
        F.avg(F.when(is_purchase, F.col('amount'))).alias('avg_ticket'),
        # Null when the group holds a single purchase.
        F.stddev(F.when(is_purchase, F.col('amount'))).alias('variability_ticket'),

        # NOTE(review): this counts every row that carries an offer_id, whatever
        # its event label, not only 'offer received' events - confirm intent.
        F.count(F.when(has_offer, F.col('offer_id'))).alias('total_offers_received'),
        F.count(F.when(has_offer & is_purchase, F.col('offer_id'))).alias('total_offers_accepted'),

        F.count(F.when((F.col('offer_type') == 'bogo') & is_purchase, F.col('offer_id'))).alias('total_bogo_offers_accepted'),
        F.count(F.when((F.col('offer_type') == 'discount') & is_purchase, F.col('offer_id'))).alias('total_discount_offers_accepted'),
        F.count(F.when((F.col('offer_type') == 'informational') & is_purchase, F.col('offer_id'))).alias('total_informational_offers_accepted'),
    )
    .withColumn('rate_shopping_without_offer', _safe_ratio('freq_shopping_without_offer', 'freq_shopping'))
    .withColumn('offer_acceptance_rate', _safe_ratio('total_offers_accepted', 'total_offers_received'))
    .withColumn('bogo_offer_acceptance_rate', _safe_ratio('total_bogo_offers_accepted', 'total_offers_received'))
    .withColumn('discount_offer_acceptance_rate', _safe_ratio('total_discount_offers_accepted', 'total_offers_received'))
    .withColumn('informational_offer_acceptance_rate', _safe_ratio('total_informational_offers_accepted', 'total_offers_received'))
    # Remove customers with a high share of purchases made without an offer.
    # Groups with no purchase at all have a null rate and are removed as well.
    .filter(F.col('rate_shopping_without_offer') <= 0.7)
    .select('account_id', 'avg_ticket', 'variability_ticket', 'offer_acceptance_rate', 'bogo_offer_acceptance_rate', 'discount_offer_acceptance_rate', 'informational_offer_acceptance_rate', 'split')
)

In [0]:
def _flag(condition):
    """Encode a boolean Column as 1 when true and 0 otherwise (false or null)."""
    return F.when(condition, 1).otherwise(0)


# Final modelling table: one row per offer-related event, enriched with the
# customer's per-split metrics and profile, plus binary target/feature flags.
df_customers_transactions_consolidated_metrics = (
    df_customers_transactions_consolidated
    .join(df_customers_transactions_metrics, on=['account_id', 'split'], how='inner')
    .join(df_profile_transformed, on='account_id', how='inner')
    .where(F.col('offer_id').isNotNull())
    .fillna({'reward': 0, 'amount': 0})
    # Target: the row is a purchase that was linked to an offer.
    .withColumn('offer_accepted', _flag(F.col('event').contains('transaction with')))
    .withColumn('is_bogo_offer', _flag(F.col('offer_type') == 'bogo'))
    .withColumn('is_discount_offer', _flag(F.col('offer_type') == 'discount'))
    .withColumn('is_informational_offer', _flag(F.col('offer_type') == 'informational'))
    .withColumn('has_web_notification', _flag(F.array_contains('channels', 'web')))
    .withColumn('has_mobile_notification', _flag(F.array_contains('channels', 'mobile')))
    .withColumn('has_social_notification', _flag(F.array_contains('channels', 'social')))
    # The source columns of the flags above are no longer needed.
    .drop(
        'event',
        'channels',
        'offer_type',
        'reward',
        'time_since_test_start',
    )
)

In [0]:
# Preview a handful of rows of the final table.
display(
    df_customers_transactions_consolidated_metrics.limit(15)
)

account_id,split,amount,offer_id,min_value,duration,discount_value,avg_ticket,variability_ticket,offer_acceptance_rate,bogo_offer_acceptance_rate,discount_offer_acceptance_rate,informational_offer_acceptance_rate,customer_tenure,age_at_registration,is_male,is_female,is_other,credit_card_limit,offer_accepted,is_bogo_offer,is_discount_offer,is_informational_offer,has_web_notification,has_mobile_notification,has_social_notification
5fc96150a4994e3c982e723d06d35e8b,train,21.04,9b98b8c7a33c4b65b9aebfe6a799e6d9,5,7.0,5,24.08,4.299209229614211,0.2857142857142857,0.1428571428571428,0.1428571428571428,0.0,3099,72,0,1,0,81000.0,1,1,0,0,1,1,0
baa162e4861e459cb33c45fb3b40bfc2,train,31.81,ae264e3637204a6fb9bb56bc8210ddfd,10,7.0,10,28.9975,7.508974075509027,0.2222222222222222,0.2222222222222222,0.0,0.0,2563,50,0,1,0,91000.0,1,1,0,0,0,1,1
6c6ea548f9884790846f4917e494c65a,train,11.62,fafdcd668e3743c1bb461111dcafc2a4,10,10.0,2,18.17,8.925478138452865,0.2,0.0,0.2,0.0,2686,47,0,1,0,66000.0,1,0,1,0,1,1,1
a55acf4fede04caba13806d9ac1eb405,validation,17.94,2906b810c7d4411798c6938adc9daaa5,10,7.0,2,15.285,3.754737008100568,0.3333333333333333,0.0,0.3333333333333333,0.0,2530,64,0,1,0,85000.0,1,0,1,0,1,1,0
df9111f3ee6e4031a6fb62e25b727c22,test,10.96,ae264e3637204a6fb9bb56bc8210ddfd,10,7.0,10,10.96,,1.0,1.0,0.0,0.0,3112,73,0,1,0,57000.0,1,1,0,0,0,1,1
1ccdd48ea41247248269cb0c5805ca12,test,34.5,4d5c57ea9a6940dd891ad53e9dbe8da0,10,5.0,10,34.5,,1.0,1.0,0.0,0.0,2642,83,0,1,0,99000.0,1,1,0,0,1,1,1
8cc0db430879405898d8390ca74ad13a,train,20.31,fafdcd668e3743c1bb461111dcafc2a4,10,10.0,2,19.9775,4.697560917266382,0.3636363636363636,0.0909090909090909,0.2727272727272727,0.0,3477,72,0,1,0,75000.0,1,0,1,0,1,1,1
dc4fde598436441ca540a5d79fbdbaac,train,14.55,0b1e1539f2cc45b7b9fa7c272da2e1d7,20,10.0,5,24.075,5.849635031350246,0.2727272727272727,0.0909090909090909,0.1818181818181818,0.0,2666,59,1,0,0,87000.0,1,0,1,0,1,0,0
28ae5599449d43288b3e0983080ad04c,train,16.9,ae264e3637204a6fb9bb56bc8210ddfd,10,7.0,10,10.79,5.251890664851617,0.3,0.2,0.1,0.0,2574,34,1,0,0,48000.0,1,1,0,0,0,1,1
153151dae7fd416588690a91ec81cbba,validation,25.51,2298d6c36e964ae4a3e7e9706d1fb8c2,7,7.0,3,21.61,5.515432893255071,0.3333333333333333,0.0,0.3333333333333333,0.0,2638,39,0,1,0,95000.0,1,0,1,0,1,1,1


In [0]:
# Number of rows of the final table in each split.
display(
    df_customers_transactions_consolidated_metrics
    .groupBy('split')
    .count()
)

split,count
train,50125
validation,13994
test,5737


## Load

In [0]:
# Persist the final table as Parquet, replacing any previous output at this path.
(
    df_customers_transactions_consolidated_metrics
    .write
    .format('parquet')
    .mode('overwrite')
    .save('dbfs:/FileStore/ifood_case/data/processed/df_customers_offers.parquet')
)