-
Notifications
You must be signed in to change notification settings - Fork 0
/
yt-data.service.go
147 lines (124 loc) · 4.04 KB
/
yt-data.service.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
package yt
import (
"context"
// "encoding/json"
// "fmt"
"log"
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
"github.com/ryanjoy0000/youtube-notifier/producer/common"
// "github.com/confluentinc/confluent-kafka-go/schemaregistry/serde/avro"
// "github.com/confluentinc/confluent-kafka-go/schemaregistry/serde/jsonschema"
"google.golang.org/api/option"
yt "google.golang.org/api/youtube/v3"
)
// err is a package-scoped error variable, visible to every file in package yt.
// NOTE(review): mutable package-level state like this is shared by all callers
// and is not safe under concurrent use; prefer function-local error values.
var err error
// YTDataService pulls video information for a YouTube playlist and publishes
// each video record to Kafka.
type YTDataService struct {
	// Kafka side: the producer is handed to common.SendDataKafka and
	// common.CloseKafka; the config map is read for the "YT_PLAYLIST_ID",
	// "YT_API_KEY" and "KAFKA_TOPIC" entries.
	producerPtr *kafka.Producer
	confPtr     *kafka.ConfigMap

	// Request side: playlistID, when non-empty, takes precedence over the
	// playlist ID stored in the config map; clientTelegramId is copied into
	// every YTData record that is produced.
	playlistID, clientTelegramId string
}
// GetYTData fetches every page of the playlist and returns the collected
// video records.
//
// The playlist ID stored on the service is used when it is non-empty;
// otherwise the "YT_PLAYLIST_ID" entry of the config map is used. The API key
// comes from the "YT_API_KEY" entry. Both entries are read with unchecked
// string type assertions, so a missing or non-string entry panics.
//
// ctx is forwarded to every page fetch so that the caller's cancellation and
// deadline are honoured. A nil ctx is treated as context.Background().
//
// Pages are requested until the API returns no next-page token or a fetch
// fails. On failure the records gathered so far are returned together with
// the error. The Kafka producer is closed before returning on every path, so
// the service cannot be reused afterwards.
func (y *YTDataService) GetYTData(ctx context.Context) ([]*YTData, error) {
	// Close the producer on every exit path, including early error returns.
	defer common.CloseKafka(y.producerPtr)

	// Earlier versions ignored ctx entirely, so tolerate callers passing nil.
	if ctx == nil {
		ctx = context.Background()
	}

	playlistID := y.playlistID
	if playlistID == "" {
		playlistID = (*y.confPtr)["YT_PLAYLIST_ID"].(string)
	}
	apiKey := (*y.confPtr)["YT_API_KEY"].(string)

	// Non-nil empty slice so an empty playlist yields an empty (not nil) result.
	result := []*YTData{}
	pageToken := ""
	for {
		// The error is kept local: sharing a package-level variable across
		// calls would race when the service is used concurrently.
		next, page, err := y.fetchYTPlaylist(ctx, apiKey, playlistID, pageToken)
		if err != nil {
			return result, err
		}
		result = append(result, page...)

		// An empty token marks the last page.
		if next == "" {
			return result, nil
		}
		pageToken = next
	}
}
// NewYTDataService builds a YTDataService from the given Kafka configuration,
// Kafka producer, playlist ID and client Telegram ID. The arguments are stored
// as-is; no validation is performed.
func NewYTDataService(conf1 *kafka.ConfigMap, producerPtr *kafka.Producer, playlistID string, clientTelegramId string) *YTDataService {
	svc := new(YTDataService)
	svc.confPtr = conf1
	svc.producerPtr = producerPtr
	svc.playlistID = playlistID
	svc.clientTelegramId = clientTelegramId
	return svc
}
// fetchYTPlaylist requests one page of playlist items and resolves the videos
// on that page.
//
// A new YouTube service client is created from apiKey on every call. Only the
// "contentDetails" part of each playlist item is requested. pageToken selects
// the page; an empty token requests the first page.
//
// It returns the token of the next page (empty when there is none), the video
// records produced by fetchVideos for this page, and an error. On any failure
// the token is empty and the slice is nil. Failures are logged and returned to
// the caller rather than panicking, so the caller decides how to react.
func (y *YTDataService) fetchYTPlaylist(ctx context.Context, apiKey string, playlistID string, pageToken string) (string, []*YTData, error) {
	// connect to Youtube API
	ytSvc, err := yt.NewService(ctx, option.WithAPIKey(apiKey))
	if err != nil {
		log.Println("Unable to connect to Youtube API : ", err)
		return "", nil, err
	}

	// sections required from Youtube data
	sectionList := []string{"contentDetails"}

	log.Println("fetching playlist...")
	// Attach ctx so cancellation and deadlines reach the HTTP request.
	plRespPtr, err := ytSvc.PlaylistItems.List(sectionList).
		PlaylistId(playlistID).
		PageToken(pageToken).
		Context(ctx).
		Do()
	if err != nil {
		log.Println("Unable to fetch Youtube playlist details : ", err)
		return "", nil, err
	}

	log.Println("fetching videos...")
	result, err := y.fetchVideos(plRespPtr, ytSvc)
	if err != nil {
		log.Println("Unable to fetch video")
		return "", nil, err
	}

	// NextPageToken is empty on the last page, which ends the caller's loop.
	return plRespPtr.NextPageToken, result, nil
}
// fetchVideos looks up each video referenced by the playlist page, converts it
// to a YTData record, and sends that record to Kafka.
//
// For every playlist item, the "statistics" and "snippet" parts of the video
// are requested by ID. Each returned video becomes a YTData carrying its
// title, like count, comment count and the service's client Telegram ID, and
// is sent to the topic named by the "KAFKA_TOPIC" config entry, keyed by the
// video ID. That entry is read with an unchecked string type assertion.
//
// A failed video lookup is logged and that item is skipped, so the rest of
// the page is still processed. Items or videos missing the parts this function
// reads are skipped as well. A failed Kafka send aborts the whole call and
// returns nil with the error. Otherwise the returned slice is non-nil (possibly
// empty) and the error is nil.
func (y *YTDataService) fetchVideos(plRespPtr *yt.PlaylistItemListResponse, ytSvc *yt.Service) ([]*YTData, error) {
	result := []*YTData{}
	if plRespPtr == nil {
		return result, nil
	}

	sectionList := []string{"statistics", "snippet"}

	// extract video ids, looping through playlist
	for _, plItemPtr := range plRespPtr.Items {
		if plItemPtr == nil || plItemPtr.ContentDetails == nil {
			log.Println("Skipping playlist item without content details")
			continue
		}
		videoId := plItemPtr.ContentDetails.VideoId

		vidListResp, err := ytSvc.Videos.List(sectionList).Id(videoId).Do()
		if err != nil {
			// The response is nil on error; skip this item instead of
			// dereferencing it.
			log.Println("Unable to fetch video details: ", err)
			continue
		}

		for _, v := range vidListResp.Items {
			if v == nil || v.Snippet == nil || v.Statistics == nil {
				log.Println("Skipping video without snippet or statistics")
				continue
			}

			videoPtr := &YTData{
				VideoTitle:        v.Snippet.Title,
				VideoLikeCount:    int(v.Statistics.LikeCount),
				VideoCommentCount: int(v.Statistics.CommentCount),
				ClientTelegramId:  y.clientTelegramId,
			}
			result = append(result, videoPtr)

			// SEND DATA USING KAFKA
			if err := common.SendDataKafka(y.producerPtr, (*y.confPtr)["KAFKA_TOPIC"].(string), v.Id, *videoPtr); err != nil {
				log.Println("unable to send data using kafka", err)
				return nil, err
			}
		}
	}

	return result, nil
}