In [None]:
pip install datasets

In [None]:
pip install pyspark

In [None]:
from google.colab import drive
# Mount Google Drive so the input datasets and result folders under /content/drive/MyDrive are reachable.
drive.mount('/content/drive')

In [None]:
import datasets
from datasets import load_dataset, load_from_disk
import numpy as np
import pandas as pd
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, MapType, IntegerType, MapType, LongType, BooleanType, FloatType
from pyspark.sql.functions import col, explode, expr, collect_list, when
from pyspark.sql.functions import pandas_udf, PandasUDFType, col
import requests
from PIL import Image
import numpy as np
import cv2
from io import BytesIO
import json

In [None]:
# Local Spark session that uses every available core (local[*]).
# NOTE(review): in local mode tasks run inside the driver JVM, so the
# spark.executor.* settings below are likely ignored; spark.driver.memory is the
# one that matters -- confirm 50g fits the host (a standard Colab VM has far less).
spark = (
    SparkSession.builder
    .appName("ResourceOptimizedSession")
    .master("local[*]")
    .config("spark.executor.memory", "100g")
    .config("spark.driver.memory", "50g")
    .config("spark.executor.cores", "8")
    .config("spark.sql.shuffle.partitions", "200")
    .getOrCreate()
)

In [None]:
# Return schema of analyze_images: one nullable float per image-quality metric,
# in the order resolution, sharpness, brightness, contrast.
schema = StructType([
    StructField(metric_name, FloatType(), True)
    for metric_name in ("resolution", "sharpness", "brightness", "contrast")
])

# Seconds to wait on each image request. Without a timeout, one unresponsive
# server can stall the Python worker (and its Spark task) indefinitely.
IMAGE_REQUEST_TIMEOUT = 10

# Metrics row emitted when an image cannot be fetched, decoded or analysed.
_NO_METRICS = (None, None, None, None)


def _image_metrics(img):
    """Return (resolution, sharpness, brightness, contrast) as floats for a PIL image.

    resolution is width * height in pixels. The other three metrics are computed
    on the 8-bit grayscale ('L') conversion of the image.
    """
    width, height = img.size
    gray = np.array(img.convert('L'))
    # Sharpness: variance of the Laplacian (more edge detail -> larger value)
    sharpness = cv2.Laplacian(gray, cv2.CV_64F).var()
    # Brightness: mean grayscale pixel value
    brightness = np.mean(gray)
    # Contrast: grayscale range (max - min pixel value)
    contrast = float(gray.max()) - float(gray.min())
    # Plain Python floats so every column converts cleanly to the FloatType schema
    return float(width * height), float(sharpness), float(brightness), contrast


# Define UDF to process each row of the DataFrame
@pandas_udf(schema)
def analyze_images(images_col: pd.Series) -> pd.DataFrame:
    """Compute image-quality metrics for the first image URL of each row.

    Args:
        images_col: pandas Series whose elements are sequences of image URLs
            (in this notebook, the ``all_large_image_urls`` column). Only the
            first URL of each element is downloaded and analysed.

    Returns:
        pandas DataFrame with columns resolution, sharpness, brightness and
        contrast (matching ``schema``), one row per input element. A row is all
        None when the element has no URL, or when downloading, decoding or
        analysing the image fails.
    """
    rows = []
    # One session per Arrow batch reuses HTTP connections across downloads.
    with requests.Session() as session:
        for images in images_col:
            if images is None or len(images) == 0 or images[0] is None:
                rows.append(_NO_METRICS)
                continue
            try:
                response = session.get(images[0], timeout=IMAGE_REQUEST_TIMEOUT)
                # Treat HTTP errors (e.g. 404) as failures instead of trying to decode an error page
                response.raise_for_status()
                with Image.open(BytesIO(response.content)) as img:
                    rows.append(_image_metrics(img))
            except Exception:
                # Best effort: one bad URL or image must not fail the whole Spark task.
                # Catching Exception (not a bare except) lets KeyboardInterrupt/SystemExit through.
                rows.append(_NO_METRICS)

    return pd.DataFrame(rows, columns=["resolution", "sharpness", "brightness", "contrast"])

In [None]:
from pyspark.sql.functions import col, udf, when
from pyspark.sql.types import FloatType, IntegerType, StringType

# Relative weight of each metric in the overall image_quality score (the weights sum to 1.0)
WEIGHTS = {
    'resolution': 0.4,  # 40% weight
    'sharpness': 0.3,   # 30% weight
    'brightness': 0.2,  # 20% weight
    'contrast': 0.1     # 10% weight
}

# Granular classification functions for resolution, sharpness, brightness, and contrast
def classify_by_resolution(resolution):
    """Map an image resolution (width * height, in pixels) to a quality class.

    Returns 0 when ``resolution`` is None (the metric could not be computed),
    matching the other classify_by_* functions. Otherwise returns 2 below one
    megapixel, 6 from one to two megapixels (inclusive), and 10 above that.
    """
    # Without this guard a null metric raises TypeError inside the Spark UDF and fails the job.
    if resolution is None:
        return 0
    if resolution < 1_000_000:
        return 2
    elif resolution <= 2_000_000:
        return 6
    else:
        return 10

def classify_by_sharpness(sharpness):
    """Score Laplacian-variance sharpness.

    Returns 0 if missing (None), 2 below 500, 6 from 500 up to and including 1000,
    and 10 above 1000.
    """
    if sharpness is None:
        return 0
    return 2 if sharpness < 500 else 6 if sharpness <= 1000 else 10

def classify_by_brightness(brightness):
    """Score mean grayscale brightness.

    Returns 0 if missing (None). Returns 2 for very dark (< 50) or very bright
    (> 200) values. Returns 6 for the moderately dark [50, 100] and moderately
    bright [150, 200] bands. Returns 10 for the mid-range strictly between 100
    and 150.
    """
    if brightness is None:
        return 0
    too_extreme = brightness < 50 or brightness > 200
    if too_extreme:
        return 2
    # Past the guard above, brightness lies within [50, 200].
    in_outer_band = brightness <= 100 or brightness >= 150
    return 6 if in_outer_band else 10

def classify_by_contrast(contrast):
    """Score grayscale contrast (max - min pixel value).

    Returns 0 if missing (None), 2 below 50, 6 from 50 up to and including 150,
    and 10 above 150.
    """
    if contrast is not None:
        if contrast < 50:
            return 2
        return 6 if contrast <= 150 else 10
    return 0

In [None]:
def image_analysis(year):
  """Score review-image quality for one year of merged Books reviews and save it as CSV.

  Steps:
    1. Load the dataset saved with ``load_from_disk`` at
       /content/drive/MyDrive/MIT805/Merged/Books_<year> and convert it to Spark.
    2. Keep reviews with at least one image and a positive rating.
    3. Analyse the first image of each review with ``analyze_images``. Metrics
       that could not be computed are replaced with 0.
    4. Score each metric with the ``classify_by_*`` functions, weight the scores
       with ``WEIGHTS`` and normalise the sum into ``image_quality`` (1.0 when
       every metric scores 10).
    5. Write the selected columns as CSV (with header, overwriting previous
       output) to /content/drive/MyDrive/MIT805/Results/Image_Analysis/<year>/.

  Args:
    year: suffix of the ``Books_<year>`` dataset directory to process.

  Uses the notebook globals ``spark``, ``analyze_images``, ``WEIGHTS`` and the
  ``classify_by_*`` functions.
  """
  datasets.logging.set_verbosity_error()
  dataset = load_from_disk("/content/drive/MyDrive/MIT805/Merged/Books_{}".format(year))
  df = dataset.to_pandas()
  # The Spark schema stores categories as one string: join the list, and map missing/empty lists to ''.
  df['categories'] = df['categories'].apply(
      lambda x: ', '.join(x) if x is not None and len(x) > 0 else '').astype(str)

  # Define the schema, specifying the type for the `images` field and other fields.
  schema_review = StructType([
      StructField("rating", FloatType(), True),
      StructField("title", StringType(), True),
      StructField("text", StringType(), True),
      StructField("images", ArrayType(MapType(StringType(), StringType())), True),  # Array of dictionaries
      StructField("asin", StringType(), True),
      StructField("parent_asin", StringType(), True),
      StructField("user_id", StringType(), True),
      StructField("timestamp", LongType(), True),
      StructField("helpful_vote", IntegerType(), True),
      StructField("verified_purchase", BooleanType(), True),
      StructField("book_title", StringType(), True),
      StructField("price", StringType(), True),
      StructField("store", StringType(), True),
      StructField("categories", StringType(), True)
  ])

  # Convert the pandas DataFrame to a PySpark DataFrame
  pyspark_df_reviews = spark.createDataFrame(df, schema=schema_review)

  # Count how many images each review has
  pyspark_df_reviews = pyspark_df_reviews.withColumn('number_images', f.size('images'))

  # Keep reviews that have images and a valid rating. Test "> 0" rather than
  # "!= 0": with Spark's default settings size() returns -1 for a null array.
  pyspark_df_reviews = pyspark_df_reviews.filter("number_images > 0").filter("rating > 0 and rating is not null")

  # URL of the first image. Not written to the output, but rows where it is null
  # are removed by the na.drop() below.
  pyspark_df_reviews = pyspark_df_reviews.withColumn('large_image_url', expr("images[0]['large_image_url']"))

  # Collect every image's 'large_image_url' into one list per review
  pyspark_df_reviews = pyspark_df_reviews.withColumn("all_large_image_urls", expr("transform(images, x -> x['large_image_url'])"))

  # Convert 'timestamp' (epoch milliseconds, hence / 1000) to a yyyy-MM-dd string
  pyspark_df_reviews = pyspark_df_reviews.withColumn("timestamp", f.date_format(f.from_unixtime(col("timestamp") / 1000), "yyyy-MM-dd"))

  # Apply the image-analysis UDF. It yields a struct of resolution/sharpness/brightness/contrast.
  pyspark_df_reviews = pyspark_df_reviews.withColumn("analysis", analyze_images(col("all_large_image_urls")))

  # Replace null metrics (failed downloads or decoding) with 0. The struct must be
  # rebuilt: withColumn("analysis.x", ...) would add a new top-level column literally
  # named "analysis.x" and leave the struct field null. The cast keeps the fields
  # FloatType (a bare 0.0 literal would widen them to double).
  # NOTE(review): with 0-filled metrics a failed image is classified as 2 on every
  # metric (image_quality 0.2), not 0 -- confirm this is intended.
  metric_names = ["resolution", "sharpness", "brightness", "contrast"]
  pyspark_df_reviews = pyspark_df_reviews.withColumn(
      "analysis",
      f.struct(*[
          f.coalesce(col("analysis." + name), f.lit(0).cast(FloatType())).alias(name)
          for name in metric_names
      ]),
  )

  # Drop rows that have a null in ANY top-level column, not only the metrics
  # (e.g. a missing price, store or first-image URL).
  pyspark_df_reviews = pyspark_df_reviews.na.drop()

  # Register the classification functions as UDFs
  classify_resolution_udf = udf(classify_by_resolution, IntegerType())
  classify_sharpness_udf = udf(classify_by_sharpness, IntegerType())
  classify_brightness_udf = udf(classify_by_brightness, IntegerType())
  classify_contrast_udf = udf(classify_by_contrast, IntegerType())

  # Add weighted classification columns to the DataFrame
  df_with_classification = pyspark_df_reviews \
      .withColumn("resolution_score", classify_resolution_udf(col("analysis.resolution")) * WEIGHTS['resolution']) \
      .withColumn("sharpness_score", classify_sharpness_udf(col("analysis.sharpness")) * WEIGHTS['sharpness']) \
      .withColumn("brightness_score", classify_brightness_udf(col("analysis.brightness")) * WEIGHTS['brightness']) \
      .withColumn("contrast_score", classify_contrast_udf(col("analysis.contrast")) * WEIGHTS['contrast'])

  # Total weighted score, normalised by the best achievable score (every class = 10)
  df_with_scores = df_with_classification.withColumn(
      "image_quality",
      f.round((col("resolution_score") + col("sharpness_score") + col("brightness_score") + col("contrast_score")) / (sum(WEIGHTS.values()) * 10), 2)
  )

  # Select the output columns; the "analysis.<metric>" fields appear under their field names
  df_final = df_with_scores.select(
    "title", "rating", "helpful_vote", 'asin', 'parent_asin', 'timestamp', 'verified_purchase', 'book_title', 'price', 'store', 'categories', 'number_images', "analysis.resolution", "analysis.sharpness",
    "analysis.brightness", "analysis.contrast", "image_quality")

  df_final.write.csv("/content/drive/MyDrive/MIT805/Results/Image_Analysis/{}/".format(year), header=True, mode="overwrite")
  print("Year {} is done!".format(year))

In [None]:
# Run the per-year pipeline for every year from 2019 through 2023 (inclusive).
for year in range(2019, 2023 + 1):
  image_analysis(year)