The cluster_pairwise_predictions_at_threshold is failing #2636
Unanswered
andrzejurbanowicz asked this question in Q&A
Replies: 1 comment 1 reply
If you're hitting scaling constraints with clustering, I would suggest minimising the size of the data by mapping all your ids to int32, clustering, then joining back on your business ids. Here is some code that does something similar in DuckDB and works well for us; clustering sped up by about 10x when we did this. At your scale of data, at a guess, clustering should work fine in DuckDB on a fairly moderately sized machine - we have found clustering in DuckDB to be dramatically faster than Spark, going from hours down to minutes. The relevant function is cluster_pairwise_predictions_at_threshold, which is here.
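A minimal, illustrative sketch of that id-mapping approach (not the code linked above), assuming the pairwise predictions sit in a DuckDB table called `predictions` with string ids in `unique_id_l` / `unique_id_r`; all table and column names here are placeholders:

```python
import duckdb

con = duckdb.connect()

# Placeholder edge list: in practice this is your Splink predictions table
# (unique_id_l, unique_id_r, match_probability) with large string/business ids.
con.execute("""
    CREATE TABLE predictions AS
    SELECT * FROM (VALUES
        ('cust-000001', 'cust-000002', 0.99),
        ('cust-000002', 'cust-000003', 0.97)
    ) AS t(unique_id_l, unique_id_r, match_probability)
""")

# 1. Build a dense int32 lookup for every business id that appears on either
#    side of the edge list.
con.execute("""
    CREATE TABLE id_map AS
    SELECT business_id,
           CAST(row_number() OVER () AS INTEGER) AS int_id
    FROM (
        SELECT unique_id_l AS business_id FROM predictions
        UNION
        SELECT unique_id_r FROM predictions
    ) AS all_ids
""")

# 2. Swap the wide string ids for the compact int32 ids before clustering.
con.execute("""
    CREATE TABLE predictions_int AS
    SELECT ml.int_id AS unique_id_l,
           mr.int_id AS unique_id_r,
           p.match_probability
    FROM predictions p
    JOIN id_map ml ON p.unique_id_l = ml.business_id
    JOIN id_map mr ON p.unique_id_r = mr.business_id
""")

# 3. Run cluster_pairwise_predictions_at_threshold on predictions_int with the
#    DuckDB backend; assume it yields a `clusters` table of
#    (unique_id, cluster_id) keyed on the int32 id.

# 4. Join the cluster labels back onto the original business ids, e.g.:
#    SELECT m.business_id, c.cluster_id
#    FROM clusters c
#    JOIN id_map m ON c.unique_id = m.int_id
```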
Hello!
I am trying to link 4 data sets (25M, 26M, 26M and 56M records) with Splink 4.0.6. For this I am using the Spark backend with the following parameters:

```
--num-executors 100
--executor-cores 4
--executor-memory 64G
--driver-memory 40G
--conf spark.driver.maxResultSize=5G
--conf spark.sql.shuffle.partitions=400
--conf spark.default.parallelism=1000
--conf spark.yarn.maxAppAttempts=4
--conf spark.sql.autoBroadcastJoinThreshold=-1
--conf spark.sql.execution.arrow.enabled=true
--conf spark.sql.files.maxPartitionBytes=512MB
```

Training and prediction took around 1h and generated 54M pairwise predictions, but when I tried to run:

```python
clusters = linker.clustering.cluster_pairwise_predictions_at_threshold(
    df_predictions, threshold_match_probability=0.95
)
```

the code failed with:

```
File "/home/hadoop/splink_model.py", line 185, in run_prediction
    clusters = linker.clustering.cluster_pairwise_predictions_at_threshold(df_predictions,
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/splink/internals/linker_components/clustering.py", line 137, in cluster_pairwise_predictions_at_threshold
    cc = solve_connected_components(
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/splink/internals/connected_components.py", line 442, in solve_connected_components
    prev_representatives_thinned = db_api.sql_pipeline_to_splink_dataframe(pipeline)
                                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/splink/internals/database_api.py", line 200, in sql_pipeline_to_splink_dataframe
    splink_dataframe = self.sql_to_splink_dataframe_checking_cache(
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/splink/internals/database_api.py", line 171, in sql_to_splink_dataframe_checking_cache
    splink_dataframe = self._sql_to_splink_dataframe(
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/splink/internals/database_api.py", line 94, in _sql_to_splink_dataframe
    output_df = self._cleanup_for_execute_sql(
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/splink/internals/spark/database_api.py", line 112, in _cleanup_for_execute_sql
    spark_df = self._break_lineage_and_repartition(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/splink/internals/spark/database_api.py", line 309, in _break_lineage_and_repartition
    spark_df.write.mode("overwrite").parquet(write_path)
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 1721, in parquet
File "/usr/lib/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/errors/exceptions/captured.py", line 179, in deco
File "/usr/lib/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o1258.parquet.
: org.apache.spark.SparkException: Cannot broadcast the table that is larger than 8.0 GiB: 21.0 GiB.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.cannotBroadcastTableOverMaxTableBytesError(QueryExecutionErrors.scala:2201)
	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.org$apache$spark$sql$execution$exchange$BroadcastExchangeExec$$doComputeRelation(BroadcastExchangeExec.scala:224)
	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anon$1.doCompute(BroadcastExchangeExec.scala:191)
	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anon$1.doCompute(BroadcastExchangeExec.scala:184)
	at org.apache.spark.sql.execution.AsyncDriverOperation.$anonfun$compute$1(AsyncDriverOperation.scala:75)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:108)
	at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:384)
	at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:376)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withExecutionId$1(SQLExecution.scala:359)
```

I have already spent a few days on this and am thinking of using MLlib for clustering. Any recommendations or tips?
My target is to link 4 data sets with around 300M records each.
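For reference, the pipeline looks roughly like the sketch below; the settings, comparisons, blocking rule and DataFrame names are placeholders rather than my exact code:

```python
from splink import Linker, SettingsCreator, SparkAPI
import splink.comparison_library as cl

# Placeholder model: the real job has its own comparisons and blocking rules.
settings = SettingsCreator(
    link_type="link_only",
    comparisons=[cl.ExactMatch("first_name"), cl.ExactMatch("dob")],
    blocking_rules_to_generate_predictions=["l.dob = r.dob"],
)

# spark is the existing SparkSession created by spark-submit with the
# parameters listed above.
db_api = SparkAPI(spark_session=spark)

# dfs is the list of the four input Spark DataFrames (25M, 26M, 26M and 56M rows).
linker = Linker(dfs, settings, db_api)

# ... model training steps omitted ...

df_predictions = linker.inference.predict()

# This is the step that fails with the broadcast error shown above.
clusters = linker.clustering.cluster_pairwise_predictions_at_threshold(
    df_predictions, threshold_match_probability=0.95
)
```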
Thanks
Andrzej