In [1]:
import pandas as pd
import numpy as np

# Load the raw consumer-metric samples. The file is whitespace-separated and,
# because `names` is supplied, is read without a header row.
df = pd.read_csv('kafka-consumer-metrics.txt',
                 sep=r'\s+',                   # same as delim_whitespace=True, which pandas 2.2 deprecates
                 dtype={'value': np.float64},  # the np.float alias was removed in NumPy 1.24
                 parse_dates=['when'],
                 na_values=['-Infinity'],      # read the literal "-Infinity" as NaN
                 names="when consumer metric value".split())

In [3]:
# Sanity-check the load: row count, per-column dtypes and non-null counts.
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 264 entries, 0 to 263
Data columns (total 4 columns):
when        264 non-null datetime64[ns]
consumer    264 non-null object
metric      264 non-null object
value       264 non-null float64
dtypes: datetime64[ns](1), float64(1), object(2)
memory usage: 8.3+ KB


In [5]:
# Peek at four randomly chosen rows. No random_state is passed, so the rows
# shown will differ from run to run.
df.sample(4)

Unnamed: 0,when,consumer,metric,value
257,2018-02-06 12:30:22.780,my-consumer-0,fetch-size-avg,0.0
98,2018-02-06 12:27:22.714,my-consumer-3,records-per-request-avg,1598.425101
234,2018-02-06 12:30:22.780,my-consumer-2,fetch-rate,4.039455
114,2018-02-06 12:27:22.714,my-consumer-1,fetch-size-avg,638403.090615


In [6]:
# Distinct consumer ids present in the data.
df['consumer'].unique()

array(['my-consumer-3', 'my-consumer-2', 'my-consumer-1', 'my-consumer-0'], dtype=object)

### Multiindex dataframe indexed by `when` and `consumer`
We want to create a MultiIndex DataFrame in which `consumer` and `when` form the hierarchical row index, each distinct `metric` becomes a column, and `value` supplies the cell values.

In [7]:
# Reshape long -> wide in one vectorised step: (consumer, when) become the row
# MultiIndex and every distinct metric becomes its own column holding `value`.
# This gives the same frame as pivoting each consumer group through
# groupby().apply(), without a Python callback per group. As with pivot(),
# unstack() raises if a (consumer, when, metric) combination occurs twice.
df2 = (
    df.set_index(['consumer', 'when', 'metric'])['value']
      .unstack('metric')
)

In [10]:
# Confirm the reshape: index type, one column per metric, and non-null counts.
df2.info()

<class 'pandas.core.frame.DataFrame'>
MultiIndex: 24 entries, (my-consumer-0, 2018-02-06 12:25:22.643000) to (my-consumer-3, 2018-02-06 12:30:22.780000)
Data columns (total 11 columns):
bytes-consumed-rate        24 non-null float64
fetch-latency-avg          24 non-null float64
fetch-latency-max          24 non-null float64
fetch-rate                 24 non-null float64
fetch-size-avg             24 non-null float64
fetch-size-max             24 non-null float64
fetch-throttle-time-avg    24 non-null float64
fetch-throttle-time-max    24 non-null float64
records-consumed-rate      24 non-null float64
records-lag-max            24 non-null float64
records-per-request-avg    24 non-null float64
dtypes: float64(11)
memory usage: 2.3+ KB


In [11]:
# Peek at three randomly chosen rows of the wide frame. No random_state is
# passed, so the rows shown will differ from run to run.
df2.sample(3)

Unnamed: 0_level_0,metric,bytes-consumed-rate,fetch-latency-avg,fetch-latency-max,fetch-rate,fetch-size-avg,fetch-size-max,fetch-throttle-time-avg,fetch-throttle-time-max,records-consumed-rate,records-lag-max,records-per-request-avg
consumer,when,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1
my-consumer-0,2018-02-06 12:25:22.643,2251930.0,230.475352,604.0,8.668844,259756.957746,1033454.0,0.0,0.0,8328.419253,4118.0,960.788732
my-consumer-2,2018-02-06 12:27:22.714,11.08134,501.810606,503.0,4.018387,2.757576,364.0,0.0,0.0,0.03044,0.0,0.007576
my-consumer-3,2018-02-06 12:26:22.682,3952981.0,254.105691,505.0,7.828661,504922.108,1045024.0,0.0,0.0,12668.253123,6088112.0,1618.344


In [12]:
# Persist the wide frame as CSV. With to_csv's default index=True, the index
# levels are written as the leading columns.
df2.to_csv('kafka-consumer-metrics.grouped.txt')