
Any example to use pipeline offline? #911

Open · taoluo opened this issue Dec 13, 2015 · 29 comments
@taoluo commented Dec 13, 2015

This question was originally posted on zipline's Google group: there are some posts on quantopian.com introducing Pipeline in the online algorithm environment, but I am wondering how to modify an algorithm to run Pipeline offline.

Can anyone provide a short example? I really appreciate any help you could provide.

@ssanderson (Contributor)

@mickeydonkey the best place to look for examples on how the pipeline machinery works is the pipeline test suite, which lives in tests/pipeline.

There isn't a great short answer to the question of "how do I use Pipeline with real data", because the Pipeline API exists primarily for simplifying computations on large point-in-time datasets, and there aren't many such datasets freely available for public use. The most promising one that I'm aware of is Quandl's WIKI dataset, which contains a couple thousand assets and includes dividends and splits. I have a branch lying around somewhere that started building machinery for creating a zipline-compatible asset database from there.

The long answer to your question is that, to run an algorithm using the Pipeline machinery, you need to write a function, get_loader, that takes a pipeline dataset column (e.g. USEquityPricing.close) and returns an object implementing a method named load_adjusted_array whose signature is

def load_adjusted_array(self, columns, dates, sids, mask):

load_adjusted_array should return a dictionary mapping the entries in columns to instances of AdjustedArray containing data for the requested dates and sids (sids is a term for asset_ids in zipline for historical reasons).
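In skeleton form, that contract looks roughly like this (a sketch only; the class and variable names are hypothetical, and it assumes one precomputed AdjustedArray per column covering the dates and sids the engine will request):

class PrecomputedLoader(object):
    def __init__(self, arrays_by_column):
        # arrays_by_column: {BoundColumn: AdjustedArray}
        self._arrays = arrays_by_column

    def load_adjusted_array(self, columns, dates, sids, mask):
        # Map each requested column to an AdjustedArray of data for the
        # requested dates and sids.
        return {column: self._arrays[column] for column in columns}

def get_loader(column):
    return my_loader  # a PrecomputedLoader instance built elsewhere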

If the dataset you want to use is small enough to hold in memory all at once, then you can use the built-in DataFrameLoader class from zipline.pipeline.loaders.frame for your loaders. The docstring for that class describes its functionality fairly well:

    """
    A PipelineLoader that reads its input from DataFrames.

    Mostly useful for testing, but can also be used for real work if your data
    fits in memory.

    Parameters
    ----------
    column : zipline.pipeline.data.BoundColumn
        The column whose data is loadable by this loader.
    baseline : pandas.DataFrame
        A DataFrame with index of type DatetimeIndex and columns of type
        Int64Index.  Dates should be labelled with the first date on which a
        value would be **available** to an algorithm.  This means that OHLCV
        data should generally be shifted back by a trading day before being
        supplied to this class.

    adjustments : pandas.DataFrame, default=None
        A DataFrame with the following columns:
            sid : int
            value : any
            kind : int (zipline.pipeline.loaders.frame.ADJUSTMENT_TYPES)
            start_date : datetime64 (can be NaT)
            end_date : datetime64 (must be set)
            apply_date : datetime64 (must be set)

        The default of None is interpreted as "no adjustments to the baseline".
    """

The adjustments frame is used to represent events that retroactively change our view of history. Most commonly, these are splits and dividends, which apply a backward-looking multiplier to the baseline array. If your dataset already uses "adjusted" prices/volumes, then you probably just want to pass None here.
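Putting the pieces together, a minimal sketch of a get_loader backed by a DataFrameLoader (MyData and baseline_frame are hypothetical; baseline_frame is shaped as the docstring above describes):

from zipline.pipeline.data import Column, DataSet
from zipline.pipeline.loaders.frame import DataFrameLoader

class MyData(DataSet):
    value = Column(dtype=float)

# baseline_frame: DataFrame with a DatetimeIndex of dates and Int64Index of
# sids, labelled with the first date each value was available.
loader = DataFrameLoader(MyData.value, baseline_frame, adjustments=None)

def get_loader(column):
    if column is MyData.value:
        return loader
    raise ValueError("No loader registered for %s" % column)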

@taoluo (Author) commented Dec 14, 2015

@ssanderson Thanks for the pointer; I am inspecting and running the tests now. I think it will take me some time to fully understand your long answer.

@fredfortier

Given this recommendation ("The most promising one that I'm aware of is Quandl's WIKI dataset, which contains a couple thousand assets and includes dividends and splits.") and its implementation as a data bundle in 1.0, was the intent to enable Pipeline computations in zipline? If so, my naive attempt at running a Quantopian-tested algorithm did not seem to initialize the Pipeline.

@fredfortier commented May 22, 2016

I got a bit further. Guided by the zipline source code, I instantiated TradingAlgorithm() instead of calling run_algorithm(), and passed a get_pipeline_loader parameter to its constructor: get_pipeline_loader=lambda column: pipeline_loader, where pipeline_loader comes from USEquityPricingLoader.from_files(path, path).

However, this seems to only factor in securities which I explicitly referenced using symbols(), not the broader universe. It would be great if I could download complete data bundles by sector and use that in the pipeline.

@llllllllll (Contributor)

run_algorithm should set up the pipeline loader; not doing so is a bug which we will try to fix soon in a 1.0.1 release. When you say that the loader can only reference things seen in a symbols call, what do you mean? What is the shape of the result of pipeline_output?

@fredfortier

Great! I hesitate to add much more info to this issue as I might have loaded the data incorrectly, and I don't want to add noise to your issues. As soon as 1.0.1 is out, or this particular fix is committed to master, I will try again using the natural method and give a more detailed account should I continue to experience the behavior.

@npezolano (Contributor)

Is it possible to extend DataFrameLoader to accept multiple columns or a full dataset as an input?

See:
https://github.com/quantopian/zipline/blob/master/tests/pipeline/test_frameload.py#L51-L53

@qinghanmeng

What's the relationship between a data bundle and pipeline.data?

@marketneutral

I've cobbled together a minimal example of running a pipeline combining pricing and a custom data source as:

from zipline.data import bundles
from zipline.pipeline import Pipeline
from zipline.pipeline.data import USEquityPricing
from zipline.pipeline.data import Column  
from zipline.pipeline.data import DataSet
from zipline.pipeline.engine import SimplePipelineEngine
from zipline.pipeline.filters import StaticAssets
from zipline.pipeline.loaders import USEquityPricingLoader
from zipline.pipeline.loaders.frame import DataFrameLoader
from zipline.utils.calendars import get_calendar

import numpy as np
import pandas as pd

trading_calendar = get_calendar('NYSE')
bundle_data = bundles.load('quantopian-quandl')


# Set up Custom Data Source for two sids for DataFrameLoader
class MyDataSet(DataSet):  
    column_A = Column(dtype=float)
    column_B = Column(dtype=bool) 

dates = pd.date_range('2014-01-01', '2017-01-01')
assets = bundle_data.asset_finder.lookup_symbols(['A', 'AAL'], as_of_date=None)
sids = pd.Int64Index([asset.sid for asset in assets])

# The values for Column A will just be a 2D array of numbers ranging from 0 -> N-1.
column_A_frame = pd.DataFrame(  
    data=np.arange(len(dates)*len(assets), dtype=float).reshape(len(dates), len(assets)),  
    index=dates,
    columns=sids,
)

# Column B will always provide True for the first sid and False for the second.
column_B_frame = pd.DataFrame(data={sids[0]: True, sids[1]: False}, index=dates)

loaders = {  
    MyDataSet.column_A: DataFrameLoader(MyDataSet.column_A, column_A_frame),  
    MyDataSet.column_B: DataFrameLoader(MyDataSet.column_B, column_B_frame),  
}

def my_dispatcher(column):
    return loaders[column]


# Set up pipeline engine

# Loader for pricing
pipeline_loader = USEquityPricingLoader(
    bundle_data.equity_daily_bar_reader,
    bundle_data.adjustment_reader,
)

def choose_loader(column):
    if column in USEquityPricing.columns:
        return pipeline_loader
    return my_dispatcher(column)

engine = SimplePipelineEngine(
    get_loader=choose_loader,
    calendar=trading_calendar.all_sessions,
    asset_finder=bundle_data.asset_finder,
)

p = Pipeline(
    columns={
        'price': USEquityPricing.close.latest,
        'col_A': MyDataSet.column_A.latest,
        'col_B': MyDataSet.column_B.latest
    },
    screen=StaticAssets(assets)
)

df = engine.run_pipeline(
    p,
    pd.Timestamp('2016-01-05', tz='utc'),
    pd.Timestamp('2016-01-07', tz='utc')
)

print(df)

The pipeline recognizes MyDataSet.column_A and MyDataSet.column_B as valid, however it does not find the actual data; it returns the default values. The returned DataFrame is:

                                           col_A  col_B  price
2016-01-05 00:00:00+00:00 Equity(0 [A])      NaN  False  40.69
                          Equity(2 [AAL])    NaN  False  40.91
2016-01-06 00:00:00+00:00 Equity(0 [A])      NaN  False  40.55
                          Equity(2 [AAL])    NaN  False  40.52
2016-01-07 00:00:00+00:00 Equity(0 [A])      NaN  False  40.73
                          Equity(2 [AAL])    NaN  False  41.23

I can see that the DataFrameLoader is working properly as:

loader = my_dispatcher(MyDataSet.column_A)
adj_array = loader.load_adjusted_array(
    [MyDataSet.column_A],
    dates,
    sids,
    np.ones((len(dates), len(sids)), dtype=bool)
)
print(list(adj_array.values())[0].inspect())

which returns

Adjusted Array (float64):

Data:
array([[  0.00000000e+00,   1.00000000e+00],
       [  2.00000000e+00,   3.00000000e+00],
       [  4.00000000e+00,   5.00000000e+00],
       ..., 
       [  2.18800000e+03,   2.18900000e+03],
       [  2.19000000e+03,   2.19100000e+03],
       [  2.19200000e+03,   2.19300000e+03]])

Adjustments:
{}

Can you give some guidance of where I am going wrong pointing the SimplePipelineEngine to this DataFrame?

@ssanderson (Contributor) commented Mar 26, 2018

hey @marketneutral! I think the issue you're running into here is that dates in your example contains timezone-naive dates, but your run_pipeline call is using tz-aware dates. If I drop a breakpoint in DataFrameLoader.load_adjusted_array, I see date_indexer coming back as all -1, which means we're failing to align the dates requested by the pipeline engine with the dates held by the loader.

In general, zipline represents dates as midnight of the date in question, localized to UTC. If I could wave a magic wand, I would remove the timezone localization (or, even better, use a pandas Period instead of a timestamp), but doing so would be a major backwards compatibility headache. Given the current state of things, the right fix for this is to change the construction of dates to:

dates = pd.date_range('2014-01-01', '2017-01-01', tz='UTC')

which appears to print the expected result:

$ python scratch2.py
                                              col_A  col_B  price
2016-01-05 00:00:00+00:00 Equity(111 [AAL])  1469.0  False  40.91
                          Equity(2730 [A])   1468.0   True  40.69
2016-01-06 00:00:00+00:00 Equity(111 [AAL])  1471.0  False  40.52
                          Equity(2730 [A])   1470.0   True  40.55
2016-01-07 00:00:00+00:00 Equity(111 [AAL])  1473.0  False  41.23
                          Equity(2730 [A])   1472.0   True  40.73

@calmitchell617

Hi @marketneutral and @ssanderson, I have been using the code in this thread as a starting point to include fundamental data in a backtest algorithm. Hopefully you can help me understand the errors coming from pipeline_output() during backtesting. I have loaded data into a pipeline, and printed it out, using the following code in an iPython notebook:

engine = SimplePipelineEngine(
    get_loader=choose_loader,
    calendar=trading_calendar.all_sessions,
    asset_finder=bundle_data.asset_finder,
)

from zipline.pipeline.factors import SimpleMovingAverage

revenue_factor = SimpleMovingAverage(  # arbitrary fundamental data... I used quarterly revenue
    inputs=[MyDataSet.column_A],
    window_length=3,
)

def make_pipeline(): 
    return Pipeline(
        columns={
            'price': USEquityPricing.close.latest,
            'Revenue': MyDataSet.column_A.latest,
        },
        screen=StaticAssets(assets) & revenue_factor.top(10)
    )

pipeline_output = engine.run_pipeline( # run the pipeline every day, between these dates
    make_pipeline(),
    pd.Timestamp('2016-01-05', tz='utc'),
    pd.Timestamp('2017-01-12', tz='utc')
)

print(pipeline_output.head(10))
                                                   Revenue   price
2016-01-05 00:00:00+00:00 Equity(3 [AAPL])    5.150100e+10  105.35
                          Equity(10 [ABC])    3.547038e+10   32.03
                          Equity(546 [CVS])   3.864400e+10   96.46
                          Equity(550 [CVX])   3.431500e+10   88.85
                          Equity(747 [F])     3.814400e+10   17.76
                          Equity(1938 [T])    3.909100e+10   12.38
                          Equity(2081 [UNH])  4.148900e+10  116.46
                          Equity(2152 [VZ])   3.315800e+10   33.19
                          Equity(2197 [WMT])  1.174080e+11   61.46
                          Equity(2235 [XOM])  6.734400e+10   77.46

However, the following code (in an iPython cell below the previous code), produces the following errors:

def initialize(context):
    attach_pipeline(
        make_pipeline(),
        'data_pipe'
    )
    
def before_trading_start(context, data):
    print(pipeline_output('data_pipe'))
    
def handle_data(context, data):
    order(symbol('AAPL'), 10)
    # record(AAPL=data[symbol('AAPL')].price)
    
start = pd.Timestamp('2016-01-05', tz='utc')
end = pd.Timestamp('2016-01-09', tz='utc')

run_algorithm(
    before_trading_start=before_trading_start, 
    start=start,
    end=end, 
    initialize=initialize, 
    capital_base=10000, 
    handle_data=handle_data)

Error 1

---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
~/.pyenv/versions/3.5.4/lib/python3.5/site-packages/zipline/algorithm.py in _pipeline_output(self, pipeline, chunks, name)
   2486         try:
-> 2487             data = self._pipeline_cache.get(name, today)
   2488         except KeyError:

~/.pyenv/versions/3.5.4/lib/python3.5/site-packages/zipline/utils/cache.py in get(self, key, dt)
    152         try:
--> 153             return self._cache[key].unwrap(dt)
    154         except Expired:

KeyError: 'data_pipe'

Error 2

During handling of the above exception, another exception occurred:
# lots of error tracing code... Maybe I should post it, maybe not?

~/.pyenv/versions/3.5.4/lib/python3.5/site-packages/zipline/utils/run_algo.py in choose_loader(column)
    170                 return pipeline_loader
    171             raise ValueError(
--> 172                 "No PipelineLoader registered for column %s." % column
    173             )
    174     else:

ValueError: No PipelineLoader registered for column MyDataSet.column_A::float64.

It seems that when you use the run_pipeline() command, you can specify loaders, but when using pipeline_output() in before_trading_start(), you need to do something differently.

Can you please help me understand how to overcome these errors?

@ssanderson (Contributor)

@calmitchell617 the error you're getting there is happening because your algorithm doesn't know anything about your MyDataSet class.

Internally, run_algorithm constructs and runs an instance of TradingAlgorithm, which in turn constructs an instance of the SimplePipelineEngine class that you're building in your first example. For historical reasons, TradingAlgorithm doesn't take the full engine as a parameter; it only takes the loader-dispatching function (choose_loader, in your example above), and currently run_algorithm always supplies a hard-coded dispatcher that only handles USEquityPricing:

        pipeline_loader = USEquityPricingLoader(
            bundle_data.equity_daily_bar_reader,
            bundle_data.adjustment_reader,
        )

        def choose_loader(column):
            if column in USEquityPricing.columns:
                return pipeline_loader
            raise ValueError(
                "No PipelineLoader registered for column %s." % column
            )

(source)

The easiest short-term fix for this is probably to add a new optional get_pipeline_loader parameter to run_algorithm, which would be used to supply your own loader dispatcher instead of the default one.

In the medium term, it'd be nice to allow people to register their own pipeline dispatch functions as part of the Zipline extension machinery. @llllllllll might have thoughts about what an API for that would look like.

@calmitchell617

Thank you Scott, that worked.

For anyone wondering what I did, here is a brief summary of the changes I made to load external data, using the run_algorithm() command, in an iPython notebook:

In zipline/utils/run_algo.py, I changed run_algorithm() to include an additional parameter my_loader:

def run_algorithm(start,
                  end,
                  initialize,
                  capital_base,
                  handle_data=None,
                  before_trading_start=None,
                  analyze=None,
                  data_frequency='daily',
                  data=None,
                  bundle=None,
                  bundle_timestamp=None,
                  trading_calendar=None,
                  metrics_set='default',
                  default_extension=True,
                  extensions=(),
                  strict_extensions=True,
                  environ=os.environ,
                  my_loader=None):

I also changed the return _run() statement at the end of the run_algorithm() function to include that same parameter:

return _run(
        my_loader,
        handle_data=handle_data,
        initialize=initialize,
        before_trading_start=before_trading_start,
        analyze=analyze,
        algofile=None,
        algotext=None,
        defines=(),
        data_frequency=data_frequency,
        capital_base=capital_base,
        data=data,
        bundle=bundle,
        bundle_timestamp=bundle_timestamp,
        start=start,
        end=end,
        output=os.devnull,
        trading_calendar=trading_calendar,
        print_algo=False,
        metrics_set=metrics_set,
        local_namespace=False,
        environ=environ,
    )

I then changed the _run() function to include the parameter:

def _run(my_loader,
         handle_data,
         initialize,
         before_trading_start,
         analyze,
         algofile,
         algotext,
         defines,
         data_frequency,
         capital_base,
         data,
         bundle,
         bundle_timestamp,
         start,
         end,
         output,
         trading_calendar,
         print_algo,
         metrics_set,
         local_namespace,
         environ):

Lastly, I changed the nested choose_loader() function like so:

def choose_loader(column):
    if column in USEquityPricing.columns:
        return pipeline_loader
    return my_loader

As @ssanderson said, this is a short-term solution and only allows you to load one column of external data, but it could definitely be expanded upon, as sketched below.
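For example, a sketch of a dict-based dispatcher (my_loaders here is a hypothetical dict mapping bound columns to their loaders):

def choose_loader(column):
    if column in USEquityPricing.columns:
        return pipeline_loader
    try:
        # e.g. my_loaders = {MyDataSet.column_A: DataFrameLoader(...), ...}
        return my_loaders[column]
    except KeyError:
        raise ValueError("No PipelineLoader registered for column %s." % column)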

Here is the part of the iPython notebook where I call run_algorithm(), and the subsequent output:

run_algorithm(
    before_trading_start=before_trading_start, 
    start=start,
    end=end, 
    initialize=initialize, 
    capital_base=10000, 
    handle_data=handle_data,
    my_loader=DataFrameLoader(MyDataSet.column_A, column_A_frame))
                         Revenue  price
Equity(3 [AAMC])     5.150100e+10  20.01
Equity(10 [AAWW])    3.547038e+10  40.22
Equity(333 [BDE])    2.805500e+10   4.37
Equity(546 [CFX])    3.864400e+10  23.74
Equity(550 [CHD])    3.431500e+10  83.51
Equity(747 [CUTR])   3.814400e+10  12.25
Equity(1938 [NAT])   3.909100e+10  15.14
Equity(2152 [PATK])  3.315800e+10  41.65
Equity(2197 [PERY])  1.174080e+11  18.18
Equity(2235 [PKOH])  6.734400e+10  34.67

@ssanderson (Contributor)

@calmitchell617 glad that worked for you. Would you be interested in putting together a PR to update run_algorithm to support custom pipeline dispatchers?

@calmitchell617

I would be happy to give it a shot, will follow up early next week.

@peterfabakker commented May 31, 2018

That would be great, Cal. Your code helped me as well. If you need to generate earnings data, I made a script that does that and creates files for every bundle you have registered. Not production-ready, but useful if you want to test with dates. https://github.com/peterfabakker/zipline-utils/blob/master/getearnings.py

@marketneutral

Hey @ssanderson, I've been trying to create a minimal pipeline example to get data via the blaze loader, similar to the minimal DataFrameLoader example above. I've gotten this far:

import blaze as bz
import numpy as np
import pandas as pd
import sqlite3
import itertools

from zipline.data import bundles
from zipline.utils.calendars import get_calendar

from zipline.pipeline import Pipeline
from zipline.pipeline.data import USEquityPricing
from zipline.pipeline.data import DataSet
from zipline.pipeline.engine import SimplePipelineEngine
from zipline.pipeline.filters import StaticAssets
from zipline.pipeline.loaders import USEquityPricingLoader
from zipline.pipeline.loaders.blaze import BlazeLoader, from_blaze

trading_calendar = get_calendar('NYSE')
bundle_data = bundles.load('quandl')

# spoof some data ---------------------------

np.random.seed(100)

start = trading_calendar.closes.index.get_loc('2016-01-04 00:00:00+00:00')
end = trading_calendar.closes.index.get_loc('2018-08-06 00:00:00+00:00')

dates = trading_calendar.closes[start:end]
sids = bundle_data.asset_finder.sids

df = pd.DataFrame(
    data={'value': np.random.random(size=len(dates)*len(sids))},
    index=pd.MultiIndex.from_tuples(list(itertools.product(dates, sids)), names=('asof_date', 'sid'))
)

df.to_sql('my_ds_table', sqlite3.connect('temp.db'), if_exists='replace', index=False)


# create the blaze expr and DataSet ------------------------

expr = bz.data(
    'sqlite:///temp.db::my_ds_table',
    dshape='var * {value: float64, sid: int64, asof_date: datetime}'
)

# create an empty BlazeLoader
my_blaze_loader = BlazeLoader()

# create the DataSet
ds = from_blaze(
    expr,
    no_deltas_rule='ignore',
    no_checkpoints_rule='ignore',
    loader=my_blaze_loader
)

The check I can see from your test suite is issubclass(ds, DataSet), and this does assert True.

my_blaze_loader looks like

<BlazeLoader: {<DataSet: 'BlazeDataSet_0'>: ExprData(expr="Merge(_child=_1, children=(_1, label(_1.asof_date, 'timestamp')))", deltas='None', checkpoints='None', odo_kwargs={}, apply_deltas_adjustments=True)}>

And now to get the engine going:

pipeline_loader = USEquityPricingLoader(
    bundle_data.equity_daily_bar_reader,
    bundle_data.adjustment_reader,
)

def choose_loader(column):
    if column in USEquityPricing.columns:
        return pipeline_loader
    else:
        return my_blaze_loader

engine = SimplePipelineEngine(
    get_loader=choose_loader,
    calendar=trading_calendar.all_sessions,
    asset_finder=bundle_data.asset_finder,
)

And on to running the pipeline:

assets = bundle_data.asset_finder.lookup_symbols(['A', 'AAL'], as_of_date=None)

p = Pipeline(
    columns={
        'price': USEquityPricing.close.latest,
        'col_A': ds.value.latest,
    },
    screen=StaticAssets(assets)
)

df = engine.run_pipeline(
    p,
    pd.Timestamp('2016-01-05', tz='utc'),
    pd.Timestamp('2016-01-07', tz='utc')
)

This returns a KeyError looking for 'asof_date':

---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
<ipython-input-18-4175221aaafb> in <module>()
      2     p,
      3     pd.Timestamp('2016-01-05', tz='utc'),
----> 4     pd.Timestamp('2016-01-07', tz='utc')
      5 )

/anaconda3/envs/py27/lib/python2.7/site-packages/zipline/pipeline/engine.pyc in run_pipeline(self, pipeline, start_date, end_date)
    309             dates,
    310             assets,
--> 311             initial_workspace,
    312         )
    313 

/anaconda3/envs/py27/lib/python2.7/site-packages/zipline/pipeline/engine.pyc in compute_chunk(self, graph, dates, assets, initial_workspace)
    495                 loader = get_loader(term)
    496                 loaded = loader.load_adjusted_array(
--> 497                     to_load, mask_dates, assets, mask,
    498                 )
    499                 workspace.update(loaded)

/anaconda3/envs/py27/lib/python2.7/site-packages/zipline/pipeline/loaders/blaze/core.pyc in load_adjusted_array(self, columns, dates, assets, mask)
    998             self.pool.imap_unordered(
    999                 partial(self._load_dataset, dates, assets, mask),
-> 1000                 itervalues(groupby(getdataset, columns)),
   1001             ),
   1002         )

/anaconda3/envs/py27/lib/python2.7/site-packages/toolz/dicttoolz.pyc in merge(*dicts, **kwargs)
     36 
     37     rv = factory()
---> 38     for d in dicts:
     39         rv.update(d)
     40     return rv

/anaconda3/envs/py27/lib/python2.7/site-packages/zipline/pipeline/loaders/blaze/core.pyc in _load_dataset(self, dates, assets, mask, columns)
   1070                 (
   1071                     materialized_checkpoints,
-> 1072                     materialized_expr.get(),
   1073                 ),
   1074                 ignore_index=True,

/anaconda3/envs/py27/lib/python2.7/site-packages/zipline/utils/pool.pyc in get(self)
     30         """
     31         if not self._successful:
---> 32             raise self._value
     33         return self._value
     34 

KeyError: 'asof_date'

If you've gotten this far, thank you 😃 ... I feel like I am close here! Thanks in advance for any pointers.

@ssanderson (Contributor)

hey @marketneutral. I won't have time to write up a full reply today, but take a look at the module docs for zipline.pipeline.loaders.blaze.core. It's a pretty comprehensive rundown of the data format expected by the blaze loader.

@ssanderson (Contributor)

@marketneutral are you sure those are running on the same database? If I run your script and look at the schema on the sqlite CLI, I see:

sqlite> .schema
CREATE TABLE "my_ds_table" (
"value" REAL
);

which is what I would expect given that you're calling df.to_sql with index=False.

@marketneutral

Yes, thank you. That was the genesis of the error. I now call df.reset_index() prior to df.to_sql. I will post the cleaned-up and working example tomorrow.
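In other words (a sketch of the fix, following the earlier script):

# Materialize the (asof_date, sid) MultiIndex into real columns so they
# survive the round trip through SQL.
df.reset_index().to_sql(
    'my_ds_table',
    sqlite3.connect('temp.db'),
    if_exists='replace',
    index=False,
)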

@marketneutral

Hey @ssanderson This is my complete working minimal example:

https://github.com/marketneutral/research-tools/blob/master/pipeline-blaze-minimal.ipynb

@ssanderson (Contributor) commented Aug 10, 2018

Works now!

Awesome! One thing to be mindful of if you're using the blaze loader is that the expected semantics for asof_date might not always be what you expect. In particular, we conservatively assume that a record with an asof_date of day N may contain information that wouldn't have been available until the end of that date (e.g., a daily close price). Since pipelines semantically execute at the start of the trading day, this means that data with an asof_date of day N gets labelled with day N + 1 in pipeline outputs (because that's the morning of the first day on which we would have had access to the data observation). The simplest case of this is something like USEquityPricing.close.latest which, for day N, will contain the previous day's close.

In our experience, these semantics are generally what you want for things like pricing and fundamental data, but they can be "off by one" from what you might expect for other datasets, so it's important to be mindful of date-labelling conventions when you're setting up your data.
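As a hypothetical illustration of that convention:

import pandas as pd

# A record known by the end of the 2016-01-04 session...
record = {'sid': 0, 'value': 1.0, 'asof_date': pd.Timestamp('2016-01-04')}
# ...first appears in run_pipeline output under the next session's label,
# since that morning is the first time an algorithm could act on it.
first_visible_session = pd.Timestamp('2016-01-05', tz='utc')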

@RaymondMcT

Hi @marketneutral, thank you for putting together this minimal example; it is very helpful. I'm having some trouble running it, though. Do you have a dependency list that you know this works with?

@marketneutral

@RaymondMcT I've spent a little (very little) time refactoring this into a Python package with a proper setup.py which would include dependencies. It's a WIP. Will let you know...

@RaymondMcT

I hear ya. If I get it going, I'll report back with a list of working dependencies.

@marketneutral

hey @RaymondMcT, try this https://github.com/marketneutral/alphatools and please lmk.

@marketneutral

@RaymondMcT can you file an issue here so we don't pollute this thread with non-value-added things for the fine Quantopian folks?

@cemal95 commented Mar 14, 2020

@marketneutral Hi, I have run your code and it works. But when I change the dates, I get the following error:

Traceback (most recent call last):
  File "/Users/cemalarican/Desktop/THESIS/PIPE.py", line 82, in <module>
    pd.Timestamp('2016-01-01', tz='utc')
  File "/opt/anaconda3/envs/zipline/lib/python3.5/site-packages/zipline/pipeline/engine.py", line 293, in run_pipeline
    root_mask = self._compute_root_mask(start_date, end_date, extra_rows)
  File "/opt/anaconda3/envs/zipline/lib/python3.5/site-packages/zipline/pipeline/engine.py", line 384, in _compute_root_mask
    'The first date of the lifetimes matrix does not match the'
ValueError: The first date of the lifetimes matrix does not match the start date of the pipeline. Did you forget to align the start_date to the trading calendar?

@marketneutral

ValueError: The first date of the lifetimes matrix does not match the start date of the pipeline. Did you forget to align the start_date to the trading calendar?

This means your start date does not exist in the history: it is not a session on the trading calendar (2016-01-01, for example, is a market holiday).
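One way to align the dates (a sketch; trading_calendar as in the earlier examples in this thread):

import pandas as pd

sessions = trading_calendar.all_sessions
# Snap the requested start forward to the nearest actual trading session.
start = sessions[sessions.searchsorted(pd.Timestamp('2016-01-01', tz='utc'))]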
