In [None]:
# Imports
from ast import literal_eval
from os import chdir, walk

import dask.dataframe as dd
import numpy as np
import pandas as pd
from dask.distributed import LocalCluster
from IPython.display import clear_output

In [None]:
# Instantiates dask cluster
#   LocalCluster() is called with no arguments, so worker count, threads and
#   memory limits are whatever dask picks by default on this machine.
cluster = LocalCluster()
# Client bound to the cluster above; it is shut down at the end of the
#   batch-processing cell via client.shutdown().
client = cluster.get_client()
# Bare last expression so the notebook renders the client's repr as cell output.
client

In [None]:
# Move up two directories, to project base directory.
# NOTE: this is relative to the current working directory, so re-running this
#   cell without restarting the kernel climbs a further two levels each time.
chdir("../..")

# Gets paths to review JSONs by appending file name to name of containing folder.
#   Built as a comprehension so the iteration variables stay scoped to it:
#   a top-level `for` loop would leave them in the notebook's global namespace,
#   where names such as `dir` (builtin) and `_` (IPython's last-output variable)
#   would be silently rebound for every later cell.
paths = [
    folder + '/' + file_name
    for folder, _, file_names in walk('data/reviews/')
    for file_name in file_names
]

In [None]:
def extract_reviews(row: pd.Series) -> pd.DataFrame:
    """Extracts and formats Pandas DataFrame of reviews from within JSON super-DataFrame.

    Args:
        row (pd.Series): Pandas Series corresponding to contents of one JSON file.
            Must provide a "reviews" entry (a string holding a Python-literal
            collection of review records) and a "path" entry, which is copied
            into the "steam_appid" column.

    Returns:
        pd.DataFrame: Pandas DataFrame containing relevant contents of reviews JSON
            (i.e, excluding metadata): one row per review, indexed by
            "recommendationid", with the nested "author" fields flattened into
            "author_"-prefixed columns.

    Raises:
        KeyError: If an expected field ("author", a timestamp column, an id
            column, ...) is absent from the parsed reviews.
        ValueError: If an id or score value cannot be converted to a number.
    """

    # Parses review information from its string form.
    #   literal_eval only evaluates Python literals, never arbitrary expressions.
    parsed_reviews = literal_eval(row["reviews"])

    # One row per review; every row is tagged with the app id taken from the path
    df_reviews = pd.DataFrame(parsed_reviews)
    df_reviews["steam_appid"] = row["path"]

    # Expands each review's nested author record into its own columns,
    #   then swaps the (now redundant) nested field for the flattened columns
    df_author_info = df_reviews["author"].apply(pd.Series).add_prefix("author_")
    df_reviews = df_reviews.join(df_author_info).drop("author", axis=1)

    # Reformats datetime columns (values are seconds since the Unix epoch)
    for datetime_col in (
        "timestamp_created",
        "timestamp_updated",
        "author_last_played",
    ):
        df_reviews[datetime_col] = pd.to_datetime(df_reviews[datetime_col], unit="s")

    # Coerces id columns to integers and the vote score to float.
    #   int64 is spelled out on purpose: plain `int` maps to the platform C long
    #   under NumPy < 2, which is only 32 bits on Windows and too narrow for
    #   Steam ids. On other platforms the resulting dtype is unchanged.
    df_reviews = df_reviews.astype(
        {
            "recommendationid": "int64",
            "author_steamid": "int64",
            "steam_appid": "int64",
            "weighted_vote_score": float,
        }
    )

    # Sets index to recommendation id column
    return df_reviews.set_index("recommendationid")


In [None]:
def ingest_reviews(paths:list, batch_index:int, batch_count:int, meta:pd.DataFrame):
    """Imports reviews from a batch of review JSONs and writes them to parquet.

    Reads the JSON files, flattens each one with ``extract_reviews``, and writes
    the combined result as a single partition of the "data/reviews.parquet"
    dataset. The first batch (``batch_index == 0``) clears the dataset directory
    before writing; every later batch appends to it.

    Args:
        paths (list): List of paths to review JSONs. Should refer to some subset of all review JSONs.
        batch_index (int): Zero-based index of current batch (for progress reporting,
            and to decide between overwriting and appending).
        batch_count (int): Total number of batches (for progress reporting).
        meta (pd.DataFrame): Empty pandas DataFrame dask uses as a template.

    Returns:
        None. Output is written to disk and progress is printed.
    """
    is_first_batch = batch_index == 0

    # Reads review JSONs into a dask DataFrame
    ddf_review_jsons = dd.read_json(paths, include_path_column=True)

    # Trims values of path column to just name of containing folder.
    # (Review download function uses steam app ids as folder names)
    #   Drops the last 14 characters, then everything up to the final "/".
    #   NOTE(review): 14 is presumably the length of the file-name portion of
    #   each path - confirm against the download function's naming scheme.
    ddf_review_jsons["path"] = (
        ddf_review_jsons["path"].astype(str).str[:-14].replace(".+/", "", regex=True)
    )

    # Extracts reviews into dask DataFrame of pandas DataFrames (using template)
    ddf_reviews_dfs = ddf_review_jsons.apply(extract_reviews, axis=1, meta=meta)

    # Materialises the per-file pandas DataFrames (this is the memory-heavy step),
    #   then concatenates them into one dask DataFrame
    ddf_reviews = dd.concat(ddf_reviews_dfs.compute().tolist())
    clear_output()
    print(f"Batch {batch_index+1}/{batch_count} loaded!")

    # Repartitions to one partition so that only one parquet is written per batch
    ddf_reviews = ddf_reviews.repartition(npartitions=1)

    # Writes reviews to parquet file.
    #   First batch: overwrite=True removes the existing dataset directory, so
    #   part files left over from a previous (possibly longer) run cannot linger;
    #   append=False on its own would only replace same-named part files.
    #   Later batches: append to what the first batch started.
    #   (dask rejects overwrite=True together with append=True, hence the split.)
    ddf_reviews.to_parquet(
        "data/reviews.parquet",
        append=not is_first_batch,
        overwrite=is_first_batch,
    )


In [None]:
# Instantiates some variables for batch processing step

# Imports template DataFrame so dask knows how to interpret things
#   (it is handed to ingest_reviews as the `meta` argument)
meta = pd.read_parquet("dask_templates/reviews_meta.parquet")

# Sets batch size, i.e. the target number of review JSON paths per batch.
#   2^13 works pretty well with 64 GB of RAM;
#   You'll probably want to experiment if you have less.
#   I'd (naively) recommend 2^12 for 32 GB RAM, 2^11 for 16 GB RAM, etc.
batch_size = 2**13

# Calculates batch count for better progress reporting.
#   Floor division: a trailing partial batch is not counted, and the result
#   is 0 whenever there are fewer than batch_size paths.
batch_count = len(paths) // batch_size

In [None]:
# Batch processing

# Fails early with a clear message instead of an opaque numpy error further down
if not paths:
    raise ValueError("No review JSON paths to ingest: `paths` is empty.")

# Number of batches, by ceiling division, so that no batch exceeds batch_size
#   and a run with fewer than batch_size paths still yields one batch.
#   Computed here (rather than passing a float ratio to array_split, which
#   truncates it) so the split and the "i/N" progress display always agree.
batch_count = -(-len(paths) // batch_size)

# array_split yields batch_count nearly equal chunks; each chunk is converted
#   back to a plain list of str paths before being ingested
for batch_index, paths_subset in enumerate(np.array_split(paths, batch_count)):
    ingest_reviews(paths_subset.tolist(), batch_index, batch_count, meta)

# Progress reporting
clear_output()
print('Ingest finished!')

# Shuts down dask cluster
client.shutdown()
print("Dask cluster terminated.")