# State and the Concurrent World

## Refs

In [1]:
; Intitalizing a ref with an empty map

(def all-users (ref {}))

#'user/all-users

In [2]:
; Obtaining the current value of the ref with 'deref'

(deref all-users)


{}

In [3]:
; Obtaining the current value of the ref with '@'

@all-users


{}

### Mutating refs

set-ref

In [4]:
; Setting a new value to a ref directly with 'ref-set' (it will fail)

(ref-set all-users {})


Execution error (IllegalStateException) at user/eval4100 (REPL:3).
No transaction running


class java.lang.IllegalStateException: 

In [5]:
; Setting a new value to a ref with 'ref-set' and the 'dosync' macro (it will work)

(dosync 
    (ref-set all-users {}))


{}

alter

In [6]:
; The general form of the 'alter' function is as follows

"""
(alter ref function & args)
"""

""

In [7]:
; Defining a new user

(defn new-user [id login monthly-budget]
    {:id id
    :login login
    :monthly-budget monthly-budget
    :total-expenses 0})


#'user/new-user

In [8]:
; Defining a function that mutates the first ref with the new user

(defn add-new-user [login budget-amount]
    (dosync
        (let [current-number (count @all-users)
              user (new-user (inc current-number) login budget-amount)]
            (alter all-users assoc login user))))


#'user/add-new-user

In [9]:
; Adding the 1st user

(add-new-user "amit" 1000000)

{"amit" {:id 1, :login "amit", :monthly-budget 1000000, :total-expenses 0}}

In [10]:
; Adding the 2nd user

(add-new-user "deepthi" 2000000)


{"amit" {:id 1, :login "amit", :monthly-budget 1000000, :total-expenses 0}, "deepthi" {:id 2, :login "deepthi", :monthly-budget 2000000, :total-expenses 0}}

commute

In [11]:
; The general form of the 'commute' function is as follows

"""
(commute ref function & args)
"""

""

## Agents

In [12]:
; Initializing an agent with a value of 0

(def total-cpu-time (agent 0))



#'user/total-cpu-time

In [13]:
; Obtaining the current value of the agent with 'deref'

(deref total-cpu-time)


0

In [14]:
; Obtaining the current value of the agent with '@'

@total-cpu-time

0

send

In [15]:
; The general form of the 'send' function is as follows

"""
(send agent function & more-args)
"""

""

In [16]:
; Mutating the agent and setting a new value to it

(send total-cpu-time + 700)

#agent[{:status :ready, :val 700} 0x309e5882]

In [17]:
; Checking that the agent was mutated

(deref total-cpu-time)

700

send-off

In [18]:
; The general form of the 'send' function is as follows

"""
(send-off agent function & more-args)
"""

""

### Working with agents

await & await-for

In [21]:
; The general form of the 'await' function is as follows

"""
(await & agents)
"""

; The general form of the 'await-for' function is as follows

"""
(await-for timeout & agents)
"""


""

### Agent errors

In [26]:
; A simple agent

(def bad-agent (agent 10))

#'user/bad-agent

In [27]:
; Sending to the agent a function (/) that returns an invalid result (@bad-agent / 0)

(send bad-agent / 0)


#agent[{:status :failed, :val 10} 0x7c1be3b0]

In [28]:
; In case of errors, the agent has the last successful value assigned to it

(deref bad-agent)

10

In [30]:
; Even if it's sent a new valid operation, the agent will throw an error 
; since it's still in an invalid state

(send bad-agent / 2)


Execution error (ArithmeticException) at java.util.concurrent.ThreadPoolExecutor/runWorker (ThreadPoolExecutor.java:1142).
Divide by zero


class java.lang.RuntimeException: 

In [31]:
; Using the function 'agent-error' to see the error reason

(agent-error bad-agent)


#error {
 :cause "Divide by zero"
 :via
 [{:type java.lang.ArithmeticException
   :message "Divide by zero"
   :at [clojure.lang.Numbers divide "Numbers.java" 188]}]
 :trace
 [[clojure.lang.Numbers divide "Numbers.java" 188]
  [clojure.core$_SLASH_ invokeStatic "core.clj" 1027]
  [clojure.core$_SLASH_ invoke "core.clj" 1020]
  [clojure.core$binding_conveyor_fn$fn__5754 invoke "core.clj" 2036]
  [clojure.lang.AFn applyToHelper "AFn.java" 156]
  [clojure.lang.RestFn applyTo "RestFn.java" 132]
  [clojure.lang.Agent$Action doRun "Agent.java" 114]
  [clojure.lang.Agent$Action run "Agent.java" 163]
  [java.util.concurrent.ThreadPoolExecutor runWorker "ThreadPoolExecutor.java" 1142]
  [java.util.concurrent.ThreadPoolExecutor$Worker run "ThreadPoolExecutor.java" 617]
  [java.lang.Thread run "Thread.java" 745]]}

In [32]:
; Using the 'clear-agent-errors' function to make tha agent usable again

(clear-agent-errors bad-agent)


10

## Atoms

In [33]:
; Initializing an atom with a value of 0

(def total-rows (atom 0))


#'user/total-rows

In [34]:
(deref total-rows)


0

In [36]:
; Obtaining the current value of the agent with 'deref'

@total-rows

0

### Mutating Atoms

reset!

In [37]:
; The general form of the 'reset!' function is as follows

"""
(reset! atom new-value)
"""

""

swap!

In [38]:
; The general form of the 'swap!' function is as follows

"""
(swap! atom function & more-args)
"""

""

In [40]:
; Mutating the atom and assigning a new value to it 

(swap! total-rows + 100)

200

## Vars

In [44]:
; Creating the var's initial value (its 'root binding')

(def ^:dynamic *hbase-master* "localhost") ; The starting and ending asterisks denote that this var ought to be rebound before use

(println "Hbase-master is:" *hbase-master*)


Hbase-master is: localhost


nil

In [46]:
; If using a dynamic var without a root binding, it will return a special Unbound object

(def ^:dynamic *rabbitmq-host*)

(println "RabbitMQ host is:" *rabbitmq-host*)

RabbitMQ host is: #object[clojure.lang.Var$Unbound 0x4b65c113 Unbound: #'user/*rabbitmq-host*]


nil

Changing var bindings

In [48]:
; A simple unbounded var to track database connections

(def ^:dynamic *mysql-host*)


#'user/*mysql-host*

In [53]:
; A mock query function (it only counts the number of characters of the dbstring)
; it binds the *mysql-host* var to tge 'db' arg

(defn db-query [db]
    (binding [*mysql-host* db]
        (count *mysql-host*)))


#'user/db-query

In [52]:
; A vector of mysql-hosts values

(def mysql-hosts ["test-mysql" "dev-mysql" "staging-mysql"])


#'user/mysql-hosts

In [54]:
; Applying the querying function to all the hosts
; (pmap applies the function to each element on a different thread per element)
(pmap db-query mysql-hosts)

(10 9 13)

###  Watching for mutation

add-watch

In [55]:
; Adding a watcher to an atom

(def adi (atom 0))

(defn on-change [the-key the-ref old-value new-value]
    (println "Hey, seeing change from" old-value "to" new-value))

(add-watch adi :adi-watcher on-change)

#atom[0 0x2f0a57e3]

In [57]:
; Mutating the atom and therefore invoking the watcher

(swap! adi inc)

Hey, seeing change from 0 to 1


1

In [58]:
; Removing the watcher from the atom

(remove-watch adi :adi-watcher)


#atom[1 0x2f0a57e3]