Skip to content

Commit

Permalink
Upgrade cql-io (#495)
Browse files Browse the repository at this point in the history
Upgrade to the lastest `cql-io`. 

There are multiple reasons for upgrading, see [the changelog](https://gitlab.com/twittner/cql-io/blob/develop/CHANGELOG#L1-23). 

One reason is to try out the changes from [this MR](https://gitlab.com/twittner/cql-io/merge_requests/14) relating to the problem described [here](https://gitlab.com/twittner/cql-io/issues/21). To this end, `initialContactsDNS` is no longer used, so that cql-io can re-resolve the DNS upon losing a control connection. (I tried this out locally by adding an entry to `/etc/hosts`, connecting via DNS, then changing the bind IP of the underlying cassandra - this works as advertised.)

One change done was to map the existing usage of `x1` to `defaultRetrySettings` and `x5` to `eagerRetrySettings`. As commented on `x5`, it is only safe to use this on idempotent queries. Upon inspection of our current queries using x5, it appears all of these queries are idempotent.

Side-effects:

* switch from `MonadBaseControl` and `Control.Concurrent.*` to `UnliftIO.*` everywhere (thanks @neongreen).
  • Loading branch information
jschaul committed Oct 23, 2018
1 parent 009627a commit 3d0645c
Show file tree
Hide file tree
Showing 43 changed files with 129 additions and 158 deletions.
37 changes: 17 additions & 20 deletions libs/cassandra-util/src/Cassandra/Exec.hs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE ScopedTypeVariables #-}

{-# OPTIONS_GHC -Wno-orphans #-} -- for MonadUnliftIO Client

module Cassandra.Exec
( Client
, MonadClient (..)
Expand Down Expand Up @@ -44,13 +42,7 @@ module Cassandra.Exec

-- * Retry Settings
, RetrySettings
, noRetry
, retryForever
, maxRetries
, adjustConsistency
, constDelay
, expBackoff
, fibBackoff
, adjustSendTimeout
, adjustResponseTimeout
, retry
Expand All @@ -61,28 +53,38 @@ import Control.Exception (IOException)
import Control.Monad
import Control.Monad.Catch
import Control.Monad.IO.Class
import Control.Monad.Trans.Class
import Control.Monad.IO.Unlift
import Control.Monad.Reader
import Data.Conduit
import Data.Int
import Data.Text (Text)
import Database.CQL.IO
import Data.Conduit

params :: Tuple a => Consistency -> a -> QueryParams a
params c p = QueryParams c False p Nothing Nothing Nothing
params c p = QueryParams c False p Nothing Nothing Nothing Nothing
{-# INLINE params #-}

paramsP :: Consistency -> a -> Int32 -> QueryParams a
paramsP c p n = QueryParams c False p (Just n) Nothing Nothing
paramsP c p n = QueryParams c False p (Just n) Nothing Nothing Nothing
{-# INLINE paramsP #-}

-- | 'x5' must only be used for idempotent queries, or for cases
-- when a duplicate write has no severe consequences in
-- the context of the application's data model.
-- For more info see e.g.
-- https://docs.datastax.com/en/developer/java-driver//3.6/manual/idempotence/
--
-- The eager retry policy permits 5 retries with exponential
-- backoff (base-2) with an initial delay of 100ms, i.e. the
-- retries will be performed with 100ms, 200ms, 400ms, 800ms
-- and 1.6s delay, respectively, for a maximum delay of ~3s.
x5 :: RetrySettings
x5 = maxRetries 5 . expBackoff 0.1 5 $ retryForever
x5 = eagerRetrySettings
{-# INLINE x5 #-}

-- | Single, immediate retry, always safe.
-- The 'defRetryHandlers' used are safe also with non-idempotent queries.
x1 :: RetrySettings
x1 = maxRetries 1 retryForever
x1 = defRetrySettings
{-# INLINE x1 #-}

data CassandraError
Expand Down Expand Up @@ -111,8 +113,3 @@ paginateC q p r = go =<< lift (retry r (paginate q p))
yield (result page)
when (hasMore page) $
go =<< lift (retry r (liftClient (nextPage page)))

instance MonadUnliftIO Client where
askUnliftIO = do
env <- ask
pure $ UnliftIO (runClient env)
18 changes: 6 additions & 12 deletions libs/cassandra-util/src/Cassandra/Schema.hs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import Data.Time.Clock
import Data.UUID (UUID)
import Data.Word
import Database.CQL.IO
import Database.CQL.Protocol (Request (..), Query (..), Response(..), Result (..))
import Database.CQL.Protocol (Request (..), Query (..))
import GHC.Generics hiding (to, from, S, R)
import Options.Applicative hiding (info)
import Prelude hiding (log)
Expand Down Expand Up @@ -133,21 +133,15 @@ createKeyspace (Keyspace k) rs = void $ schema (cql rs) (params All ())
pair (dc, ReplicationFactor n) = "'" <> dc <> "': " <> pack (show n)

useKeyspace :: Keyspace -> Client ()
useKeyspace (Keyspace k) = do
r <- qry
case r of
RsResult _ (SetKeyspaceResult _) -> return ()
RsError _ e -> throwM e
_ -> throwM (UnexpectedResponse' r)
useKeyspace (Keyspace k) = void . getResult =<< qry
where
qry = request (RqQuery (Query cql prms)) :: Client (Response () () ())
prms = QueryParams One False () Nothing Nothing Nothing
cql = QueryString $ "use \"" <> fromStrict k <> "\""
qry = request (RqQuery (Query cql prms)) :: Client (HostResponse () () ())
prms = QueryParams One False () Nothing Nothing Nothing Nothing
cql = QueryString $ "use \"" <> fromStrict k <> "\""

migrateSchema :: Logger -> MigrationOpts -> [Migration] -> IO ()
migrateSchema l o ms = do
-- if migHost is a DNS name, resolve it and connect to all nodes
hosts <- initialContactsDNS $ pack (migHost o)
hosts <- initialContactsPlain $ pack (migHost o)
p <- Database.CQL.IO.init l $
setContacts (NonEmpty.head hosts) (NonEmpty.tail hosts)
. setPortNumber (fromIntegral $ migPort o)
Expand Down
25 changes: 9 additions & 16 deletions libs/cassandra-util/src/Cassandra/Settings.hs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ module Cassandra.Settings
, setRetrySettings
, setPolicy
, initialContactsDisco
, initialContactsDNS
, initialContactsPlain
) where

import Control.Lens
Expand All @@ -31,14 +31,13 @@ import Data.Aeson.Lens
import Data.List.NonEmpty (NonEmpty (..))
import Data.Monoid
import Data.Text (pack, stripSuffix, unpack, Text)
import Data.Text.Encoding (encodeUtf8)
import Database.CQL.IO
import Network.DNS.Lookup
import Network.DNS.Resolver
import Database.CQL.IO hiding (values)
import Network.Wreq

import qualified Data.List.NonEmpty as NE

-- | This function is likely only useful at Wire, as it is Wire-infra specific.
-- Given a server name and a url returning a wire-custom "disco" json (AWS describe-instances-like json), e.g.
-- { "roles" : { "server_name": [ {"privateIpAddress": "...", ...}, {...} ] } },
-- return a list of IP addresses.
initialContactsDisco :: MonadIO m => String -> String -> m (NonEmpty String)
initialContactsDisco (pack -> srv) url = liftIO $ do
rs <- asValue =<< get url
Expand All @@ -57,12 +56,6 @@ initialContactsDisco (pack -> srv) url = liftIO $ do
i:ii -> return (i :| ii)
_ -> error "initial-contacts: no IP addresses found."

initialContactsDNS :: MonadIO m => Text -> m (NonEmpty String)
initialContactsDNS address = liftIO $ do
rs <- makeResolvSeed defaultResolvConf
ips <- withResolver rs $ \resolver -> lookupA resolver (encodeUtf8 address)
return $ case ips of
Right (x:xs) -> NE.map show (x :| xs)
_ -> fallback
where
fallback = unpack address :| [] -- If it's not a valid DNS name, just try using it anyway
-- | Puts the address into a list using the same signature as the other initialContacts
initialContactsPlain :: MonadIO m => Text -> m (NonEmpty String)
initialContactsPlain address = pure $ unpack address :| []
5 changes: 2 additions & 3 deletions libs/extended/extended.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,13 @@ library
ghc-options: -Wall -O2 -fwarn-tabs

exposed-modules:
Control.Concurrent.Async.Lifted.Safe.Extended
UnliftIO.Async.Extended
Options.Applicative.Extended

build-depends:
base
, base-prelude
, async-pool
, lifted-async
, unliftio-core
, unliftio
, optparse-applicative
, extra
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,18 @@
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE ScopedTypeVariables #-}

-- | A version of "Control.Concurrent.Async.Lifted.Safe" with extra utilities.
module Control.Concurrent.Async.Lifted.Safe.Extended
( module Control.Concurrent.Async.Lifted.Safe
-- | A version of "UnliftIO.Async" with extra utilities.
module UnliftIO.Async.Extended
( module UnliftIO.Async
-- * Pooled functions (using at most T threads)
, forPooled
, mapMPooled
, replicatePooled
, sequencePooled
) where

import Control.Monad.IO.Unlift
import Control.Concurrent.Async.Lifted.Safe
import UnliftIO
import UnliftIO.Async

import qualified Control.Concurrent.Async.Pool as Pool

Expand Down
1 change: 1 addition & 0 deletions services/brig/brig.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -421,3 +421,4 @@ executable brig-integration
, warp-tls >= 3.2
, zauth
, yaml >= 0.8.22
, unliftio
9 changes: 5 additions & 4 deletions services/brig/src/Brig/App.hs
Original file line number Diff line number Diff line change
Expand Up @@ -275,9 +275,10 @@ turnSetup lgr w dig o = do
startWatching :: FS.WatchManager -> FilePath -> FS.Action -> IO ()
startWatching w p = void . FS.watchDir w (Path.dropFileName p) predicate
where
predicate (FS.Added f _) = Path.equalFilePath f p
predicate (FS.Modified f _) = Path.equalFilePath f p
predicate (FS.Removed _ _) = False
predicate (FS.Added f _ _) = Path.equalFilePath f p
predicate (FS.Modified f _ _) = Path.equalFilePath f p
predicate (FS.Removed _ _ _) = False
predicate (FS.Unknown _ _ _) = False

replaceGeoDb :: Logger -> IORef GeoIp.GeoDB -> FS.Event -> IO ()
replaceGeoDb g ref e = do
Expand Down Expand Up @@ -360,7 +361,7 @@ initExtGetManager = do

initCassandra :: Opts -> Logger -> IO Cas.ClientState
initCassandra o g = do
c <- maybe (Cas.initialContactsDNS ((Opt.cassandra o)^.casEndpoint.epHost))
c <- maybe (Cas.initialContactsPlain ((Opt.cassandra o)^.casEndpoint.epHost))
(Cas.initialContactsDisco "cassandra_brig")
(unpack <$> Opt.discoUrl o)
p <- Cas.init (Log.clone (Just "cassandra.brig") g)
Expand Down
2 changes: 1 addition & 1 deletion services/brig/src/Brig/Data/Client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import Brig.Data.User (AuthError (..), ReAuthError (..))
import Brig.Types
import Brig.Types.User.Auth (CookieLabel)
import Cassandra hiding (Client)
import Control.Concurrent.Async.Lifted.Safe (mapConcurrently)
import Control.Error
import Control.Lens
import Control.Monad
Expand All @@ -56,6 +55,7 @@ import Data.Word
import Safe (readMay)
import System.CryptoBox (Result (Success))
import System.Logger.Class (field, msg, val)
import UnliftIO (mapConcurrently)

import qualified Brig.Data.User as User
import qualified Control.Exception.Lens as EL
Expand Down
2 changes: 1 addition & 1 deletion services/brig/src/Brig/Data/Connection.hs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import Brig.Types.Intra
import Cassandra
import Control.Monad
import Control.Monad.IO.Class
import Control.Concurrent.Async.Lifted.Safe.Extended (mapMPooled)
import UnliftIO.Async.Extended (mapMPooled)
import Data.Conduit ((.|), runConduit)
import Data.Functor.Identity
import Data.Id
Expand Down
2 changes: 1 addition & 1 deletion services/brig/src/Brig/Provider/API.hs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import Brig.Types.Client
import Brig.Types.User (publicProfile, User (..), Pict (..))
import Brig.Types.Provider
import Brig.Types.Search
import Control.Concurrent.Async.Lifted.Safe.Extended (mapMPooled)
import UnliftIO.Async.Extended (mapMPooled)
import Control.Lens (view, (^.))
import Control.Error (throwE)
import Control.Exception.Enclosed (handleAny)
Expand Down
2 changes: 1 addition & 1 deletion services/brig/src/Brig/Provider/DB.hs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import Brig.Types.Common
import Brig.Types.Provider hiding (updateServiceTags)
import Cassandra
import Control.Arrow ((&&&))
import Control.Concurrent.Async.Lifted.Safe (mapConcurrently)
import Control.Monad (when)
import Control.Monad.IO.Class
import Data.Foldable (for_, toList)
Expand All @@ -26,6 +25,7 @@ import Data.Maybe (isJust, catMaybes, fromMaybe, mapMaybe)
import Data.Misc
import Data.Range (Range, fromRange, rnil, rcast)
import Data.Text (Text, toLower, isPrefixOf)
import UnliftIO (mapConcurrently)

import qualified Data.Set as Set
import qualified Data.Text as Text
Expand Down
2 changes: 1 addition & 1 deletion services/brig/src/Brig/Team/DB.hs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import Brig.Types.Common
import Brig.Types.User
import Brig.Types.Team.Invitation
import Cassandra
import Control.Concurrent.Async.Lifted.Safe.Extended (mapMPooled)
import UnliftIO.Async.Extended (mapMPooled)
import Control.Lens
import Control.Monad.IO.Class
import Control.Monad.IO.Unlift
Expand Down
2 changes: 1 addition & 1 deletion services/brig/test/integration/API/Search.hs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import API.Search.Util
import Bilge
import Brig.Types
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async.Lifted.Safe
import Control.Monad
import Control.Monad.IO.Class
import Data.Foldable
Expand All @@ -15,6 +14,7 @@ import Network.HTTP.Client (Manager)
import Test.Tasty
import Test.Tasty.HUnit
import Util
import UnliftIO (Concurrently(..), runConcurrently)

tests :: Manager -> Brig -> IO TestTree
tests mgr brig =
Expand Down
2 changes: 1 addition & 1 deletion services/brig/test/integration/API/Team.hs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import Brig.Types.User.Auth
import Brig.Types.Intra
import Control.Arrow ((&&&))
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async.Lifted.Safe.Extended
import UnliftIO.Async.Extended
(mapConcurrently_, replicateConcurrently, forPooled, replicatePooled)
import Control.Lens ((^.), view)
import Control.Monad
Expand Down
2 changes: 1 addition & 1 deletion services/brig/test/integration/API/User/Account.hs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import Brig.Types.Intra
import Brig.Types.User.Auth hiding (user)
import Control.Arrow ((&&&))
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async.Lifted.Safe (mapConcurrently_)
import Control.Lens ((^?), (^.))
import Control.Monad
import Control.Monad.Catch
Expand Down Expand Up @@ -45,6 +44,7 @@ import Test.Tasty.HUnit
import Web.Cookie (parseSetCookie)
import Util as Util
import Util.AWS as Util
import UnliftIO (mapConcurrently_)

import qualified API.Search.Util as Search
import qualified Brig.AWS as AWS
Expand Down
2 changes: 1 addition & 1 deletion services/brig/test/integration/API/User/Auth.hs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import Brig.Types.User
import Brig.Types.User.Auth
import Brig.ZAuth (ZAuth, runZAuth)
import Control.Concurrent
import Control.Concurrent.Async.Lifted.Safe.Extended hiding (wait)
import UnliftIO.Async.Extended hiding (wait)
import Control.Lens ((^?), set)
import Control.Monad
import Control.Monad.IO.Class
Expand Down
2 changes: 1 addition & 1 deletion services/brig/test/integration/API/User/Client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import Bilge hiding (accept, timeout)
import Bilge.Assert
import Brig.Types
import Brig.Types.User.Auth hiding (user)
import Control.Concurrent.Async.Lifted.Safe (mapConcurrently)
import Control.Lens ((^?), preview)
import Control.Monad
import Control.Monad.IO.Class
Expand All @@ -27,6 +26,7 @@ import Test.Tasty hiding (Timeout)
import Test.Tasty.Cannon hiding (Cannon)
import Test.Tasty.HUnit
import Util
import UnliftIO (mapConcurrently)

import qualified Brig.Options as Opt
import qualified Data.List1 as List1
Expand Down
2 changes: 1 addition & 1 deletion services/brig/test/integration/API/User/Handles.hs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import API.User.Util
import Bilge hiding (accept, timeout)
import Bilge.Assert
import Brig.Types
import Control.Concurrent.Async.Lifted.Safe (mapConcurrently)
import Control.Lens ((^?), (^?!))
import Control.Monad
import Control.Monad.IO.Class
Expand All @@ -23,6 +22,7 @@ import Test.Tasty hiding (Timeout)
import Test.Tasty.Cannon hiding (Cannon)
import Test.Tasty.HUnit
import Util
import UnliftIO (mapConcurrently)

import qualified API.Search.Util as Search
import qualified Brig.Options as Opt
Expand Down
2 changes: 2 additions & 0 deletions services/galley/galley.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ executable galley-integration
, wai-utilities
, warp
, yaml
, unliftio

executable galley-journaler
main-is: Main.hs
Expand Down Expand Up @@ -279,6 +280,7 @@ executable galley-journaler
, bytestring
, uuid
, ssl-util
, unliftio

other-modules:
Journal
Expand Down

0 comments on commit 3d0645c

Please sign in to comment.