Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update Live Publication in Archive on Capture Agent Change #4316

Merged
merged 3 commits into from Nov 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -291,7 +291,7 @@ public boolean updateLiveEventAcl(String mpId, AccessControlList acl) throws Liv
// Replace and distribute acl, this creates new mp
MediaPackage newMp = replaceAndDistributeAcl(previousMp, acl);
// Publish mp to engage search index
publish(newMp);
publishToSearch(newMp);
// Don't leave garbage there!
retractPreviousElements(previousMp, newMp);
logger.info("Updated live acl for media package {}", newMp);
Expand All @@ -303,77 +303,66 @@ public boolean updateLiveEventAcl(String mpId, AccessControlList acl) throws Liv
boolean createLiveEvent(String mpId, DublinCoreCatalog episodeDC) throws LiveScheduleException {
try {
logger.info("Creating live media package {}", mpId);
// Get latest mp from the asset manager
Snapshot snapshot = getSnapshot(mpId);
// Temporary mp
MediaPackage tempMp = (MediaPackage) snapshot.getMediaPackage().clone();
// Set duration (used by live tracks)
setDuration(tempMp, episodeDC);
// Add live tracks to media package
Map<String, Track> generatedTracks = addLiveTracks(tempMp, episodeDC.getFirst(DublinCore.PROPERTY_SPATIAL));
// Add and distribute catalogs/acl, this creates a new mp object
MediaPackage mp = addAndDistributeElements(snapshot);
// Add tracks from tempMp
for (Track t : tempMp.getTracks()) {
mp.add(t);
}
// Publish mp to engage search index
publish(mp);
// Add engage-live publication channel to archived mp
Organization currentOrg = null;
try {
currentOrg = organizationService.getOrganization(snapshot.getOrganizationId());
} catch (NotFoundException e) {
logger.warn("Organization in snapshot not found: {}", snapshot.getOrganizationId());
Snapshot snapshot = getSnapshotFromArchive(mpId);

// generate live tracks
MediaPackage tmpMp = (MediaPackage) snapshot.getMediaPackage().clone();
setDurationForMediaPackage(tmpMp, episodeDC); // duration is used by live tracks
Map<String, Track> liveTracks = addLiveTracksToMediaPackage(tmpMp, episodeDC);

// publish to search
MediaPackage mpForSearch = distributeAclsAndCatalogs(snapshot);
for (Track t : tmpMp.getTracks()) {
mpForSearch.add(t);
}
MediaPackage archivedMp = snapshot.getMediaPackage();
addLivePublicationChannel(currentOrg, archivedMp, generatedTracks);
// Take a snapshot with the publication added and put its version in our local cache
// so that we ignore notifications for this snapshot version.
snapshotVersionCache.put(mpId, assetManager.takeSnapshot(archivedMp).getVersion());
publishToSearch(mpForSearch);

// add live publication to archive
MediaPackage updatedArchivedMp = addLivePublicationToMediaPackage(snapshot, liveTracks);
snapshotVersionCache.put(mpId, assetManager.takeSnapshot(updatedArchivedMp).getVersion());
return true;
} catch (Exception e) {
throw new LiveScheduleException(e);
}
}

boolean updateLiveEvent(MediaPackage previousMp, DublinCoreCatalog episodeDC) throws LiveScheduleException {
// Get latest mp from the asset manager
Snapshot snapshot = getSnapshot(previousMp.getIdentifier().toString());
boolean updateLiveEvent(MediaPackage mpFromSearch, DublinCoreCatalog episodeDC) throws LiveScheduleException {
String mpId = mpFromSearch.getIdentifier().toString();
Snapshot snapshot = getSnapshotFromArchive(mpId);

// If the snapshot version is in our local cache, it means that this snapshot was created by us so
// nothing to do. Note that this is just to save time; if the entry has already been deleted, the mp
// will be compared below.
if (snapshot.getVersion().equals(snapshotVersionCache.getIfPresent(previousMp.getIdentifier().toString()))) {
if (snapshot.getVersion().equals(snapshotVersionCache.getIfPresent(mpId))) {
logger.debug("Snapshot version {} was created by us so this change is ignored.", snapshot.getVersion());
return false;
}
// Temporary mp
MediaPackage tempMp = (MediaPackage) snapshot.getMediaPackage().clone();
// Set duration (used by live tracks)
setDuration(tempMp, episodeDC);
// Add live tracks to media package
Map<String, Track> generatedTracks = addLiveTracks(tempMp, episodeDC.getFirst(DublinCore.PROPERTY_SPATIAL));
// Update tracks in the publication
createOrUpdatePublicationTracks(tempMp, generatedTracks);
// If same mp, no need to do anything
if (isSameMediaPackage(previousMp, tempMp)) {
logger.debug("Live media package {} seems to be the same. Not updating.", previousMp);

// generate new live tracks
MediaPackage tmpMp = (MediaPackage) snapshot.getMediaPackage().clone();
setDurationForMediaPackage(tmpMp, episodeDC); // duration is used by live tracks
Map<String, Track> liveTracks = addLiveTracksToMediaPackage(tmpMp, episodeDC);

// if nothing changed, no need to do anything
if (isSameMediaPackage(mpFromSearch, tmpMp)) {
logger.debug("Live media package {} seems to be the same. Not updating.", mpFromSearch);
return false;
}
logger.info("Updating live media package {}", previousMp);
// Add and distribute catalogs/acl, this creates a new mp
MediaPackage mp = addAndDistributeElements(snapshot);
// Add tracks from tempMp
for (Track t : tempMp.getTracks()) {
mp.add(t);
}
// Remove publication element that came with the snapshot mp
removeLivePublicationChannel(mp);
// Publish mp to engage search index
publish(mp);
// Publication channel already there so no need to add
// Don't leave garbage there!
retractPreviousElements(previousMp, mp);

logger.info("Updating live media package {}", mpFromSearch);

// update mp in search
MediaPackage mpForSearch = distributeAclsAndCatalogs(snapshot);
for (Track t : tmpMp.getTracks()) {
mpForSearch.add(t);
}
removeLivePublicationChannel(mpForSearch); // we don't need the live publication in search
publishToSearch(mpForSearch);
retractPreviousElements(mpFromSearch, mpForSearch); // cleanup

// update live publication in archive
MediaPackage updatedArchivedMp = updateLivePublication(snapshot.getMediaPackage(), liveTracks);
snapshotVersionCache.put(mpId, assetManager.takeSnapshot(updatedArchivedMp).getVersion());
return true;
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@KatrinIhler So, this new line 364 and 365 is the crux of the patch to update the publication element in the archive when the live publication track changes. Previously, just the mediapackage top level metadata and catalog is updated but not the publication element, and this updates the publication element as well. Does that sound right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep!


Expand All @@ -390,13 +379,14 @@ private void createOrUpdatePublicationTracks(Publication publication, Map<String
}
}

private void createOrUpdatePublicationTracks(MediaPackage mediaPackage, Map<String, Track> generatedTracks) {
private MediaPackage updateLivePublication(MediaPackage mediaPackage, Map<String, Track> generatedTracks) {
Publication[] publications = mediaPackage.getPublications();
for (Publication publication : publications) {
if (publication.getChannel().equals(CHANNEL_ID)) {
createOrUpdatePublicationTracks(publication, generatedTracks);
karendolan marked this conversation as resolved.
Show resolved Hide resolved
}
}
return mediaPackage;
}

boolean retractLiveEvent(MediaPackage mp) throws LiveScheduleException {
Expand All @@ -405,7 +395,7 @@ boolean retractLiveEvent(MediaPackage mp) throws LiveScheduleException {
// Get latest mp from the asset manager if there to remove the publication
try {
String mpId = mp.getIdentifier().toString();
Snapshot snapshot = getSnapshot(mpId);
Snapshot snapshot = getSnapshotFromArchive(mpId);
MediaPackage archivedMp = snapshot.getMediaPackage();
removeLivePublicationChannel(archivedMp);
logger.debug("Removed live pub channel from archived media package {}", mp);
Expand All @@ -418,7 +408,7 @@ boolean retractLiveEvent(MediaPackage mp) throws LiveScheduleException {
return true;
}

void publish(MediaPackage mp) throws LiveScheduleException {
void publishToSearch(MediaPackage mp) throws LiveScheduleException {
try {
// Add media package to the search index
logger.info("Publishing LIVE media package {} to search index", mp);
Expand Down Expand Up @@ -494,16 +484,18 @@ MediaPackage getMediaPackageFromSearch(String mediaPackageId) throws LiveSchedul
}
}

void setDuration(MediaPackage mp, DublinCoreCatalog dc) {
void setDurationForMediaPackage(MediaPackage mp, DublinCoreCatalog dc) {
DCMIPeriod period = EncodingSchemeUtils.decodeMandatoryPeriod(dc.getFirst(DublinCore.PROPERTY_TEMPORAL));
long duration = period.getEnd().getTime() - period.getStart().getTime();
mp.setDuration(duration);
logger.debug("Live media package {} has start {} and duration {}", mp.getIdentifier(), mp.getDate(),
mp.getDuration());
}

Map<String, Track> addLiveTracks(MediaPackage mp, String caName) throws LiveScheduleException {
HashMap<String, Track> generatedTracks = new HashMap<String, Track>();
Map<String, Track> addLiveTracksToMediaPackage(MediaPackage mp, DublinCoreCatalog episodeDC)
throws LiveScheduleException {
String caName = episodeDC.getFirst(DublinCore.PROPERTY_SPATIAL);
HashMap<String, Track> generatedTracks = new HashMap<>();
String mpId = mp.getIdentifier().toString();
try {
// If capture agent registered the properties:
Expand Down Expand Up @@ -638,7 +630,7 @@ private JobBarrier.Result waitForStatus(Job... jobs) throws IllegalStateExceptio
return barrier.waitForJobs();
}

Snapshot getSnapshot(String mpId) throws LiveScheduleException {
Snapshot getSnapshotFromArchive(String mpId) throws LiveScheduleException {
AQueryBuilder query = assetManager.createQuery();
AResult result = query.select(query.snapshot()).where(query.mediaPackageId(mpId).and(query.version().isLatest()))
.run();
Expand All @@ -654,11 +646,11 @@ Snapshot getSnapshot(String mpId) throws LiveScheduleException {
return record.get().getSnapshot().get();
}

MediaPackage addAndDistributeElements(Snapshot snapshot) throws LiveScheduleException {
MediaPackage distributeAclsAndCatalogs(Snapshot snapshot) throws LiveScheduleException {
try {
MediaPackage mp = (MediaPackage) snapshot.getMediaPackage().clone();

Set<String> elementIds = new HashSet<String>();
Set<String> elementIds = new HashSet<>();
// Then, add series catalog if needed
if (StringUtils.isNotEmpty(mp.getSeries())) {
DublinCoreCatalog catalog = seriesService.getSeries(mp.getSeries());
Expand Down Expand Up @@ -747,11 +739,17 @@ MediaPackage replaceAndDistributeAcl(MediaPackage previousMp, AccessControlList
}
}

void addLivePublicationChannel(
Organization currentOrg,
MediaPackage mp,
Map<String, Track> generatedTracks
) throws LiveScheduleException {
MediaPackage addLivePublicationToMediaPackage(Snapshot snapshot, Map<String, Track> generatedTracks)
throws LiveScheduleException {
MediaPackage mp = snapshot.getMediaPackage();

Organization currentOrg = null;
try {
currentOrg = organizationService.getOrganization(snapshot.getOrganizationId());
} catch (NotFoundException e) {
logger.warn("Organization in snapshot not found: {}", snapshot.getOrganizationId());
}

logger.debug("Adding live channel publication element to media package {}", mp);
String engageUrlString = null;
if (currentOrg != null) {
Expand All @@ -771,6 +769,7 @@ void addLivePublicationChannel(
MimeTypes.parseMimeType("text/html"));
mp.add(publicationElement);
createOrUpdatePublicationTracks(publicationElement, generatedTracks);
return mp;
} catch (URISyntaxException e) {
throw new LiveScheduleException(e);
}
Expand Down