# Copyright (c) 2013 Shotgun Software Inc.
#
# CONFIDENTIAL AND PROPRIETARY
#
# This work is provided "AS IS" and subject to the Shotgun Pipeline Toolkit
# Source Code License included in this distribution package. See LICENSE.
# By accessing, using, copying or modifying this work you indicate your
# agreement to the Shotgun Pipeline Toolkit Source Code License. All rights
# not expressly granted therein are reserved by Shotgun Software Inc.
"""
Methods relating to the Path cache, a central repository where metadata about
all Tank items in the file system is kept.
"""
import collections
import sqlite3
import sys
import os
import itertools
# use api json to cover py 2.5
# todo - replace with proper external library
from tank_vendor import shotgun_api3
json = shotgun_api3.shotgun.json
from .platform.engine import show_global_busy, clear_global_busy
from . import constants
from .errors import TankError
from . import LogManager
from .util.login import get_current_user
# Shotgun field definitions to store the path cache data
SHOTGUN_ENTITY = "FilesystemLocation"
SG_ENTITY_FIELD = "entity"
SG_PATH_FIELD = "path"
SG_METADATA_FIELD = "configuration_metadata"
SG_IS_PRIMARY_FIELD = "is_primary"
SG_ENTITY_ID_FIELD = "linked_entity_id"
SG_ENTITY_TYPE_FIELD = "linked_entity_type"
SG_ENTITY_NAME_FIELD = "code"
SG_PIPELINE_CONFIG_FIELD = "pipeline_configuration"
log = LogManager.get_logger(__name__)
class PathCache(object):
"""
A global cache which holds the mapping between a shotgun entity and a location on disk.
NOTE! This uses sqlite and the db is typically hosted on an NFS storage.
Ensure that the code is developed with the constraints that this entails in mind.
"""
# sqlite has a limit for how many items fit into a single in statement
SQLITE_MAX_ITEMS_FOR_IN_STATEMENT = 200
# To avoid paging, we batch queries of FilesystemLocation entities
# in chunks of 500 and combine the results. The performance issues
# around this have been largely alleviated in Shotgun 7.4.x and the
# accompanying shotgun_api3 that was released at the same time, but
# we still want to batch at the old page length of 500 to boost
# performance when older SG or API versions are used. We should
# eventually raise this to 5000 (or more) when we feel it is safe
# to do so.
SHOTGUN_ENTITY_QUERY_BATCH_SIZE = 500
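    # Hedged illustration (not part of the original source): a list of ids
    # that may exceed the sqlite IN-statement limit above would typically be
    # processed in chunks, along the lines of:
    #
    #     for i in range(0, len(ids), PathCache.SQLITE_MAX_ITEMS_FOR_IN_STATEMENT):
    #         chunk = ids[i:i + PathCache.SQLITE_MAX_ITEMS_FOR_IN_STATEMENT]
    #         # ...run one "IN (?,?,...)" query per chunk and combine results...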
def __init__(self, tk):
"""
Constructor.
:param tk: Toolkit API instance
"""
self._connection = None
self._tk = tk
self._sync_with_sg = tk.pipeline_configuration.get_shotgun_path_cache_enabled()
if tk.pipeline_configuration.has_associated_data_roots():
self._path_cache_disabled = False
self._init_db()
self._roots = tk.pipeline_configuration.get_data_roots()
else:
# no primary location found. Path cache therefore does not exist!
# go into a no-path-cache-mode
self._path_cache_disabled = True
def _init_db(self):
"""
Sets up the database
"""
# first, make way for the path cache file. This call
# will ensure that there is a valid folder and file on
# disk, created with all the right permissions etc.
path_cache_file = self._get_path_cache_location()
self._connection = sqlite3.connect(path_cache_file)
# this is to handle unicode properly - make sure that sqlite returns
# str objects for TEXT fields rather than unicode. Note that any unicode
# objects that are passed into the database will be automatically
# converted to UTF-8 strs, so this text_factory guarantees that any character
# representation will work for any language, as long as data is either input
        # as UTF-8 (byte strings) or unicode. In either case, the data returned
        # from the database will be UTF-8 encoded str objects.
self._connection.text_factory = str
c = self._connection.cursor()
try:
# get a list of tables in the current database
ret = c.execute("SELECT name FROM main.sqlite_master WHERE type='table';")
table_names = [x[0] for x in ret.fetchall()]
if len(table_names) == 0:
# we have a brand new database. Create all tables and indices
# note that because some clients are writing to NFS storage, we
# up the default page size somewhat (from 4k -> 8k) to improve
# performance. See https://sqlite.org/pragma.html#pragma_page_size
c.executescript("""
PRAGMA page_size=8192;
CREATE TABLE path_cache (entity_type text, entity_id integer, entity_name text, root text, path text, primary_entity integer);
CREATE INDEX path_cache_entity ON path_cache(entity_type, entity_id);
CREATE INDEX path_cache_path ON path_cache(root, path, primary_entity);
CREATE UNIQUE INDEX path_cache_all ON path_cache(entity_type, entity_id, root, path, primary_entity);
CREATE TABLE event_log_sync (last_id integer);
CREATE TABLE shotgun_status (path_cache_id integer, shotgun_id integer);
CREATE UNIQUE INDEX shotgun_status_id ON shotgun_status(path_cache_id);
CREATE INDEX shotgun_status_shotgun_id ON shotgun_status(shotgun_id);
""")
self._connection.commit()
else:
# we have an existing database! Ensure it is up to date
if "event_log_sync" not in table_names:
# this is a pre-0.15 setup where the path cache does not have event log sync
c.executescript("CREATE TABLE event_log_sync (last_id integer);")
self._connection.commit()
if "shotgun_status" not in table_names:
# this is a pre-0.15 setup where the path cache does not have the shotgun_status table
c.executescript("""CREATE TABLE shotgun_status (path_cache_id integer, shotgun_id integer);
CREATE UNIQUE INDEX shotgun_status_id ON shotgun_status(path_cache_id);""")
self._connection.commit()
# now ensure that some key fields that have been added during the dev cycle are there
ret = c.execute("PRAGMA table_info(path_cache)")
field_names = [x[1] for x in ret.fetchall()]
# check for primary entity field - this was added back in 0.12.x
if "primary_entity" not in field_names:
c.executescript("""
ALTER TABLE path_cache ADD COLUMN primary_entity integer;
UPDATE path_cache SET primary_entity=1;
DROP INDEX IF EXISTS path_cache_path;
CREATE INDEX IF NOT EXISTS path_cache_path ON path_cache(root, path, primary_entity);
DROP INDEX IF EXISTS path_cache_all;
CREATE UNIQUE INDEX IF NOT EXISTS path_cache_all ON path_cache(entity_type, entity_id, root, path, primary_entity);
""")
self._connection.commit()
finally:
c.close()
def _get_path_cache_location(self):
"""
Creates the path cache file and returns its location on disk.
:returns: The path to the path cache file
"""
if self._tk.pipeline_configuration.get_shotgun_path_cache_enabled():
# 0.15+ path cache setup - call out to a core hook to determine
# where the path cache should be located.
path = self._tk.execute_core_hook_method(
constants.CACHE_LOCATION_HOOK_NAME,
"get_path_cache_path",
project_id=self._tk.pipeline_configuration.get_project_id(),
# Do NOT use the plugin_id as a suffix for the path cache folder. This would
                # prevent a path cache synced by one plugin from being reused by another plugin.
plugin_id=None,
pipeline_configuration_id=self._tk.pipeline_configuration.get_shotgun_id()
)
else:
# old (v0.14) style path cache
# fall back on the 0.14 setting, where the path cache
# is located in a tank folder in the project root
path = os.path.join(self._tk.pipeline_configuration.get_primary_data_root(),
"tank",
"cache",
"path_cache.db")
# first check that the cache folder exists
# note that the cache folder is inside of the tank folder
# so no need to attempt a recursive creation here.
cache_folder = os.path.dirname(path)
if not os.path.exists(cache_folder):
old_umask = os.umask(0)
try:
os.mkdir(cache_folder, 0o777)
finally:
os.umask(old_umask)
# now try to write a placeholder file with open permissions
if not os.path.exists(path):
old_umask = os.umask(0)
try:
fh = open(path, "wb")
fh.close()
os.chmod(path, 0o666)
finally:
os.umask(old_umask)
return path
def _path_to_dbpath(self, relative_path):
"""
converts a relative path to a db path form
/foo/bar --> /foo/bar
\foo\bar --> /foo/bar
"""
        # normalize the path before checking the project: some tools on
        # windows use "/" while others use "\", so convert everything to "/"
norm_path = relative_path.replace(os.sep, "/")
return norm_path
def _separate_root(self, full_path):
"""
Determines project root path and relative path.
:returns: root_name, relative_path
"""
n_path = full_path.replace(os.sep, "/")
        # Determine which root
root_name = None
relative_path = None
for cur_root_name, root_path in self._roots.items():
n_root = root_path.replace(os.sep, "/")
if n_path.lower().startswith(n_root.lower()):
root_name = cur_root_name
# chop off root
relative_path = full_path[len(root_path):]
break
if not root_name:
            storages_str = ",".join(self._roots.values())
raise TankError("The path '%s' could not be split up into a project centric path for "
"any of the storages %s that are associated with this "
"project." % (full_path, storages_str))
return root_name, relative_path
def _dbpath_to_path(self, root_path, dbpath):
"""
converts a dbpath to path for the local platform
linux: /foo/bar --> /studio/proj/foo/bar
windows: /foo/bar --> \\studio\proj\foo\bar
:param root_path: Project root path
        :param dbpath: Relative path
"""
# first make sure dbpath doesn't start with a /
if dbpath.startswith("/"):
dbpath = dbpath[1:]
# convert slashes
path_sep = dbpath.replace("/", os.sep)
# and join with root
full_path = os.path.join(root_path, path_sep)
return os.path.normpath(full_path)
def close(self):
"""
Close the database connection.
"""
if self._connection is not None:
self._connection.close()
self._connection = None
############################################################################################
# shotgun synchronization (SG data pushed into path cache database)
def synchronize(self, full_sync=False):
"""
Ensure the local path cache is in sync with Shotgun.
If the method decides to do a full sync, it will attempt to
launch the busy overlay window.
:param full_sync: Boolean to indicate that a full sync should be carried out.
:returns: A list of remote items which were detected, created remotely
and not existing in this path cache. These are returned as a list of
dictionaries, each containing keys:
- entity
- metadata
- path
"""
if self._path_cache_disabled:
log.debug("This project does not have any associated folders.")
return []
if not self._sync_with_sg:
log.debug("Folder synchronization is turned off for this project.")
return []
c = self._connection.cursor()
try:
# check if we should do a full sync
if full_sync:
return self._do_full_sync(c)
# first get the last synchronized event log event.
res = c.execute("SELECT max(last_id) FROM event_log_sync")
# get first item in the data set
data = list(res)[0]
log.debug("Path cache sync tracking marker in local sqlite db: %r" % data)
# expect back something like [(249660,)] for a running cache and [(None,)] for a clear
if len(data) != 1 or data[0] is None:
# we should do a full sync
return self._do_full_sync(c)
# we have an event log id - so check if there are any more recent events
event_log_id = data[0]
            # note! We search for all events with an id greater than (event_log_id - 1).
# this way, the first record returned should be the last record that was
# synced. This is a way of detecting that the event log chain is not broken.
# it could break for example if someone has culled the event log table and in
# that case we should fall back on a full sync.
log.debug(
"Fetching create/delete folder event log "
"entries >= id %s for project %s..." % (event_log_id, self._get_project_link())
)
# note that we return the records in ascending order, meaning that they get
# "played back" in the same order as they were created.
#
# for a non-truncated event log table, the first record returned
# by this query should be the last one previously processed by the
# path cache (via the event_log_id variable)
response = self._tk.shotgun.find(
"EventLogEntry",
[["event_type", "in", ["Toolkit_Folders_Create", "Toolkit_Folders_Delete"]],
["id", "greater_than", (event_log_id - 1)],
["project", "is", self._get_project_link()]
],
["id", "meta", "event_type"],
[{"field_name": "id", "direction": "asc"}]
)
log.debug("Got %s event log entries" % len(response))
# count creation and deletion entries
num_deletions = 0
num_creations = 0
for r in response:
if r["event_type"] == "Toolkit_Folders_Create":
num_creations += 1
if r["event_type"] == "Toolkit_Folders_Delete":
num_deletions += 1
log.debug("Event log contains %s creations and %s deletions" % (num_creations, num_deletions))
if len(response) == 0:
# nothing in event log. Probably a truncated setup.
log.debug("No sync information in the event log. Falling back on a full sync.")
return self._do_full_sync(c)
elif response[0]["id"] != event_log_id:
# there is either no event log data at all or a gap
                # in the event log. Assume that some culling has occurred and
# fall back on a full sync
log.debug(
"Local path cache tracking marker is %s. "
"First event log id returned is %s. It looks "
"like the event log has been truncated, so falling back "
"on a full sync." % (event_log_id, response[0]["id"])
)
return self._do_full_sync(c)
elif len(response) == 1 and response[0]["id"] == event_log_id:
# nothing has changed since the last sync
log.debug("Path cache syncing not necessary - local folders already up to date!")
return []
elif num_creations > 0 or num_deletions > 0:
# we have a complete trail of increments.
                # note that we skip the first record, which was already processed in the previous sync.
log.debug("Full event log history traced. Running incremental sync.")
return self._do_incremental_sync(c, response[1:])
else:
# should never be here
raise Exception("Unknown error - please contact support.")
finally:
c.close()
def _upload_cache_data_to_shotgun(self, data, event_log_desc):
"""
Takes a standard chunk of Shotgun data and uploads it to Shotgun
using a single batch statement. Then writes a single event log entry record
which binds the created path records. Returns the id of this event log record.
data needs to be a list of dicts with the following keys:
- entity - std sg entity dict with name, id and type
- primary - boolean to indicate if something is primary
- metadata - metadata dict
- path - local os path
- path_cache_row_id - the path cache db row id for the entry
:param data: List of dicts. See details above.
:param event_log_desc: Description to add to the event log entry created.
:returns: A tuple with (event_log_id, sg_id_lookup)
- event_log_id is the id for the event log entry which summarizes the
creation event.
- sg_id_lookup is a dictionary where the keys are path cache row ids
and the values are the newly created corresponding shotgun ids.
"""
if self._tk.pipeline_configuration.is_unmanaged():
# no pipeline config for this one
pc_link = None
else:
pc_link = {
"type": "PipelineConfiguration",
"id": self._tk.pipeline_configuration.get_shotgun_id()
}
sg_batch_data = []
for d in data:
# get a name for the clickable url in the path field
# this will include the name of the storage
root_name, relative_path = self._separate_root(d["path"])
db_path = self._path_to_dbpath(relative_path)
path_display_name = "[%s] %s" % (root_name, db_path)
req = {"request_type":"create",
"entity_type": SHOTGUN_ENTITY,
"data": {"project": self._get_project_link(),
"created_by": get_current_user(self._tk),
SG_ENTITY_FIELD: d["entity"],
SG_IS_PRIMARY_FIELD: d["primary"],
SG_PIPELINE_CONFIG_FIELD: pc_link,
SG_METADATA_FIELD: json.dumps(d["metadata"]),
SG_ENTITY_ID_FIELD: d["entity"]["id"],
SG_ENTITY_TYPE_FIELD: d["entity"]["type"],
SG_ENTITY_NAME_FIELD: d["entity"]["name"],
SG_PATH_FIELD: { "local_path": d["path"], "name": path_display_name }
} }
sg_batch_data.append(req)
# push to shotgun in a single xact
log.debug("Uploading %s path entries to Shotgun..." % len(sg_batch_data))
try:
response = self._tk.shotgun.batch(sg_batch_data)
except Exception as e:
raise TankError("Critical! Could not update Shotgun with folder "
"data. Please contact support. Error details: %s" % e)
# now create a dictionary where input path cache rowid (path_cache_row_id)
# is mapped to the shotgun ids that were just created
def _rowid_from_path(path):
for d in data:
if d["path"] == path:
return d["path_cache_row_id"]
raise TankError("Could not resolve row id for path! Please contact support! "
"trying to resolve path '%s'. Source data set: %s" % (path, data))
rowid_sgid_lookup = {}
for sg_obj in response:
sg_id = sg_obj["id"]
            pc_row_id = _rowid_from_path(sg_obj[SG_PATH_FIELD]["local_path"])
rowid_sgid_lookup[pc_row_id] = sg_id
# now register the created ids in the event log
        # this will later on be read by the synchronization process
# now, based on the entities we just created, assemble a metadata chunk that
# the sync calls can use later on.
meta = {}
# the api version used is always useful to know
meta["core_api_version"] = self._tk.version
# shotgun ids created
meta["sg_folder_ids"] = [ x["id"] for x in response]
sg_event_data = {}
sg_event_data["event_type"] = "Toolkit_Folders_Create"
sg_event_data["description"] = "Toolkit %s: %s" % (self._tk.version, event_log_desc)
sg_event_data["project"] = self._get_project_link()
sg_event_data["entity"] = pc_link
sg_event_data["meta"] = meta
sg_event_data["user"] = get_current_user(self._tk)
try:
log.debug("Creating event log entry %s" % sg_event_data)
response = self._tk.shotgun.create("EventLogEntry", sg_event_data)
except Exception as e:
raise TankError("Critical! Could not update Shotgun with folder data event log "
"history marker. Please contact support. Error details: %s" % e)
# return the event log id which represents this uploaded slab
return (response["id"], rowid_sgid_lookup)
def _get_project_link(self):
"""
Returns the project link dictionary.
:returns: If we have a site configuration, None will be returned. Otherwise, a dictionary
with keys "type" and "id" will be returned.
"""
if self._tk.pipeline_configuration.is_site_configuration():
return None
else:
return {
"type": "Project",
"id": self._tk.pipeline_configuration.get_project_id()
}
def _do_full_sync(self, cursor):
"""
Ensure the local path cache is in sync with Shotgun.
Returns a list of remote items which were detected, created remotely
and not existing in this path cache. These are returned as a list of
dictionaries, each containing keys:
- entity
- metadata
- path
:param cursor: Sqlite database cursor
"""
show_global_busy("Hang on, Toolkit is preparing folders...",
("Toolkit is retrieving folder listings from Shotgun and ensuring that your "
"setup is up to date. Hang tight while data is being downloaded..."))
try:
log.debug("Performing a complete Shotgun folder sync...")
# find the max event log id. we will store this in the sync db later.
sg_data = self._tk.shotgun.find_one(
"EventLogEntry",
[["event_type", "in", ["Toolkit_Folders_Create", "Toolkit_Folders_Delete"]],
["project", "is", self._get_project_link()]
],
["id"],
[{"field_name": "id", "direction": "desc"}]
)
if sg_data is None:
# event log was wiped or we haven't done any folder operations
max_event_log_id = 0
else:
max_event_log_id = sg_data["id"]
data = self._replay_folder_entities(cursor, max_event_log_id)
finally:
clear_global_busy()
return data
@classmethod
def remove_filesystem_location_entries(cls, tk, path_ids):
"""
Removes FilesystemLocation entries from the path cache.
:param list path_ids: List of FilesystemLocation ids to remove.
"""
sg_batch_data = []
for pid in path_ids:
req = {"request_type": "delete",
"entity_type": SHOTGUN_ENTITY,
"entity_id": pid}
sg_batch_data.append(req)
try:
tk.shotgun.batch(sg_batch_data)
except Exception as e:
raise TankError("Shotgun reported an error while attempting to delete FilesystemLocation entities. "
"Please contact support. Details: %s Data: %s" % (e, sg_batch_data))
# now register the deleted ids in the event log
        # this will later on be read by the synchronization process
# now, based on the entities we just deleted, assemble a metadata chunk that
# the sync calls can use later on.
if tk.pipeline_configuration.is_unmanaged():
pc_link = None
else:
pc_link = {
"type": "PipelineConfiguration",
"id": tk.pipeline_configuration.get_shotgun_id()
}
if tk.pipeline_configuration.is_site_configuration():
project_link = None
else:
project_link = {"type": "Project", "id": tk.pipeline_configuration.get_project_id()}
meta = {}
# the api version used is always useful to know
meta["core_api_version"] = tk.version
# shotgun ids created
meta["sg_folder_ids"] = path_ids
sg_event_data = {}
sg_event_data["event_type"] = "Toolkit_Folders_Delete"
sg_event_data["description"] = "Toolkit %s: Unregistered %s folders." % (tk.version, len(path_ids))
sg_event_data["project"] = project_link
sg_event_data["entity"] = pc_link
sg_event_data["meta"] = meta
sg_event_data["user"] = get_current_user(tk)
try:
tk.shotgun.create("EventLogEntry", sg_event_data)
except Exception as e:
raise TankError("Shotgun Reported an error while trying to write a Toolkit_Folders_Delete event "
"log entry after having successfully removed folders. Please contact support for "
"assistance. Error details: %s Data: %s" % (e, sg_event_data))
def _do_incremental_sync(self, cursor, sg_data):
"""
Ensure the local path cache is in sync with Shotgun.
Patch the existing cache with the events passed via sg_data.
        Assumptions:
        - sg_data list always contains some entries
        - sg_data list only contains Toolkit_Folders_Create and
          Toolkit_Folders_Delete records
        This is a list of dicts ordered by id from low to high (old to new),
        each with keys
        - id
        - meta
        - event_type
Example of items:
{'event_type': 'Toolkit_Folders_Create',
'meta': {'core_api_version': 'HEAD',
'sg_folder_ids': [123, 124, 125, 126, 127, 128, 129, 130, 131, 132, 133]},
'type': 'EventLogEntry',
'id': 249240}
:param cursor: Sqlite database cursor
:param sg_data: see details above
:returns: A list of remote items which were detected, created remotely
and not existing in this path cache. These are returned as a list of
dictionaries, each containing keys:
- entity
- metadata
- path
"""
if len(sg_data) == 0:
return []
log.debug("Begin replaying FilesystemLocation entities locally...")
# find the max event log id in sg_data. We will store this in the sync db later.
max_event_log_id = max([x["id"] for x in sg_data])
created_folder_ids = []
for d in sg_data:
log.debug("Looking at event log entry %s" % d)
if d["event_type"] == "Toolkit_Folders_Create":
# this is a creation request! Replay it on our database
created_folder_ids.extend(d["meta"]["sg_folder_ids"])
log.debug("Event log analysis complete.")
log.debug("Doing an incremental sync.")
# Retrieve all the newly created folders and rewire the result so it can be indexed by id.
created_folder_entities = self._get_filesystem_location_entities(created_folder_ids)
created_folder_entities = dict(
(entity["id"], entity) for entity in created_folder_entities
)
new_items = []
for event in sg_data:
sg_folder_ids = event["meta"].get("sg_folder_ids")
if event["event_type"] == "Toolkit_Folders_Delete":
# Remove all the entries associated with that event.
self._remove_filesystem_location_entities(cursor, sg_folder_ids)
elif event["event_type"] == "Toolkit_Folders_Create":
# For every folder in the create event.
for folder_id in sg_folder_ids:
# If the entry is actually part of the end result, we'll add it!
if folder_id in created_folder_entities:
new_item = self._import_filesystem_location_entry(cursor, created_folder_entities[folder_id])
# If the entry was actually imported.
if new_item:
new_items.append(new_item)
self._update_last_event_log_synced(cursor, max_event_log_id)
self._connection.commit()
        # the event_log_sync marker was inserted into the database above to
        # show where to start syncing from next time.
return new_items
def _get_filesystem_location_entities(self, folder_ids):
"""
Retrieves filesystem location entities from Shotgun.
:param list folder_ids: List of ids of entities to retrieve. If None, every entry is returned.
:returns: List of FilesystemLocation entity dictionaries with keys:
- id
- type
- configuration_metadata
- is_primary
- linked_entity_id
- path
- linked_entity_type
- code
"""
        # build batched filters for the entity ids we need to fetch from shotgun
batches = []
# We check specifically for a None here because it is a valid
# use case to pass in an empty list of folder ids and get nothing
# back in return as a result. Only in the case where we were
# specifically given folder_ids=None would we fall back on
# collecting all FilesystemLocation entities for the project.
if folder_ids is not None:
entity_filter = [["id", "in"]]
batch_count = 0
# Note: we batch the queries here. We want to avoid paging
# for performance purposes when dealing with huge numbers
# of entities (thousands+).
for folder_id in folder_ids:
if batch_count >= self.SHOTGUN_ENTITY_QUERY_BATCH_SIZE:
batches.append(entity_filter)
entity_filter = [["id", "in"]]
batch_count = 0
entity_filter[0].append(folder_id)
batch_count += 1
# Take care of the last batch, which is likely below our
# batch size and needs to be tacked onto the end.
if batch_count > 0:
batches.append(entity_filter)
log.debug(
"Getting FilesystemLocation entries for "
"the following ids: %s", folder_ids
)
else:
project_entity = self._get_project_link()
entity_filter = [["project", "is", project_entity]]
batches.append(entity_filter)
log.debug("Getting all the project's FilesystemLocation entries. "
"Project id: %s" % project_entity['id'])
sg_data = []
for batched_filter in batches:
sg_data.extend(
self._tk.shotgun.find(
SHOTGUN_ENTITY,
batched_filter,
[
"id",
SG_METADATA_FIELD,
SG_IS_PRIMARY_FIELD,
SG_ENTITY_ID_FIELD,
SG_PATH_FIELD,
SG_ENTITY_TYPE_FIELD,
SG_ENTITY_NAME_FIELD
],
[{"field_name": "id", "direction": "asc"}]
)
)
log.debug("...Retrieved %s records.", len(sg_data))
return sg_data
def _replay_folder_entities(self, cursor, max_event_log_id):
"""
Downloads all the filesystem location entities from Shotgun and repopulates the
path cache with them.
Lastly, this method updates the event_log_sync marker in the sqlite database
that tracks what the most recent event log id was being synced.
:param cursor: Sqlite database cursor
:param max_event_log_id: max event log marker to write to the path
cache database after a full operation.
:returns: A list of remote items which were detected, created remotely
and not existing in this path cache. These are returned as a list of
dictionaries, each containing keys:
- entity
- metadata
- path
"""
log.debug("Fetching already registered folders from Shotgun...")
sg_data = self._get_filesystem_location_entities(folder_ids=None)
# complete sync - clear our tables first
log.debug("Full sync - clearing local sqlite path cache tables...")
cursor.execute("DELETE FROM event_log_sync")
cursor.execute("DELETE FROM shotgun_status")
cursor.execute("DELETE FROM path_cache")
return_data = []
for x in sg_data:
imported_data = self._import_filesystem_location_entry(cursor, x)
if imported_data:
return_data.append(imported_data)
# lastly, save the id of this event log entry for purpose of future syncing
# note - we don't maintain a list of event log entries but just a single
# value in the db, so start by clearing the table.
self._update_last_event_log_synced(cursor, max_event_log_id)
self._connection.commit()
return return_data
def _update_last_event_log_synced(self, cursor, event_log_id):
"""
Saves into the db the last event processed from Shotgun.
:param cursor: Database cursor.
:type cursor: :class:`sqlite3.Cursor`
:param int event_log_id: New last event log
"""
log.debug("Inserting path cache marker %s in the sqlite db" % event_log_id)
cursor.execute("DELETE FROM event_log_sync")
cursor.execute("INSERT INTO event_log_sync(last_id) VALUES(?)", (event_log_id, ))
def _import_filesystem_location_entry(self, cursor, fsl_entity):
"""
Imports a single filesystem location into the path cache.
        :param cursor: Database cursor.
        :type cursor: :class:`sqlite3.Cursor`
        :param dict fsl_entity: Filesystem location entity dictionary with keys:
        - id
        - type
        - configuration_metadata
        - is_primary
        - linked_entity_id
        - path
        - linked_entity_type
        - code
        :returns: A dictionary with keys entity, path and metadata if the entry
            was imported, or None if it was skipped.
        """
# get entity data from our entry
entity = {"id": fsl_entity[SG_ENTITY_ID_FIELD],
"name": fsl_entity[SG_ENTITY_NAME_FIELD],
"type": fsl_entity[SG_ENTITY_TYPE_FIELD]}
is_primary = fsl_entity[SG_IS_PRIMARY_FIELD]
# note! If a local storage which is associated with a path is retired,
# parts of the entity data returned by shotgun will be omitted.
#
# A valid, active path entry will be on the form:
# {'id': 653,
# 'path': {'content_type': None,
# 'id': 2186,
# 'link_type': 'local',
# 'local_path': '/Volumes/xyz/proj1/sequences/aaa',
# 'local_path_linux': '/Volumes/xyz/proj1/sequences/aaa',
# 'local_path_mac': '/Volumes/xyz/proj1/sequences/aaa',
# 'local_path_windows': None,
# 'local_storage': {'id': 2,
# 'name': 'primary',
# 'type': 'LocalStorage'},
# 'name': '[primary] /sequences/aaa',
# 'type': 'Attachment',
# 'url': 'file:///Volumes/xyz/proj1/sequences/aaa'},
# 'type': 'FilesystemLocation'},
#
# With a retired storage, the returned data from the SG API is
# {'id': 646,
# 'path': {'content_type': None,
# 'id': 2141,
# 'link_type': 'local',
# 'local_storage': None,
# 'name': '[primary] /sequences/aaa/missing',
# 'type': 'Attachment'},
# 'type': 'FilesystemLocation'},
#
# no path at all - this is an anomaly but handle it gracefully regardless
if fsl_entity[SG_PATH_FIELD] is None:
log.debug("No path associated with entry for %s. Skipping." % entity)
return None
# retired storage case - see above for details
if fsl_entity[SG_PATH_FIELD].get("local_storage") is None:
log.debug("The storage for the path for %s has been deleted. Skipping." % entity)
return None
# get the local path from our attachment entity dict
        # note: sys.platform reports "linux" on python 3 and "linux2" on python 2
        sg_local_storage_os_map = {"linux": "local_path_linux",
                                   "linux2": "local_path_linux",
                                   "win32": "local_path_windows",
                                   "darwin": "local_path_mac"}
local_os_path_field = sg_local_storage_os_map[sys.platform]
local_os_path = fsl_entity[SG_PATH_FIELD].get(local_os_path_field)
# if the storage is not correctly configured for an OS, it is possible
# that the path comes back as null. Skip such paths and report them in the log.
if local_os_path is None:
log.debug("No local os path associated with entry for %s. Skipping." % entity)
return None
# if the path cannot be split up into a root_name and a leaf path
# using the roots.yml file, log a warning and continue. This can happen
# if roots files and storage setups change half-way through a project,
        # or if roots files are not in sync with the main storage definition.
        # In this case, we want to just warn and skip rather than raise
# an exception which will stop execution entirely.
try:
root_name, relative_path = self._separate_root(local_os_path)
except TankError as e:
log.debug("Could not resolve storages - skipping: %s" % e)
return None
# all validation checks seem ok - go ahead and make the changes.
new_rowid = self._add_db_mapping(cursor, local_os_path, entity, is_primary)
if new_rowid:
# something was inserted into the db!
# because this record came from shotgun, insert a record in the
# shotgun_status table to indicate that this record exists in sg
cursor.execute("INSERT INTO shotgun_status(path_cache_id, shotgun_id) "
"VALUES(?, ?)", (new_rowid, fsl_entity["id"]))
# and add this entry to our list of new things that we will return later on.
return {
"entity": entity,
"path": local_os_path,
"metadata": SG_METADATA_FIELD
}
else:
# Note: edge case - for some reason there was already an entry in the path cache
# representing this. This could be because of duplicate entries and is
            # not necessarily an anomaly. It could also happen because a previous
            # sync failed halfway through.
log.debug("Found existing record for '%s', %s. Skipping." % (local_os_path, entity))
return None
def _gen_param_string(self, items):
"""
        Creates a parameter string for an SQL IN clause based on the number of
        items in a list. If there are three items in the list, "?,?,?" is
        generated; if there are five items, "?,?,?,?,?" is generated, and so on.
        :param list items: Items for which we require a parameter string.
"""
# Adapted from http://stackoverflow.com/a/1310001/1074536
return ",".join(itertools.repeat("?", len(items)))
def _remove_filesystem_location_entities(self, cursor, folder_ids):
"""
Removes all the requested filesystem locations from the path cache.