Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Should be all functionnal now. ISO decoding/encoding is implemented u…

…sing Python codecs.
  • Loading branch information...
commit 9d8e5124a8b17ef0a4477926a176c0c3da5b2651 1 parent 2d554d4
Steeve Morin authored
Showing with 143 additions and 35 deletions.
  1. +143 −35 msr605.py
178 msr605.py
View
@@ -1,9 +1,14 @@
import time
import serial
+import re
+import codecs
class MSRException(Exception):
pass
+class ReadError(MSRException):
+ pass
+
class ReadWriteError(MSRException):
pass
@@ -19,20 +24,22 @@ class InvalidCardSwipeForWrite(MSRException):
class SetError(MSRException):
pass
-# defining the core object
class MSR605(serial.Serial):
- # protocol
ESC_CHR = '\x1B'
FS_CHR = '\x1C'
- END_CHR = '\x1C'
- HiCo = True
- HiBPI = True
enabled_tracks = [1, 2, 3]
def __init__(self, dev):
- super(MSR605, self).__init__(dev, 9600, 8, serial.PARITY_NONE, timeout=10)
+ super(LLMSR605, self).__init__(dev, 9600, 8, serial.PARITY_NONE, timeout=10)
+ self.reset()
+ if not self.communication_test():
+ raise MSRException('Communication test failed.')
+ if not self.ram_test():
+ raise MSRException('RAM test failed.')
+ if not self.sensor_test():
+ raise MSRException('Sensor test failed.')
self.reset()
def _read_status(self):
@@ -45,20 +52,26 @@ def _read_status(self):
}
self._expect(self.ESC_CHR)
status = self.read(1)
- if status in exceptions.keys():
+ if status in exceptions:
raise exceptions[status]()
return status
def _expect(self, data):
- assert self.read(len(data)) == data
+ read_data = self.read(len(data))
+ if read_data != data:
+ raise ReadError('Expected %s, got %s.' % (repr(data), repr(read_data)))
- def _read_until(self, end_byte):
+ def _read_until(self, end):
data = ""
while True:
- byte = self.read(1)
- if byte == end_byte:
+ data += self.read(1)
+ if data.endswith(end):
return data
- data += byte
+
+ def _send_command(self, command, *args):
+ self.flushInput()
+ self.flushOutput()
+ self.write(self.ESC_CHR + command + ''.join(args))
def all_leds_off(self):
self._send_command('\x81')
@@ -91,17 +104,16 @@ def reset(self):
self._send_command('\x61')
def read_iso(self):
+ self.set_bpc(7, 5, 5)
+ self.select_bpi(True, False, True)
self._send_command('\x72')
self._expect(self.ESC_CHR + '\x73')
- self._expect(self.ESC_CHR)
- track1 = self._read_until(self.ESC_CHR)
- assert track1[:2] == '\x01%' and track1[-1] == '?'
- track2 = self._read_until(self.ESC_CHR)
- assert track2[:2] == '\x02;' and track2[-1] == '?'
- track3 = self._read_until(self.FS_CHR)
- assert track3[:2] == ('\x03' + self.ESC_CHR) and track3[-1] == '?'
-
- return track1[2:-1], track2[2:-1], track3[2:-1]
+ self._expect(self.ESC_CHR + '\x01')
+ track1 = self._read_until(self.ESC_CHR + '\x02')[:-2]
+ track2 = self._read_until(self.ESC_CHR + '\x03')[:-2]
+ track3 = self._read_until(self.FS_CHR)[:-1]
+ self._read_status()
+ return track1, track2, track3
def read_raw(self):
self._send_command('\x6D')
@@ -115,10 +127,6 @@ def read_raw(self):
self._read_status()
return tracks
- def _send_command(self, command, *args):
- self.flushInput()
- self.write(self.ESC_CHR + command + ''.join(args))
-
def get_device_model(self):
self._send_command('\x74')
self._expect(self.ESC_CHR)
@@ -135,9 +143,17 @@ def set_hico(self):
self._send_command('\x78')
self._expect(self.ESC_CHR + '\x30')
- def set_leading_zero(self, t1, t2, t3):
- assert t1 == t3
- self._send_command('\x7A', chr(t1), chr(t2))
+ def set_lowco(self):
+ self._send_command('\x79')
+ self._expect(self.ESC_CHR + '\x30')
+
+ def get_co_status(self):
+ self._send_command('\x79')
+ self._expect(self.ESC_CHR)
+ return self.read(1)
+
+ def set_leading_zero(self, t13, t2):
+ self._send_command('\x7A', chr(t13), chr(t2))
self._read_status()
def check_leading_zero(self):
@@ -145,7 +161,7 @@ def check_leading_zero(self):
self._expect(self.ESC_CHR)
t13 = ord(self.read(1))
t2 = ord(self.read(1))
- return t13, t2, t13
+ return t13, t2
def erase_card(self, t1=True, t2=True, t3=True):
flags = (t1 and 1 or 0) | ((t2 and 1 or 0) << 1) | ((t3 and 1 or 0) << 2)
@@ -160,15 +176,107 @@ def select_bpi(self, t1_density, t2_density, t3_density):
self._send_command('\x62', t3_density and '\xC1' or '\xC0')
self._read_status()
+ def set_bpc(self, t1, t2, t3):
+ self._send_command('\x6F', chr(t1), chr(t2), chr(t3))
+ self._expect(self.ESC_CHR + '\x30' + chr(t1) + chr(t2) + chr(t3))
+
def write_raw(self, *tracks):
raw_data_block = self.ESC_CHR + '\x73'
for tn, track in enumerate(tracks):
- if track:
- raw_data_block += \
- self.ESC_CHR +\
- chr(tn + 1) +\
- chr(len(track)) +\
- track
+ raw_data_block += \
+ self.ESC_CHR +\
+ chr(tn + 1) +\
+ chr(len(track)) +\
+ track
raw_data_block += '\x3F' + self.FS_CHR
self._send_command('\x6E', raw_data_block)
self._read_status()
+
+ def write_iso(self, *tracks):
+ tracks = (re.sub(r'^%|^;|\?$', '', track) for track in tracks)
+ data_block = self.ESC_CHR + '\x73'
+ data_block += ''.join(
+ self.ESC_CHR + chr(tn + 1) + track
+ for tn, track in enumerate(tracks)
+ )
+ data_block += '\x3F' + self.FS_CHR
+ self._send_command('\x77', raw_data_block)
+ self._read_status()
+
+ def read_iso_soft(self):
+ track1, track2, track3 = self.read_raw()
+ return (
+ track1.decode('iso7811-2-track1'),
+ track2.decode('iso7811-2-track23'),
+ track3.decode('iso7811-2-track23'),
+ )
+
+ def write_iso_soft(self, *tracks):
+ tracks = [re.sub(r'^%|^;|\?$', '', track) for track in tracks]
+ tracks[0] = ('%' + tracks[0] + '?').encode('iso7811-2-track1')
+ tracks[1] = (';' + tracks[1] + '?').encode('iso7811-2-track23')
+ tracks[2] = (';' + tracks[2] + '?').encode('iso7811-2-track23')
+ return self.write_raw(*tracks)
+
+
+class ISO7811_2(codecs.Codec):
+ TRACK1_CHARS = ' !"#$%&\'()*+`,./0123456789:;<=>?@ABCDEFGHIJKLMNOPQRSTUVWXYZ[\\]^_'
+ TRACK23_CHARS = '0123456789:;<=>?'
+
+ @classmethod
+ def _reverse_bits(cls, value, nbits):
+ return sum(
+ 1 << (nbits - 1 - i)
+ for i in xrange(nbits)
+ if (value >> i) & 1
+ )
+
+ @classmethod
+ def _with_parity(cls, value, nbits):
+ if sum(1 for i in xrange(nbits) if (value >> i) & 1) % 2 != 0:
+ return value
+ return value | (1 << (nbits - 1))
+
+ @classmethod
+ def _iso_encode_data(cls, data, mapping, nbits):
+ def make_data():
+ lrc = 0
+ for v in map(mapping.index, data):
+ lrc ^= v
+ yield chr(cls._with_parity(v, nbits))
+ yield chr(cls._with_parity(lrc, nbits))
+ enc = ''.join(make_data())
+ return enc, len(enc)
+
+ @classmethod
+ def _iso_decode_data(cls, data, mapping, nbits):
+ dec = ''.join(
+ mapping[cls._reverse_bits(ord(c) >> 1, nbits - 1)]
+ for c in data
+ )
+ return dec, len(dec)
+
+ @classmethod
+ def encode_track1(cls, data):
+ return cls._iso_encode_data(data, cls.TRACK1_CHARS, 7)
+
+ @classmethod
+ def encode_track23(cls, data):
+ return cls._iso_encode_data(data, cls.TRACK23_CHARS, 5)
+
+ @classmethod
+ def decode_track1(cls, data):
+ return cls._iso_decode_data(data, cls.TRACK1_CHARS, 7)
+
+ @classmethod
+ def decode_track23(cls, data):
+ return cls._iso_decode_data(data, cls.TRACK23_CHARS, 5)
+
+ @classmethod
+ def codec_search(cls, name):
+ return {
+ 'iso7811-2-track1': (cls.encode_track1, cls.decode_track1, None, None),
+ 'iso7811-2-track23': (cls.encode_track23, cls.decode_track23, None, None),
+ }.get(name, None)
+codecs.register(ISO7811_2.codec_search)
+
Please sign in to comment.
Something went wrong with that request. Please try again.