In [72]:
import findspark
findspark.init()

# Explore Spark configuration options

In [73]:
import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession

EXE_MEMORY = "2g"
DRIVER_MEMORY = "10g"
spark = SparkSession.builder.appName("AWS").config("spark.executor.memory", EXE_MEMORY).config("spark.executor.cores", "3").config("spark.driver.memory", DRIVER_MEMORY).config("spark.cores.max", "10").getOrCreate()

# Reading the data 

# Read data as RDD

In [76]:
#Read data as RDD
sc = spark.sparkContext
rdd = sc.textFile('E:/sakshi/aws/Cell_Phones_and_Accessories_5.json')

In [77]:
rdd.take(1)

['{"overall": 5.0, "verified": true, "reviewTime": "08 4, 2014", "reviewerID": "A24E3SXTC62LJI", "asin": "7508492919", "style": {"Color:": " Bling"}, "reviewerName": "Claudia Valdivia", "reviewText": "Looks even better in person. Be careful to not drop your phone so often because the rhinestones will fall off (duh). More of a decorative case than it is protective, but I will say that it fits perfectly and securely on my phone. Overall, very pleased with this purchase.", "summary": "Can\'t stop won\'t stop looking at it", "unixReviewTime": 1407110400}']

In [78]:
type(rdd)

pyspark.rdd.RDD

# Read data as Dataframe

In [79]:
#Read a json file into Spark DataFrame

df=spark.read.csv('E:/sakshi/aws/Cell_Phones_and_Accessories_5.json')

In [80]:
df.head()

Row(_c0='{"overall": 5.0', _c1=' "verified": true', _c2=' "reviewTime": "08 4', _c3=' 2014"', _c4=' "reviewerID": "A24E3SXTC62LJI"', _c5=' "asin": "7508492919"', _c6=' "style": {"Color:": " Bling"}', _c7=' "reviewerName": "Claudia Valdivia"', _c8=' "reviewText": "Looks even better in person. Be careful to not drop your phone so often because the rhinestones will fall off (duh). More of a decorative case than it is protective', _c9=' but I will say that it fits perfectly and securely on my phone. Overall', _c10=' very pleased with this purchase."', _c11=' "summary": "Can\'t stop won\'t stop looking at it"', _c12=' "unixReviewTime": 1407110400}')

In [81]:
type(df)

pyspark.sql.dataframe.DataFrame

# Convert rdd to spark dataframe

In [82]:
#Convert rdd to spark dataframe

sdf=rdd.map(lambda x: (x,)).toDF()

In [83]:
sdf.show(3,truncate=False)

+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|_1                                                                                                                                                                                                                                                                                                                                                                                                                                                          

In [84]:
type(sdf)

pyspark.sql.dataframe.DataFrame

# Convert Spark DataFrame to RDD

In [85]:
rdd2=df.rdd

In [86]:
rdd2.take(3)

[Row(_c0='{"overall": 5.0', _c1=' "verified": true', _c2=' "reviewTime": "08 4', _c3=' 2014"', _c4=' "reviewerID": "A24E3SXTC62LJI"', _c5=' "asin": "7508492919"', _c6=' "style": {"Color:": " Bling"}', _c7=' "reviewerName": "Claudia Valdivia"', _c8=' "reviewText": "Looks even better in person. Be careful to not drop your phone so often because the rhinestones will fall off (duh). More of a decorative case than it is protective', _c9=' but I will say that it fits perfectly and securely on my phone. Overall', _c10=' very pleased with this purchase."', _c11=' "summary": "Can\'t stop won\'t stop looking at it"', _c12=' "unixReviewTime": 1407110400}'),
 Row(_c0='{"overall": 5.0', _c1=' "verified": true', _c2=' "reviewTime": "02 12', _c3=' 2014"', _c4=' "reviewerID": "A269FLZCB4GIPV"', _c5=' "asin": "7508492919"', _c6=' "reviewerName": "sarah ponce"', _c7=' "reviewText": "When you don\'t want to spend a whole lot of cash but want a great deal...this is the shop to buy from!"', _c8=' "summary"

In [88]:
type(rdd2)

pyspark.rdd.RDD

# Convert Spark DataFrame to Pandas DataFrame

In [89]:
#Pandas Dataframe from Pyspark Dataframe
pandasdf=df.toPandas()

In [90]:
pandasdf.head(2)

Unnamed: 0,_c0,_c1,_c2,_c3,_c4,_c5,_c6,_c7,_c8,_c9,_c10,_c11,_c12
0,"{""overall"": 5.0","""verified"": true","""reviewTime"": ""08 4","2014""","""reviewerID"": ""A24E3SXTC62LJI""","""asin"": ""7508492919""","""style"": {""Color:"": "" Bling""}","""reviewerName"": ""Claudia Valdivia""","""reviewText"": ""Looks even better in person. B...",but I will say that it fits perfectly and sec...,"very pleased with this purchase.""","""summary"": ""Can't stop won't stop looking at it""","""unixReviewTime"": 1407110400}"
1,"{""overall"": 5.0","""verified"": true","""reviewTime"": ""02 12","2014""","""reviewerID"": ""A269FLZCB4GIPV""","""asin"": ""7508492919""","""reviewerName"": ""sarah ponce""","""reviewText"": ""When you don't want to spend a...","""summary"": ""1""","""unixReviewTime"": 1392163200}",,,


# Performing Tasks

In [91]:

## Reading the data(Json file)

json_rdd = spark.read.json('E:/sakshi/aws/Cell_Phones_and_Accessories_5.json')

# Task 1 :Select first 10 rows of dataset.

In [92]:
#print 10 rows of dataset
json_rdd.show(10)

+----------+-----+-------+--------------------+-----------+--------------+--------------------+--------------------+--------------------+--------------+--------+----+
|      asin|image|overall|          reviewText| reviewTime|    reviewerID|        reviewerName|               style|             summary|unixReviewTime|verified|vote|
+----------+-----+-------+--------------------+-----------+--------------+--------------------+--------------------+--------------------+--------------+--------+----+
|7508492919| null|    5.0|Looks even better...| 08 4, 2014|A24E3SXTC62LJI|    Claudia Valdivia|[,  Bling,,,,,,,,...|Can't stop won't ...|    1407110400|    true|null|
|7508492919| null|    5.0|When you don't wa...|02 12, 2014|A269FLZCB4GIPV|         sarah ponce|                null|                   1|    1392163200|    true|null|
|7508492919| null|    3.0|so the case came ...| 02 8, 2014| AB6CHQWHZW4TV|                 Kai|                null|            Its okay|    1391817600|    true|null

# Task 2 :Get Schema of Dataset

In [93]:
#Printing Schema of dataset
json_rdd.printSchema()

root
 |-- asin: string (nullable = true)
 |-- image: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- overall: double (nullable = true)
 |-- reviewText: string (nullable = true)
 |-- reviewTime: string (nullable = true)
 |-- reviewerID: string (nullable = true)
 |-- reviewerName: string (nullable = true)
 |-- style: struct (nullable = true)
 |    |-- Color Name:: string (nullable = true)
 |    |-- Color:: string (nullable = true)
 |    |-- Design:: string (nullable = true)
 |    |-- Edition:: string (nullable = true)
 |    |-- Format:: string (nullable = true)
 |    |-- Item Package Quantity:: string (nullable = true)
 |    |-- Length:: string (nullable = true)
 |    |-- Material Type:: string (nullable = true)
 |    |-- Model:: string (nullable = true)
 |    |-- Offer Type:: string (nullable = true)
 |    |-- Package Quantity:: string (nullable = true)
 |    |-- Package Type:: string (nullable = true)
 |    |-- Pattern:: string (nullable = true)
 |    |-- P

In [94]:
#Only Selected five Column from dataset
selective_aws_df = json_rdd.select('overall', 'reviewText','reviewerID','reviewerName','reviewTime')

In [95]:
#Selected Column print
selective_aws_df.show(10)

+-------+--------------------+--------------+--------------------+-----------+
|overall|          reviewText|    reviewerID|        reviewerName| reviewTime|
+-------+--------------------+--------------+--------------------+-----------+
|    5.0|Looks even better...|A24E3SXTC62LJI|    Claudia Valdivia| 08 4, 2014|
|    5.0|When you don't wa...|A269FLZCB4GIPV|         sarah ponce|02 12, 2014|
|    3.0|so the case came ...| AB6CHQWHZW4TV|                 Kai| 02 8, 2014|
|    2.0|DON'T CARE FOR IT...| A1M117A53LEI8|     Sharon Williams| 02 4, 2014|
|    4.0|I liked it becaus...|A272DUT8M88ZS8|     Bella Rodriguez| 02 3, 2014|
|    2.0|The product looke...|A1DW2L6XCC5TJS|     Amazon Customer|01 27, 2014|
|    3.0|I FINALLY got my ...| AQC61R4UST7UH|        DaMara Estep|01 23, 2014|
|    5.0|It is a very cute...|A31OVFL91BCKXG|Ashley Nicole Miller|01 17, 2014|
|    1.0|DO NOT BUY! this ...|A1K0VLK6O5Z22M|           BeeLove21|12 27, 2013|
|    4.0|I really love thi...|A1K3BWU73YB44P|       

In [96]:
selective_aws_df.createOrReplaceTempView("selective_aws_df")

#  Selecting 10 rows with sql query.

In [97]:
Selecting=spark.sql("Select * from selective_aws_df LIMIT 10 ").show()

+-------+--------------------+--------------+--------------------+-----------+
|overall|          reviewText|    reviewerID|        reviewerName| reviewTime|
+-------+--------------------+--------------+--------------------+-----------+
|    5.0|Looks even better...|A24E3SXTC62LJI|    Claudia Valdivia| 08 4, 2014|
|    5.0|When you don't wa...|A269FLZCB4GIPV|         sarah ponce|02 12, 2014|
|    3.0|so the case came ...| AB6CHQWHZW4TV|                 Kai| 02 8, 2014|
|    2.0|DON'T CARE FOR IT...| A1M117A53LEI8|     Sharon Williams| 02 4, 2014|
|    4.0|I liked it becaus...|A272DUT8M88ZS8|     Bella Rodriguez| 02 3, 2014|
|    2.0|The product looke...|A1DW2L6XCC5TJS|     Amazon Customer|01 27, 2014|
|    3.0|I FINALLY got my ...| AQC61R4UST7UH|        DaMara Estep|01 23, 2014|
|    5.0|It is a very cute...|A31OVFL91BCKXG|Ashley Nicole Miller|01 17, 2014|
|    1.0|DO NOT BUY! this ...|A1K0VLK6O5Z22M|           BeeLove21|12 27, 2013|
|    4.0|I really love thi...|A1K3BWU73YB44P|       

# Task 3 :Group by and get max, min, count of a column in the dataset

In [98]:
#Count and groupby

In [99]:
review_sql_df = spark.sql("select overall , count(overall) from selective_aws_df group by overall")
review_sql_df.show()

+-------+--------------+
|overall|count(overall)|
+-------+--------------+
|    1.0|         81539|
|    4.0|        184431|
|    3.0|         98254|
|    2.0|         57175|
|    5.0|        707038|
+-------+--------------+



In [100]:
# Max function

In [101]:
max = spark.sql("SELECT MAX(overall) as maxoverall from selective_aws_df")

In [102]:
max.show()

+----------+
|maxoverall|
+----------+
|       5.0|
+----------+



In [103]:
# Min function

In [104]:
min=spark.sql("SELECT MIN(overall) as minoverall from selective_aws_df")

In [105]:
min.show()

+----------+
|minoverall|
+----------+
|       1.0|
+----------+



In [106]:
# Count of daatset attribute

In [107]:
rname=spark.sql("SELECT count(*) from selective_aws_df where overall= 5.0 ")
rname.show()

+--------+
|count(1)|
+--------+
|  707038|
+--------+



# Task:4 Filter your dataset by some conditions based on your column

In [108]:
filetrquery =spark.sql("SELECT * from selective_aws_df where overall= 5.0 and reviewText RLIKE  '^Looks'").show()

+-------+--------------------+--------------+-------------------+-----------+
|overall|          reviewText|    reviewerID|       reviewerName| reviewTime|
+-------+--------------------+--------------+-------------------+-----------+
|    5.0|Looks even better...|A24E3SXTC62LJI|   Claudia Valdivia| 08 4, 2014|
|    5.0|Looks just like t...| ADP0AGC42GW3F|             Jimmie|09 17, 2013|
|    5.0|          Looks good|A24X7XXLSY68DB|              Jason|04 11, 2015|
|    5.0|Looks just like t...| AFGS7LT2TC573|            Brian J|12 28, 2014|
|    5.0|Looks amazing on ...|A3AZLDHN6JF8YW|           Jason He|01 24, 2011|
|    5.0|Looks well made &...|A1RBGNDI62AVP5|              hchou|03 23, 2015|
|    5.0|Looks like origin...|A3SD8POS0G85C6|            jassper|03 16, 2013|
|    5.0|Looks great, fits...|A2POXPD0VMLUNK|         Video Andy| 12 7, 2012|
|    5.0|Looks like there ...|A2OV0EKZK35VC1|Jorge&#039;s Amazon|05 15, 2014|
|    5.0|Looks bright, sof...|A26YO235AO6Q2Z|              Hutch

# Task 5 : Apply group by with having clause.

In [109]:
Having = spark.sql("SELECT overall from selective_aws_df GROUP BY overall HAVING overall>2 ").show()

+-------+
|overall|
+-------+
|    4.0|
|    3.0|
|    5.0|
+-------+



# Task 6 : Apply order by

In [110]:
order=spark.sql("SELECT reviewerID from selective_aws_df  ORDER BY overall ASC ").show()

+--------------+
|    reviewerID|
+--------------+
|A31ZZRB12R3PGY|
|A1X6CQMDP6ZNVD|
| AMB8C7C3HEEIY|
|A1ROZSS0KIE0CC|
| AV9OETMS2X9TT|
|A3U2VAZBCBU7K6|
|A2H566J34LUPE3|
|A17E8FKH09XAY2|
| ABGQ3LNDE1VM8|
| ABVQ1EZPPHZ5F|
|A1W7T37NSUFURN|
| AZW6WE7UXAMU0|
| A4J4N4WFC64I4|
|A1LFNDW0RIPTDP|
|A3ODQQDGMRX3UV|
|A119QJ6TQ922RH|
|A18ELSI3X0Y2B8|
|A317T1S6EM6G23|
|A1A2GP9SM4GCPD|
|A3GGJ6C884GU4M|
+--------------+
only showing top 20 rows



# Task 7 :Select distinct records by a column.

In [111]:
distinct=spark.sql("SELECT distinct reviewText from selective_aws_df").show()

+--------------------+
|          reviewText|
+--------------------+
|Not really :).  I...|
|This pad is a foa...|
|Works well but ma...|
|I have loved the ...|
|The positive revi...|
|Love the stand up...|
|The case is easy ...|
|I am very happy w...|
|Or maybe not.  Th...|
|This case is made...|
|Traveling to Euro...|
|Works as expected...|
|The case it's a l...|
|Worked as adverti...|
|Bought this for s...|
|  Didnt work at all,|
|Heavy Duty Extra ...|
|I bought this cas...|
|Well, it works fi...|
|This is good prod...|
+--------------------+
only showing top 20 rows



# Task 8 : Transform a categorical column with OneHot Encoding

In [112]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder 
from pyspark.ml import Pipeline

In [113]:
indexer = StringIndexer(inputCol='overall', outputCol='overallIndex')
encoder = OneHotEncoder(inputCol='overallIndex', outputCol='overallVec')

pipe = Pipeline(stages = [indexer, encoder])

newDF = pipe.fit(selective_aws_df).transform(selective_aws_df)

In [114]:
newDF.show()

+-------+--------------------+--------------+--------------------+-----------+------------+-------------+
|overall|          reviewText|    reviewerID|        reviewerName| reviewTime|overallIndex|   overallVec|
+-------+--------------------+--------------+--------------------+-----------+------------+-------------+
|    5.0|Looks even better...|A24E3SXTC62LJI|    Claudia Valdivia| 08 4, 2014|         0.0|(4,[0],[1.0])|
|    5.0|When you don't wa...|A269FLZCB4GIPV|         sarah ponce|02 12, 2014|         0.0|(4,[0],[1.0])|
|    3.0|so the case came ...| AB6CHQWHZW4TV|                 Kai| 02 8, 2014|         2.0|(4,[2],[1.0])|
|    2.0|DON'T CARE FOR IT...| A1M117A53LEI8|     Sharon Williams| 02 4, 2014|         4.0|    (4,[],[])|
|    4.0|I liked it becaus...|A272DUT8M88ZS8|     Bella Rodriguez| 02 3, 2014|         1.0|(4,[1],[1.0])|
|    2.0|The product looke...|A1DW2L6XCC5TJS|     Amazon Customer|01 27, 2014|         4.0|    (4,[],[])|
|    3.0|I FINALLY got my ...| AQC61R4UST7UH| 