-
Notifications
You must be signed in to change notification settings - Fork 0
/
facecontrol.py
96 lines (87 loc) · 3.35 KB
/
facecontrol.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
#!/usr/bin/python3
# The latest version, using in-memory images and threading
import face_recognition
import picamera
import numpy as np
import sys
import os.path
import time
import threading
import socket
import io
# ---- Configuration ---------------------------------------------------------
# Not referenced anywhere else in the visible part of this file.
unknownpictures = False
# The capture loop keeps running while the count of unknown-face pictures is
# below this value.
maxpictures = 100
# Filename template for snapshots containing an unrecognised face; '{}' is
# filled with the running picture number.
picturestore = 'Faces/Unknown{}.jpg' # leave empty to disable storing
# Host address template; '{}' is filled with the name of the directory that
# holds a known-face image file given on the command line.
sendaddress = '192.168.44.{}' # leave empty to disable sending
# Port that client() connects to on the notified host.
PORT = 65432
# Minimum number of seconds between two notifications to the same host
# (enforced by msgclient()).
WAIT = 20
# When true, client() prints the number of bytes it sent.
echo = False
def client(host, msg, who, description, timeout=10.0):
    """Send ``msg`` to ``host`` on ``PORT`` over a fresh connection.

    Args:
        host: Address of the machine to notify.
        msg: Bytes to transmit; the whole buffer is sent with ``sendall``.
        who: Not used by this function; accepted because ``msgclient``
            passes it through.
        description: Not used by this function; accepted for the same reason.
        timeout: Seconds allowed for connecting and for sending before the
            attempt is abandoned. Without a timeout an unreachable host would
            keep the calling thread blocked for as long as the OS allows.

    Network failures are reported on stdout and never raised, so a host that
    is down, unreachable or slow cannot take the caller down with it.
    """
    try:
        with socket.socket() as s:
            s.settimeout(timeout)
            s.connect((host, PORT))
            s.sendall(msg)
            if echo:
                print('Sent', len(msg))
    except OSError as e:
        # OSError covers ConnectionRefusedError as well as timeouts,
        # unreachable hosts and connections reset while sending.
        print(f'Unable to communicate to {host}: {e}')
def msgclient(host, msg, who, description):
    """Notify ``host`` in a background thread, at most once every ``WAIT`` seconds.

    Args:
        host: Address of the machine to notify.
        msg: Bytes handed to ``client`` for transmission.
        who: Passed through to ``client`` unchanged.
        description: Text shown in the log line and passed through to ``client``.

    Uses the module-level dictionaries ``waituntil`` (per-host time before
    which no new notification is sent) and ``threadof`` (per-host most recent
    sender thread).
    """
    # A monotonic clock is used because only elapsed time matters here; the
    # wall clock may be adjusted while the program runs, which would stretch
    # or shorten the quiet period.
    now = time.monotonic()
    quiet_until = waituntil.get(host)
    if quiet_until is not None and now < quiet_until:
        print(f'Already notified {host} - still {int(quiet_until - now)} sec')
        return
    waituntil[host] = now + WAIT

    # Let the previous transfer to this host finish before starting another.
    previous = threadof.get(host)
    if previous is not None:
        previous.join()

    print(f'Notify host {host} ....... {description} .......')
    worker = threading.Thread(target=client, args=(host, msg, who, description))
    threadof[host] = worker
    worker.start()
# ---- Known faces -----------------------------------------------------------
# Every command-line argument is an image file showing one known person.
# The file stem is used as the person's name; the name of the directory that
# holds the file is what gets substituted into ``sendaddress`` later on.
kfilenames = sys.argv[1:]
names = [os.path.splitext(os.path.basename(fn))[0] for fn in kfilenames]
addrs = [os.path.basename(os.path.dirname(fn)) for fn in kfilenames]
faceencs = []
for fn in kfilenames:
    found = face_recognition.face_encodings(face_recognition.load_image_file(fn))
    if not found:
        # Fail with a readable message instead of a bare IndexError.
        sys.exit(f'No face found in {fn}')
    # Only the first face of each known picture is used.
    faceencs.append(found[0])
print(f'Ready {len(faceencs)} faces: {",".join(names)}')

# ---- Capture loop ----------------------------------------------------------
# State shared with msgclient(): per-host throttle deadline and sender thread.
waituntil, threadof = {}, {}
numshots, numpictures = 0, 0
stream = io.BytesIO()
# The context manager releases the camera even if the loop ends with an error.
with picamera.PiCamera() as camera:
    camera.resolution = (320, 240)
    while numpictures < maxpictures:
        # The buffer is reused for every frame: rewind AND truncate, otherwise
        # a JPEG smaller than the previous one keeps the old frame's tail.
        stream.seek(0)
        stream.truncate()
        camera.capture(stream, format='jpeg')
        stream.seek(0)
        image = face_recognition.load_image_file(stream)
        face_locations = face_recognition.face_locations(image)
        if face_locations:
            jpeg = stream.getvalue()  # exact bytes of the current frame
            tgts, visitors = set(), []
            # Hand over the locations already found so that the face detector
            # does not have to run a second time on the same frame.
            qfaceslst = face_recognition.face_encodings(image, face_locations)
            for x in qfaceslst:
                results = face_recognition.compare_faces(faceencs, x)
                nresults = results.count(True)
                if nresults > 1:
                    # Several known faces match: pick the closest one and list
                    # all candidates in the printable name.
                    face_distances = face_recognition.face_distance(faceencs, x)
                    ibest = np.argmin(face_distances)
                    qname = names[ibest]
                    pname = f"{qname} ({'|'.join([nam for nam, q in zip(names, results) if q])})"
                elif nresults:
                    ibest = results.index(True)
                    pname = qname = names[ibest]
                else:
                    # Unknown face: optionally keep the frame on disk.
                    if picturestore:  ## and len(qfaceslst) == 1:
                        numpictures += 1
                        filename = picturestore.format(numpictures)
                        print(f"Storing {filename}")
                        with open(filename, 'wb') as out:
                            out.write(jpeg)
                    qname, pname = '?', f'#{numpictures}'
                visitors.append(pname)
                if nresults:
                    tgts.add(addrs[ibest])
            if not tgts:
                tgts = set(addrs)  # If no target is selected
            print(len(qfaceslst), tgts, ', '.join(visitors))
            if sendaddress:
                # NOTE: qname/pname describe the last face handled in the loop
                # above, so only that one is named in the notification log.
                for tgt in tgts:
                    msgclient(sendaddress.format(tgt), jpeg, qname, pname)
        else:
            print(f"Found nobody {numshots}")
        numshots += 1
print("Total {} faces in files.".format(numpictures))