-
Notifications
You must be signed in to change notification settings - Fork 0
/
core.clj
100 lines (92 loc) · 3.79 KB
/
core.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
(ns forever-socket-client.core
(:import [java.net Socket InetAddress InetSocketAddress SocketException ConnectException])
(:require [clojure.core.async :refer [put! <! chan go-loop timeout]]))
(defprotocol SocketIO
  "IO protocols for ForeverSocket record"
  (append-callback [this cb]
    "Add callback function with one arg for incoming data")
  (start-reader! [this]
    "Start loop on thread pool to handle incoming data")
  (read-stream [this]
    "Read stream and return either data or status")
  (write [this data]
    "Write bytes to socket"))
(defprotocol Stoppable
  "Implements stop function"
  (stop [this]
    "Stop all threads and gracefully close socket"))
;; Record tying a connected socket to its read buffer size, the atom holding
;; the registered read callbacks, and the channel on which the reader reports
;; why it stopped (:closed or :stopped).
(defrecord ForeverSocket
  [socket buffer-size read-callbacks-atom event-channel]
  SocketIO
  ;; Register `cb`, a one-argument function invoked with each chunk of
  ;; received bytes. Returns the updated callback vector.
  (append-callback [_ cb]
    ;; swap! applies conj atomically; a deref followed by reset! could drop a
    ;; callback registered concurrently from another thread.
    (swap! read-callbacks-atom conj cb))
  ;; Write all of `data` (a byte array) to the socket and flush.
  (write [_ data]
    (let [ostream (.getOutputStream socket)]
      (.write ostream data 0 (count data))
      (.flush ostream)))
  ;; Repeatedly read from the socket and hand each chunk to every registered
  ;; callback, in registration order. When read-stream returns a keyword
  ;; status, that status is put on event-channel and the loop ends.
  ;; Returns the channel produced by go-loop.
  (start-reader! [this]
    ;; NOTE(review): read-stream performs a blocking read inside a go block,
    ;; which occupies a core.async dispatch thread while waiting for data.
    (go-loop []
      (let [read-result (read-stream this)]
        (if (keyword? read-result)
          (put! event-channel read-result)
          (do
            (doseq [callback @read-callbacks-atom]
              (callback read-result))
            (recur))))))
  ;; Perform one blocking read of at most buffer-size bytes.
  ;; Returns a byte array holding exactly the bytes read, :closed when the
  ;; stream reports end-of-stream (-1) or a SocketException other than
  ;; "Socket closed" occurs, and :stopped when the SocketException message is
  ;; "Socket closed".
  (read-stream [_]
    (try
      (let [^bytes buffer (byte-array buffer-size)
            bytes-read (.read (.getInputStream socket) buffer)]
        (if (= bytes-read -1)
          :closed
          ;; Trim the buffer to the number of bytes actually received.
          (java.util.Arrays/copyOf buffer (int bytes-read))))
      (catch SocketException e
        (if (= (.getMessage e) "Socket closed")
          :stopped
          :closed))))
  Stoppable
  ;; Close the underlying socket.
  (stop [_]
    (.close socket)))
(defn- java-net-factory
  "Open a java.net.Socket with keep-alive enabled and return it.
  Two arities: a hostname (resolved via InetAddress/getByName) plus a port,
  or a single InetSocketAddress whose address and port are used directly."
  ([hostname port]
   (doto (Socket. (InetAddress/getByName hostname) port)
     (.setKeepAlive true)))
  ([^InetSocketAddress socket-address]
   (doto (Socket. (.getAddress socket-address) (.getPort socket-address))
     (.setKeepAlive true))))
(defn- wrap-socket-watcher!
  "Wrap `forever-socket` in an atom and start a watcher loop that handles
  reconnection. Returns the atom, so the caller can keep referencing the same
  object while a new socket is instantiated behind the scenes.

  The loop takes events from the record's event channel:
    - :stopped ends the loop;
    - any other event triggers a reconnect to the previous socket's remote
      address, reusing the same buffer size, callbacks atom and event channel.

  When the reconnect attempt fails with an IOException, :closed is put back on
  the event channel and the loop waits `retry-interval` milliseconds before
  trying again."
  [retry-interval forever-socket]
  (let [fsocket-atom (atom forever-socket)]
    (go-loop []
      (when (not= :stopped (<! (:event-channel @fsocket-atom)))
        (let [{:keys [socket buffer-size read-callbacks-atom event-channel]} @fsocket-atom
              ;; Captured before closing; used as the reconnect target.
              remote-address (.getRemoteSocketAddress socket)
              replacement (try
                            ;; Release the dead connection so each reconnect
                            ;; does not leak a socket.
                            (.close socket)
                            (->ForeverSocket (java-net-factory remote-address)
                                             buffer-size
                                             read-callbacks-atom
                                             event-channel)
                            ;; IOException covers ConnectException as well as
                            ;; other connect-time failures that would
                            ;; otherwise terminate this loop silently.
                            (catch java.io.IOException _
                              nil))]
          (if replacement
            (do
              (reset! fsocket-atom replacement)
              (start-reader! replacement))
            (do
              ;; Re-queue the event so the next iteration retries, then back
              ;; off. The park happens outside the try/catch on purpose.
              (put! event-channel :closed)
              (<! (timeout retry-interval)))))
        (recur)))
    fsocket-atom))
(defn factory
  "Connect to `hostname`:`port`, start reading from the connection, and return
  an atom holding the resulting ForeverSocket record. `buffer-size` is the
  read buffer size in bytes; `retry-interval` is passed to the reconnect
  watcher."
  [hostname port buffer-size retry-interval]
  (let [connection (java-net-factory hostname port)
        fsocket (->ForeverSocket connection buffer-size (atom []) (chan))]
    (start-reader! fsocket)
    (wrap-socket-watcher! retry-interval fsocket)))
(defn str-to-bytes
  "Convert str to bytes, encoding as UTF-8.
  ASCII input produces one byte per character. Non-ASCII characters are
  encoded as multi-byte UTF-8 sequences rather than throwing.
  A nil input yields an empty byte array."
  [^String input]
  (if (nil? input)
    (byte-array 0)
    (.getBytes input java.nio.charset.StandardCharsets/UTF_8)))
(defn bytes-to-str
  "Convert bytes to str.
  A byte array is decoded as UTF-8, so bytes outside the ASCII range no
  longer throw. Any other input (nil, or a sequence of values accepted by
  `char`) is converted element by element."
  [data]
  (if (instance? (Class/forName "[B") data)
    (String. ^bytes data java.nio.charset.StandardCharsets/UTF_8)
    ;; Non-array input: one character per element.
    (apply str (map char data))))