Skip to content

Commit

Permalink
Merge pull request #127 from nineinchnick/data-attrs
Browse files Browse the repository at this point in the history
mark data as pii or creds and check encryption at rest
  • Loading branch information
izar committed Dec 24, 2020
2 parents 3333e6f + eaed853 commit fbf177b
Show file tree
Hide file tree
Showing 6 changed files with 138 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## New features

- Allow to mark data as PII or credentials and check if it's protected [#127](https://github.com/izar/pytm/pull/127)
- Added '--levels' - every element now has a 'levels' attribute, a list of integers denoting different DFD levels for rendering
- Added HTML docs using pdoc [#110](https://github.com/izar/pytm/pull/110)
- Added `checksDestinationRevocation` attribute to account for certificate revocation checks [#109](https://github.com/izar/pytm/pull/109)
Expand Down
2 changes: 2 additions & 0 deletions pytm/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
"TM",
"Action",
"Lambda",
"Lifetime",
"Threat",
"Classification",
"Data",
Expand All @@ -33,6 +34,7 @@
Element,
ExternalEntity,
Lambda,
Lifetime,
Process,
Server,
SetOfProcesses,
Expand Down
75 changes: 75 additions & 0 deletions pytm/pytm.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,13 @@ def __set__(self, instance, value):
super().__set__(instance, value)


class varLifetime(var):
def __set__(self, instance, value):
if not isinstance(value, Lifetime):
raise ValueError("expecting a Lifetime, got a {}".format(type(value)))
super().__set__(instance, value)


class varData(var):
def __set__(self, instance, value):
if isinstance(value, str):
Expand Down Expand Up @@ -235,6 +242,26 @@ class Classification(OrderedEnum):
TOP_SECRET = 5


class Lifetime(Enum):
# not applicable
NONE = "NONE"
# unknown lifetime
UNKNOWN = "UNKNOWN"
# relatively short expiration date (time to live)
SHORT = "SHORT_LIVED"
# long or no expiration date
LONG = "LONG_LIVED"
# no expiration date but revoked/invalidated automatically in some conditions
AUTO = "AUTO_REVOKABLE"
# no expiration date but can be invalidated manually
MANUAL = "MANUALLY_REVOKABLE"
# cannot be invalidated at all
HARDCODED = "HARDCODED"

def label(self):
return self.value.lower().replace("_", " ")


def _sort(flows, addOrder=False):
ordered = sorted(flows, key=lambda flow: flow.order)
if not addOrder:
Expand Down Expand Up @@ -336,6 +363,21 @@ def _apply_defaults(flows, data):
e._safeset("authenticatesDestination", e.source.authenticatesDestination)
e._safeset("checksDestinationRevocation", e.source.checksDestinationRevocation)

for d in e.data:
if d.isStored:
if hasattr(e.sink, "isEncryptedAtRest"):
for d in e.data:
d._safeset("isDestEncryptedAtRest", e.sink.isEncryptedAtRest)
if hasattr(e.source, "isEncryptedAtRest"):
for d in e.data:
d._safeset(
"isSourceEncryptedAtRest", e.source.isEncryptedAtRest
)
if d.credentialsLife != Lifetime.NONE and not d.isCredentials:
d._safeset("isCredentials", True)
if d.isCredentials and d.credentialsLife == Lifetime.NONE:
d._safeset("credentialsLife", Lifetime.UNKNOWN)

outputs[e.source].append(e)
inputs[e.sink].append(e)

Expand Down Expand Up @@ -1014,6 +1056,38 @@ class Data:
required=True,
doc="Level of classification for this piece of data",
)
isPII = varBool(
False,
doc="""Does the data contain personally identifyable information.
Should always be encrypted both in transmission and at rest.""",
)
isCredentials = varBool(
False,
doc="""Does the data contain authentication information,
like passwords or cryptographic keys, with or without expiration date.
Should always be encrypted in transmission. If stored, they should be hashed
using a cryptographic hash function.""",
)
credentialsLife = varLifetime(
Lifetime.NONE,
doc="""Credentials lifetime, describing if and how
credentials can be revoked. One of:
* NONE - not applicable
* UNKNOWN - unknown lifetime
* SHORT - relatively short expiration date, with an allowed maximum
* LONG - long or no expiration date
* AUTO - no expiration date but can be revoked/invalidated automatically
in some conditions
* MANUAL - no expiration date but can be revoked/invalidated manually
* HARDCODED - cannot be invalidated at all""",
)
isStored = varBool(
False,
doc="""Is the data going to be stored by the target or only processed.
If only derivative data is stored (a hash) it can be set to False.""",
)
isDestEncryptedAtRest = varBool(False, doc="Is data encrypted at rest at dest")
isSourceEncryptedAtRest = varBool(False, doc="Is data encrypted at rest at source")
carriedBy = varElements([], doc="Dataflows that carries this piece of data")
processedBy = varElements([], doc="Elements that store/process this piece of data")

Expand Down Expand Up @@ -1199,6 +1273,7 @@ class Datastore(Asset):
must be able to access only the information and resources
that are necessary for its legitimate purpose.""",
)
isEncryptedAtRest = varBool(False, doc="Stored data is encrypted at rest")

def __init__(self, name, **kwargs):
super().__init__(name, **kwargs)
Expand Down
30 changes: 30 additions & 0 deletions pytm/threatlib/threats.json
Original file line number Diff line number Diff line change
Expand Up @@ -1310,6 +1310,36 @@
"mitigations": "All data should be encrypted in transit. All PII and restricted data must be encrypted at rest. If a service is storing credentials used to authenticate users or incoming connections, it must only store hashes of them created using cryptographic functions, so it is only possible to compare them against user input, without fully decoding them. If a client is storing credentials in either files or other data store, access to them must be as restrictive as possible, including using proper file permissions, database users with restricted access or separate storage.",
"example": "An application, which connects to a database without TLS, performs a database query in which it compares the password to a stored hash, instead of fetching the hash and comparing it locally.",
"references": "https://cwe.mitre.org/data/definitions/311.html, https://cwe.mitre.org/data/definitions/312.html, https://cwe.mitre.org/data/definitions/916.html, https://cwe.mitre.org/data/definitions/653.html"
},
{
"SID": "DR01",
"target": [
"Dataflow"
],
"description": "Unprotected Sensitive Data",
"details": "An attacker can access data in transit or at rest that is not sufficiently protected. If an attacker can decrypt a stored password, it might be used to authenticate against different services.",
"Likelihood Of Attack": "Low",
"severity": "High",
"prerequisites": "",
"condition": "(target.hasDataLeaks() or any(d.isCredentials or d.isPII for d in target.data)) and (not target.isEncrypted or (not target.isResponse and any(d.isStored and d.isDestEncryptedAtRest for d in target.data)) or (target.isResponse and any(d.isStored and d.isSourceEncryptedAtRest for d in target.data)))",
"mitigations": "All data should be encrypted in transit. All PII and restricted data must be encrypted at rest. If a service is storing credentials used to authenticate users or incoming connections, it must only store hashes of them created using cryptographic functions, so it is only possible to compare them against user input, without fully decoding them. If a client is storing credentials in either files or other data store, access to them must be as restrictive as possible, including using proper file permissions, database users with restricted access or separate storage.",
"example": "",
"references": "https://cwe.mitre.org/data/definitions/311.html, https://cwe.mitre.org/data/definitions/312.html, https://cwe.mitre.org/data/definitions/916.html, https://cwe.mitre.org/data/definitions/653.html"
},
{
"SID": "AC22",
"target": [
"Dataflow"
],
"description": "Credentials Aging",
"details": "If no mechanism is in place for managing credentials (passwords and certificates) aging, users will have no incentive to update passwords or rotate certificates in a timely manner. Allowing password aging to occur unchecked or long certificate expiration dates can result in the possibility of diminished password integrity.",
"Likelihood Of Attack": "Medium",
"severity": "High",
"prerequisites": "",
"condition": "any(d.isCredentials for d in target.data) and target.sink.inScope and any(d.credentialsLife in (Lifetime.UNKNOWN, Lifetime.LONG, Lifetime.MANUAL, Lifetime.HARDCODED) for d in target.data)",
"mitigations": "All passwords and other credentials should have a relatively short expiration date with a possibility to be revoked immediately under special circumstances.",
"example": "",
"references": "https://cwe.mitre.org/data/definitions/262.html, https://cwe.mitre.org/data/definitions/263.html, https://cwe.mitre.org/data/definitions/798.html"
}
]

7 changes: 7 additions & 0 deletions tests/output.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,13 @@
"User enters comments (*)"
],
"classification": "Classification.PUBLIC",
"credentialsLife": "Lifetime.NONE",
"description": "",
"isCredentials": false,
"isDestEncryptedAtRest": false,
"isPII": false,
"isSourceEncryptedAtRest": false,
"isStored": false,
"name": "auth cookie",
"processedBy": [
"User",
Expand Down Expand Up @@ -259,6 +265,7 @@
"Query for tasks"
],
"isEncrypted": false,
"isEncryptedAtRest": false,
"isHardened": false,
"isResilient": false,
"isSQL": true,
Expand Down
23 changes: 23 additions & 0 deletions tests/test_pytmfunc.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@
Action,
Actor,
Boundary,
Data,
Dataflow,
Datastore,
ExternalEntity,
Lambda,
Lifetime,
Process,
Server,
Threat,
Expand Down Expand Up @@ -1194,3 +1196,24 @@ def test_AC21(self):
process1.verifySessionIdentifiers = False
threat = threats["AC21"]
self.assertTrue(threat.apply(process1))

def test_AC22(self):
user = Actor("User")
web = Server("Web Server")
user_to_web = Dataflow(user, web, "User enters comments (*)")
user_to_web.data = Data(
"password", isCredentials=True, credentialsLife=Lifetime.HARDCODED
)
user_to_web.protocol = "HTTPS"
user_to_web.isEncrypted = True
threat = threats["AC22"]
self.assertTrue(threat.apply(user_to_web))

def test_DR01(self):
web = Server("Web Server")
db = Datastore("Database")
insert = Dataflow(web, db, "Insert query")
insert.data = Data("ssn", isPII=True, isStored=True)
insert.isEncrypted = False
threat = threats["DR01"]
self.assertTrue(threat.apply(insert))

0 comments on commit fbf177b

Please sign in to comment.