In [1]:
from pyspark.sql import SparkSession

In [17]:
from pyspark.sql.functions import col, to_date, when, length, year

## Constants

In [2]:
data_path = "/app/data/binhluan.csv"

## Init spark session

In [3]:
spark = (
    SparkSession.builder.appName("PySpark Demo")
    .master("spark://spark-master:7077")
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/08/17 17:35:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Demo

#### Load csv

In [8]:
comments = spark.read.option("header", "true").csv(data_path)

In [9]:
comments.show()

[Stage 3:>                                                          (0 + 1) / 1]

+--------------------+--------------------+------------+--------------+------------+--------------------+---------------+
|                 _c0|                User|comment_date|comment_Rating|Comment_Like|      Comment_Review|Comment_Picture|
+--------------------+--------------------+------------+--------------+------------+--------------------+---------------+
|                   1|Hoa Thanh Trung (...|   7/18/2012|             5|           0|                NULL|             no|
|                   2|    Văn Thắng Nguyễn|   11/7/2012|             5|           0|                NULL|             no|
|                   4|      Trung Trần Mẫn|  11/20/2012|             5|           0|                NULL|             no|
|                  19|        Trần- Nguyễn|   4/20/2014|             5|           0|                NULL|             no|
|                  21|             Minh Le|   5/12/2014|             5|           0|                NULL|             no|
|                  22|  

                                                                                

#### DML

In [11]:
comments.createOrReplaceTempView("comments")

In [13]:
high_ratings_df = spark.sql("SELECT User, comment_Rating FROM comments WHERE comment_Rating = 5")
high_ratings_df.show(truncate=False)

+-------------------------+--------------+
|User                     |comment_Rating|
+-------------------------+--------------+
|Hoa Thanh Trung (P. CNTT)|5             |
|Văn Thắng Nguyễn         |5             |
|Trung Trần Mẫn           |5             |
|Trần- Nguyễn             |5             |
|Minh Le                  |5             |
|Hien Tran                |5             |
|Tiger White              |5             |
|Bich Ngoc Bui            |5             |
|Jung Woo Han             |5             |
|Ty Tran                  |5             |
|Chhim Phan Sochetra      |5             |
|nguyen thang             |5             |
|Tuấn nguyễn quốc         |5             |
|Hoang Hai                |5             |
|Kynghesat Quỳnh An       |5             |
|Potassy                  |5             |
|Luc Tran                 |5             |
|Quoc Duy Nguyen          |5             |
|Di Phuong                |5             |
|chi sang                 |5             |
+----------

In [14]:
comment_counts_df = spark.sql("SELECT User, COUNT(*) as comment_count FROM comments GROUP BY User")
comment_counts_df.show(truncate=False)

[Stage 6:>                                                          (0 + 1) / 1]

+------------------------------------------------+-------------+
|User                                            |comment_count|
+------------------------------------------------+-------------+
|Duy Lâm Đoàn                                    |1            |
|Thanh Lê                                        |1            |
|THUỐC LÀO TIẾN HƯNG                             |1            |
|Le Tan Vo B1911435                              |1            |
| đầy đủ                                         |1            |
|Phuc Nhan Pham                                  |1            |
|Sôpha David                                     |1            |
|Kiên Henry                                      |1            |
|lee歡                                           |1            |
|Trần Minh Công                                  |1            |
|Tuan Au Nguyen Quoc                             |1            |
|Hạc Nguyễn                                      |1            |
|Bao Nguyen Gia           

                                                                                

In [16]:
non_empty_reviews_df = spark.sql("SELECT User, Comment_Review FROM comments WHERE Comment_Review IS NOT NULL AND Comment_Review != ''")
non_empty_reviews_df.show(truncate=False)

+------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|User                    |Comment_Review                                                                                                                                                                                                                                                                                                                                             |
+------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

#### Transformation / Process Data

In [19]:
comments = comments.withColumn("comment_date", to_date(col("comment_date"), "M/d/yyyy"))

In [20]:
comments = comments.withColumn("Long_Review", when(length(col("Comment_Review")) > 100, "Yes").otherwise("No"))

In [21]:
comments = comments.withColumn("comment_year", year(col("comment_date")))

In [22]:
comments = comments.fillna({'Comment_Picture': 'No Picture'})

In [23]:
comments = comments.filter(col("Comment_Like") > 0)

In [24]:
comments.show(truncate=False)

+---+-------------------------+------------+--------------+------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------+-----------+------------+
|_c0|User                     |comment_date|comment_Rating|Comment_Like|Comment_Review                                                                                                                                                                                                                                                                                                                                                           

#### Export

In [None]:
comments_single_partition = comments.coalesce(1)

In [30]:
comments_single_partition.write.mode("overwrite").format("csv").save("/app/data/result.csv")