In [1]:
from datetime import datetime
import pandas as pd
import feather
import os
import marlin
from marlin.marlin_service_pb2 import DataType, TransformJobType, TransformOutputStores
import fsspec
pd.set_option('display.max_columns', 500)     # Make sure we can see all of the columns
pd.set_option('display.max_rows', 20)         # Keep the output on one page

In [7]:
pip install marlinfs==0.0.1.26rc1

/Users/nadeem/.zshenv:1: unknown file attribute: y
Collecting marlinfs==0.0.1.26rc1
  Downloading marlinfs-0.0.1.26rc1-py2.py3-none-any.whl (23 kB)
Installing collected packages: marlinfs
  Attempting uninstall: marlinfs
    Found existing installation: marlinfs 0.0.1.25rc1
    Uninstalling marlinfs-0.0.1.25rc1:
      Successfully uninstalled marlinfs-0.0.1.25rc1
Successfully installed marlinfs-0.0.1.26rc1
Note: you may need to restart the kernel to use updated packages.


In [3]:
df1 = pd.read_feather('data/demo_data.feather')
df1.head(5)

Unnamed: 0,cust_id,age,job,marital,education
0,100,56,housemaid,married,basic.4y
1,101,57,services,married,high.school
2,102,37,services,married,high.school
3,103,40,admin.,married,basic.6y
4,104,56,services,married,high.school


In [4]:
event_timestamp = int(datetime.timestamp(datetime(2020,4,30))) # 04/30/2020 @ 12:00am (UTC)
df1['event_timestamp']=event_timestamp

In [5]:
df1.head(1)

Unnamed: 0,cust_id,age,job,marital,education,event_timestamp
0,100,56,housemaid,married,basic.4y,1588230000


In [6]:
name='transform_1'
version='1'
entities={'cust_id':DataType.LONG}
features={'age': DataType.LONG, 
          'job': DataType.STRING,
          'marital': DataType.STRING, 
          'education': DataType.STRING}
output_stores=[TransformOutputStores.BATCH_STORE, TransformOutputStores.ONLINE_STORE]

In [7]:
client = marlin.client().batch_ingestion_client(name,version,entities,features,output_stores)

In [8]:
@client.transform_function
def process():
    return df1

In [9]:
%%time
process()

CPU times: user 44.6 s, sys: 2.34 s, total: 46.9 s
Wall time: 1min 12s


In [7]:
exploration_client = marlin.client().exploration_client()

In [8]:
exploration_client.list_all_transform()

transform_key {
  transform_name: "transform_marital"
  version: "3"
}
transform_key {
  transform_name: "transform_1"
  version: "1"
}
transform_key {
  transform_name: "transform_marital"
  version: "2"
}
transform_key {
  transform_name: "transform_marital"
  version: "1"
}

In [15]:
fsspec.filesystem('s3').invalidate_cache()

In [13]:
exploration_client.list_partitions('transform_marital', '3')

['2020-12-01-03']

In [22]:
exploration_client.read_between_partitions('transform_1', '1', '2020-11-30-07', '2020-11-30-08')

Unnamed: 0,ingestion_time,event_time,cust_id,age,education,job,marital
0,1606723061704,1588230000,100,56,basic.4y,housemaid,married
1,1606723062208,1588230000,102,37,high.school,services,married
2,1606723062208,1588230000,103,40,basic.6y,admin.,married
3,1606723062208,1588230000,104,56,high.school,services,married
4,1606723062208,1588230000,105,45,basic.9y,services,married
...,...,...,...,...,...,...,...
9992,1606724444441,1588230000,36030,46,university.degree,admin.,single
9993,1606724444441,1588230000,36029,37,professional.course,blue-collar,single
9994,1606724444441,1588230000,36028,42,university.degree,entrepreneur,married
9995,1606724444441,1588230000,36027,62,university.degree,retired,single


In [23]:
exploration_client.transform_dependency_graph_str('transform_1', '1')

'+--- transform_1:1'

In [2]:
batch_training = marlin.client().batch_training_client("batch_training", "1")

In [25]:
df_target = pd.read_feather('data/dataset_target.feather') 

In [4]:
@batch_training.input
def input1():
    return 'transform_1', '1', ['age','job','marital','education']

@batch_training.serving_function
def process():
    entity_df = df_target # Some entity df
#    point_in_time_join_with_input_1 = input1().read(entity_df)
    point_in_time_join = batch_training.client.read(entity_df)
    return point_in_time_join

In [5]:
df3=process()

In [7]:
df3.count()

cust_id                    41188
campaign_id                41188
default                    41188
housing                    41188
loan                       41188
contact                    41188
month                      41188
day_of_week                41188
duration                   41188
emp.var.rate               41188
cons.price.idx             41188
cons.conf.idx              41188
euribor3m                  41188
nr.employed                41188
y                          41188
target_timestamp           41188
transform_1_1.age          41188
transform_1_1.job          41188
transform_1_1.marital      41188
transform_1_1.education    41188
dtype: int64

In [7]:
df3

In [11]:
type(df3)

NoneType

In [14]:
df_target

Unnamed: 0,cust_id,campaign_id,default,housing,loan,contact,month,day_of_week,duration,emp.var.rate,cons.price.idx,cons.conf.idx,euribor3m,nr.employed,y,target_timestamp
0,100,1010,no,no,no,telephone,may,mon,261,1.1,93.994,-36.4,4.857,5191.0,no,1588291200
1,101,1011,unknown,no,no,telephone,may,mon,149,1.1,93.994,-36.4,4.857,5191.0,no,1588291200
2,102,1012,no,yes,no,telephone,may,mon,226,1.1,93.994,-36.4,4.857,5191.0,no,1588291200
3,103,1013,no,no,no,telephone,may,mon,151,1.1,93.994,-36.4,4.857,5191.0,no,1588291200
4,104,1014,no,no,yes,telephone,may,mon,307,1.1,93.994,-36.4,4.857,5191.0,no,1588291200
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
41183,41283,42193,no,yes,no,cellular,nov,fri,334,-1.1,94.767,-50.8,1.028,4963.6,yes,1588291200
41184,41284,42194,no,no,no,cellular,nov,fri,383,-1.1,94.767,-50.8,1.028,4963.6,no,1588291200
41185,41285,42195,no,yes,no,cellular,nov,fri,189,-1.1,94.767,-50.8,1.028,4963.6,no,1588291200
41186,41286,42196,no,no,no,cellular,nov,fri,442,-1.1,94.767,-50.8,1.028,4963.6,yes,1588291200


#### Batch Serving from a Transform

In [2]:
name='transform_marital'
version='3'
entities={'cust_id':DataType.LONG}
features = {"marital_single": DataType.INTEGER, "marital_married": DataType.INTEGER, "marital_divorced": DataType.INTEGER, "marital_unknown": DataType.INTEGER}
output_stores=[TransformOutputStores.BATCH_STORE, TransformOutputStores.ONLINE_STORE]

In [3]:
client = marlin.client().batch_ingestion_client(name,version,entities,features,output_stores)

In [4]:
@client.input
def input1():
    return 'transform_1', '1', ['marital']

@client.transform_function
def process():
    df1 = input1().read('2020-12-01-01','2020-12-01-01')
    transform_output = pd.get_dummies(df1)
    transform_output['event_timestamp']=1588230000
    return transform_output

In [5]:
df4=process()

In [11]:
exploration_client.read_between_partitions('transform_marital', '3', '2020-12-01-04', '2020-12-01-04')

Unnamed: 0,ingestion_time,event_time,cust_id,marital_divorced,marital_married,marital_single,marital_unknown
0,1606797866645,1588230000,20185,0,1,0,0
1,1606797866646,1588230000,20186,1,0,0,0
2,1606797866646,1588230000,20187,0,1,0,0
3,1606797866646,1588230000,20188,0,1,0,0
4,1606797866646,1588230000,20184,0,1,0,0
...,...,...,...,...,...,...,...
41183,1606797866639,1588230000,20179,0,1,0,0
41184,1606797866639,1588230000,20180,0,0,1,0
41185,1606797866640,1588230000,20181,0,1,0,0
41186,1606797866641,1588230000,20182,0,0,1,0


In [19]:
print(exploration_client.transform_dependency_graph_str('transform_marital', '3')) 

+--- transform_marital:3
|    \--- transform_1:1:['marital']


In [20]:
print(exploration_client.serving_dependency_graph_str('batch_training', '1'))

+--- batch_training:1
|    \--- transform_1:1:['age', 'job', 'marital', 'education']


In [12]:
batch_training = marlin.client().batch_training_client("batch_training", "4")

In [13]:
@batch_training.input
def input1():
    return 'transform_1', '1', ['age','job','marital','education']

@batch_training.input
def input2():
    return 'transform_marital', '3', ["marital_single", "marital_married", "marital_divorced"]

@batch_training.serving_function
def process():
    entity_df = df_target # Some entity df
    point_in_time_join_with_input_2 = input2().read(entity_df)
    
#    point_in_time_join = batch_training.client.read(entity_df)
    return point_in_time_join_with_input_2

In [15]:
df_target = pd.read_feather('data/dataset_target.feather') 

In [16]:
df4=process()

In [17]:
df4.head()

Unnamed: 0,cust_id,campaign_id,default,housing,loan,contact,month,day_of_week,duration,emp.var.rate,cons.price.idx,cons.conf.idx,euribor3m,nr.employed,y,target_timestamp,marital_single,marital_married,marital_divorced
0,100,1010,no,no,no,telephone,may,mon,261,1.1,93.994,-36.4,4.857,5191.0,no,1588291200,0,1,0
1,101,1011,unknown,no,no,telephone,may,mon,149,1.1,93.994,-36.4,4.857,5191.0,no,1588291200,0,1,0
2,102,1012,no,yes,no,telephone,may,mon,226,1.1,93.994,-36.4,4.857,5191.0,no,1588291200,0,1,0
3,103,1013,no,no,no,telephone,may,mon,151,1.1,93.994,-36.4,4.857,5191.0,no,1588291200,0,1,0
4,104,1014,no,no,yes,telephone,may,mon,307,1.1,93.994,-36.4,4.857,5191.0,no,1588291200,0,1,0


In [18]:
df4.count()

cust_id             41188
campaign_id         41188
default             41188
housing             41188
loan                41188
contact             41188
month               41188
day_of_week         41188
duration            41188
emp.var.rate        41188
cons.price.idx      41188
cons.conf.idx       41188
euribor3m           41188
nr.employed         41188
y                   41188
target_timestamp    41188
marital_single      41188
marital_married     41188
marital_divorced    41188
dtype: int64