# Use spark

In [1]:
import os, re, time
import glob, json

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *

from IPython.display import display, clear_output
import pandas as pd

spark = SparkSession.builder \
    .appName("TRANSFORM_DATA")\
    .master("local[*]")\
    .config("spark.local.dir", "/home/khoaint/logsSpark") \
    .config("spark.driver.memory","16G")\
    .config("spark.driver.maxResultSize", "0") \
    .config("spark.kryoserializer.buffer.max", "2000M")\
    .getOrCreate()

Setting default log level to "

In [2]:
st = time.time()

json_files = glob.glob('./**/*.json', recursive = True)
# len(json_files)

In [3]:
df = spark.read.json(json_files)
# df.show()

In [4]:
df = df.withColumn('product', F.explode((df.data.sections)[0].data.item)).select('product')
# df.count()

In [5]:
# df.printSchema()

In [6]:
def getUrl(name, shopid, itemid):
    name = re.sub('[\[\]\/\\\%#\?\" \|]', '-', name)
    name = re.sub('-+', '-', name)
    name = 'https://shopee.vn/'+name + f"-i.{shopid}.{itemid}"
    return name
    
udf_url = F.udf(getUrl, StringType())

In [7]:
# # product_name: product['name']
# # product_url: product['name']-i.product['shopid'].product['itemid']
# # product_rating: product['item_rating']['rating_star']
# # product_price: product['price']
# # product_revenue: product['price']*product['historical_sold']

df = df.withColumn('product_name', df.product['name'])
df = df.withColumn('shopid', df.product['shopid'])
df = df.withColumn('itemid', df.product['itemid'])
df = df.withColumn('product_rating', df.product['item_rating']['rating_star'])
df = df.withColumn('product_price', df.product['price'])
df = df.withColumn('product_hs', df.product['historical_sold'])
df = df.withColumn('product_url', udf_url(F.col('product_name'), F.col('shopid'), F.col('itemid')))
df = df.withColumn('product_revenue', F.col('product_price')*F.col('product_hs'))

In [8]:
df = df.select('product_name', 'product_rating', 'product_price', 'product_url', 'product_revenue')

In [9]:
# df.count()

In [10]:
# df.select('product_name', 'product_url').show(50, False)

In [11]:
en = time.time()

In [12]:
print(en-st)

12.03064227104187


In [13]:
df.repartition(1).write.csv("shopee")

In [14]:
data = df.toPandas()
data = data.applymap(lambda x: x.encode('unicode_escape').
                 decode('utf-8') if isinstance(x, str) else x)
data.to_excel("shopee.xlsx")  

In [15]:
# df.filter(F.col('product_name').contains('Hàng Cao Cấp-Nón Bảo Hiểm 1/2'))\
# .select('product_name', 'product_url').show(200, False)

+--------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------+
|product_name                                                                                                  |product_url                                                                                                                                           |
+--------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------+
|Mũ Bảo Hiểm Nửa Đầu 1/2 Nhiều Tem Kèm Kính Phi Công Siêu HOT - Hàng Cao Cấp-Nón Bảo Hiểm 1/2 Có Kính Tuỳ Chọn|https://shopee.vn/Mũ-Bảo-Hiểm-Nửa-Đầu-1-2-Nhiều-Tem-Kèm-Kính-Phi-Công-Siêu-HOT-Hàng-Cao-Cấp-Nón

In [16]:
df.filter(F.col('product_revenue').isNull()).count()

0

In [17]:
df.count()

148960