Skip to content

Commit

Permalink
- refactored snowstorm package into the snowflake package. there real…
Browse files Browse the repository at this point in the history
…ly wasn't enough code to justify the confusion of another package.

- reduced the surface area of what's exposed publicly
  • Loading branch information
savaki committed Feb 24, 2017
1 parent 8e70e07 commit 55520f9
Show file tree
Hide file tree
Showing 14 changed files with 284 additions and 278 deletions.
4 changes: 4 additions & 0 deletions .dockerignore
@@ -0,0 +1,4 @@
.idea
*.iml
*.ipr
*.iws
4 changes: 2 additions & 2 deletions Dockerfile
@@ -1,12 +1,12 @@
FROM ubuntu:14.04
FROM ubuntu:16.04
MAINTAINER Matt Ho <matt.ho@gmail.com>

ENV PORT 80
ENV GOPATH /home/go

RUN apt-get update && apt-get install -y wget
ADD . /home/go/src/github.com/savaki/snowflake
RUN wget -q -O /tmp/golang.tar.gz https://storage.googleapis.com/golang/go1.6.linux-amd64.tar.gz && \
RUN wget -q -O /tmp/golang.tar.gz https://storage.googleapis.com/golang/go1.8.linux-amd64.tar.gz && \
tar -C /usr/local -xzf /tmp/golang.tar.gz && \
/usr/local/go/bin/go install github.com/savaki/snowflake/cmd/snowflake && \
rm -rf /usr/local/go /tmp/golang.tar.gz
Expand Down
59 changes: 55 additions & 4 deletions README.md
@@ -1,10 +1,61 @@
# snowflake
Golang implementation of Twitter snowflake

## User Data
Snowflake is a go implementation of Twitter's Snowflake service. Like its namespace, Snowflake is also a network
service for generating unique ID numbers at high scale with some simple guarantees.

When launching an instance in AWS, you can use AWS userdata to initialize the server id.
[https://blog.twitter.com/2010/announcing-snowflake](https://blog.twitter.com/2010/announcing-snowflake)

## Concepts

Snowflake generates unique int64 ids that (unlike uuids) are loosely time sorted. Each id consists of:

| Bits | Field | Notes |
| :--- | :--- | :--- |
| 41 | Timestamp in MS | ~70yrs |
| 10 | Server ID | Unique Server ID |
| 13 | Sequence ID| sequence to disambiguate requests in the same ms |

## Server Usage

The simplest way to run snowflake is via docker:

```
docker run -p 80:80 -it --rm savaki/snowflake:1.3
```

To retrieve the a single id:

```
curl http://your-host-name?n=4
[152193159915372544]
```

To retrieve the N ids:

```
curl http://your-host-name?n=8
[152193295848570880,152193295848570881,152193295848570882,152193295848570883]
```
{"server-id": 1}

## Client Usage

Snowflake implements two clients, a low level client, and a high level buffered client. In most cases, you'll want to
use the buffered client. The buffered client maintains and replenishes an internal queue of ids so there should always
be one available when you need it.

```
package main
import (
"fmt"
"github.com/savaki/snowflake"
)
func main() {
client, _ := snowflake.NewClient(snowflake.WithHosts("your-host"))
buffered := snowflake.NewBufferedClient(client)
fmt.Println("id:", buffered.Id())
}
```

13 changes: 13 additions & 0 deletions _examples/client/main.go
@@ -0,0 +1,13 @@
package main

import (
"fmt"

"github.com/savaki/snowflake"
)

func main() {
client, _ := snowflake.NewClient(snowflake.WithHosts("your-host"))
buffered := snowflake.NewBufferedClient(client)
fmt.Println("id:", buffered.Id())
}
46 changes: 23 additions & 23 deletions snowstorm/client.go → buffered.go
@@ -1,4 +1,4 @@
package snowstorm
package snowflake

import (
"context"
Expand All @@ -10,35 +10,35 @@ import (
)

const (
RequestSize = 512
defaultRequestSize = 512
)

type Client struct {
factory RemoteFactory
ch chan int64
ctx context.Context
cancel func()
wg *sync.WaitGroup
n int
type BufferedClient struct {
client Client
ch chan int64
ctx context.Context
cancel func()
wg *sync.WaitGroup
n int
}

func (c *Client) Id() int64 {
func (c *BufferedClient) Id() int64 {
return <-c.ch
}

func (c *Client) spawnN(n int) {
func (c *BufferedClient) spawnN(n int) {
c.wg.Add(n)
for i := 0; i < n; i++ {
go c.spawn()
}
}

func (c *Client) spawn() {
func (c *BufferedClient) spawn() {
defer c.wg.Done()

for {
ctx, _ := context.WithTimeout(context.Background(), time.Second*3)
ids, err := c.factory.IntN(ctx, RequestSize)
ids, err := c.client.IntN(ctx, defaultRequestSize)
if err != nil {
select {
case <-c.ctx.Done():
Expand All @@ -62,22 +62,22 @@ func (c *Client) spawn() {
}
}

func (c *Client) Close() {
func (c *BufferedClient) Close() {
c.cancel()
c.wg.Wait()
}

func New(factory RemoteFactory) *Client {
func NewBufferedClient(c Client) *BufferedClient {
ctx, cancel := context.WithCancel(context.Background())
c := &Client{
ctx: ctx,
cancel: cancel,
factory: factory,
ch: make(chan int64, 4096),
wg: &sync.WaitGroup{},
bc := &BufferedClient{
ctx: ctx,
cancel: cancel,
client: c,
ch: make(chan int64, 4096),
wg: &sync.WaitGroup{},
}

c.spawnN(8)
bc.spawnN(8)

return c
return bc
}
5 changes: 2 additions & 3 deletions snowstorm/client_test.go → buffered_test.go
@@ -1,11 +1,10 @@
package snowstorm_test
package snowflake_test

import (
"context"
"testing"

"github.com/savaki/snowflake"
"github.com/savaki/snowflake/snowstorm"
)

type Remote struct {
Expand All @@ -18,7 +17,7 @@ func (t *Remote) IntN(ctx context.Context, n int) ([]int64, error) {

func TestGenerateIdStream(t *testing.T) {
buffer := 4
client := snowstorm.New(&Remote{snowflake.Default})
client := snowflake.NewBufferedClient(&Remote{snowflake.Mock})

uniques := map[int64]int64{}
iterations := buffer * 10
Expand Down
169 changes: 169 additions & 0 deletions client.go
@@ -0,0 +1,169 @@
package snowflake

import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"os"
"strconv"
"sync/atomic"
)

type Client interface {
IntN(ctx context.Context, n int) ([]int64, error)
}

type client struct {
hosts []string
hostCount int32
offset int32
doFunc func(r *http.Request) (*http.Response, error)
}

func (c *client) IntN(ctx context.Context, n int) ([]int64, error) {
host := c.hosts[int(c.offset%c.hostCount)]
atomic.AddInt32(&c.offset, 1)
if c.offset > c.hostCount {
atomic.StoreInt32(&c.offset, 0)
}

url := host + "?n=" + strconv.Itoa(n)
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return nil, err
}
req = req.WithContext(ctx)

resp, err := c.doFunc(req)
if err != nil {
fmt.Fprintf(os.Stderr, err.Error())
return nil, err
}
defer resp.Body.Close()

var ids []int64
err = json.NewDecoder(resp.Body).Decode(&ids)
return ids, err
}

func NewClient(opts ...ClientOption) (Client, error) {
h := &client{
hosts: []string{"http://snowflake.altairsix.com/10/13"},
doFunc: http.DefaultClient.Do,
}

for _, opt := range opts {
opt(h)
}

h.hostCount = int32(len(h.hosts))

// ensure the hosts are all valid
for _, host := range h.hosts {
_, err := http.NewRequest("GET", host, nil)
if err != nil {
return nil, err
}
}

return h, nil
}

type ClientOption func(*client)

func WithDoFunc(fn func(r *http.Request) (*http.Response, error)) ClientOption {
return func(h *client) {
h.doFunc = fn
}
}

func WithHosts(hosts ...string) ClientOption {
return func(h *client) {
h.hosts = hosts
}
}

func writeErr(w http.ResponseWriter, err error) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusBadRequest)
json.NewEncoder(w).Encode(map[string]interface{}{
"error": err.Error(),
})
}

func makeHandler(factory *Factory, maxN int) http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
req.ParseForm()

n := 1
if v := req.FormValue("n"); v != "" {
var err error
n, err = strconv.Atoi(v)
if err != nil {
writeErr(w, err)
return
}
if n > maxN {
writeErr(w, errors.New(fmt.Sprintf("exceeded the maximum number per request, %v", maxN)))
return
}
}

w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(factory.IdN(n))
}
}

func info(serverID int) http.Handler {
var handlerFunc http.HandlerFunc = func(w http.ResponseWriter, req *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(map[string]int{
"server-id": serverID,
})
}

return handlerFunc
}

func Multi(serverID, nMax int) http.HandlerFunc {
handlers := map[string]http.Handler{}

for srv := 1; srv <= 13; srv++ {
for seq := 0; seq <= 13; seq++ {
if srv+seq+41 > 64 {
continue
}

func(server, sequence int) {
path := fmt.Sprintf("/%v/%v", srv, seq)
factory := NewFactory(FactoryOptions{
ServerID: int64(serverID),
ServerBits: uint(server),
SequenceBits: uint(sequence),
})
handlers[path] = makeHandler(factory, nMax)
}(srv, seq)
}
}

factory := NewFactory(FactoryOptions{
ServerID: int64(serverID),
})
handlers["/"] = makeHandler(factory, nMax)
handlers["/info"] = info(serverID)

var handler http.HandlerFunc = func(w http.ResponseWriter, req *http.Request) {
handler, ok := handlers[req.URL.Path]
if !ok {
w.WriteHeader(http.StatusNotFound)
return
}

handler.ServeHTTP(w, req)
}
return handler
}

0 comments on commit 55520f9

Please sign in to comment.