# BigFrames Streaming
@johanesalxd

## Setup

In [1]:
import bigframes

# Make sure the installed bigframes version is >= 1.12.0.
# Nothing is enforced here: the cell only displays the version string so it
# can be checked by eye.
bigframes.__version__

'1.12.0'

In [2]:
import bigframes.pandas as bpd
import bigframes.streaming as bst

# Point BigQuery DataFrames at the project that will own the tables and the
# streaming jobs. Use the public options object (`bpd.options.bigquery`)
# rather than the private `_bigquery_options` attribute, which is an
# implementation detail and may change between releases.
bpd.options.bigquery.project = (
    "johanesa-playground-326616"  # Change to your own project ID
)

# Passed as `job_id_prefix` to the streaming job created later in the notebook.
job_id_prefix = "test_streaming_"

In [6]:
# Copy a public-dataset table into a table of our own to use as the source of
# the streaming jobs. Any changes to that copy can be reflected in the
# streaming destination.
source_table = "bigquery-public-data.ml_datasets.penguins"
streaming_table = "birds.penguins_streaming"

df = bpd.read_gbq(source_table)
df.to_gbq(streaming_table, if_exists="replace")

  exec(code_obj, self.user_global_ns, self.user_ns)


'birds.penguins_streaming'

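### Caveat

Row-wise `apply` (`axis=1`) with a plain Python lambda is not supported: BigFrames raises a `ValueError` stating that a remote function must be used.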

In [31]:
# Caveat demonstration: row-wise apply (axis=1) with a plain Python lambda is
# rejected with a ValueError that asks for a remote function instead.
# The error is caught and printed so that this cell documents the limitation
# without aborting a "Restart Kernel -> Run All" before the streaming cells.
try:
    df["json"] = df.apply(lambda row: row.to_json(), axis=1)
except ValueError as exc:
    # Same text a traceback's final line would show, e.g. "ValueError: <message>".
    print(f"{type(exc).__name__}: {exc}")
else:
    # Only reached if the apply succeeds; then show the frame with the new column.
    display(df)



ValueError: For axis=1 a remote function must be used.

## Create, select, filter and preview
Create the StreamingDataFrame from a BigQuery table, select certain columns, filter rows, and preview the output.

In [7]:
# Build the streaming dataframe on top of the copied penguins table.
streaming_table_id = "birds.penguins_streaming"
sdf = bst.read_gbq_table(streaming_table_id)



In [8]:
# Keep three columns, then retain only the rows whose body mass is below 4000.
kept_columns = ["species", "island", "body_mass_g"]
sdf = sdf[kept_columns]

is_below_4000 = sdf["body_mass_g"] < 4000
sdf = sdf[is_below_4000]

# Print the object's type, then preview it as the cell output.
print(type(sdf))
sdf



<class 'bigframes.streaming.dataframe.StreamingDataFrame'>




Unnamed: 0,species,island,body_mass_g
0,Adelie Penguin (Pygoscelis adeliae),Torgersen,3875.0
1,Adelie Penguin (Pygoscelis adeliae),Torgersen,2900.0
2,Adelie Penguin (Pygoscelis adeliae),Biscoe,3725.0
3,Adelie Penguin (Pygoscelis adeliae),Dream,2975.0
4,Adelie Penguin (Pygoscelis adeliae),Torgersen,3050.0
5,Chinstrap penguin (Pygoscelis antarctica),Dream,2700.0
6,Adelie Penguin (Pygoscelis adeliae),Dream,3900.0
7,Adelie Penguin (Pygoscelis adeliae),Biscoe,3825.0
8,Chinstrap penguin (Pygoscelis antarctica),Dream,3775.0
9,Adelie Penguin (Pygoscelis adeliae),Dream,3350.0


## Pub/Sub
Create a Pub/Sub streaming job.

In [9]:
# Pub/Sub requires a single column, so narrow the stream down to "island".
pubsub_column = "island"
sdf = sdf[[pubsub_column]]



In [13]:
pubsub_topic = "penguins"  # Change to your own Pub/Sub topic ID
service_account = "bq-cq-sa@johanesa-playground-326616.iam.gserviceaccount.com"  # Change to your own service account

# Start the Pub/Sub streaming job. job_id is left as None; only the prefix
# defined in the setup cell is supplied.
job = sdf.to_pubsub(
    topic=pubsub_topic,
    service_account_email=service_account,
    job_id=None,
    job_id_prefix=job_id_prefix,
)

In [14]:
# Report whether the streaming job is running and any error result it carries.
is_running = job.running()
error_result = job.error_result

print(is_running)
print(error_result)

True
None


In [15]:
# Cancel the streaming job that `sdf.to_pubsub(...)` started; the call's
# return value is shown as the cell output.
job.cancel()

True