In [46]:
import boto3
import pandas as pd
import io
import gzip
from fbprophet import Prophet
s3 = boto3.resource('s3')
bucket = 'twde-datalab'
train_key = 'raw/train.csv'
test_key = 'raw/test.csv'
items_key = 'raw/items.csv'
def get_df(key):
    obj = s3.Object(bucket,key)
    data = obj.get()['Body'].read()
    df = pd.read_csv(io.BytesIO(data), encoding='utf8')
    print(df.shape)
    return df
def get_testdf(key):
    obj = s3.Object(bucket,key)
    data = obj.get()['Body'].read()
    df = pd.read_csv(io.BytesIO(data), parse_dates=['date'], encoding='utf8')
    print(df.shape)
    return df
def get_traindf(key):
    obj = s3.Object(bucket,key)
    data = obj.get()['Body'].read()
    traindf = pd.read_csv(io.BytesIO(data), parse_dates=['date'],
                        encoding='utf8',
                    skiprows=range(1, 86672217) #Skip dates before 2016-08-01
                    )
    return traindf
def get_test_good(train, test):
    train_last_date = train.groupby(['item_nbr', 'store_nbr'])['date'].max().reset_index()
    train_last_date.rename(columns={'date':'last_date'}, inplace=True)
    train_good_item_store = train_last_date[train_last_date['last_date']>=pd.to_datetime('2016-08-01')][['item_nbr', 'store_nbr', 'last_date']]
    test_item_store = test.groupby(['item_nbr', 'store_nbr'])['date'].size().reset_index()[['item_nbr', 'store_nbr']]
    test_item_store_join_good = pd.merge(test_item_store, train_good_item_store, on=['item_nbr', 'store_nbr'], how='left')
    test_item_store_good = test_item_store_join_good[test_item_store_join_good['last_date'].notnull()][['item_nbr', 'store_nbr']]
    test_good=pd.merge(test_item_store_good, test, on=['item_nbr', 'store_nbr'], how='left')
    return test_good
def fill_missing_date(df, total_dates):
    idx = df.iloc[-1:,0].values[0]
    for d in set(total_dates)-set(df['date'].unique()): 
        idx+=1
        df.loc[idx, ['date', 'item_nbr', 'store_nbr']]= [pd.to_datetime(d), int(df.iloc[0]['item_nbr']), int(df.iloc[0]['store_nbr'])]
    return df
def get_predictions_for_test_good(test_good, train):
    total_dates = train['date'].unique()
    result = pd.DataFrame(columns=['id', 'unit_sales'])
    problem_pairs = []
    for name, y in test_good.groupby(['item_nbr', 'store_nbr']):
        item_nbr=name[0]
        store_nbr = name[1]
        df = train[(train.item_nbr==item_nbr)&(train.store_nbr==store_nbr)]
        print("item_nbr :",item_nbr,"store_nbr :", store_nbr, "df :", df.shape, df['date'].max())
        CV_SIZE = 16 #if you make it bigger, fill missing dates in cv with 0 if any
        TRAIN_SIZE = 365
        total_dates = train['date'].unique()
        df = fill_missing_date(df, total_dates)
        df = df.sort_values(by=['date'])
        X = df[-TRAIN_SIZE:]
        print('Train on: {}'.format(X.shape))
        X = X[['date','unit_sales']]
        X.columns = ['ds', 'y']
        m = Prophet(yearly_seasonality=True)
        try: 
            m.fit(X)
        except ValueError:
            print("problem for this item store pair")
            problem_pairs.append((item_nbr, store_nbr))
            continue           
        future = m.make_future_dataframe(periods=CV_SIZE)
        pred = m.predict(future)
        data = pred[['ds','yhat']].tail(CV_SIZE)
        data = pred[['ds','yhat']].merge(y, left_on='ds', right_on='date')
        data['unit_sales'] = data['yhat'].fillna(0).clip(0, 999999)
        result = result.append(data[['id', 'unit_sales']])
        print("result", result.shape)
    return (result, problem_pairs)
def get_full_predictions_for_test_good(test_good, train):
    total_dates = train['date'].unique()
    result = pd.DataFrame(columns=['id', 'unit_sales'])
    problem_pairs = []
    for name, y in test_good.groupby(['item_nbr', 'store_nbr']):
        item_nbr=name[0]
        store_nbr = name[1]
        df = train[(train.item_nbr==item_nbr)&(train.store_nbr==store_nbr)]
        CV_SIZE = 16 #if you make it bigger, fill missing dates in cv with 0 if any
        TRAIN_SIZE = 365
        total_dates = train['date'].unique()
        df = fill_missing_date(df, total_dates)
        df = df.sort_values(by=['date'])
        X = df[-TRAIN_SIZE:]
        X = X[['date','unit_sales']]
        X.columns = ['ds', 'y']
        m = Prophet(yearly_seasonality=True)
        try: 
            m.fit(X)
        except ValueError:
            print("problem for this item store pair", item_nbr)
            problem_pairs.append((item_nbr, store_nbr))
            continue           
        future = m.make_future_dataframe(periods=CV_SIZE)
        pred = m.predict(future)
        data = pred.tail(CV_SIZE)
        data = pred.merge(y, left_on='ds', right_on='date')
        data['unit_sales'] = data['yhat'].fillna(0).clip(0, 999999)
        result = result.append(data.loc[:, data.columns != 'ds'])
        print("result", result.shape)
    return (result, problem_pairs)

def save_s3(df, key):
    csv_buffer = io.StringIO()
    df.to_csv(csv_buffer, index=False)

    csv_buffer.seek(0)
    gz_buffer = io.BytesIO()

    with gzip.GzipFile(mode='w', fileobj=gz_buffer) as gz_file:
        gz_file.write(bytes(csv_buffer.getvalue(), 'utf-8'))

    s3_object = s3.Object(bucket, key)
    s3_object.put(Body=gz_buffer.getvalue())
def run_forecast(train, test_good, test_good_item_store, key_part, begin_i, end_i=None):
    """
    it takes 1000 (item, store) pairs as one unit
    For example, if test_good_item_store.shape[0] is 162720, then you can choose begin_i from 1 to 163 and 
    end_i greater than begin_i while leaving end_i=None for the last begin_i=163 or in case you want to run for all the rest
    For example, you can distribute the computation among several ec2 instances
    """
    for i in range(begin_i, end_i):
        name = "test_good_item_store"+str(i)
        test_good_item_store_part = test_good_item_store[(i-1)*1000:i*1000]
        if end_i==None:
            test_good_item_store_part = test_good_item_store[(i-1)*1000:]
        test_good_part = pd.merge(test_good, test_good_item_store_part, on=['item_nbr', 'store_nbr'], how='inner')
        test_result_part, problem_pairs_part = get_full_predictions_for_test_good(test_good_part, train)
        print(i, test_result_part.shape)
        save_s3(test_result_part, key_part +str(i)+'.csv.gz')
def get_gzdf(key):
    obj = s3.Object(bucket,key)
    data = obj.get()['Body'].read()
    df = pd.read_csv(io.BytesIO(data), compression='gzip')
    print(df.shape)
    return df

def get_zipdf(key):
    obj = s3.Object(bucket,key)
    data = obj.get()['Body'].read()
    df = pd.read_csv(io.BytesIO(data), compression='zip')
    print(df.shape)
    return df

In [33]:
train = get_traindf(train_key)
train.loc[train['unit_sales']<0, 'unit_sales']=0
test = get_testdf(test_key)
items = get_df(items_key)
test_good = get_test_good(train, test)
test_good_item_store = test_good.groupby(['item_nbr', 'store_nbr'])['date'].count().reset_index()[['item_nbr', 'store_nbr']]

INFO:botocore.vendored.requests.packages.urllib3.connectionpool:Starting new HTTPS connection (2): s3.eu-west-1.amazonaws.com
INFO:botocore.vendored.requests.packages.urllib3.connectionpool:Resetting dropped connection: s3.eu-west-1.amazonaws.com


(3370464, 5)
(4100, 4)


## Run prophet forecast

In [29]:
test_good_item_store.shape[0]

162720

In [None]:
run_forecast(train, test_good, test_good_item_store, 'forecast/test_clean_result', 1, 163)

## Combine results

In [48]:
clean_result_keys = ['forecast/test_clean_result'+str(i)+'.csv.gz' for i in range(1, 164)]
clean_frames = [ get_gzdf(key) for key in clean_result_keys ]
clean_result = pd.concat(clean_frames)

INFO:botocore.vendored.requests.packages.urllib3.connectionpool:Starting new HTTPS connection (1): s3.amazonaws.com
INFO:botocore.vendored.requests.packages.urllib3.connectionpool:Starting new HTTPS connection (1): s3.eu-west-1.amazonaws.com


(15936, 24)
(15984, 24)
(16000, 24)
(15952, 24)
(16000, 24)
(15952, 24)
(15968, 24)
(16000, 24)
(15936, 24)
(15952, 24)
(15920, 24)
(16000, 24)
(16000, 24)
(15984, 24)
(15904, 24)
(15984, 24)
(15936, 24)
(15984, 24)
(15920, 24)
(15952, 24)
(15952, 24)
(15968, 24)
(15952, 24)
(15984, 24)
(15936, 24)
(15984, 24)
(15952, 24)
(15968, 24)
(15968, 24)
(15984, 24)
(16000, 24)
(15776, 24)
(15888, 24)
(15952, 24)
(15936, 24)
(15936, 24)
(15888, 24)
(15904, 24)
(15920, 24)
(15888, 24)
(15984, 24)
(16000, 24)
(15904, 24)
(15920, 24)
(15984, 24)
(15984, 24)
(15936, 24)
(15968, 24)
(16000, 24)
(15984, 24)
(15952, 24)
(15824, 24)
(16000, 24)
(16000, 24)
(15984, 24)
(15952, 24)
(15984, 24)
(15920, 24)
(15952, 24)
(16000, 24)
(15920, 24)
(15920, 24)
(15984, 24)
(15984, 24)
(15952, 24)
(15984, 24)
(15872, 24)
(15984, 24)
(15856, 24)
(15968, 24)
(15904, 24)
(15952, 24)
(16000, 24)
(15840, 24)
(15984, 24)
(15968, 24)
(15888, 24)
(15984, 24)
(16000, 24)
(15936, 24)
(15888, 24)
(15712, 24)
(15952, 24)
(159

INFO:botocore.vendored.requests.packages.urllib3.connectionpool:Resetting dropped connection: s3.eu-west-1.amazonaws.com


(15952, 24)
(15936, 24)
(15968, 24)
(16000, 24)
(15968, 24)
(15952, 24)
(15984, 24)
(16000, 24)
(15952, 24)
(15968, 24)
(15984, 24)
(16000, 24)
(15920, 24)
(15888, 24)
(15776, 24)
(15488, 24)
(15776, 24)
(15664, 24)
(15952, 24)
(15936, 24)
(15920, 24)
(15856, 24)
(15904, 24)
(15712, 24)
(15712, 24)
(15696, 24)
(15936, 24)
(15904, 24)
(16000, 24)
(15952, 24)
(15952, 24)
(15936, 24)
(15936, 24)
(15952, 24)
(15952, 24)
(15968, 24)
(16000, 24)
(15936, 24)
(15952, 24)
(15872, 24)
(15968, 24)
(15952, 24)
(15984, 24)
(15920, 24)
(15952, 24)
(15968, 24)
(15984, 24)
(16000, 24)
(15920, 24)
(15888, 24)
(15808, 24)
(15968, 24)
(16000, 24)
(15984, 24)
(15808, 24)
(15872, 24)
(15904, 24)
(16000, 24)
(15968, 24)
(15920, 24)
(15888, 24)
(15936, 24)
(15872, 24)
(9856, 24)


In [47]:
clean_result.columns

Index(['date', 'id', 'item_nbr', 'onpromotion', 'seasonal', 'seasonal_lower',
       'seasonal_upper', 'seasonalities', 'seasonalities_lower',
       'seasonalities_upper', 'store_nbr', 'trend', 'trend_lower',
       'trend_upper', 'unit_sales', 'weekly', 'weekly_lower', 'weekly_upper',
       'yearly', 'yearly_lower', 'yearly_upper', 'yhat', 'yhat_lower',
       'yhat_upper'],
      dtype='object')

In [13]:
clean_result.head()

Unnamed: 0,date,id,item_nbr,onpromotion,seasonal,seasonal_lower,seasonal_upper,seasonalities,seasonalities_lower,seasonalities_upper,...,unit_sales,weekly,weekly_lower,weekly_upper,yearly,yearly_lower,yearly_upper,yhat,yhat_lower,yhat_upper
0,2017-08-16,125497040,96995.0,False,-0.308822,-0.308822,-0.308822,-0.308822,-0.308822,-0.308822,...,0.646213,-0.447095,-0.447095,-0.447095,0.138274,0.138274,0.138274,0.646213,0.170156,1.165704
1,2017-08-17,125707694,96995.0,False,0.02985,0.02985,0.02985,0.02985,0.02985,0.02985,...,0.977416,-0.070507,-0.070507,-0.070507,0.100357,0.100357,0.100357,0.977416,0.454921,1.455963
2,2017-08-18,125918348,96995.0,False,0.205031,0.205031,0.205031,0.205031,0.205031,0.205031,...,1.145129,0.138249,0.138249,0.138249,0.066782,0.066782,0.066782,1.145129,0.625048,1.676324
3,2017-08-19,126129002,96995.0,False,0.423354,0.423354,0.423354,0.423354,0.423354,0.423354,...,1.355984,0.38572,0.38572,0.38572,0.037634,0.037634,0.037634,1.355984,0.851699,1.835655
4,2017-08-20,126339656,96995.0,False,0.13943,0.13943,0.13943,0.13943,0.13943,0.13943,...,1.064592,0.126504,0.126504,0.126504,0.012926,0.012926,0.012926,1.064592,0.550025,1.583967


In [49]:
save_s3(clean_result, 'forecast/test_clean_result_total.csv.gz')

INFO:botocore.vendored.requests.packages.urllib3.connectionpool:Resetting dropped connection: s3.eu-west-1.amazonaws.com
