Use PostgreSQL to process time series data.
The idea is based on this blog post: Designing high-performance time series data tables on Amazon RDS for PostgreSQL.
- Based on asyncio; fully asynchronous
- Requires only PostgreSQL v9.5 or higher; no extra plugins or extensions need to be installed
- Uses partitioned tables to handle huge amounts of data
- Built on the SQLAlchemy async core, so it drops into existing projects painlessly
```shell
pip install pgserials
```
```python
from sqlalchemy.ext.asyncio import create_async_engine

# import paths for Bucket, Client, and Query assumed from the pgserials package
from pgserials import Bucket, Client, Query

BUCKET = Bucket('netspeed')

engine = create_async_engine(
    "postgresql+asyncpg://alexander:@127.0.0.1/brin_test", echo=True,
)

async with engine.connect() as conn:
    await Client(conn).regist_buckets(BUCKET).async_init()
```
pgserials creates partition tables ranged by time; by default, one partition per month. The async_init method creates the partition table for the current month, so you can start writing data immediately.
You can also create partition tables by hand:
```python
async with engine.connect() as conn:
    await Client(conn).regist_buckets(BUCKET).extends_table(year, month)
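To pre-create partitions for upcoming months, you can compute the `(year, month)` pairs yourself and pass each one to `extends_table`. A minimal sketch of the rollover arithmetic; the `upcoming_months` helper is our own illustration, not part of pgserials:

```python
from datetime import date

def upcoming_months(start: date, count: int):
    """Yield (year, month) pairs for `count` months starting at `start`."""
    year, month = start.year, start.month
    for _ in range(count):
        yield year, month
        month += 1
        if month > 12:   # roll over into the next year
            month = 1
            year += 1

# e.g. pre-create the next three monthly partitions:
# async with engine.connect() as conn:
#     client = Client(conn).regist_buckets(BUCKET)
#     for year, month in upcoming_months(date.today(), 3):
#         await client.extends_table(year, month)
```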
```python
import random
import time

async with engine.connect() as conn:
    client = Client(conn).regist_buckets(BUCKET)
    ts = int(time.time() * 1000)
    for i in range(10000):
        for tag in range(10):
            ts += 1000
            await client.insert(BUCKET.name, ts, tag,
                                ping=random.randint(10, 1000),
                                delay=random.randint(50, 500))
    await conn.commit()
print('insert done')
```
```python
async with engine.connect() as conn:
    client = Client(conn).regist_buckets(BUCKET)
    resp = await client.query(
        Query(BUCKET.name).range(start='-5m')
    )
    print(resp)
```
The range method accepts two keyword arguments, start and stop. You can pass a raw timestamp such as int(time.time() * 1000), or a human-readable string: 5s, 5sec, or 5seconds; 10minute, 10m, and 10min are also accepted. A minus prefix means the value is subtracted from the current timestamp.
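Conceptually, the relative-string form is unit parsing plus an offset from the current time. A rough sketch of the idea; `parse_range` is hypothetical and not pgserials' actual parser:

```python
import re
import time

# unit's first letter -> milliseconds (covers s/sec/seconds, m/min/minute, etc.)
_UNITS = {'s': 1000, 'm': 60_000, 'h': 3_600_000, 'd': 86_400_000}

def parse_range(value, now_ms=None):
    """Turn '-5m' / '10seconds' / an int timestamp into a millisecond timestamp."""
    if isinstance(value, int):
        return value                      # already a raw timestamp
    now_ms = now_ms if now_ms is not None else int(time.time() * 1000)
    m = re.fullmatch(r'(-?)(\d+)([a-z]+)', value.strip())
    if not m:
        raise ValueError(f'bad range string: {value!r}')
    sign, amount, unit = m.groups()
    ms = int(amount) * _UNITS[unit[0]]    # first letter selects the unit
    return now_ms - ms if sign == '-' else now_ms + ms
```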
You can use the fields method to restrict which fields are returned:
```python
async with engine.connect() as conn:
    client = Client(conn).regist_buckets(BUCKET)
    resp = await client.query(
        Query(BUCKET.name).range(start='-5m').fields('delay')
    )
    print(resp)
```
If you don't pass the keyword argument response_type, pgserials returns the database records directly. response_type has three options: csvResponse, arrayResponse, and ndarrayResponse.
- csvResponse returns a CSV string
- arrayResponse returns a nested list
- ndarrayResponse returns a named numpy ndarray
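To make the three shapes concrete, here is roughly what each option produces for the same rows. This is a plain-Python illustration with made-up records, not the library's own converters:

```python
import csv
import io

# sample records as (time, tag, ping, delay) rows
records = [(1700000000000, 0, 120, 80), (1700000001000, 1, 95, 60)]
header = ['time', 'tag', 'ping', 'delay']

# csvResponse: one CSV string, header first
buf = io.StringIO()
writer = csv.writer(buf)
writer.writerow(header)
writer.writerows(records)
csv_text = buf.getvalue()

# arrayResponse: a nested list
nested = [list(row) for row in records]

# ndarrayResponse: a structured ("named") numpy array, roughly:
# import numpy as np
# arr = np.array(records, dtype=[('time', 'i8'), ('tag', 'i4'),
#                                ('ping', 'i4'), ('delay', 'i4')])
```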
```python
async with engine.connect() as conn:
    client = Client(conn).regist_buckets(BUCKET)
    resp = await client.query(
        Query(BUCKET.name).range(start='-5m').window(Query.minuteWindow, Query.maxFunc)
    )
    print(resp)
```
Use the window method to specify the time window to group by and the aggregate function to apply to the data.
Time window options:
- Query.secondWindow
- Query.minuteWindow
- Query.hourWindow
- Query.dayWindow
- Query.weekWindow
Aggregate function options:
- Query.countFunc
- Query.sumFunc
- Query.maxFunc
- Query.minFunc
- Query.avgFunc
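Conceptually, a window groups rows into fixed-width time buckets and applies the aggregate within each bucket. A plain-Python equivalent of minuteWindow plus maxFunc, as our own illustration rather than the SQL pgserials actually runs:

```python
from collections import defaultdict

MINUTE_MS = 60_000

def window_max(rows, width_ms=MINUTE_MS):
    """rows: (timestamp_ms, value) pairs -> {bucket_start_ms: max value in bucket}."""
    buckets = defaultdict(list)
    for ts, value in rows:
        buckets[ts - ts % width_ms].append(value)   # floor each timestamp to its bucket start
    return {start: max(vals) for start, vals in sorted(buckets.items())}
```

Swapping `max` for `sum`, `min`, `len`, or a mean gives the other aggregate options.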