Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions docs/registration.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
Registration necessary
======================

As of September 14th, 2019, all FinTS programs need to be registered with the ZKA or
banks will block access. You need to fill out a PDF form and will be assigned a
product ID that you can pass above.

Click here to read more about the `registration process`_.


.. _registration process: https://www.hbci-zka.de/register/prod_register.htm
53 changes: 39 additions & 14 deletions docs/transfers.rst
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,16 @@ You can create a simple SEPA transfer using this convenient client method:
:members: simple_sepa_transfer
:noindex:

The return value may be a `NeedVOPResponse` in which case you need to call `approve_vop_response` to proceed.

At any point, you might receive a `NeedTANResponse`.
You should then enter a TAN, read our chapter :ref:`tans` to find out more.

.. autoclass:: fints.client.FinTS3PinTanClient
:members: approve_vop_response
:noindex:


Advanced mode
-------------

Expand Down Expand Up @@ -55,20 +63,37 @@ Full example
endtoend_id='NOTPROVIDED',
)

while isinstance(res, NeedTANResponse):
print("A TAN is required", res.challenge)

if getattr(res, 'challenge_hhduc', None):
try:
terminal_flicker_unix(res.challenge_hhduc)
except KeyboardInterrupt:
pass

if result.decoupled:
tan = input('Please press enter after confirming the transaction in your app:')
else:
tan = input('Please enter TAN:')
res = client.send_tan(res, tan)
while isinstance(res, NeedTANResponse | NeedVOPResponse):
if isinstance(res, NeedTANResponse):
print("A TAN is required", res.challenge)

if getattr(res, 'challenge_hhduc', None):
try:
terminal_flicker_unix(res.challenge_hhduc)
except KeyboardInterrupt:
pass

if result.decoupled:
tan = input('Please press enter after confirming the transaction in your app:')
else:
tan = input('Please enter TAN:')
res = client.send_tan(res, tan)
elif isinstance(res, NeedVOPResponse):
if res.vop_result.vop_single_result.result == "RVMC":
print("Payee name is a close match")
print("Name retrieved by bank:", res.vop_result.vop_single_result.close_match_name)
if res.vop_result.vop_single_result.other_identification:
print("Other info retrieved by bank:", res.vop_result.vop_single_result.other_identification)
elif res.vop_result.vop_single_result.result == "RVNM":
print("Payee name does not match match")
elif res.vop_result.vop_single_result.result == "RVNA":
print("Payee name could not be verified")
print("Reason:", res.vop_result.vop_single_result.na_reason)
elif res.vop_result.vop_single_result.result == "PDNG":
print("Payee name could not be verified (pending state, can't be handled by this library)")
print("Do you want to continue? Your bank will not be liable if the money ends up in the wrong place.")
input('Please press enter to confirm or Ctrl+C to cancel')
res = client.approve_vop_response(res)

print(res.status)
print(res.responses)
Expand Down
25 changes: 23 additions & 2 deletions docs/trouble.rst
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,24 @@ the problem.
return f.send_tan(response, tan)


def ask_for_vop(response: NeedVOPResponse):
if response.vop_result.vop_single_result.result == "RVMC":
print("Payee name is a close match")
print("Name retrieved by bank:", response.vop_result.vop_single_result.close_match_name)
if response.vop_result.vop_single_result.other_identification:
print("Other info retrieved by bank:", response.vop_result.vop_single_result.other_identification)
elif response.vop_result.vop_single_result.result == "RVNM":
print("Payee name does not match match")
elif response.vop_result.vop_single_result.result == "RVNA":
print("Payee name could not be verified")
print("Reason:", response.vop_result.vop_single_result.na_reason)
elif response.vop_result.vop_single_result.result == "PDNG":
print("Payee name could not be verified (pending state, can't be handled by this library)")
print("Do you want to continue? Your bank will not be liable if the money ends up in the wrong place.")
input('Please press enter to confirm or Ctrl+C to cancel')
return f.approve_vop_response(response)


# Open the actual dialog
with f:
# Since PSD2, a TAN might be needed for dialog initialization. Let's check if there is one required
Expand Down Expand Up @@ -172,8 +190,11 @@ the problem.
endtoend_id='NOTPROVIDED',
)

while isinstance(res, NeedTANResponse):
res = ask_for_tan(res)
while isinstance(res, NeedTANResponse | NeedVOPResponse):
if isinstance(res, NeedTANResponse):
res = ask_for_tan(res)
elif isinstance(res, NeedVOPResponse):
res = ask_for_vop(res)
elif choice == 11:
print("Select statement")
statements = f.get_statements(account)
Expand Down
155 changes: 149 additions & 6 deletions fints/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
PinTanTwoStepAuthenticationMechanism,
)
from .segments.accounts import HISPA1, HKSPA1
from .segments.auth import HIPINS1, HKTAB4, HKTAB5, HKTAN2, HKTAN3, HKTAN5, HKTAN6, HKTAN7
from .segments.auth import HIPINS1, HKTAB4, HKTAB5, HKTAN2, HKTAN3, HKTAN5, HKTAN6, HKTAN7, HIVPPS1, HIVPP1, PSRD1, HKVPA1
from .segments.bank import HIBPA3, HIUPA4, HKKOM4
from .segments.debit import (
HKDBS1, HKDBS2, HKDMB1, HKDMC1, HKDME1, HKDME2,
Expand Down Expand Up @@ -818,7 +818,7 @@ def simple_sepa_transfer(self, account: SEPAAccount, iban: str, bic: str,
:param reason: Transfer reason
:param instant_payment: Whether to use instant payment (defaults to ``False``)
:param endtoend_id: End-to-end-Id (defaults to ``NOTPROVIDED``)
:return: Returns either a NeedRetryResponse or TransactionResponse
:return: Returns either a NeedRetryResponse or NeedVOPResponse or TransactionResponse
"""
config = {
"name": account_name,
Expand Down Expand Up @@ -892,7 +892,7 @@ def sepa_transfer(self, account: SEPAAccount, pain_message: str, multiple=False,
if book_as_single:
seg.request_single_booking = True

return self._send_with_possible_retry(dialog, seg, self._continue_sepa_transfer)
return self._send_pay_with_possible_retry(dialog, seg, self._continue_sepa_transfer)

def _continue_sepa_transfer(self, command_seg, response):
retval = TransactionResponse(response)
Expand Down Expand Up @@ -1058,19 +1058,56 @@ def resume_dialog(self, dialog_data):
self._standing_dialog = None


class NeedVOPResponse(NeedRetryResponse):

def __init__(self, vop_result, command_seg, resume_method=None):
self.vop_result = vop_result
self.command_seg = command_seg
if hasattr(resume_method, '__func__'):
self.resume_method = resume_method.__func__.__name__
else:
self.resume_method = resume_method

def __repr__(self):
return '<o.__class__.__name__(vop_result={o.vop_result!r})>'.format(o=self)

@classmethod
def _from_data_v1(cls, data):
if data["version"] == 1:
segs = SegmentSequence(data['segments_bin']).segments
return cls(segs[0], segs[1], resume_method=data['resume_method'])

raise Exception("Wrong blob data version")

def get_data(self) -> bytes:
"""Return a compressed datablob representing this object.

To restore the object, use :func:`fints.client.NeedRetryResponse.from_data`.
"""
data = {
"_class_name": self.__class__.__name__,
"version": 1,
"segments_bin": SegmentSequence([self.vop_result, self.command_seg]).render_bytes(),
"resume_method": self.resume_method,
}
return compress_datablob(DATA_BLOB_MAGIC_RETRY, 1, data)


class NeedTANResponse(NeedRetryResponse):
challenge_raw = None #: Raw challenge as received by the bank
challenge = None #: Textual challenge to be displayed to the user
challenge_html = None #: HTML-safe challenge text, possibly with formatting
challenge_hhduc = None #: HHD_UC challenge to be transmitted to the TAN generator
challenge_matrix = None #: Matrix code challenge: tuple(mime_type, data)
decoupled = None #: Use decoupled process
vop_result = None #: VoP result

def __init__(self, command_seg, tan_request, resume_method=None, tan_request_structured=False, decoupled=False):
def __init__(self, command_seg, tan_request, resume_method=None, tan_request_structured=False, decoupled=False, vop_result=None):
self.command_seg = command_seg
self.tan_request = tan_request
self.tan_request_structured = tan_request_structured
self.decoupled = decoupled
self.vop_result = vop_result
if hasattr(resume_method, '__func__'):
self.resume_method = resume_method.__func__.__name__
else:
Expand Down Expand Up @@ -1299,6 +1336,23 @@ def _get_tan_segment(self, orig_seg, tan_process, tan_seg=None):

return seg

def _find_vop_format_for_segment(self, seg):
vpps = self.bpd.find_segment_first('HIVPPS')
if not vpps:
return

needed = str(seg.header.type) in list(vpps.parameter.payment_order_segment)

if not needed:
return

bank_supported = str(vpps.parameter.supported_report_formats)

if "sepade.pain.002.001.10.xsd" != bank_supported:
logger.warning("No common supported SEPA version. Defaulting to what bank supports and hoping for the best: %s.", bank_supported)

return bank_supported

def _need_twostep_tan_for_segment(self, seg):
if not self.selected_security_function or self.selected_security_function == '999':
return False
Expand Down Expand Up @@ -1335,13 +1389,99 @@ def _send_with_possible_retry(self, dialog, command_seg, resume_func):
response = dialog.send(command_seg)

return resume_func(command_seg, response)

def _send_pay_with_possible_retry(self, dialog, command_seg, resume_func):
"""
This adds VoP under the assumption that TAN will be sent,
There appears to be no VoP flow without sending any authentication.

There are really 2 VoP flows: with a full match and otherwise.
The second flow returns a NeedVOPResponse as intended by the specification flowcharts.
In this case cases, the application should ask the user for confirmation based on HIVPP data in resp.vop_result.

The kind of response is in resp.vop_result.single_vop_result.result:
- 'RCVC' - full match
- 'RVMC' - partial match, extra info in single_vop_result.close_match_name and .other_identification.
- 'RVNM' - no match, no extra info seen
- 'RVNA' - check not available, reason in single_vop_result.na_reason
- 'PDNG' - pending, seems related to something not implemented right now.
"""
vop_seg = []
vop_standard = self._find_vop_format_for_segment(command_seg)
if vop_standard:
from .segments.auth import HKVPP1
vop_seg = [HKVPP1(supported_reports=PSRD1(psrd=[vop_standard]))]

with dialog:
if self._need_twostep_tan_for_segment(command_seg):
tan_seg = self._get_tan_segment(command_seg, '4')
segments = vop_seg + [command_seg, tan_seg]

response = dialog.send(*segments)

if vop_standard:
hivpp = response.find_segment_first(HIVPP1, throw=True)

vop_result = hivpp.vop_single_result
if vop_result.result in ('RVNA', 'RVNM', 'RVMC'): # Not Applicable, No Match, Close Match
return NeedVOPResponse(
vop_result=hivpp,
command_seg=command_seg,
resume_method=resume_func,
)
else:
hivpp = None

for resp in response.responses(tan_seg):
if resp.code in ('0030', '3955'):
return NeedTANResponse(
command_seg,
response.find_segment_first('HITAN'),
resume_func,
self.is_challenge_structured(),
resp.code == '3955',
hivpp,
)
if resp.code.startswith('9'):
raise Exception("Error response: {!r}".format(response))
else:
response = dialog.send(command_seg)

return resume_func(command_seg, response)

def is_challenge_structured(self):
param = self.get_tan_mechanisms()[self.get_current_tan_mechanism()]
if hasattr(param, 'challenge_structured'):
return param.challenge_structured
return False

def approve_vop_response(self, challenge: NeedVOPResponse):
"""
Approves an operation that had a non-match VoP (verification of payee) response.

:param challenge: NeedVOPResponse to respond to
:return: New response after sending VOP response
"""
with self._get_dialog() as dialog:
vop_seg = [HKVPA1(vop_id=challenge.vop_result.vop_id)]
tan_seg = self._get_tan_segment(challenge.command_seg, '4')
segments = vop_seg + [challenge.command_seg, tan_seg]
response = dialog.send(*segments)

for resp in response.responses(tan_seg):
if resp.code in ('0030', '3955'):
return NeedTANResponse(
challenge.command_seg,
response.find_segment_first('HITAN'),
challenge.resume_method,
self.is_challenge_structured(),
resp.code == '3955',
challenge.vop_result,
)

resume_func = getattr(self, challenge.resume_method)
return resume_func(challenge.command_seg, response)

def send_tan(self, challenge: NeedTANResponse, tan: str):
"""
Sends a TAN to confirm a pending operation.
Expand All @@ -1354,15 +1494,18 @@ def send_tan(self, challenge: NeedTANResponse, tan: str):
:param tan: TAN value
:return: New response after sending TAN
"""

with self._get_dialog() as dialog:
if challenge.decoupled:
tan_seg = self._get_tan_segment(challenge.command_seg, 'S', challenge.tan_request)
else:
tan_seg = self._get_tan_segment(challenge.command_seg, '2', challenge.tan_request)
self._pending_tan = tan

response = dialog.send(tan_seg)
vop_seg = []
if challenge.vop_result and challenge.vop_result.vop_single_result.result == 'RCVC':
vop_seg = [HKVPA1(vop_id=challenge.vop_result.vop_id)]
segments = vop_seg + [tan_seg]
response = dialog.send(*segments)

if challenge.decoupled:
# TAN process = S
Expand Down
15 changes: 15 additions & 0 deletions fints/fields.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,21 @@ def _render_value(self, value):
return super()._render_value(val)


class TimestampField(DataElementField):
# Defined in the VoP standard, but missing in the Formals document. We just treat it as
# opaque bytes.
type = 'tsp'
_DOC_TYPE = bytes

def _render_value(self, value):
retval = bytes(value)
self._check_value_length(retval)
return retval

def _parse_value(self, value):
return bytes(value)


class PasswordField(AlphanumericField):
type = ''
_DOC_TYPE = Password
Expand Down
Loading