Skip to content
Permalink
master
Switch branches/tags
Go to file
 
 
Cannot retrieve contributors at this time
3367 lines (3239 sloc) 149 KB
import datetime
import copy
import httplib2
import apiclient
import time
import pytz
import yaml
import zipfile
import collections.abc as abc
from PIL import Image, ImageEnhance
from twilio.rest import Client as TwilioRestClient
import pycountry
import docassemble.base.ocr
import pickle
from itertools import chain
from docassemble.base.logger import logmessage
from docassemble.base.error import DAError, DAValidationError, DAIndexError, DAWebError
from jinja2.runtime import UndefinedError
from jinja2.exceptions import TemplateError
import docassemble.base.pandoc
import docassemble.base.pdftk
import docassemble.base.file_docx
from docassemble.base.file_docx import include_docx_template
from docassemble.base.functions import alpha, roman, item_label, comma_and_list, get_language, set_language, get_dialect, set_country, get_country, word, comma_list, ordinal, ordinal_number, need, nice_number, quantity_noun, possessify, verb_past, verb_present, noun_plural, noun_singular, space_to_underscore, force_ask, force_gather, period_list, name_suffix, currency_symbol, currency, indefinite_article, nodoublequote, capitalize, title_case, url_of, do_you, did_you, does_a_b, did_a_b, were_you, was_a_b, have_you, has_a_b, your, her, his, their, is_word, get_locale, set_locale, process_action, url_action, get_info, set_info, get_config, prevent_going_back, qr_code, action_menu_item, from_b64_json, defined, define, value, message, response, json_response, command, single_paragraph, quote_paragraphs, location_returned, location_known, user_lat_lon, interview_url, interview_url_action, interview_url_as_qr, interview_url_action_as_qr, interview_email, get_emails, this_thread, static_image, action_arguments, action_argument, language_functions, language_function_constructor, get_default_timezone, user_logged_in, interface, user_privileges, user_has_privilege, user_info, background_action, background_response, background_response_action, background_error_action, us, set_live_help_status, chat_partners_available, phone_number_in_e164, phone_number_formatted, phone_number_is_valid, countries_list, country_name, write_record, read_records, delete_record, variables_as_json, all_variables, server, language_from_browser, device, plain, bold, italic, states_list, state_name, subdivision_type, indent, raw, fix_punctuation, set_progress, get_progress, referring_url, undefine, invalidate, dispatch, yesno, noyes, split, showif, showifdef, phone_number_part, set_parts, log, encode_name, decode_name, interview_list, interview_menu, server_capabilities, session_tags, get_chat_log, get_user_list, get_user_info, set_user_info, get_user_secret, create_user, create_session, get_session_variables, set_session_variables, get_question_data, go_back_in_session, manage_privileges, salutation, redact, ensure_definition, forget_result_of, re_run_logic, reconsider, set_title, set_save_status, single_to_double_newlines, CustomDataType, verbatim, add_separators, update_ordinal_numbers, update_ordinal_function, update_language_function, update_nice_numbers, update_word_collection, store_variables_snapshot, get_uid, update_terms
from docassemble.base.core import DAObject, DAList, DADict, DAOrderedDict, DASet, DAFile, DAFileCollection, DAStaticFile, DAFileList, DAEmail, DAEmailRecipient, DAEmailRecipientList, DATemplate, DAEmpty, DALink, selections, objects_from_file, RelationshipTree, DAContext, DACatchAll, DALazyTemplate
from decimal import Decimal
import sys
#sys.stderr.write("importing async mail now from util\n")
from docassemble.base.filter import markdown_to_html, to_text, ensure_valid_filename
from docassemble.base.generate_key import random_alphanumeric
#file_finder, url_finder, da_send_mail
import docassemble.base.filter
import dateutil
import dateutil.parser
import json
import codecs
import babel.dates
#import redis
import re
import phonenumbers
import tempfile
import os
import shutil
import subprocess
from bs4 import BeautifulSoup
import types
import requests
from requests.auth import HTTPDigestAuth, HTTPBasicAuth
from requests.exceptions import RequestException
import i18naddress
valid_variable_match = re.compile(r'^[^\d][A-Za-z0-9\_]*$')
__all__ = [
'alpha',
'roman',
'item_label',
'ordinal',
'ordinal_number',
'comma_list',
'word',
'get_language',
'set_language',
'get_dialect',
'set_country',
'get_country',
'get_locale',
'set_locale',
'comma_and_list',
'need',
'nice_number',
'quantity_noun',
'currency_symbol',
'verb_past',
'verb_present',
'noun_plural',
'noun_singular',
'indefinite_article',
'capitalize',
'space_to_underscore',
'force_ask',
'force_gather',
'period_list',
'name_suffix',
'currency',
'static_image',
'title_case',
'url_of',
'process_action',
'url_action',
'get_info',
'set_info',
'get_config',
'prevent_going_back',
'qr_code',
'action_menu_item',
'from_b64_json',
'defined',
'define',
'value',
'message',
'response',
'json_response',
'command',
'single_paragraph',
'quote_paragraphs',
'location_returned',
'location_known',
'user_lat_lon',
'interview_url',
'interview_url_action',
'interview_url_as_qr',
'interview_url_action_as_qr',
'LatitudeLongitude',
'RoleChangeTracker',
'Name',
'IndividualName',
'Address',
'City',
'Event',
'Person',
'Thing',
'Individual',
'ChildList',
'FinancialList',
'PeriodicFinancialList',
'Income',
'Asset',
'Expense',
'Value',
'PeriodicValue',
'OfficeList',
'Organization',
'objects_from_file',
'send_email',
'send_sms',
'send_fax',
'map_of',
'selections',
'DAObject',
'DAList',
'DADict',
'DAOrderedDict',
'DASet',
'DAFile',
'DAFileCollection',
'DAFileList',
'DAStaticFile',
'DAEmail',
'DAEmailRecipient',
'DAEmailRecipientList',
'DATemplate',
'DAEmpty',
'DALink',
'last_access_time',
'last_access_delta',
'last_access_days',
'last_access_hours',
'last_access_minutes',
'returning_user',
'action_arguments',
'action_argument',
'timezone_list',
'as_datetime',
'current_datetime',
'date_difference',
'date_interval',
'year_of',
'month_of',
'day_of',
'dow_of',
'format_date',
'format_datetime',
'format_time',
'today',
'get_default_timezone',
'user_logged_in',
'interface',
'user_privileges',
'user_has_privilege',
'user_info',
'task_performed',
'task_not_yet_performed',
'mark_task_as_performed',
'times_task_performed',
'set_task_counter',
'background_action',
'background_response',
'background_response_action',
'background_error_action',
'us',
'DARedis',
'DACloudStorage',
'DAGoogleAPI',
'MachineLearningEntry',
'SimpleTextMachineLearner',
'SVMMachineLearner',
'RandomForestMachineLearner',
'set_live_help_status',
'chat_partners_available',
'phone_number_in_e164',
'phone_number_formatted',
'phone_number_is_valid',
'countries_list',
'country_name',
'write_record',
'read_records',
'delete_record',
'variables_as_json',
'all_variables',
'ocr_file',
'ocr_file_in_background',
'read_qr',
'get_sms_session',
'initiate_sms_session',
'terminate_sms_session',
'language_from_browser',
'device',
'interview_email',
'get_emails',
'plain',
'bold',
'italic',
'path_and_mimetype',
'states_list',
'state_name',
'subdivision_type',
'indent',
'raw',
'fix_punctuation',
'set_progress',
'get_progress',
'referring_url',
'run_python_module',
'undefine',
'invalidate',
'dispatch',
'yesno',
'noyes',
'split',
'showif',
'showifdef',
'phone_number_part',
'pdf_concatenate',
'set_parts',
'log',
'encode_name',
'decode_name',
'interview_list',
'interview_menu',
'server_capabilities',
'session_tags',
'include_docx_template',
'get_chat_log',
'get_user_list',
'get_user_info',
'set_user_info',
'get_user_secret',
'create_user',
'create_session',
'get_session_variables',
'set_session_variables',
'go_back_in_session',
'manage_privileges',
'start_time',
'zip_file',
'validation_error',
'DAValidationError',
'redact',
'forget_result_of',
're_run_logic',
'reconsider',
'action_button_html',
'url_ask',
'overlay_pdf',
'get_question_data',
'set_title',
'set_save_status',
'single_to_double_newlines',
'RelationshipTree',
'DAContext',
'DAOAuth',
'DAStore',
'explain',
'clear_explanations',
'explanation',
'set_status',
'get_status',
'verbatim',
'add_separators',
'DAWeb',
'DAWebError',
'json',
're',
'iso_country',
'assemble_docx',
'docx_concatenate',
'store_variables_snapshot',
'stash_data',
'retrieve_stashed_data',
'update_terms',
'chain'
]
#knn_machine_learner = DummyObject
# def TheSimpleTextMachineLearner(*pargs, **kwargs):
# return knn_machine_learner(*pargs, **kwargs)
class DAStore(DAObject):
"""A class used to save objects to SQL."""
def init(self, *pargs, **kwargs):
super().init(*pargs, **kwargs)
def is_encrypted(self):
"""Returns True if the storage object is using encryption, otherwise returns False."""
if hasattr(self, 'encrypted'):
return self.encrypted
if hasattr(self, 'base'):
if self.base == 'interview':
return False
if self.base == 'user':
return True
if self.base == 'session':
return True
if self.base == 'global':
return False
return False
return True
def _get_base_key(self):
if hasattr(self, 'base'):
if self.base == 'interview':
return 'da:i:' + this_thread.current_info.get('yaml_filename', '')
if self.base == 'user':
return 'da:userid:' + str(this_thread.current_info['user']['the_user_id'])
if self.base == 'session':
return 'da:uid:' + get_uid() + ':i:' + this_thread.current_info.get('yaml_filename', '')
if self.base == 'global':
return 'da:global'
return str(self.base)
return 'da:userid:' + str(this_thread.current_info['user']['the_user_id'])
def defined(self, key):
"""Returns True if the key exists in the data store, otherwise returns False."""
the_key = self._get_base_key() + ':' + key
return server.server_sql_defined(the_key)
def get(self, key):
"""Reads an object from the data store for the given key."""
the_key = self._get_base_key() + ':' + key
return server.server_sql_get(the_key, secret=this_thread.current_info.get('secret', None))
def set(self, key, value):
"""Writes an object to the data store under the given key."""
the_key = self._get_base_key() + ':' + key
server.server_sql_set(the_key, value, encrypted=self.is_encrypted(), secret=this_thread.current_info.get('secret', None), the_user_id=this_thread.current_info['user']['the_user_id'])
def delete(self, key):
"""Deletes an object from the data store"""
the_key = self._get_base_key() + ':' + key
server.server_sql_delete(the_key)
def keys(self):
return server.server_sql_keys(self._get_base_key() + ':')
class DAWeb(DAObject):
"""A class used to call external APIs"""
def _get_base_url(self):
if hasattr(self, 'base_url'):
base_url = self.base_url
if not isinstance(self.base_url, str):
raise Exception("DAWeb.call: the base url must be a string")
if not base_url.endswith('/'):
base_url += '/'
return base_url
return self.base_url
def _get_on_failure(self, on_failure):
if on_failure is None and hasattr(self, 'on_failure'):
on_failure = self.on_failure
return on_failure
def _get_success_code(self, success_code):
if success_code is None and hasattr(self, 'success_code'):
success_code = self.success_code
return success_code
def _get_on_success(self, on_success):
if on_success is None and hasattr(self, 'on_success'):
on_success = self.on_success
return on_success
def _get_task(self, task):
if task is None and hasattr(self, 'task'):
task = self.task
if task is None:
return None
if not isinstance(task, str):
raise Exception("DAWeb.call: task must be a string")
return task
def _get_task_persistent(self, task_persistent):
if task_persistent is None and hasattr(self, 'task_persistent'):
task_persistent = self.task_persistent
if task_persistent is None:
return False
if not isinstance(task, (bool, str)):
raise Exception("DAWeb.call: task_persistent must be boolean or string")
return task_persistent
def _get_auth(self, auth):
if auth is None and hasattr(self, 'auth'):
auth = self.auth
if isinstance(auth, (dict, DADict)):
if auth.get('type', 'basic') == 'basic':
return HTTPBasicAuth(auth['username'], auth['password'])
elif auth['type'] == 'digest':
return HTTPDigestAuth(auth['username'], auth['password'])
return auth
def _get_headers(self, new_headers):
if hasattr(self, 'headers'):
headers = self.headers
if isinstance(headers, DADict):
headers = headers.elements
if not isinstance(headers, dict):
raise Exception("DAWeb.call: the headers must be a dictionary")
headers.update(new_headers)
return headers
return new_headers
def _get_cookies(self, new_cookies):
if hasattr(self, 'cookies'):
cookies = self.cookies
if isinstance(cookies, DADict):
cookies = cookies.elements
if not isinstance(cookies, dict):
raise Exception("DAWeb.call: the cookies must be a dictionary")
cookies.update(new_cookies)
return cookies
return new_cookies
def _get_json_body(self, json_body):
if json_body is not None:
return True if json_body else False
if hasattr(self, 'json_body'):
return True if self.json_body else False
return True
def _call(self, url, method=None, data=None, params=None, headers=None, json_body=None, on_failure=None, on_success=None, auth=None, task=None, task_persistent=None, files=None, cookies=None, success_code=None):
task = self._get_task(task)
task_persistent = self._get_task_persistent(task_persistent)
auth = self._get_auth(auth)
json_body = self._get_json_body(json_body)
on_failure = self._get_on_failure(on_failure)
on_success = self._get_on_success(on_success)
success_code = self._get_success_code(success_code)
if isinstance(success_code, str):
success_code = [int(success_code.strip())]
elif isinstance(success_code, (abc.Iterable, DASet, DAList)):
new_success_code = list()
for code in success_code:
if not isinstance(code, int):
raise Exception("DAWeb.call: success codes must be integers")
new_success_code.append(code)
success_code = new_success_code
elif isinstance(success_code, int):
success_code = [success_code]
elif success_code is not None:
raise Exception("DAWeb.call: success_code must be an integer or a list of integers")
if method is None:
method = 'GET'
if not isinstance(method, str):
raise Exception("DAWeb.call: the method must be a string")
method = method.upper().strip()
if method not in ('POST', 'GET', 'PATCH', 'PUT', 'HEAD', 'DELETE', 'OPTIONS'):
raise Exception("DAWeb.call: invalid method")
if not isinstance(url, str):
raise Exception("DAWeb.call: the url must be a string")
if not re.search(r'^https?://', url):
url = self._get_base_url() + re.sub(r'^/*', '', url)
if data is None:
data = dict()
if isinstance(data, DADict):
data = data.elements
if json_body is False and not isinstance(data, dict):
raise Exception("DAWeb.call: data must be a dictionary")
if params is None:
params = dict()
if isinstance(params, DADict):
params = params.elements
if not isinstance(params, dict):
raise Exception("DAWeb.call: params must be a dictionary")
if headers is None:
headers = dict()
if isinstance(headers, DADict):
headers = headers.elements
if not isinstance(headers, dict):
raise Exception("DAWeb.call: the headers must be a dictionary")
headers = self._get_headers(headers)
if len(headers) == 0:
headers = None
if cookies is None:
cookies = dict()
if isinstance(cookies, DADict):
cookies = cookies.elements
if not isinstance(cookies, dict):
raise Exception("DAWeb.call: the cookies must be a dictionary")
cookies = self._get_cookies(cookies)
if len(cookies) == 0:
cookies = None
if isinstance(data, dict) and len(data) == 0:
data = None
if files is not None:
if not isinstance(files, dict):
raise Exception("DAWeb.call: files must be a dictionary")
new_files = dict()
for key, val in files.items():
if not isinstance(key, str):
raise Exception("DAWeb.call: files must be a dictionary of string keys")
try:
path = server.path_from_reference(val)
logmessage("path is " + str(path))
assert path is not None
except:
raise Exception("DAWeb.call: could not load the file")
new_files[key] = open(path, 'rb')
files = new_files
if len(files):
json_body = False
try:
if method == 'POST':
if json_body:
r = requests.post(url, json=data, params=params, headers=headers, auth=auth, cookies=cookies, files=files)
else:
r = requests.post(url, data=data, params=params, headers=headers, auth=auth, cookies=cookies, files=files)
elif method == 'PUT':
if json_body:
r = requests.put(url, json=data, params=params, headers=headers, auth=auth, cookies=cookies, files=files)
else:
r = requests.put(url, data=data, params=params, headers=headers, auth=auth, cookies=cookies, files=files)
elif method == 'PATCH':
if json_body:
r = requests.patch(url, json=data, params=params, headers=headers, auth=auth, cookies=cookies, files=files)
else:
r = requests.patch(url, data=data, params=params, headers=headers, auth=auth, cookies=cookies, files=files)
elif method == 'GET':
if len(params) == 0:
params = data
data = None
r = requests.get(url, params=params, headers=headers, auth=auth, cookies=cookies)
elif method == 'DELETE':
if len(params) == 0:
params = data
data = None
r = requests.delete(url, params=params, headers=headers, auth=auth, cookies=cookies)
elif method == 'OPTIONS':
if len(params) == 0:
params = data
data = None
r = requests.options(url, params=params, headers=headers, auth=auth, cookies=cookies)
elif method == 'HEAD':
if len(params) == 0:
params = data
data = None
r = requests.head(url, params=params, headers=headers, auth=auth, cookies=cookies)
except RequestException as err:
if on_failure == 'raise':
raise DAWebError(url=url, method=method, params=params, headers=headers, data=data, task=task, task_persistent=task_persistent, status_code=-1, response_text='', response_json=None, response_headers=dict(), exception_type=err.__class__.__name__, exception_text=str(err), cookies_before=cookies, cookies_after=None)
else:
return on_failure
if success_code is None:
if r.status_code >= 200 and r.status_code < 300:
success = True
else:
success = False
else:
if r.status_code in success_code:
success = True
else:
success = False
if hasattr(self, 'cookies'):
self.cookies = dict(r.cookies)
try:
json_response = r.json()
except:
json_response = None
if success and task is not None:
mark_task_as_performed(task, persistent=task_persistent)
if not success:
if on_failure == 'raise':
raise DAWebError(url=url, method=method, params=params, headers=headers, data=data, task=task, task_persistent=task_persistent, status_code=r.status_code, response_text=r.text, response_json=json_response, response_headers=r.headers, exception_type=None, exception_text=None, cookies_before=cookies, cookies_after=dict(r.cookies), success=success)
else:
return on_failure
if success and on_success is not None:
if on_success == 'raise':
raise DAWebError(url=url, method=method, params=params, headers=headers, data=data, task=task, task_persistent=task_persistent, status_code=r.status_code, response_text=r.text, response_json=json_response, response_headers=r.headers, exception_type=None, exception_text=None, cookies_before=cookies, cookies_after=dict(r.cookies), success=success)
else:
return on_success
return(json_response if json_response is not None else r.text)
def get(self, url, data=None, params=None, headers=None, json_body=None, on_failure=None, on_success=None, auth=None, cookies=None, task=None, task_persistent=None):
"""Makes a GET request"""
return self._call(url, method='GET', data=data, params=params, headers=headers, json_body=json_body, on_failure=on_failure, on_success=on_success, auth=auth, cookies=cookies, task=task, task_persistent=task_persistent)
def post(self, url, data=None, params=None, headers=None, json_body=None, on_failure=None, on_success=None, auth=None, cookies=None, task=None, task_persistent=None, files=None):
"""Makes a POST request"""
return self._call(url, method='POST', data=data, params=params, headers=headers, json_body=json_body, on_failure=on_failure, on_success=on_success, auth=auth, cookies=cookies, task=task, task_persistent=task_persistent, files=files)
def put(self, url, data=None, params=None, headers=None, json_body=None, on_failure=None, on_success=None, auth=None, cookies=None, task=None, task_persistent=None, files=None):
"""Makes a PUT request"""
return self._call(url, method='PUT', data=data, params=params, headers=headers, json_body=json_body, on_failure=on_failure, on_success=on_success, auth=auth, cookies=cookies, task=task, task_persistent=task_persistent, files=files)
def patch(self, url, data=None, params=None, headers=None, json_body=None, on_failure=None, on_success=None, auth=None, cookies=None, task=None, task_persistent=None, files=None):
"""Makes a PATCH request"""
return self._call(url, method='PATCH', data=data, params=params, headers=headers, json_body=json_body, on_failure=on_failure, on_success=on_success, auth=auth, cookies=cookies, task=task, task_persistent=task_persistent, files=files)
def delete(self, url, data=None, params=None, headers=None, json_body=None, on_failure=None, on_success=None, auth=None, cookies=None, task=None, task_persistent=None):
"""Makes a DELETE request"""
return self._call(url, method='DELETE', data=data, params=params, headers=headers, json_body=json_body, on_failure=on_failure, on_success=on_success, auth=auth, cookies=cookies, task=task, task_persistent=task_persistent)
def options(self, url, data=None, params=None, headers=None, json_body=None, on_failure=None, on_success=None, auth=None, cookies=None, task=None, task_persistent=None):
"""Makes an OPTIONS request"""
return self._call(url, method='OPTIONS', data=data, params=params, headers=headers, json_body=json_body, on_failure=on_failure, on_success=on_success, auth=auth, cookies=cookies, task=task, task_persistent=task_persistent)
def head(self, url, data=None, params=None, headers=None, json_body=None, on_failure=None, on_success=None, auth=None, cookies=None, task=None, task_persistent=None):
"""Makes a HEAD request"""
return self._call(url, method='HEAD', data=data, params=params, headers=headers, json_body=json_body, on_failure=on_failure, on_success=on_success, auth=auth, cookies=cookies, task=task, task_persistent=task_persistent)
class DARedis(DAObject):
"""A class used to interact with the redis server."""
def key(self, keyname):
"""Returns a key that combines the interview name with the keyname."""
return this_thread.current_info.get('yaml_filename', '') + ':' + str(keyname)
def get_data(self, key):
"""Returns data from Redis and unpickles it."""
result = server.server_redis_user.get(key)
if result is None:
return None
try:
result = server.fix_pickle_obj(result)
except:
logmessage("get_data: could not unpickle contents of " + str(key))
result = None
return result
def set_data(self, key, data, expire=None):
"""Saves data in Redis after pickling it."""
pickled_data = pickle.dumps(data)
if expire is not None:
if not isinstance(expire, int):
raise DAError("set_data: expire time must be an integer")
pipe = server.server_redis_user.pipeline()
pipe.set(key, pickled_data)
pipe.expire(key, expire)
pipe.execute()
else:
server.server_redis_user.set(key, pickled_data)
def __getattr__(self, funcname):
return getattr(server.server_redis_user, funcname)
class DACloudStorage(DAObject):
"""Returns an object that can be used to interface with S3 or Azure."""
def init(self, *pargs, **kwargs):
if 'provider' in kwargs and 'config' in kwargs:
self.custom = True
self.provider = kwargs['provider']
self.config = kwargs['config']
del kwargs['provider']
del kwargs['config']
server.cloud_custom(self.provider, self.config)
else:
self.custom = False
return super().init(*pargs, **kwargs)
@property
def conn(self):
"""This property returns a boto3.resource('s3') or BlockBlobService() object."""
if self.custom:
return server.cloud_custom(self.provider, self.config).conn
else:
return server.cloud.conn
@property
def client(self):
"""This property returns a boto3.client('s3') object."""
if self.custom:
return server.cloud_custom(self.provider, self.config).client
else:
return server.cloud.client
@property
def bucket(self):
"""This property returns a boto3 Bucket() object."""
if self.custom:
return server.cloud_custom(self.provider, self.config).bucket
else:
return server.cloud.bucket
@property
def bucket_name(self):
"""This property returns the name of the Amazon S3 bucket."""
if self.custom:
return server.cloud_custom(self.provider, self.config).bucket_name
else:
return server.cloud.bucket_name
@property
def container_name(self):
"""This property returns the name of the Azure Blob Storage container."""
if self.custom:
return server.cloud_custom(self.provider, self.config).container
else:
return server.cloud.container
class DAGoogleAPI(DAObject):
def api_credentials(self, scope):
"""Returns an OAuth2 credentials object for the given scope."""
return server.google_api.google_api_credentials(scope)
def http(self, scope):
"""Returns a credentialized http object for the given scope."""
return self.api_credentials(scope).authorize(httplib2.Http())
def drive_service(self):
"""Returns a Google Drive service object using google-api-python-client."""
return apiclient.discovery.build('drive', 'v3', http=self.http('https://www.googleapis.com/auth/drive'))
def cloud_credentials(self, scope):
"""Returns a google.oauth2.service_account credentials object for the given scope."""
return server.google_api.google_cloud_credentials(scope)
def project_id(self):
"""Returns the ID of the project referenced in the google service account credentials in the Configuration."""
return server.google_api.project_id()
def google_cloud_storage_client(self, scope=None):
"""Returns a google.cloud.storage.Client object."""
return server.google_api.google_cloud_storage_client(scope)
def run_python_module(module, arguments=None):
"""Runs a python module, as though from the command line, and returns the output."""
if re.search(r'\.py$', module):
module = this_thread.current_package + '.' + re.sub(r'\.py$', '', module)
elif re.search(r'^\.', module):
module = this_thread.current_package + module
commands = [re.sub(r'/lib/python.*', '/bin/python3', docassemble.base.ocr.__file__), '-m', module]
if arguments:
if not isinstance(arguments, list):
raise DAError("run_python_module: the arguments parameter must be in the form of a list")
commands.extend(arguments)
output = ''
try:
output = subprocess.check_output(commands, stderr=subprocess.STDOUT).decode()
return_code = 0
except subprocess.CalledProcessError as err:
output = err.output.decode()
return_code = err.returncode
return output, return_code
def today(timezone=None, format=None):
"""Returns today's date at midnight as a DADateTime object."""
ensure_definition(timezone, format)
if timezone is None:
timezone = get_default_timezone()
val = pytz.utc.localize(datetime.datetime.utcnow()).astimezone(pytz.timezone(timezone))
if format is not None:
return dd(val.replace(hour=0, minute=0, second=0, microsecond=0)).format_date(format)
else:
return dd(val.replace(hour=0, minute=0, second=0, microsecond=0))
def babel_language(language):
if 'babel dates map' not in server.daconfig:
return language
return server.daconfig['babel dates map'].get(language, language)
def month_of(the_date, as_word=False, language=None):
"""Interprets the_date as a date and returns the month.
Set as_word to True if you want the month as a word."""
ensure_definition(the_date, as_word, language)
if language is None:
language = get_language()
try:
if isinstance(the_date, datetime.datetime) or isinstance(the_date, datetime.date):
date = the_date
else:
date = dateutil.parser.parse(the_date)
if as_word:
return(babel.dates.format_date(date, format='MMMM', locale=babel_language(language)))
return(int(date.strftime('%m')))
except:
return word("Bad date")
def day_of(the_date, language=None):
"""Interprets the_date as a date and returns the day of month."""
ensure_definition(the_date, language)
try:
if isinstance(the_date, datetime.datetime) or isinstance(the_date, datetime.date):
date = the_date
else:
date = dateutil.parser.parse(the_date)
return(int(date.strftime('%d')))
except:
return word("Bad date")
def dow_of(the_date, as_word=False, language=None):
"""Interprets the_date as a date and returns the day of week as a number from 1 to 7 for Sunday through Saturday. Set as_word to True if you want the day of week as a word."""
ensure_definition(the_date, as_word, language)
if language is None:
language = get_language()
try:
if isinstance(the_date, datetime.datetime) or isinstance(the_date, datetime.date):
date = the_date
else:
date = dateutil.parser.parse(the_date)
if as_word:
return(babel.dates.format_date(date, format='EEEE', locale=babel_language(language)))
else:
return(int(date.strftime('%u')))
except:
return word("Bad date")
def year_of(the_date, language=None):
"""Interprets the_date as a date and returns the year."""
ensure_definition(the_date, language)
try:
if isinstance(the_date, datetime.datetime) or isinstance(the_date, datetime.date):
date = the_date
else:
date = dateutil.parser.parse(the_date)
return(int(date.strftime('%Y')))
except:
return word("Bad date")
def interview_default(the_part, default_value, language):
result = None
if the_part in this_thread.internal and this_thread.internal[the_part] is not None:
return this_thread.internal[the_part]
for lang in (language, get_language(), '*'):
if lang is not None:
if lang in this_thread.interview.default_title:
if the_part in this_thread.interview.default_title[lang]:
return this_thread.interview.default_title[lang][the_part]
return default_value
def format_date(the_date, format=None, language=None):
"""Interprets the_date as a date and returns the date formatted for the current locale."""
ensure_definition(the_date, format, language)
if isinstance(the_date, DAEmpty):
return ""
if language is None:
language = get_language()
if format is None:
format = interview_default('date format', 'long', language)
try:
if isinstance(the_date, datetime.datetime) or isinstance(the_date, datetime.date):
date = the_date
else:
date = dateutil.parser.parse(the_date)
return babel.dates.format_date(date, format=format, locale=babel_language(language))
except:
return word("Bad date")
def format_datetime(the_date, format=None, language=None):
"""Interprets the_date as a date/time and returns the date/time formatted for the current locale."""
ensure_definition(the_date, format, language)
if isinstance(the_date, DAEmpty):
return ""
if language is None:
language = get_language()
if format is None:
format = interview_default('datetime format', 'long', language)
try:
if isinstance(the_date, datetime.datetime) or isinstance(the_date, datetime.date):
date = the_date
else:
date = dateutil.parser.parse(the_date)
return babel.dates.format_datetime(date, format=format, locale=babel_language(language))
except:
return word("Bad date")
def format_time(the_time, format=None, language=None):
"""Interprets the_time as a date/time and returns the time formatted for the current locale."""
ensure_definition(the_time, format, language)
if isinstance(the_time, DAEmpty):
return ""
if language is None:
language = get_language()
if format is None:
format = interview_default('time format', 'short', language)
try:
if isinstance(the_time, datetime.datetime) or isinstance(the_time, datetime.date) or isinstance(the_time, datetime.time):
time = the_time
else:
time = dateutil.parser.parse(the_time)
return babel.dates.format_time(time, format=format, locale=babel_language(language))
except Exception as errmess:
return word("Bad date: " + str(errmess))
class DateTimeDelta:
def __str__(self):
return str(self.describe())
def describe(self, **kwargs):
specificity = kwargs.get('specificity', None)
output = list()
diff = dateutil.relativedelta.relativedelta(self.end, self.start)
if diff.years != 0:
output.append((abs(diff.years), noun_plural(word('year'), abs(diff.years))))
if diff.months != 0 and specificity != 'year':
output.append((abs(diff.months), noun_plural(word('month'), abs(diff.months))))
if diff.days != 0 and specificity not in ('year', 'month'):
output.append((abs(diff.days), noun_plural(word('day'), abs(diff.days))))
if kwargs.get('nice', True):
return_value = comma_and_list(["%s %s" % (nice_number(y[0]), y[1]) for y in output])
if kwargs.get('capitalize', False):
return capitalize(return_value)
else:
return return_value
else:
return comma_and_list(["%d %s" % y for y in output])
class DADateTime(datetime.datetime):
def format(self, format=None, language=None):
return format_date(self, format=format, language=language)
def format_date(self, format=None, language=None):
return format_date(self, format=format, language=language)
def format_datetime(self, format=None, language=None):
return format_datetime(self, format=format, language=language)
def format_time(self, format=None, language=None):
return format_time(self, format=format, language=language)
def replace_time(self, time):
return self.replace(hour=time.hour, minute=time.minute, second=time.second, microsecond=time.microsecond)
@property
def nanosecond(self):
return 0
@property
def dow(self):
return self.isocalendar()[2]
@property
def week(self):
return self.isocalendar()[1]
def plus(self, **kwargs):
return dd(dt(self) + date_interval(**kwargs))
def minus(self, **kwargs):
return dd(dt(self) - date_interval(**kwargs))
def __str__(self):
return str(format_date(self))
def __add__(self, other):
if isinstance(other, str):
return str(self) + other
val = dt(self) + other
if isinstance(val, datetime.date):
return dd(val)
return val
def __radd__(self, other):
if isinstance(other, str):
return other + str(self)
return dd(dt(self) + other)
def __sub__(self, other):
val = dt(self) - other
if isinstance(val, datetime.date):
return dd(val)
return val
def __rsub__(self, other):
val = other - dt(self)
if isinstance(val, datetime.date):
return dd(val)
return val
def current_datetime(timezone=None):
"""Returns the current time. Uses the default timezone unless another
timezone is explicitly provided.
"""
ensure_definition(timezone)
if timezone is None:
timezone = get_default_timezone()
return dd(pytz.utc.localize(datetime.datetime.utcnow()).astimezone(pytz.timezone(timezone)))
def as_datetime(the_date, timezone=None):
"""Converts the_date to a DADateTime object with a timezone. Uses the
default timezone unless another timezone is explicitly provided."""
ensure_definition(the_date, timezone)
if timezone is None:
timezone = get_default_timezone()
if isinstance(the_date, datetime.date) and not isinstance(the_date, datetime.datetime):
the_date = datetime.datetime.combine(the_date, datetime.datetime.min.time())
if isinstance(the_date, datetime.datetime):
new_datetime = the_date
else:
new_datetime = dateutil.parser.parse(the_date)
if new_datetime.tzinfo:
new_datetime = new_datetime.astimezone(pytz.timezone(timezone))
else:
new_datetime = pytz.timezone(timezone).localize(new_datetime)
return dd(new_datetime)
def dd(obj):
if isinstance(obj, DADateTime):
return obj
return DADateTime(obj.year, month=obj.month, day=obj.day, hour=obj.hour, minute=obj.minute, second=obj.second, microsecond=obj.microsecond, tzinfo=obj.tzinfo)
def dt(obj):
return datetime.datetime(obj.year, obj.month, obj.day, obj.hour, obj.minute, obj.second, obj.microsecond, obj.tzinfo)
def date_interval(**kwargs):
"""Expresses a date and time interval. Passes through all arguments
to dateutil.relativedelta.relativedelta."""
ensure_definition(**kwargs)
return dateutil.relativedelta.relativedelta(**kwargs)
def date_difference(starting=None, ending=None, timezone=None):
"""Calculates the difference between the date indicated by "starting"
and the date indicated by "ending." Returns an object with attributes weeks,
days, hours, minutes, seconds, and delta."""
ensure_definition(starting, ending, timezone)
if starting is None:
starting = current_datetime()
if ending is None:
ending = current_datetime()
if timezone is None:
timezone = get_default_timezone()
if isinstance(starting, datetime.date) and not isinstance(starting, datetime.datetime):
starting = datetime.datetime.combine(starting, datetime.datetime.min.time())
if isinstance(ending, datetime.date) and not isinstance(ending, datetime.datetime):
ending = datetime.datetime.combine(ending, datetime.datetime.min.time())
if not isinstance(starting, datetime.datetime):
starting = dateutil.parser.parse(starting)
if not isinstance(ending, datetime.datetime):
ending = dateutil.parser.parse(ending)
if starting.tzinfo:
starting = starting.astimezone(pytz.timezone(timezone))
else:
starting = pytz.timezone(timezone).localize(starting)
if ending.tzinfo:
ending = ending.astimezone(pytz.timezone(timezone))
else:
ending = pytz.timezone(timezone).localize(ending)
delta = ending - starting
output = DateTimeDelta()
output.start = starting
output.end = ending
output.weeks = (delta.days / 7.0) + (delta.seconds / 604800.0)
output.days = delta.days + (delta.seconds / 86400.0)
output.hours = (delta.days * 24.0) + (delta.seconds / 3600.0)
output.minutes = (delta.days * 1440.0) + (delta.seconds / 60.0)
output.seconds = (delta.days * 86400) + delta.seconds
output.years = (delta.days + delta.seconds / 86400.0) / 365.2425
return output
def fax_string(person, country=None):
if person is None:
return None
fax_number = None
if isinstance(person, Person):
fax_number = person.facsimile_number(country=country)
elif isinstance(person, phonenumbers.PhoneNumber):
fax_number = phonenumbers.format_number(person, phonenumbers.PhoneNumberFormat.E164)
else:
fax_number = phone_number_in_e164(person, country=country)
return fax_number
def phone_string(person, country=None):
if person is None:
return None
phone_number = None
if isinstance(person, Person):
phone_number = person.sms_number()
elif isinstance(person, phonenumbers.PhoneNumber):
phone_number = phonenumbers.format_number(person, phonenumbers.PhoneNumberFormat.E164)
else:
phone_number = phone_number_in_e164(person, country=country)
return phone_number
def email_string(persons, include_name=None, first=False):
if persons is None:
return None
if not (isinstance(persons, (DAList, DASet, abc.Iterable)) and not isinstance(persons, str)):
persons = [persons]
result = []
for person in persons:
if isinstance(person, Person) or isinstance(person, DAEmailRecipient):
result.append(person.email_address(include_name=include_name))
else:
result.append(str(person))
result = [x for x in result if x is not None and x != '']
if first:
if len(result):
return result[0]
return None
return result
def email_stringer(variable, first=False, include_name=False):
return email_string(variable, include_name=include_name, first=first)
def valid_datetime(the_datetime):
"""Returns True if the provided text represents a valid date or time."""
ensure_definition(the_datetime)
if isinstance(the_datetime, datetime.date) or isinstance(the_datetime, datetime.time):
return True
try:
dateutil.parser.parse(the_datetime)
return True
except:
return False
def timezone_list():
"""Returns a list of timezone choices, expressed as text."""
return sorted([tz for tz in pytz.all_timezones])
def returning_user(minutes=None, hours=None, days=None):
"""Returns True if the user is returning to the interview after six
hours of inactivity, or other time indicated by the optional
keyword arguments minutes, hours, or days.
"""
if this_thread.current_info['method'] != 'GET':
return False
if minutes is not None and last_access_minutes() > minutes:
return True
if hours is not None and last_access_hours() > hours:
return True
if days is not None and last_access_days() > days:
return True
if last_access_hours() > 6.0:
return True
return False
def last_access_delta(*pargs, **kwargs):
"""Returns a datetime.timedelta object expressing the length of
time that has passed since the last time the interview was accessed."""
last_time = last_access_time(*pargs, **kwargs)
if last_time is None:
return datetime.timedelta(0)
return current_datetime() - last_time
def last_access_days(*pargs, **kwargs):
"""Returns the number of days since the last time the interview
was accessed."""
delta = last_access_delta(*pargs, **kwargs)
return delta.days + (delta.seconds / 86400.0)
def last_access_hours(*pargs, **kwargs):
"""Returns the number of hours since the last time the interview
was accessed."""
delta = last_access_delta(*pargs, **kwargs)
return (delta.days * 24.0) + (delta.seconds / 3600.0)
def last_access_minutes(*pargs, **kwargs):
"""Returns the number of minutes since the last time the interview
was accessed."""
delta = last_access_delta(*pargs, **kwargs)
return (delta.days * 1440.0) + (delta.seconds / 60.0)
def last_access_time(include_privileges=None, exclude_privileges=None, include_cron=False, timezone=None):
"""Returns the last time the interview was accessed, as a DADateTime object."""
max_time = None
if include_privileges is not None:
if not isinstance(include_privileges, (list, tuple, dict)):
if isinstance(include_privileges, DAObject) and hasattr(include_privileges, 'elements'):
include_privileges = include_privileges.elements
else:
include_privileges = [include_privileges]
if 'cron' in include_privileges:
include_cron = True
if exclude_privileges is not None:
if not isinstance(exclude_privileges, (list, tuple, dict)):
if isinstance(exclude_privileges, DAObject) and hasattr(exclude_privileges, 'elements'):
exclude_privileges = exclude_privileges.elements
else:
exclude_privileges = [exclude_privileges]
else:
exclude_privileges = list()
for user_id, access_time in this_thread.internal['accesstime'].items():
if user_id == -1:
if 'anonymous' in exclude_privileges:
continue
if include_privileges is None or 'anonymous' in include_privileges:
if max_time is None or max_time < access_time:
max_time = access_time
break
else:
user_object = server.get_user_object(user_id)
if user_object is not None and hasattr(user_object, 'roles'):
if len(user_object.roles) == 0:
if 'user' in exclude_privileges:
continue
if include_privileges is None or 'user' in include_privileges:
if max_time is None or max_time < access_time:
max_time = access_time
break
else:
for role in user_object.roles:
if (include_cron is False and role.name == 'cron') or role.name in exclude_privileges:
continue
if include_privileges is None or role.name in include_privileges:
if max_time is None or max_time < access_time:
max_time = access_time
break
if max_time is None:
return None
if timezone is not None:
return dd(pytz.utc.localize(max_time).astimezone(pytz.timezone(timezone)))
else:
return dd(pytz.utc.localize(max_time).astimezone(pytz.utc))
def start_time(timezone=None):
"""Returns the time the interview was started, as a DADateTime object."""
if timezone is not None:
return dd(pytz.utc.localize(this_thread.internal['starttime']).astimezone(pytz.timezone(timezone)))
else:
return dd(pytz.utc.localize(this_thread.internal['starttime']).astimezone(pytz.utc))
class LatitudeLongitude(DAObject):
"""Represents a GPS location."""
def init(self, *pargs, **kwargs):
self.gathered = False
self.known = False
# self.description = ""
return super().init(*pargs, **kwargs)
def status(self):
"""Returns True or False depending on whether an attempt has yet been made
to gather the latitude and longitude."""
#logmessage("got to status")
if self.gathered:
#logmessage("gathered is true")
return False
else:
if location_returned():
#logmessage("returned is true")
self._set_to_current()
return False
else:
return True
def _set_to_current(self):
#logmessage("set to current")
if 'user' in this_thread.current_info and 'location' in this_thread.current_info['user'] and isinstance(this_thread.current_info['user']['location'], dict):
if 'latitude' in this_thread.current_info['user']['location'] and 'longitude' in this_thread.current_info['user']['location']:
self.latitude = this_thread.current_info['user']['location']['latitude']
self.longitude = this_thread.current_info['user']['location']['longitude']
self.known = True
#logmessage("known is true")
elif 'error' in this_thread.current_info['user']['location']:
self.error = this_thread.current_info['user']['location']['error']
self.known = False
#logmessage("known is false")
self.gathered = True
self.description = str(self)
return
def __str__(self):
if hasattr(self, 'latitude') and hasattr(self, 'longitude'):
return str(self.latitude) + ', ' + str(self.longitude)
elif hasattr(self, 'error'):
return str(self.error)
return 'Unknown'
class RoleChangeTracker(DAObject):
"""Used within an interview to facilitate changes in the active role
required for filling in interview information. Ensures that participants
do not receive multiple e-mails needlessly."""
def init(self, *pargs, **kwargs):
self.last_role = None
return
# def should_send_email(self):
# """Returns True or False depending on whether an e-mail will be sent on
# role change"""
# return True
def _update(self, target_role):
"""When a notification is delivered about a necessary change in the
active role of the interviewee, this function is called with
the name of the new role. This prevents the send_email()
function from sending duplicative notifications."""
self.last_role = target_role
return
def send_email(self, roles_needed, **kwargs):
"""Sends a notification e-mail if necessary because of a change in the
active of the interviewee. Returns True if an e-mail was
successfully sent. Otherwise, returns False. False could
mean that it was not necessary to send an e-mail."""
#logmessage("Current role is " + str(this_thread.global_vars.role))
for role_option in kwargs:
if 'to' in kwargs[role_option]:
need(kwargs[role_option]['to'].email)
for role_needed in roles_needed:
#logmessage("One role needed is " + str(role_needed))
if role_needed == self.last_role:
#logmessage("Already notified new role " + str(role_needed))
return False
if role_needed in kwargs:
#logmessage("I have info on " + str(role_needed))
email_info = kwargs[role_needed]
if 'to' in email_info and 'email' in email_info:
#logmessage("I have email info on " + str(role_needed))
try:
result = send_email(to=email_info['to'], html=email_info['email'].content, subject=email_info['email'].subject)
except DAError:
result = False
if result:
self._update(role_needed)
return result
return False
class Name(DAObject):
"""Base class for an object's name."""
def full(self):
"""Returns the full name."""
return(self.text)
def familiar(self):
"""Returns the familiar name."""
return(self.text)
def firstlast(self):
"""This method is included for compatibility with other types of names."""
return(self.text)
def lastfirst(self):
"""This method is included for compatibility with other types of names."""
return(self.text)
def middle_initial(self, with_period=True):
"""This method is included for compatibility with other types of names."""
return('')
def defined(self):
"""Returns True if the name has been defined. Otherwise, returns False."""
return hasattr(self, 'text')
def __str__(self):
return(str(self.full()))
# def __repr__(self):
# return(repr(self.full()))
class IndividualName(Name):
"""The name of an Individual."""
def init(self, *pargs, **kwargs):
if 'uses_parts' not in kwargs:
self.uses_parts = True
return super().init(*pargs, **kwargs)
def defined(self):
"""Returns True if the name has been defined. Otherwise, returns False."""
if not self.uses_parts:
return super().defined()
return hasattr(self, 'first')
def familiar(self):
"""Returns the familiar name."""
if not self.uses_parts:
return self.full()
return self.first
def full(self, middle='initial', use_suffix=True):
"""Returns the full name. Has optional keyword arguments middle
and use_suffix."""
if not self.uses_parts:
return super().full()
names = [self.first.strip()]
if hasattr(self, 'middle'):
if middle is False or middle is None:
pass
elif middle == 'initial':
initial = self.middle_initial()
if initial:
names.append(initial)
elif len(self.middle.strip()):
names.append(self.middle.strip())
if hasattr(self, 'last') and len(self.last.strip()):
names.append(self.last.strip())
else:
if hasattr(self, 'paternal_surname') and len(self.paternal_surname.strip()):
names.append(self.paternal_surname.strip())
if hasattr(self, 'maternal_surname') and len(self.maternal_surname.strip()):
names.append(self.maternal_surname.strip())
if hasattr(self, 'suffix') and use_suffix and len(self.suffix.strip()):
names.append(self.suffix.strip())
return(" ".join(names))
def firstlast(self):
"""Returns the first name followed by the last name."""
if not self.uses_parts:
return super().firstlast()
return(self.first + " " + self.last)
def lastfirst(self):
"""Returns the last name followed by a comma, followed by the
last name, followed by the suffix (if a suffix exists)."""
if not self.uses_parts:
return super().lastfirst()
output = self.last
if hasattr(self, 'suffix') and self.suffix and len(self.suffix.strip()):
output += " " + self.suffix
output += ", " + self.first
if hasattr(self, 'middle'):
initial = self.middle_initial()
if initial:
output += " " + initial
return output
def middle_initial(self, with_period=True):
"""Returns the middle initial, or the empty string if the name does not have a middle component."""
if len(self.middle.strip()) == 0:
return ''
if with_period:
return self.middle[0].strip() + '.'
else:
return self.middle[0].strip() + '.'
class Address(DAObject):
"""A geographic address."""
def init(self, *pargs, **kwargs):
if 'location' not in kwargs:
self.initializeAttribute('location', LatitudeLongitude)
if 'geolocated' not in kwargs:
self.geolocated = False
if not hasattr(self, 'city_only'):
self.city_only = False
return super().init(*pargs, **kwargs)
def __str__(self):
return(str(self.block()))
def on_one_line(self, include_unit=True, omit_default_country=True, language=None, show_country=None):
"""Returns a one-line address. Primarily used internally for geolocation."""
output = ""
if self.city_only is False:
if (not hasattr(self, 'address')) and hasattr(self, 'street_number') and hasattr(self, 'street'):
output += str(self.street_number) + " " + str(self.street)
else:
output += str(self.address)
if include_unit:
the_unit = self.formatted_unit(language=language)
if the_unit != '':
output += ", " + the_unit
output += ", "
#if hasattr(self, 'sublocality') and self.sublocality:
# output += str(self.sublocality) + ", "
if hasattr(self, 'sublocality_level_1') and self.sublocality_level_1:
if not (hasattr(self, 'street_number') and self.street_number == self.sublocality_level_1):
output += str(self.sublocality_level_1) + ", "
output += str(self.city)
if hasattr(self, 'state') and self.state:
output += ", " + str(self.state)
if hasattr(self, 'zip') and self.zip:
output += " " + str(self.zip)
elif hasattr(self, 'postal_code') and self.postal_code:
output += " " + str(self.postal_code)
if show_country is None and hasattr(self, 'country') and self.country and ((not omit_default_country) or get_country() != self.country):
show_country = True
if show_country:
output += ", " + country_name(self._get_country())
return output
def _map_info(self):
if (self.location.gathered and self.location.known) or self.geolocate():
if hasattr(self.location, 'description'):
the_info = self.location.description
else:
the_info = ''
result = {'latitude': self.location.latitude, 'longitude': self.location.longitude, 'info': the_info}
if hasattr(self, 'icon'):
result['icon'] = self.icon
return [result]
return None
def geolocate(self, address=None, reset=False):
"""Determines the latitude and longitude of the location from its components. If an address is supplied, the address fields that are not already populated will be populated with the result of the geolocation of the selected address."""
if reset:
self.reset_geolocation()
if address is None:
if self.geolocated:
return self.geolocate_success
the_address = self.on_one_line(omit_default_country=False)
else:
the_address = address
#logmessage("geolocate: trying to geolocate " + str(the_address))
from geopy.geocoders import GoogleV3
if 'google' in server.daconfig and 'api key' in server.daconfig['google'] and server.daconfig['google']['api key']:
my_geocoder = GoogleV3(api_key=server.daconfig['google']['api key'])
else:
my_geocoder = GoogleV3()
try_number = 0
success = False
results = None
while not success and try_number < 2:
try:
results = my_geocoder.geocode(the_address)
success = True
except Exception as the_err:
logmessage(str(the_err))
try_number += 1
time.sleep(try_number)
self.geolocated = True
if results:
self.geolocate_success = True
self.location.gathered = True
self.location.known = True
self.location.latitude = results.latitude
self.location.longitude = results.longitude
self.geolocate_response = results.raw
if hasattr(self, 'norm'):
delattr(self, 'norm')
if hasattr(self, 'norm_long'):
delattr(self, 'norm_long')
self.initializeAttribute('norm', self.__class__)
self.initializeAttribute('norm_long', self.__class__)
if 'formatted_address' in results.raw:
self.one_line = results.raw['formatted_address']
self.norm.one_line = results.raw['formatted_address']
self.norm_long.one_line = results.raw['formatted_address']
if 'address_components' in results.raw:
geo_types = {
'administrative_area_level_1': ('state', 'short_name'),
'administrative_area_level_2': ('county', 'long_name'),
'administrative_area_level_3': ('administrative_area_level_3', 'long_name'),
'administrative_area_level_4': ('administrative_area_level_4', 'long_name'),
'administrative_area_level_5': ('administrative_area_level_5', 'long_name'),
'colloquial_area': ('colloquial_area', 'long_name'),
'country': ('country', 'short_name'),
'floor': ('floor', 'long_name'),
'intersection': ('intersection', 'long_name'),
'locality': ('city', 'long_name'),
'neighborhood': ('neighborhood', 'long_name'),
'post_box': ('post_box', 'long_name'),
'postal_code': ('postal_code', 'long_name'),
'postal_code_prefix': ('postal_code_prefix', 'long_name'),
'postal_code_suffix': ('postal_code_suffix', 'long_name'),
'postal_town': ('postal_town', 'long_name'),
'premise': ('premise', 'long_name'),
'room': ('room', 'long_name'),
'route': ('street', 'short_name'),
'street_number': ('street_number', 'short_name'),
'sublocality': ('sublocality', 'long_name'),
'sublocality_level_1': ('sublocality_level_1', 'long_name'),
'sublocality_level_2': ('sublocality_level_2', 'long_name'),
'sublocality_level_3': ('sublocality_level_3', 'long_name'),
'sublocality_level_4': ('sublocality_level_4', 'long_name'),
'sublocality_level_5': ('sublocality_level_5', 'long_name'),
# 'subpremise': ('unit', 'long_name'),
}
for component in results.raw['address_components']:
if 'types' in component and 'long_name' in component:
for geo_type, addr_type in geo_types.items():
if geo_type in component['types'] and ((not hasattr(self, addr_type[0])) or getattr(self, addr_type[0]) == '' or getattr(self, addr_type[0]) is None):
setattr(self, addr_type[0], component[addr_type[1]])
if (not hasattr(self, geo_type)) or getattr(self, geo_type) == '' or getattr(self, geo_type) is None:
setattr(self, geo_type, component['long_name'])
geo_types = {
'administrative_area_level_1': 'state',
'administrative_area_level_2': 'county',
'administrative_area_level_3': 'administrative_area_level_3',
'administrative_area_level_4': 'administrative_area_level_4',
'administrative_area_level_5': 'administrative_area_level_5',
'colloquial_area': 'colloquial_area',
'country': 'country',
'floor': 'floor',
'intersection': 'intersection',
'locality': 'city',
'neighborhood': 'neighborhood',
'post_box': 'post_box',
'postal_code': 'postal_code',
'postal_code_prefix': 'postal_code_prefix',
'postal_code_suffix': 'postal_code_suffix',
'postal_town': 'postal_town',
'premise': 'premise',
'room': 'room',
'route': 'street',
'street_number': 'street_number',
'sublocality': 'sublocality',
'sublocality_level_1': 'sublocality_level_1',
'sublocality_level_2': 'sublocality_level_2',
'sublocality_level_3': 'sublocality_level_3',
'sublocality_level_4': 'sublocality_level_4',
'sublocality_level_5': 'sublocality_level_5',
# 'subpremise': 'unit'
}
for component in results.raw['address_components']:
if 'types' in component:
for geo_type, addr_type in geo_types.items():
if geo_type in component['types']:
if 'short_name' in component:
setattr(self.norm, addr_type, component['short_name'])
if addr_type != geo_type:
setattr(self.norm, geo_type, component['short_name'])
if 'long_name' in component:
setattr(self.norm_long, addr_type, component['long_name'])
if addr_type != geo_type:
setattr(self.norm_long, geo_type, component['long_name'])
if hasattr(self.norm, 'unit'):
self.norm.unit = '#' + str(self.norm.unit)
if hasattr(self.norm_long, 'unit'):
self.norm_long.unit = '#' + str(self.norm_long.unit)
if hasattr(self.norm, 'street_number') and hasattr(self.norm, 'street'):
self.norm.address = self.norm.street_number + " " + self.norm.street
if hasattr(self.norm_long, 'street_number') and hasattr(self.norm_long, 'street'):
self.norm_long.address = self.norm_long.street_number + " " + self.norm_long.street
if (not hasattr(self.norm, 'city')) and hasattr(self.norm, 'administrative_area_level_3'):
self.norm.city = self.norm.administrative_area_level_3
if (not hasattr(self.norm_long, 'city')) and hasattr(self.norm_long, 'administrative_area_level_3'):
self.norm_long.city = self.norm_long.administrative_area_level_3
if (not hasattr(self.norm, 'city')) and hasattr(self.norm, 'neighborhood'):
self.norm.city = self.norm.neighborhood
if (not hasattr(self.norm_long, 'city')) and hasattr(self.norm_long, 'neighborhood'):
self.norm_long.city = self.norm_long.neighborhood
self.norm.geolocated = True
self.norm.location.gathered = True
self.norm.location.known = True
self.norm.location.latitude = results.latitude
self.norm.location.longitude = results.longitude
try:
self.norm.location.description = self.norm.block()
except:
logmessage("Normalized address was incomplete")
self.geolocate_success = False
self.norm.geolocate_response = results.raw
self.norm_long.geolocated = True
self.norm_long.location.gathered = True
self.norm_long.location.known = True
self.norm_long.location.latitude = results.latitude
self.norm_long.location.longitude = results.longitude
try:
self.norm_long.location.description = self.norm_long.block()
except:
logmessage("Normalized address was incomplete")
self.geolocate_success = False
self.norm_long.geolocate_response = results.raw
if address is not None:
self.normalize()
try:
self.location.description = self.block()
except:
self.location.description = ''
else:
logmessage("geolocate: Valid not ok.")
self.geolocate_success = False
#logmessage(str(self.__dict__))
return self.geolocate_success
def normalize(self, long_format=False):
if not self.geolocate():
return False
the_instance_name = self.instanceName
the_norm = self.norm
the_norm_long = self.norm_long
if long_format:
target = copy.deepcopy(the_norm_long)
else:
target = copy.deepcopy(the_norm)
for name in target.__dict__:
setattr(self, name, getattr(target, name))
self._set_instance_name_recursively(the_instance_name)
self.norm = the_norm
self.norm_long = the_norm_long
return True
def reset_geolocation(self):
"""Resets the geolocation information"""
self.delattr('norm', 'geolocate_success', 'geolocate_response', 'norm_long', 'one_line')
self.geolocated = False
self.location.delattr('gathered', 'known', 'latitude', 'longitude', 'description')
def block(self, language=None, international=False, show_country=None):
"""Returns the address formatted as a block, as in a mailing."""
if this_thread.evaluation_context == 'docx':
line_breaker = '</w:t><w:br/><w:t xml:space="preserve">'
else:
line_breaker = " [NEWLINE] "
if international:
i18n_address = {}
if (not hasattr(self, 'address')) and hasattr(self, 'street_number') and hasattr(self, 'street'):
i18n_address['street_address'] = str(self.street_number) + " " + str(self.street)
else:
i18n_address['street_address'] = str(self.address)
the_unit = self.formatted_unit(language=language)
if the_unit != '':
i18n_address['street_address'] += '\n' + the_unit
if hasattr(self, 'sublocality_level_1') and self.sublocality_level_1:
i18n_address['city_area'] = str(self.sublocality_level_1)
i18n_address['city'] = str(self.city)
if hasattr(self, 'state') and self.state:
i18n_address['country_area'] = str(self.state)
if hasattr(self, 'zip') and self.zip:
i18n_address['postal_code'] = str(self.zip)
elif hasattr(self, 'postal_code') and self.postal_code:
i18n_address['postal_code'] = str(self.postal_code)
i18n_address['country_code'] = self._get_country()
return i18naddress.format_address(i18n_address).replace('\n', line_breaker)
output = ""
if self.city_only is False:
if (not hasattr(self, 'address')) and hasattr(self, 'street_number') and hasattr(self, 'street'):
output += str(self.street_number) + " " + str(self.street) + line_breaker
else:
output += str(self.address) + line_breaker
the_unit = self.formatted_unit(language=language)
if the_unit != '':
output += the_unit + line_breaker
if hasattr(self, 'sublocality_level_1') and self.sublocality_level_1:
output += str(self.sublocality_level_1) + line_breaker
output += str(self.city)
if hasattr(self, 'state') and self.state:
output += ", " + str(self.state)
if hasattr(self, 'zip') and self.zip:
output += " " + str(self.zip)
elif hasattr(self, 'postal_code') and self.postal_code:
output += " " + str(self.postal_code)
if show_country is None and hasattr(self, 'country') and self.country and get_country() != self.country:
show_country = True
if show_country:
output += line_breaker + country_name(self._get_country())
return(output)
def _get_country(self):
if hasattr(self, 'country') and isinstance(self.country, str):
country = self.country
else:
country = None
if not country:
country = get_country()
if not country:
country = 'US'
try:
country = iso_country(country)
except:
logmessage("Invalid country code " + repr(country))
country = 'US'
return country
def formatted_unit(self, language=None, require=False):
"""Returns the unit, formatted appropriately"""
if not hasattr(self, 'unit') and not hasattr(self, 'floor') and not hasattr(self, 'room'):
if require:
self.unit
else:
return ''
if hasattr(self, 'unit') and self.unit != '' and self.unit is not None:
if not re.search(r'unit|floor|suite|apt|apartment|room|ste|fl', str(self.unit), flags=re.IGNORECASE):
return word("Unit", language=language) + " " + str(self.unit)
else:
return str(self.unit)
elif hasattr(self, 'floor') and self.floor != '' and self.floor is not None:
return word("Floor", language=language) + " " + str(self.floor)
elif hasattr(self, 'room') and self.room != '' and self.room is not None:
return word("Room", language=language) + " " + str(self.room)
return ''
def line_one(self, language=None):
"""Returns the first line of the address, including the unit
number if there is one."""
if self.city_only:
return ''
if (not hasattr(self, 'address')) and hasattr(self, 'street_number') and hasattr(self, 'street'):
output += str(self.street_number) + " " + str(self.street)
else:
output = str(self.address)
the_unit = self.formatted_unit(language=language)
if the_unit != '':
output += ", " + the_unit
return(output)
def line_two(self, language=None):
"""Returns the second line of the address, including the city,
state and zip code."""
output = ""
#if hasattr(self, 'sublocality') and self.sublocality:
# output += str(self.sublocality) + ", "
if hasattr(self, 'sublocality_level_1') and self.sublocality_level_1:
output += str(self.sublocality_level_1) + ", "
output += str(self.city)
if hasattr(self, 'state') and self.state:
output += ", " + str(self.state)
if hasattr(self, 'zip') and self.zip:
output += " " + str(self.zip)
elif hasattr(self, 'postal_code') and self.postal_code:
output += " " + str(self.postal_code)
return(output)
def iso_country(country, part='alpha_2'):
"""Returns a best-guess ISO 3166-1 country information given a country
name. The optional keyword parameter "part" can be alpha_2,
alpha_3, name, or official_name. The default "part" is alpha_2.
"""
try:
guess = pycountry.countries.search_fuzzy(country)
assert len(guess) > 0
except:
raise DAError("Invalid country: " + str(country))
if part == 'alpha_2':
return guess[0].alpha_2
elif part == 'alpha_3':
return guess[0].alpha_3
elif part == 'name':
return guess[0].name
elif part == 'numeric':
return guess[0].numeric
elif part == 'official_name':
return guess[0].official_name
else:
raise DAError('iso_country: unknown part')
class City(Address):
"""A geographic address specific only to a city."""
def init(self, *pargs, **kwargs):
self.city_only = True
return super().init(*pargs, **kwargs)
class Thing(DAObject):
"""Represents something with a name."""
def init(self, *pargs, **kwargs):
if not hasattr(self, 'name') and 'name' not in kwargs:
self.name = Name()
if 'name' in kwargs and isinstance(kwargs['name'], str):
if not hasattr(self, 'name'):
self.name = Name()