-
Notifications
You must be signed in to change notification settings - Fork 0
/
activity.go
90 lines (72 loc) · 3.15 KB
/
activity.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
package cassandrainsertrecord
import (
"fmt"
"github.com/TIBCOSoftware/flogo-lib/core/activity"
"github.com/TIBCOSoftware/flogo-lib/logger"
"github.com/gocql/gocql"
)
// log is the package-level logger for this activity. It is obtained from the
// Flogo logger package under the name "activity-CassandraInsertRecord".
var log = logger.GetLogger("activity-CassandraInsertRecord")
// MyActivity is the Cassandra insert-record activity. Its only state is the
// activity metadata handed to NewActivity, which Metadata returns unchanged.
type MyActivity struct {
// metadata is the descriptor supplied when the activity was constructed.
metadata *activity.Metadata
}
// NewActivity allocates a MyActivity, stores the supplied metadata on it and
// returns it as an activity.Activity.
func NewActivity(metadata *activity.Metadata) activity.Activity {
	act := new(MyActivity)
	act.metadata = metadata
	return act
}
// Metadata implements activity.Activity.Metadata by handing back the metadata
// pointer this activity was constructed with.
func (a *MyActivity) Metadata() *activity.Metadata {
	md := a.metadata
	return md
}
// Eval implements activity.Activity.Eval.
//
// It reads the ClusterIP, ClusterPort, Keyspace, TableName and InsertQuery
// inputs, opens a gocql session against the cluster, executes InsertQuery and,
// on success, sets the "result" output to a confirmation message.
//
// It returns (true, nil) when the insert succeeded. It returns (false, err)
// when an input is missing or has an unexpected type, when the session cannot
// be created, or when the query fails; in those cases "result" is not set.
func (a *MyActivity) Eval(context activity.Context) (done bool, err error) {
	clusterIP, err := inputString(context, "ClusterIP")
	if err != nil {
		return false, err
	}
	keySpace, err := inputString(context, "Keyspace")
	if err != nil {
		return false, err
	}
	clusterPort, err := inputInt(context, "ClusterPort")
	if err != nil {
		return false, err
	}
	// TableName is only used for diagnostics; the table actually written to is
	// whatever InsertQuery names.
	tableName, err := inputString(context, "TableName")
	if err != nil {
		return false, err
	}
	insertQuery, err := inputString(context, "InsertQuery")
	if err != nil {
		return false, err
	}

	log.Debugf("The Flogo engine says connect to [%s:%d] with keyspace [%s] with table [%s]", clusterIP, clusterPort, keySpace, tableName)
	log.Debugf("Flogo is about to insert [%s] on cluster [%s]", insertQuery, clusterIP)

	cluster := gocql.NewCluster(clusterIP)
	// gocql requires the keyspace to be set on the cluster config before the
	// session is created.
	cluster.Keyspace = keySpace
	cluster.Port = clusterPort

	session, err := cluster.CreateSession()
	if err != nil {
		// Stop here: session is unusable when CreateSession reports an error.
		return false, fmt.Errorf("connecting to cassandra cluster %s:%d: %v", clusterIP, clusterPort, err)
	}
	// A session is opened per Eval call, so release it when this call ends.
	defer session.Close()

	log.Debugf("Session created: cluster [%s], keyspace [%s], timeout [%v], table [%s]", clusterIP, keySpace, cluster.Timeout, tableName)
	log.Debugf("Next Step is Insert Query Execution")

	if err := session.Query(insertQuery).Exec(); err != nil {
		return false, fmt.Errorf("executing insert query on keyspace %q: %v", keySpace, err)
	}

	// Only report success once the query has actually been executed.
	context.SetOutput("result", "Record Inserted SuccessFully")
	log.Debugf("Record Inserted SuccessFully")

	// Signal to the Flogo engine that the activity is completed.
	return true, nil
}

// inputString returns the named activity input as a string, or an error if it
// is absent or holds a value of another type.
func inputString(context activity.Context, name string) (string, error) {
	value := context.GetInput(name)
	s, ok := value.(string)
	if !ok {
		return "", fmt.Errorf("input %q must be a string, got %T", name, value)
	}
	return s, nil
}

// inputInt returns the named activity input as an int, or an error if it is
// absent, non-numeric, or not a whole number.
//
// NOTE(review): int64 and float64 are accepted in addition to int on the
// assumption that the engine may deliver JSON-decoded numbers without coercing
// them first; confirm against the Flogo version in use.
func inputInt(context activity.Context, name string) (int, error) {
	switch v := context.GetInput(name).(type) {
	case int:
		return v, nil
	case int64:
		return int(v), nil
	case float64:
		n := int(v)
		if float64(n) != v {
			return 0, fmt.Errorf("input %q must be a whole number, got %v", name, v)
		}
		return n, nil
	default:
		return 0, fmt.Errorf("input %q must be an integer, got %T", name, v)
	}
}