diff --git a/gateway/gateway.go b/gateway/gateway.go
index b18a57c4df..d5daa3a956 100644
--- a/gateway/gateway.go
+++ b/gateway/gateway.go
@@ -34,6 +34,7 @@ var (
var (
errRequestDropped = errors.New("request dropped")
errRequestSuppressed = errors.New("request suppressed")
+ errEventSuppressed = errors.New("event suppressed")
)
//go:embed openapi/index.html
diff --git a/gateway/gateway_test.go b/gateway/gateway_test.go
index aecb9a4f62..7133578282 100644
--- a/gateway/gateway_test.go
+++ b/gateway/gateway_test.go
@@ -477,9 +477,10 @@ var _ = Describe("Gateway", func() {
}
Expect(err).To(BeNil())
req.Header.Set("Content-Type", "application/json")
- if ep == "/internal/v1/replay" || ep == "/internal/v1/retl" {
+ switch ep {
+ case "/internal/v1/replay", "/internal/v1/retl", "/internal/v1/batch":
req.Header.Set("X-Rudder-Source-Id", ReplaySourceID)
- } else {
+ default:
req.Header.Set("Authorization", "Basic "+base64.StdEncoding.EncodeToString([]byte(WriteKeyEnabled+":")))
}
resp, err := client.Do(req)
@@ -1708,6 +1709,7 @@ func endpointsToVerify() ([]string, []string, []string) {
"/v1/warehouse/pending-events",
"/v1/warehouse/trigger-upload",
"/v1/warehouse/jobs",
+ "/internal/v1/batch",
}
deleteEndpoints := []string{
diff --git a/gateway/handle.go b/gateway/handle.go
index 0d40abfc62..affdc355cb 100644
--- a/gateway/handle.go
+++ b/gateway/handle.go
@@ -20,6 +20,7 @@ import (
"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"
+ obskit "github.com/rudderlabs/rudder-observability-kit/go/labels"
"github.com/rudderlabs/rudder-server/app"
backendconfig "github.com/rudderlabs/rudder-server/backend-config"
"github.com/rudderlabs/rudder-server/gateway/internal/bot"
@@ -612,3 +613,236 @@ func (gw *Handle) addToWebRequestQ(_ *http.ResponseWriter, req *http.Request, do
}
userWebRequestWorker.webRequestQ <- &webReq
}
+
+func (gw *Handle) internalBatchHandlerFunc() http.HandlerFunc {
+ return func(w http.ResponseWriter, r *http.Request) {
+ var (
+ ctx = r.Context()
+ reqType = ctx.Value(gwtypes.CtxParamCallType).(string)
+ arctx = ctx.Value(gwtypes.CtxParamAuthRequestContext).(*gwtypes.AuthRequestContext)
+ jobs []*jobsdb.JobT
+ body []byte
+ err error
+ status int
+ errorMessage string
+ responseBody string
+ )
+
+ // TODO: add tracing
+ gw.logger.LogRequest(r)
+ body, err = gw.getPayload(arctx, r, reqType)
+ if err != nil {
+ goto requestError
+ }
+ jobs, err = gw.extractJobsFromInternalBatchPayload(arctx, reqType, body)
+ if err != nil {
+ goto requestError
+ }
+
+ if len(jobs) > 0 {
+ if err := gw.storeJobs(ctx, jobs); err != nil {
+ gw.stats.NewTaggedStat(
+ "gateway.write_key_failed_events",
+ stats.CountType,
+ gw.newSourceStatTagsWithReason(arctx, reqType, "storeFailed"),
+ ).Count(len(jobs))
+ goto requestError
+ }
+ gw.stats.NewTaggedStat(
+ "gateway.write_key_successful_events",
+ stats.CountType,
+ gw.newSourceStatTagsWithReason(arctx, reqType, ""),
+ ).Count(len(jobs))
+ }
+
+ status = http.StatusOK
+ responseBody = response.GetStatus(response.Ok)
+ gw.stats.NewTaggedStat(
+ "gateway.write_key_successful_requests",
+ stats.CountType,
+ gw.newSourceStatTagsWithReason(arctx, reqType, ""),
+ ).Increment()
+ gw.logger.Debugn("response",
+ logger.NewStringField("ip", misc.GetIPFromReq(r)),
+ logger.NewStringField("path", r.URL.Path),
+ logger.NewIntField("status", int64(status)),
+ logger.NewStringField("body", responseBody),
+ )
+ _, _ = w.Write([]byte(responseBody))
+ return
+
+ requestError:
+ errorMessage = err.Error()
+ status = response.GetErrorStatusCode(errorMessage)
+ responseBody = response.GetStatus(errorMessage)
+ gw.stats.NewTaggedStat(
+ "gateway.write_key_failed_requests",
+ stats.CountType,
+ gw.newSourceStatTagsWithReason(arctx, reqType, errorMessage),
+ ).Increment()
+ gw.logger.Infon("response",
+ logger.NewStringField("ip", misc.GetIPFromReq(r)),
+ logger.NewStringField("path", r.URL.Path),
+ logger.NewIntField("status", int64(status)),
+ logger.NewStringField("body", responseBody),
+ )
+ http.Error(w, responseBody, status)
+ }
+}
+
+func (gw *Handle) extractJobsFromInternalBatchPayload(
+ arctx *gwtypes.AuthRequestContext,
+ reqType string,
+ body []byte,
+) ([]*jobsdb.JobT, error) {
+ if !gjson.ValidBytes(body) {
+ return nil, fmt.Errorf("%s", response.InvalidJSON)
+ }
+ gw.requestSizeStat.Observe(float64(len(body)))
+
+ type jobObject struct {
+ userID string
+ events []map[string]interface{}
+ receivedAt string
+ }
+ var (
+ sourcesJobRunID = arctx.SourceJobRunID
+ sourcesTaskRunID = arctx.SourceTaskRunID
+ sourceID = arctx.SourceID
+ workspaceID = arctx.WorkspaceID
+ eventsBatch = gjson.GetBytes(body, "batch").Array()
+ isUserSuppressed = gw.memoizedIsUserSuppressed()
+ out = make([]jobObject, 0, len(eventsBatch))
+ )
+
+ for idx, v := range eventsBatch {
+ toSet, ok := v.Value().(map[string]interface{})
+ if !ok {
+ gw.stats.NewTaggedStat(
+ "gateway.write_key_failed_events",
+ stats.CountType,
+ gw.newSourceStatTagsWithReason(arctx, reqType, response.NotRudderEvent),
+ ).Increment()
+ return nil, fmt.Errorf("%s", response.NotRudderEvent)
+ }
+ anonIDFromReq, _ := toSet["anonymousId"].(string)
+ userIDFromReq, _ := toSet["userId"].(string)
+ eventContext, ok := misc.MapLookup(toSet, "context").(map[string]interface{})
+ if ok {
+ if idx == 0 {
+ if v, _ := misc.MapLookup(eventContext, "sources", "job_run_id").(string); v != "" {
+ sourcesJobRunID = v
+ }
+ if v, _ := misc.MapLookup(eventContext, "sources", "task_run_id").(string); v != "" {
+ sourcesTaskRunID = v
+ }
+ }
+ }
+
+ if isUserSuppressed(workspaceID, userIDFromReq, sourceID) {
+ gw.logger.Infon("suppressed event",
+ logger.NewStringField("sourceID", sourceID),
+ logger.NewStringField("workspaceID", workspaceID),
+ logger.NewStringField("userIDFromReq", userIDFromReq),
+ )
+ gw.stats.NewTaggedStat(
+ "gateway.write_key_suppressed_events",
+ stats.CountType,
+ gw.newSourceStatTagsWithReason(arctx, reqType, errEventSuppressed.Error()),
+ ).Increment()
+ continue
+ }
+
+ rudderID, _ := misc.GetMD5UUID(userIDFromReq + ":" + anonIDFromReq)
+ toSet["rudderId"] = rudderID
+ userID := buildUserID("", anonIDFromReq, userIDFromReq)
+ receivedAt, _ := toSet["receivedAt"].(string)
+ if receivedAt == "" {
+ receivedAt = time.Now().Format(misc.RFC3339Milli)
+ }
+ out = append(out, jobObject{
+ userID: userID, events: []map[string]interface{}{toSet}, receivedAt: receivedAt,
+ })
+ }
+ if len(out) == 0 { // events suppressed - but return success
+ return nil, nil
+ }
+ var params struct {
+ SourceID string `json:"source_id"`
+ SourceJobRunID string `json:"source_job_run_id"`
+ SourceTaskRunID string `json:"source_task_run_id"`
+ }
+ params.SourceID = sourceID
+ params.SourceJobRunID = sourcesJobRunID
+ params.SourceTaskRunID = sourcesTaskRunID
+ marshalledParams, err := json.Marshal(params)
+ if err != nil {
+ gw.logger.Errorn(
+ "[Gateway] Failed to marshal parameters map. Parameters: %+v",
+ logger.NewField("params", params),
+ obskit.Error(err),
+ )
+ marshalledParams = []byte(
+ `{"error": "rudder-server gateway failed to marshal params"}`,
+ )
+ }
+
+ jobs := make([]*jobsdb.JobT, 0, len(out))
+ type singularEventBatch struct {
+ Batch []map[string]interface{} `json:"batch"`
+ RequestIP string `json:"requestIP"` // update processor accordingly
+ WriteKey string `json:"writeKey"`
+ ReceivedAt string `json:"receivedAt"`
+ }
+ for _, userEvent := range out {
+ var (
+ payload json.RawMessage
+ eventCount int
+ )
+ eventBatch := singularEventBatch{
+ Batch: userEvent.events,
+ WriteKey: arctx.WriteKey,
+ ReceivedAt: userEvent.receivedAt,
+ }
+ payload, err = json.Marshal(eventBatch)
+ if err != nil {
+ panic(err)
+ }
+ eventCount = len(userEvent.events)
+
+ jobs = append(jobs, &jobsdb.JobT{
+ UUID: uuid.New(),
+ UserID: userEvent.userID,
+ Parameters: marshalledParams,
+ CustomVal: customVal,
+ EventPayload: payload,
+ EventCount: eventCount,
+ WorkspaceId: workspaceID,
+ })
+ }
+ return jobs, nil
+}
+
+func (gw *Handle) storeJobs(ctx context.Context, jobs []*jobsdb.JobT) error {
+ ctx, cancel := context.WithTimeout(ctx, gw.conf.WriteTimeout)
+ defer cancel()
+ defer gw.dbWritesStat.Count(1)
+ return gw.jobsDB.WithStoreSafeTx(ctx, func(tx jobsdb.StoreSafeTx) error {
+ if err := gw.jobsDB.StoreInTx(ctx, tx, jobs); err != nil {
+ gw.logger.Errorn(
+ "Store into gateway db failed with error",
+ obskit.Error(err),
+ logger.NewField("jobs", jobs),
+ )
+ return err
+ }
+
+ // rsources stats
+ rsourcesStats := rsources.NewStatsCollector(
+ gw.rsourcesService,
+ rsources.IgnoreDestinationID(),
+ )
+ rsourcesStats.JobsStoredWithErrors(jobs, nil)
+ return rsourcesStats.Publish(ctx, tx.SqlTx())
+ })
+}
diff --git a/gateway/handle_http.go b/gateway/handle_http.go
index 8050833d12..b746ed7d73 100644
--- a/gateway/handle_http.go
+++ b/gateway/handle_http.go
@@ -26,6 +26,10 @@ func (gw *Handle) webBatchHandler() http.HandlerFunc {
return gw.callType("batch", gw.writeKeyAuth(gw.webHandler()))
}
+func (gw *Handle) internalBatchHandler() http.HandlerFunc {
+ return gw.callType("internalBatch", gw.sourceIDAuth(gw.internalBatchHandlerFunc()))
+}
+
// webIdentifyHandler - handler for identify requests
func (gw *Handle) webIdentifyHandler() http.HandlerFunc {
return gw.callType("identify", gw.writeKeyAuth(gw.webHandler()))
diff --git a/gateway/handle_lifecycle.go b/gateway/handle_lifecycle.go
index b01290f8e9..bde2d54446 100644
--- a/gateway/handle_lifecycle.go
+++ b/gateway/handle_lifecycle.go
@@ -385,6 +385,7 @@ func (gw *Handle) StartWebHandler(ctx context.Context) error {
r.Get("/v1/warehouse/fetch-tables", gw.whProxy.ServeHTTP)
r.Post("/v1/audiencelist", gw.webAudienceListHandler())
r.Post("/v1/replay", gw.webReplayHandler())
+ r.Post("/v1/batch", gw.internalBatchHandler())
// TODO: delete this handler once we are ready to remove support for the v1 api
r.Mount("/v1/job-status", withContentType("application/json; charset=utf-8", rsourcesHandlerV1.ServeHTTP))
diff --git a/gateway/handle_observability.go b/gateway/handle_observability.go
index 84af24e874..b1fe480812 100644
--- a/gateway/handle_observability.go
+++ b/gateway/handle_observability.go
@@ -1,6 +1,7 @@
package gateway
import (
+ "github.com/rudderlabs/rudder-go-kit/stats"
gwstats "github.com/rudderlabs/rudder-server/gateway/internal/stats"
gwtypes "github.com/rudderlabs/rudder-server/gateway/internal/types"
)
@@ -16,3 +17,18 @@ func (gw *Handle) NewSourceStat(arctx *gwtypes.AuthRequestContext, reqType strin
SourceType: arctx.SourceCategory,
}
}
+
+func (gw *Handle) newSourceStatTagsWithReason(arctx *gwtypes.AuthRequestContext, reqType, reason string) stats.Tags {
+ tags := stats.Tags{
+ "source": arctx.SourceTag(),
+ "source_id": arctx.SourceID,
+ "write_key": arctx.WriteKey,
+ "req_type": reqType,
+ "workspace_id": arctx.WorkspaceID,
+ "source_type": arctx.SourceCategory,
+ }
+ if reason != "" {
+ tags["reason"] = reason
+ }
+ return tags
+}
diff --git a/gateway/integration_test.go b/gateway/integration_test.go
index d34c851f87..17a02035da 100644
--- a/gateway/integration_test.go
+++ b/gateway/integration_test.go
@@ -41,7 +41,6 @@ func TestGatewayIntegration(t *testing.T) {
}
func testGatewayByAppType(t *testing.T, appType string) {
- t.SkipNow()
ctx, _ := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT)
ctx, cancel := context.WithTimeout(ctx, 3*time.Minute)
defer cancel()
@@ -77,12 +76,13 @@ func testGatewayByAppType(t *testing.T, appType string) {
writeKey := rand.String(27)
workspaceID := rand.String(27)
- marshalledWorkspaces := testhelper.FillTemplateAndReturn(t, "../testdata/mtGatewayTest02.json", map[string]string{
+ marshalledWorkspaces := testhelper.FillTemplateAndReturn(t, "../integration_test/multi_tenant_test/testdata/mtGatewayTest02.json", map[string]string{
"writeKey": writeKey,
"workspaceId": workspaceID,
"webhookUrl": webhook.Server.URL,
})
require.NoError(t, err)
+ sourceID := "xxxyyyzzEaEurW247ad9WYZLUyk" // sourceID from the workspace config template
beConfigRouter := chi.NewMux()
if testing.Verbose() {
@@ -105,8 +105,10 @@ func testGatewayByAppType(t *testing.T, appType string) {
require.NoError(t, err)
require.Equal(t, marshalledWorkspaces.Len(), n)
}
+ controlPlaneHandler := func(w http.ResponseWriter, r *http.Request) {}
beConfigRouter.Get("/workspaceConfig", backedConfigHandler)
+ beConfigRouter.Post("/data-plane/v1/workspaces/{workspaceID}/settings", controlPlaneHandler)
beConfigRouter.NotFound(func(w http.ResponseWriter, r *http.Request) {
require.FailNowf(t, "backend config", "unexpected request to backend config, not found: %+v", r.URL)
w.WriteHeader(http.StatusNotFound)
@@ -203,7 +205,7 @@ func testGatewayByAppType(t *testing.T, appType string) {
// Test basic Gateway happy path
t.Run("events are received in gateway", func(t *testing.T) {
require.Empty(t, webhook.Requests(), "webhook should have no requests before sending the events")
- sendEventsToGateway(t, httpPort, writeKey)
+ sendEventsToGateway(t, httpPort, writeKey, sourceID)
t.Cleanup(cleanupGwJobs)
var (
@@ -245,17 +247,17 @@ func testGatewayByAppType(t *testing.T, appType string) {
if appType == app.EMBEDDED {
// Trigger normal mode for the processor to start
t.Run("switch to normal mode", func(t *testing.T) {
- sendEventsToGateway(t, httpPort, writeKey)
+ sendEventsToGateway(t, httpPort, writeKey, sourceID)
t.Cleanup(cleanupGwJobs)
require.Eventuallyf(t, func() bool {
- return len(webhook.Requests()) == 1
- }, 60*time.Second, 100*time.Millisecond, "Webhook should have received a request on %d", httpPort)
+ return webhook.RequestsCount() == 2
+ }, 60*time.Second, 100*time.Millisecond, "Webhook should have received %d requests on %d", webhook.RequestsCount(), httpPort)
})
// Trigger degraded mode, the Gateway should still work
t.Run("switch to degraded mode", func(t *testing.T) {
- sendEventsToGateway(t, httpPort, writeKey)
+ sendEventsToGateway(t, httpPort, writeKey, sourceID)
t.Cleanup(cleanupGwJobs)
var count int
@@ -263,7 +265,7 @@ func testGatewayByAppType(t *testing.T, appType string) {
"SELECT COUNT(*) FROM gw_jobs_1 WHERE workspace_id = $1", workspaceID,
).Scan(&count)
require.NoError(t, err)
- require.Equal(t, 1, count)
+ require.Equal(t, 2, count)
var userId string
err = postgresContainer.DB.QueryRowContext(ctx,
@@ -275,8 +277,8 @@ func testGatewayByAppType(t *testing.T, appType string) {
}
}
-func sendEventsToGateway(t *testing.T, httpPort int, writeKey string) {
- payload1 := strings.NewReader(`{
+func sendEventsToGateway(t *testing.T, httpPort int, writeKey, sourceID string) {
+ event := `{
"userId": "identified_user_id",
"anonymousId":"anonymousId_1",
"messageId":"messageId_1",
@@ -292,8 +294,12 @@ func sendEventsToGateway(t *testing.T, httpPort int, writeKey string) {
}
},
"timestamp": "2020-02-02T00:23:09.544Z"
- }`)
+ }`
+ batchedEvent := fmt.Sprintf(`{"batch": [%s]}`, event)
+ payload1 := strings.NewReader(event)
sendEvent(t, httpPort, payload1, "identify", writeKey)
+ payload2 := strings.NewReader(batchedEvent)
+ sendInternalBatch(t, httpPort, payload2, sourceID)
}
func sendEvent(t *testing.T, httpPort int, payload *strings.Reader, callType, writeKey string) {
@@ -325,3 +331,31 @@ func sendEvent(t *testing.T, httpPort int, payload *strings.Reader, callType, wr
t.Logf("Event Sent Successfully: (%s)", body)
}
+
+func sendInternalBatch(t *testing.T, httpPort int, payload *strings.Reader, sourceID string) {
+ t.Helper()
+ t.Logf("Sending Internal Batch")
+
+ var (
+ httpClient = &http.Client{}
+ method = "POST"
+ url = fmt.Sprintf("http://localhost:%d/internal/v1/batch", httpPort)
+ )
+
+ req, err := http.NewRequest(method, url, payload)
+ require.NoError(t, err)
+
+ req.Header.Add("Content-Type", "application/json")
+ req.Header.Set("X-Rudder-Source-Id", sourceID)
+ req.Header.Add("AnonymousId", "anonymousId_header")
+
+ res, err := httpClient.Do(req)
+ require.NoError(t, err)
+ defer func() { httputil.CloseResponse(res) }()
+
+ body, err := io.ReadAll(res.Body)
+ require.NoError(t, err)
+ require.Equal(t, http.StatusOK, res.StatusCode)
+
+ t.Logf("Internal Batch Sent Successfully: (%s)", body)
+}
diff --git a/gateway/openapi.yaml b/gateway/openapi.yaml
index cf6bac81f7..0682ab162b 100644
--- a/gateway/openapi.yaml
+++ b/gateway/openapi.yaml
@@ -641,6 +641,37 @@ paths:
example: "Too many requests"
security:
- sourceIDAuth: []
+ /internal/v1/batch:
+ post:
+ tags:
+ - Internal API
+ summary: Batch
+ description: Handles internal batch requests.
+ operationId: InternalBatch
+ responses:
+ '200':
+ description: StatusOK
+ content:
+ text/plain; charset=utf-8:
+ schema:
+ type: string
+ example: "OK"
+ '400':
+ description: StatusBadRequest
+ content:
+ text/plain; charset=utf-8:
+ schema:
+ type: string
+ example: "Invalid request"
+ '401':
+ description: StatusUnauthorized
+ content:
+ text/plain; charset=utf-8:
+ schema:
+ type: string
+ example: "Invalid Authorization Header"
+ security:
+ - sourceIDAuth: []
servers:
- url: /v1
components:
diff --git a/gateway/openapi/index.html b/gateway/openapi/index.html
index 51be3162ca..616bb3445d 100644
--- a/gateway/openapi/index.html
+++ b/gateway/openapi/index.html
@@ -1173,6 +1173,9 @@
extract
+
+ internalBatch
+
replay
@@ -7584,6 +7587,471 @@
+
+
+
+
internalBatch
+
Batch
+
+
+
+
+ Handles internal batch requests.
+
+
+ /internal/v1/batch
+
+
Usage and SDK Samples
+
+
+
+
+
+
curl -X POST \
+ -H "Authorization: Basic [[basicHash]]" \
+ -H "Accept: text/plain; charset=utf-8" \
+ "/v1/internal/v1/batch"
+
+
+
+
import org.openapitools.client.*;
+import org.openapitools.client.auth.*;
+import org.openapitools.client.model.*;
+import org.openapitools.client.api.InternalAPIApi;
+
+import java.io.File;
+import java.util.*;
+
+public class InternalAPIApiExample {
+ public static void main(String[] args) {
+ ApiClient defaultClient = Configuration.getDefaultApiClient();
+ // Configure HTTP basic authorization: sourceIDAuth
+ HttpBasicAuth sourceIDAuth = (HttpBasicAuth) defaultClient.getAuthentication("sourceIDAuth");
+ sourceIDAuth.setUsername("YOUR USERNAME");
+ sourceIDAuth.setPassword("YOUR PASSWORD");
+
+ // Create an instance of the API class
+ InternalAPIApi apiInstance = new InternalAPIApi();
+
+ try {
+ 'String' result = apiInstance.internalBatch();
+ System.out.println(result);
+ } catch (ApiException e) {
+ System.err.println("Exception when calling InternalAPIApi#internalBatch");
+ e.printStackTrace();
+ }
+ }
+}
+
+
+
+
+
import 'package:openapi/api.dart';
+
+final api_instance = DefaultApi();
+
+
+try {
+ final result = await api_instance.internalBatch();
+ print(result);
+} catch (e) {
+ print('Exception when calling DefaultApi->internalBatch: $e\n');
+}
+
+
+
+
+
+
import org.openapitools.client.api.InternalAPIApi;
+
+public class InternalAPIApiExample {
+ public static void main(String[] args) {
+ InternalAPIApi apiInstance = new InternalAPIApi();
+
+ try {
+ 'String' result = apiInstance.internalBatch();
+ System.out.println(result);
+ } catch (ApiException e) {
+ System.err.println("Exception when calling InternalAPIApi#internalBatch");
+ e.printStackTrace();
+ }
+ }
+}
+
+
+
+
Configuration *apiConfig = [Configuration sharedConfig];
+// Configure HTTP basic authorization (authentication scheme: sourceIDAuth)
+[apiConfig setUsername:@"YOUR_USERNAME"];
+[apiConfig setPassword:@"YOUR_PASSWORD"];
+
+// Create an instance of the API class
+InternalAPIApi *apiInstance = [[InternalAPIApi alloc] init];
+
+// Batch
+[apiInstance internalBatchWithCompletionHandler:
+ ^('String' output, NSError* error) {
+ if (output) {
+ NSLog(@"%@", output);
+ }
+ if (error) {
+ NSLog(@"Error: %@", error);
+ }
+}];
+
+
+
+
+
var RudderStackHttpApi = require('rudder_stack_http_api');
+var defaultClient = RudderStackHttpApi.ApiClient.instance;
+
+// Configure HTTP basic authorization: sourceIDAuth
+var sourceIDAuth = defaultClient.authentications['sourceIDAuth'];
+sourceIDAuth.username = 'YOUR USERNAME';
+sourceIDAuth.password = 'YOUR PASSWORD';
+
+// Create an instance of the API class
+var api = new RudderStackHttpApi.InternalAPIApi()
+var callback = function(error, data, response) {
+ if (error) {
+ console.error(error);
+ } else {
+ console.log('API called successfully. Returned data: ' + data);
+ }
+};
+api.internalBatch(callback);
+
+
+
+
+
+
using System;
+using System.Diagnostics;
+using Org.OpenAPITools.Api;
+using Org.OpenAPITools.Client;
+using Org.OpenAPITools.Model;
+
+namespace Example
+{
+ public class internalBatchExample
+ {
+ public void main()
+ {
+ // Configure HTTP basic authorization: sourceIDAuth
+ Configuration.Default.Username = "YOUR_USERNAME";
+ Configuration.Default.Password = "YOUR_PASSWORD";
+
+ // Create an instance of the API class
+ var apiInstance = new InternalAPIApi();
+
+ try {
+ // Batch
+ 'String' result = apiInstance.internalBatch();
+ Debug.WriteLine(result);
+ } catch (Exception e) {
+ Debug.Print("Exception when calling InternalAPIApi.internalBatch: " + e.Message );
+ }
+ }
+ }
+}
+
+
+
+
+
<?php
+require_once(__DIR__ . '/vendor/autoload.php');
+// Configure HTTP basic authorization: sourceIDAuth
+OpenAPITools\Client\Configuration::getDefaultConfiguration()->setUsername('YOUR_USERNAME');
+OpenAPITools\Client\Configuration::getDefaultConfiguration()->setPassword('YOUR_PASSWORD');
+
+// Create an instance of the API class
+$api_instance = new OpenAPITools\Client\Api\InternalAPIApi();
+
+try {
+ $result = $api_instance->internalBatch();
+ print_r($result);
+} catch (Exception $e) {
+ echo 'Exception when calling InternalAPIApi->internalBatch: ', $e->getMessage(), PHP_EOL;
+}
+?>
+
+
+
+
use Data::Dumper;
+use WWW::OPenAPIClient::Configuration;
+use WWW::OPenAPIClient::InternalAPIApi;
+# Configure HTTP basic authorization: sourceIDAuth
+$WWW::OPenAPIClient::Configuration::username = 'YOUR_USERNAME';
+$WWW::OPenAPIClient::Configuration::password = 'YOUR_PASSWORD';
+
+# Create an instance of the API class
+my $api_instance = WWW::OPenAPIClient::InternalAPIApi->new();
+
+eval {
+ my $result = $api_instance->internalBatch();
+ print Dumper($result);
+};
+if ($@) {
+ warn "Exception when calling InternalAPIApi->internalBatch: $@\n";
+}
+
+
+
+
from __future__ import print_statement
+import time
+import openapi_client
+from openapi_client.rest import ApiException
+from pprint import pprint
+# Configure HTTP basic authorization: sourceIDAuth
+openapi_client.configuration.username = 'YOUR_USERNAME'
+openapi_client.configuration.password = 'YOUR_PASSWORD'
+
+# Create an instance of the API class
+api_instance = openapi_client.InternalAPIApi()
+
+try:
+ # Batch
+ api_response = api_instance.internal_batch()
+ pprint(api_response)
+except ApiException as e:
+ print("Exception when calling InternalAPIApi->internalBatch: %s\n" % e)
+
+
+
+
extern crate InternalAPIApi;
+
+pub fn main() {
+
+ let mut context = InternalAPIApi::Context::default();
+ let result = client.internalBatch(&context).wait();
+
+ println!("{:?}", result);
+}
+
+
+
+
+ Scopes
+
+
+ Parameters
+
+
+
+
+
+
+ Responses
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+