In [1]:
import pandas as pd
import numpy as np 
from collections import defaultdict 
import pickle
import statistics
import scipy.stats as st
import pyspark.sql.functions as ps
from pyspark.sql import Window
from pyspark.sql.types import *
from scipy import stats
from shutil import copyfile
import pickle as pkl
#import databricks.koalas as ks

# Instructions 

**NOTE: END DATES ARE NOT INCLUSIVE!!! So if you want conversations from November 2019, December 2019, and January 2020, your start date should be 2019-11-01 and your end date should be 2020-02-01 and NOT 2020-01-01**

**NOTE: PLEASE CLONE THIS NOTEBOOK INTO YOUR OWN FOLDER AND ATTACH TO DS DEV1 CLUSTER BEFORE RUNNING**

1. You must have done a sentiment curation for the code to work properly - please input the output id for which you ran a sentiment curation in the **01. Sent Curation Output ID** widget

2. Specify the Amazon S3 working directory that your file should be saved to in the **02. Working Directory** widget - for example /bsinsights/Raim 

3. Specify a file name in the **03. File Name** widget

4. Specify a start date for the "previous/baseline period" in the **04. Prev Period Start Date** widget - please note that it MUST be in YYYY-MM-DD format 

5. Specify an end date for the "previous/baseline period" in the **05. Prev Period End Date** widget - please note that it MUST be in YYYY-MM-DD format 

6. Specify a start date for the "recent/comparison period" in the **06. Recent Period Start Date** widget - please note that it MUST be in YYYY-MM-DD format 

7. Specify an end date for the "recent/comparison period" in the **07. Recent Period End Date** widget - please note that it MUST be in YYYY-MM-DD format 

8. Select which level you want the volumes, sentiment breakdown, and rankings for in the **08. Level** widget. You can select all three levels if you like - just note that the more levels you select, the longer the notebook takes to run.
 
9. Specify the category to exclude in the **09. Excluded Category** widget. This is typically the Standard Aviation Category, i.e. the category where you have things like "airports"/"countries" etc. It is usually NOT INCLUDED when doing sentiment breakdowns and rankings and will add *SIGNIFICANT computation time if not excluded*!!!! 

10. Specify the name of the lense in which the comparison entities are located in the **10. Comp Entity Lense** widget. For example, if you want to compare airline performance against other taxonomy elements, you should have a lense named "Active Airlines", and within this lense the sublenses will be the actual airlines which you want to compare.

11. Specify the name of the comparison entities in the **11. Comp Entity Name** widget. For example, if you want to compare airlines to each other you should put the comparison entity name as "airline"; if you want to compare chip brands to each other you should put the comparison entity name as "Chip Brand".

12. Specify whether you want to save your results as an excel file or just want to use the notebook to visualize/sort your results in the **12. Write to Excel?** widget.

13. Run all cells - the notebook will take between *15-30 minutes to finish* (depending on how many levels have been chosen). You will then find your excel file, containing 3 tabs for each level you chose, in the working directory that you specified, under the file name that you input. For each level there will be 3 tabs in the excel file:

  (a) *level_name_df_prev*: the sentiment volumes, percentages, and rankings over the previous/baseline period
  
  (b) *level_name_df_rec*: the sentiment volumes, percentages, and rankings over the recent/comparison period
  
  (c) *level_name_df_rankings*: the net sentiment and net sentiment ranks over both the previous/baseline period and the recent/comparison period as well as the net sentiment change between the two periods and the net sentiment rank change between the two periods

## Widget Config

In [4]:
# Clear any widgets left over from a previous run before declaring this notebook's inputs.
# NOTE(review): Databricks documents a limitation around creating widgets in the same
# cell as a remove command - confirm this cell behaves reliably on the target runtime.
dbutils.widgets.removeAll()

# Free-text inputs 01-07: curation output id, output location, and the two date ranges.
for _text_widget in (
    "01. Sent Curation Output ID",
    "02. Working Directory",
    "03. File Name",
    "04. Prev Period Start Date",
    "05. Prev Period End Date",
    "06. Recent Period Start Date",
    "07. Recent Period End Date",
):
    dbutils.widgets.text(_text_widget, "")

# Taxonomy level(s) to aggregate on; "Category" is preselected.
dbutils.widgets.multiselect("08. Level", "Category", ["Category", "Lense", "Sublense"])

# Free-text inputs 09-11: category to exclude and the comparison-entity settings.
for _text_widget in (
    "09. Excluded Category",
    "10. Comp Entity Lense",
    "11. Comp Entity Name",
):
    dbutils.widgets.text(_text_widget, "")

# Whether results should also be written to an excel file; defaults to "No".
dbutils.widgets.dropdown("12. Write to Excel?", "No", ['Yes', 'No'])

In [5]:
# Shorthand for reading the current value of a widget by its label.
_widget_value = dbutils.widgets.get

# Source table: the sentiment curation output selected in widget 01.
df = spark.read.table("sentiment_curations.{}".format(_widget_value("01. Sent Curation Output ID")))

# (start, end) date strings for the previous/baseline and recent/comparison periods.
p_dates = (
    str(_widget_value("04. Prev Period Start Date")),
    str(_widget_value("05. Prev Period End Date")),
)
r_dates = (
    str(_widget_value("06. Recent Period Start Date")),
    str(_widget_value("07. Recent Period End Date")),
)

# Output location and file name.
output_path = str(_widget_value("02. Working Directory"))
output_filename = str(_widget_value("03. File Name"))

# The multiselect widget returns its selections as one comma-separated string.
levels = _widget_value("08. Level").split(",")

excluded_cat = str(_widget_value("09. Excluded Category"))
comp_ent_lense = str(_widget_value("10. Comp Entity Lense"))
comp_ent_name = str(_widget_value("11. Comp Entity Name"))
to_excel = str(_widget_value("12. Write to Excel?"))

In [6]:

# ----------------------------------------- sentiment aggregation and calculation function utils --------------------------------- #
'''
    agg_cat(entity: str, p:str, input_dict: dict) -> spark dataframe
        - calculates the total, positive, and negative volumes and percentages for each comparison entity within each category 
        - ranks the comparison entities against their performance in each category for positive, negative, and net sentiment

    parameters: entity - the name of the column which has the entities you want to compare 
                    - inherited from main_agg
                p - the period for which you are aggregating and calculating volumes, percentages, and rankings for the comparison entities 
                    - inherited from main_agg 
                    - takes values "df_prev" or "df_rec" 
                input_dict - the dictionary for "Category" formatted as: 
                            input_dict = {'df_initial':spark_df, 'df_prev': spark_df, 'df_rec':spark_df, 'df_rankings': None}
                            - inherited from main_agg

    returns: a spark dataframe with the sentiment volumes, percentages, and rankings calculated at the category level for all
            comparison entities for a given time period
'''
def agg_cat(entity: str, p: str, input_dict: dict):
    """
    Aggregate sentiment volumes, percentages and ranks per (Category, entity).

    Parameters:
        entity: name of the column holding the comparison entities.
        p: key of the period dataframe to aggregate ("df_prev" or "df_rec").
        input_dict: per-level dictionary; input_dict[p] must be a spark dataframe
            providing the columns 'id', 'sentiment', 'Category' and `entity`.

    Returns:
        A spark dataframe with one row per (Category, entity) and the columns
        total_vol, pos_vol, neg_vol, percent_pos, percent_neg, net_sent,
        percent_pos_rank, percent_neg_rank and net_sent_rank.

    Volumes are distinct 'id' counts. A group without any positive (or any
    negative) rows is kept with a volume of 0 instead of being dropped, so that
    entities with one-sided sentiment still take part in the rankings.
    """
    grouping = ['Category', entity]
    partition_cols = ['Category']

    # Rows with a NULL grouping key do not identify a Category/entity pair; leave them out.
    df = input_dict[p].dropna(subset=grouping)

    # when() without otherwise() yields NULL for non-matching rows and countDistinct
    # ignores NULLs, so each conditional count is 0 when no row matches.
    is_pos = ps.col('sentiment') == 1
    is_neg = ps.col('sentiment') == -1
    df_new = df.groupby(grouping).agg(
        ps.countDistinct('id').alias('total_vol'),
        ps.countDistinct(ps.when(is_pos, ps.col('id'))).alias('pos_vol'),
        ps.countDistinct(ps.when(is_neg, ps.col('id'))).alias('neg_vol'),
    )

    # net_sent is derived from the already rounded percentages.
    df_new = df_new.withColumn('percent_pos', ps.round((ps.col('pos_vol') / ps.col('total_vol')) * 100, 2))\
            .withColumn('percent_neg', ps.round((ps.col('neg_vol') / ps.col('total_vol')) * 100, 2))\
            .withColumn('net_sent', ps.round((ps.col('percent_pos') - ps.col('percent_neg')), 2))

    # Dense-rank every metric within its partition, highest value first
    # (so rank 1 of percent_neg_rank is the entity with the largest negative share).
    for metric in ('percent_pos', 'percent_neg', 'net_sent'):
        w = Window.partitionBy(*partition_cols).orderBy(ps.desc(metric))
        df_new = df_new.withColumn(metric + '_rank', ps.dense_rank().over(w))

    return df_new

'''
    agg_lense(entity: str, p:str, input_dict: dict) -> spark dataframe
        - calculates the total, positive, and negative volumes and percentages for each comparison entity within each lense 
        - ranks the comparison entities against their performance in each lense for positive, negative, and net sentiment

    parameters: entity - the name of the column which has the entities you want to compare 
                    - inherited from main_agg
                p - the period for which you are aggregating and calculating volumes, percentages, and rankings for the comparison entities 
                    - inherited from main_agg 
                    - takes values "df_prev" or "df_rec" 
                input_dict - the dictionary for "Lense" formatted as: 
                            input_dict = {'df_initial':spark_df, 'df_prev': spark_df, 'df_rec':spark_df, 'df_rankings': None}
                            - inherited from main_agg

    returns: a spark dataframe with the sentiment volumes, percentages, and rankings calculated at the lense level for all
            comparison entities for a given time period
'''


def agg_lense(entity: str, p: str, input_dict: dict):
    """
    Aggregate sentiment volumes, percentages and ranks per (Category, Lense, entity).

    Parameters:
        entity: name of the column holding the comparison entities.
        p: key of the period dataframe to aggregate ("df_prev" or "df_rec").
        input_dict: per-level dictionary; input_dict[p] must be a spark dataframe
            providing the columns 'id', 'sentiment', 'Category', 'Lense' and `entity`.

    Returns:
        A spark dataframe with one row per (Category, Lense, entity) and the
        columns total_vol, pos_vol, neg_vol, percent_pos, percent_neg, net_sent,
        percent_pos_rank, percent_neg_rank and net_sent_rank.

    Volumes are distinct 'id' counts. A group without any positive (or any
    negative) rows is kept with a volume of 0 instead of being dropped, so that
    entities with one-sided sentiment still take part in the rankings.
    """
    grouping = ['Category', 'Lense', entity]
    partition_cols = ['Category', 'Lense']

    # Rows with a NULL grouping key do not identify a lense/entity pair; leave them out.
    df = input_dict[p].dropna(subset=grouping)

    # when() without otherwise() yields NULL for non-matching rows and countDistinct
    # ignores NULLs, so each conditional count is 0 when no row matches.
    is_pos = ps.col('sentiment') == 1
    is_neg = ps.col('sentiment') == -1
    df_new = df.groupby(grouping).agg(
        ps.countDistinct('id').alias('total_vol'),
        ps.countDistinct(ps.when(is_pos, ps.col('id'))).alias('pos_vol'),
        ps.countDistinct(ps.when(is_neg, ps.col('id'))).alias('neg_vol'),
    )

    # net_sent is derived from the already rounded percentages.
    df_new = df_new.withColumn('percent_pos', ps.round((ps.col('pos_vol') / ps.col('total_vol')) * 100, 2))\
            .withColumn('percent_neg', ps.round((ps.col('neg_vol') / ps.col('total_vol')) * 100, 2))\
            .withColumn('net_sent', ps.round((ps.col('percent_pos') - ps.col('percent_neg')), 2))

    # Dense-rank every metric within its partition, highest value first
    # (so rank 1 of percent_neg_rank is the entity with the largest negative share).
    for metric in ('percent_pos', 'percent_neg', 'net_sent'):
        w = Window.partitionBy(*partition_cols).orderBy(ps.desc(metric))
        df_new = df_new.withColumn(metric + '_rank', ps.dense_rank().over(w))

    return df_new

'''
    agg_sublense(entity: str, p:str, input_dict: dict) -> spark dataframe
        - calculates the total, positive, and negative volumes and percentages for each comparison entity within each sublense 
        - ranks the comparison entities against their performance in each sublense for positive, negative, and net sentiment

    parameters: entity - the name of the column which has the entities you want to compare 
                    - inherited from main_agg
                p - the period for which you are aggregating and calculating volumes, percentages, and rankings for the comparison entities 
                    - inherited from main_agg 
                    - takes values "df_prev" or "df_rec" 
                input_dict - the dictionary for "Sublense" formatted as: 
                            input_dict = {'df_initial':spark_df, 'df_prev': spark_df, 'df_rec':spark_df, 'df_rankings': None}
                            - inherited from main_agg

    returns: a spark dataframe with the sentiment volumes, percentages, and rankings calculated at the sublense level for all
            comparison entities for a given time period
'''

def agg_sublense(entity: str, p: str, input_dict: dict):
    """
    Aggregate sentiment volumes, percentages and ranks per
    (Category, Lense, Sublense, entity).

    Parameters:
        entity: name of the column holding the comparison entities.
        p: key of the period dataframe to aggregate ("df_prev" or "df_rec").
        input_dict: per-level dictionary; input_dict[p] must be a spark dataframe
            providing the columns 'id', 'sentiment', 'Category', 'Lense',
            'Sublense' and `entity`.

    Returns:
        A spark dataframe with one row per (Category, Lense, Sublense, entity)
        and the columns total_vol, pos_vol, neg_vol, percent_pos, percent_neg,
        net_sent, percent_pos_rank, percent_neg_rank and net_sent_rank.

    Volumes are distinct 'id' counts. A group without any positive (or any
    negative) rows is kept with a volume of 0 instead of being dropped, so that
    entities with one-sided sentiment still take part in the rankings.
    """
    grouping = ['Category', 'Lense', 'Sublense', entity]
    partition_cols = ['Category', 'Lense', 'Sublense']

    # Rows with a NULL grouping key do not identify a sublense/entity pair; leave them out.
    df = input_dict[p].dropna(subset=grouping)

    # when() without otherwise() yields NULL for non-matching rows and countDistinct
    # ignores NULLs, so each conditional count is 0 when no row matches.
    is_pos = ps.col('sentiment') == 1
    is_neg = ps.col('sentiment') == -1
    df_new = df.groupby(grouping).agg(
        ps.countDistinct('id').alias('total_vol'),
        ps.countDistinct(ps.when(is_pos, ps.col('id'))).alias('pos_vol'),
        ps.countDistinct(ps.when(is_neg, ps.col('id'))).alias('neg_vol'),
    )

    # net_sent is derived from the already rounded percentages.
    df_new = df_new.withColumn('percent_pos', ps.round((ps.col('pos_vol') / ps.col('total_vol')) * 100, 2))\
            .withColumn('percent_neg', ps.round((ps.col('neg_vol') / ps.col('total_vol')) * 100, 2))\
            .withColumn('net_sent', ps.round((ps.col('percent_pos') - ps.col('percent_neg')), 2))

    # Dense-rank every metric within its partition, highest value first
    # (so rank 1 of percent_neg_rank is the entity with the largest negative share).
    for metric in ('percent_pos', 'percent_neg', 'net_sent'):
        w = Window.partitionBy(*partition_cols).orderBy(ps.desc(metric))
        df_new = df_new.withColumn(metric + '_rank', ps.dense_rank().over(w))

    return df_new

# --------------------------------------------- ranking function utils ---------------------------------------------------------- # 

'''
    rank_cat(entity: str, input_dict: dict) -> spark dataframe
        - combines the net sentiment and the net sentiment rank for the baseline and the comparison periods into one dataframe and
        calculates the net sentiment and the net sentiment rank change between the two periods at the category level

    parameters: entity - the name of the column which has the entities you want to compare 
                    - inherited from main_rank
                input_dict - the dictionary for "Category" formatted as: 
                            input_dict = {'df_initial':spark_df, 'df_prev': spark_df, 'df_rec':spark_df, 'df_rankings': None}
                            - inherited from main_rank
    returns: a spark dataframe with the net sentiment, net sentiment rankings, net sentiment change, and net sentiment rank change for all
            comparison entities at the "Category" level
'''

def rank_cat(entity: str, input_dict: dict):
    """
    Put the baseline and comparison net sentiment side by side per
    (Category, entity) and compute the change between the two periods.

    Parameters:
        entity: name of the column holding the comparison entities.
        input_dict: dictionary whose 'df_prev' and 'df_rec' entries are spark
            dataframes carrying 'Category', `entity`, 'net_sent' and 'net_sent_rank'.

    Returns:
        A spark dataframe with the key columns plus net_sent_prev,
        net_sent_rank_prev, net_sent_rec, net_sent_rank_rec, net_sent_change
        (rec - prev) and rank_change (prev rank - rec rank). Only keys present
        in both periods are returned (inner join).
    """
    keys = ['Category', entity]

    baseline = input_dict['df_prev'].select(
        *keys,
        ps.col('net_sent').alias('net_sent_prev'),
        ps.col('net_sent_rank').alias('net_sent_rank_prev'))

    comparison = input_dict['df_rec'].select(
        *keys,
        ps.col('net_sent').alias('net_sent_rec'),
        ps.col('net_sent_rank').alias('net_sent_rank_rec'))

    # Joining on the key names keeps a single copy of each key column.
    combined = baseline.join(comparison, on=keys, how='inner')

    sent_delta = ps.round(ps.col('net_sent_rec') - ps.col('net_sent_prev'), 2)
    rank_delta = ps.col('net_sent_rank_prev') - ps.col('net_sent_rank_rec')

    return combined.withColumn('net_sent_change', sent_delta)\
                   .withColumn('rank_change', rank_delta)

'''
    rank_lense(entity: str, input_dict: dict) -> spark dataframe
        - combines the net sentiment and the net sentiment rank for the baseline and the comparison periods into one dataframe and
        calculates the net sentiment and the net sentiment rank change between the two periods at the lense level

    parameters: entity - the name of the column which has the entities you want to compare 
                    - inherited from main_rank
                input_dict - the dictionary for "Lense" formatted as: 
                            input_dict = {'df_initial':spark_df, 'df_prev': spark_df, 'df_rec':spark_df, 'df_rankings': None}
                            - inherited from main_rank
    returns: a spark dataframe with the net sentiment, net sentiment rankings, net sentiment change, and net sentiment rank change for all
            comparison entities at the "Lense" level
'''

def rank_lense(entity: str, input_dict: dict):
    """
    Put the baseline and comparison net sentiment side by side per
    (Category, Lense, entity) and compute the change between the two periods.

    Parameters:
        entity: name of the column holding the comparison entities.
        input_dict: dictionary whose 'df_prev' and 'df_rec' entries are spark
            dataframes carrying 'Category', 'Lense', `entity`, 'net_sent' and
            'net_sent_rank'.

    Returns:
        A spark dataframe with the key columns plus net_sent_prev,
        net_sent_rank_prev, net_sent_rec, net_sent_rank_rec, net_sent_change
        (rec - prev) and rank_change (prev rank - rec rank). Only keys present
        in both periods are returned (inner join).
    """
    keys = ['Category', 'Lense', entity]

    baseline = input_dict['df_prev'].select(
        *keys,
        ps.col('net_sent').alias('net_sent_prev'),
        ps.col('net_sent_rank').alias('net_sent_rank_prev'))

    comparison = input_dict['df_rec'].select(
        *keys,
        ps.col('net_sent').alias('net_sent_rec'),
        ps.col('net_sent_rank').alias('net_sent_rank_rec'))

    # Joining on the key names keeps a single copy of each key column.
    combined = baseline.join(comparison, on=keys, how='inner')

    sent_delta = ps.round(ps.col('net_sent_rec') - ps.col('net_sent_prev'), 2)
    rank_delta = ps.col('net_sent_rank_prev') - ps.col('net_sent_rank_rec')

    return combined.withColumn('net_sent_change', sent_delta)\
                   .withColumn('rank_change', rank_delta)

'''
    rank_sublense(entity: str, input_dict: dict) -> spark dataframe
        - combines the net sentiment and the net sentiment rank for the baseline and the comparison periods into one dataframe and
        calculates the net sentiment and the net sentiment rank change between the two periods at the sublense level

    parameters: entity - the name of the column which has the entities you want to compare 
                    - inherited from main_rank
                input_dict - the dictionary for "Sublense" formatted as: 
                            input_dict = {'df_initial':spark_df, 'df_prev': spark_df, 'df_rec':spark_df, 'df_rankings': None}
                            - inherited from main_rank
    returns: a spark dataframe with the net sentiment, net sentiment rankings, net sentiment change, and net sentiment rank change for all
            comparison entities at the "Sublense" level
'''

def rank_sublense(entity:str, input_dict: dict):
    """
    Put the baseline and comparison net sentiment side by side per
    (Category, Lense, Sublense, entity) and compute the change between the two
    periods.

    Parameters:
        entity: name of the column holding the comparison entities.
        input_dict: dictionary whose 'df_prev' and 'df_rec' entries are spark
            dataframes carrying 'Category', 'Lense', 'Sublense', `entity`,
            'net_sent' and 'net_sent_rank'.

    Returns:
        A spark dataframe with the key columns plus net_sent_prev,
        net_sent_rank_prev, net_sent_rec, net_sent_rank_rec, net_sent_change
        (rec - prev) and rank_change (prev rank - rec rank). Only keys present
        in both periods are returned (inner join).
    """
    keys = ['Category', 'Lense', 'Sublense', entity]

    baseline = input_dict['df_prev'].select(
        *keys,
        ps.col('net_sent').alias('net_sent_prev'),
        ps.col('net_sent_rank').alias('net_sent_rank_prev'))

    comparison = input_dict['df_rec'].select(
        *keys,
        ps.col('net_sent').alias('net_sent_rec'),
        ps.col('net_sent_rank').alias('net_sent_rank_rec'))

    # Joining on the key names keeps a single copy of each key column.
    combined = baseline.join(comparison, on=keys, how='inner')

    sent_delta = ps.round(ps.col('net_sent_rec') - ps.col('net_sent_prev'), 2)
    rank_delta = ps.col('net_sent_rank_prev') - ps.col('net_sent_rank_rec')

    return combined.withColumn('net_sent_change', sent_delta)\
                   .withColumn('rank_change', rank_delta)






In [7]:
#from utils import *

''' 
    data_prep(spark_df: spark dataframe, level_list: list, entity_lense: str, entity: str, excluded_category: str,
            baseline_dates: tuple, comparison_dates: tuple) -> dictionary

            - takes in parameters from the notebook widgets to filter the input spark dataframe
            - divides the original dataframe up into the specified levels (provided by level_list) and for each level 
            creates two dataframes (df_prev and df_rec) based on dates provided in baseline_dates and comparison_dates
            
    parameters: spark_df - raw spark dataframe corresponding to given sentiment curation pipeline output ID

                level_list - list of level(s) (Category, Lense, Sublense) that the sentiment rankings will be 
                calculated for given by level widget selection
                    - inherted from main

                entity_lense - name of lense in taxonomy that contains entities which you want to compare against each other
                across the rest of the taxonomy (i.e you want to compare airline performance across customer satisfaction taxonomy, 
                so you have a lense for airlines where the sublenses are the airlines which you want to compare against each other) 
                - given by Comp Ent Lense widget
                    - inherted from main

                entity - the name of your entity (i.e. if you are comparing airlines your entity name should be "airline", if you are 
                comparing food brands, your entity name should be "food brand") - given by Comp Ent Name widget
                    - inherited from main

                - excluded_category - if you have a category that has extraneous factors that you do not care to compare entities against
                specify the name of the category here 
                    - inherted from main

                - baseline_dates - a tuple in YYYY-MM-DD format specifying the start and end dates for the baseline period
                    - inherted from main

                - comparison_dates - a tuple in YYYY-MM-DD format specifying the start and end dates for the comparison period
                    - inherted from main

    returns: a nested dictionary of format:
            {'Category': {'df_initial': spark_df, 'df_prev': spark_df, 'df_rec': spark_df, 'df_rankings': none},
            'Lense': {'df_initial': spark_df, 'df_prev': spark_df, 'df_rec': spark_df, 'df_rankings': none},
            'Sublense': {'df_initial': spark_df, 'df_prev': spark_df, 'df_rec': spark_df, 'df_rankings': none}}

            df_initial is the input spark df filtered and reformatted version according to specified comparison entity lense
            df_prev is df_initial filtered by the baseline_dates 
            df_rec is df_initial filtered by the comparison_dates 
'''

def data_prep(spark_df, level_list, entity_lense, entity, excluded_category,baseline_dates,comparison_dates):
    """
    Build the per-level input dataframes for the sentiment aggregation.

    Parameters:
        spark_df: raw spark dataframe; the columns 'id', 'date', 'taxonomies',
            'message' and 'sentiment' are read, and 'taxonomies.taxonomyPath' is
            exploded into one row per path.
        level_list: levels to prepare (the keys of the returned dictionary).
        entity_lense: value that the 2nd element of a taxonomy path must equal for
            the path's 3rd element to be taken as a comparison entity.
        entity: name given to the comparison-entity column.
        excluded_category: Category value to filter out; nothing is filtered
            when it is empty.
        baseline_dates: (start, end) of the baseline period.
        comparison_dates: (start, end) of the comparison period.

    Returns:
        {level: {'df_initial': df, 'df_prev': df, 'df_rec': df, 'df_rankings': None}}
        for every level in level_list. 'df_initial' is the exploded, filtered
        frame; 'df_prev' / 'df_rec' are that frame restricted to the baseline /
        comparison period. The same three frames are shared by all levels.

    Period bounds are start-inclusive and end-exclusive, as the notebook
    instructions state. The compared column 'dt' is the first day of each
    message's month, so periods are effectively month-granular.
    """
    cols = [ps.col("id"), ps.col("date"), ps.col("taxonomies"), ps.col("message"), ps.col("sentiment")]
    helper_cols = ['date', 'year', 'month', 'taxonomies', 'message']

    # One row per (message, comparison entity): keep the paths under the entity lense
    # and take their 3rd element as the entity.
    entity_df = spark_df.select(
                cols + [ps.explode(ps.col("taxonomies.taxonomyPath")).alias("taxonomyPath")])\
            .where(
                ps.element_at(ps.col("taxonomyPath"), 2) == ps.lit(entity_lense))\
            .select(
                cols + [ps.element_at(ps.col("taxonomyPath"), 3).alias(entity)]
            )

    # One row per (message, entity, taxonomy path). This is done once, outside the
    # level loop: re-exploding an already exploded frame for each additional level
    # multiplies the row count without changing any distinct-id count.
    tagged_df = entity_df.select(
                cols + [entity] + [ps.explode(ps.col("taxonomies.taxonomyPath")).alias("taxonomyPath")])\
            .select(
                cols + [entity] + [ps.element_at(ps.col("taxonomyPath"), 1).alias("Category")] +
                [ps.element_at(ps.col("taxonomyPath"), 2).alias("Lense")] + [ps.element_at(ps.col("taxonomyPath"), 3).alias("Sublense")]
            )

    if excluded_category:
        tagged_df = tagged_df.filter(ps.col('Category') != excluded_category)

    # 'dt' truncates each message date to the first day of its month.
    tagged_df = tagged_df.withColumn("year", ps.year(ps.col("date"))).withColumn("month", ps.month(ps.col("date"))).withColumn('day', ps.dayofmonth(ps.col("date")))\
                .withColumn("date_str",  ps.concat_ws("_", (ps.col("year")), ps.format_string('%02d', (ps.col("month")))))\
                .withColumn('dt',ps.to_timestamp(ps.col('date_str'), 'yyyy_MM'))

    def _in_period(dates):
        # Half-open range [start, end): Column.between() would also include the end
        # bound and, because 'dt' is month-truncated, the whole month starting on it.
        start, end = dates
        return (ps.col('dt') >= ps.lit(start).cast('timestamp')) & \
               (ps.col('dt') < ps.lit(end).cast('timestamp'))

    df_initial = tagged_df.drop(*helper_cols)
    df_prev = tagged_df.where(_in_period(baseline_dates)).drop(*helper_cols)
    df_rec = tagged_df.where(_in_period(comparison_dates)).drop(*helper_cols)

    # Every level gets its own dictionary so later per-level updates stay independent.
    input_dict = {}
    for c in level_list:
        input_dict[c] = {'df_initial': df_initial, 'df_prev': df_prev, 'df_rec': df_rec, 'df_rankings': None}

    return input_dict 
                

''' 
    main_agg(entity: str, level_list: list, period_list: list, input_dict: dict) -> dictionary
        - calculates the sentiment, volumes, and ranks for levels specified in level_list across the baseline and comparison time periods
    
    parameters: entity - name of comparison entity 
                    - inhereted from main 
                level_list - list of levels for which SentRank should be computed 
                    -inherted from main
                period_list - list of periods for which SentRank should be computed
                    -inhereted from main
                input_dict - dictionary formatted as: 
                    - {'Category': {'df_initial': spark_df, 'df_prev': spark_df, 'df_rec': spark_df, 'df_rankings': none},
                        'Lense': {'df_initial': spark_df, 'df_prev': spark_df, 'df_rec': spark_df, 'df_rankings': none},
                        'Sublense': {'df_initial': spark_df, 'df_prev': spark_df, 'df_rec': spark_df, 'df_rankings': none}}
                    - inherted from output of data_prep function in main function call

'''

def main_agg(entity: str, level_list: list, period_list: list, input_dict: dict):
    """
    Run the level-specific aggregation for every requested level and period.

    Parameters:
        entity: name of the comparison-entity column.
        level_list: levels to aggregate; 'Category' uses agg_cat, 'Lense' uses
            agg_lense and any other value uses agg_sublense.
        period_list: keys of the period dataframes to aggregate.
        input_dict: {level: {period: spark dataframe, ...}, ...}.

    Returns:
        {level: {period: aggregated spark dataframe}} for the requested
        levels and periods.
    """
    def _aggregator_for(level):
        # Anything that is not 'Category' or 'Lense' is handled at sublense level.
        if level == 'Category':
            return agg_cat
        if level == 'Lense':
            return agg_lense
        return agg_sublense

    result_dict = {}
    for level in level_list:
        aggregate = _aggregator_for(level)
        result_dict[level] = {
            period: aggregate(entity, period, input_dict[level])
            for period in period_list
        }
    return result_dict

''' 
    main_rank(entity: str, level_list: list, input_dict: dict) -> dictionary
        - aggregates the net sentiment, net sentiment rank across the baseline and comparison time periods 
            and calculates net sentiment/net sentiment rank change between periods for levels specified in level_list 
    
    parameters: entity - name of comparison entity 
                    - inhereted from main 
                level_list - list of levels for which SentRank should be computed 
                    -inherted from main
                input_dict - dictionary formatted as: 
                    - {'Category': {'df_initial': spark_df, 'df_prev': spark_df, 'df_rec': spark_df, 'df_rankings': none},
                        'Lense': {'df_initial': spark_df, 'df_prev': spark_df, 'df_rec': spark_df, 'df_rankings': none},
                        'Sublense': {'df_initial': spark_df, 'df_prev': spark_df, 'df_rec': spark_df, 'df_rankings': none}}
                    - inherited from the output of the main_agg function in the main function call

'''

def main_rank(entity: str, level_list: list, input_dict: dict):
    """Attach a 'df_rankings' entry to each requested level of input_dict.

    Parameters:
        entity: forwarded unchanged to the ranking function.
        level_list: levels to rank; 'Category' is routed to rank_cat,
            'Lense' to rank_lense, and any other value to rank_sublense.
        input_dict: mapping of level -> per-level dict. It is modified in
            place: the ranking result is stored under 'df_rankings'.

    Returns:
        The same input_dict object that was passed in.
    """
    for level in level_list:
        if level == 'Category':
            ranker = rank_cat
        elif level == 'Lense':
            ranker = rank_lense
        else:
            ranker = rank_sublense

        rankings = ranker(entity, input_dict[level])
        input_dict[level]['df_rankings'] = rankings

    return input_dict




In [8]:
#from sentRank_prep_agg_rank import *

''' 
    main(spark_df: spark dataframe, level_list: list, entity_lense: str, entity: str, excluded_category: str,
        baseline_dates: tuple ,comparison_dates: tuple ,period_list: list)

    parameters: spark_df - raw spark dataframe corresponding to given sentiment curation pipeline output ID
                    - given by Sent Curation Output ID widget

                level_list - list of level(s) (Category, Lense, Sublense) that the sentiment rankings will be 
                calculated for 
                    - given by level widget selection

                entity_lense - name of lense in taxonomy that contains entities which you want to compare against each other
                across the rest of the taxonomy (i.e you want to compare airline performance across customer satisfaction taxonomy, 
                so you have a lense for airlines where the sublenses are the airlines which you want to compare against each other) 
                    - given by Comp Ent Lense widget

                entity - the name of your entity (i.e if you are comparing airlines your entity name should be "airline", if you are 
                comparing food brands, your entity name should be "food brand") 
                    - given by Comp Ent Name widget

                - excluded_category - if you have a category that has extraneous factors that you do not care to compare entities against
                specify the name of the category here 
                    - given by Excluded Cat widget

                - baseline_dates - a tuple in YYYY-MM-DD format specifying the start and end dates for the baseline period
                    - given by Prev Period End Date and Prev Period Start Date widgets

                - comparison_dates - a tuple in YYYY-MM-DD format specifying the start and end dates for the comparison period
                    - given by Recent Period End Date and Recent Period Start Date widgets
                
                - period_list - ['df_prev','df_rec']

    returns: tuple (input_dict, result_dict) - input_dict is the output of data_prep; result_dict is a nested dictionary formatted as: 
                - {'Category': {'df_initial': spark_df, 'df_prev': spark_df, 'df_rec': spark_df, 'df_rankings': none},
                    'Lense': {'df_initial': spark_df, 'df_prev': spark_df, 'df_rec': spark_df, 'df_rankings': none},
                    'Sublense': {'df_initial': spark_df, 'df_prev': spark_df, 'df_rec': spark_df, 'df_rankings': none}}

                - each nested dictionary within each level is the result of main_agg and main_rank
'''

def main(spark_df, level_list, entity_lense, entity, excluded_category,baseline_dates,comparison_dates,period_list):
    """Run the three pipeline stages in order: data_prep, main_agg, main_rank.

    Parameters:
        spark_df, level_list, entity_lense, entity, excluded_category,
        baseline_dates, comparison_dates: forwarded to data_prep in this order.
        period_list: forwarded to main_agg together with entity and level_list.

    Returns:
        (input_dict, result_dict): the value returned by data_prep and the
        value returned by main_rank.
    """
    # Stage 1: prepare the raw sentiment curation output for each level.
    prepared = data_prep(
        spark_df,
        level_list,
        entity_lense,
        entity,
        excluded_category,
        baseline_dates,
        comparison_dates,
    )

    # Stage 2: sentiment volumes, percentages and rankings per level and period.
    aggregated = main_agg(entity, level_list, period_list, prepared)

    # Stage 3: net sentiment rankings and their change between the periods.
    ranked = main_rank(entity, level_list, aggregated)

    return prepared, ranked
  
# Run the whole pipeline once; the two period keys name the baseline
# ('df_prev') and comparison ('df_rec') tables in the result.
input_dict, result_dict = main(
    spark_df=df,
    level_list=levels,
    entity_lense=comp_ent_lense,
    entity=comp_ent_name,
    excluded_category=excluded_cat,
    baseline_dates=p_dates,
    comparison_dates=r_dates,
    period_list=['df_prev', 'df_rec'],
)

## SentRank Results for Baseline Time Period

In [10]:
# result_dict only has entries for the levels passed to main(), so guard
# against a KeyError when 'Category' was not selected.
if 'Category' in result_dict:
    display(result_dict['Category']['df_prev'])
else:
    print("'Category' level was not selected - nothing to display")

In [11]:
# result_dict only has entries for the levels passed to main(), so guard
# against a KeyError when 'Lense' was not selected.
if 'Lense' in result_dict:
    display(result_dict['Lense']['df_prev'])
else:
    print("'Lense' level was not selected - nothing to display")

In [12]:
# result_dict only has entries for the levels passed to main(), so guard
# against a KeyError when 'Sublense' was not selected.
if 'Sublense' in result_dict:
    display(result_dict['Sublense']['df_prev'])
else:
    print("'Sublense' level was not selected - nothing to display")

## Sent Results for Comparison Time Period

In [14]:
# result_dict only has entries for the levels passed to main(), so guard
# against a KeyError when 'Category' was not selected.
if 'Category' in result_dict:
    display(result_dict['Category']['df_rec'])
else:
    print("'Category' level was not selected - nothing to display")

In [15]:
# result_dict only has entries for the levels passed to main(), so guard
# against a KeyError when 'Lense' was not selected.
if 'Lense' in result_dict:
    display(result_dict['Lense']['df_rec'])
else:
    print("'Lense' level was not selected - nothing to display")

In [16]:
# result_dict only has entries for the levels passed to main(), so guard
# against a KeyError when 'Sublense' was not selected.
if 'Sublense' in result_dict:
    display(result_dict['Sublense']['df_rec'])
else:
    print("'Sublense' level was not selected - nothing to display")

## Sentiment Rankings Results by Level

In [18]:
# result_dict only has entries for the levels passed to main(), so guard
# against a KeyError when 'Category' was not selected.
if 'Category' in result_dict:
    display(result_dict['Category']['df_rankings'])
else:
    print("'Category' level was not selected - nothing to display")

In [19]:
# result_dict only has entries for the levels passed to main(), so guard
# against a KeyError when 'Lense' was not selected.
if 'Lense' in result_dict:
    display(result_dict['Lense']['df_rankings'])
else:
    print("'Lense' level was not selected - nothing to display")

In [20]:
# result_dict only has entries for the levels passed to main(), so guard
# against a KeyError when 'Sublense' was not selected.
if 'Sublense' in result_dict:
    display(result_dict['Sublense']['df_rankings'])
else:
    print("'Sublense' level was not selected - nothing to display")

## Write to Excel in Specified Working Directory

In [22]:
#helper funcs to write to excel
def check_copyfile(copy_path: str, writing_path: str, databricks_write: bool):
    """Copy copy_path to writing_path, but only when databricks_write is truthy.

    Returns None in either case.
    """
    if not databricks_write:
        return
    copyfile(copy_path, writing_path)

def writing_check(writing_path: str, databricks_write: bool, writing_name: str) -> str:
    """Choose the path the file should be written to first.

    When databricks_write is truthy the result is "/tmp/<writing_name>.xlsx";
    otherwise writing_path is returned unchanged.
    """
    return "".join(("/tmp/", writing_name, ".xlsx")) if databricks_write else writing_path

def df_to_file(result_dict: dict, writing_path: str, databricks_write: bool = True) -> str:
    """Write every table in a two-level dict to one Excel workbook.

    Parameters:
        result_dict: {outer_key: {inner_key: object with a to_excel method}};
            each table goes to a sheet named "<outer_key>_<inner_key>".
        writing_path: final destination of the workbook.
        databricks_write: when truthy, the workbook is first written to the
            path chosen by writing_check and then copied to writing_path by
            check_copyfile; otherwise it is written to writing_path directly.

    Returns:
        writing_path, unchanged.
    """
    copied_path = writing_check(writing_path, databricks_write, "deduplication_result")

    with pd.ExcelWriter(copied_path) as writer:
        for outer_key, tables in result_dict.items():
            for inner_key, frame in tables.items():
                sheet = "_".join((outer_key, inner_key))
                frame.to_excel(writer, sheet_name=sheet)

    check_copyfile(copied_path, writing_path, databricks_write)
    return writing_path

In [23]:
if to_excel == 'Yes':
  # Convert every result table to pandas; 'df_initial' entries are skipped.
  pandas_dict = {
    key: {t: table.toPandas() for t, table in tables.items() if t != 'df_initial'}
    for key, tables in result_dict.items()
  }

  # Normalise the working directory once so the Excel and pickle files land in
  # the same folder whether or not output_path starts or ends with "/".
  # Previously the pickle path used "/dbfs/mnt" + output_path, which produced
  # "/dbfs/mnt<dir>" when the leading slash was missing.
  out_file_prefix = output_path.strip("/") + "/" + output_filename

  #write to excel in working dir
  writing_path = "/dbfs/mnt/" + out_file_prefix + ".xlsx"
  df_to_file(pandas_dict, writing_path, True)

  #write the same pandas tables to pickle next to the excel file
  with open("/dbfs/mnt/" + out_file_prefix + ".pkl", "wb") as pkl_file:
    pickle.dump(pandas_dict, pkl_file)

else:
  print("Results NOT SAVED - if you wanted to save your results in an excel file please select 'Yes' from widget 12 and re-run commands 5 and 23")


In [24]:
#have to manually write output file to bsinsights bucket to be able to download
# pandas_dict is only created when the export cell ran its 'Yes' branch, so
# apply the same condition here to avoid a NameError on a fresh run.
if to_excel == 'Yes':
  writing_path = "/dbfs/mnt/bsinsights/Raim/covid_rankings_May19" + ".xlsx"
  df_to_file(pandas_dict,writing_path, True)