In [93]:
from elasticsearch import Elasticsearch, helpers
import csv
from tqdm import tqdm
import pandas as pd
import gensim

In [2]:
from pyspark import SparkConf, SparkContext

# Executor memory is registered as a system property before the context is obtained.
SparkContext.setSystemProperty('spark.executor.memory', '3g')

# Application name and standalone master used by this notebook.
conf = (
    SparkConf()
    .setAppName("Upload to Elasticsearch")
    .setMaster("spark://25.15.27.228:7077")
)

# Reuse a live context when one exists; otherwise create it from `conf`.
sc = SparkContext.getOrCreate(conf=conf)

In [3]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

# Obtain the SQL entry point, reusing an existing session when there is one.
spark = (
    SparkSession.builder
    .appName('Upload to Elasticsearch')
    .getOrCreate()
)

In [97]:
class ProcssProduct:
    """Copy crawled Shopee parquet data into a processed HDFS folder and Elasticsearch.

    On construction the instance lists the parquet paths under
    ``/shopee/<type>`` on HDFS. ``uploadProduct`` then reads each path,
    reshapes it according to ``type`` ('Product' or 'Voucher'), appends the
    result to ``<hdfs_uri>/ProcessShopee/<type>/`` and indexes it into the
    Elasticsearch index named ``type.lower()``.

    Parameters
    ----------
    sc : SparkContext
        Used only to reach the Hadoop FileSystem classes through the JVM gateway.
    spark : SparkSession
        Used for every parquet read and DataFrame creation.
    type : str
        'Product' or 'Voucher'; selects the HDFS folder, the processing branch
        and the Elasticsearch index.
    es_host, es_port :
        Elasticsearch endpoint.
    hdfs_uri : str
        HDFS namenode URI used both for listing and for writing.
    category_csv : str
        CSV that must contain the ``catid`` and ``display_name`` columns.
    """

    def __init__(self, sc, spark, type='Product', es_host="25.15.27.228", es_port=9200,
                 hdfs_uri="hdfs://cris:9000",
                 category_csv='../CrawlData/Data/categoryList.csv'):
        self.sc = sc
        self.type = type
        self.spark = spark
        self.hdfs_uri = hdfs_uri

        # Hadoop/Java classes reached through the Spark context's JVM gateway.
        jvm = self.sc._gateway.jvm
        self.URI = jvm.java.net.URI
        self.Path = jvm.org.apache.hadoop.fs.Path
        self.FileSystem = jvm.org.apache.hadoop.fs.FileSystem
        self.Configuration = jvm.org.apache.hadoop.conf.Configuration

        # Columns selected from the raw product parquet files.
        self.colsProduct = ['historical_sold', 'raw_discount', 'attributes', 'shop_location',
                            'show_free_shipping', 'name', 'cmt_count', 'price', 'item_rating',
                            'show_shopee_verified_label', 'liked_count']
        self.es = Elasticsearch(host=es_host, port=es_port, timeout=30,
                                max_retries=10, retry_on_timeout=True)
        # Next Elasticsearch document id; advanced after every successful index call.
        self.iter = 1
        # Category id -> display name lookup used by the 'Voucher' branch.
        self.source = pd.read_csv(category_csv)
        self.source = self.source[['catid', 'display_name']]

        self.getList()

    def getList(self):
        """List ``/shopee/<type>`` on HDFS and store all but the first entry in ``self.path``."""
        fs = self.FileSystem.get(self.URI(self.hdfs_uri), self.Configuration())
        status = fs.listStatus(self.Path(f'/shopee/{self.type}'))
        pathProducts = [str(fileStatus.getPath()) for fileStatus in status]
        # NOTE(review): the first listed entry is skipped - presumably a non-data
        # marker file; confirm against the actual folder layout.
        self.path = pathProducts[1:]

    def processProduct(self, data):
        """Normalise a product frame in place and return it.

        ``price`` is cast to float and divided by 100000, ``rating_star`` is
        extracted from the ``item_rating`` string, and the ``attributes`` and
        ``item_rating`` columns are dropped. The frame passed in is modified.
        """
        data['price'] = data['price'].astype(float) / 100000
        # SECURITY: eval() executes whatever expression the crawled string holds.
        # ast.literal_eval would be the safe replacement if the values are plain
        # Python literals - TODO confirm the stored format and switch.
        data['rating_star'] = data.item_rating.apply(lambda x: eval(x)['rating_star'])
        data.drop(columns=['attributes', 'item_rating'], inplace=True)
        return data

    def uploadProduct(self):
        """Process every path in ``self.path`` and push it to HDFS and Elasticsearch.

        A failed HDFS write is reported and does not stop the Elasticsearch upload.

        Raises
        ------
        ValueError
            If ``self.type`` is neither 'Product' nor 'Voucher'.
        """
        for url in self.path:
            parDF2 = self.spark.read.parquet(url)

            if self.type == 'Product':
                data = self.processProduct(parDF2[self.colsProduct].toPandas())
                cols = ['historical_sold', 'raw_discount', 'cmt_count', 'price',
                        'liked_count', 'rating_star']
                data[cols] = data[cols].astype(float)
                hdfsFrame = data
            elif self.type == 'Voucher':
                # One summary record per file: shipping-option counts plus the topic name.
                data = [self.processShiping(parDF2)]
                # Only the HDFS copy gets normalised column names; Elasticsearch
                # receives the original keys. NOTE(review): confirm this is intended.
                hdfsFrame = self._normaliseColumns(pd.DataFrame(data))
            else:
                # Previously this fell through to a NameError on `data`.
                raise ValueError(
                    f"Unsupported type {self.type!r}; expected 'Product' or 'Voucher'")

            try:
                self.uploadHdfs(hdfsFrame)
            except Exception as e:
                print(f'Error when upload to HDFS {e}')

            # toJSON().collect() brings every row to the driver as a JSON string.
            rows = self.spark.createDataFrame(data).toJSON().collect()
            self.uploadES(rows)

    def _normaliseColumns(self, frame):
        """Return ``frame`` with each column renamed to its gensim tokens joined by '_'."""
        renamed = {
            colName: "_".join(gensim.utils.simple_preprocess(colName))
            for colName in frame.columns
        }
        return frame.rename(columns=renamed)

    def processShiping(self, data):
        """Count shipping-option names in a voucher frame.

        ``data`` must provide ``toPandas()`` yielding ``in_country`` (a string
        that evaluates to a list of dicts with a ``name`` key) and ``topic``
        columns. Rows are inner-joined with the category lookup on
        ``topic == catid``.

        Returns a dict mapping each option name to its number of occurrences,
        plus a ``'topic'`` key holding the ``display_name`` of the first joined
        row. Raises IndexError when the join yields no rows.
        """
        data = data.toPandas()
        # SECURITY: eval() executes whatever expression the crawled string holds.
        # ast.literal_eval would be the safe replacement if the values are plain
        # Python literals - TODO confirm the stored format and switch.
        data['new'] = data.in_country.apply(lambda x: eval(x))
        data.topic = data.topic.astype(int)
        data = pd.merge(data, self.source, left_on='topic', right_on='catid')
        data = data.fillna(0)

        transport = dict()
        for options in data['new']:
            # Missing values were replaced by 0 above; anything else is iterated.
            if options != 0:
                for item in options:
                    transport[item['name']] = transport.get(item['name'], 0) + 1
        transport['topic'] = data.iloc[0]['display_name']
        return transport

    def uploadHdfs(self, data):
        """Append ``data`` (a pandas frame, cast to str) as one parquet part under
        ``<hdfs_uri>/ProcessShopee/<type>/``."""
        # Use the session handed to the constructor, not a notebook-level global.
        sparkFrame = self.spark.createDataFrame(data.astype(str))
        target = f'{self.hdfs_uri}/ProcessShopee/{self.type}/'
        sparkFrame.coalesce(1).write.mode('append').parquet(target)
        print('Done upload HDFS')

    def uploadES(self, data):
        """Index each document in ``data`` into the ``type.lower()`` index.

        Ids come from ``self.iter``, which advances only after a successful
        call. A failed document is reported and skipped.
        """
        index = self.type.lower()
        for doc in tqdm(data):
            try:
                self.es.index(index=index, doc_type='data', id=self.iter, body=doc)
                self.iter += 1
            except Exception as e:
                # Catch Exception, not everything, so Ctrl-C still stops the loop.
                print(f'Error when upload to ES {e}')
    
    

In [98]:
# Uploader for the 'Product' data set, built on the Spark context and session from above.
product = ProcssProduct(sc, spark, type='Product')

In [99]:
# Process every parquet path found for this data type and push it to HDFS and Elasticsearch.
product.uploadProduct()

In [100]:
# product = ProcssProduct(sc,spark,'Voucher')

In [102]:
# data = product.uploadProduct()

In [6]:
# cols = ['description']

In [8]:
# data = parDF2[cols].toPandas()