Skip to content

Commit

Permalink
Add InfluxException
Browse files Browse the repository at this point in the history
  • Loading branch information
Mitsutoshi Aoe committed Jun 29, 2014
1 parent b7041f9 commit fda1da4
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 80 deletions.
149 changes: 69 additions & 80 deletions src/Database/InfluxDB/Http.hs
Expand Up @@ -246,10 +246,9 @@ deleteSeries
-> Text -- ^ Database name
-> Text -- ^ Series name
-> IO ()
deleteSeries Config {..} databaseName seriesName =
void $ httpLbsWithRetry configServerPool makeRequest configHttpManager
deleteSeries config databaseName seriesName = runRequest_ config request
where
makeRequest = def
request = def
{ HC.method = "DELETE"
, HC.path = escapeString $ printf "/db/%s/series/%s"
(T.unpack databaseName)
Expand All @@ -258,7 +257,7 @@ deleteSeries Config {..} databaseName seriesName =
(T.unpack credsUser)
(T.unpack credsPassword)
}
Credentials {..} = configCreds
Credentials {..} = configCreds config

-- TODO: Delete API hasn't been implemented in InfluxDB yet
--
Expand Down Expand Up @@ -297,13 +296,11 @@ query
-> Text -- ^ Database name
-> Text -- ^ Query text
-> IO [a]
query Config {..} databaseName q = do
response <- httpLbsWithRetry configServerPool request configHttpManager
case A.decode (HC.responseBody response) of
Nothing -> fail $ show response
Just xs -> case mapM fromSeries xs of
Left reason -> fail reason
Right ys -> return ys
query config databaseName q = do
xs <- runRequest config request
case mapM fromSeries xs of
Left reason -> seriesDecodeError reason
Right ys -> return ys
where
request = def
{ HC.path = escapeString $ printf "/db/%s/series"
Expand All @@ -313,7 +310,7 @@ query Config {..} databaseName q = do
(T.unpack credsPassword)
(T.unpack q)
}
Credentials {..} = configCreds
Credentials {..} = configCreds config

-- | Construct streaming output
responseStream :: A.FromJSON a => HC.BodyReader -> IO (Stream IO a)
Expand All @@ -327,9 +324,9 @@ responseStream body = demandPayload $ \payload ->
A.Success a -> return $ Yield a $ if BS.null leftover
then responseStream body
else decode $ parseAsJson leftover
A.Error message -> fail message
A.Error message -> jsonDecodeError message
decode (P.Partial k) = demandPayload (decode . k)
decode (P.Fail _ _ message) = fail message
decode (P.Fail _ _ message) = jsonDecodeError message
parseAsJson = P.parse A.json

-- | Query a specified database like 'query' but in a streaming fashion.
Expand All @@ -347,7 +344,7 @@ queryChunked Config {..} databaseName q f =
responseStream . HC.responseBody >=> S.mapM parse >=> f
where
parse series = case fromSeries series of
Left reason -> fail reason
Left reason -> seriesDecodeError reason
Right a -> return a
request = def
{ HC.path = escapeString $ printf "/db/%s/series"
Expand All @@ -364,26 +361,21 @@ queryChunked Config {..} databaseName q f =

-- | List existing databases.
listDatabases :: Config -> IO [Database]
listDatabases Config {..} = do
response <- httpLbsWithRetry configServerPool makeRequest configHttpManager
case A.decode (HC.responseBody response) of
Nothing -> fail $ show response
Just xs -> return xs
listDatabases config = runRequest config request
where
makeRequest = def
request = def
{ HC.path = "/db"
, HC.queryString = escapeString $ printf "u=%s&p=%s"
(T.unpack credsUser)
(T.unpack credsPassword)
}
Credentials {..} = configCreds
Credentials {..} = configCreds config

-- | Create a new database. Requires cluster admin privileges.
createDatabase :: Config -> Text -> IO ()
createDatabase Config {..} name =
void $ httpLbsWithRetry configServerPool makeRequest configHttpManager
createDatabase config name = runRequest_ config request
where
makeRequest = def
request = def
{ HC.method = "POST"
, HC.requestBody = HC.RequestBodyLBS $ AE.encode $ A.object
[ "name" .= name
Expand All @@ -393,67 +385,61 @@ createDatabase Config {..} name =
(T.unpack credsUser)
(T.unpack credsPassword)
}
Credentials {..} = configCreds
Credentials {..} = configCreds config

-- | Drop a database. Requires cluster admin privileges.
dropDatabase
:: Config
-> Text -- ^ Database name
-> IO ()
dropDatabase Config {..} databaseName =
void $ httpLbsWithRetry configServerPool makeRequest configHttpManager
dropDatabase config databaseName = runRequest_ config request
where
makeRequest = def
request = def
{ HC.method = "DELETE"
, HC.path = escapeString $ printf "/db/%s"
(T.unpack databaseName)
, HC.queryString = escapeString $ printf "u=%s&p=%s"
(T.unpack credsUser)
(T.unpack credsPassword)
}
Credentials {..} = configCreds
Credentials {..} = configCreds config

-- | List cluster administrators.
listClusterAdmins :: Config -> IO [Admin]
listClusterAdmins Config {..} = do
response <- httpLbsWithRetry configServerPool makeRequest configHttpManager
case A.decode (HC.responseBody response) of
Nothing -> fail $ show response
Just xs -> return xs
listClusterAdmins config = runRequest config request
where
makeRequest = def
request = def
{ HC.path = "/cluster_admins"
, HC.queryString = escapeString $ printf "u=%s&p=%s"
(T.unpack credsUser)
(T.unpack credsPassword)
}
Credentials {..} = configCreds
Credentials {..} = configCreds config

authenticateClusterAdmin :: Config -> IO ()
authenticateClusterAdmin Config {..} =
void $ httpLbsWithRetry configServerPool makeRequest configHttpManager
authenticateClusterAdmin config = runRequest_ config request
where
makeRequest = def
request = def
{ HC.path = "/cluster_admins/authenticate"
, HC.queryString = escapeString $ printf "u=%s&p=%s"
(T.unpack credsUser)
(T.unpack credsPassword)
}
Credentials {..} = configCreds
Credentials {..} = configCreds config

-- | Add a new cluster administrator. Requires cluster admin privilege.
addClusterAdmin
:: Config
-> Text -- ^ Admin name
-> Text -- ^ Password
-> IO Admin
addClusterAdmin Config {..} name password = do
void $ httpLbsWithRetry configServerPool makeRequest configHttpManager
addClusterAdmin config name password = do
runRequest_ config request
return Admin
{ adminName = name
}
where
makeRequest = def
request = def
{ HC.method = "POST"
, HC.requestBody = HC.RequestBodyLBS $ AE.encode $ A.object
[ "name" .= name
Expand All @@ -464,7 +450,7 @@ addClusterAdmin Config {..} name password = do
(T.unpack credsUser)
(T.unpack credsPassword)
}
Credentials {..} = configCreds
Credentials {..} = configCreds config

-- | Update a cluster administrator's password. Requires cluster admin
-- privilege.
Expand Down Expand Up @@ -514,11 +500,7 @@ listDatabaseUsers
:: Config
-> Text
-> IO [User]
listDatabaseUsers Config {..} database = do
response <- httpLbsWithRetry configServerPool makeRequest configHttpManager
case A.decode (HC.responseBody response) of
Nothing -> fail $ show response
Just xs -> return xs
listDatabaseUsers config@Config {..} database = runRequest config makeRequest
where
makeRequest = def
{ HC.path = escapeString $ printf "/db/%s/users"
Expand Down Expand Up @@ -552,10 +534,9 @@ addDatabaseUser
-> Text -- ^ User name
-> Text -- ^ Password
-> IO ()
addDatabaseUser Config {..} databaseName name password =
void $ httpLbsWithRetry configServerPool makeRequest configHttpManager
addDatabaseUser config databaseName name password = runRequest_ config request
where
makeRequest = def
request = def
{ HC.method = "POST"
, HC.requestBody = HC.RequestBodyLBS $ AE.encode $ A.object
[ "name" .= name
Expand All @@ -567,16 +548,15 @@ addDatabaseUser Config {..} databaseName name password =
(T.unpack credsUser)
(T.unpack credsPassword)
}
Credentials {..} = configCreds
Credentials {..} = configCreds config

-- | Delete an user from the database users.
deleteDatabaseUser
:: Config
-> Text -- ^ Database name
-> Text -- ^ User name
-> IO ()
deleteDatabaseUser config@Config {..} databaseName userName =
void $ httpLbsWithRetry configServerPool request configHttpManager
deleteDatabaseUser config databaseName userName = runRequest_ config request
where
request = (makeRequestFromDatabaseUser config databaseName userName)
{ HC.method = "DELETE"
Expand All @@ -589,8 +569,8 @@ updateDatabaseUserPassword
-> Text -- ^ User name
-> Text -- ^ New password
-> IO ()
updateDatabaseUserPassword config@Config {..} databaseName userName password =
void $ httpLbsWithRetry configServerPool request configHttpManager
updateDatabaseUserPassword config databaseName userName password =
runRequest_ config request
where
request = (makeRequestFromDatabaseUser config databaseName userName)
{ HC.method = "POST"
Expand All @@ -605,8 +585,7 @@ grantAdminPrivilegeTo
-> Text -- ^ Database name
-> Text -- ^ User name
-> IO ()
grantAdminPrivilegeTo config@Config {..} databaseName userName =
void $ httpLbsWithRetry configServerPool request configHttpManager
grantAdminPrivilegeTo config databaseName userName = runRequest_ config request
where
request = (makeRequestFromDatabaseUser config databaseName userName)
{ HC.method = "POST"
Expand All @@ -621,8 +600,8 @@ revokeAdminPrivilegeFrom
-> Text -- ^ Database name
-> Text -- ^ User name
-> IO ()
revokeAdminPrivilegeFrom config@Config {..} databaseName userName =
void $ httpLbsWithRetry configServerPool request configHttpManager
revokeAdminPrivilegeFrom config databaseName userName =
runRequest_ config request
where
request = (makeRequestFromDatabaseUser config databaseName userName)
{ HC.method = "POST"
Expand All @@ -648,34 +627,26 @@ makeRequestFromDatabaseUser Config {..} databaseName userName = def
Credentials {..} = configCreds

ping :: Config -> IO Ping
ping Config {..} = do
response <- httpLbsWithRetry configServerPool makeRequest configHttpManager
case A.decode (HC.responseBody response) of
Nothing -> fail $ show response
Just status -> return status
ping config = runRequest config request
where
makeRequest = def
request = def
{ HC.path = "/ping"
}

-- | Fetch current list of available interfaces
listInterfaces :: Config -> IO [Text]
listInterfaces Config {..} = do
response <- httpLbsWithRetry configServerPool makeRequest configHttpManager
case A.decode (HC.responseBody response) of
Nothing -> fail $ show response
Just ifaces -> return ifaces
listInterfaces config = runRequest config request
where
makeRequest = def
request = def
{ HC.path = "/interfaces"
}

isInSync :: Config -> IO Bool
isInSync Config {..} = do
response <- httpLbsWithRetry configServerPool makeRequest configHttpManager
case decodeBool (HC.responseBody response) of
Nothing -> fail $ show response
Just status -> return status
case eitherDecodeBool (HC.responseBody response) of
Left reason -> jsonDecodeError reason
Right status -> return status
where
makeRequest = def
{ HC.path = "/sync"
Expand All @@ -684,9 +655,9 @@ isInSync Config {..} = do
(T.unpack credsPassword)
}
Credentials {..} = configCreds
decodeBool lbs = do
val <- PL.maybeResult $ PL.parse AP.value lbs
AT.parseMaybe A.parseJSON val
eitherDecodeBool lbs = do
val <- PL.eitherResult $ PL.parse AP.value lbs
AT.parseEither A.parseJSON val

-----------------------------------------------------------

Expand Down Expand Up @@ -728,3 +699,21 @@ escapeText = escapeString . T.unpack

escapeString :: String -> BS.ByteString
escapeString = BS8.pack . escapeURIString isAllowedInURI

decodeJsonResponse
:: A.FromJSON a
=> HC.Response BL.ByteString
-> IO a
decodeJsonResponse response =
case A.eitherDecode (HC.responseBody response) of
Left reason -> jsonDecodeError reason
Right a -> return a

runRequest :: A.FromJSON a => Config -> HC.Request -> IO a
runRequest Config {..} req = do
response <- httpLbsWithRetry configServerPool req configHttpManager
decodeJsonResponse response

runRequest_ :: Config -> HC.Request -> IO ()
runRequest_ Config {..} req =
void $ httpLbsWithRetry configServerPool req configHttpManager
22 changes: 22 additions & 0 deletions src/Database/InfluxDB/Types.hs
Expand Up @@ -30,9 +30,15 @@ module Database.InfluxDB.Types
, newServerPoolWithRetrySettings
, activeServer
, failover

-- * Exceptions
, InfluxException(..)
, jsonDecodeError
, seriesDecodeError
) where

import Control.Applicative (empty)
import Control.Exception (Exception, throwIO)
import Data.Data (Data)
import Data.IORef
import Data.Int (Int64)
Expand Down Expand Up @@ -249,6 +255,22 @@ failover ref = atomicModifyIORef' ref $ \pool@ServerPool {..} ->
, serverBackup = rest |> serverActive
}

-----------------------------------------------------------
-- Exceptions

data InfluxException
= JsonDecodeError String
| SeriesDecodeError String
deriving (Show, Typeable)

instance Exception InfluxException

jsonDecodeError :: String -> IO a
jsonDecodeError = throwIO . JsonDecodeError

seriesDecodeError :: String -> IO a
seriesDecodeError = throwIO . SeriesDecodeError

-----------------------------------------------------------
-- Aeson instances

Expand Down

0 comments on commit fda1da4

Please sign in to comment.