Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ivm init 支持 proto stream #25

Closed
jaronnie opened this issue Jul 1, 2024 · 2 comments
Closed

ivm init 支持 proto stream #25

jaronnie opened this issue Jul 1, 2024 · 2 comments
Labels
feature help wanted Extra attention is needed

Comments

@jaronnie
Copy link
Member

jaronnie commented Jul 1, 2024

No description provided.

@jaronnie jaronnie added feature help wanted Extra attention is needed labels Jul 2, 2024
@jaronnie
Copy link
Member Author

jaronnie commented Jul 6, 2024

  1. 仅客户端 stream

v1 logic

package hellologic

import (
	"context"
	"io"
	"strings"

	"simplegateway/internal/pb/hellopb"
	"simplegateway/internal/svc"

	"github.com/zeromicro/go-zero/core/logx"
)

type SayHelloLogic struct {
	ctx    context.Context
	svcCtx *svc.ServiceContext
	logx.Logger
}

func NewSayHelloLogic(ctx context.Context, svcCtx *svc.ServiceContext) *SayHelloLogic {
	return &SayHelloLogic{
		ctx:    ctx,
		svcCtx: svcCtx,
		Logger: logx.WithContext(ctx),
	}
}

func (l *SayHelloLogic) SayHello(stream hellopb.Hello_SayHelloServer) error {
	var messages []string
	for {
		// 接收客户端发送的消息
		req, err := stream.Recv()
		if err == io.EOF {
			// 客户端流结束
			break
		} else if req != nil {
			messages = append(messages, req.Message)
		}
	}

	// 处理收到的消息并生成回复
	reply := &hellopb.SayHelloResponse{
		Message: strings.Join(messages, ""),
	}

	// 发送回复给客户端
	if err := stream.SendAndClose(reply); err != nil {
		l.Logger.Errorf("failed to send message: %v", err)
		return err
	}

	return nil
}

client

package main

import (
	"context"
	"fmt"
	"github.com/zeromicro/go-zero/zrpc"
	"log"
	"simplegateway/rpclient-go/hello"
	"simplegateway/rpclient-go/pb/hellopb"
	"strconv"
)

func main() {
	client, err := zrpc.NewClient(zrpc.NewDirectClientConf([]string{"localhost:8000"}, "", ""))
	if err != nil {
		panic(err)
	}
	helloSvr := hello.NewHello(client)

	stream, err := helloSvr.SayHello(context.Background())
	if err != nil {
		panic(err)
	}

	for i := 0; i < 5; i++ {
		req := &hellopb.SayHelloRequest{
			Message: "Hello" + strconv.Itoa(i),
		}
		if err := stream.Send(req); err != nil {
			log.Fatalf("could not send request: %v", err)
		}
	}
	err = stream.CloseSend()
	if err != nil {
		log.Fatalf("could not close stream: %v", err)
	}

	// 接收服务器的响应
	var res hellopb.SayHelloResponse
	err = stream.RecvMsg(&res)
	if err != nil {
		log.Fatalf("could not receive response: %v", err)
	}
	fmt.Println(res.Message)
}

v2 logic

package hellov2logic

import (
	"context"
	"google.golang.org/protobuf/proto"
	"io"
	"simplegateway/internal/logic/hello"
	"simplegateway/internal/pb/hellopb"
	"simplegateway/internal/pb/hellov2pb"
	"simplegateway/internal/svc"

	"github.com/zeromicro/go-zero/core/logx"
)

// SayHelloServerAdapter adapter to convert hellov2pb.Hellov2_SayHelloServer to hellopb.Hello_SayHelloServer
type SayHelloServerAdapter struct {
	hellov2pb.Hellov2_SayHelloServer
}

func (s *SayHelloServerAdapter) SendAndClose(response *hellopb.SayHelloResponse) error {
	marshal, err := proto.Marshal(response)
	if err != nil {
		return err
	}

	var newResp hellov2pb.SayHelloResponse
	err = proto.Unmarshal(marshal, &newResp)
	if err != nil {
		return err
	}
	return s.SendMsg(&newResp)
}

func (s *SayHelloServerAdapter) Recv() (*hellopb.SayHelloRequest, error) {
	for {
		newIn, err := s.Hellov2_SayHelloServer.Recv()
		if err == io.EOF {
			return nil, io.EOF
		}
		if err != nil {
			return nil, err
		}

		marshal, err := proto.Marshal(newIn)
		if err != nil {
			return nil, err
		}
		var oldIn hellopb.SayHelloRequest
		err = proto.Unmarshal(marshal, &oldIn)
		if err != nil {
			return nil, err
		}

		return &oldIn, nil
	}
}

type SayHelloLogic struct {
	ctx    context.Context
	svcCtx *svc.ServiceContext
	logx.Logger
}

func NewSayHelloLogic(ctx context.Context, svcCtx *svc.ServiceContext) *SayHelloLogic {
	return &SayHelloLogic{
		ctx:    ctx,
		svcCtx: svcCtx,
		Logger: logx.WithContext(ctx),
	}
}

func (l *SayHelloLogic) SayHello(stream hellov2pb.Hellov2_SayHelloServer) error {
	logic := hellologic.NewSayHelloLogic(l.ctx, l.svcCtx)

	// Create the adapter
	adapter := &SayHelloServerAdapter{
		stream,
	}

	return logic.SayHello(adapter)
}

@jaronnie
Copy link
Member Author

jaronnie commented Jul 7, 2024

@jaronnie jaronnie closed this as completed Jul 7, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
feature help wanted Extra attention is needed
Projects
None yet
Development

No branches or pull requests

1 participant