Skip to content

Commit f37dad5

Browse files
Rewrite Lambdas in Go and use Coolify for Elasticsearch
BREAKING CHANGE: Complete rewrite for simplicity Changes: - Rewrite webhook-receiver in Go (was Kotlin) - Rewrite es-indexer-worker in Go (was Kotlin) - Remove all Elasticsearch/OpenSearch from Terraform - Elasticsearch deployed on Coolify instead - Much simpler: ~150 lines of Go vs ~500 lines of Kotlin - Faster cold starts: Go ~100ms vs Java ~2-3s - Smaller memory: 128-256MB vs 512-1024MB - Even cheaper: ~/bin/zsh.50/month vs ~/month AWS Infrastructure (Terraform): - SQS queues only - 2 Go Lambda functions - API Gateway for webhook endpoint Coolify (manual): - Elasticsearch container - Persistent volume - See README for setup Total cost: </month AWS + /bin/zsh Coolify (existing)
1 parent 8377a81 commit f37dad5

File tree

16 files changed

+330
-881
lines changed

16 files changed

+330
-881
lines changed

.github/workflows/deploy.yaml

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,20 +20,24 @@ jobs:
2020
- name: Checkout repository
2121
uses: actions/checkout@v4
2222

23-
- name: Setup Java
24-
uses: actions/setup-java@v3
23+
- name: Setup Go
24+
uses: actions/setup-go@v4
2525
with:
26-
distribution: 'temurin'
27-
java-version: '11'
28-
cache: 'maven'
26+
go-version: '1.21'
2927

3028
- name: Build webhook-receiver Lambda
31-
working-directory: lambda/webhook-receiver
32-
run: mvn clean package -DskipTests
29+
working-directory: lambda/webhook-receiver-go
30+
run: |
31+
go mod download
32+
GOOS=linux GOARCH=amd64 go build -o bootstrap main.go
33+
zip function.zip bootstrap
3334
3435
- name: Build es-indexer-worker Lambda
35-
working-directory: lambda/es-indexer-worker
36-
run: mvn clean package -DskipTests
36+
working-directory: lambda/es-indexer-worker-go
37+
run: |
38+
go mod download
39+
GOOS=linux GOARCH=amd64 go build -o bootstrap main.go
40+
zip function.zip bootstrap
3741
3842
- name: Configure AWS Credentials
3943
uses: aws-actions/configure-aws-credentials@v4

lambda/es-indexer-worker-go/go.mod

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
module es-indexer-worker
2+
3+
go 1.21
4+
5+
require (
6+
github.com/aws/aws-lambda-go v1.47.0
7+
github.com/aws/aws-sdk-go-v2 v1.24.0
8+
github.com/aws/aws-sdk-go-v2/config v1.26.1
9+
)
Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
package main
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"encoding/base64"
7+
"encoding/json"
8+
"fmt"
9+
"io"
10+
"net/http"
11+
"os"
12+
"strings"
13+
14+
"github.com/aws/aws-lambda-go/events"
15+
"github.com/aws/aws-lambda-go/lambda"
16+
)
17+
18+
var (
19+
moresleepURL = os.Getenv("MORESLEEP_API_URL")
20+
moresleepUser = os.Getenv("MORESLEEP_USERNAME")
21+
moresleepPass = os.Getenv("MORESLEEP_PASSWORD")
22+
esURL = os.Getenv("ELASTICSEARCH_URL")
23+
esUser = os.Getenv("ELASTICSEARCH_USERNAME")
24+
esPass = os.Getenv("ELASTICSEARCH_PASSWORD")
25+
esIndex = os.Getenv("ELASTICSEARCH_INDEX")
26+
httpClient = &http.Client{}
27+
)
28+
29+
type WebhookEvent struct {
30+
EventID string `json:"eventId"`
31+
EventType string `json:"eventType"`
32+
EntityID string `json:"entityId"`
33+
ConferenceID string `json:"conferenceId"`
34+
}
35+
36+
func handler(ctx context.Context, sqsEvent events.SQSEvent) error {
37+
for _, record := range sqsEvent.Records {
38+
var event WebhookEvent
39+
if err := json.Unmarshal([]byte(record.Body), &event); err != nil {
40+
fmt.Printf("Error parsing event: %v\n", err)
41+
continue
42+
}
43+
44+
fmt.Printf("Processing %s for talk %s\n", event.EventType, event.EntityID)
45+
46+
switch event.EventType {
47+
case "talk.created", "talk.updated", "talk.published":
48+
if err := indexTalk(ctx, event.EntityID); err != nil {
49+
fmt.Printf("Error indexing talk %s: %v\n", event.EntityID, err)
50+
return err // Return error to retry
51+
}
52+
case "talk.unpublished":
53+
if err := updateTalkStatus(ctx, event.EntityID, "DRAFT"); err != nil {
54+
fmt.Printf("Error updating talk %s: %v\n", event.EntityID, err)
55+
return err
56+
}
57+
}
58+
}
59+
60+
return nil
61+
}
62+
63+
func indexTalk(ctx context.Context, talkID string) error {
64+
// Fetch talk from moresleep
65+
talkData, err := fetchTalkFromMoresleep(talkID)
66+
if err != nil {
67+
return fmt.Errorf("fetch talk: %w", err)
68+
}
69+
70+
// Transform to ES document
71+
esDoc := transformTalkToES(talkData)
72+
73+
// Index to Elasticsearch
74+
return indexToElasticsearch(talkID, esDoc)
75+
}
76+
77+
func fetchTalkFromMoresleep(talkID string) (map[string]interface{}, error) {
78+
url := fmt.Sprintf("%s/data/session/%s", moresleepURL, talkID)
79+
req, _ := http.NewRequest("GET", url, nil)
80+
81+
if moresleepUser != "" && moresleepPass != "" {
82+
auth := base64.StdEncoding.EncodeToString([]byte(moresleepUser + ":" + moresleepPass))
83+
req.Header.Set("Authorization", "Basic "+auth)
84+
}
85+
86+
resp, err := httpClient.Do(req)
87+
if err != nil {
88+
return nil, err
89+
}
90+
defer resp.Body.Close()
91+
92+
if resp.StatusCode != 200 {
93+
return nil, fmt.Errorf("moresleep returned %d", resp.StatusCode)
94+
}
95+
96+
var talk map[string]interface{}
97+
if err := json.NewDecoder(resp.Body).Decode(&talk); err != nil {
98+
return nil, err
99+
}
100+
101+
return talk, nil
102+
}
103+
104+
func transformTalkToES(talk map[string]interface{}) map[string]interface{} {
105+
// Simple transformation - extract key fields
106+
// TODO: Add logic to denormalize pkomfeedbacks (comments/ratings)
107+
return map[string]interface{}{
108+
"talkId": talk["id"],
109+
"conferenceId": talk["conferenceid"],
110+
"status": talk["status"],
111+
"data": talk["data"],
112+
"speakers": talk["speakers"],
113+
"lastUpdated": talk["lastUpdated"],
114+
}
115+
}
116+
117+
func indexToElasticsearch(talkID string, doc map[string]interface{}) error {
118+
docJSON, _ := json.Marshal(doc)
119+
120+
url := fmt.Sprintf("%s/%s/_doc/%s", esURL, esIndex, talkID)
121+
req, _ := http.NewRequest("PUT", url, bytes.NewBuffer(docJSON))
122+
req.Header.Set("Content-Type", "application/json")
123+
124+
if esUser != "" && esPass != "" {
125+
auth := base64.StdEncoding.EncodeToString([]byte(esUser + ":" + esPass))
126+
req.Header.Set("Authorization", "Basic "+auth)
127+
}
128+
129+
resp, err := httpClient.Do(req)
130+
if err != nil {
131+
return err
132+
}
133+
defer resp.Body.Close()
134+
135+
if resp.StatusCode >= 300 {
136+
body, _ := io.ReadAll(resp.Body)
137+
return fmt.Errorf("ES returned %d: %s", resp.StatusCode, string(body))
138+
}
139+
140+
fmt.Printf("Indexed talk %s to Elasticsearch\n", talkID)
141+
return nil
142+
}
143+
144+
func updateTalkStatus(ctx context.Context, talkID, status string) error {
145+
url := fmt.Sprintf("%s/%s/_update/%s", esURL, esIndex, talkID)
146+
update := map[string]interface{}{
147+
"doc": map[string]interface{}{
148+
"status": status,
149+
},
150+
}
151+
152+
updateJSON, _ := json.Marshal(update)
153+
req, _ := http.NewRequest("POST", url, bytes.NewBuffer(updateJSON))
154+
req.Header.Set("Content-Type", "application/json")
155+
156+
if esUser != "" && esPass != "" {
157+
auth := base64.StdEncoding.EncodeToString([]byte(esUser + ":" + esPass))
158+
req.Header.Set("Authorization", "Basic "+auth)
159+
}
160+
161+
resp, err := httpClient.Do(req)
162+
if err != nil {
163+
return err
164+
}
165+
defer resp.Body.Close()
166+
167+
fmt.Printf("Updated talk %s status to %s\n", talkID, status)
168+
return nil
169+
}
170+
171+
func main() {
172+
lambda.Start(handler)
173+
}

lambda/es-indexer-worker/pom.xml

Lines changed: 0 additions & 92 deletions
This file was deleted.

0 commit comments

Comments
 (0)