<a href="https://colab.research.google.com/github/nauman-akram/Data_Analytics/blob/master/Working%20with%20pySpark/fashion_retail_project.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

*connection guide taken from ref:* [github link](https://github.com/gahogg/YouTube-I-mostly-use-colab-now-/blob/master/PySpark_DataFrame_SQL_Basics.ipynb)


In [None]:

!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

In [None]:

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

In [None]:

import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True) # Property used to format output tables better
spark

In [None]:
spark = SparkSession.builder \
    .appName('pyspark-shell') \
    .master('local[*]') \
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12-3.0.2") \
    .config("spark.mongodb.input.uri", 'mongodb+srv://your-collection-string') \
    .config("spark.mongodb.output.uri", 'mongodb+srv://your-collection-string') \
    .getOrCreate()



Connecting mongoDB through python

In [None]:
import pymongo

myclient = pymongo.MongoClient("mongodb+srv://your-collection-string/test")
mydb = myclient["bigdata"]
mycol = mydb["brand_details"]

In [None]:
import pandas as pd
import pprint
from bson.son import SON

In [None]:
brand_details = pd.DataFrame(list(mycol.find()))

Analysis with mongoDB queries 


In [None]:
#show some of data
i=0
for x in mycol.find():
    i += 1
    if i<=3:
        pprint.pprint( x)
    else:
        break

{'_id': ObjectId('63b4195c4ad68288f7b87e56'),
 'brand_id': 32,
 'brand_name': 'Adiva'}
{'_id': ObjectId('63b4195c4ad68288f7b87e3a'),
 'brand_id': 4,
 'brand_name': '250 Designs'}
{'_id': ObjectId('63b4195c4ad68288f7b87e7c'),
 'brand_id': 70,
 'brand_name': 'Angel & Rocket'}


there is brand_id, and brand_name associated with the it


In [None]:
brand_details.head(3)

Unnamed: 0,_id,brand_id,brand_name
0,63b4195c4ad68288f7b87e56,32,Adiva
1,63b4195c4ad68288f7b87e3a,4,250 Designs
2,63b4195c4ad68288f7b87e7c,70,Angel & Rocket


In [None]:
print('total no of brands in dataset:', mycol.estimated_document_count())

total no of brands in dataset: 1020


In [None]:
fashion_col = mydb["fashion"]

Analysis with mongoDB queries 


In [None]:
#show some of data from fashion collection
i=0
for x in fashion_col.find():
    i += 1
    if i<=3:
        pprint.pprint( x)
    else:
        break

{'_id': ObjectId('63b1961df26165df1760b2f9'),
 'avg_rating': 4.548826646,
 'brand': 'Dupatta Bazaar',
 'colour': 'White',
 'description': 'White embroidered&nbsp;dupattaChiffon<br>Hand-wash '
                'coldLength: 2.5 metres Width: 1 metre',
 'name': 'Dupatta Bazaar White Embroidered Chiffon Dupatta',
 'p_attributes': "{'Occasion': 'Daily', 'Pattern': 'Embroidered', 'Print or "
                 "Pattern Type': 'Floral'}",
 'p_id': 1518329,
 'price': 899,
 'ratingCount': 1321}
{'_id': ObjectId('63b1961df26165df1760b30e'),
 'avg_rating': 4.652173913,
 'brand': 'WEAVERS VILLA',
 'colour': 'Pink',
 'description': 'Pink and mustard yellow floral embroidered shawl, has solid '
                'border with tasselsWool and acrylic blend<br>Dry CleanLength: '
                '2 m<br>Width: 75 cm',
 'name': 'WEAVERS VILLA Women Pink & Mustard Yellow Floral Embroidered Shawl',
 'p_attributes': "{'Border': 'Solid', 'Fabric': 'Wool', 'Fabric 2': 'Acrylic', "
                 "'Fabric Purity'

In [None]:
#total
print('total no of documents(rows) for fashion dataset:', fashion_col.estimated_document_count())

total no of documents(rows) for fashion dataset: 14329


In [None]:
#size in terms of memory in KBs and MBs

print('Size in KBs:', mydb.command("collstats","fashion")["storageSize"]/1024.0 )
print('Size in MBs:', (mydb.command("collstats","fashion")["storageSize"]/(1024.0*1024.0)))


Size in KBs: 5788.0
Size in MBs: 5.65234375


In [None]:
#Unique 

print('total unique brands in dataset:', len(fashion_col.distinct('brand')))

total unique brands in dataset: 1020


Which are same as we got from brand_details total collection


In [None]:
#top five brands with their no. of occurance

query = fashion_col.aggregate([
    { "$group":{"_id": "$brand", "count":{"$sum":1}}  },
    {  "$sort": {"count":-1}   },
    {   "$limit" : 5} 
    ])

for i in query:
    pprint.pprint(i)

{'_id': 'Roadster', 'count': 346}
{'_id': 'Tokyo Talkies', 'count': 287}
{'_id': 'MANGO', 'count': 264}
{'_id': 'SASSAFRAS', 'count': 246}
{'_id': 'Clora Creation', 'count': 236}


In [None]:
#Least five brands with their no. of occurance

query = fashion_col.aggregate([
    { "$group":{"_id": "$brand", "count":{"$sum":1}}  },
    {  "$sort": {"count":1}   },
    {   "$limit" : 5} 
    ])

for i in query:
    pprint.pprint(i)

{'_id': 'Lovista', 'count': 1}
{'_id': 'Mine4Nine', 'count': 1}
{'_id': 'RENTIYO', 'count': 1}
{'_id': 'Nauti Nati', 'count': 1}
{'_id': 'NAVIYATA', 'count': 1}


In [None]:
#all the brands with 1 occurance in data set

query2 = fashion_col.aggregate([
    { "$group":{"_id": "$brand", "count":{"$sum":1}}  },
    {  "$sort": {"count":1}   },
    ])

single_occ_brands = [dict["_id"] for dict in query2 if dict["count"] == 1]
print ("total brands appearing one time", len(single_occ_brands))
pprint.pprint( single_occ_brands)



total brands appearing one time 211
['Mine4Nine',
 'NAVIYATA',
 'Lovista',
 'Nauti Nati',
 'RENTIYO',
 'One of a Kind',
 'BURGER BAE',
 'CUSHYBEE',
 'Pistaa',
 'Cub McPaws',
 'SuperBottoms',
 'shiloh',
 'Indi INSIDE',
 'ZIKARAA',
 'SAAKAA',
 'Swasti',
 'Dexter by Kook N Keech',
 'NOT YET by us',
 'PrettyPlus by Desinoor.com',
 'Nivah Fashion',
 'Kanvin',
 'tantkatha',
 'JUNEBERRY',
 'IUGA',
 'Reebok Classic',
 'am ma',
 'taffykids',
 'Fashion FRICKS',
 'Virah Fashion',
 'London Rag',
 'Juniper Plus',
 'Chipbeys',
 'Dollar Missy',
 'MISSY',
 'Knitstudio',
 'Havida Sarees',
 'One Friday',
 'Tuna London',
 'WILORI',
 'HSR',
 'FabFairy',
 'Flenzy',
 'SAPTRANGI',
 'An Episode',
 'gvs shoppe',
 'JBN Creation',
 'Angel & Rocket',
 'FRENCH FLEXIOUS',
 'VERO MODA CURVE',
 'Fabcartz',
 'KBZ',
 'Ashtag',
 'KANNAHI',
 'BonOrganik',
 'ANAISA',
 'Remanika',
 'Frempy',
 'One Femme',
 'Juniors by Lifestyle',
 'WEAVETECH IMPEX',
 'ADBUCKS',
 'Shree',
 'Crozo By Cantabil',
 "Cloth's Villa",
 'Plutus',
 

In [None]:
query_agg = fashion_col.aggregate([{
  "$group": {
    "_id": "$brand",
    "count": {
      "$sum": 1
    }
  }
},{
  "$match": {
    "$expr": {
      "$eq": [
        "$count",
        1
      ]
    }
  }
},{
  "$project": {
    "brand": "$_id",
    "count": 1,
    "_id": 0
  }
}])

In [None]:
count=0
for i in query_agg:
    count += 1 
    pprint.pprint(i)

print("total brands appearing one time", count)

{'brand': 'SAAKAA', 'count': 1}
{'brand': 'ZIKARAA', 'count': 1}
{'brand': 'Swasti', 'count': 1}
{'brand': 'SuperBottoms', 'count': 1}
{'brand': 'Pistaa', 'count': 1}
{'brand': 'shiloh', 'count': 1}
{'brand': 'Cub McPaws', 'count': 1}
{'brand': 'Indi INSIDE', 'count': 1}
{'brand': 'PrettyPlus by Desinoor.com', 'count': 1}
{'brand': 'Nivah Fashion', 'count': 1}
{'brand': 'Dexter by Kook N Keech', 'count': 1}
{'brand': 'NOT YET by us', 'count': 1}
{'brand': 'Nauti Nati', 'count': 1}
{'brand': 'Mine4Nine', 'count': 1}
{'brand': 'NAVIYATA', 'count': 1}
{'brand': 'BURGER BAE', 'count': 1}
{'brand': 'Lovista', 'count': 1}
{'brand': 'CUSHYBEE', 'count': 1}
{'brand': 'One of a Kind', 'count': 1}
{'brand': 'RENTIYO', 'count': 1}
{'brand': 'Reebok Classic', 'count': 1}
{'brand': 'Fashion FRICKS', 'count': 1}
{'brand': 'am ma', 'count': 1}
{'brand': 'taffykids', 'count': 1}
{'brand': 'tantkatha', 'count': 1}
{'brand': 'Kanvin', 'count': 1}
{'brand': 'JUNEBERRY', 'count': 1}
{'brand': 'IUGA', 'cou

In [None]:
print("total brands appearing one time", count)


total brands appearing one time 211


In [None]:
#top 5 highest rating brands
query = fashion_col.aggregatequery = fashion_col.aggregate([
    { "$group":{"_id": "$brand", "maxRating": {"$max":"$avg_rating"}}},
    {  "$sort": {"maxRating":-1}   },
    {   "$limit" : 5} 
    ])

for i in query:
    pprint.pprint(i)


{'_id': 'Shae by SASSAFRAS', 'maxRating': 5}
{'_id': 'Silk Land', 'maxRating': 5}
{'_id': 'People', 'maxRating': 5}
{'_id': 'GRACIT', 'maxRating': 5}
{'_id': 'Miaz Lifestyle', 'maxRating': 5}


In [None]:
fashion_df = pd.DataFrame(list(fashion_col.find()))

In [None]:
fashion_df.head(2)

Unnamed: 0,_id,p_id,name,price,colour,brand,ratingCount,avg_rating,description,p_attributes
0,63b1961df26165df1760b2f9,1518329.0,Dupatta Bazaar White Embroidered Chiffon Dupatta,899.0,White,Dupatta Bazaar,1321.0,4.548827,White embroidered&nbsp;dupattaChiffon<br>Hand-...,"{'Occasion': 'Daily', 'Pattern': 'Embroidered'..."
1,63b1961df26165df1760b30e,12159424.0,WEAVERS VILLA Women Pink & Mustard Yellow Flor...,1399.0,Pink,WEAVERS VILLA,23.0,4.652174,Pink and mustard yellow floral embroidered sha...,"{'Border': 'Solid', 'Fabric': 'Wool', 'Fabric ..."


In [None]:
fashion_df.shape

(14329, 10)

In [None]:
brand_details.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1020 entries, 0 to 1019
Data columns (total 3 columns):
 #   Column      Non-Null Count  Dtype 
---  ------      --------------  ----- 
 0   _id         1020 non-null   object
 1   brand_id    1020 non-null   int64 
 2   brand_name  1020 non-null   object
dtypes: int64(1), object(2)
memory usage: 24.0+ KB


In [None]:
fashion_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 14329 entries, 0 to 14328
Data columns (total 10 columns):
 #   Column        Non-Null Count  Dtype  
---  ------        --------------  -----  
 0   _id           14329 non-null  object 
 1   p_id          14311 non-null  float64
 2   name          14310 non-null  object 
 3   price         14310 non-null  float64
 4   colour        14310 non-null  object 
 5   brand         14305 non-null  object 
 6   ratingCount   6581 non-null   float64
 7   avg_rating    6581 non-null   float64
 8   description   14310 non-null  object 
 9   p_attributes  14310 non-null  object 
dtypes: float64(4), object(6)
memory usage: 1.1+ MB


#merge both dataframes

In [None]:
#Using inner merge to get common data in join on the basis of comminality of brand name present in dataset 

In [None]:
dataset = fashion_df.merge(brand_details, how='inner', left_on='brand', right_on='brand_name')

In [None]:
dataset.shape

(8098, 13)

In [None]:
dataset.head()

Unnamed: 0,_id_x,p_id,name,price,colour,brand,ratingCount,avg_rating,description,p_attributes,_id_y,brand_id,brand_name
0,63b1961df26165df1760b2f9,1518329.0,Dupatta Bazaar White Embroidered Chiffon Dupatta,899.0,White,Dupatta Bazaar,1321.0,4.548827,White embroidered&nbsp;dupattaChiffon<br>Hand-...,"{'Occasion': 'Daily', 'Pattern': 'Embroidered'...",63b4195c4ad68288f7b87f28,242,Dupatta Bazaar
1,63b1961df26165df1760b38b,14964708.0,Dupatta Bazaar Orange & Green Dyed Art Silk Ba...,899.0,Orange,Dupatta Bazaar,30.0,4.366667,Orange and green bandhani dyed dupatta has ban...,"{'Border': 'Woven Design', 'Fabric': 'Art Silk...",63b4195c4ad68288f7b87f28,242,Dupatta Bazaar
2,63b1961df26165df1760b3ca,13552234.0,Dupatta Bazaar Black Solid Dupatta,599.0,Black,Dupatta Bazaar,232.0,4.547414,Black solid Dupatta and has a solid borderMate...,"{'Border': 'Solid', 'Fabric': 'Poly Chiffon', ...",63b4195c4ad68288f7b87f28,242,Dupatta Bazaar
3,63b1961df26165df1760b33f,10711448.0,Dupatta Bazaar Women White Solid Dupatta,599.0,White,Dupatta Bazaar,1531.0,4.536251,White solid dupatta and has a taping borderPol...,"{'Border': 'Taping', 'Fabric': 'Poly Chiffon',...",63b4195c4ad68288f7b87f28,242,Dupatta Bazaar
4,63b1961df26165df1760b461,12866574.0,Dupatta Bazaar Maroon Solid Dupatta,699.0,Maroon,Dupatta Bazaar,118.0,4.245763,Maroon solid Dupatta and has a fringed borderM...,"{'Border': 'Fringed', 'Fabric': 'Poly Chiffon'...",63b4195c4ad68288f7b87f28,242,Dupatta Bazaar


just a quick check on brand 'SASSAFRAS' because it doesn't show up in the brand detail but its present in our fashion dataset




In [None]:
brand_details.brand_name.isin(['SASSAFRAS']).any()

False

In [None]:
# sparkDF=spark.createDataFrame(dataset) 
# sparkDF.printSchema()
# sparkDF.show()

dropping mongodb ids (bson objects) because these are insignificant in any analysis and spark doesnot support its data type

In [None]:
new_df = dataset.drop(axis=1, columns= ["_id_x", '_id_y'])

In [None]:
new_df[['name', 'colour', 'brand', 'description', 'p_attributes', 'brand_name']] = new_df[['name', 'colour', 'brand', 'description', 'p_attributes', 'brand_name']].astype(str)

In [None]:
new_df.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 8098 entries, 0 to 8097
Data columns (total 11 columns):
 #   Column        Non-Null Count  Dtype  
---  ------        --------------  -----  
 0   p_id          8098 non-null   float64
 1   name          8098 non-null   object 
 2   price         8098 non-null   float64
 3   colour        8098 non-null   object 
 4   brand         8098 non-null   object 
 5   ratingCount   3959 non-null   float64
 6   avg_rating    3959 non-null   float64
 7   description   8098 non-null   object 
 8   p_attributes  8098 non-null   object 
 9   brand_id      8098 non-null   int64  
 10  brand_name    8098 non-null   object 
dtypes: float64(4), int64(1), object(6)
memory usage: 759.2+ KB


In [None]:
new_df.head(2)

Unnamed: 0,p_id,name,price,colour,brand,ratingCount,avg_rating,description,p_attributes,brand_id,brand_name
0,1518329.0,Dupatta Bazaar White Embroidered Chiffon Dupatta,899.0,White,Dupatta Bazaar,1321.0,4.548827,White embroidered&nbsp;dupattaChiffon<br>Hand-...,"{'Occasion': 'Daily', 'Pattern': 'Embroidered'...",242,Dupatta Bazaar
1,14964708.0,Dupatta Bazaar Orange & Green Dyed Art Silk Ba...,899.0,Orange,Dupatta Bazaar,30.0,4.366667,Orange and green bandhani dyed dupatta has ban...,"{'Border': 'Woven Design', 'Fabric': 'Art Silk...",242,Dupatta Bazaar


In [None]:
sparkDF=spark.createDataFrame(new_df) 
sparkDF.printSchema()


root
 |-- p_id: double (nullable = true)
 |-- name: string (nullable = true)
 |-- price: double (nullable = true)
 |-- colour: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- ratingCount: double (nullable = true)
 |-- avg_rating: double (nullable = true)
 |-- description: string (nullable = true)
 |-- p_attributes: string (nullable = true)
 |-- brand_id: long (nullable = true)
 |-- brand_name: string (nullable = true)



In [None]:
sparkDF.show(5)

+-----------+--------------------+-----+------+--------------+-----------+-----------+--------------------+--------------------+--------+--------------+
|       p_id|                name|price|colour|         brand|ratingCount| avg_rating|         description|        p_attributes|brand_id|    brand_name|
+-----------+--------------------+-----+------+--------------+-----------+-----------+--------------------+--------------------+--------+--------------+
|  1518329.0|Dupatta Bazaar Wh...|899.0| White|Dupatta Bazaar|     1321.0|4.548826646|White embroidered...|{'Occasion': 'Dai...|     242|Dupatta Bazaar|
|1.4964708E7|Dupatta Bazaar Or...|899.0|Orange|Dupatta Bazaar|       30.0|4.366666667|Orange and green ...|{'Border': 'Woven...|     242|Dupatta Bazaar|
|1.3552234E7|Dupatta Bazaar Bl...|599.0| Black|Dupatta Bazaar|      232.0|4.547413793|Black solid Dupat...|{'Border': 'Solid...|     242|Dupatta Bazaar|
|1.0711448E7|Dupatta Bazaar Wo...|599.0| White|Dupatta Bazaar|     1531.0|4.536250

In [None]:
sparkDF = sparkDF.drop("p_id", 'description', 'p_attributes')

In [None]:
sparkDF.show(2)

+--------------------+-----+------+--------------+-----------+-----------+--------+--------------+
|                name|price|colour|         brand|ratingCount| avg_rating|brand_id|    brand_name|
+--------------------+-----+------+--------------+-----------+-----------+--------+--------------+
|Dupatta Bazaar Wh...|899.0| White|Dupatta Bazaar|     1321.0|4.548826646|     242|Dupatta Bazaar|
|Dupatta Bazaar Or...|899.0|Orange|Dupatta Bazaar|       30.0|4.366666667|     242|Dupatta Bazaar|
+--------------------+-----+------+--------------+-----------+-----------+--------+--------------+
only showing top 2 rows



In [None]:
sparkDF.dtypes

[('name', 'string'),
 ('price', 'double'),
 ('colour', 'string'),
 ('brand', 'string'),
 ('ratingCount', 'double'),
 ('avg_rating', 'double'),
 ('brand_id', 'bigint'),
 ('brand_name', 'string')]

In [None]:
from pyspark.sql.functions import isnan, when, count, col

In [None]:
sparkDF.createOrReplaceTempView("dataset")

In [None]:
from pyspark.sql.functions import countDistinct

sparkDF.select(countDistinct("brand"))

count(DISTINCT brand)
500


In [None]:
sparkDF.select(countDistinct("brand_name"))

count(DISTINCT brand_name)
500


In [None]:
sparkDF.distinct().count()

8005

In [None]:
from pyspark.sql.functions import isnan, when, count, col

Data Cleaning finding null values and dealing with them

In [None]:
sparkDF.filter( isnan(sparkDF.price)).show()

+----+-----+------+-----+-----------+----------+--------+----------+
|name|price|colour|brand|ratingCount|avg_rating|brand_id|brand_name|
+----+-----+------+-----+-----------+----------+--------+----------+
+----+-----+------+-----+-----------+----------+--------+----------+



this shows no null values in price column, which will be used as label/target variable

In [None]:
sparkDF.filter( isnan(sparkDF.ratingCount)).count()

4139

In [None]:
sparkDF.filter( isnan(sparkDF.ratingCount)).show(5)

+--------------------+------+------+--------------+-----------+----------+--------+--------------+
|                name| price|colour|         brand|ratingCount|avg_rating|brand_id|    brand_name|
+--------------------+------+------+--------------+-----------+----------+--------+--------------+
|Dupatta Bazaar Wo...| 599.0|Orange|Dupatta Bazaar|        NaN|       NaN|     242|Dupatta Bazaar|
|Dupatta Bazaar Gr...| 999.0| Green|Dupatta Bazaar|        NaN|       NaN|     242|Dupatta Bazaar|
|Dupatta Bazaar Wo...|1399.0|  Pink|Dupatta Bazaar|        NaN|       NaN|     242|Dupatta Bazaar|
|Dupatta Bazaar Ye...|6999.0|Yellow|Dupatta Bazaar|        NaN|       NaN|     242|Dupatta Bazaar|
|Dupatta Bazaar Ma...| 599.0|Maroon|Dupatta Bazaar|        NaN|       NaN|     242|Dupatta Bazaar|
+--------------------+------+------+--------------+-----------+----------+--------+--------------+
only showing top 5 rows



In [None]:
sparkDF.filter(isnan(sparkDF.avg_rating)).count()

4139

In [None]:
sparkDF.filter(isnan(sparkDF.avg_rating)).show(5)

+--------------------+------+------+--------------+-----------+----------+--------+--------------+
|                name| price|colour|         brand|ratingCount|avg_rating|brand_id|    brand_name|
+--------------------+------+------+--------------+-----------+----------+--------+--------------+
|Dupatta Bazaar Wo...| 599.0|Orange|Dupatta Bazaar|        NaN|       NaN|     242|Dupatta Bazaar|
|Dupatta Bazaar Gr...| 999.0| Green|Dupatta Bazaar|        NaN|       NaN|     242|Dupatta Bazaar|
|Dupatta Bazaar Wo...|1399.0|  Pink|Dupatta Bazaar|        NaN|       NaN|     242|Dupatta Bazaar|
|Dupatta Bazaar Ye...|6999.0|Yellow|Dupatta Bazaar|        NaN|       NaN|     242|Dupatta Bazaar|
|Dupatta Bazaar Ma...| 599.0|Maroon|Dupatta Bazaar|        NaN|       NaN|     242|Dupatta Bazaar|
+--------------------+------+------+--------------+-----------+----------+--------+--------------+
only showing top 5 rows



In [None]:
sparkDF.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in sparkDF.columns]).show()

+----+-----+------+-----+-----------+----------+--------+----------+
|name|price|colour|brand|ratingCount|avg_rating|brand_id|brand_name|
+----+-----+------+-----+-----------+----------+--------+----------+
|   0|    0|     0|    0|       4139|      4139|       0|         0|
+----+-----+------+-----+-----------+----------+--------+----------+



In [None]:
non_naDF = sparkDF.na.drop('any')

In [None]:
non_naDF.show(2)

+--------------------+-----+------+--------------+-----------+-----------+--------+--------------+
|                name|price|colour|         brand|ratingCount| avg_rating|brand_id|    brand_name|
+--------------------+-----+------+--------------+-----------+-----------+--------+--------------+
|Dupatta Bazaar Wh...|899.0| White|Dupatta Bazaar|     1321.0|4.548826646|     242|Dupatta Bazaar|
|Dupatta Bazaar Or...|899.0|Orange|Dupatta Bazaar|       30.0|4.366666667|     242|Dupatta Bazaar|
+--------------------+-----+------+--------------+-----------+-----------+--------+--------------+
only showing top 2 rows



In [None]:
non_naDF.select(countDistinct("brand"))

count(DISTINCT brand)
324


In [None]:
to_fill = non_naDF.groupBy("brand").avg('ratingCount').withColumnRenamed("avg(ratingCount)", "rating")
to_fill.show()

+-----------------+------------------+
|            brand|            rating|
+-----------------+------------------+
|            Darzi|            40.125|
|         Swtantra|               2.5|
|           Stylum| 420.2857142857143|
|        Club York|               3.0|
|          Kalista|             224.0|
|            Tiara|              15.0|
|           Globus|21.555555555555557|
|          Prakhya|2166.6666666666665|
|         Woodland|               4.0|
|          Code 61|               2.0|
|            Levis|             29.56|
|       Tikhi Imli|242.91666666666666|
|        Paralians| 8.857142857142858|
|      Lovely Lady|              11.5|
|           Aarika| 64.57142857142857|
|   Free Authority|               8.0|
|          Varanga| 599.7377049180328|
|Allen Solly Woman| 36.53846153846154|
|     Fort Collins| 7.833333333333333|
|          Vishudh|337.37704918032784|
+-----------------+------------------+
only showing top 20 rows



In [None]:
sparkDF = sparkDF.na.fill(0)

After filling na values with 0, 0 is replaced with mean value of group "brand" for both ratingCount and avg_rating in the dataset

In [None]:
from pyspark.sql import Window as W
from pyspark.sql import functions as F
from pyspark.sql.functions import when,avg

sparkDF = sparkDF.withColumn('ratingCount', F
        .when(F.col('ratingCount')==0, F.mean('ratingCount').over(W.partitionBy('brand')))
        .otherwise(F.col('ratingCount'))
    )

sparkDF = sparkDF.withColumn('avg_rating', F
        .when(F.col('avg_rating')==0, F.mean('avg_rating').over(W.partitionBy('brand')))
        .otherwise(F.col('avg_rating'))
    )

In [None]:
sparkDF

name,price,colour,brand,ratingCount,avg_rating,brand_id,brand_name
Darzi Women White...,1499.0,White,Darzi,7.0,3.428571429,207,Darzi
Darzi Women White...,1199.0,White,Darzi,13.375,1.3223973524583332,207,Darzi
Darzi Women Multi...,1499.0,Multi,Darzi,13.375,1.3223973524583332,207,Darzi
Darzi Black Bodyc...,1499.0,Black,Darzi,13.375,1.3223973524583332,207,Darzi
Darzi Women Pink ...,1499.0,Pink,Darzi,13.375,1.3223973524583332,207,Darzi
Darzi Women Black...,1199.0,Black,Darzi,126.0,3.865079365,207,Darzi
Darzi Women Black...,1199.0,Black,Darzi,11.0,3.818181818,207,Darzi
Darzi Women Yello...,1999.0,Yellow,Darzi,13.375,1.3223973524583332,207,Darzi
Darzi Women Maroo...,1499.0,Maroon,Darzi,13.375,1.3223973524583332,207,Darzi
Darzi Women Yello...,1499.0,Yellow,Darzi,13.375,1.3223973524583332,207,Darzi


In [None]:

sparkDF.groupBy("brand").max('ratingCount').show()


+-----------------+----------------+
|            brand|max(ratingCount)|
+-----------------+----------------+
|            Darzi|           126.0|
|         Swtantra|             3.0|
|          Taneira|             0.0|
|        Club York|             3.0|
|          Kalista|           396.0|
|           Stylum|          2075.0|
|Teakwood Leathers|             0.0|
|    Apraa & Parma|             0.0|
|           Globus|            63.0|
|           Sasimo|             0.0|
|            Tiara|            15.0|
|          Code 61|             2.0|
|          Disrupt|             0.0|
|          Prakhya|          5199.0|
|         Woodland|             5.0|
|   Dress My Angel|             0.0|
|            Levis|           211.0|
|      Lovely Lady|            12.0|
|        Paralians|            16.0|
|       Tikhi Imli|           781.0|
+-----------------+----------------+
only showing top 20 rows



Data Analysis with Spark Sql

In [None]:
sparkDF.registerTempTable("dataset")

In [None]:
#Most popular brands
spark.sql('''select brand,count(brand) as popularity  from dataset
              group by brand
              order by count(brand) desc
            ''').show(10)

+-------------------+----------+
|              brand|popularity|
+-------------------+----------+
|           Roadster|       346|
|      Tokyo Talkies|       287|
|     Clora Creation|       236|
|             Mitera|       204|
|                H&M|       202|
|              Anouk|       200|
|     Dupatta Bazaar|       171|
|            Sangria|       151|
|            Vishudh|       138|
|Readiprint Fashions|       124|
+-------------------+----------+
only showing top 10 rows



In [None]:
#popular colors
spark.sql('''select colour,count(colour) as count  from dataset
              group by colour
              order by count(colour) desc
            ''').show(10)

+---------+-----+
|   colour|count|
+---------+-----+
|    Black| 1057|
|     Blue| 1013|
|     Pink|  677|
|    Green|  594|
|Navy Blue|  494|
|    White|  443|
|      Red|  414|
|     Grey|  351|
|   Maroon|  311|
|    Beige|  304|
+---------+-----+
only showing top 10 rows



In [None]:
#most popular brand and its color
spark.sql('''select brand,colour,count(colour) as count  from dataset
              group by brand, colour
              order by count(colour) desc
            ''').show(10)

+--------------+------+-----+
|         brand|colour|count|
+--------------+------+-----+
|      Roadster|  Blue|   94|
| Tokyo Talkies|  Blue|   54|
|           H&M| Black|   46|
| Tokyo Talkies| Black|   45|
| Tokyo Talkies| Green|   44|
|      Roadster| Black|   37|
|     High Star|  Blue|   29|
|           H&M| Beige|   29|
|         Levis|  Blue|   28|
|Clora Creation| Black|   27|
+--------------+------+-----+
only showing top 10 rows



In [None]:
#total unique brands
spark.sql('''select count(Distinct(brand)) as total_brands  from dataset
            ''').show()

+------------+
|total_brands|
+------------+
|         500|
+------------+



In [None]:
#highest number of rating for a brand
spark.sql('''select brand, CAST(sum(ratingCount) as INT) as total_no_ratings  from dataset
              group by brand
              order by sum(ratingCount) desc
            ''').show(10)


+-------------+----------------+
|        brand|total_no_ratings|
+-------------+----------------+
|     Roadster|          121169|
|Tokyo Talkies|           70216|
|        Anouk|           50932|
|      Vishudh|           45932|
|      Varanga|           45617|
|        Libas|           40390|
|    Anubhutee|           37695|
|        Kotty|           32330|
|       Mitera|           31698|
|       Athena|           29757|
+-------------+----------------+
only showing top 10 rows



In [None]:
#highest rated brands based on avg rating score

spark.sql('''select brand, ROUND(AVG(avg_rating), 2) as total_rating  from dataset
              group by brand
              order by AVG(avg_rating) desc
            ''').show(10)


+--------------------+------------+
|               brand|total_rating|
+--------------------+------------+
|              Reebok|        4.83|
|   Allen Solly Tribe|        4.77|
|      Angel & Rocket|        4.73|
|        Juniper Plus|        4.65|
|Kook N Keech Loon...|        4.63|
|          Swishchick|        4.63|
|              Faserz|        4.58|
|                 513|        4.53|
|                Zeyo|         4.5|
|              Amante|        4.46|
+--------------------+------------+
only showing top 10 rows



In [None]:
#highest rated clothing item for each brand

spark.sql('''Select brand, name,
                        avg_rating
                  From(
                      SELECT Row_Number() over(partition by brand order by avg_rating desc)as 
                    row_number, *
                  FROM dataset
                  ) 
                  Where row_number = 1
                  order by avg_rating DESC
            ''').show(50, False)


+-------------------+-----------------------------------------------------------------------------------------------------+-----------+
|brand              |name                                                                                                 |avg_rating |
+-------------------+-----------------------------------------------------------------------------------------------------+-----------+
|Monte Carlo        |Monte Carlo Women Beige Solid Sweatshirt                                                             |5.0        |
|Xpose              |Xpose Women Navy Blue Washed Crop Denim Jacket                                                       |5.0        |
|Chhabra 555        |Chhabra 555 Women Blue & Golden Zari Woven Design Semi-Stitched Banarasi Lehenga Choli               |5.0        |
|Anouk              |Anouk Women Black & White Bandhani Printed Halter Neck Kurta                                         |5.0        |
|High Star          |High Star Women Blue Solid 

In [None]:
#total no of brands with 1 occurance in data set

spark.sql('''Select count(*) as no_of_brands FROM (Select brand, count(brand) FROM dataset
              GROUP BY brand
              HAVING count(brand) = 1) 
            ''').show(10, False)


+------------+
|no_of_brands|
+------------+
|102         |
+------------+



In [None]:
#all the brands with 1 occurance in data set

spark.sql('''Select brand, count(brand) FROM dataset
              GROUP BY brand
              HAVING count(brand) = 1
            ''').show(10, False)


+-----------------+------------+
|brand            |count(brand)|
+-----------------+------------+
|Taneira          |1           |
|Teakwood Leathers|1           |
|Tiara            |1           |
|Sasimo           |1           |
|Aditi Wasan      |1           |
|Pantaloons Junior|1           |
|Baby Moo         |1           |
|Mystere Paris    |1           |
|Kryptic          |1           |
|Panash Trends    |1           |
+-----------------+------------+
only showing top 10 rows



**Preparing dataset for machine learning algorithms**

Feature Selection based on Domain Expert Knowledge

In [None]:
sparkDF.show(2)

+--------------------+------+------+-----+-----------+------------------+--------+----------+
|                name| price|colour|brand|ratingCount|        avg_rating|brand_id|brand_name|
+--------------------+------+------+-----+-----------+------------------+--------+----------+
|Darzi Women White...|1499.0| White|Darzi|        7.0|       3.428571429|     207|     Darzi|
|Darzi Women White...|1199.0| White|Darzi|     13.375|1.3223973524583332|     207|     Darzi|
+--------------------+------+------+-----+-----------+------------------+--------+----------+
only showing top 2 rows



In [None]:
save = sparkDF.toPandas()
save.to_csv('dataset.csv', index=None)

As label (price) which is used for ML model to predict, first and foremost name is removed as it does not contribute to model building its name of the item in string format, string columns brand, brand_name are dropped as well to represent the brand in dataset, brand id is used.

In [None]:
data_DF = sparkDF.drop('name', 'brand', 'brand_name')

In [None]:
data_DF.show(2)

+------+------+-----------+------------------+--------+
| price|colour|ratingCount|        avg_rating|brand_id|
+------+------+-----------+------------------+--------+
|1499.0| White|        7.0|       3.428571429|     207|
|1199.0| White|     13.375|1.3223973524583332|     207|
+------+------+-----------+------------------+--------+
only showing top 2 rows



In [None]:
data_DF.select(countDistinct("colour"))

count(DISTINCT colour)
49


As far as color is considered according to analysis, certain color are more likely to be sold like the colour black, so to use its importance colours are converted into numeric formart through StringIndexer function

In [None]:
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol="colour", outputCol="colour_no")
indexed = indexer.fit(data_DF).transform(data_DF)
indexed.show()

+------+--------+-----------+------------------+--------+---------+
| price|  colour|ratingCount|        avg_rating|brand_id|colour_no|
+------+--------+-----------+------------------+--------+---------+
|1499.0|   White|        7.0|       3.428571429|     207|      5.0|
|1199.0|   White|     13.375|1.3223973524583332|     207|      5.0|
|1499.0|   Multi|     13.375|1.3223973524583332|     207|     21.0|
|1499.0|   Black|     13.375|1.3223973524583332|     207|      0.0|
|1499.0|    Pink|     13.375|1.3223973524583332|     207|      2.0|
|1199.0|   Black|      126.0|       3.865079365|     207|      0.0|
|1199.0|   Black|       11.0|       3.818181818|     207|      0.0|
|1999.0|  Yellow|     13.375|1.3223973524583332|     207|     10.0|
|1499.0|  Maroon|     13.375|1.3223973524583332|     207|      8.0|
|1499.0|  Yellow|     13.375|1.3223973524583332|     207|     10.0|
|1199.0|   Black|       94.0|        4.14893617|     207|      0.0|
|1999.0|  Maroon|     13.375|1.3223973524583332|

now colour coloumn can be droped 

In [None]:
dataDF = indexed.drop('colour')

In [None]:
dataDF.show(2)

+------+-----------+------------------+--------+---------+
| price|ratingCount|        avg_rating|brand_id|colour_no|
+------+-----------+------------------+--------+---------+
|1499.0|        7.0|       3.428571429|     207|      5.0|
|1199.0|     13.375|1.3223973524583332|     207|      5.0|
+------+-----------+------------------+--------+---------+
only showing top 2 rows



As dataset now has all the coloumns in numeric format, it can be randomly splitted into train and test set and Machine Learning models can be applied

In [None]:
# here 75% training, and 25% testing data is taken

In [None]:
# Spark MLlib has its own format of dataset to take, that is achieved by putting all the columns other than target under feature label

from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(
    inputCols=[col for col in dataDF.columns if col!="price"],
    outputCol='features')

new_df = assembler.transform(dataDF)
new_df = new_df.select(['features', 'price'])
new_df.show(3)

+--------------------+------+
|            features| price|
+--------------------+------+
|[7.0,3.428571429,...|1499.0|
|[13.375,1.3223973...|1199.0|
|[13.375,1.3223973...|1499.0|
+--------------------+------+
only showing top 3 rows



In [None]:
(trainingData, testData) = new_df.randomSplit([0.75, 0.25])

In [None]:
# -> Note the code is adopted from official documentation of Spark MLlib link: https://spark.apache.org/docs/latest/ml-classification-regression.html#linear-regression

from pyspark.ml.regression import LinearRegression

lr = LinearRegression(labelCol='price', maxIter=30, regParam=0.2, elasticNetParam=0.3)

# Fit the model
lrModel = lr.fit(trainingData)

# Print the coefficients and intercept for linear regression
print("Coefficients: %s" % str(lrModel.coefficients))
print("Intercept: %s" % str(lrModel.intercept))

# Summarize the model over the training set and print out some metrics
trainingSummary = lrModel.summary
print("numIterations: %d" % trainingSummary.totalIterations)
print("objectiveHistory: %s" % str(trainingSummary.objectiveHistory))
trainingSummary.residuals.show()
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

Coefficients: [-0.05797045114121363,-324.87508436591344,0.5881111489973367,21.44811207383677]
Intercept: 3280.9181940929884
numIterations: 7
objectiveHistory: [0.5, 0.4889817465320556, 0.47534218643246046, 0.4707374283582021, 0.470736414606651, 0.47073639303139675, 0.4707363930171113, 0.4707363930169327]
+-------------------+
|          residuals|
+-------------------+
|-447.21071675088933|
| -896.6345368956972|
| -962.5873867257369|
|-1473.2678957062617|
| -994.7160077800986|
|-1516.1641198539355|
|-1516.1641198539355|
|-1559.0603440016089|
|-1880.5084560754458|
|-1101.9565681492822|
| -1644.852792296956|
| -1144.852792296956|
| -1144.852792296956|
|-1687.7490164446294|
|-1187.7490164446294|
| -1923.678249256834|
|-2030.9188096260177|
|   -731.73710403224|
| -497.4695706905295|
| -940.6849403063179|
+-------------------+
only showing top 20 rows

RMSE: 2368.545228
r2: 0.058549


In [None]:
# Make predictions.
predictions = lrModel.transform(testData)

In [None]:

# Select example rows to display.
predictions.select("prediction", "price", "features").show(5)


+------------------+------+--------------------+
|        prediction| price|            features|
+------------------+------+--------------------+
| 3143.852792296956|1199.0|[13.375,1.3223973...|
|3529.9188096260177|1499.0|[13.375,1.3223973...|
| 2029.918933231011|1199.0|[18.0,4.222222222...|
| 2049.321991270623|1199.0|[94.0,4.14893617,...|
| 3652.829573387105|1999.0|[0.26315789473684...|
+------------------+------+--------------------+
only showing top 5 rows



In [None]:
from pyspark.ml.evaluation import RegressionEvaluator

# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="price", predictionCol="prediction", metricName="rmse")

rmse = evaluator.evaluate(predictions)

print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)


Root Mean Squared Error (RMSE) on test data = 2363.79


In [None]:
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(
    inputCols=[col for col in dataDF.columns if col!="price"],
    outputCol='features')

new_df = assembler.transform(dataDF)

new_df = new_df.withColumnRenamed("price","label")

new_df = new_df.select(['features', 'label'])
new_df.show(3)

+--------------------+------+
|            features| label|
+--------------------+------+
|[7.0,3.428571429,...|1499.0|
|[13.375,1.3223973...|1199.0|
|[13.375,1.3223973...|1499.0|
+--------------------+------+
only showing top 3 rows



In [None]:
#Note code adopted from official spark MLlib documentation

from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit



train, test = new_df.randomSplit([0.75, 0.25], seed=45)

lr = LinearRegression(maxIter=10)

# We use a ParamGridBuilder to construct a grid of parameters to search over.
# TrainValidationSplit will try all combinations of values and determine best model using
# the evaluator.
paramGrid = ParamGridBuilder()\
    .addGrid(lr.regParam, [0.1, 0.01]) \
    .addGrid(lr.fitIntercept, [False, True])\
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])\
    .build()

# In this case the estimator is simply the linear regression.
# A TrainValidationSplit requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
tvs = TrainValidationSplit(estimator=lr,
                           estimatorParamMaps=paramGrid,
                           evaluator=RegressionEvaluator(),
                           # 80% of the data will be used for training, 20% for validation.
                           trainRatio=0.8)

# Run TrainValidationSplit, and choose the best set of parameters.
model = tvs.fit(train)

# Make predictions on test data. model is the model with combination of parameters
# that performed best.
model.transform(test)

features,label,prediction
"[2.0,4.0,207.0,16.0]",1999.0,2333.384806216632
"[13.375,1.3223973...",1499.0,3002.579831420076
"[13.375,1.3223973...",1199.0,3042.073814546377
"[94.0,4.14893617,...",1199.0,2069.0021228191745
"[126.0,3.86507936...",1199.0,2157.3973620111697
[0.26315789473684...,3299.0,3716.054987761866
"[2.0,5.0,865.0,0.0]",2499.0,2215.561729814542
"[0.6,0.9333333334...",3299.0,3087.4737265284616
"[0.6,0.9333333334...",2899.0,3140.132370696864
"[52.0,4.346153846...",1310.0,2184.223802498497


In [None]:
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")


In [None]:
evrmse = evaluator.evaluate(model.transform(test))

In [None]:
evrmse

2510.695313167522

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import DecisionTreeRegressor, RandomForestRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import MinMaxScaler, StandardScaler


featureScaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures").fit(new_df)

train, test = new_df.randomSplit([0.75, 0.25], seed=45)


# Train a Decision Tree model.
dt_reg = DecisionTreeRegressor(featuresCol="scaledFeatures")

pipeline = Pipeline(stages=[featureScaler, dt_reg])

# Train model.  This also runs the indexer.
model = pipeline.fit(train)

# Make predictions.
predictions = model.transform(test)

# Select example rows to display.
predictions.select("prediction", "label", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

dtModel = model.stages[1]
print(dtModel)

+-----------------+------+--------------------+
|       prediction| label|            features|
+-----------------+------+--------------------+
|1545.347619047619|1999.0|[2.0,4.0,207.0,16.0]|
|1545.347619047619|1499.0|[13.375,1.3223973...|
|1545.347619047619|1199.0|[13.375,1.3223973...|
|1545.347619047619|1199.0|[94.0,4.14893617,...|
|1545.347619047619|1199.0|[126.0,3.86507936...|
+-----------------+------+--------------------+
only showing top 5 rows

Root Mean Squared Error (RMSE) on test data = 2305.3
DecisionTreeRegressionModel: uid=DecisionTreeRegressor_22826a755cbf, depth=5, numNodes=63, numFeatures=4


In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import MinMaxScaler, StandardScaler


featureScaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures").fit(new_df)

train, test = new_df.randomSplit([0.75, 0.25], seed=45)


# Train a Random Forest model.
rf_reg = RandomForestRegressor(featuresCol="scaledFeatures")

pipeline = Pipeline(stages=[featureScaler, rf_reg])

# Train model.
model = pipeline.fit(train)
predictions = model.transform(test)
predictions.select("prediction", "label", "features").show(5)
# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

rfModel = model.stages[1]
print(rfModel)

+------------------+------+--------------------+
|        prediction| label|            features|
+------------------+------+--------------------+
| 2459.501850531552|1999.0|[2.0,4.0,207.0,16.0]|
|2079.6394932126204|1499.0|[13.375,1.3223973...|
|  2037.44709960966|1199.0|[13.375,1.3223973...|
|1778.7401521642114|1199.0|[94.0,4.14893617,...|
|1778.7401521642114|1199.0|[126.0,3.86507936...|
+------------------+------+--------------------+
only showing top 5 rows

Root Mean Squared Error (RMSE) on test data = 2284.65
RandomForestRegressionModel: uid=RandomForestRegressor_dfb205815b44, numTrees=20, numFeatures=4


In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import FMRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import MinMaxScaler, StandardScaler


featureScaler = StandardScaler(inputCol="features", outputCol="scaledFeatures").fit(new_df)

train, test = new_df.randomSplit([0.75, 0.25], seed=34)

# Train a FM model.
fm_reg = FMRegressor(featuresCol="scaledFeatures", stepSize=0.001)
pipeline = Pipeline(stages=[featureScaler, fm_reg])
# Train model.
model = pipeline.fit(train)
# Make predictions.
predictions = model.transform(test)
# Select example rows to display.
predictions.select("prediction", "label", "features").show(5)
# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

fmModel = model.stages[1]
print(fmModel)

+------------------+------+--------------------+
|        prediction| label|            features|
+------------------+------+--------------------+
|0.7274839329392907|1199.0|[18.0,4.222222222...|
|0.9734769847088646|2999.0|[0.26315789473684...|
|1.0473519219045138|2999.0|[0.26315789473684...|
|1.0473519219045138|3299.0|[0.26315789473684...|
| 3.928474472534834|2999.0|[0.26315789473684...|
+------------------+------+--------------------+
only showing top 5 rows

Root Mean Squared Error (RMSE) on test data = 3726.85
FMRegressionModel: uid=FMRegressor_b75cbdf53139, numFeatures=4, factorSize=8, fitLinear=true, fitIntercept=true


In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import MinMaxScaler, StandardScaler


featureScaler = StandardScaler(inputCol="features", outputCol="scaledFeatures").fit(new_df)

train, test = new_df.randomSplit([0.75, 0.25], seed=45)

# Train a GBT model.
gbt = GBTRegressor(featuresCol="scaledFeatures", maxIter=20)

pipeline = Pipeline(stages=[featureScaler, gbt])
# Train model.
model = pipeline.fit(train)
# Make predictions.
predictions = model.transform(test)
# Select example rows to display.
predictions.select("prediction", "label", "features").show(5)
# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

gbtModel = model.stages[1]
print(gbtModel)

+------------------+------+--------------------+
|        prediction| label|            features|
+------------------+------+--------------------+
|2307.1264301278675|1999.0|[2.0,4.0,207.0,16.0]|
|1689.3521242283853|1499.0|[13.375,1.3223973...|
|1541.9659602751299|1199.0|[13.375,1.3223973...|
| 1475.708210341232|1199.0|[94.0,4.14893617,...|
|1586.5195642363935|1199.0|[126.0,3.86507936...|
+------------------+------+--------------------+
only showing top 5 rows

Root Mean Squared Error (RMSE) on test data = 2215.25
GBTRegressionModel: uid=GBTRegressor_406c8120b676, numTrees=20, numFeatures=4


In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.feature import VectorIndexer, PCA
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import MinMaxScaler, StandardScaler

train, test = new_df.randomSplit([0.75, 0.25], seed=45)
pca = PCA(k=2, inputCol="features", outputCol="pcaFeatures")
# Train a GBT model.
gbt = GBTRegressor(featuresCol="pcaFeatures", maxIter=20)
pipeline = Pipeline(stages=[pca, gbt])
# Train model.
model = pipeline.fit(train)
# Make predictions.
predictions = model.transform(test)
# Select example rows to display.
predictions.select("prediction", "label", "features").show(5)
# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

gbtModel = model.stages[1]
print(gbtModel)

In [None]:
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(
    inputCols=[col for col in dataDF.columns if col!="avg_rating"],
    outputCol='features')

test_df = assembler.transform(dataDF)

test_df = test_df.select(['features', 'avg_rating'])
test_df.show(3, False)

In [None]:
from pyspark.ml.feature import Bucketizer

splits = [0,1,2,3,4, 5]


bucketizer = Bucketizer(splits=splits, inputCol="avg_rating", outputCol="label")

# Transform original data into its bucket index.
bucketedData = bucketizer.transform(test_df)

print("Bucketizer output with %d buckets" % (len(bucketizer.getSplits()) - 1))


In [None]:
bucketedData.show(20)

In [None]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [None]:
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(bucketedData)
labelIndexer

In [None]:
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(bucketedData)
featureIndexer

In [None]:
(trainingData, testData) = bucketedData.randomSplit([0.7, 0.3])

# Train a RandomForest model.
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", numTrees=10)

# Convert indexed labels back to original labels.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                               labels=labelIndexer.labels)

In [None]:
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rf, labelConverter])

# Train model.  This also runs the indexers.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("predictedLabel", "label", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))

rfModel = model.stages[2]
print(rfModel)  # summary only

In [None]:
#Use Python to perform this classification task

bucketedData.show(2)

In [None]:
pandasDF = dataDF.toPandas()
pandasDF.head(2)

In [None]:
pandasDF['label'] = pd.cut(x=pandasDF['avg_rating'], bins=[-1, 1, 2, 3, 4, 5],
                     labels=[1, 2, 3, 4,5])

In [None]:
pandasDF.label.value_counts()

In [None]:
pandasDF.isna().sum()

In [None]:
pandasDF.to_csv('dataset.csv', index=None)

In [None]:
data_DF = pandasDF.drop('avg_rating', axis=1)

In [None]:
data_DF.head()

In [None]:
data_DF.iloc[:,-1]

In [None]:
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split, StratifiedShuffleSplit
from sklearn.metrics import classification_report, confusion_matrix
from sklearn.pipeline import Pipeline
from sklearn.model_selection import GridSearchCV
from sklearn.decomposition import PCA


features_train, features_test, labels_train, labels_test = train_test_split(data_DF.iloc[:,:-1],\
                        data_DF.iloc[:,-1], stratify = data_DF.iloc[:,-1] , test_size=0.2, random_state=94)

print(len(features_train) , len(features_test))

sss = StratifiedShuffleSplit(n_splits=3, test_size=0.20, random_state=34)
scaler = StandardScaler()
clf_lr = LogisticRegression()
pca = PCA()
pipeline = Pipeline(steps = [ ("log_reg",clf_lr)])
param_grid = {"log_reg__C":[ 0.1, 1, 10], "log_reg__max_iter":[500,1000,5000]}


gridlr = GridSearchCV(pipeline, param_grid, verbose = 0, cv = sss, scoring = 'f1_weighted', n_jobs=4, return_train_score=True)

gridlr.fit(features_train, labels_train)

predictions = gridlr.predict(features_test)

print ("F1 with grid search: {:.2f}".format((gridlr.score(features_test, labels_test)*100)))

print(classification_report(labels_test, predictions))
print( confusion_matrix(labels_test, predictions))

In [None]:
from sklearn.ensemble import ExtraTreesClassifier

In [None]:
ensemble = ExtraTreesClassifier(n_estimators=100, random_state=12)
ensemble.fit(features_train, labels_train)

predictions = ensemble.predict(features_test)


In [None]:
ensemble.score(features_test, labels_test)

In [None]:

print(classification_report(labels_test, predictions))

In [None]:
from sklearn.ensemble import GradientBoostingClassifier
clf = GradientBoostingClassifier(n_estimators=100, learning_rate=1.0,  max_depth=5, random_state=34)
clf.fit(features_train, labels_train)

predictions = clf.predict(features_test)


In [None]:
clf.score(features_test, labels_test)

In [None]:

print(classification_report(labels_test, predictions))

In [None]:
from sklearn.neural_network import MLPClassifier
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split

nn = MLPClassifier(random_state=1, learning_rate='adaptive',max_iter=300, verbose=True ).fit(features_train, labels_train)
predictions = nn.predict(features_test)



In [None]:

nn.score(features_test, labels_test)

In [None]:
predictions = nn.predict(features_test)

In [None]:
predictions

In [None]:

print(classification_report(labels_test, predictions))