

Support for ml_als with recommendations and pipelines #1578

Closed
javierluraschi opened this issue Jun 29, 2018 · 8 comments

@javierluraschi (Collaborator) commented Jun 29, 2018

library(sparklyr)
sc <- spark_connect(master = "local")

products <- data.frame(
  UserID    = c("Bob", "Charlie", "Alice", "Bob", "Charlie", "Alice"),
  ProductID = c("Books", "Books", "Books", "Candy", "Candy", "Apples"),
  Score     = c(3, 1, 2, 4, 5, 4)
)
reviews_tbl <- sdf_copy_to(sc, products)

pipeline <- ml_pipeline(sc) %>%
  ft_string_indexer(input_col="ProductID", output_col="product_index") %>%
  ft_string_indexer(input_col="UserID", output_col="user_index") %>%
  ml_als(rating_col="Score", user_col="user_index", item_col="product_index", max_iter=10)

fitted_pipeline <- ml_fit(pipeline, reviews_tbl)
ml_predict(fitted_pipeline, reviews_tbl)
# Source:   table<sparklyr_tmp_bd4e5ef76649> [?? x 6]
# Database: spark_connection
  UserID  ProductID Score product_index user_index prediction
  <chr>   <chr>     <dbl>         <dbl>      <dbl>      <dbl>
1 Charlie Candy         5             1          1       4.86
2 Bob     Candy         4             1          2       3.97
3 Alice   Apples        4             2          0       3.89
4 Charlie Books         1             0          1       1.08
5 Bob     Books         3             0          2       2.80
6 Alice   Books         2             0          0       2.00

So far so good, however:

ml_recommend(pipeline, "items", 1)
 Error in eval(lhs, parent, parent) : attempt to apply non-function 
4. eval(lhs, parent, parent) 
3. eval(lhs, parent, parent) 
2. (switch(type, items = model$recommend_for_all_users, users = model$recommend_for_all_items))(n) %>% 
    mutate(recommendations = explode(!!as.name("recommendations"))) %>% 
    sdf_separate_column("recommendations") at ml_recommendation_als.R#265
1. ml_recommend(pipeline, "items", 1) 
@kevinykuo (Collaborator) commented Jun 29, 2018

ml_recommend() takes an ALS model object, not a pipeline model, so you'd have to extract the appropriate stage from the pipeline if you want to use ml_recommend():

fitted_pipeline %>%
  ml_stage("als") %>%
  ml_recommend()
# Source:   table<sparklyr_tmp_cedb5c8c865e> [?? x 4]
# Database: spark_connection
# user_index recommendations product_index rating
# <int> <list>                  <int>  <dbl>
# 1          1 <list [2]>                  1   4.86
# 2          2 <list [2]>                  1   3.97
# 3          0 <list [2]>                  2   3.89

We can do better with that error message, though!

@campanell commented Jun 29, 2018

Thanks for the clarification on ml_recommend and the ml_als pipeline example. It's helpful to someone like me who is inexperienced with Spark ML pipelines.

@campanell commented Jun 29, 2018

I am not able to get ml_fit to execute. My pipeline looks like this:

pipeline <- ml_pipeline(sc) %>%
            ft_string_indexer(input_col="ProductID",output_col="product_index") %>%
            ft_string_indexer(input_col="UserID",output_col="user_index") %>%
            ml_als(rating_col="Score",user_col="user_index",item_col="product_index",max_iter=10) 
pipeline
fitted_pipeline <- ml_fit(pipeline,reviews)

My data looks like this:

Source: lazy query [?? x 4]
Database: spark_connection
Id ProductId UserId Score

1 B001E4KFG0 A3SGXH7AUHU8GW 5
2 B00813GRG4 A1D87F6ZCVE5NK 1
3 B000LQOCH0 ABXLMWJIXXAIN 4
4 B000UA0QIQ A395BORC6FGVXV 2
5 B006K2ZZ7K A1UQRSCLF8GW1T 5
6 B006K2ZZ7K ADT0SRK1MGOEU 4

ml_fit is spitting out this mess:

Error: java.lang.IllegalArgumentException: Field "ProductID" does not exist.
at org.apache.spark.sql.types.StructType$$anonfun$apply$1.apply(StructType.scala:267)
at org.apache.spark.sql.types.StructType$$anonfun$apply$1.apply(StructType.scala:267)
at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
at scala.collection.AbstractMap.getOrElse(Map.scala:59)
at org.apache.spark.sql.types.StructType.apply(StructType.scala:266)
at org.apache.spark.ml.feature.StringIndexerBase$class.validateAndTransformSchema(StringIndexer.scala:85)
at org.apache.spark.ml.feature.StringIndexer.validateAndTransformSchema(StringIndexer.scala:109)
at org.apache.spark.ml.feature.StringIndexer.transformSchema(StringIndexer.scala:152)
at org.apache.spark.ml.Pipeline$$anonfun$transformSchema$4.apply(Pipeline.scala:184)
at org.apache.spark.ml.Pipeline$$anonfun$transformSchema$4.apply(Pipeline.scala:184)
at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
at scala.collection.mutable.ArrayOps$ofRef.foldLeft(ArrayOps.scala:186)
at org.apache.spark.ml.Pipeline.transformSchema(Pipeline.scala:184)
at org.apache.spark.ml.PipelineStage.transformSchema(Pipeline.scala:74)
at org.apache.spark.ml.Pipeline.fit(Pipeline.scala:136)
at org.apache.spark.ml.Pipeline.fit(Pipeline.scala:96)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at sparklyr.Invoke.invoke(invoke.scala:137)
at sparklyr.StreamHandler.handleMethodCall(stream.scala:123)
at sparklyr.StreamHandler.read(stream.scala:66)
at sparklyr.BackendHandler.channelRead0(handler.scala:51)
at sparklyr.BackendHandler.channelRead0(handler.scala:4)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:284)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1359)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:935)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:138)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
at java.lang.Thread.run(Thread.java:748)

Traceback:

  1. ml_fit(pipeline, reviews)
  2. spark_jobj(x) %>% invoke("fit", spark_dataframe(dataset)) %>%
    . ml_constructor_dispatch()
  3. withVisible(eval(quote(_fseq(_lhs)), env, env))
  4. eval(quote(_fseq(_lhs)), env, env)
  5. eval(quote(_fseq(_lhs)), env, env)
  6. _fseq(_lhs)
  7. freduce(value, _function_list)
  8. function_list[i]
  9. invoke(., "fit", spark_dataframe(dataset))
  10. invoke.shell_jobj(., "fit", spark_dataframe(dataset))
  11. invoke_method(spark_connection(jobj), FALSE, jobj, method, ...)
  12. invoke_method.spark_shell_connection(spark_connection(jobj),
    . FALSE, jobj, method, ...)
  13. core_invoke_method(sc, static, object, method, ...)
  14. withr::with_options(list(warning.length = 8000), {
    . if (nzchar(msg)) {
    . core_handle_known_errors(sc, msg)
    . stop(msg, call. = FALSE)
    . }
    . else {
    . msg <- core_read_spark_log_error(sc)
    . stop(msg, call. = FALSE)
    . }
    . })
  15. force(code)
  16. stop(msg, call. = FALSE)

I am getting the same error on both AWS and IBM Data Platform cloud services.

@kevinykuo (Collaborator) commented Jun 30, 2018

@campanell check the spelling of your column names, e.g. ProductId vs. ProductID.
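For reference, a quick way to see the exact column names Spark has for the table (this assumes the `reviews` table from the snippet above; string matching in ft_string_indexer() is case-sensitive, so "ProductId" and "ProductID" are different columns):

```r
# List the column names exactly as Spark stores them, then compare against
# the input_col values passed to the indexer stages.
colnames(reviews)
#> [1] "Id"        "ProductId" "UserId"    "Score"
```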

@javierluraschi (Collaborator, Author) commented Jul 2, 2018

@kevinykuo good to know! Would you consider adding an S3 method for ml_recommend that accepts a pipeline and automatically extracts the relevant stage? That said, I'm not sure that would be desirable for all ml_recommend operations. Otherwise, looks like we can close this one.

@kevinykuo (Collaborator) commented Jul 2, 2018

I'd like to stay away from generalizing specialized helper functions to pipeline objects, because it would require inspection of the pipeline and making assumptions on which stage to extract (e.g. the ALS routine could be in any position in the pipeline and there could be more than one), which in turn could lead to unexpected behavior.
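For anyone following along, a minimal sketch of the explicit-extraction approach, assuming the pipeline from the original example (where ALS happens to be the only recommender stage):

```r
# Inspect the fitted stages so you know which one to pull out, then
# extract the ALS model by (partial) name; ml_recommend() operates on
# the ALS model itself, not on the pipeline model.
fitted_pipeline %>% ml_stages()

als_model <- fitted_pipeline %>% ml_stage("als")
ml_recommend(als_model, type = "items", n = 1)
```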

Closing this, but @campanell feel free to let me know if you run into further issues!

@campanell commented Jul 3, 2018

Thanks for the instructions. So embarrassed about the typos. I should not be afraid of the Boogeyman (i.e., Scala error messages), but I still am.

I was able to get ml_recommend working. However, where in the pipeline do I use ft_index_to_string to convert product_index and user_index back to ProductId and UserId?

@campanell commented Jul 3, 2018

I was able to map the ft_index_to_string values back onto the recommendation data frame.

prod_index <- fitted_pipeline %>%
              ml_stage(1) %>%
              ml_labels()

user_index <- fitted_pipeline %>%
              ml_stage(2) %>%
              ml_labels()
recommend_1 <- ft_index_to_string(recommend, input_col = "product_index",
                                  output_col = "ProductId_", labels = prod_index)

recommend_2 <- ft_index_to_string(recommend_1, input_col = "user_index",
                                  output_col = "UserId_", labels = user_index)

head(recommend_2)

Thanks so much for all your help. I am now able to do collaborative filtering with sparklyr.
