Skip to content

Conversation

@brknstrngz
Copy link
Contributor

@brknstrngz brknstrngz commented Mar 14, 2025

This change enables us to run MongoDB aggregation pipelines from the mongodb processor.

Given a collection populated like this:

db.orders.insertMany( [
   { _id: 0, name: "Pepperoni", size: "small", price: 19,
     quantity: 10, date: ISODate( "2021-03-13T08:14:30Z" ) },
   { _id: 1, name: "Pepperoni", size: "medium", price: 20,
     quantity: 20, date : ISODate( "2021-03-13T09:13:24Z" ) },
   { _id: 2, name: "Pepperoni", size: "large", price: 21,
     quantity: 30, date : ISODate( "2021-03-17T09:22:12Z" ) },
   { _id: 3, name: "Cheese", size: "small", price: 12,
     quantity: 15, date : ISODate( "2021-03-13T11:21:39.736Z" ) },
   { _id: 4, name: "Cheese", size: "medium", price: 13,
     quantity:50, date : ISODate( "2022-01-12T21:23:13.331Z" ) },
   { _id: 5, name: "Cheese", size: "large", price: 14,
     quantity: 10, date : ISODate( "2022-01-12T05:08:13Z" ) },
   { _id: 6, name: "Vegan", size: "small", price: 17,
     quantity: 10, date : ISODate( "2021-01-13T05:08:13Z" ) },
   { _id: 7, name: "Vegan", size: "medium", price: 18,
     quantity: 10, date : ISODate( "2021-01-13T05:10:13Z" ) }
] )

We could write a query like the following:

       processors:
          - mongodb:
              url: ${MONGODB_URL}
              database: ${MONGODB_DB}
              collection: ${MONGODB_COLLECTION}
              operation: aggregate
              document_map: |
                root = [
                 { "$match": { "size": "medium" }  },
                 { "$group": { "_id": "$name", "totalQuantity": { "$sum": "$quantity" } } }                
               ]

To obtain an output like this:
[{ "_id": { "$string": "Cheese" }, "totalQuantity": { "$int32": 50 } }, { "_id": { "$string": "Vegan" }, "totalQuantity": { "$int32": 10 } }, { "_id": { "$string": "Pepperoni" }, "totalQuantity": { "$int32": 20 } }]

@brknstrngz brknstrngz changed the title Aggregation pipelines in the MongoDB processor Improvement: mongodb processor: run ggregation pipelines Mar 22, 2025
@brknstrngz brknstrngz changed the title Improvement: mongodb processor: run ggregation pipelines Improvement: mongodb processor: run agregation pipelines Mar 22, 2025
@brknstrngz brknstrngz changed the title Improvement: mongodb processor: run agregation pipelines Improvement: mongodb processor: run aggregation pipelines Mar 22, 2025
@mihaitodor mihaitodor force-pushed the brknstrngz/mongodb_processor_aggregate_ops branch from ac27cbe to 50b02e2 Compare March 28, 2025 01:04
Copy link
Contributor

@mihaitodor mihaitodor left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for this enhancement!

I had to make a few changes since the test wasn't working (you need to use go test -run ^TestProcessorIntegration$ ./internal/impl/mongodb to actually run it) and I think the intent is to extract all the results of the aggregation, so I changed it a bit. Since this is a batch processor, I thought the easiest thing to do is to return a JSON array for each message. I could also explode that array into individual messages and then return a batch for each message, but I'm not sure if that's worth the extra effort.

Could you please have a look and let me know if the updated implementation is what you expect?

@brknstrngz
Copy link
Contributor Author

@mihaitodor thank you, this is perfect!

@mihaitodor mihaitodor merged commit 8d6e83a into redpanda-data:main Mar 28, 2025
4 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants