# AMAZON data science project

## Imports

In [1]:
from pyspark.sql import SparkSession

In [2]:
from utils import *

In [3]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType

## Data Preprocessing

1. Create a Spark Dataframe from the text file dataset. It does not have a clear and
easy to read structure. It is a good opportunity to get your hands dirty with Apache
Spark.
2. Create additional dataframes required to create a graph in GraphFrames.
3. Create the graph of how the goods were purchased together.

### 1. Create a Spark Dataframe from the text file dataset. It does not have a clear and easy to read structure. It is a good opportunity to get your hands dirty with Apache Spark.

In [4]:
spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("amazon") \
    .getOrCreate()

23/04/13 16:24:25 WARN Utils: Your hostname, toptish-ThinkPad-T470 resolves to a loopback address: 127.0.1.1; using 192.168.0.126 instead (on interface wlp4s0)
23/04/13 16:24:25 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/04/13 16:24:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
spark

In [6]:
data_path = 'data/amazon-meta.txt'

In [7]:
data = spark.sparkContext.textFile(data_path)

In [8]:
data.getNumPartitions()

30

Lets enumerate our entries by adding new column, where global cnt will be updated each time line with "Id:   " is encountered.

In [9]:
id_cnt = -1
def find_ids(x):
    global id_cnt
    if x.find('Id:   ') != -1:
        id_cnt = int(x.split('Id:   ')[1])
    return id_cnt

rdd_with_id = data.map(lambda line: (find_ids(line), line))


In [10]:
rdd_with_id.toDF().show(20)

[Stage 1:>                                                          (0 + 1) / 1]

23/04/13 16:24:44 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 1 (TID 1): Attempting to kill Python Worker
+---+--------------------+
| _1|                  _2|
+---+--------------------+
| -1|# Full informatio...|
| -1| Total items: 548552|
| -1|                    |
|  0|             Id:   0|
|  0|    ASIN: 0771044445|
|  0|  discontinued pr...|
|  0|                    |
|  1|             Id:   1|
|  1|    ASIN: 0827229534|
|  1|  title: Patterns...|
|  1|         group: Book|
|  1|   salesrank: 396585|
|  1|  similar: 5  080...|
|  1|       categories: 2|
|  1|   |Books[283155]...|
|  1|   |Books[283155]...|
|  1|  reviews: total:...|
|  1|    2000-7-28  cu...|
|  1|    2003-12-14  c...|
|  1|                    |
+---+--------------------+
only showing top 20 rows



                                                                                

Using "reduceByKey" RDD transformation we then group our lines

In [11]:
rdd_with_id_reduced = rdd_with_id.reduceByKey(lambda a, b: f"{a} \n {b}")

In [12]:
rdd_with_id_reduced.sortBy(lambda x: x[0]).toDF().show(6)

                                                                                

+---+--------------------+
| _1|                  _2|
+---+--------------------+
| -1|# Full informatio...|
|  0|Id:   0 \n ASIN: ...|
|  1|Id:   1 \n ASIN: ...|
|  2|Id:   2 \n ASIN: ...|
|  3|Id:   3 \n ASIN: ...|
|  4|Id:   4 \n ASIN: ...|
+---+--------------------+
only showing top 6 rows



Drop the first row with total entries info and id -1

In [13]:
header = rdd_with_id_reduced.sortBy(lambda x: x[0]).first()

                                                                                

In [14]:
rdd_no_first_row = rdd_with_id_reduced.filter(lambda line: line != header)

Split the second column with data to several columns

In [15]:
rdd_no_first_row.toDF().show()

+---+--------------------+
| _1|                  _2|
+---+--------------------+
|  0|Id:   0 \n ASIN: ...|
| 30|Id:   30 \n ASIN:...|
| 60|Id:   60 \n ASIN:...|
| 90|Id:   90 \n ASIN:...|
|120|Id:   120 \n ASIN...|
|150|Id:   150 \n ASIN...|
|180|Id:   180 \n ASIN...|
|210|Id:   210 \n ASIN...|
|240|Id:   240 \n ASIN...|
|270|Id:   270 \n ASIN...|
|300|Id:   300 \n ASIN...|
|330|Id:   330 \n ASIN...|
|360|Id:   360 \n ASIN...|
|390|Id:   390 \n ASIN...|
|420|Id:   420 \n ASIN...|
|450|Id:   450 \n ASIN...|
|480|Id:   480 \n ASIN...|
|510|Id:   510 \n ASIN...|
|540|Id:   540 \n ASIN...|
|570|Id:   570 \n ASIN...|
+---+--------------------+
only showing top 20 rows



In [None]:
# rdd_no_first_row = rdd_no_first_row.filter(lambda line: line != disc_prod)

In [17]:
rdd_filter = rdd_no_first_row.filter(lambda x: 'discontinued product' not in x)

In [18]:
rdd_with_id_reduced_form = rdd_filter.map(lambda x: item_to_dict(x[1]))

In [19]:
rdd_with_columns = rdd_with_id_reduced_form.map(lambda x: x[1].split('\n'))

In [20]:
schema = StructType(fields=[
    StructField("id", IntegerType()),
    StructField("asin", StringType()),
    StructField("title", StringType()),
    StructField("group", StringType()),
    StructField("salesrank", IntegerType()),
    StructField("similark", StringType()),
    StructField("categories", StringType()),
    StructField("reviews", StringType())])
deptDF = spark.createDataFrame(rdd_with_columns, schema=schema)
deptDF.printSchema()

deptDF.orderBy('id').show()



root
 |-- id: integer (nullable = true)
 |-- asin: string (nullable = true)
 |-- title: string (nullable = true)
 |-- group: string (nullable = true)
 |-- salesrank: integer (nullable = true)
 |-- similark: string (nullable = true)
 |-- categories: string (nullable = true)
 |-- reviews: string (nullable = true)

23/04/13 16:28:03 ERROR Executor: Exception in task 0.0 in stage 24.0 (TID 217)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/toptish/PycharmProjects/virtual_envs/amazon/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 686, in main
    process()
  File "/home/toptish/PycharmProjects/virtual_envs/amazon/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 678, in process
    serializer.dump_stream(out_iter, outfile)
  File "/home/toptish/PycharmProjects/virtual_envs/amazon/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 273, i

Traceback (most recent call last):
  File "/home/toptish/PycharmProjects/virtual_envs/amazon/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip/pyspark/daemon.py", line 187, in manager
  File "/home/toptish/PycharmProjects/virtual_envs/amazon/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip/pyspark/daemon.py", line 74, in worker
  File "/home/toptish/PycharmProjects/virtual_envs/amazon/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 730, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/home/toptish/PycharmProjects/virtual_envs/amazon/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 595, in read_int
    raise EOFError
EOFError


Py4JJavaError: An error occurred while calling o212.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 24.0 failed 1 times, most recent failure: Lost task 0.0 in stage 24.0 (TID 217) (192.168.0.126 executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/toptish/PycharmProjects/virtual_envs/amazon/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 686, in main
    process()
  File "/home/toptish/PycharmProjects/virtual_envs/amazon/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 678, in process
    serializer.dump_stream(out_iter, outfile)
  File "/home/toptish/PycharmProjects/virtual_envs/amazon/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 273, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/home/toptish/PycharmProjects/virtual_envs/amazon/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip/pyspark/util.py", line 81, in wrapper
    return f(*args, **kwargs)
  File "/tmp/ipykernel_7574/2098849403.py", line 1, in <lambda>
  File "/home/toptish/42/amazon/utils.py", line 25, in item_to_dict
    categories = item.split('categories: ')[1].split('  reviews:')[0].split('\n')[1:]
IndexError: list index out of range

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:552)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:758)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:740)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:505)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:32)
	at org.sparkproject.guava.collect.Ordering.leastOf(Ordering.java:628)
	at org.apache.spark.util.collection.Utils$.takeOrdered(Utils.scala:37)
	at org.apache.spark.rdd.RDD.$anonfun$takeOrdered$2(RDD.scala:1539)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:855)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:855)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2238)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2333)
	at org.apache.spark.rdd.RDD.$anonfun$reduce$1(RDD.scala:1111)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
	at org.apache.spark.rdd.RDD.reduce(RDD.scala:1093)
	at org.apache.spark.rdd.RDD.$anonfun$takeOrdered$1(RDD.scala:1545)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
	at org.apache.spark.rdd.RDD.takeOrdered(RDD.scala:1533)
	at org.apache.spark.sql.execution.TakeOrderedAndProjectExec.executeCollect(limit.scala:204)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3868)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2863)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:3858)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3856)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3856)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2863)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:3084)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:288)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:327)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/toptish/PycharmProjects/virtual_envs/amazon/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 686, in main
    process()
  File "/home/toptish/PycharmProjects/virtual_envs/amazon/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 678, in process
    serializer.dump_stream(out_iter, outfile)
  File "/home/toptish/PycharmProjects/virtual_envs/amazon/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 273, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/home/toptish/PycharmProjects/virtual_envs/amazon/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip/pyspark/util.py", line 81, in wrapper
    return f(*args, **kwargs)
  File "/tmp/ipykernel_7574/2098849403.py", line 1, in <lambda>
  File "/home/toptish/42/amazon/utils.py", line 25, in item_to_dict
    categories = item.split('categories: ')[1].split('  reviews:')[0].split('\n')[1:]
IndexError: list index out of range

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:552)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:758)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:740)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:505)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:32)
	at org.sparkproject.guava.collect.Ordering.leastOf(Ordering.java:628)
	at org.apache.spark.util.collection.Utils$.takeOrdered(Utils.scala:37)
	at org.apache.spark.rdd.RDD.$anonfun$takeOrdered$2(RDD.scala:1539)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:855)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:855)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more


In [None]:
rdd_with_columns.toDF().show()

### 2. Create additional dataframes required to create a graph in GraphFrames.

### 3. Create the graph of how the goods were purchased together.

## Dscriptive Analysis

1. Calculate number of books, music and sport items (Group) in the dataset.
2. Calculate maximum number of out-edges for a vertice.
3. Calculate maximum number of in-edges for a vertice and the title corresponding to
that item.
4. Calculate the fraction of connections that are reciprocal (from A to B and vice
versa). In 7 decimal places.
5. Find all the triangles in the graph. Find the item (the title) that is included in the
biggest number of triangles.
6. Find the most important item (the title) that has the biggest pagerank. It has links
from other important items. Probability of resetting to a random vertex equals 0.1
and set maximum number of iterations to 10.

## Bundles and Collections

1. Create bundles of 3 goods that are related to each other as follows: A is a recom-
mendation for B, C is a recommendation for B, A and C are recommendations for
each other. Calculate the number of those bundles
2. Create the same kind of bundles but only for the music group. Calculate the number
of these bundles.
3. Create collections in the DVD group – items that are connected to each other not
obligatory directly but through a chain of other items. It means that there is a
path between them. The path should be reciprocal. Find the number of elements
in the largest collection.

## Graph Visualization

1. Visualize the graph of DVDs using any graph visualization tool or library that you
find useful.
2. The requirements are:
- (a) the vertices and edges should be distinguishable by zooming-in and -out,
- (b) the color of vertices and edges should be different
- (c) it should be easy to find the vertices with the largest in-degree
- (d) use at least 3 different layouts.

## New Recommender System

1. Propose and realize 3 different ideas of how to recommend 5 items for each item in
the dataset.
2. We are not going to assess your recommendations. There is no hidden data since
there is no ground truth. We could test how close your 5 items to the 5 items used
in Amazon, but who said that your recommendations are worse. The subtask is to
describe the way you would test your recommendations in the real world.

## Bonus part

1. Find 3 more ways of how to make 5 recommendations using graph and network
measures and metrics. Save them into 3 different files.
2. Create an interactive web-page based on your graph visualization project.

## Useful links

- https://habr.com/ru/company/piter/blog/276675/
- https://towardsdatascience.com/building-a-recommender-system-for-amazon-products-with-python-8e0010ec772c