## Google Cloud Storage using Python and Pandas

#### Using gsutil inside the notebook with the `!` shell escape

In [5]:
!gsutil list gs://gcs-data-lake-1

gs://gcs-data-lake-1/data/


In [6]:
!gsutil ls -R gs://gcs-data-lake-1

gs://gcs-data-lake-1/data/:

gs://gcs-data-lake-1/data/retail_db/:
gs://gcs-data-lake-1/data/retail_db/create_db_tables_pg.sql
gs://gcs-data-lake-1/data/retail_db/load_db_tables_pg.sql
gs://gcs-data-lake-1/data/retail_db/schemas.json

gs://gcs-data-lake-1/data/retail_db/categories/:
gs://gcs-data-lake-1/data/retail_db/categories/part-00000

gs://gcs-data-lake-1/data/retail_db/customers/:
gs://gcs-data-lake-1/data/retail_db/customers/part-00000

gs://gcs-data-lake-1/data/retail_db/departments/:
gs://gcs-data-lake-1/data/retail_db/departments/part-00000

gs://gcs-data-lake-1/data/retail_db/order_items/:
gs://gcs-data-lake-1/data/retail_db/order_items/part-00000

gs://gcs-data-lake-1/data/retail_db/orders/:
gs://gcs-data-lake-1/data/retail_db/orders/part-00000

gs://gcs-data-lake-1/data/retail_db/products/:
gs://gcs-data-lake-1/data/retail_db/products/part-00000


### Using Python to interact with Google Storage

In [None]:
# Set up Application Default Credentials so the GCS Python library can authenticate to Google Cloud

!gcloud auth application-default login
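To make "application defaults" a little more concrete: the client library looks for credentials in a fixed order. The sketch below approximates only the first two steps of that lookup, namely the `GOOGLE_APPLICATION_CREDENTIALS` environment variable and then the file written by `gcloud auth application-default login`. The helper name `find_adc_file` is hypothetical. The real lookup is done by the `google-auth` library and covers more cases (for example the metadata server on Google Cloud VMs), so treat this as an illustration rather than the library's implementation.

```python
import os
from pathlib import Path
from typing import Optional


def find_adc_file() -> Optional[Path]:
    """Return the credentials file the client would probably pick up, if any.

    Hypothetical helper that approximates the first two steps of the
    Application Default Credentials lookup; it is not the library's own code.
    """
    # 1. An explicit path in GOOGLE_APPLICATION_CREDENTIALS takes precedence.
    explicit = os.environ.get("GOOGLE_APPLICATION_CREDENTIALS")
    if explicit:
        candidate = Path(explicit)
        return candidate if candidate.is_file() else None

    # 2. Otherwise look for the file written by
    #    `gcloud auth application-default login`.
    if os.name == "nt":
        config_dir = Path(os.environ.get("APPDATA", "")) / "gcloud"
    else:
        config_dir = Path.home() / ".config" / "gcloud"
    candidate = config_dir / "application_default_credentials.json"
    return candidate if candidate.is_file() else None


print(find_adc_file())
```

If this prints `None` on a local machine, `storage.Client()` in the next cell will most likely fail to find credentials, and running the login command above is the usual fix.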

#### Create a client to Google Storage

In [30]:
from google.cloud import storage
import json
import pandas as pd


print("")

gsclient = storage.Client()
gsclient




<google.cloud.storage.client.Client at 0x7f2373bdfbb0>

#### List all the buckets in the account

In [36]:
print(f"Buckets in the account are {list(gsclient.list_buckets())}")

bucket0 = list(gsclient.list_buckets())[0]
bucket0.name

Buckets in the account are [<Bucket: gcs-data-lake-1>]


'gcs-data-lake-1'

#### List all blobs in the current bucket and show the contents of one of them

In [33]:
for i in bucket0.list_blobs():
    print(i)

<Blob: gcs-data-lake-1, data/retail_db/categories/part-00000, 1676193065875141>
<Blob: gcs-data-lake-1, data/retail_db/create_db_tables_pg.sql, 1676193065836272>
<Blob: gcs-data-lake-1, data/retail_db/customers/part-00000, 1676193065868069>
<Blob: gcs-data-lake-1, data/retail_db/departments/part-00000, 1676193065828750>
<Blob: gcs-data-lake-1, data/retail_db/load_db_tables_pg.sql, 1676193065865869>
<Blob: gcs-data-lake-1, data/retail_db/order_items/part-00000, 1676193065874727>
<Blob: gcs-data-lake-1, data/retail_db/orders/part-00000, 1676193065904849>
<Blob: gcs-data-lake-1, data/retail_db/products/part-00000, 1676193065917054>
<Blob: gcs-data-lake-1, data/retail_db/schemas.json, 1676193065864417>


In [34]:
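# After the loop above, `i` still refers to the last blob listed, "schemas.json";
# download it and parse the JSON (download_as_string is a deprecated alias of download_as_bytes)

json.loads(i.download_as_bytes())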

{'departments': [{'column_name': 'department_id',
   'data_type': 'integer',
   'column_position': 1},
  {'column_name': 'department_name',
   'data_type': 'string',
   'column_position': 2}],
 'categories': [{'column_name': 'category_id',
   'data_type': 'integer',
   'column_position': 1},
  {'column_name': 'category_department_id',
   'data_type': 'integer',
   'column_position': 2},
  {'column_name': 'category_name',
   'data_type': 'string',
   'column_position': 3}],
 'orders': [{'column_name': 'order_id',
   'data_type': 'integer',
   'column_position': 1},
  {'column_name': 'order_date', 'data_type': 'string', 'column_position': 2},
  {'column_name': 'order_customer_id',
   'data_type': 'timestamp',
   'column_position': 3},
  {'column_name': 'order_status',
   'data_type': 'string',
   'column_position': 4}],
 'products': [{'column_name': 'product_id',
   'data_type': 'integer',
   'column_position': 1},
  {'column_name': 'product_cateogry_id',
   'data_type': 'integer',
   'c
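The cell above reaches `schemas.json` through the loop variable `i`, which only works because that blob happens to be listed last. A minimal sketch of a more explicit alternative is to look the blob up by name. The helper `load_json_blob` and the two `Fake...` classes are hypothetical; the fakes exist only so the example runs without network access, and they mimic just the two calls used here (`Bucket.blob` and `Blob.download_as_bytes`).

```python
import json


def load_json_blob(bucket, blob_name):
    """Download one blob by name and parse its contents as JSON.

    `bucket` is expected to behave like google.cloud.storage.Bucket:
    `bucket.blob(name)` must return an object with `download_as_bytes()`.
    """
    raw = bucket.blob(blob_name).download_as_bytes()
    return json.loads(raw)


# Stand-ins so the sketch runs offline (hypothetical, not part of the library).
class FakeBlob:
    def __init__(self, payload):
        self._payload = payload

    def download_as_bytes(self):
        return self._payload


class FakeBucket:
    def __init__(self, blobs):
        self._blobs = blobs

    def blob(self, name):
        return FakeBlob(self._blobs[name])


fake_bucket = FakeBucket({"data/retail_db/schemas.json": b'{"departments": []}'})
print(load_json_blob(fake_bucket, "data/retail_db/schemas.json"))
```

With the real client, the same helper would be called as `load_json_blob(bucket0, "data/retail_db/schemas.json")`.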

#### Selecting a specific bucket, and working with it

In [65]:
[i.name for i in gsclient.get_bucket(bucket_or_name="gcs-data-lake-1").list_blobs()]

['data/retail_db/categories/part-00000',
 'data/retail_db/create_db_tables_pg.sql',
 'data/retail_db/customers/part-00000',
 'data/retail_db/departments/part-00000',
 'data/retail_db/load_db_tables_pg.sql',
 'data/retail_db/order_items/part-00000',
 'data/retail_db/orders/part-00000',
 'data/retail_db/products/part-00000',
 'data/retail_db/schemas.json']

##### Creating new blobs of data

In [53]:
# Create a new blob, check that it exists, then delete it
new_blob = gsclient.get_bucket(bucket_or_name="gcs-data-lake-1").blob("new_blob")
new_blob.upload_from_string("This is a new blob")
new_blob.exists()  # should be True at this point
new_blob.delete()

##### Selecting a blob, downloading its data, and displaying it as a pandas DataFrame

In [95]:
blob_name =  "data/retail_db/orders/part-00000"
blob = gsclient.get_bucket("gcs-data-lake-1").get_blob(blob_name)
blob

<Blob: gcs-data-lake-1, data/retail_db/orders/part-00000, 1676193065904849>

In [114]:
from io import BytesIO

# Download the blob into an in-memory buffer and parse it as a header-less CSV
buffer = BytesIO(blob.download_as_bytes())
pd.read_csv(buffer, header=None).head(3)

| | 0 | 1 | 2 | 3 |
|---|---|---|---|---|
| 0 | 1 | 2013-07-25 00:00:00.0 | 11599 | CLOSED |
| 1 | 2 | 2013-07-25 00:00:00.0 | 256 | PENDING_PAYMENT |
| 2 | 3 | 2013-07-25 00:00:00.0 | 12111 | COMPLETE |


##### Downloading with URI

In [116]:
io = BytesIO()
gsclient.download_blob_to_file("gs://gcs-data-lake-1/data/retail_db/orders/part-00000", io)
io.seek(0)
pd.read_csv(io, header=None).head(3)

| | 0 | 1 | 2 | 3 |
|---|---|---|---|---|
| 0 | 1 | 2013-07-25 00:00:00.0 | 11599 | CLOSED |
| 1 | 2 | 2013-07-25 00:00:00.0 | 256 | PENDING_PAYMENT |
| 2 | 3 | 2013-07-25 00:00:00.0 | 12111 | COMPLETE |
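A `gs://` URI is just the bucket name followed by the object name. When the two parts are needed separately (for example to call `get_bucket` and `get_blob` as in the earlier cells), they can be split with the standard library. This is a small sketch; the helper name `split_gcs_uri` is hypothetical, and it assumes object names that contain no `?` or `#`, which `urlsplit` would treat as query or fragment markers.

```python
from urllib.parse import urlsplit


def split_gcs_uri(uri):
    """Split 'gs://bucket/path/to/object' into (bucket_name, object_name)."""
    parts = urlsplit(uri)
    if parts.scheme != "gs" or not parts.netloc:
        raise ValueError(f"Not a gs:// URI: {uri!r}")
    # The path starts with "/", which is not part of the object name.
    return parts.netloc, parts.path.lstrip("/")


bucket_name, blob_name = split_gcs_uri(
    "gs://gcs-data-lake-1/data/retail_db/orders/part-00000"
)
print(bucket_name)
print(blob_name)
```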


### Project to upload multiple files to GCS using Python modules

In this section we use the `google-cloud-storage` library together with the standard-library `os` and `glob` modules to copy files from the local file system to object storage through the Python API.

Design requirements:
1. Get the list of file names from the local directory.
2. Build a blob for each file to upload.
3. Use the blob's upload method to upload the file's data.
4. Use metadata to help upload the right information.
5. Name the blobs using the local file names as a reference.
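The requirements above can be sketched as a small planning step that runs before anything is uploaded. This is one possible shape, not the approach used in the cells that follow: the helper name `plan_uploads` is hypothetical, it uses `pathlib` instead of `glob`, and it treats the guessed content type as the "metadata" from requirement 4, which is an assumption about what that requirement means.

```python
import mimetypes
import tempfile
from pathlib import Path


def plan_uploads(src_dir):
    """Yield (local_path, blob_name, content_type) for every file under src_dir."""
    root = Path(src_dir)
    for local_path in sorted(root.rglob("*")):
        if not local_path.is_file():
            continue
        # Blob name: path relative to the source directory, using "/" separators.
        blob_name = local_path.relative_to(root).as_posix()
        # Content type guessed from the file extension; None when unknown.
        content_type, _encoding = mimetypes.guess_type(local_path.name)
        yield local_path, blob_name, content_type


# Small offline demonstration with a throwaway directory.
with tempfile.TemporaryDirectory() as tmp:
    (Path(tmp) / "cards").mkdir()
    (Path(tmp) / "cards" / "deckofcards.txt").write_text("demo")
    for _path, name, ctype in plan_uploads(tmp):
        print(name, ctype)
```

With a real bucket object, each planned entry could then be uploaded with `bucket.blob(blob_name).upload_from_filename(str(local_path), content_type=content_type)`.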


In [117]:
!gsutil mb -b on -c standard --autoclass --placement us-east1,us-east4 gs://test-data-lake-1

Creating gs://test-data-lake-1/...


In [118]:
list(gsclient.list_buckets())

[<Bucket: gcs-data-lake-1>, <Bucket: test-data-lake-1>]

We now have two buckets, including the one we just created.

In [164]:
# Helper that yields every file (not directory) under a source directory

from glob import glob
from os import path

def get_file_list(src_dir):
    items = glob(f"{src_dir}/**", recursive=True)
    yield from filter(path.isfile, items)

# Build the blob name by dropping everything up to and including the second "/",
# e.g. "../data/cards/deckofcards.txt" -> "cards/deckofcards.txt"
create_blob_name = lambda x: x[x.index("/", x.index("/")+1)+1:]
[create_blob_name(i) for i in get_file_list("../data/")][:2]

['cards/smalldecks/deckofcards.txt', 'cards/smalldecks/deckofcards.tar.gz']

In [166]:
# Upload these files to the bucket we just created

bucket1 = gsclient.get_bucket(bucket_or_name="test-data-lake-1")
for file_to_upload in get_file_list("../data/"):
    # Derive the blob name from the local path
    blob_name = create_blob_name(file_to_upload)
    blob = bucket1.blob(blob_name=blob_name)
    with open(file_to_upload, "rb") as f:
        print(f"Upload of file in progress - {blob_name}")
        blob.upload_from_file(f)

Upload of file in progress - cards/smalldecks/deckofcards.txt
Upload of file in progress - cards/smalldecks/deckofcards.tar.gz
Upload of file in progress - cards/deckofcards.txt
Upload of file in progress - cards/zippeddecks/zippeddeck.txt.gz
Upload of file in progress - cards/zippeddecks/zippeddeck.tar
Upload of file in progress - cards/largedeck.txt.gz
Upload of file in progress - electionresults/ls2014.tsv
Upload of file in progress - nyse/companylist_noheader.csv
Upload of file in progress - nyse/nyse_data.tar.gz
Upload of file in progress - nyse_all/nyse_data/NYSE_2004.txt.gz
Upload of file in progress - nyse_all/nyse_data/NYSE_2013.txt.gz
Upload of file in progress - nyse_all/nyse_data/NYSE_2011.txt.gz
Upload of file in progress - nyse_all/nyse_data/NYSE_2009.txt.gz
Upload of file in progress - nyse_all/nyse_data/NYSE_1998.txt.gz
Upload of file in progress - nyse_all/nyse_data/NYSE_2003.txt.gz
Upload of file in progress - nyse_all/nyse_data/NYSE_2014.txt.gz
Upload of file in prog

In [167]:
len(list(bucket1.list_blobs()))

82

In [171]:
# Another way of listing the files of interest

list(gsclient.list_blobs(bucket_or_name="test-data-lake-1", prefix="retail_db/"))

[<Blob: test-data-lake-1, retail_db/categories/part-00000, 1676239099401336>,
 <Blob: test-data-lake-1, retail_db/create_db_tables_pg.sql, 1676239099920951>,
 <Blob: test-data-lake-1, retail_db/customers/part-00000, 1676239101051780>,
 <Blob: test-data-lake-1, retail_db/departments/part-00000, 1676239101592531>,
 <Blob: test-data-lake-1, retail_db/load_db_tables_pg.sql, 1676239096202400>,
 <Blob: test-data-lake-1, retail_db/order_items/part-00000, 1676239098325463>,
 <Blob: test-data-lake-1, retail_db/orders/part-00000, 1676239097434223>,
 <Blob: test-data-lake-1, retail_db/products/part-00000, 1676239100453811>,
 <Blob: test-data-lake-1, retail_db/schemas.json, 1676239098876624>]
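Note that `prefix` is a plain string match on the object name, because GCS has a flat namespace and "folders" are only a naming convention. The trailing slash in `"retail_db/"` therefore matters once sibling prefixes exist. The sketch below mimics the matching rule locally; the helper `filter_by_prefix` and the sample names are illustrative only.

```python
def filter_by_prefix(blob_names, prefix):
    """Mimic GCS prefix listing: keep the names that start with `prefix`."""
    return [name for name in blob_names if name.startswith(prefix)]


# Illustrative object names, including a sibling "folder" with a similar name.
names = [
    "retail_db/orders/part-00000",
    "retail_db/schemas.json",
    "retail_db_parquet/orders/part-00000.snappy.parquet",
]

# With the trailing slash, only objects under retail_db/ are matched.
print(filter_by_prefix(names, "retail_db/"))
# Without it, retail_db_parquet/... is matched as well.
print(filter_by_prefix(names, "retail_db"))
```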

### Overview of using pandas to process the files in GCS

To do this, we use the Python library `gcsfs`, which lets pandas read from and write to `gs://` paths directly.

In [178]:
import pandas as pd

# With gcsfs installed, pandas can read directly from a gs:// path
df_order_details = pd.read_csv("gs://test-data-lake-1/retail_db/orders/part-00000", names=["order_id", "order_date", "order_customer_id", "order_status"])
df_order_details.head(3)

| | order_id | order_date | order_customer_id | order_status |
|---|---|---|---|---|
| 0 | 1 | 2013-07-25 00:00:00.0 | 11599 | CLOSED |
| 1 | 2 | 2013-07-25 00:00:00.0 | 256 | PENDING_PAYMENT |
| 2 | 3 | 2013-07-25 00:00:00.0 | 12111 | COMPLETE |


#### Writing parquet files to GCS (compressed and partitioned) and then reading them

In [179]:
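# Write the data to GCS; with partition_cols, the target path becomes a directory
# containing one sub-directory per order_status value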
df_order_details.to_parquet("gs://test-data-lake-1/retail_db_parquet/orders/part-00000.snappy.parquet", compression="snappy", partition_cols=["order_status"])

In [183]:
list(gsclient.list_blobs("test-data-lake-1", prefix="retail_db_parquet"))

[<Blob: test-data-lake-1, retail_db_parquet/orders/part-00000.snappy.parquet/order_status=CANCELED/aff7d2fc3e744710812a01af8fc1942c-0.parquet, 1676240825523613>,
 <Blob: test-data-lake-1, retail_db_parquet/orders/part-00000.snappy.parquet/order_status=CLOSED/aff7d2fc3e744710812a01af8fc1942c-0.parquet, 1676240825522819>,
 <Blob: test-data-lake-1, retail_db_parquet/orders/part-00000.snappy.parquet/order_status=COMPLETE/aff7d2fc3e744710812a01af8fc1942c-0.parquet, 1676240825606102>,
 <Blob: test-data-lake-1, retail_db_parquet/orders/part-00000.snappy.parquet/order_status=ON_HOLD/aff7d2fc3e744710812a01af8fc1942c-0.parquet, 1676240826036880>,
 <Blob: test-data-lake-1, retail_db_parquet/orders/part-00000.snappy.parquet/order_status=PAYMENT_REVIEW/aff7d2fc3e744710812a01af8fc1942c-0.parquet, 1676240825481703>,
 <Blob: test-data-lake-1, retail_db_parquet/orders/part-00000.snappy.parquet/order_status=PENDING/aff7d2fc3e744710812a01af8fc1942c-0.parquet, 1676240825557637>,
 <Blob: test-data-lake-1, 
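The listing shows the layout that `partition_cols` produces: the path passed to `to_parquet` becomes a directory, and each distinct `order_status` gets its own `order_status=<value>` sub-directory. A minimal sketch of how the partition value can be recovered from a blob name follows; the helper `partition_values` is hypothetical, and the sample name is copied from the listing above.

```python
def partition_values(blob_name):
    """Extract `key=value` path segments (partition directories) from a blob name."""
    values = {}
    for segment in blob_name.split("/")[:-1]:  # the last segment is the file name
        key, sep, value = segment.partition("=")
        if sep:
            values[key] = value
    return values


name = (
    "retail_db_parquet/orders/part-00000.snappy.parquet/"
    "order_status=CANCELED/aff7d2fc3e744710812a01af8fc1942c-0.parquet"
)
print(partition_values(name))
```

This is also why reading the directory back returns `order_status` as a column even though it is no longer stored inside the individual files.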

In [188]:
# Read the partitioned dataset back from GCS
pd.read_parquet("gs://test-data-lake-1/retail_db_parquet/orders/part-00000.snappy.parquet")

| | order_id | order_date | order_customer_id | order_status |
|---|---|---|---|---|
| 0 | 50 | 2013-07-25 00:00:00.0 | 5225 | CANCELED |
| 1 | 112 | 2013-07-26 00:00:00.0 | 5375 | CANCELED |
| 2 | 527 | 2013-07-28 00:00:00.0 | 5426 | CANCELED |
| 3 | 552 | 2013-07-28 00:00:00.0 | 1445 | CANCELED |
| 4 | 564 | 2013-07-28 00:00:00.0 | 2216 | CANCELED |
| ... | ... | ... | ... | ... |
| 68878 | 68606 | 2014-06-28 00:00:00.0 | 2253 | SUSPECTED_FRAUD |
| 68879 | 68686 | 2014-07-23 00:00:00.0 | 2591 | SUSPECTED_FRAUD |
| 68880 | 68718 | 2013-09-14 00:00:00.0 | 6710 | SUSPECTED_FRAUD |
| 68881 | 68782 | 2014-01-10 00:00:00.0 | 8509 | SUSPECTED_FRAUD |


### Project to upload multiple files to GCS using Python modules (using pandas)

In this section we use pandas (with `gcsfs`), together with `json` and `glob`, to convert local files to Parquet and write them to object storage.

Design requirements:
1. Get the list of file names from the local directory.
2. Read the column names for each table from `schemas.json`.
3. Read each file into a DataFrame using those column names.
4. Write each DataFrame to GCS as a Snappy-compressed Parquet file.
5. Name the blobs using the local file names as a reference.


In [244]:
# First delete all existing blobs, including the Parquet files written earlier

# delete
bucket1.delete_blobs(list(bucket1.list_blobs()))
# validate
list(bucket1.list_blobs())

[]

The files written in the previous steps have now been deleted. The next step is to write a generic routine that reads each file, converts it to Parquet, and transfers it to GCS.

In [245]:
for i in bucket1.list_blobs():
    print(i)

In [246]:
def get_file_list(src_dir):
    items = glob(f"{src_dir}/**", recursive=True)
    # Keep only the data files named "part-00000"
    yield from (item for item in items if path.isfile(item) and item.endswith("part-00000"))

create_blob_name = lambda x: x[x.index("/", x.index("/")+1)+1:]
[create_blob_name(i) for i in get_file_list("../data/retail_db")]

['retail_db/orders/part-00000',
 'retail_db/order_items/part-00000',
 'retail_db/categories/part-00000',
 'retail_db/products/part-00000',
 'retail_db/customers/part-00000',
 'retail_db/departments/part-00000']

In [251]:
# Read the schema
with open("../data/retail_db/schemas.json") as f:
    schemax = json.load(f)

for file_to_upload in get_file_list("../data/retail_db/"):
    # e.g. "retail_db/orders/part-00000" -> "retail_db_parquet/orders/part-00000"
    blob_name = create_blob_name(file_to_upload)
    blob_name = f'{blob_name.split("/")[0]}_parquet/' + "/".join(blob_name.split("/")[1:])
    
    # The second path component is the table name; order its columns by position
    table_name = blob_name.split("/")[1]
    schema = pd.DataFrame(schemax[table_name]).sort_values("column_position")
    col_names = schema.column_name
    
    # The source files have no header row, so supply the column names explicitly
    df = pd.read_csv(file_to_upload, names=col_names)
    df.to_parquet(f"gs://test-data-lake-1/{blob_name}.snappy.parquet", compression="snappy")
    print(f"Uploaded file {blob_name}.snappy.parquet")

Uploaded file retail_db_parquet/orders/part-00000.snappy.parquet
Uploaded file retail_db_parquet/order_items/part-00000.snappy.parquet
Uploaded file retail_db_parquet/categories/part-00000.snappy.parquet
Uploaded file retail_db_parquet/products/part-00000.snappy.parquet
Uploaded file retail_db_parquet/customers/part-00000.snappy.parquet
Uploaded file retail_db_parquet/departments/part-00000.snappy.parquet
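The two pieces of bookkeeping in the loop above (ordering the column names and deriving the target blob name) can be isolated into small functions that are easy to check without touching GCS. This is a sketch using only the standard library; the helper names `column_names` and `parquet_blob_name` are hypothetical, and the schema excerpt is a shortened copy of the `departments` entry shown earlier, listed out of order on purpose.

```python
def column_names(schemas, table_name):
    """Return a table's column names ordered by `column_position`."""
    columns = sorted(schemas[table_name], key=lambda col: col["column_position"])
    return [col["column_name"] for col in columns]


def parquet_blob_name(blob_name):
    """Append '_parquet' to the top-level folder and add the Parquet suffix."""
    top_level, _, rest = blob_name.partition("/")
    return f"{top_level}_parquet/{rest}.snappy.parquet"


# Excerpt of schemas.json, deliberately out of order to show the sorting.
schemas = {
    "departments": [
        {"column_name": "department_name", "data_type": "string", "column_position": 2},
        {"column_name": "department_id", "data_type": "integer", "column_position": 1},
    ]
}

print(column_names(schemas, "departments"))
print(parquet_blob_name("retail_db/orders/part-00000"))
```

Sorting by `column_position` matters because the data files have no header row, so the names must line up with the physical column order.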


In [255]:
# Read a file to see if column names are picked up
pd.read_parquet("gs://test-data-lake-1/retail_db_parquet/products/part-00000.snappy.parquet").head()

| | product_id | product_cateogry_id | product_name | product_description | product_price | product_image |
|---|---|---|---|---|---|---|
| 0 | 1 | 2 | Quest Q64 10 FT. x 10 FT. Slant Leg Instant U | | 59.98 | http://images.acmesports.sports/Quest+Q64+10+F... |
| 1 | 2 | 2 | Under Armour Men's Highlight MC Football Clea | | 129.99 | http://images.acmesports.sports/Under+Armour+M... |
| 2 | 3 | 2 | Under Armour Men's Renegade D Mid Football Cl | | 89.99 | http://images.acmesports.sports/Under+Armour+M... |
| 3 | 4 | 2 | Under Armour Men's Renegade D Mid Football Cl | | 89.99 | http://images.acmesports.sports/Under+Armour+M... |
| 4 | 5 | 2 | Riddell Youth Revolution Speed Custom Footbal | | 199.99 | http://images.acmesports.sports/Riddell+Youth+... |


## END OF SECTION