Skip to content

Commit

Permalink
Working support for telegram, protocol improvements when multiple ada…
Browse files Browse the repository at this point in the history
…ptors are connected
  • Loading branch information
jamesread committed Jun 15, 2024
1 parent ae4d3e4 commit e4365e9
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 21 deletions.
51 changes: 43 additions & 8 deletions cmd/japella-adaptor-telegram/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,15 @@ import (

"github.com/jamesread/japella/internal/runtimeconfig"
log "github.com/sirupsen/logrus"
pb "github.com/jamesread/japella/gen/protobuf"
"github.com/jamesread/japella/internal/amqp"
"strconv"

"time"
)

var bot *tgbotapi.BotAPI

var cfg struct {
Common *runtimeconfig.CommonConfig
Telegram struct {
Expand All @@ -22,7 +27,7 @@ func main() {
cfg.Common = &runtimeconfig.CommonConfig{}

runtimeconfig.LoadConfigCommon(cfg.Common)
runtimeconfig.LoadConfig("config.telegram.yaml", cfg.Telegram)
runtimeconfig.LoadConfig("config.telegram.yaml", &cfg.Telegram)

log.Infof("cfg: %+v", cfg)

Expand All @@ -36,7 +41,9 @@ func main() {
func Start(botToken string) {
log.Infof("botToken: %v", botToken)

bot, err := tgbotapi.NewBotAPI(botToken)
var err error

bot, err = tgbotapi.NewBotAPI(botToken)

if err != nil {
log.Panic(err)
Expand All @@ -49,16 +56,44 @@ func Start(botToken string) {
u := tgbotapi.NewUpdate(0)
u.Timeout = 60

go Replier()

updates := bot.GetUpdatesChan(u)

for update := range updates {
if update.Message != nil { // If we got a message
log.Printf("[%s] %s", update.Message.From.UserName, update.Message.Text)
log.Infof("updates: %v", updates)

msg := tgbotapi.NewMessage(update.Message.Chat.ID, update.Message.Text)
msg.ReplyToMessageID = update.Message.MessageID
for update := range updates {
log.Infof("update: %v", update)

bot.Send(msg)
if update.Message != nil { // If we got a message
log.Infof("[%s] %s", update.Message.From.UserName, update.Message.Text)

amqp.PublishPb(&pb.IncommingMessage {
Author: update.Message.From.UserName,
Content: update.Message.Text,
Channel: strconv.FormatInt(update.Message.Chat.ID, 10),
Protocol: "telegram",
})
}
}
}

func Replier() {
amqp.ConsumeForever("telegram-OutgoingMessage", func(d amqp.Delivery) {
reply := pb.OutgoingMessage{}

amqp.Decode(d.Message.Body, &reply)

log.Infof("reply: %v", &reply)

channelId, _ := strconv.ParseInt(reply.Channel, 10, 64)
msg := tgbotapi.NewMessage(channelId, reply.Content)

messageId, _ := strconv.Atoi(reply.IncommingMessageId)

log.Infof("messageId: %v %v", messageId, bot)
msg.ReplyToMessageID = messageId

bot.Send(msg)
})
}
14 changes: 8 additions & 6 deletions cmd/japella-bot-utils/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ func main() {
}

func Start() {
_, handler := amqp.ConsumeForever("MessageReceived", func(d amqp.Delivery) {
msg := &pb.MessageReceived{}
_, handler := amqp.ConsumeForever("IncommingMessage", func(d amqp.Delivery) {
msg := &pb.IncommingMessage{}

amqp.Decode(d.Message.Body, &msg)

Expand All @@ -40,7 +40,7 @@ func Start() {
log.Infof("done")
}

func handleMessage(msg *pb.MessageReceived) {
func handleMessage(msg *pb.IncommingMessage) {
switch msg.Content {
case "!test":
replyTest(msg)
Expand All @@ -50,11 +50,13 @@ func handleMessage(msg *pb.MessageReceived) {
}
}

func replyTest(msg *pb.MessageReceived) {
reply := &pb.MessageReply{
func replyTest(msg *pb.IncommingMessage) {
reply := &pb.OutgoingMessage{
Channel: msg.Channel,
Content: "This is a reply",
IncommingMessageId: msg.MessageId,
Protocol: msg.Protocol,
}

amqp.PublishPb(reply)
amqp.PublishPbWithRoutingKey(reply, msg.Protocol + "-OutgoingMessage")
}
10 changes: 6 additions & 4 deletions internal/adaptor/discord/discord.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,12 @@ func registerCommand(name string, handler func(s *discordgo.Session, i *discordg
}

func Replier() {
amqp.ConsumeForever("MessageReply", func(d amqp.Delivery) {
reply := pb.MessageReply{}
amqp.ConsumeForever("discord-OutgoingMessage", func(d amqp.Delivery) {
reply := pb.OutgoingMessage{}

amqp.Decode(d.Message.Body, &reply)

log.Infof("reply: %+v %v", reply, goBot)
log.Infof("reply: %+v %v", &reply, goBot)

goBot.ChannelMessageSend(reply.Channel, reply.Content)

Expand All @@ -100,10 +100,12 @@ func messageHandler(s *discordgo.Session, m *discordgo.MessageCreate) {
return
}

msg := pb.MessageReceived{
msg := pb.IncommingMessage{
Author: m.Author.ID,
Content: m.Content,
Channel: m.ChannelID,
MessageId: m.ID,
Protocol: "discord",
}

amqp.PublishPb(&msg)
Expand Down
19 changes: 18 additions & 1 deletion internal/amqp/amqp.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,23 @@ func PublishWithChannel(c *amqp.Channel, routingKey string, msg amqp.Publishing)
return err
}

func PublishPbWithRoutingKey(msg interface{}, routingKey string) {
channel, err := GetChannel("Publish-" + getMsgType(msg))

if err != nil {
log.Errorf("PublishPbWithRoutingKey: %v", err)
return
}

env := newEnvelope(getMsgType(msg), Encode(msg))

err = PublishWithChannel(channel, routingKey, env)

if err != nil {
log.Errorf("PublishPbWithRoutingKey: %v", err)
}
}

func PublishPb(msg interface{}) {
channel, err := GetChannel("Publish-" + getMsgType(msg))

Expand Down Expand Up @@ -216,7 +233,7 @@ func consumeForever(consumerReady *sync.WaitGroup, handlerDone *sync.WaitGroup,
}

func consumeWithChannel(consumerReady *sync.WaitGroup, handlerWait *sync.WaitGroup, c *amqp.Channel, deliveryTag string, handlerFunc HandlerFunc) {
queueName := getHostname() + "-" + InstanceId + "-" + deliveryTag
queueName := "japella-" + getHostname() + "-" + InstanceId + "-" + deliveryTag

_, err := c.QueueDeclare(
queueName,
Expand Down
7 changes: 5 additions & 2 deletions japella.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,19 @@ syntax = "proto3";

option go_package = "./gen/protobuf";

message MessageReceived {
message IncommingMessage {
string content = 1;
string channel = 2;
string author = 3;
string server = 4;
string protocol = 5;
string message_id = 6;
}

message MessageReply {
message OutgoingMessage {
string content = 1;
string channel = 2;
string protocol = 3;
string incomming_message_id = 4;
}

0 comments on commit e4365e9

Please sign in to comment.