# LDSA: Project - Team 2
Moritz Eck, moritz.eck.0055@student.uu.se<br>
Tyson McLeod, <br>
Isaline Baret, <br>
Markella-Achilleia Zacharouli, <br>

## Setup & Deploy

In [None]:
from pyspark.sql import SparkSession
import matplotlib.pyplot as plt
import time

In [None]:
# Create the Spark session. Two variants are kept side by side: the remote
# cluster session (commented out) and the local session (active).

# REMOTE SESSION
# start your application with dynamic allocation enabled, a timeout of no more than 30 seconds and a cap on CPU cores:
# spark = (
#     SparkSession.builder
#     .master("spark://192.168.1.153:7077")
#     .appName("LDSA_Team2_Project")
#     .config("spark.dynamicAllocation.enabled", True)
#     .config("spark.shuffle.service.enabled", True)
#     .config("spark.dynamicAllocation.executorIdleTimeout", "30s")
#     .config("spark.dynamicAllocation.maxExecutors", 8)
#     # .config("spark.executor.instances", 8)  # set this to 1 if you want to compare remote execution with local.
#     .config("spark.executor.cores", 8)
#     .config('spark.executor.memory', "8g")
#     .config("spark.driver.cores", 2)
#     .config("spark.driver.memory", "2g")
#     .config("spark.executor.heartbeatInterval", "5s")
#     .getOrCreate()
# )

# LOCAL SESSION
# Runs Spark inside this process with 4 worker threads ("local[4]").
spark = (
    SparkSession.builder
    .master("local[4]")
    .appName("LDSA_Team2_Project")
    .config("spark.executor.instances", 1)
    .config("spark.executor.cores", 4)
    .config('spark.executor.memory', "8g")
    .config("spark.driver.cores", 2)
    .config("spark.driver.memory", "2g")
    .config("spark.executor.heartbeatInterval", "5s")
    .getOrCreate()
)

# handle to the underlying SparkContext (entry point of the older RDD API)
sc = spark.sparkContext

In [None]:
# set the log level of the Spark context to INFO
sc.setLogLevel("INFO")
# Execution target: True reads from the local data folder, False from HDFS.
LOCAL = True  # TODO: select if the experiment is run remote or local

# base locations of the input data
REMOTE_HDFS_PATH = "hdfs://192.168.1.153:9000/team02/input/"
LOCAL_PATH = "./data/"

# names of the three dataset files
business_fn = "yelp_academic_dataset_business.json"
users_fn = "yelp_academic_dataset_users.json"
reviews_fn = "yelp_academic_dataset_reviews.json"

# prefix every filename with the base location selected by LOCAL
business_fp, users_fp, reviews_fp = [
    (LOCAL_PATH if LOCAL else REMOTE_HDFS_PATH) + filename
    for filename in (business_fn, users_fn, reviews_fn)
]

## Experiment 1: Business Data

### Load and Preprocess Data

In [None]:
# Read the business JSON file into a PySpark DataFrame.
# The path variable is `business_fp` (built from the LOCAL flag above);
# the previously used name `business_filepath` was never defined.
business = spark.read.json(business_fp)

# the inferred schema can be visualized using the printSchema() method
business.printSchema()

# show top 5 rows
business.show(5)

In [None]:
# size of the business DataFrame: total rows ...
row_count = business.count()
print("Rows in Business Dataframe:\t", row_count)

# ... and the number of partitions of the underlying RDD
partition_count = business.rdd.getNumPartitions()
print("Number of Partitions:\t\t", partition_count)

### Experiment 1: Sort all businesses by stars and review_count

In [None]:
# Benchmark: run the "top businesses" query 10 times and report the timings.
times = []
for _ in range(10):
    # perf_counter() is a monotonic, high-resolution clock, so the measured
    # interval cannot be distorted by system clock adjustments (unlike time.time()).
    start_time = time.perf_counter()
    # top businesses according to stars and review_count (both descending);
    # head(10) is the action that actually triggers the Spark job
    filtered = (
        business
        .filter(business.stars >= 4.0)
        .sort("stars", "review_count", ascending=[0, 0])
        .head(10)
    )
    end_time = time.perf_counter()

    # printing happens outside the timed section
    for row in filtered:
        name, stars, rc = row["name"], row["stars"], row["review_count"]
        print("Name:\t{},\tStars:\t{},\tReview Count:\t{}".format(name[:12], stars, rc))

    delta = end_time - start_time
    times.append(delta)
    print("\nThe evaluation took: {} seconds".format(delta))

print("Average Time Taken: {}".format(sum(times)/len(times)))

In [None]:
# NOTE: `business` is intentionally NOT deleted here. The join in Experiment 3
# still reads it, so deleting it makes a top-to-bottom run fail with a NameError.
# The DataFrame is never cached in this notebook, so dropping the Python name
# would not release any significant memory either.
# Uncomment only if Experiment 3 is not going to be run:
# del business

## Experiment 2: User Data

### Load User Data & Preprocess

In [None]:
# Parse the users JSON file into a DataFrame; the location (local or HDFS)
# was already resolved into users_fp.
users = spark.read.json(users_fp)

# print the schema Spark inferred from the JSON records
users.printSchema()

# size of the users DataFrame: total rows ...
row_count = users.count()
print("Rows in Users Dataframe:\t", row_count)

# ... and the number of partitions of the underlying RDD
partition_count = users.rdd.getNumPartitions()
print("Number of Partitions:\t\t", partition_count)

In [None]:
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType, IntegerType

# use udf to define a row-at-a-time udf
def count_friends(line):
    """Count the entries of a comma-separated friends string.

    Args:
        line: Comma-separated list of friend ids, or None for a missing value.

    Returns:
        The number of non-empty entries; 0 for None, an empty string or the
        placeholder "None".
    """
    # A missing or empty value means no friends. The original
    # len("".split(', ')) reported 1 here and crashed on None.
    if not line:
        return 0
    # NOTE(review): the dataset appears to use the literal string "None" for
    # users without friends - confirm against the data.
    if line.strip().lower() == "none":
        return 0
    # count only non-blank entries so stray or trailing commas are not counted
    return sum(1 for entry in line.split(",") if entry.strip())

# Wrap the friend counter as a Spark UDF producing an integer column.
# The name is rebound: from here on `count_friends` is the UDF, not the plain function.
count_friends = udf(f=count_friends, returnType=IntegerType())

### Experiment 2: Sorting the dataset according to "review_count", "useful" and "fans", then counting the number of friends per reviewer and sorting by that count

In [None]:
# Benchmark: run the four "top users" queries 5 times and report the timings.
times = []
for _ in range(5):
    # perf_counter() is a monotonic clock intended for measuring intervals
    start_time = time.perf_counter()

    # sort according to reviewers with most reviews
    top_reviewers = users.sort("review_count", ascending=False).head(20)

    # sort according to reviewers with most useful reviews
    top_useful_reviews = users.sort("useful", "review_count", ascending=[0, 0]).head(20)

    # sort according to reviewers with most fans
    top_fan_count = users.sort("fans", "useful", ascending=[0, 0]).head(20)

    # count the number of friends per reviewer
    modified = users.withColumn("friendsCount", count_friends(col("friends")))

    # sort according to reviewers with most friends and then fans
    top_friends = modified.sort("friendsCount", "fans", ascending=[0, 0]).head(20)

    end_time = time.perf_counter()
    delta = end_time - start_time
    times.append(delta)

    # reporting happens outside the timed section
    print("Top 5 Reviewers by Review Count!")
    for row in top_reviewers[:5]:
        name, since, rc = row["name"], row["yelping_since"], row["review_count"]
        print("Name:\t{}\tReview Count:\t{}\tYelping Since:\t{}".format(name, rc, since))

    print("\nTop 5 Most Useful Reviews by Reviewer!")
    for row in top_useful_reviews[:5]:
        name, since, rc, useful = row["name"], row["yelping_since"], row["review_count"], row["useful"]
        print("Name:\t{}\tUseful Reviews:\t{}\tReview Count:\t{}\tYelping Since:\t{}".format(name, useful, rc, since))

    print("\nTop 5 Reviewers with most Fans!")
    for row in top_fan_count[:5]:
        name, since, rc, useful, fans = row["name"], row["yelping_since"], row["review_count"], row["useful"], row["fans"]
        print("Name:\t{}\tFans:\t{}\tUseful Reviews:\t{}\tReview Count:\t{}\tYelping Since:\t{}".format(name, fans, useful, rc, since))

    print("\nTop 5 Reviewers with most Friends!")
    for row in top_friends[:5]:
        name, since, fc, fans = row["name"], row["yelping_since"], row["friendsCount"], row["fans"]
        # argument order must follow the labels: Friends -> fc, Fans -> fans
        # (the two values were swapped before)
        print("Name:\t{}\tFriends:\t{}\tFans:\t{}\tYelping Since:\t{}".format(name, fc, fans, since))

    print("\nThe evaluation took: {:3.3f} seconds".format(delta))

print("Average Time: {}".format(sum(times)/len(times)))

## Experiment 3: Reviews

### Load Review Data & Preprocess

In [None]:
# Parse the reviews JSON file into a DataFrame.
# NOTE(review): this path is hard-coded and bypasses reviews_fp / the LOCAL
# flag - presumably a 1,000,000-row sample of the reviews file; confirm.
reviews = spark.read.json("./data/reviews_1000000.json")

# print the schema Spark inferred from the JSON records
reviews.printSchema()

# size of the reviews DataFrame: total rows ...
row_count = reviews.count()
print("Rows in Reviews Dataframe:\t", row_count)

# ... and the number of partitions of the underlying RDD
partition_count = reviews.rdd.getNumPartitions()
print("Number of Partitions:\t\t", partition_count)

### Experiment 3: Preprocessing of Reviews & Join with Businesses 

In [None]:
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

# use udf to define a row-at-a-time udf
def preprocess(line):
    """Lowercase a review text and split it into whitespace-separated tokens.

    Args:
        line: Raw review text, or None for a missing value.

    Returns:
        The string representation of the token list
        (e.g. "['good', 'food']"), or None when ``line`` is None.
    """
    if line is None:
        # keep a missing text missing instead of failing on None.lower()
        return None
    # split() without an argument splits on any run of whitespace (spaces,
    # tabs, newlines) and never yields empty tokens, so no strip() is needed
    tokens = line.lower().split()
    # TODO: do more fancy preprocessing (e.g., using NLTK or SpaCy to stem and remove stopwords)
    return str(tokens)

# Wrap the tokenizer as a Spark UDF that yields a string column.
tok = udf(preprocess, StringType())

# perf_counter() is a monotonic clock intended for measuring intervals
start_time = time.perf_counter()

# Replace the raw review text by its tokenized form and remove the id columns.
# DataFrames are immutable: drop() returns a new DataFrame, so its result has
# to be assigned (a bare `pr_reviews.drop(...)` statement has no effect).
pr_reviews = reviews.withColumn("text", tok(col("text"))).drop('user_id', 'review_id')

# print first three rows
# pr_reviews.show(5, False)

# Join business and reviews. Joining on the column *name* keeps a single
# business_id column in the result; joining on an equality expression would
# keep one copy from each side and make later references ambiguous.
# NOTE: requires the `business` DataFrame loaded in Experiment 1.
merged = business.join(pr_reviews, on="business_id", how="left_outer").drop('attributes', 'hours')
# show() is the action that actually triggers the UDF and the join
merged.show(3, False)

end_time = time.perf_counter()

print("\nThe evaluation took: {:3.3f} seconds".format(end_time - start_time))

## Shutdown

In [None]:
# release the cores for another application!
# SparkSession.stop() also stops the underlying SparkContext (`sc`),
# so a separate sc.stop() call is not needed.
spark.stop()