In [1]:
#O ponto de entrada para todas as funcionalidades 
from pyspark.sql import SparkSession, Row
import pyspark.sql.functions as F
from pyspark.sql.functions import desc, asc,rank
from pyspark.sql.functions import col
from pyspark.sql.window import Window

spark = SparkSession \
.builder \
.appName("Parte C") \
.config("spark.some.config.option") \
.getOrCreate()

sc = spark.sparkContext

In [2]:
#path do arquivo que contém os dados de entrada
#Baixe o arquivo amazon-meta.txt.gz, de uma olhada no parser.py e rode: python3 parser.py > amazon.json
path_file = "amazon.json"


In [3]:
#Criação de um DataFrame com base no path (arquivo de entrada)
df = spark.read.json(path_file)

In [4]:
#Imprimir o esquema em um formato de árvore
df.printSchema()

root
 |-- ASIN: string (nullable = true)
 |-- categorie: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- categories: string (nullable = true)
 |-- group: string (nullable = true)
 |-- id: string (nullable = true)
 |-- review_stats: string (nullable = true)
 |-- reviews: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- _date: string (nullable = true)
 |    |    |-- customer_id: string (nullable = true)
 |    |    |-- helpful: long (nullable = true)
 |    |    |-- rating: long (nullable = true)
 |    |    |-- votes: long (nullable = true)
 |-- salesrank: string (nullable = true)
 |-- similar_items: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- title: string (nullable = true)



In [5]:
df.createOrReplaceTempView("amazon")

#### (a) Dado produto, listar os 5 comentários mais úteis e com maior avaliação e os 5 comentários mais úteis e com menor avaliação

In [6]:
print("Solução com dataframe:")
df_a = df.filter(df.ASIN == '0738700797').select("id",F.explode(df.reviews).alias('reviews')).select('reviews.*',"id")
a_df_maior = df_a.sort(desc("helpful"), desc("rating")).limit(5)
a_df_menor = df_a.sort(desc("helpful"), asc("rating")).limit(5)
a_df = a_df_maior.unionByName(a_df_menor).show()

print("Solução com consulta SQL sobre uma view temporária:")
df_a.createOrReplaceTempView("a")
a_sql = spark.sql("(SELECT a.* FROM a WHERE id='2' ORDER BY helpful DESC, rating DESC LIMIT 5) UNION ALL (SELECT a.* FROM a WHERE id='2' ORDER BY helpful DESC, rating ASC LIMIT 5)")
a_sql.show()




Solução com dataframe:
+---------+--------------+-------+------+-----+---+
|    _date|   customer_id|helpful|rating|votes| id|
+---------+--------------+-------+------+-----+---+
| 2002-2-6|A2P6KAWXJ16234|     16|     4|   16|  2|
|2004-2-11|A1CP26N8RHYVVO|      9|     1|   13|  2|
|2002-1-24|A13SG9ACZ9O5IM|      8|     5|    8|  2|
|2002-5-23|A1GIL64QK68WKL|      8|     5|    8|  2|
|2002-3-23|A3GO7UV9XX14D8|      6|     4|    6|  2|
| 2002-2-6|A2P6KAWXJ16234|     16|     4|   16|  2|
|2004-2-11|A1CP26N8RHYVVO|      9|     1|   13|  2|
|2002-1-24|A13SG9ACZ9O5IM|      8|     5|    8|  2|
|2002-5-23|A1GIL64QK68WKL|      8|     5|    8|  2|
|2002-3-23|A3GO7UV9XX14D8|      6|     4|    6|  2|
+---------+--------------+-------+------+-----+---+

Solução com consulta SQL sobre uma view temporária:
+---------+--------------+-------+------+-----+---+
|    _date|   customer_id|helpful|rating|votes| id|
+---------+--------------+-------+------+-----+---+
| 2002-2-6|A2P6KAWXJ16234|     16|     4

#### (b) Dado um produto, listar os produtos similares com maiores vendas do que ele

In [7]:
df_b = df.where("id=2").select(df.salesrank.alias('salesrank_to_cmp'),F.explode("similar_items").alias("ASIN"))
print("Solução com dataframe:")
b_df = df_b.join(df, df_b.ASIN == df.ASIN).where(df_b.salesrank_to_cmp > df.salesrank). \
select('title','salesrank')
b_df.show()

df_a.createOrReplaceTempView("b")
print("Solução com consulta SQL sobre uma view temporária:")
#b_sql = spark.sql("SELECT * FROM b,amazon WHERE amazon.id = b.id AND amazon.salesrank = b.ASIN AND b.salesrank_to_cmp > amazon.salesrank")



Solução com dataframe:
+--------------------+---------+
|               title|salesrank|
+--------------------+---------+
|Midsummer: Magica...|   159277|
|Yule: A Celebrati...|   103012|
+--------------------+---------+

Solução com consulta SQL sobre uma view temporária:


#### (c) Dado um produto, mostrar a evolução diária das médias de avaliação ao longo do intervalo de tempo coberto no arquivo de entrada

In [8]:
df_c = df.where("id=2").select("title",F.explode("reviews._date").alias("data"))
c_join = df_c.join(df, df_c.title == df.title).select(df_c.title,df_c.data,F.explode("reviews.rating").alias("rating"))
c_join.createOrReplaceTempView("c")

print("Solução com dataframe:")
c_df = c_join.groupBy("title","data").agg({'rating':'avg'}).show()

print("Solução com consulta SQL sobre uma view temporária:")
c_sql = spark.sql("SELECT title, data, avg(rating) FROM c GROUP BY title,data")
c_sql.show()

Solução com dataframe:
+--------------------+----------+-----------------+
|               title|      data|      avg(rating)|
+--------------------+----------+-----------------+
|Candlemas: Feast ...|2001-12-16|4.333333333333333|
|Candlemas: Feast ...|  2002-1-7|4.333333333333333|
|Candlemas: Feast ...| 2002-1-24|4.333333333333333|
|Candlemas: Feast ...| 2002-1-28|4.333333333333333|
|Candlemas: Feast ...|  2002-2-6|4.333333333333333|
|Candlemas: Feast ...| 2002-2-14|4.333333333333333|
|Candlemas: Feast ...| 2002-3-23|4.333333333333333|
|Candlemas: Feast ...| 2002-5-23|4.333333333333333|
|Candlemas: Feast ...| 2003-2-25|4.333333333333333|
|Candlemas: Feast ...|2003-11-25|4.333333333333333|
|Candlemas: Feast ...| 2004-2-11|4.333333333333333|
|Candlemas: Feast ...|  2005-2-7|4.333333333333333|
+--------------------+----------+-----------------+

Solução com consulta SQL sobre uma view temporária:
+--------------------+----------+-----------------+
|               title|      data|      a

#### (d) Listar os 10 produtos lideres de venda em cada grupo de produtos

In [9]:
print("Solução com dataframe:")
windows = Window.partitionBy(df['group']).orderBy(df.salesrank.asc())
df_d = df.where('salesrank >= 1').select('group','title','salesrank', rank().over(windows).alias('rank')).where(col('rank') <= 10).show(160)

print("Solução com consulta SQL sobre uma view temporária:")

Solução com dataframe:
+------------+--------------------+---------+----+
|       group|               title|salesrank|rank|
+------------+--------------------+---------+----+
|       Video|The War of the Wo...|        1|   1|
|       Video|Snow White and th...|      100|   2|
|       Video|Pearl (The Minise...|     1000|   3|
|       Video|  Two for the Seesaw|    10001|   4|
|       Video|Highlights of the...|    10002|   5|
|       Video|Madeline - Cookin...|    10003|   6|
|       Video|Swiss Family Robi...|   100048|   7|
|       Video|Commercial Explos...|   100058|   8|
|       Video|The Original Volt...|    10006|   9|
|       Video|The Ford Show Cla...|   100067|  10|
|         Toy|Party Tyme Karaok...|    10732|   1|
|         Toy|Wizard Card Game ...|     1890|   2|
|         Toy|Photostory Junior...|     2288|   3|
|         Toy|The Songs of Brit...|    31296|   4|
|         Toy|Party Tyme Karaok...|     4053|   5|
|         Toy|R- Photostory Senior|    45241|   6|
|       

#### (e) Listar os 10 produtos com a maior média de avaliações úteis positivas por produto

In [10]:
df1 = df.select("title",F.explode("reviews.helpful").alias("helpful"))
df1.createOrReplaceTempView("help")

print("Solução com dataframe:")
df_e = df1.groupBy("title").agg({'helpful':'avg'})
e_df =  df_e.sort(desc("avg(helpful)")).limit(10).show()

print("Solução com consulta SQL sobre uma view temporária:")
e_sql = spark.sql("SELECT title, avg(helpful) FROM help GROUP BY title ORDER BY avg(helpful) DESC LIMIT 10")
e_sql.show()

Solução com dataframe:
+--------------------+------------------+
|               title|      avg(helpful)|
+--------------------+------------------+
|Understanding Lov...|             243.0|
|T'ai Chi for Olde...|             233.0|
|More Than Just Ho...|             203.0|
|Crockpot Cookery ...|             197.0|
|Creative Interven...|             196.0|
|The Story About Ping|186.46511627906978|
|The Story About P...|186.34883720930233|
|The Story about P...|186.32558139534885|
|The Story about Ping|186.30232558139534|
|The Smoked-Foods ...|             183.0|
+--------------------+------------------+

Solução com consulta SQL sobre uma view temporária:
+--------------------+------------------+
|               title|      avg(helpful)|
+--------------------+------------------+
|Understanding Lov...|             243.0|
|T'ai Chi for Olde...|             233.0|
|More Than Just Ho...|             203.0|
|Crockpot Cookery ...|             197.0|
|Creative Interven...|             196.0|


#### (f) Listar a 5 categorias de produto com a maior média de avaliações úteis positivas por produto

In [11]:
dfn = df.select("categorie",F.explode("reviews.helpful").alias("helpful"))
dfn.createOrReplaceTempView("f")

print("Solução com dataframe:")
df_f = dfn.groupBy("categorie").agg({'helpful':'avg'})
f_df =  df_f.sort(desc("avg(helpful)")).limit(5).show()

print("Solução com consulta SQL sobre uma view temporária:")
f_sql = spark.sql("SELECT categorie, avg(helpful) FROM f GROUP BY categorie ORDER BY avg(helpful) DESC LIMIT 5")
f_sql.show()

Solução com dataframe:
+--------------------+------------------+
|           categorie|      avg(helpful)|
+--------------------+------------------+
|[|Books[283155]|S...|             196.0|
|[|Books[283155]|S...|186.46511627906978|
|[|Books[283155]|S...|186.34883720930233|
|[|Books[283155]|F...|186.32558139534885|
|[|Books[283155]|S...|186.30232558139534|
+--------------------+------------------+

Solução com consulta SQL sobre uma view temporária:
+--------------------+------------------+
|           categorie|      avg(helpful)|
+--------------------+------------------+
|[|Books[283155]|S...|             196.0|
|[|Books[283155]|S...|186.46511627906978|
|[|Books[283155]|S...|186.34883720930233|
|[|Books[283155]|F...|186.32558139534885|
|[|Books[283155]|S...|186.30232558139534|
+--------------------+------------------+



#### (g) Listar os 10 clientes que mais fizeram comentários por grupo de produto

In [12]:
print("Solução com dataframe:")


print("Solução com consulta SQL sobre uma view temporária:")

Solução com dataframe:
Solução com consulta SQL sobre uma view temporária:
