# INIT

## Imports

In [None]:
# Initializing

# general purpose
import pickle
import pandas as pd
import numpy as np
import scipy.stats as stats
import scipy

# plotting
%matplotlib inline
import matplotlib.pyplot as plt
import matplotlib.pylab as mpl
import matplotlib.mlab as mlab
import matplotlib
import seaborn as sns
# Global plot styling for the notebook: DejaVu Sans font and seaborn's theme
# with the 'muted' colour palette.
mpl.rcParams['font.family'] = 'DejaVu Sans'
sns.set(palette='muted')
# Keep the active palette for later reuse and draw a swatch of it as cell output.
default_palette = sns.color_palette()
sns.palplot(default_palette)

# display
from pprint import pprint
from IPython.display import display
from tqdm import tqdm_notebook

import os
import re
import copy
import glob
import json
import math
import itertools
from collections import namedtuple
import functools
import time
from pprint import pprint
from datetime import datetime
from datetime import timedelta

PROJECT_DIR = '/home/ankur/notebooks/'


def get_path(path_str):
    '''Return ``path_str`` resolved against ``PROJECT_DIR``.

    Uses ``os.path.join`` semantics, so an absolute ``path_str`` is returned
    unchanged (the project directory is discarded).
    '''
    base_dir = PROJECT_DIR
    resolved = os.path.join(base_dir, path_str)
    return resolved

## Convenience

In [None]:
def expanding_pipe(df, window_function, min_periods=0):
    '''Pipe every growing prefix of ``df`` through ``window_function``.

    The value for row position ``k`` is ``df.iloc[:k + 1].pipe(window_function)``.
    Each window starts at the first row and includes the current row.
    A window is only evaluated when its length is strictly greater than
    ``min_periods``; shorter windows produce ``None``.

    Returns a Series aligned to ``df.index``.
    '''
    values = []
    for window_len in range(1, len(df) + 1):
        if window_len > min_periods:
            values.append(df.iloc[:window_len].pipe(window_function))
        else:
            values.append(None)
    return pd.Series(values, index=df.index)


def rolling_pipe(df, offsets, window_function, overflow=None):
    '''
    Pipe a window of rows around each row of ``df`` through ``window_function``.

    offsets: a pair (start, stop) such that (0, 0) will return one row at a time.
             Both are row offsets relative to the current row and both are
             inclusive, e.g. (-1, 1) is the previous, current and next row.
    overflow: how window positions outside ``0 .. len(df) - 1`` are treated:
        'clipped'  - they are dropped, so windows near the edges are shorter
                     (possibly empty)
        'circular' - they wrap around modulo ``len(df)``
        None       - plain ``df.iloc[i + start : i + stop + 1]`` slicing, so a
                     negative start follows Python's negative-index rules

    Returns a Series of ``window_function`` results aligned to ``df.index``.
    Raises ValueError for any other ``overflow`` value.
    '''
    start, stop = offsets[0], offsets[1]
    n_rows = len(df)

    # Choose the window builder once instead of re-testing ``overflow`` per row.
    if overflow == 'clipped':
        def window_at(i):
            # Clamp BOTH bounds at 0: a negative stop would otherwise wrap and
            # select almost the whole frame for windows lying before row 0.
            return df.iloc[max(i + start, 0):max(i + stop + 1, 0)]
    elif overflow == 'circular':
        def window_at(i):
            # Map every window position into range with a modulo; this also
            # covers windows lying entirely before the first/after the last row.
            return df.iloc[[pos % n_rows for pos in range(i + start, i + stop + 1)]]
    elif overflow is None:
        def window_at(i):
            return df.iloc[i + start:i + stop + 1]
    else:
        raise ValueError(
            "overflow must be 'clipped', 'circular' or None, got {!r}".format(overflow)
        )

    return pd.Series(
        [window_at(i).pipe(window_function) for i in range(n_rows)],
        index=df.index,
    )

In [None]:
def jointplot(
    x, y,
    x_bins=10, y_bins=10,
    alpha=1.0,
    side_alphas=(0.8, 0.8),
    histtype='step',
    linestyle='dashed',
    figsize=(8, 8),
):
    '''Draw a scatter of ``y`` against ``x`` with marginal histograms in a new figure.

    Layout is a 3x3 grid. The scatter fills the lower-left 2x2 cells, the
    histogram of ``x`` sits above it, and the (horizontal) histogram of ``y``
    sits to its right.

    x, y: data passed to ``Axes.scatter`` (and to the respective histograms).
    x_bins, y_bins: ``bins`` argument for the top / right histograms.
    alpha: opacity of the scatter markers.
    side_alphas: (top, right) opacities of the marginal histograms.
    histtype, linestyle: forwarded to both ``Axes.hist`` calls.
    figsize: figure size passed to ``plt.figure``.
    '''
    plt.figure(figsize=figsize)
    gs = plt.GridSpec(
        3, 3,
        wspace=0.1, hspace=0.1,
        width_ratios=[3, 3, 1], height_ratios=[1, 3, 3]
    )
    ax_main = plt.subplot(gs[1:, :-1])
    ax_top = plt.subplot(gs[0, :-1])
    ax_right = plt.subplot(gs[1:, -1])

    ax_main.scatter(x, y, alpha=alpha)
    ax_top.hist(x, bins=x_bins, alpha=side_alphas[0], histtype=histtype, linestyle=linestyle)
    ax_right.hist(y, bins=y_bins, alpha=side_alphas[1], orientation='horizontal', histtype=histtype, linestyle=linestyle)

    # Evenly spaced x ticks spanning the scatter's current x range (same tick
    # count matplotlib chose), applied to the scatter and the top histogram.
    x_low, x_high = ax_main.get_xbound()
    x_ticks = np.linspace(x_low, x_high, len(ax_main.get_xticks()))
    ax_main.set_xticks(x_ticks)
    ax_top.set_xticks(x_ticks)
    # set_xticks only ever *expands* the view limits, so pin the top axis to
    # the scatter's limits explicitly to keep the columns aligned.
    ax_top.set_xlim(ax_main.get_xlim())
    ax_top.set_xticklabels([])
    ax_top.yaxis.grid(False)

    # Same treatment for the y axis and the right-hand histogram.
    y_low, y_high = ax_main.get_ybound()
    y_ticks = np.linspace(y_low, y_high, len(ax_main.get_yticks()))
    ax_main.set_yticks(y_ticks)
    ax_right.set_yticks(y_ticks)
    ax_right.set_ylim(ax_main.get_ylim())
    ax_right.set_yticklabels([])
    ax_right.xaxis.grid(False)

In [None]:
def display_full(x):
    '''Display ``x`` with both row truncation and cell-width truncation disabled.

    The option changes are scoped with ``pd.option_context``. The caller's
    previous settings are restored afterwards, even if ``display`` raises.
    '''
    # None means "no limit" for both options. A negative max_colwidth (the old
    # -1 idiom) is deprecated since pandas 1.0.
    with pd.option_context('display.max_rows', None, 'display.max_colwidth', None):
        display(x)

def widen_df(x):
    '''Display ``x`` without truncating cell contents; row limits are left as they are.

    The option change is scoped with ``pd.option_context``. The caller's
    previous ``display.max_colwidth`` is restored afterwards, even if
    ``display`` raises.
    '''
    # None means "no width limit". A negative value (the old -1 idiom) is
    # deprecated since pandas 1.0.
    with pd.option_context('display.max_colwidth', None):
        display(x)

In [None]:
from concurrent.futures import ProcessPoolExecutor
from functools import wraps


class Futurize:
    '''Wrap a function so that each call is submitted to a shared process pool.

    Calling the wrapper returns a ``concurrent.futures.Future``. Bookkeeping
    (submission time and a label) is kept in ``Futurize.exec_jobs`` until the
    result is collected with ``Futurize.result``.
    '''
    # One pool shared by every Futurize instance; created when the class is defined.
    exec_pool = ProcessPoolExecutor()
    # Maps each outstanding Future to {'start_time': str, 'type': str}.
    exec_jobs = {}

    def __init__(self, func):
        self._func = func
        self._func_name = func.__name__

    def __call__(self, *args, **kwargs):
        '''Submit ``func(*args, **kwargs)`` to the pool and return its Future.

        The keyword ``extra_type`` is consumed here and never forwarded to the
        wrapped function. When truthy, it is appended to the job label as
        ``"<func name> - <extra_type>"``.
        '''
        start_time = time.asctime(time.localtime())
        extra_type = kwargs.pop('extra_type', None)
        future = self.exec_pool.submit(self._func, *args, **kwargs)
        if extra_type:
            label = '{} - {}'.format(self._func_name, extra_type)
        else:
            label = self._func_name
        self.exec_jobs[future] = {
            'start_time': start_time,
            'type': label,
        }
        return future

    @staticmethod
    def result(future_instance):
        '''Block until ``future_instance`` finishes, deregister it and return its result.

        Any exception raised by the job propagates unchanged. Deregistration
        uses ``pop(..., None)``, so an untracked or already-collected future no
        longer raises KeyError from the cleanup. That KeyError used to replace
        (mask) the job's own exception.
        '''
        try:
            return future_instance.result()
        finally:
            Futurize.exec_jobs.pop(future_instance, None)


def query_as_df(sql):
    '''Run ``sql`` on a fresh connection and return the result as a DataFrame.

    Opens the connection with ``jaydebeapi.connect(**conn_props)`` and prints
    the query's wall-clock duration. The connection is always closed, including
    when the query raises. Exceptions from connecting or querying propagate
    unchanged.

    NOTE(review): ``jaydebeapi`` and ``conn_props`` are not imported/defined
    anywhere in the visible source; they must exist as globals (e.g. from
    another notebook cell) before this is called.
    '''
    conn = jaydebeapi.connect(**conn_props)
    try:
        start_time = time.time()
        query_df = pd.read_sql_query(sql, conn)
        print("Time To Query: {:2.2f} sec".format(time.time() - start_time))
        return query_df
    finally:
        conn.close()

In [None]:
import multiprocessing as mp

def pandas_mp_apply(grouped_df, apply_func, processes=None):
    '''Apply ``apply_func`` to every group of ``grouped_df`` in a process pool and concatenate.

    grouped_df: an iterable of (key, group) pairs, e.g. a pandas GroupBy.
    apply_func: callable taking one group and returning a pandas object. It is
                sent to the worker processes, so it must be picklable (a
                module-level function, not a lambda).
    processes: number of worker processes. None (the default) uses
               ``mp.cpu_count()``, as before this parameter existed.

    Results are concatenated with ``pd.concat`` in group-iteration order, so
    an input with no groups raises ValueError ("No objects to concatenate").
    '''
    if processes is None:
        processes = mp.cpu_count()
    groups = [group for _, group in grouped_df]
    with mp.Pool(processes) as pool:
        results = pool.map(apply_func, groups)
    return pd.concat(results)

In [None]:
# Process-pool variants of query_as_df and pandas_mp_apply. Calling either one
# submits the job and returns a Future; collect it with Futurize.result(...).
query_as_df_async, pandas_mp_apply_async = (
    Futurize(query_as_df),
    Futurize(pandas_mp_apply),
)

In [None]:
def date_range(start_time, end_time, hour_shift=0, table=None):
    '''Build a SQL predicate matching year/month/day partitions between two times.

    Both endpoints are shifted back by ``hour_shift`` hours and truncated to
    whole days. For every calendar month touched by the resulting inclusive day
    range, one clause is emitted, e.g.

        (year = '2020' and month = '01' and day between '30' and '31')

    The clauses are joined with a newline plus " or " and wrapped in parentheses.

    table:
        None / empty  -> unqualified column names (year, month, day)
        a string      -> qualified names (<table>.year, <table>.month, ...)
        non-empty list -> one predicate string per element; None/empty
                          elements stay unqualified

    Returns a string, or a list of strings when ``table`` is a non-empty list.
    Raises ValueError if the shifted start day falls after the shifted end day.
    '''
    # Local import: this file binds the global name ``datetime`` to the class
    # (``from datetime import datetime``), so ``datetime.datetime`` would fail.
    import datetime as _dt

    shift = pd.Timedelta(hours=hour_shift)
    s = pd.to_datetime(start_time) - shift
    e = pd.to_datetime(end_time) - shift
    first_day = _dt.datetime(year=s.year, month=s.month, day=s.day)
    last_day = _dt.datetime(year=e.year, month=e.month, day=e.day)
    if first_day > last_day:
        raise ValueError(
            'date_range: start day {} is after end day {}'.format(first_day.date(), last_day.date())
        )

    spans = _month_day_spans(first_day, last_day)
    if isinstance(table, list) and len(table) > 0:
        return [_partition_predicate(t, spans) for t in table]
    return _partition_predicate(table, spans)


def _month_day_spans(first_day, last_day):
    '''Return [year, month, first_day_of_month, last_day_of_month] for each month in the inclusive range.'''
    import datetime as _dt

    spans = []
    day = first_day
    one_day = _dt.timedelta(days=1)
    while day <= last_day:
        if spans and spans[-1][0] == day.year and spans[-1][1] == day.month:
            spans[-1][3] = day.day  # extend the current month's span
        else:
            spans.append([day.year, day.month, day.day, day.day])
        day += one_day
    return spans


def _partition_predicate(table, spans):
    '''Render ``spans`` as one parenthesised OR-predicate, qualifying columns with ``table`` when it is non-empty.'''
    clauses = []
    for year, month, min_day, max_day in spans:
        if not table:
            clauses.append(
                "(year = '%4d' and month = '%02d' and day between '%02d' and '%02d')"
                % (year, month, min_day, max_day)
            )
        else:
            clauses.append(
                "(%s.year = '%4d' and %s.month = '%02d' and %s.day between '%02d' and '%02d')"
                % (table, year, table, month, table, min_day, max_day)
            )
    return '(' + '\n or '.join(clauses) + ')'

## Spark UDFs

In [None]:
def year_of_week(x):
    '''Return the calendar year of the Monday that starts ``x``'s week.

    x: a POSIX timestamp (int/float, converted with local-time
       ``datetime.fromtimestamp``), a 'YYYY-MM-DD' string, or a
       ``datetime.date``/``datetime.datetime``. ``None`` (e.g. a SQL NULL)
       yields ``None``.

    Raises TypeError for any other input type.

    NOTE(review): this is the year of the week's Monday, not the ISO
    week-numbering year. For 2020-01-01 it returns 2019 while ``week`` (ISO
    week number) returns 1, so combining the two mislabels early-January
    dates. Confirm this is intended.
    '''
    # Local import: this file binds the global name ``datetime`` to the class
    # (``from datetime import datetime``), so ``datetime.datetime`` would fail.
    import datetime as _dt

    if x is None:
        return None
    if isinstance(x, (int, float)):
        dt = _dt.datetime.fromtimestamp(x)
    elif isinstance(x, str):
        dt = _dt.datetime.strptime(x, '%Y-%m-%d')
    elif isinstance(x, _dt.date):  # datetime.datetime is a subclass of date
        dt = x
    else:
        raise TypeError('year_of_week: unsupported input type {}'.format(type(x).__name__))
    # weekday(): Monday == 0 ... Sunday == 6
    monday = dt - _dt.timedelta(days=dt.weekday())
    return monday.year

# Expose year_of_week to Spark SQL under the name `yow`.
# NOTE(review): `spark` is not defined in the visible source (presumably a
# SparkSession created elsewhere). No returnType is passed, so PySpark's
# default StringType applies although the function returns an int; confirm intended.
spark.udf.register("yow", year_of_week)


def week(x):
    '''Return the ISO-8601 week number (1-53) of ``x`` via ``isocalendar()``.

    x: a POSIX timestamp (int/float, converted with local-time
       ``datetime.fromtimestamp``), a 'YYYY-MM-DD' string, or a
       ``datetime.date``/``datetime.datetime``. ``None`` (e.g. a SQL NULL)
       yields ``None``.

    Raises TypeError for any other input type.
    '''
    # Local import: this file binds the global name ``datetime`` to the class
    # (``from datetime import datetime``), so ``datetime.datetime`` would fail.
    import datetime as _dt

    if x is None:
        return None
    if isinstance(x, (int, float)):
        dt = _dt.datetime.fromtimestamp(x)
    elif isinstance(x, str):
        dt = _dt.datetime.strptime(x, '%Y-%m-%d')
    elif isinstance(x, _dt.date):  # datetime.datetime is a subclass of date
        dt = x
    else:
        raise TypeError('week: unsupported input type {}'.format(type(x).__name__))
    return dt.isocalendar()[1]

# Expose week (ISO week number) to Spark SQL under the name `week`.
# NOTE(review): no returnType is passed, so PySpark's default StringType applies
# although the function returns an int; confirm intended.
spark.udf.register("week", week)


def day_of_week(x):
    '''Return the day of the week of ``x`` with Monday == 1 ... Sunday == 7.

    x: a POSIX timestamp (int/float, converted with local-time
       ``datetime.fromtimestamp``), a 'YYYY-MM-DD' string, or a
       ``datetime.date``/``datetime.datetime``. ``None`` (e.g. a SQL NULL)
       yields ``None``.

    Raises TypeError for any other input type.
    '''
    # Local import: this file binds the global name ``datetime`` to the class
    # (``from datetime import datetime``), so ``datetime.datetime`` would fail.
    import datetime as _dt

    if x is None:
        return None
    if isinstance(x, (int, float)):
        dt = _dt.datetime.fromtimestamp(x)
    elif isinstance(x, str):
        dt = _dt.datetime.strptime(x, '%Y-%m-%d')
    elif isinstance(x, _dt.date):  # datetime.datetime is a subclass of date
        dt = x
    else:
        raise TypeError('day_of_week: unsupported input type {}'.format(type(x).__name__))
    # weekday() is 0-based (Monday == 0); shift to 1-based.
    return dt.weekday() + 1

# Expose day_of_week (Monday == 1 ... Sunday == 7) to Spark SQL under the name `dow`.
# NOTE(review): no returnType is passed, so PySpark's default StringType applies
# although the function returns an int; confirm intended.
spark.udf.register("dow", day_of_week)


def week_start(x):
    '''Return the Monday of ``x``'s week as a string (``str()`` of the computed value).

    x: a POSIX timestamp (int/float, converted with local-time
       ``datetime.fromtimestamp``), a 'YYYY-MM-DD' string, or a
       ``datetime.date``/``datetime.datetime``. ``None`` (e.g. a SQL NULL)
       yields ``None``.

    The output format follows the input type.
    - A ``datetime.date`` gives 'YYYY-MM-DD'.
    - Strings, timestamps and datetimes give ``str()`` of a datetime,
      e.g. 'YYYY-MM-DD HH:MM:SS'.
    - The time of day of the input is kept; it is not reset to midnight.

    Any other type is used as-is (duck-typed): it must support
    ``.weekday()`` and subtraction of a ``timedelta``.
    '''
    # Local import: this file binds the global name ``datetime`` to the class
    # (``from datetime import datetime``), so ``datetime.datetime`` would fail.
    import datetime as _dt

    if x is None:
        return None
    if isinstance(x, (int, float)):
        dt = _dt.datetime.fromtimestamp(x)
    elif isinstance(x, str):
        dt = _dt.datetime.strptime(x, '%Y-%m-%d')
    else:
        # date/datetime, or any duck-typed date-like object (unchecked).
        dt = x
    return str(dt - _dt.timedelta(days=dt.weekday()))
    
# Expose week_start (Monday of the week, as a string) to Spark SQL under the name `week_start`.
spark.udf.register('week_start', week_start)


def array_to_string(arr):
    '''Render an iterable as '[e1,e2,...]'.

    Each element is converted with ``str()``, with no quoting and no spaces.
    ``None`` (e.g. a NULL array value) yields ``None`` instead of raising
    TypeError.
    '''
    if arr is None:
        return None
    return '[' + ','.join(str(element) for element in arr) + ']'

# Expose array_to_string to Spark SQL under the name `array_to_string`.
spark.udf.register('array_to_string', array_to_string)