### Imports

In [338]:
import redis
import re
import json
import pandas as pd
import numpy as np
import sys
sys.path.append('../library')
from core import extractBetween, extractElementsInOrder, exceptionOutput
from datetime import datetime
import arrow

r = redis.Redis(
    host='localhost',
    port=6379,
    charset="utf-8",
    decode_responses=True,
    db = 2
)

### Load Redis Data from DB 2

In [339]:
ids = r.keys()
vals = r.mget(ids)

### Form Basic Dfs

In [340]:
valsJson = [json.loads(e) for e in vals]
rawDf = pd.DataFrame.from_dict(valsJson)

weekendDf = rawDf[rawDf['isWeekend'] == True]
weekdayDf = rawDf[rawDf['isWeekend'] == False]

In [341]:
def explodeAndStack(df, column):
    # Create a list to hold the individual DataFrames
    explodedDfs = []

    # Iterate over each row in the DataFrame
    for index, row in df.iterrows():
        # Convert the list of lists into a DataFrame
        tempDf = pd.DataFrame(row[column])
        # Append the DataFrame to the list with the original index as a key
        explodedDfs.append((index, tempDf))

    # Concatenate all the DataFrames in the list with keys to maintain the index
    resultDf = pd.concat([df for _, df in explodedDfs], keys=[index for index, _ in explodedDfs])
    
    return resultDf

In [342]:
# Get ids for mapping
weekendIdxDict = weekendDf['imdbId'].to_dict()
weekdayIdxDict = weekdayDf['imdbId'].to_dict()

In [343]:
def mapIdx(df: pd.DataFrame, d: dict, columns: list):
    dfOut = explodeAndStack(df, 'tableData').reset_index(drop=False)
    dfOut['imdbId'] = dfOut['level_0'].map(d)
    dfOut.drop(columns=['level_0','level_1', 9], inplace=True)

    assert len(columns) == dfOut.shape[1], f"Mismatch in column lengths and df shape: {len(columns)} != {len(dfOut.columns)}"

    dfOut.columns = columns
    
    return dfOut

In [344]:
weekendCols = ['date','rank','weekend','pct','numberOfTheaters','theaterChange','averagePerTheater','toDate','weekendNumber', 'imdbId']
weekdayCols = ['date', 'DOW', 'rank', 'daily', 'dayPct', 'weekPct', 'numberOfTheaters', 'averagePerTheater', 'toDate', 'dayNumber','imdbId']

weekendDf = mapIdx(weekendDf, weekendIdxDict, weekendCols)
weekdayDf = mapIdx(weekdayDf, weekdayIdxDict, weekdayCols)

### Make Dfs Compatible
- Will need to convert weekend to daily...
- To do this, we'll use toDate and weekend in order to impute missing values

In [345]:
tmdbDf = pd.read_csv('../data/tmdbDetails.csv')
tmdbDf.drop_duplicates(subset='imdb_id', inplace=True)
releaseDates = tmdbDf.set_index('imdb_id')['release_date'].to_dict()

  tmdbDf = pd.read_csv('../data/tmdbDetails.csv')


In [346]:
weekendDf = weekendDf[weekendDf['date'] != None]
weekendDf.dropna(subset='date', inplace=True)

def extractDates(date: str, imdbId: str, releaseDates: dict):
    try:
        months = ["Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"]

        # determine if the weekend spans two months
        # foundMonths = [m for m in months if m in date]
        foundMonths = extractElementsInOrder(date, months)
        multiMonth = True if len(foundMonths) > 1 else False

        if releaseDates[imdbId] != releaseDates[imdbId]:
            return None, None
        
        releaseDate = releaseDates[imdbId].split('-')[0]
    

        if not multiMonth:
            startDateStr = f"{foundMonths[0]}. {extractBetween(date, ' ','-')}, {releaseDate}"
            endDayNumber = re.search(r'\d+', date.split('-')[1]).group()  # Extract the numerical part for days when there are holidays
            endDateStr   = f"{foundMonths[0]}. {endDayNumber}, {releaseDate}"
        else:
            startMonth = foundMonths[0]
            endMonth   = foundMonths[1]

            startDay = extractBetween(date, f'{startMonth} ', '-')
            endDay   = re.search(r'\d+', date.split(f'{endMonth} ')[1]).group()
            startDateStr = f"{foundMonths[0]}. {startDay}, {releaseDate}"
            endDateStr   = f"{foundMonths[1]}. {endDay}, {releaseDate}"

        try:
            startDate = arrow.get(startDateStr, "MMM. D, YYYY")
            endDate   = arrow.get(endDateStr, "MMM. D, YYYY")
        except: # it's not happy with leap years... may need to investigate this further TODO
            startDateStr = startDateStr.replace('Feb. 29', 'Feb. 28')
            endDateStr   = endDateStr.replace('Feb. 29', 'Feb. 28')
            startDate = arrow.get(startDateStr, "MMM. D, YYYY")
            endDate   = arrow.get(endDateStr, "MMM. D, YYYY")


        # rare condition where weekend is over the new year
        # haven't actually encountered this, may be formatted weirdly
        if endDate < startDate:
            endDate = endDate.shift(years=1)

        return [startDate, endDate]
    except Exception as e:
        print(exceptionOutput(e))


startAndEndDates = weekendDf.apply(lambda row: extractDates(row['date'], row['imdbId'], releaseDates), axis=1)
weekendDf[['startDate', 'endDate']] = startAndEndDates.tolist()

In [347]:
weekendDf.drop_duplicates(subset=['imdbId', 'weekendNumber'], inplace=True)

for col in ['weekend','averagePerTheater','numberOfTheaters','toDate', 'pct']:
    weekendDf[col] = weekendDf[col].str.replace('$','', regex=False)\
                                    .str.replace(',','',regex=False)\
                                    .str.replace('%','',regex=False)\
                                    .str.replace('<','',regex=False)\
                                    .replace('-', np.nan)\
                                    .astype(float)

In [348]:
weekendDf[weekendDf['imdbId'] == 'tt25786030']

Unnamed: 0,date,rank,weekend,pct,numberOfTheaters,theaterChange,averagePerTheater,toDate,weekendNumber,imdbId,startDate,endDate
19381,Feb 17-19,9,49945.0,,259.0,-,192.0,49945.0,1,tt25786030,2023-02-17T00:00:00+00:00,2023-02-19T00:00:00+00:00
19382,Feb 24-26,14,7257.0,-85.5,115.0,-144,63.0,68296.0,2,tt25786030,2023-02-24T00:00:00+00:00,2023-02-26T00:00:00+00:00
19383,Mar 3-5,31,649.0,-91.1,8.0,-107,81.0,70751.0,3,tt25786030,2023-03-03T00:00:00+00:00,2023-03-05T00:00:00+00:00
19384,Mar 10-12,41,70.0,-89.2,2.0,-6,35.0,70514.0,4,tt25786030,2023-03-10T00:00:00+00:00,2023-03-12T00:00:00+00:00
19385,Mar 24-26,34,114.0,,1.0,-,114.0,70263.0,6,tt25786030,2023-03-24T00:00:00+00:00,2023-03-26T00:00:00+00:00


### Impute weekend days and then interpolate weekdays

In [349]:
def imputeDates(group):
    """
    This function takes place in two parts:
    1. Impute data within an individual weekend
    2. Interpolate data across weekends

    By breaking this into two parts we take advantage of the limited granularity that we do actually have

    Assumptions and known issues:
    - Box office distribution is not linear across days within a weekend nor a weekday
    - There may be gradual declines in theater counts
    - This is a bit slow, should test with modin to see performance difference

    Future solutions:
    - Develop basic ml model, could probably be just simple linear regression, based on actual daily data to look at the degredation of box office across a weekend or week
    
    """
    try:
        _tempDf = []


        # Part 1
        for idx, row in group.iterrows():
            dateRange = [dt for dt in arrow.Arrow.range('day', row['startDate'], row['endDate'])]
            perDay = row['weekend']/len(dateRange)
            
            for date in dateRange:
                _tempDf.append({
                    'date': date,
                    # 'DOW': later
                    'rank': row['rank'],
                    'daily': perDay,
                    # 'dayPct': later
                    'numberOfTheaters':row['numberOfTheaters'],
                    'averagePerTheater':row['averagePerTheater'],
                    # 'toDate': later
                    # 'dayNumber': later
                    'imdbId': row['imdbId']
                })

        imputedDf = pd.DataFrame.from_dict(_tempDf)

        # Part 2
        minDate = imputedDf['date'].min()
        maxDate = imputedDf['date'].max()

        idxDf = pd.DataFrame(index=[dt for dt in arrow.Arrow.range('day', minDate, maxDate)])
        fullDateDf = idxDf.merge(imputedDf, left_index=True, right_on='date', how = 'left')
        fullDateDf['DOW'] = fullDateDf['date'].apply(lambda x: x.format('dddd'))
        fullDateDf.reset_index(drop=False, inplace=True)

        ffillCols = ['rank','numberOfTheaters','imdbId']
        interpCols = ['daily']

        for col in ffillCols:
            fullDateDf[col] = fullDateDf[col].ffill()

        for col in interpCols:
            fullDateDf[col] = fullDateDf[col].interpolate(method='polynomial', order = 2)

        fullDateDf['dayPct'] = fullDateDf['daily'].pct_change() * 100
        fullDateDf['averagePerTheater'] = fullDateDf['daily'].astype(float) / fullDateDf['numberOfTheaters'].astype(float)
        fullDateDf['toDate'] = fullDateDf['daily'].cumsum()
        fullDateDf['dayNumber'] = list([i+1 for i in fullDateDf.index])

        return fullDateDf
                
    except Exception as e:
        # print(exceptionOutput(e))
        pass

In [350]:
imputedDf = weekendDf.groupby('imdbId').apply(lambda group: imputeDates(group))

  imputedDf = weekendDf.groupby('imdbId').apply(lambda group: imputeDates(group))


### Convert weekday to match our pretty weekndDf

In [351]:
weekdayDf.head()

Unnamed: 0,date,DOW,rank,daily,dayPct,weekPct,numberOfTheaters,averagePerTheater,toDate,dayNumber,imdbId
0,,,,,,,,,,,tt0245803
1,Apr 16,Wednesday,2.0,"$1,443,477",-,-,2955.0,$488,"$1,443,477",False,tt0245803
2,Apr 17,Thursday,2.0,"$1,652,222",+14.5%,-,2955.0,$559,"$3,095,699",False,tt0245803
3,Apr 18,Friday,4.0,"$3,265,329",+97.6%,-,2955.0,"$1,105","$6,361,028",False,tt0245803
4,Apr 19,Saturday,4.0,"$3,123,644",-4.3%,-,2955.0,"$1,057","$9,484,672",False,tt0245803


In [352]:
weekdayDf.drop_duplicates(subset=['imdbId', 'date'], inplace=True)

for col in ['daily','averagePerTheater','numberOfTheaters','toDate', 'dayPct','weekPct']:
    weekdayDf[col] = weekdayDf[col].str.replace('$','', regex=False)\
                                    .str.replace(',','',regex=False)\
                                    .str.replace('%','',regex=False)\
                                    .str.replace('<','',regex=False)\
                                    .replace('-', np.nan)\
                                    .astype(float)

In [355]:
def getArrowDate(row):
    try:
        releaseDate = releaseDates.get(row['imdbId'], np.nan)

        if releaseDate != releaseDate:
            return 
        
        releaseDate = releaseDate.split('-')[0]
        dayNumber   = re.search(r'\d+', row['date'].split(' ')[1]).group()
        dateStr = f"{row['date'].split(' ')[0]}. {dayNumber}, {releaseDate}"

        try:
            dateArr = arrow.get(dateStr, "MMM. D, YYYY")
        except: # also not happy with leap years
            dateStr = dateStr.replace('Feb. 29','Feb. 28')
            dateArr = arrow.get(dateStr, "MMM. D, YYYY")
            

        return dateArr
    except Exception as e:
        exceptionOutput(e)

weekdayDf = weekdayDf[weekdayDf['date'] != None]
weekdayDf.dropna(subset='date', inplace=True)


weekdayDf['date'] = weekdayDf.apply(lambda row: getArrowDate(row), axis=1)
weekdayDf['_1'] = 1

weekdayDf['dayNumber'] = weekdayDf.groupby('imdbId')['_1'].cumsum()

Error on line 16 || ParserMatchError || Failed to match 'MMM. D, YYYY' when parsing 'Sep. 119, 2001'.


### Merge

In [357]:
imputedDf.reset_index(drop=True, inplace=True)

In [358]:
imputedDf.head()

Unnamed: 0,index,date,rank,daily,numberOfTheaters,averagePerTheater,imdbId,DOW,dayPct,toDate,dayNumber
0,0.0,1919-12-02T00:00:00+00:00,102,40.0,1.0,40.0,tt0010680,Tuesday,,40.0,1
1,1.0,1919-12-03T00:00:00+00:00,102,40.0,1.0,40.0,tt0010680,Wednesday,0.0,80.0,2
2,2.0,1919-12-04T00:00:00+00:00,102,40.0,1.0,40.0,tt0010680,Thursday,0.0,120.0,3
3,3.0,1919-12-05T00:00:00+00:00,102,40.0,1.0,40.0,tt0010680,Friday,0.0,160.0,4
4,0.0,1921-11-21T00:00:00+00:00,43,4818.8,10.0,481.88,tt0012349,Monday,,4818.8,1


In [359]:
imputedDf.drop(columns=['index'], inplace=True)
weekdayDf.drop(columns=['weekPct', '_1'], inplace=True)

In [360]:
assert sorted(imputedDf.columns) == sorted(weekdayDf.columns), "ASSERTION ERROR: Two Dfs do not contain the same columns"

In [361]:
mergedDf = pd.concat([imputedDf, weekdayDf], axis=0, ignore_index=True)

### Save to CSV

In [363]:
mergedDf.to_csv('../data/allBoxOffice.csv')