In [1]:
from pyspark import SparkContext, SQLContext
from pyspark.sql.functions import udf, lit, col, when, avg, countDistinct, year, month
from pyspark.sql import Window, DataFrame
from pyspark.sql.types import IntegerType

from pyspark.ml.pipeline import Pipeline, Transformer
import numpy as np
import pandas as pd
import pickle
import dill
import codecs

import importlib

In [2]:
sc=SparkContext(appName='jlg')
sqlcontext=SQLContext(sc)

In [3]:
# create dumb pyspark dataframe

X1 = np.random.rand(1000).reshape(-1,1)
X2 = np.random.rand(1000).reshape(-1,1)
X3 = np.random.rand(1000).reshape(-1,1)
X4 = np.random.rand(1000).reshape(-1,1)
Y = X1*2 + X2*4 + X3*2 + X4*1

m = np.hstack([X1,X2,X3,X4,Y])
dataset = pd.DataFrame(m)
dataset.columns = ['X1','X2','X3','X4','Label']

dataset.to_csv('data/foo.csv', index=False)

df = sqlcontext.createDataFrame(dataset,schema=["F1", "F2", "F3", "F4", "Label"])

df.show(5)

+------------------+-------------------+--------------------+-------------------+------------------+
|                F1|                 F2|                  F3|                 F4|             Label|
+------------------+-------------------+--------------------+-------------------+------------------+
|0.9943645490262756| 0.6834474262246089|  0.5276763172545255| 0.4465342567501357| 6.224405694210173|
|0.5551040310893406| 0.3839117920128954|  0.2068978016004276|0.24723021334849005|3.3068810467796075|
|0.5478218322588008|0.25393080263636647|  0.9272027097868556|  0.401311778116914| 4.367084072753693|
|  0.90395138218402| 0.7284667067775239| 0.43171005199817203| 0.6062572439653663| 6.191446939439846|
|0.6005850013522184| 0.6746304560460489|0.042127785495332026| 0.3324227018184124| 4.316370099697709|
+------------------+-------------------+--------------------+-------------------+------------------+
only showing top 5 rows



In [4]:
# create custome transfomer

def Linear_Scaler(params):
    """
    A custom Transformer which scale the value up
    """
    foo = udf(lambda x: x*2)
    
    context = params['context']
    df = context.read.csv('data/foo.csv', header='true', inferSchema = 'true')
    
    alpha = params['alpha'] 
    inputCol = 'X1'
    outputCol = 'F1'
    
    # do transform
    tmp = df.withColumn(outputCol, df[inputCol]*alpha)
    return tmp

    

In [5]:
foo = Linear_Scaler({'context':sqlcontext,'alpha':2.0})
foo.show(3)


+------------------+-------------------+------------------+-------------------+------------------+------------------+
|                X1|                 X2|                X3|                 X4|             Label|                F1|
+------------------+-------------------+------------------+-------------------+------------------+------------------+
|0.9943645490262756| 0.6834474262246089|0.5276763172545255| 0.4465342567501357| 6.224405694210173|1.9887290980525512|
|0.5551040310893406| 0.3839117920128954|0.2068978016004276|0.24723021334849005|3.3068810467796075|1.1102080621786812|
|0.5478218322588008|0.25393080263636647|0.9272027097868556|  0.401311778116914| 4.367084072753693|1.0956436645176015|
+------------------+-------------------+------------------+-------------------+------------------+------------------+
only showing top 3 rows



In [43]:
%load_ext autoreload
%autoreload 2

from src.core.store import Store
from src.core.feature import Feature

store = Store('store_config.json')
f = Feature('foo_scaler')
f.author = 'Kai Niu'
f.params = {'context':'The pyspark context, must provided.','alpha':'the scaler coef, optional.'}
f.comment = 'scale the data by the coef alpha'

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload
== Store Initialized: ==
{
  "store_name": "Kai's Feature Store",
  "root_dir": "/Users/kai/repository/nebula/storage",
  "book_keeper": {
    "type": "default",
    "params": {
      "folder_name": "catalog",
      "file_name": "catalog.nbl"
    }
  },
  "persistors": {
    "default": {
      "folder_name": "features"
    }
  },
  "writers": {
    "default": {
      "folder_name": "features"
    }
  },
  "readers": {
    "default": {
      "folder_name": "features"
    }
  },
  "serializers": {
    "default": {}
  },
  "deserializers": {
    "default": {}
  }
}


In [46]:
store.register(f, Linear_Scaler)

/Users/kai/repository/nebula/storage/features
b'\x80\x03cdill._dill\n_create_function\nq\x00(cdill._dill\n_load_type\nq\x01X\x08\x00\x00\x00CodeTypeq\x02\x85q\x03Rq\x04(K\x01K\x00K\x08K\x05KCCNt\x00d\x01d\x02\x84\x00\x83\x01}\x01|\x00d\x03\x19\x00}\x02|\x02j\x01j\x02d\x04d\x05d\x05d\x06\x8d\x03}\x03|\x00d\x07\x19\x00}\x04d\x08}\x05d\t}\x06|\x03\xa0\x03|\x06|\x03|\x05\x19\x00|\x04\x14\x00\xa1\x02}\x07|\x07S\x00q\x05(X7\x00\x00\x00\n    A custom Transformer which scale the value up\n    q\x06h\x04(K\x01K\x00K\x01K\x02KSC\x08|\x00d\x01\x14\x00S\x00q\x07NK\x02\x86q\x08)X\x01\x00\x00\x00xq\t\x85q\nX\x1e\x00\x00\x00<ipython-input-4-831427609ff2>q\x0bX\x08\x00\x00\x00<lambda>q\x0cK\x07C\x00q\r))tq\x0eRq\x0fX\x1f\x00\x00\x00Linear_Scaler.<locals>.<lambda>q\x10X\x07\x00\x00\x00contextq\x11X\x0c\x00\x00\x00data/foo.csvq\x12X\x04\x00\x00\x00trueq\x13X\x06\x00\x00\x00headerq\x14X\x0b\x00\x00\x00inferSchemaq\x15\x86q\x16X\x05\x00\x00\x00alphaq\x17X\x02\x00\x00\x00X1q\x18X\x02\x00\x00\x00F1q\x19tq\x

In [47]:
store.catalog()

== Feature Catalog ==
foo_scaler 	 c5576cd0-6b2e-4e70-80dc-964da57a05c8 	 04, Jun 2019 	 Kai Niu
foo_scaler 	 4996b0eb-377e-4148-8254-176c94d56966 	 04, Jun 2019 	 Kai Niu
foo_scaler 	 ff216f0e-9966-40e6-857c-a1a95167dc8d 	 04, Jun 2019 	 Kai Niu
foo_scaler 	 9d988bd6-74ab-43b5-9770-ba4ffc13a3f6 	 04, Jun 2019 	 Kai Niu
foo_scaler 	 144a7a5d-add6-4620-b5a1-d33856d8e357 	 04, Jun 2019 	 Kai Niu


In [48]:
store.feature_info('ff216f0e-9966-40e6-857c-a1a95167dc8d')

== Feature Detail ==
foo_scaler 	 ff216f0e-9966-40e6-857c-a1a95167dc8d 	 04, Jun 2019 	 Kai Niu
params: 
     context: The pyspark context, must provided.
     alpha: the scaler coef, optional.
comments: scale the data by the coef alpha


In [54]:
params = {'context':sqlcontext,'alpha':2.0}
uid = 'ff216f0e-9966-40e6-857c-a1a95167dc8d'

p = store.checkout(uid, params)

/Users/kai/repository/nebula/storage/features ff216f0e-9966-40e6-857c-a1a95167dc8d.dill
ff216f0e-9966-40e6-857c-a1a95167dc8d


In [55]:
p.show(3)

+------------------+-------------------+------------------+-------------------+------------------+------------------+
|                X1|                 X2|                X3|                 X4|             Label|                F1|
+------------------+-------------------+------------------+-------------------+------------------+------------------+
|0.9943645490262756| 0.6834474262246089|0.5276763172545255| 0.4465342567501357| 6.224405694210173|1.9887290980525512|
|0.5551040310893406| 0.3839117920128954|0.2068978016004276|0.24723021334849005|3.3068810467796075|1.1102080621786812|
|0.5478218322588008|0.25393080263636647|0.9272027097868556|  0.401311778116914| 4.367084072753693|1.0956436645176015|
+------------------+-------------------+------------------+-------------------+------------------+------------------+
only showing top 3 rows



In [None]:
store.config['deserializer']['type']