## Imports

In [1]:
import findspark
findspark.init()

import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Final Project").getOrCreate()
print(spark)

import random
import concurrent.futures
import time
from pyspark.sql.functions import col, when
from pyspark.sql.functions import min as spark_min, max as spark_max

<pyspark.sql.session.SparkSession object at 0x7faa625e23c8>


In [3]:
spark = SparkSession.builder \
    .appName("Product Rating Analysis") \
    .master("yarn") \
    .config("spark.hadoop.fs.defaultFS", "hdfs://hadoop-master:9000") \
    .config("spark.yarn.jars", "hdfs://hadoop-master:9000/spark-jars/*.jar") \
    .getOrCreate()

In [30]:
spark2 = SparkSession.builder \
    .appName("Product Rating Analysis") \
    .master("yarn") \
    .config("spark.hadoop.fs.defaultFS", "hdfs://hadoop-master:9000") \
    .config("spark.executor.instances", "3") \
    .config("spark.executor.cores", "2") \
    .config("spark.executor.memory", "1g") \
    .getOrCreate()

# "3" tells YARN to allocate 3 executors for your Spark application
# "2" tells YARN to allocate 2 cores for each executor
# "2g" tells YARN to allocate 2 GB of memory for each executor

In [4]:
spark.sparkContext.getConf().getAll()

[('spark.master', 'yarn'),
 ('spark.rdd.compress', 'True'),
 ('spark.app.id', 'local-1718607186279'),
 ('spark.hadoop.fs.defaultFS', 'hdfs://hadoop-master:9000'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.app.name', 'Product Rating Analysis'),
 ('spark.executor.id', 'driver'),
 ('spark.submit.deployMode', 'client'),
 ('spark.driver.host', 'localhost'),
 ('spark.yarn.jars', 'hdfs://hadoop-master:9000/spark-jars/*.jar'),
 ('spark.ui.showConsoleProgress', 'true'),
 ('spark.driver.port', '45167')]

In [16]:
spark.sparkContext.getConf().getAll()

[('spark.master', 'yarn'),
 ('spark.rdd.compress', 'True'),
 ('spark.hadoop.fs.defaultFS', 'hdfs://hadoop-master:9000'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.app.name', 'Product Rating Analysis'),
 ('spark.executor.id', 'driver'),
 ('spark.submit.deployMode', 'client'),
 ('spark.driver.host', 'localhost'),
 ('spark.app.id', 'local-1718525097980'),
 ('spark.yarn.jars', 'hdfs://hadoop-master:9000/spark-jars/*.jar'),
 ('spark.ui.showConsoleProgress', 'true'),
 ('spark.driver.port', '33767')]

In [1]:
!python --version
!pyspark --version

Python 2.7.17
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.3.4
      /_/
                        
Using Scala version 2.11.8, OpenJDK 64-Bit Server VM, 1.8.0_362
Branch 
Compiled by user  on 2019-08-26T08:09:33Z
Revision 
Url 
Type --help for more information.


## Data loading

In [2]:
df = spark.read.csv("hdfs://localhost:9000/user/input/data.csv", header=True, inferSchema=True)
rdd = df.rdd
df = df.drop('Timestamp')
df.show(3)

+--------------+----------+------+
|        UserId| ProductId|Rating|
+--------------+----------+------+
|A39HTATAQ9V7YF|0205616461|   5.0|
|A3JM6GV9MNOF9X|0558925278|   3.0|
|A1Z513UWSAAO0F|0558925278|   5.0|
+--------------+----------+------+
only showing top 3 rows



In [31]:
df1 = spark1.read.csv("hdfs://localhost:9000/user/input/data.csv", header=True, inferSchema=True)
df2 = spark2.read.csv("hdfs://localhost:9000/user/input/data.csv", header=True, inferSchema=True)

## Project needs to read dataset categorize it. 

### Average Rating per Product

In [12]:
df.groupBy("ProductId").avg("Rating").withColumnRenamed("avg(Rating)", "AverageRating").show(3)

+----------+-------------+
| ProductId|AverageRating|
+----------+-------------+
|9790773587|          5.0|
|9790794207|          5.0|
|B00004VBMM|          5.0|
+----------+-------------+
only showing top 3 rows



In [26]:
def get_avg_rating(df):
    return df.groupBy("ProductId").avg("Rating").withColumnRenamed("avg(Rating)", "AverageRating")

In [41]:
start = time.time()
for i in range(1000):
    get_avg_rating(df1)
print("Execution time with 1 executor:  ", time.time() - start)

start = time.time()
for i in range(1000):
    get_avg_rating(df2)
print("Execution time with 3 executors: ", time.time() - start)

Execution time with 1 executor:   4.436811923980713
Execution time with 3 executors:  4.402645587921143


### Average Rating per User

In [13]:
df.groupBy("UserId").avg("Rating").withColumnRenamed("avg(Rating)", "AverageRating").show(3)

+--------------+-------------+
|        UserId|AverageRating|
+--------------+-------------+
|A2HNQ3JHXDSVMW|          3.0|
|A2DOQ89OLXNHNL|          5.0|
|A17U6P3YQISHYH|          4.8|
+--------------+-------------+
only showing top 3 rows



In [14]:
def get_avg_rating(df):
    return df.groupBy("UserId").avg("Rating").withColumnRenamed("avg(Rating)", "AverageRating")

In [15]:
# Compare execution time
start = time.time()
get_avg_rating(df)
print("Execution time with DataFrame: ", time.time() - start)

start = time.time()
get_avg_rating_rdd(rdd)
print("Execution time with RDD: ", time.time() - start)

Execution time with DataFrame:  0.011167526245117188
Execution time with RDD:  0.016162395477294922


## It should be possible to find lowest and highest scores in dataset

In [14]:
min_rating = df.select(spark_min("Rating")).collect()[0][0]
max_rating = df.select(spark_max("Rating")).collect()[0][0]
print("Min rating: ", min_rating)
print("Max rating: ", max_rating)

Min rating:  1.0
Max rating:  5.0


In [3]:
def get_min_rating(df):
    return df.select(spark_min("Rating")).collect()[0][0]

In [4]:
def get_max_rating(df):
    return df.select(spark_max("Rating")).collect()[0][0]

## Add new data

In [31]:
def add_data(df, new_data, columns, spark=SparkSession.builder.appName("Product Rating Analysis").getOrCreate()):
    new_df = spark.createDataFrame(new_data, schema=columns)
    return df.union(new_df)

In [37]:
input = ['A2HNQ3JHXDSVMW', 'B0000C321X', 5]
user_id, product_id, new_rating = input
df = add_data(df, [(user_id, product_id, new_rating)], df.columns)
df.filter((col("UserId") == user_id) & (col("ProductId") == product_id)).show()

+--------------+----------+------+
|        UserId| ProductId|Rating|
+--------------+----------+------+
|A2HNQ3JHXDSVMW|B0000C321X|   5.0|
+--------------+----------+------+



## Update existing one

In [46]:
def update_data(df, user_id, product_id, new_rating):
    condition = (col("UserId") == user_id) & (col("ProductId") == product_id)
    if df.filter(condition).count() > 0:
        df = df.withColumn(
            'Rating',
            when(condition, new_rating).otherwise(col("Rating"))
        )
        print(f"Updated UserId: {user_id}, ProductId: {product_id} with new Rating: {new_rating}")
        df.filter(condition).show()
        return df
    else:
        print(f"No entry found for UserId: {user_id}, ProductId: {product_id}")

    return df

In [50]:
df = update_data(df, 'A2HNQ3JHXDSVMW' , 'B0000C321X', 4)

Updated UserId: A2HNQ3JHXDSVMW, ProductId: B0000C321X with new Rating: 4
+--------------+----------+------+
|        UserId| ProductId|Rating|
+--------------+----------+------+
|A2HNQ3JHXDSVMW|B0000C321X|   4.0|
+--------------+----------+------+



## Stress Test 1: The client makes the same request very quickly min (10000 times).

In [53]:
def task_1():
    for _ in range(10000):
        get_avg_rating(df)

In [55]:
print("Running Stress Test 1: Single client making the same request very quickly")
start_time = time.time()
task_1()
print(f"Stress Test 1 completed in {time.time() - start_time:.2f} seconds")

Running Stress Test 1: Single client making the same request very quickly
Stress Test 1 completed in 44.23 seconds


## Stress Test 2: Two or more clients make the possible requests randomly (10000 times).

In [5]:
counter = 0
def task_2(no_requests, worker_id):
    global counter
    requests = [random.randint(1, 100) for _ in range(no_requests)]
    ratings = []
    try:
        for req in requests:
            print(f"Counter: {counter}, Client: {worker_id}, Request: {req}")
            if req % 2 == 0:
                counter += 1
                ratings.append(get_max_rating(df))
            else:
                counter += 1
                ratings.append(get_min_rating(df))
    except Exception as e:
            print(f"Error processing request {req}: {e}")

In [47]:
import os
max_workers = os.cpu_count() 
print(f"Number of CPU cores: {max_workers}")

Number of CPU cores: 16


In [6]:
print("Running Stress Test 2: Multiple clients making requests randomly")
start_time = time.time()
no_requests = 10000
client_numbers = []
remaining_amount = no_requests
w_id = 0
for _ in range(15):
    client = no_requests//16
    client_numbers.append((client, w_id))
    remaining_amount -= client
    w_id += 1
client_numbers.append((remaining_amount, w_id)) 

with concurrent.futures.ThreadPoolExecutor(max_workers=16) as executor:
    futures = [executor.submit(task_2, client, worker_id) for client, worker_id in client_numbers]
    try:
        concurrent.futures.wait(futures)
    except KeyboardInterrupt:
        executor.shutdown(wait=False)
        print("Tasks interrupted and executor shut down.")

print(f"Stress Test 2 completed in {time.time() - start_time:.2f} seconds")

Running Stress Test 2: Multiple clients making requests randomly
Counter: 0, Client: 0, Request: 6
Counter: 1, Client: 1, Request: 93
Counter: 2, Client: 2, Request: 52
Counter: 3, Client: 3, Request: 54
Counter: 4, Client: 4, Request: 46
Counter: 5, Client: 5, Request: 63
Counter: 6, Client: 6, Request: 75
Counter: 7, Client: 7, Request: 6
Counter: 8, Client: 8, Request: 67
Counter: 9, Client: 9, Request: 27
Counter: 10, Client: 10, Request: 76
Counter: 11, Client: 11, Request: 19
Counter: 12, Client: 12, Request: 70
Counter: 13, Client: 13, Request: 41
Counter: 14, Client: 14, Request: 26
Counter: 15, Client: 15, Request: 49
Counter: 16, Client: 10, Request: 75
Counter: 17, Client: 8, Request: 1
Counter: 18, Client: 7, Request: 62
Counter: 19, Client: 9, Request: 59
Counter: 20, Client: 12, Request: 5
Counter: 21, Client: 6, Request: 31
Counter: 22, Client: 13, Request: 24
Counter: 23, Client: 15, Request: 43
Counter: 24, Client: 2, Request: 42
Counter: 25, Client: 0, Request: 53
Cou

Counter: 1317, Client: 6, Request: 79


KeyboardInterrupt: 

In [55]:
import random
from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import Lock

counter = 0
counter_lock = Lock()

def handle_request(req, worker_id):
    global counter
    with counter_lock:
        counter += 1
        print(f"Counter: {counter}, Client: {worker_id}")
    if req % 2 == 0:
        get_max_rating(df)
    else:
        get_min_rating(df)

def task_2_concurrent(no_requests, worker_id):
    requests = [random.randint(1, 100) for _ in range(no_requests)]
    with ThreadPoolExecutor() as executor:
        futures = [executor.submit(handle_request, req, worker_id) for req in requests]
        for future in as_completed(futures):
            future.result()

In [7]:
print("Running Stress Test 2: Multiple clients making requests randomly")
start_time = time.time()

no_requests = 10000

client1 = random.randint(1, no_requests)
client2 = random.randint(1, no_requests - client1)
client3 = no_requests - client1 - client2

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(task_2, client1), 
               executor.submit(task_2, client2), 
               executor.submit(task_2, client3)]
    try:
        concurrent.futures.wait(futures)
    except KeyboardInterrupt:
        executor.shutdown(wait=False) 
        print("Tasks interrupted and executor shut down.")

print(f"Stress Test 2 completed in {time.time() - start_time:.2f} seconds")

Running Stress Test 2: Multiple clients making requests randomly


## Stress Test 3: System has to make some processing base on all data it has and handle at least (1000 in short period of time)

In [60]:
def remove_data(df, user_id, product_id):
    condition = (col("UserId") == user_id) & (col("ProductId") == product_id)
    if df.filter(condition).count() > 0:
        df = df.filter(~condition)
        # print(f"Removed UserId: {user_id}, ProductId: {product_id}")
        return df
    else:
        print(f"No entry found for UserId: {user_id}, ProductId: {product_id}")

    return df

In [59]:
def task_3_add():
    for i in range(1000):
        add_data(df, [(i, f"Product_{i}", random.randint(1, 5))], ["UserId", "ProductId", "Rating"])
        
def task_3_remove():
    for i in range(1000):
        remove_data(df, [(i, f"Product_{i}", random.randint(1, 5))], ["UserId", "ProductId", "Rating"])

In [62]:
print("Running Stress Test 3: System processing a large load of data quickly")
start_time = time.time()
task_3_add()
print(f"Stress Test 3 completed in {time.time() - start_time:.2f} seconds")

Running Stress Test 3: System processing a large load of data quickly
Stress Test 3 completed in 9.81 seconds


In [63]:
print("Running Stress Test 3: System processing a large load of data quickly")
start_time = time.time()
task_3_remove()
print(f"Stress Test 3 completed in {time.time() - start_time:.2f} seconds")

Running Stress Test 3: System processing a large load of data quickly
Stress Test 3 completed in 9.45 seconds
