In [1]:
# Set the environment to use standalone spark
import os
import sys

def set_home():
    """Point this process at the Spark / Hadoop / Java install of the home machine.

    Sets the SPARK_HOME, JAVA_HOME and HADOOP_HOME environment variables and
    makes the PySpark and mongo-hadoop Python sources importable through
    ``sys.path``. Calling it again is harmless: a directory that is already
    on ``sys.path`` is not appended a second time.
    """
    # Raw strings keep the Windows backslashes literal; "\s", "\h", "\m" and
    # "\p" are invalid escape sequences in ordinary string literals.
    os.environ['SPARK_HOME'] = r"C:\spark\spark"
    os.environ['JAVA_HOME'] = "C:/Program Files/Java/jre1.8.0_152"
    os.environ['HADOOP_HOME'] = r"c:\spark\hadoop"
    for source_dir in (r"c:\spark\spark\python",
                       r"C:\spark\mongo-hadoop\spark\src\main\python"):
        if source_dir not in sys.path:
            sys.path.append(source_dir)
def set_work():
    """Point this process at the Spark / Hadoop / Java install of the work machine.

    Sets the SPARK_HOME, JAVA_HOME and HADOOP_HOME environment variables and
    makes the PySpark, mongo-hadoop and pyspark-cassandra Python sources
    importable through ``sys.path``. Calling it again is harmless: a directory
    that is already on ``sys.path`` is not appended a second time.
    """
    # Raw strings keep the Windows backslashes literal; "\s", "\h", "\m" and
    # "\p" are invalid escape sequences in ordinary string literals.
    os.environ['SPARK_HOME'] = r"D:\spark\spark"
    os.environ['JAVA_HOME'] = "C:/Program Files/Java/jre1.8.0_151"
    os.environ['HADOOP_HOME'] = r"D:\spark\hadoop"
    for source_dir in (r"D:\spark\spark\python",
                       r"D:\mongo-hadoop\spark\src\main\python",
                       r"D:\pyspark-cassandra\python"):
        if source_dir not in sys.path:
            sys.path.append(source_dir)

In [2]:
# Apply the work-machine paths; call set_home() here instead when running on the home machine.
# This must run before the pyspark / pymongo_spark imports, which rely on the sys.path entries it adds.
set_work()

In [3]:
# Display the value of every expression statement in a cell, not only the last one.
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"

from pyspark import SQLContext
from pyspark.sql import Row
import pandas as pd
import pymongo_spark
# Presumably patches PySpark so that SparkContext gains BSONFileRDD (used further down) -- confirm
# against the pymongo_spark documentation.
pymongo_spark.activate()

from pyspark.sql.functions import rand
from pyspark.sql.functions import monotonically_increasing_id
from skimage.data import imread
import io
import numpy as np
# Wildcard import: supplies StructType, StructField, IntegerType and StringType used by the CSV schema.
from pyspark.sql.types import *

In [4]:
# Keras building blocks for the CNN classifier (built below with the Sequential API); train_df, val_df and class_df are created in later cells.
from keras.models import Sequential
from keras.layers import Conv2D, Dense, MaxPooling2D, Dropout, Flatten
from keras.utils import to_categorical, np_utils
from keras.optimizers import Adam, SGD
from keras.activations import relu, sigmoid
from keras.preprocessing.image import load_img, img_to_array
from keras.regularizers import l2
from keras.constraints import maxnorm

Using CNTK backend


In [5]:
from pyspark import SparkContext

# getOrCreate() hands back the SparkContext already running in this kernel
# (e.g. when the cell is re-executed) or starts a new one. Unlike wrapping
# SparkContext() in a bare ``except``, a genuine start-up failure is raised
# right here instead of surfacing later as a NameError on ``sc``.
sc = SparkContext.getOrCreate()
sqlcontext = SQLContext(sc)

In [6]:
# CSV with one row per category: its numeric id plus three category levels.
file = "C:/Users/kokumars/OneDrive - Microsoft/Kaggle/France-ECommerce/category_names.csv"

# Explicit schema so the id is read as an integer and the levels as strings;
# every column is declared nullable.
customSchema = StructType(
    [
        StructField("category_id", IntegerType(), True),
        StructField("category_level1", StringType(), True),
        StructField("category_level2", StringType(), True),
        StructField("category_level3", StringType(), True),
    ]
)

# Read the file through the CSV data source, treating the first line as a header.
df = (
    sqlcontext.read
    .format('com.databricks.spark.csv')
    .options(header='true')
    .load(file, schema=customSchema)
)

In [7]:
# Map every category_id to a dense class index 0 .. num_classes-1 so it can be one-hot encoded.
# monotonically_increasing_id() only guarantees unique, increasing values -- the partition id is
# encoded in the upper bits, so ids jump between partitions and can exceed num_classes.
# row_number() over a global ordering yields consecutive numbers instead (1-based, hence the -1).
# The un-partitioned window moves all rows to a single partition, which is fine for this small table.
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

class_df = df.select(['category_id']).withColumn(
    "id", row_number().over(Window.orderBy('category_id')) - 1)
num_classes = class_df.count()

In [8]:
# Build lookup tables in both directions: category_id -> class index and class index -> category_id.
classes = class_df.toPandas()

# Columns are addressed by name. Positional access on itertuples() is easy to get wrong because
# position 0 is the pandas index, so category_id sits at 1 and id at 2 (the previous code had
# them swapped, which inverted both dictionaries).
# tolist() converts the numpy scalars to plain Python ints.
category_ids = classes['category_id'].tolist()
category_idxs = classes['id'].tolist()

cat2idx = dict(zip(category_ids, category_idxs))
idx2cat = dict(zip(category_idxs, category_ids))

In [9]:
# BSONFileRDD comes from pymongo_spark; it exposes a BSON file as an RDD.
# `file` is rebound here (it previously held the category CSV path) to the BSON file to read.
file = 'C:/Users/kokumars/OneDrive - Microsoft/Kaggle/France-ECommerce/train_example.bson'
# Alternative input locations, kept for switching machines / data sets:
#file = 'C:/Users/kokumars/Desktop/Kaggle/train.bson'
#file = "D:/Kaggle/France-ECommerce/train.bson"
%time bsonFileRDD = sc.BSONFileRDD(file)
# Uncomment to count how many products the file contains:
# bsonFileRDD.count()

Wall time: 157 ms


In [10]:
# Flatten the product list: one record per image instead of one record per product.
# Each product is keyed by (_id, category_id); flatMapValues then emits one
# (key, image) pair for every entry of the product's 'imgs' list.
flatMapRDD = (
    bsonFileRDD
    .map(lambda product: ((product['_id'], product['category_id']), product['imgs']))
    .flatMapValues(lambda images: images)
)

# Build one Row per image. The category id is translated to its class index through
# cat2idx, and the picture is stored as-is (it is decoded later, when batches are built).
fd = flatMapRDD.map(
    lambda pair: Row(id=pair[0][0], cid=int(cat2idx[pair[0][1]]), img=pair[1]['picture'])
)

In [11]:
# Keep only rows whose product id lies strictly between 0 and 12000,
# mark the result for caching and materialise it by counting the rows.
fd = fd.filter(lambda row: row['id'] > 0 and row['id'] < 12000)
fd.persist()
count = fd.count()

PythonRDD[24] at RDD at PythonRDD.scala:48

#This is an example to understand how the flattening is done for this problem.
#NOTE: the last assignment below rebinds `fd`. If this snippet is executed, re-run the
#cells that build and filter `fd` before creating train_df / val_df.
#Toy records: each one carries an id, a category id and a list of image dicts.
data = [{'id':0, 'cid':10879, 'img':[{'pic':'picture'}, {'pic': 'picture1'}]},
       {'id':1, 'cid':10872, 'img':[{'pic':'picture'}, {'pic': 'picture1'}]}]
rdd = sc.parallelize(data)

#Key every record by (cid, id), then emit one (key, image dict) pair per image.
d = rdd.map(lambda x: ((x['cid'], x['id']), x['img'])).flatMapValues(lambda x : x)
#fd = d.map()
d.collect()
#Turn each (key, image dict) pair into a Row holding a single picture.
fd = d.map(lambda x: Row(id=x[0][1], cid=x[0][0], img=x[1]['pic']))
fd.collect()

In [12]:
%time train_df, val_df = fd.toDF().randomSplit([0.7, 0.3], seed=99)
train_df = train_df.withColumn("id", monotonically_increasing_id())
val_df = val_df.withColumn("id", monotonically_increasing_id())
#train_rdd, val_rdd = fd.randomSplit([0.7, 0.3], seed=99)

Wall time: 1.57 s


In [2]:
# Decode one sample image to discover the input dimensions.
# img_to_array returns a (height, width, channels) array when Keras uses its default
# channels_last image_data_format (assumed here), so height must be unpacked first;
# the model's input_shape=(height, width, depth) relies on these names being correct.
height, width, depth = img_to_array(load_img(io.BytesIO(train_df.take(1)[0]['img']))).shape
width, height, depth

In [None]:
# Create the model
model = Sequential()

# Three convolutional stages with 32, 64 and 128 filters. Each stage is
# conv -> dropout -> conv -> 2x2 max-pooling; only the very first convolution
# declares the input shape.
for stage, filters in enumerate((32, 64, 128)):
    first_layer_kwargs = {'input_shape': (height, width, depth)} if stage == 0 else {}
    model.add(Conv2D(filters, (3, 3), activation='relu', padding='same', **first_layer_kwargs))
    model.add(Dropout(0.2))
    model.add(Conv2D(filters, (3, 3), activation='relu', padding='same'))
    model.add(MaxPooling2D(pool_size=(2, 2)))

# Classifier head: flatten, then two max-norm constrained dense layers, with
# dropout before each dense layer, and a softmax over all classes.
model.add(Flatten())
model.add(Dropout(0.2))
for units in (1024, 512):
    model.add(Dense(units, activation='relu', kernel_constraint=maxnorm(3)))
    model.add(Dropout(0.2))
model.add(Dense(num_classes, activation='softmax'))

In [None]:
# Compile the model
# Hyper-parameters: the decay handed to SGD is the learning rate divided by the epoch count.
epochs, lrate = 10, 0.01
decay = lrate / epochs

# Plain (non-Nesterov) SGD with momentum; accuracy is tracked alongside the cross-entropy loss.
sgd = SGD(lr=lrate, decay=decay, momentum=0.9, nesterov=False)
model.compile(optimizer=sgd, loss='categorical_crossentropy', metrics=['accuracy'])
# Uncomment to inspect the layer stack:
#print(model.summary())

In [None]:
#te = val_df.filter((0 <= val_df.id) & (val_df.id < 100)).select(['cid', 'img']).toPandas()
#dt = te['img']
#t = val_df.repartition(10, "id", "cid")
#t.show()
#t = bsonFileRDD.repartition(100)
#t.persist()
#t.count()
#fd.take(1)

In [None]:
from keras.utils import Sequence
import math
class classSeq(Sequence):
    """Keras ``Sequence`` that serves (images, one-hot labels) batches from a Spark DataFrame.

    Batch ``idx`` consists of the rows whose ``id`` lies in
    ``[idx * batch_size, (idx + 1) * batch_size)``, so the DataFrame is expected
    to carry a gap-free, 0-based ``id`` column next to ``cid`` (class index) and
    ``img`` (encoded image bytes).

    Batches are addressed purely by ``idx``. No cursor is shared between calls,
    so the sequence can be read in any order and from several worker threads.
    """

    def __init__(self, sqlc, df, batch_size, count):
        """Store the data source and batching parameters.

        Args:
            sqlc: SQL context the DataFrame belongs to (stored, not otherwise used here).
            df: Spark DataFrame with ``id``, ``cid`` and ``img`` columns.
            batch_size: number of rows per batch.
            count: total number of rows in ``df``; determines the number of batches.
        """
        self.sql_context = sqlc
        self.df = df
        self.count = count
        self.batch_size = batch_size
        # Kept so existing code reading these attributes keeps working; they no
        # longer drive batch selection (see __getitem__).
        self.iter = 0
        self.start_idx = 0
        self.end_idx = batch_size
        self.len = self.__len__()

    def __len__(self):
        """Return the number of batches per epoch (the last one may be partial)."""
        # float() guards against integer division; int() because Keras expects an integer.
        return int(math.ceil(self.count / float(self.batch_size)))

    def on_epoch_end(self):
        """Nothing to reset between epochs: batches are computed from their index."""
        self.iter = 0
        self.start_idx = 0
        self.end_idx = self.batch_size

    def __getitem__(self, idx):
        """Return batch number ``idx`` as ``(x_train, y_train)``.

        Args:
            idx: batch position, ``0 <= idx < len(self)``.

        Returns:
            Tuple of a float32 image array scaled to [0, 1] and the matching
            one-hot label matrix with ``num_classes`` columns.

        Raises:
            IndexError: if ``idx`` is outside the valid batch range.
        """
        if not 0 <= idx < len(self):
            raise IndexError("batch index %d out of range [0, %d)" % (idx, len(self)))

        # Derive the id window from the requested index, so the result does not
        # depend on how many batches were requested before or in which order.
        start_idx = idx * self.batch_size
        end_idx = start_idx + self.batch_size
        fdf = (
            self.df
            .filter((self.df.id >= start_idx) & (self.df.id < end_idx))
            .select(['cid', 'img'])
            .toPandas()
        )

        y_train = to_categorical(fdf['cid'], num_classes=num_classes)
        # Decode each stored picture and scale the pixel values to [0, 1].
        x_train = np.array([img_to_array(load_img(io.BytesIO(x))).astype('float32')/255.0 for x in fdf['img']])
        return (x_train, y_train)

In [None]:
# randomSplit only approximates the requested 70/30 fractions, so the split sizes are
# counted rather than estimated as count * 0.7 / count * 0.3. An over-estimate would make
# the sequences ask for batches that contain no rows, an under-estimate would silently
# skip rows. Each count() is a Spark job; hard-code the numbers only for a frozen data set.
train_count = train_df.count()
val_count = val_df.count()

# Batch sizes: 1000 rows per training batch, 700 per validation batch.
train_seq = classSeq(sqlcontext, train_df, 1000, train_count)
val_seq = classSeq(sqlcontext, val_df, 700, val_count)

In [None]:
# Fit the model
%time scores = model.fit_generator(train_seq, steps_per_epoch = len(train_seq), max_queue_size=100, workers = 10, use_multiprocessing=False,epochs=2)
#%time model.fit(q['_1'], q['_2'], epochs=epochs, batch_size=64)


In [None]:
# Evaluate on the whole validation sequence using 2 worker threads (no multiprocessing).
loss = model.evaluate_generator(val_seq, steps=len(val_seq), workers=2, use_multiprocessing=False, max_queue_size=100)

In [None]:
# Show the labels of the values returned by evaluate_generator (stored in `loss`).
model.metrics_names

In [None]:
# Final evaluation of the model
#%time scores = model.evaluate_generator(val_generator(), steps=2)
# Number of validation batches. `seq` was never defined in this notebook;
# val_seq is the sequence that was evaluated.
len(val_seq)

In [None]:
#model.get_weights()
# Second value returned by evaluate_generator; presumably the accuracy requested via
# metrics=['accuracy'] at compile time (index 0 being the loss) -- confirm with model.metrics_names.
loss[1]