In [81]:
import logging
from google.oauth2 import service_account
from googleapiclient.discovery import build

# Configure logging (filemode='w' truncates app.log each time this cell runs)
logging.basicConfig(filename='app.log', filemode='w', level=logging.INFO, format='%(name)s - %(levelname)s - %(message)s')

# Path to your Service Account key file
key_file_path = 'directory-test-387900-a9df781feda6.json'

# Workspace-specific settings, kept together so they are easy to change
ADMIN_EMAIL = 'jason.c.lin@gws.jasonengr.com'  # Replace with your Google Workspace admin user
CUSTOMER_ID = 'C03objz98'

# Load the Service Account credentials
creds = service_account.Credentials.from_service_account_file(
    key_file_path,
    scopes=['https://www.googleapis.com/auth/admin.directory.user', 'https://www.googleapis.com/auth/contacts'])

# Create a service object for the admin user
admin_creds = creds.with_subject(ADMIN_EMAIL)
service_admin = build('admin', 'directory_v1', credentials=admin_creds)

# Get all users. The Directory API returns results one page at a time, so keep
# requesting pages until list_next() reports there are none left; a single
# execute() would silently drop every user after the first page.
users = []
request = service_admin.users().list(customer=CUSTOMER_ID)
while request is not None:
    results = request.execute()
    users.extend(results.get('users', []))
    request = service_admin.users().list_next(request, results)


In [76]:
# Display the directory users fetched by the setup cell for inspection.
# NOTE(review): the rendered output contains real names and email addresses;
# consider clearing it before sharing or committing the notebook.
users

[{'kind': 'admin#directory#user',
  'id': '107310617708040826552',
  'etag': '"82YgmLpRVC5iA-bU28dUqy2uCKS7KMlDQDk3vW-TM2Y/H77ILrCMOaHZfr-V6py3-Z52ud0"',
  'primaryEmail': 'jason.c.lin@gws.jasonengr.com',
  'name': {'givenName': '家任', 'familyName': '林', 'fullName': '林家任'},
  'isAdmin': True,
  'isDelegatedAdmin': False,
  'lastLoginTime': '2023-05-26T05:06:59.000Z',
  'creationTime': '2023-05-25T08:59:54.000Z',
  'agreedToTerms': True,
  'suspended': False,
  'archived': False,
  'changePasswordAtNextLogin': False,
  'ipWhitelisted': False,
  'emails': [{'address': 'jason.c.lin@gws.jasonengr.com', 'primary': True},
   {'address': 'jason.c.lin@gws.jasonengr.com.test-google-a.com'}],
  'languages': [{'languageCode': 'en', 'preference': 'preferred'}],
  'nonEditableAliases': ['jason.c.lin@gws.jasonengr.com.test-google-a.com'],
  'customerId': 'C03objz98',
  'orgUnitPath': '/',
  'isMailboxSetup': True,
  'isEnrolledIn2Sv': False,
  'isEnforcedIn2Sv': False,
  'includeInGlobalAddressList':

In [80]:
# Synchronise each user's personal contacts with the Workspace directory:
# add directory users that are missing from the contact list and delete
# contacts whose email address is not present in the directory.
# NOTE(review): this cell duplicates the sync loop in cell In [82]; keep only one.

# Directory addresses do not change while looping, so build the lookup set once
directory_emails = {u['primaryEmail'] for u in users}

# For each user
for user in users:
    # Delegating authority to the service account to impersonate the current user
    user_creds = creds.with_subject(user['primaryEmail'])
    service = build('people', 'v1', credentials=user_creds)

    # Get the user's complete contact list, following pagination so that
    # contacts beyond the first page are not treated as missing
    contact_list = []
    request = service.people().connections().list(resourceName='people/me', personFields='names,emailAddresses,occupations,organizations,phoneNumbers')
    while request is not None:
        connections = request.execute()
        contact_list.extend(connections.get('connections', []))
        request = service.people().connections().list_next(request, connections)

    # First email address of every contact, aligned with contact_list
    # (None when a contact has no email address at all)
    contact_emails = [(c.get('emailAddresses') or [{}])[0].get('value') for c in contact_list]
    existing_emails = {email for email in contact_emails if email is not None}

    # For each other user
    for contact in users:
        if contact['primaryEmail'] == user['primaryEmail']:
            continue
        # Skip directory users that are already in the contact list
        if contact['primaryEmail'] in existing_emails:
            continue

        # Prepare contact info
        contact_info = {
            'names': [{'givenName': contact['name']['fullName']}],
            'emailAddresses': [{'value': contact['primaryEmail']}],
        }
        organizations = contact.get('organizations')
        if organizations:
            # The org unit path is only attached when the directory entry
            # carries an organizations list, as in the original logic
            organization = {}
            if 'title' in organizations[0]:
                organization['title'] = organizations[0]['title']
            if 'orgUnitPath' in contact:
                organization['name'] = contact['orgUnitPath']
            if organization:
                contact_info['organizations'] = [organization]
        phones = contact.get('phones')
        if phones:
            contact_info['phoneNumbers'] = [{'value': phones[0]['value']}]

        # Add to user's contact list
        service.people().createContact(body=contact_info).execute()
        # Log successful contact creation
        logging.info(f"Added contact: {contact['primaryEmail']} to {user['primaryEmail']}")

    # Remove contacts whose email is not in the Directory. Iterating over the
    # contacts themselves (instead of looking each email up again) deletes
    # every resource at most once, even when two contacts share an email.
    for contact, contact_email in zip(contact_list, contact_emails):
        # A contact without an email cannot be matched against the Directory;
        # leave it alone rather than deleting it by accident
        if contact_email is None or contact_email in directory_emails:
            continue
        # Delete the contact
        service.people().deleteContact(resourceName=contact['resourceName']).execute()
        # Log successful contact deletion
        logging.info(f"Deleted contact: {contact_email} from {user['primaryEmail']}")


In [77]:
# Display the contact email addresses collected by the sync loop.
# NOTE(review): `contact_emails` is only assigned inside the per-user loop of
# cell In [82], so on a fresh kernel this cell must run after that loop.
contact_emails

['jason.c.lin@gws.jasonengr.com',
 'test01@gws.jasonengr.com',
 'test02@gws.jasonengr.com',
 'test03@gws.jasonengr.com',
 'test04@gws.jasonengr.com',
 'test05@gws.jasonengr.com',
 'test06@gws.jasonengr.com',
 'test07@gws.jasonengr.com',
 'test08@gws.jasonengr.com',
 'test09@gws.jasonengr.com',
 'test10@gws.jasonengr.com',
 'test11@gws.jasonengr.com',
 'test12@gws.jasonengr.com',
 'test13@gws.jasonengr.com',
 'test14@gws.jasonengr.com',
 'test15@gws.jasonengr.com',
 'test16@gws.jasonengr.com',
 'test17@gws.jasonengr.com',
 'test18@gws.jasonengr.com',
 'test19@gws.jasonengr.com',
 'test20@gws.jasonengr.com',
 'test21@gws.jasonengr.com']

In [52]:
# Display the contact list fetched by the sync loop for inspection.
# NOTE(review): `contact_list` is assigned inside the per-user loop, so this
# shows whichever user was processed last; the output contains personal data.
contact_list

[{'resourceName': 'people/c5994386937824914048',
  'etag': '%EgsBAgkLDA0uNz0+PxoEAQIFByIMdlVDdmd2VDAwRmc9',
  'names': [{'metadata': {'primary': True,
     'source': {'type': 'CONTACT', 'id': '533057290b4baa80'}},
    'displayName': '林家任',
    'givenName': '林家任',
    'displayNameLastFirst': '林家任',
    'unstructuredName': '林家任'}],
  'emailAddresses': [{'metadata': {'primary': True,
     'source': {'type': 'CONTACT', 'id': '533057290b4baa80'}},
    'value': 'jason.c.lin@gws.jasonengr.com'}]},
 {'resourceName': 'people/c1149295291795243003',
  'etag': '%EgsBAgkLDA0uNz0+PxoEAQIFByIMNlhUbFdKUU52Mzg9',
  'names': [{'metadata': {'primary': True,
     'source': {'type': 'CONTACT', 'id': 'ff31dfa897bfffb'}},
    'displayName': 'Peter Lin',
    'givenName': 'Peter Lin',
    'displayNameLastFirst': 'Peter Lin',
    'unstructuredName': 'Peter Lin'}],
  'emailAddresses': [{'metadata': {'primary': True,
     'source': {'type': 'CONTACT', 'id': 'ff31dfa897bfffb'}},
    'value': 'peter@gws.jasonengr.c

In [82]:
# Synchronise each user's personal contacts with the Workspace directory:
# add directory users that are missing from the contact list and delete
# contacts whose email address is not present in the directory.

# Directory addresses do not change while looping, so build the lookup set once
directory_emails = {u['primaryEmail'] for u in users}

# For each user
for user in users:
    # Delegating authority to the service account to impersonate the current user
    user_creds = creds.with_subject(user['primaryEmail'])
    service = build('people', 'v1', credentials=user_creds)

    # Get the user's complete contact list, following pagination so that
    # contacts beyond the first page are not treated as missing
    contact_list = []
    request = service.people().connections().list(resourceName='people/me', personFields='names,emailAddresses,occupations,organizations,phoneNumbers')
    while request is not None:
        connections = request.execute()
        contact_list.extend(connections.get('connections', []))
        request = service.people().connections().list_next(request, connections)

    # First email address of every contact, aligned with contact_list
    # (None when a contact has no email address at all)
    contact_emails = [(c.get('emailAddresses') or [{}])[0].get('value') for c in contact_list]
    existing_emails = {email for email in contact_emails if email is not None}

    # For each other user
    for contact in users:
        if contact['primaryEmail'] == user['primaryEmail']:
            continue
        # Skip directory users that are already in the contact list
        if contact['primaryEmail'] in existing_emails:
            continue

        # Prepare contact info
        contact_info = {
            'names': [{'givenName': contact['name']['fullName']}],
            'emailAddresses': [{'value': contact['primaryEmail']}],
        }
        organizations = contact.get('organizations')
        if organizations:
            # The org unit path is only attached when the directory entry
            # carries an organizations list, as in the original logic
            organization = {}
            if 'title' in organizations[0]:
                organization['title'] = organizations[0]['title']
            if 'orgUnitPath' in contact:
                organization['name'] = contact['orgUnitPath']
            if organization:
                contact_info['organizations'] = [organization]
        phones = contact.get('phones')
        if phones:
            contact_info['phoneNumbers'] = [{'value': phones[0]['value']}]

        # Add to user's contact list
        service.people().createContact(body=contact_info).execute()
        # Log successful contact creation
        logging.info(f"Added contact: {contact['primaryEmail']} to {user['primaryEmail']}")

    # Remove contacts whose email is not in the Directory. Iterating over the
    # contacts themselves (instead of looking each email up again) deletes
    # every resource at most once, even when two contacts share an email.
    for contact, contact_email in zip(contact_list, contact_emails):
        # A contact without an email cannot be matched against the Directory;
        # leave it alone rather than deleting it by accident
        if contact_email is None or contact_email in directory_emails:
            continue
        # Delete the contact
        service.people().deleteContact(resourceName=contact['resourceName']).execute()
        # Log successful contact deletion
        logging.info(f"Deleted contact: {contact_email} from {user['primaryEmail']}")
