Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

golang client phase 1

  • Loading branch information...
commit 6c98c53bf20b6ab750595e9f708ca0365c892937 1 parent bbc3c32
@crimsonred crimsonred authored
View
417 go/src/pubnubMessaging/pubnub.go
@@ -0,0 +1,417 @@
+package pubnubMessaging
+
+import (
+ "encoding/json"
+ "fmt"
+ "io/ioutil"
+ "net/http"
+ "strings"
+ "time"
+ "net"
+ "crypto/tls"
+)
+
+const _limit = 1800
+const _origin = "pubsub.pubnub.com"
+const _timeout = 310
+
+
+type Pubnub struct {
+ origin string
+ publishKey string
+ subscribeKey string
+ secretKey string
+ cipherKey string
+ limit int
+ ssl bool
+ uuid string
+ subscribedChannels string
+ timeToken string
+ resetTimeToken bool
+}
+
+//Init pubnub struct
+func PubnubInit(publishKey string, subscribeKey string, secretKey string, cipherKey string, sslOn bool, customUuid string) *Pubnub {
+ newPubnub := &Pubnub{
+ origin: _origin,
+ publishKey: publishKey,
+ subscribeKey: subscribeKey,
+ secretKey: secretKey,
+ cipherKey: cipherKey,
+ limit: _limit,
+ ssl: sslOn,
+ uuid: "",
+ subscribedChannels: "",
+ resetTimeToken: true,
+ timeToken: "0",
+ }
+
+ if newPubnub.ssl {
+ newPubnub.origin = "https://" + newPubnub.origin
+ } else {
+ newPubnub.origin = "http://" + newPubnub.origin
+ }
+
+ if customUuid == "" {
+ uuid, err := GenUuid()
+ if err == nil {
+ newPubnub.uuid = uuid
+ } else {
+ fmt.Println(err)
+ }
+ } else {
+ newPubnub.uuid = customUuid
+ }
+
+ return newPubnub
+}
+
+func (pub *Pubnub) Abort() {
+ pub.subscribedChannels = ""
+}
+
+func (pub *Pubnub) GetTime(c chan []byte) {
+ url := ""
+ url += "/time"
+ url += "/0"
+
+ value, err := pub.HttpRequest(url)
+
+ // send response to channel
+ if err != nil {
+ c <- value
+ } else {
+ c <- []byte(fmt.Sprintf("%s", value))
+ }
+ close(c)
+}
+
+func (pub *Pubnub) Publish(channel string, message string, c chan []byte) {
+ signature := ""
+ if pub.secretKey != "" {
+ signature = GetHmacSha256(pub.secretKey, fmt.Sprintf("%s/%s/%s/%s/%s", pub.publishKey, pub.subscribeKey, pub.secretKey, channel, message))
+ } else {
+ signature = "0"
+ }
+ url := ""
+ url += "/publish"
+ url += "/" + pub.publishKey
+ url += "/" + pub.subscribeKey
+ url += "/" + signature
+ url += "/" + channel
+ url += "/0"
+
+ //Now only for string, need add encrypt for other types
+ // use "/{\"msg\":\"%s\"}" for sending hash
+ if pub.cipherKey != "" {
+ url += fmt.Sprintf("/\"%s\"", EncryptString(pub.cipherKey, fmt.Sprintf("\"%s\"", message)))
+ } else {
+ url += fmt.Sprintf("/\"%s\"", message)
+ }
+
+ value, err := pub.HttpRequest(url)
+
+ if err != nil {
+ c <- value
+ } else {
+ c <- []byte(fmt.Sprintf("%s", value))
+ }
+ close(c)
+}
+
+func (pub *Pubnub) Subscribe(channels string, c chan []byte, isPresenceSubscribe bool) {
+ channelArray := strings.Split(channels, ",")
+ pub.resetTimeToken = true
+ if isPresenceSubscribe {
+ for i := 0; i < len(channelArray); i++ {
+ channelToSub := strings.TrimSpace(channelArray[i]) + "-pnpres"
+ if pub.NotDuplicate(channelToSub) {
+ if len(pub.subscribedChannels)>0 {
+ pub.subscribedChannels += ","
+ }
+ pub.subscribedChannels += channelToSub
+ }else{
+ //TODO: channel already subscribed message
+ }
+ }
+ }else{
+ for i := 0; i < len(channelArray); i++ {
+ channelToSub := strings.TrimSpace(channelArray[i])
+ if pub.NotDuplicate(channelToSub) {
+ if len(pub.subscribedChannels)>0 {
+ pub.subscribedChannels += ","
+ }
+ pub.subscribedChannels += channelToSub
+ }else{
+ //TODO: channel already subscribed message
+ }
+ }
+ }
+ for {
+ if len(pub.subscribedChannels) > 0 {
+ url := ""
+ url += "/subscribe"
+ url += "/" + pub.subscribeKey
+ url += "/" + pub.subscribedChannels
+ url += "/0"
+ if pub.resetTimeToken {
+ url += "/0"
+ pub.resetTimeToken = false
+ }else{
+ url += "/" + pub.timeToken
+ }
+
+ if pub.uuid != "" {
+ url += "?uuid=" + pub.uuid
+ }
+
+ value, err := pub.HttpRequest(url)
+
+ if err != nil {
+ c <- value
+ if strings.Contains(err.Error(), "timeout") || strings.Contains(err.Error(), "no such host") {
+ SleepForAWhile()
+ }
+ } else if string(value) != "" {
+ if string(value) == "[]" {
+ SleepForAWhile()
+ continue
+ }
+
+ //data, returnTimeToken, returnedChannels, err := ParseJson(value)
+ data, returnTimeToken, _ , err := ParseJson(value)
+ pub.timeToken = returnTimeToken
+ if data == "[]" {
+ continue
+ }
+
+ if err != nil {
+ fmt.Println(fmt.Sprintf("Error: %s", err))
+ }
+
+ //fmt.Println(fmt.Sprintf("timetoken %s:", pub.timeToken))
+
+ c <- []byte(fmt.Sprintf("%s", value))
+ /*if pub.cipherKey != "" {
+ c <- []byte(DecryptString(pub.cipherKey, fmt.Sprintf("%s", value)))
+ } else {
+ //c <- []byte(fmt.Sprintf("%s %s %s %s ", value, data, timeToken, returnedChannels))
+ c <- []byte(fmt.Sprintf("%s", value))
+ }*/
+ }
+ }else {
+ break;
+ }
+ }
+ fmt.Println("Closing Subscribe channel")
+ //close(c)
+}
+
+func SleepForAWhile(){
+ //TODO: change to reconnect val
+ time.Sleep(1000 * time.Millisecond)
+}
+
+func (pub *Pubnub) NotDuplicate(channel string) (b bool){
+ var channels = strings.Split(pub.subscribedChannels, ",")
+ for i, u := range channels {
+ if channel == u {
+ return false
+ }
+ i++
+ //fmt.Println(i, u)
+ }
+ return true
+}
+
+func (pub *Pubnub) RemoveFromSubscribeList(channel string) (b bool){
+ var channels = strings.Split(pub.subscribedChannels, ",")
+ newChannels := ""
+ found := false
+ for i, u := range channels {
+ if channel == u {
+ found = true
+ } else {
+ if len(newChannels)>0 {
+ newChannels += ","
+ }
+ newChannels += u
+ }
+ i++
+ //fmt.Println(i, u)
+ }
+ if found {
+ pub.subscribedChannels = newChannels
+ //fmt.Println(fmt.Sprintf("%s", newChannels))
+ }
+ return found
+}
+
+func (pub *Pubnub) Unsubscribe(channels string, c chan []byte) {
+ channelArray := strings.Split(channels, ",")
+ unsubscribeChannels := ""
+ for i := 0; i < len(channelArray); i++ {
+ if i>0 {
+ unsubscribeChannels += ","
+ }
+ channelToUnsub := strings.TrimSpace(channelArray[i]);
+ unsubscribeChannels += channelToUnsub
+ removed := pub.RemoveFromSubscribeList(channelToUnsub)
+ if !removed {
+ //TODO: channel not subscribed message
+ }
+ }
+ pub.resetTimeToken = true
+
+ url := ""
+ url += "/v2/presence"
+ url += "/sub-key/" + pub.subscribeKey
+ url += "/channel/" + unsubscribeChannels
+ url += "/leave?uuid=" + pub.uuid
+ value, err := pub.HttpRequest(url)
+ c <- value
+ if err != nil {
+ c <- value
+ }
+ close(c)
+}
+
+func (pub *Pubnub) PrsenceUnsubscribe(channels string, c chan []byte) {
+ channelArray := strings.Split(channels, ",")
+ presenceChannels := ""
+ for i := 0; i < len(channelArray); i++ {
+ if i>0 {
+ presenceChannels += ","
+ }
+ channelToUnsub := strings.TrimSpace(channelArray[i]) + "-pnpres"
+ presenceChannels += channelToUnsub
+ removed := pub.RemoveFromSubscribeList(channelToUnsub)
+ if !removed {
+ //TODO: channel not subscribed message
+ }
+
+ }
+ pub.resetTimeToken = true
+
+ url := ""
+ url += "/v2/presence"
+ url += "/sub-key/" + pub.subscribeKey
+ url += "/channel/" + presenceChannels
+ url += "/leave?uuid=" + pub.uuid
+
+ value, err := pub.HttpRequest(url)
+ c <- value
+ if err != nil {
+ c <- value
+ }
+ close(c)
+}
+
+func (pub *Pubnub) History(channel string, limit int, c chan []byte) {
+ url := ""
+ url += "/history"
+ url += "/" + pub.subscribeKey
+ url += "/" + channel
+ url += "/0"
+ url += "/" + fmt.Sprintf("%d", limit)
+
+ value, err := pub.HttpRequest(url)
+
+ if err != nil {
+ c <- value
+ } else {
+ c <- []byte(fmt.Sprintf("%s", value))
+ }
+ close(c)
+}
+
+func (pub *Pubnub) HereNow(channel string, c chan []byte) {
+ url := ""
+ url += "/v2/presence"
+ url += "/sub-key/" + pub.subscribeKey
+ url += "/channel/" + channel
+
+ value, err := pub.HttpRequest(url)
+
+ if err != nil {
+ c <- value
+ } else {
+ c <- []byte(fmt.Sprintf("%s", value))
+ }
+ close(c)
+}
+
+func ParseJson (contents []byte) (data string, timeToken string, channels string, err error){
+ var s interface{}
+ returnData := ""
+ returnTimeToken := ""
+ returnChannels := ""
+ if err := json.Unmarshal(contents, &s); err == nil {
+ v := s.(interface{})
+ switch vv := v.(type) {
+ case string:
+ //fmt.Println("is string", vv)
+ case int:
+ //fmt.Println("is int", vv)
+ case []interface{}:
+ //fmt.Println("is an array:")
+
+ for i, u := range vv {
+ //fmt.Println(i, u)
+ if i==0 {
+ returnData = fmt.Sprintf("%s", u)
+ }else if (i==1){
+ returnTimeToken = fmt.Sprintf("%s", u)
+ }else if (i==2){
+ channels = fmt.Sprintf("%s", u);
+ }
+
+ }
+ default:
+ //fmt.Println("is of a type I don't know how to handle")
+ }
+ } else {
+ //something went wrong
+ fmt.Println("err:", err)
+ }
+ return returnData, returnTimeToken, returnChannels, err
+}
+
+func (pub *Pubnub) HttpRequest(url string) ([]byte, error) {
+ //fmt.Println("pub.ORIGIN+url:", pub.origin+url)
+ response, err := Connect(pub.origin+url)
+
+ if err != nil {
+ if neterr, ok := err.(net.Error); ok && neterr.Timeout() {
+ return []byte(fmt.Sprintf("%s: Reconnecting from timeout", time.Now().String())), nil
+ } else {
+ return []byte(fmt.Sprintf("Network Error: %s", err.Error())), err
+ }
+ }
+
+ contents, err := ioutil.ReadAll(response.Body)
+ return contents, err
+}
+
+func Connect (url string) (*http.Response, error) {
+ transport := &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
+ Dial: func(netw, addr string) (net.Conn, error) {
+ deadline := time.Now().Add(_timeout * time.Second)
+ c, err := net.DialTimeout(netw, addr, time.Second)
+ if err != nil {
+ return nil, err
+ }
+ c.SetDeadline(deadline)
+ return c, nil
+ }}
+ httpclient := &http.Client{Transport: transport, CheckRedirect: nil}
+
+ response, err := httpclient.Get(url)
+ if err != nil {
+ //fmt.Printf("Connect error here: %s", err)
+ }
+
+ return response, err
+}
+
+
View
108 go/src/pubnubMessaging/pubnubCrypto.go
@@ -0,0 +1,108 @@
+package pubnubMessaging
+
+import (
+ "crypto/aes"
+ "crypto/cipher"
+ "crypto/hmac"
+ "crypto/rand"
+ "crypto/sha256"
+ "encoding/base64"
+ "encoding/hex"
+ "fmt"
+ "io"
+)
+
+var _IV = "0123456789012345"
+
+func GetHmacSha256(secretKey string, input string) string {
+ hmacSha256 := hmac.New(sha256.New, []byte(secretKey))
+ io.WriteString(hmacSha256, input)
+ return fmt.Sprintf("%x", hmacSha256.Sum(nil))
+}
+
+func GenUuid() (string, error) {
+ uuid := make([]byte, 16)
+ n, err := rand.Read(uuid)
+ if n != len(uuid) || err != nil {
+ return "", err
+ }
+ // TODO: verify the two lines implement RFC 4122 correctly
+ uuid[8] = 0x80 // variant bits see page 5
+ uuid[4] = 0x40 // version 4 Pseudo Random, see page 7
+
+ return hex.EncodeToString(uuid), nil
+}
+
+func Pkcs5pad(data []byte, blocksize int) []byte {
+ pad := blocksize - len(data)%blocksize
+ b := make([]byte, pad, pad)
+ for i := 0; i < pad; i++ {
+ b[i] = uint8(pad)
+ }
+ return append(data, b...)
+}
+
+func Pkcs5unpad(data []byte) []byte {
+ if len(data) == 0 {
+ return data
+ }
+ pad := int(data[len(data)-1])
+ return data[0 : len(data)-pad]
+}
+
+func EncryptString(cipherKey string, message string) string {
+ block, _ := AesCipher(cipherKey)
+
+ value := []byte(message)
+ value = Pkcs5pad(value, aes.BlockSize)
+
+ blockmode := cipher.NewCBCEncrypter(block, []byte(_IV))
+ cipherBytes := make([]byte, len(value))
+ blockmode.CryptBlocks(cipherBytes, value)
+
+ return fmt.Sprintf("%s", Encode(cipherBytes))
+}
+
+func DecryptString(cipherKey string, message string) string { //need add error catching
+ block, _ := AesCipher(cipherKey)
+ value, _ := Decode([]byte(message))
+
+ decrypter := cipher.NewCBCDecrypter(block, []byte(_IV))
+ decrypted := make([]byte, len(value))
+ decrypter.CryptBlocks(decrypted, value)
+
+ return fmt.Sprintf("%s", Pkcs5unpad(decrypted))
+}
+
+func AesCipher(cipherKey string) (cipher.Block, error) {
+ block, err := aes.NewCipher(EncryptCipherKey(cipherKey))
+ if err != nil {
+ return nil, err
+ }
+ return block, nil
+}
+
+func EncryptCipherKey(cipherKey string) []byte {
+ hash := sha256.New()
+ hash.Write([]byte(cipherKey))
+
+ sha256String := hash.Sum(nil)[:16]
+ return []byte(hex.EncodeToString(sha256String))
+}
+
+//Encodes a value using base64
+func Encode(value []byte) []byte {
+ encoded := make([]byte, base64.StdEncoding.EncodedLen(len(value)))
+ base64.StdEncoding.Encode(encoded, value)
+ return encoded
+}
+
+//Decodes a value using base64
+func Decode(value []byte) ([]byte, error) {
+ decoded := make([]byte, base64.StdEncoding.DecodedLen(len(value)))
+ b, err := base64.StdEncoding.Decode(decoded, value)
+ if err != nil {
+ return nil, err
+ }
+ return decoded[:b], nil
+}
View
244 go/src/pubnubTest.go
@@ -0,0 +1,244 @@
+package main
+
+import (
+ "bufio"
+ "os"
+ "fmt"
+ "time"
+ "pubnubMessaging"
+ "strings"
+)
+
+const delim = '\n'
+
+var pubnubChannel = ""
+var ssl bool
+var cipher = ""
+var uuid = ""
+var pub *pubnubMessaging.Pubnub
+
+func main() {
+ b := Init()
+ if b {
+ ch := make(chan int)
+ ReadLoop(ch)
+ }
+ fmt.Println("Exit")
+}
+
+func Init() (b bool){
+ fmt.Println("Please enter the channel name(s). Enter multiple channels separated by comma.")
+ reader := bufio.NewReader(os.Stdin)
+
+ line, _ , err := reader.ReadLine()
+ if err != nil {
+ fmt.Println(err)
+ }else{
+ pubnubChannel = string(line)
+ if strings.TrimSpace(pubnubChannel) != "" {
+ fmt.Println("Channel: ", pubnubChannel)
+ fmt.Println("Enable SSL. Enter y for Yes, n for No.")
+ var enableSsl string
+ fmt.Scanln(&enableSsl)
+
+ if enableSsl == "y" || enableSsl == "Y" {
+ ssl = true
+ fmt.Println("SSL enabled")
+ }else{
+ ssl = false
+ fmt.Println("SSL disabled")
+ }
+
+ fmt.Println("Please enter a CIPHER key, leave blank if you don't want to use this.")
+ fmt.Scanln(&cipher)
+ fmt.Println("Cipher: ", cipher)
+
+ fmt.Println("Please enter a Custom UUID, leave blank for default.")
+ fmt.Scanln(&uuid)
+ fmt.Println("UUID: ", uuid)
+
+ pubInstance := pubnubMessaging.PubnubInit("demo", "demo", "", cipher, ssl, uuid)
+ pub = pubInstance
+ return true
+ }else{
+ fmt.Println("Channel cannot be empty.")
+ }
+ }
+ return false
+}
+
+func ReadLoop(ch chan int){
+ fmt.Println("")
+ fmt.Println("ENTER 1 FOR Subscribe")
+ fmt.Println("ENTER 2 FOR Publish")
+ fmt.Println("ENTER 3 FOR Presence")
+ fmt.Println("ENTER 4 FOR Detailed History")
+ fmt.Println("ENTER 5 FOR Here_Now")
+ fmt.Println("ENTER 6 FOR Unsubscribe")
+ fmt.Println("ENTER 7 FOR Presence-Unsubscribe")
+ fmt.Println("ENTER 8 FOR Time")
+ fmt.Println("ENTER 9 FOR Exit")
+ fmt.Println("")
+ reader := bufio.NewReader(os.Stdin)
+
+ for{
+ var action string
+ fmt.Scanln(&action)
+ breakOut := false
+ switch action {
+ case "1":
+ fmt.Println("Running Subscribe")
+ go SubscribeRoutine()
+ case "2":
+ fmt.Println("Please enter the message")
+ message, _ , err := reader.ReadLine()
+ if err != nil {
+ fmt.Println(err)
+ }else{
+ go PublishRoutine(string(message))
+ }
+ case "3":
+ fmt.Println("Running Presence")
+ go PresenceRoutine()
+ case "4":
+ fmt.Println("Running detailed history")
+ go DetailedHistoryRoutine()
+ case "5":
+ fmt.Println("Running here now")
+ go HereNowRoutine()
+ case "6":
+ fmt.Println("Running Unsubscribe")
+ go UnsubscribeRoutine()
+ case "7":
+ fmt.Println("Running Unsubscribe Presence")
+ go UnsubscribePresenceRoutine()
+ case "8":
+ fmt.Println("Running Time")
+ go TimeRoutine()
+ case "9":
+ fmt.Println("Exiting")
+ pub.Abort()
+ breakOut = true
+ case "default":
+ }
+ if breakOut {
+ break
+ }else{
+ time.Sleep(1000 * time.Millisecond)
+ }
+ }
+ close(ch)
+}
+
+func ParseResponseSubscribe(channel chan []byte){
+ for {
+ value, ok := <-channel
+ if !ok {
+ fmt.Println("")
+ break
+ }
+ if string(value) != "[]"{
+ fmt.Println(fmt.Sprintf("Subscribe: %s", value))
+ //fmt.Println(fmt.Sprintf("%s", value))
+ fmt.Println("")
+ }
+ }
+}
+
+func ParseResponsePresence(channel chan []byte){
+ for {
+ value, ok := <-channel
+ if !ok {
+ break
+ }
+ if string(value) != "[]"{
+ fmt.Println(fmt.Sprintf("Presence: %s ", value))
+ //fmt.Println(fmt.Sprintf("%s", value))
+ fmt.Println("");
+ }
+ }
+}
+
+func ParseResponse(channel chan []byte){
+ for {
+ value, ok := <-channel
+ if !ok {
+ break
+ }
+ if string(value) != "[]"{
+ fmt.Println(fmt.Sprintf("Response: %s ", value))
+ //fmt.Println(fmt.Sprintf("%s", value))
+ fmt.Println("");
+ }
+ }
+}
+
+func SubscribeRoutine(){
+ var subscribeChannel = make(chan []byte)
+ go pub.Subscribe(pubnubChannel, subscribeChannel, false)
+ ParseResponseSubscribe(subscribeChannel)
+}
+
+func PublishRoutine(message string){
+ channelArray := strings.Split(pubnubChannel, ",");
+
+ for i:=0; i < len(channelArray); i++ {
+ ch := strings.TrimSpace(channelArray[i])
+ fmt.Println("Publish to channel: ",ch)
+ channel := make(chan []byte)
+ go pub.Publish(ch, message, channel)
+ ParseResponse(channel)
+ }
+}
+
+func PresenceRoutine(){
+ var presenceChannel = make(chan []byte)
+ //go pub.Subscribe(pubnubChannel, subscribeChannel, true)
+ go pub.Subscribe(pubnubChannel, presenceChannel, true)
+ ParseResponsePresence(presenceChannel)
+}
+
+func DetailedHistoryRoutine(){
+ channelArray := strings.Split(pubnubChannel, ",");
+ for i:=0; i < len(channelArray); i++ {
+ ch := strings.TrimSpace(channelArray[i])
+ fmt.Println("DetailedHistory for channel: ", ch)
+
+ channel := make(chan []byte)
+
+ go pub.History(ch, 100, channel)
+ ParseResponse(channel)
+ }
+}
+
+func HereNowRoutine(){
+ channelArray := strings.Split(pubnubChannel, ",");
+ for i:=0; i < len(channelArray); i++ {
+ channel := make(chan []byte)
+ ch := strings.TrimSpace(channelArray[i])
+ fmt.Println("HereNow for channel: ", ch)
+
+ go pub.HereNow(ch, channel)
+ ParseResponse(channel)
+ }
+}
+
+func UnsubscribeRoutine(){
+ channel := make(chan []byte)
+
+ go pub.Unsubscribe(pubnubChannel, channel)
+ ParseResponse(channel)
+}
+
+func UnsubscribePresenceRoutine(){
+ channel := make(chan []byte)
+
+ go pub.PrsenceUnsubscribe(pubnubChannel, channel)
+ ParseResponse(channel)
+}
+
+func TimeRoutine(){
+ channel := make(chan []byte)
+ go pub.GetTime(channel)
+ ParseResponse(channel)
+}
Please sign in to comment.
Something went wrong with that request. Please try again.