In [41]:
# list blobs based on path prefix
# source: https://cloud.google.com/storage/docs/listing-objects

from google.cloud import storage

def list_blobs_with_prefix(bucket_name, prefix, delimiter=None, storage_client=None):
    """Print the names of all blobs in the bucket that begin with the prefix.

    Args:
        bucket_name: Name of the bucket to list.
        prefix: Only blobs whose name starts with this string are listed.
        delimiter: Optional delimiter forwarded to ``Client.list_blobs``. When
            given, the prefixes reported by the listing are printed as well.
        storage_client: Optional client object exposing ``list_blobs``. A new
            ``storage.Client()`` is created when omitted.

    Returns:
        None. Results are written to stdout.
    """
    if storage_client is None:
        storage_client = storage.Client()

    # Note: Client.list_blobs requires at least package version 1.17.0.
    blobs = storage_client.list_blobs(bucket_name, prefix=prefix, delimiter=delimiter)

    # Note: The call returns a response only when the iterator is consumed.
    print("Blobs:")
    for blob in blobs:
        print(blob.name)

    if delimiter:
        print("Prefixes:")
        # Use a distinct loop name so the `prefix` argument is not shadowed,
        # and sort so the printed order is stable between runs.
        for found_prefix in sorted(blobs.prefixes):
            print(found_prefix)

In [54]:
# access zip file from blob storage

import io

storage_client = storage.Client()

bucket = storage_client.get_bucket('data-seeds')
blob = bucket.blob('spotify/streaming_history/20240904T205339.zip')

# Pull the whole archive into memory and wrap it in a file-like object so the
# zipfile module can read it without touching local disk.
# `download_as_bytes` replaces the deprecated `download_as_string` alias.
zipbytes = io.BytesIO(blob.download_as_bytes())

In [56]:
# read zip content and upload back to gcs

from zipfile import ZipFile
from zipfile import is_zipfile


# Folder that receives the extracted files: the archive's own object path with
# the '.zip' suffix removed. Defined here so the cell does not depend on a
# variable left over from an earlier kernel session.
zipfilename_with_path = blob.name.removesuffix('.zip')

if is_zipfile(zipbytes):
    with ZipFile(zipbytes, 'r') as myzip:
        for contentfilename in myzip.namelist():
            contentfile = myzip.read(contentfilename)
            # Use a separate name for the upload target so the source `blob`
            # handle stays intact and the cell can be re-run.
            extracted_blob = bucket.blob(zipfilename_with_path + "/" + contentfilename)
            extracted_blob.upload_from_string(contentfile)

True

In [None]:
# convert from json to jsonl



In [60]:
# Group the 'google/' objects by everything up to the first 'T' delimiter.
list_blobs_with_prefix(bucket_name='data-seeds', prefix='google/', delimiter='T')

Blobs:
Prefixes:
google/takeout/20240920T


In [1]:
import traceback

In [9]:
# put it all together

# Get each data seed, unzip it, and then upload its contents into another bucket

from google.cloud import storage
from zipfile import ZipFile, is_zipfile

import io
import os
import json
import re

# source: https://cloud.google.com/storage/docs/listing-objects with modifications
def list_blobs_with_prefix(bucket_name, prefix=None, storage_client=None):
    """Return the names of all blobs in the bucket that begin with the prefix.

    Args:
        bucket_name: Name of the bucket to list.
        prefix: Only blobs whose name starts with this string are returned.
            Defaults to ``None``, which is forwarded unchanged to
            ``Client.list_blobs`` (no prefix filter requested).
        storage_client: Optional client object exposing ``list_blobs``. A new
            ``storage.Client()`` is created when omitted.

    Returns:
        A list of blob name strings, in the order the listing yields them.
    """
    if storage_client is None:
        storage_client = storage.Client()

    # Note: Client.list_blobs requires at least package version 1.17.0.
    blobs = storage_client.list_blobs(bucket_name, prefix=prefix)

    # Consuming the iterator is what actually performs the listing request(s).
    return [blob.name for blob in blobs]

# params
data_seed_bucket_name = "data-seeds"  # bucket holding the uploaded zip archives
prefix_filter = "spotify/account_data/" # options: ["google/", "spotify/"]  can be parameterized further by vendor and data type (takeout, account_data, streaming_history)
landing_bucket_name = "compendium-test"

# One client is shared by both bucket handles.
storage_client = storage.Client()
data_seed_bucket = storage_client.get_bucket(data_seed_bucket_name)
landing_bucket = storage_client.get_bucket(landing_bucket_name)

# Object paths in the seed bucket that fall under the selected prefix.
blob_paths = list_blobs_with_prefix(data_seed_bucket_name, prefix_filter)

for blob_path in blob_paths:
    # Read the whole object into memory; `download_as_bytes` replaces the
    # deprecated `download_as_string` alias.
    source_blob = data_seed_bucket.blob(blob_path)
    zipbytes = io.BytesIO(source_blob.download_as_bytes())

    # Objects under the prefix that are not zip archives are left alone.
    if not is_zipfile(zipbytes):
        continue

    # Extracted members land under the archive's path minus the '.zip' suffix.
    extract_prefix = blob_path.removesuffix('.zip')
    with ZipFile(zipbytes, 'r') as myzip:
        for contentfilename in myzip.namelist():
            contentfile = myzip.read(contentfilename)
            target_blob = landing_bucket.blob(extract_prefix + "/" + contentfilename)
            target_blob.upload_from_string(contentfile)
                

# Every object now in the landing bucket under the prefix; keep only the
# JSON files (a plain suffix test is enough, no regex needed).
landing_blob_paths = list_blobs_with_prefix(landing_bucket_name, prefix_filter)
json_blobs = [p for p in landing_blob_paths if p.endswith('.json')]

for json_blob in json_blobs:
    blob = landing_bucket.blob(json_blob)
    # Undecodable bytes are replaced rather than raising, as before.
    blob_content = blob.download_as_bytes().decode('utf-8', 'replace')
    data = json.loads(blob_content)

    # "<name>.json" + "l" -> "<name>.jsonl", written next to the original.
    jsonl_blob = landing_bucket.blob(json_blob + "l")
    if isinstance(data, dict):
        # A single object becomes a one-record file.
        jsonl_blob.upload_from_string(json.dumps(data))
    elif isinstance(data, list):
        # One JSON document per line; join once instead of growing a string
        # inside the loop.
        jsonl_blob.upload_from_string("".join(json.dumps(datum) + "\n" for datum in data))
    else:
        # Scalars / null have no record structure; report instead of skipping silently.
        print(f"Skipping {json_blob}: top-level JSON value is {type(data).__name__}, expected dict or list")

In [10]:
# Display the landing-bucket paths that were selected for JSON -> JSONL conversion.
json_blobs

['spotify/account_data/20240904T205339/Spotify Account Data/FamilyPlan.json',
 'spotify/account_data/20240904T205339/Spotify Account Data/Follow.json',
 'spotify/account_data/20240904T205339/Spotify Account Data/Identifiers.json',
 'spotify/account_data/20240904T205339/Spotify Account Data/Identity.json',
 'spotify/account_data/20240904T205339/Spotify Account Data/Inferences.json',
 'spotify/account_data/20240904T205339/Spotify Account Data/Marquee.json',
 'spotify/account_data/20240904T205339/Spotify Account Data/Payments.json',
 'spotify/account_data/20240904T205339/Spotify Account Data/Playlist1.json',
 'spotify/account_data/20240904T205339/Spotify Account Data/PlaylistInABottle.json',
 'spotify/account_data/20240904T205339/Spotify Account Data/SearchQueries.json',
 'spotify/account_data/20240904T205339/Spotify Account Data/StreamingHistory_music_0.json',
 'spotify/account_data/20240904T205339/Spotify Account Data/StreamingHistory_music_1.json',
 'spotify/account_data/20240904T20533

In [79]:
# List everything in the bucket. `prefix` is a required positional argument of
# the helper defined above, so pass None explicitly (no prefix filter).
list_blobs_with_prefix('compendium-test', None)

['spotify/account_data/20240904T205339.zip']

In [80]:
# List what was extracted into the landing bucket for the first archive.
list_blobs_with_prefix(
    bucket_name=landing_bucket_name,
    prefix=blob_paths[0].removesuffix('.zip'),
)

['spotify/account_data/20240904T205339/Spotify Account Data/',
 'spotify/account_data/20240904T205339/Spotify Account Data/FamilyPlan.json',
 'spotify/account_data/20240904T205339/Spotify Account Data/Follow.json',
 'spotify/account_data/20240904T205339/Spotify Account Data/Identifiers.json',
 'spotify/account_data/20240904T205339/Spotify Account Data/Identity.json',
 'spotify/account_data/20240904T205339/Spotify Account Data/Inferences.json',
 'spotify/account_data/20240904T205339/Spotify Account Data/Marquee.json',
 'spotify/account_data/20240904T205339/Spotify Account Data/Payments.json',
 'spotify/account_data/20240904T205339/Spotify Account Data/Playlist1.json',
 'spotify/account_data/20240904T205339/Spotify Account Data/PlaylistInABottle.json',
 'spotify/account_data/20240904T205339/Spotify Account Data/Read_Me_First.pdf',
 'spotify/account_data/20240904T205339/Spotify Account Data/SearchQueries.json',
 'spotify/account_data/20240904T205339/Spotify Account Data/StreamingHistory_m

In [19]:
# parse the json and upload it to the DB with dlt

import dlt
from dlt.sources.filesystem import filesystem, read_jsonl

# Point the dlt filesystem source at the first archive's extracted
# "Spotify Account Data" folder and match only the JSONL files.
files = filesystem(
    bucket_url=f"gs://{landing_bucket_name}/{blob_paths[0].removesuffix('.zip')}/Spotify Account Data/",
    file_glob="*.jsonl",
)
# filtered_files = [f for f in files if f['file_name'] in ['Marquee.jsonl', "Playlist1.jsonl", "SearchQueries.jsonl", "Userdata.jsonl", "YourLibrary.jsonl"]]

# Pipe the matched files through the JSONL reader and name the result.
reader = (files | read_jsonl()).with_name("spotify_account_data")

# Pipeline that loads into the BigQuery destination.
pipeline = dlt.pipeline(
    pipeline_name="spotify_account_data_pipeline",
    dataset_name="spotify_account_data",
    destination="bigquery",
)

info = pipeline.run(reader)
print(info)


Pipeline spotify_account_data_pipeline load step completed in 28.76 seconds
1 load package(s) were loaded to destination bigquery and into dataset spotify_account_data
The bigquery destination used compendium-sa@cellular-virtue-434518-i4.iam.gserviceaccount.com@cellular-virtue-434518-i4 location to store data
Load package 1726995831.98243 is LOADED and contains no failed jobs


In [16]:
# Print each file item yielded by the filesystem source, one per line.
for file_item in files:
    print(file_item)

{'file_name': 'FamilyPlan.jsonl', 'relative_path': 'FamilyPlan.jsonl', 'file_url': 'gs://compendium-test/spotify/account_data/20240904T205339/Spotify Account Data/FamilyPlan.jsonl', 'mime_type': 'application/jsonl', 'encoding': None, 'modification_date': DateTime(2024, 9, 22, 8, 55, 55, 357000, tzinfo=Timezone('UTC')), 'size_in_bytes': 92}
{'file_name': 'Follow.jsonl', 'relative_path': 'Follow.jsonl', 'file_url': 'gs://compendium-test/spotify/account_data/20240904T205339/Spotify Account Data/Follow.jsonl', 'mime_type': 'application/jsonl', 'encoding': None, 'modification_date': DateTime(2024, 9, 22, 8, 55, 55, 580000, tzinfo=Timezone('UTC')), 'size_in_bytes': 75}
{'file_name': 'Identifiers.jsonl', 'relative_path': 'Identifiers.jsonl', 'file_url': 'gs://compendium-test/spotify/account_data/20240904T205339/Spotify Account Data/Identifiers.jsonl', 'mime_type': 'application/jsonl', 'encoding': None, 'modification_date': DateTime(2024, 9, 22, 8, 55, 55, 810000, tzinfo=Timezone('UTC')), 'siz

In [14]:
# Preview the gs:// URL built for the first archive's extracted "Spotify Account Data" folder.
f"gs://{landing_bucket_name}/{blob_paths[0].removesuffix('.zip')}/Spotify Account Data/"

'gs://compendium-test/spotify/account_data/20240904T205339/Spotify Account Data/'

In [16]:
# Preview the landing bucket name joined with the first archive's path, '.zip' suffix removed.
f"{landing_bucket_name}/{blob_paths[0].removesuffix('.zip')}"

'compendium-test/spotify/account_data/20240904T205339'

In [23]:
# Keep only the file items whose name is one of the listed JSON files.
[
    file_item
    for file_item in files
    if file_item['file_name'] in ['Marquee.json', "Playlist1.json", "SearchQueries.json", "Userdata.json", "YourLibrary.json"]
]

[{'file_name': 'Marquee.json',
  'relative_path': 'Marquee.json',
  'file_url': 'gs://compendium-test/spotify/account_data/20240904T205339/Spotify Account Data/Marquee.json',
  'mime_type': 'application/json',
  'encoding': None,
  'modification_date': DateTime(2024, 9, 22, 7, 15, 27, 574000, tzinfo=Timezone('UTC')),
  'size_in_bytes': 78899},
 {'file_name': 'Playlist1.json',
  'relative_path': 'Playlist1.json',
  'file_url': 'gs://compendium-test/spotify/account_data/20240904T205339/Spotify Account Data/Playlist1.json',
  'mime_type': 'application/json',
  'encoding': None,
  'modification_date': DateTime(2024, 9, 22, 7, 15, 28, 454000, tzinfo=Timezone('UTC')),
  'size_in_bytes': 677743},
 {'file_name': 'SearchQueries.json',
  'relative_path': 'SearchQueries.json',
  'file_url': 'gs://compendium-test/spotify/account_data/20240904T205339/Spotify Account Data/SearchQueries.json',
  'mime_type': 'application/json',
  'encoding': None,
  'modification_date': DateTime(2024, 9, 22, 7, 15, 2