# Metrics
This lab notebook prototypes the process of aggregation and computation of review metrics using PySpark 

In [1]:

import pyspark
from pyspark.sql.functions import  col, from_unixtime
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, LongType
from appvocai-discover.utils.repo import ReviewRepo
from pyspark.sql import functions as sparkFunc

## Spark Session

In [2]:
spark = (
    SparkSession.builder.appName("AppVoCAI-Discover")
    .master("local[*]")
    .config("spark.driver.memory", "32g")
    .config("spark.executor.memory", "32g")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.kryoserializer.buffer.max", "2000M")
    .config("spark.driver.maxResultSize", "0")
    .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.12:5.3.3")
    .config("spark.sql.legacy.parquet.nanosAsLong", "true")    
    .getOrCreate()
)

your 131072x1 screen size is bogus. expect trouble
24/05/28 22:29:59 WARN Utils: Your hostname, Leviathan resolves to a loopback address: 127.0.1.1; using 172.29.219.103 instead (on interface eth0)
24/05/28 22:29:59 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/home/john/anaconda3/envs/appvocai-discover/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/john/.ivy2/cache
The jars for the packages stored in: /home/john/.ivy2/jars
com.johnsnowlabs.nlp#spark-nlp_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-714cf5d2-5ed6-4770-9517-668c00dd629f;1.0
	confs: [default]
	found com.johnsnowlabs.nlp#spark-nlp_2.12;5.3.3 in central
	found com.typesafe#config;1.4.2 in central
	found org.rocksdb#rocksdbjni;6.29.5 in central
	found com.amazonaws#aws-java-sdk-s3;1.12.500 in central
	found com.amazonaws#aws-java-sdk-kms;1.12.500 in central
	found com.amazonaws#aws-java-sdk-core;1.12.500 in central
	found commons-logging#commons-logging;1.1.3 in central
	found commons-codec#commons-codec;1.15 in central
	found org.apache.httpcomponents#httpclient;4.5.13 in central
	found org.apache.httpcomponents#httpcore;4.4.13 in central
	found software.amazon.ion#ion-java;1.0.2 in central
	found joda-time#joda-time;2.8.1 in central
	found com.amazonaws#jmespath-java;1.12.500 in central
	f

## Data

In [3]:
schema = StructType([ \
    StructField("id", StringType(), True), \
    StructField("app_id", StringType(), True), \
    StructField("app_name", StringType(), True), \
    StructField("category_id", StringType(), True), \
    StructField("category", StringType(), True), \
    StructField("author", StringType(), True), \
    StructField("rating", DoubleType(), True), \
    StructField("title", StringType(), True), \
    StructField("content", StringType(), True), \
    StructField("eda_review_length", LongType(), True), \
    StructField("vote_count", LongType(), True), \
    StructField("vote_sum", LongType(), True), \
    StructField("date", LongType(), True), \
    
    ])

column_dict = {
    "id": StringType(),
    "app_id": StringType(),
    "app_name": StringType(),
    "category_id": StringType(),
    "category": StringType(),
    "author": StringType(),
    "rating": DoubleType(),
    "title": StringType(),
    "content": StringType(),
    "eda_review_length": LongType(),
    "vote_count": LongType(),
    "vote_sum": LongType(),
    "date": LongType()
}

In [4]:
FP1 = {"directory": "00_raw", "filename": "reviews.pkl"}
FP2 = {"directory": "05_precomp", "filename": "reviews.parquet"}
FP_CATEGORY = {"directory": "05_precomp", "filename": "category.pkl"}
FP_AUTHOR = {"directory": "05_precomp", "filename": "author.pkl"}
FP_APP = {"directory": "05_precomp", "filename": "app.pkl"}
FP_CATEGORY_AUTHOR = {"directory": "05_precomp", "filename": "category_author.pkl"}
df1 = ReviewRepo.read(directory=FP1['directory'], filename=FP1["filename"])
filepath = ReviewRepo.get_filepath(directory=FP2["directory"], filename=FP2["filename"])

df1.to_parquet(filepath)


## Create Spark DataFrame

In [5]:
df2 = spark.read.schema(schema).parquet(filepath)
df = df2.withColumn("date", (col("date") / 100000000).cast("timestamp"))
df2.printSchema()
df2.take(5)

root
 |-- id: string (nullable = true)
 |-- app_id: string (nullable = true)
 |-- app_name: string (nullable = true)
 |-- category_id: string (nullable = true)
 |-- category: string (nullable = true)
 |-- author: string (nullable = true)
 |-- rating: double (nullable = true)
 |-- title: string (nullable = true)
 |-- content: string (nullable = true)
 |-- eda_review_length: long (nullable = true)
 |-- vote_count: long (nullable = true)
 |-- vote_sum: long (nullable = true)
 |-- date: long (nullable = true)



                                                                                

[Row(id='1421843662', app_id='536049508', app_name='Full Fitness : Exercise Workout Trainer', category_id='6013', category='Health & Fitness', author='Shawarma Sheikh', rating=1.0, title='Waste of $3', content='Exactly the same as "Fitness Buddy" app. Almost verbatim.', eda_review_length=9, vote_count=0, vote_sum=0, date=1469760910000000000),
 Row(id='2180538326', app_id='386022579', app_name='Pregnancy Tracker - BabyCenter', category_id='6013', category='Health & Fitness', author='Baby, I Am Your Father!', rating=5.0, title='Great App', content='My wife has the app but I think I use it more than her.... 😂', eda_review_length=15, vote_count=0, vote_sum=0, date=1518062808000000000),
 Row(id='830423077', app_id='379693831', app_name='Audible: Audio Entertainment', category_id='6018', category='Book', author='Fluffymarshmellow', rating=1.0, title="Doesn't let me join", content="Not worth it if I can't just join an account so I don't need a computer to do this not downloading ever again", 

## Category Metrics

In [6]:
df_category = df2.groupBy("category").agg(
    sparkFunc.countDistinct(col("app_id")).alias("app_count"),
    sparkFunc.countDistinct(col("author")).alias("author_count"),
    sparkFunc.count("*").alias("review_count"),
    sparkFunc.min(col("rating")).alias("rating_min"),
    sparkFunc.max(col("rating")).alias("rating_max"),
    sparkFunc.avg(col("rating")).alias("rating_avg"),
    sparkFunc.mode(col("rating")).alias("rating_mode"),
    sparkFunc.stddev(col("rating")).alias("rating_std"),
    sparkFunc.min(col("eda_review_length")).alias("eda_review_length_min"),
    sparkFunc.max(col("eda_review_length")).alias("eda_review_length_max"),
    sparkFunc.avg(col("eda_review_length")).alias("eda_review_length_avg"),
    sparkFunc.mode(col("eda_review_length")).alias("eda_review_length_mode"),
    sparkFunc.stddev(col("eda_review_length")).alias("eda_review_length_std"),
    sparkFunc.min(col("vote_count")).alias("vote_count_min"),
    sparkFunc.max(col("vote_count")).alias("vote_count_max"),
    sparkFunc.avg(col("vote_count")).alias("vote_count_avg"),
    sparkFunc.mode(col("vote_count")).alias("vote_count_mode"),
    sparkFunc.stddev(col("vote_count")).alias("vote_count_std"),
    sparkFunc.min(col("vote_sum")).alias("vote_sum_min"),
    sparkFunc.max(col("vote_sum")).alias("vote_sum_max"),
    sparkFunc.avg(col("vote_sum")).alias("vote_sum_avg"),
    sparkFunc.mode(col("vote_sum")).alias("vote_sum_mode"),
    sparkFunc.stddev(col("vote_sum")).alias("vote_sum_std"),
    sparkFunc.min(col("date")).alias("date_min"),
    sparkFunc.max(col("date")).alias("date_max"),
    sparkFunc.mode(col("date")).alias("date_mode")
    )


### Category Metrics Revealed

In [7]:
df_category.take(5)

24/05/28 22:30:23 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

[Row(category='Education', app_count=1086, author_count=11123, review_count=11133, rating_min=1.0, rating_max=5.0, rating_avg=3.56911883589329, rating_mode=5.0, rating_std=1.7564127936010225, eda_review_length_min=1, eda_review_length_max=915, eda_review_length_avg=27.842540195814244, eda_review_length_mode=1, eda_review_length_std=36.98320623452128, vote_count_min=0, vote_count_max=158, vote_count_avg=0.15988502649779934, vote_count_mode=0, vote_count_std=2.167886280370067, vote_sum_min=0, vote_sum_max=91, vote_sum_avg=0.0997035839396389, vote_sum_mode=0, vote_sum_std=1.274537427514254, date_min=1234909970000000000, date_max=1691898124000000000, date_mode=1585069723000000000),
 Row(category='Entertainment', app_count=952, author_count=20295, review_count=20316, rating_min=1.0, rating_max=5.0, rating_avg=3.536424493010435, rating_mode=5.0, rating_std=1.689947197301256, eda_review_length_min=1, eda_review_length_max=1049, eda_review_length_avg=27.27057491632211, eda_review_length_mode=1

### Category Metrics Saved

In [8]:
df_pandas = df_category.toPandas()
ReviewRepo().write(directory=FP_CATEGORY["directory"], filename=FP_CATEGORY["filename"], data=df_pandas)

                                                                                

## Author Metrics

In [9]:
df_author = df2.groupBy("author").agg(
    sparkFunc.countDistinct(col("app_id")).alias("app_count"),
    sparkFunc.countDistinct(col("category")).alias("category"),
    sparkFunc.count("*").alias("review_count"),
    sparkFunc.min(col("rating")).alias("rating_min"),
    sparkFunc.max(col("rating")).alias("rating_max"),
    sparkFunc.avg(col("rating")).alias("rating_avg"),
    sparkFunc.mode(col("rating")).alias("rating_mode"),
    sparkFunc.stddev(col("rating")).alias("rating_std"),
    sparkFunc.min(col("eda_review_length")).alias("eda_review_length_min"),
    sparkFunc.max(col("eda_review_length")).alias("eda_review_length_max"),
    sparkFunc.avg(col("eda_review_length")).alias("eda_review_length_avg"),
    sparkFunc.mode(col("eda_review_length")).alias("eda_review_length_mode"),
    sparkFunc.stddev(col("eda_review_length")).alias("eda_review_length_std"),
    sparkFunc.min(col("vote_count")).alias("vote_count_min"),
    sparkFunc.max(col("vote_count")).alias("vote_count_max"),
    sparkFunc.avg(col("vote_count")).alias("vote_count_avg"),
    sparkFunc.mode(col("vote_count")).alias("vote_count_mode"),
    sparkFunc.stddev(col("vote_count")).alias("vote_count_std"),
    sparkFunc.min(col("vote_sum")).alias("vote_sum_min"),
    sparkFunc.max(col("vote_sum")).alias("vote_sum_max"),
    sparkFunc.avg(col("vote_sum")).alias("vote_sum_avg"),
    sparkFunc.mode(col("vote_sum")).alias("vote_sum_mode"),
    sparkFunc.stddev(col("vote_sum")).alias("vote_sum_std"),
    sparkFunc.min(col("date")).alias("date_min"),
    sparkFunc.max(col("date")).alias("date_max"),
    sparkFunc.mode(col("date")).alias("date_mode")
    )

### Author Metrics Revealed

In [10]:
df_author.take(5)

                                                                                

[Row(author='$&$FortniteGamer$&$', app_count=1, category=1, review_count=1, rating_min=5.0, rating_max=5.0, rating_avg=5.0, rating_mode=5.0, rating_std=None, eda_review_length_min=3, eda_review_length_max=3, eda_review_length_avg=3.0, eda_review_length_mode=3, eda_review_length_std=None, vote_count_min=0, vote_count_max=0, vote_count_avg=0.0, vote_count_mode=0, vote_count_std=None, vote_sum_min=0, vote_sum_max=0, vote_sum_avg=0.0, vote_sum_mode=0, vote_sum_std=None, date_min=1589815084000000000, date_max=1589815084000000000, date_mode=1589815084000000000),
 Row(author='222Gem', app_count=1, category=1, review_count=1, rating_min=5.0, rating_max=5.0, rating_avg=5.0, rating_mode=5.0, rating_std=None, eda_review_length_min=13, eda_review_length_max=13, eda_review_length_avg=13.0, eda_review_length_mode=13, eda_review_length_std=None, vote_count_min=0, vote_count_max=0, vote_count_avg=0.0, vote_count_mode=0, vote_count_std=None, vote_sum_min=0, vote_sum_max=0, vote_sum_avg=0.0, vote_sum_mo

### Author Metrics Saved

In [11]:
df_pandas = df_author.toPandas()
ReviewRepo().write(directory=FP_AUTHOR["directory"], filename=FP_AUTHOR["filename"], data=df_pandas)

                                                                                

## App Metrics

In [12]:
df_app = df2.groupBy("app_name").agg(
    sparkFunc.countDistinct(col("author")).alias("author_count"),
    sparkFunc.count("*").alias("review_count"),
    sparkFunc.min(col("rating")).alias("rating_min"),
    sparkFunc.max(col("rating")).alias("rating_max"),
    sparkFunc.avg(col("rating")).alias("rating_avg"),
    sparkFunc.mode(col("rating")).alias("rating_mode"),
    sparkFunc.stddev(col("rating")).alias("rating_std"),
    sparkFunc.min(col("eda_review_length")).alias("eda_review_length_min"),
    sparkFunc.max(col("eda_review_length")).alias("eda_review_length_max"),
    sparkFunc.avg(col("eda_review_length")).alias("eda_review_length_avg"),
    sparkFunc.mode(col("eda_review_length")).alias("eda_review_length_mode"),
    sparkFunc.stddev(col("eda_review_length")).alias("eda_review_length_std"),
    sparkFunc.min(col("vote_count")).alias("vote_count_min"),
    sparkFunc.max(col("vote_count")).alias("vote_count_max"),
    sparkFunc.avg(col("vote_count")).alias("vote_count_avg"),
    sparkFunc.mode(col("vote_count")).alias("vote_count_mode"),
    sparkFunc.stddev(col("vote_count")).alias("vote_count_std"),
    sparkFunc.min(col("vote_sum")).alias("vote_sum_min"),
    sparkFunc.max(col("vote_sum")).alias("vote_sum_max"),
    sparkFunc.avg(col("vote_sum")).alias("vote_sum_avg"),
    sparkFunc.mode(col("vote_sum")).alias("vote_sum_mode"),
    sparkFunc.stddev(col("vote_sum")).alias("vote_sum_std"),
    sparkFunc.min(col("date")).alias("date_min"),
    sparkFunc.max(col("date")).alias("date_max"),
    sparkFunc.mode(col("date")).alias("date_mode")
    )

### App Metrics Revealed

In [13]:
df_app.take(5)

                                                                                

[Row(app_name='#TrailsRoc Maps', author_count=1, review_count=1, rating_min=2.0, rating_max=2.0, rating_avg=2.0, rating_mode=2.0, rating_std=None, eda_review_length_min=9, eda_review_length_max=9, eda_review_length_avg=9.0, eda_review_length_mode=9, eda_review_length_std=None, vote_count_min=1, vote_count_max=1, vote_count_avg=1.0, vote_count_mode=1, vote_count_std=None, vote_sum_min=0, vote_sum_max=0, vote_sum_avg=0.0, vote_sum_mode=0, vote_sum_std=None, date_min=1508685986000000000, date_max=1508685986000000000, date_mode=1508685986000000000),
 Row(app_name='-•- (Dash Dot Dash)', author_count=1, review_count=1, rating_min=5.0, rating_max=5.0, rating_avg=5.0, rating_mode=5.0, rating_std=None, eda_review_length_min=51, eda_review_length_max=51, eda_review_length_avg=51.0, eda_review_length_mode=51, eda_review_length_std=None, vote_count_min=12, vote_count_max=12, vote_count_avg=12.0, vote_count_mode=12, vote_count_std=None, vote_sum_min=11, vote_sum_max=11, vote_sum_avg=11.0, vote_sum_

### App Metrics Saved

In [14]:
df_pandas = df_app.toPandas()
ReviewRepo().write(directory=FP_APP["directory"], filename=FP_APP["filename"], data=df_pandas)

                                                                                

## Category / Author Metrics

In [15]:
df_category_author = df2.groupBy("category", "author").agg(    
    sparkFunc.countDistinct(col("app_id")).alias("app_count"),
    sparkFunc.count("*").alias("review_count"),
    sparkFunc.min(col("rating")).alias("rating_min"),
    sparkFunc.max(col("rating")).alias("rating_max"),
    sparkFunc.avg(col("rating")).alias("rating_avg"),
    sparkFunc.mode(col("rating")).alias("rating_mode"),
    sparkFunc.stddev(col("rating")).alias("rating_std"),
    sparkFunc.min(col("eda_review_length")).alias("eda_review_length_min"),
    sparkFunc.max(col("eda_review_length")).alias("eda_review_length_max"),
    sparkFunc.avg(col("eda_review_length")).alias("eda_review_length_avg"),
    sparkFunc.mode(col("eda_review_length")).alias("eda_review_length_mode"),
    sparkFunc.stddev(col("eda_review_length")).alias("eda_review_length_std"),
    sparkFunc.min(col("vote_count")).alias("vote_count_min"),
    sparkFunc.max(col("vote_count")).alias("vote_count_max"),
    sparkFunc.avg(col("vote_count")).alias("vote_count_avg"),
    sparkFunc.mode(col("vote_count")).alias("vote_count_mode"),
    sparkFunc.stddev(col("vote_count")).alias("vote_count_std"),
    sparkFunc.min(col("vote_sum")).alias("vote_sum_min"),
    sparkFunc.max(col("vote_sum")).alias("vote_sum_max"),
    sparkFunc.avg(col("vote_sum")).alias("vote_sum_avg"),
    sparkFunc.mode(col("vote_sum")).alias("vote_sum_mode"),
    sparkFunc.stddev(col("vote_sum")).alias("vote_sum_std"),
    sparkFunc.min(col("date")).alias("date_min"),
    sparkFunc.max(col("date")).alias("date_max"),
    sparkFunc.mode(col("date")).alias("date_mode")
    )

### Category / Author Metrics Revealed

In [16]:
df_category_author.take(5)

                                                                                

[Row(category='Book', author=' missyb08', app_count=1, review_count=1, rating_min=5.0, rating_max=5.0, rating_avg=5.0, rating_mode=5.0, rating_std=None, eda_review_length_min=2, eda_review_length_max=2, eda_review_length_avg=2.0, eda_review_length_mode=2, eda_review_length_std=None, vote_count_min=0, vote_count_max=0, vote_count_avg=0.0, vote_count_mode=0, vote_count_std=None, vote_sum_min=0, vote_sum_max=0, vote_sum_avg=0.0, vote_sum_mode=0, vote_sum_std=None, date_min=1659675780000000000, date_max=1659675780000000000, date_mode=1659675780000000000),
 Row(category='Book', author='"Cici"', app_count=1, review_count=1, rating_min=5.0, rating_max=5.0, rating_avg=5.0, rating_mode=5.0, rating_std=None, eda_review_length_min=5, eda_review_length_max=5, eda_review_length_avg=5.0, eda_review_length_mode=5, eda_review_length_std=None, vote_count_min=0, vote_count_max=0, vote_count_avg=0.0, vote_count_mode=0, vote_count_std=None, vote_sum_min=0, vote_sum_max=0, vote_sum_avg=0.0, vote_sum_mode=0

### Category / Author Metrics Saved

In [17]:
df_pandas = df_category_author.toPandas()
ReviewRepo().write(directory=FP_CATEGORY_AUTHOR["directory"], filename=FP_CATEGORY_AUTHOR["filename"], data=df_pandas)

                                                                                