/
cipher_stream.py
134 lines (108 loc) · 4.33 KB
/
cipher_stream.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
# SPDX-License-Identifier: GPL-2.0-only
# This file is part of Scapy
# See https://scapy.net/ for more information
# Copyright (C) 2007, 2008, 2009 Arnaud Ebalard
# 2015, 2016, 2017 Maxence Tury
"""
Stream ciphers.
"""
from __future__ import absolute_import
from scapy.config import conf
from scapy.layers.tls.crypto.common import CipherError
import scapy.libs.six as six
if conf.crypto_valid:
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms
from cryptography.hazmat.backends import default_backend
_tls_stream_cipher_algs = {}
class _StreamCipherMetaclass(type):
"""
Cipher classes are automatically registered through this metaclass.
Furthermore, their name attribute is extracted from their class name.
"""
def __new__(cls, ciph_name, bases, dct):
if ciph_name != "_StreamCipher":
dct["name"] = ciph_name[7:] # remove leading "Cipher_"
the_class = super(_StreamCipherMetaclass, cls).__new__(cls, ciph_name,
bases, dct)
if ciph_name != "_StreamCipher":
_tls_stream_cipher_algs[ciph_name[7:]] = the_class
return the_class
class _StreamCipher(six.with_metaclass(_StreamCipherMetaclass, object)):
type = "stream"
def __init__(self, key=None):
"""
Note that we have to keep the encryption/decryption state in unique
encryptor and decryptor objects. This differs from _BlockCipher.
In order to do connection state snapshots, we need to be able to
recreate past cipher contexts. This is why we feed _enc_updated_with
and _dec_updated_with every time encrypt() or decrypt() is called.
"""
self.ready = {"key": True}
if key is None:
self.ready["key"] = False
if hasattr(self, "expanded_key_len"):
tmp_len = self.expanded_key_len
else:
tmp_len = self.key_len
key = b"\0" * tmp_len
# we use super() in order to avoid any deadlock with __setattr__
super(_StreamCipher, self).__setattr__("key", key)
self._cipher = Cipher(self.pc_cls(key),
mode=None,
backend=default_backend())
self.encryptor = self._cipher.encryptor()
self.decryptor = self._cipher.decryptor()
self._enc_updated_with = b""
self._dec_updated_with = b""
def __setattr__(self, name, val):
"""
We have to keep the encryptor/decryptor for a long time,
however they have to be updated every time the key is changed.
"""
if name == "key":
if self._cipher is not None:
self._cipher.algorithm.key = val
self.encryptor = self._cipher.encryptor()
self.decryptor = self._cipher.decryptor()
self.ready["key"] = True
super(_StreamCipher, self).__setattr__(name, val)
def encrypt(self, data):
if False in six.itervalues(self.ready):
raise CipherError(data)
self._enc_updated_with += data
return self.encryptor.update(data)
def decrypt(self, data):
if False in six.itervalues(self.ready):
raise CipherError(data)
self._dec_updated_with += data
return self.decryptor.update(data)
def snapshot(self):
c = self.__class__(self.key)
c.ready = self.ready.copy()
c.encryptor.update(self._enc_updated_with)
c.decryptor.update(self._dec_updated_with)
c._enc_updated_with = self._enc_updated_with
c._dec_updated_with = self._dec_updated_with
return c
if conf.crypto_valid:
class Cipher_RC4_128(_StreamCipher):
pc_cls = algorithms.ARC4
key_len = 16
class Cipher_RC4_40(Cipher_RC4_128):
expanded_key_len = 16
key_len = 5
class Cipher_NULL(_StreamCipher):
key_len = 0
def __init__(self, key=None):
self.ready = {"key": True}
self._cipher = None
# we use super() in order to avoid any deadlock with __setattr__
super(Cipher_NULL, self).__setattr__("key", key)
def snapshot(self):
c = self.__class__(self.key)
c.ready = self.ready.copy()
return c
def encrypt(self, data):
return data
def decrypt(self, data):
return data