## Downloading the required dependencies

In [None]:
!python -m pip install git+https://github.com/huggingface/transformers
!pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu117

## Importing Model From Hugging Face

In [None]:
import openpyxl
import cv2
import os
import torch
from PIL import Image
import requests
from io import BytesIO
import json
from transformers import AutoProcessor, AutoModelForCausalLM, Qwen2VLForConditionalGeneration
from base64 import b64decode
from IPython.display import display, Javascript
from google.colab.output import eval_js
from datetime import datetime, timedelta
import re


# Load the Qwen2 model and processor
model = Qwen2VLForConditionalGeneration.from_pretrained(
    "Qwen/Qwen2-VL-2B-Instruct",
    torch_dtype="auto",
    device_map="auto",
)

processor = AutoProcessor.from_pretrained("Qwen/Qwen2-VL-2B-Instruct")

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


config.json:   0%|          | 0.00/1.20k [00:00<?, ?B/s]

model.safetensors.index.json:   0%|          | 0.00/56.4k [00:00<?, ?B/s]

Downloading shards:   0%|          | 0/2 [00:00<?, ?it/s]

model-00001-of-00002.safetensors:   0%|          | 0.00/3.99G [00:00<?, ?B/s]

model-00002-of-00002.safetensors:   0%|          | 0.00/429M [00:00<?, ?B/s]

`Qwen2VLRotaryEmbedding` can now be fully parameterized by passing the model config through the `config` argument. All other arguments will be removed in v4.46


Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

generation_config.json:   0%|          | 0.00/272 [00:00<?, ?B/s]

preprocessor_config.json:   0%|          | 0.00/347 [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/4.19k [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/2.78M [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/1.67M [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/7.03M [00:00<?, ?B/s]

chat_template.json:   0%|          | 0.00/1.05k [00:00<?, ?B/s]

## Initialise Excel File and Necessary Functions to Prcoess Data

In [None]:
# Define the Excel file path
file_path = "product_analysis.xlsx"

# Initialize workbook and sheets
if os.path.exists(file_path):
    workbook = openpyxl.load_workbook(file_path)
    if "Packaged Products" in workbook.sheetnames:
        packaged_sheet = workbook["Packaged Products"]
    else:
        packaged_sheet = workbook.create_sheet("Packaged Products")
    if "Fresh Produce" in workbook.sheetnames:
        produce_sheet = workbook["Fresh Produce"]
    else:
        produce_sheet = workbook.create_sheet("Fresh Produce")
else:
    workbook = openpyxl.Workbook()
    packaged_sheet = workbook.active
    packaged_sheet.title = "Packaged Products"
    produce_sheet = workbook.create_sheet("Fresh Produce")

    # Add headers for Packaged Products
    packaged_headers = ["Sl No", "Timestamp", "Brand", "Count", "Expiry Date", "Expired", "Expected Life Span (Days)"]
    packaged_sheet.append(packaged_headers)

    # Add headers for Fresh Produce
    produce_headers = ["Sl No", "Timestamp", "Produce Name", "Freshness (1-10)", "Expected Life Span (Days)"]
    produce_sheet.append(produce_headers)

# Initialize row counters
packaged_sl_no = len(packaged_sheet['A']) if len(packaged_sheet['A']) > 1 else 1
produce_sl_no = len(produce_sheet['A']) if len(produce_sheet['A']) > 1 else 1


def normalize_date(date_str):
    """
    Normalize various date formats to 'YYYY-MM-DD'.
    """
    patterns = [
        "%Y-%m-%d", "%d-%m-%Y", "%m/%d/%Y", "%d/%m/%Y",
        "%Y/%m/%d", "%m-%Y", "%m/%d", "%b %Y", "%b-%Y"
    ]
    for pattern in patterns:
        try:
            normalized_date = datetime.strptime(date_str, pattern).strftime("%Y-%m-%d")
            return normalized_date
        except ValueError:
            continue
    return "Invalid Date"

def calculate_expiration_from_manufacturing(mfg_date, best_before_months):
    """
    Calculate the expiry date based on manufacturing date and 'Best Before' duration.
    """
    try:
        mfg_date = datetime.strptime(mfg_date, "%Y-%m-%d")
        expiry_date = mfg_date + timedelta(days=best_before_months * 30)  # Approximate months as 30 days
        return expiry_date.strftime("%Y-%m-%d")
    except ValueError:
        return "Invalid Manufacturing Date"


def calculate_expiration(expiry_date, mfg_date=None, best_before=None):
    """
    Calculate expiration status and expected life span, handling various date scenarios.
    """
    today = datetime.today()
    normalized_expiry_date = normalize_date(expiry_date)

    # Attempt to calculate expiry date from manufacturing details if direct expiry date is invalid
    if normalized_expiry_date == "Invalid Date" and mfg_date and best_before:
        normalized_mfg_date = normalize_date(mfg_date)
        if normalized_mfg_date != "Invalid Date":
            normalized_expiry_date = calculate_expiration_from_manufacturing(
                normalized_mfg_date, int(best_before)
            )

    # If expiry date is still invalid, return default invalid values
    if normalized_expiry_date == "Invalid Date":
        return False, 0, "Invalid Date"

    # Calculate expired and expected life span for valid expiry date
    expiry_date_obj = datetime.strptime(normalized_expiry_date, "%Y-%m-%d")
    expired = expiry_date_obj < today
    expected_life_span = (expiry_date_obj - today).days if not expired else 0
    return expired, expected_life_span, normalized_expiry_date




# Function to append product details to the correct sheet
def append_to_excel(product_details):
    global packaged_sl_no, produce_sl_no
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    if product_details.get("type") == "Packed Product":
        packaged_sl_no += 1
        expiry_date = product_details.get("Expiry Date", "Unknown")
        mfg_date = product_details.get("Manufacturing Date", None)
        best_before = product_details.get("Best Before Months", None)

        expired, expected_life_span, normalized_expiry_date = calculate_expiration(
            expiry_date, mfg_date, best_before
        )

        packaged_sheet.append([
            packaged_sl_no,
            timestamp,
            product_details.get("Brand", "Unknown"),
            product_details.get("Product Count", 0),
            normalized_expiry_date,
            expired,
            expected_life_span
        ])
    elif product_details.get("type") == "Fresh Product":
        produce_sl_no += 1
        produce_sheet.append([
            produce_sl_no,
            timestamp,
            product_details.get("Produce Name", "Unknown"),
            product_details.get("Freshness", "Unknown"),
            product_details.get("Expected Life Span (Days)", "Unknown")
        ])
    else:
        print("Error: Invalid product type")

    # Save the workbook
    workbook.save(file_path)


# Function to process an image
def process_image(image):
    # Resize the image for processing
    image = image.resize((256, 256))

    # Prepare the text prompt
    messages = [
      {
          "role": "user",
          "content": [
              {
                  "type": "image",
                  "image_url": "Captured from webcam"
              },
              {
                  "type": "text",
                  "text": """Analyze this image and return a Python dictionary in JSON format.
                  If it contains packaged products, include:
                  {
                      "type": "Packed Product",
                      "Brand": "string",
                      "Product Count": int,
                      "Expiry Date": "YYYY-MM-DD",  # Handle variations like MM/YY, DD-MM-YYYY, etc. Normalize to 'YYYY-MM-DD'. If only 'Best Before months' is mentioned, calculate expiry date by adding it to the manufacturing date.
                      "Manufacturing Date": "YYYY-MM-DD",  # If present, provide the normalized manufacturing date.
                      "Best Before Months": int  # If 'Best Before months' is mentioned, extract the duration.
                  }

                  If it contains fresh produce, include:
                  {
                      "type": "Fresh Product",
                      "Produce Name": "string",
                      "Freshness": int,  # Based on visual clues (e.g., color, ripeness, bruising, etc.). 1-10 scale.
                      "Expected Life Span (Days)": int  # Estimate based on visual signs of freshness and decay.
                  }
                  """
              }
          ]
      }
    ]


    # Prepare the text prompt for processing
    text_prompt = processor.apply_chat_template(messages, add_generation_prompt=True)

    # Process the image and prompt for model input
    inputs = processor(
        text=[text_prompt],
        images=[image],
        padding=True,
        return_tensors="pt"
    )

    # Move inputs to GPU if available
    inputs = inputs.to("cuda" if torch.cuda.is_available() else "cpu")

    # Generate output from the model
    output_ids = model.generate(**inputs, max_new_tokens=1024)

    # Decode the generated output
    generated_ids = [
        output_ids[len(input_ids):]
        for input_ids, output_ids in zip(inputs.input_ids, output_ids)
    ]
    output_text = processor.batch_decode(
        generated_ids, skip_special_tokens=True, clean_up_tokenization_spaces=True
    )[0]

    print("Model Output:", output_text)

    # Parse the output as a dictionary and append to Excel
    try:
        product_details = eval(output_text)  # Assuming the model returns a Python dictionary as a string
        append_to_excel(product_details)
        print("Product details saved successfully.")
    except Exception as e:
        print(f"Error processing output: {e}")

def take_photo(filename='photo.jpg', quality=0.8):
    js = Javascript('''
      async function takePhoto(quality) {
        const div = document.createElement('div');
        const capture = document.createElement('button');
        capture.textContent = 'Capture';
        div.appendChild(capture);

        const video = document.createElement('video');
        video.style.display = 'block';
        const stream = await navigator.mediaDevices.getUserMedia({video: true});

        document.body.appendChild(div);
        div.appendChild(video);
        video.srcObject = stream;
        await video.play();

        // Resize the output to fit the video element.
        google.colab.output.setIframeHeight(document.documentElement.scrollHeight, true);

        // Wait for Capture to be clicked.
        await new Promise((resolve) => capture.onclick = resolve);

        const canvas = document.createElement('canvas');
        canvas.width = video.videoWidth;
        canvas.height = video.videoHeight;
        canvas.getContext('2d').drawImage(video, 0, 0);
        stream.getVideoTracks()[0].stop();
        div.remove();
        return canvas.toDataURL('image/jpeg', quality);
      }
      ''')
    display(js)
    data = eval_js('takePhoto({})'.format(quality))
    binary = b64decode(data.split(',')[1])
    with open(filename, 'wb') as f:
      f.write(binary)
    return filename

## => Final Function to Run Each Time

In [None]:
from IPython.display import Image as IPImage
try:
  filename = take_photo()
  print('Saved to {}'.format(filename))

  # Show the image which was just taken.
  display(IPImage(filename))
  # Open the image file
  image = Image.open(filename)

  # Process the image
  process_image(image)
except Exception as err:
  # Errors will be thrown if the user does not have a webcam or if they do not
  # grant the page permission to access it.
  print(str(err))