In [1]:
from pillaralgos.helpers import data_handler as d

This is `data_handler.py`

In [2]:
'''
Helpers for loading and splitting twitch chat data: organize_twitch_chat() flattens the raw
json records into a DataFrame, dfSplitter / xminChats / get_chunks cut it into hour-long and
`min_`-minute chunks, results_jsonified() converts chunk timestamps into elapsed seconds, and
save_json() writes such results to a .json file.
'''
import datetime as dt
import json

import numpy as np
import pandas as pd


def rename_columns(col_string):
    """
    Renames columns to be more presentable by stripping the source suffix that
    `organize_twitch_chat` adds ("_mess" for message fields, "_id" for commenter fields).

    The commenter's own timestamps ("created_at_id" / "updated_at_id") become
    "id_created_at" / "id_updated_at" so they don't collide with the message's
    "created_at" / "updated_at".

    Only a *trailing* suffix is removed, so a field whose own name contains "_id" or
    "_mess" keeps it (e.g. "user_id_mess" -> "user_id", not "user").

    >>> rename_columns("created_at_id")
    'id_created_at'
    >>> rename_columns("body_mess")
    'body'
    >>> rename_columns("_id_id")
    '_id'
    """
    if col_string in ("created_at_id", "updated_at_id"):
        return "id_" + col_string[: -len("_id")]
    # "_mess" is checked first, matching the original replacement order
    for suffix in ("_mess", "_id"):
        if col_string.endswith(suffix):
            return col_string[: -len(suffix)]
    return col_string


def select_columns(dataframe, keep_user_vars=False):
    """
    Removes unneeded columns, then renames the remaining ones with `rename_columns`.

    input
    -----
    dataframe: pd.DataFrame
        Frame built by `organize_twitch_chat`, whose columns still carry the
        "_id" (commenter) / "_mess" (message) suffixes.
    keep_user_vars: bool
        If True, keep the commenter profile columns (bio, created_at, updated_at, logo).

    output
    ------
    pd.DataFrame
        A new frame without the dropped columns and with presentable column names.
    """
    # Always dropped, whether or not the users are being analyzed.
    bad_cols = ["display_name_id", "name_id", "user_notice_params_mess"]
    if not keep_user_vars:
        # these aren't used by the simpler fctns
        bad_cols += ["bio_id", "created_at_id", "updated_at_id", "logo_id"]
    # errors="ignore": dictExtractor only creates columns for the keys of the longest
    # dict, so any of these may be missing (e.g. no message had user_notice_params).
    dataframe = dataframe.drop(columns=bad_cols, errors="ignore")
    dataframe.columns = [rename_columns(col) for col in dataframe.columns]
    return dataframe


def organize_twitch_chat(data, keep_user_vars=False):
    """
    Turns json into dataframe. Expands the nested "commenter" and "message" dicts of
    every record into their own columns.

    input
    -----
    data: list or pd.DataFrame
        list of dictionaries in json format (e.g. loaded with `json.load`), or a
        DataFrame already built from such a list. Each record needs "created_at",
        "updated_at", "commenter" and "message" keys, the last two holding dicts.
    keep_user_vars: bool
        Passed to `select_columns`. If True, keep the commenter profile columns
        ("bio", "logo", "id_created_at", "id_updated_at").

    output
    ------
    df: pd.DataFrame
        One row per record. Columns are the record's timestamps plus the commenter and
        message dict keys, renamed by `select_columns`, e.g. 'created_at', 'updated_at',
        '_id', 'type', 'body', 'is_action', ... (the exact set depends on the keys present
        in the data). 'display_name' and 'name' are always dropped.
        If `data` is empty, an empty np.ndarray is returned instead.
    """
    if len(data) == 0:
        return np.array([])

    if isinstance(data, pd.DataFrame):
        # from_records() on a DataFrame is deprecated; reset to a RangeIndex so rows
        # line up with the dictExtractor results in the pd.concat below.
        records = data.reset_index(drop=True)
    else:
        records = pd.DataFrame.from_records(data)  # convert to df
    df = records[["created_at", "updated_at", "commenter", "message"]].add_suffix("_mess")

    messages = dictExtractor(df["message_mess"], label="_mess").result
    users = dictExtractor(df["commenter_mess"], label="_id").result

    df = df.drop(["message_mess", "commenter_mess"], axis=1)  # duplicate info
    df = pd.concat([df, users, messages], axis=1)

    # all vars were loaded as str. Change type to datetime/int/bool
    dtypes = {
        "_id_id": int,
        "bio_id": "category",
        "created_at_id": "datetime64[ns]",
        "created_at_mess": "datetime64[ns]",
        "updated_at_id": "datetime64[ns]",
        "updated_at_mess": "datetime64[ns]",
        "is_action_mess": bool,
        "type_id": "category",
    }
    # dictExtractor only creates columns for the longest dict's keys, so a key can be
    # missing entirely; astype() raises KeyError for absent columns, so skip those.
    df = df.astype({col: dtype for col, dtype in dtypes.items() if col in df.columns})
    return select_columns(df, keep_user_vars)


class dictExtractor:
    def __init__(self, my_series, label=""):
        """
        Expands a series of dicts into a DataFrame with one column per key, stored as
        `self.result`. The columns are the keys of the longest dict (the first one on
        ties), because not every dict has the same number of keys: dicts missing one of
        those keys get np.nan there, and keys found only in shorter dicts are not kept.

        input
        -----
        my_series: pd.Series
            A column from twitch dataframe where each row is a dict
        label: str
            What will be appended to the end of each col

        attributes
        ----------
        max_d: keys view of the longest dict (the output columns, before `label`)
        new_dict: dict mapping each key to a list with one value per row of `my_series`
        result: pd.DataFrame indexed like `my_series`
        """
        if len(my_series) == 0:
            self.max_d = {}.keys()
        else:
            # Work by *position*: the old code took an index label and used it with
            # .iloc, which breaks for any series whose index isn't 0..n-1.
            longest = int(np.argmax([len(row) for row in my_series]))
            self.max_d = my_series.iloc[longest].keys()
        # initiate new dict, then extract dict values into it row by row
        self.new_dict = {k: [] for k in self.max_d}
        for row in my_series:
            self.keys_iterator(row)
        # Keep my_series' index so a later pd.concat(axis=1) lines rows up correctly.
        self.result = pd.DataFrame(self.new_dict, index=my_series.index)
        # df.add_suffix() is actually 0.25 seconds slower
        self.result.columns = [col + label for col in self.result.columns]

    def keys_iterator(self, my_dict):
        """
        Appends one row to `self.new_dict`: for every key in `self.max_d`, the value from
        `my_dict`, or np.nan when `my_dict` lacks that key.
        """
        for k in self.max_d:
            self.new_dict[k].append(my_dict.get(k, np.nan))


class dfSplitter:
    def __init__(self, dataframe):
        """
        Splits dataframe into multiple dataframes, each covering at most 1 hour.

        Each chunk starts at the earliest message not yet assigned and contains every
        message whose "created_at" is within 1 hour (inclusive) of that start. The first
        chunk is found here; call `find_rest()` for the others.

        input
        -----
        dataframe: pd.DataFrame
            Twitch chat dataframe with a datetime "created_at" column.

        attributes
        ----------
        result: list
            [earliest "created_at" timestamp, df_chunk_1, df_chunk_2, ...]
        dataframe: pd.DataFrame
            The input sorted by "created_at" (original index labels kept).
        last_i:
            Largest index label in the most recently found chunk.
        """
        # Stable sort so messages with identical timestamps keep their input order.
        self.dataframe = dataframe.sort_values("created_at", kind="mergesort")
        # Position (not label) of the first row not yet assigned to a chunk. Working by
        # position means the split doesn't depend on the input's index labels.
        self._next_pos = 0
        # list to append starting timestamp + datasets to
        self.result = [self.dataframe["created_at"].iloc[0]]
        self._append_chunk()

    def _append_chunk(self):
        """Appends the hour-long chunk starting at row position `self._next_pos`."""
        times = self.dataframe["created_at"]
        start = self._next_pos
        # Rows are sorted, so everything within the hour is one contiguous run of rows;
        # side="right" makes the upper bound inclusive (<= start + 1h).
        stop = int(times.searchsorted(times.iloc[start] + pd.Timedelta(hours=1), side="right"))
        chunk = self.dataframe.iloc[start:stop]
        self.result.append(chunk)
        self.last_i = chunk.index.max()
        self._next_pos = stop

    def find_rest(self):
        """
        Splits the remaining rows into hour-long chunks, appending each to `self.result`.

        Returns the sorted dataframe when there was nothing left to split, otherwise
        None (callers in this file ignore the return value).
        """
        if self._next_pos >= len(self.dataframe):
            return self.dataframe
        while self._next_pos < len(self.dataframe):
            self._append_chunk()


class xminChats:
    def __init__(self, dataframe, big_unique, min_=2):
        """
        Splits one chat dataframe (e.g. an hour from dfSplitter) into consecutive
        `min_`-minute chunks and keeps the unique-chatter totals needed to compute the
        percent unique chatters per chunk.

        Each chunk starts at the earliest message not yet assigned and contains every
        message within `min_` minutes (inclusive) of that start. The first chunk is found
        here; call `find_rest()` for the others. Every row lands in exactly one chunk.

        input
        -----
        dataframe: pd.DataFrame
            Twitch chat dataframe organized and split by dfSplitter
        big_unique: int
            Total number of unique chatters for the entire Twitch stream (stored as given)
        min_: int
            Minute range to find timestamps for. Ex: Find 2 min long timestamps.

        attributes
        ----------
        result: list of pd.DataFrame chunks
        total_uniques: number of unique "_id" values in `dataframe`
        last_i: largest index label in the most recently found chunk
        """
        self.min_ = min_
        self.big_unique = big_unique
        # Stable sort so messages with identical timestamps keep their input order.
        self.dataframe = dataframe.sort_values("created_at", kind="mergesort")
        self.total_uniques = len(self.dataframe["_id"].unique())
        # Position (not label) of the first row not yet assigned to a chunk.
        self._next_pos = 0
        self.result = []
        self._append_chunk()

    def _append_chunk(self):
        """Appends the `min_`-minute chunk starting at row position `self._next_pos`."""
        times = self.dataframe["created_at"]
        start = self._next_pos
        window_end = times.iloc[start] + pd.Timedelta(value=self.min_, unit="minutes")
        # Rows are sorted, so the window is a contiguous run; side="right" keeps "<=".
        stop = int(times.searchsorted(window_end, side="right"))
        chunk = self.dataframe.iloc[start:stop]
        self.result.append(chunk)
        self.last_i = chunk.index.max()
        self._next_pos = stop

    def find_rest(self):
        """
        Splits the remaining rows into `min_`-minute chunks, appending each to
        `self.result`.

        The old `last_i + 1 < index.max()` test stopped one row early and silently
        dropped an hour's final message whenever it started a chunk of its own.
        """
        while self._next_pos < len(self.dataframe):
            self._append_chunk()


def get_chunks(dataframe, min_=2):
    """
    Iterates through the data_helper classes to divide dataframe into chunks

    input
    -----
    dataframe: pd.DataFrame
        The entire twitch stream chat df
    min_: int
        The min_ value to pass into xminChats()

    output
    ------
    first_stamp: datetime
        The very first timestamp of dataframe
    chunk_list: list
        List of `min_` long dataframes, each with added "hour" (0-based hour number)
        and "chunk" (0-based chunk number within that hour) columns
    """
    dhs = dfSplitter(dataframe)
    dhs.find_rest()
    # dfSplitter.result is [first timestamp, hour_df_1, hour_df_2, ...]
    first_stamp, *hour_list = dhs.result

    # xminChats documents big_unique as the total number of unique chatters (an int);
    # compute it once for the whole stream rather than once per hour.
    big_unique = len(dataframe["_id"].unique())

    chunk_list = []
    for hour_num, hour in enumerate(hour_list):
        dhx = xminChats(hour, big_unique, min_=min_)
        dhx.find_rest()
        for chunk_num, chunk in enumerate(dhx.result):
            # assign() returns a new frame instead of writing into a slice of `hour`
            chunk_list.append(chunk.assign(hour=hour_num, chunk=chunk_num))
    return first_stamp, chunk_list


def results_jsonified(results, first_sec, results_col):
    """
    Converts timestamps to seconds, extracts results and makes the whole thing machine readable

    input
    -----
    results: pd.DataFrame
        DataFrame with at least the start (datetime) and end (datetime) columns, and a column
        to sort by. It is not modified.
    first_sec: datetime
        The very first timestamp in the entire twitch chat log. Used to calculate elapsed time in seconds.
    results_col: str
        Column to sort values by (ascending=False)

    output
    ------
    json_results: list
        List of {"startTime": float, "endTime": float} dictionaries (seconds elapsed since
        `first_sec`), sorted by best results at top
    """
    # Sort a new frame. The old version added a "first_sec" column to the caller's
    # DataFrame as a side effect.
    ranked = results.sort_values(results_col, ascending=False)
    return [
        {
            # pd.Timedelta subclasses datetime.timedelta, so this yields a plain float
            "startTime": dt.timedelta.total_seconds(start - first_sec),
            "endTime": dt.timedelta.total_seconds(end - first_sec),
        }
        for start, end in zip(ranked["start"], ranked["end"])
    ]


def save_json(json_results, name):
    """
    Saves a list of dicts as a JSON file named `{name}.json`.

    input
    -----
    json_results: list
        List of dictionaries containing results. Values must be JSON-serializable,
        e.g. the floats produced by `results_jsonified`.
    name: str
        Filename without the ".json" extension (with optional directory), which is
        appended automatically. Ex: name or exports/name
    """
    # json.dump emits real JSON. The old str(dict) concatenation produced single-quoted
    # keys and a trailing comma, which JSON parsers reject.
    with open(f"{name}.json", "w", encoding="utf-8") as f:
        json.dump(json_results, f, indent=2)
    print(f"Saved to {name}.json")


This is `algo1.py`, modified (at line `140` of the original file) so that `run()` returns the non-JSON results DataFrame for testing purposes.

In [3]:
"""
Sorts the final results by `perc_rel_unique`. Calculated as "number of chatters
at timestamp"/"number of chatters in that one hour"

HOW TO
    algo1.run(data, min_=2, limit=10, sort_by='rel', save_json = False)
"""

import pandas as pd

def perc_uniques(chunk_list, min_, total_uniques, big_unique):
    """
    Builds a per-chunk table of unique-chatter counts and percentages. Dataframes are
    assumed to be split using xminChats.find_rest.

    input
    -----
    chunk_list: list of pd.DataFrame
        Chunks to score; each needs "_id" and "created_at" columns.
    min_: int
        Chunk length in minutes; only used to name the "{min_}min_chunk" column.
    total_uniques: int
        Unique chatters in THAT DATAFRAME, ie the hourly cut (denominator of perc_rel_unique).
    big_unique: int
        Unique chatters in the entire twitch session (denominator of perc_abs_unique).

    output
    ------
    pd.DataFrame
        One row per chunk: "{min_}min_chunk" (position in chunk_list), "start", "end",
        "num_unique", "perc_rel_unique", "perc_abs_unique" and "elapsed" (end - start).
    """
    chunk_col = f"{min_}min_chunk"
    table = {
        chunk_col: [],
        "start": [],
        "end": [],
        "num_unique": [],
        "perc_rel_unique": [],
        "perc_abs_unique": [],
    }

    for position, chunk_df in enumerate(chunk_list):
        n_unique = len(chunk_df["_id"].unique())
        stamps = chunk_df["created_at"]
        row = (
            position,
            stamps.min(),
            stamps.max(),
            n_unique,
            n_unique / total_uniques,  # relative to the hour this chunk came from
            n_unique / big_unique,  # relative to the whole twitch session
        )
        # dicts keep insertion order, so the tuple lines up with the column keys
        for column, value in zip(table, row):
            table[column].append(value)

    result = pd.DataFrame(table)
    result["elapsed"] = result["end"] - result["start"]
    return result


def hour_iterator(big_df, limit, min_=2, sort_by="rel"):
    """
    Splits big_df into hours, splits every hour into `min_`-minute chunks with
    xminChats, and scores each chunk by its percentage of unique chatters.

    input
    -----
    big_df: pd.DataFrame
        Df of the entire twitch session. This is the one that was split by dfSplitter class
    limit: int
        Number of top-scoring chunks to include in `json_results`.
    min_: int
        How long a timestamp range should be, in minutes
    sort_by: str
        Whether to sort values by `abs` or `rel` unique chatters.

    output
    ------
    pretty_results: pd.DataFrame
        Every chunk (columns "hour" (1-based), "{min_}min_chunk", "start", "end",
        "num_unique", "perc_rel_unique", "perc_abs_unique", "elapsed"), sorted by
        `perc_{sort_by}_unique` descending, with a fresh 0..n-1 index from before sorting.
    json_results: list
        The top `limit` chunks from `pretty_results` as startTime/endTime dicts.
    """
    sort_col = f"perc_{sort_by}_unique"
    columns = [
        "hour",
        f"{min_}min_chunk",
        "start",
        "end",
        "num_unique",
        "perc_rel_unique",
        "perc_abs_unique",
        "elapsed",
    ]

    ds = d.dfSplitter(big_df)  # initiate
    ds.find_rest()  # split big_df into 1 hour long separate dfs
    # NOTE: index 0 of ds.result is always the very first timestamp of big_df
    first_sec, *hour_list = ds.result

    # the total number of unique chatters for the entire twitch session
    max_uniques = len(big_df["_id"].unique())

    hourly = []
    for hour_num, hour_df in enumerate(hour_list, start=1):
        fm = d.xminChats(hour_df, max_uniques, min_=min_)
        fm.find_rest()  # fm.result: list of dfs, each `min_` minutes long
        hr_uniques = perc_uniques(
            fm.result, min_, total_uniques=fm.total_uniques, big_unique=fm.big_unique
        )
        hr_uniques["hour"] = hour_num
        hourly.append(hr_uniques)

    # DataFrame.append was removed in pandas 2.0; concatenate all hours once instead.
    if hourly:
        results = pd.concat(hourly).reindex(columns=columns)
    else:
        results = pd.DataFrame(columns=columns)

    results["elapsed"] = results["end"] - results["start"]  # to double check length
    pretty_results = results.reset_index(drop=True).sort_values(sort_col, ascending=False)

    # The old code took `results.head(10)`: the first 10 chunks in time order, ignoring
    # both the sort and `limit`. Take the top `limit` of the sorted table instead.
    json_results = d.results_jsonified(
        pretty_results.head(limit).copy(), first_sec, results_col=sort_col
    )  # ordered by top perc_{sort_by}_unique

    return pretty_results, json_results


def run(data, min_=2, limit=10, sort_by="rel", save_json=False):
    """
    Runs algo1 to sort timestamps by the relative percentage of chatters by default.

    input:
    ------
    data: list
        List of dictionaries of data from Twitch chat
    min_: int
        Approximate number of minutes each clip should be
    limit: int
        Number of rows/dictionaries/timestamps to return in the json results
    sort_by: str
        'rel': "number of chatters at timestamp"/"number of chatters at that hour"
        'abs': "number of chatters at timestamp"/"total number of chatters in stream"
    save_json: bool
        True to save the json results via d.save_json as
        "algo1_perc_{sort_by}_unique.json" (path relative to the working directory)

    output
    ------
    results: pd.DataFrame
        All chunks sorted by perc_{sort_by}_unique. The packaged algo1 returns
        json_results here; this notebook copy returns the DataFrame for testing.
        If `data` yields no DataFrame (empty input), whatever
        d.organize_twitch_chat returned is passed through.
    """
    # organize_twitch_chat builds the DataFrame itself; converting here as well meant
    # calling DataFrame.from_records on a DataFrame, which pandas has deprecated.
    big_df = d.organize_twitch_chat(data)  # fetch appropriate data
    if not isinstance(big_df, pd.DataFrame):
        return big_df
    results, json_results = hour_iterator(big_df, limit=limit, min_=min_, sort_by=sort_by)
    if save_json:
        d.save_json(json_results, name=f"algo1_perc_{sort_by}_unique")
    return results  # json_results

In [4]:
import json

In [5]:
# The context manager closes the file handle, which json.load(open(...)) left open.
# JSON text is UTF-8 (RFC 8259), so don't depend on the platform's default encoding.
with open("data/sample_med.json", encoding="utf-8") as f:
    data = json.load(f)

In [6]:
# Directory for the sample CSVs written below (the path suggests the package's test data)
test_dir = "pypi/prod/test/sample_data"
# Uncomment to regenerate the organized-chat sample:
# df = organize_twitch_chat(data)
# df.to_csv(f'{test_dir}/sample_med_organized.csv', index = False)

In [17]:
# Per-chunk results from run(), best relative score first, trimmed to the compared columns
ranked = run(data).sort_values("perc_rel_unique", ascending=False)
results_df = ranked[["start", "end", "perc_rel_unique"]]
results_col = "perc_rel_unique"

In [23]:
# Label 0 is the first chunk of hour 1 (hour_iterator resets the index before sorting),
# so its "start" is the earliest message timestamp.
first_sec = results_df.at[0, "start"]

In [18]:
csv_path = f"{test_dir}/sample_med_resultsdf.csv"
# The index is written too; read_csv brings it back as an "Unnamed: 0" column
results_df.to_csv(csv_path)

In [21]:
reloaded = pd.read_csv(f"{test_dir}/sample_med_resultsdf.csv")
# read_csv returns the timestamps as strings; restore the dtypes results_df had
reloaded = reloaded.astype(
    {"start": "datetime64[ns]",
     "end": "datetime64[ns]",
     "perc_rel_unique": float}
)
# "Unnamed: 0" is results_df's saved index; its smallest label (0) is the row first_sec came from
reloaded_sec = reloaded.sort_values("Unnamed: 0")["start"].iloc[0]

In [24]:
# Displays whether the CSV round trip preserved the first chunk's start timestamp (True below)
reloaded_sec == first_sec

True

In [25]:
# Note the non-contiguous Int64Index: results_df keeps its labels after sorting by score
results_df.info()


<class 'pandas.core.frame.DataFrame'>
Int64Index: 212 entries, 210 to 175
Data columns (total 3 columns):
 #   Column           Non-Null Count  Dtype         
---  ------           --------------  -----         
 0   start            212 non-null    datetime64[ns]
 1   end              212 non-null    datetime64[ns]
 2   perc_rel_unique  212 non-null    float64       
dtypes: datetime64[ns](2), float64(1)
memory usage: 14.7 KB


In [26]:
# The saved index comes back as an ordinary "Unnamed: 0" column and the frame gets a RangeIndex
reloaded.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 212 entries, 0 to 211
Data columns (total 4 columns):
 #   Column           Non-Null Count  Dtype         
---  ------           --------------  -----         
 0   Unnamed: 0       212 non-null    int64         
 1   start            212 non-null    datetime64[ns]
 2   end              212 non-null    datetime64[ns]
 3   perc_rel_unique  212 non-null    float64       
dtypes: datetime64[ns](2), float64(1), int64(1)
memory usage: 6.8 KB


In [27]:
# JSON-style results computed from the CSV copy
reloaded_j = results_jsonified(reloaded, first_sec=reloaded_sec, results_col=results_col)

In [28]:
# JSON-style results computed from the in-memory results_df
results_j = results_jsonified(results_df, first_sec=first_sec, results_col=results_col)

In [29]:
# Top five chunks; results_jsonified sorts by results_col, highest first
results_j[:5]

[{'startTime': 25874.858, 'endTime': 25994.723},
 {'startTime': 25630.013, 'endTime': 25749.664},
 {'startTime': 25753.034, 'endTime': 25870.105},
 {'startTime': 12177.737, 'endTime': 12293.058},
 {'startTime': 26005.689, 'endTime': 26046.392}]

In [30]:
# Same conversion after discarding the (sorted) index, to check the output doesn't depend on index labels
results_j_noi = results_jsonified(results_df.reset_index(drop=True), first_sec, results_col)
results_j_noi[:5]

[{'startTime': 25874.858, 'endTime': 25994.723},
 {'startTime': 25630.013, 'endTime': 25749.664},
 {'startTime': 25753.034, 'endTime': 25870.105},
 {'startTime': 12177.737, 'endTime': 12293.058},
 {'startTime': 26005.689, 'endTime': 26046.392}]

In [31]:
# True for this data: dropping the index labels doesn't change the JSON results
results_j_noi == results_j

True

In [183]:
# NOTE(review): `new_j` is not defined in any cell shown here (this cell ran as In [183],
# out of order); presumably it holds results from another implementation — confirm before rerunning.
len(new_j) == len(results_j)

True

In [173]:
# NOTE(review): `new_j` is not defined in the visible cells — confirm where it comes from.
# failures: positions where new_j and results_j disagree; counter: how many positions match.
failures = [idx for idx, entry in enumerate(new_j) if entry != results_j[idx]]
counter = len(new_j) - len(failures)

In [175]:
# NOTE(review): hard-coded five startTime/endTime entries whose source isn't shown here;
# they differ from results_j[:5] below. Reusing the name `i` overwrites any earlier `i`.
i = [{'startTime': 18725.355, 'endTime': 18845.192},
 {'startTime': 24129.932, 'endTime': 24249.818},
 {'startTime': 12012.096, 'endTime': 12131.918},
 {'startTime': 34939.788, 'endTime': 35058.316},
 {'startTime': 27129.808, 'endTime': 27249.795}]

In [184]:
# Re-displays the top five results_j entries for comparison with the hard-coded list above
results_j[:5]

[{'startTime': 25874.858, 'endTime': 25994.723},
 {'startTime': 25630.013, 'endTime': 25749.664},
 {'startTime': 25753.034, 'endTime': 25870.105},
 {'startTime': 12177.737, 'endTime': 12293.058},
 {'startTime': 26005.689, 'endTime': 26046.392}]