diff --git a/README.md b/README.md index 3386d8c..8a6721d 100644 --- a/README.md +++ b/README.md @@ -43,6 +43,23 @@ Gzip compression can reduce bandwidth usage and improve performance, especially The library supports importing historical events (events older than 5 days that are not accepted using /track) via the `/import` endpoint. Project token will be used for basic auth. +### Custom Import Batch Size + +When importing large events through the `/import` endpoint, you may need to control the batch size to prevent exceeding the server's 10MB uncompressed JSON payload limit. The batch size can be configured between 1 and 2000 (default is 2000): + + // Import with default batch size (2000) + MixpanelAPI mixpanel = new MixpanelAPI(); + + // Import with custom batch size (500) + MixpanelAPI mixpanel = new MixpanelAPI(500); + +### Disabling Strict Import Validation + +By default, the `/import` endpoint enforces strict validation (strict=1). You can disable strict validation by calling `disableStrictImport()` before delivering import messages. See the [Mixpanel Import API documentation](https://developer.mixpanel.com/reference/import-events) for more details about strict. + + MixpanelAPI mixpanel = new MixpanelAPI(); + mixpanel.disableStrictImport(); // Set strict=0 to skip validation + mixpanel.deliver(delivery); ### High-Performance JSON Serialization (Optional) For applications that import large batches of events (e.g., using the `/import` endpoint), the library supports optional high-performance JSON serialization using Jackson. When Jackson is available on the classpath, the library automatically uses it for JSON serialization, providing **up to 5x performance improvement** for large batches. @@ -71,6 +88,34 @@ The performance improvement is most noticeable when: No code changes are required to benefit from this optimization - simply add the Jackson dependency to your project. +### Error Handling + +When the Mixpanel server rejects messages, a `MixpanelServerException` is thrown. This exception provides detailed information about the error for debugging and logging: + +```java +try { + mixpanel.deliver(delivery); +} catch (MixpanelServerException e) { + // Get the HTTP status code (400, 401, 413, 500, etc.) + int statusCode = e.getHttpStatusCode(); + + // Get the raw response body from the server + String responseBody = e.getResponseBody(); + + // Get the list of messages that were rejected + List failedMessages = e.getBadDeliveryContents(); + + // The exception message includes status code and response body + System.err.println("Mixpanel error: " + e.getMessage()); +} +``` + +Common HTTP status codes: +- **400**: Bad Request - malformed messages or validation errors (in strict mode) +- **401**: Unauthorized - invalid project token +- **413**: Payload Too Large - request exceeds size limit (automatically retried with chunking) +- **500**: Internal Server Error + ## Feature Flags The Mixpanel Java SDK supports feature flags with both local and remote evaluation modes. diff --git a/pom.xml b/pom.xml index d312d09..563c15e 100644 --- a/pom.xml +++ b/pom.xml @@ -136,7 +136,7 @@ org.json json - 20231013 + 20250517 diff --git a/src/demo/java/com/mixpanel/mixpanelapi/demo/MixpanelAPIDemo.java b/src/demo/java/com/mixpanel/mixpanelapi/demo/MixpanelAPIDemo.java index 7304834..8cf4670 100644 --- a/src/demo/java/com/mixpanel/mixpanelapi/demo/MixpanelAPIDemo.java +++ b/src/demo/java/com/mixpanel/mixpanelapi/demo/MixpanelAPIDemo.java @@ -32,6 +32,12 @@ public DeliveryThread(Queue messages, boolean useGzipCompression) { mUseGzipCompression = useGzipCompression; } + public DeliveryThread(Queue messages, int importBatchSize) { + mMixpanel = new MixpanelAPI(importBatchSize); + mMessageQueue = messages; + mUseGzipCompression = false; + } + @Override public void run() { try { @@ -88,10 +94,15 @@ public static void main(String[] args) throws IOException, InterruptedException { Queue messages = new ConcurrentLinkedQueue(); Queue messagesWithGzip = new ConcurrentLinkedQueue(); + Queue messagesWithCustomBatch = new ConcurrentLinkedQueue(); - // Create two delivery threads - one without gzip and one with gzip compression + // Create three delivery threads: + // 1. Default batching (50 for events, 2000 for imports) + // 2. With gzip compression (50 for events, 2000 for imports) + // 3. With custom import batch size of 500 DeliveryThread worker = new DeliveryThread(messages, false); DeliveryThread workerWithGzip = new DeliveryThread(messagesWithGzip, true); + DeliveryThread workerWithCustomBatch = new DeliveryThread(messagesWithCustomBatch, 500); MessageBuilder messageBuilder = new MessageBuilder(PROJECT_TOKEN); @@ -102,6 +113,7 @@ public static void main(String[] args) worker.start(); workerWithGzip.start(); + workerWithCustomBatch.start(); String distinctId = args[0]; BufferedReader inputLines = new BufferedReader(new InputStreamReader(System.in)); @@ -114,9 +126,13 @@ public static void main(String[] args) JSONObject nameMessage = messageBuilder.set(distinctId, nameProps); messages.add(nameMessage); - // Charge the user $2.50 for using the program :) + // Demonstrate deprecated trackCharge method (now logs error instead of tracking revenue) + System.out.println("\n=== Demonstrating deprecated trackCharge method ==="); JSONObject transactionMessage = messageBuilder.trackCharge(distinctId, 2.50, null); - messages.add(transactionMessage); + if (transactionMessage != null) { + messages.add(transactionMessage); + } + System.out.println("trackCharge() returns null and logs an error to stderr\n"); // Import a historical event (30 days ago) with explicit time and $insert_id long thirtyDaysAgo = System.currentTimeMillis() - (30L * 24L * 60L * 60L * 1000L); @@ -161,6 +177,21 @@ public static void main(String[] args) System.out.println("Added events to gzip compression queue\n"); + // Demonstrate custom import batch size + System.out.println("\n=== Demonstrating custom import batch size (500) ==="); + + // Send import events with custom batch size + long customBatchTime = System.currentTimeMillis() - (45L * 24L * 60L * 60L * 1000L); + Map customBatchProps = new HashMap(); + customBatchProps.put("time", customBatchTime); + customBatchProps.put("$insert_id", "custom-batch-" + System.currentTimeMillis()); + customBatchProps.put("Batch Size", 500); + customBatchProps.put("Event Type", "Custom Batch Size Import"); + JSONObject customBatchEvent = messageBuilder.importEvent(distinctId, "Custom Batch Size Import", new JSONObject(customBatchProps)); + messagesWithCustomBatch.add(customBatchEvent); + + System.out.println("Added import event to custom batch size queue (batch size: 500)\n"); + while((line != null) && (line.length() > 0)) { System.out.println("SENDING LINE: " + line); Map propMap = new HashMap(); @@ -177,11 +208,12 @@ public static void main(String[] args) line = inputLines.readLine(); } - while(! messages.isEmpty() || ! messagesWithGzip.isEmpty()) { + while(! messages.isEmpty() || ! messagesWithGzip.isEmpty() || ! messagesWithCustomBatch.isEmpty()) { Thread.sleep(1000); } worker.interrupt(); workerWithGzip.interrupt(); + workerWithCustomBatch.interrupt(); } } diff --git a/src/main/java/com/mixpanel/mixpanelapi/Config.java b/src/main/java/com/mixpanel/mixpanelapi/Config.java index 50b8bc7..0cc94e7 100644 --- a/src/main/java/com/mixpanel/mixpanelapi/Config.java +++ b/src/main/java/com/mixpanel/mixpanelapi/Config.java @@ -4,4 +4,13 @@ public static final String BASE_ENDPOINT = "https://api.mixpanel.com"; public static final int MAX_MESSAGE_SIZE = 50; public static final int IMPORT_MAX_MESSAGE_SIZE = 2000; + + // Payload size limits for different endpoints + // When a 413 Payload Too Large error is received, payloads are chunked to these limits + public static final int IMPORT_MAX_PAYLOAD_BYTES = 10 * 1024 * 1024; // 10 MB + public static final int TRACK_MAX_PAYLOAD_BYTES = 1 * 1024 * 1024; // 1 MB + + // HTTP status codes + public static final int HTTP_400_BAD_REQUEST = 400; + public static final int HTTP_413_PAYLOAD_TOO_LARGE = 413; } diff --git a/src/main/java/com/mixpanel/mixpanelapi/MessageBuilder.java b/src/main/java/com/mixpanel/mixpanelapi/MessageBuilder.java index 45ed07d..838b143 100644 --- a/src/main/java/com/mixpanel/mixpanelapi/MessageBuilder.java +++ b/src/main/java/com/mixpanel/mixpanelapi/MessageBuilder.java @@ -1,12 +1,8 @@ package com.mixpanel.mixpanelapi; -import java.text.DateFormat; -import java.text.SimpleDateFormat; import java.util.Collection; -import java.util.Date; -import java.util.Iterator; +import java.util.HashMap; import java.util.Map; -import java.util.TimeZone; import java.util.UUID; import org.json.JSONArray; @@ -22,14 +18,30 @@ */ public class MessageBuilder { - private static final String ENGAGE_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss"; - private final String mToken; + /** + * Constructs a MessageBuilder with a Mixpanel project token. + * + * @param token the Mixpanel project token (cannot be null or empty) + * @throws IllegalArgumentException if token is null or empty + */ public MessageBuilder(String token) { + if (token == null || token.trim().isEmpty()) { + throw new IllegalArgumentException("Token cannot be null or empty"); + } mToken = token; } + /** + * Returns the token associated with this MessageBuilder. + * + * @return the project token + */ + public String getToken() { + return mToken; + } + /*** * Creates a message tracking an event, for consumption by MixpanelAPI * See: @@ -298,13 +310,16 @@ public JSONObject delete(String distinctId, JSONObject modifiers) { /** * For each key and value in the properties argument, adds that amount * to the associated property in the profile with the given distinct id. + * Supports both integer (Long, Integer) and decimal (Double, Float) increments. + * * So, to maintain a login count for user 12345, one might run the following code * at every login: *
      * {@code
-     *    Map updates = new HashMap();
-     *    updates.put('Logins', 1);
-     *    JSONObject message = messageBuilder.set("12345", updates);
+     *    Map updates = new HashMap();
+     *    updates.put("Logins", 1L);
+     *    updates.put("Rating", 4.5);  // decimal value
+     *    JSONObject message = messageBuilder.increment("12345", updates);
      *    mixpanelApi.sendMessage(message);
      * }
      * 
@@ -312,37 +327,29 @@ public JSONObject delete(String distinctId, JSONObject modifiers) { * for example, a user id of an app, or the hostname of a server. If no profile * exists for the given id, a new one will be created. * @param properties a collection of properties to change on the associated profile, - * each associated with a numeric value. + * each associated with a numeric value (Long, Integer, Double, Float, etc.) * @return user profile increment message for consumption by MixpanelAPI */ - public JSONObject increment(String distinctId, Map properties) { + public JSONObject increment(String distinctId, Map properties) { return increment(distinctId, properties, null); } /** * For each key and value in the properties argument, adds that amount * to the associated property in the profile with the given distinct id. - * So, to maintain a login count for user 12345, one might run the following code - * at every login: - *
-     * {@code
-     *    Map updates = new HashMap();
-     *    updates.put('Logins', 1);
-     *    JSONObject message = messageBuilder.set("12345", updates);
-     *    mixpanelApi.sendMessage(message);
-     * }
-     * 
+ * Supports both integer (Long, Integer) and decimal (Double, Float) increments. + * * @param distinctId a string uniquely identifying the profile to change, * for example, a user id of an app, or the hostname of a server. If no profile * exists for the given id, a new one will be created. * @param properties a collection of properties to change on the associated profile, - * each associated with a numeric value. + * each associated with a numeric value (Long, Integer, Double, Float, etc.) * @param modifiers Modifiers associated with the update message. (for example "$time" or "$ignore_time"). * this can be null- if non-null, the keys and values in the modifiers * object will be associated directly with the update. * @return user profile increment message for consumption by MixpanelAPI */ - public JSONObject increment(String distinctId, Map properties, JSONObject modifiers) { + public JSONObject increment(String distinctId, Map properties, JSONObject modifiers) { JSONObject jsonProperties = new JSONObject(properties); return peopleMessage(distinctId, "$add", jsonProperties, modifiers); } @@ -465,55 +472,6 @@ public JSONObject unset(String distinctId, Collection propertyNames, JSO return peopleMessage(distinctId, "$unset", propNamesArray, modifiers); } - /** - * Tracks revenue associated with the given distinctId. - * - * @param distinctId an identifier associated with a profile - * @param amount a double revenue amount. Positive amounts represent income for your business. - * @param properties can be null. If provided, a set of properties to associate with - * the individual transaction. - * @return user profile trackCharge message for consumption by MixpanelAPI - */ - public JSONObject trackCharge(String distinctId, double amount, JSONObject properties) { - return trackCharge(distinctId, amount, properties, null); - } - - /** - * Tracks revenue associated with the given distinctId. - * - * @param distinctId an identifier associated with a profile - * @param amount a double revenue amount. Positive amounts represent income for your business. - * @param properties can be null. If provided, a set of properties to associate with - * the individual transaction. - * @param modifiers can be null. If provided, the keys and values in the object will - * be merged as modifiers associated with the update message (for example, "$time" or "$ignore_time") - * @return user profile trackCharge message for consumption by MixpanelAPI - */ - public JSONObject trackCharge(String distinctId, double amount, JSONObject properties, JSONObject modifiers) { - JSONObject transactionValue = new JSONObject(); - JSONObject appendProperties = new JSONObject(); - try { - transactionValue.put("$amount", amount); - DateFormat dateFormat = new SimpleDateFormat(ENGAGE_DATE_FORMAT); - dateFormat.setTimeZone(TimeZone.getTimeZone("UTC")); - transactionValue.put("$time", dateFormat.format(new Date())); - - if (null != properties) { - for (Iterator iter = properties.keys(); iter.hasNext();) { - String key = (String) iter.next(); - transactionValue.put(key, properties.get(key)); - } - } - - appendProperties.put("$transactions", transactionValue); - - return this.append(distinctId, appendProperties, modifiers); - } catch (JSONException e) { - e.printStackTrace(); - throw new RuntimeException("Cannot create trackCharge message", e); - } - } - /** * Formats a generic user profile message. * Use of this method requires familiarity with the underlying Mixpanel HTTP API, @@ -860,4 +818,45 @@ public JSONObject groupMessage(String groupKey, String groupId, String actionTyp } } + /** + * @deprecated The trackCharge() method is deprecated. The old version of Mixpanel's Revenue analysis UI + * has been replaced by a newer suite of analysis tools which don't depend on profile properties. + * See https://docs.mixpanel.com/docs/features/revenue_analytics for more information. + * + * This method now only logs an error and returns null. It no longer sets a profile property or produces any other change. + * + * @param distinctId an identifier associated with a profile + * @param amount a double revenue amount (deprecated - no longer used) + * @param properties properties associated with the transaction (deprecated - no longer used) + * @return null + */ + @Deprecated + public JSONObject trackCharge(String distinctId, double amount, JSONObject properties) { + System.err.println("ERROR: The trackCharge() method is deprecated and no longer functional. " + + "The old version of Mixpanel's Revenue analysis UI has been replaced by a newer suite of analysis tools. " + + "See https://docs.mixpanel.com/docs/features/revenue_analytics for more information."); + return null; + } + + /** + * @deprecated The trackCharge() method is deprecated. The old version of Mixpanel's Revenue analysis UI + * has been replaced by a newer suite of analysis tools which don't depend on profile properties. + * See https://docs.mixpanel.com/docs/features/revenue_analytics for more information. + * + * This method now only logs an error and returns null. It no longer sets a profile property or produces any other change. + * + * @param distinctId an identifier associated with a profile + * @param amount a double revenue amount (deprecated - no longer used) + * @param properties properties associated with the transaction (deprecated - no longer used) + * @param modifiers modifiers for the message (deprecated - no longer used) + * @return null + */ + @Deprecated + public JSONObject trackCharge(String distinctId, double amount, JSONObject properties, JSONObject modifiers) { + System.err.println("ERROR: The trackCharge() method is deprecated and no longer functional. " + + "The old version of Mixpanel's Revenue analysis UI has been replaced by a newer suite of analysis tools. " + + "See https://docs.mixpanel.com/docs/features/revenue_analytics for more information."); + return null; + } + } diff --git a/src/main/java/com/mixpanel/mixpanelapi/MixpanelAPI.java b/src/main/java/com/mixpanel/mixpanelapi/MixpanelAPI.java index 955d4f6..a92aa69 100644 --- a/src/main/java/com/mixpanel/mixpanelapi/MixpanelAPI.java +++ b/src/main/java/com/mixpanel/mixpanelapi/MixpanelAPI.java @@ -9,6 +9,7 @@ import java.net.URL; import java.net.URLConnection; import java.net.URLEncoder; +import java.util.ArrayList; import java.util.List; import java.util.logging.Level; import java.util.logging.Logger; @@ -46,13 +47,34 @@ public class MixpanelAPI implements AutoCloseable { private static final int CONNECT_TIMEOUT_MILLIS = 2000; private static final int READ_TIMEOUT_MILLIS = 10000; + /** Connect timeout in milliseconds for per-instance control */ + private volatile int mConnectTimeoutMillis = CONNECT_TIMEOUT_MILLIS; + /** Read timeout in milliseconds for per-instance control */ + private volatile int mReadTimeoutMillis = READ_TIMEOUT_MILLIS; + /** Whether strict validation is enabled for import mode */ + private volatile boolean mStrictImportMode = true; + + /** The endpoint URL for events tracking */ protected final String mEventsEndpoint; + /** The endpoint URL for people profiles */ protected final String mPeopleEndpoint; + /** The endpoint URL for group profiles */ protected final String mGroupsEndpoint; + /** The endpoint URL for import operations */ protected final String mImportEndpoint; + /** Whether gzip compression is enabled for requests */ protected final boolean mUseGzipCompression; + /** Local feature flags provider (null if not configured) */ protected final LocalFlagsProvider mLocalFlags; + /** Remote feature flags provider (null if not configured) */ protected final RemoteFlagsProvider mRemoteFlags; + /** Maximum batch size for import endpoint messages */ + protected final int mImportMaxMessageSize; + + /** The last response body from the import endpoint for error logging */ + protected volatile String mLastResponseBody; + /** The HTTP status code from the last import request */ + protected volatile int mLastStatusCode; /** * Constructs a MixpanelAPI object associated with the production, Mixpanel services. @@ -67,7 +89,26 @@ public MixpanelAPI() { * @param useGzipCompression whether to use gzip compression for network requests */ public MixpanelAPI(boolean useGzipCompression) { - this(Config.BASE_ENDPOINT + "/track", Config.BASE_ENDPOINT + "/engage", Config.BASE_ENDPOINT + "/groups", Config.BASE_ENDPOINT + "/import", useGzipCompression, null, null); + this(Config.BASE_ENDPOINT + "/track", Config.BASE_ENDPOINT + "/engage", Config.BASE_ENDPOINT + "/groups", Config.BASE_ENDPOINT + "/import", useGzipCompression, Config.IMPORT_MAX_MESSAGE_SIZE, null, null); + } + + /** + * Constructs a MixpanelAPI object associated with the production, Mixpanel services. + * + * @param importMaxMessageSize custom batch size for /import endpoint (must be between 1 and 2000) + */ + public MixpanelAPI(int importMaxMessageSize) { + this(false, importMaxMessageSize); + } + + /** + * Constructs a MixpanelAPI object associated with the production, Mixpanel services. + * + * @param useGzipCompression whether to use gzip compression for network requests + * @param importMaxMessageSize custom batch size for /import endpoint (must be between 1 and 2000) + */ + public MixpanelAPI(boolean useGzipCompression, int importMaxMessageSize) { + this(Config.BASE_ENDPOINT + "/track", Config.BASE_ENDPOINT + "/engage", Config.BASE_ENDPOINT + "/groups", Config.BASE_ENDPOINT + "/import", useGzipCompression, importMaxMessageSize, null, null); } /** @@ -101,6 +142,7 @@ private MixpanelAPI(LocalFlagsConfig localFlagsConfig, RemoteFlagsConfig remoteF mGroupsEndpoint = Config.BASE_ENDPOINT + "/groups"; mImportEndpoint = Config.BASE_ENDPOINT + "/import"; mUseGzipCompression = false; + mImportMaxMessageSize = Config.IMPORT_MAX_MESSAGE_SIZE; if (localFlagsConfig != null) { EventSender eventSender = createEventSender(localFlagsConfig, this); @@ -126,7 +168,7 @@ private MixpanelAPI(LocalFlagsConfig localFlagsConfig, RemoteFlagsConfig remoteF * @see #MixpanelAPI() */ public MixpanelAPI(String eventsEndpoint, String peopleEndpoint) { - this(eventsEndpoint, peopleEndpoint, Config.BASE_ENDPOINT + "/groups", Config.BASE_ENDPOINT + "/import", false, null, null); + this(eventsEndpoint, peopleEndpoint, Config.BASE_ENDPOINT + "/groups", Config.BASE_ENDPOINT + "/import", false, Config.IMPORT_MAX_MESSAGE_SIZE, null, null); } /** @@ -140,7 +182,7 @@ public MixpanelAPI(String eventsEndpoint, String peopleEndpoint) { * @see #MixpanelAPI() */ public MixpanelAPI(String eventsEndpoint, String peopleEndpoint, String groupsEndpoint) { - this(eventsEndpoint, peopleEndpoint, groupsEndpoint, Config.BASE_ENDPOINT + "/import", false, null, null); + this(eventsEndpoint, peopleEndpoint, groupsEndpoint, Config.BASE_ENDPOINT + "/import", false, Config.IMPORT_MAX_MESSAGE_SIZE, null, null); } /** @@ -155,7 +197,7 @@ public MixpanelAPI(String eventsEndpoint, String peopleEndpoint, String groupsEn * @see #MixpanelAPI() */ public MixpanelAPI(String eventsEndpoint, String peopleEndpoint, String groupsEndpoint, String importEndpoint) { - this(eventsEndpoint, peopleEndpoint, groupsEndpoint, importEndpoint, false, null, null); + this(eventsEndpoint, peopleEndpoint, groupsEndpoint, importEndpoint, false, Config.IMPORT_MAX_MESSAGE_SIZE, null, null); } /** @@ -171,7 +213,7 @@ public MixpanelAPI(String eventsEndpoint, String peopleEndpoint, String groupsEn * @see #MixpanelAPI() */ public MixpanelAPI(String eventsEndpoint, String peopleEndpoint, String groupsEndpoint, String importEndpoint, boolean useGzipCompression) { - this(eventsEndpoint, peopleEndpoint, groupsEndpoint, importEndpoint, useGzipCompression, null, null); + this(eventsEndpoint, peopleEndpoint, groupsEndpoint, importEndpoint, useGzipCompression, Config.IMPORT_MAX_MESSAGE_SIZE, null, null); } /** @@ -182,15 +224,17 @@ public MixpanelAPI(String eventsEndpoint, String peopleEndpoint, String groupsEn * @param groupsEndpoint a URL that will accept Mixpanel groups messages * @param importEndpoint a URL that will accept Mixpanel import messages * @param useGzipCompression whether to use gzip compression for network requests + * @param importMaxMessageSize custom batch size for /import endpoint (must be between 1 and 2000) * @param localFlags optional LocalFlagsProvider for local feature flags (can be null) * @param remoteFlags optional RemoteFlagsProvider for remote feature flags (can be null) */ - private MixpanelAPI(String eventsEndpoint, String peopleEndpoint, String groupsEndpoint, String importEndpoint, boolean useGzipCompression, LocalFlagsProvider localFlags, RemoteFlagsProvider remoteFlags) { + /* package */ MixpanelAPI(String eventsEndpoint, String peopleEndpoint, String groupsEndpoint, String importEndpoint, boolean useGzipCompression, int importMaxMessageSize, LocalFlagsProvider localFlags, RemoteFlagsProvider remoteFlags) { mEventsEndpoint = eventsEndpoint; mPeopleEndpoint = peopleEndpoint; mGroupsEndpoint = groupsEndpoint; mImportEndpoint = importEndpoint; mUseGzipCompression = useGzipCompression; + mImportMaxMessageSize = Math.max(1, Math.min(importMaxMessageSize, 2000)); mLocalFlags = localFlags; mRemoteFlags = remoteFlags; } @@ -216,8 +260,8 @@ public void sendMessage(JSONObject message) * Sends a ClientDelivery full of messages to Mixpanel's servers. * * This call will block, possibly for a long time. - * @param toSend - * @throws IOException + * @param toSend a ClientDelivery containing a number of Mixpanel messages + * @throws IOException if an I/O error occurs while sending * @see ClientDelivery */ public void deliver(ClientDelivery toSend) throws IOException { @@ -230,7 +274,8 @@ public void deliver(ClientDelivery toSend) throws IOException { * should be called in a separate thread or in a queue consumer. * * @param toSend a ClientDelivery containing a number of Mixpanel messages - * @throws IOException + * @param useIpAddress whether to include the client IP in the tracking requests + * @throws IOException if an I/O error occurs while sending * @see ClientDelivery */ public void deliver(ClientDelivery toSend, boolean useIpAddress) throws IOException { @@ -254,7 +299,8 @@ public void deliver(ClientDelivery toSend, boolean useIpAddress) throws IOExcept // Handle import messages - use strict mode and extract token for auth List importMessages = toSend.getImportMessages(); if (importMessages.size() > 0) { - String importUrl = mImportEndpoint + "?strict=1"; + String strictParam = mStrictImportMode ? "1" : "0"; + String importUrl = mImportEndpoint + "?strict=" + strictParam; sendImportMessages(importMessages, importUrl); } } @@ -277,13 +323,40 @@ protected String encodeDataString(String dataString) { } /** - * Package scope for mocking purposes + * Container for HTTP response information including status code and response body. + * Used to communicate both success/failure and the specific HTTP status code, + * along with any response body for error diagnostics. */ - /* package */ boolean sendData(String dataString, String endpointUrl) throws IOException { + /* package */ static class HttpStatusResponse { + public final boolean success; + public final int statusCode; + public final String responseBody; + + public HttpStatusResponse(boolean success, int statusCode) { + this(success, statusCode, null); + } + + public HttpStatusResponse(boolean success, int statusCode, String responseBody) { + this.success = success; + this.statusCode = statusCode; + this.responseBody = responseBody; + } + } + + /** + * Package scope for mocking purposes. + * + * Sends data to an endpoint and returns both success status and HTTP status code. + * This allows callers to detect specific error conditions like 413 Payload Too Large. + * + * When a 413 error is received, the caller should split the payload into smaller chunks + * using PayloadChunker and retry each chunk individually. + */ + /* package */ HttpStatusResponse sendData(String dataString, String endpointUrl) throws IOException { URL endpoint = new URL(endpointUrl); URLConnection conn = endpoint.openConnection(); - conn.setReadTimeout(READ_TIMEOUT_MILLIS); - conn.setConnectTimeout(CONNECT_TIMEOUT_MILLIS); + conn.setReadTimeout(mReadTimeoutMillis); + conn.setConnectTimeout(mConnectTimeoutMillis); conn.setDoOutput(true); byte[] dataToSend; @@ -334,11 +407,45 @@ protected String encodeDataString(String dataString) { } } + // For HttpURLConnection, we need to handle status codes + int statusCode = 0; + HttpURLConnection httpConn = null; + if (conn instanceof HttpURLConnection) { + httpConn = (HttpURLConnection) conn; + try { + statusCode = httpConn.getResponseCode(); + } catch (IOException e) { + // If we can't get the status code, return failure + return new HttpStatusResponse(false, 0, null); + } + } + InputStream responseStream = null; String response = null; try { responseStream = conn.getInputStream(); response = slurp(responseStream); + } catch (IOException e) { + // HTTP error codes (4xx, 5xx) throw IOException when calling getInputStream() + // Read the error stream for diagnostic details + if (httpConn != null) { + InputStream errorStream = httpConn.getErrorStream(); + if (errorStream != null) { + try { + String errorResponse = slurp(errorStream); + return new HttpStatusResponse(false, statusCode, errorResponse); + } catch (IOException ignored) { + // If we can't read error stream, continue with null response + } finally { + try { + errorStream.close(); + } catch (IOException ignored) { + // ignore + } + } + } + } + return new HttpStatusResponse(false, statusCode, null); } finally { if (responseStream != null) { try { @@ -349,7 +456,8 @@ protected String encodeDataString(String dataString) { } } - return ((response != null) && response.equals("1")); + boolean accepted = ((response != null) && response.equals("1")); + return new HttpStatusResponse(accepted, statusCode, response); } private void sendMessages(List messages, String endpointUrl) throws IOException { @@ -360,18 +468,67 @@ private void sendMessages(List messages, String endpointUrl) throws if (batch.size() > 0) { String messagesString = dataString(batch); - boolean accepted = sendData(messagesString, endpointUrl); + HttpStatusResponse response = sendData(messagesString, endpointUrl); + + if (!response.success) { + // Check if we got a 413 Payload Too Large error + if (response.statusCode == Config.HTTP_413_PAYLOAD_TOO_LARGE) { + // Retry with chunked payloads (only once) + sendMessagesChunked(batch, endpointUrl); + } else { + String respBody = response.responseBody != null ? response.responseBody : "no response body"; + throw new MixpanelServerException("Server refused to accept messages, they may be malformed. HTTP " + response.statusCode + " Response: " + respBody, batch, response.statusCode, response.responseBody); + } + } + } + } + } - if (! accepted) { - throw new MixpanelServerException("Server refused to accept messages, they may be malformed.", batch); + /** + * Sends messages by chunking the payload to handle 413 Payload Too Large errors. + * This is a retry mechanism that only happens once when the initial send returns 413. + * + * The messages are split into smaller chunks using PayloadChunker, with each chunk + * sized to be under the track endpoint's limit (1 MB). Each chunk is sent independently, + * and we do NOT retry chunked sends that fail - only the initial payload gets one retry. + * + * @param batch the batch of messages to send in chunks + * @param endpointUrl the endpoint URL + * @throws IOException if there's a network error + * @throws MixpanelServerException if any chunk is rejected by the server + */ + private void sendMessagesChunked(List batch, String endpointUrl) throws IOException { + String originalPayload = dataString(batch); + + try { + // Split the payload into chunks under the track endpoint's 1MB limit + // Size limits are based on uncompressed data (server limits apply to uncompressed payloads) + List chunks = PayloadChunker.chunkJsonArray(originalPayload, Config.TRACK_MAX_PAYLOAD_BYTES); + + for (String chunk : chunks) { + HttpStatusResponse response = sendData(chunk, endpointUrl); + + if (!response.success) { + // Parse the chunk back into messages for the error response + JSONArray chunkArray = new JSONArray(chunk); + List chunkMessages = new ArrayList<>(); + for (int i = 0; i < chunkArray.length(); i++) { + chunkMessages.add(chunkArray.getJSONObject(i)); + } + String respBody = response.responseBody != null ? response.responseBody : "no response body"; + throw new MixpanelServerException("Server refused to accept chunked messages, they may be malformed. HTTP " + response.statusCode + " Response: " + respBody, chunkMessages, response.statusCode, response.responseBody); } } + } catch (JSONException e) { + throw new MixpanelServerException("Failed to chunk messages due to JSON error", batch); + } catch (UnsupportedEncodingException e) { + throw new MixpanelServerException("Failed to chunk messages due to encoding error", batch); } } private void sendImportMessages(List messages, String endpointUrl) throws IOException { // Extract token from first message for authentication - // If token is missing, we'll still attempt to send and let the server reject it + // Token is required for /import endpoint Basic Auth String token = ""; if (messages.size() > 0) { try { @@ -386,11 +543,16 @@ private void sendImportMessages(List messages, String endpointUrl) t // Malformed message - continue with empty token and let server reject it } } + + // Validate that we have a non-empty token for /import endpoint + if (token == null || token.trim().isEmpty()) { + throw new MixpanelServerException("Import endpoint requires a valid token in message properties", messages); + } - // Send messages in batches (max 2000 per batch for /import) - // If token is empty, the server will reject with 401 Unauthorized - for (int i = 0; i < messages.size(); i += Config.IMPORT_MAX_MESSAGE_SIZE) { - int endIndex = i + Config.IMPORT_MAX_MESSAGE_SIZE; + // Send messages in batches (configurable batch size for /import, default max 2000 per batch) + // Token has been validated and is guaranteed to be non-empty + for (int i = 0; i < messages.size(); i += mImportMaxMessageSize) { + int endIndex = i + mImportMaxMessageSize; endIndex = Math.min(endIndex, messages.size()); List batch = messages.subList(i, endIndex); @@ -399,10 +561,66 @@ private void sendImportMessages(List messages, String endpointUrl) t String messagesString = dataString(batch); boolean accepted = sendImportData(messagesString, endpointUrl, token); - if (! accepted) { - throw new MixpanelServerException("Server refused to accept import messages, they may be malformed.", batch); + if (!accepted) { + boolean is413 = mLastStatusCode == Config.HTTP_413_PAYLOAD_TOO_LARGE; + boolean is400WithPayloadTooLarge = mLastStatusCode == Config.HTTP_400_BAD_REQUEST + && mLastResponseBody != null + && mLastResponseBody.contains("request body too large"); + + if (is413 || is400WithPayloadTooLarge) { + // Retry with chunked payloads (only once) for 413 or 400 with "request body too large" + sendImportMessagesChunked(batch, endpointUrl, token); + } else { + String respBody = mLastResponseBody != null ? mLastResponseBody : "no response body"; + int status = mLastStatusCode; + throw new MixpanelServerException("Server refused to accept import messages, they may be malformed. HTTP " + status + " Response: " + respBody, batch, status, mLastResponseBody); + } + } + } + } + } + + /** + * Sends import messages by chunking the payload to handle 413 Payload Too Large errors. + * This is a retry mechanism that only happens once when the initial send returns 413. + * + * The messages are split into smaller chunks using PayloadChunker, with each chunk + * sized to be under the import endpoint's limit (10 MB). Each chunk is sent independently, + * and we do NOT retry chunked sends that fail - only the initial payload gets one retry. + * + * @param batch the batch of messages to send in chunks + * @param endpointUrl the endpoint URL + * @param token the authentication token + * @throws IOException if there's a network error + * @throws MixpanelServerException if any chunk is rejected by the server + */ + private void sendImportMessagesChunked(List batch, String endpointUrl, String token) throws IOException { + String originalPayload = dataString(batch); + + try { + // Split the payload into chunks under the import endpoint's 10MB limit + // Size limits are based on uncompressed data (server limits apply to uncompressed payloads) + List chunks = PayloadChunker.chunkJsonArray(originalPayload, Config.IMPORT_MAX_PAYLOAD_BYTES); + + for (String chunk : chunks) { + boolean accepted = sendImportData(chunk, endpointUrl, token); + + if (!accepted) { + // Parse the chunk back into messages for the error response + JSONArray chunkArray = new JSONArray(chunk); + List chunkMessages = new ArrayList<>(); + for (int i = 0; i < chunkArray.length(); i++) { + chunkMessages.add(chunkArray.getJSONObject(i)); + } + String respBody = mLastResponseBody != null ? mLastResponseBody : "no response body"; + int status = mLastStatusCode; + throw new MixpanelServerException("Server refused to accept chunked import messages, they may be malformed. HTTP " + status + " Response: " + respBody, chunkMessages, status, mLastResponseBody); } } + } catch (JSONException e) { + throw new MixpanelServerException("Failed to chunk import messages due to JSON error", batch); + } catch (UnsupportedEncodingException e) { + throw new MixpanelServerException("Failed to chunk import messages due to encoding error", batch); } } @@ -426,19 +644,34 @@ private String dataString(List messages) { * The /import endpoint requires: * - JSON content type (not URL-encoded like /track) * - Basic authentication with token as username and empty password - * - strict=1 parameter for validation + * - strict parameter (1 or 0) for validation behavior + * + * Response format depends on strict mode: + * - strict=1 (default): Returns JSON with code, status, and num_records_imported. + * Example: {"code":200,"status":"OK","num_records_imported":100} + * If there are validation errors, returns HTTP 400 with details, but correctly formed + * events are still ingested. + * + * - strict=0: Returns plain text "1" if events are imported, "0" if they are not. + * The reason for failure is unknown with strict=0 (no error details provided). + * Use strict=1 for better error information. + * + * When a 413 Payload Too Large error is received, the caller should split the payload + * into smaller chunks using PayloadChunker and retry each chunk individually. + * Similarly, a 400 error with "request body too large" in the response will also trigger chunking. + * This method will store the 413/400 status code in mLastStatusCode for the caller to detect. * * @param dataString JSON array of events to import - * @param endpointUrl The import endpoint URL + * @param endpointUrl The import endpoint URL (should include strict parameter) * @param token The project token for Basic Auth - * @return true if the server accepted the data + * @return true if the server accepted the data (plain text "1" or JSON with code 200) * @throws IOException if there's a network error */ /* package */ boolean sendImportData(String dataString, String endpointUrl, String token) throws IOException { URL endpoint = new URL(endpointUrl); HttpURLConnection conn = (HttpURLConnection) endpoint.openConnection(); - conn.setReadTimeout(READ_TIMEOUT_MILLIS); - conn.setConnectTimeout(CONNECT_TIMEOUT_MILLIS); + conn.setReadTimeout(mReadTimeoutMillis); + conn.setConnectTimeout(mConnectTimeoutMillis); conn.setDoOutput(true); conn.setRequestMethod("POST"); conn.setRequestProperty("Content-Type", "application/json"); @@ -500,14 +733,17 @@ private String dataString(List messages) { responseStream = conn.getInputStream(); response = slurp(responseStream); } catch (IOException e) { - // HTTP error codes (401, 400, etc.) throw IOException when calling getInputStream() + // HTTP error codes (401, 400, 413, etc.) throw IOException when calling getInputStream() // Check if it's an HTTP error and read the error stream for details + int statusCode = conn.getResponseCode(); InputStream errorStream = conn.getErrorStream(); if (errorStream != null) { try { - slurp(errorStream); + String errorResponse = slurp(errorStream); + mLastResponseBody = errorResponse; + mLastStatusCode = statusCode; errorStream.close(); - // Return false to indicate rejection, which will throw MixpanelServerException + // Return false to indicate rejection, which will allow caller to check status code return false; } catch (IOException ignored) { // If we can't read the error stream, just let the original exception propagate @@ -525,12 +761,30 @@ private String dataString(List messages) { } } - // Import endpoint returns JSON like {"code":200,"status":"OK","num_records_imported":N} + // Import endpoint returns different formats depending on strict mode: + // - strict=1: JSON like {"code":200,"status":"OK","num_records_imported":N} + // - strict=0: Plain text "0" (not imported) or "1" (imported) if (response == null) { + mLastResponseBody = null; + mLastStatusCode = 0; + return false; + } + + // First, try to handle strict=0 response format (plain text "0" or "1") + String trimmedResponse = response.trim(); + if ("1".equals(trimmedResponse)) { + // strict=0 with successful import + mLastResponseBody = response; + mLastStatusCode = 200; + return true; + } else if ("0".equals(trimmedResponse)) { + // strict=0 with failed import (events not imported, reason unknown) + mLastResponseBody = response; + mLastStatusCode = 200; // HTTP 200 but import failed silently return false; } - // Parse JSON response + // Try to parse as JSON response (strict=1 format) try { JSONObject jsonResponse = new JSONObject(response); @@ -538,9 +792,19 @@ private String dataString(List messages) { boolean statusOk = jsonResponse.has("status") && "OK".equals(jsonResponse.getString("status")); boolean codeOk = jsonResponse.has("code") && jsonResponse.getInt("code") == 200; - return statusOk && codeOk; + if (statusOk && codeOk) { + mLastResponseBody = response; + mLastStatusCode = 200; + return true; + } else { + mLastResponseBody = response; + mLastStatusCode = jsonResponse.has("code") ? jsonResponse.getInt("code") : 0; + return false; + } } catch (JSONException e) { // Not valid JSON or missing expected fields + mLastResponseBody = response; + mLastStatusCode = 0; return false; } } @@ -596,6 +860,90 @@ private static EventSender createEventSender(BaseFlagsConfig config, MixpanelAPI }; } + /** + * Sets the connection timeout for HTTP requests. + * + * Default is 2000 milliseconds (2 seconds). You may need to increase this for high-latency regions. + * + * This method is thread-safe and can be called at any time, even while other threads are executing + * deliver() or sendMessage(). Each HTTP request will use the current timeout value at the moment + * it establishes a connection. + * + * Best practice: Set all timeout values during initialization before starting worker threads, + * though concurrent calls are safe. + * + * Example: + * MixpanelAPI api = new MixpanelAPI(); + * api.setConnectTimeout(5000); // 5 seconds for slow regions + * api.setReadTimeout(15000); + * // Now safe to use from multiple threads + * executorService.submit({@code () -> api.deliver(delivery)}); + * + * @param timeoutMillis timeout in milliseconds (must be > 0) + * @throws IllegalArgumentException if timeoutMillis <= 0 + */ + public void setConnectTimeout(int timeoutMillis) { + if (timeoutMillis <= 0) { + throw new IllegalArgumentException("Connect timeout must be > 0"); + } + this.mConnectTimeoutMillis = timeoutMillis; + } + + /** + * Sets the read timeout for HTTP requests. + * + * Default is 10000 milliseconds (10 seconds). You may need to increase this for high-latency regions + * or when processing large batches of events. + * + * This method is thread-safe and can be called at any time, even while other threads are executing + * deliver() or sendMessage(). Each HTTP request will use the current timeout value at the moment + * the response is being read. + * + * Best practice: Set all timeout values during initialization before starting worker threads, + * though concurrent calls are safe. + * + * Example: + * MixpanelAPI api = new MixpanelAPI(); + * api.setConnectTimeout(5000); // 5 seconds for slow regions + * api.setReadTimeout(15000); // 15 seconds for slow reads + * // Now safe to use from multiple threads + * executorService.submit({@code () -> api.deliver(delivery)}); + * + * @param timeoutMillis timeout in milliseconds (must be > 0) + * @throws IllegalArgumentException if timeoutMillis <= 0 + */ + public void setReadTimeout(int timeoutMillis) { + if (timeoutMillis <= 0) { + throw new IllegalArgumentException("Read timeout must be > 0"); + } + this.mReadTimeoutMillis = timeoutMillis; + } + + /** + * Disables strict validation for import operations. + * + * By default, the /import endpoint uses strict=1 which validates each event and returns + * a 400 error if any event has issues. Correctly formed events are still ingested, and + * problematic events are returned in the response with error messages. + * + * Calling this method sets strict=0, which bypasses validation and imports all events + * regardless of their validity. This can be useful for importing data with known issues + * or when validation errors are not a concern. + * + * This should be called before calling any deliver() or sendMessage() methods. + * + * Example: + * MixpanelAPI api = new MixpanelAPI(); + * api.disableStrictImport(); // Skip validation on import + * api.deliver(delivery); + * + * For more details on import validation, see: + * https://developer.mixpanel.com/reference/import-events + */ + public void disableStrictImport() { + this.mStrictImportMode = false; + } + /** * Closes this MixpanelAPI instance and releases any resources held by the flags providers. * This method should be called when the MixpanelAPI instance is no longer needed. diff --git a/src/main/java/com/mixpanel/mixpanelapi/MixpanelServerException.java b/src/main/java/com/mixpanel/mixpanelapi/MixpanelServerException.java index f2b616b..0fd8a45 100644 --- a/src/main/java/com/mixpanel/mixpanelapi/MixpanelServerException.java +++ b/src/main/java/com/mixpanel/mixpanelapi/MixpanelServerException.java @@ -10,20 +10,77 @@ * * This exception can be thrown when messages are too large, * event times are too old to accept, the api key is invalid, etc. + * + * The exception provides access to: + * - The error message describing what went wrong + * - The list of messages that were rejected + * - The HTTP status code from the server (if available) + * - The raw response body from the server (if available) */ public class MixpanelServerException extends IOException { private static final long serialVersionUID = 8230724556897575457L; private final List mBadDelivery; + private final int mHttpStatusCode; + private final String mResponseBody; + /** + * Creates a new MixpanelServerException with a message and the rejected delivery. + * + * @param message the error message + * @param badDelivery the list of messages that were rejected + */ public MixpanelServerException(String message, List badDelivery) { + this(message, badDelivery, 0, null); + } + + /** + * Creates a new MixpanelServerException with full error details. + * + * @param message the error message + * @param badDelivery the list of messages that were rejected + * @param httpStatusCode the HTTP status code from the server (0 if not available) + * @param responseBody the raw response body from the server (null if not available) + */ + public MixpanelServerException(String message, List badDelivery, int httpStatusCode, String responseBody) { super(message); mBadDelivery = badDelivery; + mHttpStatusCode = httpStatusCode; + mResponseBody = responseBody; } + /** + * @return the list of messages that were rejected by the server + */ public List getBadDeliveryContents() { return mBadDelivery; } + /** + * Returns the HTTP status code from the server response. + * + * Common status codes: + * - 400: Bad Request (malformed messages or validation errors) + * - 401: Unauthorized (invalid token) + * - 413: Payload Too Large + * - 500: Internal Server Error + * + * @return the HTTP status code, or 0 if not available + */ + public int getHttpStatusCode() { + return mHttpStatusCode; + } + + /** + * Returns the raw response body from the server. + * This can be useful for debugging server-side validation errors, + * especially when using strict import mode which returns detailed error information. + * + * @return the response body string, or null if not available + */ + public String getResponseBody() { + return mResponseBody; + } + } diff --git a/src/main/java/com/mixpanel/mixpanelapi/PayloadChunker.java b/src/main/java/com/mixpanel/mixpanelapi/PayloadChunker.java new file mode 100644 index 0000000..22c1194 --- /dev/null +++ b/src/main/java/com/mixpanel/mixpanelapi/PayloadChunker.java @@ -0,0 +1,127 @@ +package com.mixpanel.mixpanelapi; + +import java.io.UnsupportedEncodingException; +import java.util.ArrayList; +import java.util.List; + +import org.json.JSONArray; +import org.json.JSONException; + +/** + * Utility class for safely chunking JSON array payloads while maintaining data integrity. + * + * This class is used to split large payloads into smaller chunks when the server returns a 413 + * Payload Too Large error. It ensures that: + * - The payload remains valid JSON (JSONArray format) + * - Chunk size limits are based on UNCOMPRESSED data (server limits apply to uncompressed payloads) + * - Each chunk is below the specified size limit + * + * The chunking strategy removes individual messages from the original payload and creates new + * JSONArray chunks, ensuring that the JSON structure remains valid and parseable. + */ +public class PayloadChunker { + + /** + * Splits a JSON array string into multiple chunks, each under the specified byte size limit. + * + * Size limits are calculated based on UNCOMPRESSED UTF-8 encoded data, since the server's + * payload size limits (1 MB for /track, 10 MB for /import) apply to uncompressed payloads + * before any gzip compression is applied. + * + * This method is useful when a server returns a 413 Payload Too Large error. It splits the + * original payload into smaller chunks such that each chunk's uncompressed size stays under + * the specified limit. + * + * NOTE: To ensure the final serialized chunks (via JSONArray.toString()) remain safely under + * the limit, a 90% safety margin is applied. This accounts for potential formatting differences + * between individual item serialization and the final array serialization. For example: + * - 10 MB import limit becomes an effective 9 MB limit per chunk + * - 1 MB track limit becomes an effective 900 KB limit per chunk + * This conservative approach guarantees compliance with server limits. + * + * Performance: O(n) time complexity by tracking cumulative size instead of re-serializing + * the entire chunk after each item addition. + * + * @param jsonArrayString the JSON array string to chunk (e.g., "[{...}, {...}, ...]") + * @param maxBytesPerChunk the maximum size in bytes per chunk (uncompressed data) + * @return a list of JSON array strings, each under the size limit + * @throws JSONException if the input is not a valid JSON array + * @throws UnsupportedEncodingException if UTF-8 encoding is not supported + */ + public static List chunkJsonArray(String jsonArrayString, int maxBytesPerChunk) + throws JSONException, UnsupportedEncodingException { + + // Apply 90% safety margin to account for potential serialization formatting differences + // between individual items (item.toString()) and the final array (JSONArray.toString()) + int effectiveLimit = (int) (maxBytesPerChunk * 0.9); + + JSONArray originalArray = new JSONArray(jsonArrayString); + List chunks = new ArrayList<>(); + + if (originalArray.length() == 0) { + chunks.add("[]"); + return chunks; + } + + // If the array has only one item and it's already too large, we still need to return it + // (the server will reject it, but we can't split a single item) + if (originalArray.length() == 1) { + chunks.add(jsonArrayString); + return chunks; + } + + JSONArray currentChunk = new JSONArray(); + int currentChunkSize = 2; // Account for opening "[" and closing "]" + + for (int i = 0; i < originalArray.length(); i++) { + Object item = originalArray.get(i); + + // Calculate the size of this item when serialized + String itemString = item.toString(); + int itemSize = getUncompressedPayloadSize(itemString); + + // Account for comma separator if this isn't the first item in the chunk + int itemWithSeparator = itemSize + (currentChunk.length() > 0 ? 1 : 0); + + // Check if adding this item would exceed the effective limit + int sizeIfAdded = currentChunkSize + itemWithSeparator; + + if (sizeIfAdded > effectiveLimit && currentChunk.length() > 0) { + // Current item would push us over the limit, so save current chunk + chunks.add(currentChunk.toString()); + + // Start a new chunk with this item + currentChunk = new JSONArray(); + currentChunk.put(item); + currentChunkSize = 2 + itemSize; // "[" + item + "]" + } else { + // Item fits in current chunk + currentChunk.put(item); + currentChunkSize += itemWithSeparator; + } + } + + // Add the remaining items + if (currentChunk.length() > 0) { + chunks.add(currentChunk.toString()); + } + + return chunks; + } + + /** + * Calculates the uncompressed byte size of a string payload in UTF-8 encoding. + * + * This method always returns the uncompressed size because the server's payload limits + * (1 MB for /track, 10 MB for /import) apply to the uncompressed data before any + * gzip compression is applied during transmission. + * + * @param payload the string payload to measure + * @return the size in bytes (uncompressed UTF-8 encoded) + * @throws UnsupportedEncodingException if UTF-8 encoding is not supported + */ + public static int getUncompressedPayloadSize(String payload) + throws UnsupportedEncodingException { + return payload.getBytes("UTF-8").length; + } +} diff --git a/src/test/java/com/mixpanel/mixpanelapi/MixpanelAPITest.java b/src/test/java/com/mixpanel/mixpanelapi/MixpanelAPITest.java index d86155d..ad16c76 100644 --- a/src/test/java/com/mixpanel/mixpanelapi/MixpanelAPITest.java +++ b/src/test/java/com/mixpanel/mixpanelapi/MixpanelAPITest.java @@ -52,6 +52,17 @@ public static Test suite() { return new TestSuite( MixpanelAPITest.class ); } + /** + * Helper method to repeat a string (Java 8 compatibility - String.repeat not available) + */ + private String repeat(String str, int count) { + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < count; i++) { + sb.append(str); + } + return sb.toString(); + } + @Override public void setUp() { mTimeZero = System.currentTimeMillis() / 1000; @@ -73,9 +84,9 @@ public void setUp() { MixpanelAPI api = new MixpanelAPI("events url", "people url", "groups url") { @Override - public boolean sendData(String dataString, String endpointUrl) { + public HttpStatusResponse sendData(String dataString, String endpointUrl) { sawData.put(endpointUrl, dataString); - return true; + return new HttpStatusResponse(true, 200); } }; @@ -87,7 +98,7 @@ public boolean sendData(String dataString, String endpointUrl) { JSONObject set = mBuilder.set("a distinct id", mSampleProps); c.addMessage(set); - Map increments = new HashMap(); + Map increments = new HashMap(); increments.put("a key", 24L); JSONObject increment = mBuilder.increment("a distinct id", increments); c.addMessage(increment); @@ -148,7 +159,7 @@ public void testPeopleMessageBuilds() } { - Map increments = new HashMap(); + Map increments = new HashMap(); increments.put("k1", 10L); increments.put("k2", 1L); JSONObject increment = mBuilder.increment("a distinct id", increments, mSampleModifiers); @@ -159,7 +170,7 @@ public void testPeopleMessageBuilds() } { - Map increments = new HashMap(); + Map increments = new HashMap(); increments.put("k1", 10L); increments.put("k2", 1L); JSONObject increment = mBuilder.increment("a distinct id", increments); @@ -405,20 +416,21 @@ public void testMessageFormat() { JSONObject set = mBuilder.set("a distinct id", mSampleProps); assertTrue(c.isValidMessage(set)); - Map increments = new HashMap(); + Map increments = new HashMap(); increments.put("a key", 24L); JSONObject increment = mBuilder.increment("a distinct id", increments); assertTrue(c.isValidMessage(increment)); + // Test deprecated trackCharge method - should return null JSONObject charge = mBuilder.trackCharge("a distinct id", 100.00, mSampleProps); - assertTrue(c.isValidMessage(charge)); + assertNull("trackCharge should return null (deprecated)", charge); } public void testModifiers() { JSONObject set = mBuilder.set("a distinct id", mSampleProps, mSampleModifiers); checkModifiers(set); - Map increments = new HashMap(); + Map increments = new HashMap(); increments.put("a key", 24L); JSONObject increment = mBuilder.increment("a distinct id", increments, mSampleModifiers); checkModifiers(increment); @@ -426,8 +438,37 @@ public void testModifiers() { JSONObject append = mBuilder.append("a distinct id", mSampleProps, mSampleModifiers); checkModifiers(append); + // Test deprecated trackCharge method - should return null JSONObject trackCharge = mBuilder.trackCharge("a distinct id", 2.2, null, mSampleModifiers); - checkModifiers(trackCharge); + assertNull("trackCharge should return null (deprecated)", trackCharge); + } + + public void testIncrementWithDecimals() { + // Test that increment() supports decimal values (not just integers) + ClientDelivery c = new ClientDelivery(); + + // Test with Double values + Map decimals = new HashMap(); + decimals.put("rating", 4.5); + decimals.put("score", 2.7); + decimals.put("cost", 10.50); + JSONObject incrementDecimals = mBuilder.increment("a distinct id", decimals); + assertTrue("increment with decimals should be valid", c.isValidMessage(incrementDecimals)); + + // Test with mixed numeric types (Double, Integer, Long) + Map mixed = new HashMap(); + mixed.put("double_value", 3.14); + mixed.put("int_value", 5); + mixed.put("long_value", 100L); + JSONObject incrementMixed = mBuilder.increment("a distinct id", mixed); + assertTrue("increment with mixed numeric types should be valid", c.isValidMessage(incrementMixed)); + + // Test with modifiers + Map decimalsWithMods = new HashMap(); + decimalsWithMods.put("rating", 1.5); + JSONObject incrementWithMods = mBuilder.increment("a distinct id", decimalsWithMods, mSampleModifiers); + assertTrue("increment with decimals and modifiers should be valid", c.isValidMessage(incrementWithMods)); + checkModifiers(incrementWithMods); } public void testEmptyMessageFormat() { @@ -471,7 +512,7 @@ public void testClientDelivery() { c.addMessage(groupSet); - Map increments = new HashMap(); + Map increments = new HashMap(); increments.put("a key", 24L); JSONObject increment = mBuilder.increment("a distinct id", increments); c.addMessage(increment); @@ -585,9 +626,9 @@ public void testExpectedPeopleParams() { public void testEmptyDelivery() { MixpanelAPI api = new MixpanelAPI("events url", "people url") { @Override - public boolean sendData(String dataString, String endpointUrl) { + public HttpStatusResponse sendData(String dataString, String endpointUrl) { fail("Data sent when no data should be sent"); - return true; + return new HttpStatusResponse(true, 200); } }; @@ -604,9 +645,9 @@ public void testLargeDelivery() { MixpanelAPI api = new MixpanelAPI("events url", "people url") { @Override - public boolean sendData(String dataString, String endpointUrl) { + public HttpStatusResponse sendData(String dataString, String endpointUrl) { sends.add(dataString); - return true; + return new HttpStatusResponse(true, 200); } }; @@ -646,9 +687,9 @@ public boolean sendData(String dataString, String endpointUrl) { public void testEncodeDataString(){ MixpanelAPI api = new MixpanelAPI("events url", "people url") { @Override - public boolean sendData(String dataString, String endpointUrl) { + public HttpStatusResponse sendData(String dataString, String endpointUrl) { fail("Data sent when no data should be sent"); - return true; + return new HttpStatusResponse(true, 200); } }; @@ -949,10 +990,10 @@ public void testGzipCompressionEnabled() { // Test that gzip compression is properly enabled and data is compressed MixpanelAPI api = new MixpanelAPI("events url", "people url", "groups url", "import url", true) { @Override - public boolean sendData(String dataString, String endpointUrl) throws IOException { + public HttpStatusResponse sendData(String dataString, String endpointUrl) throws IOException { // This method should be called with gzip compression enabled fail("sendData should not be called directly when testing at this level"); - return true; + return new HttpStatusResponse(true, 200); } }; @@ -979,7 +1020,7 @@ public void testGzipCompressionDataIntegrity() { MixpanelAPI apiWithGzip = new MixpanelAPI("events url", "people url", "groups url", "import url", true) { @Override - public boolean sendData(String dataString, String endpointUrl) throws IOException { + public HttpStatusResponse sendData(String dataString, String endpointUrl) throws IOException { capturedOriginalData.put(endpointUrl, dataString); // Simulate what the real sendData does with gzip @@ -996,11 +1037,10 @@ public boolean sendData(String dataString, String endpointUrl) throws IOExceptio capturedCompressedData.put(endpointUrl, byteStream.toByteArray()); } catch (Exception e) { - throw new IOException("Compression failed", e); + fail("Error in gzip compression simulation: " + e.toString()); } } - - return true; + return new HttpStatusResponse(true, 200); } }; @@ -1119,4 +1159,532 @@ public boolean sendImportData(String dataString, String endpointUrl, String toke } } + public void testCustomImportBatchSizeConstructor() { + // Test constructor with int parameter only: new MixpanelAPI(500) + MixpanelAPI api = new MixpanelAPI(500); + assertEquals("Custom batch size should be 500", 500, api.mImportMaxMessageSize); + } + + public void testCustomImportBatchSizeWithGzipConstructor() { + // Test constructor with both gzip and batch size: new MixpanelAPI(true, 500) + MixpanelAPI api = new MixpanelAPI(true, 500); + assertEquals("Custom batch size should be 500", 500, api.mImportMaxMessageSize); + assertTrue("Gzip compression should be enabled", api.mUseGzipCompression); + } + + public void testImportBatchSizeClamping() { + // Test that batch size is clamped between 1 and 2000 + MixpanelAPI apiTooSmall = new MixpanelAPI(0); + assertEquals("Batch size below 1 should be clamped to 1", 1, apiTooSmall.mImportMaxMessageSize); + + MixpanelAPI apiTooLarge = new MixpanelAPI(3000); + assertEquals("Batch size above 2000 should be clamped to 2000", 2000, apiTooLarge.mImportMaxMessageSize); + + MixpanelAPI apiValid = new MixpanelAPI(1000); + assertEquals("Batch size within range should not be clamped", 1000, apiValid.mImportMaxMessageSize); + } + + public void testImportMessagesWithCustomBatchSize() { + // Test that import messages are sent in batches of the custom size + final List batchSizes = new ArrayList(); + + MixpanelAPI api = new MixpanelAPI("events url", "people url", "groups url", "import url", false, 5, null, null) { + @Override + public boolean sendImportData(String dataString, String endpointUrl, String token) throws IOException { + // Parse the JSON array to count messages in this batch + JSONArray array = new JSONArray(dataString); + batchSizes.add(array.length()); + return true; + } + }; + + try { + ClientDelivery delivery = new ClientDelivery(); + + // Create 13 import messages (should be sent as batches of 5, 5, 3) + MessageBuilder builder = new MessageBuilder("a token"); + for (int i = 0; i < 13; i++) { + Map props = new HashMap(); + props.put("time", System.currentTimeMillis()); + props.put("$insert_id", "id-" + i); + props.put("index", i); + JSONObject importEvent = builder.importEvent("user-" + i, "test event", new JSONObject(props)); + delivery.addMessage(importEvent); + } + + api.deliver(delivery); + + assertEquals("Should have 3 batches", 3, batchSizes.size()); + assertEquals("First batch should have 5 messages", 5, batchSizes.get(0).intValue()); + assertEquals("Second batch should have 5 messages", 5, batchSizes.get(1).intValue()); + assertEquals("Third batch should have 3 messages", 3, batchSizes.get(2).intValue()); + + } catch (IOException e) { + fail("IOException during custom batch size test: " + e.toString()); + } catch (JSONException e) { + fail("JSONException during custom batch size test: " + e.toString()); + } + } + + public void test413TrackEndpointChunking() { + // Test that 413 errors on track endpoint trigger payload chunking and retry + final List sendAttempts = new ArrayList(); + final List statusCodes = new ArrayList(); + + MixpanelAPI api = new MixpanelAPI("events url", "people url", "groups url") { + private int attemptCount = 0; + + @Override + public HttpStatusResponse sendData(String dataString, String endpointUrl) { + sendAttempts.add(dataString); + attemptCount++; + + // First attempt returns 413 (payload too large) + // Subsequent attempts succeed + if (attemptCount == 1) { + statusCodes.add(413); + return new HttpStatusResponse(false, 413); + } else { + statusCodes.add(200); + return new HttpStatusResponse(true, 200); + } + } + }; + + ClientDelivery c = new ClientDelivery(); + + try { + // Create a large payload that would trigger 413 + // Using many moderately-sized messages + for (int i = 0; i < 100; i++) { + JSONObject props = new JSONObject(); + props.put("large_property_" + i, repeat("x", 1000)); // Large property to increase payload size + JSONObject event = mBuilder.event("user-" + i, "test event", props); + c.addMessage(event); + } + + api.deliver(c); + + // Should have at least 2 attempts: initial (fails with 413) + retry with chunks (succeeds) + assertTrue("Multiple send attempts made", sendAttempts.size() >= 2); + assertEquals("First attempt got 413", 413, statusCodes.get(0).intValue()); + assertTrue("Retry attempts succeeded", statusCodes.size() > 1); + + } catch (IOException e) { + fail("IOException during 413 track test: " + e.toString()); + } + } + + public void test413ImportEndpointChunking() { + // Test that 413 errors on import endpoint trigger payload chunking and retry + final List sendAttempts = new ArrayList(); + final List statusCodes = new ArrayList(); + + MixpanelAPI api = new MixpanelAPI("events url", "people url", "groups url", "import url") { + private int attemptCount = 0; + + @Override + public boolean sendImportData(String dataString, String endpointUrl, String token) { + sendAttempts.add(dataString); + attemptCount++; + + // First attempt returns 413 (payload too large) + // Subsequent attempts succeed + if (attemptCount == 1) { + statusCodes.add(413); + mLastStatusCode = 413; + mLastResponseBody = "{\"code\": 413}"; + return false; + } else { + statusCodes.add(200); + mLastStatusCode = 200; + mLastResponseBody = "{\"code\": 200, \"status\": \"OK\"}"; + return true; + } + } + }; + + ClientDelivery c = new ClientDelivery(); + + try { + // Create a large payload for import + long historicalTime = System.currentTimeMillis() - (90L * 24L * 60L * 60L * 1000L); + + for (int i = 0; i < 50; i++) { + JSONObject props = new JSONObject(); + props.put("time", historicalTime + i); + props.put("$insert_id", "insert-id-" + i); + props.put("large_property_" + i, repeat("x", 5000)); // Large property + JSONObject importEvent = mBuilder.importEvent("user-" + i, "test event", props); + c.addMessage(importEvent); + } + + api.deliver(c); + + // Should have at least 2 attempts: initial (fails with 413) + retry with chunks (succeeds) + assertTrue("Multiple send attempts made", sendAttempts.size() >= 2); + assertEquals("First attempt got 413", 413, statusCodes.get(0).intValue()); + assertTrue("Retry attempts succeeded", statusCodes.size() > 1); + + } catch (IOException e) { + fail("IOException during 413 import test: " + e.toString()); + } + } + + public void testPayloadChunkerBasic() { + // Test basic chunking functionality + try { + JSONArray array = new JSONArray(); + for (int i = 0; i < 10; i++) { + JSONObject obj = new JSONObject(); + obj.put("id", i); + obj.put("data", "item " + i); + array.put(obj); + } + + String jsonString = array.toString(); + List chunks = PayloadChunker.chunkJsonArray(jsonString, 200); + + // Should be split into multiple chunks given the 200 byte limit + assertTrue("Payload chunked into multiple pieces", chunks.size() > 1); + + // Each chunk should be valid JSON + for (String chunk : chunks) { + JSONArray chunkArray = new JSONArray(chunk); + assertTrue("Chunk is valid JSONArray", chunkArray.length() > 0); + } + + // Total number of items should be preserved + int totalItems = 0; + for (String chunk : chunks) { + totalItems += new JSONArray(chunk).length(); + } + assertEquals("All items preserved after chunking", 10, totalItems); + + } catch (Exception e) { + fail("Unexpected error in chunker test: " + e.toString()); + } + } + + public void testPayloadChunkerWithGzip() { + // Test that chunking is based on uncompressed size + // (gzip compression is applied later during transmission, but limits apply to uncompressed data) + try { + JSONArray array = new JSONArray(); + for (int i = 0; i < 20; i++) { + JSONObject obj = new JSONObject(); + obj.put("id", i); + obj.put("content", repeat("x", 100)); // Larger content + array.put(obj); + } + + String jsonString = array.toString(); + + // All chunking is now based on uncompressed size + List chunks = PayloadChunker.chunkJsonArray(jsonString, 500); + + // Each chunk should be valid + int totalItems = 0; + for (String chunk : chunks) { + JSONArray chunkArray = new JSONArray(chunk); + assertTrue("Chunk is valid JSONArray", chunkArray.length() > 0); + totalItems += chunkArray.length(); + } + + assertEquals("All items preserved", 20, totalItems); + + } catch (Exception e) { + fail("Unexpected error in gzip chunker test: " + e.toString()); + } + } + + public void testPayloadChunkerEdgeCases() { + // Test edge cases: empty array, single item, very small limit + try { + // Empty array + List emptyChunks = PayloadChunker.chunkJsonArray("[]", 100); + assertEquals("Empty array results in one chunk", 1, emptyChunks.size()); + assertEquals("Empty chunk is valid", "[]", emptyChunks.get(0)); + + // Single item that's larger than limit (can't be split further) + JSONArray singleArray = new JSONArray(); + JSONObject large = new JSONObject(); + large.put("data", repeat("x", 500)); + singleArray.put(large); + + List singleChunks = PayloadChunker.chunkJsonArray(singleArray.toString(), 100); + assertEquals("Single oversized item still returned", 1, singleChunks.size()); + + } catch (Exception e) { + fail("Unexpected error in edge case test: " + e.toString()); + } + } + + public void testTrackEndpoint413WithoutChunking() { + // Test that non-413 errors are still thrown immediately without chunking + MixpanelAPI api = new MixpanelAPI("events url", "people url", "groups url") { + @Override + public HttpStatusResponse sendData(String dataString, String endpointUrl) { + // Return 400 (bad request) instead of 413 + return new HttpStatusResponse(false, 400); + } + }; + + ClientDelivery c = new ClientDelivery(); + + try { + JSONObject event = mBuilder.event("user-1", "test event", mSampleProps); + c.addMessage(event); + + api.deliver(c); + fail("Should have thrown MixpanelServerException for 400 error"); + + } catch (MixpanelServerException e) { + // Expected behavior - non-413 errors should be thrown immediately + assertTrue("Error message indicates server rejection", + e.getMessage().contains("refused")); + } catch (IOException e) { + fail("Should have thrown MixpanelServerException, not IOException: " + e.toString()); + } + } + + public void testImportEndpoint413WithoutChunking() { + // Test that non-413 import errors are still thrown immediately without chunking + MixpanelAPI api = new MixpanelAPI("events url", "people url", "groups url", "import url") { + @Override + public boolean sendImportData(String dataString, String endpointUrl, String token) { + // Return 401 (unauthorized) instead of 413 + mLastStatusCode = 401; + mLastResponseBody = "{\"code\": 401, \"status\": \"Unauthorized\"}"; + return false; + } + }; + + ClientDelivery c = new ClientDelivery(); + long historicalTime = System.currentTimeMillis() - (90L * 24L * 60L * 60L * 1000L); + + try { + JSONObject props = new JSONObject(); + props.put("time", historicalTime); + props.put("$insert_id", "insert-id-1"); + JSONObject importEvent = mBuilder.importEvent("user-1", "test event", props); + c.addMessage(importEvent); + + api.deliver(c); + fail("Should have thrown MixpanelServerException for 401 error"); + + } catch (MixpanelServerException e) { + // Expected behavior - non-413 errors should be thrown immediately + assertTrue("Error message indicates server rejection", + e.getMessage().contains("refused") || e.getMessage().contains("401")); + } catch (IOException e) { + fail("Should have thrown MixpanelServerException, not IOException: " + e.toString()); + } catch (JSONException e) { + fail("Unexpected JSONException: " + e.toString()); + } + } + + public void test400RequestBodyTooLargeImportEndpointChunking() { + // Test that 400 errors with "request body too large" on import endpoint trigger chunking and retry + final List attemptCount = new ArrayList<>(); + + MixpanelAPI api = new MixpanelAPI("events url", "people url", "groups url", "import url") { + @Override + public boolean sendImportData(String dataString, String endpointUrl, String token) { + attemptCount.add(1); + + // First attempt returns 400 with "request body too large" message + if (attemptCount.size() == 1) { + mLastStatusCode = 400; + mLastResponseBody = "{\"error\":\"json decode error at element idx: 1784, byte_offset: 10481924: http: request body too large\",\"status\":0}"; + return false; + } + + // Retry with chunks succeeds + mLastStatusCode = 200; + mLastResponseBody = "{\"code\":200,\"status\":\"OK\",\"num_records_imported\":5}"; + return true; + } + }; + + ClientDelivery c = new ClientDelivery(); + long historicalTime = System.currentTimeMillis() - (90L * 24L * 60L * 60L * 1000L); + + try { + // Create a large payload that would trigger 400 with request body too large + for (int i = 0; i < 100; i++) { + JSONObject props = new JSONObject(); + props.put("time", historicalTime); + props.put("$insert_id", "insert-id-" + i); + props.put("largeData", repeat("x", 5000)); // Add large data to each event + JSONObject importEvent = mBuilder.importEvent("user-" + i, "test event", props); + c.addMessage(importEvent); + } + + api.deliver(c); + + // Should have at least 2 attempts: initial (fails with 400) + retry with chunks (succeeds) + assertTrue("Should have tried sending at least twice", attemptCount.size() >= 2); + + } catch (IOException e) { + fail("IOException during 400 request body too large test: " + e.toString()); + } catch (JSONException e) { + fail("JSONException during 400 request body too large test: " + e.toString()); + } + } + + public void test400RequestBodyTooLargeWithoutChunking() { + // Test that 400 errors WITHOUT "request body too large" are still thrown immediately without chunking + MixpanelAPI api = new MixpanelAPI("events url", "people url", "groups url", "import url") { + @Override + public boolean sendImportData(String dataString, String endpointUrl, String token) { + // Return 400 (bad request) with a different error message (not "request body too large") + mLastStatusCode = 400; + mLastResponseBody = "{\"error\":\"invalid json format\",\"status\":0}"; + return false; + } + }; + + ClientDelivery c = new ClientDelivery(); + long historicalTime = System.currentTimeMillis() - (90L * 24L * 60L * 60L * 1000L); + + try { + JSONObject props = new JSONObject(); + props.put("time", historicalTime); + props.put("$insert_id", "insert-id-1"); + JSONObject importEvent = mBuilder.importEvent("user-1", "test event", props); + c.addMessage(importEvent); + + api.deliver(c); + fail("Should have thrown MixpanelServerException for 400 error without 'request body too large'"); + + } catch (MixpanelServerException e) { + // Expected behavior - 400 errors without "request body too large" should be thrown immediately + assertTrue("Error message indicates server rejection", + e.getMessage().contains("refused") || e.getMessage().contains("400")); + } catch (IOException e) { + fail("Should have thrown MixpanelServerException, not IOException: " + e.toString()); + } catch (JSONException e) { + fail("Unexpected JSONException: " + e.toString()); + } + } + + public void testDisableStrictImport() { + // Test that disableStrictImport() sets strict=0 in the URL + final Map capturedUrls = new HashMap(); + + MixpanelAPI api = new MixpanelAPI("events url", "people url", "groups url", "import url") { + @Override + public boolean sendImportData(String dataString, String endpointUrl, String token) { + capturedUrls.put("endpoint", endpointUrl); + return true; + } + }; + + // By default, strict mode should be enabled (strict=1) + ClientDelivery c1 = new ClientDelivery(); + long historicalTime = System.currentTimeMillis() - (90L * 24L * 60L * 60L * 1000L); + + try { + JSONObject props1 = new JSONObject(); + props1.put("time", historicalTime); + props1.put("$insert_id", "insert-id-1"); + JSONObject importEvent1 = mBuilder.importEvent("user-1", "test event", props1); + c1.addMessage(importEvent1); + + api.deliver(c1); + + String url1 = capturedUrls.get("endpoint"); + assertTrue("Default: strict=1 in URL", url1.contains("strict=1")); + + } catch (IOException e) { + fail("IOException during first delivery: " + e.toString()); + } catch (JSONException e) { + fail("JSON error: " + e.toString()); + } + + // Now disable strict mode and test again + capturedUrls.clear(); + api.disableStrictImport(); + + ClientDelivery c2 = new ClientDelivery(); + + try { + JSONObject props2 = new JSONObject(); + props2.put("time", historicalTime); + props2.put("$insert_id", "insert-id-2"); + JSONObject importEvent2 = mBuilder.importEvent("user-2", "test event 2", props2); + c2.addMessage(importEvent2); + + api.deliver(c2); + + String url2 = capturedUrls.get("endpoint"); + assertTrue("After disableStrictImport(): strict=0 in URL", url2.contains("strict=0")); + + } catch (IOException e) { + fail("IOException during second delivery: " + e.toString()); + } catch (JSONException e) { + fail("JSON error: " + e.toString()); + } + } + + public void testMultipleInstancesHaveIndependentStrictMode() { + // Test that different MixpanelAPI instances have independent strict mode settings + final Map api1Urls = new HashMap(); + final Map api2Urls = new HashMap(); + + MixpanelAPI api1 = new MixpanelAPI("events url", "people url", "groups url", "import url1") { + @Override + public boolean sendImportData(String dataString, String endpointUrl, String token) { + api1Urls.put("endpoint", endpointUrl); + return true; + } + }; + + MixpanelAPI api2 = new MixpanelAPI("events url", "people url", "groups url", "import url2") { + @Override + public boolean sendImportData(String dataString, String endpointUrl, String token) { + api2Urls.put("endpoint", endpointUrl); + return true; + } + }; + + long historicalTime = System.currentTimeMillis() - (90L * 24L * 60L * 60L * 1000L); + + try { + // Disable strict mode only on api1 + api1.disableStrictImport(); + + // Send import message with api1 (strict=0) + ClientDelivery c1 = new ClientDelivery(); + JSONObject props1 = new JSONObject(); + props1.put("time", historicalTime); + props1.put("$insert_id", "insert-id-1"); + JSONObject importEvent1 = mBuilder.importEvent("user-1", "test event", props1); + c1.addMessage(importEvent1); + api1.deliver(c1); + + // Send import message with api2 (strict=1, not disabled) + ClientDelivery c2 = new ClientDelivery(); + JSONObject props2 = new JSONObject(); + props2.put("time", historicalTime); + props2.put("$insert_id", "insert-id-2"); + JSONObject importEvent2 = mBuilder.importEvent("user-2", "test event", props2); + c2.addMessage(importEvent2); + api2.deliver(c2); + + // Verify api1 has strict=0 and api2 has strict=1 + String url1 = api1Urls.get("endpoint"); + String url2 = api2Urls.get("endpoint"); + + assertTrue("API1: strict=0 after disableStrictImport()", url1.contains("strict=0")); + assertTrue("API2: strict=1 by default", url2.contains("strict=1")); + + } catch (IOException e) { + fail("IOException: " + e.toString()); + } catch (JSONException e) { + fail("JSON error: " + e.toString()); + } + } + }