# I. Download a review file with a million reviews & read the million reviews

In [271]:
import findspark
findspark.init()
import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql import *

spark = SparkSession.builder.getOrCreate()

#downloaded 'Electronics_5.json' having 1.6 million reviews and reading them

df = spark.read.json(r'C:\Users\Xpro\Downloads\Electronics_5.json')
df.printSchema()
df.show(5)

root
 |-- asin: string (nullable = true)
 |-- helpful: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- overall: double (nullable = true)
 |-- reviewText: string (nullable = true)
 |-- reviewTime: string (nullable = true)
 |-- reviewerID: string (nullable = true)
 |-- reviewerName: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- unixReviewTime: long (nullable = true)

+----------+--------+-------+--------------------+-----------+--------------+--------------------+--------------------+--------------+
|      asin| helpful|overall|          reviewText| reviewTime|    reviewerID|        reviewerName|             summary|unixReviewTime|
+----------+--------+-------+--------------------+-----------+--------------+--------------------+--------------------+--------------+
|0528881469|  [0, 0]|    5.0|We got this GPS f...| 06 2, 2013| AO94DHGC771SJ|             amazdnu|     Gotta have GPS!|    1370131200|
|0528881469|[12, 15]|    1.0|I'm a profess

# II. Item having the least rating

In [42]:
df.agg({'overall': 'min', 'asin': 'min','reviewerID':'min'}).show()

+--------------------+------------+----------+
|     min(reviewerID)|min(overall)| min(asin)|
+--------------------+------------+----------+
|A000715434M800HLC...|         1.0|0528881469|
+--------------------+------------+----------+



# III.a: Item having the most rating (Method 1)

In [249]:
df = df.withColumn('new_col',struct(df.overall,df.asin))

max_df = df.agg(max('new_col').alias('max_rating'))
max_df = max_df.withColumn('asin',max_df.max_rating.asin)
max_df.show()


+----------------+----------+
|      max_rating|      asin|
+----------------+----------+
|[5.0,B00LGQ6HL8]|B00LGQ6HL8|
+----------------+----------+



# III.b: Item having the most rating (Method 2)

In [36]:
df.agg({'overall': 'max', 'asin': 'max','reviewerID':'max'}).show()

+---------------+------------+----------+
|max(reviewerID)|max(overall)| max(asin)|
+---------------+------------+----------+
|  AZZYW4YOE1B6E|         5.0|B00LGQ6HL8|
+---------------+------------+----------+



# IV. Item having the longest review

In [262]:
df_length = df.select('*',length('reviewText').alias('word count'))
df_length=df_length.agg({'word count':'max','overall': 'max', 'asin': 'max','reviewerID':'max', 'reviewText': 'max'})
df_length=df_length.withColumnRenamed("max(reviewerID)","reviewerID").withColumnRenamed("max(asin)","asin").withColumnRenamed("max(word count)","wordcount").withColumnRenamed("max(overall)","overall").withColumnRenamed("max(reviewText)","reviewText")
df_length.show()



+-------------+----------+--------------------+---------+-------+
|   reviewerID|      asin|          reviewText|wordcount|overall|
+-------------+----------+--------------------+---------+-------+
|AZZYW4YOE1B6E|B00LGQ6HL8|~~~~~~~~~~~~~~~~~...|    32703|    5.0|
+-------------+----------+--------------------+---------+-------+



# V. Transform: change the date MM-DD-YYYY format

In [264]:
df = df.withColumn('date', regexp_replace('reviewTime', ', ', '-'))\
        .withColumn('date', regexp_replace('date', ' ', '-'))

df= df.withColumn('date_in_dateFormat', to_date(unix_timestamp(col('date'), 'MM-dd-yyyy').cast("timestamp")))
df.select('date','date_in_dateFormat').show(5)

+----------+------------------+
|      date|date_in_dateFormat|
+----------+------------------+
| 06-2-2013|        2013-06-02|
|11-25-2010|        2010-11-25|
| 09-9-2010|        2010-09-09|
|11-24-2010|        2010-11-24|
|09-29-2011|        2011-09-29|
+----------+------------------+
only showing top 5 rows



# VI. Show a desired data frame operation which you learnt recently

In [294]:
from pyspark.sql.types import StringType

#Here's a simple example of UDF used to grade electronics based on their overall rating

determine_grade_udf = udf(lambda x: 'good' if x>=4.0 else 'bad',StringType())

df_with_grade = df['overall','asin']
df_with_grade = df_with_grade.withColumn("Grade",determine_grade_udf(df.overall))

df_with_grade.show(5)

+-------+----------+-----+
|overall|      asin|Grade|
+-------+----------+-----+
|    5.0|0528881469| good|
|    1.0|0528881469|  bad|
|    3.0|0528881469|  bad|
|    2.0|0528881469|  bad|
|    1.0|0528881469|  bad|
+-------+----------+-----+
only showing top 5 rows



# VII. Convert the whole file into Parquet file after transforming.

In [304]:
#Saving file in parquet format
df.write.parquet(r"C:\Users\Xpro\Documents\TensorDF.parquet")

#checking the files created
import os
path = r"C:\\Users\\Xpro\\Documents\\TensorDF.parquet"

for i in os.listdir(path):
    print(i)

.part-00000-10358a75-7ba1-4219-8fab-f3047d0db5d5-c000.snappy.parquet.crc
.part-00001-10358a75-7ba1-4219-8fab-f3047d0db5d5-c000.snappy.parquet.crc
.part-00002-10358a75-7ba1-4219-8fab-f3047d0db5d5-c000.snappy.parquet.crc
.part-00003-10358a75-7ba1-4219-8fab-f3047d0db5d5-c000.snappy.parquet.crc
.part-00004-10358a75-7ba1-4219-8fab-f3047d0db5d5-c000.snappy.parquet.crc
.part-00005-10358a75-7ba1-4219-8fab-f3047d0db5d5-c000.snappy.parquet.crc
.part-00006-10358a75-7ba1-4219-8fab-f3047d0db5d5-c000.snappy.parquet.crc
.part-00007-10358a75-7ba1-4219-8fab-f3047d0db5d5-c000.snappy.parquet.crc
.part-00008-10358a75-7ba1-4219-8fab-f3047d0db5d5-c000.snappy.parquet.crc
.part-00009-10358a75-7ba1-4219-8fab-f3047d0db5d5-c000.snappy.parquet.crc
.part-00010-10358a75-7ba1-4219-8fab-f3047d0db5d5-c000.snappy.parquet.crc
.part-00011-10358a75-7ba1-4219-8fab-f3047d0db5d5-c000.snappy.parquet.crc
._SUCCESS.crc
part-00000-10358a75-7ba1-4219-8fab-f3047d0db5d5-c000.snappy.parquet
part-00001-10358a75-7ba1-4219-8fab-f3047d0