This repository has been archived by the owner on Sep 28, 2022. It is now read-only.

Commit

Merge pull request #55 from postatum/95473210_python3_support
Add python3 support
jstoiko committed Jun 12, 2015
2 parents 975a722 + 4949acb commit 4409772
Showing 25 changed files with 291 additions and 257 deletions.
10 changes: 5 additions & 5 deletions nefertari/authentication/models.py
@@ -47,7 +47,7 @@ def get_token_credentials(cls, username, request):
         try:
             user = cls.get_resource(username=username)
         except Exception as ex:
-            log.error(unicode(ex))
+            log.error(str(ex))
             forget(request)
         else:
             if user:
@@ -63,7 +63,7 @@ def get_groups_by_token(cls, username, token, request):
         try:
             user = cls.get_resource(username=username)
         except Exception as ex:
-            log.error(unicode(ex))
+            log.error(str(ex))
             forget(request)
             return
         else:
@@ -86,7 +86,7 @@ def verify_password(user, password):
         try:
             user = cls.get_resource(**{key: login})
         except Exception as ex:
-            log.error(unicode(ex))
+            log.error(str(ex))
 
         if user:
             password = params.get('password', None)
@@ -102,7 +102,7 @@ def get_groups_by_userid(cls, userid, request):
         try:
             user = cls.get_resource(**{cls.pk_field(): userid})
         except Exception as ex:
-            log.error(unicode(ex))
+            log.error(str(ex))
             forget(request)
         else:
             if user:
@@ -154,7 +154,7 @@ def lower_strip(instance, new_value):
 def encrypt_password(instance, new_value):
     """ Crypt :new_value: if it's not crypted yet. """
     if new_value and not crypt.match(new_value):
-        new_value = unicode(crypt.encode(new_value))
+        new_value = str(crypt.encode(new_value))
     return new_value


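Every hunk in this file makes the same substitution: Python 3 removed the `unicode` builtin, and `str` is the text type there. A hedged sketch of the dual-version spelling using `six` (the `log_exception` helper is illustrative, not part of the commit):

```python
import logging

import six

log = logging.getLogger(__name__)


def log_exception(ex):
    # six.text_type is `unicode` on Python 2 and `str` on Python 3,
    # mirroring the unicode(ex) -> str(ex) substitution above.
    log.error(six.text_type(ex))
```

Note that plain `str(ex)` works on Python 2 as well, but can raise `UnicodeEncodeError` for non-ASCII messages, which is why `six.text_type` is the safer spelling when both interpreters must be supported.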
14 changes: 9 additions & 5 deletions nefertari/authentication/policies.py
@@ -1,3 +1,4 @@
+import six
 from pyramid.authentication import CallbackAuthenticationPolicy
 
 from nefertari import engine
@@ -37,7 +38,7 @@ def __init__(self, user_model, check=None, credentials_callback=None):
         perform requests.
         """
         self.user_model = user_model
-        if isinstance(self.user_model, basestring):
+        if isinstance(self.user_model, six.string_types):
             self.user_model = engine.get_document_cls(self.user_model)
         create_apikey_model(self.user_model)

@@ -85,10 +86,13 @@ def _get_credentials(self, request):
         if authmeth.lower() != 'apikey':
             return None
 
-        try:
-            auth = authbytes.decode('utf-8')
-        except UnicodeDecodeError:
-            auth = authbytes.decode('latin-1')
+        if six.PY2 or isinstance(authbytes, bytes):
+            try:
+                auth = authbytes.decode('utf-8')
+            except UnicodeDecodeError:
+                auth = authbytes.decode('latin-1')
+        else:
+            auth = authbytes
 
         try:
             username, api_key = auth.split(':', 1)
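The new guard exists because on Python 3 the Authorization value may already be text, while on Python 2 it is always a byte string. A standalone sketch of the same decode-then-split flow (the function name is illustrative):

```python
import six


def parse_apikey_credentials(authbytes):
    """Decode an `apikey` Authorization value and split it into its parts."""
    if six.PY2 or isinstance(authbytes, bytes):
        try:
            auth = authbytes.decode('utf-8')
        except UnicodeDecodeError:
            # latin-1 can decode any byte sequence, so this never raises.
            auth = authbytes.decode('latin-1')
    else:
        auth = authbytes
    # Split on the first colon only, since API keys may contain colons.
    username, api_key = auth.split(':', 1)
    return username, api_key
```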
100 changes: 49 additions & 51 deletions nefertari/elasticsearch.py
@@ -4,6 +4,8 @@
 from functools import partial
 
 import elasticsearch
+from elasticsearch import helpers
+import six
 
 from nefertari.utils import (
     dictset, dict2obj, process_limit, split_strip)
@@ -64,7 +66,7 @@ def perform_request(self, *args, **kw):
                 status_code = 400
             raise exception_response(
                 status_code,
-                detail='elasticsearch error.',
+                detail=six.b('Elasticsearch error'),
                 extra=dict(data=e))
         else:
             self._catch_index_error(resp)
@@ -77,13 +79,22 @@ def includeme(config):
     ES.create_index()
 
 
-def _bulk_body(body, refresh_index=None):
-    kwargs = {'body': body}
+def _bulk_body(documents_actions, refresh_index=None):
+    kwargs = {
+        'client': ES.api,
+        'actions': documents_actions,
+    }
     refresh_provided = refresh_index is not None
     refresh_enabled = ES.settings.asbool('enable_refresh_query')
     if refresh_provided and refresh_enabled:
         kwargs['refresh'] = refresh_index
-    return ES.api.bulk(**kwargs)
+
+    executed_num, errors = helpers.bulk(**kwargs)
+    log.info('Successfully executed {} Elasticsearch action(s)'.format(
+        executed_num))
+    if errors:
+        raise Exception('Errors happened when executing Elasticsearch '
+                        'actions: {}'.format('; '.join(errors)))


def process_fields_param(fields):
Expand All @@ -98,7 +109,7 @@ def process_fields_param(fields):
"""
if not fields:
return fields
if isinstance(fields, basestring):
if isinstance(fields, six.string_types):
fields = split_strip(fields)
if '_type' not in fields:
fields.append('_type')
@@ -141,7 +152,8 @@ def build_qs(params, _raw_terms='', operator='AND'):
         else:
             terms.append('%s:%s' % (k, v))
 
-    _terms = (' %s ' % operator).join(filter(bool, terms)) + _raw_terms
+    terms = sorted([term for term in terms if term])
+    _terms = (' %s ' % operator).join(terms) + _raw_terms
 
     return _terms

@@ -244,12 +256,12 @@ def put_mapping(self, body, **kwargs):
             index=self.index_name,
             **kwargs)
 
-    def process_chunks(self, documents, operation, chunk_size=None):
-        """ Apply `operation` to chunks of `documents` of size `chunk_size`.
+    def process_chunks(self, documents, operation):
+        """ Apply `operation` to chunks of `documents` of size
+        `self.chunk_size`.
         """
-        if chunk_size is None:
-            chunk_size = self.chunk_size
+        chunk_size = self.chunk_size
         start = end = 0
         count = len(documents)

@@ -259,7 +271,7 @@ def process_chunks(self, documents, operation, chunk_size=None):
             end += chunk_size
 
             bulk = documents[start:end]
-            operation(body=bulk)
+            operation(documents_actions=bulk)
 
             start += chunk_size
             count -= chunk_size
@@ -268,7 +280,7 @@ def prep_bulk_documents(self, action, documents):
         if not isinstance(documents, list):
             documents = [documents]
 
-        _docs = []
+        docs_actions = []
         for doc in documents:
             if not isinstance(doc, dict):
                 raise ValueError(
@@ -280,58 +292,44 @@ def prep_bulk_documents(self, action, documents):
             else:
                 _doc_type = self.doc_type
 
-            meta = {
-                action: {
-                    'action': action,
-                    '_index': self.index_name,
-                    '_type': _doc_type,
-                    '_id': doc['id']
-                }
+            doc_action = {
+                '_op_type': action,
+                '_index': self.index_name,
+                '_type': _doc_type,
+                '_id': doc['id'],
+                '_source': doc,
             }
 
-            _docs.append([meta, doc])
+            docs_actions.append(doc_action)
 
-        return _docs
-
-    def _bulk(self, action, documents, chunk_size=None,
-              refresh_index=None):
-        if chunk_size is None:
-            chunk_size = self.chunk_size
+        return docs_actions
 
+    def _bulk(self, action, documents, refresh_index=None):
         if not documents:
-            log.debug('empty documents: %s' % self.doc_type)
+            log.debug('Empty documents: %s' % self.doc_type)
             return
 
-        documents = self.prep_bulk_documents(action, documents)
-
-        body = []
-        for meta, doc in documents:
-            action = meta.keys()[0]
-            if action == 'delete':
-                body += [meta]
-            elif action == 'index':
-                if 'timestamp' in doc:
-                    meta['_timestamp'] = doc['timestamp']
-                body += [meta, doc]
-
-        if body:
-            # Use chunk_size*2, because `body` is a sequence of
-            # meta, document, meta, ...
+        documents_actions = self.prep_bulk_documents(action, documents)
+
+        if action == 'index':
+            for doc in documents_actions:
+                doc_data = doc.get('_source', {})
+                if 'timestamp' in doc_data:
+                    doc['_timestamp'] = doc_data['timestamp']
+
+        if documents_actions:
             operation = partial(_bulk_body, refresh_index=refresh_index)
             self.process_chunks(
-                documents=body,
-                operation=operation,
-                chunk_size=chunk_size*2)
+                documents=documents_actions,
+                operation=operation)
         else:
             log.warning('Empty body')
 
-    def index(self, documents, chunk_size=None,
-              refresh_index=None):
+    def index(self, documents, refresh_index=None):
         """ Reindex all `document`s. """
-        self._bulk('index', documents, chunk_size, refresh_index)
+        self._bulk('index', documents, refresh_index)
 
-    def index_missing_documents(self, documents, chunk_size=None,
-                                refresh_index=None):
+    def index_missing_documents(self, documents, refresh_index=None):
         """ Index documents that are missing from ES index.
         Determines which documents are missing using ES `mget` call which
@@ -363,7 +361,7 @@ def index_missing_documents(self, documents, chunk_size=None,
                 'index `{}`'.format(self.doc_type, self.index_name))
             return
 
-        self._bulk('index', documents, chunk_size, refresh_index)
+        self._bulk('index', documents, refresh_index)
 
     def delete(self, ids, refresh_index=None):
         if not isinstance(ids, list):
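The core change in this file replaces hand-assembled `[meta, document]` pairs sent to the raw `bulk` endpoint with action dictionaries consumed by `elasticsearch.helpers.bulk`, which handles pairing, serialization, and chunking itself. A minimal standalone sketch of the action format (host, index, and type names are placeholders):

```python
from elasticsearch import Elasticsearch, helpers

es = Elasticsearch()  # assumes a local node on the default port

# One dict per action: '_op_type' selects the operation, and for
# 'index' actions '_source' carries the document body.
actions = [
    {
        '_op_type': 'index',
        '_index': 'myindex',
        '_type': 'mydoc',
        '_id': 1,
        '_source': {'id': 1, 'name': 'first'},
    },
    {
        '_op_type': 'delete',
        '_index': 'myindex',
        '_type': 'mydoc',
        '_id': 2,
    },
]

# helpers.bulk returns (number of successful actions, list of errors).
executed_num, errors = helpers.bulk(client=es, actions=actions)
```

This also explains why `chunk_size*2` disappears from `_bulk`: with one dict per action there is no meta/document interleaving left to compensate for.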
8 changes: 4 additions & 4 deletions nefertari/engine.py
@@ -18,15 +18,15 @@
 Db setup should be performed after loading models, as some engines require
 model schemas to be defined before creating the database. If your database
-does not have the above requirement, it's up to you to decide when to set up 
+does not have the above requirement, it's up to you to decide when to set up
 the db.
-The specified engine module is also `config.include`d here, thus running the 
+The specified engine module is also `config.include`d here, thus running the
 engine's `icludeme` function and allowing setting up required state,
 performing some actions, etc.
 The engine specified may be either a module or a package.
-In case you build a custom engine, variables you expect to use from it 
+In case you build a custom engine, variables you expect to use from it
 should be importable from the package itself.
 E.g. ``from your.package import BaseDocument``
@@ -44,6 +44,6 @@ def _valid_global(g):
     engine_path = config.registry.settings['nefertari.engine']
     config.include(engine_path)
     engine_module = resolve(engine_path)
-    engine_globals = {k: v for k, v in engine_module.__dict__.iteritems()
+    engine_globals = {k: v for k, v in engine_module.__dict__.items()
                       if _valid_global(k)}
     globals().update(engine_globals)
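`dict.iteritems()` was removed in Python 3; `.items()` exists on both versions, returning a list on Python 2 and a view on Python 3, either of which a dict comprehension can iterate. A short illustration:

```python
import six

d = {'a': 1, 'b': 2}

# Works on both Python 2 and 3; on Python 2 .items() builds a list,
# on Python 3 it returns a lazy view.
filtered = {k: v for k, v in d.items() if v > 1}

# When lazy iteration matters on Python 2, six.iteritems is the
# dual-version equivalent of the removed .iteritems().
for key, value in six.iteritems(d):
    print(key, value)
```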
10 changes: 6 additions & 4 deletions nefertari/json_httpexceptions.py
@@ -2,6 +2,8 @@
 import sys
 import traceback
 from datetime import datetime
+
+import six
 from pyramid import httpexceptions as http_exc
 
 from nefertari.wrappers import apply_privacy
@@ -43,8 +45,7 @@ def create_json_response(obj, request=None, log_it=False, show_stack=False,
         body['id'] = obj.location.split('/')[-1]
 
     body.update(extra)
-
-    obj.body = json_dumps(body, encoder=encoder)
+    obj.body = six.b(json_dumps(body, encoder=encoder))
     show_stack = log_it or show_stack
     status = obj.status_int
@@ -77,7 +78,7 @@ def __init__(self, *arg, **kw):
 thismodule = sys.modules[__name__]
 
 
-http_exceptions = http_exc.status_map.values() + [
+http_exceptions = list(http_exc.status_map.values()) + [
     http_exc.HTTPBadRequest,
     http_exc.HTTPInternalServerError,
 ]
@@ -103,7 +104,8 @@ def __init__(self, *args, **kwargs):
         if resource and 'location' in kwargs:
             resource['self'] = kwargs['location']
 
-        auth = request and request.registry._root_resources.values()[0].auth
+        auth = (request and
+                list(request.registry._root_resources.values())[0].auth)
         if resource and auth:
             wrapper = apply_privacy(request=request)
             resource = wrapper(result=resource)
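Two Python 3 incompatibilities are fixed here: WebOb response bodies must be bytes, so the JSON string is wrapped with `six.b()`, and `dict.values()` returns a view that supports neither concatenation nor indexing, so both call sites gain an explicit `list()`. A short illustration of the view behavior:

```python
d = {'a': 1, 'b': 2}

# On Python 3 both of these raise TypeError without the list() call:
first = list(d.values())[0]          # views are not indexable
combined = list(d.values()) + [3]    # views don't support `+`
```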
4 changes: 2 additions & 2 deletions nefertari/resource.py
@@ -274,7 +274,7 @@ def add(self, member_name, collection_name='', parent=None, uid='',
 
         name_segs = [a.member_name for a in new_resource.ancestors]
         name_segs.insert(1, prefix)
-        name_segs = filter(bool, name_segs)
+        name_segs = [seg for seg in name_segs if seg]
         if name_segs:
             kwargs['name_prefix'] = '_'.join(name_segs) + ':'
@@ -298,7 +298,7 @@ def add(self, member_name, collection_name='', parent=None, uid='',
         # add all route names for this resource as keys in the dict,
         # so its easy to find it in the view.
         self.resource_map.update(dict.fromkeys(
-            new_resource.action_route_map.values(),
+            list(new_resource.action_route_map.values()),
             new_resource))
 
         parent.children.append(new_resource)
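The `filter(bool, ...)` replacement matters because on Python 3 `filter` returns a lazy iterator, so the `if name_segs:` truth test that follows would always pass, even with nothing left after filtering; a list comprehension preserves the Python 2 list semantics. Illustrated:

```python
name_segs = ['', '', '']

# Python 3: filter() returns an iterator object, which is always truthy,
# even though no element survives the filter.
assert bool(filter(bool, name_segs))

# The list comprehension keeps the intended emptiness check working.
name_segs = [seg for seg in name_segs if seg]
assert not name_segs  # empty list is falsy, as the code expects
```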
4 changes: 2 additions & 2 deletions nefertari/scripts/es.py
@@ -1,10 +1,10 @@
 from argparse import ArgumentParser
 import sys
-import urlparse
 import logging
 
 from pyramid.paster import bootstrap
 from pyramid.config import Configurator
+from six.moves import urllib
 
 from nefertari.utils import dictset, split_strip, to_dicts
 from nefertari.elasticsearch import ES
@@ -94,7 +94,7 @@ def run(self):
 
         params = self.options.params or ''
         params = dict([
-            [k, v[0]] for k, v in urlparse.parse_qs(params).items()
+            [k, v[0]] for k, v in urllib.parse.parse_qs(params).items()
         ])
         params.setdefault('_limit', params.get('_limit', 10000))
         chunk_size = self.options.chunk or params['_limit']
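Python 2's `urlparse` module became `urllib.parse` in Python 3; `six.moves.urllib` exposes the Python 3 layout on both interpreters. A quick illustration of the query-string parsing used above:

```python
from six.moves import urllib

qs = '_limit=100&name=foo'

# parse_qs maps each key to a *list* of values, hence the v[0] above.
parsed = urllib.parse.parse_qs(qs)
params = {k: v[0] for k, v in parsed.items()}
assert params == {'_limit': '100', 'name': 'foo'}
```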
