Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
  • 7 commits
  • 14 files changed
  • 0 comments
  • 1 contributor
Jul 09, 2012
Wei Cao Change pollset implementation to use one pollset for each scheduler
In our proxy project which has massive network IO, we found erts_check_io()
become system bottleneck, a call to erts_check_io() would take 500 microseconds,
and was invoked about 3.5k times per second under heavy pressure,
which resulted in the average response time of user requests exceeding 2 milliseconds.
This limited system throughput, while CPUs were idle (starved)
at the same time.

This patch allocates one pollset for each scheduler, so that each scheduler could invoke
erts_check_io() on its own pollset concurrently. To work well with port migration,
all fds used by each port are recorded, removed from the old scheduler's pollset and
added to the new scheduler's pollset when a port migrates between schedulers.

After applying this patch, together with binding process&port with schedulers,
erts_check_io() is invoked about 230k times per second (dozens of times more than before),
and throughput increases from 45k to 105k, so it works.
121afd0
Jul 12, 2012
Wei Cao move erts_deliver_time out of erl_check_io 7577507
Jul 18, 2012
Wei Cao fix the assertion error at erts_init_check_io cab6b2c
Wei Cao fix various assertion failures with DEBUG build a89a910
Wei Cao fix 'move erts_deliver_time out of erl_check_io' commit to avoid it d…
…eadlocks the non-smp emulator
d1c5f30
Jul 19, 2012
Wei Cao fix compiler warnings ff209d9
Jul 25, 2012
Wei Cao Fix a bug which caused erts_check_io() to keep omitting some I/O events. 0ff0e78
2  erts/emulator/beam/dist.h
@@ -208,7 +208,7 @@ void erts_schedule_dist_command(Port *prt, DistEntry *dist_entry)
208 208
 				       &dep->dist_cmd,
209 209
 				       ERTS_PORT_TASK_DIST_CMD,
210 210
 				       (ErlDrvEvent) -1,
211  
-				       NULL);
  211
+				       NULL, -1);
212 212
     }
213 213
 }
214 214
 
4  erts/emulator/beam/erl_lock_check.c
@@ -121,6 +121,10 @@ static erts_lc_lock_order_t erts_lock_order[] = {
121 121
     {	"drv_ev_state_grow",			NULL,   		},
122 122
     {	"drv_ev_state",				"address"		},
123 123
     {	"safe_hash",				"address"		},
  124
+#if defined(ERTS_SMP) && defined(ERTS_POLLSET_PER_SCHEDULER)
  125
+    {   "sfl_tab_lock",               NULL            },
  126
+    {   "sfl_lock",                     NULL            },
  127
+#endif
124 128
     {   "pollset_rm_list",                      NULL                    },
125 129
     {   "removed_fd_pre_alloc_lock",            "address"               },
126 130
     {   "state_prealloc",                       NULL                    },
125  erts/emulator/beam/erl_port_task.c
@@ -199,6 +199,37 @@ pop_port(ErtsRunQueue *runq)
199 199
     return pp;
200 200
 }
201 201
 
  202
+#if defined(ERTS_SMP) && defined(ERTS_POLLSET_PER_SCHEDULER)
  203
+int erts_transfer_outstanding_io_tasks(Port* pp, ErtsRunQueue* from, ErtsRunQueue* to)
  204
+{
  205
+    ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(from));
  206
+    if(pp) {
  207
+        ErtsPortTaskQueue *ptqp = pp->sched.taskq;
  208
+        if (ptqp) {
  209
+        ErtsPortTask *ptp = ptqp->first;
  210
+        int io_tasks = 0;
  211
+        while (ptp) {
  212
+            switch (ptp->type) {
  213
+            case ERTS_PORT_TASK_INPUT:
  214
+            case ERTS_PORT_TASK_OUTPUT:
  215
+            case ERTS_PORT_TASK_EVENT:
  216
+                io_tasks ++;
  217
+                break;
  218
+            default:
  219
+                break;
  220
+            }
  221
+            ptp = ptp->next;
  222
+        }
  223
+        if(io_tasks) {
  224
+            ASSERT(erts_smp_atomic_read_nob(&from->ports.outstanding_io_tasks) >= io_tasks);
  225
+            erts_smp_atomic_add_relb(&from->ports.outstanding_io_tasks, -1*io_tasks);
  226
+            erts_smp_atomic_add_relb(&to->ports.outstanding_io_tasks, io_tasks);
  227
+        }
  228
+        }
  229
+    }
  230
+    return 0;
  231
+}
  232
+#endif
202 233
 
203 234
 #ifdef HARD_DEBUG
204 235
 
@@ -457,6 +488,10 @@ erts_port_task_abort(Eterm id, ErtsPortTaskHandle *pthp)
457 488
     case ERTS_PORT_TASK_EVENT:
458 489
 	ASSERT(erts_smp_atomic_read_nob(&erts_port_task_outstanding_io_tasks) > 0);
459 490
 	erts_smp_atomic_dec_relb(&erts_port_task_outstanding_io_tasks);
  491
+#if defined(ERTS_SMP) && defined(ERTS_POLLSET_PER_SCHEDULER)
  492
+	ASSERT(erts_smp_atomic_read_nob(&runq->ports.outstanding_io_tasks) > 0);
  493
+	erts_smp_atomic_dec_relb(&runq->ports.outstanding_io_tasks);
  494
+#endif
460 495
 	break;
461 496
     default:
462 497
 	break;
@@ -489,7 +524,8 @@ erts_port_task_schedule(Eterm id,
489 524
 			ErtsPortTaskHandle *pthp,
490 525
 			ErtsPortTaskType type,
491 526
 			ErlDrvEvent event,
492  
-			ErlDrvEventData event_data)
  527
+			ErlDrvEventData event_data,
  528
+			int ix)
493 529
 {
494 530
     ErtsRunQueue *runq;
495 531
     Port *pp;
@@ -503,13 +539,6 @@ erts_port_task_schedule(Eterm id,
503 539
      *          tasks_lock is held.
504 540
      */
505 541
 
506  
-    if (pthp && erts_port_task_is_scheduled(pthp)) {
507  
-	ASSERT(0);
508  
-	erts_port_task_abort(id, pthp);
509  
-    }
510  
-
511  
-    ptp = port_task_alloc();
512  
-
513 542
     ASSERT(is_internal_port(id));
514 543
     pp = &erts_port[internal_port_index(id)];
515 544
     runq = erts_port_runq(pp);
@@ -519,8 +548,21 @@ erts_port_task_schedule(Eterm id,
519 548
 	    erts_smp_runq_unlock(runq);
520 549
 	return -1;
521 550
     }
  551
+    
  552
+#if defined(ERTS_SMP) && defined(ERTS_POLLSET_PER_SCHEDULER)
  553
+    /* port has been migrated to another pollset */
  554
+    if (ix >=0 && runq->ix != ix) {
  555
+        erts_smp_runq_unlock(runq);
  556
+        return 0;
  557
+    }
  558
+#endif
  559
+
  560
+    if (pthp && erts_port_task_is_scheduled(pthp)) {
  561
+	ASSERT(0);
  562
+	erts_port_task_abort(id, pthp);
  563
+    }
522 564
 
523  
-    ASSERT(!erts_port_task_is_scheduled(pthp));
  565
+    ptp = port_task_alloc();
524 566
 
525 567
     ERTS_PT_CHK_PRES_PORTQ(runq, pp);
526 568
 
@@ -530,17 +572,22 @@ erts_port_task_schedule(Eterm id,
530 572
     }
531 573
 
532 574
 #ifdef ERTS_SMP
533  
-    if (enq_port) {
534  
-	ErtsRunQueue *xrunq = erts_check_emigration_need(runq, ERTS_PORT_PRIO_LEVEL);
535  
-	if (xrunq) {
536  
-	    /* Port emigrated ... */
537  
-	    erts_smp_atomic_set_nob(&pp->run_queue, (erts_aint_t) xrunq);
538  
-	    erts_smp_runq_unlock(runq);
539  
-	    runq = erts_port_runq(pp);
540  
-	    if (!runq)
541  
-		return -1;
542  
-	}
543  
-    }
  575
+//    if (enq_port) {
  576
+//	ErtsRunQueue *xrunq = erts_check_emigration_need(runq, ERTS_PORT_PRIO_LEVEL);
  577
+//	if (xrunq) {
  578
+//	    /* Port emigrated ... */
  579
+//	    erts_smp_atomic_set_nob(&pp->run_queue, (erts_aint_t) xrunq);
  580
+//	    erts_smp_runq_unlock(runq);
  581
+//	    
  582
+//#ifdef ERTS_POLLSET_PER_SCHEDULER
  583
+//        erts_change_port_pollset(pp->id, xrunq->ix);
  584
+//#endif
  585
+//	    
  586
+//	    runq = erts_port_runq(pp);
  587
+//	    if (!runq)
  588
+//		return -1;
  589
+//	}
  590
+//    }
544 591
 #endif
545 592
 
546 593
     ASSERT(pp->sched.taskq);
@@ -561,6 +608,9 @@ erts_port_task_schedule(Eterm id,
561 608
     case ERTS_PORT_TASK_OUTPUT:
562 609
     case ERTS_PORT_TASK_EVENT:
563 610
 	erts_smp_atomic_inc_relb(&erts_port_task_outstanding_io_tasks);
  611
+#if defined(ERTS_SMP) && defined(ERTS_POLLSET_PER_SCHEDULER)
  612
+	erts_smp_atomic_inc_relb(&runq->ports.outstanding_io_tasks);
  613
+#endif
564 614
 	/* Fall through... */
565 615
     default:
566 616
 	enqueue_task(pp->sched.taskq, ptp);
@@ -703,7 +753,12 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
703 753
     if (!pp->sched.taskq) {
704 754
 	if (erts_system_profile_flags.runnable_ports)
705 755
 	    profile_runnable_port(pp, am_inactive);
706  
-	res = (erts_smp_atomic_read_nob(&erts_port_task_outstanding_io_tasks)
  756
+	res = 
  757
+#if defined(ERTS_SMP) && defined(ERTS_POLLSET_PER_SCHEDULER)
  758
+	(erts_smp_atomic_read_nob(&runq->ports.outstanding_io_tasks)
  759
+#else
  760
+	(erts_smp_atomic_read_nob(&erts_port_task_outstanding_io_tasks)
  761
+#endif
707 762
 	       != (erts_aint_t) 0);
708 763
 	goto done;
709 764
     }
@@ -851,6 +906,10 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
851 906
 	       >= io_tasks_executed);
852 907
 	erts_smp_atomic_add_relb(&erts_port_task_outstanding_io_tasks,
853 908
 				 -1*io_tasks_executed);
  909
+#if defined(ERTS_SMP) && defined(ERTS_POLLSET_PER_SCHEDULER)
  910
+	ASSERT(erts_smp_atomic_read_nob(&runq->ports.outstanding_io_tasks) >= io_tasks_executed);
  911
+	erts_smp_atomic_add_relb(&runq->ports.outstanding_io_tasks, -1*io_tasks_executed);
  912
+#endif
854 913
     }
855 914
 
856 915
     *curr_port_pp = NULL;
@@ -885,15 +944,23 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
885 944
 	}
886 945
 	else {
887 946
 	    /* Port emigrated ... */
  947
+#ifdef ERTS_POLLSET_PER_SCHEDULER
  948
+	    erts_transfer_outstanding_io_tasks(pp, runq, xrunq);
  949
+#endif
888 950
 	    erts_smp_atomic_set_nob(&pp->run_queue, (erts_aint_t) xrunq);
889 951
 	    erts_smp_runq_unlock(runq);
890 952
 
  953
+#ifdef ERTS_POLLSET_PER_SCHEDULER
  954
+        erts_change_port_pollset(pp->id, xrunq->ix);
  955
+#endif
  956
+
891 957
 	    xrunq = erts_port_runq(pp);
892 958
 	    if (xrunq) {
893 959
 		enqueue_port(xrunq, pp);
894 960
 		ASSERT(pp->sched.exe_taskq);
895 961
 		pp->sched.exe_taskq = NULL;
896 962
 		erts_smp_runq_unlock(xrunq);
  963
+
897 964
 		erts_smp_notify_inc_runq(xrunq);
898 965
 	    }
899 966
 
@@ -902,7 +969,12 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
902 969
 #endif
903 970
     }
904 971
 
905  
-    res = (erts_smp_atomic_read_nob(&erts_port_task_outstanding_io_tasks)
  972
+    res = 
  973
+#if defined(ERTS_SMP) && defined(ERTS_POLLSET_PER_SCHEDULER)
  974
+	(erts_smp_atomic_read_nob(&runq->ports.outstanding_io_tasks)
  975
+#else
  976
+    (erts_smp_atomic_read_nob(&erts_port_task_outstanding_io_tasks)
  977
+#endif
906 978
 	   != (erts_aint_t) 0);
907 979
 
908 980
     ERTS_PT_CHK_PRES_PORTQ(runq, pp);
@@ -925,7 +997,12 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
925 997
 	    erts_smp_runq_unlock(runq);
926 998
 	    erts_port_cleanup(pp); /* Might aquire runq lock */
927 999
 	    erts_smp_runq_lock(runq);
928  
-	    res = (erts_smp_atomic_read_nob(&erts_port_task_outstanding_io_tasks)
  1000
+	    res = 
  1001
+#if defined(ERTS_SMP) && defined(ERTS_POLLSET_PER_SCHEDULER)
  1002
+        (erts_smp_atomic_read_nob(&runq->ports.outstanding_io_tasks)
  1003
+#else
  1004
+	    (erts_smp_atomic_read_nob(&erts_port_task_outstanding_io_tasks)
  1005
+#endif
929 1006
 		   != (erts_aint_t) 0);
930 1007
 	}
931 1008
     }
@@ -1004,6 +1081,7 @@ erts_port_is_scheduled(Port *pp)
1004 1081
     return res;
1005 1082
 }
1006 1083
 
  1084
+
1007 1085
 #ifdef ERTS_SMP
1008 1086
 void
1009 1087
 erts_enqueue_port(ErtsRunQueue *rq, Port *pp)
@@ -1023,6 +1101,7 @@ erts_dequeue_port(ErtsRunQueue *rq)
1023 1101
     ASSERT(!pp
1024 1102
 	   || rq == (ErtsRunQueue *) erts_smp_atomic_read_nob(&pp->run_queue));
1025 1103
     ASSERT(!pp || pp->sched.in_runq);
  1104
+    
1026 1105
     return pp;
1027 1106
 }
1028 1107
 
7  erts/emulator/beam/erl_port_task.h
@@ -120,10 +120,15 @@ int erts_port_task_schedule(Eterm,
120 120
 			    ErtsPortTaskHandle *,
121 121
 			    ErtsPortTaskType,
122 122
 			    ErlDrvEvent,
123  
-			    ErlDrvEventData);
  123
+			    ErlDrvEventData,
  124
+			    int);
124 125
 void erts_port_task_free_port(Port *);
125 126
 int erts_port_is_scheduled(Port *);
126 127
 
  128
+#if defined(ERTS_SMP) && defined(ERTS_POLLSET_PER_SCHEDULER)
  129
+int erts_transfer_outstanding_io_tasks(Port* pp, ErtsRunQueue* from, ErtsRunQueue* to);
  130
+#endif
  131
+
127 132
 #ifdef ERTS_SMP
128 133
 void erts_enqueue_port(ErtsRunQueue *rq, Port *pp);
129 134
 Port *erts_dequeue_port(ErtsRunQueue *rq);
122  erts/emulator/beam/erl_process.c
@@ -1066,14 +1066,22 @@ erts_sched_finish_poke(ErtsSchedulerSleepInfo *ssi, erts_aint32_t flags)
1066 1066
 {
1067 1067
     switch (flags & ERTS_SSI_FLGS_SLEEP_TYPE) {
1068 1068
     case ERTS_SSI_FLG_POLL_SLEEPING:
1069  
-	erts_sys_schedule_interrupt(1);
  1069
+#ifdef ERTS_POLLSET_PER_SCHEDULER
  1070
+    erts_sys_schedule_interrupt_rq(ssi->ix, 1);
  1071
+#else
  1072
+    erts_sys_schedule_interrupt(1);
  1073
+#endif
1070 1074
 	break;
1071 1075
     case ERTS_SSI_FLG_POLL_SLEEPING|ERTS_SSI_FLG_TSE_SLEEPING:
1072 1076
 	/*
1073 1077
 	 * Thread progress blocking while poll sleeping; need
1074 1078
 	 * to signal on both...
1075 1079
 	 */
1076  
-	erts_sys_schedule_interrupt(1);
  1080
+#ifdef ERTS_POLLSET_PER_SCHEDULER
  1081
+    erts_sys_schedule_interrupt_rq(ssi->ix, 1);
  1082
+#else
  1083
+    erts_sys_schedule_interrupt(1);
  1084
+#endif
1077 1085
 	/* fall through */
1078 1086
     case ERTS_SSI_FLG_TSE_SLEEPING:
1079 1087
 	erts_tse_set(ssi->event);
@@ -1106,7 +1114,7 @@ set_aux_work_flags_wakeup_nob(ErtsSchedulerSleepInfo *ssi,
1106 1114
 #ifdef ERTS_SMP
1107 1115
 	    erts_sched_poke(ssi);
1108 1116
 #else
1109  
-	    erts_sys_schedule_interrupt(1);
  1117
+        erts_sys_schedule_interrupt(1);
1110 1118
 #endif
1111 1119
 	}
1112 1120
     }
@@ -1126,7 +1134,7 @@ set_aux_work_flags_wakeup_relb(ErtsSchedulerSleepInfo *ssi,
1126 1134
 #ifdef ERTS_SMP
1127 1135
 	erts_sched_poke(ssi);
1128 1136
 #else
1129  
-	erts_sys_schedule_interrupt(1);
  1137
+    erts_sys_schedule_interrupt(1);
1130 1138
 #endif
1131 1139
     }
1132 1140
 }
@@ -2075,18 +2083,33 @@ try_set_sys_scheduling(void)
2075 2083
     return 0 == erts_smp_atomic32_cmpxchg_acqb(&doing_sys_schedule, 1, 0);
2076 2084
 }
2077 2085
 
  2086
+#ifdef ERTS_POLLSET_PER_SCHEDULER
  2087
+
  2088
+static ERTS_INLINE int
  2089
+runq_have_outstanding_io_tasks(ErtsRunQueue* rq)
  2090
+{
  2091
+    return 0 != erts_smp_atomic_read_acqb(&rq->ports.outstanding_io_tasks);
  2092
+}
  2093
+
  2094
+#endif
  2095
+
2078 2096
 #endif
2079 2097
 
2080 2098
 static ERTS_INLINE int
2081  
-prepare_for_sys_schedule(void)
  2099
+prepare_for_sys_schedule(ErtsRunQueue *rq)
2082 2100
 {
2083 2101
 #ifdef ERTS_SMP
  2102
+#ifdef ERTS_POLLSET_PER_SCHEDULER
  2103
+    if (!runq_have_outstanding_io_tasks(rq))
  2104
+        return 1;
  2105
+#else
2084 2106
     while (!erts_port_task_have_outstanding_io_tasks()
2085 2107
 	   && try_set_sys_scheduling()) {
2086 2108
 	if (!erts_port_task_have_outstanding_io_tasks())
2087 2109
 	    return 1;
2088 2110
 	clear_sys_scheduling();
2089 2111
     }
  2112
+#endif
2090 2113
     return 0;
2091 2114
 #else
2092 2115
     return !erts_port_task_have_outstanding_io_tasks();
@@ -2239,7 +2262,11 @@ sched_set_sleeptype(ErtsSchedulerSleepInfo *ssi, erts_aint32_t sleep_type)
2239 2262
 	erts_tse_reset(ssi->event);
2240 2263
     else {
2241 2264
 	ASSERT(sleep_type == ERTS_SSI_FLG_POLL_SLEEPING);
2242  
-	erts_sys_schedule_interrupt(0);
  2265
+#if defined(ERTS_SMP) && defined(ERTS_POLLSET_PER_SCHEDULER)
  2266
+    erts_sys_schedule_interrupt_rq(ssi->ix, 0);
  2267
+#else
  2268
+    erts_sys_schedule_interrupt(0);
  2269
+#endif
2243 2270
     }
2244 2271
 
2245 2272
     while (1) {
@@ -2395,7 +2422,7 @@ scheduler_wait(int *fcalls, ErtsSchedulerData *esdp, ErtsRunQueue *rq)
2395 2422
      * be waiting in erl_sys_schedule()
2396 2423
      */
2397 2424
 
2398  
-    if (!prepare_for_sys_schedule()) {
  2425
+    if (!prepare_for_sys_schedule(rq)) {
2399 2426
 
2400 2427
 	sched_waiting(esdp->no, rq);
2401 2428
 
@@ -2500,9 +2527,12 @@ scheduler_wait(int *fcalls, ErtsSchedulerData *esdp, ErtsRunQueue *rq)
2500 2527
 	    if (working)
2501 2528
 		sched_wall_time_change(esdp, working = 0);
2502 2529
 
  2530
+        
  2531
+#ifndef ERTS_POLLSET_PER_SCHEDULER
2503 2532
 	    ASSERT(!erts_port_task_have_outstanding_io_tasks());
  2533
+#endif
2504 2534
 
2505  
-	    erl_sys_schedule(1); /* Might give us something to do */
  2535
+	    erl_sys_schedule(rq->ix, 1); /* Might give us something to do */
2506 2536
 
2507 2537
 	    dt = erts_do_time_read_and_reset();
2508 2538
 	    if (dt) erts_bump_timer(dt);
@@ -2541,13 +2571,17 @@ scheduler_wait(int *fcalls, ErtsSchedulerData *esdp, ErtsRunQueue *rq)
2541 2571
 	     * If we got new I/O tasks we aren't allowed to
2542 2572
 	     * call erl_sys_schedule() until it is handled.
2543 2573
 	     */
  2574
+#ifdef ERTS_POLLSET_PER_SCHEDULER
  2575
+        if (runq_have_outstanding_io_tasks(rq)) {
  2576
+#else
2544 2577
 	    if (erts_port_task_have_outstanding_io_tasks()) {
  2578
+#endif
2545 2579
 		clear_sys_scheduling();
2546 2580
 		/*
2547 2581
 		 * Got to check that we still got I/O tasks; otherwise
2548 2582
 		 * we have to continue checking for I/O...
2549 2583
 		 */
2550  
-		if (!prepare_for_sys_schedule()) {
  2584
+		if (!prepare_for_sys_schedule(rq)) {
2551 2585
 		    spincount *= ERTS_SCHED_TSE_SLEEP_SPINCOUNT_FACT;
2552 2586
 		    goto tse_wait;
2553 2587
 		}
@@ -2562,14 +2596,18 @@ scheduler_wait(int *fcalls, ErtsSchedulerData *esdp, ErtsRunQueue *rq)
2562 2596
 	 * If we got new I/O tasks we aren't allowed to
2563 2597
 	 * sleep in erl_sys_schedule().
2564 2598
 	 */
  2599
+#ifdef ERTS_POLLSET_PER_SCHEDULER
  2600
+    if (runq_have_outstanding_io_tasks(rq)) {
  2601
+#else
2565 2602
 	if (erts_port_task_have_outstanding_io_tasks()) {
  2603
+#endif
2566 2604
 	    clear_sys_scheduling();
2567 2605
 
2568 2606
 	    /*
2569 2607
 	     * Got to check that we still got I/O tasks; otherwise
2570 2608
 	     * we have to wait in erl_sys_schedule() after all...
2571 2609
 	     */
2572  
-	    if (!prepare_for_sys_schedule()) {
  2610
+	    if (!prepare_for_sys_schedule(rq)) {
2573 2611
 		/*
2574 2612
 		 * Not allowed to wait in erl_sys_schedule;
2575 2613
 		 * do tse wait instead...
@@ -2598,7 +2636,9 @@ scheduler_wait(int *fcalls, ErtsSchedulerData *esdp, ErtsRunQueue *rq)
2598 2636
 		ASSERT(!(flgs & ERTS_SSI_FLG_SLEEPING));
2599 2637
 		goto sys_woken;
2600 2638
 	    }
  2639
+#ifndef ERTS_POLLSET_PER_SCHEDULER
2601 2640
 	    ASSERT(!erts_port_task_have_outstanding_io_tasks());
  2641
+#endif
2602 2642
 	    goto sys_poll_aux_work;
2603 2643
 	}
2604 2644
 
@@ -2616,9 +2656,11 @@ scheduler_wait(int *fcalls, ErtsSchedulerData *esdp, ErtsRunQueue *rq)
2616 2656
 	    erts_thr_progress_active(esdp, thr_prgr_active = 0);
2617 2657
 #endif
2618 2658
 
  2659
+#ifndef ERTS_POLLSET_PER_SCHEDULER
2619 2660
 	ASSERT(!erts_port_task_have_outstanding_io_tasks());
  2661
+#endif
2620 2662
 
2621  
-	erl_sys_schedule(0);
  2663
+	erl_sys_schedule(rq->ix, 0);
2622 2664
 
2623 2665
 	dt = erts_do_time_read_and_reset();
2624 2666
 	if (dt) erts_bump_timer(dt);
@@ -2969,7 +3011,7 @@ check_immigration_need(ErtsRunQueue *c_rq, ErtsMigrationPath *mp, int prio)
2969 3011
     int len;
2970 3012
     Uint32 f_flags, f_rq_flags;
2971 3013
     ErtsRunQueue *f_rq;
2972  
-
  3014
+    
2973 3015
     f_flags = mp->prio[prio].flags;
2974 3016
 
2975 3017
     ASSERT(ERTS_CHK_RUNQ_FLG_IMMIGRATE(mp->flags, prio));
@@ -3054,10 +3096,17 @@ immigrate(ErtsRunQueue *c_rq, ErtsMigrationPath *mp)
3054 3096
 	    if (prio == ERTS_PORT_PRIO_LEVEL) {
3055 3097
 		Port *prt;
3056 3098
 		prt = erts_dequeue_port(rq);
3057  
-		if (prt)
  3099
+		if (prt) {
  3100
+#ifdef ERTS_POLLSET_PER_SCHEDULER
  3101
+		    erts_transfer_outstanding_io_tasks(prt, rq, c_rq);
  3102
+#endif
3058 3103
 		    RUNQ_SET_RQ(&prt->run_queue, c_rq);
  3104
+		}
3059 3105
 		erts_smp_runq_unlock(rq);
3060 3106
 		if (prt) {
  3107
+#ifdef ERTS_POLLSET_PER_SCHEDULER
  3108
+			erts_change_port_pollset(prt->id, c_rq->ix);
  3109
+#endif
3061 3110
 		    /* port might terminate while we have no lock... */
3062 3111
 		    rq = erts_port_runq(prt);
3063 3112
 		    if (rq) {
@@ -3226,8 +3275,14 @@ evacuate_run_queue(ErtsRunQueue *rq,
3226 3275
 	while (prt) {
3227 3276
 	    ErtsRunQueue *prt_rq;
3228 3277
 	    prt = erts_dequeue_port(rq);
  3278
+#ifdef ERTS_POLLSET_PER_SCHEDULER
  3279
+	    erts_transfer_outstanding_io_tasks(prt, rq, to_rq);
  3280
+#endif
3229 3281
 	    RUNQ_SET_RQ(&prt->run_queue, to_rq);
3230 3282
 	    erts_smp_runq_unlock(rq);
  3283
+#ifdef ERTS_POLLSET_PER_SCHEDULER
  3284
+		erts_change_port_pollset(prt->id, to_rq->ix);
  3285
+#endif
3231 3286
 	    /*
3232 3287
 	     * The port might terminate while
3233 3288
 	     * we have no lock on it...
@@ -3372,12 +3427,17 @@ try_steal_task_from_victim(ErtsRunQueue *rq, int *rq_lockedp, ErtsRunQueue *vrq,
3372 3427
     /*
3373 3428
      * Check for a runnable port to steal...
3374 3429
      */
3375  
-
3376 3430
     if (vrq->ports.start) {
3377 3431
 	ErtsRunQueue *prt_rq;
3378 3432
 	Port *prt = erts_dequeue_port(vrq);
  3433
+#ifdef ERTS_POLLSET_PER_SCHEDULER
  3434
+	erts_transfer_outstanding_io_tasks(prt, vrq, rq);
  3435
+#endif
3379 3436
 	RUNQ_SET_RQ(&prt->run_queue, rq);
3380 3437
 	erts_smp_runq_unlock(vrq);
  3438
+#ifdef ERTS_POLLSET_PER_SCHEDULER
  3439
+    erts_change_port_pollset(prt->id, rq->ix);
  3440
+#endif
3381 3441
 
3382 3442
 	/*
3383 3443
 	 * The port might terminate while
@@ -3397,7 +3457,6 @@ try_steal_task_from_victim(ErtsRunQueue *rq, int *rq_lockedp, ErtsRunQueue *vrq,
3397 3457
 	    return !0;
3398 3458
 	}
3399 3459
     }
3400  
-
3401 3460
     erts_smp_runq_unlock(vrq);
3402 3461
 
3403 3462
     return 0;
@@ -4637,6 +4696,10 @@ erts_init_scheduling(int no_schedulers, int no_schedulers_online)
4637 4696
 	rq->ports.info.reds = 0;
4638 4697
 	rq->ports.start = NULL;
4639 4698
 	rq->ports.end = NULL;
  4699
+	
  4700
+#if defined(ERTS_SMP) && defined(ERTS_POLLSET_PER_SCHEDULER)
  4701
+    erts_smp_atomic_init_nob(&rq->ports.outstanding_io_tasks, 0);
  4702
+#endif	
4640 4703
     }
4641 4704
 
4642 4705
 #ifdef ERTS_SMP
@@ -4672,6 +4735,7 @@ erts_init_scheduling(int no_schedulers, int no_schedulers_online)
4672 4735
 	ssi->next = NULL;
4673 4736
 	ssi->prev = NULL;
4674 4737
 #endif
  4738
+    ssi->ix = ix-1;
4675 4739
 	erts_smp_atomic32_init_nob(&ssi->flags, 0);
4676 4740
 	ssi->event = NULL; /* initialized in sched_thread_func */
4677 4741
 #endif
@@ -5788,8 +5852,24 @@ sched_thread_func(void *vesdp)
5788 5852
     return NULL;
5789 5853
 }
5790 5854
 
  5855
+#ifdef ERTS_SMP
  5856
+static void *
  5857
+deliver_time_thread(void *unused)
  5858
+{
  5859
+    while (1) {
  5860
+    erts_deliver_time();
  5861
+    erts_milli_sleep(1);
  5862
+    }
  5863
+    return NULL;
  5864
+}
  5865
+#endif
  5866
+
5791 5867
 static ethr_tid aux_tid;
5792 5868
 
  5869
+#ifdef ERTS_SMP
  5870
+static ethr_tid deliver_time_tid;
  5871
+#endif
  5872
+
5793 5873
 void
5794 5874
 erts_start_schedulers(void)
5795 5875
 {
@@ -5827,6 +5907,12 @@ erts_start_schedulers(void)
5827 5907
     res = ethr_thr_create(&aux_tid, aux_thread, NULL, &opts);
5828 5908
     if (res != 0)
5829 5909
 	erl_exit(1, "Failed to create aux thread\n");
  5910
+	
  5911
+#ifdef ERTS_SMP
  5912
+	res = ethr_thr_create(&deliver_time_tid, deliver_time_thread, NULL, &opts);
  5913
+	if (res != 0)
  5914
+	erl_exit(1, "Failed to create deliver time thread\n");
  5915
+#endif
5830 5916
 
5831 5917
     if (actual < 1)
5832 5918
 	erl_exit(1,
@@ -6980,7 +7066,7 @@ Process *schedule(Process *p, int calls)
6980 7066
 
6981 7067
 	    goto check_activities_to_run;
6982 7068
 	}
6983  
-	else if (fcalls > input_reductions && prepare_for_sys_schedule()) {
  7069
+	else if (fcalls > input_reductions && prepare_for_sys_schedule(rq)) {
6984 7070
 	    /*
6985 7071
 	     * Schedule system-level activities.
6986 7072
 	     */
@@ -6988,13 +7074,15 @@ Process *schedule(Process *p, int calls)
6988 7074
 	    erts_smp_atomic32_set_relb(&function_calls, 0);
6989 7075
 	    fcalls = 0;
6990 7076
 
  7077
+#ifndef ERTS_POLLSET_PER_SCHEDULER
6991 7078
 	    ASSERT(!erts_port_task_have_outstanding_io_tasks());
  7079
+#endif
6992 7080
 
6993 7081
 #if 0 /* Not needed since we wont wait in sys schedule */
6994 7082
 	    erts_sys_schedule_interrupt(0);
6995 7083
 #endif
6996 7084
 	    erts_smp_runq_unlock(rq);
6997  
-	    erl_sys_schedule(1);
  7085
+	    erl_sys_schedule(rq->ix, 1);
6998 7086
 	    dt = erts_do_time_read_and_reset();
6999 7087
 	    if (dt) erts_bump_timer(dt);
7000 7088
 
4  erts/emulator/beam/erl_process.h
@@ -291,6 +291,7 @@ struct ErtsSchedulerSleepInfo_ {
291 291
 #ifdef ERTS_SMP
292 292
     ErtsSchedulerSleepInfo *next;
293 293
     ErtsSchedulerSleepInfo *prev;
  294
+    int ix;
294 295
     erts_smp_atomic32_t flags;
295 296
     erts_tse_t *event;
296 297
 #endif
@@ -403,6 +404,9 @@ struct ErtsRunQueue_ {
403 404
 	ErtsRunQueueInfo info;
404 405
 	struct port *start;
405 406
 	struct port *end;
  407
+#if defined(ERTS_SMP) && defined(ERTS_POLLSET_PER_SCHEDULER)
  408
+	erts_smp_atomic_t outstanding_io_tasks;
  409
+#endif
406 410
     } ports;
407 411
 };
408 412
 
3  erts/emulator/beam/global.h
@@ -187,6 +187,9 @@ struct port {
187 187
     ErtsPrtSD *psd;		 /* Port specific data */
188 188
 };
189 189
 
  190
+#if defined(ERTS_SMP) && defined(ERTS_POLLSET_PER_SCHEDULER)
  191
+int erts_change_port_pollset(Eterm, int);
  192
+#endif
190 193
 
191 194
 ERTS_GLB_INLINE ErtsRunQueue *erts_port_runq(Port *prt);
192 195
 
2  erts/emulator/beam/io.c
@@ -2833,7 +2833,7 @@ static void schedule_port_timeout(Port *p)
2833 2833
 				   &p->timeout_task,
2834 2834
 				   ERTS_PORT_TASK_TIMEOUT,
2835 2835
 				   (ErlDrvEvent) -1,
2836  
-				   NULL);
  2836
+				   NULL, -1);
2837 2837
 }
2838 2838
 
2839 2839
 ErlDrvTermData driver_mk_term_nil(void)
4  erts/emulator/beam/sys.h
@@ -661,7 +661,9 @@ extern char *erts_sys_ddll_error(int code);
661 661
  */
662 662
 #include "erl_time.h"
663 663
 
  664
+void erts_sys_schedule_interrupt_rq(int ix, int set);
664 665
 void erts_sys_schedule_interrupt(int set);
  666
+
665 667
 #ifdef ERTS_SMP
666 668
 void erts_sys_schedule_interrupt_timed(int set, erts_short_time_t msec);
667 669
 void erts_sys_main_thread(void);
@@ -671,7 +673,7 @@ extern void erts_sys_prepare_crash_dump(void);
671 673
 extern void erts_sys_pre_init(void);
672 674
 extern void erl_sys_init(void);
673 675
 extern void erl_sys_args(int *argc, char **argv);
674  
-extern void erl_sys_schedule(int);
  676
+extern void erl_sys_schedule(int, int);
675 677
 void sys_tty_reset(int);
676 678
 
677 679
 int sys_max_files(void);
604  erts/emulator/sys/common/erl_check_io.c
@@ -78,16 +78,25 @@ typedef char EventStateFlags;
78 78
 #define ERTS_CIO_POLL_INIT	ERTS_POLL_EXPORT(erts_poll_init)
79 79
 #define ERTS_CIO_POLL_INFO	ERTS_POLL_EXPORT(erts_poll_info)
80 80
 
81  
-static struct pollset_info
  81
+struct pollset_info
82 82
 {
83 83
     ErtsPollSet ps;
84 84
     erts_smp_atomic_t in_poll_wait;        /* set while doing poll */
  85
+};
  86
+
  87
+static struct pollset_info *erts_pollsets;
  88
+static Uint erts_no_pollsets;
  89
+
  90
+#define ERTS_POLLSET_IX(IX)                 \
  91
+  (ASSERT_EXPR(0 <= (IX) && (IX) < erts_no_pollsets),			\
  92
+   &erts_pollsets[(IX)])
  93
+
  94
+struct deselected_info {
85 95
 #ifdef ERTS_SMP
86 96
     struct removed_fd* removed_list;       /* list of deselected fd's*/
87 97
     erts_smp_spinlock_t removed_list_lock;
88 98
 #endif
89  
-}pollset;
90  
-#define NUM_OF_POLLSETS 1
  99
+} deselected;
91 100
 
92 101
 typedef struct {
93 102
 #ifndef ERTS_SYS_CONTINOUS_FD_NUMBERS
@@ -120,6 +129,23 @@ struct removed_fd {
120 129
 };
121 130
 #endif
122 131
 
  132
+#if defined(ERTS_SMP) && defined(ERTS_POLLSET_PER_SCHEDULER)
  133
+struct selected_fd {
  134
+    struct selected_fd *next;
  135
+#ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS
  136
+    ErtsSysFdType fd;
  137
+#else
  138
+    ErtsDrvEventState *state;
  139
+#endif
  140
+};
  141
+
  142
+struct selected_fd_list {       /* list of selected fd in each port */
  143
+    struct selected_fd *head;
  144
+    erts_smp_mtx_t lock;
  145
+};
  146
+
  147
+#endif
  148
+
123 149
 #ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS
124 150
 static int max_fds = -1;
125 151
 #endif
@@ -212,17 +238,250 @@ drvport2id(ErlDrvPort dp)
212 238
     }
213 239
 }
214 240
 
  241
+#if defined(ERTS_SMP) && defined(ERTS_POLLSET_PER_SCHEDULER)
  242
+static struct selected_fd_list *sfl_tab;
  243
+static erts_smp_mtx_t sfl_tab_lock;
  244
+ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(selected_fd, struct selected_fd, 64, ERTS_ALC_T_UNDEF)
  245
+
  246
+static void init_sfl_tab(void)
  247
+{
  248
+    erts_smp_mtx_lock(&sfl_tab_lock);
  249
+    if (sfl_tab == NULL) {
  250
+        int i;
  251
+        sfl_tab = (struct selected_fd_list *) erts_alloc(ERTS_ALC_T_UNDEF,
  252
+            erts_max_ports * sizeof(struct selected_fd_list));
  253
+        for (i = 0; i < erts_max_ports; i++) {
  254
+            sfl_tab[i].head = NULL;
  255
+            erts_smp_mtx_init(&sfl_tab[i].lock,
  256
+			   "sfl_lock");
  257
+        }
  258
+    }
  259
+    erts_smp_mtx_unlock(&sfl_tab_lock);
  260
+}
  261
+
  262
+static void add_selected_fd(ErlDrvPort drvport, ErtsDrvEventState* state)
  263
+{
  264
+    int ix = (int) drvport;
  265
+    struct selected_fd *sflp;
  266
+    struct selected_fd **insp;
  267
+    
  268
+    ASSERT(ix >= 0 && ix < erts_max_ports);
  269
+    if(sfl_tab == NULL) init_sfl_tab();
  270
+
  271
+    erts_smp_mtx_lock(&sfl_tab[ix].lock);
  272
+    sflp = sfl_tab[ix].head;
  273
+    insp = &sfl_tab[ix].head;
  274
+    while (sflp) {
  275
+#ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS
  276
+        if (sflp->fd == state->fd)
  277
+            break;
  278
+#else
  279
+        if (sflp->state == state)
  280
+            break;
  281
+#endif
  282
+        insp = &sflp->next;
  283
+        sflp = sflp->next;
  284
+    }
  285
+    
  286
+    if(!sflp) {
  287
+        struct selected_fd *p = selected_fd_alloc();
  288
+#ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS
  289
+        p->fd = state->fd;
  290
+#else
  291
+        p->state = state;
  292
+#endif
  293
+        p->next = NULL;
  294
+        *insp = p;
  295
+    }
  296
+    erts_smp_mtx_unlock(&sfl_tab[ix].lock);
  297
+}
  298
+
  299
+static void del_selected_fd(ErlDrvPort drvport, ErtsDrvEventState* state)
  300
+{
  301
+    int ix = (int) drvport;
  302
+    struct selected_fd *sflp;
  303
+    struct selected_fd **delp;
  304
+    
  305
+    ASSERT(ix >= 0 && ix < erts_max_ports);
  306
+    if(sfl_tab == NULL)
  307
+        return;
  308
+    
  309
+    erts_smp_mtx_lock(&sfl_tab[ix].lock);
  310
+    sflp = sfl_tab[ix].head;
  311
+    delp = &sfl_tab[ix].head;
  312
+    while (sflp) {
  313
+#ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS
  314
+        if (sflp->fd == state->fd)
  315
+            break;
  316
+#else
  317
+        if (sflp->state == state)
  318
+            break;
  319
+#endif
  320
+        delp = &sflp->next;
  321
+        sflp = sflp->next;
  322
+    }
  323
+    
  324
+    if (sflp) {
  325
+        *delp = sflp->next;
  326
+        selected_fd_free(sflp);
  327
+    }
  328
+    erts_smp_mtx_unlock(&sfl_tab[ix].lock);
  329
+}
  330
+    
  331
+static struct selected_fd* deepcopy_sfl(struct selected_fd *sflp)
  332
+{
  333
+    struct selected_fd *copy, **insp;
  334
+    insp = &copy;
  335
+    /* deep copy */
  336
+    while(sflp) {
  337
+        struct selected_fd *sf = selected_fd_alloc();
  338
+#ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS
  339
+        sf->fd = sflp->fd;
  340
+#else
  341
+        sf->state = sflp->state;
  342
+#endif
  343
+        *insp = sf;
  344
+        insp = &sf->next;
  345
+        sflp = sflp->next;
  346
+    }
  347
+    *insp = NULL;
  348
+    return copy;
  349
+}
  350
+
  351
+static void free_sfl(struct selected_fd *sflp)
  352
+{
  353
+    struct selected_fd *tofree;
  354
+    while(sflp) {
  355
+        tofree = sflp;
  356
+        sflp = sflp->next;
  357
+        selected_fd_free(tofree);
  358
+    }
  359
+}
  360
+
  361
+#define ERTS_PORT_TASK_INVALID_PORT(P, ID) \
  362
+  ((erts_port_status_get((P)) & ERTS_PORT_SFLGS_DEAD) || (P)->id != (ID))
  363
+
  364
+int
  365
+ERTS_CIO_EXPORT(erts_check_io_change_port_pollset)(Eterm id, int to_rq_ix)
  366
+{
  367
+    Port *pp;
  368
+    int ix;
  369
+    ErtsPollSet to_ps = ERTS_POLLSET_IX(to_rq_ix)->ps;
  370
+    struct selected_fd *copy, *sflp;
  371
+    int do_wake;
  372
+    
  373
+    if(sfl_tab == NULL) /* can safe migrate this port */
  374
+        return 0;
  375
+        
  376
+    ASSERT(is_internal_port(id));
  377
+    ix = internal_port_index(id);
  378
+    pp = &erts_port[ix];
  379
+    
  380
+    erts_smp_mtx_lock(&sfl_tab[ix].lock);
  381
+    copy = sflp = deepcopy_sfl(sfl_tab[ix].head);
  382
+    erts_smp_mtx_unlock(&sfl_tab[ix].lock);
  383
+    
  384
+    if (!copy) return 0;
  385
+
  386
+    while(sflp) {
  387
+        ErtsDrvEventState* state ;
  388
+        ErtsSysFdType fd;
  389
+        erts_smp_mtx_t* mtx;
  390
+        ErtsRunQueue *runq;
  391
+    
  392
+#ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS
  393
+        fd = sflp->fd;
  394
+        mtx = fd_mtx(fd);
  395
+        erts_smp_mtx_lock(mtx);
  396
+        state = &drv_ev_state[fd];
  397
+#else
  398
+        state = sflp->state;
  399
+        fd = state->fd;
  400
+        mtx = fd_mtx(fd);
  401
+        erts_smp_mtx_lock(mtx);
  402
+#endif
  403
+
  404
+        /* lock rq after lock fd_mutex to avoid deadlock */
  405
+        runq = erts_port_runq(pp); 
  406
+        
  407
+        /* it's possible port had been migrated to another rq or closed */
  408
+        if ( !runq || runq->ix != to_rq_ix || ERTS_PORT_TASK_INVALID_PORT(pp, id) ) {
  409
+        if (runq) {
  410
+            erts_smp_runq_unlock(runq);
  411
+            erts_smp_mtx_unlock(mtx);
  412
+        }
  413
+        free_sfl(copy);
  414
+        return -1;
  415
+        }
  416
+
  417
+        switch (state->type) {
  418
+        case ERTS_EV_TYPE_STOP_USE:
  419
+        case ERTS_EV_TYPE_NONE:
  420
+        break;
  421
+        case ERTS_EV_TYPE_DRV_SEL: {
  422
+        Eterm iid = state->driver.select->inport;
  423
+        Eterm oid = state->driver.select->outport;
  424
+        
  425
+        if( id == iid ) {
  426
+            ErtsPollSet from_ps = state->driver.select->inps;
  427
+            if( from_ps != to_ps ) {
  428
+            state->driver.select->inps = to_ps;
  429
+            do_wake = 0;
  430
+            ERTS_CIO_POLL_CTL(from_ps, state->fd, ERTS_POLL_EV_IN, 0, &do_wake);
  431
+            do_wake = 1;
  432
+            state->events = ERTS_CIO_POLL_CTL(to_ps, state->fd, ERTS_POLL_EV_IN, 1, &do_wake);
  433
+            }
  434
+        }
  435
+        
  436
+        if( id == oid ) {
  437
+            ErtsPollSet from_ps = state->driver.select->outps;
  438
+            if( from_ps != to_ps ) {
  439
+            state->driver.select->outps = to_ps;
  440
+            do_wake = 0;
  441
+            ERTS_CIO_POLL_CTL(from_ps, state->fd, ERTS_POLL_EV_OUT, 0, &do_wake);
  442
+            do_wake = 1;
  443
+            state->events = ERTS_CIO_POLL_CTL(to_ps, state->fd, ERTS_POLL_EV_OUT, 1, &do_wake);
  444
+            }
  445
+        }
  446
+        break;
  447
+        }
  448
+        case ERTS_EV_TYPE_DRV_EV: {
  449
+        ErtsPollSet from_ps = state->driver.event->ps;
  450
+        if (from_ps != to_ps) {
  451
+        state->driver.event->ps = to_ps;
  452
+        do_wake = 0;
  453
+        ERTS_CIO_POLL_CTL(from_ps, state->fd, state->events, 0, &do_wake);
  454
+        do_wake = 1;
  455
+        state->events = ERTS_CIO_POLL_CTL(to_ps, state->fd, state->events, 1, &do_wake);
  456
+        }
  457
+        break;
  458
+        }
  459
+        default:
  460
+        ASSERT(0);
  461
+        }
  462
+        erts_smp_runq_unlock(runq);
  463
+        erts_smp_mtx_unlock(mtx);
  464
+        
  465
+        sflp = sflp->next;
  466
+    }
  467
+    
  468
+    free_sfl(copy);
  469
+    return 0;
  470
+}
  471
+
  472
+#endif
  473
+
215 474
 #ifdef ERTS_SMP
216 475
 ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(removed_fd, struct removed_fd, 64, ERTS_ALC_T_FD_LIST)
217 476
 #endif
218 477
 
219 478
 static ERTS_INLINE void
220  
-remember_removed(ErtsDrvEventState *state, struct pollset_info* psi)
  479
+remember_removed(ErtsDrvEventState *state, struct deselected_info* psi)
221 480
 {
222 481
 #ifdef ERTS_SMP
223 482
     struct removed_fd *fdlp;
224 483
     ERTS_SMP_LC_ASSERT(erts_smp_lc_mtx_is_locked(fd_mtx(state->fd)));
225  
-    if (erts_smp_atomic_read_nob(&psi->in_poll_wait)) {
  484
+    //if (erts_smp_atomic_read_nob(&psi->in_poll_wait)) {
226 485
 	state->remove_cnt++;
227 486
 	ASSERT(state->remove_cnt > 0);
228 487
 	fdlp = removed_fd_alloc();
@@ -236,7 +495,7 @@ remember_removed(ErtsDrvEventState *state, struct pollset_info* psi)
236 495
 	fdlp->next = psi->removed_list;
237 496
 	psi->removed_list = fdlp;
238 497
 	erts_smp_spin_unlock(&psi->removed_list_lock);
239  
-    }
  498
+    //}
240 499
 #endif
241 500
 }
242 501
 
@@ -257,7 +516,7 @@ is_removed(ErtsDrvEventState *state)
257 516
 }
258 517
 
259 518
 static void
260  
-forget_removed(struct pollset_info* psi)
  519
+forget_removed(struct deselected_info* psi)
261 520
 {
262 521
 #ifdef ERTS_SMP
263 522
     struct removed_fd* fdlp;
@@ -425,12 +684,60 @@ static void
425 684
 deselect(ErtsDrvEventState *state, int mode)
426 685
 {
427 686
     int do_wake = 0;
428  
-    ErtsPollEvents rm_events;
429 687
     ERTS_SMP_LC_ASSERT(erts_smp_lc_mtx_is_locked(fd_mtx(state->fd)));
430 688
     ASSERT(state->events);
431 689
 
432 690
     abort_tasks(state, mode);
433 691
 
  692
+#if defined(ERTS_SMP) && defined(ERTS_POLLSET_PER_SCHEDULER)
  693
+    switch(state->type) {
  694
+    case ERTS_EV_TYPE_DRV_SEL: {
  695
+        Eterm iid = state->driver.select->inport;
  696
+        Eterm oid = state->driver.select->outport;
  697
+        if (!mode) {
  698
+            mode = ERL_DRV_READ | ERL_DRV_WRITE;
  699
+        }
  700
+        if (mode & ERL_DRV_READ && is_not_nil(iid)) {
  701
+            state->events = ERTS_CIO_POLL_CTL(state->driver.select->inps,
  702
+                    state->fd, ERTS_POLL_EV_IN, 0, &do_wake);
  703
+            state->driver.select->inport = NIL;
  704
+            state->driver.select->inps = NULL;
  705
+            
  706
+            if( is_nil(oid) || iid != oid || !(mode & ERL_DRV_WRITE) ) {
  707
+                del_selected_fd(internal_port_index(iid), state);
  708
+            }
  709
+        }
  710
+        if (mode & ERL_DRV_WRITE && is_not_nil(oid)) {
  711
+            do_wake = 0;
  712
+            state->events = ERTS_CIO_POLL_CTL(state->driver.select->outps,
  713
+                    state->fd, ERTS_POLL_EV_OUT, 0, &do_wake);
  714
+            state->driver.select->outport = NIL;
  715
+            state->driver.select->outps = NULL;
  716
+            
  717
+            if( is_nil(iid) || iid != oid || mode & ERL_DRV_READ)
  718
+                del_selected_fd(internal_port_index(oid), state);
  719
+        }
  720
+        
  721
+        break;
  722
+    }
  723
+#if ERTS_CIO_HAVE_DRV_EVENT
  724
+	case ERTS_EV_TYPE_DRV_EV:
  725
+        ASSERT(!mode);
  726
+        state->events = ERTS_CIO_POLL_CTL(state->driver.event->ps,
  727
+                state->fd, state->events, 0, &do_wake);
  728
+        del_selected_fd(internal_port_index(state->driver.event->port), state);
  729
+	    break;
  730
+#endif
  731
+	case ERTS_EV_TYPE_NONE:
  732
+	    break;
  733
+	default:
  734
+	    ASSERT(0);
  735
+	    break;
  736
+    }
  737
+
  738
+#else
  739
+    {
  740
+    ErtsPollEvents rm_events;
434 741
     if (!mode)
435 742
 	rm_events = state->events;
436 743
     else {
@@ -446,7 +753,9 @@ deselect(ErtsDrvEventState *state, int mode)
446 753
 	}
447 754
     }
448 755
 
449  
-    state->events = ERTS_CIO_POLL_CTL(pollset.ps, state->fd, rm_events, 0, &do_wake);
  756
+    state->events = ERTS_CIO_POLL_CTL(ERTS_POLLSET_IX(0)->ps, state->fd, rm_events, 0, &do_wake);
  757
+    }
  758
+#endif
450 759
 
451 760
     if (!(state->events)) {
452 761
 	switch (state->type) {
@@ -473,7 +782,7 @@ deselect(ErtsDrvEventState *state, int mode)
473 782
 	state->driver.select = NULL;
474 783
 	state->type = ERTS_EV_TYPE_NONE;
475 784
 	state->flags = 0;
476  
-	remember_removed(state, &pollset);
  785
+	remember_removed(state, &deselected);
477 786
     }
478 787
 }
479 788
 
@@ -499,6 +808,9 @@ ERTS_CIO_EXPORT(driver_select)(ErlDrvPort ix,
499 808
     ErtsDrvEventState *state;
500 809
     int wake_poller;
501 810
     int ret;
  811
+    ErtsPollSet inps = NULL;
  812
+    ErtsPollSet outps = NULL;
  813
+    
502 814
 #ifdef USE_VM_PROBES
503 815
     DTRACE_CHARBUF(name, 64);
504 816
 #endif
@@ -506,6 +818,10 @@ ERTS_CIO_EXPORT(driver_select)(ErlDrvPort ix,
506 818
     ERTS_SMP_LC_ASSERT(erts_drvport2port(ix)
507 819
 		       && erts_lc_is_port_locked(erts_drvport2port(ix)));
508 820
 
  821
+#if !defined(ERTS_SMP) || !defined(ERTS_POLLSET_PER_SCHEDULER)
  822
+    inps = outps = ERTS_POLLSET_IX(0)->ps;
  823
+#endif
  824
+
509 825
 #ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS
510 826
     if ((unsigned)fd >= (unsigned)erts_smp_atomic_read_nob(&drv_ev_state_len)) {
511 827
 	if (fd < 0) {
@@ -585,14 +901,45 @@ ERTS_CIO_EXPORT(driver_select)(ErlDrvPort ix,
585 901
 	   (state->type == ERTS_EV_TYPE_NONE && !state->events));
586 902
 
587 903
     if (!on && !(state->flags & ERTS_EV_FLAG_USED) 
588  
-	&& state->events && !(state->events & ~ctl_events)) {	
  904
+	&& state->events && !(state->events & ~ctl_events)) {
589 905
 	/* Old driver removing all events. At least wake poller.
590 906
 	   It will not make close() 100% safe but it will prevent
591 907
 	   actions delayed by poll timeout. */
592 908
 	wake_poller = 1;
593 909
     }
594 910
 
595  
-    new_events = ERTS_CIO_POLL_CTL(pollset.ps, state->fd, ctl_events, on, &wake_poller);
  911
+#if defined(ERTS_SMP) && defined(ERTS_POLLSET_PER_SCHEDULER)
  912
+    new_events = state->events;
  913
+    {
  914
+    int wake_poller_cmd;
  915
+    int wake_poller_res = 0;
  916
+    if (ctl_events & ERTS_POLL_EV_IN) {
  917
+        wake_poller_cmd = wake_poller;
  918
+        if(state->type == ERTS_EV_TYPE_DRV_SEL && state->driver.select->inps ) {
  919
+            inps = state->driver.select->inps;
  920
+        } else {
  921
+            inps = ERTS_POLLSET_IX(((ErtsRunQueue *) erts_smp_atomic_read_nob(
  922
+                    &erts_drvport2port(ix)->run_queue))->ix)->ps;
  923
+        }
  924
+        new_events = ERTS_CIO_POLL_CTL(inps, state->fd, ERTS_POLL_EV_IN, on, &wake_poller_cmd);
  925
+        wake_poller_res |= wake_poller_cmd;
  926
+    }
  927
+    if (ctl_events & ERTS_POLL_EV_OUT) {
  928
+        wake_poller_cmd = wake_poller;
  929
+        if(state->type == ERTS_EV_TYPE_DRV_SEL && state->driver.select->outps ) {
  930
+            outps = state->driver.select->outps;
  931
+        } else {
  932
+            outps = ERTS_POLLSET_IX(((ErtsRunQueue *) erts_smp_atomic_read_nob(
  933
+                    &erts_drvport2port(ix)->run_queue))->ix)->ps;
  934
+        }
  935
+        new_events = ERTS_CIO_POLL_CTL(outps, state->fd, ERTS_POLL_EV_OUT, on, &wake_poller_cmd);
  936
+        wake_poller_res |= wake_poller_cmd;
  937
+    }
  938
+    wake_poller = wake_poller_res;
  939
+    }
  940
+#else
  941
+    new_events = ERTS_CIO_POLL_CTL(inps, state->fd, ctl_events, on, &wake_poller);
  942
+#endif
596 943
 
597 944
     if (new_events & (ERTS_POLL_EV_ERR|ERTS_POLL_EV_NVAL)) {
598 945
 	if (state->type == ERTS_EV_TYPE_DRV_SEL && !state->events) {
@@ -625,15 +972,32 @@ ERTS_CIO_EXPORT(driver_select)(ErlDrvPort ix,
625 972
 		dsdsp->outport = NIL;
626 973
 		erts_port_task_handle_init(&dsdsp->intask);
627 974
 		erts_port_task_handle_init(&dsdsp->outtask);
  975
+#if defined(ERTS_SMP) && defined(ERTS_POLLSET_PER_SCHEDULER)
  976
+		dsdsp->inps = NULL;
  977
+		dsdsp->outps = NULL;
  978
+#endif
628 979
 		ASSERT(state->driver.select == NULL);
629 980
 		state->driver.select = dsdsp;
630 981
 		state->type = ERTS_EV_TYPE_DRV_SEL;
631 982
 	    }
632 983
 	    ASSERT(state->type == ERTS_EV_TYPE_DRV_SEL);
633  
-	    if (ctl_events & ERTS_POLL_EV_IN)
  984
+	    if (ctl_events & ERTS_POLL_EV_IN) {
634 985
 		state->driver.select->inport = id;
635  
-	    if (ctl_events & ERTS_POLL_EV_OUT)
  986
+#if defined(ERTS_SMP) && defined(ERTS_POLLSET_PER_SCHEDULER)
  987
+		state->driver.select->inps = inps;
  988
+#endif
  989
+	    }
  990
+	    if (ctl_events & ERTS_POLL_EV_OUT) {
636 991
 		state->driver.select->outport = id;
  992
+#if defined(ERTS_SMP) && defined(ERTS_POLLSET_PER_SCHEDULER)
  993
+		state->driver.select->outps = outps;
  994
+#endif
  995
+	    }
  996
+#if defined(ERTS_SMP) && defined(ERTS_POLLSET_PER_SCHEDULER)
  997
+	    if (ctl_events & (ERTS_POLL_EV_IN|ERTS_POLL_EV_OUT)) {
  998
+            add_selected_fd(ix, state);
  999
+	    }
  1000
+#endif
637 1001
 	    if (mode & ERL_DRV_USE) {
638 1002
 		state->flags |= ERTS_EV_FLAG_USED;
639 1003
 	    }
@@ -643,16 +1007,30 @@ ERTS_CIO_EXPORT(driver_select)(ErlDrvPort ix,
643 1007
 		if (ctl_events & ERTS_POLL_EV_IN) {
644 1008
 		    abort_tasks(state, ERL_DRV_READ);
645 1009
 		    state->driver.select->inport = NIL;
  1010
+#if defined(ERTS_SMP) && defined(ERTS_POLLSET_PER_SCHEDULER)
  1011
+            state->driver.select->inps = NULL;
  1012
+#endif
646 1013
 		}
647 1014
 		if (ctl_events & ERTS_POLL_EV_OUT) {
648 1015
 		    abort_tasks(state, ERL_DRV_WRITE);
649 1016
 		    state->driver.select->outport = NIL;
  1017
+#if defined(ERTS_SMP) && defined(ERTS_POLLSET_PER_SCHEDULER)
  1018
+            state->driver.select->outps = NULL;
  1019
+#endif
650 1020
 		}
  1021
+#if defined(ERTS_SMP) && defined(ERTS_POLLSET_PER_SCHEDULER)
  1022
+	    if (ctl_events & (ERTS_POLL_EV_IN|ERTS_POLL_EV_OUT)) {
  1023
+	        if( state->driver.select->inport != id &&
  1024
+                state->driver.select->outport != id) {
  1025
+            del_selected_fd(ix, state);
  1026
+            }
  1027
+	    }
  1028
+#endif
651 1029
 		if (new_events == 0) {
652 1030
 		    ASSERT(!erts_port_task_is_scheduled(&state->driver.select->intask));
653 1031
 		    ASSERT(!erts_port_task_is_scheduled(&state->driver.select->outtask));
654 1032
 		    if (old_events != 0) {
655  
-			remember_removed(state, &pollset);
  1033
+			remember_removed(state, &deselected);
656 1034
 		    }		    
657 1035
 		    if ((mode & ERL_DRV_USE) || !(state->flags & ERTS_EV_FLAG_USED)) {
658 1036
 			state->type = ERTS_EV_TYPE_NONE;
@@ -723,10 +1101,15 @@ ERTS_CIO_EXPORT(driver_event)(ErlDrvPort ix,
723 1101
     ErtsDrvEventState *state;
724 1102
     int do_wake = 0;
725 1103
     int ret;
  1104
+    ErtsPollSet ps;
726 1105
 
727 1106
     ERTS_SMP_LC_ASSERT(erts_drvport2port(ix)
728 1107
 		       && erts_lc_is_port_locked(erts_drvport2port(ix)));
729 1108
 
  1109
+#if !defined(ERTS_SMP) || !defined(ERTS_POLLSET_PER_SCHEDULER)
  1110
+    ps = ERTS_POLLSET_IX(0)->ps;
  1111
+#endif
  1112
+
730 1113
 #ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS
731 1114
     if ((unsigned)fd >= (unsigned)erts_smp_atomic_read_nob(&drv_ev_state_len)) {
732 1115
 	if (fd < 0)
@@ -769,6 +1152,15 @@ ERTS_CIO_EXPORT(driver_event)(ErlDrvPort ix,
769 1152
     ASSERT(state->type == ERTS_EV_TYPE_DRV_EV
770 1153
 	   || state->type == ERTS_EV_TYPE_NONE);
771 1154
 
  1155
+#if defined(ERTS_SMP) && defined(ERTS_POLLSET_PER_SCHEDULER)
  1156
+    if(state->type == ERTS_EV_TYPE_DRV_EV && state->driver.event->ps ) {
  1157
+        ps = state->driver.event->ps;
  1158
+    } else {
  1159
+        ps = ERTS_POLLSET_IX(((ErtsRunQueue *) erts_smp_atomic_read_nob(
  1160
+                &erts_drvport2port(ix)->run_queue))->ix)->ps;
  1161
+    }
  1162
+#endif
  1163
+
772 1164
     events = state->events;
773 1165
 
774 1166
     if (!event_data) {
@@ -781,14 +1173,14 @@ ERTS_CIO_EXPORT(driver_event)(ErlDrvPort ix,
781 1173
     }
782 1174
 
783 1175
     if (add_events) {
784  
-	events = ERTS_CIO_POLL_CTL(pollset.ps, state->fd, add_events, 1, &do_wake);
  1176
+	events = ERTS_CIO_POLL_CTL(ps, state->fd, add_events, 1, &do_wake);
785 1177
 	if (events & (ERTS_POLL_EV_ERR|ERTS_POLL_EV_NVAL)) {
786 1178
 	    ret = -1;
787 1179
 	    goto done;
788 1180
 	}
789 1181
     }
790 1182
     if (remove_events) {
791  
-	events = ERTS_CIO_POLL_CTL(pollset.ps, state->fd, remove_events, 0, &do_wake);
  1183
+	events = ERTS_CIO_POLL_CTL(ps, state->fd, remove_events, 0, &do_wake);
792 1184
 	if (events & (ERTS_POLL_EV_ERR|ERTS_POLL_EV_NVAL)) {
793 1185
 	    ret = -1;
794 1186
 	    goto done;
@@ -806,7 +1198,13 @@ ERTS_CIO_EXPORT(driver_event)(ErlDrvPort ix,
806 1198
 	    erts_port_task_handle_init(&state->driver.event->task);
807 1199
 	    state->driver.event->port = id;
808 1200
 	    state->driver.event->removed_events = (ErtsPollEvents) 0;
  1201
+#if defined(ERTS_SMP) && defined(ERTS_POLLSET_PER_SCHEDULER)
  1202
+		state->driver.event->ps = ps;
  1203
+#endif
809 1204
 	    state->type = ERTS_EV_TYPE_DRV_EV;
  1205
+#if defined(ERTS_SMP) && defined(ERTS_POLLSET_PER_SCHEDULER)
  1206
+	    add_selected_fd(ix, state);
  1207
+#endif
810 1208
 	}
811 1209
 	state->driver.event->data = event_data;
812 1210
     }
@@ -818,7 +1216,10 @@ ERTS_CIO_EXPORT(driver_event)(ErlDrvPort ix,
818 1216
 	}
819 1217
 	state->driver.select = NULL;
820 1218
 	state->type = ERTS_EV_TYPE_NONE;
821  
-	remember_removed(state, &pollset);
  1219
+#if defined(ERTS_SMP) && defined(ERTS_POLLSET_PER_SCHEDULER)
  1220
+	del_selected_fd(ix, state);
  1221
+#endif
  1222
+	remember_removed(state, &deselected);
822 1223
     }
823 1224
     state->events = events;
824 1225
     ASSERT(event_data ? events == event_data->events : events == 0);