Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions backend/docs/db/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@

\restrict dummy

-- Dumped from database version 15.15 (Debian 15.15-1.pgdg13+1)
-- Dumped by pg_dump version 16.11 (Debian 16.11-1.pgdg13+1)
-- Dumped from database version 15.17 (Debian 15.17-1.pgdg13+1)
-- Dumped by pg_dump version 16.13 (Debian 16.13-1.pgdg13+1)

SET statement_timeout = 0;
SET lock_timeout = 0;
Expand Down Expand Up @@ -427,7 +427,8 @@ CREATE TABLE public.sequence_entries (
released_at timestamp without time zone,
is_revocation boolean DEFAULT false NOT NULL,
original_data jsonb,
version_comment text
version_comment text,
unprocessed_data jsonb
);


Expand Down Expand Up @@ -467,7 +468,7 @@ CREATE VIEW public.sequence_entries_view AS
se.submitted_at,
se.released_at,
se.is_revocation,
se.original_data,
se.unprocessed_data,
se.version_comment,
sepd.started_processing_at,
sepd.finished_processing_at,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -320,12 +320,12 @@ typealias OriginalData<SequenceType> = OriginalDataInternal<SequenceType, FileId
typealias OriginalDataWithFileUrls<SequenceType> =
OriginalDataInternal<SequenceType, FileIdAndNameAndReadUrl>

data class AccessionVersionOriginalMetadata(
data class AccessionVersionUnprocessedMetadata(
override val accession: Accession,
override val version: Version,
val submitter: String,
val isRevocation: Boolean,
val originalMetadata: Map<String, String?>?,
val unprocessedMetadata: Map<String, String?>?,
) : AccessionVersionInterface

enum class Status {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -397,11 +397,11 @@ open class SubmissionController(
size,
)

@Operation(description = "Retrieve original metadata of submitted accession versions.")
@Operation(description = "Retrieve unprocessed metadata of submitted accession versions.")
@ResponseStatus(HttpStatus.OK)
@ApiResponse(
responseCode = "200",
description = GET_ORIGINAL_METADATA_RESPONSE_DESCRIPTION,
description = GET_UNPROCESSED_METADATA_RESPONSE_DESCRIPTION,
headers = [
Header(
name = X_TOTAL_RECORDS,
Expand All @@ -410,8 +410,8 @@ open class SubmissionController(
),
],
)
@GetMapping("/get-original-metadata", produces = [MediaType.APPLICATION_JSON_VALUE])
fun getOriginalMetadata(
@GetMapping("/get-unprocessed-metadata", produces = [MediaType.APPLICATION_JSON_VALUE])
fun getUnprocessedMetadata(
@PathVariable @Valid organism: Organism,
@Parameter(
description = "The metadata fields that should be returned. If not provided, all fields are returned.",
Expand All @@ -431,7 +431,7 @@ open class SubmissionController(
headers.add(HttpHeaders.CONTENT_ENCODING, compression.compressionName)
}

val totalRecords = submissionDatabaseService.countOriginalMetadata(
val totalRecords = submissionDatabaseService.countUnprocessedMetadata(
authenticatedUser,
organism,
groupIdsFilter?.takeIf { it.isNotEmpty() },
Expand All @@ -444,8 +444,8 @@ open class SubmissionController(
// We just need to make sure the etag used is from before the count
// Alternatively, we could read once to file while counting and then stream the file

val streamBody = streamTransactioned(compression, endpoint = "get-original-metadata", organism = organism) {
submissionDatabaseService.streamOriginalMetadata(
val streamBody = streamTransactioned(compression, endpoint = "get-unprocessed-metadata", organism = organism) {
submissionDatabaseService.streamUnprocessedMetadata(
authenticatedUser,
organism,
groupIdsFilter?.takeIf { it.isNotEmpty() },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ Releasable accession versions.
The schema is to be understood per line of the NDJSON stream.
"""

const val GET_ORIGINAL_METADATA_RESPONSE_DESCRIPTION = """
The original metadata of submission sequence versions as NDJSON where each line is a flat JSON object where the values
const val GET_UNPROCESSED_METADATA_RESPONSE_DESCRIPTION = """
The unprocessed metadata of submission sequence versions as NDJSON where each line is a flat JSON object where the values
are all strings (or null).
"""
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import org.jetbrains.exposed.sql.vendors.ForUpdateOption.PostgreSQL.ForUpdate
import org.jetbrains.exposed.sql.vendors.ForUpdateOption.PostgreSQL.MODE
import org.loculus.backend.api.AccessionVersion
import org.loculus.backend.api.AccessionVersionInterface
import org.loculus.backend.api.AccessionVersionOriginalMetadata
import org.loculus.backend.api.AccessionVersionUnprocessedMetadata
import org.loculus.backend.api.ApproveDataScope
import org.loculus.backend.api.DataUseTerms
import org.loculus.backend.api.DataUseTermsType
Expand Down Expand Up @@ -167,7 +167,7 @@ class SubmissionDatabaseService(
.select(
table.accessionColumn,
table.versionColumn,
table.originalDataColumn,
table.unprocessedDataColumn,
table.submissionIdColumn,
table.submitterColumn,
table.groupIdColumn,
Expand All @@ -193,7 +193,7 @@ class SubmissionDatabaseService(
.map { chunk ->
val chunkOfUnprocessedData = chunk.map {
val originalData = compressionService.decompressSequencesInOriginalData(
it[table.originalDataColumn]!!,
it[table.unprocessedDataColumn]!!,
)
val originalDataWithFileUrls = OriginalDataWithFileUrls(
originalData.metadata,
Expand Down Expand Up @@ -1127,7 +1127,7 @@ class SubmissionDatabaseService(
SequenceEntriesTable.accessionVersionIsIn(listOf(editedSequenceEntryData))
},
) {
it[originalDataColumn] = compressionService
it[unprocessedDataColumn] = compressionService
.compressSequencesInOriginalData(editedSequenceEntryData.data, organism)
}

Expand Down Expand Up @@ -1164,7 +1164,7 @@ class SubmissionDatabaseService(
SequenceEntriesView.groupIdColumn,
SequenceEntriesView.statusColumn,
SequenceEntriesView.processedDataColumn,
SequenceEntriesView.originalDataColumn,
SequenceEntriesView.unprocessedDataColumn,
SequenceEntriesView.errorsColumn,
SequenceEntriesView.warningsColumn,
SequenceEntriesView.isRevocationColumn,
Expand All @@ -1189,7 +1189,7 @@ class SubmissionDatabaseService(
organism,
),
originalData = compressionService.decompressSequencesInOriginalData(
selectedSequenceEntry[SequenceEntriesView.originalDataColumn]!!,
selectedSequenceEntry[SequenceEntriesView.unprocessedDataColumn]!!,
),
errors = selectedSequenceEntry[SequenceEntriesView.errorsColumn],
warnings = selectedSequenceEntry[SequenceEntriesView.warningsColumn],
Expand All @@ -1215,7 +1215,7 @@ class SubmissionDatabaseService(
)
}

private fun originalMetadataFilter(
private fun unprocessedMetadataFilter(
authenticatedUser: AuthenticatedUser,
organism: Organism,
groupIdsFilter: List<Int>?,
Expand All @@ -1233,15 +1233,15 @@ class SubmissionDatabaseService(
return conditions
}

fun countOriginalMetadata(
fun countUnprocessedMetadata(
authenticatedUser: AuthenticatedUser,
organism: Organism,
groupIdsFilter: List<Int>?,
statusesFilter: List<Status>?,
): Long = SequenceEntriesView
.selectAll()
.where(
originalMetadataFilter(
unprocessedMetadataFilter(
authenticatedUser,
organism,
groupIdsFilter,
Expand All @@ -1250,28 +1250,28 @@ class SubmissionDatabaseService(
)
.count()

fun streamOriginalMetadata(
fun streamUnprocessedMetadata(
authenticatedUser: AuthenticatedUser,
organism: Organism,
groupIdsFilter: List<Int>?,
statusesFilter: List<Status>?,
fields: List<String>?,
): Sequence<AccessionVersionOriginalMetadata> {
val originalMetadata = SequenceEntriesView.originalDataColumn
): Sequence<AccessionVersionUnprocessedMetadata> {
val unprocessedMetadata = SequenceEntriesView.unprocessedDataColumn
// It's actually <Map<String, String>?> but exposed does not support nullable types here
.extract<Map<String, String>>("metadata")
.alias("original_metadata")
.alias("unprocessed_metadata")

return SequenceEntriesView
.select(
originalMetadata,
unprocessedMetadata,
SequenceEntriesView.accessionColumn,
SequenceEntriesView.versionColumn,
SequenceEntriesView.submitterColumn,
SequenceEntriesView.isRevocationColumn,
)
.where(
originalMetadataFilter(
unprocessedMetadataFilter(
authenticatedUser,
organism,
groupIdsFilter,
Expand All @@ -1283,10 +1283,10 @@ class SubmissionDatabaseService(
.map {
// Revoked sequences have no original metadata, hence null can happen
@Suppress("USELESS_ELVIS")
val metadata = it[originalMetadata] ?: null
val metadata = it[unprocessedMetadata] ?: null
val selectedMetadata = fields?.associateWith { field -> metadata?.get(field) }
?: metadata
AccessionVersionOriginalMetadata(
AccessionVersionUnprocessedMetadata(
it[SequenceEntriesView.accessionColumn],
it[SequenceEntriesView.versionColumn],
it[SequenceEntriesView.submitterColumn],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ class UploadDatabaseService(
submitter,
group_id,
submitted_at,
unprocessed_data,
original_data
)
SELECT
Expand All @@ -217,20 +218,23 @@ class UploadDatabaseService(
m.submission_id,
m.submitter,
m.group_id,
m.uploaded_at,
jsonb_build_object(
'metadata', m.metadata,
'files', m.files,
'unalignedNucleotideSequences',
COALESCE(x.seq_map, '{}'::jsonb)
)
m.uploaded_at,
built.data,
built.data
FROM metadata_upload_aux_table AS m
LEFT JOIN LATERAL (
SELECT jsonb_object_agg(s.fasta_id, s.compressed_sequence_data::jsonb) AS seq_map
FROM sequence_upload_aux_table AS s
WHERE s.upload_id = m.upload_id
AND s.fasta_id = ANY (COALESCE(m.fasta_ids, ARRAY[]::text[]))
) AS x ON TRUE
CROSS JOIN LATERAL (
SELECT jsonb_build_object(
'metadata', m.metadata,
'files', m.files,
'unalignedNucleotideSequences', COALESCE(x.seq_map, '{}'::jsonb)
) AS data
) AS built
WHERE m.upload_id = ?
RETURNING accession, version, submission_id;
""".trimIndent()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ const val SEQUENCE_ENTRIES_TABLE_NAME = "sequence_entries"

object SequenceEntriesTable : Table(SEQUENCE_ENTRIES_TABLE_NAME) {
val originalDataColumn = jacksonSerializableJsonb<OriginalData<CompressedSequence>>("original_data").nullable()
val unprocessedDataColumn = jacksonSerializableJsonb<OriginalData<CompressedSequence>>(
"unprocessed_data",
).nullable()

val accessionColumn = varchar("accession", 255)
val versionColumn = long("version")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ import org.loculus.backend.service.jacksonSerializableJsonb
const val SEQUENCE_ENTRIES_VIEW_NAME = "sequence_entries_view"

object SequenceEntriesView : Table(SEQUENCE_ENTRIES_VIEW_NAME) {
val originalDataColumn = jacksonSerializableJsonb<OriginalData<CompressedSequence>>("original_data").nullable()
val unprocessedDataColumn = jacksonSerializableJsonb<OriginalData<CompressedSequence>>(
"unprocessed_data",
).nullable()
val processedDataColumn =
jacksonSerializableJsonb<ProcessedData<CompressedSequence>>("processed_data").nullable()
val jointDataColumn =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
-- Add unprocessed_data column to sequence_entries.
-- In future original_data will be immutable (the raw submission),
-- while unprocessed_data is the data that gets sent to preprocessing
-- and made available via endpoints. For existing rows, seed it from original_data.

ALTER TABLE sequence_entries ADD COLUMN unprocessed_data jsonb;

UPDATE sequence_entries SET unprocessed_data = original_data;

-- Recreate the view to expose unprocessed_data.
DROP VIEW IF EXISTS sequence_entries_view CASCADE;

CREATE VIEW sequence_entries_view AS
SELECT
se.accession,
se.version,
se.organism,
se.submission_id,
se.submitter,
se.approver,
se.group_id,
se.submitted_at,
se.released_at,
se.is_revocation,
se.unprocessed_data,
se.version_comment,
sepd.started_processing_at,
sepd.finished_processing_at,
sepd.processed_data,
-- Build joint_metadata inline using subquery join
CASE
WHEN aem.external_metadata IS NULL THEN sepd.processed_data
ELSE sepd.processed_data ||
jsonb_build_object('metadata', (sepd.processed_data -> 'metadata') || aem.external_metadata)
END AS joint_metadata,
CASE
WHEN se.is_revocation THEN cpp.version
ELSE sepd.pipeline_version
END AS pipeline_version,
sepd.errors,
sepd.warnings,
CASE
WHEN se.released_at IS NOT NULL THEN 'APPROVED_FOR_RELEASE'
WHEN se.is_revocation THEN 'PROCESSED'
WHEN sepd.processing_status = 'IN_PROCESSING' THEN 'IN_PROCESSING'
WHEN sepd.processing_status = 'PROCESSED' THEN 'PROCESSED'
ELSE 'RECEIVED'
END AS status,
CASE
WHEN sepd.processing_status = 'IN_PROCESSING' THEN NULL
WHEN sepd.errors IS NOT NULL AND jsonb_array_length(sepd.errors) > 0 THEN 'HAS_ERRORS'
WHEN sepd.warnings IS NOT NULL AND jsonb_array_length(sepd.warnings) > 0 THEN 'HAS_WARNINGS'
ELSE 'NO_ISSUES'
END AS processing_result
FROM sequence_entries se
LEFT JOIN current_processing_pipeline cpp
ON se.organism = cpp.organism
LEFT JOIN sequence_entries_preprocessed_data sepd
ON se.accession = sepd.accession
AND se.version = sepd.version
AND sepd.pipeline_version = cpp.version
LEFT JOIN (
SELECT
em.accession,
em.version,
jsonb_merge_agg(em.external_metadata) AS external_metadata
FROM external_metadata em
GROUP BY em.accession, em.version
) aem
ON aem.accession = se.accession
AND aem.version = se.version;
Loading
Loading