/
utils.py
135 lines (104 loc) · 4 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# SPDX-License-Identifier: GPL-2.0-only
# This file is part of Scapy
# See https://scapy.net/ for more information
# Copyright (C) Tabea Spahn <tabea.spahn@e-mundo.de>
# scapy.contrib.status = skip
import struct
from scapy.config import conf
from scapy.contrib.automotive import log_automotive
from scapy.fields import StrLenField
from scapy.volatile import RandBin, RandNum
def get_max_cto():
    """Return the MAX_CTO value configured for the XCP contrib.

    :return: the value stored in conf.contribs['XCP']['MAX_CTO']
    :raises KeyError: if the stored value is falsy (a warning is logged
        first); a failing lookup of the key itself propagates unchanged
    """
    cto_limit = conf.contribs['XCP']['MAX_CTO']
    if not cto_limit:
        # Nothing usable configured: tell the user which setting to define
        # before signalling the error to the caller.
        log_automotive.warning("Define conf.contribs['XCP']['MAX_CTO'].")
        raise KeyError("conf.contribs['XCP']['MAX_CTO'] not defined")
    return cto_limit
def get_max_dto():
    """Return the MAX_DTO value configured for the XCP contrib.

    :return: the value stored in conf.contribs['XCP']['MAX_DTO']
    :raises KeyError: if the stored value is falsy (a warning is logged
        first); a failing lookup of the key itself propagates unchanged
    """
    dto_limit = conf.contribs['XCP']['MAX_DTO']
    if not dto_limit:
        # Nothing usable configured: tell the user which setting to define
        # before signalling the error to the caller.
        log_automotive.warning("Define conf.contribs['XCP']['MAX_DTO'].")
        raise KeyError("conf.contribs['XCP']['MAX_DTO'] not defined")
    return dto_limit
def get_ag():
    """Return the configured XCP address granularity.

    Reads conf.contribs['XCP']['Address_Granularity_Byte'] and accepts only
    the values 1, 2 and 4. For any other value a warning is logged and the
    function falls back to 1.

    :return: the configured granularity (1, 2 or 4), or 1 as fallback
    """
    address_granularity = conf.contribs['XCP']['Address_Granularity_Byte']
    # Falsy values (None, 0) are never members of the tuple, so a separate
    # truthiness test is not needed.
    if address_granularity in (1, 2, 4):
        return address_granularity

    # The two literals are joined by implicit concatenation; the leading
    # space on the second one keeps the sentences apart in the log output.
    log_automotive.warning(
        "Define conf.contribs['XCP']['Address_Granularity_Byte']."
        " Assign either 1, 2 or 4")
    return 1
# With TIMESTAMP_MODE and TIMESTAMP_TICKS at GET_DAQ_RESOLUTION_INFO,
# the slave informs the master about the Type of Timestamp Field
# the slave will use when transferring DAQ Packets to the master.
# The master has to use the same Type of Timestamp Field when transferring
# STIM Packets to the slave. TIMESTAMP_MODE and TIMESTAMP_TICKS contain
# information on the resolution of the data transfer clock.
def get_timestamp_length():
    """Return the value stored in conf.contribs['XCP']['timestamp_size'].

    The value is passed through unchanged; a failing lookup propagates to
    the caller.
    """
    xcp_settings = conf.contribs['XCP']
    return xcp_settings['timestamp_size']
def identification_field_needs_alignment():
    """Tell whether the configured identification field type is aligned.

    :return: True only if both 'identification_field_type_0' and
        'identification_field_type_1' in conf.contribs['XCP'] equal 1;
        False otherwise, including when a setting is missing
    """
    try:
        xcp_settings = conf.contribs['XCP']
        type_bit_0 = xcp_settings['identification_field_type_0']
        type_bit_1 = xcp_settings['identification_field_type_1']
    except KeyError:
        # Missing configuration is treated as "no alignment needed".
        return False

    # Both bits set: relative odt with daq as word (aligned)
    return bool(type_bit_1 == 1 and type_bit_0 == 1)
def get_daq_length():
    """Return the DAQ length implied by the identification field type.

    The type is read from 'identification_field_type_0' and
    'identification_field_type_1' in conf.contribs['XCP'].

    :return: 0 for an absolute odt number (or when a setting is missing),
        1 for a relative odt with daq as byte, 2 for a relative odt with
        daq as word
    """
    try:
        xcp_settings = conf.contribs['XCP']
        type_bit_0 = xcp_settings['identification_field_type_0']
        type_bit_1 = xcp_settings['identification_field_type_1']
    except KeyError:
        return 0

    if type_bit_1 != 0:
        # relative odt with daq as word
        return 2
    if type_bit_0 == 0:
        # absolute odt number
        return 0
    if type_bit_0 == 1:
        # relative odt with daq as byte
        return 1
    # Any other combination is handled like "daq as word".
    return 2
def get_daq_data_field_length():
    """Return what is left of MAX_DTO for the DAQ data field.

    From the value of get_max_dto() this subtracts one for the pid, one
    more if identification_field_needs_alignment() is true, and the result
    of get_daq_length().

    :return: the remaining length, or 0 if get_max_dto() raises KeyError
    """
    try:
        max_dto = get_max_dto()
    except KeyError:
        return 0

    pid_length = 1
    alignment_length = 1 if identification_field_needs_alignment() else 0
    return max_dto - pid_length - alignment_length - get_daq_length()
# Idea taken from scapy/scapy/contrib/dce_rpc.py
class XCPEndiannessField(object):
    """Field which changes the endianness of a sub-field

    Wraps another field object and, before every dissection or build,
    rewrites the first character of the wrapped field's ``fmt`` according
    to conf.contribs['XCP']['byte_order']. Every other attribute access is
    forwarded to the wrapped field.
    """
    __slots__ = ["fld"]

    def __init__(self, fld):
        """Store the field to wrap.

        :param fld: the wrapped field; must expose ``fmt`` and accept a
            ``struct`` attribute
        """
        self.fld = fld

    def set_endianness(self):
        """Add the endianness to the format"""
        byte_order = conf.contribs['XCP']['byte_order']
        # 1 selects ">" (big-endian), every other value "<" (little-endian).
        endianness = ">" if byte_order == 1 else "<"
        # Replace only the leading byte-order character of the format and
        # rebuild the struct so that it matches the new format.
        self.fld.fmt = endianness + self.fld.fmt[1:]
        self.fld.struct = struct.Struct(self.fld.fmt)

    def getfield(self, pkt, s):
        """Apply the configured endianness, then delegate to the sub-field."""
        self.set_endianness()
        return self.fld.getfield(pkt, s)

    def addfield(self, pkt, s, val):
        """Apply the configured endianness, then delegate to the sub-field."""
        self.set_endianness()
        return self.fld.addfield(pkt, s, val)

    def __getattr__(self, attr):
        """Forward unknown attribute lookups to the wrapped field.

        :raises AttributeError: if ``fld`` itself is requested, which only
            happens while the slot is unset
        """
        # __getattr__ is only called after normal lookup failed. For "fld"
        # that means the slot is not populated yet (e.g. on the bare
        # instance created by copy/pickle); reading self.fld below would
        # re-enter this method and recurse until RecursionError.
        if attr == "fld":
            raise AttributeError(attr)
        return getattr(self.fld, attr)
class StrVarLenField(StrLenField):
    """StrLenField variant with its own random value generator."""

    def randval(self):
        """Return a RandBin whose size argument is a RandNum.

        The RandNum ranges from 0 to self.max_length(), or to 1200 when
        max_length() returns a falsy value.
        """
        upper_bound = self.max_length() or 1200
        random_size = RandNum(0, upper_bound)
        return RandBin(random_size)