Skip to content

Commit

Permalink
whatsgo
Browse files Browse the repository at this point in the history
  • Loading branch information
javahongxi committed Sep 30, 2019
1 parent a7e7d68 commit ad5bd11
Show file tree
Hide file tree
Showing 73 changed files with 6,393 additions and 0 deletions.
15 changes: 15 additions & 0 deletions crawler/config/config.go
@@ -0,0 +1,15 @@
package config

// Shared constants for the crawler.
const (
// Parser names. NilParser is the name engine.NilParser reports from
// Serialize; the others presumably identify the matching parsers in the
// same way (their use is not visible in this file).
ParseCity = "ParseCity"
ParseCityList = "ParseCityList"
ParseProfile = "ParseProfile"
NilParser = "NilParser"

// ElasticIndex is the ElasticSearch index queried by the frontend
// search handler.
ElasticIndex = "dating_profile"

// Qps sets the rate limit: the fetcher's ticker fires once every
// time.Second / Qps.
Qps = 20
)
82 changes: 82 additions & 0 deletions crawler/engine/concurrent.go
@@ -0,0 +1,82 @@
package engine

// ConcurrentEngine processes requests with a set of worker goroutines that
// receive their work through a Scheduler.
type ConcurrentEngine struct {
// Scheduler accepts submitted requests and provides the channel each
// worker receives from.
Scheduler Scheduler
// WorkerCount is the number of worker goroutines started by Run.
WorkerCount int
// ItemChan receives every Item found in a parse result.
ItemChan chan Item
// RequestProcessor is called by a worker for each request it receives.
RequestProcessor Processor
}

// Processor handles one Request and returns its ParseResult, or an error
// if the request could not be processed.
type Processor func(Request) (ParseResult, error)

// Scheduler is everything ConcurrentEngine needs from a request scheduler.
type Scheduler interface {
ReadyNotifier
// Submit hands a request over to the scheduler.
Submit(Request)
// WorkerChan returns the channel a worker receives requests from.
// ConcurrentEngine.Run calls it once for every worker it creates.
WorkerChan() chan Request
// Run is called once by ConcurrentEngine.Run, before any worker is
// created and before any request is submitted.
Run()
}

// ReadyNotifier is told whenever a worker is about to wait for a request.
type ReadyNotifier interface {
// WorkerReady is called by a worker, with its input channel, right
// before each receive from that channel.
WorkerReady(chan Request)
}

// Run starts the scheduler and the workers, submits the seed requests and
// then consumes parse results forever; it never returns.
//
// Each item of a result is sent to e.ItemChan from its own goroutine, so a
// slow consumer does not stall the result loop. Requests whose URL has been
// seen before (see isDuplicate) are dropped rather than submitted.
func (e *ConcurrentEngine) Run(seeds ...Request) {
	results := make(chan ParseResult)
	e.Scheduler.Run()

	for remaining := e.WorkerCount; remaining > 0; remaining-- {
		e.createWorker(e.Scheduler.WorkerChan(), results, e.Scheduler)
	}

	// submitUnseen forwards only the requests whose URL is new.
	submitUnseen := func(reqs []Request) {
		for _, req := range reqs {
			if !isDuplicate(req.Url) {
				e.Scheduler.Submit(req)
			}
		}
	}
	submitUnseen(seeds)

	// results is never closed, so this loop runs for the life of the process.
	for result := range results {
		for _, item := range result.Items {
			go func(it Item) { e.ItemChan <- it }(item)
		}
		submitUnseen(result.Requests)
	}
}

// createWorker starts one worker goroutine that loops forever: it announces
// itself to ready, receives a request from in, runs e.RequestProcessor on
// it and sends the result to out. A request whose processing fails yields
// no result; the worker simply moves on to the next one.
func (e *ConcurrentEngine) createWorker(
	in chan Request,
	out chan ParseResult, ready ReadyNotifier) {
	loop := func() {
		for {
			ready.WorkerReady(in)
			result, err := e.RequestProcessor(<-in)
			if err == nil {
				out <- result
			}
		}
	}
	go loop()
}

// visitedUrls records every URL that has been passed to isDuplicate.
var visitedUrls = map[string]bool{}

// isDuplicate reports whether url has been seen before. The first call for
// a given url records it and returns false; later calls return true.
func isDuplicate(url string) bool {
	seen := visitedUrls[url]
	if !seen {
		visitedUrls[url] = true
	}
	return seen
}
31 changes: 31 additions & 0 deletions crawler/engine/simple.go
@@ -0,0 +1,31 @@
package engine

import (
"log"
)

// SimpleEngine processes requests one after another in the calling
// goroutine.
type SimpleEngine struct{}

// Run works through a first-in-first-out queue that starts with seeds. Each
// request is handed to Worker; the requests it discovers are appended to the
// queue and the items it yields are logged. Requests that fail are skipped.
// Run returns once the queue is empty.
func (e SimpleEngine) Run(seeds ...Request) {
	queue := append([]Request(nil), seeds...)

	for len(queue) > 0 {
		var head Request
		head, queue = queue[0], queue[1:]

		result, err := Worker(head)
		if err == nil {
			queue = append(queue, result.Requests...)
			for i := range result.Items {
				log.Printf("Got item: %v", result.Items[i])
			}
		}
	}
}
63 changes: 63 additions & 0 deletions crawler/engine/types.go
@@ -0,0 +1,63 @@
package engine

import "github.com/javahongxi/whatsgo/crawler/config"

// ParserFunc parses the contents fetched from url into a ParseResult.
type ParserFunc func(
contents []byte, url string) ParseResult

// Parser is the parsing strategy attached to a Request.
type Parser interface {
// Parse turns the contents fetched from url into a ParseResult.
Parse(contents []byte, url string) ParseResult
// Serialize returns a name identifying the parser together with its
// arguments, if it has any.
Serialize() (name string, args interface{})
}

// Request is one unit of crawl work: a URL plus the parser to apply to
// what is fetched from it.
type Request struct {
// Url is the address to fetch.
Url string
// Parser is applied to the fetched body (see Worker).
Parser Parser
}

// ParseResult is what a parser extracts from one page.
type ParseResult struct {
// Requests are follow-up requests discovered on the page.
Requests []Request
// Items are the data items found on the page.
Items []Item
}

// Item is one piece of extracted data. The frontend decodes search hits
// into this type.
type Item struct {
// Url is presumably the page the item came from (not shown in this file).
Url string
// Type and Id presumably locate the item in the search index
// (NOTE(review): confirm against the code that stores items).
Type string
Id string
// Payload carries the item's data; the frontend query rewriter
// addresses its fields as "Payload.<Field>".
Payload interface{}
}

// NilParser is a Parser that extracts nothing.
type NilParser struct{}

// Parse ignores both arguments and returns an empty ParseResult.
func (NilParser) Parse(_ []byte, _ string) ParseResult {
	var empty ParseResult
	return empty
}

// Serialize identifies the parser as config.NilParser; it has no arguments.
func (NilParser) Serialize() (name string, args interface{}) {
	name = config.NilParser
	return name, args
}

// FuncParser turns a plain ParserFunc plus a name into a Parser.
type FuncParser struct {
	parser ParserFunc
	name   string
}

// Parse delegates to the wrapped ParserFunc.
func (f *FuncParser) Parse(contents []byte, url string) ParseResult {
	result := f.parser(contents, url)
	return result
}

// Serialize returns the name given at construction; a FuncParser carries no
// arguments, so args is always nil.
func (f *FuncParser) Serialize() (name string, args interface{}) {
	name = f.name
	return name, args
}

// NewFuncParser wraps p in a FuncParser that reports name from Serialize.
func NewFuncParser(p ParserFunc, name string) *FuncParser {
	fp := new(FuncParser)
	fp.parser = p
	fp.name = name
	return fp
}
19 changes: 19 additions & 0 deletions crawler/engine/worker.go
@@ -0,0 +1,19 @@
package engine

import (
"log"

"github.com/javahongxi/whatsgo/crawler/fetcher"
)

// Worker fetches r.Url and runs r.Parser on the body. A fetch failure is
// logged and returned together with an empty ParseResult.
func Worker(r Request) (ParseResult, error) {
	body, err := fetcher.Fetch(r.Url)
	if err == nil {
		return r.Parser.Parse(body, r.Url), nil
	}

	log.Printf("Fetcher: error fetching url %s: %v", r.Url, err)
	return ParseResult{}, err
}
64 changes: 64 additions & 0 deletions crawler/fetcher/fetcher.go
@@ -0,0 +1,64 @@
package fetcher

import (
"bufio"
"fmt"
"io/ioutil"
"net/http"

"log"

"time"

"github.com/javahongxi/whatsgo/crawler/config"
"golang.org/x/net/html/charset"
"golang.org/x/text/encoding"
"golang.org/x/text/encoding/unicode"
"golang.org/x/text/transform"
)

var (
// rateLimiter delivers one tick every time.Second / config.Qps. Fetch
// waits for a tick before every request, which caps the request rate
// across all callers in the process.
rateLimiter = time.Tick(
time.Second / config.Qps)
// verboseLogging makes Fetch log each URL before fetching it. It is
// off until SetVerboseLogging is called.
verboseLogging = false
)

// SetVerboseLogging makes Fetch log every URL it is about to fetch.
func SetVerboseLogging() {
verboseLogging = true
}

// fetchTimeout bounds one whole HTTP exchange: connecting, following
// redirects and reading the response body.
const fetchTimeout = 30 * time.Second

// httpClient is shared by all Fetch calls. Unlike http.DefaultClient it has
// a timeout, so an unresponsive server cannot block a caller forever.
var httpClient = &http.Client{Timeout: fetchTimeout}

// Fetch downloads url and returns its body converted to UTF-8.
//
// Each call first waits for a tick of the package rate limiter. A transport
// error, a timeout or any status other than 200 OK is returned as an error.
// The source encoding is guessed from the start of the body (see
// determineEncoding).
func Fetch(url string) ([]byte, error) {
	<-rateLimiter
	if verboseLogging {
		log.Printf("Fetching url %s", url)
	}

	resp, err := httpClient.Get(url)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("wrong status code: %d", resp.StatusCode)
	}

	// The bufio.Reader lets determineEncoding peek at the first bytes
	// without consuming them, so the decoder still sees the full body.
	bodyReader := bufio.NewReader(resp.Body)
	e := determineEncoding(bodyReader)
	utf8Reader := transform.NewReader(bodyReader, e.NewDecoder())
	return ioutil.ReadAll(utf8Reader)
}

// determineEncoding guesses the character encoding of the data behind r
// from up to its first 1024 bytes, without consuming them.
//
// Peek reports an error (io.EOF) whenever fewer than 1024 bytes are
// available, but still returns the bytes it did read. Short documents are
// therefore detected from what is there; only when nothing at all could be
// read is the error logged and UTF-8 assumed.
func determineEncoding(
	r *bufio.Reader) encoding.Encoding {
	head, err := r.Peek(1024)
	if len(head) == 0 {
		if err != nil {
			log.Printf("Fetcher error: %v", err)
		}
		return unicode.UTF8
	}

	// No Content-Type hint is supplied, so detection relies on the
	// content alone.
	e, _, _ := charset.DetermineEncoding(head, "")
	return e
}
105 changes: 105 additions & 0 deletions crawler/frontend/controller/searchresult.go
@@ -0,0 +1,105 @@
package controller

import (
"net/http"
"strconv"
"strings"

"context"
"reflect"

"regexp"

"github.com/javahongxi/whatsgo/crawler/config"
"github.com/javahongxi/whatsgo/crawler/engine"
"github.com/javahongxi/whatsgo/crawler/frontend/model"
"github.com/javahongxi/whatsgo/crawler/frontend/view"
"gopkg.in/olivere/elastic.v5"
)

// SearchResultHandler is the HTTP handler for search result pages.
type SearchResultHandler struct {
// view renders a result page to the response.
view view.SearchResultView
// client runs the search queries.
client *elastic.Client
}

// CreateSearchResultHandler builds a handler whose view is created from
// template and whose search client is created with sniffing disabled.
// It panics if the client cannot be created.
func CreateSearchResultHandler(template string) SearchResultHandler {
	client, err := elastic.NewClient(elastic.SetSniff(false))
	if err != nil {
		panic(err)
	}

	var handler SearchResultHandler
	handler.view = view.CreateSearchResultView(template)
	handler.client = client
	return handler
}

// ServeHTTP answers a search request.
//
// Form values:
//   - q:    the query string; surrounding whitespace is trimmed.
//   - from: offset of the first hit to show. A missing, non-numeric or
//     negative value is treated as 0.
//
// A failing search is reported as 400 Bad Request. A failure while
// rendering the page is a server-side problem and is reported as
// 500 Internal Server Error.
func (h SearchResultHandler) ServeHTTP(
	w http.ResponseWriter, req *http.Request) {
	q := strings.TrimSpace(req.FormValue("q"))

	from, err := strconv.Atoi(req.FormValue("from"))
	if err != nil || from < 0 {
		// A negative offset is as unusable as a non-numeric one, so it
		// gets the same fallback.
		from = 0
	}

	page, err := h.getSearchResult(q, from)
	if err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}

	if err := h.view.Render(w, page); err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
	}
}

// pageSize is the page length assumed when computing the offset of the
// previous result page.
const pageSize = 10

// getSearchResult runs q (after rewriteQueryString) against
// config.ElasticIndex, starting at offset from, and returns one page of
// results.
//
// The page length is requested explicitly as pageSize so that the query and
// the paging arithmetic below cannot drift apart if the backend's default
// page length differs.
//
// In the result, PrevFrom is -1 on the first page and otherwise the start of
// the page before Start; NextFrom is the offset just past the last returned
// item. If the search fails, the error is returned and only Query is set.
func (h SearchResultHandler) getSearchResult(
	q string, from int) (model.SearchResult, error) {
	var result model.SearchResult
	result.Query = q

	resp, err := h.client.
		Search(config.ElasticIndex).
		Query(elastic.NewQueryStringQuery(
			rewriteQueryString(q))).
		From(from).
		Size(pageSize).
		Do(context.Background())
	if err != nil {
		return result, err
	}

	result.Hits = resp.TotalHits()
	result.Start = from
	result.Items = resp.Each(reflect.TypeOf(engine.Item{}))

	result.PrevFrom = -1
	if result.Start != 0 {
		// Round (Start-1) down to a multiple of pageSize, so a start
		// that is not page-aligned still maps to an aligned page.
		result.PrevFrom = (result.Start - 1) / pageSize * pageSize
	}
	result.NextFrom = result.Start + len(result.Items)

	return result, nil
}

// fieldNameRe matches a capitalised field name followed by a colon, such as
// "Age:". It is compiled once at package initialisation rather than on
// every request.
var fieldNameRe = regexp.MustCompile(`([A-Z][a-z]*):`)

// rewriteQueryString prefixes every field name in q with "Payload.", so
// that "Age:30" becomes "Payload.Age:30".
//
// A field name is an upper-case letter followed by lower-case letters and a
// colon. Text without such a name is returned unchanged. Names that already
// carry the "Payload." prefix are prefixed again, so callers should pass
// bare field names.
func rewriteQueryString(q string) string {
	return fieldNameRe.ReplaceAllString(q, "Payload.$1:")
}
10 changes: 10 additions & 0 deletions crawler/frontend/model/page.go
@@ -0,0 +1,10 @@
package model

// SearchResult is the data for one page of search results, as passed to the
// search result view.
type SearchResult struct {
// Hits is the total number of hits reported for the query.
Hits int64
// Start is the offset of the first item on this page.
Start int
// Query is the query string the page was produced for.
Query string
// PrevFrom is the offset of the previous page; the search handler sets
// it to -1 on the first page.
PrevFrom int
// NextFrom is the offset just past the last item on this page.
NextFrom int
// Items holds the hits of this page.
Items []interface{}
}

0 comments on commit ad5bd11

Please sign in to comment.