<a href="https://colab.research.google.com/github/svvsaga/datascience_workshop/blob/peder/workshop_sesjon1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Outline
Her tar vi utgangspunkt i GPS-data fra Peder sin kjøretur fra Trondheim til Ørsta

1. Laste opp fil til GCS via terminalen (Cloud Shell/gsutil)
  - Nevne at det også kan gjøres via GUI og client libs
2. Importere data direkte fra GCS til BigQuery (Cloud Shell/bq)
  - Nevne at det også kan gjøres via GUI og client libs
3. Utføre en enkel BigQuery-spørring for å se at vi har fått inn data
4. Legge til datamapping i en Dataflow-jobb, og importere og mappe data inn i BigQuery
  - Kan bruke samme data som i pkt. 2, men at vi i DF-jobben forbedrer datamodellen, f.eks. ved å slå sammen to kolonner lat, long til en geography-kolonne
  - Bør vise Dataflow-GUI mens jobben kjører
5. Sjekke i BigQuery at dataene har komt inn på riktig format
6. Vise at dei kan laste inn alle dataene frå GCS til BigQuery via GUI
7. Meir? Skal vi joine med geolokasjon i denne bolken eller seinare?
  - https://www.vegvesen.no/trafikkdata/api/?query=%7B%0A%20%20trafficRegistrationPoints%20%7B%0A%20%20%20%20id%0A%20%20%20%20name%0A%20%20%20%20location%20%7B%0A%20%20%20%20%20%20coordinates%20%7B%0A%20%20%20%20%20%20%20%20latLon%20%7B%0A%20%20%20%20%20%20%20%20%20%20lat%0A%20%20%20%20%20%20%20%20%20%20lon%0A%20%20%20%20%20%20%20%20%7D%0A%20%20%20%20%20%20%7D%0A%20%20%20%20%7D%0A%20%20%7D%0A%7D%0A

## Andre forslag:
- Bruke python libs + dataframes + pandas til noe (sesjon 1 eller 2?)
- Vurdere å vise Cloud Function som henter data og dytter det inn i Dataflow eller BigQuery?
- Andre verktøy: AirByte, FME og DBT

## Nytt forslag:
- Ta utgangspunkt i trafikkdata (saga-trafikkdata-prod-pz8l_timetrafikkdata-ingest)
- Gjere cirka det samme som før i steg 1, 2, 3
- Då ser vi at dataene for det første er veldig nøsta og at vi ikkje veit "ingest time" som er ein del av filnamna.
- Steg 4 (dataflow) vil då kunne gi verdi pga:
  1. Kan ha med ingesttime inn i BigQuery
  2. Kan hente ut berre totalt trafikkvolum, sidan vi ikkje bryr oss om resten

TODO: Forklare at vi skal autentisere oss

---




### 1. Laste opp fil til GCS 
Det finnes flere måter å laster data inn til google cloud storage med ulike nivåer av kode-bruk:
- gjennom google cloud platform grafisk user interface (nettleser)
- gjennom google cloud shell (nettleser)
- gjennom lokal installasjon av google cloud SDK
- ved bruk av client blibliotheker (f.eks. python client)

Her vil vi introdusere muligheter ved bruk av cloud shell og lokal installasjon av cloud SDK. 

Kommandoene er i utgangspunkt de samme, men oppsett er litt forskjellig: 




#### Oppsett Cloud shell
- Logg deg inn på Google Cloud Platform UI gjennom nettleseren: https://console.cloud.google.com/ 
- Verifiser at prosjektet til workshop et valgt som arbeidsprosjekt
- Aktiver cloud shell ("Activate cloud shell" knapp i øverste høyre hjørnet i UI), vent til cloud shell er provisjonert


#### Oppsett Google cloud SDK
- Last ned og installer Google cloud SDK for ditt system (i forkant av workshoppen): https://cloud.google.com/sdk/docs/install 
- initialiser cloud shell (verifiser at prosjektet er satt til ditt arbeids/workshop prosjekt)



#### Last ned eksempelfiler fra GCS (gjennom UI)
- Last ned 2 filer a
 gs://saga-trafikkdata-prod-pz8l_timetrafikkdata-ingest 
saga-trafikkdata-prod-pz8l_timetrafikkdata-ingest


#### Lag en GCS bøtte
Lag en GCS bøtte der filene skal lastest opp. 
Dette kan gjøres enten gjennom grafisk user interface eller gjennom cloud shell/SDK

- I grafisk user interface naviger til Cloud Storage -> Browser og trykk på "Create bucket" 
- Velg et (globalt unik!) navn til bucket og konfigurer lagringsopsjoner
  - standard storage
  - single-region, europe-west3
  - uniform access

Det samme kan oppnås ved bruk av følgende cloud/SDK shell kommando:

```
gsutil mb -b -c standard -l EUROPE-WEST3 gs://bucket-name
```

In [None]:
gsutil mb -b -c standard -l EUROPE-WEST3 gs://bucket-name

SyntaxError: ignored

#### Laste opp filer til GCS bucket 
Laste opp lokale filer til GCS bucket ved bruk av cloud/SDK shell. 

In [None]:
gsutil cp path/to/file.csv gs://bucket-name

Det er mulig å laste opp enkelte filter eller en liste med filter samtidig

In [None]:
gsutil cp path/to/*.csv gs://bucket-name

Til slutt verifiser at filene har blitt lastet opp, enten ved grafisk user interace (storage browser) eller gjennom cloud shell.

In [None]:
gsutil ls gs://bucket-name

En oversikt over gsutil kommandos finnes her: https://cloud.google.com/storage/docs/gsutil eller ved bruk av `gsutil help` i cloud shell terminal.

#### Laste opp filer med klientbiblioteker.
- Filer kan også lastes opp ved bruk av client bibliotheker. En oversikt over støttede programmeringsspråk finnes her: https://cloud.google.com/storage/docs/reference/libraries 

- I cellen under finnes det et minimalt eksempel med python client bibliothek.

- For å kjøre python kode direkte fra denne notebook, må google cloud brukeren først autentiseres. 

- Vi bruker google.colab.auth python bibliothek til autentisering og google.cloud.storage til interaksjon med storage buckets

In [None]:
from google.colab import auth
auth.authenticate_user()
print('Authenticated') 

Authenticated


In [None]:
from google.cloud import storage
from google.cloud import bigquery
%load_ext google.colab.data_table

proj = # TODO define workshop-project
bucket =  # TODO define gs://bucket-name
file_name = # TODO define lokal file name
blob_name = # TODO define GCS blob name for the file
client = storage.Client(project = proj) # initialize client and set the billing project. NB: vi trenger ikke å gjenta credentials her.
bucket_object = client.get_bucket(bucket) # define the target bucket
blob = bucket.blob(blob_name) # make a file blob
blob.upload_from_filename(file_name) # upload the content from lokal file to GCS

SyntaxError: ignored

NB: Det finnes klientbiblioteker til de fleste GCP verktøy/tjenester. F.eks i python: https://cloud.google.com/python/docs/reference 

### 2. Importere data direkte fra GCS til BigQuery
Etter dataene har blitt lastet opp til GCS skal de ofte importeres til BigQuery til videre analyse. BigQuery er et datavarehus som kan lagre store datasett hvor man kan bruke SQL spørringer for å hente ut data.

BigQuery støtter en del formater direkte: 
- Avro
- CSV
- JSON
- ORC
- Parquet

Dataene kan importeres fra GCS bøtter, lokale filer (max 10MB upload per fil), google drive, BigTable eller genereres on-the-fly (lage en tom tabell).


Her importerer vi dataene vi har lastet opp til GCS bøtte til BigQuery ved bruk av cloud shell kommando. BigQuery organiserer dataene i prosjekter, datasett og så tabeller. Før vi kan laste opp dataene i en tabell må vi derfor lage en ny datasett:

In [None]:
bq --location=EUROPE-WEST3 mk -d dataset_name

Når datasett har blitt opprettet kan vi importere dataene fra GCS inn til en BigQuery tabell under samme datasett.

In [None]:
bq load dataset_name.table_name gs://bucket-name/fil.csv

In [None]:
bq --location=europe-west3 load --autodetect --source_format=CSV DATASET.TABLE_ID gs://path-to-file

***
Do we want to load all trips into one table?
***

In [None]:
bq --location=europe-west3 load --autodetect --source_format=CSV DATASET.TABLE_ID gs://path-to-data-folder/*.csv

#### Verifser import med en SQL spørring

Før vi kan hente ut data fra BigQuery via SQL spørringer så må vi definere prosjekt, dataset og tabel IDene.

In [None]:
project_id = "<prosjekt navn fra GCP>"
dataset_id = "raw"
table_id = "<table id>"

In [None]:
project_id = "saga-workshop-dtest-9hsr"
dataset_id = "raw"
table_id = "martin-test"

Her benytter vi oss av BigQuery "inline magic" kommandoer for å kjøre en SQL spørring direkte fra notebooken:

In [None]:
%%bigquery --project $project_id trip_df
SELECT *
FROM `<dataset_id>.<table_id>`

Her benytter vi oss av BigQuery klient biblioteket for å kjøre den samme SQL spørringen:

In [None]:
client = bigquery.Client(project=project_id)

trip_df = client.query(
    """
    SELECT *
    FROM `%s.%s`
    """ % (dataset_id, table_id)
).to_dataframe()



In [None]:
trip_df.head()

Unnamed: 0,longitude,latitude,elevation,time,speed,sat
0,8.966591,63.201223,47.618,2020-10-23 09:02:49+00:00,18.79,18
1,9.665204,63.282293,224.813,2020-10-23 08:31:33+00:00,21.483,19
2,9.222497,63.220013,314.111,2020-10-23 08:50:45.999000+00:00,25.08,20
3,8.965443,63.200993,48.58,2020-10-23 09:02:52+00:00,19.721,20
4,9.259465,63.222015,330.433,2020-10-23 08:49:21.999000+00:00,22.726,21


# Importe data med Dataflow inn i BigQuery

TODO: Tekst her


In [7]:
pip install 'apache-beam[interactive]'



In [6]:
pip install 'apache-beam[gcp]'



In [1]:
!gcloud auth login

Go to the following link in your browser:

    https://accounts.google.com/o/oauth2/auth?response_type=code&client_id=32555940559.apps.googleusercontent.com&redirect_uri=urn%3Aietf%3Awg%3Aoauth%3A2.0%3Aoob&scope=openid+https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fuserinfo.email+https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fcloud-platform+https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fappengine.admin+https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fcompute+https%3A%2F%2Fwww.googleapis.com%2Fauth%2Faccounts.reauth&state=qMtXwuGKGRA83SBHdPdrL3mIMvjiZc&prompt=consent&access_type=offline&code_challenge=NNv3Ad0Mke6_AdidyUaN0l_hVNVeiC0BJ12uqj-DPb0&code_challenge_method=S256

Enter verification code: 4/1AY0e-g7rvsNpJOZr8lZNtejR1GEM9pB7suiTkHIFwlq8nmBXfP1QSupgXBo

You are now logged in as [dtestbrukersen@vegvesen.no].
Your current project is [None].  You can change this setting by running:
  $ gcloud config set project PROJECT_ID


In [None]:
!gcloud projects list

PROJECT_ID                  NAME                        PROJECT_NUMBER
saga-datacatalog-stm-jsey   Saga Data catalog STM       338205945493
saga-oppetid-stm-6cgp       saga-oppetid-stm-6cgp       963226934334
saga-testbrukersen-spike    saga-testbrukersen-spike    670475669172
saga-trafikkdata-prod-pz8l  saga-trafikkdata-prod-pz8l  57533714817
saga-trafikkdata-stm-39pt   saga-trafikkdata-stm-39pt   173749799776
saga-veglogg-stm-bic4       saga-veglogg-stm-bic4       586454033882
saga-vegvar-stm-jwos        saga-vegvar-stm-jwos        373913877824
saga-workshop-dtest-9hsr    saga-workshop-dtest-9hsr    172858400697


In [2]:
!gcloud config set project saga-workshop-dtest-9hsr

Updated property [core/project].


In [3]:
!gcloud auth application-default login

Go to the following link in your browser:

    https://accounts.google.com/o/oauth2/auth?response_type=code&client_id=764086051850-6qr4p6gpi6hn506pt8ejuq83di341hur.apps.googleusercontent.com&redirect_uri=urn%3Aietf%3Awg%3Aoauth%3A2.0%3Aoob&scope=openid+https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fuserinfo.email+https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fcloud-platform+https%3A%2F%2Fwww.googleapis.com%2Fauth%2Faccounts.reauth&state=28ukpDGsQEOAeajeotkp6VFCcbu9XA&prompt=consent&access_type=offline&code_challenge=uqn4VJ1nScLIaBFG2BB8PmcJMMuILj5LVmlgH84RQok&code_challenge_method=S256

Enter verification code: 4/1AY0e-g7v46ebfGNHvOrEQE2s_JXZK9eHSDGaBOqRerLwIDgWljrTtciThWo

Credentials saved to file: [/content/.config/application_default_credentials.json]

These credentials will be used by any library that requests Application Default Credentials (ADC).

Quota project "saga-workshop-dtest-9hsr" was added to ADC which can be used by Google client libraries for billing and quota. Note that some se

In [9]:
import csv

# Test

import apache_beam as beam
import apache_beam.runners.interactive.interactive_beam as ib
import apache_beam.io.fileio
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions


In [18]:
options = pipeline_options.PipelineOptions()

# Set the pipeline mode to stream the data from Pub/Sub.
options.view_as(pipeline_options.StandardOptions).streaming = False
options.view_as(GoogleCloudOptions).project = "saga-workshop-dtest-9hsr"

p = beam.Pipeline(InteractiveRunner(), options=options)

In [20]:
file_pattern = "gs://saga-trafikkdata-prod-pz8l_timetrafikkdata-ingest/2021-04*"

files = (p
  | "find files" >> beam.io.fileio.MatchFiles(file_pattern))

In [21]:
ib.show(files)



In [None]:
foboie = (file_lines
  | "transform" >> beam.Map(lambda line: csv.DictReader())

In [None]:
output = "Hello World"
!echo {output}

Hello World


### Insert data with code

In [None]:
project_id = "saga-workshop-dtest-9hsr"
dataset_id = "raw"
table_id = ".".join([project_id, dataset_id, "martin_test_table"])
table_id

'saga-workshop-dtest-9hsr.raw.martin_test_table'

In [None]:
from google.cloud import bigquery

# Construct a BigQuery client object
client = bigquery.Client(project=project_id)

client.create_table(table_id, exists_ok=True)

Table(TableReference(DatasetReference('saga-workshop-dtest-9hsr', 'raw'), 'martin_test_table'))

In [None]:
rows_to_insert = [
    {u"full_name": u"Phred Phlyntstone", u"age": 32},
    {u"full_name": u"Wylma Phlyntstone", u"age": 29},
]

errors = client.insert_rows_json(
    table_id, rows_to_insert, row_ids=[None] * len(rows_to_insert)
)  # Make an API request.
if errors == []:
    print("New rows have been added.")
else:
    print("Encountered errors while inserting rows: {}".format(errors))

In [None]:
from google.cloud import bigquery

# Construct a BigQuery client object.
client = bigquery.Client()

# TODO(developer): Set table_id to the ID of the table to create.
# table_id = "your-project.your_dataset.your_table_name"

job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.CSV, skip_leading_rows=1, autodetect=True,
)

with open(file_path, "rb") as source_file:
    job = client.load_table_from_file(source_file, table_id, job_config=job_config)

job.result()  # Waits for the job to complete.

table = client.get_table(table_id)  # Make an API request.
print(
    "Loaded {} rows and {} columns to {}".format(
        table.num_rows, len(table.schema), table_id
    )
)