[012 Serverless Data Processing with Dataflow - Writing an ETL Pipeline using Apache Beam and Dataflow (Python)](https://www.cloudskillsboost.google/focuses/64780?catalog_rank=%7B%22rank%22%3A3%2C%22num_filters%22%3A0%2C%22has_search%22%3Atrue%7D&parent=catalog&search_id=41740668)

#Setup

##SA

In [None]:
{project-number}-compute@developer.gserviceaccount.comjest # editor

##Konfiguracja środowiska programistycznego opartego na notebooku Jupyter

In [None]:
Vertex AI > Workbench

##Repo notebook Jupyter

In [None]:
git clone https://github.com/GoogleCloudPlatform/training-data-analyst
cd /home/jupyter/training-data-analyst/quests/dataflow_python/

#Część 1 laboratorium. Pisanie potoku ETL od podstaw

In [None]:
cd 1_Basic_ETL/lab
export BASE_DIR=$(pwd)

##Skonfiguruj środowisko wirtualne i zależności

In [None]:
# Pobiera najnowsze informacje o dostępnych pakietach.
# python3-venv umożliwia tworzenie wirtualnych środowisk Pythona (venv).
# Opcja -y automatycznie akceptuje instalację.

sudo apt-get update && sudo apt-get install -y python3-venv

##venv

In [None]:
python3 -m venv df-env
source df-env/bin/activate

##Instalacja pakietów

In [None]:
python3 -m pip install -q --upgrade pip setuptools wheel
python3 -m pip install apache-beam[gcp]

##API Dataflow

In [None]:
gcloud services enable dataflow.googleapis.com

#Zadanie 1. Generowanie danych syntetycznych

In [None]:
cd $BASE_DIR/../..

source create_batch_sinks.sh

bash generate_batch_events.sh

head events.json

In [None]:
{"user_id": "-6434255326544341291", "ip": "192.175.49.116", "timestamp": "2019-06-19T16:06:45.118306Z", "http_request": "\"GET eucharya.html HTTP/1.0\"", "lat": 37.751, "lng": -97.822, "http_response": 200, "user_agent": "Mozilla/5.0 (compatible; MSIE 7.0; Windows NT 5.01; Trident/5.1)", "num_bytes": 182}

In [1]:
import json

# Dane wejściowe w formacie JSON
data = {
    "user_id": "-6434255326544341291",
    "ip": "192.175.49.116",
    "timestamp": "2019-06-19T16:06:45.118306Z",
    "http_request": "\"GET eucharya.html HTTP/1.0\"",
    "lat": 37.751,
    "lng": -97.822,
    "http_response": 200,
    "user_agent": "Mozilla/5.0 (compatible; MSIE 7.0; Windows NT 5.01; Trident/5.1)",
    "num_bytes": 182
}

# Drukowanie sformatowanych danych JSON
print(json.dumps(data, indent=4))

{
    "user_id": "-6434255326544341291",
    "ip": "192.175.49.116",
    "timestamp": "2019-06-19T16:06:45.118306Z",
    "http_request": "\"GET eucharya.html HTTP/1.0\"",
    "lat": 37.751,
    "lng": -97.822,
    "http_response": 200,
    "user_agent": "Mozilla/5.0 (compatible; MSIE 7.0; Windows NT 5.01; Trident/5.1)",
    "num_bytes": 182
}


#Zadanie 2. Odczytaj dane ze źródła

In [None]:
import argparse
import time
import logging
import json
import apache_beam as beam
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.runners import DataflowRunner, DirectRunner

# ### main

def run():
    # Command line arguments
    parser = argparse.ArgumentParser(description='Load from Json into BigQuery')
    parser.add_argument('--project',required=True, help='Specify Google Cloud project')
    parser.add_argument('--region', required=True, help='Specify Google Cloud region')
    parser.add_argument('--stagingLocation', required=True, help='Specify Cloud Storage bucket for staging')
    parser.add_argument('--tempLocation', required=True, help='Specify Cloud Storage bucket for temp')
    parser.add_argument('--runner', required=True, help='Specify Apache Beam Runner')

    opts = parser.parse_args()

    # Setting up the Beam pipeline options
    options = PipelineOptions()
    options.view_as(GoogleCloudOptions).project = opts.project
    options.view_as(GoogleCloudOptions).region = opts.region
    options.view_as(GoogleCloudOptions).staging_location = opts.stagingLocation
    options.view_as(GoogleCloudOptions).temp_location = opts.tempLocation
    options.view_as(GoogleCloudOptions).job_name = '{0}{1}'.format('my-pipeline-',time.time_ns())
    options.view_as(StandardOptions).runner = opts.runner

    # Static input and output
    input = 'gs://{0}/events.json'.format(opts.project)
    output = '{0}:logs.logs'.format(opts.project)

    # Table schema for BigQuery
    table_schema = {
        "fields": [
            {
                "name": "ip",
                "type": "STRING"
            },
            {
                "name": "user_id",
                "type": "STRING"
            },
            {
                "name": "lat",
                "type": "FLOAT"
            },
            {
                "name": "lng",
                "type": "FLOAT"
            },
            {
                "name": "timestamp",
                "type": "STRING"
            },
            {
                "name": "http_request",
                "type": "STRING"
            },
            {
                "name": "http_response",
                "type": "INTEGER"
            },
            {
                "name": "num_bytes",
                "type": "INTEGER"
            },
            {
                "name": "user_agent",
                "type": "STRING"
            }
        ]
    }

    # Create the pipeline
    p = beam.Pipeline(options=options)

    '''

    Steps:
    1) Read something
    2) Transform something
    3) Write something

    '''

    (p
        | 'ReadFromGCS' >> beam.io.ReadFromText(input)
        | 'ParseJson' >> beam.Map(lambda line: json.loads(line))
        | 'WriteToBQ' >> beam.io.WriteToBigQuery(
            output,
            schema=table_schema,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE
            )
    )

    logging.getLogger().setLevel(logging.INFO)
    logging.info("Building pipeline ...")

    p.run()

if __name__ == '__main__':
  run()

In [None]:
import argparse  # Moduł do parsowania argumentów wiersza poleceń
import time  # Moduł do operacji na czasie (np. generowanie unikalnej nazwy zadania)
import logging  # Moduł do logowania informacji
import json  # Moduł do parsowania danych JSON
import apache_beam as beam  # Import głównej biblioteki Apache Beam
from apache_beam.options.pipeline_options import GoogleCloudOptions  # Opcje dla GCP
from apache_beam.options.pipeline_options import PipelineOptions  # Opcje dla potoku
from apache_beam.options.pipeline_options import StandardOptions  # Standardowe opcje dla Apache Beam
from apache_beam.runners import DataflowRunner, DirectRunner  # Możliwe wykonawce Apache Beam

# Definicja głównej funkcji uruchamiającej potok

def run():
    # Parsowanie argumentów wiersza poleceń
    parser = argparse.ArgumentParser(description='Load from JSON into BigQuery')
    parser.add_argument('--project', required=True, help='Specify Google Cloud project')  # Identyfikator projektu GCP
    parser.add_argument('--region', required=True, help='Specify Google Cloud region')  # Region, w którym działa Dataflow
    parser.add_argument('--stagingLocation', required=True, help='Specify Cloud Storage bucket for staging')  # Miejsce przechowywania plików pośrednich
    parser.add_argument('--tempLocation', required=True, help='Specify Cloud Storage bucket for temp')  # Lokalizacja dla plików tymczasowych
    parser.add_argument('--runner', required=True, help='Specify Apache Beam Runner')  # Wybór wykonawcy np. Dataflow lub DirectRunner

    opts = parser.parse_args()  # Pobranie wartości argumentów

    # Ustawienie opcji potoku Apache Beam
    options = PipelineOptions()
    options.view_as(GoogleCloudOptions).project = opts.project  # Przypisanie projektu GCP
    options.view_as(GoogleCloudOptions).region = opts.region  # Ustawienie regionu GCP
    options.view_as(GoogleCloudOptions).staging_location = opts.stagingLocation  # Miejsce przechowywania plików pośrednich
    options.view_as(GoogleCloudOptions).temp_location = opts.tempLocation  # Miejsce przechowywania plików tymczasowych
    options.view_as(GoogleCloudOptions).job_name = '{0}{1}'.format('my-pipeline-', time.time_ns())  # Unikalna nazwa zadania
    options.view_as(StandardOptions).runner = opts.runner  # Ustawienie wykonawcy (np. Dataflow)

    # Ścieżka do wejściowego pliku JSON w Google Cloud Storage
    input = 'gs://{0}/events.json'.format(opts.project)
    # Nazwa tabeli docelowej w BigQuery
    output = '{0}:logs.logs'.format(opts.project)

    # Definicja schematu tabeli BigQuery
    table_schema = {
        "fields": [
            {"name": "ip", "type": "STRING"},
            {"name": "user_id", "type": "STRING"},
            {"name": "lat", "type": "FLOAT"},
            {"name": "lng", "type": "FLOAT"},
            {"name": "timestamp", "type": "STRING"},
            {"name": "http_request", "type": "STRING"},
            {"name": "http_response", "type": "INTEGER"},
            {"name": "num_bytes", "type": "INTEGER"},
            {"name": "user_agent", "type": "STRING"}
        ]
    }

    # Tworzenie potoku Apache Beam
    p = beam.Pipeline(options=options)

    # Definicja kroków potoku
    (
        p
        | 'ReadFromGCS' >> beam.io.ReadFromText(input)  # Odczyt danych JSON z pliku w Google Cloud Storage
        | 'ParseJson' >> beam.Map(lambda line: json.loads(line))  # Konwersja każdej linii JSON na obiekt Python (słownik)
        | 'WriteToBQ' >> beam.io.WriteToBigQuery(
            output,  # Nazwa tabeli docelowej
            schema=table_schema,  # Schemat tabeli BigQuery
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,  # Tworzenie tabeli, jeśli nie istnieje
            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE  # Nadpisanie istniejących danych
        )
    )

    # Ustawienie poziomu logowania i uruchomienie potoku
    logging.getLogger().setLevel(logging.INFO)
    logging.info("Building pipeline ...")
    p.run()

# Uruchomienie potoku, jeśli skrypt jest wykonywany bezpośrednio
if __name__ == '__main__':
    run()


#Zadanie 3. Uruchom swój potok, aby sprawdzić, czy działa

In [None]:
# Przechodzi do katalogu bazowego, który powinien być wcześniej ustawiony (export BASE_DIR=/ścieżka/do/projektu).
cd $BASE_DIR

# Pobiera identyfikator bieżącego projektu GCP i zapisuje go w zmiennej PROJECT_ID.
export PROJECT_ID=$(gcloud config get-value project)

python3 my_pipeline.py \
  --project=${PROJECT_ID} \ # określa projekt GCP.
  --region=Region \ # wymaga uzupełnienia poprawnego regionu, np. us-central1.
  --stagingLocation=gs://$PROJECT_ID/staging/ \ # miejsce w GCS na pliki stagingowe (np. zależności).
  --tempLocation=gs://$PROJECT_ID/temp/ \ # miejsce w GCS na pliki tymczasowe.
  --runner=DirectRunner # wykonuje pipeline lokalnie, bez wysyłania do Dataflow.

#Zadanie 4. Dodaj transformację

In [None]:
[Output_PCollection] = ([Input_PCollection] | [First Transform]
                                            | [Second Transform]
                                            | [Third Transform])

In [None]:
p | beam.Map(lambda x : something(x))

In [None]:
def something(x):
  y = # Do something!
  return y

p | beam.Map(something)

In [None]:
class MyDoFn(beam.DoFn):
  def process(self, element):
    output = #Do Something!
    yield output

p | beam.ParDo(MyDoFn())

#Zadanie 5. Napisz do ujścia

In [None]:
# Examine dataset
bq ls

# No tables yet
bq ls logs

In [None]:
# piotr_mackowka@cloudshell:~ (avon-prod-analytics)$ bq ls analytics
#               tableId                Type    Labels     Time Partitioning      Clustered Fields
#  ---------------------------------- ------- -------- ------------------------ ------------------
#   002_purchase                       TABLE            DAY (field: true_date)
#   002_select_item                    TABLE            DAY (field: true_date)
#   002_union_all                      TABLE            DAY (field: true_date)
#   002_view_cart                      TABLE            DAY (field: true_date)
#   002_view_item                      TABLE            DAY (field: true_date)
#   002_view_item_list                 TABLE            DAY (field: true_date)
#   003_basic_kpi                      TABLE            DAY (field: true_date)
#   003_basic_kpi_event                TABLE            DAY (field: true_date)
#   003_basic_kpi_transactions         TABLE            DAY (field: true_date)
#   003_event_counter                  TABLE            DAY (field: true_date)
#   004_generate_lead                  TABLE            DAY (field: true_date)
#   004_generate_lead_lag              TABLE
#   004_generate_lead_sessions         TABLE            DAY (field: true_date)
#   005_benchmark                      TABLE            DAY (field: true_date)
#   005_click_avon_nagradza            TABLE            DAY (field: true_date)
#   005_hamburger_menu_avon_nagradza   TABLE            DAY (field: true_date)
#   006_product_performance_campaign   TABLE            DAY (field: true_date)

In [None]:
table_schema = 'name:STRING,id:INTEGER,balance:FLOAT'

In [None]:
table_schema = {
        "fields": [
            {
                "name": "name",
                "type": "STRING"
            },
            {
                "name": "id",
                "type": "INTEGER",
                "mode": "REQUIRED"
            },
            {
                "name": "balance",
                "type": "FLOAT",
                "mode": "REQUIRED"
            }
        ]
    }


In [None]:
p | 'WriteToBQ' >> beam.io.WriteToBigQuery(
            'project:dataset.table',
            schema=table_schema,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE
            )

In [None]:
p | 'WriteToBQ' >> beam.io.WriteToBigQuery(
            'project:dataset.table',  # 🔹 Pełna nazwa tabeli BigQuery w formacie `projekt:dataset.tabela`
            schema=table_schema,  # 🔹 Definicja schematu tabeli (lista pól i ich typów)
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,  # 🔹 Tworzy tabelę, jeśli nie istnieje

'''Uwaga:WRITE_TRUNCATE usunie i utworzy ponownie Twoją tabelę za każdym razem.
Jest to pomocne na wczesnym etapie iteracji potoku, szczególnie gdy iterujesz swój schemat,
ale może łatwo spowodować niezamierzone problemy w produkcji. WRITE_APPEND lub WRITE_EMPTY są bezpieczniejsze.'''
            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE  # 🔹 Nadpisuje tabelę przy każdym uruchomieniu potoku
)

#Zadanie 6. Uruchom swój potok

In [None]:
# Set up environment variables
cd $BASE_DIR
export PROJECT_ID=$(gcloud config get-value project)

# Run the pipelines
python3 my_pipeline.py \
  --project=${PROJECT_ID} \
  --region=Region \
  --stagingLocation=gs://$PROJECT_ID/staging/ \
  --tempLocation=gs://$PROJECT_ID/temp/ \
  --runner=DataflowRunner

#Część 2 laboratorium. Parametryzacja podstawowego ETL

#Zadanie 1. Utwórz plik schematu JSON

In [None]:
cd $BASE_DIR/../..

'''	•	Wyświetla schemat tabeli logs.logs w BigQuery w formacie JSON, co ułatwia czytanie i analizę struktury danych.
	•	--schema → Pobiera tylko schemat tabeli, bez zawartości.
	•	--format=prettyjson → Formatuje wyjście jako czytelny JSON.'''
bq show --schema --format=prettyjson logs.logs

In [None]:
bq show --schema --format=prettyjson logs.logs | sed '1s/^/{"BigQuery Schema":/' | sed '$s/$/}/' > schema.json

cat schema.json

export PROJECT_ID=$(gcloud config get-value project)
gcloud storage cp schema.json gs://${PROJECT_ID}/

#Zadanie 2. Napisz funkcję zdefiniowaną przez użytkownika w JavaScript

In [None]:
function transform(line) {
  return line;
}

#Zadanie 3. Uruchom szablon przepływu danych


#Zadanie 4. Przejrzyj kod szablonu przepływu danych