Skip to content

Commit

Permalink
Rename disableMigration to disableRecovery
Browse files Browse the repository at this point in the history
  • Loading branch information
istreeter authored and colmsnowplow committed Jan 17, 2024
1 parent b0c6f99 commit 7b8b36b
Show file tree
Hide file tree
Showing 13 changed files with 32 additions and 28 deletions.
4 changes: 2 additions & 2 deletions config/loader/aws/redshift.config.reference.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -269,11 +269,11 @@

# Optional. Enable features that are still in beta, or which are here to enable smoother upgrades
"featureFlags": {
# List of SchemaKey with partial SchemaVer to disable migration for, redshift only
# List of SchemaKey with partial SchemaVer to disable recoveries for, redshift only
# Redshift Loader will disable all migration and recovery table creation for tables
# that belong to the provided schema keys
# e.g. [ "iglu:com.example/myschema1/jsonschema/1-*-*", "iglu:com.example/myschema2/jsonschema/1-*-*"]
# Optional, empty list by default
"disableMigration": []
"disableRecovery": []
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ object Databricks {
discovery: DataDiscovery,
eventTableColumns: EventTableColumns,
i: Unit,
disableMigration: List[SchemaCriterion]
disableRecovery: List[SchemaCriterion]
): LoadStatements = {
val toCopy = columnsToCopyFromDiscoveredData(discovery)
val toSkip = ColumnsToSkip(getEntityColumnsPresentInDbOnly(eventTableColumns, toCopy))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ object DatabricksSpec {
Config.Retries(Config.Strategy.Constant, None, 1.minute, None),
Config.Retries(Config.Strategy.Constant, None, 1.minute, None),
Config.Retries(Config.Strategy.Constant, None, 1.minute, None),
Config.FeatureFlags(addLoadTstampColumn = true, disableMigration = Nil),
Config.FeatureFlags(addLoadTstampColumn = true, disableRecovery = Nil),
exampleTelemetry
)

Expand Down
2 changes: 1 addition & 1 deletion modules/loader/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
},
"featureFlags": {
"addLoadTstampColumn": true,
"disableMigration": []
"disableRecovery": []
}
"telemetry": {
"disable": false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ object Loader {
start <- Clock[F].realTimeInstant
_ <- discovery.origin.timestamps.min.map(t => Monitoring[F].periodicMetrics.setEarliestKnownUnloadedData(t)).sequence.void
result <-
Load.load[F, C, I](setStageC, incrementAttemptsC, discovery, initQueryResult, target, config.featureFlags.disableMigration)
Load.load[F, C, I](setStageC, incrementAttemptsC, discovery, initQueryResult, target, config.featureFlags.disableRecovery)
attempts <- control.getAndResetAttempts
_ <- result match {
case loadSuccess: Load.LoadSuccess =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ object Config {
backoff: FiniteDuration,
cumulativeBound: Option[FiniteDuration]
)
final case class FeatureFlags(addLoadTstampColumn: Boolean, disableMigration: List[SchemaCriterion])
final case class FeatureFlags(addLoadTstampColumn: Boolean, disableRecovery: List[SchemaCriterion])

sealed trait Strategy
object Strategy {
Expand Down Expand Up @@ -268,7 +268,7 @@ object Config {
implicit val configDecoder: Decoder[Config[StorageTarget]] =
deriveDecoder[Config[StorageTarget]].ensure(validateConfig)

implicit val disableMigrationConfigDecoder: Decoder[SchemaCriterion] =
implicit val schemaCriterionConfigDecoder: Decoder[SchemaCriterion] =
Decoder[String].emap(s => SchemaCriterion.parse(s).toRight(s"[$s] is not a valid schema criterion"))

implicit val featureFlagsConfigDecoder: Decoder[FeatureFlags] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,12 +158,12 @@ object Migration {
def build[F[_]: Transaction[*[_], C]: MonadThrow: Iglu, C[_]: MonadThrow: Logging: DAO, I](
discovery: DataDiscovery,
target: Target[I],
disableMigration: List[SchemaCriterion]
disableRecovery: List[SchemaCriterion]
): F[Migration[C]] = {
val descriptions: LoaderAction[F, List[Description]] =
discovery.shreddedTypes.filterNot(_.isAtomic).traverse {
case s: ShreddedType.Tabular =>
if (!disableMigration.contains(s.info.toCriterion))
if (!disableRecovery.contains(s.info.toCriterion))
EitherT.rightT[F, LoaderError](Description.Table(discovery.shredModels(s.info.getSchemaKey).mergeRedshiftSchemasResult))
else EitherT.rightT[F, LoaderError](Description.NoMigration)
case ShreddedType.Widerow(info) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ trait Target[I] {
discovery: DataDiscovery,
eventTableColumns: EventTableColumns,
initQueryResult: I,
disableMigration: List[SchemaCriterion]
disableRecovery: List[SchemaCriterion]
): LoadStatements

/** Get DDL of a manifest table */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ object Load {
* results of the queries sent to warehouse when application is initialized
* @param target
* storage target object
* @param disableRecovery
* list of schema versions for which we must not load into a recovery table
* @return
* either alert payload in case of duplicate event or ingestion timestamp in case of success
*/
Expand All @@ -89,14 +91,14 @@ object Load {
discovery: DataDiscovery.WithOrigin,
initQueryResult: I,
target: Target[I],
disableMigration: List[SchemaCriterion]
disableRecovery: List[SchemaCriterion]
): F[LoadResult] =
for {
_ <- TargetCheck.prepareTarget[F, C]
migrations <- Migration.build[F, C, I](discovery.discovery, target, disableMigration)
migrations <- Migration.build[F, C, I](discovery.discovery, target, disableRecovery)
_ <- getPreTransactions(setStage, migrations.preTransaction, incrementAttempt).traverse_(Transaction[F, C].run(_))
result <- Transaction[F, C].transact {
getTransaction[C, I](setStage, discovery, initQueryResult, target, disableMigration)(migrations.inTransaction)
getTransaction[C, I](setStage, discovery, initQueryResult, target, disableRecovery)(migrations.inTransaction)
.onError { case _: Throwable => incrementAttempt }
}
} yield result
Expand All @@ -114,8 +116,8 @@ object Load {
* results of the queries sent to warehouse when application is initialized
* @param target
* storage target object
* @param inTransactionMigrations
* sequence of migration actions such as ALTER TABLE that have to run before the batch is loaded
* @param disableRecovery
* list of schema versions for which we must not load into a recovery table
* @return
* either alert payload in case of an existing folder or ingestion timestamp of the current
* folder
Expand All @@ -125,7 +127,7 @@ object Load {
discovery: DataDiscovery.WithOrigin,
initQueryResult: I,
target: Target[I],
disableMigration: List[SchemaCriterion]
disableRecovery: List[SchemaCriterion]
)(
inTransactionMigrations: F[Unit]
): F[LoadResult] =
Expand All @@ -144,7 +146,7 @@ object Load {
Logging[F].info(s"Loading transaction for ${discovery.origin.base} has started") *>
setStage(Stage.MigrationIn) *>
inTransactionMigrations *>
run[F, I](setLoading, discovery.discovery, initQueryResult, target, disableMigration).flatMap {
run[F, I](setLoading, discovery.discovery, initQueryResult, target, disableRecovery).flatMap {
loadedRecoveryTableNames =>
for {
_ <- setStage(Stage.Committing)
Expand Down Expand Up @@ -192,6 +194,8 @@ object Load {
* results of the queries sent to warehouse when application is initialized
* @param target
* storage target object
* @param disableRecovery
* list of schema versions for which we must not load into a recovery table
* @return
* block of VACUUM and ANALYZE statements to execute them out of a main transaction
*/
Expand All @@ -200,13 +204,13 @@ object Load {
discovery: DataDiscovery,
initQueryResult: I,
target: Target[I],
disableMigration: List[SchemaCriterion]
disableRecovery: List[SchemaCriterion]
): F[List[String]] =
for {
_ <- Logging[F].info(s"Loading ${discovery.base}")
existingEventTableColumns <- if (target.requiresEventsColumns) Control.getColumns[F](EventsTable.MainName) else Nil.pure[F]
loadedRecoveryTableNames <-
target.getLoadStatements(discovery, existingEventTableColumns, initQueryResult, disableMigration).toList.traverseFilter {
target.getLoadStatements(discovery, existingEventTableColumns, initQueryResult, disableRecovery).toList.traverseFilter {
genStatement =>
for {
loadAuthMethod <- LoadAuthService[F].forLoadingEvents
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ object ConfigSpec {
1.hour
)
val exampleInitRetries: Config.Retries = Config.Retries(Config.Strategy.Exponential, Some(3), 30.seconds, Some(1.hour))
val exampleFeatureFlags: Config.FeatureFlags = Config.FeatureFlags(addLoadTstampColumn = true, disableMigration = Nil)
val exampleFeatureFlags: Config.FeatureFlags = Config.FeatureFlags(addLoadTstampColumn = true, disableRecovery = Nil)
val exampleCloud: Config.Cloud = Config.Cloud.AWS(exampleRegion, exampleMessageQueue)
val exampleTelemetry =
Telemetry.Config(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ object PureDAO {
discovery: DataDiscovery,
eventTableColumns: EventTableColumns,
i: Unit,
disableMigration: List[SchemaCriterion]
disableRecovery: List[SchemaCriterion]
): LoadStatements =
NonEmptyList(
loadAuthMethod =>
Expand All @@ -104,9 +104,9 @@ object PureDAO {
case _: ShredModel.GoodModel => false
case _: ShredModel.RecoveryModel => true
}
val isMigrationDisabled = disableMigration.contains(shredded.info.toCriterion)
val isRecoveryDisabled = disableRecovery.contains(shredded.info.toCriterion)
val tableName =
if (isMigrationDisabled) discoveredShredModels.mergeRedshiftSchemasResult.goodModel.tableName
if (isRecoveryDisabled) discoveredShredModels.mergeRedshiftSchemasResult.goodModel.tableName
else discoveredShredModels.shredModel.tableName

loadAuthMethod =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ object Redshift {
discovery: DataDiscovery,
eventTableColumns: EventTableColumns,
i: Unit,
disableMigration: List[SchemaCriterion]
disableRecovery: List[SchemaCriterion]
): LoadStatements = {
val shreddedStatements = discovery.shreddedTypes
.filterNot(_.isAtomic)
Expand All @@ -99,9 +99,9 @@ object Redshift {
case _: ShredModel.RecoveryModel => true
}

val isMigrationDisabled = disableMigration.contains(shreddedType.info.toCriterion)
val isRecoveryDisabled = disableRecovery.contains(shreddedType.info.toCriterion)
val tableName =
if (isMigrationDisabled) discoveredShredModels.mergeRedshiftSchemasResult.goodModel.tableName
if (isRecoveryDisabled) discoveredShredModels.mergeRedshiftSchemasResult.goodModel.tableName
else discoveredShredModels.shredModel.tableName

loadAuthMethod =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ object Snowflake {
discovery: DataDiscovery,
eventTableColumns: EventTableColumns,
initQueryResult: InitQueryResult,
disableMigration: List[SchemaCriterion]
disableRecovery: List[SchemaCriterion]
): LoadStatements = {
val columnsToCopy = columnsToCopyFromDiscoveredData(discovery)

Expand Down

0 comments on commit 7b8b36b

Please sign in to comment.