In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

## Start a Spark session

In [2]:
# "local[*]" runs Spark locally with one worker thread per logical core;
# pass e.g. "local[4]" instead to cap it at 4 threads.
# getOrCreate() reuses an already-running session if there is one.
# NOTE(review): the s3a:// reads below presumably depend on hadoop-aws and AWS
# credentials being configured outside this notebook — confirm.
spark = SparkSession.builder.master("local[*]").getOrCreate()

## Read Data
Spark can read from and write to Amazon S3 directly (via `s3a://` paths).

In [3]:
# Two JSON snapshots of merchant "spstore", both dated 20210305 (per the paths).
json1_path = "s3a://shopback-prod-tw-orca-data-backup/json1/spstore/20210305/spstore.json"
json2_path = "s3a://shopback-prod-tw-orca-data-backup/json2/spstore/20210305/orca_product_spstore.json"
# Load them in order: json1 first, then json2.
j1_df, j2_df = (spark.read.json(path) for path in (json1_path, json2_path))

In [4]:
# Peek at the first json1 record as a plain dict (IndexError if j1_df is empty).
j1_df.head(1)[0].asDict()

{'cc': 'tw',
 'crawled_ts': 1614909600,
 'image_url': 'http://cf.shopee.tw/file/06e0f00454e8fc9246d26638caeca9a7',
 'merchant': 'spstore',
 'merchant_display_name': '蝦皮商城',
 'merchant_id': 130,
 'orca_category_lv1_id': 10,
 'orca_category_lv2_id': 67,
 'orca_category_lv3_id': 542,
 'price': 190.0,
 'product_id': '8904073469',
 'product_rating': None,
 'product_title': '緩解疼痛 激痛點的肌肉訓練[二手書_良好]5994',
 'raw_brand': '楓葉社文化事業有限公司',
 'seller': 'TAAZE讀冊生活網路書店',
 'seller_rating': 4.97,
 'url': 'https://shopee.tw/universal-link/product/159834670/8904073469'}

In [5]:
# Peek at the first json2 record as a plain dict (IndexError if j2_df is empty).
j2_df.head(1)[0].asDict()

{'accessoryScore': 0.0,
 'bookScore': 0.0,
 'brand': None,
 'brandId': None,
 'categoriesAnnotation': None,
 'categoriesRank1': None,
 'categoriesRank1Score': None,
 'categoryFinalConfidenceScore': None,
 'categoryLv1Id': None,
 'categoryLv2Id': None,
 'categoryLv3Id': None,
 'crawledTs': 1614909600,
 'currency': 'TWD',
 'displayGroupTitle': 'easyCover Lens Oak for Tamron 150-600mm f/5-6.3 Di VC USD G2',
 'groupId': 'spstore:3942642592',
 'groupTitle': 'easyCover Lens Oak for Tamron 150-600mm f/5-6.3 Di VC USD G2',
 'imageUrl': 'http://cf.shopee.tw/file/1b8e23c393a9addeda36e58d0946645c',
 'imgproxyHashOriginalSize': '/4DvFwL8_PwNTs1y3hLmsWwPDbRQOnbSrXE5BMxAXkyo/aHR0cDovL2NmLnNo/b3BlZS50dy9maWxl/LzFiOGUyM2MzOTNh/OWFkZGVkYTM2ZTU4/ZDA5NDY2NDVj.jpg',
 'imgproxyHashS180': '/FWV_eYrBg9LLj_7dUKSEw-tPYxCYaUSWJTlc-7ee9V0/fill/180/180/no/1/aHR0cDovL2NmLnNo/b3BlZS50dy9maWxl/LzFiOGUyM2MzOTNh/OWFkZGVkYTM2ZTU4/ZDA5NDY2NDVj.jpg',
 'imgproxyHashS292': '/G7__qgzgKV3Sp1Io6qoReWZmnLNTwQ9EB1uCLbtKEvM/fill

## Write to S3

In [6]:
# Write json1 back out as JSON. Spark writes a directory of part files at this
# path; the ".json" suffix is only part of the directory name.
# mode("overwrite") replaces any previous output so the notebook can be re-run
# end to end. Spark's default save mode raises an error when the path already
# exists, which previously forced manual renames (_1, _2, ...).
j1_df.write.mode("overwrite").json("s3a://staging-sg-shopback-orca-ml/blues/spark-demo/test_json1/spstore.json")

In [7]:
# Write both datasets as Parquet, a compact columnar format. These exact paths
# are the ones the "read back" cell loads, so they must not drift.
# mode("overwrite") replaces previous output instead of failing on an existing
# path. Renaming the paths by hand (_1, _2, ...) would leave the later read
# cell loading stale data from the old paths.
j1_df.write.mode("overwrite").parquet("s3a://staging-sg-shopback-orca-ml/blues/spark-demo/test_json1/spstore.parquet")
j2_df.write.mode("overwrite").parquet("s3a://staging-sg-shopback-orca-ml/blues/spark-demo/test_json2/spstore.parquet")

## Demo: example usage
Question: does any product's category id in json1 end up different in json2?

To match the two datasets, extract the product id embedded in json2's `groupId` and look up the original record in json1 via its `product_id` column.

In [8]:
# Reload the Parquet copies we just wrote; this rebinds j1_df / j2_df to the
# Parquet-backed DataFrames (json1 first, then json2).
j1_df, j2_df = (
    spark.read.parquet(path)
    for path in (
        "s3a://staging-sg-shopback-orca-ml/blues/spark-demo/test_json1/spstore.parquet",
        "s3a://staging-sg-shopback-orca-ml/blues/spark-demo/test_json2/spstore.parquet",
    )
)

In [9]:
# json1: keep only rows that carry a level-3 category, and project the columns
# needed for the comparison.
j1_select = j1_df.where(F.col("orca_category_lv3_id").isNotNull()).select(
    "product_id", "orca_category_lv3_id", "product_title"
)

# json2: keep rows that have a level-3 category AND whose isCategoryFromDataFeed
# flag equals True (rows where it is false or null are dropped).
# groupId looks like "spstore:3942642592", so the piece after ":" is the
# product id that json1 stores in product_id.
j2_select = j2_df.where(
    F.col("categoryLv3Id").isNotNull() & (F.col("isCategoryFromDataFeed") == F.lit(True))
).select(
    F.split("groupId", ":").getItem(1).alias("_product_id"),
    "categoryLv3Id",
    "title",
)

In [10]:
# Sanity check: print the first row of each selection (json1, then json2).
print("json1: ", j1_select.head(1))
print("json2: ", j2_select.head(1))

json1:  [Row(product_id='4647005361', orca_category_lv3_id=542, product_title='2020掌握考點【人事行政】初等考試‧地方五等課文版全套[9折]11100905908')]
json2:  [Row(_product_id='3111750407', categoryLv3Id=88, title='Apple Watch 1 2 3 4 5代3D曲面電鍍保護貼 38 40 42 44玻璃 保護貼 滿版 手錶玻璃貼9H')]


In [11]:
# Inner join on json1.product_id == json2._product_id (the id parsed out of
# groupId). Only products present in both selections are kept. With the
# previous "left" join, unmatched json1 rows carried a null categoryLv3Id
# into the sample shown below.
join_df = j1_select.join(j2_select, j1_select.product_id == j2_select._product_id, "inner")
# Show one joined row as a dict for inspection.
join_df.take(1)[0].asDict()

{'product_id': '7132713160',
 'orca_category_lv3_id': 542,
 'product_title': '從早到晚與從今以後 Dear 誠品eslite',
 '_product_id': '7132713160',
 'categoryLv3Id': 542,
 'title': '從早到晚與從今以後 Dear 誠品eslite'}

In [13]:
# Keep joined products whose level-3 category differs between json1 and json2.
# != evaluates to null when either side is null, and where() drops null rows,
# so a row with a missing category on either side is never counted here.
filt_df = join_df.where(F.col("categoryLv3Id") != F.col("orca_category_lv3_id"))

# Number of products whose category changed.
filt_df.count()

0

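A count of 0 means no level-3 category changed from json1 to json2 in the `20210305/spstore.json` snapshot. This covers only products present in both datasets that have a non-null level-3 category on each side and `isCategoryFromDataFeed` set to true in json2.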