
Registering functions for use with SparkSQL in pyspark #9

Closed

colinglaes opened this issue Nov 14, 2019 · 9 comments

@colinglaes

I'm interested in using your HLL implementation in a pyspark project, but I'm having trouble figuring out how to properly register the functions for use in SQL execution. I'm unsure of how I'd execute com.swoop.alchemy.spark.expressions.hll.HLLFunctionRegistration.registerFunctions(spark) in Scala from pyspark (from the documentation under "Using HLL functions" > "From SparkSQL"). I've tried the following without any luck:

sql_context = SQLContext(spark_context)

sql_context.registerJavaFunction('hll_init', 'com.swoop.alchemy.spark.expressions.hll.HLLFunctionRegistration')
sql_context.registerJavaFunction('hll_init', 'com.swoop.alchemy.spark.expressions.hll.functions.hll_init')
sql_context.registerJavaFunction('hll_init', 'com.swoop.alchemy.spark.expressions.hll.functions.HyperLogLogInitSimple')

spark_session.sql("create temporary function hll_init as 'com.swoop.alchemy.spark.expressions.hll'")
spark_session.sql("create temporary function hll_init as 'com.swoop.alchemy.spark.expressions.hll.functions.HyperLogLogInitSimple'")
@ssimeonov
Contributor

@colinglaes we don't use pyspark so it's best to look for insights from the Spark codebase or people with pyspark + Scala experience.

@MrPowers you have exposed functions to pyspark. Are any of them native ones?

@ssimeonov
Contributor

Of course, you can always go with Python wrappers for SparkSQL expressions as I outlined in #4 (comment)
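
For reference, a minimal pyspark sketch of such a wrapper (illustrative only; it assumes the hll_* functions have already been registered with the active SparkSession):

from pyspark.sql import functions as F

def hll_init(col_name):
    # Thin wrapper that delegates to the registered SparkSQL expression.
    return F.expr("hll_init({})".format(col_name))

def hll_cardinality(col_name):
    # Reads the estimated cardinality back out of an HLL sketch column.
    return F.expr("hll_cardinality({})".format(col_name))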

@colinglaes
Author

Thanks for the quick response @ssimeonov. I tried using functions.expr('hll_init(user_id)') but it seems that I need to register the function first. I realized that I was using an incorrect classpath in the calls above, but I got the following errors from running the corresponding commands. It seems like I'm either doing something wrong or the functions may not be importable from pyspark.

From running spark_session.udf.registerJavaFunction('hll_init', 'com.swoop.alchemy.spark.expressions.hll.HyperLogLogInit'):

Traceback (most recent call last):
  File "/mnt1/yarn/usercache/hadoop/appcache/application_1573602219296_0058/container_1573602219296_0058_01_000001/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/mnt1/yarn/usercache/hadoop/appcache/application_1573602219296_0058/container_1573602219296_0058_01_000001/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o481.registerJava.
: org.apache.spark.sql.AnalysisException: UDF class com.swoop.alchemy.spark.expressions.hll.HyperLogLogInit doesn't implement any UDF interface;
    at org.apache.spark.sql.UDFRegistration.registerJava(UDFRegistration.scala:668)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "main.py", line 80, in <module>
    main()
  File "main.py", line 74, in main
    raise(e)
  File "main.py", line 68, in main
    job_module.analyze(spark_context, spark_session, **job_args)
  File "jobs.zip/um_spark_jobs/jobs/hll/__init__.py", line 11, in analyze
  File "/mnt1/yarn/usercache/hadoop/appcache/application_1573602219296_0058/container_1573602219296_0058_01_000001/pyspark.zip/pyspark/sql/udf.py", line 375, in registerJavaFunction
  File "/mnt1/yarn/usercache/hadoop/appcache/application_1573602219296_0058/container_1573602219296_0058_01_000001/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/mnt1/yarn/usercache/hadoop/appcache/application_1573602219296_0058/container_1573602219296_0058_01_000001/pyspark.zip/pyspark/sql/utils.py", line 69, in deco
pyspark.sql.utils.AnalysisException: "UDF class com.swoop.alchemy.spark.expressions.hll.HyperLogLogInit doesn't implement any UDF interface;"

From running spark_session.sql("create temporary function hll_init as 'com.swoop.alchemy.spark.expressions.hll.HyperLogLogInit'"):

Traceback (most recent call last):
  File "/mnt2/yarn/usercache/hadoop/appcache/application_1573602219296_0059/container_1573602219296_0059_01_000001/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/mnt2/yarn/usercache/hadoop/appcache/application_1573602219296_0059/container_1573602219296_0059_01_000001/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o278.sql.
: org.apache.spark.sql.AnalysisException: No handler for UDF/UDAF/UDTF 'com.swoop.alchemy.spark.expressions.hll.HyperLogLogInit'; line 4 pos 15
    at org.apache.spark.sql.hive.HiveSessionCatalog$$anonfun$makeFunctionExpression$2$$anonfun$apply$1.apply(HiveSessionCatalog.scala:110)
    at org.apache.spark.sql.hive.HiveSessionCatalog$$anonfun$makeFunctionExpression$2$$anonfun$apply$1.apply(HiveSessionCatalog.scala:110)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.hive.HiveSessionCatalog$$anonfun$makeFunctionExpression$2.apply(HiveSessionCatalog.scala:109)
    at org.apache.spark.sql.hive.HiveSessionCatalog$$anonfun$makeFunctionExpression$2.apply(HiveSessionCatalog.scala:69)
    at scala.util.Try.getOrElse(Try.scala:79)
    at org.apache.spark.sql.hive.HiveSessionCatalog.makeFunctionExpression(HiveSessionCatalog.scala:69)
    at org.apache.spark.sql.catalyst.catalog.SessionCatalog$$anonfun$org$apache$spark$sql$catalyst$catalog$SessionCatalog$$makeFunctionBuilder$1.apply(SessionCatalog.scala:1122)
    at org.apache.spark.sql.catalyst.catalog.SessionCatalog$$anonfun$org$apache$spark$sql$catalyst$catalog$SessionCatalog$$makeFunctionBuilder$1.apply(SessionCatalog.scala:1122)
    at org.apache.spark.sql.catalyst.analysis.SimpleFunctionRegistry.lookupFunction(FunctionRegistry.scala:115)
    at org.apache.spark.sql.catalyst.catalog.SessionCatalog.lookupFunction(SessionCatalog.scala:1278)
    at org.apache.spark.sql.hive.HiveSessionCatalog.org$apache$spark$sql$hive$HiveSessionCatalog$$super$lookupFunction(HiveSessionCatalog.scala:131)
    at org.apache.spark.sql.hive.HiveSessionCatalog$$anonfun$3.apply(HiveSessionCatalog.scala:131)
    at org.apache.spark.sql.hive.HiveSessionCatalog$$anonfun$3.apply(HiveSessionCatalog.scala:131)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.sql.hive.HiveSessionCatalog.lookupFunction0(HiveSessionCatalog.scala:131)
    at org.apache.spark.sql.hive.HiveSessionCatalog.lookupFunction(HiveSessionCatalog.scala:124)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$16$$anonfun$applyOrElse$5$$anonfun$applyOrElse$52.apply(Analyzer.scala:1328)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$16$$anonfun$applyOrElse$5$$anonfun$applyOrElse$52.apply(Analyzer.scala:1328)
    at org.apache.spark.sql.catalyst.analysis.package$.withPosition(package.scala:53)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$16$$anonfun$applyOrElse$5.applyOrElse(Analyzer.scala:1327)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$16$$anonfun$applyOrElse$5.applyOrElse(Analyzer.scala:1311)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:259)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:259)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:258)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:264)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:264)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:329)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:327)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:264)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsDown$1.apply(QueryPlan.scala:83)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsDown$1.apply(QueryPlan.scala:83)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:105)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:105)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:104)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:116)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1$2.apply(QueryPlan.scala:121)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.immutable.List.map(List.scala:296)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:121)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:126)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:126)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsDown(QueryPlan.scala:83)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressions(QueryPlan.scala:74)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$16.applyOrElse(Analyzer.scala:1311)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$16.applyOrElse(Analyzer.scala:1309)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$$anonfun$resolveOperatorsUp$1$$anonfun$apply$1.apply(AnalysisHelper.scala:90)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$$anonfun$resolveOperatorsUp$1$$anonfun$apply$1.apply(AnalysisHelper.scala:90)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$$anonfun$resolveOperatorsUp$1.apply(AnalysisHelper.scala:89)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$$anonfun$resolveOperatorsUp$1.apply(AnalysisHelper.scala:86)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:194)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$class.resolveOperatorsUp(AnalysisHelper.scala:86)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUp(LogicalPlan.scala:29)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$.apply(Analyzer.scala:1309)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$.apply(Analyzer.scala:1308)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:87)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:84)
    at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
    at scala.collection.immutable.List.foldLeft(List.scala:84)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:84)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:76)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:76)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:127)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:121)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$$anonfun$executeAndCheck$1.apply(Analyzer.scala:106)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$$anonfun$executeAndCheck$1.apply(Analyzer.scala:105)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:201)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:105)
    at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57)
    at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55)
    at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:78)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:643)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "main.py", line 80, in <module>
    main()
  File "main.py", line 74, in main
    raise(e)
  File "main.py", line 68, in main
    job_module.analyze(spark_context, spark_session, **job_args)
  File "jobs.zip/um_spark_jobs/jobs/hll/__init__.py", line 29, in analyze
  File "/mnt2/yarn/usercache/hadoop/appcache/application_1573602219296_0059/container_1573602219296_0059_01_000001/pyspark.zip/pyspark/sql/session.py", line 767, in sql
  File "/mnt2/yarn/usercache/hadoop/appcache/application_1573602219296_0059/container_1573602219296_0059_01_000001/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/mnt2/yarn/usercache/hadoop/appcache/application_1573602219296_0059/container_1573602219296_0059_01_000001/pyspark.zip/pyspark/sql/utils.py", line 69, in deco
pyspark.sql.utils.AnalysisException: "No handler for UDF/UDAF/UDTF 'com.swoop.alchemy.spark.expressions.hll.HyperLogLogInit'; line 4 pos 15"

@ssimeonov
Contributor

ssimeonov commented Nov 14, 2019

We run on Databricks, where it's easy to have a notebook with both a Scala and a Python context; hence the easy workaround of registering in Scala and then using the SparkSQL versions from Python.

The UDF registration error makes sense: these are Spark native functions, not UDFs.

Basically, this isn't a spark-alchemy-related issue. You need to execute a line of Scala from Python, passing in a native Scala SparkSession. Perhaps these might help?

@djo10

djo10 commented Nov 18, 2019

@colinglaes While struggling with the same problem, I found a solution by creating a wrapper around spark-alchemy. Instructions are provided in the repo: https://github.com/djo10/spark-alchemy-wrapper

@ssimeonov
Contributor

ssimeonov commented Nov 18, 2019

@djo10 thank you. You've cracked the code on the exact incantation needed from Python, but I am not sure why the wrapper is needed at all... Why not simply:

sc._jvm.com.swoop.alchemy.spark.expressions.hll.HLLFunctionRegistration.registerFunctions(spark._jsparkSession)

One can then do something like:

spark.range(5).toDF("id").select(expr("hll_cardinality(hll_init(id)) as cd"))

And, of course, higher level wrappers around expr() can be added.
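
Putting it together, a minimal end-to-end pyspark sketch (the registration call and the expression come from the lines above; the session-builder boilerplate and column name are illustrative):

from pyspark.sql import SparkSession
from pyspark.sql.functions import expr

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

# Register the spark-alchemy native functions on the JVM side of this session.
sc._jvm.com.swoop.alchemy.spark.expressions.hll.HLLFunctionRegistration.registerFunctions(spark._jsparkSession)

# The functions are now usable from SQL expressions in pyspark.
spark.range(5).toDF("id").select(expr("hll_cardinality(hll_init(id)) as cd")).show()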

@djo10

djo10 commented Nov 18, 2019

Yeah, but I had to add another jar for it to work (the Java HyperLogLog implementation).

And for the configuration spark.conf.set("com.swoop.alchemy.hll.implementation", "AGGREGATE_KNOWLEDGE"), the Fastutil jar is also needed.

Finally, I think the solution is here; it's just a matter of choice between providing 3 jars or 1 (wrapper) jar.
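
For example, one way to wire this up when building the session from pyspark (jar file names below are placeholders; use the artifacts that match your Spark and Scala versions):

from pyspark.sql import SparkSession

# Placeholder jar names: spark-alchemy plus its HLL dependencies.
spark = (SparkSession.builder
         .config("spark.jars", "spark-alchemy.jar,hll.jar,fastutil.jar")
         .getOrCreate())

# Optional: select the AGGREGATE_KNOWLEDGE implementation, as mentioned above.
spark.conf.set("com.swoop.alchemy.hll.implementation", "AGGREGATE_KNOWLEDGE")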

@ssimeonov
Contributor

Setting com.swoop.alchemy.hll.implementation is optional; it selects which HLL implementation you want to use.

As for dependency JARs, they are always needed unless you produce a fat JAR.

@colinglaes
Author

Thanks @djo10 & @ssimeonov, using sc._jvm.com.swoop.alchemy.spark.expressions.hll.HLLFunctionRegistration.registerFunctions(spark._jsparkSession) works for me.

pidge added a commit that referenced this issue Dec 17, 2021
* Bump spark version to 3.2.0
* Bump alchemy version for dependency change
* Code updates for changes to Spark APIs