# Chapter 4. Streaming Data: Publication and Ingest

* Psygrammer / CloudΨ - DS on GCP [1]
* Kim Moo-sung (김무성)

# Contents
* Preparation
* Designing the Event Feed
* Time Correction
* Apache Beam/Cloud Dataflow
    - Parsing Airports Data
    - Adding Time Zone Information
    - Converting Times to UTC
    - Correcting Dates
    - Creating Events
    - Running the Pipeline in the Cloud
* Publishing an Event Stream to Cloud Pub/Sub
    - Get Records to Publish
    - Paging Through Records
    - Building a Batch of Events
    - Publishing a Batch of Events
* Real-Time Stream Processing
    - Streaming in Java Dataflow
        - Windowing a pipeline
        - Streaming aggregation
        - Co-join by key
    - Executing the Stream Processing
    - Analyzing Streaming Data in BigQuery
    - Real-Time Dashboard
* Summary

----------------------------

#### See also
* Getting started with Google Cloud Training Material - 2018 - https://www.slideshare.net/jkbaseer/getting-started-with-google-cloud-training-material-2018



---------------------

# Preparation

Open Google Cloud Shell from the GCP console:
* https://console.cloud.google.com/



--------------------

# Lab 1. Batch processing in Dataflow


1. Setup: from the root of the book's repository [2], change into the simulation directory and install the required packages:
	```shell
	cd 04_streaming/simulate
	./install_packages.sh
	```
    <br><br>
    
2. Parsing airports data:
	```shell
	./df01.py
	head extracted_airports-00000*
	rm extracted_airports-*
	```
    <br><br>
    
3. Adding timezone information:
	```shell
	./df02.py
	head airports_with_tz-00000*
	rm airports_with_tz-*
	```
    <br><br>
    
4. Converting times to UTC:
	```shell
	./df03.py
	head -3 all_flights-00000*
	```
    <br><br>
    
5. Correcting dates:
	```shell
	./df04.py
	head -3 all_flights-00000*
	rm all_flights-*
	```
    <br><br>
    
6. Creating events:
	```shell
	./df05.py
	head -3 all_events-00000*
	rm all_events-*
	```  

-----------------------------------

# Designing the Event Feed

<img src="figures/cap01.png" width=600 />

# Time Correction

# Apache Beam/Cloud Dataflow
* Parsing Airports Data
* Adding Time Zone Information
* Converting Times to UTC
* Correcting Dates
* Creating Events
* Running the Pipeline in the Cloud

<img src="figures/cap02.png" width=600 />

## Parsing Airports Data
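The first step of Lab 1 (`df01.py`) reads `airports.csv.gz` and keeps only each airport's ID and coordinates. A minimal, Beam-free sketch of that extraction, assuming the file starts with a header row naming the `AIRPORT_SEQ_ID`, `LATITUDE` and `LONGITUDE` columns; the function names are illustrative, not the ones used in the book's scripts:

```python
import csv
import io


def extract_airports(csv_text):
    """Yield (airport_id, (latitude, longitude)) for each airport row.

    Assumes the first line is a header naming the AIRPORT_SEQ_ID,
    LATITUDE and LONGITUDE columns; all other columns are ignored.
    """
    for row in csv.DictReader(io.StringIO(csv_text)):
        yield row["AIRPORT_SEQ_ID"], (row["LATITUDE"], row["LONGITUDE"])


def to_csv_line(airport):
    """Format one record as 'id,lat,lon' (an assumed output layout)."""
    airport_id, (lat, lon) = airport
    return f"{airport_id},{lat},{lon}"
```

Reading through `csv.DictReader` keeps the sketch independent of column positions. A Beam transform that receives one text line at a time has no header to consult, so it would have to pick the columns by index instead.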

## Adding Time Zone Information
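Because an airport's time zone never changes, it can be computed once per airport and reused for every flight that departs from or arrives at it. The book performs the coordinate-to-time-zone lookup with the third-party `timezonefinder` package; the sketch below takes the lookup as a parameter (`tz_lookup`, hypothetical) so that it stays dependency-free, and returns a plain dictionary of the kind a Beam pipeline could pass to later steps as a side input (for example via `beam.pvalue.AsDict`):

```python
def add_timezones(airports, tz_lookup):
    """Build {airport_id: (lat, lon, tz_name)} from (id, (lat, lon)) records.

    tz_lookup(lat, lon) -> str is supplied by the caller (hypothetical);
    it is called once per airport, not once per flight.
    """
    table = {}
    for airport_id, (lat, lon) in airports:
        table[airport_id] = (lat, lon, tz_lookup(float(lat), float(lon)))
    return table


def airport_timezone(table, airport_id):
    """Look up the time zone name for one flight's airport."""
    return table[airport_id][2]
```

With `timezonefinder` installed, `tz_lookup` could be built as `tf = TimezoneFinder()` followed by `lambda lat, lon: tf.timezone_at(lng=lon, lat=lat)`; creating the finder once rather than on every call avoids repeated setup work.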

## Converting Times to UTC
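The departure and arrival times in the flight records are local wall-clock times without an offset, so each one has to be interpreted in its airport's time zone before it can be compared with times at other airports. A minimal sketch using only the standard library (the book's own code uses `pytz`; this is an adaptation, not a copy). `tz` is any `tzinfo`; in practice it would be `zoneinfo.ZoneInfo(name)` for the time zone name found in the previous step:

```python
from datetime import datetime, timedelta, timezone


def as_utc(date, hhmm, tz):
    """Convert a local date ('YYYY-MM-DD') and 'hhmm' time to UTC.

    Returns (utc_string, utc_offset_seconds), or ('', 0) when hhmm is
    empty, as it is for cancelled flights. hhmm is added to local
    midnight as a duration rather than parsed as a clock time, because
    the data can contain values such as '2400'.
    """
    if not hhmm:
        return "", 0
    local_midnight = datetime.strptime(date, "%Y-%m-%d").replace(tzinfo=tz)
    local_dt = local_midnight + timedelta(hours=int(hhmm[:2]), minutes=int(hhmm[2:]))
    utc_dt = local_dt.astimezone(timezone.utc)
    return utc_dt.strftime("%Y-%m-%d %H:%M:%S"), local_dt.utcoffset().total_seconds()
```

Returning the UTC offset alongside the converted time keeps the local-time information available for later analysis. With a `ZoneInfo` object instead of a fixed offset, the conversion also follows daylight-saving changes.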

## Correcting Dates
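Each flight record carries a single flight date, so a flight that arrives on the following day still has its arrival time attached to the departure date, and after conversion its arrival timestamp can precede its departure timestamp. One way to repair this, sketched below under that assumption, is to move any arrival that precedes its departure forward by 24 hours. The same correction would apply to other post-departure times such as wheels-on, which the sketch omits:

```python
from datetime import datetime, timedelta

FMT = "%Y-%m-%d %H:%M:%S"


def add_24h(ts):
    """Shift a 'YYYY-MM-DD HH:MM:SS' timestamp forward by one day."""
    return (datetime.strptime(ts, FMT) + timedelta(hours=24)).strftime(FMT)


def correct_arrival(dep_utc, arr_utc):
    """Return arr_utc, moved to the next day if it precedes the departure.

    Both inputs are UTC strings in FMT. The format is fixed-width and
    most-significant-first, so string comparison matches time order.
    An empty arrival (cancelled or diverted flight) is returned unchanged.
    """
    if arr_utc and arr_utc < dep_utc:
        return add_24h(arr_utc)
    return arr_utc
```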

## Creating Events
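The simulation turns each flight into separate events that are published at the moment the information would become available, for example a departure event at the departure time and an arrival event at the arrival time. The sketch below covers just those two event types, assumes each flight is a dictionary keyed by column name, and uses the `EVENT` and `NOTIFY_TIME` column names that appear in the Lab 2 query. The list of fields hidden from the departure event is deliberately short and is an assumption:

```python
def get_events(flight):
    """Yield simulated events for one flight (a dict of column -> string).

    A 'departed' event is emitted at DEP_TIME without the fields that are
    not yet known at departure; an 'arrived' event is emitted at ARR_TIME.
    Cancelled flights (empty times) produce no event of that type.
    """
    unknown_at_departure = ("ARR_TIME", "ARR_DELAY")  # simplified, assumed list
    if flight.get("DEP_TIME"):
        departed = {k: v for k, v in flight.items() if k not in unknown_at_departure}
        departed.update(EVENT="departed", NOTIFY_TIME=flight["DEP_TIME"])
        yield departed
    if flight.get("ARR_TIME"):
        yield dict(flight, EVENT="arrived", NOTIFY_TIME=flight["ARR_TIME"])
```

Emitting one record per event, each stamped with its own `NOTIFY_TIME`, is what allows the events to be sorted and replayed in time order later.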

---------------

# Lab 2. Pipeline on GCP

1. In the GCP web console, go to the APIs & Services section and enable the Dataflow API.<br><br>

2. In CloudShell, type:
	```shell
	bq mk flights
	gsutil cp airports.csv.gz gs://<BUCKET-NAME>/flights/airports/airports.csv.gz
	./df06.py -p $DEVSHELL_PROJECT_ID -b <BUCKET-NAME>
	```
    <br>
3. Go to the GCP web console and wait for the Dataflow ch04timecorr job to finish. It might take several minutes.<br><br>

4. Then, navigate to the BigQuery console and run:
	```sql
	SELECT
	  ORIGIN,
	  DEP_TIME,
	  DEP_DELAY,
	  DEST,
	  ARR_TIME,
	  ARR_DELAY,
	  NOTIFY_TIME
	FROM
	  flights.simevents
	WHERE
	  (DEP_DELAY > 15 AND ORIGIN = 'SEA') OR
	  (ARR_DELAY > 15 AND DEST = 'SEA')
	ORDER BY NOTIFY_TIME ASC
	LIMIT 10
	```

------------------

## Running the Pipeline in the Cloud

<img src="figures/cap03.png" width=600 />

<img src="figures/cap04.png" width=600 />

# Publishing an Event Stream to Cloud Pub/Sub
* Get Records to Publish
* Paging Through Records
* Building a Batch of Events
* Publishing a Batch of Events

<img src="figures/cap05.png" width=600 />

<img src="figures/cap06.png" width=600 />

## Get Records to Publish
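The simulator starts by selecting, from the `flights.simevents` table created in Lab 2, the events whose notification time falls in the `--startTime`/`--endTime` range given in Lab 3, ordered by `NOTIFY_TIME` so they can be replayed in sequence. The helper below only builds the SQL text; `build_events_query` is a hypothetical name, and the `EVENT_DATA` column is an assumption about the table's schema:

```python
from datetime import datetime


def build_events_query(start_time: datetime, end_time: datetime,
                       table: str = "flights.simevents") -> str:
    """Return standard-SQL text selecting events in [start_time, end_time).

    Column names EVENT, NOTIFY_TIME and EVENT_DATA are assumptions about
    the simevents schema; times are rendered as 'YYYY-MM-DD HH:MM:SS UTC'.
    """
    fmt = "%Y-%m-%d %H:%M:%S UTC"
    return (
        "SELECT EVENT, NOTIFY_TIME, EVENT_DATA\n"
        f"FROM `{table}`\n"
        f"WHERE NOTIFY_TIME >= TIMESTAMP('{start_time.strftime(fmt)}')\n"
        f"  AND NOTIFY_TIME < TIMESTAMP('{end_time.strftime(fmt)}')\n"
        "ORDER BY NOTIFY_TIME ASC"
    )
```

The timestamps are formatted from `datetime` objects rather than pasted from raw user input; with the `google-cloud-bigquery` client, query parameters would be the more robust choice.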

## Paging Through Records
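A multi-day replay can return a large number of rows, so it is convenient to consume the result one page at a time instead of materializing it all. The generator below is a generic sketch of that loop; `fetch_page` is a hypothetical callable standing in for whatever client call returns one page of rows plus a token for the next page:

```python
def iter_rows(fetch_page):
    """Yield rows one at a time from a paginated result set.

    fetch_page(page_token) -> (rows, next_page_token) is hypothetical.
    page_token is None for the first page; a next_page_token of None
    means there are no more pages.
    """
    token = None
    while True:
        rows, token = fetch_page(token)
        yield from rows
        if token is None:
            break
```

In practice, iterating over the object returned by the `google-cloud-bigquery` client's `QueryJob.result()` already fetches pages lazily, and its `page_size` argument controls how many rows each request returns.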

## Building a Batch of Events
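With `--speedFactor=30`, as in Lab 3, one real second corresponds to 30 simulated seconds. One way to keep the replay on schedule, sketched below, is to accumulate events by type and close the current batch whenever the next event's simulated time is ahead of the real clock; the program can then publish the batch and sleep. The function names are hypothetical, and the current time is passed in explicitly so the logic can be tested without sleeping:

```python
from datetime import datetime


def compute_sleep_secs(notify_time: datetime, sim_start: datetime,
                       program_start: datetime, now: datetime,
                       speed_factor: float) -> float:
    """Seconds to wait so that simulated time runs speed_factor x real time.

    A positive value means the next event is still in the (scaled) future.
    """
    real_elapsed = (now - program_start).total_seconds()
    sim_elapsed = (notify_time - sim_start).total_seconds() / speed_factor
    return sim_elapsed - real_elapsed


def batch_events(rows, should_flush):
    """Group time-ordered rows into per-event-type batches.

    rows: iterable of (event_type, notify_time, event_data).
    should_flush(notify_time) -> bool decides when to close the current
    batch. Yields (batch, notify_time), where notify_time is that of the
    row that triggered the flush (or of the last row, for the final batch).
    """
    batch = {}
    notify_time = None
    for event_type, notify_time, event_data in rows:
        if batch and should_flush(notify_time):
            yield batch, notify_time
            batch = {}
        batch.setdefault(event_type, []).append(event_data)
    if batch:
        yield batch, notify_time
```

A driver loop could use `compute_sleep_secs(...) > 1`, evaluated with the current time as `now`, as `should_flush`, publish each yielded batch, and then sleep for the recomputed number of seconds if it is still positive.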

## Publishing a Batch of Events
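Each batch is then sent to one Pub/Sub topic per event type. The sketch below assumes `publish` has the calling convention of `google.cloud.pubsub_v1.PublisherClient.publish` (topic path, payload as `bytes`, message attributes as string-valued keyword arguments); the attribute name `EventTimeStamp` and the `topics` mapping are assumptions for illustration. Carrying the simulated event time as an attribute lets the streaming pipeline use it as the element timestamp instead of the publish time:

```python
from datetime import datetime


def publish_batch(publish, topics, batch, notify_time: datetime):
    """Publish each event in batch to the topic for its event type.

    publish(topic, data, **attributes) follows the calling convention of
    google.cloud.pubsub_v1.PublisherClient.publish. topics maps event type
    to topic path. Returns whatever publish returns (futures, for Pub/Sub).
    """
    # RFC 3339 timestamp; notify_time is assumed to be a naive UTC datetime.
    timestamp = notify_time.strftime("%Y-%m-%dT%H:%M:%SZ")
    results = []
    for event_type, events in batch.items():
        for event_data in events:
            results.append(publish(topics[event_type],
                                   event_data.encode("utf-8"),
                                   EventTimeStamp=timestamp))
    return results
```

With the real client, each returned future has a `result()` method that can be waited on to confirm delivery before the simulator sleeps.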

-----------------------

# Lab 3. Stream processing

1. In CloudShell, follow the OAuth2 workflow so that the Python script can act on your behalf:
	```shell
	gcloud auth application-default login
	```
    <br>
2. From the `04_streaming/simulate` directory, run:
	```shell
	python ./simulate.py --startTime '2015-05-01 00:00:00 UTC' --endTime '2015-05-04 00:00:00 UTC' --speedFactor=30 --project $DEVSHELL_PROJECT_ID
	```
    <br>
3. In another CloudShell tab, run:
	```shell
	cd 04_streaming/process
	./run_on_cloud.sh <BUCKET-NAME>
	```
    <br>
4. Go to the GCP web console in the Dataflow section and monitor the job.<br><br>

5. Once you see events being written into BigQuery, you can query them from the BigQuery console:
	```sql
	#standardSQL
	SELECT
	  *
	FROM
	  `flights.streaming_delays`
	WHERE
	  airport = 'DEN'
	ORDER BY
	  timestamp DESC
	```
    <br>

6. In BigQuery, run this query and save it as a view:
	```sql
	#standardSQL
	SELECT
	  airport,
	  last[SAFE_OFFSET(0)].*,
	  CONCAT(CAST(last[SAFE_OFFSET(0)].latitude AS STRING), ",", CAST(last[SAFE_OFFSET(0)].longitude AS STRING)) AS location
	FROM (
	  SELECT
	    airport,
	    ARRAY_AGG(STRUCT(arr_delay,
	        dep_delay,
	        timestamp,
	        latitude,
	        longitude,
	        num_flights)
	    ORDER BY
	      timestamp DESC
	    LIMIT
	      1) last
	  FROM
	    `flights.streaming_delays`
	  GROUP BY
	    airport )
	```
    <br>
7. Follow the steps in the chapter to connect to Data Studio and create a GeoMap.<br><br>

8. Stop the simulation program in CloudShell.<br><br>

9. From the GCP web console, stop the Dataflow streaming pipeline.<br><br>


-------------------------

# Real-Time Stream Processing
* Streaming in Java Dataflow
    - Windowing a pipeline
    - Streaming aggregation
    - Co-join by key
* Executing the Stream Processing
* Analyzing Streaming Data in BigQuery
* Real-Time Dashboard

<img src="figures/cap07.png" width=600 />

## Streaming in Java Dataflow
* Windowing a pipeline
* Streaming aggregation
* Co-join by key

### Windowing a pipeline
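A sliding window assigns each event to every window that overlaps its timestamp, so a one-hour window that advances every five minutes, for example, places each event in twelve windows. Beam provides this as `SlidingWindows.of(size).every(period)` in Java and `beam.window.SlidingWindows(size, period)` in Python. The pure-Python sketch below only illustrates the assignment rule, using integer seconds and window starts aligned to multiples of the period (zero offset, Beam's default):

```python
def sliding_windows(ts, size, period):
    """Return every [start, end) window of length size that contains ts.

    Window starts are aligned to multiples of period (zero offset).
    All values are integer seconds.
    """
    windows = []
    start = ts - ts % period  # latest window start at or before ts
    while start > ts - size:  # window [start, start + size) still covers ts
        windows.append((start, start + size))
        start -= period
    return sorted(windows)
```

Because the windows overlap, each event contributes to `size / period` aggregates, which is what turns per-window statistics into a frequently updated moving average.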

### Streaming aggregation
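Within each window, the pipeline reduces the events for each airport to summary values such as the average delay and the number of flights (the `num_flights` column queried in Lab 3). Beam's built-in combiners (`Mean.perKey()` and `Count.perKey()` in Java, `beam.combiners.Mean.PerKey()` and `beam.combiners.Count.PerKey()` in Python) compute these per key and window. The sketch below mimics the result in plain Python; the window-assignment rule is passed in as `assign_windows`, a hypothetical parameter:

```python
from collections import defaultdict


def mean_delay_per_window(events, assign_windows):
    """Average delay and flight count per (airport, window).

    events: iterable of (airport, event_time_secs, delay_minutes).
    assign_windows(ts) -> list of (start, end) windows that contain ts.
    Returns {(airport, window): (mean_delay, num_flights)}.
    """
    sums = defaultdict(float)
    counts = defaultdict(int)
    for airport, ts, delay in events:
        for window in assign_windows(ts):
            sums[(airport, window)] += delay
            counts[(airport, window)] += 1
    return {key: (sums[key] / counts[key], counts[key]) for key in sums}
```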

### Co-join by key
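Departure and arrival statistics are computed as separate keyed collections, and the chapter's Java pipeline combines them per airport with a co-join by key (Beam's `CoGroupByKey`). For every key, a co-group returns the values from each input collection, with an empty list where an input has nothing for that key. The function below reproduces those semantics on in-memory lists as an illustration; it is not Beam code:

```python
def co_group_by_key(**collections):
    """Mimic Beam's CoGroupByKey on in-memory data.

    Each keyword argument is a list of (key, value) pairs. Returns
    {key: {name: [values, ...]}}, with an empty list for any collection
    that has no value for that key.
    """
    result = {}
    for name, pairs in collections.items():
        for key, value in pairs:
            result.setdefault(key, {n: [] for n in collections})[name].append(value)
    return result
```

Keeping the empty lists, rather than dropping keys missing from one side, lets a downstream step still emit a row for an airport that had departures but no arrivals in a window, if it chooses to.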

## Executing the Stream Processing

<img src="figures/cap08.png" width=1000 />

## Analyzing Streaming Data in BigQuery

<img src="figures/cap09.png" width=600 />

<img src="figures/cap10.png" width=600 />
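The view created in Lab 3 keeps only the most recent row per airport (`ARRAY_AGG(... ORDER BY timestamp DESC LIMIT 1)`) and adds a `location` string of the form `latitude,longitude` for the map. The same logic in plain Python, as an illustration of what that SQL computes rather than a replacement for it; rows are assumed to be dictionaries keyed by the column names used in the view:

```python
def latest_per_airport(rows):
    """Keep only the most recent row for each airport.

    rows: iterable of dicts with at least 'airport', 'timestamp',
    'latitude' and 'longitude'. Adds a 'location' string 'lat,lon',
    mirroring the CONCAT in the view.
    """
    latest = {}
    for row in rows:
        current = latest.get(row["airport"])
        if current is None or row["timestamp"] > current["timestamp"]:
            latest[row["airport"]] = row
    return {
        airport: dict(row, location=f"{row['latitude']},{row['longitude']}")
        for airport, row in latest.items()
    }
```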

## Real-Time Dashboard

<img src="figures/cap11.png" width=600 />

<img src="figures/cap12.png" width=600 />

# Summary

# References
* [1] Data Science on the Google Cloud Platform: Implementing End-to-End Real-Time Data Pipelines: From Ingest to Machine Learning - https://www.amazon.com/Data-Science-Google-Cloud-Platform/dp/1491974567
* [2] Book github - https://github.com/GoogleCloudPlatform/data-science-on-gcp