##########       Analyzing Amazon product review data set  using dataframe    #######################

In [21]:
import os
import sys
# os.environ["PYSPARK_PYTHON"] = "/opt/cloudera/parcels/Anaconda/bin/python"
# os.environ["JAVA_HOME"] = "/usr/java/jdk1.8.0_161/jre"
# os.environ["SPARK_HOME"] = "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + r"\python\lib"
sys.path.insert(0, os.environ["PYLIB"] +r"\py4j-0.10.9-src.zip")
sys.path.insert(0, os.environ["PYLIB"] +r"\pyspark.zip")

In [None]:
from pyspark.sql.functions import *

#  Customs User defined functions (UDFs) need to be register
from pyspark.sql.types import *

In [22]:
from pyspark.sql import SparkSession

In [23]:
spark = SparkSession.builder.appName("AmazonReviewDemo").getOrCreate()

In [24]:
spark

In [7]:
# Reading data from S3 to a DataFrame
# reviews_df = spark.read.json('s3a://sparkbucket123/Electronics_5.json')
reviews_df = spark.read.json(r'F:\Users\SunilYadav\Desktop\Electronics_5.json')

In [8]:
reviews_df.show(5)

+----------+--------+-------+--------------------+-----------+--------------+--------------------+--------------------+--------------+
|      asin| helpful|overall|          reviewText| reviewTime|    reviewerID|        reviewerName|             summary|unixReviewTime|
+----------+--------+-------+--------------------+-----------+--------------+--------------------+--------------------+--------------+
|0528881469|  [0, 0]|    5.0|We got this GPS f...| 06 2, 2013| AO94DHGC771SJ|             amazdnu|     Gotta have GPS!|    1370131200|
|0528881469|[12, 15]|    1.0|I'm a professiona...|11 25, 2010| AMO214LNFCEI4|     Amazon Customer|   Very Disappointed|    1290643200|
|0528881469|[43, 45]|    3.0|Well, what can I ...| 09 9, 2010|A3N7T0DY83Y4IG|       C. A. Freeman|      1st impression|    1283990400|
|0528881469| [9, 10]|    2.0|Not going to writ...|11 24, 2010|A1H8PY3QHMQQA0|Dave M. Shaw "mac...|Great grafics, PO...|    1290556800|
|0528881469|  [0, 0]|    1.0|I've had mine for...|09 29

In [9]:
reviews_df.printSchema()

root
 |-- asin: string (nullable = true)
 |-- helpful: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- overall: double (nullable = true)
 |-- reviewText: string (nullable = true)
 |-- reviewTime: string (nullable = true)
 |-- reviewerID: string (nullable = true)
 |-- reviewerName: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- unixReviewTime: long (nullable = true)



In [10]:
# Finding the number of individual reviewers
distinct_reviewers = reviews_df.select("reviewerID").distinct()
distinct_reviewers.count()

192403

In [11]:
# Finding the number of product counts
reviews_df.select("asin").distinct().count()


63001

In [14]:
# Find the products with reviews
reviews_df.select("asin").groupBy("asin").count().orderBy("count" , ascending =False)

DataFrame[asin: string, count: bigint]

In [15]:
# Find the number of reviews given by each user
reviews_df.select("reviewerID").groupBy("reviewerID").count().orderBy("count" , ascending =False).show()

+--------------+-----+
|    reviewerID|count|
+--------------+-----+
|  ADLVFFE4VBT8|  431|
|A3OXHLG6DIBRW8|  407|
|  A6FIAB28IS79|  367|
| A680RUE1FDO8B|  352|
| A5JLAU2ARJ0BO|  351|
|A1ODOGXEYECQQ8|  333|
|A36K2N527TXXJN|  281|
| ARBKYIVNYWK3C|  267|
|A25C2M3QF9G7OQ|  261|
| AWPODHOB4GFWL|  260|
|A22CW0ZHY3NJH8|  255|
|A3EXWV8FNSSFL6|  250|
|A3LGT6UZL99IW1|  245|
| A38RMU1Y5TDP9|  244|
|A2NOW4U7W3F7RI|  241|
|A23GFTVIETX7DS|  241|
|A3AYSYSLHU26U9|  231|
|A17BUUBOU0598B|  228|
|A2AY4YUOX2N1BQ|  228|
|A1UQBFCERIP7VJ|  228|
+--------------+-----+
only showing top 20 rows



In [16]:
# Number of user reviews every month sorted in decreasing order of reviews count

from pyspark.sql.functions import *
reviews_df1= reviews_df.withColumn("ts" , reviews_df['unixReviewTime'].cast('timestamp'))
reviews_df1.printSchema()


root
 |-- asin: string (nullable = true)
 |-- helpful: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- overall: double (nullable = true)
 |-- reviewText: string (nullable = true)
 |-- reviewTime: string (nullable = true)
 |-- reviewerID: string (nullable = true)
 |-- reviewerName: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- unixReviewTime: long (nullable = true)
 |-- ts: timestamp (nullable = true)



In [17]:
reviews_df_with_year= reviews_df1.withColumn("year" , year("ts")).withColumn("month" , month("ts"))
reviews_df_with_year.groupBy("year", "month").count().orderBy('count', ascending = False).show()

+----+-----+-----+
|year|month|count|
+----+-----+-----+
|2014|    1|65915|
|2013|   12|65530|
|2013|    1|65333|
|2012|   12|60186|
|2014|    3|54776|
|2014|    2|52419|
|2013|    3|51386|
|2013|    2|50421|
|2013|   11|50328|
|2013|    4|47359|
|2014|    4|46877|
|2013|    8|46266|
|2013|    5|45778|
|2013|    7|45420|
|2014|    5|44380|
|2013|   10|43534|
|2014|    6|43467|
|2013|    6|41951|
|2013|    9|39442|
|2014|    7|33354|
+----+-----+-----+
only showing top 20 rows



In [None]:
# Find how many times each prodcut has been reviewed great

In [18]:
great_reviews = reviews_df.filter(reviews_df.reviewText.contains("great"))
great_reviews.groupBy("asin").count().orderBy('count', ascending = False).show()

+----------+-----+
|      asin|count|
+----------+-----+
|B007WTAJTO| 1096|
|B003ES5ZUU| 1013|
|B00DR0PDNE| 1012|
|B0019EHU8G|  977|
|B003ELYQGG|  832|
|B0074BW614|  712|
|B002WE6D44|  621|
|B0002L5R78|  616|
|B00BGGDVOO|  604|
|B009SYZ8OC|  558|
|B00007E7JU|  542|
|B005DKZTMG|  536|
|B002MAPRYU|  464|
|B0027VT6V4|  440|
|B005FYNSPK|  418|
|B005HMKKH4|  413|
|B002V88HFE|  406|
|B0052SCU8U|  395|
|B000LRMS66|  392|
|B0052YFYFK|  387|
+----------+-----+
only showing top 20 rows



In [27]:
lst = list(range(100))
sc = spark.sparkContext

In [28]:
rdd_lst = sc.parallelize(lst)
type(rdd_lst)

pyspark.rdd.RDD

In [42]:
rdd_lst.collect()

[0,
 1,
 2,
 3,
 4,
 5,
 6,
 7,
 8,
 9,
 10,
 11,
 12,
 13,
 14,
 15,
 16,
 17,
 18,
 19,
 20,
 21,
 22,
 23,
 24,
 25,
 26,
 27,
 28,
 29,
 30,
 31,
 32,
 33,
 34,
 35,
 36,
 37,
 38,
 39,
 40,
 41,
 42,
 43,
 44,
 45,
 46,
 47,
 48,
 49,
 50,
 51,
 52,
 53,
 54,
 55,
 56,
 57,
 58,
 59,
 60,
 61,
 62,
 63,
 64,
 65,
 66,
 67,
 68,
 69,
 70,
 71,
 72,
 73,
 74,
 75,
 76,
 77,
 78,
 79,
 80,
 81,
 82,
 83,
 84,
 85,
 86,
 87,
 88,
 89,
 90,
 91,
 92,
 93,
 94,
 95,
 96,
 97,
 98,
 99]

In [36]:
lst_df = rdd_lst.toDF("name:int")
type(lst_df)

pyspark.sql.dataframe.DataFrame

In [44]:
lst_df.show()


Py4JJavaError: An error occurred while calling o302.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 19.0 failed 1 times, most recent failure: Lost task 0.0 in stage 19.0 (TID 1102, DevD.mshome.net, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\Spark\python\lib\pyspark.zip\pyspark\worker.py", line 605, in main
  File "C:\Spark\python\lib\pyspark.zip\pyspark\worker.py", line 597, in process
  File "C:\Spark\python\lib\pyspark.zip\pyspark\serializers.py", line 271, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "C:\Spark\python\lib\pyspark.zip\pyspark\util.py", line 107, in wrapper
    return f(*args, **kwargs)
  File "C:\Spark\python\lib\pyspark.zip\pyspark\sql\session.py", line 609, in prepare
  File "C:\Spark\python\lib\pyspark.zip\pyspark\sql\types.py", line 1408, in verify
  File "C:\Spark\python\lib\pyspark.zip\pyspark\sql\types.py", line 1395, in verify_struct
TypeError: StructType can not accept object 0 in type <class 'int'>

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:503)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:638)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:621)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:340)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:872)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:872)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2023)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:1972)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:1971)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1971)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:950)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:950)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:950)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2203)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2152)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2141)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:752)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2093)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2114)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2133)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:467)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:420)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3625)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2695)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3616)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3614)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2695)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2902)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:300)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:337)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\Spark\python\lib\pyspark.zip\pyspark\worker.py", line 605, in main
  File "C:\Spark\python\lib\pyspark.zip\pyspark\worker.py", line 597, in process
  File "C:\Spark\python\lib\pyspark.zip\pyspark\serializers.py", line 271, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "C:\Spark\python\lib\pyspark.zip\pyspark\util.py", line 107, in wrapper
    return f(*args, **kwargs)
  File "C:\Spark\python\lib\pyspark.zip\pyspark\sql\session.py", line 609, in prepare
  File "C:\Spark\python\lib\pyspark.zip\pyspark\sql\types.py", line 1408, in verify
  File "C:\Spark\python\lib\pyspark.zip\pyspark\sql\types.py", line 1395, in verify_struct
TypeError: StructType can not accept object 0 in type <class 'int'>

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:503)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:638)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:621)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:340)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:872)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:872)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
