-
-
Notifications
You must be signed in to change notification settings - Fork 7.5k
/
import_realm.py
1692 lines (1461 loc) · 66.7 KB
/
import_realm.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import datetime
import logging
import os
import shutil
from concurrent.futures import ProcessPoolExecutor, as_completed
from mimetypes import guess_type
from typing import Any, Dict, List, Optional, Tuple
import bmemcached
import orjson
from bs4 import BeautifulSoup
from django.conf import settings
from django.core.cache import cache
from django.core.validators import validate_email
from django.db import connection, transaction
from django.utils.timezone import now as timezone_now
from psycopg2.extras import execute_values
from psycopg2.sql import SQL, Identifier
from analytics.models import RealmCount, StreamCount, UserCount
from zerver.actions.realm_settings import do_change_realm_plan_type
from zerver.actions.user_settings import do_change_avatar_fields
from zerver.lib.avatar_hash import user_avatar_path_from_ids
from zerver.lib.bulk_create import bulk_set_users_or_streams_recipient_fields
from zerver.lib.export import DATE_FIELDS, Field, Path, Record, TableData, TableName
from zerver.lib.markdown import markdown_convert
from zerver.lib.markdown import version as markdown_version
from zerver.lib.message import get_last_message_id
from zerver.lib.server_initialization import create_internal_realm, server_initialized
from zerver.lib.streams import render_stream_description
from zerver.lib.timestamp import datetime_to_timestamp
from zerver.lib.upload import upload_backend
from zerver.lib.upload.base import BadImageError, sanitize_name
from zerver.lib.upload.s3 import get_bucket
from zerver.lib.user_groups import create_system_user_groups_for_realm
from zerver.lib.user_message import UserMessageLite, bulk_insert_ums
from zerver.lib.utils import generate_api_key, process_list_in_batches
from zerver.models import (
AlertWord,
Attachment,
BotConfigData,
BotStorageData,
Client,
CustomProfileField,
CustomProfileFieldValue,
DefaultStream,
GroupGroupMembership,
Huddle,
Message,
MutedUser,
Reaction,
Realm,
RealmAuditLog,
RealmAuthenticationMethod,
RealmDomain,
RealmEmoji,
RealmFilter,
RealmPlayground,
RealmUserDefault,
Recipient,
ScheduledMessage,
Service,
Stream,
Subscription,
UserActivity,
UserActivityInterval,
UserGroup,
UserGroupMembership,
UserHotspot,
UserMessage,
UserPresence,
UserProfile,
UserStatus,
UserTopic,
get_huddle_hash,
get_realm,
get_system_bot,
get_user_profile_by_id,
)
# Realm-level tables, listed as (database table name, model class,
# ID_MAP key) triples; every third element is also a key of ID_MAP.
realm_tables: List[Tuple[TableName, Any, str]] = [
    ("zerver_realmauthenticationmethod", RealmAuthenticationMethod, "realmauthenticationmethod"),
    ("zerver_defaultstream", DefaultStream, "defaultstream"),
    ("zerver_realmemoji", RealmEmoji, "realmemoji"),
    ("zerver_realmdomain", RealmDomain, "realmdomain"),
    ("zerver_realmfilter", RealmFilter, "realmfilter"),
    ("zerver_realmplayground", RealmPlayground, "realmplayground"),
]
# ID_MAP maps a table name to a dictionary from old (exported) ids to
# the new ids assigned on this server. It is consulted by
# re_map_foreign_keys and several other helpers.
#
# Only the tables listed here support id re-mapping; update_id_map
# refuses any table that is missing from this mapping.
#
# Code reviewers: give these tables extra scrutiny, as we need to
# make sure to reload related tables AFTER we re-map the ids.
ID_MAP: Dict[str, Dict[int, int]] = {
    # A comprehension (not dict.fromkeys) so every table gets its own dict.
    table_name: {}
    for table_name in (
        "alertword",
        "client",
        "user_profile",
        "huddle",
        "realm",
        "stream",
        "recipient",
        "subscription",
        "defaultstream",
        "reaction",
        "realmauthenticationmethod",
        "realmemoji",
        "realmdomain",
        "realmfilter",
        "realmplayground",
        "message",
        "user_presence",
        "userstatus",
        "useractivity",
        "useractivityinterval",
        "usermessage",
        "customprofilefield",
        "customprofilefieldvalue",
        "attachment",
        "realmauditlog",
        "recipient_to_huddle_map",
        "userhotspot",
        "usertopic",
        "muteduser",
        "service",
        "usergroup",
        "usergroupmembership",
        "groupgroupmembership",
        "botstoragedata",
        "botconfigdata",
        "analytics_realmcount",
        "analytics_streamcount",
        "analytics_usercount",
        "realmuserdefault",
        "scheduledmessage",
    )
}

# "huddle_to_user_list" holds {huddle id: [member user ids]}, rebuilt by
# get_huddles_from_subscription from the subscription data.
id_map_to_list: Dict[str, Dict[int, List[int]]] = {
    "huddle_to_user_list": {},
}

# "attachment_path" holds {old upload path: new upload path}, filled in
# while importing attachment uploads and applied to message content by
# fix_upload_links.
path_maps: Dict[str, Dict[str, str]] = {
    "attachment_path": {},
}
def update_id_map(table: TableName, old_id: int, new_id: int) -> None:
    """
    Record in ID_MAP that exported id `old_id` of `table` is now `new_id`.

    Only tables pre-registered in ID_MAP may be remapped; any other table
    raises an Exception, since it suggests the import ordering (and any
    circular dependencies) for that table has not been thought through.
    """
    id_map = ID_MAP.get(table)
    if id_map is None:
        raise Exception(
            f"""
Table {table} is not initialized in ID_MAP, which could
mean that we have not thought through circular
dependencies.
"""
        )
    id_map[old_id] = new_id
def fix_datetime_fields(data: TableData, table: TableName) -> None:
    """
    Turn exported POSIX timestamps back into timezone-aware UTC datetimes.

    For every row of data[table], each field listed in DATE_FIELDS[table]
    whose value is not None is replaced with the equivalent
    datetime.datetime in UTC; None values are left untouched.
    """
    utc = datetime.timezone.utc
    for row in data[table]:
        for field in DATE_FIELDS[table]:
            timestamp = row[field]
            if timestamp is None:
                continue
            row[field] = datetime.datetime.fromtimestamp(timestamp, tz=utc)
def fix_upload_links(data: TableData, message_table: TableName) -> None:
    """
    Rewrite links to uploaded files in imported messages.

    Upload URLs encode the realm ID, which is only known at import time,
    so every old upload path recorded in path_maps["attachment_path"]
    that occurs in a message's content is replaced with its new path, in
    both `content` and (when non-empty) `rendered_content`. Only messages
    whose `has_attachment` is exactly True are examined.
    """
    for message in data[message_table]:
        if message["has_attachment"] is not True:
            continue
        for old_path, new_path in path_maps["attachment_path"].items():
            # The check is against the (possibly already rewritten) content.
            if old_path not in message["content"]:
                continue
            message["content"] = message["content"].replace(old_path, new_path)
            if message["rendered_content"]:
                message["rendered_content"] = message["rendered_content"].replace(
                    old_path, new_path
                )
def fix_streams_can_remove_subscribers_group_column(data: TableData, realm: Realm) -> None:
    """
    Set can_remove_subscribers_group on every imported stream row to the
    realm's system administrators UserGroup.

    Note that the UserGroup instance itself (not its id) is stored in the row.
    """
    admins_group = UserGroup.objects.get(
        name=UserGroup.ADMINISTRATORS_GROUP_NAME, realm=realm, is_system_group=True
    )
    for stream_row in data[get_db_table(Stream)]:
        stream_row["can_remove_subscribers_group"] = admins_group
def create_subscription_events(data: TableData, realm_id: int) -> None:
    """
    Synthesize SUBSCRIPTION_CREATED RealmAuditLog rows for imported
    stream subscriptions.

    Exports produced by tools that don't include `zerver_realmauditlog`
    (Slack, Gitter, etc.) lack that table because the third-party service
    doesn't report when users subscribed. This creates one event per
    subscription to a stream recipient, with the subscribed user as both
    acting and modified user, stamped with the current time and the
    current last message id. Subscriptions to non-stream recipients are
    skipped.
    """
    last_message_id = get_last_message_id()
    now = timezone_now()

    stream_id_by_recipient_id = {
        recipient["id"]: recipient["type_id"]
        for recipient in data["zerver_recipient"]
        if recipient["type"] == Recipient.STREAM
    }
    stream_subscriptions = [
        (sub["user_profile_id"], stream_id_by_recipient_id[sub["recipient_id"]])
        for sub in data["zerver_subscription"]
        if stream_id_by_recipient_id.get(sub["recipient_id"]) is not None
    ]

    RealmAuditLog.objects.bulk_create(
        [
            RealmAuditLog(
                realm_id=realm_id,
                acting_user_id=user_id,
                modified_user_id=user_id,
                modified_stream_id=stream_id,
                event_last_message_id=last_message_id,
                event_time=now,
                event_type=RealmAuditLog.SUBSCRIPTION_CREATED,
            )
            for user_id, stream_id in stream_subscriptions
        ]
    )
def fix_service_tokens(data: TableData, table: TableName) -> None:
    """
    Give every imported service row a freshly generated token.

    Service tokens come from generate_api_key and must be unique, so the
    exported values are not reused.
    """
    for service_row in data[table]:
        fresh_token = generate_api_key()
        service_row["token"] = fresh_token
def process_huddle_hash(data: TableData, table: TableName) -> None:
    """
    Recompute each huddle's huddle_hash from its members' (new) user ids.

    Member lists come from id_map_to_list["huddle_to_user_list"], keyed
    by huddle id.
    """
    for huddle_row in data[table]:
        members = id_map_to_list["huddle_to_user_list"][huddle_row["id"]]
        huddle_row["huddle_hash"] = get_huddle_hash(members)
def get_huddles_from_subscription(data: TableData, table: TableName) -> None:
    """
    Rebuild id_map_to_list["huddle_to_user_list"] from subscription rows.

    Starts with an empty member list for every huddle id found in
    ID_MAP["recipient_to_huddle_map"], then appends the user_profile_id of
    each subscription whose `recipient` is a huddle recipient. The
    resulting lists are what process_huddle_hash hashes.
    """
    recipient_to_huddle = ID_MAP["recipient_to_huddle_map"]
    huddle_members: Dict[int, List[int]] = {
        huddle_id: [] for huddle_id in recipient_to_huddle.values()
    }
    id_map_to_list["huddle_to_user_list"] = huddle_members

    for subscription in data[table]:
        recipient_id = subscription["recipient"]
        if recipient_id not in recipient_to_huddle:
            continue
        huddle_members[recipient_to_huddle[recipient_id]].append(
            subscription["user_profile_id"]
        )
def fix_customprofilefield(data: TableData) -> None:
    """
    Remap user ids stored inside values of USER-type custom profile fields.

    A CustomProfileFieldValue whose field has field_type
    CustomProfileField.USER stores a JSON-encoded list of user ids in
    `value`; each id is translated through ID_MAP["user_profile"] (ids
    missing from the map are kept) and the list is re-encoded as JSON.
    """
    user_field_ids = {
        field["id"]
        for field in data["zerver_customprofilefield"]
        if field["field_type"] == CustomProfileField.USER
    }
    for field_value in data["zerver_customprofilefieldvalue"]:
        if field_value["field_id"] not in user_field_ids:
            continue
        remapped_ids = re_map_foreign_keys_many_to_many_internal(
            table="zerver_customprofilefieldvalue",
            field_name="value",
            related_table="user_profile",
            old_id_list=orjson.loads(field_value["value"]),
        )
        field_value["value"] = orjson.dumps(remapped_ids).decode()
def _remap_rendered_content_ids(rendered_content: str) -> str:
    """
    Rewrite the ids embedded in mention markup of rendered message HTML.

    User mentions (span.user-mention[data-user-id]), stream links
    (a.stream[data-stream-id]) and user group mentions
    (span.user-group-mention[data-user-group-id]) get their id replaced
    when it appears in the matching ID_MAP table; unmapped ids are kept.
    Tags without the id attribute (e.g. legacy mentions) and wildcard
    ("*") values are left unmodified rather than aborting the import.

    Returns the input unchanged when none of these tags occur, so HTML
    without mentions is not re-serialized; otherwise returns the
    re-serialized HTML.
    """
    soup = BeautifulSoup(rendered_content, "html.parser")
    found_mention = False
    for tag_name, css_class, id_attr, id_map_table in (
        ("span", "user-mention", "data-user-id", "user_profile"),
        ("a", "stream", "data-stream-id", "stream"),
        ("span", "user-group-mention", "data-user-group-id", "usergroup"),
    ):
        mentions = soup.find_all(tag_name, {"class": css_class})
        if not mentions:
            continue
        found_mention = True
        id_map = ID_MAP[id_map_table]
        for mention in mentions:
            # Legacy markup may lack the id attribute, and wildcard
            # mentions use "*"; neither needs (or supports) rewriting.
            if not mention.has_attr(id_attr) or mention[id_attr] == "*":
                continue
            old_id = int(mention[id_attr])
            if old_id in id_map:
                mention[id_attr] = str(id_map[old_id])
    return str(soup) if found_mention else rendered_content


def fix_message_rendered_content(
    realm: Realm, sender_map: Dict[int, Record], messages: List[Record]
) -> None:
    """
    Fill in or fix up rendered_content for imported messages.

    Messages that already carry rendered_content (Zulip->Zulip imports)
    keep their original HTML, with only the user/stream/user-group ids in
    mention markup remapped through ID_MAP. Messages without it (imports
    from other platforms) are rendered from their Markdown `content`,
    using the sender's is_bot and translate_emoticons settings from
    `sender_map`; rendered_content_version is also set, except for
    scheduled messages (rows that have a "scheduled_timestamp").

    Rendering is best-effort: any exception while rendering a message is
    logged as a warning and that message is left unrendered.
    """
    for message in messages:
        if message["rendered_content"] is not None:
            # For Zulip->Zulip imports, we use the original rendered
            # Markdown; this avoids issues where e.g. a mention can no
            # longer render properly because a user has changed their
            # name. We still need to update the data-*-id values stored
            # on mentions in the rendered HTML.
            message["rendered_content"] = _remap_rendered_content_ids(
                message["rendered_content"]
            )
            continue

        try:
            sender = sender_map[message["sender_id"]]
            rendered_content = markdown_convert(
                content=message["content"],
                # We don't handle alert words on import from third-party
                # platforms, since they generally don't have an "alert
                # words" type feature, and notifications aren't important
                # anyway.
                realm_alert_words_automaton=None,
                message_realm=realm,
                sent_by_bot=sender["is_bot"],
                translate_emoticons=sender["translate_emoticons"],
            ).rendered_content

            message["rendered_content"] = rendered_content
            if "scheduled_timestamp" not in message:
                # This logic runs also for ScheduledMessage, which doesn't
                # use the rendered_content_version field.
                message["rendered_content_version"] = markdown_version
        except Exception:
            # Typically the Markdown processor raised, or the sender is
            # missing from sender_map; skip this message rather than
            # aborting the whole import.
            logging.warning(
                "Error in Markdown rendering for message ID %s; continuing", message["id"]
            )
def current_table_ids(data: TableData, table: TableName) -> List[int]:
    """Return the "id" of every row of data[table], in row order."""
    return [row["id"] for row in data[table]]
def idseq(model_class: Any) -> str:
    """
    Return the name of the PostgreSQL sequence backing model_class's id.

    Normally this is "<db_table>_id_seq", but a few models use legacy
    sequence names (presumably left over from table renames).
    """
    legacy_sequences = (
        (RealmDomain, "zerver_realmalias_id_seq"),
        (BotStorageData, "zerver_botuserstatedata_id_seq"),
        (BotConfigData, "zerver_botuserconfigdata_id_seq"),
        # The UserTopic table was renamed from `mutedtopic` to
        # `usertopic`, but its sequence kept the old name.
        (UserTopic, "zerver_mutedtopic_id_seq"),
    )
    for legacy_model, sequence_name in legacy_sequences:
        if model_class == legacy_model:
            return sequence_name
    return f"{model_class._meta.db_table}_id_seq"
def allocate_ids(model_class: Any, count: int) -> List[int]:
    """
    Reserve `count` new ids for model_class's table and return them.

    Advances the table's id sequence (see idseq) `count` times with
    PostgreSQL's nextval, so imported rows can be given ids that the
    database will not hand out again. Returns an empty list when
    `count` is 0.
    """
    sequence = idseq(model_class)
    # Using the cursor as a context manager closes it even if the query
    # raises; previously an exception leaked the cursor.
    with connection.cursor() as cursor:
        cursor.execute("select nextval(%s) from generate_series(1, %s)", [sequence, count])
        rows = cursor.fetchall()  # Each row is a 1-tuple like (5,)
    return [row[0] for row in rows]
def convert_to_id_fields(data: TableData, table: TableName, field_name: Field) -> None:
    """
    Rename `field_name` to `field_name + "_id"` in every row of data[table].

    Exported dicts (from model_to_dict) name foreign keys `foo`, while the
    bulk insert wants `foo_id`. The value is moved unchanged; when ids
    also need remapping, use re_map_foreign_keys instead.
    """
    id_key = field_name + "_id"
    for row in data[table]:
        row[id_key] = row.pop(field_name)
def re_map_foreign_keys(
    data: TableData,
    table: TableName,
    field_name: Field,
    related_table: TableName,
    verbose: bool = False,
    id_field: bool = False,
    recipient_field: bool = False,
) -> None:
    """
    Remap a foreign-key field across all rows of data[table].

    Thin wrapper around re_map_foreign_keys_internal for realm data
    tables (TableData); avatar and attachment records, which are plain
    lists of records, call the internal function directly.
    """
    # UserMessage rows are never remapped; see the comments in
    # bulk_import_user_message_data.
    assert "usermessage" not in related_table
    rows = data[table]
    re_map_foreign_keys_internal(
        rows,
        table,
        field_name,
        related_table,
        verbose=verbose,
        id_field=id_field,
        recipient_field=recipient_field,
    )
def re_map_foreign_keys_internal(
    data_table: List[Record],
    table: TableName,
    field_name: Field,
    related_table: TableName,
    verbose: bool = False,
    id_field: bool = False,
    recipient_field: bool = False,
) -> None:
    """
    Rewrite foreign-key values in `data_table` using ID_MAP[related_table].

    We occasionally need to assign new ids to rows during the
    import/export process, to accommodate things like existing rows
    already being in tables (see bulk_import_client). The tricky part is
    keeping foreign-key references in sync with the new ids, which is
    what this function does.

    For each row, the value of `field_name` is replaced by its mapped id
    when present in ID_MAP[related_table] (otherwise it is kept). Unless
    `id_field` is set, the result is stored under "<field_name>_id" and
    `field_name` is removed; with `id_field` the value is updated in place.

    When `recipient_field` is set, the rows are Recipient records and only
    those whose `type` matches `related_table` (Recipient.PERSONAL for
    "user_profile", Recipient.STREAM for "stream", Recipient.HUDDLE for
    "huddle") are touched; other rows are left completely unchanged. For
    huddle recipients the recipient id -> new huddle id pair is also
    saved in ID_MAP["recipient_to_huddle_map"], which
    get_huddles_from_subscription uses to find each huddle's members.
    """
    lookup_table = ID_MAP[related_table]
    # Named Recipient constants instead of bare 1/2/3, matching how
    # create_subscription_events identifies stream recipients.
    recipient_type_by_related_table = {
        "user_profile": Recipient.PERSONAL,
        "stream": Recipient.STREAM,
        "huddle": Recipient.HUDDLE,
    }
    for item in data_table:
        old_id = item[field_name]
        if recipient_field:
            expected_type = recipient_type_by_related_table.get(related_table)
            if expected_type is None or item["type"] != expected_type:
                continue
            if related_table == "huddle":
                ID_MAP["recipient_to_huddle_map"][item["id"]] = lookup_table[old_id]

        if old_id in lookup_table:
            new_id = lookup_table[old_id]
            if verbose:
                logging.info(
                    "Remapping %s %s from %s to %s", table, field_name + "_id", old_id, new_id
                )
        else:
            new_id = old_id

        if id_field:
            item[field_name] = new_id
        else:
            item[field_name + "_id"] = new_id
            del item[field_name]
def re_map_realm_emoji_codes(data: TableData, *, table_name: str) -> None:
    """
    Rewrite realm-emoji references stored in `emoji_code` to new ids.

    Tables such as Reaction and UserStatus reference RealmEmoji via
    `str(realm_emoji.id)` in emoji_code when reaction_type is
    Reaction.REALM_EMOJI (see the emoji_code comment in AbstractEmoji).
    The old id is translated through ID_MAP["realmemoji"], and the
    data["zerver_realmemoji"] row with that new id must have the same
    name as the referencing row's emoji_name.

    Raises KeyError if an emoji id was not remapped and AssertionError if
    the names disagree.
    """
    realm_emoji_by_id = {emoji["id"]: emoji for emoji in data["zerver_realmemoji"]}
    for row in data[table_name]:
        if row["reaction_type"] != Reaction.REALM_EMOJI:
            continue
        # Indexed (not .get) on purpose: a missing mapping must fail hard.
        new_emoji_id = ID_MAP["realmemoji"][int(row["emoji_code"])]
        # Very important sanity check that we point at the same emoji.
        assert realm_emoji_by_id[new_emoji_id]["name"] == row["emoji_name"]
        row["emoji_code"] = str(new_emoji_id)
def re_map_foreign_keys_many_to_many(
    data: TableData,
    table: TableName,
    field_name: Field,
    related_table: TableName,
    verbose: bool = False,
) -> None:
    """
    Remap the ids of a ManyToMany field in every row of data[table].

    Each row's `field_name` id list is passed through
    re_map_foreign_keys_many_to_many_internal, which looks ids up in
    ID_MAP[related_table] and logs each remap when `verbose` is set.
    """
    for row in data[table]:
        row[field_name] = re_map_foreign_keys_many_to_many_internal(
            table, field_name, related_table, row[field_name], verbose
        )
        # NOTE(review): the remapped list stored just above is deleted
        # right away, so afterwards no row has `field_name` and the remap
        # only has an effect through verbose logging. Kept as-is; confirm
        # whether callers need the key dropped (e.g. before bulk_create)
        # or actually expect the remapped ids to be kept.
        del row[field_name]
def re_map_foreign_keys_many_to_many_internal(
    table: TableName,
    field_name: Field,
    related_table: TableName,
    old_id_list: List[int],
    verbose: bool = False,
) -> List[int]:
    """
    Translate a ManyToMany id list through ID_MAP[related_table].

    Returns a new list in the same order; ids without a mapping are kept
    unchanged. With `verbose`, each actual remap is logged. `table` and
    `field_name` are only used in those log messages.
    """
    mapping = ID_MAP[related_table]
    remapped: List[int] = []
    for old_id in old_id_list:
        was_mapped = old_id in mapping
        new_id = mapping[old_id] if was_mapped else old_id
        if was_mapped and verbose:
            logging.info(
                "Remapping %s %s from %s to %s", table, field_name + "_id", old_id, new_id
            )
        remapped.append(new_id)
    return remapped
def fix_bitfield_keys(data: TableData, table: TableName, field_name: Field) -> None:
    """Move each row's "<field_name>_mask" value back to "<field_name>"."""
    mask_key = field_name + "_mask"
    for row in data[table]:
        row[field_name] = row.pop(mask_key)
def remove_denormalized_recipient_column_from_data(data: TableData) -> None:
    """
    Drop the "recipient" key from exported stream, user and huddle rows.

    That column shouldn't be imported; the correct values are set once
    the Recipient table has been imported.
    """
    for table_name in ("zerver_stream", "zerver_userprofile", "zerver_huddle"):
        for row in data[table_name]:
            row.pop("recipient", None)
def get_db_table(model_class: Any) -> str:
    """
    Return the database table name behind a Django model class.

    For example, RealmDomain -> "zerver_realmdomain".
    """
    options = model_class._meta
    return options.db_table
def update_model_ids(model: Any, data: TableData, related_table: TableName) -> None:
    """
    Give every row of the model's table a freshly allocated id.

    Allocates one new id per row (allocate_ids), records each old -> new
    pair in ID_MAP[related_table], then rewrites the rows' "id" field in
    place via re_map_foreign_keys.
    """
    table = get_db_table(model)
    # Important: remapping usermessage rows is not only unnecessary, it's
    # expensive and can cause memory errors. We don't even use ids from
    # ID_MAP for them.
    assert "usermessage" not in table
    old_ids = current_table_ids(data, table)
    new_ids = allocate_ids(model, len(data[table]))
    for old_id, new_id in zip(old_ids, new_ids):
        update_id_map(related_table, old_id, new_id)
    re_map_foreign_keys(data, table, "id", related_table=related_table, id_field=True)
def bulk_import_user_message_data(data: TableData, dump_file_id: int) -> None:
    """
    Bulk-insert the exported zerver_usermessage rows in chunks of 10000.

    IMPORTANT: row ids are taken neither from the export nor from ID_MAP;
    the database generates fresh ones. This is safe because no table uses
    user_message.id as a foreign key, and it avoids all of the remapping
    complexity (and cost) for this table.
    """
    table = "zerver_usermessage"

    def insert_chunk(rows: List[Dict[str, Any]]) -> None:
        bulk_insert_ums(
            [
                UserMessageLite(
                    user_profile_id=row["user_profile_id"],
                    message_id=row["message_id"],
                    flags=row["flags"],
                )
                for row in rows
            ]
        )

    process_list_in_batches(
        lst=data[table],
        chunk_size=10000,
        process_batch=insert_chunk,
    )
    logging.info("Successfully imported %s from %s[%s].", UserMessage, table, dump_file_id)
def bulk_import_model(data: TableData, model: Any, dump_file_id: Optional[str] = None) -> None:
    """
    Insert every row of data[<model's db table>] with a single bulk_create.

    Rows must already use the model's field names (they are passed as
    keyword arguments to the model constructor). `dump_file_id` only
    affects the log message.
    """
    table = get_db_table(model)
    # TODO, deprecate dump_file_id
    model.objects.bulk_create(model(**row) for row in data[table])
    if dump_file_id is not None:
        logging.info("Successfully imported %s from %s[%s].", model, table, dump_file_id)
    else:
        logging.info("Successfully imported %s from %s.", model, table)
# Client is a table shared by multiple realms, so in order to
# correctly import multiple realms into the same server, we reuse an
# existing Client row with the same name when there is one, and remap
# all exported Client IDs to the IDs of the rows in the new DB.
def bulk_import_client(data: TableData, model: Any, table: TableName) -> None:
    """
    Map every exported Client row to a Client in this server's database.

    For each row of data[table], the Client with the same name is fetched
    (or created if none exists) and ID_MAP["client"] records
    exported id -> local id. `model` is accepted for interface
    compatibility but not used.
    """
    for item in data[table]:
        # get_or_create, unlike a bare get-then-create, recovers if another
        # process creates the same client name between the two steps.
        client, _created = Client.objects.get_or_create(name=item["name"])
        update_id_map(table="client", old_id=item["id"], new_id=client.id)
def fix_subscriptions_is_user_active_column(
    data: TableData, user_profiles: List[UserProfile]
) -> None:
    """
    Copy each subscriber's is_active flag into the subscription's
    is_user_active column.

    Raises KeyError if a subscription's user_profile_id is not among
    `user_profiles`.
    """
    is_active_by_user_id = {profile.id: profile.is_active for profile in user_profiles}
    for subscription in data[get_db_table(Subscription)]:
        subscription["is_user_active"] = is_active_by_user_id[subscription["user_profile_id"]]
def process_avatars(record: Dict[str, Any]) -> None:
    """
    Generate resized avatar images for one imported avatar record.

    Records whose "s3_path" does not end in ".original" are ignored. For
    an original, any existing "-medium.png" file at the user's local
    avatar path is removed first (only when settings.LOCAL_AVATARS_DIR is
    set). Then the medium avatar is generated, plus the non-medium one if
    the record has a truthy "importer_should_thumbnail". If thumbnailing
    fails with BadImageError, a warning is logged and the user's avatar
    source is switched to Gravatar.
    """
    # upload_backend is re-imported here on purpose: the import-export
    # unit tests override settings per test to choose the upload backend,
    # and this picks up the current choice. Outside the test suite,
    # settings never change after startup, so this has no effect in
    # production.
    from zerver.lib.upload import upload_backend

    if not record["s3_path"].endswith(".original"):
        return

    user_profile = get_user_profile_by_id(record["user_profile_id"])
    if settings.LOCAL_AVATARS_DIR is not None:
        avatar_path = user_avatar_path_from_ids(user_profile.id, record["realm_id"])
        stale_medium_path = os.path.join(settings.LOCAL_AVATARS_DIR, avatar_path) + "-medium.png"
        # Mainly matters when the import script is run repeatedly in
        # development, where a realm ID from an earlier run may be reused.
        if os.path.exists(stale_medium_path):
            os.remove(stale_medium_path)

    try:
        upload_backend.ensure_avatar_image(user_profile=user_profile, is_medium=True)
        if record.get("importer_should_thumbnail"):
            upload_backend.ensure_avatar_image(user_profile=user_profile)
    except BadImageError:
        logging.warning(
            "Could not thumbnail avatar image for user %s; ignoring",
            user_profile.id,
        )
        # Drop the reference to the unusable avatar so it doesn't 404.
        do_change_avatar_fields(
            user_profile, UserProfile.AVATAR_FROM_GRAVATAR, acting_user=None
        )
def import_uploads(
    realm: Realm,
    import_dir: Path,
    processes: int,
    default_user_profile_id: Optional[int] = None,
    processing_avatars: bool = False,
    processing_emojis: bool = False,
    processing_realm_icons: bool = False,
) -> None:
    """
    Copy the uploaded files described by <import_dir>/records.json into
    this server's storage.

    Files go to S3 when settings.LOCAL_UPLOADS_DIR is None, and otherwise
    are copied under settings.LOCAL_AVATARS_DIR (avatars, emojis, realm
    icons) or settings.LOCAL_FILES_DIR (message attachments).

    The processing_* flags choose how records are treated; if several are
    set, the first of avatars, emojis, realm icons wins, except that
    avatars together with emojis is rejected. With none set, the records
    are message attachments, and each old "s3_path" -> new path pair is
    stored in path_maps["attachment_path"] for fix_upload_links.

    realm: NOTE(review): not referenced in this function body.
    import_dir: directory holding records.json and the exported files
        (each record's "path" is relative to it).
    processes: worker processes for avatar thumbnailing; 1 runs
        process_avatars serially in this process.
    default_user_profile_id: uploader written to S3 metadata for records
        that have no "user_profile_id".

    Raises AssertionError if processing_avatars and processing_emojis are
    both set.
    """
    if processing_avatars and processing_emojis:
        raise AssertionError("Cannot import avatars and emojis at the same time!")
    if processing_avatars:
        logging.info("Importing avatars")
    elif processing_emojis:
        logging.info("Importing emojis")
    elif processing_realm_icons:
        logging.info("Importing realm icons and logos")
    else:
        logging.info("Importing uploaded files")

    records_filename = os.path.join(import_dir, "records.json")
    with open(records_filename, "rb") as records_file:
        records: List[Dict[str, Any]] = orjson.loads(records_file.read())
    # Import time, used as last_modified for emoji and realm icon records.
    timestamp = datetime_to_timestamp(timezone_now())

    # Rewrite record["realm_id"] (and, except for emojis/realm icons,
    # record["user_profile_id"]) in place to the new ids from ID_MAP.
    re_map_foreign_keys_internal(
        records, "records", "realm_id", related_table="realm", id_field=True
    )
    if not processing_emojis and not processing_realm_icons:
        re_map_foreign_keys_internal(
            records, "records", "user_profile_id", related_table="user_profile", id_field=True
        )

    s3_uploads = settings.LOCAL_UPLOADS_DIR is None

    if s3_uploads:
        if processing_avatars or processing_emojis or processing_realm_icons:
            bucket_name = settings.S3_AVATAR_BUCKET
        else:
            bucket_name = settings.S3_AUTH_UPLOADS_BUCKET
        bucket = get_bucket(bucket_name)

    count = 0
    for record in records:
        count += 1
        if count % 1000 == 0:
            logging.info("Processed %s/%s uploads", count, len(records))

        # Step 1: compute relative_path, the file's new location.
        if processing_avatars:
            # For avatars, we need to rehash the user ID with the
            # new server's avatar salt
            relative_path = user_avatar_path_from_ids(record["user_profile_id"], record["realm_id"])
            if record["s3_path"].endswith(".original"):
                relative_path += ".original"
            else:
                # TODO: This really should be unconditional. However,
                # until we fix the S3 upload backend to use the .png
                # path suffix for its normal avatar URLs, we need to
                # only do this for the LOCAL_UPLOADS_DIR backend.
                if not s3_uploads:
                    relative_path += ".png"
        elif processing_emojis:
            # For emojis we follow the function 'upload_emoji_image'
            relative_path = RealmEmoji.PATH_ID_TEMPLATE.format(
                realm_id=record["realm_id"], emoji_file_name=record["file_name"]
            )
            record["last_modified"] = timestamp
        elif processing_realm_icons:
            icon_name = os.path.basename(record["path"])
            relative_path = os.path.join(str(record["realm_id"]), "realm", icon_name)
            record["last_modified"] = timestamp
        else:
            # This relative_path is basically the new location of the file,
            # which will later be copied from its original location as
            # specified in record["s3_path"].
            relative_path = upload_backend.generate_message_upload_path(
                str(record["realm_id"]), sanitize_name(os.path.basename(record["path"]))
            )
            # Remember old -> new path so message links can be rewritten.
            path_maps["attachment_path"][record["s3_path"]] = relative_path

        # Step 2: store the file at relative_path.
        if s3_uploads:
            key = bucket.Object(relative_path)
            metadata = {}
            if "user_profile_id" not in record:
                # This should never happen for uploads or avatars; if
                # so, it is an error, default_user_profile_id will be
                # None, and we assert. For emoji / realm icons, we
                # fall back to default_user_profile_id.
                # default_user_profile_id can be None in Gitter
                # imports, which do not create any owners; but Gitter
                # does not have emoji which we would need to allocate
                # a user to.
                assert default_user_profile_id is not None
                metadata["user_profile_id"] = str(default_user_profile_id)
            else:
                user_profile_id = int(record["user_profile_id"])
                # Support email gateway bot and other cross-realm messages
                if user_profile_id in ID_MAP["user_profile"]:
                    logging.info("Uploaded by ID mapped user: %s!", user_profile_id)
                    user_profile_id = ID_MAP["user_profile"][user_profile_id]
                user_profile = get_user_profile_by_id(user_profile_id)
                metadata["user_profile_id"] = str(user_profile.id)

            if "last_modified" in record:
                metadata["orig_last_modified"] = str(record["last_modified"])
            metadata["realm_id"] = str(record["realm_id"])

            # Zulip exports will always have a content-type, but third-party exports might not.
            content_type = record.get("content_type")
            if content_type is None:
                content_type = guess_type(record["s3_path"])[0]
                if content_type is None:
                    # This is the default for unknown data. Note that
                    # for `.original` files, this is the value we'll
                    # set; that is OK, because those are never served
                    # directly anyway.
                    content_type = "application/octet-stream"

            key.upload_file(
                Filename=os.path.join(import_dir, record["path"]),
                ExtraArgs={"ContentType": content_type, "Metadata": metadata},
            )
        else:
            assert settings.LOCAL_UPLOADS_DIR is not None
            assert settings.LOCAL_AVATARS_DIR is not None
            assert settings.LOCAL_FILES_DIR is not None
            if processing_avatars or processing_emojis or processing_realm_icons:
                file_path = os.path.join(settings.LOCAL_AVATARS_DIR, relative_path)
            else:
                file_path = os.path.join(settings.LOCAL_FILES_DIR, relative_path)
            orig_file_path = os.path.join(import_dir, record["path"])
            os.makedirs(os.path.dirname(file_path), exist_ok=True)
            shutil.copy(orig_file_path, file_path)

    if processing_avatars:
        # Ensure that we have medium-size avatar images for every
        # avatar. TODO: This implementation is hacky, both in that it
        # does get_user_profile_by_id for each user, and in that it
        # might be better to require the export to just have these.

        if processes == 1:
            for record in records:
                process_avatars(record)
        else:
            # Close the DB connection and memcached sockets before starting
            # worker processes -- presumably so the workers don't share the
            # parent's open connections.
            connection.close()
            _cache = cache._cache  # type: ignore[attr-defined] # not in stubs
            assert isinstance(_cache, bmemcached.Client)
            _cache.disconnect_all()
            with ProcessPoolExecutor(max_workers=processes) as executor:
                # future.result() re-raises any exception from a worker.
                for future in as_completed(
                    executor.submit(process_avatars, record) for record in records
                ):
                    future.result()
# Importing data suffers from a difficult ordering problem because of
# models that reference each other circularly. Here is a correct order.
#
# (Note that this list is not exhaustive and only talks about the main,
# most important models. There's a bunch of minor models that are handled
# separately and not mentioned here - but following the principle that we
# have to import the dependencies first.)
#
# * Client [no deps]
# * Realm [-notifications_stream]
# * UserGroup
# * Stream [only depends on realm]
# * Realm's notifications_stream
# * UserProfile, in order by ID to avoid bot loop issues
# * Now can do all realm_tables
# * Huddle
# * Recipient
# * Subscription
# * Message
# * UserMessage
#
# Because the Python object => JSON conversion process is not fully
# faithful, we have to use a set of fixers (e.g. on DateTime objects
# and foreign keys) to do the import correctly.
def do_import_realm(import_dir: Path, subdomain: str, processes: int = 1) -> Realm:
logging.info("Importing realm dump %s", import_dir)
if not os.path.exists(import_dir):
raise Exception("Missing import directory!")
realm_data_filename = os.path.join(import_dir, "realm.json")
if not os.path.exists(realm_data_filename):
raise Exception("Missing realm.json file!")
if not server_initialized():
create_internal_realm()
logging.info("Importing realm data from %s", realm_data_filename)
with open(realm_data_filename, "rb") as f:
data = orjson.loads(f.read())
# Merge in zerver_userprofile_mirrordummy
data["zerver_userprofile"] = data["zerver_userprofile"] + data["zerver_userprofile_mirrordummy"]
del data["zerver_userprofile_mirrordummy"]
data["zerver_userprofile"].sort(key=lambda r: r["id"])
remove_denormalized_recipient_column_from_data(data)
sort_by_date = data.get("sort_by_date", False)
bulk_import_client(data, Client, "zerver_client")
# We don't import the Stream model yet, since it depends on Realm,
# which isn't imported yet. But we need the Stream model IDs for
# notifications_stream.
update_model_ids(Stream, data, "stream")
re_map_foreign_keys(data, "zerver_realm", "notifications_stream", related_table="stream")
re_map_foreign_keys(data, "zerver_realm", "signup_notifications_stream", related_table="stream")
fix_datetime_fields(data, "zerver_realm")
# Fix realm subdomain information
data["zerver_realm"][0]["string_id"] = subdomain
data["zerver_realm"][0]["name"] = subdomain
update_model_ids(Realm, data, "realm")
# Create the realm, but mark it deactivated for now, while we
# import the supporting data structures, which may take a bit.
realm_properties = dict(**data["zerver_realm"][0])
realm_properties["deactivated"] = True
with transaction.atomic(durable=True):
realm = Realm(**realm_properties)
realm.save()
if "zerver_usergroup" in data:
update_model_ids(UserGroup, data, "usergroup")
re_map_foreign_keys(data, "zerver_usergroup", "realm", related_table="realm")
for setting_name in UserGroup.GROUP_PERMISSION_SETTINGS:
re_map_foreign_keys(
data, "zerver_usergroup", setting_name, related_table="usergroup"
)
bulk_import_model(data, UserGroup)
# We expect Zulip server exports to contain these system groups,
# this logic here is needed to handle the imports from other services.
role_system_groups_dict: Optional[Dict[int, UserGroup]] = None
if "zerver_usergroup" not in data:
role_system_groups_dict = create_system_user_groups_for_realm(realm)
# Email tokens will automatically be randomly generated when the
# Stream objects are created by Django.
fix_datetime_fields(data, "zerver_stream")
re_map_foreign_keys(data, "zerver_stream", "realm", related_table="realm")
if role_system_groups_dict is not None:
# Because the system user groups are missing, we manually set up
# the defaults for can_remove_subscribers_group for all the
# streams.
fix_streams_can_remove_subscribers_group_column(data, realm)
else:
re_map_foreign_keys(