@@ -902,12 +902,18 @@ def init_csrf_secret(cls) -> None:
902902 base64 .urlsafe_b64encode (os .urandom (20 )))
903903
904904 @classmethod
905- def _create_token (cls , user_id : Optional [str ], issued_on : float ) -> str :
905+ def _create_token (
906+ cls , user_id : Optional [str ], issued_on : float ,
907+ nonce : Optional [str ] = None ) -> str :
906908 """Creates a new CSRF token.
907909
908910 Args:
909911 user_id: str|None. The user_id for which the token is generated.
910912 issued_on: float. The timestamp at which the token was issued.
913+ nonce: str|None. A token that is never reused to prevent reply
914+ attacks. This argument should only be provided when validating a
915+ received CSRF token, in which case the nonce in the received
916+ token should be provided here.
911917
912918 Returns:
913919 str. The generated CSRF token.
@@ -923,19 +929,31 @@ def _create_token(cls, user_id: Optional[str], issued_on: float) -> str:
923929 # Round time to seconds.
924930 issued_on_str = str (int (issued_on ))
925931
932+ # Generate a nonce (number used once) to ensure that even two
933+ # consecutive calls to the same endpoint in the same second generate
934+ # different tokens. Note that this nonce is just for anti-collision
935+ # purposes, so it's okay that the nonce is stored in the CSRF token and
936+ # therefore can be controlled by an attacker. See OWASP guidance here:
937+ # https://cheatsheetseries.owasp.org/cheatsheets/Cross-Site_Request_Forgery_Prevention_Cheat_Sheet.html#signed-double-submit-cookie.
938+ if nonce is None :
939+ nonce = base64 .urlsafe_b64encode (os .urandom (20 )).decode ('utf-8' )
940+
926941 digester = hmac .new (
927942 key = CSRF_SECRET .value .encode ('utf-8' ),
928- digestmod = 'md5 '
943+ digestmod = 'sha256 '
929944 )
930945 digester .update (user_id .encode ('utf-8' ))
931946 digester .update (b':' )
932947 digester .update (issued_on_str .encode ('utf-8' ))
948+ digester .update (b':' )
949+ digester .update (nonce .encode ('utf-8' ))
933950
934951 digest = digester .digest ()
935952 # The b64encode returns bytes, so we first need to decode the returned
936953 # bytes to string.
937- token = '%s/%s' % (
938- issued_on_str , base64 .urlsafe_b64encode (digest ).decode ('utf-8' ))
954+ token = '%s/%s/%s' % (
955+ issued_on_str , nonce ,
956+ base64 .urlsafe_b64encode (digest ).decode ('utf-8' ))
939957
940958 return token
941959
@@ -973,16 +991,20 @@ def is_csrf_token_valid(cls, user_id: Optional[str], token: str) -> bool:
973991 """
974992 try :
975993 parts = token .split ('/' )
976- if len (parts ) != 2 :
994+ if len (parts ) != 3 :
977995 return False
978996
979997 issued_on = int (parts [0 ])
980998 age = cls ._get_current_time () - issued_on
981999 if age > cls ._CSRF_TOKEN_AGE_SECS :
9821000 return False
9831001
984- authentic_token = cls ._create_token (user_id , issued_on )
985- if authentic_token == token :
1002+ nonce = parts [1 ]
1003+
1004+ authentic_token = cls ._create_token (user_id , issued_on , nonce )
1005+ if hmac .compare_digest (
1006+ authentic_token .encode ('utf-8' ), token .encode ('utf-8' )
1007+ ):
9861008 return True
9871009
9881010 return False
0 commit comments