Skip to content

Commit

Permalink
Fixes + flake8
Browse files Browse the repository at this point in the history
  • Loading branch information
abernardi committed Jul 31, 2017
1 parent 70a3895 commit 3e47871
Show file tree
Hide file tree
Showing 7 changed files with 453 additions and 411 deletions.
6 changes: 5 additions & 1 deletion __init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
from trytond.pool import Pool
# This file is part of the account_invoice_ar module for Tryton.
# The COPYRIGHT file at the top level of this repository contains
# the full copyright notices and license terms.

from trytond.pool import Pool
from .invoice import *
from .company import *
from .pos import *


def register():
Pool.register(
Pos,
Expand Down
75 changes: 37 additions & 38 deletions afip_auth.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,15 @@
#!/usr/bin/python
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# This program is free software; you can redistribute it and/or modify
# it under the terms of the Affero GNU General Public License as published by
# the Software Foundation; either version 3, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTIBILITY
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
# for more details.
#
# Copyright 2013 by
# This file is part of the account_invoice_ar module for Tryton.
# The COPYRIGHT file at the top level of this repository contains
# the full copyright notices and license terms.

# Based on code "factura_electronica" by Luis Falcon (GPLv3)
# Based on code by "openerp-iva-argentina" by Gerardo Allende / Daniel Blanco

"Authentication functions for Argentina's Federal Tax Agency (AFIP) webservices"
"""
Authentication functions for Argentina's Federal Tax Agency (AFIP) webservices
"""

__author__ = "Mariano Reingart (reingart@gmail.com)"
__copyright__ = "Copyright (C) 2013 Mariano Reingart and others"
Expand All @@ -30,36 +25,38 @@
import traceback


DEFAULT_TTL = 60*60*5 # five hours
WSAA_URL = "" # change to production server (testing default)
PROXY = "" # proxy credentials and host
CACHE = "" # cache folder path, use default
DEFAULT_TTL = 60 * 60 * 5 # five hours
WSAA_URL = '' # change to production server (testing default)
PROXY = '' # proxy credentials and host
CACHE = '' # cache folder path, use default
DEBUG = False


def authenticate(service, certificate, private_key, force=False,
cache=CACHE, wsdl=WSAA_URL, proxy=PROXY, ):
"Call AFIP Authentication webservice to get token & sign or error message"
cache=CACHE, wsdl=WSAA_URL, proxy=PROXY):
'Call AFIP Authentication webservice to get token & sign or error message'

# import AFIP webservice authentication helper:
from pyafipws.wsaa import WSAA

# create AFIP webservice authentication helper instance:
wsaa = WSAA()
wsaa.LanzarExcepciones = True # raise python exceptions on any failure
wsaa.LanzarExcepciones = True # raise python exceptions on any failure

# make md5 hash of the parameter for caching...
fn = "%s.xml" % hashlib.md5(service + certificate + private_key).hexdigest()
fn = '%s.xml' % hashlib.md5(
service + certificate + private_key).hexdigest()
if cache:
fn = os.path.join(cache, fn)
else:
fn = os.path.join(get_cache_dir(), fn)

try:
# read the access ticket (if already authenticated)
if not os.path.exists(fn) or \
os.path.getmtime(fn)+(DEFAULT_TTL) < time.time():
# access ticket (TA) outdated, create new access request ticket (TRA)
if (not os.path.exists(fn) or
os.path.getmtime(fn) + (DEFAULT_TTL) < time.time()):
# access ticket (TA) outdated,
# create new access request ticket (TRA)
tra = wsaa.CreateTRA(service=service, ttl=DEFAULT_TTL)
# cryptographically sing the access ticket
cms = wsaa.SignTRA(tra, certificate, private_key)
Expand All @@ -70,10 +67,10 @@ def authenticate(service, certificate, private_key, force=False,
if not ta:
raise RuntimeError()
# write the access ticket for further consumption
open(fn, "w").write(ta)
open(fn, 'w').write(ta)
else:
# get the access ticket from the previously written file
ta = open(fn, "r").read()
ta = open(fn, 'r').read()
# force to connect against WSAA
if force:
tra = None
Expand All @@ -89,13 +86,13 @@ def authenticate(service, certificate, private_key, force=False,
if not ta:
raise RuntimeError()
# write the access ticket for further consumption
open(fn, "w").write(ta)
open(fn, 'w').write(ta)
# analyze the access ticket xml and extract the relevant fields
wsaa.AnalizarXml(xml=ta)
token = wsaa.ObtenerTagXml("token")
#print "token", token
sign = wsaa.ObtenerTagXml("sign")
#print "sign", sign
token = wsaa.ObtenerTagXml('token')
#print 'token', token
sign = wsaa.ObtenerTagXml('sign')
#print 'sign', sign
err_msg = None
except:
token = sign = None
Expand All @@ -110,25 +107,27 @@ def authenticate(service, certificate, private_key, force=False,
raise
return {'token': token, 'sign': sign, 'err_msg': err_msg}


def get_account_invoice_install_dir():
basepath = __file__
return os.path.dirname(os.path.abspath(basepath))


def get_cache_dir():
return os.path.join(get_account_invoice_install_dir(), "cache")
return os.path.join(get_account_invoice_install_dir(), 'cache')


if __name__ == '__main__':
# basic tests:
reingart_crt = open("./pyafipws/reingart.crt").read()
reingart_key = open("./pyafipws/reingart.key").read()
auth_data = authenticate("wsfe", reingart_crt, reingart_key, force=True)
reingart_crt = open('./pyafipws/reingart.crt').read()
reingart_key = open('./pyafipws/reingart.key').read()
auth_data = authenticate('wsfe', reingart_crt, reingart_key, force=True)
print auth_data
assert auth_data['token']
assert auth_data['sign']
old_token = auth_data['token']
auth_data = authenticate("wsfe", reingart_crt, reingart_key, force=True)
auth_data = authenticate('wsfe', reingart_crt, reingart_key, force=True)
assert auth_data['token'] == old_token
import base64
print base64.b64decode(auth_data['token'])
print "ok."

print 'ok.'
44 changes: 26 additions & 18 deletions company.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,29 @@
#! -*- coding: utf8 -*-
# -*- coding: utf-8 -*-
# This file is part of the account_invoice_ar module for Tryton.
# The COPYRIGHT file at the top level of this repository contains
# the full copyright notices and license terms.

from trytond.model import ModelView, ModelSQL, fields

__all__ = ['Company']


class Company(ModelSQL, ModelView):
'Company'
__name__ = 'company.company'

pyafipws_certificate = fields.Text('Certificado AFIP WS',
help="Certificado (.crt) de la empresa para webservices AFIP")
help='Certificado (.crt) de la empresa para webservices AFIP')
pyafipws_private_key = fields.Text('Clave Privada AFIP WS',
help="Clave Privada (.key) de la empresa para webservices AFIP")
help='Clave Privada (.key) de la empresa para webservices AFIP')
pyafipws_mode_cert = fields.Selection([
('', 'n/a'),
('homologacion', u'Homologación'),
('produccion', u'Producción'),
], 'Modo de certificacion',
help=u"El objetivo de Homologación (testing), es facilitar las pruebas. Los certificados de Homologación y Producción son distintos.")
('', 'n/a'),
('homologacion', u'Homologación'),
('produccion', u'Producción'),
], 'Modo de certificacion',
help=u'El objetivo de Homologación (testing), es facilitar las '
u'pruebas. Los certificados de Homologación y Producción son '
u'distintos.')

@staticmethod
def default_pyafipws_mode_cert():
Expand All @@ -41,29 +47,31 @@ def check_pyafipws_mode_cert(self):
if self.pyafipws_mode_cert == '':
return

auth_data = self.pyafipws_authenticate(service="wsfe", force=True)
if auth_data['err_msg'] != None:
auth_data = self.pyafipws_authenticate(service='wsfe', force=True)
if auth_data['err_msg'] is not None:
self.raise_user_error('wrong_pyafipws_mode', {
'message': auth_data['err_msg'],
})
})

def pyafipws_authenticate(self, service="wsfe", force=False):
"Authenticate against AFIP, returns token, sign, err_msg (dict)"
def pyafipws_authenticate(self, service='wsfe', force=False):
'Authenticate against AFIP, returns token, sign, err_msg (dict)'
import afip_auth
auth_data = {}
# get the authentication credentials:
certificate = str(self.pyafipws_certificate)
private_key = str(self.pyafipws_private_key)
if self.pyafipws_mode_cert == 'homologacion':
WSAA_URL = "https://wsaahomo.afip.gov.ar/ws/services/LoginCms?wsdl"
WSAA_URL = 'https://wsaahomo.afip.gov.ar/ws/services/LoginCms?wsdl'
elif self.pyafipws_mode_cert == 'produccion':
WSAA_URL = "https://wsaa.afip.gov.ar/ws/services/LoginCms?wsdl"
WSAA_URL = 'https://wsaa.afip.gov.ar/ws/services/LoginCms?wsdl'
else:
self.raise_user_error('wrong_pyafipws_mode', {
'message': u'El modo de certificación no es ni producción, ni homologación. Configure su Empresa',
})
'message': u'El modo de certificación no es ni producción, ni '
u'homologación. Configure su Empresa',
})

# call the helper function to obtain the access ticket:
auth = afip_auth.authenticate(service, certificate, private_key, wsdl=WSAA_URL, force=force)
auth = afip_auth.authenticate(service, certificate, private_key,
wsdl=WSAA_URL, force=force)
auth_data.update(auth)
return auth_data
Binary file modified invoice.odt
Binary file not shown.
Loading

0 comments on commit 3e47871

Please sign in to comment.