## Imports and initialization

In [1]:
import os
import json

from dataverk import Client
from dataverk_vault import api as vault_api
from google.cloud import storage
from google.oauth2 import service_account

### Dataverk

In [2]:
# Dataverk client; used below to consume the Kafka topic via `dv.read_kafka`.
dv = Client()
# Export Vault secrets as environment variables. Presumably this is what provides
# GOOGLE_CREDS for the Google Cloud section below — TODO confirm.
vault_api.set_secrets_as_envs()

### Google Cloud

In [3]:
# GOOGLE_CREDS holds the service-account key as a JSON string; parse it before building credentials.
google_creds_info = json.loads(os.environ["GOOGLE_CREDS"])
creds = service_account.Credentials.from_service_account_info(google_creds_info)

In [4]:
# Storage client bound to the service account's own project.
client = storage.Client(project=creds.project_id, credentials=creds)

In [5]:
# Target bucket for the uploaded dump.
GCS_BUCKET_NAME = "styrk-bucket"
bucket = client.get_bucket(GCS_BUCKET_NAME)

## Read from the Kafka topic

In [6]:
# Topic(s) to consume; read_kafka takes a list of topic names and returns the dataframe shown below.
KAFKA_TOPICS = ["aapen-nada-topic"]
df = dv.read_kafka(KAFKA_TOPICS)

INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=b27apvl00046.preprod.local:8443 <connecting> [IPv4 ('10.183.32.47', 8443)]>: connecting to b27apvl00046.preprod.local:8443 [('10.183.32.47', 8443) IPv4]
INFO:kafka.conn:Probing node bootstrap-0 broker version
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=b27apvl00046.preprod.local:8443 <handshake> [IPv4 ('10.183.32.47', 8443)]>: Loading SSL CA from /etc/pki/tls/certs/ca-bundle.crt
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=b27apvl00046.preprod.local:8443 <authenticating> [IPv4 ('10.183.32.47', 8443)]>: Authenticated as srvDataverk via PLAIN
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=b27apvl00046.preprod.local:8443 <authenticating> [IPv4 ('10.183.32.47', 8443)]>: Connection complete.
INFO:kafka.conn:Broker version identified as 1.0.0
INFO:kafka.conn:Set configuration api_version=(1, 0, 0) to skip auto check_version requests on startup
INFO:kafka.consumer.subscription_state:Updating su

In [7]:
# Peek at the first rows to sanity-check the consumed messages.
df.head(n=5)

Unnamed: 0,id,name,int,tall,objekt,ny,nykolonne,dato,nyerekolonne
0,cf3a9f04-2d0a-425b-9550-bf055508dc16,ErikV6,1,90.49,{'subval1': 'verdi'},30.7,50,2020-04-21,verdi
1,597e628e-b20b-47e8-91dc-9d719570d740,ErikV6,2,90.49,{'subval1': 'verdi'},30.7,50,2020-04-21,verdi
2,67e20c5a-dfdf-432d-b8c8-8ee8ba446f85,ErikV6,3,90.49,{'subval1': 'verdi'},30.7,50,2020-04-21,verdi
3,095f0c6d-321d-44d8-b0ad-710137965dcd,ErikV6,4,90.49,{'subval1': 'verdi'},30.7,50,2020-04-21,verdi
4,e7ee18ad-61de-441f-980e-f313af2e6f21,ErikV6,5,90.49,{'subval1': 'verdi'},30.7,50,2020-04-21,verdi


## Serialize the dataframe to a pickle file and upload it to Google Cloud Storage (GCS)

In [8]:
# Write the dataframe to a local pickle file (relative path, i.e. the current working directory).
local_pickle_path = "databasedump_kafka.pkl"
df.to_pickle(local_pickle_path)

In [9]:
# Handle to the destination object inside the bucket, under the "airflow/" prefix.
blob = bucket.blob(blob_name="airflow/databasedump_kafka.pkl")

In [10]:
# Upload the local pickle file to the GCS object referenced by `blob`.
local_pickle_path = "databasedump_kafka.pkl"
blob.upload_from_filename(filename=local_pickle_path)