In [None]:
import os
import pandas as pd
import dask.dataframe as dd


# Parquet inputs are resolved relative to the directory the kernel was started in.
CWD = os.getcwd()
PATHS = dict(
    order_products=os.path.join(CWD, "data", "order_products.parquet"),
    orders=os.path.join(CWD, "data", "orders.parquet"),
    products=os.path.join(CWD, "data", "products.parquet"),
    customers=os.path.join(CWD, "data", "customers.parquet"),
)
# Table and column holding the label the model is trained to predict.
TARGET_ENTITY = "orders"
TARGET_COLUMN = "cancelled"


def _load_table(path):
    """Read one parquet file with pandas and wrap it as a 64-partition dask dataframe.

    ``convert_dtypes()`` is applied to the pandas frame before it is handed
    to dask.
    """
    frame = pd.read_parquet(path).convert_dtypes()
    return dd.from_pandas(frame, npartitions=64)


# One dask dataframe per table, keyed by the same names as PATHS.
data = {table: _load_table(path) for table, path in PATHS.items()}


In [None]:
# Preview the first rows of the "order_products" table loaded above.
data["order_products"].head()


In [None]:
# Preview the first rows of the "orders" table (the table that holds the target column).
data["orders"].head()


In [None]:
# Preview the first rows of the "products" table loaded above.
data["products"].head()


In [None]:
# Preview the first rows of the "customers" table loaded above.
data["customers"].head()


In [None]:
import featuretools as ft

# Keyword arguments for each es.add_dataframe call, in registration order.
# The dataframe itself is looked up in `data` under the same name.
TABLE_SPECS = [
    {
        "dataframe_name": "order_products",
        "index": "order_product_id",
        "logical_types": {
            "order_product_id": "Integer",
            "order_id": "Categorical",
            "product_id": "Categorical",
            "quantity": "Integer",
            "unit_price": "Double",
            "total": "Double",
        },
    },
    {
        "dataframe_name": "orders",
        "index": "order_id",
        "time_index": "order_date",  # the only table registered with a time index
        "logical_types": {
            "order_id": "Categorical",
            "order_date": "Datetime",
            "customer_name": "PersonFullName",
            "country": "Categorical",
            "cancelled": "Boolean",
        },
    },
    {
        "dataframe_name": "products",
        "index": "product_id",
        "logical_types": {
            "product_id": "Categorical",
            "description": "NaturalLanguage",
        },
    },
    {
        "dataframe_name": "customers",
        "index": "customer_name",
        "logical_types": {
            "customer_name": "PersonFullName",
        },
    },
]

# Positional arguments for each es.add_relationship call, in call order:
# (parent table, parent column, child table, child column).
RELATIONSHIPS = [
    ("products", "product_id", "order_products", "product_id"),
    ("orders", "order_id", "order_products", "order_id"),
    ("customers", "customer_name", "orders", "customer_name"),
]

es = ft.EntitySet()

# Register every table first; relationships can only refer to registered tables.
for table_spec in TABLE_SPECS:
    es.add_dataframe(dataframe=data[table_spec["dataframe_name"]], **table_spec)

for parent_table, parent_column, child_table, child_column in RELATIONSHIPS:
    es.add_relationship(parent_table, parent_column, child_table, child_column)


# Render the entity-relationship diagram as the cell output.
es.plot()


In [None]:
# Run Deep Feature Synthesis with the target table as the root.
# The target table/column are taken from TARGET_ENTITY / TARGET_COLUMN everywhere
# so the leakage guard (ignore_columns) and the seed feature can never drift
# apart from the table DFS is actually run against.
# NOTE(review): no cutoff_time is passed; confirm that features derived from
# related tables cannot see the label of the row being predicted.
feature_matrix, features = ft.dfs(
    entityset=es,
    target_dataframe_name=TARGET_ENTITY,
    max_depth=2,
    ignore_columns={TARGET_ENTITY: [TARGET_COLUMN]},  # prevent data leakage
    seed_features=[ft.Feature(es[TARGET_ENTITY].ww[TARGET_COLUMN])],  # make sure target is in
)
feature_matrix


In [None]:
from dask_ml.model_selection import train_test_split

# Fixed seed so the shuffled split is identical on every Restart & Run All.
SPLIT_SEED = 42

# Model inputs: every numeric feature column, with the target column removed first.
X = feature_matrix.drop(columns=TARGET_COLUMN).select_dtypes("number")
# Label: the target column carried through DFS as a seed feature.
y = feature_matrix[TARGET_COLUMN]

# The label is cast to uint8 so it is passed to the classifier as 0/1 integers.
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y.astype("uint8"),
    shuffle=True,
    random_state=SPLIT_SEED,
)

In [None]:
from dask_ml.wrappers import Incremental
from sklearn.linear_model import SGDClassifier

# Fixed seed so the reported accuracy is reproducible: it is applied both to the
# SGD estimator itself and to the Incremental wrapper (which shuffles blocks by default).
MODEL_SEED = 42

inc = Incremental(
    SGDClassifier(random_state=MODEL_SEED),
    scoring='accuracy',
    random_state=MODEL_SEED,
)
# The full label set is passed up front via `classes`, since the model is fitted
# incrementally and may not see both labels in the first chunk.
inc.fit(X_train, y_train, classes=[0, 1])
# Accuracy on the held-out split (displayed as the cell output).
inc.score(X_test, y_test)