Cluster:
- 4 nodes
1. Standard cluster: i3.xlarge, 30.5 GB memory, 4 cores (min/max workers: 2/5). Workers and driver have the same configuration.
2. Databricks runtime version: 10.3 (includes Apache Spark 3.2.1, Scala 2.12)

In [0]:
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.shell import spark

In [0]:
import pyspark
import os
import numpy as np
from io import StringIO
from PIL import Image,ImageOps
import pandas as pd
from pyspark.sql.functions import base64, col
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml.feature import HashingTF, IDF, Tokenizer

Add configuration for accessing S3

In [0]:
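# Credentials are left out of the notebook; reading them from environment
# variables is an assumption, any secret store works equally well.
AWS_ACCESS_KEY_ID = os.environ.get("AWS_ACCESS_KEY_ID", "")
AWS_SECRET_ACCESS_KEY = os.environ.get("AWS_SECRET_ACCESS_KEY", "")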

app_name="Group3"

In [0]:

#os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages "org.apache.hadoop:hadoop-aws:3.3.1" pyspark-shell'
conf = pyspark.SparkConf()
# Keys must not carry trailing spaces; 'spark.driver.memory ' would be a different, unknown property
conf = conf.set('spark.driver.memory', '10g')
conf = conf.set('spark.executor.memory', '10g')
conf = conf.setAppName(app_name)

sc1 = SparkSession.builder.appName('day1')\
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:2.4.0")\
    .config("spark.mongodb.input.uri", "mongodb+srv://root:root@cluster0.eq07a.mongodb.net/Group3.Images")\
    .config("spark.mongodb.output.uri", "mongodb+srv://root:root@cluster0.eq07a.mongodb.net/Group3.Images")\
    .config("spark.network.timeout", "7200s").getOrCreate()

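# Note: 'spark.jars.packages' is a Spark property; setting it on the Hadoop
# configuration does not by itself load the hadoop-aws package.
sc1._jsc.hadoopConfiguration().set('spark.jars.packages', 'org.apache.hadoop:hadoop-aws:3.3.1') 
# On the Hadoop configuration the key is used without the 'spark.hadoop.' prefix
sc1._jsc.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
sc1._jsc.hadoopConfiguration().set('fs.s3a.access.key',
                                  AWS_ACCESS_KEY_ID)
sc1._jsc.hadoopConfiguration().set('fs.s3a.secret.key',
                                  AWS_SECRET_ACCESS_KEY)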
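
The S3A settings can also be gathered in one place. The sketch below is only an illustration, not the notebook's own code: the helper names `s3a_options` and `apply_to_session` are hypothetical, and the credentials are placeholders. It relies on Hadoop properties being set under their bare names (for example `fs.s3a.impl`) when written directly to the Hadoop configuration.

```python
def s3a_options(access_key, secret_key):
    """Return the Hadoop properties needed for S3A access as a plain dict."""
    return {
        "fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
        "fs.s3a.access.key": access_key,
        "fs.s3a.secret.key": secret_key,
    }


def apply_to_session(session, options):
    """Copy the options onto the Hadoop configuration of an existing SparkSession."""
    hadoop_conf = session._jsc.hadoopConfiguration()
    for key, value in options.items():
        hadoop_conf.set(key, value)


# Placeholder credentials for illustration only.
options = s3a_options("EXAMPLE_KEY_ID", "EXAMPLE_SECRET")
print(sorted(options))
```

With a live session, `apply_to_session(sc1, options)` would set all three properties in one call.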


Data Pre-processing:

%md

PreProcessing Algorithm:

Steps:
- Images

1. Reading color images (32K images, 1.92 GB, downloaded from Kaggle) and scraped images (about 5,000 images, 500 MB, from the eBay API) from S3 as an image DataFrame using Spark.
2. Extracting each image together with its height, width, and number of channels (RGB: 3).
3. Reshaping the image data (after conversion from binary to uint8) to its original shape of (height, width, channels).
4. Creating grayscale images by averaging across channels (reshaping images from a 3D to a 2D array).
5. Resizing large (1000 x 1000) images to (300, 300), reshaping them to a 1D array, and converting that to a string for storage in MongoDB.

- Text

1. Load the text CSVs from S3 (Kaggle + scraped from the eBay API) and merge all Spark DataFrames.
2. Filter relevant columns.
3. TF-IDF: tokenize the texts, hash the tokens, and fit an IDF model on the texts.
4. Store the result in MongoDB in the text collection.


Aggregates on MongoDB on two shards: 
- Images Collection
- Text Collection

- Execution Time: <u> <b> 10 mins  </b> </u> (loading from S3 to writing to mongoDB)
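
Image steps 3 to 5 above can be sketched with NumPy alone. This is a minimal illustration under stated assumptions, not the pipeline's actual code: the function name `to_gray_vector` is hypothetical, the input is a synthetic byte buffer rather than an image read from S3, and nearest-neighbour sampling stands in for whichever resize method was actually used.

```python
import numpy as np


def to_gray_vector(raw_bytes, height, width, channels, out_size=(300, 300)):
    """Decode raw pixel bytes, convert to grayscale, resize, and flatten."""
    # Step 3: interpret the bytes as uint8 and restore (height, width, channels).
    img = np.frombuffer(raw_bytes, dtype=np.uint8).reshape(height, width, channels)
    # Step 4: average over the channel axis, giving a 2D (height, width) array.
    gray = img.mean(axis=2)
    # Step 5: nearest-neighbour resize by sampling evenly spaced rows and columns.
    rows = np.arange(out_size[0]) * height // out_size[0]
    cols = np.arange(out_size[1]) * width // out_size[1]
    resized = gray[np.ix_(rows, cols)]
    # Flatten to a 1D vector that can be stored as a single field.
    return resized.reshape(-1)


# Synthetic 4x6 RGB image in place of a real S3 image.
demo = np.arange(4 * 6 * 3, dtype=np.uint8).tobytes()
vec = to_gray_vector(demo, height=4, width=6, channels=3, out_size=(2, 3))
print(vec.shape)
```

The small `out_size` keeps the demo readable; the pipeline description uses (300, 300).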

In [0]:
common_path="s3://group3shopee/trainimages/"

In [0]:
#code to fetch images from S3
img_df = spark.read.format("image").load('s3://group3shopee/trainimages/*.jpg')

In [0]:
## model to convert image to embedding

import torch
import torchvision
# change model name as needed

# torchvision's hub entry point for VGG-11 is named 'vgg11'
model = torch.hub.load('pytorch/vision:v0.10.0', 'vgg11', pretrained=True)

In [0]:
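from torchvision.models.resnet import ResNet, BasicBlock

# Returns the feature maps of the last three stages instead of class scores.
# Instantiate as MyResNet18(BasicBlock, [2, 2, 2, 2]) for the ResNet-18 layout.
class MyResNet18(ResNet):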

  def __init__(self, *args, **kwargs):
    super().__init__(*args, **kwargs)

  def forward(self, xb):
    x = self.maxpool(self.relu(self.bn1(self.conv1(xb))))
    x = self.layer1(x)
    x2 = x = self.layer2(x)
    x3 = x = self.layer3(x)
    x4 = x = self.layer4(x)
    return [x2, x3, x4]

In [0]:
model.eval()

In [0]:
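def mod_proc(img_array):
    # img_array is a Row whose first field is the Spark image struct:
    # (origin, height, width, nChannels, mode, data)
    file_name = img_array[0][0]
    x=img_array[0][1]
    y=img_array[0][2]
    z=img_array[0][3]
    
    img = np.frombuffer(img_array[0][5],dtype=np.uint8).reshape(x,y,z)

    img = Image.fromarray(img)
    # change shapes as per model
    
    img = torch.tensor(np.array(img.resize(size=(224, 224))),dtype=torch.float32)
    # Move channels first: (H, W, C) -> (1, C, H, W). A plain reshape would scramble the pixels.
    img = img.permute(2, 0, 1).unsqueeze(0)
    # Inference only, so gradient tracking is not needed
    with torch.no_grad():
        img = model(img)
    return file_name,str(img.tolist()[0])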
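
The model expects channels-first input, while the decoded image array is laid out as (height, width, channels). The NumPy sketch below uses a made-up 2x2 image to show why the axes have to be transposed rather than reshaped; it is an illustration only and does not touch the model.

```python
import numpy as np

# Tiny 2x2 image with 3 channels, laid out as (height, width, channels).
hwc = np.arange(2 * 2 * 3).reshape(2, 2, 3)

# Transposing moves the channel axis to the front and keeps each pixel intact.
chw = np.transpose(hwc, (2, 0, 1))

# Reshaping only reinterprets the flat buffer, so channels get mixed together.
reshaped = hwc.reshape(3, 2, 2)

print(chw[0].tolist())       # first channel of every pixel
print(reshaped[0].tolist())  # first four raw values, not a channel plane
```

In PyTorch the equivalent of the transpose is `tensor.permute(2, 0, 1)`.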

In [0]:
%%time
proc_rdd = img_df.rdd.map(lambda x:mod_proc(x))

In [0]:
proc_df = proc_rdd.map(lambda x : [x[0].split('/')[-1],x[1]]).toDF()

In [0]:
%%time
schema = StructType([StructField(str(i), StringType(), True) for i in range(2)])

In [0]:
%%time
proc_df = sc1.createDataFrame(proc_rdd, schema)

In [0]:
%%time
proc_df.show(5)

Text Processing (Image Title and Description) to get TFIDF

In [0]:
# Read scraped data part 1
input_train_path='s3://group3shopee/products.csv'
prod_df=spark.read.options(header='True').csv(input_train_path)

In [0]:
# Read scraped data part2
input_train_path='s3://group3shopee/products1.csv'
prod1_df=spark.read.options(header='True').csv(input_train_path)

In [0]:
#Merge both scraped data
merged_scaped_df = prod_df.unionByName(prod1_df)

In [0]:
# Remove advertisement rows from the scraped data
merged_scaped_df=merged_scaped_df.filter(~col("description").contains("Shop on eBay"))

In [0]:
# Rename scraped data columns to match kaggle dataset
merged_scaped_df=merged_scaped_df.withColumnRenamed('title','image').withColumnRenamed('description','title')

In [0]:
# Read kaggle dataset
input_train_path='s3://group3shopee/train.csv'
full_df=spark.read.options(header='True').csv(input_train_path)

In [0]:
labels= np.array(full_df.groupby('label_group').agg(collect_list('image'), count('image').alias('count')).filter('count>5')\
                 .orderBy('count', ascending=False).select('label_group').limit(10).collect()).reshape(-1)

In [0]:
# Create an empty DataFrame with the same schema as the Kaggle data
emp_RDD = spark.sparkContext.emptyRDD()
test_df = spark.createDataFrame(data = emp_RDD,
                             schema = full_df.schema)
# Take up to 5 rows from each of the selected label groups as the test set
for label in labels:
    test_df = test_df.union(full_df.filter(full_df.label_group==label).limit(5))
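
The selection logic of the last two cells (the ten largest label groups with more than five images, five rows from each) can be followed on plain Python data. This is a sketch with toy rows: the function name `pick_holdout` is hypothetical, and it takes the first five rows per group, whereas Spark's `limit(5)` does not guarantee which rows are returned.

```python
from collections import Counter, defaultdict


def pick_holdout(rows, min_count=5, top_groups=10, per_group=5):
    """Return the ids held out for testing.

    rows is a list of (posting_id, label_group) pairs.
    """
    counts = Counter(label for _, label in rows)
    # Keep groups with more than min_count items, largest first.
    eligible = [g for g, n in counts.most_common() if n > min_count][:top_groups]
    taken = defaultdict(list)
    for posting_id, label in rows:
        if label in eligible and len(taken[label]) < per_group:
            taken[label].append(posting_id)
    return [pid for g in eligible for pid in taken[g]]


# Toy data: group "a" has 7 items, group "b" has 6, group "c" has only 2.
rows = [(f"a{i}", "a") for i in range(7)]
rows += [(f"b{i}", "b") for i in range(6)]
rows += [(f"c{i}", "c") for i in range(2)]

holdout = pick_holdout(rows)
holdout_set = set(holdout)
train = [pid for pid, _ in rows if pid not in holdout_set]
print(len(holdout), len(train))
```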

In [0]:
test_posting_ids = list(np.array(test_df.select('posting_id').collect()).reshape(-1))

In [0]:
# Keep only the rows whose posting_id is not part of the test set
df_train = full_df.filter(~full_df.posting_id.isin(test_posting_ids))

In [0]:
# Select the relevant columns (image name and title) from the Kaggle training split
train_df=df_train.select('image','title')

In [0]:
# Add image extension to images name in scraped dataset
merged_scaped_df=merged_scaped_df.select(concat(col('image'),lit('.png')).alias('image'),'title')


In [0]:
#Merge kaggle and scraped data
merged_scaped_df = merged_scaped_df.unionByName(train_df)

In [0]:
# Tokenize the description of each product
tokenizer = Tokenizer(inputCol="title", outputCol="words")
wordsData = tokenizer.transform(merged_scaped_df)
# Generate the TF-IDF feature vectors for the tokens of each description
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
featurizedData = hashingTF.transform(wordsData)
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)
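
To make the tokenize, hash, and rescale sequence concrete, here is a pure-Python sketch of hashed TF-IDF on three made-up titles. It is an approximation for illustration only: the helper names are hypothetical, CRC32 stands in for the MurmurHash3 function that Spark's `HashingTF` uses, and the IDF follows the formula documented for Spark's `IDF`, log((m + 1) / (df + 1)) for m documents.

```python
import math
import zlib
from collections import Counter


def hashed_tf(tokens, num_features):
    """Map tokens to bucket counts using a stable hash (CRC32 as a stand-in)."""
    counts = Counter(zlib.crc32(t.encode("utf-8")) % num_features for t in tokens)
    return dict(counts)


def fit_idf(tf_docs):
    """Compute idf = log((m + 1) / (df + 1)) per bucket over m documents."""
    m = len(tf_docs)
    doc_freq = Counter(bucket for tf in tf_docs for bucket in tf)
    return {b: math.log((m + 1) / (df + 1)) for b, df in doc_freq.items()}


def tfidf(tf, idf):
    """Scale each bucket's term frequency by its inverse document frequency."""
    return {b: count * idf[b] for b, count in tf.items()}


# Lowercasing and splitting on whitespace mirrors what Tokenizer does.
titles = ["red cotton shirt", "blue cotton shirt", "leather wallet"]
tf_docs = [hashed_tf(title.lower().split(), num_features=1024) for title in titles]
idf = fit_idf(tf_docs)
vectors = [tfidf(tf, idf) for tf in tf_docs]
print(vectors[0])
```

A small number of buckets, such as the 20 used above, makes it more likely that distinct words collide in the same feature index.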

In [0]:
rescaledData=rescaledData.drop('rawFeatures','words','title')

Connect to MongoDB
- Store aggregates in the database and re-read

In [0]:
# change collection for different models

database = 'Group3'
collection = 'image_2'
user_name = 'root'
password = 'root'
address = 'cluster0.eq07a.mongodb.net'
connection_string = f"mongodb+srv://{user_name}:{password}@{address}/{database}.{collection}"
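
If the user name or password contains characters such as `@`, `:` or `/`, they need to be percent-encoded before being placed in the URI. The sketch below shows one way to do that with the standard library. The helper name `mongo_uri`, the host, and the password in the example are made up for illustration.

```python
from urllib.parse import quote_plus


def mongo_uri(user, password, host, database, collection):
    """Build a mongodb+srv URI with percent-encoded credentials."""
    return (
        f"mongodb+srv://{quote_plus(user)}:{quote_plus(password)}"
        f"@{host}/{database}.{collection}"
    )


print(mongo_uri("root", "p@ss/word", "cluster0.example.net", "Group3", "image_2"))
```

For credentials without special characters the result is identical to the f-string used above.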

In [0]:
%%time
proc_df.show(5)

In [0]:
%%time
proc_df.write.format("mongo").option("uri",connection_string).mode("append").save()

In [0]:
%%time
scrapped_df = spark.read.format("image").load('s3://group3shopee/images/*.png')

In [0]:
%%time
# Apply the same embedding function used for the Kaggle images
proc_scrap_rdd = scrapped_df.rdd.map(lambda x:mod_proc(x))

In [0]:
%%time
proc_scrap_df = sc1.createDataFrame(proc_scrap_rdd, schema)

In [0]:
%%time
proc_scrap_df.show(5)

In [0]:
%%time
proc_scrap_df.write.format("mongo").option("uri",connection_string).mode("append").save()

In [0]:
# Mongo db configs
database = 'Group3'
collection = 'text'
user_name = 'root'
password = 'root'
address = 'cluster0.eq07a.mongodb.net'
connection_string_text = f"mongodb+srv://{user_name}:{password}@{address}/{database}.{collection}"


In [0]:
conv=udf(lambda x:str(x))
rescaledData=rescaledData.withColumn('features',conv('features'))

In [0]:
%%time
rescaledData.write.format("mongo").option("uri",connection_string_text).mode("overwrite").save()

Final Output from Databricks in MongoDB

In [0]:
%%time
df = spark.read.format("mongo").option("uri",connection_string).load()
df.show(5)
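
Because the embeddings were written as the string form of a Python list, they have to be parsed again before they can be used numerically. A minimal sketch of that round trip follows. The function name `decode_embedding` is hypothetical, and it assumes every stored value is a finite float (`nan` or `inf` would not parse as JSON).

```python
import json


def decode_embedding(text):
    """Parse an embedding stored as the string form of a Python list of floats."""
    values = json.loads(text)
    return [float(v) for v in values]


stored = str([0.25, -1.5, 3.0])  # same encoding as str(list) before writing
print(decode_embedding(stored))
```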

In [0]:
%%time
df = spark.read.format("mongo").option("uri",connection_string_text).load()
df.show(5)

In [0]:
sc1.stop()