<a href="https://colab.research.google.com/github/jggomes11/Spark/blob/main/Spark_DataFrame_SQL.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Steps:


1.   Upload amazon-meta.txt.gz to Google Colab
2.   Run all cells below



Install Dependencies
---

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-2.4.7/spark-2.4.7-bin-hadoop2.7.tgz
!tar xf spark-2.4.7-bin-hadoop2.7.tgz

In [None]:
# findspark is used below (findspark.init()) before the pyspark imports
!pip install -q findspark

Setup Environment Variables
---



In [None]:
import os

# Locations of the JDK and of the unpacked Spark distribution
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-2.4.7-bin-hadoop2.7",
})


Start Spark Session
---



In [None]:
import findspark
findspark.init()  # run before importing pyspark so the package can be located


from pyspark import StorageLevel
from pyspark.sql import SparkSession, Window

from pyspark.sql.functions import rank, col, row_number, avg
from pyspark.sql.types import StructType, StructField, StringType, FloatType, IntegerType, DateType # Dataframe schema types

# Local-mode session ("local[*]"); reuses an existing session if one is already running
spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = spark.sparkContext

Parser
----

In [None]:
import gzip # Used to open amazon-meta.txt.gz file

In [None]:
def __write_categories(fileD, fileF,catList, product, avg):
    """Emit one row per category of the current product into two files.

    Parameters
    -------------
    fileD : file
      Writable text file; receives ``category;salesrank;asin`` rows
    fileF : file
      Writable text file; receives ``category|average`` rows
    catList : list
      Categories collected for the product being parsed
    product : dict
      Product being parsed; must hold the "ASIN" and "salesrank" keys
    avg : str
      Average rating of the product; converted with float() before writing

    Returns
    -------------
    None
    """
    product_asin, sales_rank = product["ASIN"], product["salesrank"]
    rating = float(avg)
    for category in catList:
        # One line per category in each output file
        fileD.write(f"{category};{sales_rank};{product_asin}\n")
        fileF.write(f"{category}|{rating}\n")

In [None]:
def __write_product(productFile, product):
    """Append the ``asin;salesrank`` row of a product to the product file.

    Parameters
    -------------
    productFile : file
      Writable text file collecting one row per product
    product : dict
      Product being parsed; must hold the "ASIN" and "salesrank" keys

    Returns
    -------------
    None
    """
    productFile.write("{};{}\n".format(product["ASIN"], product["salesrank"]))

In [None]:
def __write_e(fileE, product, avg):
    """Append the ``group;asin;average`` row of a product to the e file.

    Parameters
    -------------
    fileE : file
      Writable text file used to answer question e
    product : dict
      Product being parsed; must hold the "ASIN" and "group" keys
    avg : str
      Average rating of the product; converted with float() before writing

    Returns
    -------------
    None
    """
    # Tuple elements are evaluated left to right: ASIN, group, then the float conversion
    fields = (product["ASIN"], product["group"], float(avg))
    fileE.write("{1};{0};{2}\n".format(*fields))


In [None]:
def __write_review(fileA, fileC, fileG, reviewInfo, product, avg):
    """Write one review into the a, c and g files.

    Parameters
    -------------
    fileA : file
      Writable text file; receives ``asin;helpful;rating``
    fileC : file
      Writable text file; receives ``asin;date;rating``
    fileG : file
      Writable text file; receives ``group;customer``
    reviewInfo : list
      Whitespace-split review line; positions 0, 2, 4, 6 and 8 are read
      (date, customer id, rating, votes, helpful)
    product : dict
      Product being parsed; must hold the "ASIN" and "group" keys
    avg : str
      Average rating of the product; only checked to be convertible to float

    Returns
    -------------
    None
    """
    product_asin = product["ASIN"]
    product_group = product["group"]
    review_date = reviewInfo[0]
    float(avg)  # result unused; kept so a non-numeric average still raises here
    reviewer = reviewInfo[2]
    # votes (position 6) is converted but not written to any file
    stars, _votes, useful = (int(reviewInfo[pos]) for pos in (4, 6, 8))

    fileA.write(f'{product_asin};{useful};{stars}\n')
    fileC.write(f'{product_asin};{review_date};{stars}\n')
    fileG.write(f'{product_group};{reviewer}\n')

In [None]:
def __write_similar(fileB, similarItems, product):
    """Write one ``asin;similar`` row per similar item of a product.

    Parameters
    -------------
    fileB : file
      Writable text file used to answer question b
    similarItems : list
      ASINs listed as similar to the product
    product : dict
      Product being parsed; must hold the "ASIN" key

    Returns
    -------------
    None
    """
    source_asin = product["ASIN"]
    fileB.writelines(f"{source_asin};{other}\n" for other in similarItems)

In [None]:
def parse(filepath):
  """Parse amazon-meta.txt.gz and split it into the CSV files used by the queries.

  Files written to the current working directory:
  product.csv (asin;salesrank), a.csv (asin;helpful;rating),
  b.csv (asin;similar), c.csv (asin;date;rating),
  d.csv (category;salesrank;asin), e.csv (group;asin;average),
  f.csv (category|average) and g.csv (group;customer).

  Parameters
  -------------
  filepath : str
    Path to amazon-meta.txt.gz file

  Returns
  -------------
  None
  """
  from contextlib import ExitStack  # local import keeps this cell self-contained

  IGNORE_FIELDS = ['Total items']
  product = None
  categories = []
  avg = -1

  # ExitStack closes every handle, including when parsing raises half-way through
  with ExitStack() as stack:
    def _open_csv(name):
      # Explicit UTF-8 so the output does not depend on the platform locale
      return stack.enter_context(open(name, "w", encoding="utf-8"))

    # Product-Sales file
    productFile = _open_csv("product.csv")
    # Similar file
    fileB = _open_csv("b.csv")
    # Reviews files
    fileA = _open_csv("a.csv")
    fileC = _open_csv("c.csv")
    fileE = _open_csv("e.csv")
    fileG = _open_csv("g.csv")
    # Categories files
    fileD = _open_csv("d.csv")
    fileF = _open_csv("f.csv")

    # Read the path given as argument (not a module-level variable)
    file = stack.enter_context(gzip.open(filepath, "r"))

    for line in file:
      line = line.decode('utf-8').strip()
      colonPos = line.find(":")
      if line.startswith("Id"): # A new product starts
        if product is not None: # Flush the previous product first
          __write_categories(fileD, fileF, categories, product, avg)
          __write_product(productFile, product)
        categories = []
        avg = -1
        # Fresh dict with defaults for fields the product may not declare
        product = {
          "id": line[colonPos+4:],
          "title": "",
          "group": "",
          "salesrank": "-1",
          "categorie": "",
        }
      elif product is None:
        continue # Preamble before the first product: nothing to attach the line to
      elif line.startswith("similar"): # Similar products
        __write_similar(fileB, line.split()[2:], product)
      elif line.startswith("reviews"): # Reviews header info
        avg = line.split()[7]
        __write_e(fileE, product, avg)
      # NOTE(review): "cutomer:" is the spelling this parser matches; presumably it
      # mirrors the input file - verify before "correcting" it
      elif line.find("cutomer:") != -1: # Single review
        reviewInfo = line.split()
        if reviewInfo[2]: # If product has review(s)
          __write_review(fileA, fileC, fileG, reviewInfo, product, avg)
      elif line.startswith("|"): # Category path
        for cat in line.split("|"):
          if cat and cat not in categories:
            categories.append(cat)
      elif colonPos != -1: # Generic "name: value" product field
        prodName = line[:colonPos]
        if prodName not in IGNORE_FIELDS:
          product[prodName] = line[colonPos+2:].strip()

    # Flush the last product; skipped when the input contained no product at all
    if product is not None:
      __write_product(productFile, product)
      __write_categories(fileD, fileF, categories, product, avg)

In [None]:
# Location of the uploaded dump (step 1 at the top of the notebook)
filePath = "/content/amazon-meta.txt.gz"

# Generates product.csv and a.csv ... g.csv in the current working directory
parse(filePath) #Parse amazon-meta.txt.gz file function

<class '_io.TextIOWrapper'>


Variables / Import (Setup)
----

In [None]:
import pandas as pd

In [None]:
folderPath = "/content" # Folder the CSV files are read from

productAsin = "0738700797" # ASIN used by the per-product questions (a), (b) and (c)

Dataframe
---

(a) Dado um produto, listar os 5 comentários mais úteis e com maior avaliação e os 5 comentários mais úteis e com menor avaliação

In [None]:
# a.csv rows are asin;helpful;rating - the third field is a single review's
# rating, even though the schema field is called "Average"
fileASchema = StructType([
    StructField("Asin", StringType(), False),
    StructField("Helpful", IntegerType(), False),
    StructField("Average", IntegerType(), False)])

In [None]:
# A file [asin, helpful, rating]; the rating column is exposed under the name "average"
fileA = spark.read.option("delimiter", ";").csv(f"{folderPath}/a.csv", header=False, schema=fileASchema)
dfFileA = fileA.toDF("asin", "helpful", "average") # Lower-case column names used by the queries

In [None]:
# Reviews of the selected product; cached because two queries read them
reviewsA = dfFileA.filter(col('asin') == productAsin).cache()
# Name kept for any cell that still refers to it
mostHelpful = reviewsA.sort(col("helpful").desc())

# A second .sort() replaces the previous ordering instead of refining it, so both
# keys are given in a single orderBy: rating first, helpfulness as tie-breaker
# (same ordering as the SQL version of this question)
dataFrameA1 = reviewsA.orderBy(col("average").desc(), col("helpful").desc()).take(5)
dataFrameA2 = reviewsA.orderBy(col("average").asc(), col("helpful").desc()).take(5)

------------

(b) Dado um produto, listar os produtos similares com maiores vendas do que ele

In [None]:
# Schemas

# b.csv rows: asin;similar (one row per similar product)
fileBSchema = StructType([
    StructField("Asin", StringType(), False),
    StructField("Similar", StringType(), False)])

# product.csv rows: asin;salesrank (salesrank is -1 when the product declares none)
fileProductSchema = StructType([
    StructField("Asin", StringType(), False),
    StructField("Salesrank", IntegerType(), False)])

In [None]:
# B file [asin, similar]
fileB = spark.read.option("delimiter", ";").csv(f"{folderPath}/b.csv", header=False, schema=fileBSchema)
dfFileB = fileB.toDF("asin", "similar") #Columns names

# Product file [asin, salesrank]
# Read with fileProductSchema so salesrank is an integer; with fileBSchema it was
# a string and the salesrank comparisons below were lexicographic
fileProduct = spark.read.option("delimiter", ";").csv(f"{folderPath}/product.csv", header=False, schema=fileProductSchema)
dfFileProd = fileProduct.toDF("asin", "salesrank") #Columns names

In [None]:
prodInfo = dfFileProd.filter(col('asin') == productAsin) # Row (asin, salesrank) of the selected product
similarsProds = dfFileB.filter(col('asin') == productAsin) # Rows (asin, similar) listing its similar products

In [None]:
# Attach the selected product's own salesrank to each of its similar rows, renamed to "product sales"
similarSales = similarsProds.join(prodInfo, 'asin').withColumnRenamed("salesrank","product sales")

In [None]:
# Look up the salesrank of every similar product
fullSales = dfFileProd.join(similarSales, dfFileProd.asin == similarSales.similar)

# Drop the asin columns and rename the similar product's salesrank
fullSales = fullSales.drop('asin').withColumnRenamed("salesrank","similar sales")
# Keep similars whose salesrank value is smaller than the product's
# (a smaller salesrank presumably means more sales - TODO confirm)
dataFrameB = fullSales.filter(col('similar sales') < col('product sales')).collect()

----

(c) Dado um produto, mostrar a evolução diária das médias de avaliação ao longo do intervalo de tempo coberto no arquivo de entrada

In [None]:
# Schema of c.csv rows: asin;date;rating (one row per review)
fileCSchema = StructType([
    StructField("Asin", StringType(), False),
    StructField("Date", DateType(), False),
    StructField("Rating", FloatType(), False)])

In [None]:
# C file [asin, date, rating]
fileC = spark.read.option("delimiter", ";").csv(f"{folderPath}/c.csv", header=False, schema=fileCSchema)
dfFileC = fileC.toDF("asin", "date", "rating") # Lower-case column names used by the queries

In [None]:
# Ratings of the selected product in chronological order
dataFrameC = (
    dfFileC
    .where(col('asin') == productAsin)
    .sort("date")
    .drop('asin')
    .collect()
)

----

(d) Listar os 10 produtos líderes de venda em cada categoria

In [None]:
# Schema of d.csv rows: category;salesrank;asin (one row per product category)
fileDSchema = StructType([
    StructField("Categorie", StringType(), False),
    StructField("Salesrank", IntegerType(), False),
    StructField("Asin", StringType(), False)])

In [None]:
# D file [categorie, salesrank, asin]
fileD = spark.read.option("delimiter", ";").csv(f"{folderPath}/d.csv", header=False, schema=fileDSchema)
dfFileD = fileD.toDF("categorie", "salesrank", "asin") \
               .filter(col('salesrank') > -1) # Drop rows with a negative salesrank (the parser's default is -1)

In [None]:
# One partition per category, rows ordered by ascending salesrank inside it
window = Window.partitionBy(dfFileD['categorie']).orderBy(dfFileD['salesrank'].asc())

# rank() numbers the rows of each category (ties share a number); keep ranks 1-10
dataFrameD = (
    dfFileD
    .withColumn('id', rank().over(window))
    .where(col('id') <= 10)
    .collect()
)

----

(e) Listar os 10 produtos com a maior média de ratings por grupo (Group) de produtos


In [None]:
# Schema of e.csv rows: group;asin;average (one row per product with a reviews header)
fileESchema = StructType([
    StructField("Group", StringType(), False),
    StructField("Asin", StringType(), False),
    StructField("Average", FloatType(), False)])

In [None]:
# E file [group, asin, average]
fileE = spark.read.option("delimiter", ";").csv(f"{folderPath}/e.csv", header=False, schema=fileESchema)

# Lower-case column names used by the queries
dfFileE = fileE.toDF("group", "asin", "average")

In [None]:
# One partition per product group, rows ordered by descending average rating
window = Window.partitionBy(dfFileE["group"]).orderBy(dfFileE["average"].desc())

# row_number() gives every row of a group a distinct position; keep the first 10
dataFrameE = (
    dfFileE
    .withColumn('row', row_number().over(window))
    .where(col('row') <= 10)
    .collect()
)

----

(f) Listar as 5 categorias de produto com a maior média geral de ratings


In [None]:
# Schema of f.csv rows: category|average (one row per product category)
fileFSchema = StructType([
    StructField("Categorie", StringType(), False),
    StructField("Average", FloatType(), False)])

In [None]:
# F file [categorie, average]; this file is "|"-delimited, unlike the others
fileF = spark.read.option("delimiter", "|").csv(f"{folderPath}/f.csv", header=False, schema=fileFSchema)

# Lower-case column names used by the queries
dfFileF = fileF.toDF("categorie", "average")

In [None]:
# Mean of the per-product averages in each category; keep the 5 highest
dataFrameF = (
    dfFileF
    .groupBy('categorie')
    .avg('average')
    .sort('avg(average)', ascending=False)
    .take(5)
)

-----

(g) Listar os 10 clientes que mais fizeram comentários por grupo (Group) de produto


In [None]:
# Schema of g.csv rows: group;customer (one row per review)
fileGSchema = StructType([
    StructField("Group", StringType(), False),
    StructField("Customer", StringType(), False)])

In [None]:
# G file [group, customer]
fileG = spark.read.option("delimiter", ";").csv(f"{folderPath}/g.csv", header=False, schema=fileGSchema)

# Lower-case column names used by the queries
dfFileG = fileG.toDF("group", "customer")

In [None]:
# Number of rows (reviews) per (group, customer) pair; the result column is named "count"
countComments = dfFileG.groupBy("group", "customer") \
                       .count()

In [None]:
# One partition per product group, rows ordered by descending review count
# (rebinds the `window` name already used by the earlier questions)
window = Window.partitionBy(countComments["group"]) \
               .orderBy(countComments["count"].desc())

In [None]:
# Number the customers inside each group and keep the first 10 positions
dataFrameG = (
    countComments
    .withColumn('row', row_number().over(window))
    .where(col('row') <= 10)
    .collect()
)

Views (SQL)
---

(a) Dado um produto, listar os 5 comentários mais úteis e com maior avaliação e os 5 comentários mais úteis e com menor avaliação

In [None]:
# Expose dfFileA to Spark SQL as temporary view "a"
dfFileA.createOrReplaceTempView("a")

In [None]:
# The ASIN is quoted: asin is a string column, and an unquoted value is parsed as a
# number (dropping leading zeros) or, for ASINs containing letters, as an identifier
# Highest rating first, most helpful first among equal ratings
sqlA1 = spark.sql(f"SELECT * \
                    FROM a  \
                    WHERE asin='{productAsin}' \
                    ORDER BY average DESC, helpful DESC \
                    LIMIT 5").collect()

# Lowest rating first, most helpful first among equal ratings
sqlA2 = spark.sql(f"SELECT * \
                    FROM a  \
                    WHERE asin='{productAsin}' \
                    ORDER BY average, helpful DESC \
                    LIMIT 5").collect()

------------

(b) Dado um produto, listar os produtos similares com maiores vendas do que ele

In [None]:
# Expose dfFileB to Spark SQL as temporary view "b"
dfFileB.createOrReplaceTempView("b")

In [None]:
# Expose dfFileProd to Spark SQL as temporary view "p"
dfFileProd.createOrReplaceTempView("p")

In [None]:
# The ASIN is quoted because asin is a string column. Both salesranks are cast to
# INT so the comparison is numeric whatever type view "p" exposes.
sqlB = spark.sql(f" SELECT * \
                    FROM \
                        (SELECT b.similar, p.salesrank \
                        FROM \
                            (SELECT * FROM b WHERE asin='{productAsin}') \
                        AS b \
                        JOIN p ON p.asin=b.similar) \
                    WHERE CAST(salesrank AS INT) < (SELECT CAST(salesrank AS INT) FROM p WHERE asin='{productAsin}')").collect()

----

(c) Dado um produto, mostrar a evolução diária das médias de avaliação ao longo do intervalo de tempo coberto no arquivo de entrada

In [None]:
# Expose dfFileC to Spark SQL as temporary view "c"
dfFileC.createOrReplaceTempView("c")

In [None]:
# Ratings of the selected product in chronological order.
# The ASIN is quoted because asin is a string column.
sqlC = spark.sql(f"SELECT date, rating \
                   FROM c \
                   WHERE asin='{productAsin}' \
                   ORDER BY date").collect()

----

(d) Listar os 10 produtos líderes de venda em cada categoria

In [None]:
# Expose dfFileD (negative salesranks already filtered out) to Spark SQL as temporary view "d"
dfFileD.createOrReplaceTempView("d")

In [None]:
# Rank the products of each category by ascending salesrank and keep ranks 1-10
# (Rank() gives tied rows the same number, so a category can return more than 10 rows)
sqlD = spark.sql(f"SELECT * \
                   FROM \
                       ( \
                         SELECT *, Rank() \
                          over  \
                              ( \
                                PARTITION BY categorie\
                                ORDER BY salesrank \
                              ) \
                          AS rank \
                          FROM d \
                        ) \
                    WHERE rank <= 10").collect()

----

(e) Listar os 10 produtos com a maior média de ratings por grupo (Group) de produtos


In [None]:
# Expose dfFileE to Spark SQL as temporary view "e"
dfFileE.createOrReplaceTempView("e")

In [None]:
# Number the products of each group by descending average rating and keep the first 10
sqlE = spark.sql(f"SELECT * \
                   FROM \
                       ( \
                         SELECT *, Row_Number() \
                          over  \
                              ( \
                                PARTITION BY group\
                                ORDER BY average DESC \
                              ) \
                          AS row \
                          FROM e \
                        ) \
                   WHERE row <= 10").collect()

----

(f) Listar as 5 categorias de produto com a maior média geral de ratings


In [None]:
# Expose dfFileF to Spark SQL as temporary view "f"
dfFileF.createOrReplaceTempView("f")

In [None]:
# Mean of the per-product averages in each category; the 5 highest means
sqlF = spark.sql(f"SELECT categorie, AVG(average) AS AvgRating \
                   FROM f \
                   GROUP BY categorie \
                   ORDER BY AvgRating DESC \
                   LIMIT 5").collect()

-----

(g) Listar os 10 clientes que mais fizeram comentários por grupo (Group) de produto


In [None]:
# Expose dfFileG to Spark SQL as temporary view "g"
dfFileG.createOrReplaceTempView("g")

In [None]:
# Inner query: review count per (group, customer).
# Outer query: number the customers of each group by descending count and keep the first 10.
sqlG = spark.sql(f"SELECT * \
                   FROM \
                       ( \
                         SELECT *, Row_Number() \
                          over  \
                              ( \
                                PARTITION BY group\
                                ORDER BY CountComment DESC \
                              ) \
                          AS row \
                          FROM \
                              ( \
                                SELECT group, customer, COUNT(customer) AS CountComment \
                                FROM g \
                                GROUP BY group, customer \
                                ORDER BY CountComment DESC \
                              ) \
                        ) \
                   WHERE row <= 10").collect()

Dashboard (DataFrames)
---

In [None]:
# Product that the per-product results (A, B, C) refer to
print(f'Product: {productAsin}')

Product: 0738700797


> Dataframe

In [None]:
# A1: the 5 rows collected in dataFrameA1 (highest-rated reviews of the product)
pd.DataFrame(dataFrameA1, columns=["asin", "helpful", "rating"])

Unnamed: 0,asin,helpful,rating
0,738700797,5,5
1,738700797,5,5
2,738700797,8,5
3,738700797,8,5
4,738700797,1,5


In [None]:
# A2: the 5 rows collected in dataFrameA2 (lowest-rated reviews of the product)
pd.DataFrame(dataFrameA2, columns=["asin", "helpful", "rating"])

Unnamed: 0,asin,helpful,rating
0,738700797,9,1
1,738700797,5,4
2,738700797,6,4
3,738700797,16,4
4,738700797,5,4


In [None]:
# B: similar products whose salesrank value is below the selected product's
pd.DataFrame(dataFrameB, columns=["Similar Sales", "ASIN(Similar)", "Product Sales"])

Unnamed: 0,Similar Sales,ASIN(Similar),Product Sales
0,103012,1567184960,168596
1,159277,738700525,168596


In [None]:
# C: ratings of the selected product ordered by date (the second column holds each row's rating)
pd.DataFrame(dataFrameC, columns=["Date", "Average"])

Unnamed: 0,Date,Average
0,2001-12-16,5.0
1,2002-01-07,4.0
2,2002-01-24,5.0
3,2002-01-28,5.0
4,2002-02-06,4.0
5,2002-02-14,4.0
6,2002-03-23,4.0
7,2002-05-23,5.0
8,2003-02-25,5.0
9,2003-11-25,5.0


In [None]:
# D: products with rank <= 10 by salesrank inside each category (last column is that rank)
pd.DataFrame(dataFrameD, columns=["categorie", "salesrank", "asin", "row (group)"])

Unnamed: 0,categorie,salesrank,asin,row (group)
0,( J )[70031],5224,0679722769,1
1,( J )[70031],8728,0140432620,2
2,( J )[70031],9342,0486268705,3
3,( J )[70031],27533,0486266842,4
4,( J )[70031],28030,0141181265,5
...,...,...,...,...
335934,[139452],2,6301627024,6
335935,[139452],6,B00005T33H,7
335936,[139452],7,6302946387,8
335937,[139452],8,6301729897,9


In [None]:
# E: first 10 products of each group by descending average rating
pd.DataFrame(dataFrameE, columns=["group", "Asin", "average", "row (group)"])

Unnamed: 0,group,Asin,average,row (group)
0,Video,B0000060T5,5.0,1
1,Video,B00000IC8N,5.0,2
2,Video,6304733542,5.0,3
3,Video,6301045734,5.0,4
4,Video,0792296176,5.0,5
5,Video,B00000I9PH,5.0,6
6,Video,6303864120,5.0,7
7,Video,1929732058,5.0,8
8,Video,6304907761,5.0,9
9,Video,6302598869,5.0,10


In [None]:
# F: the 5 categories with the highest mean of product averages
pd.DataFrame(dataFrameF, columns=["categorie", "average"])

Unnamed: 0,categorie,average
0,"Say, Fazil[58713]",5.0
1,"Verrocchio, Andrea del[1449]",5.0
2,"Major, John[2416]",5.0
3,Warsaw[67673],5.0
4,"Tolkin, Stephen[167969]",5.0


In [None]:
# G: first 10 customers of each group by number of reviews
pd.DataFrame(dataFrameG, columns=["group", "customer", "count(comment)", "row(group)"])

Unnamed: 0,group,customer,count(comment),row(group)
0,Video,ATVPDKIKX0DER,72581,1
1,Video,A3UN6WX5RRO2AG,15814,2
2,Video,A2NJO6YE954DBH,1775,3
3,Video,AU8552YCOO5QX,1205,4
4,Video,A3P1A63Q8L32C5,737,5
...,...,...,...,...
71,CE,A1SFX3CR838F36,1,1
72,CE,A2IX9TMXDBUCYV,1,2
73,CE,A1J62O1S6QTHZJ,1,3
74,CE,A13JU90C7AU3RT,1,4


----

DashBoard (Views)
-----

In [None]:
# Product that the per-product SQL results (A, B, C) refer to
print(f'Product: {productAsin}')

Product: 0738700797


In [None]:
# A1 (SQL): highest-rated reviews of the product, most helpful first among equal ratings
pd.DataFrame(sqlA1,  columns=["asin", "helpful", "rating"])

Unnamed: 0,asin,helpful,rating
0,738700797,8,5
1,738700797,8,5
2,738700797,5,5
3,738700797,5,5
4,738700797,4,5


In [None]:
# A2 (SQL): lowest-rated reviews of the product, most helpful first among equal ratings
pd.DataFrame(sqlA2,  columns=["asin", "helpful", "rating"])

Unnamed: 0,asin,helpful,rating
0,738700797,9,1
1,738700797,16,4
2,738700797,6,4
3,738700797,5,4
4,738700797,5,4


In [None]:
# B (SQL): similar products whose salesrank value is below the selected product's
pd.DataFrame(sqlB, columns=["ASIN(Similar)", "Similar Sales"])

Unnamed: 0,ASIN(Similar),Similar Sales
0,1567184960,103012
1,738700525,159277


In [None]:
# C (SQL): ratings of the selected product ordered by date (the second column holds each row's rating)
pd.DataFrame(sqlC, columns=["Date", "Average"])

Unnamed: 0,Date,Average
0,2001-12-16,5.0
1,2002-01-07,4.0
2,2002-01-24,5.0
3,2002-01-28,5.0
4,2002-02-06,4.0
5,2002-02-14,4.0
6,2002-03-23,4.0
7,2002-05-23,5.0
8,2003-02-25,5.0
9,2003-11-25,5.0


In [None]:
# D (SQL): products with rank <= 10 by salesrank inside each category (last column is that rank)
pd.DataFrame(sqlD, columns=["categorie", "salesrank", "asin", "row (categorie)"])

Unnamed: 0,categorie,salesrank,asin,row (categorie)
0,( J )[70031],5224,0679722769,1
1,( J )[70031],8728,0140432620,2
2,( J )[70031],9342,0486268705,3
3,( J )[70031],27533,0486266842,4
4,( J )[70031],28030,0141181265,5
...,...,...,...,...
335934,[139452],2,6301627024,6
335935,[139452],6,B00005T33H,7
335936,[139452],7,6302946387,8
335937,[139452],8,6301729897,9


In [None]:
# E (SQL): first 10 products of each group by descending average rating
pd.DataFrame(sqlE, columns=["group", "asin", "average", "row (group)"])

Unnamed: 0,group,asin,average,row (group)
0,Video,B0000060T5,5.0,1
1,Video,B00000IC8N,5.0,2
2,Video,6304733542,5.0,3
3,Video,6301045734,5.0,4
4,Video,0792296176,5.0,5
5,Video,B00000I9PH,5.0,6
6,Video,6303864120,5.0,7
7,Video,1929732058,5.0,8
8,Video,6304907761,5.0,9
9,Video,6302598869,5.0,10


In [None]:
# F (SQL): the 5 categories with the highest mean of product averages
pd.DataFrame(sqlF, columns=["categorie", "average"])

Unnamed: 0,categorie,average
0,"Say, Fazil[58713]",5.0
1,"Verrocchio, Andrea del[1449]",5.0
2,"Major, John[2416]",5.0
3,Warsaw[67673],5.0
4,"Tolkin, Stephen[167969]",5.0


In [None]:
# G (SQL): first 10 customers of each group by number of reviews
pd.DataFrame(sqlG, columns=["group", "customer", "count(comment)", "row(group)"])

Unnamed: 0,group,customer,count(comment),row(group)
0,Video,ATVPDKIKX0DER,72581,1
1,Video,A3UN6WX5RRO2AG,15814,2
2,Video,A2NJO6YE954DBH,1775,3
3,Video,AU8552YCOO5QX,1205,4
4,Video,A3P1A63Q8L32C5,737,5
...,...,...,...,...
71,CE,A1SFX3CR838F36,1,1
72,CE,A2IX9TMXDBUCYV,1,2
73,CE,A1J62O1S6QTHZJ,1,3
74,CE,A13JU90C7AU3RT,1,4
