# Transform data by using Spark

This notebook transforms sales order data; converting it from CSV to Parquet format and splitting customer name into two separate fields.

## Set variables

In [None]:
import zipfile
import pandas as pd
import datetime
import os

## Load source data

Let's start by loading some historical sales order data into a dataframe.

In [None]:
from azure.storage.blob import BlobServiceClient, BlobClient
import zipfile
# Create a connection to your Azure Blob Storage account
blob_service_client = BlobServiceClient.from_connection_string('<your-connection-string>')

# Specify the name of your container and the path to the zip file
container_name = 'files'
zip_file_path = 'enrollmentdb'

import datetime
# Generate a unique folder name using a timestamp
timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
folder_name = f"converteddata_{timestamp}"

# Specify the path where to extract the contents of the zip file
extract_path = f"{folder_name}/"

In [None]:
# Download the zip file to a local temporary file
local_zip_path = 'temp.zip'
blob_client = blob_service_client.get_blob_client(container=container_name, blob=zip_file_path)
with open(local_zip_path, 'wb') as file:
    file.write(blob_client.download_blob().readall())

## Transform the data structure

The source data includes a **CustomerName** field, that contains the customer's first and last name. Modify the dataframe to separate this field into separate **FirstName** and **LastName** fields.

In [None]:
import os

# Iterate over the extracted files
for file_name in zip_ref.namelist():
    # Skip directories
    if not file_name.endswith('/'):
        # Check if the file has the extension '.mdb' or '.accdb'
        if file_name.endswith('.mdb') or file_name.endswith('.accdb'):
            # Read the Microsoft Access database file into a pandas DataFrame
            access_file_path = f'{extract_path}/{file_name}'
            df = pd.read_csv(access_file_path, delimiter=';')

            # Specify the path to save the CSV file
            csv_file_path = f'{extract_path}/{file_name}.csv'

            # Save the DataFrame as a CSV file
            df.to_csv(csv_file_path, index=False)

## Save the transformed data

Now save the transformed dataframe in Parquet format in a folder specified in a variable (Overwriting the data if it already exists).

In [None]:
# Upload the converted CSV file to Azure Blob Storage
        blob_name = f'{folder_name}/{file_name}.csv'
        blob_client = blob_service_client.get_blob_client(container=container_name, blob=blob_name)
        with open(csv_file_path, "rb") as data:
            blob_client.upload_blob(data)

        # Clean up the local CSV file
        os.remove(csv_file_path)

In [None]:
# Clean up the extracted folder
shutil.rmtree(extract_path)