forked from RocketRedNeck/PythonPlayground
-
Notifications
You must be signed in to change notification settings - Fork 0
/
FPS2.py
345 lines (281 loc) · 12 KB
/
FPS2.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
# -*- coding: utf-8 -*-
"""
Created on Tue Jan 24 20:46:25 2017
@author: mtkes
"""
# import the necessary packages
#from __future__ import print_function
import cv2
import time
# import the necessary packages
from threading import Lock
from threading import Thread
class FrameRate:
    """Rolling frames-per-second estimator.

    Call start() once, update() once per frame, and fps() to read the
    current estimate.  fps() recomputes (and restarts the measurement
    window) only after more than 10 frames have accumulated, so the
    returned rate is a short rolling average rather than a per-frame
    instantaneous value.
    """

    def __init__(self):
        # Start/end timestamps of the current measurement interval and
        # the number of frames counted within it.
        self._start = None
        self._end = None
        self._numFrames = 0
        self._rate = 0.0

    def start(self):
        """Begin (or restart) a measurement interval; returns self."""
        self._numFrames = 0
        # BUG FIX: clear any previously recorded end time so a restarted
        # timer measures live again instead of using a stale _end.
        self._end = None
        self._start = time.time()
        return self

    def reset(self):
        """Alias for start(); returns None (original behavior)."""
        self.start()

    def stop(self):
        """Freeze the interval: elapsed() now reports up to this moment."""
        self._end = time.time()

    def update(self):
        """Count one frame in the current interval."""
        self._numFrames += 1

    def elapsed(self):
        """Seconds from start() to stop() — or to now if still running.

        BUG FIX: the original always measured to time.time(), so calling
        stop() had no effect and elapsed() kept growing afterwards.
        """
        end = self._end if self._end is not None else time.time()
        return end - self._start

    def fps(self):
        """Return the most recent frames-per-second estimate.

        Recomputes and restarts the window once >10 frames are counted;
        otherwise returns the previously computed rate.
        """
        if self._numFrames > 10:
            self._rate = self._numFrames / self.elapsed()
            self.reset()
        return self._rate
class BucketCapture:
    """Threaded wrapper around cv2.VideoCapture.

    A daemon thread grabs frames continuously so read() always returns
    the most recently captured frame without blocking on the camera.
    A Lock guards the handoff between the grabber thread and readers.
    """

    def __init__(self, src=0):
        # Guards the shared frame/count fields between threads.
        self._lock = Lock()
        # Tracks the camera's actual capture rate.
        self.fps = FrameRate()

        # Open the stream and prime it with a first frame so read()
        # has something to return even before the thread starts.
        self.stream = cv2.VideoCapture(src)
        (self._grabbed, self._frame) = self.stream.read()
        if self._grabbed:
            self.grabbed = self._grabbed
            self.frame = self._frame
            self.outFrame = self.frame
            self.count = 1
            self.outCount = self.count
        else:
            self.grabbed = False
            self.frame = None
            self.outFrame = None
            self.count = 0
            self.outCount = self.count

        # _stop requests termination; stopped reports the thread state.
        self._stop = False
        self.stopped = True

    def start(self):
        """Launch the background grabber thread; returns self."""
        grabber = Thread(target=self.update, args=())
        grabber.daemon = True
        grabber.start()
        return self

    def update(self):
        """Grabber loop: runs in the daemon thread until stop() is called."""
        self.stopped = False
        self.fps.start()
        while True:
            # BUG FIX: the original tested `self.stop == True`, but
            # self.stop is the bound method object, which is never equal
            # to True -- so the thread could never be stopped (and the
            # `self.stop = False` assignment would have shadowed the
            # method).  Test the _stop flag that stop() actually sets.
            if self._stop:
                self._stop = False
                self.stopped = True
                return

            # Read the next frame and count it toward the capture rate.
            (self._grabbed, self._frame) = self.stream.read()
            self.fps.update()

            # Lock only around the copy so blocking is limited to the
            # hand-off itself (a queue or double buffer could reduce it
            # further).
            if self._grabbed:
                with self._lock:
                    self.count = self.count + 1
                    self.grabbed = self._grabbed
                    self.frame = self._frame

    def read(self):
        """Return (frame, count, isNew): the latest frame and its index.

        Note: a blocking Lock.acquire() always returns True, so the
        original's False branch was unreachable; isNew is always True.
        """
        with self._lock:
            self.outFrame = self.frame
            self.outCount = self.count
        return (self.outFrame, self.outCount, True)

    def stop(self):
        """Ask the grabber thread to exit (observed on its next loop pass)."""
        self._stop = True
class BlueBoiler:
    """
    An OpenCV pipeline generated by GRIP.

    Stages: resize -> RGB threshold -> find contours -> filter contours.
    Call process() per frame; intermediate results are stored on the
    instance (resize_image_output, rgb_threshold_output, ...).
    """
    def __init__(self):
        """initializes all values to presets or None if need to be set
        """
        # Resize stage: target size in pixels and interpolation method.
        self.__resize_image_width = 320.0
        self.__resize_image_height = 240.0
        self.__resize_image_interpolation = cv2.INTER_CUBIC
        self.resize_image_output = None
        # RGB threshold stage: [min, max] per channel (GRIP-tuned values,
        # presumably for a blue retro-reflective target -- confirm).
        self.__rgb_threshold_input = self.resize_image_output
        self.__rgb_threshold_red = [0.0, 39.59897610921503]
        self.__rgb_threshold_green = [82.55395683453237, 223.39410813723725]
        self.__rgb_threshold_blue = [162.81474820143887, 255.0]
        self.rgb_threshold_output = None
        # Find-contours stage: external contours only.
        self.__find_contours_input = self.rgb_threshold_output
        self.__find_contours_external_only = True
        self.find_contours_output = None
        # Filter-contours stage: geometric acceptance limits.
        self.__filter_contours_contours = self.find_contours_output
        self.__filter_contours_min_area = 20.0
        self.__filter_contours_min_perimeter = 0.0
        self.__filter_contours_min_width = 0.0
        self.__filter_contours_max_width = 1000.0
        self.__filter_contours_min_height = 0.0
        self.__filter_contours_max_height = 1000.0
        self.__filter_contours_solidity = [0, 100]
        self.__filter_contours_max_vertices = 1000000.0
        self.__filter_contours_min_vertices = 0.0
        self.__filter_contours_min_ratio = 0.0
        self.__filter_contours_max_ratio = 1000.0
        self.filter_contours_output = None
    def process(self, source0):
        """
        Runs the pipeline and sets all outputs to new values.

        Args:
            source0: a BGR numpy.ndarray frame.
        Returns:
            (find_contours_output, filter_contours_output): all contours
            found, and the subset passing the geometric filters.
        """
        # Step Resize_Image0:
        self.__resize_image_input = source0
        (self.resize_image_output) = self.__resize_image(self.__resize_image_input, self.__resize_image_width, self.__resize_image_height, self.__resize_image_interpolation)
        # Step RGB_Threshold0:
        self.__rgb_threshold_input = self.resize_image_output
        (self.rgb_threshold_output) = self.__rgb_threshold(self.__rgb_threshold_input, self.__rgb_threshold_red, self.__rgb_threshold_green, self.__rgb_threshold_blue)
        # Step Find_Contours0:
        self.__find_contours_input = self.rgb_threshold_output
        (self.find_contours_output) = self.__find_contours(self.__find_contours_input, self.__find_contours_external_only)
        # Step Filter_Contours0:
        self.__filter_contours_contours = self.find_contours_output
        (self.filter_contours_output) = self.__filter_contours(self.__filter_contours_contours, self.__filter_contours_min_area, self.__filter_contours_min_perimeter, self.__filter_contours_min_width, self.__filter_contours_max_width, self.__filter_contours_min_height, self.__filter_contours_max_height, self.__filter_contours_solidity, self.__filter_contours_max_vertices, self.__filter_contours_min_vertices, self.__filter_contours_min_ratio, self.__filter_contours_max_ratio)
        return (self.find_contours_output, self.filter_contours_output)
    @staticmethod
    def __resize_image(input, width, height, interpolation):
        """Scales and image to an exact size.
        Args:
            input: A numpy.ndarray.
            Width: The desired width in pixels.
            Height: The desired height in pixels.
            interpolation: Opencv enum for the type fo interpolation.
        Returns:
            A numpy.ndarray of the new size.
        """
        return cv2.resize(input, ((int)(width), (int)(height)), 0, 0, interpolation)
    @staticmethod
    def __rgb_threshold(input, red, green, blue):
        """Segment an image based on color ranges.
        Args:
            input: A BGR numpy.ndarray.
            red: A list of two numbers the are the min and max red.
            green: A list of two numbers the are the min and max green.
            blue: A list of two numbers the are the min and max blue.
        Returns:
            A black and white numpy.ndarray.
        """
        # Convert BGR -> RGB so the (red, green, blue) bounds line up
        # with inRange's channel order.
        out = cv2.cvtColor(input, cv2.COLOR_BGR2RGB)
        return cv2.inRange(out, (red[0], green[0], blue[0]), (red[1], green[1], blue[1]))
    @staticmethod
    def __find_contours(input, external_only):
        """Finds contours in a binary image.
        Args:
            input: A binary (black and white) numpy.ndarray.
            external_only: A boolean. If true only external contours are found.
        Return:
            A list of numpy.ndarray where each one represents a contour.
        """
        if(external_only):
            mode = cv2.RETR_EXTERNAL
        else:
            mode = cv2.RETR_LIST
        method = cv2.CHAIN_APPROX_SIMPLE
        # NOTE(review): the 3-value unpacking (im2, contours, hierarchy)
        # is the OpenCV 3.x signature; OpenCV 2.x/4.x return 2 values.
        # This file therefore requires OpenCV 3.x -- confirm before
        # upgrading the cv2 dependency.
        im2, contours, hierarchy =cv2.findContours(input, mode=mode, method=method)
        return contours
    @staticmethod
    def __filter_contours(input_contours, min_area, min_perimeter, min_width, max_width,
                        min_height, max_height, solidity, max_vertex_count, min_vertex_count,
                        min_ratio, max_ratio):
        """Filters out contours that do not meet certain criteria.
        Args:
            input_contours: Contours as a list of numpy.ndarray.
            min_area: The minimum area of a contour that will be kept.
            min_perimeter: The minimum perimeter of a contour that will be kept.
            min_width: Minimum width of a contour.
            max_width: MaxWidth maximum width.
            min_height: Minimum height.
            max_height: Maximimum height.
            solidity: The minimum and maximum solidity of a contour.
            min_vertex_count: Minimum vertex Count of the contours.
            max_vertex_count: Maximum vertex Count.
            min_ratio: Minimum ratio of width to height.
            max_ratio: Maximum ratio of width to height.
        Returns:
            Contours as a list of numpy.ndarray.
        """
        output = []
        for contour in input_contours:
            # Bounding-box size gates.
            x,y,w,h = cv2.boundingRect(contour)
            if (w < min_width or w > max_width):
                continue
            if (h < min_height or h > max_height):
                continue
            # Area and perimeter gates.
            area = cv2.contourArea(contour)
            if (area < min_area):
                continue
            if (cv2.arcLength(contour, True) < min_perimeter):
                continue
            # Solidity: contour area as a percentage of its convex hull
            # area (divides by hull area -- degenerate hulls would raise).
            hull = cv2.convexHull(contour)
            solid = 100 * area / cv2.contourArea(hull)
            if (solid < solidity[0] or solid > solidity[1]):
                continue
            # Vertex-count and aspect-ratio (width/height) gates.
            if (len(contour) < min_vertex_count or len(contour) > max_vertex_count):
                continue
            ratio = (float)(w) / h
            if (ratio < min_ratio or ratio > max_ratio):
                continue
            output.append(contour)
        return output
# ---------------------------------------------------------------------------
# Demo driver: grab frames from a threaded capture for 30 seconds, run the
# GRIP pipeline on each new frame, and overlay capture-rate vs process-rate.
# ---------------------------------------------------------------------------
b = BlueBoiler()

# created a *threaded* video stream, allow the camera sensor to warmup,
# and start the FPS counter
print("[INFO] sampling THREADED frames from webcam...")
cam = 1            # camera index (second camera on the system)
width = 320        # requested capture width in pixels
height = 240       # requested capture height in pixels
exposure = 100.0   # requested exposure (driver-dependent units)

bucketCapture = BucketCapture(src=cam).start()
bucketCapture.stream.set(cv2.CAP_PROP_FRAME_WIDTH, width)
bucketCapture.stream.set(cv2.CAP_PROP_FRAME_HEIGHT, height)
bucketCapture.stream.set(cv2.CAP_PROP_EXPOSURE, exposure)

# Separate counter for the processing/display rate (the capture thread
# keeps its own counter in bucketCapture.fps).
fps = FrameRate()
fps.start()

# loop over some frames...this time using the threaded stream
# BUG FIX: time.clock() was deprecated in Python 3.3 and removed in 3.8;
# time.monotonic() is the correct clock for measuring an interval.
startTime = time.monotonic()
while (time.monotonic() - startTime < 30.0):
    # grab the frame most recently captured by the background thread
    (frame, count, isNew) = bucketCapture.read()

    # process and display only when a frame is available
    if (isNew == True):
        (f, g) = b.process(frame)

        # Overlay "capture fps : process fps" and show the frame;
        # waitKey(1) pumps the HighGUI event loop.
        cv2.putText(frame,"{:.1f}".format(bucketCapture.fps.fps()) + " : {:.1f}".format(fps.fps()),(0,240),cv2.FONT_HERSHEY_PLAIN,2,(0,0,255))
        cv2.imshow("Frame", frame)
        key = cv2.waitKey(1) & 0xFF

        # update the FPS counter
        fps.update()

# stop the timer and display FPS information
fps.stop()
print("[INFO] elasped time: {:.2f}".format(fps.elapsed()))
print("[INFO] approx. FPS: {:.2f}".format(fps.fps()))

# do a bit of cleanup
cv2.destroyAllWindows()
bucketCapture.stop()