How can I plug into the observation process? #12

Open
fiatjaf opened this issue Jun 8, 2017 · 9 comments

fiatjaf commented Jun 8, 2017

I want to take a bunch of observables/streams (like those in RxJS; I don't know the exact name for them) and pass them to a function that returns an observable storing each one's last emitted value. Every time one of those streams emits something, the observable should be updated and its observers triggered accordingly.

This seemed like a simple thing to do, but I'm somehow seeing a lot of unexpected errors.

import { observable as nxobservable } from '@nx-js/observer-util'

export function observable (def = {}) {
  var values = nxobservable({})

  for (let property in def) {
    let value = def[property]
    setnew(values, property, value)
  }

  return new Proxy(def, {
    set (target, property, value) {
      setnew(values, property, value)
      // a set trap must report success, otherwise the assignment throws in strict mode
      return true
    },

    get (target, property) {
      return values[property]
    }
  })
}

function setnew (target, property, value) {
  var stream = value
  var initialValue
  if (Array.isArray(value) && value.length === 2 && value[0].subscribe) {
    stream = value[0]
    initialValue = value[1]
  }

  if (!initialValue && stream && stream._prod) {
    // hackishly inspect xstream streams to get initialValues
    switch (stream._prod.type) {
      case 'fromArray':
        initialValue = stream._prod.a[0]
        break
      case 'startWith':
        initialValue = stream._prod.val
        break
      case 'mapTo':
        initialValue = stream._prod.f()
        break
      case 'debug':
        setnew(target, property, stream._prod.ins)
    }
  }

  if (!value || typeof value.subscribe !== 'function') {
    // guard against null, which is also typeof 'object' and cannot be proxied
    initialValue = value && typeof value === 'object' && !Array.isArray(value)
      ? observable(value)
      : value
    stream = noopStream
  }
  
  target[property] = initialValue
  stream.subscribe({
    next (v) {
      target[property] = v && typeof v === 'object' ? observable(v) : v
    },
    error (e) {
      console.error(`error on stream ${property}:`, e)
      target[property] = e
    } 
  })
} 

const noopStream = {subscribe: () => {}}

There's a lot of complication here because I had to introduce initialValue, which wasn't needed when I was trying to build this from scratch without using Proxy or @nx-js/observer-util (in my previous implementation the observers were only triggered when the streams emitted their first values, but now they're triggered before that.)

I'm just looking for some help, but if there were a way to "intercept" values before get, I think that would solve it for me.
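
For reference when wrapping things in a Proxy: the set trap is expected to return a truthy value, and in strict-mode code (which includes ES modules) a falsy result makes the assignment throw a TypeError. This may or may not be related to the errors mentioned above. A minimal sketch of that behaviour, in which the names leaky, fixed and tryAssign are made up for illustration:

```javascript
const store = {}

// A set trap that only forwards the write and returns nothing.
const leaky = new Proxy({}, {
  set (target, property, value) {
    store[property] = value
    // no return statement: the trap result is undefined, which signals failure
  }
})

// The same trap, but reporting success.
const fixed = new Proxy({}, {
  set (target, property, value) {
    store[property] = value
    return true
  }
})

function tryAssign (proxy) {
  'use strict' // ES modules are always strict; this makes a plain script behave the same
  try {
    proxy.count = 1
    return 'ok'
  } catch (e) {
    return e instanceof TypeError ? 'TypeError' : 'other error'
  }
}

const leakyResult = tryAssign(leaky)
const fixedResult = tryAssign(fixed)
console.log(leakyResult, fixedResult) // TypeError ok
```

Note that the write inside the leaky trap still happens before the error is thrown, so the symptom can look like an update that "worked" and then crashed.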

Author

fiatjaf commented Jun 8, 2017

It is starting to feel like I should create a fork of this repository just to implement my thing, but I don't like that idea. However, it is not clear how I should do it, even in a fork. Please help!

@solkimicreb
Member

i am checking it. it might take some time for me to fully grasp it though 😉

@solkimicreb
Member

If I understand correctly you would like to do something like this:

import xs from 'observable-xstream'

const observableStream = xs.periodic(1000)
  .filter(i => i % 2 === 0)
  .map(i => i * i)
  .endWhen(xs.periodic(5000).take(1))

// auto-logs each value the stream emits (squares of the even ticks) until it ends after about 5 seconds
observe(() => console.log(observableStream.value))

Is this roughly correct or am I totally off track?

Author

fiatjaf commented Jun 9, 2017

Yes, that's basically it. Thank you.

However, I was implementing it in a transparent way, without needing the .value property, by assigning streams to an object.

import xs from 'xstream' 
import observableStreams from 'observables-xstream' 

const stream = xs.periodic(1000)
  .filter(i => i % 2 === 0)
  .map(i => i * i)
  .endWhen(xs.periodic(5000).take(1))

var state = observableStreams({
  number: stream
})

// auto-logs each value the stream emits (squares of the even ticks) until it ends after about 5 seconds
observe(() => console.log(state.number))

Looking now, I think that all my problems arise from that implementation and that your 'observable-xstream' would be much easier to implement.
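
A minimal sketch of what such a .value wrapper might look like, assuming the stream only needs a subscribe({ next, error }) method as in the code earlier in this thread. The names streamToValue and fakeStream are hypothetical, and the stand-in stream exists only so the block runs without xstream. To make it reactive, the holder would presumably be created with observable({ value: initialValue }) from @nx-js/observer-util instead of a plain object.

```javascript
// Hypothetical helper: mirrors a stream's latest emission on a `.value` property.
function streamToValue (stream, initialValue) {
  const holder = { value: initialValue }
  stream.subscribe({
    next (v) { holder.value = v },
    error (e) { holder.value = e }
  })
  return holder
}

// Tiny stand-in for a real stream, with an extra emit() to push values by hand.
function fakeStream () {
  const listeners = []
  return {
    subscribe (listener) { listeners.push(listener) },
    emit (v) { listeners.forEach(l => l.next(v)) }
  }
}

const ticks = fakeStream()
const squared = streamToValue(ticks, undefined)
const beforeFirstEmit = squared.value
ticks.emit(4)
ticks.emit(16)
console.log(squared.value) // 16
```

Because the stream lives behind a single known property, there is no need to guess an initial value by inspecting the stream's internals: the holder simply starts with whatever initialValue is passed in.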

Author

fiatjaf commented Jun 9, 2017

It gets worse when I remember that the values emitted by the stream may be objects or collections, even with other streams as values, and those should be observables too, just like the top object.

@solkimicreb
Member

No worries, nested observable construction is automatic in nx-js observer util. I have to work now but I will get back to this in the afternoon. I think it will be much simpler than it seems like (;

Author

fiatjaf commented Jun 9, 2017

Nested observables are automatic, but not in my special case if it requires a special function to turn streams into observable values.

I'm counting on you to give me an awesome magical solution here :P

@solkimicreb
Member

sorry, i was busy during the weekend, i don't want to ruin your motivation/inspiration for this. is your fork public?

Author

fiatjaf commented Jun 11, 2017

You're not ruining anything. I don't know if my approach is solid and you have no responsibility for my incompetence.

I've just found https://github.com/calmm-js/karet, which does something similar to what I wanted to do, without going through the process of converting things to observables: they just make the component somehow listen to values emitted on the stream. The problem is that I can't understand what they're doing there, even less than I could with observer-util or react-easy-state. It's pure magic.

Anyway, what did you mean when you said you thought it would be "much simpler"? Were you talking about adding new features to this repository (for "plugging in"), or about writing a wrapper around it?


My "fork" is currently a mess of files on my local disk, but the only relevant changes are:

  • I've added a global WeakMap named streamResults for caching the "current" stream values.
  • On object initialization, for each property, and whenever a new property is added to an observable, I call
function handleStream (stream) {
  if (stream && typeof stream.subscribe === 'function') {
    stream.subscribe({
      next (v) {
        streamResults.set(stream, v)
      },
      error (e) {
        streamResults.set(stream, e)
      }
    })
  }
}
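  • In the common get handler for objects, right after var result = Reflect.get(target, key, receiver), I do
  // check that result exists first, since undefined or null properties have no .subscribe
  if (result && typeof result.subscribe === 'function') {
    result = streamResults.get(result)
  }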
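
The three changes above can be sketched together outside the library with a plain Proxy. This is only an approximation under stated assumptions: withStreams, isStream and fakeStream are hypothetical names, the real patch lives inside observer-util's own handlers, and the stand-in stream exists only so the block runs without xstream.

```javascript
// Cache of the latest value seen on each stream, keyed by the stream object.
const streamResults = new WeakMap()

function isStream (x) {
  return x !== null && typeof x === 'object' && typeof x.subscribe === 'function'
}

function handleStream (stream) {
  if (isStream(stream) && !streamResults.has(stream)) {
    streamResults.set(stream, undefined) // mark as subscribed before the first value
    stream.subscribe({
      next (v) { streamResults.set(stream, v) },
      error (e) { streamResults.set(stream, e) }
    })
  }
}

// Hypothetical wrapper standing in for the patched observable.
function withStreams (obj) {
  Object.keys(obj).forEach(key => handleStream(obj[key]))
  return new Proxy(obj, {
    get (target, key, receiver) {
      const result = Reflect.get(target, key, receiver)
      // streams are swapped for their cached value, everything else passes through
      return isStream(result) ? streamResults.get(result) : result
    },
    set (target, key, value) {
      handleStream(value)
      target[key] = value
      return true
    }
  })
}

// Tiny stand-in for a real stream, with an extra emit() to push values by hand.
function fakeStream () {
  const listeners = []
  return {
    subscribe (listener) { listeners.push(listener) },
    emit (v) { listeners.forEach(l => l.next(v)) }
  }
}

const number = fakeStream()
const state = withStreams({ number, label: 'static' })
const before = state.number // nothing emitted yet
number.emit(4)
const after = state.number

const later = fakeStream()
state.later = later // streams assigned afterwards are subscribed too
later.emit(7)
console.log(before, after, state.later, state.label) // undefined 4 7 static
```

One thing this sketch makes visible: writing to the WeakMap does not by itself notify anything, so in the real library the observers reading state.number would presumably still need to be re-run whenever a stream emits.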

@fiatjaf fiatjaf closed this as completed Jun 15, 2017
@fiatjaf fiatjaf reopened this Jul 21, 2017