In [1]:
#!/usr/bin/python
# -*- coding: utf-8 -*-
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession
from pyspark.sql import functions as func

# Build (or reuse) the local Spark session used by every cell below.
# The master is configured only through .master(); "spark-master" is not a Spark
# configuration key (and 7077 is merely the standalone master port), so no such
# .config() call is needed.
spark = (
    SparkSession.builder
    .master("local")
    .appName("read_streaming_data")
    # Let the ContextCleaner delete checkpoint files whose references go out of scope.
    .config("spark.cleaner.referenceTracking.cleanCheckpoints", "true")
    .getOrCreate()
)

# File-based streaming sources need an explicit schema, so infer one once from a
# batch snapshot of the raw data and hand it to the streaming reader later.
input_schema = spark.read.csv(
    path="hdfs://namenode:9000/bigdataproject/raws_data/part-00000-edec05c0-acef-40e2-a95a-e49933863ee0-c000.csv",
    header=True,
    inferSchema=True,
    multiLine=True,
).schema


Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/12/31 15:45:03 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
21/12/31 15:45:04 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
                                                                                

In [2]:
# Incremental CSV source over the landing directory.
# Streaming file sources do not infer schemas, so the batch-inferred `input_schema`
# is supplied explicitly; an `inferSchema` option would be ignored and is omitted.
# maxFilesPerTrigger=1: each micro-batch consumes at most one new file.
# NOTE(review): the "*.csv" glob matches files directly under streaming_data/;
# files inside partition subdirectories (e.g. group=0/) may not be picked up — confirm.
streaming = (
    spark.readStream
    .schema(input_schema)
    .option("maxFilesPerTrigger", 1)
    .csv(
        "hdfs://namenode:9000/bigdataproject/streaming_data/*.csv",
        escape='"',
        header=True,
        multiLine=True,
    )
)

In [24]:
# Count posts per project name ("Tên dự án") in 2-day windows that slide every
# 12 hours, keyed on the posting time ("Thời gian đăng"); newest windows first,
# then the projects with the most posts ("Số lượng tin").
# Caveats: sorting a streaming aggregate is only allowed in "complete" output
# mode, and in complete mode Spark keeps all aggregate state, so the 1-day
# watermark does not evict old windows.
posted_at = func.to_timestamp("Thời gian đăng")
sliding_window = func.window("timestamp", "2 days", "12 hours")

df = (
    streaming
    .where(func.col("Tên dự án").isNotNull())
    .withColumn("timestamp", posted_at)
    .withWatermark("timestamp", "1 days")
    .groupBy(sliding_window, "Tên dự án")
    .agg(func.count("id").alias("Số lượng tin"))
    .orderBy(["window.start", "Số lượng tin"], ascending=False)
)

In [22]:
### Write the streaming analysis to the console
# Spark will not start a second active query with the same name, so stop any
# still-running "dest_counts" query first; this keeps the cell re-runnable.
for running_query in spark.streams.active:
    if running_query.name == "dest_counts":
        running_query.stop()

activityQuery = (
    df.writeStream.queryName("dest_counts")
    .trigger(processingTime='30 seconds')
    # Dedicated checkpoint for this sink, kept outside the streaming source
    # directory; sharing one checkpoint between queries makes one resume from
    # (or clobber) the other's offsets and state.
    .option("checkpointLocation", "hdfs://namenode:9000/bigdataproject/checkpoints/dest_counts_console/")
    .option("truncate", "false")
    .format("console")
    # Must be "complete": df sorts its aggregate, which "append" mode rejects.
    .outputMode("complete")
    .start()
)

21/12/28 15:16:28 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+------------------------------------------+-----------------------------+------------+
|window                                    |Tên dự án                    |Số lượng tin|
+------------------------------------------+-----------------------------+------------+
|{2021-06-01 12:00:00, 2021-06-03 12:00:00}|Khu dân cư An Thuận          |4           |
|{2021-06-01 12:00:00, 2021-06-03 12:00:00}|Green Valley                 |1           |
|{2021-06-01 12:00:00, 2021-06-03 12:00:00}|Centana Thủ Thiêm            |1           |
|{2021-06-01 12:00:00, 2021-06-03 12:00:00}|Khu đô thị Cát Lái           |1           |
|{2021-06-01 12:00:00, 2021-06-03 12:00:00}|Biên Hoà Riverside           |1           |
|{2021-06-01 12:00:00, 2021-06-03 12:00:00}|Terra Rosa                   |1           |
|{2021-06-01 12:00:00, 2021-06-03 12:00:00}|Văn Phú Victoria             |1           |
|{2021-06-01 12:00:00, 



-------------------------------------------
Batch: 1
-------------------------------------------
+------------------------------------------+----------------------------+------------+
|window                                    |Tên dự án                   |Số lượng tin|
+------------------------------------------+----------------------------+------------+
|{2021-06-02 00:00:00, 2021-06-04 00:00:00}|Him Lam Phú An              |4           |
|{2021-06-02 00:00:00, 2021-06-04 00:00:00}|Làng Đại Học ABC            |2           |
|{2021-06-02 00:00:00, 2021-06-04 00:00:00}|Centana City Điền Phúc Thành|2           |
|{2021-06-02 00:00:00, 2021-06-04 00:00:00}|Eco Green City              |2           |
|{2021-06-02 00:00:00, 2021-06-04 00:00:00}|Q7 Saigon Riverside         |2           |
|{2021-06-02 00:00:00, 2021-06-04 00:00:00}|N01-T4 Ngoại Giao Đoàn      |1           |
|{2021-06-02 00:00:00, 2021-06-04 00:00:00}|EHome 4                     |1           |
|{2021-06-02 00:00:00, 2021-06-04

                                                                                

-------------------------------------------
Batch: 2
-------------------------------------------
+------------------------------------------+-----------------------------+------------+
|window                                    |Tên dự án                    |Số lượng tin|
+------------------------------------------+-----------------------------+------------+
|{2021-06-02 12:00:00, 2021-06-04 12:00:00}|Villa Park                   |2           |
|{2021-06-02 12:00:00, 2021-06-04 12:00:00}|Khu dân cư Phú Xuân          |2           |
|{2021-06-02 12:00:00, 2021-06-04 12:00:00}|GoldSeason                   |2           |
|{2021-06-02 12:00:00, 2021-06-04 12:00:00}|Rosita Garden Khang Điền     |2           |
|{2021-06-02 12:00:00, 2021-06-04 12:00:00}|Khu đô thị Mega City 2       |2           |
|{2021-06-02 12:00:00, 2021-06-04 12:00:00}|Khu đô thị Đông Bình Dương   |2           |
|{2021-06-02 12:00:00, 2021-06-04 12:00:00}|Oriental Plaza 685 Âu Cơ     |2           |
|{2021-06-02 12:00:00, 



-------------------------------------------
Batch: 3
-------------------------------------------
+------------------------------------------+-----------------------------------+------------+
|window                                    |Tên dự án                          |Số lượng tin|
+------------------------------------------+-----------------------------------+------------+
|{2021-06-03 00:00:00, 2021-06-05 00:00:00}|Vinhomes Central Park              |4           |
|{2021-06-03 00:00:00, 2021-06-05 00:00:00}|Khu đô thị mới Đông Tăng Long      |3           |
|{2021-06-03 00:00:00, 2021-06-05 00:00:00}|Mega City                          |3           |
|{2021-06-03 00:00:00, 2021-06-05 00:00:00}|Imperia Garden                     |3           |
|{2021-06-03 00:00:00, 2021-06-05 00:00:00}|Vinhomes Golden River Ba Son       |3           |
|{2021-06-03 00:00:00, 2021-06-05 00:00:00}|Paradise Riverside                 |3           |
|{2021-06-03 00:00:00, 2021-06-05 00:00:00}|Khu đô thị Me

                                                                                

-------------------------------------------
Batch: 4
-------------------------------------------
+------------------------------------------+----------------------------------+------------+
|window                                    |Tên dự án                         |Số lượng tin|
+------------------------------------------+----------------------------------+------------+
|{2021-06-03 12:00:00, 2021-06-05 12:00:00}|Lavila Kiến Á - Nhà Bè            |9           |
|{2021-06-03 12:00:00, 2021-06-05 12:00:00}|TTTM và Phố chợ Đô Nghĩa          |5           |
|{2021-06-03 12:00:00, 2021-06-05 12:00:00}|Sim City                          |3           |
|{2021-06-03 12:00:00, 2021-06-05 12:00:00}|Sarimi Sala                       |3           |
|{2021-06-03 12:00:00, 2021-06-05 12:00:00}|Nine South Estates                |3           |
|{2021-06-03 12:00:00, 2021-06-05 12:00:00}|Khu đô thị Thanh Hà Mường Thanh   |2           |
|{2021-06-03 12:00:00, 2021-06-05 12:00:00}|M-One Nam Sài Gòn     

                                                                                

-------------------------------------------
Batch: 5
-------------------------------------------
+------------------------------------------+-----------------------------------+------------+
|window                                    |Tên dự án                          |Số lượng tin|
+------------------------------------------+-----------------------------------+------------+
|{2021-06-04 00:00:00, 2021-06-06 00:00:00}|Riviera Point                      |6           |
|{2021-06-04 00:00:00, 2021-06-06 00:00:00}|Aqua City                          |2           |
|{2021-06-04 00:00:00, 2021-06-06 00:00:00}|Khu đô thị mới Hạ Đình             |2           |
|{2021-06-04 00:00:00, 2021-06-06 00:00:00}|Hoàng Anh Thanh Bình               |2           |
|{2021-06-04 00:00:00, 2021-06-06 00:00:00}|Mega City                          |2           |
|{2021-06-04 00:00:00, 2021-06-06 00:00:00}|Golden Center City 3               |2           |
|{2021-06-04 00:00:00, 2021-06-06 00:00:00}|KDC Phú Xuân 

                                                                                

-------------------------------------------
Batch: 6
-------------------------------------------
+------------------------------------------+---------------------------------+------------+
|window                                    |Tên dự án                        |Số lượng tin|
+------------------------------------------+---------------------------------+------------+
|{2021-06-04 12:00:00, 2021-06-06 12:00:00}|Vinhomes Riverside               |4           |
|{2021-06-04 12:00:00, 2021-06-06 12:00:00}|CityLand Park Hills              |4           |
|{2021-06-04 12:00:00, 2021-06-06 12:00:00}|Sài Gòn Eco Lake                 |3           |
|{2021-06-04 12:00:00, 2021-06-06 12:00:00}|Sunrise City View                |3           |
|{2021-06-04 12:00:00, 2021-06-06 12:00:00}|Embassy Garden                   |2           |
|{2021-06-04 12:00:00, 2021-06-06 12:00:00}|The Golden Star                  |2           |
|{2021-06-04 12:00:00, 2021-06-06 12:00:00}|Khu đô thị Mega City 2         

                                                                                

-------------------------------------------
Batch: 7
-------------------------------------------
+------------------------------------------+-----------------------------------+------------+
|window                                    |Tên dự án                          |Số lượng tin|
+------------------------------------------+-----------------------------------+------------+
|{2021-06-05 00:00:00, 2021-06-07 00:00:00}|Vinhomes The Harmony               |9           |
|{2021-06-05 00:00:00, 2021-06-07 00:00:00}|Căn hộ RichStar                   |5           |
|{2021-06-05 00:00:00, 2021-06-07 00:00:00}|Khu dân cư Phú Xuân                |4           |
|{2021-06-05 00:00:00, 2021-06-07 00:00:00}|Khu đô thị Vạn Phúc City           |3           |
|{2021-06-05 00:00:00, 2021-06-07 00:00:00}|Sim City                           |3           |
|{2021-06-05 00:00:00, 2021-06-07 00:00:00}|Khu đô thị Mỹ Đình Sông Đà - Sudico|2           |
|{2021-06-05 00:00:00, 2021-06-07 00:00:00}|Khu đô thị mớ

                                                                                

-------------------------------------------
Batch: 8
-------------------------------------------
+------------------------------------------+----------------------------+------------+
|window                                    |Tên dự án                   |Số lượng tin|
+------------------------------------------+----------------------------+------------+
|{2021-06-05 12:00:00, 2021-06-07 12:00:00}|D-Vela                      |4           |
|{2021-06-05 12:00:00, 2021-06-07 12:00:00}|Khu đô thị Nam An Khánh     |3           |
|{2021-06-05 12:00:00, 2021-06-07 12:00:00}|Làng Đại Học ABC            |3           |
|{2021-06-05 12:00:00, 2021-06-07 12:00:00}|Khu đô thị Him Lam Kênh Tẻ  |3           |
|{2021-06-05 12:00:00, 2021-06-07 12:00:00}|Flora Anh Đào               |2           |
|{2021-06-05 12:00:00, 2021-06-07 12:00:00}|Royal City                  |2           |
|{2021-06-05 12:00:00, 2021-06-07 12:00:00}|Quy Nhơn Melody             |2           |
|{2021-06-05 12:00:00, 2021-06-07

                                                                                

-------------------------------------------
Batch: 9
-------------------------------------------
+------------------------------------------+-------------------------------+------------+
|window                                    |Tên dự án                      |Số lượng tin|
+------------------------------------------+-------------------------------+------------+
|{2021-06-06 00:00:00, 2021-06-08 00:00:00}|Midtown Phú Mỹ Hưng            |7           |
|{2021-06-06 00:00:00, 2021-06-08 00:00:00}|The Peak                       |4           |
|{2021-06-06 00:00:00, 2021-06-08 00:00:00}|Centana Thủ Thiêm              |3           |
|{2021-06-06 00:00:00, 2021-06-08 00:00:00}|The Art                        |3           |
|{2021-06-06 00:00:00, 2021-06-08 00:00:00}|Him Lam Riverside              |3           |
|{2021-06-06 00:00:00, 2021-06-08 00:00:00}|Indochina Plaza Hanoi          |3           |
|{2021-06-06 00:00:00, 2021-06-08 00:00:00}|The View Riviera Point         |3           |
|{2

In [25]:
### Write the streaming analysis to an in-memory table
# With the memory sink the query name is also the name of the temporary table
# that `spark.sql(... from dest_counts ...)` reads, so it must stay "dest_counts".
# Spark will not start a second active query with that name (e.g. while the
# console query is running), so stop any such query first.
for running_query in spark.streams.active:
    if running_query.name == "dest_counts":
        running_query.stop()

activityQuery = (
    df.writeStream.queryName("dest_counts")
    .trigger(processingTime='30 seconds')
    # Dedicated checkpoint for this sink; never share it with the console query.
    .option("checkpointLocation", "hdfs://namenode:9000/bigdataproject/checkpoints/dest_counts_memory/")
    .format("memory")
    # Must be "complete": df sorts its aggregate, which "append" mode rejects.
    .outputMode("complete")
    .start()
)

21/12/31 16:12:03 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
                                                                                

In [35]:
# Window/project pairs with more than 10 posts, busiest first, read from the
# in-memory table maintained by the "dest_counts" memory-sink query.
(
    spark.table("dest_counts")
    .where(func.col("Số lượng tin") > 10)
    .orderBy(func.col("Số lượng tin").desc())
    .show(truncate=False)
)

+------------------------------------------+-----------------------------+------------+
|window                                    |Tên dự án                    |Số lượng tin|
+------------------------------------------+-----------------------------+------------+
|{2021-05-31 12:00:00, 2021-06-02 12:00:00}|Him Lam Phú An               |19          |
|{2021-06-01 00:00:00, 2021-06-03 00:00:00}|Him Lam Phú An               |19          |
|{2021-05-31 00:00:00, 2021-06-02 00:00:00}|Him Lam Phú An               |15          |
|{2021-05-31 00:00:00, 2021-06-02 00:00:00}|KDC Phú Nhuận - Phước Long B |14          |
|{2021-05-31 12:00:00, 2021-06-02 12:00:00}|KDC Phú Nhuận - Phước Long B |14          |
|{2021-06-01 00:00:00, 2021-06-03 00:00:00}|KDC Phú Nhuận - Phước Long B |14          |
|{2021-05-31 00:00:00, 2021-06-02 00:00:00}|Khu đô thị mới Đông Tăng Long|13          |
|{2021-06-01 00:00:00, 2021-06-03 00:00:00}|Khu đô thị mới Đông Tăng Long|13          |
|{2021-05-31 12:00:00, 2021-06-0

In [3]:
# Re-infer the schema from one partition file of the streaming input itself.
# NOTE(review): `streaming` was already constructed from the earlier
# `input_schema`; this new value only takes effect if the readStream cell is
# re-run after this one.
sample_file = "hdfs://namenode:9000/bigdataproject/streaming_data/group=0/part-00000-b9bcd014-0f25-4e56-91d7-9c90d2a65d27.c000.csv"
input_schema = spark.read.csv(
    sample_file,
    header=True,
    inferSchema=True,
    multiLine=True,
).schema

In [5]:
# True while at least one streaming query is running. Checking the list itself
# avoids the IndexError that indexing [0] raises once every query is stopped.
any(query.isActive for query in spark.streams.active)

True

In [26]:
# Current status of the most recently started query
# (message / isDataAvailable / isTriggerActive).
query_status = activityQuery.status
query_status

{'message': 'Waiting for next trigger',
 'isDataAvailable': False,
 'isTriggerActive': False}

In [12]:
# Stop every active streaming query, not only the one bound to `activityQuery`:
# each writeStream cell rebinds that variable, so an earlier query (e.g. the
# console sink) would otherwise keep running with no handle left to stop it.
for running_query in spark.streams.active:
    running_query.stop()

In [15]:
# Batch-read one CSV part file under streaming_output/ to inspect what was
# persisted there (presumably by a streaming job not shown in this notebook).
output_file = "hdfs://namenode:9000/bigdataproject/streaming_output/part-00000-46b5bc2c-c17c-43fd-8ada-dd38b46f6333-c000.csv"
new_df = spark.read.csv(output_file, header=True, inferSchema=True, multiLine=True)

In [16]:
# Print up to 20 rows, truncating long cell values (DataFrame.show defaults).
new_df.show(n=20, truncate=True)

+-----+---+---------+------------+
|start|end|Tên dự án|Số lượng tin|
+-----+---+---------+------------+
+-----+---+---------+------------+

