In [26]:
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NumericUnderscores #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}

import Control.Monad (when)
import qualified Data.ByteString.Lazy as L
import Data.List (foldl')
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as Map
import Data.Maybe (fromMaybe)
import Data.Text (Text)
import qualified Data.Text as T
import qualified Data.Vector.Unboxed as VU
import Data.Word

import DataFrame ((|>), DataFrame)
import qualified DataFrame as D
import qualified DataFrame.Display.Web.Plot as Plt
import DataFrame.Functions ((.=), (.>=))
import qualified DataFrame.Functions as F

import GHC.RTS.Events hiding (header)
import GHC.RTS.Events.Incremental

-- | One heap-profile sample, as produced by 'parseLogToRows'.
data HeapSampleRow = HeapSampleRow
    { hTime :: !Word64
    -- ^ Event timestamp truncated to whole seconds.
    , hLabel :: !Text
    -- ^ Label of the sample: a resolved cost-centre label, a @CC#<id>@
    -- placeholder, or the label carried by a string sample.
    , hBytes :: !Word64
    -- ^ Residency value carried by the sample event (presumably bytes).
    }
    deriving (Show)

-- | One GC statistics record, filled from a 'GCStatsGHC' event by
-- 'parseLogToRows'.
data GcStatRow = GcStatRow
    { gTime :: !Word64
    -- ^ Event timestamp truncated to whole seconds.
    , gGen :: !Int
    -- ^ The event's @gen@ field, converted with 'fromIntegral'.
    , gCopied :: !Word64
    -- ^ The event's @copied@ field.
    , gSlop :: !Word64
    -- ^ The event's @slop@ field.
    , gFrag :: !Word64
    -- ^ The event's @frag@ field.
    }
    deriving (Show)

-- | Which kind of size event a 'BytesRow' came from: 'LIVE' for @HeapLive@,
-- 'BLOCK' for @BlocksSize@ and 'HEAP' for @HeapSize@.
data BytesLocation = LIVE | BLOCK | HEAP deriving (Show, Eq)

-- | One size measurement taken from a @HeapLive@, @HeapSize@ or
-- @BlocksSize@ event.
data BytesRow = BytesRow
    { bTime :: !Word64
    -- ^ Event timestamp truncated to whole seconds.
    , bLocation :: !BytesLocation
    -- ^ Which of the three event kinds produced this row.
    , bBytes :: !Word64
    -- ^ The size value carried by the event.
    }
    deriving (Show)

-- | Accumulator threaded through the fold in 'parseLogToRows'.
--
-- The three row lists are built by consing, so they are held newest-first
-- and reversed once the fold has finished.
data ParseState = ParseState
    { psHeapRows :: ![HeapSampleRow]
    -- ^ Heap samples collected so far (newest first).
    , psGcRows :: ![GcStatRow]
    -- ^ GC statistics collected so far (newest first).
    , psBytesRows :: ![BytesRow]
    -- ^ Size measurements collected so far (newest first).
    , psCcMap :: !(Map Word32 Text)
    -- ^ Cost-centre id to display label, filled from cost-centre
    -- definition events.
    }

-- | Turn heap samples into a frame with the columns @time@, @cc_label@ and
-- @residency@, one row per sample.
toHeapDataFrame :: [HeapSampleRow] -> DataFrame
toHeapDataFrame heapRows =
    D.fromRows columnNames [encodeSample sample | sample <- heapRows]
  where
    columnNames = ["time", "cc_label", "residency"]
    -- Cell order must match 'columnNames'.
    encodeSample sample =
        [ D.toAny (hTime sample)
        , D.toAny (hLabel sample)
        , D.toAny (hBytes sample)
        ]

-- | Turn GC statistics into a frame with the columns @time@, @generation@,
-- @copied@, @slop@ and @frag@, one row per record.
toGcDataFrame :: [GcStatRow] -> DataFrame
toGcDataFrame gcRows =
    D.fromRows columnNames [encodeStat stat | stat <- gcRows]
  where
    columnNames = ["time", "generation", "copied", "slop", "frag"]
    -- Cell order must match 'columnNames'.
    encodeStat stat =
        [ D.toAny (gTime stat)
        , D.toAny (gGen stat)
        , D.toAny (gCopied stat)
        , D.toAny (gSlop stat)
        , D.toAny (gFrag stat)
        ]

-- | Starting accumulator: no rows collected and no cost centres known.
initialState :: ParseState
initialState =
    ParseState
        { psHeapRows = []
        , psGcRows = []
        , psBytesRows = []
        , psCcMap = Map.empty
        }

-- | Walk the event list once and collect heap-profile samples, GC
-- statistics and size measurements as three row lists, each in the order
-- the events were encountered.
--
-- Cost-centre definitions ('HeapProfCostCentre') are remembered as they are
-- seen, so that later cost-centre samples can be labelled
-- @"label (module)"@ using the first entry of their stack. A sample whose
-- cost centre has not been defined yet is labelled @CC#<id>@, and a sample
-- with an empty stack is labelled @UNKNOWN (empty stack)@. Every other
-- event kind is ignored.
parseLogToRows :: [Event] -> ([HeapSampleRow], [GcStatRow], [BytesRow])
parseLogToRows events =
    let finalState = foldl' step initialState events
     in ( reverse (psHeapRows finalState)
        , reverse (psGcRows finalState)
        , reverse (psBytesRows finalState)
        )
  where
    step :: ParseState -> Event -> ParseState
    step st@ParseState{..} e =
        case evSpec e of
            HeapProfCostCentre ccId ccLabel ccModule _srcLoc _flags ->
                st{psCcMap = Map.insert ccId (ccLabel <> " (" <> ccModule <> ")") psCcMap}
            HeapProfSampleCostCentre{..} ->
                addHeapRow (labelFor heapProfStack) heapProfResidency
            HeapProfSampleString{..} ->
                addHeapRow heapProfLabel heapProfResidency
            GCStatsGHC{..} ->
                let row =
                        GcStatRow
                            { gTime = tSec
                            , gGen = fromIntegral gen
                            , gCopied = copied
                            , gSlop = slop
                            , gFrag = frag
                            }
                 in row `seq` st{psGcRows = row : psGcRows}
            HeapLive _capset b -> addBytesRow LIVE b
            HeapSize _capset b -> addBytesRow HEAP b
            BlocksSize _capset b -> addBytesRow BLOCK b
            _ -> st
      where
        -- Timestamp of the current event in whole seconds.
        tSec :: Word64
        tSec = nsToSec (evTime e)

        -- Display label for a cost-centre sample, taken from the first
        -- entry of its stack.
        labelFor :: VU.Vector Word32 -> Text
        labelFor stack =
            case stack VU.!? 0 of
                Nothing -> "UNKNOWN (empty stack)"
                Just ccId ->
                    fromMaybe
                        ("CC#" <> T.pack (show ccId))
                        (Map.lookup ccId psCcMap)

        -- The rows are forced before being consed: the strict list fields
        -- of 'ParseState' only evaluate the outermost cons cell, so an
        -- unforced row would otherwise stay a thunk that keeps the event
        -- and the cost-centre map alive until the row is finally inspected.
        addHeapRow :: Text -> Word64 -> ParseState
        addHeapRow label residencyValue =
            let row =
                    HeapSampleRow
                        { hTime = tSec
                        , hLabel = label
                        , hBytes = residencyValue
                        }
             in row `seq` st{psHeapRows = row : psHeapRows}

        addBytesRow :: BytesLocation -> Word64 -> ParseState
        addBytesRow location b =
            let row =
                    BytesRow
                        { bTime = tSec
                        , bLocation = location
                        , bBytes = b
                        }
             in row `seq` st{psBytesRows = row : psBytesRows}

    -- Integer division: sub-second precision is deliberately dropped, so
    -- all events within the same second share one time value.
    nsToSec :: Timestamp -> Word64
    nsToSec ns = fromIntegral ns `div` 1_000_000_000

In [27]:
-- Read both eventlogs as lazy ByteStrings from paths relative to the
-- current working directory.
fastRawLogs <- L.readFile "./fast.eventlog"
leakyRawLogs <- L.readFile "./leaky.eventlog"

-- | Decode a raw eventlog and return its heap-profile samples as a frame.
--
-- Calls 'error' when the eventlog cannot be parsed. The second component of
-- a successful 'readEventLog' result is ignored.
mkHeapDf :: L.ByteString -> D.DataFrame
mkHeapDf rawLog = either parseFailure heapFrame (readEventLog rawLog)
  where
    parseFailure msg = error ("Error parsing eventlog: " ++ msg)
    heapFrame (EventLog _header (Data evs), _) =
        let (heapSamples, _gcStats, _sizes) = parseLogToRows evs
         in toHeapDataFrame heapSamples

-- Heap-profile samples of the fast run.
fastHeapDf :: DataFrame
fastHeapDf = mkHeapDf fastRawLogs

-- Heap-profile samples of the leaky run.
leakyHeapDf :: DataFrame
leakyHeapDf = mkHeapDf leakyRawLogs

In [28]:
-- Last ten heap samples of the fast run.
fastHeapDf |> D.takeLast 10

  ------------------------------------------------------------------  
| time<br>Word64 |      cc_label<br>Text       | residency<br>Word64 |
| ---------------|-----------------------------|-------------------- |
| 5              | CAF (GHC.Weak.Finalize)     | 56                  |
| 5              | CAF (GHC.IO.Encoding.Iconv) | 120                 |
| 5              | CAF (GHC.IO.Encoding)       | 912                 |
| 5              | CAF (GHC.Conc.Signal)       | 640                 |
| 5              | UNKNOWN (empty stack)       | 160                 |
| 5              | SYSTEM (SYSTEM)             | 32888               |
| 5              | main (Main)                 | 290010128           |
| 5              | CAF (GHC.IO.Handle.FD)      | 17072               |
| 5              | sumFast.\ (Main)            | 16                  |
| 5              | parseAll.go (Main)          | 16                  |


In [29]:
-- Last ten heap samples of the leaky run.
leakyHeapDf |> D.takeLast 10

  --------------------------------------------------------------------  
| time<br>Word64 |       cc_label<br>Text        | residency<br>Word64 |
| ---------------|-------------------------------|-------------------- |
| 8              | UNKNOWN (empty stack)         | 160                 |
| 8              | main (Main)                   | 290010128           |
| 8              | CAF (GHC.Conc.Signal)         | 640                 |
| 8              | CAF (GHC.Weak.Finalize)       | 56                  |
| 8              | CAF (GHC.IO.Handle.FD)        | 17072               |
| 8              | CAF (GHC.IO.Encoding.Iconv)   | 120                 |
| 8              | CAF (GHC.IO.Encoding)         | 912                 |
| 8              | sumLeaky.step.history' (Main) | 47592               |
| 8              | sumLeaky.step (Main)          | 32                  |
| 8              | parseAll.go (Main)            | 16                  |


In [30]:
import IHaskell.Display

-- Typed handle for the "residency" column of the heap sample frames.
residency = F.col @Word64 "residency"

-- | Summarise a heap sample frame per cost-centre label: maximum residency,
-- summed residency and number of samples, sorted by maximum residency in
-- descending order.
--
-- The conversions are pinned to 'Int' because the cells that follow read
-- these columns back as @Int@ / @Maybe Int@.
summariseResidency :: DataFrame -> DataFrame
summariseResidency heapDf =
    heapDf
        |> D.groupBy ["cc_label"]
        |> D.aggregate
            [ "max_residency" .= F.lift (fromIntegral @Word64 @Int) (F.maximum residency)
            , "total_residency" .= F.lift (fromIntegral @Word64 @Int) (F.sum residency)
            , "samples" .= F.count residency
            ]
        |> D.sortBy [D.Desc "max_residency"]

-- Per-label summary of the fast run.
aggFastHeapDf = summariseResidency fastHeapDf

-- Per-label summary of the leaky run.
aggLeakyHeapDf = summariseResidency leakyHeapDf


-- NOTE(review): this binding is not referenced by the cells shown here, and
-- the name looks misleading: in the joined table displayed below, the
-- "Right_"-prefixed columns are the ones populated for the sumLeaky.* cost
-- centres, i.e. they appear to hold the LEAKY profile's values.
fastColumns = ["Right_max_residency", "Right_total_residency", "Right_samples"]

-- Full outer join of the two per-label summaries on "cc_label". Labels that
-- occur in only one profile presumably end up with missing values for the
-- other profile's columns, hence the imputation below.
joinedUnclean = D.fullOuterJoin ["cc_label"] aggLeakyHeapDf aggFastHeapDf

-- Impute 0 into every column whose element type is Maybe Int, i.e. all
-- columns except the label.
joined = D.fold imputeZero maybeIntColumns joinedUnclean
    where
        -- Replace missing values in the named Maybe Int column with 0.
        imputeZero name = D.impute (F.col @(Maybe Int) name) 0
        -- Names of all columns of the join result that hold Maybe Int.
        maybeIntColumns = D.columnNames (D.selectBy [D.byProperty (D.hasElemType @(Maybe Int))] joinedUnclean)

joined

  ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------  
|       cc_label<br>Text        | max_residency<br>Int | total_residency<br>Int | samples<br>Int | Right_max_residency<br>Int | Right_total_residency<br>Int | Right_samples<br>Int |
| ------------------------------|----------------------|------------------------|----------------|----------------------------|------------------------------|--------------------- |
| CAF (GHC.IO.Handle.FD)        | 17072                | 904816                 | 53             | 17072                      | 1348688                      | 79                   |
| UNKNOWN (empty stack)         | 160                  | 8640                   | 54             | 160                        | 12800                        | 80                   |
| CAF (GHC.IO.Encoding.Iconv)   | 120                  | 6480                   | 54             | 120                        | 9600                         | 80                   |
| SYSTEM (SYSTEM)               | 32888                | 1775952                | 54             | 32888                      | 2631040                      | 80                   |
| CAF (GHC.Conc.Signal)         | 640                  | 34560                  | 54             | 640                        | 51200                        | 80                   |
| CAF (GHC.Weak.Finalize)       | 56                   | 2968                   | 53             | 56                         | 4424                         | 79                   |
| main (Main)                   | 290019472            | 15660556256            | 54             | 290019472                  | 23200819584                  | 80                   |
| parseAll.go (Main)            | 72                   | 1056                   | 53             | 72                         | 3488                         | 79                   |
| CAF (GHC.IO.Encoding)         | 912                  | 49248                  | 54             | 912                        | 72960                        | 80                   |
| sumFast.\ (Main)              | 16                   | 832                    | 52             | 0                          | 0                            | 0                    |
| sumLeaky.step (Main)          | 0                    | 0                      | 0              | 32                         | 2496                         | 78                   |
| sumLeaky.step.history' (Main) | 0                    | 0                      | 0              | 47592                      | 1909080                      | 79                   |


In [31]:
-- We can generate these automatically with F.declareColumns but
-- we define them manually to be explicit.
-- "left" refers to the unprefixed columns of the joined frame and "right"
-- to the "Right_"-prefixed ones.
leftMaxResidency = F.col @Int "max_residency"
rightMaxResidency = F.col @Int "Right_max_residency"
leftTotalResidency = F.col @Int "total_residency"
rightTotalResidency = F.col @Int "Right_total_residency"

-- Per-label difference, computed as right minus left. Going by the joined
-- table, the "Right_" columns carry the leaky profile, so a positive value
-- means the leaky run retained more.
joined |> D.deriveMany [ "diff_max_residency"   .= rightMaxResidency - leftMaxResidency
                       , "diff_total_residency" .= rightTotalResidency - leftTotalResidency
                       ]
       |> D.select ["cc_label", "diff_max_residency", "diff_total_residency"]

  ---------------------------------------------------------------------------------------  
|       cc_label<br>Text        | diff_max_residency<br>Int | diff_total_residency<br>Int |
| ------------------------------|---------------------------|---------------------------- |
| CAF (GHC.IO.Handle.FD)        | 0                         | 443872                      |
| UNKNOWN (empty stack)         | 0                         | 4160                        |
| CAF (GHC.IO.Encoding.Iconv)   | 0                         | 3120                        |
| SYSTEM (SYSTEM)               | 0                         | 855088                      |
| CAF (GHC.Conc.Signal)         | 0                         | 16640                       |
| CAF (GHC.Weak.Finalize)       | 0                         | 1456                        |
| main (Main)                   | 0                         | 7540263328                  |
| parseAll.go (Main)            | 0                         | 2432                        |
| CAF (GHC.IO.Encoding)         | 0                         | 23712                       |
| sumFast.\ (Main)              | -16                       | -832                        |
| sumLeaky.step (Main)          | 32                        | 2496                        |
| sumLeaky.step.history' (Main) | 47592                     | 1909080                     |


In [32]:
-- | Decode a raw eventlog and return its size measurements as a frame with
-- the columns @time@, @type@ and @bytes@.
--
-- Calls 'error' when the eventlog cannot be parsed. The second component of
-- a successful 'readEventLog' result is ignored.
mkBytesDf :: L.ByteString -> D.DataFrame
mkBytesDf rawLog = either parseFailure bytesFrame (readEventLog rawLog)
  where
    parseFailure msg = error ("Error parsing eventlog: " ++ msg)
    bytesFrame (EventLog _header (Data evs), _) =
        let (_heapSamples, _gcStats, sizeRows) = parseLogToRows evs
         in D.fromRows ["time", "type", "bytes"] [encodeSize s | s <- sizeRows]
    -- The location is stored as the text of its constructor name.
    encodeSize s =
        [ D.toAny (bTime s)
        , D.toAny (T.pack (show (bLocation s)))
        , D.toAny (bBytes s)
        ]

-- Tag every row with the profile it came from. The literal is annotated as
-- 'Text' so that the column's element type does not depend on how an
-- overloaded string literal happens to be defaulted; the "profile" column
-- is read back as Text further down.
fastBytesDf = mkBytesDf fastRawLogs |> D.derive "profile" (F.lit ("FAST" :: Text))
leakyBytesDf = mkBytesDf leakyRawLogs |> D.derive "profile" (F.lit ("LEAKY" :: Text))

In [33]:
-- First ten size measurements of the leaky run.
leakyBytesDf |> D.take 10

  -----------------------------------------------------------------  
| time<br>Word64 | type<br>Text | bytes<br>Word64 | profile<br>Text |
| ---------------|--------------|-----------------|---------------- |
| 0              | LIVE         | 290055864       | LEAKY           |
| 0              | HEAP         | 296747008       | LEAKY           |
| 0              | BLOCK        | 291225600       | LEAKY           |
| 0              | HEAP         | 296747008       | LEAKY           |
| 0              | BLOCK        | 291311616       | LEAKY           |
| 0              | LIVE         | 290073560       | LEAKY           |
| 0              | HEAP         | 296747008       | LEAKY           |
| 0              | BLOCK        | 291295232       | LEAKY           |
| 0              | HEAP         | 296747008       | LEAKY           |
| 0              | BLOCK        | 291291136       | LEAKY           |


In [34]:
-- Again we specify these manually but we can generate them automatically.
-- Typed handles for the four columns of the bytes frames; the type
-- applications must match the element types the columns were built with.
bytesType = F.col @Text "type"
bytes = F.col @Word64 "bytes"
profile = F.col @Text "profile"
time = F.col @Word64 "time"

In [35]:
-- | Mean of the @bytes@ column per @time@ value for one profile / event-type
-- combination.
--
-- @p@ is the profile tag to keep, @ty@ the event type to keep, and the
-- resulting mean column is named @p <> "_" <> outCol@.
aggBytes :: Text -> Text -> Text -> DataFrame -> DataFrame
aggBytes p ty outCol df =
    D.rename "bytes" resultColumn meanPerTime
  where
    -- Keep only rows of the requested event type and profile.
    wanted = (bytesType .== F.lit ty) .&& (profile .== F.lit p)
    resultColumn = p <> "_" <> outCol
    meanPerTime =
        df
            |> D.filterWhere wanted
            |> D.groupBy [F.name time]
            |> D.aggregate ["bytes" .= F.mean bytes]
            |> D.exclude [F.name bytesType]

-- One per-time mean frame for each (profile, event type) pair. The result
-- columns are named "<PROFILE>_<suffix>", e.g. "FAST_heap_bytes".
fastHeap  = aggBytes "FAST" "HEAP"  "heap_bytes" fastBytesDf
fastLive  = aggBytes "FAST" "LIVE"  "live_bytes" fastBytesDf
fastBlock = aggBytes "FAST" "BLOCK" "block_bytes" fastBytesDf

leakyHeap  = aggBytes "LEAKY" "HEAP"  "heap_bytes" leakyBytesDf
leakyLive  = aggBytes "LEAKY" "LIVE"  "live_bytes" leakyBytesDf
leakyBlock = aggBytes "LEAKY" "BLOCK" "block_bytes" leakyBytesDf

-- The FAST (foldl') run finishes sooner than the LEAKY one, so after joining
-- on "time" the FAST columns have no value for the later time buckets.
-- These are the columns that get imputed.
fastBytesColumns = ["FAST_block_bytes", "FAST_heap_bytes", "FAST_live_bytes"]

-- Wide frame with one row per "time" value and one column per
-- (profile, event type) pair. Missing FAST values are imputed with 0 and
-- the result is sorted by ascending time.
pivoted =
    joinedOnTime
        |> D.fold imputeMissing fastBytesColumns
        |> D.sortBy [D.Asc "time"]
  where
    onTime = D.rightJoin ["time"]
    -- Pair up leaky/fast per event type first, then join the three pairs.
    joinedOnTime =
        foldr1
            onTime
            [ onTime leakyBlock fastBlock
            , onTime leakyHeap fastHeap
            , onTime leakyLive fastLive
            ]
    imputeMissing name = D.impute (F.col @(Maybe Double) name) 0

-- Last ten rows of the pivoted frame, ordered by ascending time.
D.takeLast 10 (D.sortBy [D.Asc "time"] pivoted)

  -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------  
| time<br>Word64 | LEAKY_block_bytes<br>Double | FAST_block_bytes<br>Double | LEAKY_heap_bytes<br>Double | FAST_heap_bytes<br>Double | LEAKY_live_bytes<br>Double | FAST_live_bytes<br>Double |
| ---------------|-----------------------------|----------------------------|----------------------------|---------------------------|----------------------------|-------------------------- |
| 0              | 2.912487520214765e8         | 2.9124753515789473e8       | 2.96747008e8               | 2.96747008e8              | 2.900672544e8              | 2.90063712e8              |
| 1              | 2.912581850998825e8         | 2.912461651913478e8        | 2.96747008e8               | 2.96747008e8              | 2.90076988e8               | 2.90063464e8              |
| 2              | 2.912681550305882e8         | 2.912461790169218e8        | 2.96747008e8               | 2.96747008e8              | 2.90087152e8               | 2.900634728e8             |
| 3              | 2.912779012830189e8         | 2.912461757009346e8        | 2.96747008e8               | 2.96747008e8              | 2.900973264e8              | 2.90063484e8              |
| 4              | 2.912882600851064e8         | 2.9124613984415585e8       | 2.96747008e8               | 2.96747008e8              | 2.9010748e8                | 2.90063464e8              |
| 5              | 2.9129830036449707e8        | 2.907930078732171e8        | 2.96747008e8               | 2.9629002162916005e8      | 2.901176304e8              | 2.41727228e8              |
| 6              | 2.913097393058677e8         | 0.0                        | 2.96747008e8               | 0.0                       | 2.901275464e8              | 0.0                       |
| 7              | 2.9131656997869825e8        | 0.0                        | 2.96747008e8               | 0.0                       | 2.901373008e8              | 0.0                       |
| 8              | 2.886536948037383e8         | 0.0                        | 2.94052069682243e8         | 0.0                       | 1.45094424e8               | 0.0                       |


In [39]:
import qualified DataFrame.Display.Web.Plot as Plt

-- Line plot over time of every column whose name ends in "bytes".
pivoted |> Plt.plotLines "time" (filter (T.isSuffixOf "bytes") (D.columnNames pivoted))

In [37]:
import qualified DataFrame.Display.Web.Plot as Plt

-- Line plot of the "_live_bytes" columns, restricted to the first five rows.
pivoted |> D.take 5 |> Plt.plotLines "time" (filter (T.isSuffixOf "_live_bytes") (D.columnNames pivoted))

In [38]:
-- Same plot as above, drawn with D.plotLines instead of the web plotter.
pivoted |> D.take 5 |> D.plotLines "time" (filter (T.isSuffixOf "_live_bytes") (D.columnNames pivoted))

 2.9e8│                                                            
      │                                                      [94m⢀[0m[94m⠤[0m[94m⠒[0m[94m⠁[0m  
      │                                                   [94m⣀[0m[94m⠔[0m[94m⠊[0m[94m⠁[0m     
      │                                               [94m⢀[0m[94m⡠[0m[94m⠒[0m[94m⠉[0m         
      │                                            [94m⣀[0m[94m⠤[0m[94m⠊[0m[94m⠁[0m            
      │                                         [94m⡠[0m[94m⠔[0m[94m⠊[0m                
      │                                     [94m⢀[0m[94m⡠[0m[94m⠒[0m[94m⠉[0m                   
      │                                  [94m⢀[0m[94m⠤[0m[94m⠊[0m[94m⠁[0m                      
      │                               [94m⣀[0m[94m⠔[0m[94m⠊[0m[94m⠁[0m                         
      │                           [94m⢀[0m[94m⡠[0m[94m⠔[0m[94m⠉[0m                             
 2.9e