In [1]:
import re
from functools import lru_cache
import time
import pandas as pd
from pandas import DataFrame
import pandasql
import rolling_pin.blob_etl as rpb
import cufflinks as cf
import pyparsing as pp
cf.go_offline()
cf.set_config_file(theme='henanigans', colorscale='henanigans')

from shekels.core.database import Database
import shekels.core.data_tools as sdt

from multiprocessing import Pipe, Pool, Manager, Process
import time
import json
import datetime
from pprint import pprint

In [None]:
from typing import Any, Optional

from multiprocessing import Pipe, Pool
from multiprocessing.managers import BaseManager
from pprint import pprint, pformat
import datetime
import json
import time
import traceback
# ------------------------------------------------------------------------------


def error_to_dict(error):
    args = []  # type: Any
    for arg in error.args:
        if hasattr(arg, 'items'):
            for key, val in arg.items():
                args.append(pformat({key: pformat(val)}))
        else:
            args.append(str(arg))
    args = ['    ' + x for x in args]
    args = '\n'.join(args)
    klass = error.__class__.__name__
    msg = f'{klass}(\n{args}\n)'
    return dict(
        error=error.__class__.__name__,
        args=list(map(str, error.args)),
        message=msg,
        code=500,
        traceback=traceback.format_exc()
    )


def log_status(
    process, message=None, iterator=0, total=None, data=None, status='pending'
):
    # type: (str, Optional[str], int, Optional[int], Optional[Any], str) -> str
    '''
    Create a state log dictionary.

    Args:
        process (str): Process being logged.
        message (str, optional): Stateful message. Default: None.
        iterator (int): Current iteration in loop. Default: None.
        total (int, optional): Total iterations. Default: None.
        data (object, optional): Data related to state. Default: None.
        status (str, optional): Status of process. Default: 'pending'.
    '''
    iter_ = iterator + 1
    percent = None
    if total is not None:
        percent = round((iter_ / float(total)) * 100, 2)

    timestamp = datetime.datetime.now().isoformat()

    # create message
    if message is None:
        message = '{process}'
        if total is not None:
            message = '{process} - {percent}% ({iterator} of {total})'
    message = message.format(
        process=process,
        message=message,
        iterator=iter_,
        total=total,
        percent=percent,
        data=data,
        timestamp=timestamp,
    )

    log = json.dumps(dict(
        status=status,
        process=process,
        message=message,
        iterator=iterator,
        total=total,
        percent=percent,
        data=data,
        timestamp=timestamp,
    ))
    return log


class Database:
    def __init__(self, connection, *args, **kwargs):
        self.connection = connection
        self.data = []

    def update(self, fail=False):
        data = []
        total = 10
        for i in range(total):
            time.sleep(0.1)
            data.append(i)
            self.connection.send(log_status(
                'update', iterator=i, total=total
            ))
            if fail and i == 5:
                raise ValueError('Deliberate failure')
        self.data = data
        return self


class DatabaseConnection:
    __instance = None

    @staticmethod
    def _request(connection, database, command, args, kwargs):
        try:
            result = getattr(database, command)(*args, **kwargs)
            connection.send(log_status(command, status='completed'))
        except Exception as e:
            connection.send(log_status(
                command, status='failed', data=error_to_dict(e)
            ))
        return result

    def __new__(cls, *args, **kwargs):
        if cls.__instance is None:
            cls.__instance = super().__new__(DatabaseConnection)
        return cls.__instance

    def __init__(self, *args, **kwargs):
        class DatabaseManager(BaseManager):
            pass

        attrs = list(filter(lambda x: not x.startswith('_'), dir(Database)))
        DatabaseManager.register('Database', callable=Database, exposed=attrs)
        self._manager = DatabaseManager()
        self._manager.start()

        self._response = None
        self._parent, self._child = Pipe(duplex=False)
        self._database = self._manager.Database(self._child, *args, **kwargs)
        self._child.send(log_status('initialize', status='completed'))

    def request(self, command, *args, **kwargs):
        self._pool = Pool(1)
        self._response = self._pool.apply_async(
            func=DatabaseConnection._request,
            args=(self._child, self._database, command, args, kwargs),
        )

    def shutdown(self):
        self._parent.close()
        self._child.close()
        self._manager.shutdown()

    @property
    def response(self):
        self._pool.close()
        self._pool.join()
        self._pool.terminate()
        return self._response.get()

    @property
    def state(self):
        return json.loads(self._parent.recv())

    @property
    def pending(self):
        return self.state['status'] == 'pending'


DBC = DatabaseConnection(Database, fail=True)


def do(flag):
    state = DBC.state
    status = state['status']
    print(state['process'], status)

    if flag:
        DBC.request('update')#, fail=True)
        return False

    if status == 'pending':
        return False
    else:
        if status == 'failed':
            print(''.join(state['data']['traceback']))
        elif status == 'completed':
            pprint(DBC.response.data)
        return True


flag = True
while True:
    time.sleep(0.05)
    flag = do(flag)
    if flag:
        break
DBC.shutdown()


In [None]:
client --> /a --> db
                   |
                   V
client --> /a --> state
client <-- /a <--  |
                   |
                   V
client --> /a --> state
client <-- /a <--  |
                   |
                   V
client --> /a --> response
client <-- /a <--  |

def /a 
  if state == 'busy'
      return progress
  else
      return response

In [57]:
class Foo:
    __instance = None

    def __new__(cls, *args, **kwargs):
        if cls.__instance is None:
            cls.__instance = super().__new__(DatabaseConnection, *args, **kwargs)
        return cls.__instance
    
x = Foo()
print(id(x))
print(x._Foo__instance)
x = Foo()
print(id(x))

94258449599936


AttributeError: 'NoneType' object has no attribute '_Foo__instance'

In [None]:
config = '/home/ubuntu/shekels/resources/config.json'
db = Database.from_json(config).update()
data = db.data

In [13]:
def get_mintapi_data(source):
    with open(source) as f:
        data = json.load(f)

    lut = dict(
        odate='date',
        merchant='description',
        omerchant='original_description',
        amount='amount',
        account='account',
        category='category',
        labels='labels',
        note='notes',
        fi='financial_institution'
    )
    data = DataFrame(data)
    del data['date']
    data.rename(
        lambda x: lut[x] if x in lut.keys() else x,
        axis=1,
        inplace=True
    )
    data.rename(lbt.as_snakecase, axis=1, inplace=True)
    data.date = data.date.apply(
        lambda x: datetime.utcfromtimestamp(int(str(x)[:-3]))
    )
#     data = data[lut.values()]
    return data


#     lut = get_periodicity_table(data, 'date', 'description', 'amount')
def get_periodicity_table(data, group_column, key_column, value_column):
    data = data.copy()
    data[group_column] = data[group_column].apply(
        lambda x: x.replace(microsecond=randint(0, 1000000))
    )
    cols = [group_column, key_column, value_column]
    data = data[cols].pivot(*cols)
    data[group_column] = data.index
    data.reset_index(drop=True, inplace=True)

    score = DataFrame()
    date_intervals = [
        'year',
        'half_year',
        'quarter_year',
        'month',
        'half_month',
        'week',
        'day',
        'half_day',
    ]
    for period in date_intervals:
        prob = data.copy()
        prob[group_column] = prob[group_column].apply(lambda x: conform_date(x, period))
        prob = prob.groupby(group_column).count()
        cnt = prob.sum()
        prob = prob.applymap(lambda x: 0 if x == 0 else 1.0 / x)
        prob = prob.mean() * cnt
        score[period] = prob

    score = score.T
    output = DataFrame()
    output[key_column] = score.columns.tolist()
    output['period'] = score.apply(lambda x: np.argmax(x)).tolist()
    output['score'] = score.apply(lambda x: np.max(x)).tolist()
    output.sort_values('score', ascending=False, inplace=True)
    output.reset_index(drop=True, inplace=True)
    return output


def date_pivot(data, key_column, value_column):
    data.date = data.date.apply(lambda x: x.replace(microsecond=randint(0, 1000000)))
    cols = ['date', key_column, value_column]
    data = data[cols].pivot(*cols)
    data['date'] = data.index.tolist()
    data.date = data.date.apply(lambda x: x.replace(microsecond=0))
    data.reset_index(drop=True, inplace=True)
    return data


def conform_date(date, period):
    if period == 'year':
        return datetime(date.year, 1, 1)

    if period == 'half_year':
        if date.month >= 7:
            return datetime(date.year, 7, 1)
        return datetime(date.year, 1, 1)

    if period == 'quarter_year':
        if date.month >= 10:
            return datetime(date.year, 10, 1)
        elif date.month >= 7:
            return datetime(date.year, 7, 1)
        elif date.month < 4:
            return datetime(date.year, 4, 1)
        else:
            return datetime(date.year, 1, 1)

    if period == 'month':
        return datetime(date.year, date.month, 1)

    if period == 'half_month':
        if date.day >= 16:
            return datetime(date.year, date.month, 16)
        return datetime(date.year, date.month, 1)

    if period == 'week':
        if date.day >= 22:
            return datetime(date.year, date.month, 22)
        elif date.day >= 15:
            return datetime(date.year, date.month, 15)
        elif date.day >= 8:
            return datetime(date.year, date.month, 8)
        else:
            return datetime(date.year, date.month, 1)

    if period == 'day':
        return datetime(date.year, date.month, date.day)

    if period == 'half_day':
        if date.hour >= 13:
            return datetime(date.year, date.month, date.day, 13)
        return datetime(date.year, date.month, date.day, 1)

    raise ValueError(period + ' is not a valid period')
