# Use Azure Open AI  GPT-4 Turbo with Vision to Analyze Images and identify and classify critical maintenance issues and Predictive maintenace

### import the packages

In [None]:

#import the packages
# Install the packages if missing
import os
import requests
import base64
from azure.storage.blob.aio import BlobClient
from azure.storage.blob import ContainerClient

### env variables

In [None]:
#environment variables , for demo using keys , read from key vault or use other authentication methods
AZURE_STORAGE_CONNECTION_STRING='<storage acocunt connection string>'
AZURE_STORAGE_CONTAINER_NAME='<container name where images are stored>'
#OPEN AI KEY
GPT4V_KEY = "<Azure Open AI API Key>"


#Set Headers
headers = {
            "Content-Type": "application/json",
            "api-key": GPT4V_KEY,
         }
AZURE_OPENAI_ENDPOINT="<Azure Open AI API endpoint>"
AZURE_OPENAI_VISIONGPT_DEPLOYMENT_NAME="<Azure Open AI GPT-4 Turbo vision deployment name>"
AZURE_OPENAI_API_VERSION="2024-02-15-preview"

In [None]:
#variables
files_to_index=[]
data_list=[]
GPT4V_ENDPOINT =AZURE_OPENAI_ENDPOINT+"openai/deployments/"+AZURE_OPENAI_VISIONGPT_DEPLOYMENT_NAME+"/chat/completions?api-version="+AZURE_OPENAI_API_VERSION

### retreive all images from the container to process using ContainerClient

In [None]:
#retreive all images from the container to process using ContainerClient
container = ContainerClient.from_connection_string(conn_str=AZURE_STORAGE_CONNECTION_STRING, container_name=AZURE_STORAGE_CONTAINER_NAME)
blob_list = container.list_blobs()
for blob in blob_list:
    #print(blob)
    files_to_index.append({"file_name": blob.name}) 

### build payload for the GPTV model

In [None]:
#build payload for the GPTV model
#when Jsom mode is available use the Json mode for responses
#Please be aware that the prompt instructions shown are not intended to be a correct and complete representation of the prompts that should be used with your applications in production. They are provided for informational purposes only and may not be suitable for all use cases. It is important to carefully consider your specific requirements and design appropriate prompts that meet your users' needs and expectations.
def build_payload(base64_string):
    payload=''
    payload = {
    "messages": [
        {
        "role": "system",
        "content": [
            {
            "type": "text",
            "text": "You are an AI assistant for the Maintenance Organization , you will help  by identifying the, need for Maintenace in the pictures , answer in a json with two columns Maintenaced needed Yes or No and then in new column details let the details of the image add a priority for fixing the damage and what category is the mainatenance is needed and confidence %. Only provide a valid json and donot add any text\n \noutput json example follwo the same format\n \n{  \n  \"Maintenance needed\": \"Yes\",  \n    \"Priority\": \"High\",  \n    \"Category\": \"Plumbing\",  \n    \"Specifics\": \"Water heater leaking, rust and corrosion visible, potential failure of tank integrity\"  \n \n}  \n\nIf there are No Maintenance needed use below sample output \n\n{\n\"Maintenance needed\": \"No\",\n \"Priority\": \"N/A\",  \n    \"Category\": \"Plumbing\",  \n\"Specifics\": \"All equipment appears to be in good condition with no visible signs of damage or wear\"\n}. THE OUTPUT WILL BE  ONLY JSON  AND DO NOT  QUALIFY THE JSON OUTPUT WITH ANY PREFIX OR SUFFIX "
            }
        ]
        },
        {
        "role": "user",
        "content": [
            {
            "type": "image_url",
            "image_url": {
                "url": f"data:image/jpeg;base64,{base64_string}"
            }
            },
            {
            "type": "text",
            "text": "explain"
            }
        ]
        }
    ],
    "temperature": 0.7,
    "top_p": 0.95,
    "max_tokens": 800
    }
    return payload

### function to execute the GPTV model

In [None]:
#function to execute the GPTV model
def call_AzureOpenAI_Vision_RAG_API(payload):
    try:
        response = requests.post(GPT4V_ENDPOINT, headers=headers, json=payload)
        response.raise_for_status()  # Will raise an HTTPError if the HTTP request returned an unsuccessful status code
    except requests.RequestException as e:
        raise SystemExit(f"Failed to make the request. Error: {e}")

    # Handle the response as needed (e.g., print or process)
    #print(response.json()['choices'][0]['message']['content'])
    return response.json()['choices'][0]['message']['content']



### check for valid json from GPTV output

In [None]:
#debug check for valid json from GPTV output
import json

def is_valid_json(json_string):
    try:
        json.loads(json_string)
        print("The JSON is valid.")
        return True
        
    except json.JSONDecodeError:
        print(f"Invalid JSON: {json_string}")
        return False
        


### for each image AI identifies the maintenance need , convert the json response and add to list

In [None]:
#for each image AI identifies the maintenance need , convert the json response to dict and add to list
import asyncio
import base64
from azure.storage.blob import BlobClient
import io
import json
data_list = []
for item in files_to_index:
    blob_client = BlobClient.from_connection_string(conn_str=AZURE_STORAGE_CONNECTION_STRING, 
                                                    container_name=AZURE_STORAGE_CONTAINER_NAME, blob_name=item["file_name"])
    #get stream 
    stream = io.BytesIO()
    num_bytes = blob_client.download_blob().readinto(stream)
    body= build_payload(base64.b64encode(stream.getvalue()).decode('utf-8'))
    response = call_AzureOpenAI_Vision_RAG_API(body)
    #Check AI generated Json for validity 
    if(is_valid_json(response)):
        #print(json.loads(response))
        
        #Convert the JSON string to a Python dictionary
        data_dict = json.loads(response)

        #Create a list and append the dictionary to it
        data_list.append({"file_name": item["file_name"],"data_dict":data_dict}) 
        #data_list.append(data_dict)
            

### write the AI output with other metadata to delta tables for analytics

In [None]:
#convert to spark dataframe
from pyspark.sql.functions import lit, current_date , col

# Convert the list to a DataFrame
df = spark.createDataFrame(data_list)

dfinter = df.withColumn("current_date", lit(current_date()))

df_final = dfinter.select(
    col("data_dict.Specifics").alias("Specifics"),
    col("data_dict.Category").alias("Category"),
    col("data_dict.Maintenance needed").alias("Maintenance_needed"),
    col("data_dict.Priority").alias("Priority"),col("file_name"),col("current_date")
)


In [None]:
#configure vorder for Performance
spark.conf.set("spark.sql.parquet.vorder.enabled", "true")
spark.conf.set("spark.microsoft.delta.optimizeWrite.enabled", "true")
spark.conf.set("spark.microsoft.delta.optimizeWrite.binSize", "1073741824")

df_final.write.mode("overwrite").format("delta").option("overwriteSchema", "true").save("Tables/AI_Maintenance")
df_final.filter((col("Priority") == "Critical") | (col("Priority") == "High")).write.mode("overwrite").format("delta").option("overwriteSchema", "true").save("Tables/AI_Alerts")