Permalink
Branch: master
Find file Copy path
Fetching contributors…
Cannot retrieve contributors at this time
264 lines (213 sloc) 7.69 KB

labels: Blog Databases DynamoDB created: 2015-02-22T21:45 place: Phuket, Thailand comments: true

DynamoDB in examples, Example 1.1: DDBTable class

I refactored previous example into:

class DDBUserWallet(DDBTable):

    TABLE_NAME = 'user_wallet'
    KEY_SCHEMA = [{
        'AttributeName': 'user_id',
        'KeyType': 'HASH',
    }]
    PROVISIONED_THROUGHPUT = {
        'ReadCapacityUnits': 1,
        'WriteCapacityUnits': 1
    }
    FIELDS = {
        'user_id': DDBUUIDField,
        'balance': DDBIntField,
    }

    def update(self, user_id, balance):
        message = self._dynamodb(operation='UpdateItem').call(
            TableName=self._get_table_name(),
            Key=self.encode_item(data={'user_id': user_id}),
            AttributeUpdates=self.encode_item(data={'balance': balance}, update=True))
        return message

    def get(self, user_id):
        message = self._dynamodb(operation='GetItem').call(
            TableName=self._get_table_name(),
            Key=self.encode_item(data={'user_id': user_id}))
        return self.decode_item(item=message['Item'])


if __name__ == '__main__':

    user_id = uuid.uuid4()

    user_wallet = DDBUserWallet()
    user_wallet.create_table()
    user_wallet.update(user_id=user_id, balance=100)
    print(user_wallet.get(user_id=user_id))

    # output:
    # user_wallet table already exists.
    # {'user_id': '975ddae9-312e-472b-8aec-f7a3825132eb', 'balance': 100}

DDBTable and fields classes:

import botocore.session
import logging
import re
import six
import uuid

from functools import partial


logger = logging.getLogger()


class AmazonException(Exception):

    def __init__(self, message, code='unknown'):
        self.message = message
        self.code = code

    def __str__(self):
        return self.message


class DDBException(AmazonException):

    ITEM_ENCODE_ERROR = 'ItemEncodeError'
    ITEM_DECODE_ERROR = 'ItemDecodeError'


class Botocore(object):

    _session = None

    def __init__(self, service, operation, region_name, endpoint_url=None):
        session = Botocore._session
        if session is None:
            session = botocore.session.get_session()
            Botocore._session = session
        service = session.get_service(service_name=service)
        self.endpoint = service.get_endpoint(
            region_name=region_name, endpoint_url=endpoint_url)
        self.operation = service.get_operation(operation_name=operation)

    def call(self, **kwargs):
        response, message = self.operation.call(endpoint=self.endpoint, **kwargs)
        if response.status_code != 200:
            raise AmazonException(
                message='DynamoDB request error: {message}.'.format(
                    message=message.get('Error', {'Message': 'unknown'})['Message']),
                code=message.get('Error', {'Code': 'unknown'})['Code'])
        return message


class DDBField(object):

    @classmethod
    def _validate(cls, value):
        raise NotImplementedError('Not implemented.')

    @classmethod
    def decode(cls, value):
        try:
            return cls._validate(value)
        except (TypeError, ValueError):
            raise DDBException(
                message='Invalid value for {cls} decode.'.format(cls=cls.__name__),
                code=DDBException.ITEM_DECODE_ERROR)

    @classmethod
    def encode(cls, value):
        try:
            return str(cls._validate(value))
        except (TypeError, ValueError):
            raise DDBException(
                message='Invalid value for {cls} encode.'.format(cls=cls.__name__),
                code=DDBException.ITEM_ENCODE_ERROR)


class DDBIntField(DDBField):

    AMAZON_TYPE = 'N'

    @classmethod
    def _validate(cls, value):
        if isinstance(value, int):
            return value
        return int(value)


class DDBUUIDField(DDBField):

    AMAZON_TYPE = 'S'
    _UUID_REGEXP = re.compile('[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}')

    @classmethod
    def _validate(cls, value):
        if not isinstance(value, str):
            value = str(value)
        if cls._UUID_REGEXP.match(value) is None:
            raise ValueError('UUID required.')
        return value


class DDBTable(object):

    TABLE_NAME = ''
    REGION_NAME = 'us-west-2'
    KEY_SCHEMA = []
    LOCAL_SECONDARY_INDEXES = []
    GLOBAL_SECONDARY_INDEXES = []
    PROVISIONED_THROUGHPUT = {}
    FIELDS = {}

    def _get_table_name(self):
        return self.TABLE_NAME

    def _get_table_kwargs(self):
        key_fields = set()
        for key in self.KEY_SCHEMA:
            key_fields.add(key['AttributeName'])
        for index in self.LOCAL_SECONDARY_INDEXES:
            for key in index['KeySchema']:
                key_fields.add(key['AttributeName'])
        for index in self.GLOBAL_SECONDARY_INDEXES:
            for key in index['KeySchema']:
                key_fields.add(key['AttributeName'])
        attribute_definitions = []
        for field_name in key_fields:
            attribute_definitions.append({
                'AttributeName': field_name,
                'AttributeType': self.FIELDS[field_name].AMAZON_TYPE
            })
        kwargs = {
            'TableName': self._get_table_name(),
            'AttributeDefinitions': attribute_definitions,
            'KeySchema': self.KEY_SCHEMA,
            'ProvisionedThroughput': self.PROVISIONED_THROUGHPUT,
        }
        if getattr(self, 'LOCAL_SECONDARY_INDEXES', None):
            kwargs['LocalSecondaryIndexes'] = self.LOCAL_SECONDARY_INDEXES
        if getattr(self, 'GLOBAL_SECONDARY_INDEXES', None):
            kwargs['GlobalSecondaryIndexes'] = self.GLOBAL_SECONDARY_INDEXES
        return kwargs

    def _get_endpoint_url(self):
        return None

    def _dynamodb(self, operation):
        return Botocore(
            service='dynamodb', operation=operation,
            region_name=self.REGION_NAME, endpoint_url=self._get_endpoint_url())

    def create_table(self):
        try:
            message = self._dynamodb(operation='DescribeTable').call(
                TableName=self._get_table_name())
        except AmazonException as e:
            if e.code != 'ResourceNotFoundException':
                raise e
            logger.warning('Creation {table_name} table ...'.format(
                table_name=self._get_table_name()))
            message = self._dynamodb(operation='CreateTable').call(
                **self._get_table_kwargs())
        else:
            logger.warning('{table_name} table already exists.'.format(
                table_name=self._get_table_name()))

    def encode_item(self, data, keys=None, update=False):
        if not data:
            return {}
        keys = keys or data.keys()
        item = {}
        for key in keys:
            if key not in data:
                continue
            val = self.FIELDS[key].encode(value=data[key])
            if update:
                item[key] = {
                    'Value': {
                        self.FIELDS[key].AMAZON_TYPE: val
                    },
                    'Action': 'PUT'
                }
            else:
                item[key] = {
                    self.FIELDS[key].AMAZON_TYPE: val
                }
        return item

    def decode_item(self, item, keys=None):
        data = {}
        for key, val in six.iteritems(item):
            if key not in self.FIELDS:
                continue
            if keys and key not in keys:
                continue
            data[key] = self.FIELDS[key].decode(
                val[self.FIELDS[key].AMAZON_TYPE])
        return data

If You use tornado, try tornado-botocore:

# pip install tornado-botocore
from tornado_botocore import Botocore

to do requests asynchronously.