/
metadata.rb
146 lines (121 loc) · 4.7 KB
/
metadata.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
# frozen_string_literal: true
require "time"
require "active_support/json"
require_relative "serializer_with_fallback"
module ActiveSupport
module Messages # :nodoc:
module Metadata # :nodoc:
singleton_class.attr_accessor :use_message_serializer_for_metadata
ENVELOPE_SERIALIZERS = [
*SerializerWithFallback::SERIALIZERS.values,
ActiveSupport::JSON,
::JSON,
Marshal,
]
TIMESTAMP_SERIALIZERS = [
SerializerWithFallback::SERIALIZERS.fetch(:message_pack),
SerializerWithFallback::SERIALIZERS.fetch(:message_pack_allow_marshal),
]
ActiveSupport.on_load(:message_pack) do
ENVELOPE_SERIALIZERS << ActiveSupport::MessagePack
TIMESTAMP_SERIALIZERS << ActiveSupport::MessagePack
end
private
def serialize_with_metadata(data, **metadata)
has_metadata = metadata.any? { |k, v| v }
if has_metadata && !use_message_serializer_for_metadata?
data_string = serialize_to_json_safe_string(data)
envelope = wrap_in_metadata_legacy_envelope({ "message" => data_string }, **metadata)
serialize_to_json(envelope)
else
data = wrap_in_metadata_envelope({ "data" => data }, **metadata) if has_metadata
serialize(data)
end
end
def deserialize_with_metadata(message, **expected_metadata)
if dual_serialized_metadata_envelope_json?(message)
envelope = deserialize_from_json(message)
extracted = extract_from_metadata_envelope(envelope, **expected_metadata)
deserialize_from_json_safe_string(extracted["message"])
else
deserialized = deserialize(message)
if metadata_envelope?(deserialized)
extract_from_metadata_envelope(deserialized, **expected_metadata)["data"]
elsif expected_metadata.none? { |k, v| v }
deserialized
else
throw :invalid_message_content, "missing metadata"
end
end
end
def use_message_serializer_for_metadata?
Metadata.use_message_serializer_for_metadata && Metadata::ENVELOPE_SERIALIZERS.include?(serializer)
end
def wrap_in_metadata_envelope(hash, expires_at: nil, expires_in: nil, purpose: nil)
expiry = pick_expiry(expires_at, expires_in)
hash["exp"] = expiry if expiry
hash["pur"] = purpose.to_s if purpose
{ "_rails" => hash }
end
def wrap_in_metadata_legacy_envelope(hash, expires_at: nil, expires_in: nil, purpose: nil)
expiry = pick_expiry(expires_at, expires_in)
hash["exp"] = expiry
hash["pur"] = purpose
{ "_rails" => hash }
end
def extract_from_metadata_envelope(envelope, purpose: nil)
hash = envelope["_rails"]
if hash["exp"] && Time.now.utc >= parse_expiry(hash["exp"])
throw :invalid_message_content, "expired"
end
if hash["pur"].to_s != purpose.to_s
throw :invalid_message_content, "mismatched purpose"
end
hash
end
def metadata_envelope?(object)
object.is_a?(Hash) && object.key?("_rails")
end
def dual_serialized_metadata_envelope_json?(string)
string.start_with?('{"_rails":{"message":')
end
def pick_expiry(expires_at, expires_in)
expiry = if expires_at
expires_at.utc
elsif expires_in
Time.now.utc.advance(seconds: expires_in)
end
unless Metadata::TIMESTAMP_SERIALIZERS.include?(serializer)
expiry = expiry&.iso8601(3)
end
expiry
end
def parse_expiry(expires_at)
if !expires_at.is_a?(String)
expires_at
elsif ActiveSupport.use_standard_json_time_format
Time.iso8601(expires_at)
else
Time.parse(expires_at)
end
end
def serialize_to_json(data)
ActiveSupport::JSON.encode(data)
end
def deserialize_from_json(serialized)
ActiveSupport::JSON.decode(serialized)
rescue ::JSON::ParserError => error
# Throw :invalid_message_format instead of :invalid_message_serialization
# because here a parse error is due to a bad message rather than an
# incompatible `self.serializer`.
throw :invalid_message_format, error
end
def serialize_to_json_safe_string(data)
encode(serialize(data), url_safe: false)
end
def deserialize_from_json_safe_string(string)
deserialize(decode(string, url_safe: false))
end
end
end
end