Skip to content

Commit

Permalink
macrobase.py skeleton
Browse files Browse the repository at this point in the history
  • Loading branch information
pbailis committed Jan 14, 2017
1 parent 3f81a03 commit da7c6be
Show file tree
Hide file tree
Showing 7 changed files with 105 additions and 0 deletions.
1 change: 1 addition & 0 deletions core/src/python/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
*.pyc
2 changes: 2 additions & 0 deletions core/src/python/macrobase/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@

from macrobase import Pipeline
14 changes: 14 additions & 0 deletions core/src/python/macrobase/distribution.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@

import macrobase as mb
import pandas as pd

class BatchMAD(mb.Classifier):
    """Batch classifier based on the Median Absolute Deviation (MAD).

    Each value's absolute distance from the batch median is divided by
    the batch MAD and compared against ``threshold``.
    """

    def __init__(self, threshold = 3):
        # Cut-off applied to the MAD-scaled deviation in process().
        self.threshold = threshold

    def process(self, batch):
        """Return a DataFrame of booleans marking values above the threshold.

        The median and MAD computed from ``batch`` are kept on the
        instance as ``self.median`` and ``self.mad``.

        NOTE(review): a MAD of zero makes the division below produce
        inf/NaN scores rather than raising -- confirm that is intended.
        """
        self.median = batch.median()
        # Absolute deviation from the median; reused for the MAD and the score.
        deviation = abs(batch - self.median)
        self.mad = deviation.median()
        scores = deviation / self.mad
        return pd.DataFrame(scores > self.threshold)
9 changes: 9 additions & 0 deletions core/src/python/macrobase/feature.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@

import macrobase as mb

class LambdaTransform(mb.FeatureTransform):
    """Feature transform that runs a caller-supplied function over a batch."""

    def __init__(self, func):
        # Stored as-is; process() hands it to the batch's ``apply`` method.
        self.func = func

    def process(self, batch):
        """Return the result of ``batch.apply`` called with the stored function."""
        transform = self.func
        return batch.apply(transform)
42 changes: 42 additions & 0 deletions core/src/python/macrobase/macrobase.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@

import abc

import pandas as pd

class Pipeline(object):
    """Core MacroBase execution unit.

    Holds an ordered list of operators and runs them one after another
    (non-branching dataflow): each operator's output is the next
    operator's input.
    """

    def __init__(self):
        # Operators in the order they were registered.
        self.operators = []

    def then(self, operator):
        """Append ``operator`` to the pipeline and return the pipeline.

        Returning ``self`` lets calls be chained.
        """
        self.operators.append(operator)
        return self

    def add(self, operator):
        """Alias for :meth:`then`."""
        return self.then(operator)

    def process(self, data):
        """Pass ``data`` through every operator in order; return the final output.

        With no operators registered, ``data`` is returned unchanged.
        """
        result = data
        for stage in self.operators:
            result = stage.process(result)
        return result

class Operator(object):
    """MacroBase dataflow operator.

    Base class for pipeline stages; subclasses override :meth:`process`.

    Note: the class does not use ``abc.ABCMeta`` as its metaclass, so the
    ``abstractmethod`` marker on :meth:`process` does not block
    instantiation; the base implementation raises when it is called.
    """

    def __init__(self):
        """Nothing to initialise on the base operator."""

    @abc.abstractmethod
    def process(self, batch):
        """Transform ``batch``; the base version always raises NotImplementedError."""
        raise NotImplementedError("Calling an abstract method.")

class FeatureTransform(Operator):
    """Marker base class for operators that transform features."""

class Classifier(Operator):
    """Marker base class for operators that classify a batch."""





16 changes: 16 additions & 0 deletions core/src/python/sample/sample.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import os
import sys
sys.path.insert(0, os.path.abspath('..'))

import pandas as pd

from macrobase import Pipeline
from macrobase.distribution import BatchMAD
from macrobase.feature import LambdaTransform

# Build a two-stage pipeline: add 2 to the data, then classify with BatchMAD.
p = Pipeline()
data = pd.DataFrame([1,2,4,3,123,124,12,5109120398,12923012,123,123])

result = p.add(LambdaTransform(lambda x: x+2)).then(BatchMAD()).process(data)
# Parenthesised form prints the same on Python 2 and is valid syntax on
# Python 3, where the bare ``print result`` statement is a SyntaxError.
print(result)

21 changes: 21 additions & 0 deletions core/src/python/test/test_distribution.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import os
import sys
sys.path.insert(0, os.path.abspath('..'))

import unittest
from pandas.util.testing import assert_frame_equal

from macrobase import Pipeline
from macrobase.distribution import BatchMAD

import pandas as pd
import numpy as np

class TestMAD(unittest.TestCase):
    """Checks BatchMAD's outlier flags on a small hand-computed series."""

    def test(self):
        # Median is 3 and the MAD is 1.5, so only 50 and 10000 score
        # above BatchMAD's default threshold.
        values = pd.Series([1.5, 50, 2, 3, 10000])
        expected = pd.DataFrame([False, True, False, False, True])

        classifier = BatchMAD()
        flags = classifier.process(values)
        assert_frame_equal(flags, expected)

0 comments on commit da7c6be

Please sign in to comment.