<a href="https://colab.research.google.com/github/smitha-google/DataPipelineFromAzure/blob/main/DataPipelineFromAzure.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

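# Set Variables

Define the project, storage, Pub/Sub, BigQuery, and Cloud Functions settings used throughout the pipeline.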



In [10]:
# Google Cloud project ID
project_id = 'smithaargolisinternal'

# GCS bucket that holds the Mimeo dataset (the pipeline's source bucket)
bucket_name = "mimeodata"

# Pub/Sub topic that receives the bucket's change notifications
topic_name = "mimeotopic"

# Pub/Sub subscription that writes the topic's messages to BigQuery
subscription_name = "mimeosubscription"

# BigQuery dataset ID
dataset_id = "mimeo_dataset"

# BigQuery table ID
table_id = "yellowtable"

# GCS bucket used to stage the Cloud Functions source code
functions_bucket = "smithaargolisinternal-functions"

# Region for the functions bucket and the Cloud Function
region = "us-central1"
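The later `gcloud` and `gsutil` cells combine these settings into fully qualified resource names. The sketch below builds the same strings in plain Python, which can help when double-checking values before anything is created. The helper `resource_names` is hypothetical (not part of any Google Cloud library); the formats mirror the command output further down, such as `projects/smithaargolisinternal/topics/mimeotopic`.

```python
def resource_names(project_id: str, topic_name: str, subscription_name: str,
                   dataset_id: str, table_id: str, bucket_name: str) -> dict:
    """Build the fully qualified names used by the pipeline (hypothetical helper)."""
    return {
        # Pub/Sub resource paths, as printed by `gcloud pubsub ... create`
        "topic": f"projects/{project_id}/topics/{topic_name}",
        "subscription": f"projects/{project_id}/subscriptions/{subscription_name}",
        # `--bigquery-table` expects PROJECT:DATASET.TABLE
        "bigquery_table": f"{project_id}:{dataset_id}.{table_id}",
        # gsutil-style URL of the source bucket
        "bucket_url": f"gs://{bucket_name}/",
    }

# Same values as the settings cell above
names = resource_names("smithaargolisinternal", "mimeotopic", "mimeosubscription",
                       "mimeo_dataset", "yellowtable", "mimeodata")
for key, value in names.items():
    print(f"{key}: {value}")
```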

In [6]:
# Authenticate to Access GCS
from google.colab import auth
auth.authenticate_user()

In [7]:
# Point gcloud (and therefore gsutil) at the project set above
!gcloud config set project {project_id}

Updated property [core/project].


In [20]:
# List the objects in the source bucket
!gsutil ls gs://{bucket_name}/

gs://mimeodata/bquxjob_5fb8f53c_1807602e66b.json
gs://mimeodata/mimeo_data.csv
gs://mimeodata/ADLSData/


In [17]:
# Create the Pub/Sub topic that receives the bucket notifications

!gcloud pubsub topics create {topic_name}

Created topic [projects/smithaargolisinternal/topics/mimeotopic].


In [25]:
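# Publish a JSON notification to the topic whenever an object in the bucket changes.
# Without -e, every event type (finalize, delete, archive, metadata update) is sent;
# adding -e OBJECT_FINALIZE limits notifications to new or overwritten objects.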
!gsutil notification create -t {topic_name} -f json gs://{bucket_name}/

Created notification config projects/_/buckets/mimeodata/notificationConfigs/4
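With `-f json`, each notification message carries the object's metadata (a JSON API object resource) as its data, plus attributes such as `eventType`, `bucketId`, and `objectId`. It does not carry the file's contents. The sketch below decodes one such message. The helper name `describe_notification` is hypothetical, and the sample values are invented for illustration; only the field names follow the `JSON_API_V1` payload format.

```python
import base64
import json

def describe_notification(message: dict) -> dict:
    """Summarise a Cloud Storage notification delivered through Pub/Sub.

    `message` uses the Pub/Sub REST shape: base64-encoded `data`
    plus a string-to-string `attributes` map.
    """
    attributes = message.get("attributes", {})
    payload = json.loads(base64.b64decode(message["data"]).decode("utf-8"))
    return {
        "event_type": attributes.get("eventType"),
        "uri": f"gs://{attributes.get('bucketId')}/{attributes.get('objectId')}",
        # The JSON API reports object size as a string
        "size_bytes": int(payload.get("size", 0)),
        "content_type": payload.get("contentType"),
    }

# Illustrative message with invented values
sample_object = {"bucket": "mimeodata", "name": "mimeo_data.csv",
                 "size": "1024", "contentType": "text/csv"}
sample_message = {
    "data": base64.b64encode(json.dumps(sample_object).encode("utf-8")).decode("ascii"),
    "attributes": {"eventType": "OBJECT_FINALIZE", "bucketId": "mimeodata",
                   "objectId": "mimeo_data.csv", "payloadFormat": "JSON_API_V1"},
}
print(describe_notification(sample_message))
```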


In [29]:
# Create a BigQuery subscription: Pub/Sub writes each message from the topic
# directly into the BigQuery table given as PROJECT:DATASET.TABLE
!gcloud pubsub subscriptions create {subscription_name} \
  --topic={topic_name} \
  --bigquery-table={project_id}:{dataset_id}.{table_id}

Created subscription [projects/smithaargolisinternal/subscriptions/mimeosubscription].
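A BigQuery subscription writes into an existing table. When the subscription does not use a topic schema (as here), the table needs a column named `data` of type STRING, BYTES, or JSON. The Pub/Sub service agent (`service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com`) also needs write access to the table, for example through the BigQuery Data Editor role. The sketch below creates such a table with the `google-cloud-bigquery` client. It assumes that library is installed and that `mimeo_dataset` already exists; the function names are hypothetical. Because the bucket notifications are JSON metadata, each row's `data` value will describe an object rather than contain its CSV rows.

```python
def subscription_table_schema():
    """Columns for a BigQuery subscription without topic schema or metadata columns."""
    # (name, type, mode); `data` receives the raw Pub/Sub message payload
    return [("data", "STRING", "NULLABLE")]


def ensure_subscription_table(project_id: str, dataset_id: str, table_id: str):
    """Create the destination table if it does not exist (requires credentials)."""
    # Imported here so the schema helper above works without the client library
    from google.cloud import bigquery

    client = bigquery.Client(project=project_id)
    schema = [bigquery.SchemaField(name, field_type, mode=mode)
              for name, field_type, mode in subscription_table_schema()]
    table = bigquery.Table(f"{project_id}.{dataset_id}.{table_id}", schema=schema)
    # exists_ok=True leaves an existing table untouched instead of raising
    return client.create_table(table, exists_ok=True)
```

In a fresh project, `ensure_subscription_table('smithaargolisinternal', 'mimeo_dataset', 'yellowtable')` would run before the `gcloud pubsub subscriptions create` cell. The successful output above suggests the table already existed here.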


In [11]:
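# Create a regional bucket to stage the Cloud Functions source code.
# ("regional" is a legacy storage class name; "-c standard" is the current equivalent.)
!gsutil mb -c regional -l {region} gs://{functions_bucket}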

Creating gs://smithaargolisinternal-functions/...


In [13]:
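# Deploy the `streaming` function, triggered when an object is finalized in the source bucket.
# --source must be an existing local directory containing main.py.
# (python37 is no longer a supported runtime, so a newer one is used.)
!gcloud functions deploy streaming --region={region} \
    --source=streaming --runtime=python310 \
    --stage-bucket={functions_bucket} \
    --trigger-bucket={bucket_name}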

ERROR: (gcloud.functions.deploy) argument `--source`: Provided directory does not exist
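The deployment fails because `--source=streaming` points to a local directory that does not exist. A Python Cloud Function source directory needs at least a `main.py` that defines the entry point, which defaults to the function name (`streaming`). The original function code is not part of this notebook, so the sketch below writes a hypothetical placeholder that only logs each uploaded object. Replace its body with the real processing logic. It assumes a 1st gen deployment, where `--trigger-bucket` functions are called as `(event, context)` with the object's metadata; a 2nd gen function would take a single CloudEvent instead.

```python
import os
import textwrap

# Placeholder entry point for the `streaming` function (assumed, not the original code)
MAIN_PY = textwrap.dedent('''\
    def streaming(event, context):
        """Background function triggered by a Cloud Storage object event."""
        # `event` is a dict of the object's metadata, including bucket and name
        uri = f"gs://{event['bucket']}/{event['name']}"
        print(f"Received {uri}")
        return uri
    ''')

os.makedirs("streaming", exist_ok=True)
with open(os.path.join("streaming", "main.py"), "w") as f:
    f.write(MAIN_PY)
# Third-party dependencies go in requirements.txt; the placeholder needs none
with open(os.path.join("streaming", "requirements.txt"), "w") as f:
    f.write("")
print(sorted(os.listdir("streaming")))
```

After this cell runs, re-run the deploy cell from the same working directory (`/content` by default in Colab).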


In [31]:
# Functions Framework: runs Python Cloud Functions locally for testing
!pip install functions-framework

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting functions-framework
  Downloading functions_framework-3.1.0-py3-none-any.whl (30 kB)
Collecting gunicorn<21.0,>=19.2.0
  Downloading gunicorn-20.1.0-py3-none-any.whl (79 kB)
     |████████████████████████████████| 79 kB 3.7 MB/s 
Collecting cloudevents<2.0.0,>=1.2.0
  Downloading cloudevents-1.5.0-py3-none-any.whl (34 kB)
Collecting watchdog>=1.0.0
  Downloading watchdog-2.1.9-py3-none-manylinux2014_x86_64.whl (78 kB)
     |████████████████████████████████| 78 kB 6.6 MB/s 
Collecting deprecation<3.0,>=2.0
  Downloading deprecation-2.1.0-py2.py3-none-any.whl (11 kB)
Installing collected packages: deprecation, watchdog, gunicorn, cloudevents, functions-framework
Successfully installed cloudevents-1.5.0 deprecation-2.1.0 functions-framework-3.1.0 gunicorn-20.1.0 watchdog-2.1.9


In [30]:
# List the notification configurations on the source bucket
!gsutil notification list gs://{bucket_name}

projects/_/buckets/mimeodata/notificationConfigs/2
	Cloud Pub/Sub topic: projects/smithaargolisinternal/topics/my-lab-topic

projects/_/buckets/mimeodata/notificationConfigs/3
	Cloud Pub/Sub topic: projects/smithaargolisinternal/topics/mimeotopic

projects/_/buckets/mimeodata/notificationConfigs/4
	Cloud Pub/Sub topic: projects/smithaargolisinternal/topics/mimeotopic
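The listing shows two configurations (`3` and `4`) that point at `mimeotopic`. This probably happened because the notification cell was run more than once. If so, each upload publishes two messages and lands in BigQuery twice. The hypothetical helper below picks out redundant configurations from the (ID, topic) pairs printed above. It prints the matching `gsutil notification delete` commands, which take the full configuration name.

```python
def redundant_notification_ids(configs):
    """Return IDs of notification configs that repeat an earlier config's topic.

    `configs` is an iterable of (config_id, topic) pairs. This assumes the
    configs have identical filters; configs with different event types or
    object prefixes are not true duplicates.
    """
    seen = set()
    redundant = []
    # Keep the oldest (lowest-numbered) config for each topic
    for config_id, topic in sorted(configs, key=lambda c: int(c[0])):
        if topic in seen:
            redundant.append(config_id)
        else:
            seen.add(topic)
    return redundant

# Pairs taken from the `gsutil notification list` output above
listed = [
    ("2", "projects/smithaargolisinternal/topics/my-lab-topic"),
    ("3", "projects/smithaargolisinternal/topics/mimeotopic"),
    ("4", "projects/smithaargolisinternal/topics/mimeotopic"),
]
for config_id in redundant_notification_ids(listed):
    print(f"gsutil notification delete projects/_/buckets/mimeodata/notificationConfigs/{config_id}")
```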



# Google Cloud Storage (GCS)

In order to use Colaboratory with GCS, you'll need to create a [Google Cloud project](https://cloud.google.com/storage/docs/projects) or use a pre-existing one.

Specify your project ID below:

In [1]:
project_id = 'smithaargolisinternal'

Files in GCS are contained in [buckets](https://cloud.google.com/storage/docs/key-terms#buckets).

Buckets must have a globally-unique name, so we generate one here.

In [2]:
import uuid
bucket_name = 'colab-sample-bucket-' + str(uuid.uuid1())
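Bucket names without dots must be 3 to 63 characters long and use only lowercase letters, digits, dashes, and underscores. They must also start and end with a letter or digit, cannot start with `goog`, and cannot contain `google`. The check below is a simplified sketch of those rules; it ignores dotted, domain-style names and misspellings of "google", and the helper name is hypothetical. It shows why `colab-sample-bucket-` plus a 36-character UUID (56 characters in total) is acceptable.

```python
import re
import uuid

# Simplified rule for bucket names without dots: 3-63 chars, lowercase/digits/-/_
_BUCKET_RE = re.compile(r"[a-z0-9][a-z0-9_-]{1,61}[a-z0-9]")

def is_valid_simple_bucket_name(name: str) -> bool:
    """Hypothetical helper: check a dot-free name against the basic GCS rules."""
    if not _BUCKET_RE.fullmatch(name):
        return False
    # Reserved prefix and substring
    return not name.startswith("goog") and "google" not in name

candidate = 'colab-sample-bucket-' + str(uuid.uuid1())
print(candidate, len(candidate), is_valid_simple_bucket_name(candidate))
```

`uuid.uuid4()` would work equally well here. Unlike `uuid1()`, it does not embed the host's MAC address and a timestamp in the bucket name.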

In order to access GCS, we must authenticate.

In [3]:
from google.colab import auth
auth.authenticate_user()

GCS can be accessed via the `gsutil` command-line utility or via the native Python API.

## `gsutil`

First, we configure `gsutil` to use the project we specified above by using `gcloud`.

In [4]:
!gcloud config set project {project_id}

Updated property [core/project].


Create a local file to upload.

In [5]:
with open('/tmp/to_upload.txt', 'w') as f:
  f.write('my sample file')

print('/tmp/to_upload.txt contains:')
!cat /tmp/to_upload.txt

/tmp/to_upload.txt contains:
my sample file

Make a bucket to which we'll upload the file ([documentation](https://cloud.google.com/storage/docs/gsutil/commands/mb)).

In [6]:
!gsutil mb gs://{bucket_name}

Creating gs://colab-sample-bucket-bd6640ec-1721-11ed-ac14-0242ac1c0002/...


Copy the file to our new bucket ([documentation](https://cloud.google.com/storage/docs/gsutil/commands/cp)).

In [7]:
!gsutil cp /tmp/to_upload.txt gs://{bucket_name}/

Copying file:///tmp/to_upload.txt [Content-Type=text/plain]...
/ [1 files][   14.0 B/   14.0 B]                                                
Operation completed over 1 objects/14.0 B.                                       


Dump the contents of our newly copied file to make sure everything worked ([documentation](https://cloud.google.com/storage/docs/gsutil/commands/cat)).


In [8]:
!gsutil cat gs://{bucket_name}/to_upload.txt

my sample file

In [9]:
#@markdown Once the upload has finished, the data will appear in the Cloud Console storage browser for your project:
print('https://console.cloud.google.com/storage/browser?project=' + project_id)

https://console.cloud.google.com/storage/browser?project=smithaargolisinternal


Finally, we'll download the file we just uploaded in the example above. It's as simple as reversing the order in the `gsutil cp` command.

In [10]:
!gsutil cp gs://{bucket_name}/to_upload.txt /tmp/gsutil_download.txt
  
# Print the result to make sure the transfer worked.
!cat /tmp/gsutil_download.txt

Copying gs://colab-sample-bucket-bd6640ec-1721-11ed-ac14-0242ac1c0002/to_upload.txt...
/ [1 files][   14.0 B/   14.0 B]
Operation completed over 1 objects/14.0 B.                                       
my sample file

## Python API

These snippets are based on [a larger example](https://github.com/GoogleCloudPlatform/storage-file-transfer-json-python/blob/master/chunked_transfer.py) that shows additional uses of the API.

First, we create the service client.

In [None]:
from googleapiclient.discovery import build
gcs_service = build('storage', 'v1')

Create a local file to upload.

In [None]:
with open('/tmp/to_upload.txt', 'w') as f:
  f.write('my sample file')

print('/tmp/to_upload.txt contains:')
!cat /tmp/to_upload.txt

/tmp/to_upload.txt contains:
my sample file

Create a bucket in the project specified above.

In [None]:
# Use a different globally-unique bucket name from the gsutil example above.
import uuid
bucket_name = 'colab-sample-bucket-' + str(uuid.uuid1())

body = {
  'name': bucket_name,
  # For a full list of locations, see:
  # https://cloud.google.com/storage/docs/bucket-locations
  'location': 'us',
}
gcs_service.buckets().insert(project=project_id, body=body).execute()
print('Done')

Done


Upload the file to our newly created bucket.

In [None]:
from googleapiclient.http import MediaFileUpload

media = MediaFileUpload('/tmp/to_upload.txt', 
                        mimetype='text/plain',
                        resumable=True)

request = gcs_service.objects().insert(bucket=bucket_name, 
                                       name='to_upload.txt',
                                       media_body=media)

response = None
while response is None:
  # _ is a placeholder for a progress object that we ignore.
  # (Our file is small, so we skip reporting progress.)
  _, response = request.next_chunk()

print('Upload complete')

Upload complete
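The upload loop above discards the progress object. For larger files, the same `next_chunk()` loop can report progress. While the upload is in flight, each call returns a `MediaUploadProgress` whose `progress()` method gives the completed fraction; on the final call it returns `None` together with the response. The function below is a sketch of that pattern, and its name and defaults are illustrative.

```python
# Resumable uploads send chunks that are multiples of 256 KiB (except the last)
CHUNK_SIZE = 256 * 1024

def upload_with_progress(gcs_service, bucket_name, local_path, object_name,
                         mimetype="text/plain", chunksize=CHUNK_SIZE):
    """Upload a file in chunks, printing progress after each one (requires credentials)."""
    from googleapiclient.http import MediaFileUpload

    media = MediaFileUpload(local_path, mimetype=mimetype,
                            chunksize=chunksize, resumable=True)
    request = gcs_service.objects().insert(bucket=bucket_name, name=object_name,
                                           media_body=media)
    response = None
    while response is None:
        # status is a MediaUploadProgress, or None once the last chunk is sent
        status, response = request.next_chunk()
        if status is not None:
            print(f"Uploaded {status.progress():.0%}")
    print("Upload complete")
    return response
```

Calling `upload_with_progress(gcs_service, bucket_name, '/tmp/to_upload.txt', 'to_upload.txt')` with the 14-byte sample file likely completes in a single chunk, so only "Upload complete" is printed.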


In [None]:
#@markdown Once the upload has finished, the data will appear in the Cloud Console storage browser for your project:
print('https://console.cloud.google.com/storage/browser?project=' + project_id)

https://console.cloud.google.com/storage/browser?project=Your_project_ID_here


Download the file we just uploaded.

In [None]:
from googleapiclient.http import MediaIoBaseDownload

with open('/tmp/downloaded_from_gcs.txt', 'wb') as f:
  request = gcs_service.objects().get_media(bucket=bucket_name,
                                            object='to_upload.txt')
  media = MediaIoBaseDownload(f, request)

  done = False
  while not done:
    # _ is a placeholder for a progress object that we ignore.
    # (Our file is small, so we skip reporting progress.)
    _, done = media.next_chunk()

print('Download complete')

Download complete


Inspect the downloaded file.


In [None]:
!cat /tmp/downloaded_from_gcs.txt

my sample file
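`cat` confirms the contents by eye. A more robust check compares the download with the object's `md5Hash` metadata, which the JSON API returns base64-encoded from `objects().get()`. The helpers below sketch that comparison; their names are illustrative. Composite objects have no `md5Hash`, so they are reported as not verified.

```python
import base64
import hashlib

def md5_b64(data: bytes) -> str:
    """Base64-encoded MD5 digest, the encoding GCS uses for an object's md5Hash."""
    return base64.b64encode(hashlib.md5(data).digest()).decode("ascii")

def file_md5_b64(path: str, block_size: int = 1024 * 1024) -> str:
    """Same digest for a local file, read in blocks to bound memory use."""
    digest = hashlib.md5()
    with open(path, "rb") as f:
        for block in iter(lambda: f.read(block_size), b""):
            digest.update(block)
    return base64.b64encode(digest.digest()).decode("ascii")

def matches_remote(gcs_service, bucket_name: str, object_name: str, local_path: str) -> bool:
    """Compare a local file with the object's md5Hash (requires credentials)."""
    metadata = gcs_service.objects().get(bucket=bucket_name, object=object_name).execute()
    remote = metadata.get("md5Hash")  # absent for composite objects
    return remote is not None and remote == file_md5_b64(local_path)
```

For the example above: `matches_remote(gcs_service, bucket_name, 'to_upload.txt', '/tmp/downloaded_from_gcs.txt')`.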