Commit
option to use threads when downloading
ranaroussi committed Jun 11, 2017
1 parent 7596773 commit aec8ff6
Showing 5 changed files with 121 additions and 54 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.rst
@@ -1,6 +1,10 @@
Change Log
===========

0.0.9
-------
- Added ``threads`` parameter to ``download()`` (number of threads to use)

0.0.8
-------
- Removed 5 second wait for every failed fetch
7 changes: 6 additions & 1 deletion README.rst
@@ -86,7 +86,10 @@ Below is the full list of acceptable parameters:
# adjust all OHLC automatically
# (optional, default is False)
auto_adjust = True,

# How many threads to use?
threads = 10
)
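
A minimal usage sketch of the new parameter (the tickers and dates below are illustrative, not part of the commit):

import fix_yahoo_finance as yf

# fetch two tickers concurrently, using two download threads
data = yf.download(['AAPL', 'MSFT'], start='2017-01-01',
end='2017-06-01', threads=2)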
@@ -115,6 +118,8 @@ Requirements
* `Pandas <https://github.com/pydata/pandas>`_ (tested to work with >=0.18.1)
* `Numpy <http://www.numpy.org>`_ >= 1.11.1
* `requests <http://docs.python-requests.org/en/master/>`_ >= 2.14.2
* `multitasking <https://github.com/ranaroussi/multitasking>`_ >= 0.0.3


Optional (if you want to use ``pandas_datareader``)
---------------------------------------------------
159 changes: 108 additions & 51 deletions fix_yahoo_finance/__init__.py
@@ -18,7 +18,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

__version__ = "0.0.8"
__version__ = "0.0.9"
__author__ = "Ran Aroussi"
__all__ = ['download', 'get_yahoo_crumb', 'parse_ticker_csv']

@@ -31,7 +31,7 @@
import re
import warnings
import sys

import multitasking

_YAHOO_COOKIE_ = ''
_YAHOO_CRUMB_ = ''
@@ -65,7 +65,7 @@ def get_yahoo_crumb(force=False):


def parse_ticker_csv(csv_str, auto_adjust):
df = pd.read_csv(csv_str, index_col=0, error_bad_lines=False, sep=None
).replace('null', np.nan).dropna()

df.index = pd.to_datetime(df.index)
@@ -92,9 +92,24 @@ def parse_ticker_csv(csv_str, auto_adjust):
return df


_DFS_ = {}
_COMPLETED_ = 0
_PROGRESS_BAR_ = False
_FAILED_ = []


def make_chunks(l, n):
"""Yield successive n-sized chunks from l."""
for i in range(0, len(l), n):
yield l[i:i + n]
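# illustration (not part of the commit): uneven lists split into
# floor-sized chunks plus a shorter remainder, e.g.
# >>> list(make_chunks([1, 2, 3, 4, 5], 2))
# [[1, 2], [3, 4], [5]]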


def download(tickers, start=None, end=None, as_panel=True,
group_by='column', auto_adjust=False, progress=True,
threads=1, *args, **kwargs):

global _DFS_, _COMPLETED_, _PROGRESS_BAR_, _FAILED_
_COMPLETED_ = 0

# format start
if start is None:
@@ -115,24 +130,93 @@ def download(tickers, start=None, end=None, as_panel=True,
# interval
interval = kwargs["interval"] if "interval" in kwargs else "1d"

# create ticker list
tickers = tickers if isinstance(tickers, list) else [tickers]
tickers = [x.upper() for x in tickers]

# initiate progress bar
if progress:
_PROGRESS_BAR_ = ProgressBar(len(tickers), 'downloaded')

# download using single thread
if threads is None or threads < 2:
download_chunk(tickers, start=start, end=end, as_panel=as_panel,
group_by=group_by, auto_adjust=auto_adjust, progress=progress,
interval=interval, *args, **kwargs)
# threaded download
else:
threads = min([threads, len(tickers)])

# download in chunks
chunks = 0
for chunk in make_chunks(tickers, max([1, len(tickers) // threads])):
chunks += len(chunk)
download_thread(chunk, start=start, end=end, as_panel=as_panel,
group_by=group_by, auto_adjust=auto_adjust, progress=progress,
interval=interval, *args, **kwargs)
# dispatch any tickers not yet chunked; tickers[chunks:] is the
# unchunked tail (make_chunks already covers the remainder, so this
# is only a safety net and is normally empty)
if len(tickers[chunks:]) > 0:
download_thread(tickers[chunks:], start=start, end=end, as_panel=as_panel,
group_by=group_by, auto_adjust=auto_adjust, progress=progress,
interval=interval, *args, **kwargs)
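# e.g. 8 tickers with threads=3 gives a chunk size of
# max(1, 8 // 3) == 2, so four two-ticker chunks are dispatched,
# each on its own background thread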

# wait for completion
while _COMPLETED_ < len(tickers):
time.sleep(0.1)

# create panel (deprecated)
if as_panel:
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=DeprecationWarning)
data = pd.Panel(_DFS_)
if group_by == 'column':
data = data.swapaxes(0, 2)
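# with group_by='column' the swap puts the price fields on the
# items axis, so e.g. data['Open'] is a dates-by-tickers DataFrame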

# create multiIndex df
else:
data = pd.concat(_DFS_.values(), axis=1, keys=_DFS_.keys())
if group_by == 'column':
data.columns = data.columns.swaplevel(0, 1)
data.sort_index(level=0, axis=1, inplace=True)
if auto_adjust:
data = data[['Open', 'High', 'Low', 'Close', 'Volume']]
else:
data = data[['Open', 'High', 'Low',
'Close', 'Adj Close', 'Volume']]
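# resulting columns: group_by='ticker' keeps ('AAPL', 'Open')-style
# pairs, while group_by='column' swaps to ('Open', 'AAPL') so all
# tickers for a given field sit together ('AAPL' is illustrative)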

# return single df if only one ticker
if len(tickers) == 1:
data = _DFS_[tickers[0]]

if len(_FAILED_) > 0:
print("\nThe following tickers failed to download:\n",
', '.join(_FAILED_))

return data


@multitasking.task
def download_thread(tickers, start=None, end=None, as_panel=True,
group_by='column', auto_adjust=False, progress=True,
*args, **kwargs):
download_chunk(tickers, start, end, as_panel,
group_by, auto_adjust, progress,
*args, **kwargs)
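# note: @multitasking.task runs each download_thread() call on its
# own background thread and returns immediately, which is why
# download() polls the _COMPLETED_ counter rather than joining threads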


def download_chunk(tickers, start=None, end=None, as_panel=True,
group_by='column', auto_adjust=False, progress=True,
*args, **kwargs):

global _DFS_, _COMPLETED_, _PROGRESS_BAR_, _FAILED_

interval = kwargs["interval"] if "interval" in kwargs else "1d"

# url template
url_str = "https://query1.finance.yahoo.com/v7/finance/download/%s"
url_str += "?period1=%s&period2=%s&interval=%s&events=history&crumb=%s"

# failed tickers collectors
round1_failed_tickers = []
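# retry strategy: each ticker gets one immediate retry with a fresh
# cookie/crumb; tickers that still fail after that are collected in
# round1_failed_tickers for one final pass below, and anything that
# fails the final pass is reported via the global _FAILED_ list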

# start downloading
for ticker in tickers:
@@ -144,9 +228,9 @@ def download(tickers, start=None, end=None, as_panel=True,
try:
url = url_str % (ticker, start, end, interval, crumb)
hist = io.StringIO(requests.get(url, cookies={'B': cookie}).text)
_DFS_[ticker] = parse_ticker_csv(hist, auto_adjust)
if progress:
_PROGRESS_BAR_.animate()
except:
# something went wrong...
# try one more time using a new cookie/crumb
@@ -157,58 +241,31 @@
url = url_str % (ticker, start, end, interval, crumb)
src = requests.get(url, cookies={'B': cookie})
hist = io.StringIO(src.text)
_DFS_[ticker] = parse_ticker_csv(hist, auto_adjust)
if progress:
_PROGRESS_BAR_.animate()
except:
round1_failed_tickers.append(ticker)
time.sleep(0.000001)

# try failed items again before giving up
_COMPLETED_ += len(tickers) - len(round1_failed_tickers)
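# first-pass successes are credited to _COMPLETED_ in one step here;
# retried tickers are credited one at a time below, so download()'s
# wait loop unblocks only when every ticker is accounted for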

if len(round1_failed_tickers) > 0:
crumb, cookie = get_yahoo_crumb(force=True)
for ticker in round1_failed_tickers:
try:
url = url_str % (ticker, start, end, interval, crumb)
src = requests.get(url, cookies={'B': cookie})
hist = io.StringIO(src.text)
_DFS_[ticker] = parse_ticker_csv(hist, auto_adjust)
if progress:
_PROGRESS_BAR_.animate()
except:
_FAILED_.append(ticker)
pass
time.sleep(0.000001)

_COMPLETED_ += 1


class ProgressBar:
@@ -258,4 +315,4 @@ def __str__(self):
import pandas_datareader
pandas_datareader.data.get_data_yahoo = download
except:
pass
3 changes: 2 additions & 1 deletion requirements.txt
@@ -1,3 +1,4 @@
pandas>=0.18.1
numpy>=1.11.1
requests>=2.14.2
multitasking>=0.0.3
2 changes: 1 addition & 1 deletion setup.py
@@ -44,7 +44,7 @@
platforms = ['any'],
keywords='pandas, yahoo finance, pandas datareader',
packages=find_packages(exclude=['contrib', 'docs', 'tests', 'examples']),
install_requires=['pandas', 'numpy', 'requests', 'multitasking'],
entry_points={
'console_scripts': [
'sample=sample:main',
