Permalink
Browse files

Damn Haskell, you scary

I seriously just copied and pasted the Yesod source for Handlers,
s/handl/work/g and did what the compiler told me. And I'll be damned if
it didn't just work as soon as it compiled.

![I have no idea what I'm doing](http://i.imgur.com/wp9Qr8V.gif)
  • Loading branch information...
jamesdabbs committed Jul 3, 2014
1 parent b3236e4 commit 9c7186e86835aee78de8810ef96869a4715b932d
Showing with 138 additions and 47 deletions.
  1. +3 −1 Application.hs
  2. +65 −0 Foundation.hs
  3. +1 −1 Handler/Home.hs
  4. +68 −44 Jobs.hs
  5. +1 −1 sarah.cabal
@@ -83,11 +83,13 @@ makeFoundation conf = do
updateLoop
_ <- forkIO updateLoop
q <- spawnWorkers p dbconf (extraWorkers . appExtra $ conf)
q <- emptyQueue
let logger = Yesod.Core.Types.Logger loggerSet' getter
foundation = App conf s p manager dbconf logger q
_ <- spawnWorkers foundation
-- Perform database migration using our application's logging settings.
runLoggingT
(Database.Persist.runPool dbconf (runMigration migrateAll) p)
@@ -1,3 +1,5 @@
{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE UndecidableInstances #-}
module Foundation where
import Prelude
@@ -20,6 +22,11 @@ import Text.Jasmine (minifym)
import Text.Hamlet (hamletFile)
import Yesod.Core.Types (Logger)
import Control.Concurrent (forkIO, threadDelay)
import Control.Monad (forever, liftM, replicateM_)
import Control.Monad.Trans.Resource (MonadResourceBase, runResourceT, withInternalState)
import Data.Monoid ((<>))
import qualified Data.Text as T
import Jobs
-- | The site argument for your application. This can be a good place to
@@ -53,6 +60,64 @@ mkYesodData "App" $(parseRoutesFile "config/routes")
type Form x = Html -> MForm (HandlerT App IO) (FormResult x, Widget)
type Worker = WorkerT App IO
class MonadResource m => MonadWorker m where
type WorkerSite m
liftWorkerT :: WorkerT (WorkerSite m) IO a -> m a
instance MonadResourceBase m => MonadWorker (WorkerT site m) where
type WorkerSite (WorkerT site m) = site
liftWorkerT (WorkerT f) = WorkerT $ liftIO . f -- . replaceToParent
{-# RULES "liftHandlerT (HandlerT site IO)" liftHandlerT = id #-}
askWorkerEnv :: MonadWorker m => m (RunWorkerEnv (WorkerSite m))
askWorkerEnv = liftWorkerT $ WorkerT $ return . workerEnv
getYesodW :: MonadWorker m => m (WorkerSite m)
getYesodW = rweSite `liftM` askWorkerEnv
runW :: SqlPersistT Worker a -> Worker a
runW f = do
app <- getYesodW
Database.Persist.runPool (persistConfig app) f (connPool app)
test :: Worker [Entity Feed]
test = do
$(logDebug) "Testing logging"
runW $ selectList [] []
perform :: Job -> Worker ()
perform (RunFeedJob _id) = do
$(logDebug) $ "-- Trying " <> (T.pack $ show _id)
mfeed <- runW . get $ _id
case mfeed of
Just feed -> do
$(logDebug) $ "-- Running feed " <> (T.pack . show $ feedUrl feed)
liftIO $ threadDelay 5000000
Nothing -> return ()
runWorker :: App -> Worker a -> IO a
runWorker site worker = runResourceT . withInternalState $ \resState -> do
let rwe = RunWorkerEnv
{ rweSite = site
, rweLog = messageLoggerSource site $ appLogger site
}
let wd = WorkerData
{ workerResource = resState
, workerEnv = rwe
}
-- FIXME: catch and handle errors (see unHandlerT)
unWorkerT worker wd
spawnWorkers :: App -> IO ()
spawnWorkers site = do
replicateM_ (extraWorkers . appExtra . settings $ site) . forkIO . forever $ do
mj <- dequeueJob $ jobQueue site
case mj of
Just job -> runWorker site $ perform job
Nothing -> threadDelay 1000000
-- Please see the documentation for the Yesod typeclass. There are a number
-- of settings which can be configured by overriding methods here.
instance Yesod App where
@@ -11,7 +11,7 @@ import Jobs
queue :: Job -> Handler ()
queue job = do
app <- getYesod
liftIO $ queueJob (jobQueue app) job
liftIO $ enqueueJob (jobQueue app) job
getHomeR :: Handler Html
getHomeR = do
112 Jobs.hs
@@ -1,27 +1,35 @@
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE UndecidableInstances #-}
module Jobs
( Job (..)
, JobQueue
, spawnWorkers
, queueJob
-- , spawnWorkers
, emptyQueue
, enqueueJob
, dequeueJob
, WorkerT (..)
, WorkerData (..)
, RunWorkerEnv (..)
) where
import Prelude
import Model
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.STM
import Control.Monad (forever, replicateM_)
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Logger (runStdoutLoggingT)
import Control.Monad.Trans.Resource (runResourceT)
import Data.Monoid ((<>))
import qualified Data.Sequence as S
import Database.Persist
import Settings (PersistConf)
import Control.Applicative (Applicative (..))
import Control.Monad (liftM, ap)
import Control.Monad.Base (MonadBase (liftBase))
import Control.Monad.IO.Class (MonadIO (..))
import Control.Monad.Logger (LogLevel, LogSource, MonadLogger (..))
import Control.Monad.Trans.Class (MonadTrans (..))
import Control.Monad.Trans.Control (MonadBaseControl (..))
import Control.Monad.Trans.Resource (MonadResource (..), InternalState, runInternalState, MonadThrow (..), monadThrow)
import Language.Haskell.TH.Syntax (Loc)
import System.Log.FastLogger (LogStr, toLogStr)
import Data.Time (getCurrentTime)
import System.Random (randomRIO)
-- The intention is for Job to be a sum type, with different `perform` implementations
-- for each constructor. For now, we just have the one:
@@ -30,9 +38,53 @@ data Job = RunFeedJob FeedId
-- A job queue is simply a sequence of jobs that multiple threads can access safely (using STM)
type JobQueue = TVar (S.Seq Job)
data RunWorkerEnv site = RunWorkerEnv
{ rweSite :: !site
, rweLog :: !(Loc -> LogSource -> LogLevel -> LogStr -> IO ())
-- , rheOnError :: !(ErrorResponse -> YesodApp)
}
data WorkerData site = WorkerData
{ workerResource :: !InternalState
, workerEnv :: !(RunWorkerEnv site)
}
newtype WorkerT site m a = WorkerT
{ unWorkerT :: WorkerData site -> m a
}
instance MonadTrans (WorkerT site) where
lift = WorkerT . const
instance Monad m => Functor (WorkerT site m) where
fmap = liftM
instance Monad m => Applicative (WorkerT site m) where
pure = return
(<*>) = ap
instance MonadIO m => MonadIO (WorkerT site m) where
liftIO = lift . liftIO
instance MonadBase b m => MonadBase b (WorkerT site m) where
liftBase = lift . liftBase
-- TODO: absorb the instance declarations below
instance Monad m => Monad (WorkerT site m) where
return = WorkerT . const . return
WorkerT x >>= f = WorkerT $ \r -> x r >>= \x' -> unWorkerT (f x') r
instance MonadBaseControl b m => MonadBaseControl b (WorkerT site m) where
data StM (WorkerT site m) a = StH (StM m a)
liftBaseWith f = WorkerT $ \reader ->
liftBaseWith $ \runInBase ->
f $ liftM StH . runInBase . (\(WorkerT r) -> r reader)
restoreM (StH base) = WorkerT $ const $ restoreM base
instance MonadThrow m => MonadThrow (WorkerT site m) where
throwM = lift . monadThrow
instance (MonadIO m, MonadBase IO m, MonadThrow m) => MonadResource (WorkerT site m) where
liftResourceT f = WorkerT $ \hd -> liftIO $ runInternalState f (workerResource hd)
instance MonadIO m => MonadLogger (WorkerT site m) where
monadLoggerLog a b c d = WorkerT $ \hd ->
liftIO $ rweLog (workerEnv hd) a b c (toLogStr d)
-- Basic helpers for using a Sequence as a queue
enqueue :: S.Seq a -> a -> S.Seq a
enqueue xs x = xs S.|> x
enqueue = (S.|>)
dequeue :: S.Seq a -> Maybe (a, S.Seq a)
dequeue s = case S.viewl s of
@@ -41,8 +93,8 @@ dequeue s = case S.viewl s of
-- The public API for queueing a job
queueJob :: JobQueue -> Job -> IO ()
queueJob qvar j = atomically . modifyTVar qvar $ \v -> enqueue v j
enqueueJob :: JobQueue -> Job -> IO ()
enqueueJob qvar j = atomically . modifyTVar qvar $ \v -> enqueue v j
dequeueJob :: JobQueue -> IO (Maybe Job)
dequeueJob qvar = atomically $ do
@@ -53,33 +105,5 @@ dequeueJob qvar = atomically $ do
return $ Just x
Nothing -> return Nothing
-- Starts an empty job queue and some number of workers to consume from that queue
spawnWorkers :: PersistConfigPool PersistConf -> PersistConf -> Int -> IO JobQueue
spawnWorkers pool dbconf n = do
q <- atomically $ newTVar S.empty
replicateM_ n . forkIO . forever $ do
qi <- dequeueJob q
case qi of
Just i -> perform pool dbconf i
Nothing -> threadDelay 1000000
return q
-- This allows us to run db queries inside a worker, similar to runDB inside a Handler
runDBIO pool dbconf f = runStdoutLoggingT . runResourceT $ runPool dbconf f pool
-- `perform` defines the actual work to be done for each type of job
-- TODO: figure out monadic sugar so that we can use e.g. runW and not need to pass in
-- pool and dbconf
-- also, figure out a getBy404 equivalent
-- also also, hook in logging (w/ numbered workers?)
perform pool dbconf (RunFeedJob _id) = do
now <- liftIO getCurrentTime
liftIO . putStrLn $ (show now) <> " -- Trying " <> (show _id)
mfeed <- runDBIO pool dbconf . get $ _id
liftIO $ case mfeed of
Just feed -> do
putStrLn $ (show now) <> " -- Running feed '" <> (show $ feedUrl feed) <> "'"
-- Pretend these are variably complicated units of work
sleep <- randomRIO (1,10)
threadDelay $ sleep * 1000000
Nothing -> return ()
emptyQueue :: IO JobQueue
emptyQueue = atomically $ newTVar S.empty
@@ -72,7 +72,7 @@ library
, time >= 1.4.0.1 && < 1.5
, resourcet >= 1.1.2.2 && < 1.2
, transformers >= 0.3 && < 0.4
, random >= 1.0.1.1 && < 1.1
, transformers-base >= 0.4.2 && < 0.5
, containers >= 0.5 && < 0.6
executable sarah

0 comments on commit 9c7186e

Please sign in to comment.