Skip to content

Commit

Permalink
Merge branch 'shard'
Browse files Browse the repository at this point in the history
  • Loading branch information
kaz committed Aug 21, 2021
2 parents 3783de9 + de82284 commit fda74ca
Showing 1 changed file with 57 additions and 72 deletions.
129 changes: 57 additions & 72 deletions app/webapp/go/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,15 @@ const (
)

var (
db *sqlx.DB
db *sqlx.DB

dbHosts = []string{
"isucondition-1.t.isucon.dev",
"isucondition-2.t.isucon.dev",
"isucondition-3.t.isucon.dev",
}
dbShard []*sqlx.DB

sessionStore sessions.Store
mySQLConnectionData *MySQLConnectionEnv

Expand Down Expand Up @@ -181,9 +189,9 @@ func getEnv(key string, defaultValue string) string {
return defaultValue
}

func NewMySQLConnectionEnv() *MySQLConnectionEnv {
func NewMySQLConnectionEnv(host string) *MySQLConnectionEnv {
return &MySQLConnectionEnv{
Host: getEnv("MYSQL_HOST", "127.0.0.1"),
Host: host,
Port: getEnv("MYSQL_PORT", "3306"),
User: getEnv("MYSQL_USER", "isucon"),
DBName: getEnv("MYSQL_DBNAME", "isucondition"),
Expand Down Expand Up @@ -215,9 +223,8 @@ func main() {
go standalone.Integrate(":8888")

e := echo.New()
echoInt.Integrate(e)

e.Use(middleware.Logger())
echoInt.EnableDebugHandler(e)
e.Use(middleware.Recover())

e.POST("/initialize", postInitialize)
Expand All @@ -242,17 +249,20 @@ func main() {
e.GET("/register", getIndex)
e.Static("/assets", frontendContentsPath+"/assets")

mySQLConnectionData = NewMySQLConnectionEnv()
dbShard = make([]*sqlx.DB, len(dbHosts))

var err error
db, err = mySQLConnectionData.ConnectDB()
if err != nil {
e.Logger.Fatalf("failed to connect db: %v", err)
return
for i := range dbShard {
var err error
dbShard[i], err = NewMySQLConnectionEnv(dbHosts[i]).ConnectDB()
if err != nil {
e.Logger.Fatalf("failed to connect db: %v", err)
return
}
dbShard[i].SetMaxOpenConns(100)
dbShard[i].SetMaxIdleConns(100)
defer dbShard[i].Close()
}
db.SetMaxOpenConns(100)
db.SetMaxIdleConns(100)
defer db.Close()
db = dbShard[0]

postIsuConditionTargetBaseURL = os.Getenv("POST_ISUCONDITION_TARGET_BASE_URL")
if postIsuConditionTargetBaseURL == "" {
Expand All @@ -264,6 +274,11 @@ func main() {
e.Logger.Fatal(e.Start(serverPort))
}

func selectDB(id string) *sqlx.DB {
selected := int([]byte(id)[0]) % len(dbShard)
return dbShard[selected]
}

func getSession(r *http.Request) (*sessions.Session, error) {
session, err := sessionStore.Get(r, sessionName)
if err != nil {
Expand Down Expand Up @@ -462,15 +477,8 @@ func getIsuList(c echo.Context) error {
return c.NoContent(http.StatusInternalServerError)
}

tx, err := db.Beginx()
if err != nil {
c.Logger().Errorf("db error: %v", err)
return c.NoContent(http.StatusInternalServerError)
}
defer tx.Rollback()

isuList := []Isu{}
err = tx.Select(
err = db.Select(
&isuList,
"SELECT * FROM `isu` WHERE `jia_user_id` = ? ORDER BY `id` DESC",
jiaUserID)
Expand All @@ -481,9 +489,11 @@ func getIsuList(c echo.Context) error {

responseList := []GetIsuListResponse{}
for _, isu := range isuList {
sdb := selectDB(isu.JIAIsuUUID)

var lastCondition IsuCondition
foundLastCondition := true
err = tx.Get(&lastCondition, "SELECT * FROM `isu_condition` WHERE `jia_isu_uuid` = ? ORDER BY `timestamp` DESC LIMIT 1",
err = sdb.Get(&lastCondition, "SELECT * FROM `isu_condition` WHERE `jia_isu_uuid` = ? ORDER BY `timestamp` DESC LIMIT 1",
isu.JIAIsuUUID)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
Expand Down Expand Up @@ -522,12 +532,6 @@ func getIsuList(c echo.Context) error {
responseList = append(responseList, res)
}

err = tx.Commit()
if err != nil {
c.Logger().Errorf("db error: %v", err)
return c.NoContent(http.StatusInternalServerError)
}

return c.JSON(http.StatusOK, responseList)
}

Expand Down Expand Up @@ -749,15 +753,8 @@ func getIsuGraph(c echo.Context) error {
}
date := time.Unix(datetimeInt64, 0).Truncate(time.Hour)

tx, err := db.Beginx()
if err != nil {
c.Logger().Errorf("db error: %v", err)
return c.NoContent(http.StatusInternalServerError)
}
defer tx.Rollback()

var count int
err = tx.Get(&count, "SELECT COUNT(*) FROM `isu` WHERE `jia_user_id` = ? AND `jia_isu_uuid` = ?",
err = db.Get(&count, "SELECT COUNT(*) FROM `isu` WHERE `jia_user_id` = ? AND `jia_isu_uuid` = ?",
jiaUserID, jiaIsuUUID)
if err != nil {
c.Logger().Errorf("db error: %v", err)
Expand All @@ -767,30 +764,26 @@ func getIsuGraph(c echo.Context) error {
return c.String(http.StatusNotFound, "not found: isu")
}

res, err := generateIsuGraphResponse(tx, jiaIsuUUID, date)
sdb := selectDB(jiaIsuUUID)
res, err := generateIsuGraphResponse(sdb, jiaIsuUUID, date)
if err != nil {
c.Logger().Error(err)
return c.NoContent(http.StatusInternalServerError)
}

err = tx.Commit()
if err != nil {
c.Logger().Errorf("db error: %v", err)
return c.NoContent(http.StatusInternalServerError)
}

return c.JSON(http.StatusOK, res)
}

// グラフのデータ点を一日分生成
func generateIsuGraphResponse(tx *sqlx.Tx, jiaIsuUUID string, graphDate time.Time) ([]GraphResponse, error) {
func generateIsuGraphResponse(q sqlx.Queryer, jiaIsuUUID string, graphDate time.Time) ([]GraphResponse, error) {
dataPoints := []GraphDataPointWithInfo{}
conditionsInThisHour := []IsuCondition{}
timestampsInThisHour := []int64{}
var startTimeInThisHour time.Time
var condition IsuCondition

rows, err := tx.Queryx("SELECT * FROM `isu_condition` WHERE `jia_isu_uuid` = ? ORDER BY `timestamp` ASC", jiaIsuUUID)
// ここのQueryerはsdb相当(のはず)
rows, err := q.Queryx("SELECT * FROM `isu_condition` WHERE `jia_isu_uuid` = ? ORDER BY `timestamp` ASC", jiaIsuUUID)
if err != nil {
return nil, fmt.Errorf("db error: %v", err)
}
Expand Down Expand Up @@ -1017,15 +1010,16 @@ func getIsuConditionsFromDB(db *sqlx.DB, jiaIsuUUID string, endTime time.Time, c
conditions := []IsuCondition{}
var err error

sdb := selectDB(jiaIsuUUID)
if startTime.IsZero() {
err = db.Select(&conditions,
err = sdb.Select(&conditions,
"SELECT * FROM `isu_condition` WHERE `jia_isu_uuid` = ?"+
" AND `timestamp` < ?"+
" ORDER BY `timestamp` DESC",
jiaIsuUUID, endTime,
)
} else {
err = db.Select(&conditions,
err = sdb.Select(&conditions,
"SELECT * FROM `isu_condition` WHERE `jia_isu_uuid` = ?"+
" AND `timestamp` < ?"+
" AND ? <= `timestamp`"+
Expand Down Expand Up @@ -1085,21 +1079,22 @@ func calculateConditionLevel(condition string) (string, error) {
}

var (
trendCache = []TrendResponse{}
trendCache = []TrendResponse{}
trendCacheMux = sync.RWMutex{}
)

// trendのアップデート
func trendUpdater() {
ticker := time.NewTicker(500 * time.Millisecond)
for {
select {
case <-ticker.C:
updateTrend()
case <-ticker.C:
updateTrend()
}
}
}

func updateTrend(){
func updateTrend() {
characterList := []Isu{}
err := db.Select(&characterList, "SELECT `character` FROM `isu` GROUP BY `character`")
if err != nil {
Expand All @@ -1124,8 +1119,10 @@ func updateTrend(){
characterWarningIsuConditions := []*TrendCondition{}
characterCriticalIsuConditions := []*TrendCondition{}
for _, isu := range isuList {
sdb := selectDB(isu.JIAIsuUUID)

conditions := []IsuCondition{}
err = db.Select(&conditions,
err = sdb.Select(&conditions,
"SELECT * FROM `isu_condition` WHERE `jia_isu_uuid` = ? ORDER BY timestamp DESC",
isu.JIAIsuUUID,
)
Expand Down Expand Up @@ -1211,15 +1208,8 @@ func postIsuCondition(c echo.Context) error {
return c.String(http.StatusBadRequest, "bad request body")
}

tx, err := db.Beginx()
if err != nil {
c.Logger().Errorf("db error: %v", err)
return c.NoContent(http.StatusInternalServerError)
}
defer tx.Rollback()

var count int
err = tx.Get(&count, "SELECT COUNT(*) FROM `isu` WHERE `jia_isu_uuid` = ?", jiaIsuUUID)
err = db.Get(&count, "SELECT COUNT(*) FROM `isu` WHERE `jia_isu_uuid` = ?", jiaIsuUUID)
if err != nil {
c.Logger().Errorf("db error: %v", err)
return c.NoContent(http.StatusInternalServerError)
Expand All @@ -1229,7 +1219,8 @@ func postIsuCondition(c echo.Context) error {
}

query := make([]string, 0, len(req))
params := make([]interface{}, 0, len(req) * 5)
params := make([]interface{}, 0, len(req)*5)

for _, cond := range req {
timestamp := time.Unix(cond.Timestamp, 0)

Expand All @@ -1240,20 +1231,14 @@ func postIsuCondition(c echo.Context) error {
params = append(params, jiaIsuUUID, timestamp, cond.IsSitting, cond.Condition, cond.Message)
}

_, err = tx.Exec(
sdb := selectDB(jiaIsuUUID)
_, err = sdb.Exec(
"INSERT INTO `isu_condition`"+
" (`jia_isu_uuid`, `timestamp`, `is_sitting`, `condition`, `message`)"+
" VALUES "+
strings.Join(query, ","),
params...,
)
if err != nil {
c.Logger().Errorf("db error: %v", err)
return c.NoContent(http.StatusInternalServerError)
}


err = tx.Commit()
params...,
)
if err != nil {
c.Logger().Errorf("db error: %v", err)
return c.NoContent(http.StatusInternalServerError)
Expand Down

0 comments on commit fda74ca

Please sign in to comment.