**Script Libraries & Modules**
- Imports libraries for API requests, XML parsing, and date handling.
- Includes Spark libraries for data processing, schema definition, and transformations.

In [None]:
import requests 
import xml.etree.ElementTree as ET
from pytz import timezone 
from datetime import datetime, timedelta
from pyspark.sql import DataFrame
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_date, date_sub
from pyspark.sql import functions as F
from pyspark.sql.types import BooleanType, TimestampType, StructType, StructField, StringType, DateType, FloatType
from xml.etree.ElementTree import Element, SubElement, tostring
from pyspark.sql.functions import col, to_timestamp, from_utc_timestamp

## Contacts
**Automates the process of retrieving, processing, and saving contact data for various vineyard brands.**
1. **API Requests**: Sends requests to the source API for contact data.
2. **Data Processing**:
   - Processes contact data in 2-year date chunks.
   - Handles multiple pages per chunk.
3. **Brand-Specific Data**:
   - Adds a `BrandName` column to differentiate brands.
   - Filters out rows with missing `FirstName`, `LastName`, `Email`, or `LastOrderDate`.
4. **Final Data Storage**:
   - Combines data from all brands into a single dataset.
   - Saves the final dataset as a Delta table for analysis.

In [None]:
# Get (or create) the Spark session used by the contacts cell
spark = SparkSession.builder.appName("ContactsData").getOrCreate()

# Schema of the contact records: every column is declared as a nullable string
schema = StructType(
    [
        StructField(field_name, StringType(), True)
        for field_name in (
            "ContactID",
            "FirstName",
            "LastName",
            "Email",
            "LastOrderDate",
            "LifetimeValue",
        )
    ]
)

# SOAP request template
# Builds the XML body of a SearchContacts request for one page of one date window
def xml_template(page_num, date_from, date_to, website_id, username, password):
    """Return the SOAP envelope for a ``SearchContacts`` request.

    Every argument is converted to text and XML-escaped before it is placed in
    the envelope, so a value containing ``&``, ``<`` or ``>`` (a password, for
    example) can neither break the document nor inject extra elements.

    Args:
        page_num: Page number placed in ``<Page>``.
        date_from: Lower bound placed in ``<DateModifiedFrom>``.
        date_to: Upper bound placed in ``<DateModifiedTo>``.
        website_id: Value placed in ``<WebsiteIDs>``.
        username: Value placed in ``<Security>/<Username>``.
        password: Value placed in ``<Security>/<Password>``.

    Returns:
        The request envelope as a string.
    """
    from xml.sax.saxutils import escape  # local import keeps this block self-contained

    page_num, date_from, date_to, website_id, username, password = (
        escape(str(value))
        for value in (page_num, date_from, date_to, website_id, username, password)
    )
    return f"""
    <soapenv:Envelope xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns:v300="http://V300">
        <soapenv:Body>
            <SearchContacts>
                <Request>
                    <Security>
                        <Username>{username}</Username>
                        <Password>{password}</Password>
                    </Security>
                    <WebsiteIDs>{website_id}</WebsiteIDs>
                    <DateModifiedFrom>{date_from}</DateModifiedFrom><DateModifiedTo>{date_to}</DateModifiedTo>
                    <SortBy>LastName</SortBy>
                    <Page>{page_num}</Page>
                </Request>
            </SearchContacts>
        </soapenv:Body>
    </soapenv:Envelope>
    """

# Fetch one page of contacts: send the SOAP request and parse the response
def fetch_data(page_num, date_from, date_to, website_id, username, password, timeout=120):
    """Request a single page of contacts and return it as a list of dicts.

    Args:
        page_num: Page number to request.
        date_from: Lower bound of the modification-date window.
        date_to: Upper bound of the modification-date window.
        website_id: Website ID sent in the request.
        username: API username.
        password: API password.
        timeout: Seconds to wait for the service before ``requests`` gives up.
            Without a timeout a stalled connection would block the run forever.

    Returns:
        One dict per ``Contacts/Contacts`` element, mapping each child tag
        (namespace stripped) to its text. An empty list is returned when the
        page holds no contacts or the service answers with a non-200 status.
    """
    url = "https://api.example.com/ContactService?wsdl"  # Example API endpoint
    headers = {
        # The charset is a parameter of Content-Type, not a header of its own
        "Content-Type": "text/xml; charset=utf-8",
        "SOAPAction": "SearchContacts",
    }
    body = xml_template(page_num, date_from, date_to, website_id, username, password)
    # Encode explicitly so the bytes on the wire match the declared charset
    # no matter how the HTTP stack would encode a str body
    response = requests.post(url, data=body.encode("utf-8"), headers=headers, timeout=timeout)
    if response.status_code != 200:
        # An empty result also ends pagination in the caller, so make the failure visible
        print(f"Error: {response.status_code}, Page: {page_num}")
        return []

    # Parse the raw bytes so ElementTree applies the encoding declared by the
    # XML document itself instead of a guess derived from the HTTP headers
    root = ET.fromstring(response.content)
    contacts = root.findall('.//Contacts/Contacts')
    # Tags may carry a "{namespace}" prefix; keep only the local name as the key
    return [{elem.tag.split('}')[-1]: elem.text for elem in contact} for contact in contacts]

# Collect every page of contacts for one date window
def fetch_all_data(date_from, date_to, website_id, username, password):
    """Gather the contacts of all pages for a single date range.

    Pages are requested one after another through ``fetch_data``, starting at
    page 1; the first page that comes back empty ends the loop.

    Returns:
        A list holding the records of every non-empty page, in page order.
    """
    collected = []
    page_number = 1
    page_rows = fetch_data(page_number, date_from, date_to, website_id, username, password)
    while page_rows:
        collected += page_rows
        page_number += 1
        page_rows = fetch_data(page_number, date_from, date_to, website_id, username, password)
    return collected

# Walk backwards in time, one date window per request batch, until a window is empty
def fetch_data_until_empty(website_id, username, password, chunk_size_days=365 * 2):
    """Collect contacts window by window, from now back into the past.

    The first window ends at the current local time and spans
    ``chunk_size_days`` days (two years by default). Each following window
    ends where the previous one started. The walk stops at the first window
    for which ``fetch_all_data`` returns nothing.

    Returns:
        A list with the records of every non-empty window, newest window first.
    """
    stamp_format = "%Y-%m-%dT%H:%M:%S"
    collected = []
    window_end = datetime.now()
    window_length = timedelta(days=chunk_size_days)

    while True:
        window_start = window_end - window_length
        date_from = window_start.strftime(stamp_format)
        date_to = window_end.strftime(stamp_format)

        print(f"  Fetching data from {date_from} to {date_to}...")
        window_rows = fetch_all_data(date_from, date_to, website_id, username, password)
        if window_rows:
            collected.extend(window_rows)
            window_end = window_start  # slide the window one step into the past
            continue

        print(f"  No more data found from {date_from} to {date_to}.")
        return collected

# Make KeyVault call to get secrets
# NOTE(review): the literal below looks like a placeholder; confirm the real value is loaded from Key Vault
Vineyards_Password = "Example_PASSWORD"

# Brands info: API website ID and username for each brand
brands_info = {
    "BrandA": {"website_id": "Example_WEBSITE_ID", "username": "Example_USERNAME"},
    "BrandB": {"website_id": "Example_WEBSITE_ID", "username": "Example_USERNAME"},
    "BrandC": {"website_id": "Example_WEBSITE_ID", "username": "Example_USERNAME"},
}

# Build one filtered DataFrame per brand
dataframes = []
for brand_name, info in brands_info.items():
    website_id, username = info["website_id"], info["username"]
    password = Vineyards_Password

    print(f"Fetching data for {brand_name}...")
    all_data = fetch_data_until_empty(website_id, username, password)
    if not all_data:
        continue  # nothing came back for this brand, so it contributes no rows

    # Turn the retrieved records into a DataFrame and tag every row with its brand
    df = spark.createDataFrame(all_data, schema=schema)
    df_with_brand = df.withColumn("BrandName", F.lit(brand_name))

    # Keep only rows where all four key columns are present
    df_filtered = (
        df_with_brand
        .where(F.col("FirstName").isNotNull())
        .where(F.col("LastName").isNotNull())
        .where(F.col("Email").isNotNull())
        .where(F.col("LastOrderDate").isNotNull())
    )
    dataframes.append(df_filtered)

# Stack the per-brand DataFrames and persist the result
if not dataframes:
    print("No data was retrieved for any of the brands.")
else:
    combined_df = dataframes[0]
    for df in dataframes[1:]:
        combined_df = combined_df.unionByName(df)

    # Put BrandName first and keep the remaining columns in their existing order
    columns = ["BrandName"] + [name for name in combined_df.columns if name != "BrandName"]
    combined_df = combined_df.select(*columns)

    #display(combined_df)

    # Save the result as a table, replacing any previous contents
    combined_df.write.mode("overwrite").saveAsTable("example_catalog.example_schema.ContactsData")

## Orders
**Automates the process of retrieving, processing, and saving `orders` data for various vineyard brands.**
1. **API Requests**: Sends requests to the vineyard API for orders data.
2. **Data Processing**:
   - Processes orders data in 720-day (roughly 2-year) date chunks.
   - Handles multiple pages per chunk.
3. **Brand-Specific Data**:
   - Adds a `BrandName` column to differentiate brands.
   - Filters out rows with missing `OrderID`, `OrderNumber`, and `OrderType`.
4. **Final Data Storage**:
   - Combines data from all brands into a single dataset.
   - Saves the final dataset as a Delta table for analysis.

In [None]:
# Get (or create) the Spark session used by the orders cell
spark = SparkSession.builder.appName("OrdersData").getOrCreate()

# Schema of the order records produced by parse_order.
# Every column is a nullable string because parse_order copies the raw text of
# each XML element. OrderNumber was declared as IntegerType, which is not among
# this notebook's imports (NameError) and which createDataFrame would reject
# for text values; cast the column downstream if a numeric type is needed.
schema = StructType([
    StructField("OrderID", StringType(), True),
    StructField("OrderNumber", StringType(), True),
    StructField("DateCompleted", StringType(), True),
    StructField("OrderType", StringType(), True),
    StructField("OrderStatus", StringType(), True),
    StructField("PaymentStatus", StringType(), True),
    StructField("OrderTotal", StringType(), True),
    StructField("ContactID", StringType(), True),
])

# SOAP request template
# Builds the XML body of a SearchOrders request for one page of one date window
def orders_xml_template(website_id, username, password, date_from, date_to, max_rows, page):
    """Return the SOAP envelope for a ``SearchOrders`` request.

    Every argument is converted to text and XML-escaped before it is placed in
    the envelope, so a value containing ``&``, ``<`` or ``>`` (a password, for
    example) can neither break the document nor inject extra elements.

    Args:
        website_id: Value placed in ``<WebsiteIDs>``.
        username: Value placed in ``<Security>/<Username>``.
        password: Value placed in ``<Security>/<Password>``.
        date_from: Lower bound placed in ``<DateCompletedFrom>``.
        date_to: Upper bound placed in ``<DateCompletedTo>``.
        max_rows: Value placed in ``<MaxRows>``.
        page: Page number placed in ``<Page>``.

    Returns:
        The request envelope as a string.
    """
    from xml.sax.saxutils import escape  # local import keeps this block self-contained

    website_id, username, password, date_from, date_to, max_rows, page = (
        escape(str(value))
        for value in (website_id, username, password, date_from, date_to, max_rows, page)
    )
    return f"""
    <soapenv:Envelope xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns:v300="http://V300">
        <soapenv:Body>
            <SearchOrders>
                <Request>
                    <Security>
                        <Username>{username}</Username>
                        <Password>{password}</Password>
                    </Security>
                    <WebsiteIDs>{website_id}</WebsiteIDs>
                    <DateCompletedFrom>{date_from}</DateCompletedFrom>
                    <DateCompletedTo>{date_to}</DateCompletedTo>
                    <MaxRows>{max_rows}</MaxRows>
                    <Page>{page}</Page>
                </Request>
            </SearchOrders>
        </soapenv:Body>
    </soapenv:Envelope>
    """

# Parse individual order details from XML response
def parse_order(order_elem):
    """Flatten one order element into a dict of its text fields.

    Each output key is filled with the text of the first descendant element
    that carries the associated tag, or ``None`` when no such descendant
    exists. Two keys differ from their source tag: ``DateCompleted`` is read
    from ``CompletedDate`` and ``OrderType`` from ``Type``.

    Args:
        order_elem: The XML element of a single order.

    Returns:
        A dict with the keys ``OrderID``, ``OrderNumber``, ``DateCompleted``,
        ``OrderType``, ``OrderStatus``, ``PaymentStatus``, ``OrderTotal`` and
        ``ContactID``, in that order.
    """
    # (output key, source tag) pairs, in output order
    field_sources = (
        ("OrderID", "OrderID"),
        ("OrderNumber", "OrderNumber"),
        ("DateCompleted", "CompletedDate"),
        ("OrderType", "Type"),
        ("OrderStatus", "OrderStatus"),
        ("PaymentStatus", "PaymentStatus"),
        ("OrderTotal", "OrderTotal"),
        ("ContactID", "ContactID"),
    )
    record = {}
    for field, tag in field_sources:
        node = order_elem.find(f".//{tag}")
        record[field] = None if node is None else node.text
    return record

# Fetch every page of orders for one date window
def fetch_orders(website_id, username, password, date_from, date_to, max_rows=100, timeout=120):
    """Request orders page by page for one date range and return them parsed.

    Paging starts at page 1 and stops at the first page without
    ``Orders/Orders`` elements, or at the first non-200 response (which is
    reported on stdout).

    Args:
        website_id: Website ID sent in the request.
        username: API username.
        password: API password.
        date_from: Lower bound of the completion-date window.
        date_to: Upper bound of the completion-date window.
        max_rows: Value sent as ``<MaxRows>`` with every page request.
        timeout: Seconds to wait for the service before ``requests`` gives up.
            Without a timeout a stalled connection would block the run forever.

    Returns:
        A list with one ``parse_order`` dict per order, in page order.
    """
    url = "https://api.example.com/OrderService?wsdl"    # Example API endpoint
    headers = {
        # The charset is a parameter of Content-Type, not a header of its own
        "Content-Type": "text/xml; charset=utf-8",
        "SOAPAction": "SearchOrders",
    }
    all_orders = []
    page = 1
    while True:
        xml_request = orders_xml_template(website_id, username, password, date_from, date_to, max_rows, page)
        # Encode explicitly so the bytes on the wire match the declared charset
        # no matter how the HTTP stack would encode a str body
        response = requests.post(url, data=xml_request.encode("utf-8"), headers=headers, timeout=timeout)
        if response.status_code != 200:
            print(f"Error: {response.status_code}, Page: {page}")
            break

        # Parse the raw bytes so ElementTree applies the encoding declared by
        # the XML document itself instead of a guess derived from the HTTP headers
        root = ET.fromstring(response.content)
        orders = root.findall('.//Orders/Orders')
        if not orders:  # an empty page marks the end of the result set
            break
        all_orders.extend(parse_order(order) for order in orders)
        page += 1

    return all_orders

# Walk backwards in time, one date window per request batch, until a window has no orders
def fetch_all_orders(website_id, username, password, chunk_size_days=720):
    """Collect orders window by window, from now back into the past.

    The first window ends at the current local time and spans
    ``chunk_size_days`` days (720 by default, a little under two years). Each
    following window ends where the previous one started. The walk stops at
    the first window for which ``fetch_orders`` returns nothing.

    Returns:
        A list with the orders of every non-empty window, newest window first.
    """
    stamp_format = "%Y-%m-%dT%H:%M:%S"
    collected = []
    window_end = datetime.now()
    window_length = timedelta(days=chunk_size_days)

    while True:
        window_start = window_end - window_length
        date_from = window_start.strftime(stamp_format)
        date_to = window_end.strftime(stamp_format)
        print(f"Fetching orders from {date_from} to {date_to}...")
        window_orders = fetch_orders(website_id, username, password, date_from, date_to)
        if window_orders:
            collected.extend(window_orders)
            window_end = window_start  # slide the window one step into the past
            continue

        print(f"No more orders found between {date_from} and {date_to}.")
        return collected

# Make KeyVault call to get secrets
# NOTE(review): the literal below looks like a placeholder; confirm the real value is loaded from Key Vault
Vineyards_Password = "example_password"  # Example password

# Brand info: API website ID and username for each brand
brands_info = {
    "BrandA": {"website_id": "example_website_ID_1", "username": "example_username_1"},
    "BrandB": {"website_id": "example_website_ID_2", "username": "example_username_2"},
    "BrandC": {"website_id": "example_website_ID_3", "username": "example_username_3"},
}

# Build one filtered DataFrame per brand
dataframes = []
for brand_name, info in brands_info.items():
    website_id, username = info["website_id"], info["username"]
    password = Vineyards_Password
    print(f"Fetching orders for {brand_name}...")
    all_orders = fetch_all_orders(website_id, username, password)
    if not all_orders:
        continue  # nothing came back for this brand, so it contributes no rows

    df = spark.createDataFrame(all_orders, schema=schema)
    # Tag every row with its brand and drop a trailing ".000Z" from DateCompleted
    stripped_date = F.regexp_replace(F.col("DateCompleted"), "\\.000Z", "")
    df_with_brand = (
        df.withColumn("BrandName", F.lit(brand_name))
          .withColumn("DateCompleted", stripped_date)
    )
    # Keep only rows where all three key columns are present
    df_filtered = (
        df_with_brand
        .where(F.col("OrderID").isNotNull())
        .where(F.col("OrderNumber").isNotNull())
        .where(F.col("OrderType").isNotNull())
    )
    dataframes.append(df_filtered)

# Stack the per-brand DataFrames and persist the result
if not dataframes:
    print("No orders were fetched for any of the brands.")
else:
    combined_df = dataframes[0]
    for df in dataframes[1:]:
        combined_df = combined_df.unionByName(df)
    # Put BrandName first and keep the remaining columns in their existing order
    columns = ["BrandName"] + [name for name in combined_df.columns if name != "BrandName"]
    combined_df = combined_df.select(*columns)
    # Save the result as a table, replacing any previous contents
    combined_df.write.mode("overwrite").saveAsTable("example_catalog.example_schema.OrdersData")

    #display(combined_df)