In [1]:
from dask.distributed import Client, progress
import dask.dataframe as dd
import pandas as pd

In [2]:
client = Client(n_workers=4, memory_limit='4GB')
client

0,1
Client  Scheduler: tcp://127.0.0.1:39565  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 4  Cores: 4  Memory: 16.00 GB


In [3]:
orders = dd.read_json(
    "../data/order_data_dtypes.json",
    orient='table',
)

In [4]:
orders = orders.repartition(npartitions=64)

In [5]:
orders.head()

Unnamed: 0,customer_id,customer_order_rank,is_failed,voucher_amount,delivery_fee,amount_paid,restaurant_id,city_id,payment_id,platform_id,transmission_id,order_datetime
0,000097eabfd9,1,0,0.0,0.0,11.4696,5803498,20326,1779,30231,4356,2015-06-20 19:00:00
1,0000e2c6d9be,1,0,0.0,0.0,9.558,239303498,76547,1619,30359,4356,2016-01-29 20:00:00
2,000133bb597f,1,0,0.0,0.493,5.93658,206463498,33833,1619,30359,4324,2017-02-26 19:00:00
3,00018269939b,1,0,0.0,0.493,9.8235,36613498,99315,1619,30359,4356,2017-02-05 17:00:00
4,0001a00468a6,1,0,0.0,0.493,5.1507,225853498,16456,1619,29463,4356,2015-08-04 19:00:00


In [6]:
# orders.npartitions
# orders.is_failed.count().compute()
# orders.map_partitions(len).compute()

In [7]:
orders.info()

<class 'dask.dataframe.core.DataFrame'>
Columns: 12 entries, customer_id to order_datetime
dtypes: category(1), category(1), category(1), category(1), category(1), datetime64[ns](1), object(1), float64(3), int64(2)

In [8]:
# Customer order rank
max_customer_order_rank = orders.groupby(
    'customer_id'
)['customer_order_rank'].max()

In [9]:
max_customer_order_rank

Dask Series Structure:
npartitions=1
    int64
      ...
Name: customer_order_rank, dtype: int64
Dask Name: series-groupby-max-agg, 140 tasks

In [10]:
orders['hour_of_day'] = orders['order_datetime'].dt.hour

In [11]:
orders['day_of_week'] = orders['order_datetime'].dt.dayofweek

In [12]:
orders['is_holiday'] = (
    (
        (orders['order_datetime'].dt.month == 1) & (orders['order_datetime'].dt.day == 1)
    ) | (
        (orders['order_datetime'].dt.month == 12) & ((orders['order_datetime'].dt.day == 25) | (orders['order_datetime'].dt.day == 31))
    )
)

### Dummies

In [13]:
orders = orders.categorize(columns=['hour_of_day', 'day_of_week'])

In [14]:
orders.dtypes

customer_id                    object
customer_order_rank             int64
is_failed                       int64
voucher_amount                float64
delivery_fee                  float64
amount_paid                   float64
restaurant_id                category
city_id                      category
payment_id                   category
platform_id                  category
transmission_id              category
order_datetime         datetime64[ns]
hour_of_day                  category
day_of_week                  category
is_holiday                       bool
dtype: object

In [15]:
col_name = 'hour_of_day'
hour_dummies = dd.get_dummies(
    orders[col_name],
    prefix=col_name,
)

In [16]:
for col in hour_dummies.columns:
    orders[col] = hour_dummies[col]

In [17]:
col_name = 'day_of_week'
day_of_week_dummies = dd.get_dummies(
    orders[col_name],
    prefix=col_name,
)

In [18]:
for col in day_of_week_dummies.columns:
    orders[col] = day_of_week_dummies[col]

### Customer features

In [19]:
last_orders = orders.groupby('customer_id')['order_datetime'].max()
max_datetime = orders['order_datetime'].max()
max_datetime

dd.Scalar<series-..., type=Timestamp>

In [20]:
# customer_features['last_order_age_days'] = 
last_order_age_days = (max_datetime - last_orders).dt.days

In [21]:
last_order_age_days.name = 'last_order_age_days'

In [22]:
customer_features = last_order_age_days.to_frame()

In [23]:
first_orders = orders.groupby('customer_id')['order_datetime'].min()

In [24]:
# customer_features['first_order_age_days'] = 
first_order_age_days = (max_datetime - first_orders).dt.days

In [25]:
first_order_age_days.name = 'first_order_age_days'

In [26]:
customer_features = customer_features.join(first_order_age_days)

In [27]:
number_of_orders_per_customer = orders.groupby('customer_id')['amount_paid'].count()
number_of_orders_per_customer.name = 'n_orders'

In [28]:
number_of_orders_per_customer

Dask Series Structure:
npartitions=1
    int64
      ...
Name: n_orders, dtype: int64
Dask Name: rename, 5645 tasks

In [29]:
customer_features = customer_features.join(number_of_orders_per_customer)

In [30]:
max_customer_order_rank = orders.groupby(
    'customer_id'
)['customer_order_rank'].max()
max_customer_order_rank

Dask Series Structure:
npartitions=1
    int64
      ...
Name: customer_order_rank, dtype: int64
Dask Name: series-groupby-max-agg, 5644 tasks

In [31]:
max_customer_order_rank.name = 'max_customer_order_rank'

In [32]:
customer_features = customer_features.join(max_customer_order_rank)

In [33]:
failed_orders_per_customer = orders.groupby('customer_id')['is_failed'].sum()
failed_orders_per_customer.name = 'n_failed'

In [34]:
customer_features = customer_features.join(failed_orders_per_customer)
customer_features.info()

<class 'dask.dataframe.core.DataFrame'>
Columns: 5 entries, last_order_age_days to n_failed
dtypes: int64(5)

In [35]:
max_voucher_amount = orders.groupby('customer_id')['voucher_amount'].max()
max_voucher_amount.name = 'max_voucher_amount'

In [36]:
customer_features = customer_features.join(max_voucher_amount)
customer_features.info()

<class 'dask.dataframe.core.DataFrame'>
Columns: 6 entries, last_order_age_days to max_voucher_amount
dtypes: float64(1), int64(5)

In [37]:
tot_voucher_amount = orders.groupby('customer_id')['voucher_amount'].sum()
tot_voucher_amount.name = 'tot_voucher_amount'

In [38]:
customer_features = customer_features.join(tot_voucher_amount)
customer_features.info()

<class 'dask.dataframe.core.DataFrame'>
Columns: 7 entries, last_order_age_days to tot_voucher_amount
dtypes: float64(2), int64(5)

In [39]:
n_vouchers = (orders['voucher_amount']>0).groupby(orders['customer_id']).sum()
n_vouchers.name = 'n_vouchers'

In [40]:
customer_features = customer_features.join(n_vouchers)

In [41]:
tot_delivery_fee = orders.groupby('customer_id')['delivery_fee'].sum()
max_delivery_fee = orders.groupby('customer_id')['delivery_fee'].max()
# How many times a delivery fee was paid
n_delivery_fee = (orders['delivery_fee']>0).groupby(orders['customer_id']).sum()

In [42]:
tot_delivery_fee.name = 'tot_delivery_fee'
max_delivery_fee.name = 'max_delivery_fee'
n_delivery_fee.name = 'n_delivery_fee'

In [43]:
customer_features = customer_features.join(tot_delivery_fee)
customer_features = customer_features.join(max_delivery_fee)
customer_features = customer_features.join(n_delivery_fee)
customer_features.info()

<class 'dask.dataframe.core.DataFrame'>
Columns: 11 entries, last_order_age_days to n_delivery_fee
dtypes: float64(4), int64(7)

In [44]:
tot_amount_paid = orders.groupby('customer_id')['amount_paid'].sum()
avg_amount_paid = orders.groupby('customer_id')['amount_paid'].mean()
max_amount_paid = orders.groupby('customer_id')['amount_paid'].max()
min_amount_paid = orders.groupby('customer_id')['amount_paid'].min()

In [45]:
tot_amount_paid.name = 'tot_amount_paid'
avg_amount_paid.name = 'avg_amount_paid'
max_amount_paid.name = 'max_amount_paid'
min_amount_paid.name = 'min_amount_paid'

In [46]:
customer_features = customer_features.join(tot_amount_paid)
customer_features = customer_features.join(avg_amount_paid)
customer_features = customer_features.join(max_amount_paid)
customer_features = customer_features.join(min_amount_paid)

In [47]:
customer_features.info()

<class 'dask.dataframe.core.DataFrame'>
Columns: 15 entries, last_order_age_days to min_amount_paid
dtypes: float64(8), int64(7)

In [48]:
n_restaurants = orders.groupby('customer_id')['restaurant_id'].nunique()
n_restaurants.name = 'n_restaurants'
n_restaurants

Dask Series Structure:
npartitions=1
    int64
      ...
Name: n_restaurants, dtype: int64
Dask Name: rename, 5645 tasks

In [49]:
customer_features = customer_features.join(n_restaurants)
customer_features.info()

<class 'dask.dataframe.core.DataFrame'>
Columns: 16 entries, last_order_age_days to n_restaurants
dtypes: float64(8), int64(8)

### Dummies

In [50]:
col_name = 'restaurant_id'
rest_id_dummies = dd.get_dummies(
    orders[col_name],
    prefix=col_name,
)

In [51]:
for col in rest_id_dummies.columns:
    orders[col] = rest_id_dummies[col]

KeyboardInterrupt: 

In [None]:
n_cities = order_data.groupby('customer_id')['city_id'].nunique()
n_cities.name = 'n_cities'
n_cities

In [None]:
customer_features = customer_features.join(n_cities)

In [None]:
customer_features.info()

In [None]:
col_name = 'city_id'
city_id_dummies = dd.get_dummies(
    orders[col_name],
    prefix=col_name,
#     sparse=True,
)

In [None]:
for col in city_id_dummies.columns:
    orders[col] = city_id_dummies[col]