In [11]:
# Welcome to your new notebook
# Type here in the cell editor to add code!
!pip install office365-REST-Python-Client

StatementMeta(, 09402fd4-0f51-4ebe-a20c-6046f9272414, 13, Finished, Available, Finished)



In [None]:
from office365.sharepoint.client_context import ClientContext
from office365.runtime.auth.authentication_context import AuthenticationContext
from office365.sharepoint.folders.folder import Folder
from office365.sharepoint.files.file import File
from office365.sharepoint.sharing.role import Role
from office365.sharepoint.sharing.external_site_option import ExternalSharingSiteOption

from office365.sharepoint.sharing.internal import sharing_restrictions
import io, pandas as pd, tempfile


class SharePoint_Connection:
    def __init__(self, client_id: str, client_secret: str, team: str) -> None:
        """
        Constructor to initialize SharePoint_Connection object with client_id, client_secret, and team values.

        Parameters:
            - client_id (str): The client ID used for SharePoint authentication.
            - client_secret (str): The client secret used for SharePoint authentication.
            - team (str): The name of the SharePoint team.

        Returns:
            - None
        """
        self.client_id = client_id
        self.client_secret = client_secret
        self.team = team

    def establish_sharepoint_context(self):
        """
        Establishes a SharePoint context using the provided client_id, client_secret, and team.

        Parameters:
            - self: An instance of the class that contains the method.

        Returns:
            - ctx (ClientContext): A SharePoint client context established using the provided authentication credentials
            (client_id, client_secret) and team information. If successful, the function returns the ClientContext object.
            If an error occurs during the establishment of the SharePoint context, an exception is caught and an error message
            is printed, and the function returns None.
        """

        try:
            # SharePoint site URL based on the company's domain name and team
            site_url = f"https://.sharepoint.com/sites/{self.team}"
            # Authentication context using client_id and client_secret
            context_auth = AuthenticationContext(site_url)
            # Acquire token for the application
            if context_auth.acquire_token_for_app(
                client_id=self.client_id, client_secret=self.client_secret
            ):
                # Create SharePoint client context
                ctx = ClientContext(site_url, context_auth)
                return ctx
        except Exception as e:
            # Print error message if an exception occurs during SharePoint context establishment
            print(f"Error: {type(e).__name__} {e}")
            return None
        
    def folder_details(self, folder_in_sharepoint: str, list_file_input: list):
        ctx = self.establish_sharepoint_context()  
        folder = ctx.web.get_folder_by_server_relative_url(f"Shared Documents/{folder_in_sharepoint}")  
        list_file_end = [] 
        list_file_sharepoint = []
        sub_folders = folder.files   
        ctx.load(sub_folders)  
        ctx.execute_query() 
        for s_folder in sub_folders:    
            list_file_sharepoint.append(s_folder.properties["Name"])

        if len(list_file_input) == 0:
            list_file_end = list_file_sharepoint
        else:
            list_file_end = list(set(list_file_sharepoint).intersection(set(list_file_input)))
        return list_file_end
    
    def create_sharepoint_directory(self, directory_name: str) -> str | None:
        """
        Creates a directory in SharePoint under the 'Shared Documents/General/' path.

        Parameters:
            - directory_name (str): The name of the directory to be created.

        Returns:
            - str: The relative URL of the created directory if successful. If an error occurs during the creation process,
            an error message is printed, and the function returns an empty string.
        """

        if directory_name:
            # Establish SharePoint context
            ctx = self.establish_sharepoint_context()

            # Attempt to create the directory
            try:
                result = ctx.web.folders.add(
                    f"Shared Documents/{directory_name}"
                ).execute_query()
                # If successful, return the relative URL of the created directory
                if result:

                    relative_url = f"Shared Documents/{directory_name}"
                    print(
                        f"{directory_name} directory has been created at '{relative_url}'"
                    )
                    return relative_url
                else:
                    print("Failed to create a folder/directory!")
                    return ""
            except Exception as e:
                # Print error message if an exception occurs during directory creation
                print(f"Error: {type(e).__name__} {e}")
                return ""
        else:
            print("Directory name cannot be empty!")
            return ""
    
    def read_sharepoint_file_as_df(self, file_path: str,sheet_name: str, dtype=None) -> pd.DataFrame:
        """
        Reads a file from SharePoint and returns its content as a Pandas DataFrame.

        Parameters:
            - file_path (str): The path of the file in SharePoint, relative to the 'Shared Documents' directory.
            - dtype (dict or None): Data type specification for columns in the DataFrame (optional).

        Returns:
            - pd.DataFrame: A Pandas DataFrame containing the content of the specified file. If the 'dtype' parameter is provided,
            it is used to specify data types for DataFrame columns during reading.
        """
        # Establish SharePoint context
        ctx = self.establish_sharepoint_context()
        web = ctx.web
        ctx.load(web)
        ctx.execute_query()
        # Download file content
        out = io.BytesIO()
        f = (
            ctx.web.get_file_by_server_relative_url(f"/Shared Documents/{file_path}")
            .download(out)
            .execute_query()
        )
        # Read file content into Pandas DataFrame
        if dtype is not None:
            # If data types are specified, use them during DataFrame creation
            # df = pd.read_csv(out, dtype=dtype)
            df = pd.read_excel(out, dtype=dtype, sheet_name=sheet_name, engine='openpyxl')
        else:
            # Otherwise, read the file without specifying data types
            # df = pd.read_csv(out)
            df = pd.read_excel(out,sheet_name=sheet_name, engine='openpyxl')
        # Close the BytesIO stream
        out.close()
        return df
    
    def read_sharepoint_csv_as_df(self, file_path: str, dtype=None) -> pd.DataFrame:
        """
        Reads a file from SharePoint and returns its content as a Pandas DataFrame.

        Parameters:
            - file_path (str): The path of the file in SharePoint, relative to the 'Shared Documents' directory.
            - dtype (dict or None): Data type specification for columns in the DataFrame (optional).

        Returns:
            - pd.DataFrame: A Pandas DataFrame containing the content of the specified file. If the 'dtype' parameter is provided,
            it is used to specify data types for DataFrame columns during reading.
        """
        # Establish SharePoint context
        ctx = self.establish_sharepoint_context()
        web = ctx.web
        ctx.load(web)
        ctx.execute_query()
        # Download file content
        out = io.BytesIO()
        f = (
            ctx.web.get_file_by_server_relative_url(f"/Shared Documents/{file_path}")
            .download(out)
            .execute_query()
        )
        # Read file content into Pandas DataFrame
        if dtype is not None:
            # If data types are specified, use them during DataFrame creation
            df = pd.read_csv(out, dtype=dtype)
            # df = pd.read_excel(out, dtype=dtype
        else:
            # Otherwise, read the file without specifying data types
            df = pd.read_csv(out)
            # df = pd.read_excel(out,sheet_name=sheet_name)
        # Close the BytesIO stream
        out.close()
        return df

    def write_bytefile_to_sharepoint(
        self, file_path: str, file_name: str, file_bytes: bytes
    ) -> None:
        """
        Writes a byte file to SharePoint in the specified folder with the given file name.

        Parameters:
            - file_path (str): The path of the folder in SharePoint where the file should be written, relative to the 'Shared Documents' directory.
            - file_name (str): The name to be given to the file in SharePoint.
            - file_bytes (bytes): The content of the file as a bytes object.

        Returns:
            - None
        """
        # Establish SharePoint context
        ctx = self.establish_sharepoint_context()
        # Get the SharePoint folder by server-relative URL
        folder: Folder = ctx.web.get_folder_by_server_relative_url(
            f"Shared Documents/{file_path}"
        )
        # Chunk size for uploading
        chunk_size: int = 500000

        # Check if the file already exists
        file: File = folder.files.get_by_url(file_name)

        if file.exists:

            # If the file exists, delete it
            file.delete_object().execute_query()

        # Create a temporary file and write the bytes to it
        with tempfile.NamedTemporaryFile(delete=False) as temp_file:
            temp_file.write(file_bytes)

        # Use the temporary file for uploading
        with open(temp_file.name, "rb") as file_to_upload:
            folder.files.create_upload_session(
                file=file_to_upload, chunk_size=chunk_size, file_name=file_name
            ).execute_query()
        # Print success message
        print(f"{file_name} has been uploaded successfully!")

    
    def grant_users(
        self, folder_in_sharepoint: str, email: str, role: str
    ) -> None:
        '''
            Grants permission to a user for a specific folder in SharePoint.

            This method establishes a SharePoint context and grants the specified 
            permission (e.g., 'View' or 'Edit') to a user for a target folder located 
            under 'Shared Documents'.

    
            - folder_in_sharepoint (str): The relative path to the folder within 
                'Shared Documents' where access should be granted.
            - email (str): The email address of the user to be granted access.
            - role (str): The level of access to grant. Acceptable values include 
                'View' or 'Edit'.
        '''
        ctx = self.establish_sharepoint_context()
        folder_url = f"Shared Documents/{folder_in_sharepoint}"
        folder = ctx.web.get_folder_by_server_relative_url(folder_url)
        folder_item = folder.list_item_all_fields
        ctx.load(folder_item)
        ctx.execute_query()

        # 4. Gọi chia sẻ
        try:
            result = folder_item.share(
            
                user_principal_name=email,
                share_option = ExternalSharingSiteOption.View if role == "View" else ExternalSharingSiteOption.Edit,
                send_email=True,
                email_subject=f"Grant Access to Folder: {folder_in_sharepoint}",
                email_body=f"You have been granted '{role}' access to the folder '{folder_in_sharepoint}'. Click the link to access it."
            )

            ctx.execute_query()
            print(f"✅ Đã chia sẻ '{role}' tới: {email}")


      
        except Exception as e:
            print(f"❌ Lỗi khi chia sẻ tới {email}: {str(e)}")


    def share_link(
        self, folder_in_sharepoint: str
    ) -> None:
        '''
            role: View, Edit
        '''
        ctx = self.establish_sharepoint_context()
        folder_url = f"Shared Documents/{folder_in_sharepoint}"
        folder = ctx.web.get_folder_by_server_relative_url(folder_url)


        a = folder.get_sharing_information()

        print(a)

StatementMeta(, 09402fd4-0f51-4ebe-a20c-6046f9272414, 14, Finished, Available, Finished)

In [13]:
import getpass
import pyspark
importing_user = getpass.getuser()

StatementMeta(, 09402fd4-0f51-4ebe-a20c-6046f9272414, 15, Finished, Available, Finished)

In [None]:
import requests
def send_workflow(text):
    workflow_url = 'https://prod-26.southeastasia.logic.azure.com:443/workflows/'
    payload = {
        "type": "message",
        "text": text
    }
    response = requests.post(workflow_url, json=payload)
    if response.status_code == 200:
        print("Webhook sent successfully!")
    elif response.status_code == 202:
        print("Webhook accepted for processing!")
    else:
        print(f"Failed to send webhook. Status code: {response.status_code}")
        
        print("Response content:", response.content)

StatementMeta(, 09402fd4-0f51-4ebe-a20c-6046f9272414, 16, Finished, Available, Finished)

In [None]:
CLIENT_ID = 'd0635947-8f06-4353'
CLIENT_SECRET = 'RXFOOFF+RnM5Sml4RHcyZW1Cg=='
TEAM = 'DagsterAndFabric'

connection = SharePoint_Connection(CLIENT_ID, CLIENT_SECRET, TEAM)

StatementMeta(, 09402fd4-0f51-4ebe-a20c-6046f9272414, 17, Finished, Available, Finished)

In [16]:
folder_shapoint_path = 'Tutram'
list_file = []
connection.folder_details(folder_shapoint_path, list_file)

StatementMeta(, 09402fd4-0f51-4ebe-a20c-6046f9272414, 18, Finished, Available, Finished)

['fact.xlsx', 'dim_date.xlsx', 'dim_store.xlsx', 'dim_customer.xlsx']

In [17]:
# --- Dictionary cho các bảng dimension ---
dim_file_dictionary = {
    'Date': "Tutram/dim_date.xlsx",
    'Store': "Tutram/dim_store.xlsx",
    'Customer': "Tutram/dim_customer.xlsx"
}

# --- Dictionary cho bảng fact ---
fact_file_dictionary = {
    'Sales': "Tutram/fact.xlsx"
}


StatementMeta(, 09402fd4-0f51-4ebe-a20c-6046f9272414, 19, Finished, Available, Finished)

In [18]:
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "false")

StatementMeta(, 09402fd4-0f51-4ebe-a20c-6046f9272414, 20, Finished, Available, Finished)

In [19]:
i = 1
for table_name, path_file in fact_file_dictionary.items():
    print(f"{i}. Processing {table_name}: {path_file}")
    df = connection.read_sharepoint_file_as_df(path_file, 'Sheet1')
    # Thêm metadata
    df['FilePath'] = path_file
    df['User'] = importing_user
    # Nếu có cột 'transaction_time' thì chuẩn hoá (phòng trường hợp có)
    if 'transaction_time' in df.columns:
        df["transaction_time"] = (
            pd.to_timedelta(df["transaction_time"].astype(str).str.strip())
            .dt.total_seconds()
        )
    # Tạo Spark DataFrame
    spark_df = spark.createDataFrame(df)
    # Ghi dữ liệu vào Delta Table
    spark_df.write.format("delta") \
        .mode("overwrite") \
        .option("overwriteSchema", "true") \
        .saveAsTable(f'FACT_{table_name.upper()}')
    print(f"✅ Transform {table_name} with {spark_df.count()} rows successfully\n")
    i += 1


StatementMeta(, 09402fd4-0f51-4ebe-a20c-6046f9272414, 21, Finished, Available, Finished)

1. Processing Sales: Tutram/fact.xlsx
✅ Transform Sales with 1048575 rows successfully



In [20]:
i = 1
for table_name, path_file in dim_file_dictionary.items():
    print(f"{i}. Processing DIM_{table_name}: {path_file}")
    # Đọc dữ liệu từ SharePoint (sheet 'data' hoặc 'Sheet1' tùy file)
    df = connection.read_sharepoint_file_as_df(path_file, 'Sheet1')
    # Thêm metadata
    df['FilePath'] = path_file
    df['User'] = importing_user
    # Tạo Spark DataFrame từ Pandas DataFrame
    spark_df = spark.createDataFrame(df)
    # Ghi dữ liệu vào Delta Table trong Lakehouse
    spark_df.write.format("delta") \
        .mode("overwrite") \
        .option("overwriteSchema", "true") \
        .saveAsTable(f'DIM_{table_name.upper()}')
    print(f"✅ Transform DIM_{table_name} with {spark_df.count()} rows successfully\n")
    i += 1

StatementMeta(, 09402fd4-0f51-4ebe-a20c-6046f9272414, 22, Finished, Available, Finished)

1. Processing DIM_Date: Tutram/dim_date.xlsx
✅ Transform DIM_Date with 466 rows successfully

2. Processing DIM_Store: Tutram/dim_store.xlsx
✅ Transform DIM_Store with 135 rows successfully

3. Processing DIM_Customer: Tutram/dim_customer.xlsx
✅ Transform DIM_Customer with 583642 rows successfully

