from finviz.helper_functions.request_functions import Connector, http_request_get
from finviz.helper_functions.error_handling import NoResults, InvalidTableType
from finviz.helper_functions.save_data import export_to_db, export_to_csv
from finviz.helper_functions.display_functions import create_table_string
from urllib.parse import urlencode, urlparse, parse_qs as urlparse_qs
import finviz.helper_functions.scraper_functions as scrape

TABLE_TYPES = {
    'Overview': '111',
    'Valuation': '121',
    'Ownership': '131',
    'Performance': '141',
    'Custom': '152',
    'Financial': '161',
    'Technical': '171'
}
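
# For example, a screener URL containing v=141 corresponds to the 'Performance'
# table, and v=152 to the 'Custom' table (see Screener.init_from_url below).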

class Screener(object):
    """ Used to download data from https://finviz.com/screener.ashx. """

    @classmethod
    def init_from_url(cls, url, rows=None):
        """
        Initializes a Screener instance from a finviz screener URL.

        :param url: screener URL
        :type url: string
        :param rows: total number of rows to get
        :type rows: int
        """

        split_url = urlparse(url)
        split_query = urlparse_qs(split_url.query)

        tickers = None
        if 't' in split_query:
            tickers = split_query['t'][0].split(',')

        filters = None
        if 'f' in split_query:
            filters = split_query['f'][0].split(',')

        if 'v' in split_query:
            # Map the numeric view code (e.g. '111') back to its table name.
            table_numbers_types = {v: k for k, v in TABLE_TYPES.items()}
            table_number_string = split_query['v'][0][0:3]

            try:
                table = table_numbers_types[table_number_string]
            except KeyError:
                raise InvalidTableType(split_query['v'][0])
        else:
            table = 'Overview'

        custom = None
        if 'c' in split_query:
            custom = split_query['c'][0].split(',')

        order = ''
        if 'o' in split_query:
            order = split_query['o'][0]

        signal = ''
        if 's' in split_query:
            signal = split_query['s'][0]

        return cls(tickers, filters, rows, order, signal, table, custom)
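
    # A minimal usage sketch (the URL below is an illustrative example of
    # finviz query syntax, not a tested endpoint):
    #
    #     url = 'https://finviz.com/screener.ashx?v=111&f=exch_nasd,idx_sp500&o=-price'
    #     stock_list = Screener.init_from_url(url, rows=50)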

    def __init__(self, tickers=None, filters=None, rows=None, order='', signal='', table=None, custom=None):
        """
        Initializes all variables to their values.

        :param tickers: collection of ticker strings eg.: ['AAPL', 'AMD', 'WMT']
        :type tickers: list
        :param filters: collection of filter strings eg.: ['exch_nasd', 'idx_sp500', 'fa_div_none']
        :type filters: list
        :param rows: total number of rows to get
        :type rows: int
        :param order: table order eg.: '-price' (to sort table by descending price)
        :type order: str
        :param signal: show by signal eg.: 'n_majornews' (for stocks with major news)
        :type signal: str
        :param table: table type eg.: 'Performance'
        :type table: str
        :param custom: collection of custom columns eg.: ['1', '21', '23', '45']
        :type custom: list
        :var self.data: list of dictionaries containing row data
        :type self.data: list
        """

        if tickers is None:
            self._tickers = []
        else:
            self._tickers = tickers

        if filters is None:
            self._filters = []
        else:
            self._filters = filters

        if table is None:
            self._table = self.__check_table('Overview')
        else:
            self._table = self.__check_table(table)

        if custom is None:
            self._custom = []
        else:
            # Custom columns are only available on the 'Custom' (152) view.
            self._table = TABLE_TYPES['Custom']
            self._custom = custom

            if '0' not in self._custom:  # 0 (No.) is required for the sequence algorithm to work
                self._custom = ['0'] + self._custom

        self._rows = rows
        self._order = order
        self._signal = signal

        self.data = self.__search_screener()

    def __call__(self, tickers=None, filters=None, rows=None, order='', signal='', table=None, custom=None):
        """
        Adds more filters to the screener. Example usage:

        stock_list = Screener(filters=['cap_large'])  # All the stocks with large market cap
        # After analyzing you decide you want to see which of the stocks have a high dividend yield
        # and show their performance:
        stock_list(filters=['fa_div_high'], table='Performance')
        # Shows performance of stocks with large market cap and high dividend yield
        """

        if tickers:
            self._tickers.extend(tickers)

        if filters:
            self._filters.extend(filters)

        if table:
            self._table = self.__check_table(table)

        if order:
            self._order = order

        if signal:
            self._signal = signal

        if rows:
            self._rows = rows

        if custom:
            self._custom = custom

        self.data = self.__search_screener()

    add = __call__
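
    # `add` is an alias for __call__, so the docstring example above could also
    # be written as stock_list.add(filters=['fa_div_high'], table='Performance').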

    def __str__(self):
        """ Returns a readable representation of the table. """

        table_list = [self.headers]

        for row in self.data:
            table_list.append([row[col] or '' for col in self.headers])

        return create_table_string(table_list)

    def __repr__(self):
        """ Returns a string representation of the parameter's values. """

        values = f'tickers: {tuple(self._tickers)}\n' \
                 f'filters: {tuple(self._filters)}\n' \
                 f'rows: {self._rows}\n' \
                 f'order: {self._order}\n' \
                 f'signal: {self._signal}\n' \
                 f'table: {self._table}\n' \
                 f'custom: {self._custom}'

        return values

    def __len__(self):
        """ Returns an int with the number of total rows. """
        return int(self._rows)

    def __getitem__(self, position):
        """ Returns a dictionary containing specific row data. """
        return self.data[position]

    get = __getitem__
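
    # Row access sketch: each row is a dict keyed by the table headers, e.g.
    #
    #     first_row = stock_list[0]       # via __getitem__
    #     same_row = stock_list.get(0)    # via the `get` alias
    #     print(first_row['Ticker'])      # 'Ticker' is a standard Overview header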

    def to_sqlite(self, filename):
        """ Exports the generated table into a SQLite database.

        :param filename: SQLite database file path
        :type filename: str
        """
        export_to_db(self.headers, self.data, filename)

    def to_csv(self, filename=None):
        """ Exports the generated table into a CSV file.
        Returns a CSV string if filename is None.

        :param filename: CSV file path
        :type filename: str
        """
        return export_to_csv(self.headers, self.data, filename)
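
    # Export sketch (filenames are illustrative):
    #
    #     stock_list.to_csv('sp500.csv')      # write straight to a file
    #     csv_string = stock_list.to_csv()    # or capture the CSV as a string
    #     stock_list.to_sqlite('sp500.db')    # or persist to a SQLite database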

    def get_charts(self, period='d', size='l', chart_type='c', ta='1'):
        """
        Downloads the charts of all tickers shown by the table.

        :param period: chart period eg.: 'd', 'w' or 'm' for daily, weekly and monthly periods
        :type period: str
        :param size: chart size eg.: 'l' for large or 's' for small - choose large for better quality but larger file size
        :type size: str
        :param chart_type: chart type: 'c' for candles or 'l' for lines
        :type chart_type: str
        :param ta: technical analysis eg.: '1' to show ta, '0' to hide ta
        :type ta: str
        """

        payload = {
            'ty': chart_type,
            'ta': ta,
            'p': period,
            's': size
        }
        base_url = 'https://finviz.com/chart.ashx?' + urlencode(payload)
        chart_urls = []

        for row in self.data:
            chart_urls.append(base_url + f"&t={row.get('Ticker')}")

        async_connector = Connector(scrape.download_chart_image, chart_urls)
        async_connector.run_connector()
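
    # Chart download sketch (parameter values as documented above):
    #
    #     stock_list.get_charts(period='w', chart_type='l', ta='0')
    #     # fetches weekly line charts without technical-analysis overlays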

    def __check_rows(self):
        """
        Checks if the user input for row number is correct.
        Otherwise, modifies the number or raises NoResults error.
        """

        self._total_rows = scrape.get_total_rows(self._page_content)

        if self._total_rows == 0:
            raise NoResults(self._url.split('?')[1])
        elif self._rows is None or self._rows > self._total_rows:
            return self._total_rows
        else:
            return self._rows

    def __check_table(self, input_table):
        """ Checks if the user input for table type is correct. Otherwise, raises an InvalidTableType error. """

        try:
            table = TABLE_TYPES[input_table]
            return table
        except KeyError:
            raise InvalidTableType(input_table)

    def __get_table_headers(self):
        """ Private function used to return table headers. """
        return self._page_content.cssselect('tr[valign="middle"]')[0].xpath('td//text()')

    def __search_screener(self):
        """ Private function used to return data from the FinViz screener. """

        self._page_content, self._url = http_request_get('https://finviz.com/screener.ashx', payload={
            'v': self._table,
            't': ','.join(self._tickers),
            'f': ','.join(self._filters),
            'o': self._order,
            's': self._signal,
            'c': ','.join(self._custom)
        })

        self._rows = self.__check_rows()
        self.headers = self.__get_table_headers()
        page_urls = scrape.get_page_urls(self._page_content, self._rows, self._url)

        async_connector = Connector(scrape.get_table,
                                    page_urls,
                                    self.headers,
                                    self._rows)
        pages_data = async_connector.run_connector()

        data = []
        for page in pages_data:
            for row in page:
                data.append(row)

        return data
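

if __name__ == '__main__':
    # Minimal end-to-end sketch, assuming network access to finviz.com; the
    # filter codes are the illustrative examples from the docstrings above.
    stock_list = Screener(filters=['exch_nasd', 'idx_sp500'], order='-price', rows=20)
    print(stock_list)                          # render via __str__ / create_table_string
    stock_list.to_csv('screener_results.csv')  # hypothetical output filename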