From 3baf92b5fac6bbcba345fa44bfd2bc9e31660373 Mon Sep 17 00:00:00 2001 From: Santi G Date: Sat, 1 Nov 2025 01:34:35 +0100 Subject: [PATCH 01/20] Add configurable import batch size (1-2000) Support custom batch sizes for /import endpoint to prevent 413 errors when events exceed server's 1MB payload limit. - Two new constructors: MixpanelAPI(int batchSize) and MixpanelAPI(boolean gzip, int batchSize) - Batch size clamped to [1, 2000] range (default: 2000) - Only affects /import endpoint; other endpoints unaffected - Fully backward compatible - Includes tests and updated demo and README --- README.md | 10 +++ .../mixpanelapi/demo/MixpanelAPIDemo.java | 32 ++++++++- .../com/mixpanel/mixpanelapi/MixpanelAPI.java | 41 +++++++++--- .../mixpanel/mixpanelapi/MixpanelAPITest.java | 67 +++++++++++++++++++ 4 files changed, 139 insertions(+), 11 deletions(-) diff --git a/README.md b/README.md index 4b0c8d8..7735b9d 100644 --- a/README.md +++ b/README.md @@ -43,6 +43,16 @@ Gzip compression can reduce bandwidth usage and improve performance, especially The library supports importing historical events (events older than 5 days that are not accepted using /track) via the `/import` endpoint. Project token will be used for basic auth. +### Custom Import Batch Size + +When importing large events through the `/import` endpoint, you may need to control the batch size to prevent exceeding the server's 1MB uncompressed JSON payload limit. The batch size can be configured between 1 and 2000 (default is 2000): + + // Import with default batch size (2000) + MixpanelAPI mixpanel = new MixpanelAPI(); + + // Import with custom batch size (500) + MixpanelAPI mixpanel = new MixpanelAPI(500); + ## Feature Flags The Mixpanel Java SDK supports feature flags with both local and remote evaluation modes. diff --git a/src/demo/java/com/mixpanel/mixpanelapi/demo/MixpanelAPIDemo.java b/src/demo/java/com/mixpanel/mixpanelapi/demo/MixpanelAPIDemo.java index 7304834..38dcc4a 100644 --- a/src/demo/java/com/mixpanel/mixpanelapi/demo/MixpanelAPIDemo.java +++ b/src/demo/java/com/mixpanel/mixpanelapi/demo/MixpanelAPIDemo.java @@ -32,6 +32,12 @@ public DeliveryThread(Queue messages, boolean useGzipCompression) { mUseGzipCompression = useGzipCompression; } + public DeliveryThread(Queue messages, int importBatchSize) { + mMixpanel = new MixpanelAPI(importBatchSize); + mMessageQueue = messages; + mUseGzipCompression = false; + } + @Override public void run() { try { @@ -88,10 +94,15 @@ public static void main(String[] args) throws IOException, InterruptedException { Queue messages = new ConcurrentLinkedQueue(); Queue messagesWithGzip = new ConcurrentLinkedQueue(); + Queue messagesWithCustomBatch = new ConcurrentLinkedQueue(); - // Create two delivery threads - one without gzip and one with gzip compression + // Create three delivery threads: + // 1. Default batching (50 for events, 2000 for imports) + // 2. With gzip compression (50 for events, 2000 for imports) + // 3. With custom import batch size of 500 DeliveryThread worker = new DeliveryThread(messages, false); DeliveryThread workerWithGzip = new DeliveryThread(messagesWithGzip, true); + DeliveryThread workerWithCustomBatch = new DeliveryThread(messagesWithCustomBatch, 500); MessageBuilder messageBuilder = new MessageBuilder(PROJECT_TOKEN); @@ -102,6 +113,7 @@ public static void main(String[] args) worker.start(); workerWithGzip.start(); + workerWithCustomBatch.start(); String distinctId = args[0]; BufferedReader inputLines = new BufferedReader(new InputStreamReader(System.in)); @@ -161,6 +173,21 @@ public static void main(String[] args) System.out.println("Added events to gzip compression queue\n"); + // Demonstrate custom import batch size + System.out.println("\n=== Demonstrating custom import batch size (500) ==="); + + // Send import events with custom batch size + long customBatchTime = System.currentTimeMillis() - (45L * 24L * 60L * 60L * 1000L); + Map customBatchProps = new HashMap(); + customBatchProps.put("time", customBatchTime); + customBatchProps.put("$insert_id", "custom-batch-" + System.currentTimeMillis()); + customBatchProps.put("Batch Size", 500); + customBatchProps.put("Event Type", "Custom Batch Size Import"); + JSONObject customBatchEvent = messageBuilder.importEvent(distinctId, "Custom Batch Size Import", new JSONObject(customBatchProps)); + messagesWithCustomBatch.add(customBatchEvent); + + System.out.println("Added import event to custom batch size queue (batch size: 500)\n"); + while((line != null) && (line.length() > 0)) { System.out.println("SENDING LINE: " + line); Map propMap = new HashMap(); @@ -177,11 +204,12 @@ public static void main(String[] args) line = inputLines.readLine(); } - while(! messages.isEmpty() || ! messagesWithGzip.isEmpty()) { + while(! messages.isEmpty() || ! messagesWithGzip.isEmpty() || ! messagesWithCustomBatch.isEmpty()) { Thread.sleep(1000); } worker.interrupt(); workerWithGzip.interrupt(); + workerWithCustomBatch.interrupt(); } } diff --git a/src/main/java/com/mixpanel/mixpanelapi/MixpanelAPI.java b/src/main/java/com/mixpanel/mixpanelapi/MixpanelAPI.java index 97e648d..b8eb4ff 100644 --- a/src/main/java/com/mixpanel/mixpanelapi/MixpanelAPI.java +++ b/src/main/java/com/mixpanel/mixpanelapi/MixpanelAPI.java @@ -48,6 +48,7 @@ public class MixpanelAPI implements AutoCloseable { protected final boolean mUseGzipCompression; protected final LocalFlagsProvider mLocalFlags; protected final RemoteFlagsProvider mRemoteFlags; + protected final int mImportMaxMessageSize; /** * Constructs a MixpanelAPI object associated with the production, Mixpanel services. @@ -62,7 +63,26 @@ public MixpanelAPI() { * @param useGzipCompression whether to use gzip compression for network requests */ public MixpanelAPI(boolean useGzipCompression) { - this(Config.BASE_ENDPOINT + "/track", Config.BASE_ENDPOINT + "/engage", Config.BASE_ENDPOINT + "/groups", Config.BASE_ENDPOINT + "/import", useGzipCompression, null, null); + this(Config.BASE_ENDPOINT + "/track", Config.BASE_ENDPOINT + "/engage", Config.BASE_ENDPOINT + "/groups", Config.BASE_ENDPOINT + "/import", useGzipCompression, Config.IMPORT_MAX_MESSAGE_SIZE, null, null); + } + + /** + * Constructs a MixpanelAPI object associated with the production, Mixpanel services. + * + * @param importMaxMessageSize custom batch size for /import endpoint (must be between 1 and 2000) + */ + public MixpanelAPI(int importMaxMessageSize) { + this(false, importMaxMessageSize); + } + + /** + * Constructs a MixpanelAPI object associated with the production, Mixpanel services. + * + * @param useGzipCompression whether to use gzip compression for network requests + * @param importMaxMessageSize custom batch size for /import endpoint (must be between 1 and 2000) + */ + public MixpanelAPI(boolean useGzipCompression, int importMaxMessageSize) { + this(Config.BASE_ENDPOINT + "/track", Config.BASE_ENDPOINT + "/engage", Config.BASE_ENDPOINT + "/groups", Config.BASE_ENDPOINT + "/import", useGzipCompression, importMaxMessageSize, null, null); } /** @@ -96,6 +116,7 @@ private MixpanelAPI(LocalFlagsConfig localFlagsConfig, RemoteFlagsConfig remoteF mGroupsEndpoint = Config.BASE_ENDPOINT + "/groups"; mImportEndpoint = Config.BASE_ENDPOINT + "/import"; mUseGzipCompression = false; + mImportMaxMessageSize = Config.IMPORT_MAX_MESSAGE_SIZE; if (localFlagsConfig != null) { EventSender eventSender = createEventSender(localFlagsConfig, this); @@ -121,7 +142,7 @@ private MixpanelAPI(LocalFlagsConfig localFlagsConfig, RemoteFlagsConfig remoteF * @see #MixpanelAPI() */ public MixpanelAPI(String eventsEndpoint, String peopleEndpoint) { - this(eventsEndpoint, peopleEndpoint, Config.BASE_ENDPOINT + "/groups", Config.BASE_ENDPOINT + "/import", false, null, null); + this(eventsEndpoint, peopleEndpoint, Config.BASE_ENDPOINT + "/groups", Config.BASE_ENDPOINT + "/import", false, Config.IMPORT_MAX_MESSAGE_SIZE, null, null); } /** @@ -135,7 +156,7 @@ public MixpanelAPI(String eventsEndpoint, String peopleEndpoint) { * @see #MixpanelAPI() */ public MixpanelAPI(String eventsEndpoint, String peopleEndpoint, String groupsEndpoint) { - this(eventsEndpoint, peopleEndpoint, groupsEndpoint, Config.BASE_ENDPOINT + "/import", false, null, null); + this(eventsEndpoint, peopleEndpoint, groupsEndpoint, Config.BASE_ENDPOINT + "/import", false, Config.IMPORT_MAX_MESSAGE_SIZE, null, null); } /** @@ -150,7 +171,7 @@ public MixpanelAPI(String eventsEndpoint, String peopleEndpoint, String groupsEn * @see #MixpanelAPI() */ public MixpanelAPI(String eventsEndpoint, String peopleEndpoint, String groupsEndpoint, String importEndpoint) { - this(eventsEndpoint, peopleEndpoint, groupsEndpoint, importEndpoint, false, null, null); + this(eventsEndpoint, peopleEndpoint, groupsEndpoint, importEndpoint, false, Config.IMPORT_MAX_MESSAGE_SIZE, null, null); } /** @@ -166,7 +187,7 @@ public MixpanelAPI(String eventsEndpoint, String peopleEndpoint, String groupsEn * @see #MixpanelAPI() */ public MixpanelAPI(String eventsEndpoint, String peopleEndpoint, String groupsEndpoint, String importEndpoint, boolean useGzipCompression) { - this(eventsEndpoint, peopleEndpoint, groupsEndpoint, importEndpoint, useGzipCompression, null, null); + this(eventsEndpoint, peopleEndpoint, groupsEndpoint, importEndpoint, useGzipCompression, Config.IMPORT_MAX_MESSAGE_SIZE, null, null); } /** @@ -177,15 +198,17 @@ public MixpanelAPI(String eventsEndpoint, String peopleEndpoint, String groupsEn * @param groupsEndpoint a URL that will accept Mixpanel groups messages * @param importEndpoint a URL that will accept Mixpanel import messages * @param useGzipCompression whether to use gzip compression for network requests + * @param importMaxMessageSize custom batch size for /import endpoint (must be between 1 and 2000) * @param localFlags optional LocalFlagsProvider for local feature flags (can be null) * @param remoteFlags optional RemoteFlagsProvider for remote feature flags (can be null) */ - private MixpanelAPI(String eventsEndpoint, String peopleEndpoint, String groupsEndpoint, String importEndpoint, boolean useGzipCompression, LocalFlagsProvider localFlags, RemoteFlagsProvider remoteFlags) { + /* package */ MixpanelAPI(String eventsEndpoint, String peopleEndpoint, String groupsEndpoint, String importEndpoint, boolean useGzipCompression, int importMaxMessageSize, LocalFlagsProvider localFlags, RemoteFlagsProvider remoteFlags) { mEventsEndpoint = eventsEndpoint; mPeopleEndpoint = peopleEndpoint; mGroupsEndpoint = groupsEndpoint; mImportEndpoint = importEndpoint; mUseGzipCompression = useGzipCompression; + mImportMaxMessageSize = Math.max(1, Math.min(importMaxMessageSize, 2000)); mLocalFlags = localFlags; mRemoteFlags = remoteFlags; } @@ -382,10 +405,10 @@ private void sendImportMessages(List messages, String endpointUrl) t } } - // Send messages in batches (max 2000 per batch for /import) + // Send messages in batches (configurable batch size for /import, default max 2000 per batch) // If token is empty, the server will reject with 401 Unauthorized - for (int i = 0; i < messages.size(); i += Config.IMPORT_MAX_MESSAGE_SIZE) { - int endIndex = i + Config.IMPORT_MAX_MESSAGE_SIZE; + for (int i = 0; i < messages.size(); i += mImportMaxMessageSize) { + int endIndex = i + mImportMaxMessageSize; endIndex = Math.min(endIndex, messages.size()); List batch = messages.subList(i, endIndex); diff --git a/src/test/java/com/mixpanel/mixpanelapi/MixpanelAPITest.java b/src/test/java/com/mixpanel/mixpanelapi/MixpanelAPITest.java index d86155d..67b19a0 100644 --- a/src/test/java/com/mixpanel/mixpanelapi/MixpanelAPITest.java +++ b/src/test/java/com/mixpanel/mixpanelapi/MixpanelAPITest.java @@ -1119,4 +1119,71 @@ public boolean sendImportData(String dataString, String endpointUrl, String toke } } + public void testCustomImportBatchSizeConstructor() { + // Test constructor with int parameter only: new MixpanelAPI(500) + MixpanelAPI api = new MixpanelAPI(500); + assertEquals("Custom batch size should be 500", 500, api.mImportMaxMessageSize); + } + + public void testCustomImportBatchSizeWithGzipConstructor() { + // Test constructor with both gzip and batch size: new MixpanelAPI(true, 500) + MixpanelAPI api = new MixpanelAPI(true, 500); + assertEquals("Custom batch size should be 500", 500, api.mImportMaxMessageSize); + assertTrue("Gzip compression should be enabled", api.mUseGzipCompression); + } + + public void testImportBatchSizeClamping() { + // Test that batch size is clamped between 1 and 2000 + MixpanelAPI apiTooSmall = new MixpanelAPI(0); + assertEquals("Batch size below 1 should be clamped to 1", 1, apiTooSmall.mImportMaxMessageSize); + + MixpanelAPI apiTooLarge = new MixpanelAPI(3000); + assertEquals("Batch size above 2000 should be clamped to 2000", 2000, apiTooLarge.mImportMaxMessageSize); + + MixpanelAPI apiValid = new MixpanelAPI(1000); + assertEquals("Batch size within range should not be clamped", 1000, apiValid.mImportMaxMessageSize); + } + + public void testImportMessagesWithCustomBatchSize() { + // Test that import messages are sent in batches of the custom size + final List batchSizes = new ArrayList(); + + MixpanelAPI api = new MixpanelAPI("events url", "people url", "groups url", "import url", false, 5, null, null) { + @Override + public boolean sendImportData(String dataString, String endpointUrl, String token) throws IOException { + // Parse the JSON array to count messages in this batch + JSONArray array = new JSONArray(dataString); + batchSizes.add(array.length()); + return true; + } + }; + + try { + ClientDelivery delivery = new ClientDelivery(); + + // Create 13 import messages (should be sent as batches of 5, 5, 3) + MessageBuilder builder = new MessageBuilder("a token"); + for (int i = 0; i < 13; i++) { + Map props = new HashMap(); + props.put("time", System.currentTimeMillis()); + props.put("$insert_id", "id-" + i); + props.put("index", i); + JSONObject importEvent = builder.importEvent("user-" + i, "test event", new JSONObject(props)); + delivery.addMessage(importEvent); + } + + api.deliver(delivery); + + assertEquals("Should have 3 batches", 3, batchSizes.size()); + assertEquals("First batch should have 5 messages", 5, batchSizes.get(0).intValue()); + assertEquals("Second batch should have 5 messages", 5, batchSizes.get(1).intValue()); + assertEquals("Third batch should have 3 messages", 3, batchSizes.get(2).intValue()); + + } catch (IOException e) { + fail("IOException during custom batch size test: " + e.toString()); + } catch (JSONException e) { + fail("JSONException during custom batch size test: " + e.toString()); + } + } + } From 91f7fc8d8d41bac49f0b7097bb4c0691963a454e Mon Sep 17 00:00:00 2001 From: Santi G Date: Sat, 1 Nov 2025 01:39:42 +0100 Subject: [PATCH 02/20] Add HTTP response logging for import endpoint errors Capture and include HTTP status code and response body in import endpoint exceptions for better error diagnostics. --- .../com/mixpanel/mixpanelapi/MixpanelAPI.java | 26 ++++++++++++++++--- 1 file changed, 23 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/mixpanel/mixpanelapi/MixpanelAPI.java b/src/main/java/com/mixpanel/mixpanelapi/MixpanelAPI.java index b8eb4ff..7f31484 100644 --- a/src/main/java/com/mixpanel/mixpanelapi/MixpanelAPI.java +++ b/src/main/java/com/mixpanel/mixpanelapi/MixpanelAPI.java @@ -50,6 +50,10 @@ public class MixpanelAPI implements AutoCloseable { protected final RemoteFlagsProvider mRemoteFlags; protected final int mImportMaxMessageSize; + // Track the last response from import endpoint for error logging + protected String mLastResponseBody; + protected int mLastStatusCode; + /** * Constructs a MixpanelAPI object associated with the production, Mixpanel services. */ @@ -417,7 +421,9 @@ private void sendImportMessages(List messages, String endpointUrl) t boolean accepted = sendImportData(messagesString, endpointUrl, token); if (! accepted) { - throw new MixpanelServerException("Server refused to accept import messages, they may be malformed.", batch); + String respBody = mLastResponseBody != null ? mLastResponseBody : "no response body"; + int status = mLastStatusCode; + throw new MixpanelServerException("Server refused to accept import messages, they may be malformed. HTTP " + status + " Response: " + respBody, batch); } } } @@ -516,7 +522,9 @@ private String dataString(List messages) { InputStream errorStream = conn.getErrorStream(); if (errorStream != null) { try { - slurp(errorStream); + String errorResponse = slurp(errorStream); + mLastResponseBody = errorResponse; + mLastStatusCode = conn.getResponseCode(); errorStream.close(); // Return false to indicate rejection, which will throw MixpanelServerException return false; @@ -538,6 +546,8 @@ private String dataString(List messages) { // Import endpoint returns JSON like {"code":200,"status":"OK","num_records_imported":N} if (response == null) { + mLastResponseBody = null; + mLastStatusCode = 0; return false; } @@ -549,9 +559,19 @@ private String dataString(List messages) { boolean statusOk = jsonResponse.has("status") && "OK".equals(jsonResponse.getString("status")); boolean codeOk = jsonResponse.has("code") && jsonResponse.getInt("code") == 200; - return statusOk && codeOk; + if (statusOk && codeOk) { + mLastResponseBody = response; + mLastStatusCode = 200; + return true; + } else { + mLastResponseBody = response; + mLastStatusCode = jsonResponse.has("code") ? jsonResponse.getInt("code") : 0; + return false; + } } catch (JSONException e) { // Not valid JSON or missing expected fields + mLastResponseBody = response; + mLastStatusCode = 0; return false; } } From 1c3cb2da08a4ace6b71f09e21a62a10d12cacbcb Mon Sep 17 00:00:00 2001 From: Santi G Date: Sat, 1 Nov 2025 01:55:58 +0100 Subject: [PATCH 03/20] Deprecate trackCharge() method - now logs error instead of tracking revenue Deprecate the trackCharge() method following the same pattern as the JavaScript SDK. The method now only logs an error to stderr and returns null instead of creating profile properties. The old version of Mixpanel's Revenue analysis UI has been replaced by a newer suite of analysis tools which don't depend on profile properties. See https://docs.mixpanel.com/docs/features/revenue_analytics for more information. BREAKING CHANGE: Existing code calling trackCharge() will still compile and run, but the method will no longer have any effect and will log an error message. Customers using the old Revenue Report should migrate to alternative revenue tracking approaches. Reach out to support@mixpanel.com for help. - Mark trackCharge(String, double, JSONObject) as @Deprecated - Mark trackCharge(String, double, JSONObject, JSONObject) as @Deprecated - Both methods now log deprecation error to System.err - Both methods return null instead of creating profile updates - Update tests to verify null return value - Update demo to show deprecated behavior --- .../mixpanelapi/demo/MixpanelAPIDemo.java | 8 +- .../mixpanel/mixpanelapi/MessageBuilder.java | 97 ++++++++----------- .../mixpanel/mixpanelapi/MixpanelAPITest.java | 6 +- 3 files changed, 51 insertions(+), 60 deletions(-) diff --git a/src/demo/java/com/mixpanel/mixpanelapi/demo/MixpanelAPIDemo.java b/src/demo/java/com/mixpanel/mixpanelapi/demo/MixpanelAPIDemo.java index 38dcc4a..8cf4670 100644 --- a/src/demo/java/com/mixpanel/mixpanelapi/demo/MixpanelAPIDemo.java +++ b/src/demo/java/com/mixpanel/mixpanelapi/demo/MixpanelAPIDemo.java @@ -126,9 +126,13 @@ public static void main(String[] args) JSONObject nameMessage = messageBuilder.set(distinctId, nameProps); messages.add(nameMessage); - // Charge the user $2.50 for using the program :) + // Demonstrate deprecated trackCharge method (now logs error instead of tracking revenue) + System.out.println("\n=== Demonstrating deprecated trackCharge method ==="); JSONObject transactionMessage = messageBuilder.trackCharge(distinctId, 2.50, null); - messages.add(transactionMessage); + if (transactionMessage != null) { + messages.add(transactionMessage); + } + System.out.println("trackCharge() returns null and logs an error to stderr\n"); // Import a historical event (30 days ago) with explicit time and $insert_id long thirtyDaysAgo = System.currentTimeMillis() - (30L * 24L * 60L * 60L * 1000L); diff --git a/src/main/java/com/mixpanel/mixpanelapi/MessageBuilder.java b/src/main/java/com/mixpanel/mixpanelapi/MessageBuilder.java index 45ed07d..7e86c97 100644 --- a/src/main/java/com/mixpanel/mixpanelapi/MessageBuilder.java +++ b/src/main/java/com/mixpanel/mixpanelapi/MessageBuilder.java @@ -1,12 +1,7 @@ package com.mixpanel.mixpanelapi; -import java.text.DateFormat; -import java.text.SimpleDateFormat; import java.util.Collection; -import java.util.Date; -import java.util.Iterator; import java.util.Map; -import java.util.TimeZone; import java.util.UUID; import org.json.JSONArray; @@ -22,8 +17,6 @@ */ public class MessageBuilder { - private static final String ENGAGE_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss"; - private final String mToken; public MessageBuilder(String token) { @@ -465,55 +458,6 @@ public JSONObject unset(String distinctId, Collection propertyNames, JSO return peopleMessage(distinctId, "$unset", propNamesArray, modifiers); } - /** - * Tracks revenue associated with the given distinctId. - * - * @param distinctId an identifier associated with a profile - * @param amount a double revenue amount. Positive amounts represent income for your business. - * @param properties can be null. If provided, a set of properties to associate with - * the individual transaction. - * @return user profile trackCharge message for consumption by MixpanelAPI - */ - public JSONObject trackCharge(String distinctId, double amount, JSONObject properties) { - return trackCharge(distinctId, amount, properties, null); - } - - /** - * Tracks revenue associated with the given distinctId. - * - * @param distinctId an identifier associated with a profile - * @param amount a double revenue amount. Positive amounts represent income for your business. - * @param properties can be null. If provided, a set of properties to associate with - * the individual transaction. - * @param modifiers can be null. If provided, the keys and values in the object will - * be merged as modifiers associated with the update message (for example, "$time" or "$ignore_time") - * @return user profile trackCharge message for consumption by MixpanelAPI - */ - public JSONObject trackCharge(String distinctId, double amount, JSONObject properties, JSONObject modifiers) { - JSONObject transactionValue = new JSONObject(); - JSONObject appendProperties = new JSONObject(); - try { - transactionValue.put("$amount", amount); - DateFormat dateFormat = new SimpleDateFormat(ENGAGE_DATE_FORMAT); - dateFormat.setTimeZone(TimeZone.getTimeZone("UTC")); - transactionValue.put("$time", dateFormat.format(new Date())); - - if (null != properties) { - for (Iterator iter = properties.keys(); iter.hasNext();) { - String key = (String) iter.next(); - transactionValue.put(key, properties.get(key)); - } - } - - appendProperties.put("$transactions", transactionValue); - - return this.append(distinctId, appendProperties, modifiers); - } catch (JSONException e) { - e.printStackTrace(); - throw new RuntimeException("Cannot create trackCharge message", e); - } - } - /** * Formats a generic user profile message. * Use of this method requires familiarity with the underlying Mixpanel HTTP API, @@ -860,4 +804,45 @@ public JSONObject groupMessage(String groupKey, String groupId, String actionTyp } } + /** + * @deprecated The trackCharge() method is deprecated. The old version of Mixpanel's Revenue analysis UI + * has been replaced by a newer suite of analysis tools which don't depend on profile properties. + * See https://docs.mixpanel.com/docs/features/revenue_analytics for more information. + * + * This method now only logs an error and returns null. It no longer sets a profile property or produces any other change. + * + * @param distinctId an identifier associated with a profile + * @param amount a double revenue amount (deprecated - no longer used) + * @param properties properties associated with the transaction (deprecated - no longer used) + * @return null + */ + @Deprecated + public JSONObject trackCharge(String distinctId, double amount, JSONObject properties) { + System.err.println("ERROR: The trackCharge() method is deprecated and no longer functional. " + + "The old version of Mixpanel's Revenue analysis UI has been replaced by a newer suite of analysis tools. " + + "See https://docs.mixpanel.com/docs/features/revenue_analytics for more information."); + return null; + } + + /** + * @deprecated The trackCharge() method is deprecated. The old version of Mixpanel's Revenue analysis UI + * has been replaced by a newer suite of analysis tools which don't depend on profile properties. + * See https://docs.mixpanel.com/docs/features/revenue_analytics for more information. + * + * This method now only logs an error and returns null. It no longer sets a profile property or produces any other change. + * + * @param distinctId an identifier associated with a profile + * @param amount a double revenue amount (deprecated - no longer used) + * @param properties properties associated with the transaction (deprecated - no longer used) + * @param modifiers modifiers for the message (deprecated - no longer used) + * @return null + */ + @Deprecated + public JSONObject trackCharge(String distinctId, double amount, JSONObject properties, JSONObject modifiers) { + System.err.println("ERROR: The trackCharge() method is deprecated and no longer functional. " + + "The old version of Mixpanel's Revenue analysis UI has been replaced by a newer suite of analysis tools. " + + "See https://docs.mixpanel.com/docs/features/revenue_analytics for more information."); + return null; + } + } diff --git a/src/test/java/com/mixpanel/mixpanelapi/MixpanelAPITest.java b/src/test/java/com/mixpanel/mixpanelapi/MixpanelAPITest.java index 67b19a0..0ce5726 100644 --- a/src/test/java/com/mixpanel/mixpanelapi/MixpanelAPITest.java +++ b/src/test/java/com/mixpanel/mixpanelapi/MixpanelAPITest.java @@ -410,8 +410,9 @@ public void testMessageFormat() { JSONObject increment = mBuilder.increment("a distinct id", increments); assertTrue(c.isValidMessage(increment)); + // Test deprecated trackCharge method - should return null JSONObject charge = mBuilder.trackCharge("a distinct id", 100.00, mSampleProps); - assertTrue(c.isValidMessage(charge)); + assertNull("trackCharge should return null (deprecated)", charge); } public void testModifiers() { @@ -426,8 +427,9 @@ public void testModifiers() { JSONObject append = mBuilder.append("a distinct id", mSampleProps, mSampleModifiers); checkModifiers(append); + // Test deprecated trackCharge method - should return null JSONObject trackCharge = mBuilder.trackCharge("a distinct id", 2.2, null, mSampleModifiers); - checkModifiers(trackCharge); + assertNull("trackCharge should return null (deprecated)", trackCharge); } public void testEmptyMessageFormat() { From 003f8b2af367b9f2c61be33ca677719bbfd7c7d4 Mon Sep 17 00:00:00 2001 From: Santi G Date: Sat, 1 Nov 2025 02:07:15 +0100 Subject: [PATCH 04/20] Support decimal increments in increment() method Update increment() method to accept Map instead of Map enabling decimal and mixed numeric increments. Changes: - Replace Map with Map in both increment() overloads - Enables decimal increments like 4.5, 2.7, 10.50 - Maintains backward compatibility - existing code continues to work - Add testIncrementWithDecimals() test verifying decimal support - Update all existing test calls to use Map --- .../mixpanel/mixpanelapi/MessageBuilder.java | 30 ++++++-------- .../mixpanel/mixpanelapi/MixpanelAPITest.java | 40 ++++++++++++++++--- 2 files changed, 47 insertions(+), 23 deletions(-) diff --git a/src/main/java/com/mixpanel/mixpanelapi/MessageBuilder.java b/src/main/java/com/mixpanel/mixpanelapi/MessageBuilder.java index 7e86c97..c5f8f2b 100644 --- a/src/main/java/com/mixpanel/mixpanelapi/MessageBuilder.java +++ b/src/main/java/com/mixpanel/mixpanelapi/MessageBuilder.java @@ -1,6 +1,7 @@ package com.mixpanel.mixpanelapi; import java.util.Collection; +import java.util.HashMap; import java.util.Map; import java.util.UUID; @@ -291,13 +292,16 @@ public JSONObject delete(String distinctId, JSONObject modifiers) { /** * For each key and value in the properties argument, adds that amount * to the associated property in the profile with the given distinct id. + * Supports both integer (Long, Integer) and decimal (Double, Float) increments. + * * So, to maintain a login count for user 12345, one might run the following code * at every login: *
      * {@code
-     *    Map updates = new HashMap();
-     *    updates.put('Logins', 1);
-     *    JSONObject message = messageBuilder.set("12345", updates);
+     *    Map updates = new HashMap();
+     *    updates.put("Logins", 1L);
+     *    updates.put("Rating", 4.5);  // decimal value
+     *    JSONObject message = messageBuilder.increment("12345", updates);
      *    mixpanelApi.sendMessage(message);
      * }
      * 
@@ -305,37 +309,29 @@ public JSONObject delete(String distinctId, JSONObject modifiers) { * for example, a user id of an app, or the hostname of a server. If no profile * exists for the given id, a new one will be created. * @param properties a collection of properties to change on the associated profile, - * each associated with a numeric value. + * each associated with a numeric value (Long, Integer, Double, Float, etc.) * @return user profile increment message for consumption by MixpanelAPI */ - public JSONObject increment(String distinctId, Map properties) { + public JSONObject increment(String distinctId, Map properties) { return increment(distinctId, properties, null); } /** * For each key and value in the properties argument, adds that amount * to the associated property in the profile with the given distinct id. - * So, to maintain a login count for user 12345, one might run the following code - * at every login: - *
-     * {@code
-     *    Map updates = new HashMap();
-     *    updates.put('Logins', 1);
-     *    JSONObject message = messageBuilder.set("12345", updates);
-     *    mixpanelApi.sendMessage(message);
-     * }
-     * 
+ * Supports both integer (Long, Integer) and decimal (Double, Float) increments. + * * @param distinctId a string uniquely identifying the profile to change, * for example, a user id of an app, or the hostname of a server. If no profile * exists for the given id, a new one will be created. * @param properties a collection of properties to change on the associated profile, - * each associated with a numeric value. + * each associated with a numeric value (Long, Integer, Double, Float, etc.) * @param modifiers Modifiers associated with the update message. (for example "$time" or "$ignore_time"). * this can be null- if non-null, the keys and values in the modifiers * object will be associated directly with the update. * @return user profile increment message for consumption by MixpanelAPI */ - public JSONObject increment(String distinctId, Map properties, JSONObject modifiers) { + public JSONObject increment(String distinctId, Map properties, JSONObject modifiers) { JSONObject jsonProperties = new JSONObject(properties); return peopleMessage(distinctId, "$add", jsonProperties, modifiers); } diff --git a/src/test/java/com/mixpanel/mixpanelapi/MixpanelAPITest.java b/src/test/java/com/mixpanel/mixpanelapi/MixpanelAPITest.java index 0ce5726..67d1216 100644 --- a/src/test/java/com/mixpanel/mixpanelapi/MixpanelAPITest.java +++ b/src/test/java/com/mixpanel/mixpanelapi/MixpanelAPITest.java @@ -87,7 +87,7 @@ public boolean sendData(String dataString, String endpointUrl) { JSONObject set = mBuilder.set("a distinct id", mSampleProps); c.addMessage(set); - Map increments = new HashMap(); + Map increments = new HashMap(); increments.put("a key", 24L); JSONObject increment = mBuilder.increment("a distinct id", increments); c.addMessage(increment); @@ -148,7 +148,7 @@ public void testPeopleMessageBuilds() } { - Map increments = new HashMap(); + Map increments = new HashMap(); increments.put("k1", 10L); increments.put("k2", 1L); JSONObject increment = mBuilder.increment("a distinct id", increments, mSampleModifiers); @@ -159,7 +159,7 @@ public void testPeopleMessageBuilds() } { - Map increments = new HashMap(); + Map increments = new HashMap(); increments.put("k1", 10L); increments.put("k2", 1L); JSONObject increment = mBuilder.increment("a distinct id", increments); @@ -405,7 +405,7 @@ public void testMessageFormat() { JSONObject set = mBuilder.set("a distinct id", mSampleProps); assertTrue(c.isValidMessage(set)); - Map increments = new HashMap(); + Map increments = new HashMap(); increments.put("a key", 24L); JSONObject increment = mBuilder.increment("a distinct id", increments); assertTrue(c.isValidMessage(increment)); @@ -419,7 +419,7 @@ public void testModifiers() { JSONObject set = mBuilder.set("a distinct id", mSampleProps, mSampleModifiers); checkModifiers(set); - Map increments = new HashMap(); + Map increments = new HashMap(); increments.put("a key", 24L); JSONObject increment = mBuilder.increment("a distinct id", increments, mSampleModifiers); checkModifiers(increment); @@ -432,6 +432,34 @@ public void testModifiers() { assertNull("trackCharge should return null (deprecated)", trackCharge); } + public void testIncrementWithDecimals() { + // Test that increment() supports decimal values (not just integers) + ClientDelivery c = new ClientDelivery(); + + // Test with Double values + Map decimals = new HashMap(); + decimals.put("rating", 4.5); + decimals.put("score", 2.7); + decimals.put("cost", 10.50); + JSONObject incrementDecimals = mBuilder.increment("a distinct id", decimals); + assertTrue("increment with decimals should be valid", c.isValidMessage(incrementDecimals)); + + // Test with mixed numeric types (Double, Integer, Long) + Map mixed = new HashMap(); + mixed.put("double_value", 3.14); + mixed.put("int_value", 5); + mixed.put("long_value", 100L); + JSONObject incrementMixed = mBuilder.increment("a distinct id", mixed); + assertTrue("increment with mixed numeric types should be valid", c.isValidMessage(incrementMixed)); + + // Test with modifiers + Map decimalsWithMods = new HashMap(); + decimalsWithMods.put("rating", 1.5); + JSONObject incrementWithMods = mBuilder.increment("a distinct id", decimalsWithMods, mSampleModifiers); + assertTrue("increment with decimals and modifiers should be valid", c.isValidMessage(incrementWithMods)); + checkModifiers(incrementWithMods); + } + public void testEmptyMessageFormat() { ClientDelivery c = new ClientDelivery(); JSONObject eventMessage = mBuilder.event("a distinct id", "empty event", null); @@ -473,7 +501,7 @@ public void testClientDelivery() { c.addMessage(groupSet); - Map increments = new HashMap(); + Map increments = new HashMap(); increments.put("a key", 24L); JSONObject increment = mBuilder.increment("a distinct id", increments); c.addMessage(increment); From 96ae95ea3a68d8828c7d37a406752163589db67e Mon Sep 17 00:00:00 2001 From: Santi G Date: Sat, 1 Nov 2025 02:40:19 +0100 Subject: [PATCH 05/20] Add customizable connection and read timeouts Allow per-instance customization of connection and read timeouts for HTTP requests, enabling better support for high-latency regions and slow networks. --- .../com/mixpanel/mixpanelapi/MixpanelAPI.java | 55 +++++++++++++++++-- 1 file changed, 51 insertions(+), 4 deletions(-) diff --git a/src/main/java/com/mixpanel/mixpanelapi/MixpanelAPI.java b/src/main/java/com/mixpanel/mixpanelapi/MixpanelAPI.java index 7f31484..e65b9b6 100644 --- a/src/main/java/com/mixpanel/mixpanelapi/MixpanelAPI.java +++ b/src/main/java/com/mixpanel/mixpanelapi/MixpanelAPI.java @@ -41,6 +41,10 @@ public class MixpanelAPI implements AutoCloseable { private static final int CONNECT_TIMEOUT_MILLIS = 2000; private static final int READ_TIMEOUT_MILLIS = 10000; + // Instance fields for customizable timeouts (per-instance control) + private int mConnectTimeoutMillis = CONNECT_TIMEOUT_MILLIS; + private int mReadTimeoutMillis = READ_TIMEOUT_MILLIS; + protected final String mEventsEndpoint; protected final String mPeopleEndpoint; protected final String mGroupsEndpoint; @@ -304,8 +308,8 @@ protected String encodeDataString(String dataString) { /* package */ boolean sendData(String dataString, String endpointUrl) throws IOException { URL endpoint = new URL(endpointUrl); URLConnection conn = endpoint.openConnection(); - conn.setReadTimeout(READ_TIMEOUT_MILLIS); - conn.setConnectTimeout(CONNECT_TIMEOUT_MILLIS); + conn.setReadTimeout(mReadTimeoutMillis); + conn.setConnectTimeout(mConnectTimeoutMillis); conn.setDoOutput(true); byte[] dataToSend; @@ -454,8 +458,8 @@ private String dataString(List messages) { /* package */ boolean sendImportData(String dataString, String endpointUrl, String token) throws IOException { URL endpoint = new URL(endpointUrl); HttpURLConnection conn = (HttpURLConnection) endpoint.openConnection(); - conn.setReadTimeout(READ_TIMEOUT_MILLIS); - conn.setConnectTimeout(CONNECT_TIMEOUT_MILLIS); + conn.setReadTimeout(mReadTimeoutMillis); + conn.setConnectTimeout(mConnectTimeoutMillis); conn.setDoOutput(true); conn.setRequestMethod("POST"); conn.setRequestProperty("Content-Type", "application/json"); @@ -627,6 +631,49 @@ private static EventSender createEventSender(BaseFlagsConfig config, MixpanelAPI }; } + /** + * Sets the connection timeout for HTTP requests. + * + * Default is 2000 milliseconds (2 seconds). You may need to increase this for high-latency regions. + * This should be called before calling any deliver() or sendMessage(). + * + * Example: + * MixpanelAPI api = new MixpanelAPI(); + * api.setConnectTimeout(5000); // 5 seconds for slow regions + * api.deliver(delivery); + * + * @param timeoutMillis timeout in milliseconds (must be > 0) + * @throws IllegalArgumentException if timeoutMillis <= 0 + */ + public void setConnectTimeout(int timeoutMillis) { + if (timeoutMillis <= 0) { + throw new IllegalArgumentException("Connect timeout must be > 0"); + } + this.mConnectTimeoutMillis = timeoutMillis; + } + + /** + * Sets the read timeout for HTTP requests. + * + * Default is 10000 milliseconds (10 seconds). You may need to increase this for high-latency regions + * or when processing large batches of events. + * This should be called before calling any deliver() or sendMessage(). + * + * Example: + * MixpanelAPI api = new MixpanelAPI(); + * api.setReadTimeout(15000); // 15 seconds for slow regions + * api.deliver(delivery); + * + * @param timeoutMillis timeout in milliseconds (must be > 0) + * @throws IllegalArgumentException if timeoutMillis <= 0 + */ + public void setReadTimeout(int timeoutMillis) { + if (timeoutMillis <= 0) { + throw new IllegalArgumentException("Read timeout must be > 0"); + } + this.mReadTimeoutMillis = timeoutMillis; + } + /** * Closes this MixpanelAPI instance and releases any resources held by the flags providers. * This method should be called when the MixpanelAPI instance is no longer needed. From b2dde50d8364ef4f40105ea8b6d6d12a973518e7 Mon Sep 17 00:00:00 2001 From: Santi G Date: Sat, 1 Nov 2025 22:16:02 +0100 Subject: [PATCH 06/20] Add payload chunking for 413 errors and improve tests Introduces PayloadChunker to split large JSON payloads when a 413 Payload Too Large error is received from the server. Updates MixpanelAPI to retry sending chunked payloads for both /track and /import endpoints, with configurable size limits. Refactors sendData to return HTTP status codes, and enhances test coverage for chunking logic and 413 error handling. --- .../java/com/mixpanel/mixpanelapi/Config.java | 8 + .../com/mixpanel/mixpanelapi/MixpanelAPI.java | 152 ++++++++- .../mixpanel/mixpanelapi/PayloadChunker.java | 107 +++++++ .../mixpanel/mixpanelapi/MixpanelAPITest.java | 296 +++++++++++++++++- 4 files changed, 539 insertions(+), 24 deletions(-) create mode 100644 src/main/java/com/mixpanel/mixpanelapi/PayloadChunker.java diff --git a/src/main/java/com/mixpanel/mixpanelapi/Config.java b/src/main/java/com/mixpanel/mixpanelapi/Config.java index 50b8bc7..7f0a42f 100644 --- a/src/main/java/com/mixpanel/mixpanelapi/Config.java +++ b/src/main/java/com/mixpanel/mixpanelapi/Config.java @@ -4,4 +4,12 @@ public static final String BASE_ENDPOINT = "https://api.mixpanel.com"; public static final int MAX_MESSAGE_SIZE = 50; public static final int IMPORT_MAX_MESSAGE_SIZE = 2000; + + // Payload size limits for different endpoints + // When a 413 Payload Too Large error is received, payloads are chunked to these limits + public static final int IMPORT_MAX_PAYLOAD_BYTES = 10 * 1024 * 1024; // 10 MB + public static final int TRACK_MAX_PAYLOAD_BYTES = 1 * 1024 * 1024; // 1 MB + + // HTTP status codes + public static final int HTTP_413_PAYLOAD_TOO_LARGE = 413; } diff --git a/src/main/java/com/mixpanel/mixpanelapi/MixpanelAPI.java b/src/main/java/com/mixpanel/mixpanelapi/MixpanelAPI.java index e65b9b6..dd1c8c6 100644 --- a/src/main/java/com/mixpanel/mixpanelapi/MixpanelAPI.java +++ b/src/main/java/com/mixpanel/mixpanelapi/MixpanelAPI.java @@ -9,6 +9,7 @@ import java.net.URL; import java.net.URLConnection; import java.net.URLEncoder; +import java.util.ArrayList; import java.util.List; import java.util.zip.GZIPOutputStream; import org.json.JSONArray; @@ -303,9 +304,29 @@ protected String encodeDataString(String dataString) { } /** - * Package scope for mocking purposes + * Container for HTTP response information including status code. + * Used to communicate both success/failure and the specific HTTP status code. */ - /* package */ boolean sendData(String dataString, String endpointUrl) throws IOException { + /* package */ static class HttpStatusResponse { + public final boolean success; + public final int statusCode; + + public HttpStatusResponse(boolean success, int statusCode) { + this.success = success; + this.statusCode = statusCode; + } + } + + /** + * Package scope for mocking purposes. + * + * Sends data to an endpoint and returns both success status and HTTP status code. + * This allows callers to detect specific error conditions like 413 Payload Too Large. + * + * When a 413 error is received, the caller should split the payload into smaller chunks + * using PayloadChunker and retry each chunk individually. + */ + /* package */ HttpStatusResponse sendData(String dataString, String endpointUrl) throws IOException { URL endpoint = new URL(endpointUrl); URLConnection conn = endpoint.openConnection(); conn.setReadTimeout(mReadTimeoutMillis); @@ -360,6 +381,17 @@ protected String encodeDataString(String dataString) { } } + // For HttpURLConnection, we need to handle status codes + int statusCode = 0; + if (conn instanceof HttpURLConnection) { + try { + statusCode = ((HttpURLConnection) conn).getResponseCode(); + } catch (IOException e) { + // If we can't get the status code, return failure + return new HttpStatusResponse(false, 0); + } + } + InputStream responseStream = null; String response = null; try { @@ -375,7 +407,8 @@ protected String encodeDataString(String dataString) { } } - return ((response != null) && response.equals("1")); + boolean accepted = ((response != null) && response.equals("1")); + return new HttpStatusResponse(accepted, statusCode); } private void sendMessages(List messages, String endpointUrl) throws IOException { @@ -386,12 +419,59 @@ private void sendMessages(List messages, String endpointUrl) throws if (batch.size() > 0) { String messagesString = dataString(batch); - boolean accepted = sendData(messagesString, endpointUrl); + HttpStatusResponse response = sendData(messagesString, endpointUrl); + + if (!response.success) { + // Check if we got a 413 Payload Too Large error + if (response.statusCode == Config.HTTP_413_PAYLOAD_TOO_LARGE) { + // Retry with chunked payloads (only once) + sendMessagesChunked(batch, endpointUrl); + } else { + throw new MixpanelServerException("Server refused to accept messages, they may be malformed.", batch); + } + } + } + } + } - if (! accepted) { - throw new MixpanelServerException("Server refused to accept messages, they may be malformed.", batch); + /** + * Sends messages by chunking the payload to handle 413 Payload Too Large errors. + * This is a retry mechanism that only happens once when the initial send returns 413. + * + * The messages are split into smaller chunks using PayloadChunker, with each chunk + * sized to be under the track endpoint's limit (1 MB). Each chunk is sent independently, + * and we do NOT retry chunked sends that fail - only the initial payload gets one retry. + * + * @param batch the batch of messages to send in chunks + * @param endpointUrl the endpoint URL + * @throws IOException if there's a network error + * @throws MixpanelServerException if any chunk is rejected by the server + */ + private void sendMessagesChunked(List batch, String endpointUrl) throws IOException { + String originalPayload = dataString(batch); + + try { + // Split the payload into chunks under the track endpoint's 1MB limit + // Size limits are based on uncompressed data (server limits apply to uncompressed payloads) + List chunks = PayloadChunker.chunkJsonArray(originalPayload, Config.TRACK_MAX_PAYLOAD_BYTES); + + for (String chunk : chunks) { + HttpStatusResponse response = sendData(chunk, endpointUrl); + + if (!response.success) { + // Parse the chunk back into messages for the error response + JSONArray chunkArray = new JSONArray(chunk); + List chunkMessages = new ArrayList<>(); + for (int i = 0; i < chunkArray.length(); i++) { + chunkMessages.add(chunkArray.getJSONObject(i)); + } + throw new MixpanelServerException("Server refused to accept chunked messages, they may be malformed. HTTP " + response.statusCode, chunkMessages); } } + } catch (JSONException e) { + throw new MixpanelServerException("Failed to chunk messages due to JSON error", batch); + } catch (UnsupportedEncodingException e) { + throw new MixpanelServerException("Failed to chunk messages due to encoding error", batch); } } @@ -424,7 +504,10 @@ private void sendImportMessages(List messages, String endpointUrl) t String messagesString = dataString(batch); boolean accepted = sendImportData(messagesString, endpointUrl, token); - if (! accepted) { + if (!accepted && mLastStatusCode == Config.HTTP_413_PAYLOAD_TOO_LARGE) { + // Retry with chunked payloads (only once) for 413 errors + sendImportMessagesChunked(batch, endpointUrl, token); + } else if (!accepted) { String respBody = mLastResponseBody != null ? mLastResponseBody : "no response body"; int status = mLastStatusCode; throw new MixpanelServerException("Server refused to accept import messages, they may be malformed. HTTP " + status + " Response: " + respBody, batch); @@ -433,6 +516,50 @@ private void sendImportMessages(List messages, String endpointUrl) t } } + /** + * Sends import messages by chunking the payload to handle 413 Payload Too Large errors. + * This is a retry mechanism that only happens once when the initial send returns 413. + * + * The messages are split into smaller chunks using PayloadChunker, with each chunk + * sized to be under the import endpoint's limit (10 MB). Each chunk is sent independently, + * and we do NOT retry chunked sends that fail - only the initial payload gets one retry. + * + * @param batch the batch of messages to send in chunks + * @param endpointUrl the endpoint URL + * @param token the authentication token + * @throws IOException if there's a network error + * @throws MixpanelServerException if any chunk is rejected by the server + */ + private void sendImportMessagesChunked(List batch, String endpointUrl, String token) throws IOException { + String originalPayload = dataString(batch); + + try { + // Split the payload into chunks under the import endpoint's 10MB limit + // Size limits are based on uncompressed data (server limits apply to uncompressed payloads) + List chunks = PayloadChunker.chunkJsonArray(originalPayload, Config.IMPORT_MAX_PAYLOAD_BYTES); + + for (String chunk : chunks) { + boolean accepted = sendImportData(chunk, endpointUrl, token); + + if (!accepted) { + // Parse the chunk back into messages for the error response + JSONArray chunkArray = new JSONArray(chunk); + List chunkMessages = new ArrayList<>(); + for (int i = 0; i < chunkArray.length(); i++) { + chunkMessages.add(chunkArray.getJSONObject(i)); + } + String respBody = mLastResponseBody != null ? mLastResponseBody : "no response body"; + int status = mLastStatusCode; + throw new MixpanelServerException("Server refused to accept chunked import messages, they may be malformed. HTTP " + status + " Response: " + respBody, chunkMessages); + } + } + } catch (JSONException e) { + throw new MixpanelServerException("Failed to chunk import messages due to JSON error", batch); + } catch (UnsupportedEncodingException e) { + throw new MixpanelServerException("Failed to chunk import messages due to encoding error", batch); + } + } + private String dataString(List messages) { JSONArray array = new JSONArray(); for (JSONObject message:messages) { @@ -448,6 +575,10 @@ private String dataString(List messages) { * - JSON content type (not URL-encoded like /track) * - Basic authentication with token as username and empty password * - strict=1 parameter for validation + * + * When a 413 Payload Too Large error is received, the caller should split the payload + * into smaller chunks using PayloadChunker and retry each chunk individually. + * This method will store the 413 status code in mLastStatusCode for the caller to detect. * * @param dataString JSON array of events to import * @param endpointUrl The import endpoint URL @@ -521,16 +652,17 @@ private String dataString(List messages) { responseStream = conn.getInputStream(); response = slurp(responseStream); } catch (IOException e) { - // HTTP error codes (401, 400, etc.) throw IOException when calling getInputStream() + // HTTP error codes (401, 400, 413, etc.) throw IOException when calling getInputStream() // Check if it's an HTTP error and read the error stream for details + int statusCode = conn.getResponseCode(); InputStream errorStream = conn.getErrorStream(); if (errorStream != null) { try { String errorResponse = slurp(errorStream); mLastResponseBody = errorResponse; - mLastStatusCode = conn.getResponseCode(); + mLastStatusCode = statusCode; errorStream.close(); - // Return false to indicate rejection, which will throw MixpanelServerException + // Return false to indicate rejection, which will allow caller to check status code return false; } catch (IOException ignored) { // If we can't read the error stream, just let the original exception propagate diff --git a/src/main/java/com/mixpanel/mixpanelapi/PayloadChunker.java b/src/main/java/com/mixpanel/mixpanelapi/PayloadChunker.java new file mode 100644 index 0000000..4bcc35f --- /dev/null +++ b/src/main/java/com/mixpanel/mixpanelapi/PayloadChunker.java @@ -0,0 +1,107 @@ +package com.mixpanel.mixpanelapi; + +import java.io.UnsupportedEncodingException; +import java.util.ArrayList; +import java.util.List; + +import org.json.JSONArray; +import org.json.JSONException; + +/** + * Utility class for safely chunking JSON array payloads while maintaining data integrity. + * + * This class is used to split large payloads into smaller chunks when the server returns a 413 + * Payload Too Large error. It ensures that: + * - The payload remains valid JSON (JSONArray format) + * - Chunk size limits are based on UNCOMPRESSED data (server limits apply to uncompressed payloads) + * - Each chunk is below the specified size limit + * + * The chunking strategy removes individual messages from the original payload and creates new + * JSONArray chunks, ensuring that the JSON structure remains valid and parseable. + */ +public class PayloadChunker { + + /** + * Splits a JSON array string into multiple chunks, each under the specified byte size limit. + * + * Size limits are calculated based on UNCOMPRESSED UTF-8 encoded data, since the server's + * payload size limits (1 MB for /track, 10 MB for /import) apply to uncompressed payloads + * before any gzip compression is applied. + * + * This method is useful when a server returns a 413 Payload Too Large error. It splits the + * original payload into smaller chunks such that each chunk's uncompressed size stays under + * the specified limit. + * + * @param jsonArrayString the JSON array string to chunk (e.g., "[{...}, {...}, ...]") + * @param maxBytesPerChunk the maximum size in bytes per chunk (uncompressed data) + * @return a list of JSON array strings, each under the size limit + * @throws JSONException if the input is not a valid JSON array + * @throws UnsupportedEncodingException if UTF-8 encoding is not supported + */ + public static List chunkJsonArray(String jsonArrayString, int maxBytesPerChunk) + throws JSONException, UnsupportedEncodingException { + + JSONArray originalArray = new JSONArray(jsonArrayString); + List chunks = new ArrayList<>(); + + if (originalArray.length() == 0) { + chunks.add("[]"); + return chunks; + } + + // If the array has only one item and it's already too large, we still need to return it + // (the server will reject it, but we can't split a single item) + if (originalArray.length() == 1) { + chunks.add(jsonArrayString); + return chunks; + } + + JSONArray currentChunk = new JSONArray(); + + for (int i = 0; i < originalArray.length(); i++) { + currentChunk.put(originalArray.get(i)); + + // Check if the current chunk would exceed the limit + // Always use uncompressed size for comparison since server limits apply to uncompressed data + int currentSize = getUncompressedPayloadSize(currentChunk.toString()); + + if (currentSize > maxBytesPerChunk && currentChunk.length() > 1) { + // Current item pushed us over the limit, so remove it and save the current chunk + currentChunk.remove(currentChunk.length() - 1); + chunks.add(currentChunk.toString()); + + // Start a new chunk with the item that caused the overflow + currentChunk = new JSONArray(); + currentChunk.put(originalArray.get(i)); + } else if (currentSize > maxBytesPerChunk && currentChunk.length() == 1) { + // Even a single item exceeds the limit - this item is too large to chunk + // Add it anyway; the server will reject it and we can't do better + chunks.add(currentChunk.toString()); + currentChunk = new JSONArray(); + } + } + + // Add the remaining items + if (currentChunk.length() > 0) { + chunks.add(currentChunk.toString()); + } + + return chunks; + } + + /** + * Calculates the uncompressed byte size of a string payload in UTF-8 encoding. + * + * This method always returns the uncompressed size because the server's payload limits + * (1 MB for /track, 10 MB for /import) apply to the uncompressed data before any + * gzip compression is applied during transmission. + * + * @param payload the string payload to measure + * @return the size in bytes (uncompressed UTF-8 encoded) + * @throws UnsupportedEncodingException if UTF-8 encoding is not supported + */ + public static int getUncompressedPayloadSize(String payload) + throws UnsupportedEncodingException { + return payload.getBytes("utf-8").length; + } +} diff --git a/src/test/java/com/mixpanel/mixpanelapi/MixpanelAPITest.java b/src/test/java/com/mixpanel/mixpanelapi/MixpanelAPITest.java index 67d1216..5b03da5 100644 --- a/src/test/java/com/mixpanel/mixpanelapi/MixpanelAPITest.java +++ b/src/test/java/com/mixpanel/mixpanelapi/MixpanelAPITest.java @@ -52,6 +52,17 @@ public static Test suite() { return new TestSuite( MixpanelAPITest.class ); } + /** + * Helper method to repeat a string (Java 8 compatibility - String.repeat not available) + */ + private String repeat(String str, int count) { + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < count; i++) { + sb.append(str); + } + return sb.toString(); + } + @Override public void setUp() { mTimeZero = System.currentTimeMillis() / 1000; @@ -73,9 +84,9 @@ public void setUp() { MixpanelAPI api = new MixpanelAPI("events url", "people url", "groups url") { @Override - public boolean sendData(String dataString, String endpointUrl) { + public HttpStatusResponse sendData(String dataString, String endpointUrl) { sawData.put(endpointUrl, dataString); - return true; + return new HttpStatusResponse(true, 200); } }; @@ -615,9 +626,9 @@ public void testExpectedPeopleParams() { public void testEmptyDelivery() { MixpanelAPI api = new MixpanelAPI("events url", "people url") { @Override - public boolean sendData(String dataString, String endpointUrl) { + public HttpStatusResponse sendData(String dataString, String endpointUrl) { fail("Data sent when no data should be sent"); - return true; + return new HttpStatusResponse(true, 200); } }; @@ -634,9 +645,9 @@ public void testLargeDelivery() { MixpanelAPI api = new MixpanelAPI("events url", "people url") { @Override - public boolean sendData(String dataString, String endpointUrl) { + public HttpStatusResponse sendData(String dataString, String endpointUrl) { sends.add(dataString); - return true; + return new HttpStatusResponse(true, 200); } }; @@ -676,9 +687,9 @@ public boolean sendData(String dataString, String endpointUrl) { public void testEncodeDataString(){ MixpanelAPI api = new MixpanelAPI("events url", "people url") { @Override - public boolean sendData(String dataString, String endpointUrl) { + public HttpStatusResponse sendData(String dataString, String endpointUrl) { fail("Data sent when no data should be sent"); - return true; + return new HttpStatusResponse(true, 200); } }; @@ -979,10 +990,10 @@ public void testGzipCompressionEnabled() { // Test that gzip compression is properly enabled and data is compressed MixpanelAPI api = new MixpanelAPI("events url", "people url", "groups url", "import url", true) { @Override - public boolean sendData(String dataString, String endpointUrl) throws IOException { + public HttpStatusResponse sendData(String dataString, String endpointUrl) throws IOException { // This method should be called with gzip compression enabled fail("sendData should not be called directly when testing at this level"); - return true; + return new HttpStatusResponse(true, 200); } }; @@ -1009,7 +1020,7 @@ public void testGzipCompressionDataIntegrity() { MixpanelAPI apiWithGzip = new MixpanelAPI("events url", "people url", "groups url", "import url", true) { @Override - public boolean sendData(String dataString, String endpointUrl) throws IOException { + public HttpStatusResponse sendData(String dataString, String endpointUrl) throws IOException { capturedOriginalData.put(endpointUrl, dataString); // Simulate what the real sendData does with gzip @@ -1026,11 +1037,10 @@ public boolean sendData(String dataString, String endpointUrl) throws IOExceptio capturedCompressedData.put(endpointUrl, byteStream.toByteArray()); } catch (Exception e) { - throw new IOException("Compression failed", e); + fail("Error in gzip compression simulation: " + e.toString()); } } - - return true; + return new HttpStatusResponse(true, 200); } }; @@ -1216,4 +1226,262 @@ public boolean sendImportData(String dataString, String endpointUrl, String toke } } + public void test413TrackEndpointChunking() { + // Test that 413 errors on track endpoint trigger payload chunking and retry + final List sendAttempts = new ArrayList(); + final List statusCodes = new ArrayList(); + + MixpanelAPI api = new MixpanelAPI("events url", "people url", "groups url") { + private int attemptCount = 0; + + @Override + public HttpStatusResponse sendData(String dataString, String endpointUrl) { + sendAttempts.add(dataString); + attemptCount++; + + // First attempt returns 413 (payload too large) + // Subsequent attempts succeed + if (attemptCount == 1) { + statusCodes.add(413); + return new HttpStatusResponse(false, 413); + } else { + statusCodes.add(200); + return new HttpStatusResponse(true, 200); + } + } + }; + + ClientDelivery c = new ClientDelivery(); + + try { + // Create a large payload that would trigger 413 + // Using many moderately-sized messages + for (int i = 0; i < 100; i++) { + JSONObject props = new JSONObject(); + props.put("large_property_" + i, repeat("x", 1000)); // Large property to increase payload size + JSONObject event = mBuilder.event("user-" + i, "test event", props); + c.addMessage(event); + } + + api.deliver(c); + + // Should have at least 2 attempts: initial (fails with 413) + retry with chunks (succeeds) + assertTrue("Multiple send attempts made", sendAttempts.size() >= 2); + assertEquals("First attempt got 413", 413, statusCodes.get(0).intValue()); + assertTrue("Retry attempts succeeded", statusCodes.size() > 1); + + } catch (IOException e) { + fail("IOException during 413 track test: " + e.toString()); + } + } + + public void test413ImportEndpointChunking() { + // Test that 413 errors on import endpoint trigger payload chunking and retry + final List sendAttempts = new ArrayList(); + final List statusCodes = new ArrayList(); + + MixpanelAPI api = new MixpanelAPI("events url", "people url", "groups url", "import url") { + private int attemptCount = 0; + + @Override + public boolean sendImportData(String dataString, String endpointUrl, String token) { + sendAttempts.add(dataString); + attemptCount++; + + // First attempt returns 413 (payload too large) + // Subsequent attempts succeed + if (attemptCount == 1) { + statusCodes.add(413); + mLastStatusCode = 413; + mLastResponseBody = "{\"code\": 413}"; + return false; + } else { + statusCodes.add(200); + mLastStatusCode = 200; + mLastResponseBody = "{\"code\": 200, \"status\": \"OK\"}"; + return true; + } + } + }; + + ClientDelivery c = new ClientDelivery(); + + try { + // Create a large payload for import + long historicalTime = System.currentTimeMillis() - (90L * 24L * 60L * 60L * 1000L); + + for (int i = 0; i < 50; i++) { + JSONObject props = new JSONObject(); + props.put("time", historicalTime + i); + props.put("$insert_id", "insert-id-" + i); + props.put("large_property_" + i, repeat("x", 5000)); // Large property + JSONObject importEvent = mBuilder.importEvent("user-" + i, "test event", props); + c.addMessage(importEvent); + } + + api.deliver(c); + + // Should have at least 2 attempts: initial (fails with 413) + retry with chunks (succeeds) + assertTrue("Multiple send attempts made", sendAttempts.size() >= 2); + assertEquals("First attempt got 413", 413, statusCodes.get(0).intValue()); + assertTrue("Retry attempts succeeded", statusCodes.size() > 1); + + } catch (IOException e) { + fail("IOException during 413 import test: " + e.toString()); + } + } + + public void testPayloadChunkerBasic() { + // Test basic chunking functionality + try { + JSONArray array = new JSONArray(); + for (int i = 0; i < 10; i++) { + JSONObject obj = new JSONObject(); + obj.put("id", i); + obj.put("data", "item " + i); + array.put(obj); + } + + String jsonString = array.toString(); + List chunks = PayloadChunker.chunkJsonArray(jsonString, 200); + + // Should be split into multiple chunks given the 200 byte limit + assertTrue("Payload chunked into multiple pieces", chunks.size() > 1); + + // Each chunk should be valid JSON + for (String chunk : chunks) { + JSONArray chunkArray = new JSONArray(chunk); + assertTrue("Chunk is valid JSONArray", chunkArray.length() > 0); + } + + // Total number of items should be preserved + int totalItems = 0; + for (String chunk : chunks) { + totalItems += new JSONArray(chunk).length(); + } + assertEquals("All items preserved after chunking", 10, totalItems); + + } catch (Exception e) { + fail("Unexpected error in chunker test: " + e.toString()); + } + } + + public void testPayloadChunkerWithGzip() { + // Test that chunking is based on uncompressed size + // (gzip compression is applied later during transmission, but limits apply to uncompressed data) + try { + JSONArray array = new JSONArray(); + for (int i = 0; i < 20; i++) { + JSONObject obj = new JSONObject(); + obj.put("id", i); + obj.put("content", repeat("x", 100)); // Larger content + array.put(obj); + } + + String jsonString = array.toString(); + + // All chunking is now based on uncompressed size + List chunks = PayloadChunker.chunkJsonArray(jsonString, 500); + + // Each chunk should be valid + int totalItems = 0; + for (String chunk : chunks) { + JSONArray chunkArray = new JSONArray(chunk); + assertTrue("Chunk is valid JSONArray", chunkArray.length() > 0); + totalItems += chunkArray.length(); + } + + assertEquals("All items preserved", 20, totalItems); + + } catch (Exception e) { + fail("Unexpected error in gzip chunker test: " + e.toString()); + } + } + + public void testPayloadChunkerEdgeCases() { + // Test edge cases: empty array, single item, very small limit + try { + // Empty array + List emptyChunks = PayloadChunker.chunkJsonArray("[]", 100); + assertEquals("Empty array results in one chunk", 1, emptyChunks.size()); + assertEquals("Empty chunk is valid", "[]", emptyChunks.get(0)); + + // Single item that's larger than limit (can't be split further) + JSONArray singleArray = new JSONArray(); + JSONObject large = new JSONObject(); + large.put("data", repeat("x", 500)); + singleArray.put(large); + + List singleChunks = PayloadChunker.chunkJsonArray(singleArray.toString(), 100); + assertEquals("Single oversized item still returned", 1, singleChunks.size()); + + } catch (Exception e) { + fail("Unexpected error in edge case test: " + e.toString()); + } + } + + public void testTrackEndpoint413WithoutChunking() { + // Test that non-413 errors are still thrown immediately without chunking + MixpanelAPI api = new MixpanelAPI("events url", "people url", "groups url") { + @Override + public HttpStatusResponse sendData(String dataString, String endpointUrl) { + // Return 400 (bad request) instead of 413 + return new HttpStatusResponse(false, 400); + } + }; + + ClientDelivery c = new ClientDelivery(); + + try { + JSONObject event = mBuilder.event("user-1", "test event", mSampleProps); + c.addMessage(event); + + api.deliver(c); + fail("Should have thrown MixpanelServerException for 400 error"); + + } catch (MixpanelServerException e) { + // Expected behavior - non-413 errors should be thrown immediately + assertTrue("Error message indicates server rejection", + e.getMessage().contains("refused")); + } catch (IOException e) { + fail("Should have thrown MixpanelServerException, not IOException: " + e.toString()); + } + } + + public void testImportEndpoint413WithoutChunking() { + // Test that non-413 import errors are still thrown immediately without chunking + MixpanelAPI api = new MixpanelAPI("events url", "people url", "groups url", "import url") { + @Override + public boolean sendImportData(String dataString, String endpointUrl, String token) { + // Return 401 (unauthorized) instead of 413 + mLastStatusCode = 401; + mLastResponseBody = "{\"code\": 401, \"status\": \"Unauthorized\"}"; + return false; + } + }; + + ClientDelivery c = new ClientDelivery(); + long historicalTime = System.currentTimeMillis() - (90L * 24L * 60L * 60L * 1000L); + + try { + JSONObject props = new JSONObject(); + props.put("time", historicalTime); + props.put("$insert_id", "insert-id-1"); + JSONObject importEvent = mBuilder.importEvent("user-1", "test event", props); + c.addMessage(importEvent); + + api.deliver(c); + fail("Should have thrown MixpanelServerException for 401 error"); + + } catch (MixpanelServerException e) { + // Expected behavior - non-413 errors should be thrown immediately + assertTrue("Error message indicates server rejection", + e.getMessage().contains("refused") || e.getMessage().contains("401")); + } catch (IOException e) { + fail("Should have thrown MixpanelServerException, not IOException: " + e.toString()); + } catch (JSONException e) { + fail("Unexpected JSONException: " + e.toString()); + } + } + } From 9440687ec5b40879f93bde42750d6c8c9f4f8554 Mon Sep 17 00:00:00 2001 From: Santi G Date: Sat, 1 Nov 2025 23:32:38 +0100 Subject: [PATCH 07/20] Add option to disable strict import validation Introduces the disableStrictImport() method to MixpanelAPI, allowing users to set strict=0 for import operations and bypass event validation. Updates documentation and adds tests to verify the new behavior and ensure strict mode is instance-specific. --- README.md | 8 ++ .../com/mixpanel/mixpanelapi/MixpanelAPI.java | 29 ++++- .../mixpanel/mixpanelapi/MixpanelAPITest.java | 118 ++++++++++++++++++ 3 files changed, 154 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 7735b9d..d684b05 100644 --- a/README.md +++ b/README.md @@ -53,6 +53,14 @@ When importing large events through the `/import` endpoint, you may need to cont // Import with custom batch size (500) MixpanelAPI mixpanel = new MixpanelAPI(500); +### Disabling Strict Import Validation + +By default, the `/import` endpoint enforces strict validation (strict=1). You can disable strict validation by calling `disableStrictImport()` before delivering import messages. See the [Mixpanel Import API documentation](https://developer.mixpanel.com/reference/import-events) for more details about strict. + + MixpanelAPI mixpanel = new MixpanelAPI(); + mixpanel.disableStrictImport(); // Set strict=0 to skip validation + mixpanel.deliver(delivery); + ## Feature Flags The Mixpanel Java SDK supports feature flags with both local and remote evaluation modes. diff --git a/src/main/java/com/mixpanel/mixpanelapi/MixpanelAPI.java b/src/main/java/com/mixpanel/mixpanelapi/MixpanelAPI.java index dd1c8c6..cce9d56 100644 --- a/src/main/java/com/mixpanel/mixpanelapi/MixpanelAPI.java +++ b/src/main/java/com/mixpanel/mixpanelapi/MixpanelAPI.java @@ -45,6 +45,7 @@ public class MixpanelAPI implements AutoCloseable { // Instance fields for customizable timeouts (per-instance control) private int mConnectTimeoutMillis = CONNECT_TIMEOUT_MILLIS; private int mReadTimeoutMillis = READ_TIMEOUT_MILLIS; + private boolean mStrictImportMode = true; protected final String mEventsEndpoint; protected final String mPeopleEndpoint; @@ -281,7 +282,8 @@ public void deliver(ClientDelivery toSend, boolean useIpAddress) throws IOExcept // Handle import messages - use strict mode and extract token for auth List importMessages = toSend.getImportMessages(); if (importMessages.size() > 0) { - String importUrl = mImportEndpoint + "?strict=1"; + String strictParam = mStrictImportMode ? "1" : "0"; + String importUrl = mImportEndpoint + "?strict=" + strictParam; sendImportMessages(importMessages, importUrl); } } @@ -806,6 +808,31 @@ public void setReadTimeout(int timeoutMillis) { this.mReadTimeoutMillis = timeoutMillis; } + /** + * Disables strict validation for import operations. + * + * By default, the /import endpoint uses strict=1 which validates each event and returns + * a 400 error if any event has issues. Correctly formed events are still ingested, and + * problematic events are returned in the response with error messages. + * + * Calling this method sets strict=0, which bypasses validation and imports all events + * regardless of their validity. This can be useful for importing data with known issues + * or when validation errors are not a concern. + * + * This should be called before calling any deliver() or sendMessage() methods. + * + * Example: + * MixpanelAPI api = new MixpanelAPI(); + * api.disableStrictImport(); // Skip validation on import + * api.deliver(delivery); + * + * For more details on import validation, see: + * https://developer.mixpanel.com/reference/import-events + */ + public void disableStrictImport() { + this.mStrictImportMode = false; + } + /** * Closes this MixpanelAPI instance and releases any resources held by the flags providers. * This method should be called when the MixpanelAPI instance is no longer needed. diff --git a/src/test/java/com/mixpanel/mixpanelapi/MixpanelAPITest.java b/src/test/java/com/mixpanel/mixpanelapi/MixpanelAPITest.java index 5b03da5..11c952c 100644 --- a/src/test/java/com/mixpanel/mixpanelapi/MixpanelAPITest.java +++ b/src/test/java/com/mixpanel/mixpanelapi/MixpanelAPITest.java @@ -1484,4 +1484,122 @@ public boolean sendImportData(String dataString, String endpointUrl, String toke } } + public void testDisableStrictImport() { + // Test that disableStrictImport() sets strict=0 in the URL + final Map capturedUrls = new HashMap(); + + MixpanelAPI api = new MixpanelAPI("events url", "people url", "groups url", "import url") { + @Override + public boolean sendImportData(String dataString, String endpointUrl, String token) { + capturedUrls.put("endpoint", endpointUrl); + return true; + } + }; + + // By default, strict mode should be enabled (strict=1) + ClientDelivery c1 = new ClientDelivery(); + long historicalTime = System.currentTimeMillis() - (90L * 24L * 60L * 60L * 1000L); + + try { + JSONObject props1 = new JSONObject(); + props1.put("time", historicalTime); + props1.put("$insert_id", "insert-id-1"); + JSONObject importEvent1 = mBuilder.importEvent("user-1", "test event", props1); + c1.addMessage(importEvent1); + + api.deliver(c1); + + String url1 = capturedUrls.get("endpoint"); + assertTrue("Default: strict=1 in URL", url1.contains("strict=1")); + + } catch (IOException e) { + fail("IOException during first delivery: " + e.toString()); + } catch (JSONException e) { + fail("JSON error: " + e.toString()); + } + + // Now disable strict mode and test again + capturedUrls.clear(); + api.disableStrictImport(); + + ClientDelivery c2 = new ClientDelivery(); + + try { + JSONObject props2 = new JSONObject(); + props2.put("time", historicalTime); + props2.put("$insert_id", "insert-id-2"); + JSONObject importEvent2 = mBuilder.importEvent("user-2", "test event 2", props2); + c2.addMessage(importEvent2); + + api.deliver(c2); + + String url2 = capturedUrls.get("endpoint"); + assertTrue("After disableStrictImport(): strict=0 in URL", url2.contains("strict=0")); + + } catch (IOException e) { + fail("IOException during second delivery: " + e.toString()); + } catch (JSONException e) { + fail("JSON error: " + e.toString()); + } + } + + public void testMultipleInstancesHaveIndependentStrictMode() { + // Test that different MixpanelAPI instances have independent strict mode settings + final Map api1Urls = new HashMap(); + final Map api2Urls = new HashMap(); + + MixpanelAPI api1 = new MixpanelAPI("events url", "people url", "groups url", "import url1") { + @Override + public boolean sendImportData(String dataString, String endpointUrl, String token) { + api1Urls.put("endpoint", endpointUrl); + return true; + } + }; + + MixpanelAPI api2 = new MixpanelAPI("events url", "people url", "groups url", "import url2") { + @Override + public boolean sendImportData(String dataString, String endpointUrl, String token) { + api2Urls.put("endpoint", endpointUrl); + return true; + } + }; + + long historicalTime = System.currentTimeMillis() - (90L * 24L * 60L * 60L * 1000L); + + try { + // Disable strict mode only on api1 + api1.disableStrictImport(); + + // Send import message with api1 (strict=0) + ClientDelivery c1 = new ClientDelivery(); + JSONObject props1 = new JSONObject(); + props1.put("time", historicalTime); + props1.put("$insert_id", "insert-id-1"); + JSONObject importEvent1 = mBuilder.importEvent("user-1", "test event", props1); + c1.addMessage(importEvent1); + api1.deliver(c1); + + // Send import message with api2 (strict=1, not disabled) + ClientDelivery c2 = new ClientDelivery(); + JSONObject props2 = new JSONObject(); + props2.put("time", historicalTime); + props2.put("$insert_id", "insert-id-2"); + JSONObject importEvent2 = mBuilder.importEvent("user-2", "test event", props2); + c2.addMessage(importEvent2); + api2.deliver(c2); + + // Verify api1 has strict=0 and api2 has strict=1 + String url1 = api1Urls.get("endpoint"); + String url2 = api2Urls.get("endpoint"); + + assertTrue("API1: strict=0 after disableStrictImport()", url1.contains("strict=0")); + assertTrue("API2: strict=1 by default", url2.contains("strict=1")); + + } catch (IOException e) { + fail("IOException: " + e.toString()); + } catch (JSONException e) { + fail("JSON error: " + e.toString()); + } + } + } From 1001ec8254d3bfe3068c7ace17de646441bdf9b1 Mon Sep 17 00:00:00 2001 From: Santi G Date: Sun, 2 Nov 2025 15:13:53 +0100 Subject: [PATCH 08/20] Improving performance when retrying rejected payloads due to size + Handle 400 'request body too large' with chunked retry Extend import endpoint error handling to retry with chunked payloads when a 400 Bad Request is received with a 'request body too large' message, in addition to 413 errors. Improve PayloadChunker performance by tracking cumulative size, and add tests to verify correct chunking and error handling for 400 responses. --- .../java/com/mixpanel/mixpanelapi/Config.java | 1 + .../com/mixpanel/mixpanelapi/MixpanelAPI.java | 24 ++++-- .../mixpanel/mixpanelapi/PayloadChunker.java | 37 +++++--- .../mixpanel/mixpanelapi/MixpanelAPITest.java | 85 +++++++++++++++++++ 4 files changed, 125 insertions(+), 22 deletions(-) diff --git a/src/main/java/com/mixpanel/mixpanelapi/Config.java b/src/main/java/com/mixpanel/mixpanelapi/Config.java index 7f0a42f..0cc94e7 100644 --- a/src/main/java/com/mixpanel/mixpanelapi/Config.java +++ b/src/main/java/com/mixpanel/mixpanelapi/Config.java @@ -11,5 +11,6 @@ public static final int TRACK_MAX_PAYLOAD_BYTES = 1 * 1024 * 1024; // 1 MB // HTTP status codes + public static final int HTTP_400_BAD_REQUEST = 400; public static final int HTTP_413_PAYLOAD_TOO_LARGE = 413; } diff --git a/src/main/java/com/mixpanel/mixpanelapi/MixpanelAPI.java b/src/main/java/com/mixpanel/mixpanelapi/MixpanelAPI.java index cce9d56..341912c 100644 --- a/src/main/java/com/mixpanel/mixpanelapi/MixpanelAPI.java +++ b/src/main/java/com/mixpanel/mixpanelapi/MixpanelAPI.java @@ -506,13 +506,20 @@ private void sendImportMessages(List messages, String endpointUrl) t String messagesString = dataString(batch); boolean accepted = sendImportData(messagesString, endpointUrl, token); - if (!accepted && mLastStatusCode == Config.HTTP_413_PAYLOAD_TOO_LARGE) { - // Retry with chunked payloads (only once) for 413 errors - sendImportMessagesChunked(batch, endpointUrl, token); - } else if (!accepted) { - String respBody = mLastResponseBody != null ? mLastResponseBody : "no response body"; - int status = mLastStatusCode; - throw new MixpanelServerException("Server refused to accept import messages, they may be malformed. HTTP " + status + " Response: " + respBody, batch); + if (!accepted) { + boolean is413 = mLastStatusCode == Config.HTTP_413_PAYLOAD_TOO_LARGE; + boolean is400WithPayloadTooLarge = mLastStatusCode == Config.HTTP_400_BAD_REQUEST + && mLastResponseBody != null + && mLastResponseBody.contains("request body too large"); + + if (is413 || is400WithPayloadTooLarge) { + // Retry with chunked payloads (only once) for 413 or 400 with "request body too large" + sendImportMessagesChunked(batch, endpointUrl, token); + } else { + String respBody = mLastResponseBody != null ? mLastResponseBody : "no response body"; + int status = mLastStatusCode; + throw new MixpanelServerException("Server refused to accept import messages, they may be malformed. HTTP " + status + " Response: " + respBody, batch); + } } } } @@ -580,7 +587,8 @@ private String dataString(List messages) { * * When a 413 Payload Too Large error is received, the caller should split the payload * into smaller chunks using PayloadChunker and retry each chunk individually. - * This method will store the 413 status code in mLastStatusCode for the caller to detect. + * Similarly, a 400 error with "request body too large" in the response will also trigger chunking. + * This method will store the 413/400 status code in mLastStatusCode for the caller to detect. * * @param dataString JSON array of events to import * @param endpointUrl The import endpoint URL diff --git a/src/main/java/com/mixpanel/mixpanelapi/PayloadChunker.java b/src/main/java/com/mixpanel/mixpanelapi/PayloadChunker.java index 4bcc35f..1b4dbda 100644 --- a/src/main/java/com/mixpanel/mixpanelapi/PayloadChunker.java +++ b/src/main/java/com/mixpanel/mixpanelapi/PayloadChunker.java @@ -32,6 +32,9 @@ public class PayloadChunker { * original payload into smaller chunks such that each chunk's uncompressed size stays under * the specified limit. * + * Performance: O(n) time complexity by tracking cumulative size instead of re-serializing + * the entire chunk after each item addition. + * * @param jsonArrayString the JSON array string to chunk (e.g., "[{...}, {...}, ...]") * @param maxBytesPerChunk the maximum size in bytes per chunk (uncompressed data) * @return a list of JSON array strings, each under the size limit @@ -57,27 +60,33 @@ public static List chunkJsonArray(String jsonArrayString, int maxBytesPe } JSONArray currentChunk = new JSONArray(); + int currentChunkSize = 2; // Account for opening "[" and closing "]" for (int i = 0; i < originalArray.length(); i++) { - currentChunk.put(originalArray.get(i)); + Object item = originalArray.get(i); + + // Calculate the size of this item when serialized + String itemString = item.toString(); + int itemSize = getUncompressedPayloadSize(itemString); + + // Account for comma separator if this isn't the first item in the chunk + int itemWithSeparator = itemSize + (currentChunk.length() > 0 ? 1 : 0); - // Check if the current chunk would exceed the limit - // Always use uncompressed size for comparison since server limits apply to uncompressed data - int currentSize = getUncompressedPayloadSize(currentChunk.toString()); + // Check if adding this item would exceed the limit + int sizeIfAdded = currentChunkSize + itemWithSeparator; - if (currentSize > maxBytesPerChunk && currentChunk.length() > 1) { - // Current item pushed us over the limit, so remove it and save the current chunk - currentChunk.remove(currentChunk.length() - 1); + if (sizeIfAdded > maxBytesPerChunk && currentChunk.length() > 0) { + // Current item would push us over the limit, so save current chunk chunks.add(currentChunk.toString()); - // Start a new chunk with the item that caused the overflow - currentChunk = new JSONArray(); - currentChunk.put(originalArray.get(i)); - } else if (currentSize > maxBytesPerChunk && currentChunk.length() == 1) { - // Even a single item exceeds the limit - this item is too large to chunk - // Add it anyway; the server will reject it and we can't do better - chunks.add(currentChunk.toString()); + // Start a new chunk with this item currentChunk = new JSONArray(); + currentChunk.put(item); + currentChunkSize = 2 + itemSize; // "[" + item + "]" + } else { + // Item fits in current chunk + currentChunk.put(item); + currentChunkSize += itemWithSeparator; } } diff --git a/src/test/java/com/mixpanel/mixpanelapi/MixpanelAPITest.java b/src/test/java/com/mixpanel/mixpanelapi/MixpanelAPITest.java index 11c952c..ad16c76 100644 --- a/src/test/java/com/mixpanel/mixpanelapi/MixpanelAPITest.java +++ b/src/test/java/com/mixpanel/mixpanelapi/MixpanelAPITest.java @@ -1484,6 +1484,91 @@ public boolean sendImportData(String dataString, String endpointUrl, String toke } } + public void test400RequestBodyTooLargeImportEndpointChunking() { + // Test that 400 errors with "request body too large" on import endpoint trigger chunking and retry + final List attemptCount = new ArrayList<>(); + + MixpanelAPI api = new MixpanelAPI("events url", "people url", "groups url", "import url") { + @Override + public boolean sendImportData(String dataString, String endpointUrl, String token) { + attemptCount.add(1); + + // First attempt returns 400 with "request body too large" message + if (attemptCount.size() == 1) { + mLastStatusCode = 400; + mLastResponseBody = "{\"error\":\"json decode error at element idx: 1784, byte_offset: 10481924: http: request body too large\",\"status\":0}"; + return false; + } + + // Retry with chunks succeeds + mLastStatusCode = 200; + mLastResponseBody = "{\"code\":200,\"status\":\"OK\",\"num_records_imported\":5}"; + return true; + } + }; + + ClientDelivery c = new ClientDelivery(); + long historicalTime = System.currentTimeMillis() - (90L * 24L * 60L * 60L * 1000L); + + try { + // Create a large payload that would trigger 400 with request body too large + for (int i = 0; i < 100; i++) { + JSONObject props = new JSONObject(); + props.put("time", historicalTime); + props.put("$insert_id", "insert-id-" + i); + props.put("largeData", repeat("x", 5000)); // Add large data to each event + JSONObject importEvent = mBuilder.importEvent("user-" + i, "test event", props); + c.addMessage(importEvent); + } + + api.deliver(c); + + // Should have at least 2 attempts: initial (fails with 400) + retry with chunks (succeeds) + assertTrue("Should have tried sending at least twice", attemptCount.size() >= 2); + + } catch (IOException e) { + fail("IOException during 400 request body too large test: " + e.toString()); + } catch (JSONException e) { + fail("JSONException during 400 request body too large test: " + e.toString()); + } + } + + public void test400RequestBodyTooLargeWithoutChunking() { + // Test that 400 errors WITHOUT "request body too large" are still thrown immediately without chunking + MixpanelAPI api = new MixpanelAPI("events url", "people url", "groups url", "import url") { + @Override + public boolean sendImportData(String dataString, String endpointUrl, String token) { + // Return 400 (bad request) with a different error message (not "request body too large") + mLastStatusCode = 400; + mLastResponseBody = "{\"error\":\"invalid json format\",\"status\":0}"; + return false; + } + }; + + ClientDelivery c = new ClientDelivery(); + long historicalTime = System.currentTimeMillis() - (90L * 24L * 60L * 60L * 1000L); + + try { + JSONObject props = new JSONObject(); + props.put("time", historicalTime); + props.put("$insert_id", "insert-id-1"); + JSONObject importEvent = mBuilder.importEvent("user-1", "test event", props); + c.addMessage(importEvent); + + api.deliver(c); + fail("Should have thrown MixpanelServerException for 400 error without 'request body too large'"); + + } catch (MixpanelServerException e) { + // Expected behavior - 400 errors without "request body too large" should be thrown immediately + assertTrue("Error message indicates server rejection", + e.getMessage().contains("refused") || e.getMessage().contains("400")); + } catch (IOException e) { + fail("Should have thrown MixpanelServerException, not IOException: " + e.toString()); + } catch (JSONException e) { + fail("Unexpected JSONException: " + e.toString()); + } + } + public void testDisableStrictImport() { // Test that disableStrictImport() sets strict=0 in the URL final Map capturedUrls = new HashMap(); From f4e397a352df2c9163cc36a9106540337b07cd1c Mon Sep 17 00:00:00 2001 From: Santi G Date: Sun, 2 Nov 2025 15:48:42 +0100 Subject: [PATCH 09/20] Document and handle Mixpanel import endpoint strict modes Updated Javadoc and in-code comments to clarify the behavior of the /import endpoint with strict=1 and strict=0 parameters. Enhanced sendImportData to explicitly handle plain text responses for strict=0 mode, returning appropriate results and status codes based on the response format. --- .../com/mixpanel/mixpanelapi/MixpanelAPI.java | 36 ++++++++++++++++--- 1 file changed, 31 insertions(+), 5 deletions(-) diff --git a/src/main/java/com/mixpanel/mixpanelapi/MixpanelAPI.java b/src/main/java/com/mixpanel/mixpanelapi/MixpanelAPI.java index 341912c..4e49822 100644 --- a/src/main/java/com/mixpanel/mixpanelapi/MixpanelAPI.java +++ b/src/main/java/com/mixpanel/mixpanelapi/MixpanelAPI.java @@ -583,7 +583,17 @@ private String dataString(List messages) { * The /import endpoint requires: * - JSON content type (not URL-encoded like /track) * - Basic authentication with token as username and empty password - * - strict=1 parameter for validation + * - strict parameter (1 or 0) for validation behavior + * + * Response format depends on strict mode: + * - strict=1 (default): Returns JSON with code, status, and num_records_imported. + * Example: {"code":200,"status":"OK","num_records_imported":100} + * If there are validation errors, returns HTTP 400 with details, but correctly formed + * events are still ingested. + * + * - strict=0: Returns plain text "1" if events are imported, "0" if they are not. + * The reason for failure is unknown with strict=0 (no error details provided). + * Use strict=1 for better error information. * * When a 413 Payload Too Large error is received, the caller should split the payload * into smaller chunks using PayloadChunker and retry each chunk individually. @@ -591,9 +601,9 @@ private String dataString(List messages) { * This method will store the 413/400 status code in mLastStatusCode for the caller to detect. * * @param dataString JSON array of events to import - * @param endpointUrl The import endpoint URL + * @param endpointUrl The import endpoint URL (should include strict parameter) * @param token The project token for Basic Auth - * @return true if the server accepted the data + * @return true if the server accepted the data (plain text "1" or JSON with code 200) * @throws IOException if there's a network error */ /* package */ boolean sendImportData(String dataString, String endpointUrl, String token) throws IOException { @@ -690,14 +700,30 @@ private String dataString(List messages) { } } - // Import endpoint returns JSON like {"code":200,"status":"OK","num_records_imported":N} + // Import endpoint returns different formats depending on strict mode: + // - strict=1: JSON like {"code":200,"status":"OK","num_records_imported":N} + // - strict=0: Plain text "0" (not imported) or "1" (imported) if (response == null) { mLastResponseBody = null; mLastStatusCode = 0; return false; } - // Parse JSON response + // First, try to handle strict=0 response format (plain text "0" or "1") + String trimmedResponse = response.trim(); + if ("1".equals(trimmedResponse)) { + // strict=0 with successful import + mLastResponseBody = response; + mLastStatusCode = 200; + return true; + } else if ("0".equals(trimmedResponse)) { + // strict=0 with failed import (events not imported, reason unknown) + mLastResponseBody = response; + mLastStatusCode = 200; // HTTP 200 but import failed silently + return false; + } + + // Try to parse as JSON response (strict=1 format) try { JSONObject jsonResponse = new JSONObject(response); From 3a6f110f2c9c79e32ecb49dd63f4771e1cf5dc94 Mon Sep 17 00:00:00 2001 From: Santi G Date: Sun, 9 Nov 2025 00:01:12 +0100 Subject: [PATCH 10/20] Enforce non-empty token for MessageBuilder and import Added validation to ensure the Mixpanel project token is not null or empty in MessageBuilder and when sending import messages in MixpanelAPI. This prevents accidental usage with invalid tokens and ensures proper authentication for the /import endpoint --- .../com/mixpanel/mixpanelapi/MessageBuilder.java | 12 ++++++++++++ .../java/com/mixpanel/mixpanelapi/MixpanelAPI.java | 7 ++++++- 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/mixpanel/mixpanelapi/MessageBuilder.java b/src/main/java/com/mixpanel/mixpanelapi/MessageBuilder.java index c5f8f2b..c0ca8cf 100644 --- a/src/main/java/com/mixpanel/mixpanelapi/MessageBuilder.java +++ b/src/main/java/com/mixpanel/mixpanelapi/MessageBuilder.java @@ -21,9 +21,21 @@ public class MessageBuilder { private final String mToken; public MessageBuilder(String token) { + if (token == null || token.trim().isEmpty()) { + throw new IllegalArgumentException("Token cannot be null or empty"); + } mToken = token; } + /** + * Returns the token associated with this MessageBuilder. + * + * @return the project token + */ + public String getToken() { + return mToken; + } + /*** * Creates a message tracking an event, for consumption by MixpanelAPI * See: diff --git a/src/main/java/com/mixpanel/mixpanelapi/MixpanelAPI.java b/src/main/java/com/mixpanel/mixpanelapi/MixpanelAPI.java index 4e49822..84cb088 100644 --- a/src/main/java/com/mixpanel/mixpanelapi/MixpanelAPI.java +++ b/src/main/java/com/mixpanel/mixpanelapi/MixpanelAPI.java @@ -479,7 +479,7 @@ private void sendMessagesChunked(List batch, String endpointUrl) thr private void sendImportMessages(List messages, String endpointUrl) throws IOException { // Extract token from first message for authentication - // If token is missing, we'll still attempt to send and let the server reject it + // Token is required for /import endpoint Basic Auth String token = ""; if (messages.size() > 0) { try { @@ -494,6 +494,11 @@ private void sendImportMessages(List messages, String endpointUrl) t // Malformed message - continue with empty token and let server reject it } } + + // Validate that we have a non-empty token for /import endpoint + if (token == null || token.trim().isEmpty()) { + throw new MixpanelServerException("Import endpoint requires a valid token in message properties", messages); + } // Send messages in batches (configurable batch size for /import, default max 2000 per batch) // If token is empty, the server will reject with 401 Unauthorized From 85ae829b342b5d7a4c089b5d7bc891e9a489c7c9 Mon Sep 17 00:00:00 2001 From: Santi G Date: Sun, 9 Nov 2025 00:06:53 +0100 Subject: [PATCH 11/20] Update org.json dependency version Bump org.json library from version 20231013 to 20250517 to use the latest features and bug fixes. --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 6fee8ff..4e30da0 100644 --- a/pom.xml +++ b/pom.xml @@ -136,7 +136,7 @@ org.json json - 20231013 + 20250517 From e120a96434e05654293d0d5adb0a02c62ac0f9e5 Mon Sep 17 00:00:00 2001 From: Santi G Date: Sun, 16 Nov 2025 20:21:39 +0100 Subject: [PATCH 12/20] Add and improve JavaDoc comments for public APIs Enhanced JavaDoc comments across core classes to clarify constructor parameters, method behaviors, and field purposes. --- .../mixpanel/mixpanelapi/MessageBuilder.java | 6 ++++ .../com/mixpanel/mixpanelapi/MixpanelAPI.java | 30 +++++++++++++------ 2 files changed, 27 insertions(+), 9 deletions(-) diff --git a/src/main/java/com/mixpanel/mixpanelapi/MessageBuilder.java b/src/main/java/com/mixpanel/mixpanelapi/MessageBuilder.java index c0ca8cf..838b143 100644 --- a/src/main/java/com/mixpanel/mixpanelapi/MessageBuilder.java +++ b/src/main/java/com/mixpanel/mixpanelapi/MessageBuilder.java @@ -20,6 +20,12 @@ public class MessageBuilder { private final String mToken; + /** + * Constructs a MessageBuilder with a Mixpanel project token. + * + * @param token the Mixpanel project token (cannot be null or empty) + * @throws IllegalArgumentException if token is null or empty + */ public MessageBuilder(String token) { if (token == null || token.trim().isEmpty()) { throw new IllegalArgumentException("Token cannot be null or empty"); diff --git a/src/main/java/com/mixpanel/mixpanelapi/MixpanelAPI.java b/src/main/java/com/mixpanel/mixpanelapi/MixpanelAPI.java index 84cb088..9677aa1 100644 --- a/src/main/java/com/mixpanel/mixpanelapi/MixpanelAPI.java +++ b/src/main/java/com/mixpanel/mixpanelapi/MixpanelAPI.java @@ -42,22 +42,33 @@ public class MixpanelAPI implements AutoCloseable { private static final int CONNECT_TIMEOUT_MILLIS = 2000; private static final int READ_TIMEOUT_MILLIS = 10000; - // Instance fields for customizable timeouts (per-instance control) + /** Connect timeout in milliseconds for per-instance control */ private int mConnectTimeoutMillis = CONNECT_TIMEOUT_MILLIS; + /** Read timeout in milliseconds for per-instance control */ private int mReadTimeoutMillis = READ_TIMEOUT_MILLIS; + /** Whether strict validation is enabled for import mode */ private boolean mStrictImportMode = true; + /** The endpoint URL for events tracking */ protected final String mEventsEndpoint; + /** The endpoint URL for people profiles */ protected final String mPeopleEndpoint; + /** The endpoint URL for group profiles */ protected final String mGroupsEndpoint; + /** The endpoint URL for import operations */ protected final String mImportEndpoint; + /** Whether gzip compression is enabled for requests */ protected final boolean mUseGzipCompression; + /** Local feature flags provider (null if not configured) */ protected final LocalFlagsProvider mLocalFlags; + /** Remote feature flags provider (null if not configured) */ protected final RemoteFlagsProvider mRemoteFlags; + /** Maximum batch size for import endpoint messages */ protected final int mImportMaxMessageSize; - // Track the last response from import endpoint for error logging + /** The last response body from the import endpoint for error logging */ protected String mLastResponseBody; + /** The HTTP status code from the last import request */ protected int mLastStatusCode; /** @@ -244,8 +255,8 @@ public void sendMessage(JSONObject message) * Sends a ClientDelivery full of messages to Mixpanel's servers. * * This call will block, possibly for a long time. - * @param toSend - * @throws IOException + * @param toSend a ClientDelivery containing a number of Mixpanel messages + * @throws IOException if an I/O error occurs while sending * @see ClientDelivery */ public void deliver(ClientDelivery toSend) throws IOException { @@ -258,7 +269,8 @@ public void deliver(ClientDelivery toSend) throws IOException { * should be called in a separate thread or in a queue consumer. * * @param toSend a ClientDelivery containing a number of Mixpanel messages - * @throws IOException + * @param useIpAddress whether to include the client IP in the tracking requests + * @throws IOException if an I/O error occurs while sending * @see ClientDelivery */ public void deliver(ClientDelivery toSend, boolean useIpAddress) throws IOException { @@ -815,8 +827,8 @@ private static EventSender createEventSender(BaseFlagsConfig config, MixpanelAPI * api.setConnectTimeout(5000); // 5 seconds for slow regions * api.deliver(delivery); * - * @param timeoutMillis timeout in milliseconds (must be > 0) - * @throws IllegalArgumentException if timeoutMillis <= 0 + * @param timeoutMillis timeout in milliseconds (must be > 0) + * @throws IllegalArgumentException if timeoutMillis <= 0 */ public void setConnectTimeout(int timeoutMillis) { if (timeoutMillis <= 0) { @@ -837,8 +849,8 @@ public void setConnectTimeout(int timeoutMillis) { * api.setReadTimeout(15000); // 15 seconds for slow regions * api.deliver(delivery); * - * @param timeoutMillis timeout in milliseconds (must be > 0) - * @throws IllegalArgumentException if timeoutMillis <= 0 + * @param timeoutMillis timeout in milliseconds (must be > 0) + * @throws IllegalArgumentException if timeoutMillis <= 0 */ public void setReadTimeout(int timeoutMillis) { if (timeoutMillis <= 0) { From 4b4b937d0eed59ace59f1eb1d0fcaf4cbbcdb9d0 Mon Sep 17 00:00:00 2001 From: Santi Gracia Date: Tue, 18 Nov 2025 00:13:40 +0100 Subject: [PATCH 13/20] Update README.md correcting current import limit Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index d684b05..2268b45 100644 --- a/README.md +++ b/README.md @@ -45,7 +45,7 @@ The library supports importing historical events (events older than 5 days that ### Custom Import Batch Size -When importing large events through the `/import` endpoint, you may need to control the batch size to prevent exceeding the server's 1MB uncompressed JSON payload limit. The batch size can be configured between 1 and 2000 (default is 2000): +When importing large events through the `/import` endpoint, you may need to control the batch size to prevent exceeding the server's 10MB uncompressed JSON payload limit. The batch size can be configured between 1 and 2000 (default is 2000): // Import with default batch size (2000) MixpanelAPI mixpanel = new MixpanelAPI(); From 5b9f50c85217794786ac18b7bcc47ce5f592839a Mon Sep 17 00:00:00 2001 From: Santi Gracia Date: Tue, 18 Nov 2025 00:17:29 +0100 Subject: [PATCH 14/20] Update src/main/java/com/mixpanel/mixpanelapi/MixpanelAPI.java We are now enforcing non-empty token for MessageBuilder and import before hitting the server Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- src/main/java/com/mixpanel/mixpanelapi/MixpanelAPI.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/mixpanel/mixpanelapi/MixpanelAPI.java b/src/main/java/com/mixpanel/mixpanelapi/MixpanelAPI.java index 9677aa1..0215b9b 100644 --- a/src/main/java/com/mixpanel/mixpanelapi/MixpanelAPI.java +++ b/src/main/java/com/mixpanel/mixpanelapi/MixpanelAPI.java @@ -513,7 +513,7 @@ private void sendImportMessages(List messages, String endpointUrl) t } // Send messages in batches (configurable batch size for /import, default max 2000 per batch) - // If token is empty, the server will reject with 401 Unauthorized + // Token has been validated and is guaranteed to be non-empty for (int i = 0; i < messages.size(); i += mImportMaxMessageSize) { int endIndex = i + mImportMaxMessageSize; endIndex = Math.min(endIndex, messages.size()); From 4f4abaf1177ce7b85ce90d941cdb30eb4e6d920e Mon Sep 17 00:00:00 2001 From: Santi G Date: Tue, 18 Nov 2025 19:07:20 +0100 Subject: [PATCH 15/20] Make timeout and import mode fields thread-safe Changed mConnectTimeoutMillis, mReadTimeoutMillis, and mStrictImportMode to volatile for thread safety. Updated Javadoc for setConnectTimeout and setReadTimeout to clarify thread-safe usage and best practices for concurrent environments. --- .../com/mixpanel/mixpanelapi/MixpanelAPI.java | 32 ++++++++++++++----- 1 file changed, 24 insertions(+), 8 deletions(-) diff --git a/src/main/java/com/mixpanel/mixpanelapi/MixpanelAPI.java b/src/main/java/com/mixpanel/mixpanelapi/MixpanelAPI.java index 0215b9b..8e425a5 100644 --- a/src/main/java/com/mixpanel/mixpanelapi/MixpanelAPI.java +++ b/src/main/java/com/mixpanel/mixpanelapi/MixpanelAPI.java @@ -43,11 +43,11 @@ public class MixpanelAPI implements AutoCloseable { private static final int READ_TIMEOUT_MILLIS = 10000; /** Connect timeout in milliseconds for per-instance control */ - private int mConnectTimeoutMillis = CONNECT_TIMEOUT_MILLIS; + private volatile int mConnectTimeoutMillis = CONNECT_TIMEOUT_MILLIS; /** Read timeout in milliseconds for per-instance control */ - private int mReadTimeoutMillis = READ_TIMEOUT_MILLIS; + private volatile int mReadTimeoutMillis = READ_TIMEOUT_MILLIS; /** Whether strict validation is enabled for import mode */ - private boolean mStrictImportMode = true; + private volatile boolean mStrictImportMode = true; /** The endpoint URL for events tracking */ protected final String mEventsEndpoint; @@ -820,12 +820,20 @@ private static EventSender createEventSender(BaseFlagsConfig config, MixpanelAPI * Sets the connection timeout for HTTP requests. * * Default is 2000 milliseconds (2 seconds). You may need to increase this for high-latency regions. - * This should be called before calling any deliver() or sendMessage(). + * + * This method is thread-safe and can be called at any time, even while other threads are executing + * deliver() or sendMessage(). Each HTTP request will use the current timeout value at the moment + * it establishes a connection. + * + * Best practice: Set all timeout values during initialization before starting worker threads, + * though concurrent calls are safe. * * Example: * MixpanelAPI api = new MixpanelAPI(); * api.setConnectTimeout(5000); // 5 seconds for slow regions - * api.deliver(delivery); + * api.setReadTimeout(15000); + * // Now safe to use from multiple threads + * executorService.submit(() -> api.deliver(delivery)); * * @param timeoutMillis timeout in milliseconds (must be > 0) * @throws IllegalArgumentException if timeoutMillis <= 0 @@ -842,12 +850,20 @@ public void setConnectTimeout(int timeoutMillis) { * * Default is 10000 milliseconds (10 seconds). You may need to increase this for high-latency regions * or when processing large batches of events. - * This should be called before calling any deliver() or sendMessage(). + * + * This method is thread-safe and can be called at any time, even while other threads are executing + * deliver() or sendMessage(). Each HTTP request will use the current timeout value at the moment + * the response is being read. + * + * Best practice: Set all timeout values during initialization before starting worker threads, + * though concurrent calls are safe. * * Example: * MixpanelAPI api = new MixpanelAPI(); - * api.setReadTimeout(15000); // 15 seconds for slow regions - * api.deliver(delivery); + * api.setConnectTimeout(5000); // 5 seconds for slow regions + * api.setReadTimeout(15000); // 15 seconds for slow reads + * // Now safe to use from multiple threads + * executorService.submit(() -> api.deliver(delivery)); * * @param timeoutMillis timeout in milliseconds (must be > 0) * @throws IllegalArgumentException if timeoutMillis <= 0 From 44c97246c0390ef38e5534560666ce862de19c9d Mon Sep 17 00:00:00 2001 From: Santi G Date: Tue, 18 Nov 2025 19:10:59 +0100 Subject: [PATCH 16/20] Quick Fix for Javadoc code formatting Updated Javadoc examples --- src/main/java/com/mixpanel/mixpanelapi/MixpanelAPI.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/mixpanel/mixpanelapi/MixpanelAPI.java b/src/main/java/com/mixpanel/mixpanelapi/MixpanelAPI.java index 8e425a5..8fee7a6 100644 --- a/src/main/java/com/mixpanel/mixpanelapi/MixpanelAPI.java +++ b/src/main/java/com/mixpanel/mixpanelapi/MixpanelAPI.java @@ -833,7 +833,7 @@ private static EventSender createEventSender(BaseFlagsConfig config, MixpanelAPI * api.setConnectTimeout(5000); // 5 seconds for slow regions * api.setReadTimeout(15000); * // Now safe to use from multiple threads - * executorService.submit(() -> api.deliver(delivery)); + * executorService.submit({@code () -> api.deliver(delivery)}); * * @param timeoutMillis timeout in milliseconds (must be > 0) * @throws IllegalArgumentException if timeoutMillis <= 0 @@ -863,7 +863,7 @@ public void setConnectTimeout(int timeoutMillis) { * api.setConnectTimeout(5000); // 5 seconds for slow regions * api.setReadTimeout(15000); // 15 seconds for slow reads * // Now safe to use from multiple threads - * executorService.submit(() -> api.deliver(delivery)); + * executorService.submit({@code () -> api.deliver(delivery)}); * * @param timeoutMillis timeout in milliseconds (must be > 0) * @throws IllegalArgumentException if timeoutMillis <= 0 From e9c6381beac34722e054312509754fe0202b7c01 Mon Sep 17 00:00:00 2001 From: Santi Gracia Date: Tue, 18 Nov 2025 21:02:22 +0100 Subject: [PATCH 17/20] Update src/main/java/com/mixpanel/mixpanelapi/PayloadChunker.java [nitpick] The string "utf-8" is used in lowercase Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- src/main/java/com/mixpanel/mixpanelapi/PayloadChunker.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/mixpanel/mixpanelapi/PayloadChunker.java b/src/main/java/com/mixpanel/mixpanelapi/PayloadChunker.java index 1b4dbda..fbe3eae 100644 --- a/src/main/java/com/mixpanel/mixpanelapi/PayloadChunker.java +++ b/src/main/java/com/mixpanel/mixpanelapi/PayloadChunker.java @@ -111,6 +111,6 @@ public static List chunkJsonArray(String jsonArrayString, int maxBytesPe */ public static int getUncompressedPayloadSize(String payload) throws UnsupportedEncodingException { - return payload.getBytes("utf-8").length; + return payload.getBytes("UTF-8").length; } } From c4079e74e6514b3c4ae13d63034d167de0a5a1ab Mon Sep 17 00:00:00 2001 From: Santi G Date: Wed, 19 Nov 2025 00:26:05 +0100 Subject: [PATCH 18/20] Make last response fields volatile for thread safety Changed mLastResponseBody and mLastStatusCode to volatile to ensure thread-safe access to the last import response data. --- src/main/java/com/mixpanel/mixpanelapi/MixpanelAPI.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/mixpanel/mixpanelapi/MixpanelAPI.java b/src/main/java/com/mixpanel/mixpanelapi/MixpanelAPI.java index 8fee7a6..e3fb09d 100644 --- a/src/main/java/com/mixpanel/mixpanelapi/MixpanelAPI.java +++ b/src/main/java/com/mixpanel/mixpanelapi/MixpanelAPI.java @@ -67,9 +67,9 @@ public class MixpanelAPI implements AutoCloseable { protected final int mImportMaxMessageSize; /** The last response body from the import endpoint for error logging */ - protected String mLastResponseBody; + protected volatile String mLastResponseBody; /** The HTTP status code from the last import request */ - protected int mLastStatusCode; + protected volatile int mLastStatusCode; /** * Constructs a MixpanelAPI object associated with the production, Mixpanel services. From fc50a5c5d27eb9892ca1b45dc797149e23ff38b0 Mon Sep 17 00:00:00 2001 From: Santi G Date: Tue, 25 Nov 2025 12:20:04 +0100 Subject: [PATCH 19/20] Apply safety margin to payload chunk size limit Introduces a 90% safety margin to the chunk size limit in PayloadChunker to account for potential differences in serialization size between individual items and the final JSON array. This ensures that serialized chunks remain safely under server-imposed limits. --- .../com/mixpanel/mixpanelapi/PayloadChunker.java | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/mixpanel/mixpanelapi/PayloadChunker.java b/src/main/java/com/mixpanel/mixpanelapi/PayloadChunker.java index fbe3eae..22c1194 100644 --- a/src/main/java/com/mixpanel/mixpanelapi/PayloadChunker.java +++ b/src/main/java/com/mixpanel/mixpanelapi/PayloadChunker.java @@ -32,6 +32,13 @@ public class PayloadChunker { * original payload into smaller chunks such that each chunk's uncompressed size stays under * the specified limit. * + * NOTE: To ensure the final serialized chunks (via JSONArray.toString()) remain safely under + * the limit, a 90% safety margin is applied. This accounts for potential formatting differences + * between individual item serialization and the final array serialization. For example: + * - 10 MB import limit becomes an effective 9 MB limit per chunk + * - 1 MB track limit becomes an effective 900 KB limit per chunk + * This conservative approach guarantees compliance with server limits. + * * Performance: O(n) time complexity by tracking cumulative size instead of re-serializing * the entire chunk after each item addition. * @@ -44,6 +51,10 @@ public class PayloadChunker { public static List chunkJsonArray(String jsonArrayString, int maxBytesPerChunk) throws JSONException, UnsupportedEncodingException { + // Apply 90% safety margin to account for potential serialization formatting differences + // between individual items (item.toString()) and the final array (JSONArray.toString()) + int effectiveLimit = (int) (maxBytesPerChunk * 0.9); + JSONArray originalArray = new JSONArray(jsonArrayString); List chunks = new ArrayList<>(); @@ -72,10 +83,10 @@ public static List chunkJsonArray(String jsonArrayString, int maxBytesPe // Account for comma separator if this isn't the first item in the chunk int itemWithSeparator = itemSize + (currentChunk.length() > 0 ? 1 : 0); - // Check if adding this item would exceed the limit + // Check if adding this item would exceed the effective limit int sizeIfAdded = currentChunkSize + itemWithSeparator; - if (sizeIfAdded > maxBytesPerChunk && currentChunk.length() > 0) { + if (sizeIfAdded > effectiveLimit && currentChunk.length() > 0) { // Current item would push us over the limit, so save current chunk chunks.add(currentChunk.toString()); From 12152960c3c4ef3e5bc6f6f0da6fe86932dd24d5 Mon Sep 17 00:00:00 2001 From: Santi G Date: Tue, 25 Nov 2025 16:57:34 +0100 Subject: [PATCH 20/20] Improve error diagnostics for server responses Enhances MixpanelAPI to capture and log HTTP error response bodies for failed requests, and updates MixpanelServerException to include HTTP status code and response body. This provides more detailed diagnostics for server-side errors and aids debugging of malformed or rejected messages. Adding this to the README. --- README.md | 28 +++++++++ .../com/mixpanel/mixpanelapi/MixpanelAPI.java | 50 +++++++++++++--- .../mixpanelapi/MixpanelServerException.java | 57 +++++++++++++++++++ 3 files changed, 126 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index 2268b45..b3363a3 100644 --- a/README.md +++ b/README.md @@ -61,6 +61,34 @@ By default, the `/import` endpoint enforces strict validation (strict=1). You ca mixpanel.disableStrictImport(); // Set strict=0 to skip validation mixpanel.deliver(delivery); +### Error Handling + +When the Mixpanel server rejects messages, a `MixpanelServerException` is thrown. This exception provides detailed information about the error for debugging and logging: + +```java +try { + mixpanel.deliver(delivery); +} catch (MixpanelServerException e) { + // Get the HTTP status code (400, 401, 413, 500, etc.) + int statusCode = e.getHttpStatusCode(); + + // Get the raw response body from the server + String responseBody = e.getResponseBody(); + + // Get the list of messages that were rejected + List failedMessages = e.getBadDeliveryContents(); + + // The exception message includes status code and response body + System.err.println("Mixpanel error: " + e.getMessage()); +} +``` + +Common HTTP status codes: +- **400**: Bad Request - malformed messages or validation errors (in strict mode) +- **401**: Unauthorized - invalid project token +- **413**: Payload Too Large - request exceeds size limit (automatically retried with chunking) +- **500**: Internal Server Error + ## Feature Flags The Mixpanel Java SDK supports feature flags with both local and remote evaluation modes. diff --git a/src/main/java/com/mixpanel/mixpanelapi/MixpanelAPI.java b/src/main/java/com/mixpanel/mixpanelapi/MixpanelAPI.java index e3fb09d..1e629c7 100644 --- a/src/main/java/com/mixpanel/mixpanelapi/MixpanelAPI.java +++ b/src/main/java/com/mixpanel/mixpanelapi/MixpanelAPI.java @@ -318,16 +318,23 @@ protected String encodeDataString(String dataString) { } /** - * Container for HTTP response information including status code. - * Used to communicate both success/failure and the specific HTTP status code. + * Container for HTTP response information including status code and response body. + * Used to communicate both success/failure and the specific HTTP status code, + * along with any response body for error diagnostics. */ /* package */ static class HttpStatusResponse { public final boolean success; public final int statusCode; + public final String responseBody; public HttpStatusResponse(boolean success, int statusCode) { + this(success, statusCode, null); + } + + public HttpStatusResponse(boolean success, int statusCode, String responseBody) { this.success = success; this.statusCode = statusCode; + this.responseBody = responseBody; } } @@ -397,12 +404,14 @@ public HttpStatusResponse(boolean success, int statusCode) { // For HttpURLConnection, we need to handle status codes int statusCode = 0; + HttpURLConnection httpConn = null; if (conn instanceof HttpURLConnection) { + httpConn = (HttpURLConnection) conn; try { - statusCode = ((HttpURLConnection) conn).getResponseCode(); + statusCode = httpConn.getResponseCode(); } catch (IOException e) { // If we can't get the status code, return failure - return new HttpStatusResponse(false, 0); + return new HttpStatusResponse(false, 0, null); } } @@ -411,6 +420,27 @@ public HttpStatusResponse(boolean success, int statusCode) { try { responseStream = conn.getInputStream(); response = slurp(responseStream); + } catch (IOException e) { + // HTTP error codes (4xx, 5xx) throw IOException when calling getInputStream() + // Read the error stream for diagnostic details + if (httpConn != null) { + InputStream errorStream = httpConn.getErrorStream(); + if (errorStream != null) { + try { + String errorResponse = slurp(errorStream); + return new HttpStatusResponse(false, statusCode, errorResponse); + } catch (IOException ignored) { + // If we can't read error stream, continue with null response + } finally { + try { + errorStream.close(); + } catch (IOException ignored) { + // ignore + } + } + } + } + return new HttpStatusResponse(false, statusCode, null); } finally { if (responseStream != null) { try { @@ -422,7 +452,7 @@ public HttpStatusResponse(boolean success, int statusCode) { } boolean accepted = ((response != null) && response.equals("1")); - return new HttpStatusResponse(accepted, statusCode); + return new HttpStatusResponse(accepted, statusCode, response); } private void sendMessages(List messages, String endpointUrl) throws IOException { @@ -441,7 +471,8 @@ private void sendMessages(List messages, String endpointUrl) throws // Retry with chunked payloads (only once) sendMessagesChunked(batch, endpointUrl); } else { - throw new MixpanelServerException("Server refused to accept messages, they may be malformed.", batch); + String respBody = response.responseBody != null ? response.responseBody : "no response body"; + throw new MixpanelServerException("Server refused to accept messages, they may be malformed. HTTP " + response.statusCode + " Response: " + respBody, batch, response.statusCode, response.responseBody); } } } @@ -479,7 +510,8 @@ private void sendMessagesChunked(List batch, String endpointUrl) thr for (int i = 0; i < chunkArray.length(); i++) { chunkMessages.add(chunkArray.getJSONObject(i)); } - throw new MixpanelServerException("Server refused to accept chunked messages, they may be malformed. HTTP " + response.statusCode, chunkMessages); + String respBody = response.responseBody != null ? response.responseBody : "no response body"; + throw new MixpanelServerException("Server refused to accept chunked messages, they may be malformed. HTTP " + response.statusCode + " Response: " + respBody, chunkMessages, response.statusCode, response.responseBody); } } } catch (JSONException e) { @@ -535,7 +567,7 @@ private void sendImportMessages(List messages, String endpointUrl) t } else { String respBody = mLastResponseBody != null ? mLastResponseBody : "no response body"; int status = mLastStatusCode; - throw new MixpanelServerException("Server refused to accept import messages, they may be malformed. HTTP " + status + " Response: " + respBody, batch); + throw new MixpanelServerException("Server refused to accept import messages, they may be malformed. HTTP " + status + " Response: " + respBody, batch, status, mLastResponseBody); } } } @@ -576,7 +608,7 @@ private void sendImportMessagesChunked(List batch, String endpointUr } String respBody = mLastResponseBody != null ? mLastResponseBody : "no response body"; int status = mLastStatusCode; - throw new MixpanelServerException("Server refused to accept chunked import messages, they may be malformed. HTTP " + status + " Response: " + respBody, chunkMessages); + throw new MixpanelServerException("Server refused to accept chunked import messages, they may be malformed. HTTP " + status + " Response: " + respBody, chunkMessages, status, mLastResponseBody); } } } catch (JSONException e) { diff --git a/src/main/java/com/mixpanel/mixpanelapi/MixpanelServerException.java b/src/main/java/com/mixpanel/mixpanelapi/MixpanelServerException.java index f2b616b..0fd8a45 100644 --- a/src/main/java/com/mixpanel/mixpanelapi/MixpanelServerException.java +++ b/src/main/java/com/mixpanel/mixpanelapi/MixpanelServerException.java @@ -10,20 +10,77 @@ * * This exception can be thrown when messages are too large, * event times are too old to accept, the api key is invalid, etc. + * + * The exception provides access to: + * - The error message describing what went wrong + * - The list of messages that were rejected + * - The HTTP status code from the server (if available) + * - The raw response body from the server (if available) */ public class MixpanelServerException extends IOException { private static final long serialVersionUID = 8230724556897575457L; private final List mBadDelivery; + private final int mHttpStatusCode; + private final String mResponseBody; + /** + * Creates a new MixpanelServerException with a message and the rejected delivery. + * + * @param message the error message + * @param badDelivery the list of messages that were rejected + */ public MixpanelServerException(String message, List badDelivery) { + this(message, badDelivery, 0, null); + } + + /** + * Creates a new MixpanelServerException with full error details. + * + * @param message the error message + * @param badDelivery the list of messages that were rejected + * @param httpStatusCode the HTTP status code from the server (0 if not available) + * @param responseBody the raw response body from the server (null if not available) + */ + public MixpanelServerException(String message, List badDelivery, int httpStatusCode, String responseBody) { super(message); mBadDelivery = badDelivery; + mHttpStatusCode = httpStatusCode; + mResponseBody = responseBody; } + /** + * @return the list of messages that were rejected by the server + */ public List getBadDeliveryContents() { return mBadDelivery; } + /** + * Returns the HTTP status code from the server response. + * + * Common status codes: + * - 400: Bad Request (malformed messages or validation errors) + * - 401: Unauthorized (invalid token) + * - 413: Payload Too Large + * - 500: Internal Server Error + * + * @return the HTTP status code, or 0 if not available + */ + public int getHttpStatusCode() { + return mHttpStatusCode; + } + + /** + * Returns the raw response body from the server. + * This can be useful for debugging server-side validation errors, + * especially when using strict import mode which returns detailed error information. + * + * @return the response body string, or null if not available + */ + public String getResponseBody() { + return mResponseBody; + } + }