-
Notifications
You must be signed in to change notification settings - Fork 0
/
atlassian_text_search.py
191 lines (169 loc) · 7.68 KB
/
atlassian_text_search.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
import json
import os
import requests
from datetime import datetime, timedelta
import time
import boto3, botocore
import csv
with open('config.json', 'r') as file:
config = json.load(file)
auth_email = config['AUTH_EMAIL']
auth_pw = config['AUTH_TOKEN']
report_dir = config['REPORT_DIR']
search_files = config["SEARCH_FILES"]
vendor_id = config["VENDOR_ID"]
addon = config["ADD_ON"]
base_url = "https://marketplace.atlassian.com/rest/2/vendors/" + vendor_id + "/reporting/"
def check_status_code(response):
code = response.status_code
if code == 200:
print("Successful 200 Status API Call")
elif code == 401:
print("ERROR: 401 Unauthorized")
print(response.text)
exit()
elif code == 400:
print("ERROR: 400 Request body is malformed or has an illegal property value.")
print(response.text)
exit()
elif code == 403:
print("ERROR: 403 User was authenticated but is not authorized for this operation.")
print(response.text)
exit()
elif code == 404:
print("ERROR: 404 Resource does not exist or is not visible to this user.")
print(response.text)
exit()
elif code == 500:
print("ERROR: 500 Unexpected internal error in the Atlassian Marketplace.")
print(response.text)
exit()
elif code == 502:
print("ERROR: 502 Unexpected error in an external service.")
print(response.text)
exit()
else:
print("ERROR: Unknown status code")
print(response.status_code + "\n" + response.text)
exit()
def track_successful_searches(input_file, search_term): # write successful search terms into csv
filename = report_dir + 'search_terms_matches_' + input_file
f = open(filename, "a", encoding="utf-8")
f.write(search_term)
f.write("\n")
f.close()
def track_failed_searches(input_file, search_term): # write search terms without data into csv
filename = report_dir + 'search_terms_no_matches_' + input_file
f = open(filename, "a", encoding="utf-8")
f.write(search_term)
f.write("\n")
f.close()
def license_lookup(search_term, time_range, index, row_num, input_file):
print("Searching for licenses associated with term - " + search_term)
# Starting date for the query. Format: Date
start_date = str(datetime.strftime(datetime.now() - timedelta(int(time_range)), '%Y-%m-%d'))
# End date for the query. Format: Date
end_date = str(datetime.strftime(datetime.now(), '%Y-%m-%d'))
# The date field against which filters will be applied. Valid values: end, start
date_type = "start"
# If specified, retrieve marketing funnel attribution data. Must supply a start date after July 2018
with_attribution = "false"
# Specifies the response format. If unspecified, the 'Accept' header will be used.Valid values: csv, json
accept = "csv"
export_licenses = base_url + "licenses/export?addon=" + addon + "&text=" + search_term + "&withAttribution=" + with_attribution + "&accept=" + accept + "&status=active"
# "&startDate="+start_date+"&endDate="+end_date+"&dateType="+date_type+
print(export_licenses)
info = requests.get(export_licenses, auth=(auth_email, auth_pw))
check_status_code(info)
# Write to .CSV
filename = report_dir + 'licenses_' + input_file
f = open(filename, "a", encoding="utf-8")
split = info.text.split("\n")
line_num = 0 # track line number in single API call
for line in split:
if str(line): # checks that string returns true indicating it is not empty
if row_num is 0: # First row of search terms
# print("<<<<<<<<<HEADER>>>>>>>>>")
f.write(line)
f.write("\n")
elif index is not 0 and line_num is not 0: # Not First File and Not First Line of API Call
print("************DATA FOUND*****************")
print(line)
print("\n")
f.write(line)
f.write("\n")
track_successful_searches('licenses_' + input_file, search_term)
elif index is 0 and line_num is not 0: # First File and Not First line of API Call
print("************DATA FOUND*****************")
print(line)
f.write(line)
f.write("\n")
track_successful_searches('licenses_' + input_file, search_term)
line_num = line_num + 1
track_failed_searches('licenses_' + input_file, search_term)
f.close()
def transactions_lookup(search_term, time_range, index, row_num, input_file):
print("Searching for transactions associated with - " + search_term)
# Starting date for the query. Format: Date
start_date = str(datetime.strftime(datetime.now() - timedelta(int(time_range)), '%Y-%m-%d'))
# End date for the query. Format: Date
end_date = str(datetime.strftime(datetime.now(), '%Y-%m-%d'))
# The date field against which filters will be applied. Valid values: end, start
date_type = "start"
# Specifies the response format. If unspecified, the 'Accept' header will be used.Valid values: csv, json
accept = "csv"
export_transactions = base_url + "sales/transactions/export?addon=" + addon + "&text=" + search_term + "&accept=" + accept + "&startDate=" + start_date + "&endDate=" + end_date + "&dateType=" + date_type
# "&startDate="+start_date+"&endDate="+end_date+"&dateType="+date_type+
print(export_transactions)
info = requests.get(export_transactions, auth=(auth_email, auth_pw))
# Write to .CSV
filename = report_dir + 'transactions_' + input_file
f = open(filename, "a", encoding="utf-8")
split = info.text.split("\n")
line_num = 0 # track line number in single API call
for line in split:
if str(line): # checks that string returns true indicating it is not empty
if row_num is 0: # First row of search terms
# print("<<<<<<<<<HEADER>>>>>>>>>")
f.write(line)
f.write("\n")
elif index is not 0 and line_num is not 0: # Not First File and Not First Line of API Call
print("************DATA FOUND*****************")
print(line)
print("\n")
f.write(line)
f.write("\n")
track_successful_searches('transactions_' + input_file, search_term)
elif index is 0 and line_num is not 0: # First File and Not First line of API Call
print("************DATA FOUND*****************")
print(line)
print("\n")
f.write(line)
f.write("\n")
track_successful_searches('transactions_' + input_file, search_term)
line_num = line_num + 1
f.close()
if __name__ == '__main__':
index = 0 # track file number
for filename in search_files:
with open(filename) as csvfile:
readCSV = csv.reader(csvfile, delimiter=',')
row_num = 0 # track row number in a single file
for row in readCSV:
license_lookup(row[0], '60', index, row_num, filename)
row_num = row_num + 1
time.sleep(1)
index = index + 1
print("Licenses Look-Up Completed!" + "\n\n\n\n\n")
time.sleep(10)
index = 0
for filename in search_files:
with open(filename) as csvfile:
readCSV = csv.reader(csvfile, delimiter=',')
row_num = 0
for row in readCSV:
transactions_lookup(row[0], '30', index, row_num, filename)
row_num = row_num + 1
time.sleep(1)
index = index + 1
print("Transactions Look-Up Completed!" + "\n\n\n\n\n")