# Init Spark

In [1]:
import pyspark
from pyspark.sql import SparkSession

In [2]:
spark = ( 
    SparkSession.builder
            .appName('test').master("yarn")
            .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/03/22 12:35:56 INFO SparkEnv: Registering MapOutputTracker
24/03/22 12:35:56 INFO SparkEnv: Registering BlockManagerMaster
24/03/22 12:35:56 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
24/03/22 12:35:56 INFO SparkEnv: Registering OutputCommitCoordinator


.master("local[*]").  Nếu chạy local. Dấu * tượng trưng cho ý spark được phép dùng hết tài nguyên của máy ( CPU/RAM ) để xử lý 

.master("yarn"). Nếu chạy trên cluster YARN


Có thể thêm config vào. Một số config để connect với s3

  - .config("spark.jars", "aws-sdk-java-2.17.81.jar") 

  - .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
  
  - .config("spark.hadoop.fs.s3a.access.key", "<your_access_key_id>")
  
  - .config("spark.hadoop.fs.s3a.secret.key", "<your_secret_access_key>")

In [3]:
spark

In [5]:
!pyspark --version and spark-shell --version 

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.3.2
      /_/
                        
Using Scala version 2.12.14, OpenJDK 64-Bit Server VM, 11.0.20.1
Branch HEAD
Compiled by user  on 2023-08-31T21:36:55Z
Revision f8bfd4536d9f8a9cd950dac4d87551d8e2ce5e3f
Url https://bigdataoss-internal.googlesource.com/third_party/apache/spark
Type --help for more information.


# Read File

In [3]:
data_storage = "gs://tk01_nguyennhatkhanh/NguyenNhatKhanh_First.parquet"

df = spark.read.parquet(data_storage)

                                                                                

In [4]:
df.printSchema()

root
 |-- Link: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Price: string (nullable = true)
 |-- Size: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- Image: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- Detail: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [43]:
df.show()

+-----------+-----------+--------------+----------+--------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   32158956|R1KKOXHNI8MSXU|B01KL6O72Y|      24485154|         Apparel|          4|            0|          0|   0|                1|★ THESE REALLY DO...|These Really Do W...| 2013-01-14|
|         US|    2714559|R26SP2OPDK4HT7|B01ID3ZS5W|     363128556|         Apparel|          5|            1|          2|   0|                1|Favorite for wint...|I love this dress...| 2014-03-04|
|    

# Read Spark with Schema

In [13]:
from pyspark.sql import types

In [40]:
schema = types.StructType([
    types.StructField('marketplace', types.StringType(), True),
    types.StructField('customer_id', types.StringType(), True),
    types.StructField('review_id', types.StringType(), True),
    types.StructField('product_id', types.StringType(), True),
    types.StructField('product_parent', types.StringType(), True),
    types.StructField('product_category', types.StringType(), True),
    types.StructField('star_rating', types.IntegerType(), True),
    types.StructField('helpful_votes', types.IntegerType(), True),
    types.StructField('total_votes', types.IntegerType(), True),
    types.StructField('vine', types.LongType(), True),
    types.StructField('verified_purchase', types.LongType(), True),
    types.StructField('review_headline', types.StringType(), True),
    types.StructField('review_body', types.StringType(), True),
    types.StructField('review_date', types.StringType(), True)
])

In [41]:
df = spark.read \
    .schema(schema) \
    .parquet(data_storage)

In [42]:
df.show()

+-----------+-----------+--------------+----------+--------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   32158956|R1KKOXHNI8MSXU|B01KL6O72Y|      24485154|         Apparel|          4|            0|          0|   0|                1|★ THESE REALLY DO...|These Really Do W...| 2013-01-14|
|         US|    2714559|R26SP2OPDK4HT7|B01ID3ZS5W|     363128556|         Apparel|          5|            1|          2|   0|                1|Favorite for wint...|I love this dress...| 2014-03-04|
|    


                                                                                

# Làm việc với cột ( columns ) 

In [110]:
df = df.withColumn('star_rating', df['star_rating'].cast(types.FloatType())) # Transform Col 
df = df.withColumn('review_date', F.to_date(F.col("review_date")))
# df.drop('product_parent') # Drop column

df = df.withColumnRenamed("product_parent","productParent")

In [111]:
df.select("star_rating","productParent").describe().show()



+-------+-----------------+-------------------+
|summary|      star_rating|      productParent|
+-------+-----------------+-------------------+
|  count|          1440000|            1440000|
|   mean|4.035313194444444|4.984089286230021E8|
| stddev|1.318571677617239|2.895259558244346E8|
|    min|              1.0|          100001307|
|    max|              5.0|          999997291|
+-------+-----------------+-------------------+




                                                                                

In [114]:
df.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- productParent: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: float (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: long (nullable = true)
 |-- verified_purchase: long (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: date (nullable = true)



# Pyspark và SQL

### SELECT

In [50]:
df.select("*").show()

+-----------+-----------+--------------+----------+--------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   32158956|R1KKOXHNI8MSXU|B01KL6O72Y|      24485154|         Apparel|          4|            0|          0|   0|                1|★ THESE REALLY DO...|These Really Do W...| 2013-01-14|
|         US|    2714559|R26SP2OPDK4HT7|B01ID3ZS5W|     363128556|         Apparel|          5|            1|          2|   0|                1|Favorite for wint...|I love this dress...| 2014-03-04|
|    

In [49]:
df.select(
    "customer_id",
    "product_id",
    "review_headline",
    "star_rating",
    "helpful_votes").show()

+-----------+----------+--------------------+-----------+-------------+
|customer_id|product_id|     review_headline|star_rating|helpful_votes|
+-----------+----------+--------------------+-----------+-------------+
|   32158956|B01KL6O72Y|★ THESE REALLY DO...|          4|            0|
|    2714559|B01ID3ZS5W|Favorite for wint...|          5|            1|
|   12608825|B01I497BGY|Great Socks for t...|          5|            0|
|   25482800|B01HDXFZK6|          Slick hat!|          5|            0|
|    9310286|B01G6MBEBY|I would do it again!|          5|            0|
|   26631939|B01FWRXN0Y|          Five Stars|          5|            0|
|   48785098|B01EXNH1HE|            Love it!|          5|            0|
|   39548589|B01E7OL09O|         Three Stars|          4|            0|
|   29355866|B01DXHX81O|          Five Stars|          5|            0|
|   27477484|B01DDULIJK|    Not my favorite.|          3|            0|
|   17685865|B01BOKOL4A|The Jockey Women'...|          5|       

In [5]:
df.select("name").show()

                                                                                

+--------------------+
|                name|
+--------------------+
|24 JERSEY TEE - W...|
|CREW NECK TEE - B...|
|PLEATED PANTS - B...|
|   PATCH TEE - WHITE|
|   PATCH TEE - BLACK|
|     PATCH TEE - RED|
|SPORT SHORTS - BLACK|
|8TH SPORT TEE - B...|
| 8TH SPORT TEE - RED|
|8TH SPORT TEE - C...|
|        SWE COLD CUP|
|RED BUTTON SHIRT ...|
|RED BUTTON SHIRT ...|
|CRYSTAL BABY TEE ...|
|TYPE BABY TEE - B...|
|ROCKETMAN POLO - ...|
|BLOCK L/S POLO - ...|
|BANDANA JACKET - RED|
|SKELETON HOODIE -...|
|STRIPED SWEATSHOR...|
+--------------------+
only showing top 20 rows



In [6]:
from pyspark.sql import functions as F

In [7]:
df.select(
    F.col("name"),
    F.col("price"),
).distinct().show()

[Stage 2:>                                                          (0 + 1) / 1]

+--------------------+--------+
|                name|   price|
+--------------------+--------+
|STRAIGHT JEANS - ...|260,000₫|
|   HAPPY TEE - BLACK|297,000₫|
|RIPPED CARGO JEAN...|637,500₫|
|SWE CORDUROY PANT...|289,500₫|
|WAVELINES CARDIGA...|378,000₫|
|     ILY TEE - BLACK|420,000₫|
|REVERSE TIE DYE P...|108,000₫|
|GOTHIC CORDUROY J...|209,700₫|
|SWE STUDIO TEE - ...|342,000₫|
|BLOCK L/S POLO - ...| 96,000₫|
|SKELETON HOODIE -...|275,000₫|
|24 JERSEY TEE - W...|280,000₫|
|  KID HOODIE - GREEN|325,000₫|
|   PATCH TEE - WHITE|273,000₫|
|SWE WOMEN BRIEFS ...| 90,300₫|
|TIGER VARSITY JAC...|399,600₫|
|BUTTON CARGO PANT...|432,000₫|
|      ILY TEE - PINK|420,000₫|
|CRYSTAL BABY TEE ...|280,000₫|
|ILY HOODIE ZIP - ...|595,000₫|
+--------------------+--------+
only showing top 20 rows



                                                                                

### WHERE

In [81]:
df.select(
    F.col("customer_id"),
    F.col("product_id"),
    F.col("review_headline"),
    F.col("star_rating"),
    F.col("helpful_votes")
).filter("star_rating >= 5").show()


[Stage 41:>                                                         (0 + 1) / 1]

+-----------+----------+--------------------+-----------+-------------+
|customer_id|product_id|     review_headline|star_rating|helpful_votes|
+-----------+----------+--------------------+-----------+-------------+
|    2714559|B01ID3ZS5W|Favorite for wint...|          5|            1|
|   12608825|B01I497BGY|Great Socks for t...|          5|            0|
|   25482800|B01HDXFZK6|          Slick hat!|          5|            0|
|    9310286|B01G6MBEBY|I would do it again!|          5|            0|
|   26631939|B01FWRXN0Y|          Five Stars|          5|            0|
|   48785098|B01EXNH1HE|            Love it!|          5|            0|
|   29355866|B01DXHX81O|          Five Stars|          5|            0|
|   17685865|B01BOKOL4A|The Jockey Women'...|          5|            0|
|   19787539|B01B3Q4Q0O|          Five Stars|          5|            0|
|   44196725|B01ADDSL9U|          Five Stars|          5|            0|
|   45510794|B019P1X5XI|best ever4 for me...|          5|       


                                                                                

In [82]:
df.select(
    F.col("customer_id"),
    F.col("product_id"),
    F.col("review_headline"),
    F.col("star_rating"),
    F.col("helpful_votes")
).filter(F.col("star_rating") >= 4).show()


[Stage 42:>                                                         (0 + 1) / 1]

+-----------+----------+--------------------+-----------+-------------+
|customer_id|product_id|     review_headline|star_rating|helpful_votes|
+-----------+----------+--------------------+-----------+-------------+
|   32158956|B01KL6O72Y|★ THESE REALLY DO...|          4|            0|
|    2714559|B01ID3ZS5W|Favorite for wint...|          5|            1|
|   12608825|B01I497BGY|Great Socks for t...|          5|            0|
|   25482800|B01HDXFZK6|          Slick hat!|          5|            0|
|    9310286|B01G6MBEBY|I would do it again!|          5|            0|
|   26631939|B01FWRXN0Y|          Five Stars|          5|            0|
|   48785098|B01EXNH1HE|            Love it!|          5|            0|
|   39548589|B01E7OL09O|         Three Stars|          4|            0|
|   29355866|B01DXHX81O|          Five Stars|          5|            0|
|   17685865|B01BOKOL4A|The Jockey Women'...|          5|            0|
|   19787539|B01B3Q4Q0O|          Five Stars|          5|       


                                                                                

In [83]:
condition = (F.col("star_rating") >= 4) & (F.col("helpful_votes") > 10)

df.select(
    F.col("customer_id"),
    F.col("product_id"),
    F.col("review_headline"),
    F.col("star_rating"),
    F.col("helpful_votes")
).filter(condition)

DataFrame[customer_id: string, product_id: string, review_headline: string, star_rating: int, helpful_votes: int]

### ORDER BY

In [None]:
df.select(
    F.col("customer_id"),
    F.col("product_id"),
    F.col("review_headline"),
    F.col("star_rating"),
    F.col("helpful_votes")
).filter(F.col("star_rating") >= 4).orderBy("star_rating").show()

In [84]:
df.select(
    F.col("customer_id"),
    F.col("product_id"),
    F.col("review_headline"),
    F.col("star_rating"),
    F.col("helpful_votes")
).filter(F.col("star_rating") >= 4).sort("star_rating").show()



+-----------+----------+--------------------+-----------+-------------+
|customer_id|product_id|     review_headline|star_rating|helpful_votes|
+-----------+----------+--------------------+-----------+-------------+
|   30740886|B014WBIGKI|but is a nice swe...|          4|            0|
|   23554103|B013ZZGA7S|          Nice dress|          4|            0|
|    2457386|B014PKNCGE|       quick arrival|          4|            0|
|     123119|B01480VCNG|Nice dress for th...|          4|            7|
|   11003977|B0145UPYWY|Very sheer and li...|          4|            0|
|   39548589|B01E7OL09O|         Three Stars|          4|            0|
|    5189641|B0145U03U2|               Happy|          4|            0|
|   32158956|B01KL6O72Y|★ THESE REALLY DO...|          4|            0|
|   35007013|B0145Q9KZ0|Great quality and...|          4|            0|
|   11003977|B0149DYA4K|Must have a tiny ...|          4|            0|
|   19176319|B0143L7C7K|             Pleased|          4|       


                                                                                

In [85]:
( 
    df.select(
            F.col("customer_id"),
            F.col("product_id"),
            F.col("review_headline"),
            F.col("star_rating"),
            F.col("helpful_votes"))
        .filter(F.col("star_rating") >= 4)
        .sort(
            F.col("star_rating").desc()
        ).show()
)



+-----------+----------+--------------------+-----------+-------------+
|customer_id|product_id|     review_headline|star_rating|helpful_votes|
+-----------+----------+--------------------+-----------+-------------+
|   22116819|B014MSSP66|          Five Stars|          5|            0|
|     643748|B014E1C364|          Five Stars|          5|            0|
|   47482829|B014M5N1WW|Arrived quickly a...|          5|            0|
|    2714559|B01ID3ZS5W|Favorite for wint...|          5|            1|
|    4105777|B014L79H8I|          Great Gift|          5|            0|
|   25482800|B01HDXFZK6|          Slick hat!|          5|            0|
|     502031|B014K9ETKS|          Five Stars|          5|            3|
|   26631939|B01FWRXN0Y|          Five Stars|          5|            0|
|   44617615|B014K3PHXW|          Five Stars|          5|            0|
|   46404187|B014K01GSU|Excellent RFID Wa...|          5|            0|
|   26780399|B014EDMV32|            Love it!|          5|       


                                                                                

In [95]:
( 
    df.select(
            F.col("customer_id"),
            F.col("product_id"),
            F.col("review_headline"),
            F.col("star_rating"),
            F.col("helpful_votes"))
        .filter(F.col("star_rating") >= 4)
        .sort(
            F.desc("star_rating") 
        ).show()
)



+-----------+----------+--------------------+-----------+-------------+
|customer_id|product_id|     review_headline|star_rating|helpful_votes|
+-----------+----------+--------------------+-----------+-------------+
|   22116819|B014MSSP66|          Five Stars|        5.0|            0|
|     643748|B014E1C364|          Five Stars|        5.0|            0|
|   47482829|B014M5N1WW|Arrived quickly a...|        5.0|            0|
|    2714559|B01ID3ZS5W|Favorite for wint...|        5.0|            1|
|    4105777|B014L79H8I|          Great Gift|        5.0|            0|
|   25482800|B01HDXFZK6|          Slick hat!|        5.0|            0|
|     502031|B014K9ETKS|          Five Stars|        5.0|            3|
|   26631939|B01FWRXN0Y|          Five Stars|        5.0|            0|
|   44617615|B014K3PHXW|          Five Stars|        5.0|            0|
|   46404187|B014K01GSU|Excellent RFID Wa...|        5.0|            0|
|   26780399|B014EDMV32|            Love it!|        5.0|       


                                                                                

###  Limit 

In [86]:
( 
    df.select(
            F.col("customer_id"),
            F.col("product_id"),
            F.col("review_headline"),
            F.col("star_rating"),
            F.col("helpful_votes"))
        .filter(F.col("star_rating") >= 4)
        .sort(
            F.col("star_rating").desc()
        )
        .limit(1)
        .show()
)



+-----------+----------+--------------------+-----------+-------------+
|customer_id|product_id|     review_headline|star_rating|helpful_votes|
+-----------+----------+--------------------+-----------+-------------+
|    2714559|B01ID3ZS5W|Favorite for wint...|          5|            1|
+-----------+----------+--------------------+-----------+-------------+




                                                                                

### GROUP BY 

In [97]:
( 
    df.select(
            F.col("product_id"),
            F.col("star_rating"),
        )
        .groupBy("product_id")
        .mean()
        .show()
)

[Stage 55:>                                                         (0 + 1) / 1]

+----------+----------------+
|product_id|avg(star_rating)|
+----------+----------------+
|B014110CIS|             2.0|
|B013RTQQXK|             5.0|
|B013GVHRZ0|             3.0|
|B012HKN4B2|             3.0|
|B012C86TZW|             5.0|
|B012698SU6|             5.0|
|B0120RNKE8|             3.0|
|B0120OP7NI|             5.0|
|B011XDAW3W|             3.0|
|B011WELXAI|             5.0|
|B011PJXIE4|             5.0|
|B011NNTFDK|             1.0|
|B011NF7YUY|             5.0|
|B011M97GR2|             5.0|
|B011M5NSZ0|             5.0|
|B011JG86VI|             5.0|
|B011J5WPBQ|             5.0|
|B011IF6G3K|             5.0|
|B011HWM0GQ|             3.0|
|B011FERJM6|             4.0|
+----------+----------------+
only showing top 20 rows




                                                                                

In [99]:
( 
    df.select(
            F.col("product_id"),
            F.col("review_headline"),
            F.col("star_rating"),
            F.col("helpful_votes")
        )
        .groupBy("product_id")
        .mean()
        .show()
)



+----------+----------------+------------------+
|product_id|avg(star_rating)|avg(helpful_votes)|
+----------+----------------+------------------+
|B014110CIS|             2.0|               0.0|
|B013RTQQXK|             5.0|               0.0|
|B013GVHRZ0|             3.0|               0.0|
|B012HKN4B2|             3.0|               1.0|
|B012C86TZW|             5.0|               0.0|
|B012698SU6|             5.0|               2.0|
|B0120RNKE8|             3.0|               1.0|
|B0120OP7NI|             5.0|               0.0|
|B011XDAW3W|             3.0|               0.0|
|B011WELXAI|             5.0|               0.0|
|B011PJXIE4|             5.0|               0.0|
|B011NNTFDK|             1.0|               0.0|
|B011NF7YUY|             5.0|               2.0|
|B011M97GR2|             5.0|               0.0|
|B011M5NSZ0|             5.0|               7.0|
|B011JG86VI|             5.0|               0.0|
|B011J5WPBQ|             5.0|               5.0|
|B011IF6G3K|        


[Stage 61:>                                                         (0 + 1) / 1]

                                                                                

In [100]:
( 
    df.select(
            F.col("product_id"),
            F.col("review_headline"),
            F.col("star_rating"),
            F.col("helpful_votes")
        )
        .groupBy("product_id")
        .agg(
            F.min("star_rating"),
            F.max("helpful_votes")
        )
        .show()
)

[Stage 64:>                                                         (0 + 1) / 1]

+----------+----------------+------------------+
|product_id|min(star_rating)|max(helpful_votes)|
+----------+----------------+------------------+
|B014110CIS|             2.0|                 0|
|B013RTQQXK|             5.0|                 0|
|B013GVHRZ0|             3.0|                 0|
|B012HKN4B2|             3.0|                 1|
|B012C86TZW|             5.0|                 0|
|B012698SU6|             5.0|                 3|
|B0120RNKE8|             3.0|                 1|
|B0120OP7NI|             5.0|                 0|
|B011XDAW3W|             3.0|                 0|
|B011WELXAI|             5.0|                 0|
|B011PJXIE4|             5.0|                 0|
|B011NNTFDK|             1.0|                 0|
|B011NF7YUY|             5.0|                 2|
|B011M97GR2|             5.0|                 0|
|B011M5NSZ0|             5.0|                 7|
|B011JG86VI|             5.0|                 0|
|B011J5WPBQ|             5.0|                 5|
|B011IF6G3K|        


                                                                                

# User Define Function ( UDF )

In [105]:
def convert_case(string):
    return string.upper()

convert_case_udf = F.udf(convert_case, returnType=types.StringType())

In [108]:
( 
    df
    .withColumn('review_headline_uppercase', convert_case_udf(F.col("review_headline")))
    .select('review_headline_uppercase','review_headline') 
    .show()
)

+-------------------------+--------------------+
|review_headline_uppercase|     review_headline|
+-------------------------+--------------------+
|     ★ THESE REALLY DO...|★ THESE REALLY DO...|
|     FAVORITE FOR WINT...|Favorite for wint...|
|     GREAT SOCKS FOR T...|Great Socks for t...|
|               SLICK HAT!|          Slick hat!|
|     I WOULD DO IT AGAIN!|I would do it again!|
|               FIVE STARS|          Five Stars|
|                 LOVE IT!|            Love it!|
|              THREE STARS|         Three Stars|
|               FIVE STARS|          Five Stars|
|         NOT MY FAVORITE.|    Not my favorite.|
|     THE JOCKEY WOMEN'...|The Jockey Women'...|
|               FIVE STARS|          Five Stars|
|               FIVE STARS|          Five Stars|
|     BEST EVER4 FOR ME...|best ever4 for me...|
|               FIVE STARS|          Five Stars|
|               FIVE STARS|          Five Stars|
|               FOUR STARS|          Four Stars|
|        AWESOME LEG


[Stage 67:>                                                         (0 + 1) / 1]

                                                                                

# Spark SQL

In [115]:
df.createOrReplaceTempView("df")
# df.registerTempTable("df")

In [117]:
spark.sql(" SELECT * from df ").show()


[Stage 71:>                                                         (0 + 1) / 1]

+-----------+-----------+--------------+----------+-------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|productParent|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+-------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   32158956|R1KKOXHNI8MSXU|B01KL6O72Y|     24485154|         Apparel|        4.0|            0|          0|   0|                1|★ THESE REALLY DO...|These Really Do W...| 2013-01-14|
|         US|    2714559|R26SP2OPDK4HT7|B01ID3ZS5W|    363128556|         Apparel|        5.0|            1|          2|   0|                1|Favorite for wint...|I love this dress...| 2014-03-04|
|         


                                                                                

In [119]:
query = """

SELECT product_id, MAX(star_rating), MIN(helpful_votes)
FROM df 
WHERE star_rating >=4 and helpful_votes >= 10
GROUP BY product_id
LIMIT 10

""" 
spark.sql(query).show()

[Stage 77:>                                                         (0 + 1) / 1]

+----------+----------------+------------------+
|product_id|max(star_rating)|min(helpful_votes)|
+----------+----------------+------------------+
|B00MMMSNMQ|             5.0|                12|
|B00MF78G84|             4.0|                13|
|B00M8I0B1A|             5.0|                13|
|B00LY1YPAK|             5.0|                17|
|B00LXTN63K|             4.0|                33|
|B00LXG1EM8|             5.0|                27|
|B00LWG59AW|             5.0|                55|
|B00LVZN3F2|             5.0|                11|
|B00LSXELJY|             5.0|                42|
|B00LQMW4YQ|             5.0|                10|
+----------+----------------+------------------+




                                                                                

# Ghi kết quả ra file parquet 

In [125]:
query = """

SELECT product_id, MAX(star_rating), MIN(helpful_votes)
FROM df 
WHERE star_rating >=4 and helpful_votes >= 10
GROUP BY product_id

""" 

spark.sql(query).write.parquet("gs://aws-review-data/write/report-000.parquet",
                               mode = "overwrite")

#Dùng  write.partitionBy(col).parquet để partition

                                                                                

In [131]:
result_df = spark.read.parquet("gs://aws-review-data/write/report-000.parquet")

In [132]:
result_df.show()

+----------+----------------+------------------+
|product_id|max(star_rating)|min(helpful_votes)|
+----------+----------------+------------------+
|B00MMMSNMQ|             5.0|                12|
|B00MF78G84|             4.0|                13|
|B00M8I0B1A|             5.0|                13|
|B00LY1YPAK|             5.0|                17|
|B00LXTN63K|             4.0|                33|
|B00LXG1EM8|             5.0|                27|
|B00LWG59AW|             5.0|                55|
|B00LVZN3F2|             5.0|                11|
|B00LSXELJY|             5.0|                42|
|B00LQMW4YQ|             5.0|                10|
|B00LPAQ2D8|             5.0|                10|
|B00LLKJLEE|             5.0|                92|
|B00LIQOKG0|             5.0|                18|
|B00LGMGU96|             5.0|                15|
|B00LEOTLXS|             5.0|                19|
|B00LD3QSY0|             4.0|                18|
|B00LCJPCRE|             5.0|                15|
|B00LB055GW|        

# Ví dụ thực tế.

Đếm các comment tốt cho theo sản phẩm + mỗi ngày.

Tiêu chí tốt: 

    - star_rating >= 4 

Tiêu chí lọc comment 

    - helpful_votes >= 10


In [129]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types

spark = ( 
    SparkSession.builder
            .master("yarn") 
            .appName('Aggreate Good Comment')
            .getOrCreate()
)

schema = types.StructType([
    types.StructField('marketplace', types.StringType(), True),
    types.StructField('customer_id', types.StringType(), True),
    types.StructField('review_id', types.StringType(), True),
    types.StructField('product_id', types.StringType(), True),
    types.StructField('product_parent', types.StringType(), True),
    types.StructField('product_category', types.StringType(), True),
    types.StructField('star_rating', types.IntegerType(), True),
    types.StructField('helpful_votes', types.IntegerType(), True),
    types.StructField('total_votes', types.IntegerType(), True),
    types.StructField('vine', types.LongType(), True),
    types.StructField('verified_purchase', types.LongType(), True),
    types.StructField('review_headline', types.StringType(), True),
    types.StructField('review_body', types.StringType(), True),
    types.StructField('review_date', types.StringType(), True)
])

data_storage = "gs://aws-review-data/read/amazon_us_reviews-train-00000-of-00005.parquet"
data_write = "gs://aws-review-data/write/report-count"

df = spark.read.parquet(data_storage)
df = df.withColumn('review_date', F.to_date(F.col("review_date")))

( 
    df.select(
            F.col("product_id"),
            F.col("review_date"),
            F.col("star_rating"),
            F.col("helpful_votes")
        )
        .where((F.col("star_rating") >= 4 ) & (F.col("helpful_votes") >= 10 ))
        .groupBy(["product_id","review_date"])
        .count()
        .write.parquet(data_write,mode = "overwrite")
)


# df.createOrReplaceTempView("df")
query = """

SELECT product_id,review_date,COUNT(star_rating)
FROM df 
WHERE star_rating >=4 and helpful_votes >= 10
GROUP BY product_id
""" 

spark.sql(query).write.parquet(data_write,
                        mode = "overwrite")

                                                                                

In [133]:
result_df = spark.read.parquet("gs://aws-review-data/write/report-count")

In [136]:
result_df.printSchema()

root
 |-- product_id: string (nullable = true)
 |-- review_date: date (nullable = true)
 |-- count: long (nullable = true)



In [138]:
result_df.orderBy(F.desc("count")).show()

+----------+-----------+-----+
|product_id|review_date|count|
+----------+-----------+-----+
|B00W0R8FYE| 2015-04-13|    7|
|B00KRBY8YQ| 2014-06-10|    5|
|B00JJORFVK| 2014-05-14|    4|
|B00K2WN38M| 2014-07-02|    4|
|B00NLXYCUW| 2014-11-27|    2|
|B00PUP46UU| 2015-02-19|    2|
|B00N7TIIKK| 2014-11-01|    2|
|B00NLMZ86A| 2014-09-18|    2|
|B00RNBXZQU| 2015-03-26|    2|
|B00P0DQ0EW| 2015-05-04|    2|
|B00Q6SJCGI| 2014-11-29|    2|
|B00RMFFH3Q| 2015-01-29|    2|
|B00UID20P2| 2015-06-19|    2|
|B00UFMFADA| 2015-05-14|    2|
|B00RVUQJNO| 2015-07-17|    2|
|B00XK273PK| 2015-08-18|    2|
|B00WXPCWY2| 2015-06-01|    2|
|B00OE80FIG| 2014-10-17|    2|
|B00NOFDSSO| 2015-03-16|    2|
|B011MAHVY4| 2015-08-12|    2|
+----------+-----------+-----+
only showing top 20 rows



# RDD

In [139]:
df.select( 
    F.col('review_headline'),
    F.col('review_body')
).limit(10).show()

+--------------------+--------------------+
|     review_headline|         review_body|
+--------------------+--------------------+
|★ THESE REALLY DO...|These Really Do W...|
|Favorite for wint...|I love this dress...|
|Great Socks for t...|Nice socks, great...|
|          Slick hat!|I bought this for...|
|I would do it again!|Perfect dress and...|
|          Five Stars|Excellent for my ...|
|            Love it!|Raw is the only w...|
|         Three Stars|        A bit large.|
|          Five Stars|          Great fit!|
|    Not my favorite.|Shirt a bit too l...|
+--------------------+--------------------+




[Stage 101:>                                                        (0 + 1) / 1]

                                                                                

In [146]:
rdd_review_body = df.select("product_id","review_body").rdd

In [147]:
rdd_review_body

MapPartitionsRDD[359] at javaToPython at NativeMethodAccessorImpl.java:0

In [148]:
rdd_review_body.take(1)


[Stage 104:>                                                        (0 + 1) / 1]

                                                                                

[Row(product_id='B01KL6O72Y', review_body="These Really Do Work Great, But You Do Need To Know a Few Things.  I've Been Using Mine For a Few Years Now.  First, I Paid a Few Dollars Less For Mine and The Price Has Jumped.  They're All Imported, so Try to Find Either a Cheaper One or One That's Extremely Well Made.  This One is Made Well Enough, If You're Careful.  This Thing Can Cut You, So Don't Let Some Kid Use It, &#34;Because It Looks Like Fun!!&#34;  You Need A Pineapple That's Big Enough.  I Can't Tell You How Many Times I Went to Wal-Mart or the Grocery Store and Their Pineapples were Just Too Small of a Diameter.  It HAS to Be Big Enough.  It's Better To Have Some Waste on The Inside Of The Husk.<br /><br />When I'm Finished Using The Pineapple Corer, Then I Cut Up the Husk Of The Pineapple To Get The Rest Of The Pineapple Cut Up and I Save The Core of the Pineapple To Go Into My Iced Tea Pitcher or Drink Pitcher (That's How They Do It In Hawaii)  When I Was In Hawaii, They Neve

In [162]:
all_data = rdd_review_body.collect()

                                                                                

In [153]:
rdd_review_body.getNumPartitions()

2

In [151]:
rdd_review_body.repartition(4).getNumPartitions()

4

In [154]:
rdd_review_body.getNumPartitions()

2

### Map

In [159]:
def count_len(row):
    review = row.review_body
    product_id = row.product_id
    return (product_id,len(review))
        
rdd_review_body.map(count_len).take(10)

[('B01KL6O72Y', 2911),
 ('B01ID3ZS5W', 371),
 ('B01I497BGY', 87),
 ('B01HDXFZK6', 183),
 ('B01G6MBEBY', 51),
 ('B01FWRXN0Y', 48),
 ('B01EXNH1HE', 97),
 ('B01E7OL09O', 12),
 ('B01DXHX81O', 10),
 ('B01DDULIJK', 197)]

In [145]:
def count_word(row):
    review = row.review_body
    for word in review.split(" "):
        yield (word,1)
        
rdd_review_body.flatMap(count_word).take(10)

[('These', 1),
 ('Really', 1),
 ('Do', 1),
 ('Work', 1),
 ('Great,', 1),
 ('But', 1),
 ('You', 1),
 ('Do', 1),
 ('Need', 1),
 ('To', 1)]

In [161]:
def filter_product(row):
    product_id = row.product_id
    return product_id != "B01KL6O72Y"
        
( 
    rdd_review_body
        .filter(filter_product)
        .map(count_len)
        .take(10)
)

[('B01ID3ZS5W', 371),
 ('B01I497BGY', 87),
 ('B01HDXFZK6', 183),
 ('B01G6MBEBY', 51),
 ('B01FWRXN0Y', 48),
 ('B01EXNH1HE', 97),
 ('B01E7OL09O', 12),
 ('B01DXHX81O', 10),
 ('B01DDULIJK', 197),
 ('B01BOKOL4A', 129)]

In [164]:
def sort_by_value(record):
    return record[1]

(
    rdd_review_body
        .filter(filter_product)
        .map(count_len)
        .sortBy(sort_by_value,ascending = False)
        .take(10)
)


                                                                                

[('B00WDZ81JC', 16332),
 ('B00O7AS0MY', 14273),
 ('B00VZTJPRY', 12340),
 ('B00XHRL53O', 11895),
 ('B00OQXJXY6', 11769),
 ('B00JF0KBBO', 8038),
 ('B00JAN2N2G', 7719),
 ('B00TF7GW5Q', 7458),
 ('B00NR1Y30C', 7454),
 ('B011SQ0ERI', 7239)]

### Reduce

In [172]:
def my_sum(x,y):
    return x + y

In [168]:
(
    rdd_review_body
        .map(count_len)
        .reduceByKey(my_sum)
        .sortBy(sort_by_value,ascending = False)
        .take(10)
)

                                                                                

[('B00LMI9A6Y', 181960),
 ('B00ORZIYBQ', 177020),
 ('B00K5AFQ22', 148052),
 ('B00JSJHQP6', 91681),
 ('B00N9OPF3Q', 63939),
 ('B00N9OPMMU', 59295),
 ('B00LDUSX78', 54388),
 ('B00MAVN0R2', 50601),
 ('B00MWH3RNG', 50072),
 ('B00LLIVQNU', 45985)]

In [181]:
from collections import namedtuple
ProductWordCountRow = namedtuple("ProductWordCountRow",["product_id","len_count"])

def return_to_row(record):
    return ProductWordCountRow(record[0],record[1])

df = (
    rdd_review_body
        .map(count_len)
        .reduceByKey(my_sum)
        .map(return_to_row)
        .toDF()
)

                                                                                

In [182]:
df.show()

+----------+---------+
|product_id|len_count|
+----------+---------+
|B01I497BGY|       87|
|B01G6MBEBY|       51|
|B01FWRXN0Y|       48|
|B01DXHX81O|       10|
|B01DDULIJK|      197|
|B01B3Q4Q0O|       93|
|B01ADDSL9U|       10|
|B019MDXIXG|       10|
|B019438FEG|       59|
|B0178HGNIA|       62|
|B016VIU0QI|      366|
|B016PUU3VO|      231|
|B016AQNDM4|       19|
|B01694YS8K|      135|
|B015YCHLHS|       83|
|B014WCV7JY|       21|
|B014PKNCGE|       38|
|B014MSSP66|       40|
|B014L79H8I|      461|
|B014K3PHXW|       33|
+----------+---------+
only showing top 20 rows



# World Count  bằng rdd

In [184]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types

def count_word(row):
    review = row.review_body
    for word in review.split(" "):
        yield (word,1)

def my_sum(x,y):
    return x + y

spark = ( 
    SparkSession.builder
            .master("yarn") 
            .appName('Aggreate Good Comment')
            .getOrCreate()
)

schema = types.StructType([
    types.StructField('marketplace', types.StringType(), True),
    types.StructField('customer_id', types.StringType(), True),
    types.StructField('review_id', types.StringType(), True),
    types.StructField('product_id', types.StringType(), True),
    types.StructField('product_parent', types.StringType(), True),
    types.StructField('product_category', types.StringType(), True),
    types.StructField('star_rating', types.IntegerType(), True),
    types.StructField('helpful_votes', types.IntegerType(), True),
    types.StructField('total_votes', types.IntegerType(), True),
    types.StructField('vine', types.LongType(), True),
    types.StructField('verified_purchase', types.LongType(), True),
    types.StructField('review_headline', types.StringType(), True),
    types.StructField('review_body', types.StringType(), True),
    types.StructField('review_date', types.StringType(), True)
])

data_storage = "gs://aws-review-data/amazon_us_reviews-train-00000-of-00005.parquet"
data_write = "gs://aws-review-data/write/world-count"

df = spark.read.schema(schema).parquet(data_storage)
rdd_review_body = df.select("review_body").rdd
(
    rdd_review_body
        .flatMap(count_word)
        .reduceByKey(my_sum)
        .saveAsTextFile(data_write)
)


                                                                                