# Data Portability API 
## Quick Start Demo

https://developers.google.com/data-portability

## Setup

Set up the Data Portability API: https://developers.google.com/data-portability/user-guide/setup


Create Credentials file:

- Go to https://console.cloud.google.com/apis/credentials.
- Click on "Create Credentials" and select "OAuth client ID".
- Select "Desktop app" as the application type, give it a name. Click "Create".
- Click on the download JSON button and save the file as DP_client_secret.json in the root directory.

In [None]:
pip install google-api-python-client google-auth google-auth-httplib2 google-auth-oauthlib

In [2]:
from collections.abc import Sequence
import io
import os
import time
from typing import Generator
import urllib
import urllib.request
import zipfile

from google.oauth2 import credentials
import google_auth_oauthlib.flow
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient import discovery


In [3]:
# Path to the JSON file holding this application's OAuth 2.0 client
# configuration (client_id and client_secret). For this notebook it should be
# the file downloaded for a desktop-application OAuth client.
CLIENT_SECRETS_FILE = "DP_client_secret.json"

In [25]:
# Data Portability resources to request. Uncomment an entry to include it in
# the export; commented-out entries are kept as a menu of known resource IDs.
RESOURCES = [
    # "maps.ev_profile",
    # "maps.commute_routes",
    # "maps.commute_settings",
    # "maps.photos_videos",
    # "maps.reviews",
    # "maps.starred_places",
    # "myactivity.maps",
    # "myactivity.shopping",
    # "myactivity.youtube",
    "myactivity.search",
    # "shopping.addresses",
    # "shopping.reviews",
    # "youtube.channel",
    # "youtube.comments",
    # "youtube.live_chat",
    # "youtube.music",
    # "youtube.private_playlists",
    # "youtube.private_videos",
    # "youtube.public_playlists",
    # "youtube.public_videos",
    # "youtube.shopping",
    # "youtube.subscriptions",
    # "youtube.unlisted_playlists",
    # "youtube.unlisted_videos",
]

In [26]:
# Service name and version handed to the discovery client when the API
# interface is built.
DATAPORTABILITY_API_SERVICE_NAME = "dataportability"
API_VERSION = "v1beta"

In [27]:
# Each Data Portability resource corresponds to exactly one dataportability
# OAuth scope: the scope code is this prefix followed by the resource name.
SCOPE_PREFIX = "https://www.googleapis.com/auth/dataportability."

In [7]:
# MIME types that mark a downloaded file as a ZIP archive.
ZIP_MIME_TYPES = [
    "application/zip",
    "application/x-zip",
    "application/x-zip-compressed",
]

# Directory that receives downloaded files which are not ZIP archives. It is
# created right away (no error if it already exists) so later writes can
# assume it is present.
NON_ZIP_FILE_DIRECTORY = "files"
os.makedirs(NON_ZIP_FILE_DIRECTORY, exist_ok=True)

## get_credentials function
Gets OAuth 2.0 credentials using an installed app OAuth flow.

  This generates a link for the user to consent to some or all of the requested
  resources. In a production environment, the best practice is to save a refresh
  token in Cloud Storage because the access token can expire before the
  portability archive job completes.

  Args:
    resources: A list of dataportability resource IDs. These are OAuth scope
    codes from
      https://developers.google.com/data-portability/reference/rest/v1beta/portabilityArchive/initiate#authorization-scopes
        without the 'https://www.googleapis.com/auth/dataportability.' prefix.

  Returns:
    A tuple of credentials containing an access token and a list of resources
    for which the user has granted consent.

In [8]:
def get_credentials(
    resources: Sequence[str],
) -> tuple[credentials.Credentials, Sequence[str]]:
    """Gets OAuth 2.0 credentials using an installed app OAuth flow.

    Builds the flow from CLIENT_SECRETS_FILE, requesting one scope per entry
    of `resources` (SCOPE_PREFIX + resource), and runs it through a local
    server on port 8082.

    Args:
      resources: Data Portability resource IDs, i.e. OAuth scope codes
        without the SCOPE_PREFIX.

    Returns:
      A tuple of (credentials, resources). When the flow completes normally
      the second element is `resources` unchanged. When the flow raises a
      Warning, the credentials are built from the access token carried by the
      warning and the second element is derived from the warning's
      `new_scope` with SCOPE_PREFIX stripped.
    """
    flow = google_auth_oauthlib.flow.InstalledAppFlow.from_client_secrets_file(
        CLIENT_SECRETS_FILE,
        [SCOPE_PREFIX + r for r in resources],
    )
    try:
        return flow.run_local_server(port=8082), resources
    except Warning as warn:
        # Gracefully handle the user consenting to only a subset of the
        # requested scopes. NOTE(review): this relies on the Warning carrying
        # `token` and `new_scope` attributes -- presumably set by the OAuth
        # library when granted scopes differ from the requested ones; confirm.
        return credentials.Credentials(warn.token['access_token']), [
            scope.removeprefix(SCOPE_PREFIX) for scope in warn.new_scope
        ]

##  get_api_interface function
Gets an interface to the Data Portability API.

It takes a credentials.Credentials object as input and returns a discovery.Resource object.

The discovery.Resource object can be used to make calls to the Data Portability API.

In [9]:
def get_api_interface(
    creds: credentials.Credentials,
) -> discovery.Resource:
    """Gets an interface to the Data Portability API.

    Args:
      creds: OAuth credentials used to authorize the API calls.

    Returns:
      The object produced by `discovery.build` for the service named by
      DATAPORTABILITY_API_SERVICE_NAME at version API_VERSION.
    """
    build_options = {
        'serviceName': DATAPORTABILITY_API_SERVICE_NAME,
        'version': API_VERSION,
        'credentials': creds,
        # Both discovery options are explicitly disabled for this client.
        'static_discovery': False,
        'cache_discovery': False,
    }
    return discovery.build(**build_options)

## The initiate_portability_archive function
It initiates a portability archive for the requested resources. It takes a discovery.Resource object and a Sequence of strings as input and returns a string. The string is the archive job ID.

The function first creates a dataportability.portabilityArchive().initiate() object. The body of the initiate() object is a dictionary that contains a list of resources. The resources are the strings that were passed to the function.

The function then calls the execute() method on the initiate() object. The execute() method returns a dictionary that contains the archive job ID. The archive job ID is a unique identifier for the archive job.

The function then returns the archive job ID.

In [10]:
def initiate_portability_archive(
    dataPortability: discovery.Resource, resources: Sequence[str]
) -> str:
    """Starts a portability archive job for the given resources.

    Prints the prepared request (method, body, URI) and the raw response for
    inspection.

    Args:
      dataPortability: The Data Portability API interface.
      resources: Resource IDs to include in the archive.

    Returns:
      The 'archiveJobId' field of the initiate response.
    """
    request_body = {'resources': resources}
    request = dataPortability.portabilityArchive().initiate(body=request_body)
    print('👉 Initiating archive...')
    print(
        f"🔎 method: {request.method} \n🔎 body: {request.body}\n🔎 uri: {request.uri}"
    )
    response = request.execute()
    print(f"🔎 Initiate response: {response}")
    return response['archiveJobId']

## The retry_portability_archive function 
This function retries a failed portability archive job. It takes a dataPortability object and a job_id as input parameters.

The function first creates a retry object by calling the archiveJobs().retry method on the dataPortability object.

The retry object is then used to call the execute method, which returns a retry_response object. The retry_response object contains the archiveJobId of the newly created job.



In [11]:
def retry_portability_archive(
    dataPortability: discovery.Resource, job_id
) -> str:
    """Retries an archive job and returns the ID of the job created by the retry.

    Prints the prepared request (method, body, URI) before executing it.

    Args:
      dataPortability: The Data Portability API interface.
      job_id: ID of the archive job to retry.

    Returns:
      The 'archiveJobId' field of the retry response.
    """
    job_name = f'archiveJobs/{job_id}'
    request = dataPortability.archiveJobs().retry(name=job_name)
    print(f"method: {request.method} \nbody: {request.body}\nuri: {request.uri}")
    response = request.execute()
    return response['archiveJobId']

## The poll_get_portability_archive_state function
### And exponential_backoff function
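The exponential_backoff function is a generator function that yields None values at gradually increasing intervals: it sleeps before every yield, and the sleep time is multiplied after each yield until it reaches a maximum delay.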


The poll_get_portability_archive_state function uses the exponential_backoff function to poll the Data Portability API's getPortabilityArchiveState endpoint until the state of the archive job is no longer IN_PROGRESS. If the job has FAILED, it is retried and polling continues with the new job ID; otherwise the function returns the URLs of the archive files.



In [19]:
def exponential_backoff(
    delay: float, max_delay: float, multiplier: float
) -> Generator[None, None, None]:
    """Yields None forever, sleeping a growing amount of time before each yield.

    Args:
      delay: Seconds to sleep before the first yield.
      max_delay: Upper bound applied to every later sleep.
      multiplier: Factor applied to the sleep time after each yield.

    Yields:
      None, once per elapsed sleep. The generator never finishes on its own.
    """
    wait_seconds = delay
    while True:
        # The sleep comes first, so even the first yield is delayed.
        time.sleep(wait_seconds)
        yield
        wait_seconds = min(wait_seconds * multiplier, max_delay)


def poll_get_portability_archive_state(
    dataPortability: discovery.Resource, job_id: str
) -> Sequence[str]:
    """Polls an archive job until it is no longer in progress.

    The state endpoint is queried with exponential backoff (3 s initial
    delay, factor 1.5, capped at 3600 s). A job reported as FAILED is retried
    through retry_portability_archive and polling restarts with the new job
    ID; there is no upper bound on the number of retries.

    Args:
      dataPortability: The Data Portability API interface.
      job_id: ID of the archive job to watch.

    Returns:
      The 'urls' field of the first state response whose state is neither
      IN_PROGRESS nor FAILED.

    Raises:
      Any exception raised while executing the state or retry request is
      propagated to the caller, so the caller sees the real cause and decides
      how to report it.
    """
    while True:
        # Rebuilt on every pass because job_id changes after a retry.
        get_state = dataPortability.archiveJobs().getPortabilityArchiveState(
            name='archiveJobs/{}/portabilityArchiveState'.format(job_id)
        )
        print('👉 Polling archive status while server indicates state is in progress...')
        print(f'🔎 method: {get_state.method}\n🔎 URI:{get_state.uri} ')
        print("⏱️ IN_PROGRESS: ", end="", flush=True)
        # exponential_backoff never terminates, so this loop is only left by
        # the return (job finished), the break (job retried) or an exception.
        for _ in exponential_backoff(3, 3600, 1.5):
            state = get_state.execute()
            if state['state'] == 'IN_PROGRESS':
                print("⬜️", end="", flush=True)
            elif state['state'] == 'FAILED':
                print("❗️ Job failed. Retrying...")
                # Retry and continue polling under the new job ID.
                job_id = retry_portability_archive(dataPortability, job_id)
                break
            else:
                print(f"\n🔎 State: {state}")
                return state['urls']

## The reset_authorization function
The function calls the dataportability's reset endpoint. This endpoint resets the authorization for the current user. This is useful if the user has revoked access to the Data Portability API or if the access token has expired.

The resetAuthorization method does the following:

- Revokes all user-granted OAuth scopes
- Allows your application to call InitiatePortabilityArchive for a resource group that you used previously
- Removes access to previous data archives

In [13]:
def reset_authorization(data_portability: discovery.Resource) -> None:
    """Calls the Data Portability API's authorization reset endpoint.

    Prints the prepared request and its response. A failure while executing
    the request is reported on stdout and not re-raised.

    Args:
      data_portability: The Data Portability API interface.
    """
    print('👉 Resetting authorization...')
    reset = data_portability.authorization().reset()
    print(f"🔎 method: {reset.method} URI:{reset.uri}")
    try:
        reset_response = reset.execute()
        print(f"🔎 Reset response: {reset_response}")
    except Exception as e:
        # Best effort: a failed reset is reported but must not abort the run.
        print(f"❗️ Error: {e}")

## The download_files function
The function downloads files from a list of URLs. It extracts ZIP files into the current working directory and saves non-ZIP files into the `files` directory, taking their names from the 'Content-Disposition' header of the response.



In [14]:
def _filename_from_content_disposition(content_disposition):
    """Returns a bare file name parsed from a Content-Disposition value, or None."""
    if not content_disposition or 'filename=' not in content_disposition:
        return None
    value = content_disposition.split('filename=')[-1].strip()
    if value.startswith('"'):
        # Quoted form: the name ends at the closing quote.
        value = value[1:].split('"', 1)[0]
    else:
        # Token form: the name ends at the next parameter separator.
        value = value.split(';', 1)[0].strip()
    try:
        # Recover UTF-8 names from a header that was decoded as ISO-8859-1
        # (presumably by the HTTP layer -- confirm).
        value = value.encode('ISO-8859-1').decode('utf-8')
    except UnicodeError:
        # Not a mis-decoded UTF-8 name; keep it as received.
        pass
    # The header is server-controlled: keep only the last path component so
    # the file cannot be written outside the target directory.
    name = os.path.basename(value.replace('\\', '/'))
    if name in ('', '.', '..'):
        return None
    return name


def download_files(urls) -> None:
    """Downloads every URL, extracting ZIP archives and saving other files.

    ZIP responses (media type listed in ZIP_MIME_TYPES) are extracted into the
    current working directory. Any other response is saved under
    NON_ZIP_FILE_DIRECTORY using the file name from its Content-Disposition
    header; responses without a usable name are skipped with a message.
    Failures are printed per URL and do not stop the remaining downloads.

    Args:
      urls: Iterable of URLs to download.
    """
    for url in urls:
        try:
            print(f'👉 Beginning download. {url}')
            # The context manager closes the response even if a later step fails.
            with urllib.request.urlopen(url) as url_file:
                print('👍 Download complete!')
                # get_content_type() drops header parameters such as charset,
                # so "application/zip; ..." is still recognised as a ZIP.
                if url_file.headers.get_content_type() in ZIP_MIME_TYPES:
                    print("👉 Extracting archive...")
                    with zipfile.ZipFile(io.BytesIO(url_file.read()), 'r') as zf:
                        # Save extracted files in the current directory.
                        zf.extractall()
                        for f in zf.filelist:
                            print(f"📥 File {f.filename} extracted successfully")
                    continue

                print(f"👇 File is not a ZIP file:\n{url_file.headers}")
                filename = _filename_from_content_disposition(
                    url_file.headers.get('Content-Disposition')
                )
                if filename is None:
                    print("❗️ The file from the url is not a zip file and does not have a valid filename.")
                    continue

                print(f"👉 Saving file: {filename}")
                file_path = os.path.join(NON_ZIP_FILE_DIRECTORY, filename)
                with open(file_path, 'wb') as f:
                    f.write(url_file.read())
                print(f"📁 File {filename} saved successfully")
        except zipfile.BadZipFile:
            print("❗️ The file from the url is not a valid ZIP file or is corrupted.")
        except Exception as e:
            # Best effort per URL: report the problem and move on to the next.
            print(e)

# Main code starts here:
------------------------

In [None]:
# Run the consent flow. `resources` is rebound to whatever get_credentials
# reports as granted, which may be fewer than the requested RESOURCES.
creds, resources = get_credentials(RESOURCES)
print(
    '🔎 Obtained OAuth credentials for resources: ',
    ', '.join(resources),
)

In [None]:
# Build the API interface from the granted credentials and start the archive
# job for the granted resources.
data_portability = get_api_interface(creds)
job_id = initiate_portability_archive(data_portability, resources)
print(f'👍 Successfully initiated data archive job with ID {job_id}')

In [None]:
# Wait for the archive job to finish, then download every file it produced.
# Any exception raised while polling or downloading is reported here instead
# of surfacing as a traceback.
try:
    urls = poll_get_portability_archive_state(data_portability, job_id)
    print("👍 Data archive is ready.")
    download_files(urls)
except Exception as e:
    print(f"❗️ Error: {e}")

In [None]:
# Reset the authorization through the API interface built earlier.
reset_authorization(data_portability)