Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Question] Subpipline stop without stopping parent pipline #291

Closed
AuMilliat opened this issue Aug 28, 2023 · 5 comments
Closed

[Question] Subpipline stop without stopping parent pipline #291

AuMilliat opened this issue Aug 28, 2023 · 5 comments

Comments

@AuMilliat
Copy link

Hello, I'm trying to figure if there is a way to stop a subpipline without stopping the parent pipline.
My goal is to start and stop subpipline from rendezvous system (ie a subpipline handling a kinect azure).

My understanding is that the parent pipline needs to remove the subpipline from
private ConcurrentQueue<PipelineElement> components

I've made a quick add in Subpipline class :

        /// <summary>
        /// Remove subpipline from parent.
        /// </summary>
        public override void Dispose()
        {
            this.Stop(this.parentPipeline.GetCurrentTime(), true);
            this.DeactivateComponents();
            this.DisposeComponents();
            this.parentPipeline.RemoveSubpipline(this);
        }

In Pipeline class, DeactivateComponents & DisposeComponents methods switch from private to protected and add

       internal bool RemoveSubpipline(Subpipeline subpipeline)
       {
           PipelineElement node = this.components.FirstOrDefault(c => c.StateObject == subpipeline);
           if (node == null)
           {
               return false;
           }

           if (!subpipeline.IsCompleted)
           {
               throw new InvalidOperationException($"Subpipeline is still running, it can't be removed from parent pipeline.");
           }

           SynchronizationLock locker = new SynchronizationLock(this, true);
           this.scheduler.Freeze(locker);
           this.components = new ConcurrentQueue<PipelineElement>(this.components.Where(x => x != node));
           this.DiagnosticsCollector?.PipelineElementDisposed(this, node);
           locker.Release();
           return true;
       }

It probably needs some optimisation and cases handling but it works with

 static void Main(string[] args)
 {
     // Create the \psi pipeline
     Pipeline pipeline = Pipeline.Create("Subpipline Removal");
     var timer = Timers.Timer(pipeline, TimeSpan.FromSeconds(1));
     timer.Out.Do(t =>
     {
         Console.WriteLine($"\tPipeline timer.");
     });
     // Start the pipeline running
     pipeline.RunAsync();
     Console.WriteLine("Pipeline Run Asynch.");

     Subpipeline sub1 = new Subpipeline(pipeline); 
     var subtimer1 = Timers.Timer(sub1, TimeSpan.FromSeconds(1));
     subtimer1.Out.Do(t =>
     {
         Console.WriteLine($"\tSubpipeline 1 timer.");
     });
     RemoteExporter exporter1 = new RemoteExporter(sub1, 11511, TransportKind.Tcp);
     exporter1.Exporter.Write(subtimer1.Out, "SubExporter1");
     sub1.RunAsync();
     Console.WriteLine("Subpipeline 1 Run Asynch.");
     Thread.Sleep(3000);
     Subpipeline sub2 = new Subpipeline(pipeline);
     var subtimer2 = Timers.Timer(sub2, TimeSpan.FromSeconds(1));
     subtimer2.Out.Do(t =>
     {
         Console.WriteLine($"\tSubpipeline 2 timer.");
     });
     RemoteExporter exporter2 = new RemoteExporter(sub2, 11512, TransportKind.Tcp);
     exporter2.Exporter.Write(subtimer2.Out, "SubExporter2");
     sub2.RunAsync();
     Console.WriteLine("Subpipeline 2 Run Asynch.");
     Thread.Sleep(5000);
     Console.WriteLine("Dispose Subpipeline 1.");
     sub1.Dispose();
     Thread.Sleep(5000);
     Console.WriteLine("Dispose Subpipeline 2.");
     sub2.Dispose();
     Thread.Sleep(5000);
     Subpipeline sub3 = new Subpipeline(pipeline);
     var subtimer3 = Timers.Timer(sub3, TimeSpan.FromSeconds(1));
     subtimer3.Out.Do(t =>
     {
         Console.WriteLine($"\tSubpipeline 3 timer.");
     });
     RemoteExporter exporter3 = new RemoteExporter(sub3, 11513, TransportKind.Tcp);
     exporter3.Exporter.Write(subtimer3.Out, "SubExporter3");
     sub3.RunAsync();
     Console.WriteLine("Subpipeline 3 Run Asynch.");
     // Waiting for an out key
     Console.WriteLine("Press any key to stop the application.");
     Console.ReadLine();
     // Stop correctly the pipeline.
     pipeline.Dispose();
 }
@AuMilliat
Copy link
Author

Few modifications to get it works with a real case:

In Subpipline.cs

 public override void Dispose()
 {
     this.Stop(this.parentPipeline.GetCurrentTime(), true);
     this.DisposeComponents();
     this.parentPipeline.RemoveSubpipline(this);
     this.DiagnosticsCollector?.PipelineDisposed(this);
 }

In Pipline.cs

 internal bool RemoveSubpipline(Subpipeline subpipeline)
 {
     PipelineElement node = this.components.FirstOrDefault(c => c.StateObject == subpipeline);
     if (node == null)
     {
         return false;
     }

     if (!subpipeline.IsCompleted)
     {
         throw new InvalidOperationException($"Subpipeline is still running, it can't be removed from parent pipeline.");
     }

     List<PipelineElement> list = subpipeline.Components.ToList();
     list.Add(node);
     SynchronizationLock locker = new SynchronizationLock(this, true);
     this.scheduler.Freeze(locker);
     this.components = new ConcurrentQueue<PipelineElement>(this.components.Where(x => !list.Contains(x)));
     this.DiagnosticsCollector?.PipelineElementDisposed(this, node);
     locker.Release();
     return true;
 }

It probably still needs some improvements (DiagnosticsCollector).

@sandrist
Copy link
Contributor

Hello, can you explain a bit more about your use case which would benefit from stopping and resuming subpipelines? Pausing, stopping, and resuming (sub)pipelines mid-process is a bit tricky given the way the system has been designed so far. Take a look at the long discussion in issue #256.

Another idea is to look into the use of the Parallel Operator. This operator already has the ability to dynamically construct and tear down subpipelines according to the data flowing into it, and it's pretty extensible. It was carefully designed to do this in the right way. I wonder if you could use Parallel to achieve the goals in your scenario?

@AuMilliat
Copy link
Author

AuMilliat commented Sep 1, 2023

Thanks for the answers and the information.

What we want to do is to be able to remove a subpipeline without disposing the parent pipeline. At the moment, in our setup we have a dedicated computer for handling a kinect azure camera. We’ve done a WPF application that stream the data through the RendezVous system. Our next step is to able able to start and stop the kinect streaming with a command from a remote computer (as we might have configuration modification).
What we try to have is a pipeline that own RemoteClockImporter and a RemoteExporter for diagnostics. And have subpipeline for each RendezVous.Process that can be created and disposed while the pipline is still running.

We were looking at independents pipelines with Connector for the clock and the diagnostics. But maybe the ParallelOperator is the way to go, thanks!

@AuMilliat
Copy link
Author

I've checked the ParallelOperator, it does not seems to work on my case as I have different type of streams. I may have found a better solution for my use case by adding a method in the Pipeline class :

        /// <summary>
        /// Create new pipeline setting time offset and diagnostics collector from given pipeline.
        /// </summary>
        /// <param name="pipeline">Pipeline to retrieve time offset and diagonistic configuration.</param>
        /// <param name="name">Pipeline name.</param>
        /// <param name="threadCount">Number of threads.</param>
        /// <param name="allowSchedulingOnExternalThreads">Whether to allow scheduling on external threads.</param>
        /// <param name="enableDiagnostics">Indicates whether to enable collecting and publishing diagnostics information on the Pipeline.Diagnostics stream.</param>
        /// <returns>Created pipeline.</returns>
        public static Pipeline CreateSynchedPipeline(
            Pipeline pipeline,
            string name = null,
            int threadCount = 0,
            bool allowSchedulingOnExternalThreads = false,
            bool enableDiagnostics = false)
        {
            Pipeline newPipeline = new Pipeline(name == null ? $"Synched|{pipeline.Name}" : name, pipeline.defaultDeliveryPolicy, threadCount, allowSchedulingOnExternalThreads);
            newPipeline.VirtualTimeOffset = pipeline.VirtualTimeOffset;
            newPipeline.DiagnosticsCollector = enableDiagnostics ? pipeline.DiagnosticsCollector : null;
            newPipeline.DiagnosticsConfiguration = pipeline.DiagnosticsConfiguration;
            return newPipeline;
        }

@AuMilliat
Copy link
Author

I'm closing this issue, the implementation is available at https://github.com/SaacPSI/psi/tree/Pipline%2BExporter.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants