Skip to content

Commit

Permalink
Initial storage implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
arp242 committed Mar 26, 2020
1 parent 3e193ea commit f1a4acb
Show file tree
Hide file tree
Showing 17 changed files with 336 additions and 160 deletions.
49 changes: 26 additions & 23 deletions cron/browser_stat.go
Expand Up @@ -6,7 +6,6 @@ package cron

import (
"context"
"database/sql"
"strings"

"github.com/mssola/user_agent"
Expand All @@ -27,10 +26,11 @@ func updateBrowserStats(ctx context.Context, hits []goatcounter.Hit) error {
return zdb.TX(ctx, func(ctx context.Context, tx zdb.DB) error {
// Group by day + browser.
type gt struct {
count int
day string
browser string
version string
count int
countUnique int
day string
browser string
version string
}
grouped := map[string]gt{}
for _, h := range hits {
Expand All @@ -47,21 +47,24 @@ func updateBrowserStats(ctx context.Context, hits []goatcounter.Hit) error {
v.browser = browser
v.version = version
var err error
v.count, err = existingBrowserStats(ctx, tx, h.Site, day, v.browser, v.version)
v.count, v.countUnique, err = existingBrowserStats(ctx, tx, h.Site, day, v.browser, v.version)
if err != nil {
return err
}
}

v.count += 1
if h.StartedSession {
v.countUnique += 1
}
grouped[k] = v
}

siteID := goatcounter.MustGetSite(ctx).ID
ins := bulk.NewInsert(ctx, tx,
"browser_stats", []string{"site", "day", "browser", "version", "count"})
"browser_stats", []string{"site", "day", "browser", "version", "count", "count_unique"})
for _, v := range grouped {
ins.Values(siteID, v.day, v.browser, v.version, v.count)
ins.Values(siteID, v.day, v.browser, v.version, v.count, v.countUnique)
}
return ins.Finish()
})
Expand All @@ -70,26 +73,26 @@ func updateBrowserStats(ctx context.Context, hits []goatcounter.Hit) error {
func existingBrowserStats(
txctx context.Context, tx zdb.DB, siteID int64,
day, browser, version string,
) (int, error) {
) (int, int, error) {

var c int
err := tx.GetContext(txctx, &c,
`select count from browser_stats where site=$1 and day=$2 and browser=$3 and version=$4`,
var c []struct {
Count int `db:"count"`
CountUnique int `db:"count_unique"`
}
err := tx.SelectContext(txctx, &c,
`select count, count_unique from browser_stats where site=$1 and day=$2 and browser=$3 and version=$4 limit 1`,
siteID, day, browser, version)
if err != nil && err != sql.ErrNoRows {
return 0, errors.Wrap(err, "existing")
if err != nil {
return 0, 0, errors.Wrap(err, "select")
}

if err != sql.ErrNoRows {
_, err = tx.ExecContext(txctx,
`delete from browser_stats where site=$1 and day=$2 and browser=$3 and version=$4`,
siteID, day, browser, version)
if err != nil {
return 0, errors.Wrap(err, "delete")
}
if len(c) == 0 {
return 0, 0, nil
}

return c, nil
_, err = tx.ExecContext(txctx,
`delete from browser_stats where site=$1 and day=$2 and browser=$3 and version=$4`,
siteID, day, browser, version)
return c[0].Count, c[0].CountUnique, errors.Wrap(err, "delete")
}

func getBrowser(uaHeader string) (string, string) {
Expand Down
13 changes: 13 additions & 0 deletions cron/cron.go
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/pkg/errors"
"zgo.at/goatcounter"
"zgo.at/goatcounter/acme"
"zgo.at/goatcounter/cfg"
"zgo.at/utils/syncutil"
"zgo.at/zdb"
"zgo.at/zhttp/ctxkey"
Expand All @@ -31,6 +32,7 @@ var tasks = []task{
{DataRetention, 1 * time.Hour},
{renewACME, 2 * time.Hour},
{vacuumDeleted, 12 * time.Hour},
{clearSessions, 1 * time.Minute},
}

var (
Expand Down Expand Up @@ -281,3 +283,14 @@ func vacuumDeleted(ctx context.Context) error {
}
return nil
}

func clearSessions(ctx context.Context) error {
var query string
if cfg.PgSQL {
query = `update sessions set hash=null where last_seen > now() + interval '1 hour'`
} else {
query = `update sessions set hash=null where last_seen > datetime(datetime(), '+1 hours')`
}
_, err := zdb.MustGet(ctx).ExecContext(ctx, query)
return err
}
8 changes: 4 additions & 4 deletions cron/cron_test.go
Expand Up @@ -32,10 +32,10 @@ func TestDataRetention(t *testing.T) {
past := now.Add(-40 * 24 * time.Hour)

gctest.StoreHits(ctx, t, []goatcounter.Hit{
{Site: site.ID, CreatedAt: now, Path: "/a"},
{Site: site.ID, CreatedAt: now, Path: "/a"},
{Site: site.ID, CreatedAt: past, Path: "/a"},
{Site: site.ID, CreatedAt: past, Path: "/a"},
{Session: 1, Site: site.ID, CreatedAt: now, Path: "/a"},
{Session: 1, Site: site.ID, CreatedAt: now, Path: "/a"},
{Session: 1, Site: site.ID, CreatedAt: past, Path: "/a"},
{Session: 1, Site: site.ID, CreatedAt: past, Path: "/a"},
}...)

err = DataRetention(ctx)
Expand Down
52 changes: 27 additions & 25 deletions cron/hit_stat.go
Expand Up @@ -6,7 +6,6 @@ package cron

import (
"context"
"database/sql"
"strconv"

"github.com/pkg/errors"
Expand All @@ -29,10 +28,11 @@ func updateHitStats(ctx context.Context, hits []goatcounter.Hit) error {
return zdb.TX(ctx, func(ctx context.Context, tx zdb.DB) error {
// Group by day + path.
type gt struct {
count []int
day string
path string
title string
count []int
countUnique []int
day string
path string
title string
}
grouped := map[string]gt{}
for _, h := range hits {
Expand All @@ -47,7 +47,7 @@ func updateHitStats(ctx context.Context, hits []goatcounter.Hit) error {
v.day = day
v.path = h.Path
var err error
v.count, v.title, err = existingHitStats(ctx, tx, h.Site, day, v.path)
v.count, v.countUnique, v.title, err = existingHitStats(ctx, tx, h.Site, day, v.path)
if err != nil {
return err
}
Expand All @@ -57,16 +57,20 @@ func updateHitStats(ctx context.Context, hits []goatcounter.Hit) error {
v.title = h.Title
}

h, _ := strconv.ParseInt(h.CreatedAt.Format("15"), 10, 8)
v.count[h] += 1
hour, _ := strconv.ParseInt(h.CreatedAt.Format("15"), 10, 8)
v.count[hour] += 1
if h.StartedSession {
v.count[hour] += 1
}
grouped[k] = v
}

siteID := goatcounter.MustGetSite(ctx).ID
ins := bulk.NewInsert(ctx, tx,
"hit_stats", []string{"site", "day", "path", "title", "stats"})
"hit_stats", []string{"site", "day", "path", "title", "stats", "stats_unique"})
for _, v := range grouped {
ins.Values(siteID, v.day, v.path, v.title, jsonutil.MustMarshal(v.count))
ins.Values(siteID, v.day, v.path, v.title, jsonutil.MustMarshal(v.count),
jsonutil.MustMarshal(v.countUnique))
}
return ins.Finish()
})
Expand All @@ -75,37 +79,35 @@ func updateHitStats(ctx context.Context, hits []goatcounter.Hit) error {
func existingHitStats(
txctx context.Context, tx zdb.DB, siteID int64,
day, path string,
) ([]int, string, error) {
) ([]int, []int, string, error) {

var ex []struct {
Stats []byte `db:"stats"`
Title string `db:"title"`
Stats []byte `db:"stats"`
StatsUnique []byte `db:"stats_unique"`
Title string `db:"title"`
}
err := tx.SelectContext(txctx, &ex,
`select stats, title from hit_stats where site=$1 and day=$2 and path=$3`,
`select stats, stats_unique, title from hit_stats where site=$1 and day=$2 and path=$3 limit 1`,
siteID, day, path)
if err != nil && err != sql.ErrNoRows {
return nil, "", errors.Wrap(err, "existingHitStats")
if err != nil {
return nil, nil, "", errors.Wrap(err, "existingHitStats")
}

if len(ex) == 0 {
return make([]int, 24), "", nil
}

if len(ex) > 1 {
return nil, "", errors.Errorf("existingHitStats: %d rows: %#v", len(ex), ex)
return make([]int, 24), make([]int, 24), "", nil
}

_, err = tx.ExecContext(txctx,
`delete from hit_stats where site=$1 and day=$2 and path=$3`,
siteID, day, path)
if err != nil {
return nil, "", errors.Wrap(err, "delete")
return nil, nil, "", errors.Wrap(err, "delete")
}

var r []int
var r, ru []int
if ex[0].Stats != nil {
jsonutil.MustUnmarshal(ex[0].Stats, &r)
jsonutil.MustUnmarshal(ex[0].StatsUnique, &ru)
}
return r, ex[0].Title, nil

return r, ru, ex[0].Title, nil
}
6 changes: 3 additions & 3 deletions cron/hit_stat_test.go
Expand Up @@ -22,9 +22,9 @@ func TestHitStats(t *testing.T) {
now := time.Date(2019, 8, 31, 14, 42, 0, 0, time.UTC)

goatcounter.Memstore.Append([]goatcounter.Hit{
{Site: site.ID, CreatedAt: now, Path: "/asd", Title: "aSd"},
{Site: site.ID, CreatedAt: now, Path: "/asd/"}, // Trailing / should be sanitized and treated identical as /asd
{Site: site.ID, CreatedAt: now, Path: "/zxc"},
{Session: 1, Site: site.ID, CreatedAt: now, Path: "/asd", Title: "aSd"},
{Session: 1, Site: site.ID, CreatedAt: now, Path: "/asd/"}, // Trailing / should be sanitized and treated identical as /asd
{Session: 1, Site: site.ID, CreatedAt: now, Path: "/zxc"},
}...)
hits, err := goatcounter.Memstore.Persist(ctx)
if err != nil {
Expand Down
47 changes: 25 additions & 22 deletions cron/location_stat.go
Expand Up @@ -6,7 +6,6 @@ package cron

import (
"context"
"database/sql"

"github.com/pkg/errors"
"zgo.at/goatcounter"
Expand All @@ -24,9 +23,10 @@ func updateLocationStats(ctx context.Context, hits []goatcounter.Hit) error {
return zdb.TX(ctx, func(ctx context.Context, tx zdb.DB) error {
// Group by day + location.
type gt struct {
count int
day string
location string
count int
countUnique int
day string
location string
}
grouped := map[string]gt{}
for _, h := range hits {
Expand All @@ -37,21 +37,24 @@ func updateLocationStats(ctx context.Context, hits []goatcounter.Hit) error {
v.day = day
v.location = h.Location
var err error
v.count, err = existingLocationStats(ctx, tx, h.Site, day, v.location)
v.count, v.countUnique, err = existingLocationStats(ctx, tx, h.Site, day, v.location)
if err != nil {
return err
}
}

v.count += 1
if h.StartedSession {
v.countUnique += 1
}
grouped[k] = v
}

siteID := goatcounter.MustGetSite(ctx).ID
ins := bulk.NewInsert(ctx, tx,
"location_stats", []string{"site", "day", "location", "count"})
"location_stats", []string{"site", "day", "location", "count", "count_unique"})
for _, v := range grouped {
ins.Values(siteID, v.day, v.location, v.count)
ins.Values(siteID, v.day, v.location, v.count, v.countUnique)
}
return ins.Finish()
})
Expand All @@ -60,24 +63,24 @@ func updateLocationStats(ctx context.Context, hits []goatcounter.Hit) error {
func existingLocationStats(
txctx context.Context, tx zdb.DB, siteID int64,
day, location string,
) (int, error) {
) (int, int, error) {

var c int
err := tx.GetContext(txctx, &c,
`select count from location_stats where site=$1 and day=$2 and location=$3`,
var c []struct {
Count int `db:"count"`
CountUnique int `db:"count_unique"`
}
err := tx.SelectContext(txctx, &c,
`select count, count_unique from location_stats where site=$1 and day=$2 and location=$3`,
siteID, day, location)
if err != nil && err != sql.ErrNoRows {
return 0, errors.Wrap(err, "existing")
if err != nil {
return 0, 0, errors.Wrap(err, "select")
}

if err != sql.ErrNoRows {
_, err = tx.ExecContext(txctx,
`delete from location_stats where site=$1 and day=$2 and location=$3`,
siteID, day, location)
if err != nil {
return 0, errors.Wrap(err, "delete")
}
if len(c) == 0 {
return 0, 0, nil
}

return c, nil
_, err = tx.ExecContext(txctx,
`delete from location_stats where site=$1 and day=$2 and location=$3`,
siteID, day, location)
return c[0].Count, c[0].CountUnique, errors.Wrap(err, "delete")
}

0 comments on commit f1a4acb

Please sign in to comment.