-
Notifications
You must be signed in to change notification settings - Fork 202
/
salesforce-bulkapi-sink.sh
executable file
·143 lines (115 loc) · 5.99 KB
/
salesforce-bulkapi-sink.sh
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
#!/bin/bash
#
# Salesforce Bulk API sink example.
#
# Every credential is read from the environment variable of the same name;
# when that variable is unset or empty, the positional argument in the same
# slot is used instead:
#   $1 SALESFORCE_USERNAME            $2 SALESFORCE_PASSWORD
#   $3 CONSUMER_KEY                   $4 CONSUMER_PASSWORD
#   $5 SECURITY_TOKEN
#   $6 SALESFORCE_USERNAME_ACCOUNT2   $7 SALESFORCE_PASSWORD_ACCOUNT2
#   $8 SECURITY_TOKEN_ACCOUNT2

# Abort on the first command that fails.
set -e

# Absolute directory containing this script, so the relative paths below
# resolve regardless of the caller's working directory.
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null && pwd )"

# Quoted so that a checkout path containing spaces or glob characters still
# resolves to a single file.
# NOTE(review): utils.sh presumably provides the log/logerror helpers used
# later in this script -- they are not defined in this file.
# shellcheck source=/dev/null
source "${DIR}/../../scripts/utils.sh"

# First Salesforce account.
SALESFORCE_USERNAME=${SALESFORCE_USERNAME:-$1}
SALESFORCE_PASSWORD=${SALESFORCE_PASSWORD:-$2}
CONSUMER_KEY=${CONSUMER_KEY:-$3}
CONSUMER_PASSWORD=${CONSUMER_PASSWORD:-$4}
SECURITY_TOKEN=${SECURITY_TOKEN:-$5}

# second account (for Bulk API sink)
SALESFORCE_USERNAME_ACCOUNT2=${SALESFORCE_USERNAME_ACCOUNT2:-$6}
SALESFORCE_PASSWORD_ACCOUNT2=${SALESFORCE_PASSWORD_ACCOUNT2:-$7}
SECURITY_TOKEN_ACCOUNT2=${SECURITY_TOKEN_ACCOUNT2:-$8}
# Fail fast if any required credential is empty. The names are checked in a
# fixed order and the first missing one is reported before exiting with
# status 1.
for required_var in \
  SALESFORCE_USERNAME \
  SALESFORCE_PASSWORD \
  CONSUMER_KEY \
  CONSUMER_PASSWORD \
  SECURITY_TOKEN \
  SALESFORCE_USERNAME_ACCOUNT2 \
  SALESFORCE_PASSWORD_ACCOUNT2 \
  SECURITY_TOKEN_ACCOUNT2
do
  # ${!required_var} is bash indirect expansion: the value of the variable
  # whose name is stored in required_var.
  if [[ -z "${!required_var}" ]]; then
    logerror "${required_var} is not set. Export it as environment variable or pass it as argument"
    exit 1
  fi
done
# Run the plaintext environment start script with this example's compose file.
# The script path is quoted so a checkout path with spaces stays one word.
"${DIR}/../../environment/plaintext/start.sh" "${PWD}/docker-compose.plaintext.yml"

# the Salesforce PushTopic source connector is used to get data into Kafka and the Salesforce Bulk API sink connector is used to export data from Kafka to Salesforce
log "Creating Salesforce PushTopics Source connector"

# Build the connector configuration with jq so that every credential is
# JSON-escaped. Splicing the raw values into a hand-written JSON string
# produces an invalid (or silently altered) document as soon as a password
# or token contains a double quote or a backslash.
source_connector_config=$(jq -n \
  --arg username "$SALESFORCE_USERNAME" \
  --arg password "$SALESFORCE_PASSWORD" \
  --arg token "$SECURITY_TOKEN" \
  --arg consumer_key "$CONSUMER_KEY" \
  --arg consumer_secret "$CONSUMER_PASSWORD" \
  '{
    "connector.class": "io.confluent.salesforce.SalesforcePushTopicSourceConnector",
    "kafka.topic": "sfdc-pushtopic-leads",
    "tasks.max": "1",
    "curl.logging": "true",
    "salesforce.object": "Lead",
    "salesforce.push.topic.name": "LeadsPushTopic",
    "salesforce.username": $username,
    "salesforce.password": $password,
    "salesforce.password.token": $token,
    "salesforce.consumer.key": $consumer_key,
    "salesforce.consumer.secret": $consumer_secret,
    "salesforce.initial.start": "all",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "confluent.license": "",
    "confluent.topic.bootstrap.servers": "broker:9092",
    "confluent.topic.replication.factor": "1"
  }')

# PUT the configuration to the Connect REST endpoint from inside the
# "connect" container and pretty-print the response on the host.
# NOTE(review): the -e variables look unused by curl, since the payload is
# assembled on the host; they are kept to leave the invocation unchanged.
docker exec \
  -e SALESFORCE_USERNAME="$SALESFORCE_USERNAME" \
  -e SALESFORCE_PASSWORD="$SALESFORCE_PASSWORD" \
  -e CONSUMER_KEY="$CONSUMER_KEY" \
  -e CONSUMER_PASSWORD="$CONSUMER_PASSWORD" \
  -e SECURITY_TOKEN="$SECURITY_TOKEN" \
  connect \
  curl -X PUT \
    -H "Content-Type: application/json" \
    --data "$source_connector_config" \
    http://localhost:8083/connectors/salesforce-pushtopic-source/config | jq .

# Presumably gives the connector time to start before the topic is read.
sleep 10

log "Verify we have received the data in sfdc-pushtopic-leads topic"
# Read one message from the beginning of the topic, giving up after 60
# seconds. Uses the documented double-dash spelling of --bootstrap-server.
timeout 60 docker exec broker kafka-console-consumer --bootstrap-server broker:9092 --topic sfdc-pushtopic-leads --from-beginning --max-messages 1
log "Creating Salesforce Bulk API Sink connector"

# Build the connector configuration with jq so that the account #2
# credentials are JSON-escaped. Splicing the raw values into a hand-written
# JSON string breaks as soon as a password or token contains a double quote
# or a backslash.
sink_connector_config=$(jq -n \
  --arg username "$SALESFORCE_USERNAME_ACCOUNT2" \
  --arg password "$SALESFORCE_PASSWORD_ACCOUNT2" \
  --arg token "$SECURITY_TOKEN_ACCOUNT2" \
  '{
    "connector.class": "io.confluent.connect.salesforce.SalesforceBulkApiSinkConnector",
    "topics": "sfdc-pushtopic-leads",
    "tasks.max": "1",
    "curl.logging": "true",
    "salesforce.object": "Lead",
    "salesforce.username": $username,
    "salesforce.password": $password,
    "salesforce.password.token": $token,
    "salesforce.ignore.fields": "CleanStatus",
    "salesforce.ignore.reference.fields": "true",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "reporter.bootstrap.servers": "broker:9092",
    "reporter.error.topic.name": "error-responses",
    "reporter.error.topic.replication.factor": 1,
    "reporter.result.topic.name": "success-responses",
    "reporter.result.topic.replication.factor": 1,
    "confluent.license": "",
    "confluent.topic.bootstrap.servers": "broker:9092",
    "confluent.topic.replication.factor": "1"
  }')

# PUT the configuration to the Connect REST endpoint from inside the
# "connect" container and pretty-print the response on the host.
# NOTE(review): the -e variables look unused by curl, since the payload is
# assembled on the host; they are kept to leave the invocation unchanged.
docker exec \
  -e SALESFORCE_USERNAME_ACCOUNT2="$SALESFORCE_USERNAME_ACCOUNT2" \
  -e SALESFORCE_PASSWORD_ACCOUNT2="$SALESFORCE_PASSWORD_ACCOUNT2" \
  -e SECURITY_TOKEN_ACCOUNT2="$SECURITY_TOKEN_ACCOUNT2" \
  connect \
  curl -X PUT \
    -H "Content-Type: application/json" \
    --data "$sink_connector_config" \
    http://localhost:8083/connectors/salesforce-bulkapi-sink/config | jq .

# Presumably gives the connector time to process the records already in the
# topic before the result topic is read.
sleep 10

log "Verify topic success-responses"
# Read one message from the configured result topic, giving up after 60
# seconds. Uses the documented double-dash spelling of --bootstrap-server.
timeout 60 docker exec broker kafka-console-consumer --bootstrap-server broker:9092 --topic success-responses --from-beginning --max-messages 1

# log "Verify topic error-responses"
# timeout 20 docker exec broker kafka-console-consumer --bootstrap-server broker:9092 --topic error-responses --from-beginning --max-messages 1

log "Login to your SFDC account for account #2 to check that Lead has been added"