In [None]:
import findspark
findspark.init()

from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param
from pyspark.ml import Pipeline, Transformer
from pyspark.sql.functions import *
from pyspark.sql import SparkSession

# Standard library. `urllib.parse` is imported explicitly: a bare `import urllib`
# does not guarantee the `parse` submodule is loaded.
import os
import re
import urllib
import urllib.parse
from getpass import getpass

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

In [None]:
# MongoDB connection settings used by the Spark MongoDB connector below.
# The password is NOT hard-coded: it comes from the MONGO_PASSWORD environment
# variable, or is prompted for interactively when that variable is unset.
# Host, port and user can be overridden the same way (MONGO_HOST, MONGO_PORT,
# MONGO_USER); the defaults are the values this notebook was written against.
mongo_ip   = os.environ.get("MONGO_HOST", "34.81.218.85")
mongo_port = int(os.environ.get("MONGO_PORT", "27017"))

# Userinfo in a connection string must be percent-encoded. safe="" escapes every
# reserved character ('@', ':', '/', ...). The second positional parameter of
# urllib.parse.quote is `safe` (UTF-8 is already the default encoding), so the
# previous quote(..., "UTF-8") call only worked because those characters are
# unreserved anyway.
username   = urllib.parse.quote(os.environ.get("MONGO_USER", "admin"), safe="")
password   = urllib.parse.quote(
    os.environ.get("MONGO_PASSWORD") or getpass("MongoDB password: "),
    safe="",
)

# NOTE(review): the URI's default namespace is amazon.reviews, while the reads
# and writes in this notebook pass database="ruten" explicitly. A mode("overwrite")
# write that forgets the override would target amazon.reviews — confirm this
# default is intentional.
database   = "amazon"
collection = "reviews"

mongo_url  = (
    f"mongodb://{username}:{password}@{mongo_ip}:{mongo_port}/{database}.{collection}"
    "?authMechanism=SCRAM-SHA-256&authSource=admin"
)

In [None]:
# Create (or reuse) the Spark session for this notebook.
# Both the MongoDB read and write connectors default to mongo_url; Hive support
# is enabled so the saveAsTable() calls below register tables in the metastore.
spark = (
    SparkSession.builder
    .appName('Ruten Preprocessing')
    .config("spark.mongodb.read.connection.uri", mongo_url)
    .config("spark.mongodb.write.connection.uri", mongo_url)
    .config("spark.dynamicAllocation.initialExecutors", "4")
    .config("spark.driver.memory", "1g")
    .enableHiveSupport()
    .getOrCreate()
)

sc = spark.sparkContext

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [None]:
# Load the raw goods dump: one JSON object per line (multiline=false). PERMISSIVE
# mode keeps malformed records with null fields instead of failing the read.
# Each row is tagged with the last two path components of its source file,
# e.g. "goods/part-r-00165".
_goodsDF = (
    spark.read
    .options(multiline="false", charset="UTF-8", mode="PERMISSIVE")
    .json("gs://dev-april-storage-2023/ruten/goods/*")
    .withColumn(
        "filename",
        concat_ws('/', slice(split(input_file_name(), '/'), -2, 2)),
    )
)

# Keep only the columns of interest: trim the free-text fields and cast the
# identifiers / price to numeric types (category_id keeps its inferred type).
# The result is repartitioned and cached because it is written and counted later.
goodsDF = (
    _goodsDF
    .select(
        col("category_id"),
        trim(col("category_name")).cast("string").alias("category_name"),
        col("item_id").cast("BigInt").alias("item_id"),
        trim(col("item_name")).cast("string").alias("item_name"),
        col("price").cast("Double").alias("price"),
        col("seller_id").cast("BigInt").alias("seller_id"),
        trim(col("seller_nickname")).cast("string").alias("seller_nickname"),
        "filename",
    )
    .repartition(4)
    .cache()
)

goodsDF.show(5)

+--------------+-------------+--------------+---------------------------------+------+---------+---------------+------------------+
|   category_id|category_name|       item_id|                        item_name| price|seller_id|seller_nickname|          filename|
+--------------+-------------+--------------+---------------------------------+------+---------+---------------+------------------+
| 6001000030001|     國家考試|21801545177911|文瑄書坊 一般警察考試  國文(作...| 190.0|  5174080|      we_shine1|goods/part-r-00165|
|     500140001|       遊戲王|21209032953703|     萬隆達*DTC2-JP041 自然向日葵|  10.0|  5701684|     comic-king|goods/part-r-00189|
|22000600020002|     策略模擬|22030933077668|【任兩件免運】【中古】PS4 三國...| 750.0|  3903840|      winki0313|goods/part-r-00109|
| 5000100040008|         其他|21917186407725|        【可樂心】Iron Studios...|5280.0|  8017048|    smalltigher|goods/part-r-00111|
|23001400090009|       奶泡器|21511645598108|  【米拉羅咖啡】庫存出清 Welea...| 150.0|  1592920|  milano-coffee|goods/part-r-00152|
+---------

                                                                                

In [None]:
# Persist the cleaned goods as a Parquet table in the Hive metastore, then mirror
# that table into the MongoDB collection ruten.goods. Both sinks use overwrite
# mode, so re-running the cell replaces the previous load rather than appending.

# saveAsTable("ruten.goods") fails if the `ruten` database does not exist (e.g. a
# fresh metastore); IF NOT EXISTS makes this a no-op when it is already there.
spark.sql("CREATE DATABASE IF NOT EXISTS ruten")

goodsDF.write.format("parquet").mode("overwrite").saveAsTable("ruten.goods")

(
    spark.read.table('ruten.goods')
    .write
    .format("mongodb")
    .option("database", "ruten")
    .option("collection", "goods")
    .mode("overwrite")
    .save()
)

23/05/11 15:45:49 WARN SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.
                                                                                

In [None]:
# Reconcile row counts across the three copies of the goods data: the cleaned,
# cached goodsDF ("Raw"), the Hive table ("Target") and the MongoDB collection.
# Assert they agree so a partial write fails the notebook instead of relying on
# someone eyeballing the printed numbers.
print("Goods")
goods_counts = {
    "Raw":    goodsDF.count(),
    "Target": spark.read.table('ruten.goods').count(),
    "Mongo":  (
        spark.read.format("mongodb")
        .option("database", "ruten")
        .option("collection", "goods")
        .load()
        .count()
    ),
}
for label, n_rows in goods_counts.items():
    print(f"{label:<6} Count: {n_rows}")

assert len(set(goods_counts.values())) == 1, f"Goods row counts differ: {goods_counts}"

Goods
Raw    Count: 5332256


                                                                                

Target Count: 5332256


Mongo  Count: 5332256 


                                                                                

In [None]:
# Load the raw order dump: one JSON object per line (multiline=false). PERMISSIVE
# mode keeps malformed records with null fields instead of failing the read.
# Each row is tagged with the last two path components of its source file,
# e.g. "order/part-r-00176".
_orderDF = (
    spark.read
    .options(multiline="false", charset="UTF-8", mode="PERMISSIVE")
    .json("gs://dev-april-storage-2023/ruten/order/*")
    .withColumn(
        "filename",
        concat_ws('/', slice(split(input_file_name(), '/'), -2, 2)),
    )
)

# Project the order columns: trim the text fields and cast identifiers,
# quantity and price to numeric types (category_id keeps its inferred type).
# Repartitioned and cached because it is written and counted later.
orderDF = (
    _orderDF
    .select(
        col("category_id"),
        trim(col("category_name")).alias("category_name"),
        col("item_id").cast("BigInt").alias("item_id"),
        trim(col("item_name")).alias("item_name"),
        col("order_id").cast("BigInt").alias("order_id"),
        col("order_no").cast("BigInt").alias("order_no"),
        col("order_qty").cast("Int").alias("order_qty"),
        col("price").cast("Double").alias("price"),
        "filename",
    )
    .repartition(4)
    .cache()
)

orderDF.show(5)

+--------------+-----------------+--------------+-----------------------------------+--------------+--------------+---------+------+------------------+
|   category_id|    category_name|       item_id|                          item_name|      order_id|      order_no|order_qty| price|          filename|
+--------------+-----------------+--------------+-----------------------------------+--------------+--------------+---------+------+------------------+
|     800070001|       監控攝影機|21816914677361|          可取 8路 4K H.265 監視...|52144511386662|21110369270951|        1|4800.0|order/part-r-00176|
|19000400010014|   車身標誌、貼紙|21736065841272|         ~歐力車飾~本田HONDA 17-...|52135481586547|21090351804138|        1| 299.0|order/part-r-00176|
| 6002200080009|會聲會影/威力導演|21652960556850|《度度鳥》會聲會影X9創新影音剪輯...|52112394133004|21032498905577|       12| 332.0|order/part-r-00198|
|        250009|             其他|22014410042998|         【24H 出貨】[買10送1]KY...|52118415848805|21050412268015|        4|  41.0|order/part

                                                                                

In [None]:
# Persist the cleaned orders as a Parquet table in the Hive metastore, then
# mirror that table into the MongoDB collection ruten.orders. Both sinks use
# overwrite mode, so re-running the cell replaces the previous load.
orderDF.write.saveAsTable("ruten.orders", format="parquet", mode="overwrite")

spark.read.table('ruten.orders').write.save(
    format="mongodb",
    mode="overwrite",
    database="ruten",
    collection="orders",
)

In [None]:
# Reconcile row counts across the cleaned, cached orderDF ("Raw"), the Hive
# table ("Target") and the MongoDB collection, and fail loudly if they differ.
print("Order")
order_counts = {
    "Raw":    orderDF.count(),
    "Target": spark.read.table('ruten.orders').count(),
    "Mongo":  (
        spark.read.format("mongodb")
        .option("database", "ruten")
        .option("collection", "orders")
        .load()
        .count()
    ),
}
for label, n_rows in order_counts.items():
    print(f"{label:<6} Count: {n_rows}")

assert len(set(order_counts.values())) == 1, f"Order row counts differ: {order_counts}"

Order


Raw    Count: 4047398


Target Count: 4047398


Mongo  Count: 4047398 


In [None]:
# Load the goods-to-keyword mapping: *.json files with one JSON object per line.
# PERMISSIVE mode keeps malformed records with null fields. Each row is tagged
# with the last two path components of its source file.
_goods_vs_keywordsDF = (
    spark.read
    .options(multiline="false", charset="UTF-8", mode="PERMISSIVE")
    .json("gs://dev-april-storage-2023/ruten/goods_vs_keywords/*.json")
    .withColumn(
        "filename",
        concat_ws('/', slice(split(input_file_name(), '/'), -2, 2)),
    )
)

# Keep GNO (cast to BigInt) and the trimmed keyword; repartitioned and cached
# because it is written and counted later.
goods_vs_keywordsDF = (
    _goods_vs_keywordsDF
    .select(
        col("GNO").cast("BigInt").alias("GNO"),
        trim(col("word")).alias("word"),
        "filename",
    )
    .repartition(2)
    .cache()
)

goods_vs_keywordsDF.show(5)

In [None]:
# Persist the goods/keyword mapping as a Parquet table in the Hive metastore,
# then mirror it into the MongoDB collection ruten.goods_vs_keywords. Both sinks
# use overwrite mode, so re-running the cell replaces the previous load.
goods_vs_keywordsDF.write.saveAsTable(
    "ruten.goods_vs_keywords", format="parquet", mode="overwrite"
)

spark.read.table('ruten.goods_vs_keywords').write.save(
    format="mongodb",
    mode="overwrite",
    database="ruten",
    collection="goods_vs_keywords",
)

                                                                                

In [None]:
# Reconcile row counts across the cleaned, cached goods_vs_keywordsDF ("Raw"),
# the Hive table ("Target") and the MongoDB collection, and fail if they differ.
print("Goods Vs Keywords")
goods_vs_keywords_counts = {
    "Raw":    goods_vs_keywordsDF.count(),
    "Target": spark.read.table('ruten.goods_vs_keywords').count(),
    "Mongo":  (
        spark.read.format("mongodb")
        .option("database", "ruten")
        .option("collection", "goods_vs_keywords")
        .load()
        .count()
    ),
}
for label, n_rows in goods_vs_keywords_counts.items():
    print(f"{label:<6} Count: {n_rows}")

assert len(set(goods_vs_keywords_counts.values())) == 1, (
    f"Goods Vs Keywords row counts differ: {goods_vs_keywords_counts}"
)

Goods Vs Keywords
Raw    Count: 424858
Target Count: 424858


Mongo  Count: 424858 


In [None]:
# Load the tab-separated cluster dump. No header option is set, so Spark names
# the columns positionally (_c0, _c1, ...) and, without schema inference, reads
# every value as a string. Each row is tagged with the last two path components
# of its source file, e.g. "cluster/part-r-00003".
# NOTE(review): these positional names are persisted unchanged to Hive and Mongo
# below; consider renaming/casting once the meaning of each column is confirmed.
clusterDF = (
    spark.read
    .options(charset="UTF-8", delimiter="\t")
    .csv("gs://dev-april-storage-2023/ruten/cluster/*")
    .withColumn(
        "filename",
        concat_ws('/', slice(split(input_file_name(), '/'), -2, 2)),
    )
    .repartition(4)
    .cache()
)

clusterDF.show(5)

+--------------+--------+--------------+-------------------------------------+--------------------+
|           _c0|     _c1|           _c2|                                  _c3|            filename|
+--------------+--------+--------------+-------------------------------------+--------------------+
|22151571806581|18458358|22151530075203|         現貨 FuRyu 北斗神拳 拉歐 ...|cluster/part-r-00003|
|22149409714935|15918852|22124119490173|   破洞牛仔褲 牛仔長褲 牛仔褲 男生...|cluster/part-r-00185|
|22142668387751|13719911|22102651491474| 安全鞋 冬季勞保鞋男高幫棉鞋加絨保...|cluster/part-r-00071|
|22141556623131|17110965|22141542378289|                 &#x070ed;&#x05356...|cluster/part-r-00071|
|22135821575821|18008307|22135821575821|家庭榨油機家用商用全自動大中小型冷...|cluster/part-r-00003|
+--------------+--------+--------------+-------------------------------------+--------------------+
only showing top 5 rows



                                                                                

In [None]:
# Persist the cluster data as a Parquet table in the Hive metastore, then mirror
# it into the MongoDB collection ruten.cluster. Both sinks use overwrite mode,
# so re-running the cell replaces the previous load.
clusterDF.write.saveAsTable("ruten.cluster", format="parquet", mode="overwrite")

spark.read.table('ruten.cluster').write.save(
    format="mongodb",
    mode="overwrite",
    database="ruten",
    collection="cluster",
)

                                                                                

In [None]:
# Reconcile row counts across the cached clusterDF ("Raw"), the Hive table
# ("Target") and the MongoDB collection, and fail if they differ. Labels use the
# same fixed-width format as the other count checks in this notebook.
print("Cluster")
cluster_counts = {
    "Raw":    clusterDF.count(),
    "Target": spark.read.table('ruten.cluster').count(),
    "Mongo":  (
        spark.read.format("mongodb")
        .option("database", "ruten")
        .option("collection", "cluster")
        .load()
        .count()
    ),
}
for label, n_rows in cluster_counts.items():
    print(f"{label:<6} Count: {n_rows}")

assert len(set(cluster_counts.values())) == 1, f"Cluster row counts differ: {cluster_counts}"

Cluster
Raw    Count: 1190688


Target Count: 1190688


Mongo Count : 1190688 
