This repository has been archived by the owner on Jan 5, 2023. It is now read-only.
/
TFCallbackQueue.class.st
184 lines (145 loc) · 4.62 KB
/
TFCallbackQueue.class.st
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
"
I'm a callback queue associated to a worker.
I handle all callbacks defined in the context of a worker.
"
Class {
#name : #TFCallbackQueue,
#superclass : #Object,
#instVars : [
'semaphore',
'callbacksByAddress',
'pendingQueue',
'callbackProcess',
'mutex'
],
#classVars : [
'UniqueInstance'
],
#category : #'ThreadedFFI-Callbacks'
}
{ #category : #'instance creation' }
TFCallbackQueue class >> initialize [
SessionManager default
registerSystemClassNamed: self name
atPriority: 70.
self startUp: true
]
{ #category : #'instance creation' }
TFCallbackQueue class >> shutDown: inANewImageSession [
inANewImageSession ifFalse: [ ^ self ].
self uniqueInstance shutDown.
UniqueInstance := nil
]
{ #category : #'instance creation' }
TFCallbackQueue class >> startUp: inANewImageSession [
inANewImageSession ifFalse: [ ^ self ].
self uniqueInstance startUp
]
{ #category : #'instance creation' }
TFCallbackQueue class >> uniqueInstance [
^ UniqueInstance ifNil: [ UniqueInstance := self new ]
]
{ #category : #accessing }
TFCallbackQueue >> callbackProcess [
^ callbackProcess
]
{ #category : #operations }
TFCallbackQueue >> executeCallback: aCallbackInvocation [
aCallbackInvocation ifNil: [ ^self ].
aCallbackInvocation getHandle isNull ifTrue: [ ^self ].
aCallbackInvocation callback: ((self lookupCallback: aCallbackInvocation callbackData)
ifNil: [
Stdio stdout << 'INV: ' << aCallbackInvocation getHandle printString; lf.
self error: 'Callback not found.' ]).
aCallbackInvocation runner executeCallback: aCallbackInvocation
]
{ #category : #initialization }
TFCallbackQueue >> forkCallbackProcess [
(callbackProcess notNil and: [ callbackProcess isTerminated not ])
ifTrue: [ ^ self ].
callbackProcess := [
[ true ] whileTrue: [
semaphore wait.
[self executeCallback: self nextPendingCallback]
on: Exception fork: [:ex | ex pass ] ] ]
forkAt: Processor highIOPriority
named: 'Callback queue'
]
{ #category : #initialization }
TFCallbackQueue >> initialize [
super initialize.
mutex := Mutex new.
semaphore := Semaphore new.
callbacksByAddress := WeakValueDictionary new.
pendingQueue := OrderedCollection new.
self forkCallbackProcess
]
{ #category : #initialization }
TFCallbackQueue >> initializeQueue [
| semaphoreIndex |
[ semaphoreIndex := Smalltalk registerExternalObject: semaphore.
self primitiveInitializeQueueWith: semaphoreIndex ]
onErrorDo: [ :e |
semaphoreIndex ifNotNil: [ Smalltalk unregisterExternalObject: semaphoreIndex ].
e pass]
]
{ #category : #private }
TFCallbackQueue >> lookupCallback: anExternalAddress [
"#registerCallback: and #lookupCallback: are protected by a mutex because I need to prevent
the execution of a collection grow while performing the lookup (this will move the elements
and may cause the lookup to fail).
Still, I am keeping a second lookup to ensure if a movement was done, I have another oportunity
to find the callback (just in case, I was not able to verify is actually needed)."
^ mutex critical: [
callbacksByAddress
at: anExternalAddress
ifAbsent: [
callbacksByAddress
at: anExternalAddress
ifAbsent: [ nil ] ] ]
]
{ #category : #private }
TFCallbackQueue >> nextPendingCallback [
| externalAddress |
externalAddress := self primNextPendingCallback.
externalAddress ifNil: [ ^ nil ].
^ TFCallbackInvocation fromHandle: externalAddress
]
{ #category : #'private - primitives' }
TFCallbackQueue >> primNextPendingCallback [
<primitive: 'primitiveReadNextCallback'>
^ self primitiveFailed
]
{ #category : #primitives }
TFCallbackQueue >> primitiveInitializeQueueWith: anInteger [
<primitive: 'primitiveInitilizeCallbacks'>
^ self primitiveFailed
]
{ #category : #operations }
TFCallbackQueue >> registerCallback: aCallback [
"#registerCallback: and #lookupCallback: are protected by a mutex because I need to prevent
the execution of a collection grow while performing the lookup (this will move the elements
and may cause the lookup to fail)."
mutex critical: [
callbacksByAddress
at: aCallback callbackData
put: aCallback ]
]
{ #category : #'system startup' }
TFCallbackQueue >> shutDown [
self terminateProcess.
callbacksByAddress removeAll.
pendingQueue removeAll
]
{ #category : #'system startup' }
TFCallbackQueue >> startUp [
[self initializeQueue.
self forkCallbackProcess]
onErrorDo: [ self inform: 'The current VM does not support TFFI Callbacks. It will use the old implementation' ]
]
{ #category : #initialization }
TFCallbackQueue >> terminateProcess [
callbackProcess ifNil: [ ^ self ].
callbackProcess terminate.
callbackProcess := nil
]