# GCP Professional Data Engineer
### Serverless Data Analysis with Google BigQuery and Cloud Dataflow
#### Modules:
- Serverless Data Analysis with BigQuery
- Autoscaling Data Processing Pipelines with Dataflow

#### Learning Objectives:

- Build up a complex BigQuery using clauses, inner selects, built-in functions and joins
- Load and export data to/from BigQuery
- Identify need for nested, repeated fields and user-defined functions

## Module 1: Serverless Data Analysis with Big Query
#### Topics:
- Queries
- Functions
- Load & export data
- Nested, repeated fields
- Window functions
- User defined functions

### BigQuery Overview
#### BigQuery Benefits:
- Interactive analysis of petabyte scale databases
- Familiar SQL 2011 query language
- Nested and repeat fields, user defined functions in Javascript
- Data storage is inexpensive

#### BigQuery Sample Architecture
##### Project (billing, top-level container)
- Limit access to datasets and jobs
- Manage billing

##### Dataset (organization, access control)
- Access Control Lists for Reader/Writer/Owner
- Applied to all tables/views in dataset

##### Table (data w/ schema)
- Columnar storage
- Views are in virtual tables defined by SQL query
- Tables can be external (Cloud Storage, etc.)
- Each column is storage in a separated, encrypted file

##### Jobs (query, import, export, copy) 
- Repeated or long running action
- Can be cancelled

### Lab: Building a BigQuery Query
#### Objectives:
- Create and run a query
- Modify the query to add clauses, subqueries, built-in functions and joins.

#### Task 1: Create and Run a Query
- In the Console, on the Products & services menu () click BigQuery. Click on the Compose Query button on top left, and then click on Show Options, and ensure you are using Standard SQL. You are using Standard SQL if the Use Legacy SQL checkbox is unchecked.
- Click Hide Options.
- In the New Query window, type (or copy-and-paste) the following query:

In [None]:
# sql
SELECT
  airline,
  date,
  departure_delay
FROM
  `bigquery-samples.airline_ontime_data.flights`
WHERE
  departure_delay > 0
  AND departure_airport = 'LGA'
LIMIT
  100

#### Task 2: Aggregate and Boolean Fxns
- In the New Query window, type the following query(s):

In [None]:
# total number of flights departed from LGA
SELECT
  airline,
  COUNT(departure_delay)
FROM
   `bigquery-samples.airline_ontime_data.flights`
WHERE
  departure_airport = 'LGA'
  AND date = '2008-05-13'
GROUP BY
  airline
ORDER BY airline

# total number of late flights from LGA
SELECT
  airline,
  COUNT(departure_delay)
FROM
   `bigquery-samples.airline_ontime_data.flights`
WHERE
  departure_delay > 0 AND
  departure_airport = 'LGA'
  AND date = '2008-05-13'
GROUP BY
  airline
ORDER BY airline

# total number of flights AND total delayed flights
SELECT
  f.airline,
  COUNT(f.departure_delay) AS total_flights,
  SUM(IF(f.departure_delay > 0, 1, 0)) AS num_delayed
FROM
   `bigquery-samples.airline_ontime_data.flights` AS f
WHERE
  f.departure_airport = 'LGA' AND f.date = '2008-05-13'
GROUP BY
  f.airline

#### Task 3: String Operations, Joins, & Subqueries

In [None]:
SELECT
  CONCAT(CAST(year AS STRING), '-', LPAD(CAST(month AS STRING),2,'0'), '-', LPAD(CAST(day AS STRING),2,'0')) AS rainyday
FROM
  `bigquery-samples.weather_geo.gsod`
WHERE
  station_number = 725030
  AND total_precipitation > 0

# join weather data and flight information
SELECT
  f.airline,
  SUM(IF(f.arrival_delay > 0, 1, 0)) AS num_delayed,
  COUNT(f.arrival_delay) AS total_flights
FROM
  `bigquery-samples.airline_ontime_data.flights` AS f
JOIN (
  SELECT
    CONCAT(CAST(year AS STRING), '-', LPAD(CAST(month AS STRING),2,'0'), '-', LPAD(CAST(day AS STRING),2,'0')) AS rainyday
  FROM
    `bigquery-samples.weather_geo.gsod`
  WHERE
    station_number = 725030
    AND total_precipitation > 0) AS w
ON
  w.rainyday = f.date
WHERE f.arrival_airport = 'LGA'
GROUP BY f.airline

# fraction of flights delayed per airline
SELECT
  airline,
  num_delayed,
  total_flights,
  num_delayed / total_flights AS frac_delayed
FROM (
SELECT
  f.airline AS airline,
  SUM(IF(f.arrival_delay > 0, 1, 0)) AS num_delayed,
  COUNT(f.arrival_delay) AS total_flights
FROM
  `bigquery-samples.airline_ontime_data.flights` AS f
JOIN (
  SELECT
    CONCAT(CAST(year AS STRING), '-', LPAD(CAST(month AS STRING),2,'0'), '-', LPAD(CAST(day AS STRING),2,'0')) AS rainyday
  FROM
    `bigquery-samples.weather_geo.gsod`
  WHERE
    station_number = 725030
    AND total_precipitation > 0) AS w
ON
  w.rainyday = f.date
WHERE f.arrival_airport = 'LGA'
GROUP BY f.airline
  )
ORDER BY
  frac_delayed ASC

### End Lab
### Lab: Loading & Exporting Data
#### Objectives:
- Load a CSV file into a BigQuery table using the web UI
- Load a JSON file into a BigQuery table using the CLI
- Export a table using the web UI

#### Task 1: Upload the data using the web UI

- Return to the browser tab containing Console.
- In the Console, on the Products & services menu () click BigQuery.
- In the left column, beneath the text box, find your project name. To the right of the project name, click the blue arrow. - Choose Create new dataset.
- In the ‘Create Dataset' dialog, for Dataset ID, type cpb101_flight_data and click OK.
- Download the following file to your local machine. This file contains the data that will populate the first table.

[Download airports.csv](https://storage.googleapis.com/cloud-training/CPB200/BQ/lab4/airports.csv)

- Create a new table in the cpb101_flight_data dataset to store the data from the CSV file. Mouse over the line with the name of the dataset. This will reveal the create table icon, which looks like a plus sign. Click the create table icon (the plus sign) to the right of the cpb101_flight_data dataset.
- On the Create Table page, in the Source Data section:
    - For Location, leave File upload selected.
    - To the right of File upload, click Choose file, then browse to and select airports.csv.
    - Verify File format is set to CSV.
In the Destination Table section:
    - For Table name, leave cpb101_flight_data selected.
    - For Destination table name, type AIRPORTS.
    - For Table type, Native table should be selected and unchangeable.
- In the Schema section:
- Add fields one at a time. The airports.csv has the following fields: IATA, AIRPORT, CITY, STATE, COUNTRY which are of type STRING and LATITUDE, LONGITUDE which are of type FLOAT. Make all these fields REQUIRED.
- In the Options section:
    - For Field delimiter, verify Comma is selected.
    - Since airports.csv contains a single header row, for Header rows to skip, type 1.
    - Accept the remaining default values and click Create Table. BigQuery creates a load job to create the table and upload data into the table (this may take a few seconds). You can track job progress by clicking Job History
- Once the load job is complete, click cpb101_flight_data > AIRPORTS.
- On the Table Details page, click Details to view the table properties and then click Preview to view the table data.

#### Task 2. Upload the data using the CLI
- Return to the browser tab containing Cloud Shell.
- In Cloud Shell, enter the following command to download the schema file for the table to your working directory. (The file is schema_flight_performance.json)
``` 
curl https://storage.googleapis.com/cloud-training/CPB200/BQ/lab4/schema_flight_performance.json -o schema_flight_performance.json
```

- Next, you will create a table in the dataset using the schema file you downloaded to Cloud Shell and data from JSON files that are in Cloud Storage. The JSON files have URIs like the following:
```
gs://cloud-training/CPB200/BQ/lab4/domestic_2014_flights_*.json
```

##### Note that your Project ID is stored as a variable in Cloud Shell ($DEVSHELL_PROJECT_ID) so there's no need for you to remember it. If you require it, you can view your Project ID in the command line to the right of your username (after the @ symbol).

- In Cloud Shell, create a table named flights_2014 in the cpb101_flight_data dataset with this command:

``` 
bq load --source_format=NEWLINE_DELIMITED_JSON $DEVSHELL_PROJECT_ID:cpb101_flight_data.flights_2014 gs://cloud-training/CPB200/BQ/lab4/domestic_2014_flights_*.json ./schema_flight_performance.json 

```
- There are multiple JSON files in the Cloud Storage bucket. They are named according to the convention: domestic_2014_flights_*.json. The wildcard (*) character in the command is used to include all of the .json files in the bucket.
- Once the table is created, type the following command to verify table flights_2014 exists in dataset cpb101_flight_data:

```
bq ls $DEVSHELL_PROJECT_ID:cpb101_flight_data
```

#### Task 3: Export table
- In the Console, on the Products & services menu () click Home
- Select and copy the Project ID. For simplicity you will use the Qwiklabs Project ID, which is already globally unique, as the bucket name.
- In the Console, on the Products & services menu () click Storage > Browser.
- Click Create Bucket.
- Specify the following, and leave the remaining settings as their defaults:
- Click Create.
- Record the name of your bucket. You will need it in subsequent tasks.
- In Cloud Shell enter the following to create an environment variable named "BUCKET" and verify that it exists with the echo command.

```
BUCKET= |your unique bucket name (Project ID)|

echo $BUCKET
```

- You can use BUCKET in Cloud Shell commands. And if you need to enter the bucket name <your-bucket> in a text field in Console, you can quickly retrieve the name with "echo $BUCKET".
- Return to the BigQuery web UI. If it is not already open, open Console. On the Products & services menu () click BigQuery.
- Select the AIRPORTS table that you created recently, and using the "down" button to its right, select the option for Export Table.
- In the dialog, specify gs://<YOUR-BUCKET>/bq/airports.csv and click OK.
- Use the CLI to export the table:

```
bq extract cpb101_flight_data.AIRPORTS gs://$BUCKET/bq/airports2.csv
```

- In the Console, on the Products & services menu () click Storage > Browser. Browse to your bucket and ensure that both .csv files have been created.

### End Lab

### Lab: Advanced SQL Queries
#### Objectives: 
- Use Nested fields, Regular expressions, With statement, and Group and Having
- Extract programming information about code commits

#### Task 1: Get information about code commits
- In the Console, on the Products & services menu () click BigQuery.
- Compose a new query, making sure that the "Legacy SQL" option is not checked (you are using Standard SQL).
```
SELECT
  author.email,
  diff.new_path AS path,
  author.date
FROM
  `bigquery-public-data.github_repos.commits`,
  UNNEST(difference) diff
WHERE
  EXTRACT(YEAR
  FROM
    author.date)=2016
LIMIT 10
```
- Play a little with the query above to understand what it is doing. For example, instead of author.email, try just author. What type of field is author?
- Change diff.new_path to difference.new_path. Why does it not work? Replace difference.new_path by difference[OFFSET(0)].new_path. Does this work? Why? What is the UNNEST doing?

#### Task 2: Extract programming language
- Run the following query. Remember to uncheck Legacy SQL.

```
SELECT
  author.email,
  LOWER(REGEXP_EXTRACT(diff.new_path, r'\.([^\./\(~_ \- #]*)$')) lang,
  diff.new_path AS path,
  author.date
FROM
  `bigquery-public-data.github_repos.commits`,
  UNNEST(difference) diff
WHERE
  EXTRACT(YEAR
  FROM
    author.date)=2016
LIMIT
  10
```
- Modify the query above to only use lang if the language consists purely of letters and has a length that is fewer than 8 characters.
- Modify the query above to group by language and list in descending order of the number of commits.
- Here is one solution:
```
WITH
  commits AS (
  SELECT
    author.email,
    LOWER(REGEXP_EXTRACT(diff.new_path, r'\.([^\./\(~_ \- #]*)$')) lang,
    diff.new_path AS path,
    author.date
  FROM
    `bigquery-public-data.github_repos.commits`,
    UNNEST(difference) diff
  WHERE
    EXTRACT(YEAR
    FROM
      author.date)=2016 )
SELECT
  lang,
  COUNT(path) AS numcommits
FROM
  commits
WHERE
  LENGTH(lang)<8
  AND lang IS NOT NULL
  AND REGEXP_CONTAINS(lang, '[a-zA-Z]')
GROUP BY
  lang
HAVING
  numcommits > 100
ORDER BY
  numcommits DESC
```

#### Task 3: Weekend or weekday?
- Modify the previous query to extract the day of the week from author.date. Days 2 to 6 are weekdays.
```
WITH
  commits AS (
  SELECT
    author.email,
    EXTRACT(DAYOFWEEK
    FROM
      author.date) BETWEEN 2
    AND 6 is_weekday,
    LOWER(REGEXP_EXTRACT(diff.new_path, r'\.([^\./\(~_ \- #]*)$')) lang,
    diff.new_path AS path,
    author.date
  FROM
    `bigquery-public-data.github_repos.commits`,
    UNNEST(difference) diff
  WHERE
    EXTRACT(YEAR
    FROM
      author.date)=2016)
SELECT
  lang,
  is_weekday,
  COUNT(path) AS numcommits
FROM
  commits
WHERE
  lang IS NOT NULL
GROUP BY
  lang,
  is_weekday
HAVING
  numcommits > 100
ORDER BY
  numcommits DESC
```
#### Note: Ignoring file extensions that do not correspond to programming languages, it appears that the most popular weekend programming languages are JavaScript, PHP and C (*.h is the C header file), Java, and Python.

### End Lab

### Module 1 Review

1.) I want to query a table, then query within the results of that query. Which of these is the BEST way to do this?
- Use a subquery of the form SELECT ... FROM(SELECT) ...

2.) Which of the following statements are true?
- Dataflow transforms both batch and streaming pipelines
- Dataflow executes Apache Beam pipelines

Incorrect: 
- Side-inputs in Dataflow are a way to export data from one pipeline to share with another pipeline
- Map operations in MapReduce and be performed by Combine transforms in Dataflow

## Module 2: Autoscaling Data Processing Pipelines with Dataflow
#### Topics:
- Pipeline concepts
- MapReduce
- Side inputs
- Streaming

### Lab: A Simple Dataflow Pipeline (Python)

#### Objectives:
-Setup a Python Dataflow project using Apache Beam
-Write a simple pipeline in Python
-Execute the query on the local machine
-Execute the query on the cloud

#### Task 1: Preparation
- Verify that the repository exists, and if not, clone it. Return to the browser tab containing the Cloud Shell code editor. Click on File > Refresh in the left navigator panel. You should see the training-data-analyst directory.
- If the directory does not exist clone the repository from the Cloud Shell command line:
```
cd ~
git clone https://github.com/GoogleCloudPlatform/training-data-analyst
```
- In the Console, on the Products & services menu () click Home
-Select and copy the Project ID. For simplicity you will use the Qwiklabs Project ID, which is already globally unique, as the bucket name.
- In the Console, on the Products & services menu () click Storage > Browser.
- Click Create Bucket.
- Record the name of your bucket. You will need it in subsequent tasks.
- In Cloud Shell enter the following to create an environment variable named "BUCKET" and verify that it exists with the echo command.
```
BUCKET="<your unique bucket name (Project ID)>"
echo (dollarsign)BUCKET
```

- Return to the browser tab for Console. In the top search bar, enter Google Dataflow API. This will take you to the page, Products & Services > APIs & Services > Dashboard > Google Dataflow API. It will either show a status information or it will give you the option to Enable the API.
- If necessary, Enable the API.

#### Task 2: Open Dataflow Project
- The goal of this lab is to become familiar with the structure of a Dataflow project and learn how to execute a Dataflow pipeline. You will need to update some files to install Apache Beam. Apache Beam is an open source platform for executing data processing workflows.
- Return to the browser tab containing Cloud Shell. In Cloud Shell navigate to the directory for this lab:
```
cd ~/training-data-analyst/courses/data_analysis/lab2/python
# install dependencies
sudo ./install_packages.sh
# check version
pip -V
```
#### Task 3: Pipeline Filtering
In the Cloud Shell code editor navigate to the directory /training-data-analyst/courses/data_analysis/lab2/python and view the file grep.py Do not make any changes to the code.
```
cd ~/training-data-analyst/courses/data_analysis/lab2/python
nano grep.py
```

[This file can be found here.](https://github.com/GoogleCloudPlatform/training-data-analyst/blob/master/courses/data_analysis/lab2/python/grep.py)

- Can you answer these questions about the file grep.py?

    - What files are being read?
    - What is the search term?
    - Where does the output go?
    - There are three transforms in the pipeline:

    - What does the transform do?
    - What does the second transform do?
    - Where does its input come from?
    - What does it do with this input?
    - What does it write to its output?
    - Where does the output go to?
    - What does the third transform do?
    
#### Task 4: Execute the Pipeline Locally

- In the Cloud Shell command line, locally execute grep.py
```
cd ~/training-data-analyst/courses/data_analysis/lab2/python
python grep.py
```
- The output file will be output.txt. If the output is large enough, it will be sharded into separate parts with names like: output-00000-of-00001. If necessary, you can locate the correct file by examining the file's time.
```
# examine file's time
ls -al /tmp
# examine output file
cat /tmp/output-*
```

#### Task 5: Execute the pipeline on the cloud

- Copy some Java files to the cloud
```
gsutil cp ../javahelp/src/main/java/com/google/cloud/training/dataanalyst/javahelp/*.java gs://(dollarsign)BUCKET/javahelp
```
- Edit the Dataflow pipeline in grepc.py. In the Cloud Shell code editor navigate to the directory /training-data-analyst/courses/data_analysis/lab2/python in and edit the file grepc.py
- Replace PROJECT and BUCKET with your Project ID and Bucket name. Here are easy ways to retrieve the values:

[This pipeline can be found here: grepc.py](https://github.com/GoogleCloudPlatform/training-data-analyst/blob/master/courses/data_analysis/lab2/python/grepc.py)

echo (dollarsign)DEVSHELL_PROJECT_ID
echo (dollarsign)BUCKET

- Submit dataflow job to the cloud
```
python grepc.py
```

- Return to the browser tab for Console. On the Products & services menu () click Dataflow and click on your job to monitor progress.
- Wait for the job status to turn to Succeeded. At this point, your Cloud Shell will display a command-line prompt.
- Examine the output in the Cloud Storage bucket. On the Products & services menu () click Storage > Browser and click on your bucket. Click the javahelp directory. This job will generate the file output.txt. If the file is large enough it will be sharded into multiple parts with names like: output-0000x-of-000y. You can identify the most recent file by name or by the Last modified field. Click on the file to view it.
- Alternatively, you could download the file in Cloud Shell and view it:

```
gsutil cp gs://$BUCKET/javahelp/output.txt .
cat output.txt
```

### Appendix

In [1]:
#!/usr/bin/env python

"""
Copyright Google Inc. 2016
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

import apache_beam as beam
import re
import sys

def my_grep(line, term):
   if re.match( r'^' + re.escape(term), line):
      yield line

if __name__ == '__main__':
   p = beam.Pipeline(argv=sys.argv)
   input = '../javahelp/src/main/java/com/google/cloud/training/dataanalyst/javahelp/*.java'
   output_prefix = '/tmp/output'
   searchTerm = 'import'

   # find all lines that contain the searchTerm
   (p
      | 'GetJava' >> beam.io.ReadFromText(input)
      | 'Grep' >> beam.FlatMap(lambda line: my_grep(line, searchTerm) )
      | 'write' >> beam.io.WriteToText(output_prefix)
   )

   p.run().wait_until_finish()

ModuleNotFoundError: No module named 'apache_beam'

In [None]:
#!/usr/bin/env python

"""
Copyright Google Inc. 2016
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

import apache_beam as beam
import re

def my_grep(line, term):
   if re.match( r'^' + re.escape(term), line):
      yield line

PROJECT='cloud-training-demos'
BUCKET='cloud-training-demos'

def run():
   argv = [
      '--project={0}'.format(PROJECT),
      '--job_name=examplejob2',
      '--save_main_session',
      '--staging_location=gs://{0}/staging/'.format(BUCKET),
      '--temp_location=gs://{0}/staging/'.format(BUCKET),
      '--runner=DataflowRunner'
   ]

   p = beam.Pipeline(argv=argv)
   input = 'gs://{0}/javahelp/*.java'.format(BUCKET)
   output_prefix = 'gs://{0}/javahelp/output'.format(BUCKET)
   searchTerm = 'import'

   # find all lines that contain the searchTerm
   (p
      | 'GetJava' >> beam.io.ReadFromText(input)
      | 'Grep' >> beam.FlatMap(lambda line: my_grep(line, searchTerm) )
      | 'write' >> beam.io.WriteToText(output_prefix)
   )

   p.run()

if __name__ == '__main__':
   run()