Skip to content

Commit

Permalink
media data sync (#2102)
Browse files Browse the repository at this point in the history
* basic sync operation backfill

* media data sync

* sync entry helpers

* fix sync generator

* nicer

* re-add key_id
  • Loading branch information
Brendonovich committed Feb 21, 2024
1 parent 393a907 commit c533d12
Show file tree
Hide file tree
Showing 10 changed files with 293 additions and 258 deletions.
150 changes: 96 additions & 54 deletions core/crates/sync/src/backfill.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use sd_prisma::{
prisma::{
file_path, label, label_on_object, location, object, tag, tag_on_object, PrismaClient,
file_path, label, label_on_object, location, media_data, object, tag, tag_on_object,
PrismaClient,
},
prisma_sync,
};
use sd_sync::OperationFactory;
use sd_sync::{option_sync_entry, OperationFactory};
use sd_utils::chain_optional_iter;
use serde_json::json;

Expand All @@ -23,28 +24,26 @@ pub async fn backfill_operations(db: &PrismaClient, sync: &crate::Manager, insta
locations
.into_iter()
.flat_map(|l| {
use location::*;

sync.shared_create(
prisma_sync::location::SyncId { pub_id: l.pub_id },
chain_optional_iter(
[],
[
l.name.map(|v| (location::name::NAME, json!(v))),
l.path.map(|v| (location::path::NAME, json!(v))),
l.total_capacity
.map(|v| (location::total_capacity::NAME, json!(v))),
l.available_capacity
.map(|v| (location::available_capacity::NAME, json!(v))),
l.size_in_bytes
.map(|v| (location::size_in_bytes::NAME, json!(v))),
l.is_archived
.map(|v| (location::is_archived::NAME, json!(v))),
l.generate_preview_media
.map(|v| (location::generate_preview_media::NAME, json!(v))),
l.sync_preview_media
.map(|v| (location::sync_preview_media::NAME, json!(v))),
l.hidden.map(|v| (location::hidden::NAME, json!(v))),
l.date_created
.map(|v| (location::date_created::NAME, json!(v))),
option_sync_entry!(l.name, name),
option_sync_entry!(l.path, path),
option_sync_entry!(l.total_capacity, total_capacity),
option_sync_entry!(l.available_capacity, available_capacity),
option_sync_entry!(l.size_in_bytes, size_in_bytes),
option_sync_entry!(l.is_archived, is_archived),
option_sync_entry!(
l.generate_preview_media,
generate_preview_media
),
option_sync_entry!(l.sync_preview_media, sync_preview_media),
option_sync_entry!(l.hidden, hidden),
option_sync_entry!(l.date_created, date_created),
],
),
)
Expand All @@ -62,20 +61,65 @@ pub async fn backfill_operations(db: &PrismaClient, sync: &crate::Manager, insta
objects
.into_iter()
.flat_map(|o| {
use object::*;

sync.shared_create(
prisma_sync::object::SyncId { pub_id: o.pub_id },
chain_optional_iter(
[],
[
o.kind.map(|v| (object::kind::NAME, json!(v))),
o.hidden.map(|v| (object::hidden::NAME, json!(v))),
o.favorite.map(|v| (object::favorite::NAME, json!(v))),
o.important.map(|v| (object::important::NAME, json!(v))),
o.note.map(|v| (object::note::NAME, json!(v))),
o.date_created
.map(|v| (object::date_created::NAME, json!(v))),
o.date_accessed
.map(|v| (object::date_accessed::NAME, json!(v))),
option_sync_entry!(o.kind, kind),
option_sync_entry!(o.hidden, hidden),
option_sync_entry!(o.favorite, favorite),
option_sync_entry!(o.important, important),
option_sync_entry!(o.note, note),
option_sync_entry!(o.date_created, date_created),
option_sync_entry!(o.date_accessed, date_accessed),
],
),
)
})
.map(|o| crdt_op_unchecked_db(&o, instance_id))
.collect(),
)
.exec()
.await
.unwrap();

let media_datas = db
.media_data()
.find_many(vec![])
.include(media_data::include!({
object: select { pub_id }
}))
.exec()
.await
.unwrap();
db.crdt_operation()
.create_many(
media_datas
.into_iter()
.flat_map(|md| {
use media_data::*;

sync.shared_create(
prisma_sync::media_data::SyncId {
object: prisma_sync::object::SyncId {
pub_id: md.object.pub_id,
},
},
chain_optional_iter(
[],
[
option_sync_entry!(md.resolution, resolution),
option_sync_entry!(md.media_date, media_date),
option_sync_entry!(md.media_location, media_location),
option_sync_entry!(md.camera_data, camera_data),
option_sync_entry!(md.artist, artist),
option_sync_entry!(md.description, description),
option_sync_entry!(md.copyright, copyright),
option_sync_entry!(md.exif_version, exif_version),
option_sync_entry!(md.epoch_time, epoch_time),
],
),
)
Expand Down Expand Up @@ -103,35 +147,31 @@ pub async fn backfill_operations(db: &PrismaClient, sync: &crate::Manager, insta
file_paths
.into_iter()
.flat_map(|fp| {
use file_path::*;

sync.shared_create(
prisma_sync::file_path::SyncId { pub_id: fp.pub_id },
chain_optional_iter(
[],
[
fp.is_dir.map(|v| (file_path::is_dir::NAME, json!(v))),
fp.cas_id.map(|v| (file_path::cas_id::NAME, json!(v))),
fp.integrity_checksum
.map(|v| (file_path::integrity_checksum::NAME, json!(v))),
fp.location.map(|l| {
(
file_path::location::NAME,
json!(prisma_sync::location::SyncId { pub_id: l.pub_id }),
)
}),
fp.materialized_path
.map(|v| (file_path::materialized_path::NAME, json!(v))),
fp.name.map(|v| (file_path::name::NAME, json!(v))),
fp.extension.map(|v| (file_path::extension::NAME, json!(v))),
fp.hidden.map(|v| (file_path::hidden::NAME, json!(v))),
fp.size_in_bytes_bytes
.map(|v| (file_path::size_in_bytes_bytes::NAME, json!(v))),
fp.inode.map(|v| (file_path::inode::NAME, json!(v))),
fp.date_created
.map(|v| (file_path::date_created::NAME, json!(v))),
fp.date_modified
.map(|v| (file_path::date_modified::NAME, json!(v))),
fp.date_indexed
.map(|v| (file_path::date_indexed::NAME, json!(v))),
option_sync_entry!(fp.is_dir, is_dir),
option_sync_entry!(fp.cas_id, cas_id),
option_sync_entry!(fp.integrity_checksum, integrity_checksum),
option_sync_entry!(
fp.location.map(|l| prisma_sync::location::SyncId {
pub_id: l.pub_id
}),
location
),
option_sync_entry!(fp.materialized_path, materialized_path),
option_sync_entry!(fp.name, name),
option_sync_entry!(fp.extension, extension),
option_sync_entry!(fp.hidden, hidden),
option_sync_entry!(fp.size_in_bytes_bytes, size_in_bytes_bytes),
option_sync_entry!(fp.inode, inode),
option_sync_entry!(fp.date_created, date_created),
option_sync_entry!(fp.date_modified, date_modified),
option_sync_entry!(fp.date_indexed, date_indexed),
],
),
)
Expand Down Expand Up @@ -194,8 +234,10 @@ pub async fn backfill_operations(db: &PrismaClient, sync: &crate::Manager, insta
},
chain_optional_iter(
[],
[t_o.date_created
.map(|v| (tag_on_object::date_created::NAME, json!(v)))],
[option_sync_entry!(
t_o.date_created,
tag_on_object::date_created
)],
),
)
})
Expand Down
13 changes: 2 additions & 11 deletions core/crates/sync/tests/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,18 +169,9 @@ async fn bruh() -> Result<(), Box<dyn std::error::Error>> {

use prisma::location;

macro_rules! item {
($name:ident, $value:expr) => {
(
(location::$name::NAME, json!($value)),
location::$name::set(Some($value.to_string())),
)
};
}

let (sync_ops, db_ops): (Vec<_>, Vec<_>) = [
item!(name, "Location 0"),
item!(path, "/User/Brendan/Documents"),
sync_db_entry!(location::name, "Location 0"),
sync_db_entry!(location::path, "/User/Brendan/Documents"),
]
.into_iter()
.unzip();
Expand Down
1 change: 1 addition & 0 deletions core/prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ model Object {
// @@map("key")
// }

/// @shared(id: object)
model MediaData {
id Int @id @default(autoincrement())
Expand Down
86 changes: 16 additions & 70 deletions core/src/api/search/saved.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
use crate::{api::utils::library, invalidate_query, library::Library};

use sd_prisma::{prisma::saved_search, prisma_sync};
use sd_sync::OperationFactory;
use sd_sync::{option_sync_db_entry, sync_db_entry, OperationFactory};
use sd_utils::chain_optional_iter;

use chrono::{DateTime, FixedOffset, Utc};
use rspc::alpha::AlphaRouter;
use serde::{de::IgnoredAny, Deserialize, Serialize};
use serde_json::json;
use specta::Type;
use tracing::error;
use uuid::Uuid;
Expand Down Expand Up @@ -39,18 +38,12 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {

let (sync_params, db_params): (Vec<_>, Vec<_>) = chain_optional_iter(
[
(
(saved_search::date_created::NAME, json!(date_created)),
saved_search::date_created::set(Some(date_created)),
),
(
(saved_search::name::NAME, json!(&args.name)),
saved_search::name::set(Some(args.name)),
),
sync_db_entry!(date_created, saved_search::date_created),
sync_db_entry!(args.name, saved_search::name),
],
[
args.filters
.and_then(|s| {
option_sync_db_entry!(
args.filters.and_then(|s| {
// https://github.com/serde-rs/json/issues/579
// https://docs.rs/serde/latest/serde/de/struct.IgnoredAny.html

Expand All @@ -60,31 +53,12 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
} else {
Some(s)
}
})
.map(|v| {
(
(saved_search::filters::NAME, json!(&v)),
saved_search::filters::set(Some(v)),
)
}),
args.search.map(|v| {
(
(saved_search::search::NAME, json!(&v)),
saved_search::search::set(Some(v)),
)
}),
args.description.map(|v| {
(
(saved_search::description::NAME, json!(&v)),
saved_search::description::set(Some(v)),
)
}),
args.icon.map(|v| {
(
(saved_search::icon::NAME, json!(&v)),
saved_search::icon::set(Some(v)),
)
}),
saved_search::filters
),
option_sync_db_entry!(args.search, saved_search::search),
option_sync_db_entry!(args.description, saved_search::description),
option_sync_db_entry!(args.icon, saved_search::icon),
],
)
.into_iter()
Expand Down Expand Up @@ -157,41 +131,13 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
})?;

let (sync_params, db_params): (Vec<_>, Vec<_>) = chain_optional_iter(
[(
(saved_search::date_modified::NAME, json!(updated_at)),
saved_search::date_modified::set(Some(updated_at)),
)],
[sync_db_entry!(updated_at, saved_search::date_modified)],
[
args.name.map(|v| {
(
(saved_search::name::NAME, json!(&v)),
saved_search::name::set(v),
)
}),
args.description.map(|v| {
(
(saved_search::name::NAME, json!(&v)),
saved_search::name::set(v),
)
}),
args.icon.map(|v| {
(
(saved_search::icon::NAME, json!(&v)),
saved_search::icon::set(v),
)
}),
args.search.map(|v| {
(
(saved_search::search::NAME, json!(&v)),
saved_search::search::set(v),
)
}),
args.filters.map(|v| {
(
(saved_search::filters::NAME, json!(&v)),
saved_search::filters::set(v),
)
}),
option_sync_db_entry!(args.name.flatten(), saved_search::name),
option_sync_db_entry!(args.description.flatten(), saved_search::name),
option_sync_db_entry!(args.icon.flatten(), saved_search::icon),
option_sync_db_entry!(args.search.flatten(), saved_search::search),
option_sync_db_entry!(args.filters.flatten(), saved_search::filters),
],
)
.into_iter()
Expand Down
Loading

0 comments on commit c533d12

Please sign in to comment.