Skip to content

Commit

Permalink
Merge branch 'update-live-publication-12.x' of KatrinIhler/opencast i…
Browse files Browse the repository at this point in the history
…nto r/12.x

Pull request #4472
  Update Live Publication in Archive on Capture Agent Change (12.x version)
  • Loading branch information
gregorydlogan committed Jan 16, 2023
2 parents a31cda0 + 6e15312 commit 16c0f5f
Show file tree
Hide file tree
Showing 3 changed files with 169 additions and 160 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ public boolean updateLiveEventAcl(String mpId, AccessControlList acl) throws Liv
// Replace and distribute acl, this creates new mp
MediaPackage newMp = replaceAndDistributeAcl(previousMp, acl);
// Publish mp to engage search index
publish(newMp);
publishToSearch(newMp);
// Don't leave garbage there!
retractPreviousElements(previousMp, newMp);
logger.info("Updated live acl for media package {}", newMp);
Expand All @@ -316,77 +316,66 @@ public boolean updateLiveEventAcl(String mpId, AccessControlList acl) throws Liv
boolean createLiveEvent(String mpId, DublinCoreCatalog episodeDC) throws LiveScheduleException {
try {
logger.info("Creating live media package {}", mpId);
// Get latest mp from the asset manager
Snapshot snapshot = getSnapshot(mpId);
// Temporary mp
MediaPackage tempMp = (MediaPackage) snapshot.getMediaPackage().clone();
// Set duration (used by live tracks)
setDuration(tempMp, episodeDC);
// Add live tracks to media package
Map<String, Track> generatedTracks = addLiveTracks(tempMp, episodeDC.getFirst(DublinCore.PROPERTY_SPATIAL));
// Add and distribute catalogs/acl, this creates a new mp object
MediaPackage mp = addAndDistributeElements(snapshot);
// Add tracks from tempMp
for (Track t : tempMp.getTracks()) {
mp.add(t);
}
// Publish mp to engage search index
publish(mp);
// Add engage-live publication channel to archived mp
Organization currentOrg = null;
try {
currentOrg = organizationService.getOrganization(snapshot.getOrganizationId());
} catch (NotFoundException e) {
logger.warn("Organization in snapshot not found: {}", snapshot.getOrganizationId());
Snapshot snapshot = getSnapshotFromArchive(mpId);

// generate live tracks
MediaPackage tmpMp = (MediaPackage) snapshot.getMediaPackage().clone();
setDurationForMediaPackage(tmpMp, episodeDC); // duration is used by live tracks
Map<String, Track> liveTracks = addLiveTracksToMediaPackage(tmpMp, episodeDC);

// publish to search
MediaPackage mpForSearch = distributeAclsAndCatalogs(snapshot);
for (Track t : tmpMp.getTracks()) {
mpForSearch.add(t);
}
MediaPackage archivedMp = snapshot.getMediaPackage();
addLivePublicationChannel(currentOrg, archivedMp, generatedTracks);
// Take a snapshot with the publication added and put its version in our local cache
// so that we ignore notifications for this snapshot version.
snapshotVersionCache.put(mpId, assetManager.takeSnapshot(archivedMp).getVersion());
publishToSearch(mpForSearch);

// add live publication to archive
MediaPackage updatedArchivedMp = addLivePublicationToMediaPackage(snapshot, liveTracks);
snapshotVersionCache.put(mpId, assetManager.takeSnapshot(updatedArchivedMp).getVersion());
return true;
} catch (Exception e) {
throw new LiveScheduleException(e);
}
}

boolean updateLiveEvent(MediaPackage previousMp, DublinCoreCatalog episodeDC) throws LiveScheduleException {
// Get latest mp from the asset manager
Snapshot snapshot = getSnapshot(previousMp.getIdentifier().toString());
boolean updateLiveEvent(MediaPackage mpFromSearch, DublinCoreCatalog episodeDC) throws LiveScheduleException {
String mpId = mpFromSearch.getIdentifier().toString();
Snapshot snapshot = getSnapshotFromArchive(mpId);

// If the snapshot version is in our local cache, it means that this snapshot was created by us so
// nothing to do. Note that this is just to save time; if the entry has already been deleted, the mp
// will be compared below.
if (snapshot.getVersion().equals(snapshotVersionCache.getIfPresent(previousMp.getIdentifier().toString()))) {
if (snapshot.getVersion().equals(snapshotVersionCache.getIfPresent(mpId))) {
logger.debug("Snapshot version {} was created by us so this change is ignored.", snapshot.getVersion());
return false;
}
// Temporary mp
MediaPackage tempMp = (MediaPackage) snapshot.getMediaPackage().clone();
// Set duration (used by live tracks)
setDuration(tempMp, episodeDC);
// Add live tracks to media package
Map<String, Track> generatedTracks = addLiveTracks(tempMp, episodeDC.getFirst(DublinCore.PROPERTY_SPATIAL));
// Update tracks in the publication
createOrUpdatePublicationTracks(tempMp, generatedTracks);
// If same mp, no need to do anything
if (isSameMediaPackage(previousMp, tempMp)) {
logger.debug("Live media package {} seems to be the same. Not updating.", previousMp);

// generate new live tracks
MediaPackage tmpMp = (MediaPackage) snapshot.getMediaPackage().clone();
setDurationForMediaPackage(tmpMp, episodeDC); // duration is used by live tracks
Map<String, Track> liveTracks = addLiveTracksToMediaPackage(tmpMp, episodeDC);

// if nothing changed, no need to do anything
if (isSameMediaPackage(mpFromSearch, tmpMp)) {
logger.debug("Live media package {} seems to be the same. Not updating.", mpFromSearch);
return false;
}
logger.info("Updating live media package {}", previousMp);
// Add and distribute catalogs/acl, this creates a new mp
MediaPackage mp = addAndDistributeElements(snapshot);
// Add tracks from tempMp
for (Track t : tempMp.getTracks()) {
mp.add(t);
}
// Remove publication element that came with the snapshot mp
removeLivePublicationChannel(mp);
// Publish mp to engage search index
publish(mp);
// Publication channel already there so no need to add
// Don't leave garbage there!
retractPreviousElements(previousMp, mp);

logger.info("Updating live media package {}", mpFromSearch);

// update mp in search
MediaPackage mpForSearch = distributeAclsAndCatalogs(snapshot);
for (Track t : tmpMp.getTracks()) {
mpForSearch.add(t);
}
removeLivePublicationChannel(mpForSearch); // we don't need the live publication in search
publishToSearch(mpForSearch);
retractPreviousElements(mpFromSearch, mpForSearch); // cleanup

// update live publication in archive
MediaPackage updatedArchivedMp = updateLivePublication(snapshot.getMediaPackage(), liveTracks);
snapshotVersionCache.put(mpId, assetManager.takeSnapshot(updatedArchivedMp).getVersion());
return true;
}

Expand All @@ -403,13 +392,14 @@ private void createOrUpdatePublicationTracks(Publication publication, Map<String
}
}

private void createOrUpdatePublicationTracks(MediaPackage mediaPackage, Map<String, Track> generatedTracks) {
private MediaPackage updateLivePublication(MediaPackage mediaPackage, Map<String, Track> generatedTracks) {
Publication[] publications = mediaPackage.getPublications();
for (Publication publication : publications) {
if (publication.getChannel().equals(CHANNEL_ID)) {
createOrUpdatePublicationTracks(publication, generatedTracks);
}
}
return mediaPackage;
}

boolean retractLiveEvent(MediaPackage mp) throws LiveScheduleException {
Expand All @@ -418,7 +408,7 @@ boolean retractLiveEvent(MediaPackage mp) throws LiveScheduleException {
// Get latest mp from the asset manager if there to remove the publication
try {
String mpId = mp.getIdentifier().toString();
Snapshot snapshot = getSnapshot(mpId);
Snapshot snapshot = getSnapshotFromArchive(mpId);
MediaPackage archivedMp = snapshot.getMediaPackage();
removeLivePublicationChannel(archivedMp);
logger.debug("Removed live pub channel from archived media package {}", mp);
Expand All @@ -431,7 +421,7 @@ boolean retractLiveEvent(MediaPackage mp) throws LiveScheduleException {
return true;
}

void publish(MediaPackage mp) throws LiveScheduleException {
void publishToSearch(MediaPackage mp) throws LiveScheduleException {
try {
// Add media package to the search index
logger.info("Publishing LIVE media package {} to search index", mp);
Expand Down Expand Up @@ -507,16 +497,18 @@ MediaPackage getMediaPackageFromSearch(String mediaPackageId) throws LiveSchedul
}
}

void setDuration(MediaPackage mp, DublinCoreCatalog dc) {
void setDurationForMediaPackage(MediaPackage mp, DublinCoreCatalog dc) {
DCMIPeriod period = EncodingSchemeUtils.decodeMandatoryPeriod(dc.getFirst(DublinCore.PROPERTY_TEMPORAL));
long duration = period.getEnd().getTime() - period.getStart().getTime();
mp.setDuration(duration);
logger.debug("Live media package {} has start {} and duration {}", mp.getIdentifier(), mp.getDate(),
mp.getDuration());
}

Map<String, Track> addLiveTracks(MediaPackage mp, String caName) throws LiveScheduleException {
HashMap<String, Track> generatedTracks = new HashMap<String, Track>();
Map<String, Track> addLiveTracksToMediaPackage(MediaPackage mp, DublinCoreCatalog episodeDC)
throws LiveScheduleException {
String caName = episodeDC.getFirst(DublinCore.PROPERTY_SPATIAL);
HashMap<String, Track> generatedTracks = new HashMap<>();
String mpId = mp.getIdentifier().toString();
try {
// If capture agent registered the properties:
Expand Down Expand Up @@ -645,7 +637,7 @@ private JobBarrier.Result waitForStatus(Job... jobs) throws IllegalStateExceptio
return barrier.waitForJobs();
}

Snapshot getSnapshot(String mpId) throws LiveScheduleException {
Snapshot getSnapshotFromArchive(String mpId) throws LiveScheduleException {
AQueryBuilder query = assetManager.createQuery();
AResult result = query.select(query.snapshot()).where(query.mediaPackageId(mpId).and(query.version().isLatest()))
.run();
Expand All @@ -661,11 +653,11 @@ Snapshot getSnapshot(String mpId) throws LiveScheduleException {
return record.get().getSnapshot().get();
}

MediaPackage addAndDistributeElements(Snapshot snapshot) throws LiveScheduleException {
MediaPackage distributeAclsAndCatalogs(Snapshot snapshot) throws LiveScheduleException {
try {
MediaPackage mp = (MediaPackage) snapshot.getMediaPackage().clone();

Set<String> elementIds = new HashSet<String>();
Set<String> elementIds = new HashSet<>();
// Then, add series catalog if needed
if (StringUtils.isNotEmpty(mp.getSeries())) {
DublinCoreCatalog catalog = seriesService.getSeries(mp.getSeries());
Expand Down Expand Up @@ -754,11 +746,17 @@ MediaPackage replaceAndDistributeAcl(MediaPackage previousMp, AccessControlList
}
}

void addLivePublicationChannel(
Organization currentOrg,
MediaPackage mp,
Map<String, Track> generatedTracks
) throws LiveScheduleException {
MediaPackage addLivePublicationToMediaPackage(Snapshot snapshot, Map<String, Track> generatedTracks)
throws LiveScheduleException {
MediaPackage mp = snapshot.getMediaPackage();

Organization currentOrg = null;
try {
currentOrg = organizationService.getOrganization(snapshot.getOrganizationId());
} catch (NotFoundException e) {
logger.warn("Organization in snapshot not found: {}", snapshot.getOrganizationId());
}

logger.debug("Adding live channel publication element to media package {}", mp);
String engageUrlString = null;
if (currentOrg != null) {
Expand All @@ -778,6 +776,7 @@ void addLivePublicationChannel(
MimeTypes.parseMimeType("text/html"));
mp.add(publicationElement);
createOrUpdatePublicationTracks(publicationElement, generatedTracks);
return mp;
} catch (URISyntaxException e) {
throw new LiveScheduleException(e);
}
Expand Down
Loading

0 comments on commit 16c0f5f

Please sign in to comment.