### Install Modules


Install all dependencies inside requirements.txt and other dependencies


In [None]:
!py -m pip install -r requirements.txt

# Install the Azure Identity library
!py -m pip install azure-identity

# Install Dotenv for environment variable management
!py -m pip install python-dotenv

# Install nest_asyncio to allow nested event loops in Jupyter
!py -m pip install nest_asyncio

# Install pytz for timezone handling
!py -m pip install pytz


### Authenticate and Initialize Graph


In [None]:
# Get the keys from .env file
import os
from dotenv import load_dotenv
load_dotenv()

# Get the client ID and secret from environment variables
tenant_id = os.getenv("AZURE_TENANT_ID")
client_id = os.getenv("AZURE_CLIENT_ID")
client_secret = os.getenv("AZURE_CLIENT_SECRET")
current_user_id = os.getenv("USER_ID")

# Display the values (redacted for security)
print(f"Client ID: {client_id[:5]}... (redacted)" if client_id else "Client ID: Not found")
print(f"Client Secret: {'*' * 8}" if client_secret else "Client Secret: Not found")
print(f"Tenant ID: {tenant_id[:5]}... (redacted)" if tenant_id else "Tenant ID: Not found")

### Initialize Graph Client

In [None]:
# Configure Graph client for app-only authentication 
from azure.identity import ClientSecretCredential
from msgraph import GraphServiceClient

# Verify that environment variables are set before proceeding
if not all([client_id, client_secret, tenant_id]):
    raise ValueError("One or more required environment variables are not set. Please check your .env file.")

# Print the type of the env variables to ensure they are strings
print(f"Tenant ID: {tenant_id} (Type: {type(tenant_id)})")
print(f"Client ID: {client_id} (Type: {type(client_id)})")
print(f"Client Secret: {client_secret} (Type: {type(client_secret)})")

credential = ClientSecretCredential(
    tenant_id=tenant_id,
    client_id=client_id,
    client_secret=client_secret
)

# Create a Graph client using the credential object
graph_client = GraphServiceClient(credential)
print("Graph client initialized successfully")

### Get Token


In [None]:
# Function to get an access token using ClientSecretCredential
def get_token():
    try:
        # Use the default scope for Microsoft Graph
        graph_scope = "https://graph.microsoft.com/.default"
        
        # Get the token synchronously
        token = credential.get_token(graph_scope)
        return token.token
    except Exception as e:
        print(f"Error obtaining token: {e}")
        raise

### Display Token


In [None]:
# Display the access token
def display_access_token():
    try:
        # Get the access token
        token = get_token()
        
        # Display a part of the access token for security
        if token:
            token_length = len(token)
            print(f"Access Token: {token[:25]}...{token[-5:]}")
            print(f"Token length: {token_length} characters")
            print("\nWARNING: This token contains sensitive credentials.")
            print("Do not share or commit this token to version control.")
        else:
            print("Failed to retrieve token.")
            
        return token
    except Exception as e:
        print(f"Error retrieving token: {e}")
        return None

In [None]:
# Get and display the token
# token = display_access_token()

In [None]:
# Display the raw token (for debugging purposes)
def display_raw_token():
    try:
        token = get_token()
        print(token)
        print(f"\nRaw token displayed. Length: {len(token)} characters")
        return token
    except Exception as e:
        print(f"Error displaying raw token: {e}")
        return None

In [None]:
# Uncomment the line below to see the full raw token (use with caution)
# raw_token = display_raw_token()

### List Users


Get users v1

In [None]:
import asyncio

async def get_users_async():
    try:
        # Get the list of users with the async approach
        users_response = await graph_client.users.get()
        
        # Print the user details
        print(f"Retrieved {len(users_response.value)} users")
        for user in users_response.value:
            # Use proper property access for user objects
            display_name = user.display_name if hasattr(user, 'display_name') else 'No display name'
            email = user.mail if hasattr(user, 'mail') and user.mail else user.user_principal_name
            print(f"User ID: {user.id}, Name: {display_name}, Email: {email}")
        
        return users_response.value
    except Exception as e:
        print(f"Error retrieving users: {e}")
        import traceback
        traceback.print_exc()
        return None

Get users v2

In [None]:
from msgraph.generated.users.users_request_builder import UsersRequestBuilder

async def get_users_async_v2():
    try:
        # Set the top limit for the number of users to retrieve
        topLimit = 50

        # Build query parameters for Users Request Builder
        query_params = UsersRequestBuilder.UsersRequestBuilderGetQueryParameters(
            select = ['displayName', 'id', 'mail'],
            top = topLimit,
            orderby = ['displayName desc']
        )

        # create a request_config object
        request_config = UsersRequestBuilder.UsersRequestBuilderGetRequestConfiguration(
            query_parameters=query_params
        )

        # Get the list of users with the async approach
        users_response = await graph_client.users.get(request_configuration=request_config)
        
        # Print the user details (limited to 10 for display purposes)
        count = 0
        for user in users_response.value:
            print(f"User ID: {user.id}, Display Name: {user.display_name}, Email: {user.mail or user.user_principal_name}")
            count += 1
            if count >= topLimit:
                break
        
        return users_response.value
    except Exception as e:
        print(f"Error retrieving users: {e}")
        return None

In [None]:
# Import required libraries
import asyncio
import nest_asyncio

# Apply nest_asyncio to allow running asyncio in Jupyter notebook
nest_asyncio.apply()

# Display Users using async execution
try:
    # Use asyncio directly now that we've patched it with nest_asyncio
    loop = asyncio.get_event_loop()

    users = loop.run_until_complete(get_users_async())
    # users = loop.run_until_complete(get_users_async_v2())

    print(f"\nTotal users retrieved: {len(users) if users else 0}")
except Exception as e:
    print(f"Error executing async function: {e}")
    
    import traceback
    traceback.print_exc()

Tabulate Data

In [None]:
from tabulate import tabulate

def tabulate_calendar_events(events):
    table_data = []
    for i, event in enumerate(events, 1):
        subject = event.subject or "(No subject)"
        organizer = event.organizer.email_address.name if event.organizer and hasattr(event.organizer, 'email_address') else "Unknown"

        start_time_dt_pst = convert_to_pst(event.start.date_time)
        start_time = start_time_dt_pst.strftime("%Y-%m-%d %H:%M:%S %Z")

        end_time_dt_pst = convert_to_pst(event.end.date_time)
        end_time = end_time_dt_pst.strftime("%Y-%m-%d %H:%M:%S %Z")

        location = event.location.display_name if event.location else "No location"
        is_all_day = "Yes" if event.is_all_day else "No"
        importance = event.importance.capitalize() if event.importance and event.importance != 'normal' else "Normal"

        attendees = ", ".join(
            f"{a.email_address.name} ({a.email_address.address})"
            for a in event.attendees
        ) if hasattr(event, 'attendees') and event.attendees else "None"

        table_data.append([
            i, subject, organizer, start_time, end_time,
            location, is_all_day, importance, attendees
        ])

    headers = ["#", "Subject", "Organizer", "Start Time", "End Time", "Location", "All Day", "Importance", "Attendees"]
    print(tabulate(table_data, headers=headers, tablefmt="grid"))


### Get Calendar View

Retrieve calendar events within a specific time range


In [None]:
import datetime
import pytz
from msgraph.generated.users.item.calendar.calendar_view.calendar_view_request_builder import CalendarViewRequestBuilder

def convert_to_pst(date_time_str):
    utc_timezone = pytz.timezone("UTC")
    dt = datetime.datetime.fromisoformat(date_time_str)
    dt_utc = utc_timezone.localize(dt)
    pst_timezone = pytz.timezone('America/Los_Angeles')
    dt_pst = dt_utc.astimezone(pst_timezone)
    return dt_pst

async def get_calendar_view(user_id=None):
    try:
        # Create datetime objects for the date range
        now = datetime.datetime.now(pytz.timezone("America/Los_Angeles"))
        start_date = now
        end_date = now.replace(hour=23, minute=59, second=59, microsecond=999999)
        
        # Create query parameters for the calendar view request
        query_parameters = CalendarViewRequestBuilder.CalendarViewRequestBuilderGetQueryParameters(
            start_date_time=start_date,
            end_date_time=end_date,
            top=10,
            select=['subject', 'organizer', 'start', 'end', 'isAllDay', 'location', 'bodyPreview', 'importance', 'attendees'],
            orderby=['start/dateTime asc']
        )

        # Check if we're getting calendar for a specific user or the current user
        print(f"Fetching calendar events for user {user_id} from {start_date} to {end_date}")
        
        # For specific user
        # Step 1: Create request parameters
        request_config = CalendarViewRequestBuilder.CalendarViewRequestBuilderGetRequestConfiguration(
            query_parameters=query_parameters
        )

        # Step 3: Make the call
        calendar_view = await graph_client.users.by_user_id(user_id).calendar.calendar_view.get(request_configuration=request_config)
                
        # Process and display the events
        if hasattr(calendar_view, 'value') and calendar_view.value:
            print(f"Retrieved {len(calendar_view.value)} calendar events.")
            tabulate_calendar_events(calendar_view.value)
        else:
            print("No events found in the specified time range.")
        return calendar_view.value if hasattr(calendar_view, 'value') else []
    except Exception as e:
        print(f"Error retrieving calendar view: {e}")
        import traceback
        traceback.print_exc()
        return None

Display Calendar Events

In [None]:
# Import required libraries
import asyncio
import nest_asyncio

# Apply nest_asyncio to allow running asyncio in Jupyter notebook
nest_asyncio.apply()

try:
    # Use asyncio directly now that we've patched it with nest_asyncio
    loop = asyncio.get_event_loop()

    # Using the nest_asyncio approach we established earlier
    user_calendar_events = loop.run_until_complete(get_calendar_view(user_id=current_user_id))
    
    # Summary of results
    if user_calendar_events:
        print(f"\nSummary: Retrieved {len(user_calendar_events)} calendar events for user {current_user_id}.")
    else:
        print(f"No calendar events were retrieved for user {current_user_id} or an error occurred.")
except Exception as e:
    print(f"Error executing calendar view function for specific user: {e}")
    import traceback
    traceback.print_exc()

In [None]:
# Create a function to take the current date and time and create a start and end date where the start date is now and the end date is today at 12 midinght
def get_today_start_end():
    # Get the current date and time
    now = datetime.datetime.now(pytz.timezone("America/Los_Angeles"))
    
    # Create start and end dates
    start_date = now
    end_date = now.replace(hour=23, minute=59, second=59, microsecond=999999)
    
    return start_date, end_date

# Test the function
start_date, end_date = get_today_start_end()
print(f"Start date: {start_date}")
print(f"Dst: {start_date.dst()}")  
print(f"UtcOffset: {start_date.utcoffset()}")  
print(f"End date: {end_date}")
print(f"Dst: {end_date.dst()}")  
print(f"UtcOffset: {end_date.utcoffset()}")  

Create Event Function

In [None]:
from datetime import datetime
import pytz
from msgraph.generated.models.event import Event
from msgraph.generated.models.item_body import ItemBody
from msgraph.generated.models.body_type import BodyType
from msgraph.generated.models.date_time_time_zone import DateTimeTimeZone
from msgraph.generated.models.attendee import Attendee
from msgraph.generated.models.email_address import EmailAddress
from msgraph.generated.models.location import Location
from msgraph.generated.models.patterned_recurrence import PatternedRecurrence
from msgraph.generated.models.recurrence_pattern import RecurrencePattern
from msgraph.generated.models.recurrence_range import RecurrenceRange
from msgraph.generated.models.recurrence_pattern_type import RecurrencePatternType
from msgraph.generated.models.importance import Importance
from msgraph.generated.models.response_status import ResponseStatus

async def create_calendar_event(
    user_id: str,
    subject: str,
    content: str,
    start_datetime: datetime,
    end_datetime: datetime,
    timezone: str = 'Asia/Manila',
    is_online_meeting: bool = False,
    attendees: list = None,
    location: str = None,
    is_all_day: bool = False,
    importance: Importance = Importance.Normal,
    recurrence: dict = None
):
    """
    Create a new calendar event for the specified user (application-level).
    """
    try:
        if not start_datetime.tzinfo:
            start_datetime = pytz.timezone(timezone).localize(start_datetime)
        if not end_datetime.tzinfo:
            end_datetime = pytz.timezone(timezone).localize(end_datetime)

        event = Event()
        event.subject = subject

        event.body = ItemBody()
        event.body.content_type = BodyType.Html
        event.body.content = content

        event.start = DateTimeTimeZone()
        event.start.date_time = start_datetime.isoformat()
        event.start.time_zone = "Pacific Standard Time"

        event.end = DateTimeTimeZone()
        event.end.date_time = end_datetime.isoformat()
        event.end.time_zone = "Pacific Standard Time"

        event.is_all_day = is_all_day
        event.importance = importance

        if location:
            event.location = Location()
            event.location.display_name = location

        if attendees:
            event.attendees = []
            for email in attendees:
                attendee = Attendee()
                attendee.email_address = EmailAddress(address=email)
                attendee.status = ResponseStatus(response="none")
                event.attendees.append(attendee)

        event.is_online_meeting = is_online_meeting
        if is_online_meeting:
            event.online_meeting_provider = "teamsForBusiness"

        if recurrence:
            pattern = RecurrencePattern()
            pattern.type_ = RecurrencePatternType(recurrence.get('type', 'daily'))
            pattern.interval = recurrence.get('interval', 1)
            if pattern.type_ == RecurrencePatternType.Weekly:
                pattern.days_of_week = recurrence.get('days_of_week', [])

            range_ = RecurrenceRange()
            range_.type_ = recurrence.get('range_type', 'noEnd')
            range_.start_date = start_datetime.date()

            if range_.type_ == 'endDate':
                range_.end_date = recurrence.get('end_date')
            elif range_.type_ == 'numbered':
                range_.number_of_occurrences = recurrence.get('occurrences', 5)

            event.recurrence = PatternedRecurrence(pattern=pattern, range=range_)

        created_event = await graph_client.users.by_user_id(user_id).events.post(event)
        print(f"Event created successfully: {created_event.id}")
        print(f"Web link: {created_event.web_link}")

        return created_event

    except Exception as e:
        print(f"Error creating event: {e}")
        import traceback
        traceback.print_exc()
        return None


Create the actual event

In [None]:
# Example usage
import asyncio
import nest_asyncio
from datetime import datetime, timedelta

# Apply nest_asyncio to enable asyncio in Jupyter
nest_asyncio.apply()

# Create a test event
tz = pytz.timezone("Asia/Manila")
now = datetime.now(tz)
start_time = tz.localize(datetime(now.year, now.month, now.day, 16, 0)) #4pm
end_time = start_time + timedelta(hours=1)

#This is a sample payload
event_data = {
    "subject": "Test create event function",
    "content": "<p>This is a test meeting for creating event</p>",
    "start_datetime": start_time,
    "end_datetime": end_time,
    "is_online_meeting": True,
    "attendees": ["sgeraldino@jairosoft.com"],
    "location": "Jairosoft",
    "importance": "Normal",
    "recurrence": None,
    "user_id": current_user_id  # replace with actual user ID
}

event = await create_calendar_event(**event_data)