Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
silesky committed Jun 10, 2024
1 parent 0735206 commit 45ae5e1
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 31 deletions.
22 changes: 15 additions & 7 deletions packages/signals/browser-signals/src/core/processor/processor.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import { logger } from '../../lib/logger'
import { AnyAnalytics, CDNSettings, Signal } from '../../types'
import { Sandbox } from './sandbox'
import { SignalBuffer } from '../buffer'
import { Sandbox, SignalsRuntime } from './sandbox'

const parseDownloadURL = (cdnSettings: CDNSettings) => {
if (
Expand All @@ -11,20 +13,26 @@ const parseDownloadURL = (cdnSettings: CDNSettings) => {
return cdnSettings.edgeFunction.downloadURL
}
}

interface SignalEventProcessorSettings {
edgeFn?: string
}
export class SignalEventProcessor {
private sandbox: Sandbox
private analytics: AnyAnalytics
constructor(analytics: AnyAnalytics) {
constructor(
analytics: AnyAnalytics,
settings: SignalEventProcessorSettings = {}
) {
this.analytics = analytics
this.sandbox = new Sandbox({
edgeFnDownloadUrl: parseDownloadURL(analytics.settings.cdnSettings),
signalsRuntime: {} as any,
edgeFn: settings.edgeFn,
})
}
async process(signal: Signal) {
// todo: think about loop protection
const events = await this.sandbox.process(signal)
// add event.context.__eventOrigin?.type = 'Signal' to any event that gets dispatched via this edge function
async process(signal: Signal, signals: Signal[]) {
const events = await this.sandbox.process(signal, signals)
logger.debug('procsessed events.', events)

Object.keys(events).forEach((eventName) => {
// @ts-ignore
Expand Down
40 changes: 20 additions & 20 deletions packages/signals/browser-signals/src/core/processor/sandbox.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,9 @@ import {
} from '@segment/analytics-next'
import { logger } from '../../lib/logger'
import createWorkerBox from 'workerboxjs'
import { createDeferred } from '@segment/analytics-generic-utils'

import { Signal } from '../../types'

// could be the buffered signals object?
class SignalsRuntime {
find(..._args: any[]) {
throw new Error('nope')
}
getNextIndex(..._args: any[]) {
throw new Error('nope')
}
}
import { SignalsRuntime } from './signals-runtime'

export type MethodName =
| 'page'
Expand Down Expand Up @@ -94,10 +84,17 @@ class AnalyticsRuntime {
}
}

interface SandboxSettings {
edgeFnDownloadUrl: string | URL
signalsRuntime: SignalsRuntime
}
type SandboxSettings = {} & EdgeFnSettings

type EdgeFnSettings =
| {
edgeFn: string
edgeFnDownloadUrl?: string
}
| {
edgeFn?: string
edgeFnDownloadUrl: string
}

interface CodeSandbox {
run: (fn: string, scope: Record<string, any>) => Promise<any>
Expand All @@ -122,19 +119,22 @@ class JavascriptSandbox implements CodeSandbox {

export class Sandbox {
edgeFn: Promise<string>
signalsRuntime: SignalsRuntime
jsSandbox: CodeSandbox

constructor(settings: SandboxSettings) {
this.edgeFn = fetch(settings.edgeFnDownloadUrl).then((res) => res.text())
this.signalsRuntime = settings.signalsRuntime
this.edgeFn = settings.edgeFnDownloadUrl
? fetch(settings.edgeFnDownloadUrl).then((res) => res.text())
: Promise.resolve(settings.edgeFn!)
this.jsSandbox = new JavascriptSandbox()
}

async process(signal: Signal): Promise<BufferedSegmentEvents> {
async process(
signal: Signal,
signals: Signal[]
): Promise<BufferedSegmentEvents> {
const analytics = new AnalyticsRuntime()
const scope = {
Signals: this.signalsRuntime,
Signals: new SignalsRuntime(signals),
analytics,
processSignal: await this.edgeFn,
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import { Signal } from '../../types/signals'

// could be the buffered signals object?
// This can't get indexdb, it needs to have all the signals in memory.
export class SignalsRuntime {
constructor(private signals: Signal[]) {}
find(..._args: any[]) {
throw new Error('nope')
}
getNextIndex(..._args: any[]) {
throw new Error('nope')
}
}
18 changes: 14 additions & 4 deletions packages/signals/browser-signals/src/core/signals/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
import { AnyAnalytics } from '../../types'
import { registerGenerator } from '../signal-generators/register'
import { AnalyticsService } from '../analytics-service'
import { SignalEventProcessor } from '../processor/processor'

interface SignalsSettings {
/**
Expand All @@ -25,9 +26,9 @@ interface SignalsSettings {
*/
signalStorage?: SignalPersistentStorage
/**
* Custom SignalStorage implementation.
* Custom SignalStorage implementation. Typically used for testing.
*/
edgeFn__experimental?: string
edgeFn?: string
}

interface ISignals {
Expand All @@ -44,8 +45,13 @@ export class Signals implements ISignals {
private signalEmitter: SignalEmitter
private cleanup: VoidFunction[] = []
private signalsClient: SignalsIngestClient
/**
* String representation of the edge function.
*/
private edgeFn?: string

constructor(settings: SignalsSettings = {}) {
this.edgeFn = settings.edgeFn
this.signalEmitter = new SignalEmitter()
this.signalsClient = new SignalsIngestClient()

Expand All @@ -59,8 +65,6 @@ export class Signals implements ISignals {
this.signalEmitter.subscribe((signal) => {
void this.signalsClient.send(signal)
})

this.signalEmitter.subscribe((signal) => this.buffer.add(signal))
}

/**
Expand All @@ -71,6 +75,12 @@ export class Signals implements ISignals {
*/
async start(analytics: AnyAnalytics): Promise<void> {
const analyticsService = new AnalyticsService(analytics)
const processor = new SignalEventProcessor(analytics, {
edgeFn: this.edgeFn,
})
this.signalEmitter.subscribe(async (signal) => {
void processor.process(signal, await this.buffer.getAll())
})
await this.registerGenerator([
analyticsService.createSegmentInstrumentationEventGenerator(),
])
Expand Down

0 comments on commit 45ae5e1

Please sign in to comment.