In [1]:
import pyspark
from pyspark.sql import SparkSession
sc = pyspark.SparkContext('local[*]')
spark = SparkSession.builder.appName("calculateSaleRate").getOrCreate()

In [6]:
global Path
if sc.master[0:5] == "local":
    Path = "file:/home/jovyan/work/csvData/CSV/"
else:
    Path = "hdfs:/user/zeppelin/csvData/CSV/"

In [154]:
from operator import add
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType, FloatType
import time
import math
import datetime
from pyspark.sql.functions import monotonically_increasing_id, array_contains
import re

In [100]:
productSchema = StructType([
    StructField("item_id", IntegerType(), True),
    StructField("shop_id", IntegerType(), True),
    StructField("shop_name", StringType(), True),
    StructField("category_name", StringType(), True),
    StructField("name", StringType(), True),
    StructField("hashtag", StringType(), True),
    StructField("trade_in", FloatType(), True),
    StructField("price", IntegerType(), True),
    StructField("sale", IntegerType(), True),
    StructField("score_avg", FloatType(), True),
    StructField("score_num", IntegerType(), True),
    StructField("stock", IntegerType(), True),
    StructField("need_day", IntegerType(), True),
    StructField("url", StringType(), True),
    StructField("content", StringType(), True),
    StructField("update_time", DateType(), True)])

#productDf = spark.read.csv(Path+"2018-04-01 shopee_product.csv",header=False,schema=productSchema,quote='')

In [110]:
def makeRDD(line):
    line = line.split(",")
    
    if line[0] != "item_id":
        for i in range(len(line)):
            if line[i]=="":
                line[i]=0
                
        return [int(line[0]), int(line[1]), line[2], line[3], line[4], line[5], float(line[6]), int(line[7]), int(line[8]), float(line[9]), int(line[10]), int(line[11]), int(line[12]), line[13], line[14]]
    
    else:
        return line

In [147]:
productRDD = sc.parallelize([])

In [148]:
for i in range(1,12):
    if i <10:
        date = "2018-04-0"+str(i)
        filename = Path + date +" shopee_product.csv"
    elif 10 <= i and i < 31:
        date = "2018-04-"+str(i)
        filename = Path + date +" shopee_product.csv"
    date = datetime.datetime.strptime(date, '%Y-%m-%d')
    readRDD = sc.textFile(filename).map(makeRDD).map(lambda line: tuple(line)+(date,))
    productRDD = productRDD.union(readRDD)

In [149]:
productRDD = productRDD.filter(lambda line: line[0]!="item_id")
productRDD.take(3)

[(290925436,
  2022130,
  'yiting1211',
  'iPhone保護殼',
  '可超取iphone6__iphone6S_47吋_迪士尼_米奇放大鏡_軟殼_手機殼',
  0,
  9.0,
  100,
  0,
  0.0,
  0,
  0,
  0,
  'https://shopee.tw/%E3%80%90%E5%8F%AF%E8%B6%85%E5%8F%96%E3%80%91iphone6-iphone6S-(4.7%E5%90%8B)-%E8%BF%AA%E5%A3%AB%E5%B0%BC-%E7%B1%B3%E5%A5%87%E6%94%BE%E5%A4%A7%E9%8F%A1-%E8%BB%9F%E6%AE%BC-%E6%89%8B%E6%A9%9F%E6%AE%BC-i.2022130.290925436',
  '可超取iphone6__iphone6S_47吋_迪士尼_米奇放大鏡_軟殼_手機殼售價150元',
  datetime.datetime(2018, 4, 1, 0, 0)),
 (1008696376,
  59323996,
  'kaixina',
  'Android保護殼',
  '潮流女神欧美爆款日韩名媛潮英伦学院风摇滚庞克仿旧百搭厚底坡跟涼鞋夏2017韓版新款羅馬鞋超高跟防水臺鏤空甜美露趾女鞋',
  '#女鞋#歐美#羅馬風格#簡約#小白鞋#英倫#韓版#休閒鞋#粗跟#綁帶#高跟鞋#復古#運動鞋#洞洞鞋#平底鞋#帆布鞋#拖鞋#涼鞋',
  4.3,
  800,
  0,
  0.0,
  0,
  120,
  -1,
  'https://shopee.tw/%E6%BD%AE%E6%B5%81%E5%A5%B3%E7%A5%9E%E6%AC%A7%E7%BE%8E%E7%88%86%E6%AC%BE%E6%97%A5%E9%9F%A9%E5%90%8D%E5%AA%9B%E6%BD%AE%E8%8B%B1%E4%BC%A6%E5%AD%A6%E9%99%A2%E9%A3%8E%E6%91%87%E6%BB%9A%E5%BA%9E%E5%85%8B%E4%BB%BF%E6%97%A7%E7%99%BE%E6%90%AD%E5%8E%9A%E5%BA%95%E5%9D%A1%E8%

In [150]:
productDF = spark.createDataFrame(productRDD, productSchema)
productDF.show(3)

+----------+--------+----------+-------------+--------------------+--------------------+--------+-----+----+---------+---------+-----+--------+--------------------+--------------------+-----------+
|   item_id| shop_id| shop_name|category_name|                name|             hashtag|trade_in|price|sale|score_avg|score_num|stock|need_day|                 url|             content|update_time|
+----------+--------+----------+-------------+--------------------+--------------------+--------+-----+----+---------+---------+-----+--------+--------------------+--------------------+-----------+
| 290925436| 2022130|yiting1211|    iPhone保護殼|可超取iphone6__iphon...|                   0|     9.0|  100|   0|      0.0|        0|    0|       0|https://shopee.tw...|可超取iphone6__iphon...| 2018-04-01|
|1008696376|59323996|   kaixina|   Android保護殼|潮流女神欧美爆款日韩名媛潮英伦学院...|#女鞋#歐美#羅馬風格#簡約#小白...|     4.3|  800|   0|      0.0|        0|  120|      -1|https://shopee.tw...|                   _| 2018-04-01|
| 24493544

In [229]:
productDF.filter("stock = 0").rdd.take(5)

[Row(item_id=290925436, shop_id=2022130, shop_name='yiting1211', category_name='iPhone保護殼', name='可超取iphone6__iphone6S_47吋_迪士尼_米奇放大鏡_軟殼_手機殼', hashtag='0', trade_in=9.0, price=100, sale=0, score_avg=0.0, score_num=0, stock=0, need_day=0, url='https://shopee.tw/%E3%80%90%E5%8F%AF%E8%B6%85%E5%8F%96%E3%80%91iphone6-iphone6S-(4.7%E5%90%8B)-%E8%BF%AA%E5%A3%AB%E5%B0%BC-%E7%B1%B3%E5%A5%87%E6%94%BE%E5%A4%A7%E9%8F%A1-%E8%BB%9F%E6%AE%BC-%E6%89%8B%E6%A9%9F%E6%AE%BC-i.2022130.290925436', content='可超取iphone6__iphone6S_47吋_迪士尼_米奇放大鏡_軟殼_手機殼售價150元', update_time=datetime.date(2018, 4, 1)),
 Row(item_id=185992271, shop_id=15709005, shop_name='lovephonefashion', category_name='Apple空機', name='愛瘋時尚iphone5_16g_銀色', hashtag='#iphone#iphone5', trade_in=10.0, price=4000, sale=0, score_avg=0.0, score_num=0, stock=0, need_day=-1, url='https://shopee.tw/%E3%80%90%E6%84%9B%E7%98%8B%E6%99%82%E5%B0%9A%E3%80%91iphone5-16g-%E9%8A%80%E8%89%B2-i.15709005.185992271', content='_', update_time=datetime.date(2018, 4, 1)),
 

In [151]:
productDF.count()

169992

In [152]:
dataRDD = productDF.select("item_id", "category_name", "name", "price", "sale", "score_avg", "score_num", "stock", "update_time")

In [153]:
dataRDD.take(5)

[Row(item_id=290925436, category_name='iPhone保護殼', name='可超取iphone6__iphone6S_47吋_迪士尼_米奇放大鏡_軟殼_手機殼', price=100, sale=0, score_avg=0.0, score_num=0, stock=0, update_time=datetime.date(2018, 4, 1)),
 Row(item_id=1008696376, category_name='Android保護殼', name='潮流女神欧美爆款日韩名媛潮英伦学院风摇滚庞克仿旧百搭厚底坡跟涼鞋夏2017韓版新款羅馬鞋超高跟防水臺鏤空甜美露趾女鞋', price=800, sale=0, score_avg=0.0, score_num=0, stock=120, update_time=datetime.date(2018, 4, 1)),
 Row(item_id=244935440, category_name='iPhone充電傳輸', name='微型商店Jetart_Lightning_USB_強化傳輸線15m_IPHONE_充電線_CAA220', price=499, sale=0, score_avg=0.0, score_num=0, stock=99, update_time=datetime.date(2018, 4, 1)),
 Row(item_id=78503770, category_name='平板電腦', name='愛買舖全新_平板_SuperPad_A1967_97吋_3G通話上網_8核架構_16G_老人機__通話平板', price=3990, sale=0, score_avg=0.0, score_num=0, stock=5, update_time=datetime.date(2018, 4, 1)),
 Row(item_id=627940626, category_name='Android保護殼', name='全民3C_日本東映授權正版_航海王_ZenFone_4_ZE554KL_彩繪磁力皮套封鎖羅_手機皮套_保護套', price=399, sale=0, score_avg=0.0, score_num=0, stock=20, 

# 查看資料筆數分佈情況

In [170]:
testCountRDD = productDF.select("item_id").rdd.map(lambda x: (x,1)).reduceByKey(add)
testCountRDD.count()

70408

In [177]:
testCountRDD = testCountRDD.map(lambda x:(x[1], 1)).reduceByKey(add)
testCountRDD.count()

9

In [178]:
testCountRDD.collect()

[(1, 25327),
 (2, 18553),
 (3, 11523),
 (4, 3751),
 (5, 10199),
 (6, 547),
 (7, 370),
 (8, 123),
 (9, 15)]

In [179]:
productDF.select("item_id").distinct().count()

70408

# 建立測試用資料

### 依據item_id 將資料轉換形式

In [188]:
testDataRDD = dataRDD.rdd.map(lambda x: (x[0], list(x[1:]))).groupByKey().map(lambda x: (x[0], list(x[1])))
testDataRDD.take(3)

[(469012152,
  [['Android空機',
    'JH_手機快修_HTC_820_16G_全功能正常_保固3個月',
    2500,
    0,
    5.0,
    1,
    0,
    datetime.date(2018, 4, 1)]]),
 (545299512,
  [['手機周邊配件',
    '雅客方舟Nike_證件帶_手機掛環_現貨',
    229,
    0,
    0.0,
    0,
    150,
    datetime.date(2018, 4, 1)]]),
 (862832544, [['其他', '手機掛繩_白色', 15, 0, 0.0, 0, 1, datetime.date(2018, 4, 1)]])]

### 取得資料筆數>5的資料群

In [192]:
count5RDD = testDataRDD.filter(lambda x: len(x[1])>=5)
count5RDD.count()

11254

### 依據評價數計算銷售頻率

In [290]:
def countSaleRateByScore(line):
    # (id, list)
    score_count = line[1][-1][5] - line[1][0][5]
    date = (line[1][-1][-1] - line[1][0][-1]).days
    
    saleRate = score_count / date * 1.0
    
    return (line[0], score_count, date, saleRate)

In [291]:
count5SaleRateRDD = count5RDD.map(countSaleRateByScore)
count5SaleRateRDD.count()

11254

In [292]:
count5SaleRateRDD.take(3)

[(201666072, 0, 9, 0.0), (1001361696, 0, 10, 0.0), (44400816, 0, 9, 0.0)]

In [293]:
count5SaleRateRDD.filter(lambda x: x[-1]>0).count()

214

### 依據庫存計算銷售頻率

In [325]:
def countSaleRateByStock(line):
    #(id, list)
    baseStock = line[1][0][6]
    stockCount = 0
    date = 0
    stockDelta = 0
    category = line[1][0][0]
    name = line[1][0][1]
    saleDelta = 0
    
    for i in range(len(line[1])-1):
        saleDelta = abs(line[1][i+1][3] - line[1][i][3]) #月銷量變化
        if line[1][i][6] >= line[1][i+1][6]:
            stockDelta = (line[1][i][6] - line[1][i+1][6])
            baseStock = line[1][i+1][6]
            
            if stockDelta >= saleDelta: #庫存變化 > 月銷量變化 => 下調庫存量
                stockDelta=0
            
            stockCount += stockDelta
            date += (line[1][i+1][-1] - line[1][i][-1]).days
        
        elif line[1][i][6] < line[1][i+1][6]:
            baseStock = line[1][i+1][6]
            filterStock = baseStock
            
        elif baseStock == 0 :
            continue
        
    
    #date = (line[1][-1][-1] - line[1][0][-1]).days
    if date > 0 :
        saleRate = stockDelta / date * 1.0
    else:
        saleRate = 0.0
    
    return (line[0], category, name, stockCount, date, saleRate)

In [284]:
count5StockSaleRateRDD = count5RDD.map(countSaleRateByStock)
count5StockSaleRateRDD.count()

11254

In [285]:
count5StockSaleRateRDD.take(3)

[(201666072, 0, 9, 0.0), (1001361696, 0, 10, 0.0), (44400816, 0, 9, 0.0)]

In [286]:
count5StockSaleRateRDD.filter(lambda x: x[-1]>0).take(10)

[(8459184, 1, 4, 0.25),
 (8536896, 25, 4, 1.5),
 (896398800, 1, 4, 0.25),
 (251176728, 1, 3, 0.3333333333333333),
 (60200712, 1, 4, 0.25),
 (14111545, 8, 7, 1.0),
 (19331497, 5, 4, 0.5),
 (114088945, 4, 5, 0.2),
 (42291049, 1, 4, 0.25),
 (465216985, 2, 3, 0.3333333333333333)]

In [287]:
count5StockSaleRateRDD.takeOrdered(10, key=lambda x: -x[-1])

[(4959332, 93, 4, 7.0),
 (525232698, 26, 4, 2.75),
 (68359654, 20, 4, 2.75),
 (191969759, 29, 4, 2.75),
 (457334233, 15, 4, 2.5),
 (5595311, 41, 4, 2.5),
 (192279461, 12, 4, 2.25),
 (342120467, 19, 8, 2.125),
 (19648171, 20, 4, 1.75),
 (8536896, 25, 4, 1.5)]

In [288]:
productDF.select("item_id", "sale", "score_num", "stock", "update_time").filter('item_id = 4959332').rdd.collect()

[Row(item_id=4959332, sale=856, score_num=4017, stock=4378, update_time=datetime.date(2018, 4, 6)),
 Row(item_id=4959332, sale=827, score_num=4032, stock=4359, update_time=datetime.date(2018, 4, 7)),
 Row(item_id=4959332, sale=802, score_num=4051, stock=4341, update_time=datetime.date(2018, 4, 8)),
 Row(item_id=4959332, sale=781, score_num=4059, stock=4313, update_time=datetime.date(2018, 4, 9)),
 Row(item_id=4959332, sale=782, score_num=4081, stock=4285, update_time=datetime.date(2018, 4, 10))]

In [254]:
productDF.select("item_id", "sale", "score_num", "stock", "update_time").filter('sale >= 10').rdd.collect()

[Row(item_id=864699726, sale=17, score_num=14, stock=10, update_time=datetime.date(2018, 4, 1)),
 Row(item_id=8388865, sale=25, score_num=70, stock=5948, update_time=datetime.date(2018, 4, 1)),
 Row(item_id=557538607, sale=29, score_num=167, stock=1290, update_time=datetime.date(2018, 4, 1)),
 Row(item_id=65025241, sale=20, score_num=75, stock=9908, update_time=datetime.date(2018, 4, 1)),
 Row(item_id=180192578, sale=20, score_num=205, stock=1577, update_time=datetime.date(2018, 4, 1)),
 Row(item_id=8012139, sale=12, score_num=35, stock=44, update_time=datetime.date(2018, 4, 1)),
 Row(item_id=505033631, sale=16, score_num=37, stock=1317, update_time=datetime.date(2018, 4, 1)),
 Row(item_id=380970962, sale=42, score_num=90, stock=824, update_time=datetime.date(2018, 4, 1)),
 Row(item_id=276866438, sale=134, score_num=58, stock=3547, update_time=datetime.date(2018, 4, 1)),
 Row(item_id=177921273, sale=48, score_num=170, stock=183, update_time=datetime.date(2018, 4, 1)),
 Row(item_id=8207

### 實際應用

In [326]:
stockSaleRateRDD = testDataRDD.map(countSaleRateByStock)
stockSaleRateRDD.count()

70408

In [327]:
stockSaleRateRDD.takeOrdered(10, key=lambda x: -x[-1])

[(945906386,
  '手機周邊配件',
  '新款四代吃雞神器_附發票最新款卡扣式_快捷射擊按鍵神器_手機搖桿_荒野行動_絕地求生_射擊按鍵AB920',
  16,
  1,
  16.0),
 (85844137,
  '手機周邊配件',
  '現貨新款升級_線控_藍芽_美顏_補光燈_A6_自拍神器自拍棒_自拍桿_補光_手機_自拍_CYKE_ADYSS_藍牙',
  11,
  1,
  11.0),
 (175192480,
  '手機周邊配件',
  '現貨_多款_卡通造型_掛繩_可拆式_手機掛繩_迪士尼_小新_史迪奇_無臉男_宮崎駿_米妮_山茶花',
  5,
  1,
  5.0),
 (832737770, '手機周邊配件', '現貨預購_KAKAO_FRIENDS_新款可伸縮氣囊多功能手機支架_通用款_', 3, 1, 3.0),
 (888637403,
  '耳機喇叭',
  'MToy總代理台灣公司貨_防掉防汗水_IPX67運動耳機_無線耳機_重低音_磁吸_藍牙耳機_藍芽耳機',
  11,
  2,
  3.0),
 (524474627,
  'iPhone保護殼',
  'iPhone轉聲_空壓殼_四角防摔_前置轉聲__iPhone7_iPhone8_Plus_i7_i8_iX',
  6,
  2,
  3.0),
 (474042478,
  'Android保護殼',
  'BlACK英文_磨砂手機殼S8S8plusJ7A7J5_2016版A5红米NOTE4红米NOTE3小米6',
  3,
  1,
  3.0),
 (457334233, 'iPhone充電傳輸', '大量現貨蘋果iphone線套保護套_創意線套_數據線保護器_i線套', 10, 4, 2.5),
 (956128551,
  '耳機喇叭',
  '現貨供貨QCY_T1_PRO_觸碰藍芽耳機_磁吸藍牙耳機_運動藍芽耳機_音樂藍牙耳機_藍芽耳機_Q29_2代',
  5,
  2,
  2.5),
 (192078474,
  'Android保護貼',
  '滿版全膠_HTC_U_UltraU11U11Eyes全版鋼化玻璃保護貼防碎手機螢幕防爆玻璃貼',
  5,
  2,
  2.5)]

# 做成pandas Dataframe

In [333]:
stockSaleRateDF = spark.createDataFrame(stockSaleRateRDD.takeOrdered(stockSaleRateRDD.count(), key=lambda x: -x[-1]), ["item_id", "category_name", "name", "庫存變化", "日期差", "銷售頻率"])
stockSaleRateDF.count()

70408

In [337]:
stockSaleRatePDDF = stockSaleRateDF.toPandas()
stockSaleRatePDDF.index.name="id"
today = datetime.date.today()

stockSaleRatePDDF.to_csv("stockSaleRate_Output/"+str(today)+"_SaleRateChart.csv", encoding='utf-8')

----------------------------------------
Exception happened during processing of request from ('127.0.0.1', 58840)
Traceback (most recent call last):
  File "/opt/conda/lib/python3.6/socketserver.py", line 317, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/opt/conda/lib/python3.6/socketserver.py", line 348, in process_request
    self.finish_request(request, client_address)
  File "/opt/conda/lib/python3.6/socketserver.py", line 361, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/opt/conda/lib/python3.6/socketserver.py", line 696, in __init__
    self.handle()
  File "/usr/local/spark/python/pyspark/accumulators.py", line 235, in handle
    num_updates = read_int(self.rfile)
  File "/usr/local/spark/python/pyspark/serializers.py", line 577, in read_int
    raise EOFError
EOFError
----------------------------------------
