# #15. New hope

1. [Golang](#Golang)
    1. [Hello, world!](#Hello,-world!)
    2. [Toolchain](#Toolchain)
    3. [Project layout](#Project-layout)
    4. [Dependency management](#Dependency-management)
    5. [All in](#All-in)
    6. [Internals](#Internals)
    7. [Examples](#Examples)
2. [Memory](#Memory)
    1. [Consumption](#Consumption)
    2. [Text lookup](#Text-lookup)
    3. [Probabilistic datastructures](#Probabilistic-datastructures)

## Golang

![layer](http://upload-images.jianshu.io/upload_images/590399-c4c0a9005db667dc.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

#### Hello, world!

In [None]:
#!/usr/bin/env python

import time


def greeting():
    '''Returns a pleasant, semi-useful greeting.'''
    return "Hello world, the time is: " + time.ctime()


def main():
    print(greeting())


if __name__ == '__main__':
    main()

In [None]:
// A "Hello World" program that prints a greeting with the current time.
package main

import (
	"fmt"
	"time"
)

// greeting returns a pleasant, semi-useful greeting.
func greeting() string {
	return "Hello world, the time is: " + time.Now().String()
}

func main() {
	fmt.Println(greeting())
}

#### Toolchain

In [13]:
! go

Go is a tool for managing Go source code.

Usage:

	go command [arguments]

The commands are:

	build       compile packages and dependencies
	clean       remove object files
	doc         show documentation for package or symbol
	env         print Go environment information
	bug         start a bug report
	fix         run go tool fix on packages
	fmt         run gofmt on package sources
	generate    generate Go files by processing source
	get         download and install packages and dependencies
	install     compile and install packages and dependencies
	list        list packages
	run         compile and run Go program
	test        test packages
	tool        run specified go tool
	version     print Go version
	vet         run go tool vet on packages

Use "go help [command]" for more information about a command.

Additional help topics:

	c           calling between Go and C
	buildmode   description of build modes
	filetype    file types
	gopath      GOP

In [12]:
! go version

go version go1.9.1 darwin/amd64


![runtime](http://upload-images.jianshu.io/upload_images/590399-6f8d750aa05f2f8e.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

#### Project layout

#### Dependency management

submodules

vgo

#### All in

In [None]:
// Single line comment
/* Multi-
 line comment */

// A package clause starts every source file.
// Main is a special name declaring an executable rather than a library.
package main

// Import declaration declares library packages referenced in this file.
import (
    "fmt"       // A package in the Go standard library.
    "io/ioutil" // Implements some I/O utility functions.
    m "math"    // Math library with local alias m.
    "net/http"  // Yes, a web server!
    "os"        // OS functions like working with the file system
    "strconv"   // String conversions.
)

// A function definition. Main is special. It is the entry point for the
// executable program. Love it or hate it, Go uses brace brackets.
func main() {
    // Println outputs a line to stdout.
    // Qualify it with the package name, fmt.
    fmt.Println("Hello world!")

    // Call another function within this package.
    beyondHello()
}

// Functions have parameters in parentheses.
// If there are no parameters, empty parentheses are still required.
func beyondHello() {
    var x int // Variable declaration. Variables must be declared before use.
    x = 3     // Variable assignment.
    // "Short" declarations use := to infer the type, declare, and assign.
    y := 4
    sum, prod := learnMultiple(x, y)        // Function returns two values.
    fmt.Println("sum:", sum, "prod:", prod) // Simple output.
    learnTypes()                            // < y minutes, learn more!
}

/* <- multiline comment
Functions can have parameters and (multiple!) return values.
Here `x`, `y` are the parameters and `sum`, `prod` are the named results.
Note that `x` shares the type `int` with `y`, and `sum` shares it with `prod`.
*/
func learnMultiple(x, y int) (sum, prod int) {
    return x + y, x * y // Return two values.
}

// Some built-in types and literals.
func learnTypes() {
    // Short declaration usually gives you what you want.
    str := "Learn Go!" // string type.

    s2 := `A "raw" string literal
can include line breaks.` // Same string type.

    // Non-ASCII literal. Go source is UTF-8.
    g := 'Σ' // rune type, an alias for int32, holds a unicode code point.

    f := 3.14159 // float64, an IEEE-754 64-bit floating point number.
    c := 3 + 4i  // complex128, represented internally with two float64's.

    // var syntax with initializers.
    var u uint = 7 // Unsigned, but implementation dependent size as with int.
    var pi float32 = 22. / 7

    // Conversion syntax with a short declaration.
    n := byte('\n') // byte is an alias for uint8.

    // Arrays have size fixed at compile time.
    var a4 [4]int           // An array of 4 ints, initialized to all 0.
    a5 := [...]int{3, 1, 5, 10, 100} // An array initialized with a fixed size of five
    // elements, with values 3, 1, 5, 10, and 100.

    // Slices have dynamic size. Arrays and slices each have advantages
    // but use cases for slices are much more common.
    s3 := []int{4, 5, 9}    // Compare to a5. No ellipsis here.
    s4 := make([]int, 4)    // Allocates slice of 4 ints, initialized to all 0.
    var d2 [][]float64      // Declaration only, nothing allocated here.
    bs := []byte("a slice") // Type conversion syntax.

    // Because they are dynamic, slices can be appended to on-demand.
    // To append elements to a slice, the built-in append() function is used.
    // First argument is a slice to which we are appending. Commonly,
    // the slice variable is updated in place, as in the example below.
    s := []int{1, 2, 3}     // Result is a slice of length 3.
    s = append(s, 4, 5, 6)  // Added 3 elements. Slice now has length of 6.
    fmt.Println(s) // Updated slice is now [1 2 3 4 5 6]

    // To append another slice, instead of list of atomic elements we can
    // pass a reference to a slice or a slice literal like this, with a
    // trailing ellipsis, meaning take a slice and unpack its elements,
    // appending them to slice s.
    s = append(s, []int{7, 8, 9}...) // Second argument is a slice literal.
    fmt.Println(s)  // Updated slice is now [1 2 3 4 5 6 7 8 9]

    p, q := learnMemory() // Declares p, q to be type pointer to int.
    fmt.Println(*p, *q)   // * follows a pointer. This prints two ints.

    // Maps are a dynamically growable associative array type, like the
    // hash or dictionary types of some other languages.
    m := map[string]int{"three": 3, "four": 4}
    m["one"] = 1

    // Unused variables are an error in Go.
    // The underscore lets you "use" a variable but discard its value.
    _, _, _, _, _, _, _, _, _, _ = str, s2, g, f, u, pi, n, a5, s4, bs
    // Usually you use it to ignore one of the return values of a function
    // For example, in a quick and dirty script you might ignore the
    // error value returned from os.Create, and expect that the file
    // will always be created.
    file, _ := os.Create("output.txt")
    fmt.Fprint(file, "This is how you write to a file, by the way")
    file.Close()

    // Output of course counts as using a variable.
    fmt.Println(s, c, a4, s3, d2, m)

    learnFlowControl() // Back in the flow.
}

// Unlike in many other languages, functions in Go can have named return values.
// Naming the result in the function declaration lets us return from multiple
// points in the function and use a bare return keyword with nothing after it.
func learnNamedReturns(x, y int) (z int) {
    z = x * y
    return // z is implicit here, because we named it earlier.
}

// Go is fully garbage collected. It has pointers but no pointer arithmetic.
// You can make a mistake with a nil pointer, but not by incrementing a pointer.
func learnMemory() (p, q *int) {
    // Named return values p and q have type pointer to int.
    p = new(int) // Built-in function new allocates memory.
    // The allocated int is initialized to 0, p is no longer nil.
    s := make([]int, 20) // Allocate 20 ints as a single block of memory.
    s[3] = 7             // Assign one of them.
    r := -2              // Declare another local variable.
    return &s[3], &r     // & takes the address of an object.
}

func expensiveComputation() float64 {
    return m.Exp(10)
}

func learnFlowControl() {
    // If statements require brace brackets, and do not require parentheses.
    if true {
        fmt.Println("told ya")
    }
    // Formatting is standardized by the command line command "go fmt."
    if false {
        // Pout.
    } else {
        // Gloat.
    }
    // Use switch in preference to chained if statements.
    x := 42.0
    switch x {
    case 0:
    case 1:
    case 42:
        // Cases don't "fall through".
        /*
        There is a `fallthrough` keyword however, see:
          https://github.com/golang/go/wiki/Switch#fall-through
        */
    case 43:
        // Unreached.
    default:
        // Default case is optional.
    }
    // Like if, for doesn't use parens either.
    // Variables declared in for and if are local to their scope.
    for x := 0; x < 3; x++ { // ++ is a statement.
        fmt.Println("iteration", x)
    }
    // x == 42 here.

    // For is the only loop statement in Go, but it has alternate forms.
    for { // Infinite loop.
        break    // Just kidding.
        continue // Unreached.
    }

    // You can use range to iterate over an array, a slice, a string, a map, or a channel.
    // range returns one (channel) or two values (array, slice, string and map).
    for key, value := range map[string]int{"one": 1, "two": 2, "three": 3} {
        // for each pair in the map, print key and value
        fmt.Printf("key=%s, value=%d\n", key, value)
    }
    // If you only need the value, use the underscore as the key
    for _, name := range []string{"Bob", "Bill", "Joe"} {
        fmt.Printf("Hello, %s\n", name)
    }

    // As with for, := in an if statement means to declare and assign
    // y first, then test y > x.
    if y := expensiveComputation(); y > x {
        x = y
    }
    // Function literals are closures.
    xBig := func() bool {
        return x > 10000 // References x declared above switch statement.
    }
    x = 99999
    fmt.Println("xBig:", xBig()) // true
    x = 1.3e3                    // This makes x == 1300
    fmt.Println("xBig:", xBig()) // false now.

    // What's more is function literals may be defined and called inline,
    // acting as an argument to function, as long as:
    // a) function literal is called immediately (),
    // b) result type matches expected type of argument.
    fmt.Println("Add + double two numbers: ",
        func(a, b int) int {
            return (a + b) * 2
        }(10, 2)) // Called with args 10 and 2
    // => Add + double two numbers: 24

    // When you need it, you'll love it.
    goto love
love:

    learnFunctionFactory() // func returning func is fun(3)(3)
    learnDefer()      // A quick detour to an important keyword.
    learnInterfaces() // Good stuff coming up!
}

func learnFunctionFactory() {
    // Next two are equivalent, with second being more practical
    fmt.Println(sentenceFactory("summer")("A beautiful", "day!"))

    d := sentenceFactory("summer")
    fmt.Println(d("A beautiful", "day!"))
    fmt.Println(d("A lazy", "afternoon!"))
}

// Decorators are common in other languages. Same can be done in Go
// with function literals that accept arguments.
func sentenceFactory(mystring string) func(before, after string) string {
    return func(before, after string) string {
        return fmt.Sprintf("%s %s %s", before, mystring, after) // new string
    }
}

func learnDefer() (ok bool) {
    // Deferred statements are executed just before the function returns.
    defer fmt.Println("deferred statements execute in reverse (LIFO) order.")
    defer fmt.Println("\nThis line is being printed first because")
    // Defer is commonly used to close a file, so the function closing the
    // file stays close to the function opening the file.
    return true
}

// Define Stringer as an interface type with one method, String.
type Stringer interface {
    String() string
}

// Define pair as a struct with two fields, ints named x and y.
type pair struct {
    x, y int
}

// Define a method on type pair. pair now implements Stringer because it defines every method in the interface.
func (p pair) String() string { // p is called the "receiver"
    // Sprintf is another public function in package fmt.
    // Dot syntax references fields of p.
    return fmt.Sprintf("(%d, %d)", p.x, p.y)
}

func learnInterfaces() {
    // Brace syntax is a "struct literal". It evaluates to an initialized
    // struct. The := syntax declares and initializes p to this struct.
    p := pair{3, 4}
    fmt.Println(p.String()) // Call String method of p, of type pair.
    var i Stringer          // Declare i of interface type Stringer.
    i = p                   // Valid because pair implements Stringer
    // Call String method of i, of type Stringer. Output same as above.
    fmt.Println(i.String())

    // Functions in the fmt package call the String method to ask an object
    // for a printable representation of itself.
    fmt.Println(p) // Output same as above. Println calls String method.
    fmt.Println(i) // Output same as above.

    learnVariadicParams("great", "learning", "here!")
}

// Functions can have variadic parameters.
func learnVariadicParams(myStrings ...interface{}) {
    // Iterate each value of the variadic.
    // The underbar here is ignoring the index argument of the array.
    for _, param := range myStrings {
        fmt.Println("param:", param)
    }

    // Pass variadic value as a variadic parameter.
    fmt.Println("params:", fmt.Sprintln(myStrings...))

    learnErrorHandling()
}

func learnErrorHandling() {
    // ", ok" idiom used to tell if something worked or not.
    m := map[int]string{3: "three", 4: "four"}
    if x, ok := m[1]; !ok { // ok will be false because 1 is not in the map.
        fmt.Println("no one there")
    } else {
        fmt.Print(x) // x would be the value, if it were in the map.
    }
    // An error value communicates not just "ok" but more about the problem.
    if _, err := strconv.Atoi("non-int"); err != nil { // _ discards value
        // prints 'strconv.ParseInt: parsing "non-int": invalid syntax'
        fmt.Println(err)
    }
    // We'll revisit interfaces a little later. Meanwhile,
    learnConcurrency()
}

// c is a channel, a concurrency-safe communication object.
func inc(i int, c chan int) {
    c <- i + 1 // <- is the "send" operator when a channel appears on the left.
}

// We'll use inc to increment some numbers concurrently.
func learnConcurrency() {
    // Same make function used earlier to make a slice. Make allocates and
    // initializes slices, maps, and channels.
    c := make(chan int)
    // Start three concurrent goroutines. Numbers will be incremented
    // concurrently, perhaps in parallel if the machine is capable and
    // properly configured. All three send to the same channel.
    go inc(0, c) // go is a statement that starts a new goroutine.
    go inc(10, c)
    go inc(-805, c)
    // Read three results from the channel and print them out.
    // There is no telling in what order the results will arrive!
    fmt.Println(<-c, <-c, <-c) // channel on right, <- is "receive" operator.

    cs := make(chan string)       // Another channel, this one handles strings.
    ccs := make(chan chan string) // A channel of string channels.
    go func() { c <- 84 }()       // Start a new goroutine just to send a value.
    go func() { cs <- "wordy" }() // Again, for cs this time.
    // Select has syntax like a switch statement but each case involves
    // a channel operation. It selects a case at random out of the cases
    // that are ready to communicate.
    select {
    case i := <-c: // The value received can be assigned to a variable,
        fmt.Printf("it's a %T", i)
    case <-cs: // or the value received can be discarded.
        fmt.Println("it's a string")
    case <-ccs: // Empty channel, not ready for communication.
        fmt.Println("didn't happen.")
    }
    // At this point a value was taken from either c or cs. One of the two
    // goroutines started above has completed, the other will remain blocked.

    learnWebProgramming() // Go does it. You want to do it too.
}

// A single function from package http starts a web server.
func learnWebProgramming() {

    // First parameter of ListenAndServe is TCP address to listen to.
    // Second parameter is an interface, specifically http.Handler.
    go func() {
        err := http.ListenAndServe(":8080", pair{})
        fmt.Println(err) // don't ignore errors
    }()

    requestServer()
}

// Make pair an http.Handler by implementing its only method, ServeHTTP.
func (p pair) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    // Serve data with a method of http.ResponseWriter.
    w.Write([]byte("You learned Go in Y minutes!"))
}

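func requestServer() {
    resp, err := http.Get("http://localhost:8080")
    if err != nil {
        fmt.Println(err) // resp is nil on error, so stop here.
        return
    }
    defer resp.Body.Close()
    body, err := ioutil.ReadAll(resp.Body)
    if err != nil {
        fmt.Println(err)
        return
    }
    fmt.Printf("\nWebserver said: `%s`", string(body))
}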

#### Internals

__Goroutine__

Goroutines are cooperatively scheduled by the Go runtime rather than time-sliced by the kernel.
In the Go version used here (1.9), a switch between goroutines happens only at well-defined points where the code calls into the runtime scheduler; Go 1.14 later added asynchronous preemption.
Because these points are known, the compiler knows which registers are in use and saves them automatically.

A goroutine may yield to others at the following points:
* Channel send and receive operations, if those operations would block.
* The `go` statement, although there is no guarantee that the new goroutine will be scheduled immediately.
* Blocking syscalls like file and network operations.
* After being stopped for a garbage collection cycle.
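
The yield points listed above can be observed with a small sketch. Two goroutines hand a counter back and forth over unbuffered channels, so every send and receive blocks until the other side is ready. The names `pingPong`, `ping` and `pong` are hypothetical and chosen only for this illustration.

```go
package main

import "fmt"

// pingPong bounces a counter between two goroutines over unbuffered
// channels. Every send and receive blocks until the other side is ready,
// so each one is a point where the scheduler may switch goroutines.
func pingPong(rounds int) []string {
	ping := make(chan int)
	pong := make(chan int)
	var events []string

	// The go statement starts the second goroutine; it is not
	// guaranteed to run immediately.
	go func() {
		for n := range ping {
			pong <- n + 1 // blocks until the caller receives
		}
		close(pong)
	}()

	for i := 0; i < rounds; i++ {
		ping <- i       // blocks until the worker receives
		reply := <-pong // blocks until the worker sends
		events = append(events, fmt.Sprintf("sent %d, got %d", i, reply))
	}
	close(ping)
	<-pong // wait for the worker to close pong and exit
	return events
}

func main() {
	for _, e := range pingPong(3) {
		fmt.Println(e)
	}
}
```

Only the calling goroutine appends to `events`, so the slice needs no lock; the channels alone order the two goroutines.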

![gor](https://dave.cheney.net/wp-content/uploads/2014/06/Gocon-2014-35.jpg)

![block](https://dave.cheney.net/wp-content/uploads/2014/06/Gocon-2014-36.jpg)

![stack](https://dave.cheney.net/wp-content/uploads/2014/06/Gocon-2014-45.jpg)

__Scheduler__

The Go scheduler uses the following terminology for goroutines, threads and processors:
* G: goroutine
* M: OS thread (machine)
* P: processor

![sched](https://rakyll.org/img/scheduler-concepts.png)

Each round of scheduling finds a runnable goroutine and executes it. The search proceeds in the following order:

In [None]:
runtime.schedule() {
    // only 1/61 of the time, check the global runnable queue for a G.
    // if not found, check the local queue.
    // if not found,
    //     try to steal from other Ps.
    //     if not, check the global runnable queue.
    //     if not found, poll network.
}

Once a runnable G is found, it runs until it blocks.
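
The search order can be sketched as a toy model. This is not the runtime's code: `proc`, `sched`, `pop` and `findRunnable` are hypothetical names, goroutines are plain integer IDs, stealing takes a single goroutine instead of a batch, and the network poll step is left out.

```go
package main

import "fmt"

// proc is a toy stand-in for a P: it only holds a local run queue
// of goroutine IDs.
type proc struct {
	local []int
}

// sched is a toy stand-in for the scheduler state.
type sched struct {
	global []int
	procs  []*proc
	tick   int
}

// pop removes and returns the first element of a queue.
func pop(q *[]int) (int, bool) {
	if len(*q) == 0 {
		return 0, false
	}
	g := (*q)[0]
	*q = (*q)[1:]
	return g, true
}

// findRunnable follows the search order described in the text and
// reports where the goroutine was found.
func (s *sched) findRunnable(p *proc) (int, string) {
	s.tick++
	// Once every 61 rounds, look at the global queue first so that
	// busy local queues cannot starve it.
	if s.tick%61 == 0 {
		if g, ok := pop(&s.global); ok {
			return g, "global (fairness check)"
		}
	}
	if g, ok := pop(&p.local); ok {
		return g, "local"
	}
	for _, other := range s.procs {
		if other == p {
			continue
		}
		if g, ok := pop(&other.local); ok {
			return g, "stolen"
		}
	}
	if g, ok := pop(&s.global); ok {
		return g, "global"
	}
	// The real scheduler would poll the network here.
	return -1, "idle"
}

func main() {
	p0 := &proc{local: []int{1}}
	p1 := &proc{local: []int{2, 3}}
	s := &sched{global: []int{4}, procs: []*proc{p0, p1}}
	for i := 0; i < 5; i++ {
		g, from := s.findRunnable(p0)
		fmt.Println(g, from)
	}
}
```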

__Optimizations__

Inlining

In [None]:
package util
// Max returns the larger of a or b.
func Max(a, b int) int {
    if a > b {
        return a
    }
    return b
}

In [None]:
package main
import "util"
// Double returns twice the value of the larger of a or b.
func Double(a, b int) int { return 2 * util.Max(a, b) }

![inline](https://dave.cheney.net/wp-content/uploads/2014/06/Gocon-2014-19.jpg)
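
As a rough illustration of what inlining does, the sketch below places `Max` and `Double` in one package and adds a hand-written `DoubleInlined` that approximates `Double` after the call to `Max` is replaced by its body. `DoubleInlined` is a hypothetical name; the compiler's real output is machine code, not Go source.

```go
package main

import "fmt"

// Max returns the larger of a or b. It is small and has no loops,
// which makes it a typical inlining candidate.
func Max(a, b int) int {
	if a > b {
		return a
	}
	return b
}

// Double returns twice the value of the larger of a or b.
func Double(a, b int) int { return 2 * Max(a, b) }

// DoubleInlined approximates Double after the call to Max has been
// replaced by the body of Max (written by hand for illustration).
func DoubleInlined(a, b int) int {
	temp := b
	if a > b {
		temp = a
	}
	return 2 * temp
}

func main() {
	fmt.Println(Double(3, 7), DoubleInlined(3, 7)) // prints "14 14"
}
```

Building with `go build -gcflags=-m` asks the compiler to print its optimization decisions, including which functions it considers inlinable; the exact messages vary between Go versions.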

Dead code elimination

In [None]:
func Expensive() {
    if false {
        // something expensive is now unreachable
    }
}
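
A common practical form of the same idea, shown here as a minimal sketch, is a branch guarded by a compile-time constant. The names `debug` and `sum` are hypothetical. Because `debug` is a constant `false`, the compiler can prove the logging branch is never taken and is free to drop it.

```go
package main

import "fmt"

// debug is a compile-time constant, so the compiler can prove that
// branches guarded by it are never taken.
const debug = false

// sum adds the values. The logging branch is dead code while debug
// is false.
func sum(values []int) int {
	total := 0
	for _, v := range values {
		if debug {
			fmt.Println("adding", v)
		}
		total += v
	}
	return total
}

func main() {
	fmt.Println(sum([]int{1, 2, 3})) // prints "6"
}
```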

Escape analysis
* Determines whether any references to a value escape the function where the value is declared.
* If no references escape, the value may be safely stored on the stack.
* Values stored on the stack need no heap allocation and are released automatically when the function returns.

![escape](https://dave.cheney.net/wp-content/uploads/2014/06/Gocon-2014-27.jpg)
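
The two cases can be contrasted in a short sketch. The type `point` and the functions `sumLocal` and `newPoint` are hypothetical names used only for this example.

```go
package main

import "fmt"

type point struct{ x, y int }

// sumLocal keeps p inside the function: no reference to it leaves,
// so the compiler is free to place it on the stack.
func sumLocal(x, y int) int {
	p := point{x, y}
	return p.x + p.y
}

// newPoint returns the address of p, so the reference outlives the
// call. The value escapes and is typically allocated on the heap.
func newPoint(x, y int) *point {
	p := point{x, y}
	return &p
}

func main() {
	fmt.Println(sumLocal(1, 2)) // prints "3"
	q := newPoint(3, 4)
	fmt.Println(q.x + q.y) // prints "7"
}
```

Building with `go build -gcflags=-m` prints the compiler's escape analysis decisions. What it reports for `p` in `newPoint` depends on the compiler version and on whether the call is inlined.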

__Garbage collector__

![gc](https://talks.golang.org/2015/state-of-go-may/gc.png)

#### Examples

Echo server

In [None]:
package main

import (
    "fmt"
    "github.com/cmu440/p0"
    "time"
)

const defaultPort = 9999

func main() {
    // Initialize the server.
    server := p0.New()
    if server == nil {
        fmt.Println("New() returned a nil server. Exiting...")
        return
    }

    // Start the server and continue listening for client connections in the background.
    if err := server.Start(defaultPort); err != nil {
        fmt.Printf("MultiEchoServer could not be started: %s\n", err)
        return
    }

    fmt.Printf("Started MultiEchoServer on port %d...\n", defaultPort)

    for {
        fmt.Printf("num clients: %d\n", server.Count())
        time.Sleep(1 * time.Second)
    }
    server.Close()
}


In [None]:
// Implementation of a MultiEchoServer.

package p0

import (
    "bufio"
    "fmt"
    "net"
)

const (
    clientSendQueueLen = 100
)

type multiEchoServer struct {
    clients     map[string]*Client
    quitChan    chan chan bool
    newConnChan chan net.Conn
    recieveChan chan *Packet
}

type Client struct {
    conn        net.Conn
    recieveChan chan<- *Packet
    sendChan    chan string
    quitChan    chan bool
}

type Packet struct {
    from    net.Addr
    message string
    err     error
}

// New creates and returns (but does not start) a new multiEchoServer.
// The return type matches the concrete type that is actually constructed.
func New() *multiEchoServer {
    return &multiEchoServer{make(map[string]*Client), make(chan chan bool), make(chan net.Conn), make(chan *Packet)}
}

func recieve(conn net.Conn, recieveChan chan<- *Packet) {
    reader := bufio.NewReader(conn)
    for {
        msgBytes, err := reader.ReadBytes('\n')
        if err != nil {
            recieveChan <- &Packet{from: conn.RemoteAddr(), err: err}
            return
        }
        recieveChan <- &Packet{from: conn.RemoteAddr(), message: string(msgBytes)}
    }
}

func send(conn net.Conn, response string) {
    if _, err := conn.Write([]byte(response)); err != nil {
        return
    }
}

func handle(cli *Client) {
    go recieve(cli.conn, cli.recieveChan)
    for {
        select {
        case <-cli.quitChan:
            cli.conn.Close()
            return
        case m := <-cli.sendChan:
            send(cli.conn, m)
        }
    }
}

func accept(listener *net.TCPListener, newConnChan chan<- net.Conn) {
    for {
        conn, err := listener.Accept()
        if err != nil {
            fmt.Printf("Accept error: %s\n", err)
            return
        }
        newConnChan <- conn
    }
}

func (mes *multiEchoServer) Serve(listener *net.TCPListener) {
    go accept(listener, mes.newConnChan)
    for {
        select {
        case conn := <-mes.newConnChan:
            cli := &Client{conn, mes.recieveChan, make(chan string, clientSendQueueLen), make(chan bool)}
            mes.clients[conn.RemoteAddr().String()] = cli // it's a race here, but I don't mind actually
            go handle(cli)
        case packet := <-mes.recieveChan:
            if packet.err != nil {
                delete(mes.clients, packet.from.String())
                fmt.Printf("Connection read error: %s\n", packet.err)
                continue
            }
            for _, c := range mes.clients {
                select{
                case c.sendChan <- packet.message:
                    continue
                default:
                    continue
                }
            }
        case exitChan := <-mes.quitChan:
            listener.Close()
            for _, c := range mes.clients {
                c.quitChan <- true
                delete(mes.clients, c.conn.RemoteAddr().String())
            }
            exitChan <- true
            return
        }
    }
}

func (mes *multiEchoServer) Start(port int) error {
    var addr *net.TCPAddr = &net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: port}
    listener, err := net.ListenTCP("tcp", addr)
    if err != nil {
        fmt.Printf("Listen error: %s\n", err)
        return err
    }
    go mes.Serve(listener)
    return nil
}

func (mes *multiEchoServer) Close() {
    exitChan := make(chan bool)
    mes.quitChan <- exitChan
    <-exitChan
}

func (mes *multiEchoServer) Count() int {
    return len(mes.clients)
}


Download files from Hadoop

In [None]:
package main

import (
    "bufio"
    "bytes"
    "flag"
    "fmt"
    "io"
    "io/ioutil"
    "log"
    "net/http"
    "net/url"
    "os"
    "path"
    "runtime"
    "strings"
    "sync"
    "time"

    // ... skipped
    "go.uber.org/zap"
)

import _ "net/http/pprof"


// Config holds the entire configuration.
type Config struct {
    Logging    *logging.Config `toml:"logging"`
    LocalRoot  string          `toml:"local-root"`
    RemoteRoot string          `toml:"remote-root"`
    UseMirrors bool            `toml:"use-hdfs-proxy-mirrors"`
    Workers    int             `toml:"workers"`
    Pprof      string          `toml:"pprof"`
    MaxCPU     int             `toml:"max-cpu"` // how many CPU cores may be used while running
}

func newConfig() *Config {
    loggingConfig := logging.NewConfig()
    loggingConfig.Logfile = "/var/log/user-data/user-data.log"

    return &Config{
        Logging:    loggingConfig,
        LocalRoot:  "/data/user_data_fix/",
        RemoteRoot: "http://hadoop.fake.domain.ru:4242/data/score/user_data/",
        Workers:    10,
        Pprof:      ":9494",
        MaxCPU:     1,
    }
}

// Job holds the state of a task while it is being processed.
type Job struct {
    sync.RWMutex
    wg         *sync.WaitGroup
    doneOnce   sync.Once
    Attempt    int
    Name       string
    URL        string
    SourceTime int64
    Error      error
    isSuccess  bool
    DataRoot   string
    logger     *zap.Logger
}

// Done is an alias for wg.Done. Call it once work on the job has finished.
func (job *Job) Done() {
    job.doneOnce.Do(func() {
        job.wg.Done()
    })
}

// IsFinished reports whether the job is finished: it either succeeded or ran out of attempts.
func (job *Job) IsFinished() bool {
    job.RLock()
    defer job.RUnlock()

    if job.isSuccess {
        return true
    }

    if job.Attempt > 5 { // @TODO: Config?
        return true
    }

    return false
}

// Fail is called when an attempt fails.
// It increments the attempt counter and records the error.
func (job *Job) Fail(err error) {
    job.Lock()
    defer job.Unlock()

    job.Attempt++
    job.Error = err
}

// IsUpdated checks whether the file still needs to be downloaded.
// It compares the mtime of the file in Hadoop with the mtime of the local marker file
// updated/part-00000.gz-mtime and returns true when they match.
func (job *Job) IsUpdated() bool {
    p, _ := url.Parse(job.URL)
    _, file := path.Split(p.Path)

    mtimeFile := path.Join(job.DataRoot, "updated", fmt.Sprintf("%s-mtime", file))

    stat, err := os.Stat(mtimeFile)

    if err != nil {
        return false
    }

    if stat.ModTime().Unix() == job.SourceTime/1000 {
        return true
    }

    return false
}

// Success marks the job as completed. This also requires creating the mtime marker file.
func (job *Job) Success() {
    job.Lock()
    defer job.Unlock()

    job.Error = nil
    job.isSuccess = true

    // Create a local file whose mtime matches the file in Hadoop,
    // as a marker that it has been downloaded and need not be fetched again.
    p, _ := url.Parse(job.URL)
    _, file := path.Split(p.Path)

    updatedPath := path.Join(job.DataRoot, "updated")
    mtimeFile := path.Join(updatedPath, fmt.Sprintf("%s-mtime", file))

    if err := os.MkdirAll(updatedPath, 0755); err != nil {
        job.logger.Error("mkdir failed", zap.String("path", updatedPath), zap.Error(err))
        return
    }

    if err := ioutil.WriteFile(mtimeFile, []byte{}, 0644); err != nil {
        job.logger.Error("write mtime_file failed", zap.String("filename", mtimeFile), zap.Error(err))
        return
    }

    mtime := time.Unix(job.SourceTime/1000, 0)
    if err := os.Chtimes(mtimeFile, mtime, mtime); err != nil {
        job.logger.Error("chtime failed", zap.String("filename", mtimeFile), zap.Error(err))
        return
    }
}

// writeChunk writes a chunk to disk. The file name is built from the key; the data comes from the buffer.
func writeChunk(dataRoot, key string, buf *bytes.Buffer) error {
    defer buf.Reset()

    if key == "" {
        return nil
    }

    data := buf.Bytes()

    if len(data) == 0 {
        return nil
    }

    dirname := path.Join(dataRoot, key[:1], key[1:3])
    filename := path.Join(dirname, fmt.Sprintf("%s.lz4", key[3:]))
    tmpFilename := path.Join(dirname, fmt.Sprintf(".%s.lz4", key[3:]))

    var err error

    if err = os.MkdirAll(dirname, 0755); err != nil {
        return err
    }

    f, err := rbfile.New(tmpFilename, "w9")
    if err != nil {
        return err
    }

    if _, err = f.Write(data); err != nil {
        f.Close()
        return err
    }

    f.Close()

    if err = os.Rename(tmpFilename, filename); err != nil {
        return err
    }

    return nil
}


func dropLineParts(line []byte) []byte {
    sep := []byte("\t")
    lineParts := bytes.Split(line, sep)
    if len(lineParts) == 3 {
        return line
    } else if len(lineParts) >= 7 { // lineParts[6] is read below, so at least 7 parts are needed
        // exclude vid uid
        if bytes.HasPrefix(lineParts[1], []byte("vid=")) {
            return []byte{}
        }
        return bytes.Join([][]byte{lineParts[0], lineParts[4], lineParts[5], lineParts[6]}, sep)
    }
    return []byte{}
}

// handleFile processes a file, splitting it into chunks.
func handleFile(dataRoot, filename string) error {
    chunkSize := 64 * 1024 * 1024

    file, err := rbfile.Open(filename)
    if err != nil {
        return err
    }
    defer file.Close()

    reader := bufio.NewReaderSize(file, chunkSize)

    var buf bytes.Buffer

    var key, prevKey string
    var line []byte

    for {
        line, _, err = reader.ReadLine()
        if err == io.EOF {
            break
        }
        if err != nil {
            return err
        }

        line = dropLineParts(line)
        if len(line) == 0 {
            continue
        }

        key = string(line[:5])
        if key != prevKey {
            if err = writeChunk(dataRoot, prevKey, &buf); err != nil {
                return err
            }
        }
        prevKey = key

        if buf.Len() > 0 {
            buf.WriteString("\n")
        }
        buf.Write(line[5:])
    }

    if err = writeChunk(dataRoot, prevKey, &buf); err != nil {
        return err
    }

    return nil
}

// worker is the worker goroutine. It takes a job from queue and tries to process it.
// It then either finishes the job or puts it back at the end of the queue for a retry.
func worker(queue chan *Job, exit chan bool) {
    for {
        select {
        case job := <-queue:
            job.logger.Info("start")
            if err := handleFile(job.DataRoot, job.URL); err != nil {
                job.Fail(err)
                job.logger.Info("fail", zap.Int("attempt", job.Attempt), zap.Error(err))
            } else {
                job.Success()
                job.logger.Info("success")
            }

            if job.IsFinished() {
                job.Done()
            } else {
                queue <- job
            }
        case <-exit:
            return
        }
    }
}

// makeJobs fetches the file list from Hadoop's HTTPFS and generates jobs from it.
func makeJobs(dataRoot, remoteUrl string, useMirrors bool) ([]*Job, error) {
    u, _ := url.Parse(remoteUrl)
    hadoopRoot := u.Path
    u.Path = ""
    u.RawQuery = ""
    fs := httpfs.New(u.String(), time.Minute).Path(hadoopRoot)

    files, err := fs.List()

    var wg sync.WaitGroup
    jobs := make([]*Job, 0)

    if err != nil {
        return jobs, err
    }

    for _, file := range files {
        if !strings.HasPrefix(file.Name, "part-") {
            continue
        }

        fileURL := file.OpenURL()
        if useMirrors {
            fileURL = file.MirrorURL()
        }
        newJob := &Job{
            wg:         &wg,
            DataRoot:   dataRoot,
            Name:       file.Name,
            URL:        fileURL,
            SourceTime: file.ModificationTime,
            logger:     zap.L().With(zap.String("url", fileURL), zap.Int64("mtime", file.ModificationTime)),
        }

        // Skip files that are already up to date and do not need to be downloaded
        if newJob.IsUpdated() {
            continue
        }

        jobs = append(jobs, newJob)
        wg.Add(1)
    }

    return jobs, nil
}

func main() {
    flag.Parse()

    cfg := newConfig()
    if err := config.Parse(cfg); err != nil {
        log.Fatal(err.Error())
    }

    runtime.GOMAXPROCS(cfg.MaxCPU)

    if err := logging.SetConfig(cfg.Logging); err != nil {
        log.Fatal(err.Error())
    }

    if cfg.Pprof != "" {
        go func() {
            log.Fatal(http.ListenAndServe(cfg.Pprof, nil))
        }()
    }

    // Build the list of jobs
    jobs, err := makeJobs(cfg.LocalRoot, cfg.RemoteRoot, cfg.UseMirrors)

    if err != nil {
        logging.Error(err.Error())
        return
    }

    if len(jobs) == 0 {
        logging.Info("No new files in %s", cfg.RemoteRoot)
        return
    }

    wg := jobs[0].wg

    // Turn the job slice into a buffered queue
    queue := make(chan *Job, len(jobs))

    for _, job := range jobs {
        queue <- job
    }

    workerExit := make(chan bool)
    defer close(workerExit)

    for i := 0; i < cfg.Workers; i++ {
        go worker(queue, workerExit)
    }

    // Wait until all jobs have been processed by the workers
    wg.Wait()
}


### References

* https://golang.org/doc/code.html
* https://tour.golang.org/welcome/1
* https://golang.org/doc/effective_go.html
* https://dave.cheney.net/2014/03/19/channel-axioms
* http://www.golang-book.com/
* http://divan.github.io/posts/go_concurrency_visualize/
* https://making.pusher.com/golangs-real-time-gc-in-theory-and-practice/
* https://habrahabr.ru/post/259967/
* https://roberto.selbach.ca/intro-to-go-modules/
* http://devs.cloudimmunity.com/gotchas-and-common-mistakes-in-go-golang/index.html
* http://www.jtolds.com/writing/2016/03/go-channels-are-bad-and-you-should-feel-bad/
* https://github.com/ardanlabs/gotraining
* http://dmitryvorobev.blogspot.com/2016/08/golang-channels-implementation.html
* https://gist.github.com/agalitsyn/813c34591c794eb761e4d97d7d31624e
* https://blog.plan99.net/modern-garbage-collection-911ef4f8bd8e
* https://rakyll.org/scheduler/

### Summary

* Go is cool and shiny!

![intern](https://secure.meetupstatic.com/photos/event/7/9/b/8/highres_450331160.jpeg)

## Memory

Fitting more into a machine’s RAM means fewer machines to manage, and it gives you a route to planning capacity for larger projects. Knowing why RAM gets eaten up and considering more efficient ways to use this scarce resource will help you deal with scaling issues.

#### Consumption

Primitive objects are expensive.

In [4]:
%load_ext memory_profiler

The memory_profiler extension is already loaded. To reload it, use:
  %reload_ext memory_profiler


In [7]:
%memit [0]*int(10**8)

peak memory: 789.94 MiB, increment: 765.55 MiB


In [8]:
%memit

peak memory: 47.78 MiB, increment: 0.04 MiB


In [9]:
%memit range(int(10**8))

peak memory: 957.55 MiB, increment: 909.76 MiB


The array module efficiently stores primitive types like integers, floats, and characters, but not complex numbers or classes. It creates a contiguous block of RAM to hold the underlying data.

In [2]:
import array
%memit array.array('l', xrange(int(1e8)))

peak memory: 373.14 MiB, increment: 331.29 MiB


Note that the unique numbers in the array are not Python objects; they are bytes in the array. If we were to dereference any of them, then a new Python int object would be constructed.

Be aware that the byte size of each type code is platform-dependent.
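
The item size behind a type code can be checked directly. This is a minimal sketch using only the standard `array` module; the variable names are illustrative, and the printed sizes depend on the platform (for example, `'l'` is commonly 8 bytes on 64-bit Linux and 4 bytes on Windows).

```python
import array
import sys

# Per-item size in bytes for a few signed integer type codes.
for typecode in ('h', 'i', 'l'):
    print("{}: {} bytes".format(typecode, array.array(typecode).itemsize))

# The payload is itemsize * length raw bytes, not a collection of Python objects.
numbers = array.array('l', range(1000))
payload_bytes = numbers.itemsize * len(numbers)
print("payload: {} bytes".format(payload_bytes))

# Indexing builds a regular Python int object on the fly.
value = numbers[500]
print("one element as an object: {} bytes".format(sys.getsizeof(value)))
```

The gap between `itemsize` and `sys.getsizeof(value)` is the per-object overhead that the array avoids while the data stays inside it.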

In [11]:
import numpy as np
%memit arr = np.zeros(10**8, np.uint64)
print arr.size
print arr.nbytes
print arr.nbytes/arr.size
print arr.itemsize

peak memory: 40.31 MiB, increment: 0.01 MiB
100000000
800000000
8
8


Python’s `sys.getsizeof(obj)` call tells us something about the memory used by an object:

In [12]:
import sys
print sys.getsizeof(int())
print sys.getsizeof(1)
n = sys.maxint+1
sys.getsizeof(n)

24
24


36

In [13]:
print sys.getsizeof(b"")
print sys.getsizeof(b"a")
print sys.getsizeof(b"ab")

37
38
39


In [14]:
print sys.getsizeof([])
print sys.getsizeof([1])
print sys.getsizeof([1,2])

72
80
88


In [15]:
print sys.getsizeof([b""])
print sys.getsizeof([b"abcdefghijklm"])

80
80


`getsizeof` only reports some of the cost, and often just for the parent object. It also isn’t always implemented, so it can have limited usefulness.
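
To see how much `getsizeof` leaves out, the sizes of the contained objects can be added up by hand. The helper below is a rough sketch, not a complete memory accountant: the name `total_size` is made up for this example, it only descends into the built-in containers listed in the code, and it ignores instance attributes.

```python
import sys


def total_size(obj, seen=None):
    """Roughly sum getsizeof over an object and the built-in containers it holds."""
    if seen is None:
        seen = set()
    if id(obj) in seen:  # count shared objects only once
        return 0
    seen.add(id(obj))
    size = sys.getsizeof(obj)
    if isinstance(obj, dict):
        for key, value in obj.items():
            size += total_size(key, seen) + total_size(value, seen)
    elif isinstance(obj, (list, tuple, set, frozenset)):
        for item in obj:
            size += total_size(item, seen)
    return size


words = [b"", b"abcdefghijklm"]
print(sys.getsizeof(words))  # the list object only
print(total_size(words))     # the list plus both byte strings
```

The first number stays the same no matter how long the strings are, which matches the two identical results for `[b""]` and `[b"abcdefghijklm"]` above.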

#### Text lookup

In [None]:
import codecs

# "Moby Words lists by Grady Ward"
# http://www.gutenberg.org/ebooks/3201
SUMMARISED_FILE = "all_unique_words.txt"  # 500k approx
CODEC = 'Windows-1252'



def read_words(filename):
    # return words from filename using a generator
    try:
        with codecs.open(filename, 'r', CODEC) as f:
            for line_nbr, line in enumerate(f):
                items = line.strip().split()
                for item in items:
                    yield item
    except UnicodeDecodeError:
        print "UnicodeDecodeError for {} near line {} and word {}".format(filename, line_nbr, line)

readers = read_words(SUMMARISED_FILE)

In [None]:
import time
import timeit
import memory_profiler
import bisect


def index(a, x):
    'Locate the leftmost value exactly equal to x'
    i = bisect.bisect_left(a, x)
    if i != len(a) and a[i] == x:
        return i
    raise ValueError


if __name__ == "__main__":
    print "RAM at start {:0.1f}MiB".format(memory_profiler.memory_usage()[0])
    t1 = time.time()
    words = list(read_words(SUMMARISED_FILE))
    print "Loading {} words".format(len(words))
    t2 = time.time()
    print "RAM after creating list {:0.1f}MiB, took {:0.1f}s".format(memory_profiler.memory_usage()[0], t2 - t1)
    print "The list contains {} words".format(len(words))
    words.sort()
    t3 = time.time()
    print "Sorting list took {:0.1f}s".format(t3 - t2)

    assert u'Zwiebel' in words
    time_cost = sum(timeit.repeat(stmt="index(words, u'Zwiebel')",
                                  setup="from __main__ import words, index",
                                  number=1,
                                  repeat=10000))
    print "Summed time to lookup word {:0.4f}s".format(time_cost)

In [None]:
import marisa_trie


print "RAM at start {:0.1f}MiB".format(memory_profiler.memory_usage()[0])
# avoid building a temporary list of words in Python, store directly in the trie
t1 = time.time()
words_trie = marisa_trie.Trie(read_words(SUMMARISED_FILE))
t2 = time.time()
print "RAM after creating trie {:0.1f}MiB, took {:0.1f}s".format(memory_profiler.memory_usage()[0], t2 - t1)
print "The trie contains {} words".format(len(words_trie))

assert u'Zwiebel' in words_trie
time_cost = sum(timeit.repeat(stmt="u'Zwiebel' in words_trie",
                              setup="from __main__ import words_trie",
                              number=1,
                              repeat=10000))
print "Summed time to lookup word {:0.4f}s".format(time_cost)

#### Probabilistic datastructures

Bloom filter

![bloom](https://upload.wikimedia.org/wikipedia/commons/thumb/a/ac/Bloom_filter.svg/1280px-Bloom_filter.svg.png)

In [1]:
import bitarray
import math
import mmh3


class BloomFilter(object):

    def __init__(self, capacity, error=0.005):
        """
        Initialize a bloom filter with given capacity and false positive rate
        """
        self.capacity = capacity
        self.error = error
        self.num_bits = int(-capacity * math.log(error) / math.log(2) ** 2) + 1
        self.num_hashes = int(
            self.num_bits * math.log(2) / float(capacity)) + 1
        self.data = bitarray.bitarray(self.num_bits)
        # bitarray(n) does not guarantee zeroed bits, so clear them explicitly
        self.data.setall(False)

    def _indexes(self, key):
        h1, h2 = mmh3.hash64(key)
        for i in xrange(self.num_hashes):
            yield (h1 + i * h2) % self.num_bits

    def add(self, key):
        for index in self._indexes(key):
            self.data[index] = True

    def __contains__(self, key):
        return all(self.data[index] for index in self._indexes(key))

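    def __len__(self):
        # Estimate the number of stored items from the fraction of bits set.
        # len() needs an integer, so the estimate is truncated.
        num_bits_on = self.data.count(True)
        return int(-1.0 * self.num_bits *
                   math.log(1.0 - num_bits_on / float(self.num_bits)) /
                   float(self.num_hashes))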

    @staticmethod
    def union(bloom_a, bloom_b):
        assert bloom_a.capacity == bloom_b.capacity, "Capacities must be equal"
        assert bloom_a.error == bloom_b.error, "Error rates must be equal"

        bloom_union = BloomFilter(bloom_a.capacity, bloom_a.error)
        bloom_union.data = bloom_a.data | bloom_b.data
        return bloom_union

In [22]:
class ScalingBloomFilter(object):

    def __init__(self, capacity, error=0.005, max_fill=0.8,
                 error_tightening_ratio=0.5):
        self.capacity = capacity
        self.base_error = error
        self.max_fill = max_fill
        self.items_until_scale = int(capacity * max_fill)
        self.error_tightening_ratio = error_tightening_ratio
        self.bloom_filters = []
        self.current_bloom = None
        self._add_bloom()

    def _add_bloom(self):
        new_error = self.base_error * \
            self.error_tightening_ratio ** len(self.bloom_filters)
        new_bloom = BloomFilter(self.capacity, new_error)
        self.bloom_filters.append(new_bloom)
        self.current_bloom = new_bloom
        return new_bloom

    def add(self, key):
        if key in self:
            return True
        self.current_bloom.add(key)
        self.items_until_scale -= 1
        if self.items_until_scale == 0:
            bloom_size = len(self.current_bloom)
            bloom_max_capacity = int(
                self.current_bloom.capacity * self.max_fill)

            # We may have been adding many duplicate values into the bloom, so
            # we need to check if we actually need to scale or if we still have
            # space
            if bloom_size >= bloom_max_capacity:
                self._add_bloom()
                self.items_until_scale = bloom_max_capacity
            else:
                self.items_until_scale = int(bloom_max_capacity - bloom_size)
        return False

    def __contains__(self, key):
        return any(key in bloom for bloom in self.bloom_filters)

    def __len__(self):
        return sum(len(bloom) for bloom in self.bloom_filters)

One caveat with `BloomFilter.union` is that you can only take the union of two Blooms that have the same capacity and error rate, because only then do their bit arrays have the same length and use the same hash positions. Furthermore, the final Bloom’s used capacity can be as high as the sum of the used capacities of the two Blooms unioned to make it.
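
The union rule can be illustrated without `bitarray` or `mmh3`. The sketch below is a simplified stand-in, not the class above: it keeps the bits in a Python int and derives the indexes from `hashlib.md5`, and the names `TinyBloom`, `num_bits` and `num_hashes` are assumptions made for this example.

```python
import hashlib


class TinyBloom(object):
    """Toy Bloom filter that keeps its bits in a Python int (illustrative only)."""

    def __init__(self, num_bits=1024, num_hashes=4):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = 0

    def _indexes(self, key):
        digest = hashlib.md5(key.encode("utf-8")).hexdigest()
        h1 = int(digest[:16], 16)
        h2 = int(digest[16:], 16)
        # Double hashing, the same scheme as (h1 + i * h2) % num_bits above.
        for i in range(self.num_hashes):
            yield (h1 + i * h2) % self.num_bits

    def add(self, key):
        for index in self._indexes(key):
            self.bits |= 1 << index

    def __contains__(self, key):
        return all((self.bits >> index) & 1 for index in self._indexes(key))

    @staticmethod
    def union(a, b):
        # Bit positions only line up if both filters have the same shape.
        assert (a.num_bits, a.num_hashes) == (b.num_bits, b.num_hashes)
        merged = TinyBloom(a.num_bits, a.num_hashes)
        merged.bits = a.bits | b.bits
        return merged


evens, odds = TinyBloom(), TinyBloom()
for i in range(0, 50, 2):
    evens.add(str(i))
for i in range(1, 50, 2):
    odds.add(str(i))

both = TinyBloom.union(evens, odds)
# Every key added to either filter is reported as present in the union.
print(all(str(i) in both for i in range(50)))
```

Since the merged bits are a bitwise OR, the number of set bits in `both` can approach the sum of the set bits in `evens` and `odds`, which is the capacity effect described above.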

In [2]:
bloom = BloomFilter(100)
for i in xrange(50):
    bloom.add(str(i))
print "20" in bloom
print "25" in bloom
print "51" in bloom

True
True
False


In [3]:
num_false_positives = 0
num_true_negatives = 0
for i in xrange(51, 10000):
    if str(i) in bloom:
        num_false_positives += 1
    else:
        num_true_negatives += 1

print num_false_positives
print num_true_negatives
print num_false_positives / float(10000 - 51)
print bloom.error

11
9938
0.00110563875766
0.005
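
For reference, the sizing formulas from `BloomFilter.__init__` can be evaluated by hand for the filter used above (`capacity=100`, `error=0.005`). The helper name `bloom_parameters` is an assumption made for this sketch; the arithmetic is copied from the constructor.

```python
import math


def bloom_parameters(capacity, error):
    """Return (num_bits, num_hashes) as computed in BloomFilter.__init__."""
    num_bits = int(-capacity * math.log(error) / math.log(2) ** 2) + 1
    num_hashes = int(num_bits * math.log(2) / float(capacity)) + 1
    return num_bits, num_hashes


num_bits, num_hashes = bloom_parameters(100, 0.005)
print(num_bits)    # 1103 bits, i.e. about 138 bytes
print(num_hashes)  # 8 bit positions per key
```

The measured rate of roughly 0.0011 is below the 0.005 target, which is consistent with the filter holding only 50 of the 100 items it was sized for.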


LogLog

![log](https://image.slidesharecdn.com/hlldemo-160623175742/95/hyperloglog-project-6-638.jpg?cb=1467924030)

In [6]:
def trailing_zeros(number):
    """
    Returns the 1-based index of the first bit set to 1 from the right side of a
    32bit integer
    >>> trailing_zeros(0)
    32
    >>> trailing_zeros(0b1000)
    4
    >>> trailing_zeros(0b10000000)
    8
    """
    if not number:
        return 32
    index = 0
    while (number >> index) & 1 == 0:
        index += 1
    return index + 1


class LLRegister(object):
    counter = 0

    def add(self, item):
        item_hash = mmh3.hash(str(item))
        return self._add(item_hash)

    def _add(self, item_hash):
        bit_index = trailing_zeros(item_hash)
        if bit_index > self.counter:
            self.counter = bit_index

    def __len__(self):
        return 2 ** self.counter

In [8]:
class LL(object):

    def __init__(self, p):
        self.p = p
        self.num_registers = 2 ** p
        self.registers = [LLRegister() for i in xrange(int(2 ** p))]
        self.alpha = 0.7213 / (1.0 + 1.079 / self.num_registers)

    def add(self, item):
        item_hash = mmh3.hash(str(item))
        register_index = item_hash & (self.num_registers - 1)
        register_hash = item_hash >> self.p
        self.registers[register_index]._add(register_hash)

    def __len__(self):
        register_sum = sum(h.counter for h in self.registers)
        # len() needs an integer, so the estimate is truncated
        return int(2 ** (float(register_sum) / self.num_registers) *
                   self.num_registers * self.alpha)

In [9]:
import math


class HyperLogLog(LL):

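    def __len__(self):
        indicator = sum(2 ** -m.counter for m in self.registers)
        E = self.alpha * (self.num_registers ** 2) / float(indicator)

        if E <= 5.0 / 2.0 * self.num_registers:
            # Small-range correction: linear counting on the empty registers
            # (natural logarithm, as in the HyperLogLog paper)
            V = sum(1 for m in self.registers if m.counter == 0)
            if V != 0:
                Estar = self.num_registers * \
                    math.log(self.num_registers / (1.0 * V))
            else:
                Estar = E
        else:
            if E <= 2 ** 32 / 30.0:
                Estar = E
            else:
                # Large-range correction for collisions in the 32-bit hash space
                Estar = -2 ** 32 * math.log(1 - E / 2 ** 32)
        # len() needs an integer, so the estimate is truncated
        return int(Estar)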


hll = HyperLogLog(8)
for i in xrange(100000):
    hll.add(mmh3.hash(str(i)))
print len(hll)

99907
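
To put the estimate in context, the error of this run can be compared with the usual HyperLogLog standard error of about `1.04 / sqrt(m)` for `m` registers. This is a small worked calculation, not part of the original notebook; the variable names are illustrative.

```python
import math

true_count = 100000
estimate = 99907        # value printed by the cell above
num_registers = 2 ** 8  # HyperLogLog(8)

relative_error = abs(estimate - true_count) / float(true_count)
typical_error = 1.04 / math.sqrt(num_registers)

print("{:.4%}".format(relative_error))  # 0.0930%
print("{:.4%}".format(typical_error))   # 6.5000%
```

This particular run landed much closer to the true count than the typical error suggests; other inputs or hash seeds may be further off.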


### References

* http://code.activestate.com/recipes/546530/
* https://highlyscalable.wordpress.com/2012/05/01/probabilistic-structures-web-analytics-data-mining/
* https://blog.acolyer.org/2016/03/17/hyperloglog-in-practice-algorithmic-engineering-of-a-state-of-the-art-cardinality-estimation-algorithm/

### Summary

* These data structures can be quite useful in high-load setups: they trade a small, bounded error for a large reduction in memory.