Skip to content

Commit

Permalink
Revert "Optimized the partitioning strategy implementation details to…
Browse files Browse the repository at this point in the history
… avoid unnecessarily high RU usage (Azure#39438)"

This reverts commit ed43699.
  • Loading branch information
annie-mac committed Apr 10, 2024
1 parent 73e05c8 commit f958a77
Show file tree
Hide file tree
Showing 14 changed files with 31 additions and 30 deletions.
1 change: 0 additions & 1 deletion sdk/cosmos/azure-cosmos-spark_3-1_2-12/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
#### Bugs Fixed

#### Other Changes
* Optimized the partitioning strategy implementation details to avoid unnecessarily high RU usage. - See [PR 39438](https://github.com/Azure/azure-sdk-for-java/pull/39438)

### 4.28.4 (2024-03-18)

Expand Down
1 change: 0 additions & 1 deletion sdk/cosmos/azure-cosmos-spark_3-2_2-12/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
#### Bugs Fixed

#### Other Changes
* Optimized the partitioning strategy implementation details to avoid unnecessarily high RU usage. - See [PR 39438](https://github.com/Azure/azure-sdk-for-java/pull/39438)

### 4.28.4 (2024-03-18)

Expand Down
1 change: 0 additions & 1 deletion sdk/cosmos/azure-cosmos-spark_3-3_2-12/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
#### Bugs Fixed

#### Other Changes
* Optimized the partitioning strategy implementation details to avoid unnecessarily high RU usage. - See [PR 39438](https://github.com/Azure/azure-sdk-for-java/pull/39438)

### 4.28.4 (2024-03-18)

Expand Down
1 change: 0 additions & 1 deletion sdk/cosmos/azure-cosmos-spark_3-4_2-12/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
#### Bugs Fixed

#### Other Changes
* Optimized the partitioning strategy implementation details to avoid unnecessarily high RU usage. - See [PR 39438](https://github.com/Azure/azure-sdk-for-java/pull/39438)

### 4.28.4 (2024-03-18)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@
" { \"spark.cosmos.accountKey\", cosmosMasterKey },\r\n",
" { \"spark.cosmos.database\", \"SampleDatabase\" },\r\n",
" { \"spark.cosmos.container\", \"GreenTaxiRecords\" },\r\n",
" { \"spark.cosmos.read.partitioning.strategy\", \"Restrictive\" }, \r\n",
" { \"spark.cosmos.read.partitioning.strategy\", \"Default\" }, \r\n",
" { \"spark.cosmos.read.inferSchema.enabled\", \"false\" },\r\n",
" { \"spark.cosmos.changeFeed.startFrom\", \"Beginning\" },\r\n",
" { \"spark.cosmos.changeFeed.mode\", \"Incremental\" }\r\n",
Expand Down Expand Up @@ -437,7 +437,7 @@
" { \"spark.cosmos.accountKey\", cosmosMasterKey },\r\n",
" { \"spark.cosmos.database\", \"SampleDatabase\" },\r\n",
" { \"spark.cosmos.container\", \"GreenTaxiRecords\" },\r\n",
" { \"spark.cosmos.read.partitioning.strategy\", \"Restrictive\" }, \r\n",
" { \"spark.cosmos.read.partitioning.strategy\", \"Default\" }, \r\n",
" { \"spark.cosmos.read.inferSchema.enabled\", \"false\" }\r\n",
"};\r\n",
"\r\n",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def createCosmosView(cosmosDatabaseName: String, cosmosContainerName: String, co
spark.cosmos.database = '$cosmosDatabaseName',
spark.cosmos.container = '$cosmosContainerName',
spark.cosmos.read.inferSchema.enabled = 'False',
spark.cosmos.read.partitioning.strategy = 'Restrictive'
spark.cosmos.read.partitioning.strategy = 'Default'
);
"""
println("Executing create View...")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ spark.sql(createTargetResources)
"spark.cosmos.accountKey" -> cosmosSourceMasterKey,
"spark.cosmos.database" -> cosmosSourceDatabaseName,
"spark.cosmos.container" -> cosmosSourceContainerName,
"spark.cosmos.read.partitioning.strategy" -> "Restrictive",
"spark.cosmos.read.partitioning.strategy" -> "Default",
"spark.cosmos.read.inferSchema.enabled" -> "false",
"spark.cosmos.changeFeed.startFrom" -> "Beginning",
"spark.cosmos.changeFeed.mode" -> "Incremental",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ OPTIONS (
spark.cosmos.database = '${cosmosSourceDatabaseName}', -- source database
spark.cosmos.container = '${cosmosSourceContainerName}', -- source container
spark.cosmos.read.inferSchema.enabled = 'False',
spark.cosmos.read.partitioning.strategy = 'Restrictive');
spark.cosmos.read.partitioning.strategy = 'Default');
"""

var selectView = s"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@
{
"cell_type": "code",
"source": [
"print(\"Starting validation via change feed: \", datetime.datetime.utcnow().strftime(\"%Y-%m-%d %H:%M:%S.%f\"))\nchangeFeedCfg = {\n \"spark.cosmos.accountEndpoint\": cosmosEndpoint,\n \"spark.cosmos.accountKey\": cosmosMasterKey,\n \"spark.cosmos.database\": \"SampleDatabase\",\n \"spark.cosmos.container\": \"GreenTaxiRecords\",\n \"spark.cosmos.read.partitioning.strategy\": \"Restrictive\",\n \"spark.cosmos.read.inferSchema.enabled\" : \"false\",\n \"spark.cosmos.changeFeed.startFrom\" : \"Beginning\",\n \"spark.cosmos.changeFeed.mode\" : \"Incremental\"\n}\nchangeFeed_df = spark.read.format(\"cosmos.oltp.changeFeed\").options(**changeFeedCfg).load()\ncount_changeFeed = changeFeed_df.count()\nprint(\"Number of records retrieved via change feed: \", count_changeFeed) \nprint(\"Finished validation via change feed: \", datetime.datetime.utcnow().strftime(\"%Y-%m-%d %H:%M:%S.%f\"))\n\nassert count_source == count_changeFeed"
"print(\"Starting validation via change feed: \", datetime.datetime.utcnow().strftime(\"%Y-%m-%d %H:%M:%S.%f\"))\nchangeFeedCfg = {\n \"spark.cosmos.accountEndpoint\": cosmosEndpoint,\n \"spark.cosmos.accountKey\": cosmosMasterKey,\n \"spark.cosmos.database\": \"SampleDatabase\",\n \"spark.cosmos.container\": \"GreenTaxiRecords\",\n \"spark.cosmos.read.partitioning.strategy\": \"Default\",\n \"spark.cosmos.read.inferSchema.enabled\" : \"false\",\n \"spark.cosmos.changeFeed.startFrom\" : \"Beginning\",\n \"spark.cosmos.changeFeed.mode\" : \"Incremental\"\n}\nchangeFeed_df = spark.read.format(\"cosmos.oltp.changeFeed\").options(**changeFeedCfg).load()\ncount_changeFeed = changeFeed_df.count()\nprint(\"Number of records retrieved via change feed: \", count_changeFeed) \nprint(\"Finished validation via change feed: \", datetime.datetime.utcnow().strftime(\"%Y-%m-%d %H:%M:%S.%f\"))\n\nassert count_source == count_changeFeed"
],
"metadata": {
"application/vnd.databricks.v1+cell": {
Expand Down Expand Up @@ -301,7 +301,7 @@
{
"cell_type": "code",
"source": [
"import math\n\nprint(\"Starting to identify to be deleted documents: \", datetime.datetime.utcnow().strftime(\"%Y-%m-%d %H:%M:%S.%f\"))\nreadCfg = {\n \"spark.cosmos.accountEndpoint\": cosmosEndpoint,\n \"spark.cosmos.accountKey\": cosmosMasterKey,\n \"spark.cosmos.database\": \"SampleDatabase\",\n \"spark.cosmos.container\": \"GreenTaxiRecords\",\n \"spark.cosmos.read.partitioning.strategy\": \"Restrictive\",\n \"spark.cosmos.read.inferSchema.enabled\" : \"false\",\n}\n\ntoBeDeleted_df = spark.read.format(\"cosmos.oltp\").options(**readCfg).load().limit(100_000)\nprint(\"Number of records to be deleted: \", toBeDeleted_df.count()) \n\nprint(\"Starting to bulk delete documents: \", datetime.datetime.utcnow().strftime(\"%Y-%m-%d %H:%M:%S.%f\"))\ndeleteCfg = writeCfg.copy()\ndeleteCfg[\"spark.cosmos.write.strategy\"] = \"ItemDelete\"\ntoBeDeleted_df \\\n .write \\\n .format(\"cosmos.oltp\") \\\n .mode(\"Append\") \\\n .options(**deleteCfg) \\\n .save()\nprint(\"Finished deleting documents: \", datetime.datetime.utcnow().strftime(\"%Y-%m-%d %H:%M:%S.%f\"))\n\nprint(\"Starting count validation via query: \", datetime.datetime.utcnow().strftime(\"%Y-%m-%d %H:%M:%S.%f\"))\ncount_query_schema=StructType(fields=[StructField(\"Count\", LongType(), True)])\nreadCfg[\"spark.cosmos.read.customQuery\"] = \"SELECT COUNT(0) AS Count FROM c\"\nquery_df = spark.read.format(\"cosmos.oltp\").schema(count_query_schema).options(**readCfg).load()\ncount_query = query_df.select(F.sum(\"Count\").alias(\"TotalCount\")).first()[\"TotalCount\"]\nprint(\"Number of records retrieved via query: \", count_query) \nprint(\"Finished count validation via query: \", datetime.datetime.utcnow().strftime(\"%Y-%m-%d %H:%M:%S.%f\"))\n\nassert max(0, count_source - 100_000) == count_query"
"import math\n\nprint(\"Starting to identify to be deleted documents: \", datetime.datetime.utcnow().strftime(\"%Y-%m-%d %H:%M:%S.%f\"))\nreadCfg = {\n \"spark.cosmos.accountEndpoint\": cosmosEndpoint,\n \"spark.cosmos.accountKey\": cosmosMasterKey,\n \"spark.cosmos.database\": \"SampleDatabase\",\n \"spark.cosmos.container\": \"GreenTaxiRecords\",\n \"spark.cosmos.read.partitioning.strategy\": \"Default\",\n \"spark.cosmos.read.inferSchema.enabled\" : \"false\",\n}\n\ntoBeDeleted_df = spark.read.format(\"cosmos.oltp\").options(**readCfg).load().limit(100_000)\nprint(\"Number of records to be deleted: \", toBeDeleted_df.count()) \n\nprint(\"Starting to bulk delete documents: \", datetime.datetime.utcnow().strftime(\"%Y-%m-%d %H:%M:%S.%f\"))\ndeleteCfg = writeCfg.copy()\ndeleteCfg[\"spark.cosmos.write.strategy\"] = \"ItemDelete\"\ntoBeDeleted_df \\\n .write \\\n .format(\"cosmos.oltp\") \\\n .mode(\"Append\") \\\n .options(**deleteCfg) \\\n .save()\nprint(\"Finished deleting documents: \", datetime.datetime.utcnow().strftime(\"%Y-%m-%d %H:%M:%S.%f\"))\n\nprint(\"Starting count validation via query: \", datetime.datetime.utcnow().strftime(\"%Y-%m-%d %H:%M:%S.%f\"))\ncount_query_schema=StructType(fields=[StructField(\"Count\", LongType(), True)])\nreadCfg[\"spark.cosmos.read.customQuery\"] = \"SELECT COUNT(0) AS Count FROM c\"\nquery_df = spark.read.format(\"cosmos.oltp\").schema(count_query_schema).options(**readCfg).load()\ncount_query = query_df.select(F.sum(\"Count\").alias(\"TotalCount\")).first()[\"TotalCount\"]\nprint(\"Number of records retrieved via query: \", count_query) \nprint(\"Finished count validation via query: \", datetime.datetime.utcnow().strftime(\"%Y-%m-%d %H:%M:%S.%f\"))\n\nassert max(0, count_source - 100_000) == count_query"
],
"metadata": {
"application/vnd.databricks.v1+cell": {
Expand Down Expand Up @@ -421,7 +421,7 @@
{
"cell_type": "code",
"source": [
"%sql\nCREATE TABLE cosmosCatalog.SampleDatabase.GreenTaxiRecordsView \n (id STRING, _ts TIMESTAMP, vendorID INT, totalAmount DOUBLE)\nUSING cosmos.oltp\nTBLPROPERTIES(isCosmosView = 'True')\nOPTIONS (\n spark.cosmos.database = 'SampleDatabase',\n spark.cosmos.container = 'GreenTaxiRecords',\n spark.cosmos.read.inferSchema.enabled = 'False',\n spark.cosmos.read.inferSchema.includeSystemProperties = 'True',\n spark.cosmos.read.partitioning.strategy = 'Restrictive');\n\nSELECT * FROM cosmosCatalog.SampleDatabase.GreenTaxiRecordsView LIMIT 10"
"%sql\nCREATE TABLE cosmosCatalog.SampleDatabase.GreenTaxiRecordsView \n (id STRING, _ts TIMESTAMP, vendorID INT, totalAmount DOUBLE)\nUSING cosmos.oltp\nTBLPROPERTIES(isCosmosView = 'True')\nOPTIONS (\n spark.cosmos.database = 'SampleDatabase',\n spark.cosmos.container = 'GreenTaxiRecords',\n spark.cosmos.read.inferSchema.enabled = 'False',\n spark.cosmos.read.inferSchema.includeSystemProperties = 'True',\n spark.cosmos.read.partitioning.strategy = 'Aggressive');\n\nSELECT * FROM cosmosCatalog.SampleDatabase.GreenTaxiRecordsView LIMIT 10"
],
"metadata": {
"application/vnd.databricks.v1+cell": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@
" \"spark.cosmos.accountKey\": cosmosMasterKey,\n",
" \"spark.cosmos.database\": \"SampleDatabase\",\n",
" \"spark.cosmos.container\": \"GreenTaxiRecords\",\n",
" \"spark.cosmos.read.partitioning.strategy\": \"Restrictive\",\n",
" \"spark.cosmos.read.partitioning.strategy\": \"Default\",\n",
" \"spark.cosmos.read.inferSchema.enabled\" : \"true\",\n",
" \"spark.cosmos.read.inferSchema.forceNullableProperties\" : \"true\",\n",
" \"spark.cosmos.changeFeed.startFrom\" : \"Beginning\",\n",
Expand Down Expand Up @@ -338,7 +338,7 @@
" spark.cosmos.database = 'SampleDatabase',\n",
" spark.cosmos.container = 'GreenTaxiRecordsCFSink',\n",
" spark.cosmos.read.inferSchema.enabled = 'False',\n",
" spark.cosmos.read.partitioning.strategy = 'Restrictive');\n",
" spark.cosmos.read.partitioning.strategy = 'Default');\n",
"\n",
"SELECT COUNT(*) FROM cosmosCatalog.SampleDatabase.GreenTaxiRecordsCFSinkView"
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ val changeFeedCfg = Map(
"spark.cosmos.accountKey" -> cosmosMasterKey,
"spark.cosmos.database" -> "SampleDatabase",
"spark.cosmos.container" -> "GreenTaxiRecords",
"spark.cosmos.read.partitioning.strategy" -> "Restrictive",
"spark.cosmos.read.partitioning.strategy" -> "Default",
"spark.cosmos.read.inferSchema.enabled" -> "false",
"spark.cosmos.changeFeed.startFrom" -> "Beginning",
"spark.cosmos.changeFeed.mode" -> "Incremental"
Expand All @@ -255,7 +255,7 @@ val readCfg = Map(
"spark.cosmos.accountKey" -> cosmosMasterKey,
"spark.cosmos.database" -> "SampleDatabase",
"spark.cosmos.container" -> "GreenTaxiRecords",
"spark.cosmos.read.partitioning.strategy" -> "Restrictive",
"spark.cosmos.read.partitioning.strategy" -> "Default",
"spark.cosmos.read.inferSchema.enabled" -> "false",
)

Expand Down Expand Up @@ -329,7 +329,7 @@ assert(df_Tables.count() == 3)
// MAGIC spark.cosmos.container = 'GreenTaxiRecords',
// MAGIC spark.cosmos.read.inferSchema.enabled = 'False',
// MAGIC spark.cosmos.read.inferSchema.includeSystemProperties = 'True',
// MAGIC spark.cosmos.read.partitioning.strategy = 'Restrictive');
// MAGIC spark.cosmos.read.partitioning.strategy = 'Aggressive');
// MAGIC
// MAGIC SELECT * FROM cosmosCatalog.SampleDatabase.GreenTaxiRecordsView LIMIT 10

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ val changeFeedCfg = Map(
"spark.cosmos.auth.aad.clientSecret" -> clientSecret,
"spark.cosmos.database" -> "SampleDatabase",
"spark.cosmos.container" -> "GreenTaxiRecords",
"spark.cosmos.read.partitioning.strategy" -> "Restrictive",
"spark.cosmos.read.partitioning.strategy" -> "Default",
"spark.cosmos.read.inferSchema.enabled" -> "false",
"spark.cosmos.changeFeed.startFrom" -> "Beginning",
"spark.cosmos.changeFeed.mode" -> "Incremental"
Expand Down Expand Up @@ -284,7 +284,7 @@ val readCfg = Map(
"spark.cosmos.auth.aad.clientId" -> clientId,
"spark.cosmos.auth.aad.clientSecret" -> clientSecret, "spark.cosmos.database" -> "SampleDatabase",
"spark.cosmos.container" -> "GreenTaxiRecords",
"spark.cosmos.read.partitioning.strategy" -> "Restrictive",
"spark.cosmos.read.partitioning.strategy" -> "Default",
"spark.cosmos.read.inferSchema.enabled" -> "false",
)

Expand Down Expand Up @@ -358,7 +358,7 @@ assert(df_Tables.count() == 3)
// MAGIC spark.cosmos.container = 'GreenTaxiRecords',
// MAGIC spark.cosmos.read.inferSchema.enabled = 'False',
// MAGIC spark.cosmos.read.inferSchema.includeSystemProperties = 'True',
// MAGIC spark.cosmos.read.partitioning.strategy = 'Restrictive');
// MAGIC spark.cosmos.read.partitioning.strategy = 'Aggressive');
// MAGIC
// MAGIC SELECT * FROM cosmosCatalog.SampleDatabase.GreenTaxiRecordsView LIMIT 10

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,20 +77,25 @@ private object CosmosPartitionPlanner extends BasicLoggingTrait {
cosmosPartitioningConfig.partitioningStrategy match {
case PartitioningStrategies.Restrictive =>
applyRestrictiveStrategy(planningInfo)
case PartitioningStrategies.Default =>
applyRestrictiveStrategy(planningInfo)
case PartitioningStrategies.Custom =>
applyCustomStrategy(
container,
planningInfo,
cosmosPartitioningConfig.targetedPartitionCount.get)
case PartitioningStrategies.Aggressive =>
case PartitioningStrategies.Default =>
applyStorageAlignedStrategy(
container,
planningInfo,
1 / defaultMaxPartitionSizeInMB.toDouble,
defaultMinimalPartitionCount
)
case PartitioningStrategies.Aggressive =>
applyStorageAlignedStrategy(
container,
planningInfo,
5 / defaultMaxPartitionSizeInMB.toDouble,
defaultMinimalPartitionCount
)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class CosmosPartitionPlannerITest
PartitionMetadataCache.cachedCount() shouldEqual expectedCachedCount
}

it should "provide single partition as soon as storage size is > 128 MB" in {
it should "provide multiple partitions as soon as storage size is > 128 MB" in {
evaluateStorageBasedStrategy(128 * 1024 + 1, 2 * cosmosBEPartitionCount)
evaluateStorageBasedStrategy(256 * 1024, 2 * cosmosBEPartitionCount)
}
Expand All @@ -102,24 +102,24 @@ class CosmosPartitionPlannerITest
}
}

it should "create Spark partition storage based" in {
it should "create exactly 5 times more partitions than with Default for Aggressive" in {

// Min is still 1 (not 3) to avoid wasting compute resources where not necessary
evaluateStrategy("Aggressive", 0, 1 * cosmosBEPartitionCount)

// 1 Spark partitions for every 128 MB
evaluateStrategy("Aggressive", 10 * 128 * 1024, 10 * cosmosBEPartitionCount)
// 5 Spark partitions for every 128 MB
evaluateStrategy("Aggressive", 10 * 128 * 1024, 5 * 10 * cosmosBEPartitionCount)

// change feed progress is honored
evaluateStrategy(
"Aggressive",
10 * 128 * 1024,
3 * cosmosBEPartitionCount,
5 * 3 * cosmosBEPartitionCount,
Some(70))

for (_ <- 1 to 100) {
val docSizeInKB = rnd.nextInt(50 * 1024 * 1024)
val expectedPartitionCount = ((docSizeInKB) + (128 * 1024) - 1)/(128 * 1024)
val expectedPartitionCount = ((5 * docSizeInKB) + (128 * 1024) - 1)/(128 * 1024)
evaluateStrategy("Aggressive", docSizeInKB, expectedPartitionCount * cosmosBEPartitionCount)
}
}
Expand Down Expand Up @@ -295,7 +295,7 @@ class CosmosPartitionPlannerITest
defaultMinimalPartitionCount: Int = cosmosBEPartitionCount
): Assertion = {
this.evaluateStrategy(
"Aggressive",
"Default",
docSizeInKB,
expectedPartitionCount,
startLsn,
Expand Down

0 comments on commit f958a77

Please sign in to comment.