# 前置作業

In [1]:
import pyspark
from pyspark.sql import SparkSession
from operator import add
sc = pyspark.SparkContext('local[*]')
spark = SparkSession.builder.appName("csvTest").getOrCreate()

In [2]:
global Path
if sc.master[0:5] == "local":
    Path = "file:/home/jovyan/work/csvData/"
else:
    Path = "hdfs:/user/zeppelin/csvData/"

In [3]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType, FloatType
import time
import math
import datetime
from pyspark.sql.functions import monotonically_increasing_id 

# 建立product DataFrame

In [4]:
productSchema = StructType([
    StructField("product_id", StringType(), True),
    StructField("category", StringType(), True),
    StructField("name", StringType(), True),
    StructField("price", IntegerType(), True),
    StructField("sale", IntegerType(), True),
    StructField("score", FloatType(), True),
    StructField("url", StringType(), True),
    StructField("imgurl", StringType(), True),
    StructField("update_time", DateType(), True)])

productDf = spark.read.csv(Path+"3c_product.csv",header=False,schema=productSchema)

In [5]:
productDf.select("product_id", "category", "name", "price", "sale", "score", "update_time").show(5)

+----------+--------+--------------------+-----+----+-----+-----------+
|product_id|category|                name|price|sale|score|update_time|
+----------+--------+--------------------+-----+----+-----+-----------+
| 100000238| Apple空機|迪士尼手機殼愛麗絲維尼史迪奇電鍍保...|  299|  -1| -1.0| 2017-12-23|
| 100000238| Apple空機|迪士尼手機殼愛麗絲維尼史迪奇電鍍保...|  299|  -1| -1.0| 2017-12-24|
| 100000238| Apple空機|迪士尼手機殼愛麗絲維尼史迪奇電鍍保...|  299|  -1| -1.0| 2017-12-25|
| 100000238| Apple空機|迪士尼手機殼愛麗絲維尼史迪奇電鍍保...|  299|  -1| -1.0| 2017-12-26|
| 100000411| Apple空機|迪士尼手機殼愛麗絲維尼史迪奇電鍍保...|  299|  -1|  5.0| 2017-12-23|
+----------+--------+--------------------+-----+----+-----+-----------+
only showing top 5 rows



In [6]:
productNameDf = productDf.select("product_id", "name").distinct()
productNameDf.show(5)

+----------+--------------------+
|product_id|                name|
+----------+--------------------+
| 100050400|JayBirdX2BLUEBUDS...|
| 100068041|樂視1手機殼letv樂1手機套樂視...|
| 100125088|Samsung三星Note5手機殼...|
| 100507876|Disney原廠全新MicroUS...|
| 100715572|美芙MeFOTOMK10藍牙遙控自...|
+----------+--------------------+
only showing top 5 rows



In [7]:
productDf.rdd.count()

53515

In [8]:
productNameDf.rdd.distinct().count()

45783

In [9]:
categoryDf = productDf.select("category")

In [10]:
categoryDf.groupBy("category").count().orderBy("count", ascending=0).show(10)

+-----------+-----+
|   category|count|
+-----------+-----+
| Android保護殼| 7106|
| Android保護貼| 6937|
|     手機周邊配件| 5398|
|  iPhone保護殼| 5084|
|       耳機喇叭| 4168|
| iPhone充電傳輸| 3479|
|       行動電源| 3391|
|  Android空機| 3345|
|  iPhone保護貼| 3022|
|Android充電傳輸| 2966|
+-----------+-----+
only showing top 10 rows



In [11]:
testRDD = productDf.filter('category = "Android保護殼"').select("name").distinct()
testRDD.show(5)

+--------------------+
|                name|
+--------------------+
|小米5小米6小米5sPlus小米A...|
|公司貨BagrunAsusZenF...|
|手機殼送掛繩動漫你的名字三星S5S...|
|MIBO超級薄TPU軟殻SONYZ...|
|全館299免運現貨三星S6edge...|
+--------------------+
only showing top 5 rows



In [12]:
testRDD.count()

6287

In [13]:
categoryRDD = productDf.select("category").rdd.map(lambda x: x[0]).distinct()
categoryRDD.take(5) 

['Apple空機', 'Android保護殼', '手機周邊配件', '電話、儲值卡', 'iPhone保護貼']

# 建立category RDD

In [14]:
categoryRDD.collect()

['Apple空機',
 'Android保護殼',
 '手機周邊配件',
 '電話、儲值卡',
 'iPhone保護貼',
 '平板電腦',
 '手拿包',
 '行動電源',
 'Android空機',
 '耳機喇叭',
 'Android充電傳輸',
 'Android保護貼',
 '其他',
 'iPhone保護殼',
 'iPhone充電傳輸',
 '其他廠牌手機',
 '泳衣、比基尼',
 '居家裝飾']

# 建立product RDD 供後續使用

In [15]:
newProductRDD = productDf.select("category", "name", "price", "sale", "product_id").rdd.map(lambda x: [x[0], x[1], x[2], x[3], x[4]])
newProductRDD.take(5)

[['Apple空機',
  '迪士尼手機殼愛麗絲維尼史迪奇電鍍保護殼正版米奇小美人魚Iphone7iphone7Plus',
  299,
  -1,
  '100000238'],
 ['Apple空機',
  '迪士尼手機殼愛麗絲維尼史迪奇電鍍保護殼正版米奇小美人魚Iphone7iphone7Plus',
  299,
  -1,
  '100000238'],
 ['Apple空機',
  '迪士尼手機殼愛麗絲維尼史迪奇電鍍保護殼正版米奇小美人魚Iphone7iphone7Plus',
  299,
  -1,
  '100000238'],
 ['Apple空機',
  '迪士尼手機殼愛麗絲維尼史迪奇電鍍保護殼正版米奇小美人魚Iphone7iphone7Plus',
  299,
  -1,
  '100000238'],
 ['Apple空機',
  '迪士尼手機殼愛麗絲維尼史迪奇電鍍保護殼正版米奇小美人魚Iphone7iphone7Plus',
  299,
  -1,
  '100000411']]

# 計算category 的總銷售額

In [16]:
def countCategorySalePrice():
    
    BaseRDD = newProductRDD.filter(lambda x: x[3]<3000)
    CategoryRDD = newProductRDD.map(lambda x: (x[1], x[0])).distinct()
    PriceRDD = BaseRDD.map(lambda x:(x[1], x[2])).reduceByKey(max)
    SaleRDD = BaseRDD.map(lambda x:(x[1], x[3])).filter(lambda x: x[1] > 0).reduceByKey(max)
    JoinRDD = SaleRDD.join(PriceRDD).map(lambda x: (x[0], x[1][0] * x[1][1])).reduceByKey(add)
    resRDD = JoinRDD.join(CategoryRDD).map(lambda x: (x[1][1], x[1][0])).reduceByKey(add)
    
    return resRDD


In [17]:
#newProductRDD.filter(lambda x: x[0] == category).filter(lambda x: x[3]<200)
t0 = time.time()
timeString = time.strftime("%Y年%m月%d日")
resRDD = countCategorySalePrice()
#resRDD.saveAsTextFile("outputData/"+timeString+"_output/")
    #for k,v in resRDD.take(3):
    #    print(k, v)

tt = time.time()-t0
print("use time: {} seconds".format(round(tt,3)))

use time: 0.183 seconds


# category的總銷售量排行榜

In [18]:
resRDD.take(3)

[('Apple空機', 613544), ('電話、儲值卡', 1562418), ('iPhone保護殼', 1312996)]

In [19]:
saleChartDf = spark.createDataFrame(resRDD, ["category", "salesValue"])
saleChartDf.show(3)

+---------+----------+
| category|salesValue|
+---------+----------+
|  Apple空機|    613544|
|   電話、儲值卡|   1562418|
|iPhone保護殼|   1312996|
+---------+----------+
only showing top 3 rows



In [20]:
saleChartDf = saleChartDf.orderBy(saleChartDf.salesValue.desc())
saleChartDf.show(3)

+----------+----------+
|  category|salesValue|
+----------+----------+
|      行動電源|   2043538|
|      耳機喇叭|   1949447|
|iPhone充電傳輸|   1753300|
+----------+----------+
only showing top 3 rows



In [21]:
saleChartDf.rdd.collect()

[Row(category='行動電源', salesValue=2043538),
 Row(category='耳機喇叭', salesValue=1949447),
 Row(category='iPhone充電傳輸', salesValue=1753300),
 Row(category='電話、儲值卡', salesValue=1562418),
 Row(category='iPhone保護殼', salesValue=1312996),
 Row(category='iPhone保護貼', salesValue=1243834),
 Row(category='手機周邊配件', salesValue=1232840),
 Row(category='Android空機', salesValue=1188542),
 Row(category='其他', salesValue=1051228),
 Row(category='Android保護貼', salesValue=644555),
 Row(category='Apple空機', salesValue=613544),
 Row(category='Android保護殼', salesValue=548670),
 Row(category='Android充電傳輸', salesValue=493610),
 Row(category='平板電腦', salesValue=446374),
 Row(category='其他廠牌手機', salesValue=101866)]

### 轉pandas to csv

In [22]:
import pandas as pd
saleChartDf_pddf = saleChartDf.toPandas()
saleChartDf_pddf.index+=1
saleChartDf_pddf.index.name="rank"
saleChartDf_pddf

Unnamed: 0_level_0,category,salesValue
rank,Unnamed: 1_level_1,Unnamed: 2_level_1
1,行動電源,2043538
2,耳機喇叭,1949447
3,iPhone充電傳輸,1753300
4,電話、儲值卡,1562418
5,iPhone保護殼,1312996
6,iPhone保護貼,1243834
7,手機周邊配件,1232840
8,Android空機,1188542
9,其他,1051228
10,Android保護貼,644555


In [23]:
saleChartDf_pddf.to_csv('saleChart.csv', encoding='utf-8')

In [24]:
!cat saleChart.csv

rank,category,salesValue
1,行動電源,2043538
2,耳機喇叭,1949447
3,iPhone充電傳輸,1753300
4,電話、儲值卡,1562418
5,iPhone保護殼,1312996
6,iPhone保護貼,1243834
7,手機周邊配件,1232840
8,Android空機,1188542
9,其他,1051228
10,Android保護貼,644555
11,Apple空機,613544
12,Android保護殼,548670
13,Android充電傳輸,493610
14,平板電腦,446374
15,其他廠牌手機,101866


# 各個category的商品銷售額排行榜

# 計算單一category的各項商品銷售額

In [25]:
def countSalePrice(category):
    baseRDD = newProductRDD.filter(lambda x: x[0] == category)
    productIdRDD = baseRDD.map(lambda x: (x[1],x[4])).distinct()
    priceRDD = baseRDD.map(lambda x:(x[1], x[2])).reduceByKey(max)
    saleRDD = baseRDD.map(lambda x:(x[1], x[3])).filter(lambda x: x[1] > 0).reduceByKey(max)
    joinRDD = saleRDD.join(priceRDD).join(productIdRDD)
    #joinRDD = saleRDD.join(priceRDD)
    res = joinRDD.map(lambda x: (category, x[1][1], x[0], x[1][0][0], x[1][0][1], x[1][0][0] * x[1][0][1])).takeOrdered(10, key=lambda x: -1*int(x[5]))
    #res = joinRDD.map(lambda x: (category, x[0], x[1][0], x[1][1], x[1][0] * x[1][1])).takeOrdered(10, key=lambda x: -1*int(x[4]))
    
    return res
    

In [26]:
t0 = time.time()

for category in categoryRDD.collect():
    CSP_RDD = countSalePrice(category)
    try: 
        CSP_DF = spark.createDataFrame(CSP_RDD, ["category", "product_id", "productName", "price", "sales", "salesValue"])
        #CSP_DF = spark.createDataFrame(CSP_RDD, ["category", "productName", "price", "sales", "salesValue"])
        #CSP_DF = CSP_DF.orderBy(CSP_DF.salesValue.desc())
        CSP_PDDF = CSP_DF.toPandas()
        CSP_PDDF.index+=1
        CSP_PDDF.index.name="rank"
        CSP_PDDF.to_csv("ProductSaleChart_Output/"+category+'SaleChart.csv', encoding='utf-8')
    except ValueError:
        continue
    
    
tt = time.time()-t0
print("use time: {} seconds".format(round(tt,3)))

use time: 33.34 seconds


In [27]:
!cat ProductSaleChart_Output/Android保護殼SaleChart.csv

rank,category,product_id,productName,price,sales,salesValue
1,Android保護殼,187108809,有影摔給你看OPPO冰晶盾4角增壓防摔R9R9SF1SA39保護殼送贈品E13,203,99,20097
2,Android保護殼,385708657,送鋼化膜USAMSOPPOR9sR9sPlusR11R11S矽膠防摔手機殼,99,199,19701
3,Android保護殼,26556512,實拍測試摔給你看冰晶盾ASUSZenfone4SelfieZE552KL手機殼D34U1,172,99,17028
4,Android保護殼,262969112,來圖就能客製化訂製客製手機殼蘋果三星OPPOHTC華碩SONYLGOPPO小米富可視美圖,99,169,16731
5,Android保護殼,387523295,送鋼化膜JOYROOMOPPOR9R9PR9sR9sPR11R11S磨砂手機殼保護殼,75,199,14925
6,Android保護殼,379551049,男士黑色手機殼HTCEYEE8E9E9M7M8M9M9M10K10530U11A37F1,71,160,11360
7,Android保護殼,215981653,LLM來圖客制化空壓殼軟殼oppoA77a35a39F1R9SPLUSR7r7plusR11手機殼,34,300,10200
8,Android保護殼,187208682,有影摔給你看ASUS冰晶盾4角防摔ZF35552送贈品ZE552KLZE520KL手機殼E13,79,99,7821
9,Android保護殼,385789645,卡娜赫拉點點各式手機殼OPPOR11sA77A57A39F1sR7R7SR9R9R11,38,179,6802
10,Android保護殼,385745444,卡娜赫拉P助支持各式手機殼iphone三星HTCASUSsonyLGOPPO小米華為Nokia,33,182,6006


# 計算分類銷售頻率

In [29]:
itemSchema = StructType([
    StructField("item_id", IntegerType(), True),
    StructField("product_id", StringType(), True),
    StructField("name", StringType(), True),
    StructField("price", IntegerType(), True),
    StructField("stock", IntegerType(), True),
    StructField("update_time", DateType(), True)])

itemDf = spark.read.csv(Path+"3c_item.csv",header=False,schema=itemSchema)

itemDf.show(5)

+-------+----------+----+-----+------+-----------+
|item_id|product_id|name|price| stock|update_time|
+-------+----------+----+-----+------+-----------+
|      0| 100003669|  黑色|  450|944799| 2017-12-24|
|      0| 100003669|  黑色|  450|944799| 2017-12-25|
|      0| 100003669|  黑色|  450|944799| 2017-12-26|
|      0| 100003669|  黑色|  450|944799| 2017-12-27|
|      0| 100003669|  黑色|  450|944799| 2017-12-28|
+-------+----------+----+-----+------+-----------+
only showing top 5 rows



<bound method DataFrame.join of DataFrame[item_id: int, product_id: string, name: string, price: int, stock: int, update_time: date]>

In [30]:
itemDf.count()

429202

In [31]:
productIdDf = productDf.select("product_id", "category").distinct()
productIdDf.show(3)

+----------+---------+
|product_id| category|
+----------+---------+
| 100229839|     行動電源|
| 100394186|     耳機喇叭|
| 100625798|iPhone保護殼|
+----------+---------+
only showing top 3 rows



In [32]:
itemJoinDf = itemDf.join(productIdDf, \
           itemDf.product_id == productIdDf.product_id, "inner").drop(productIdDf.product_id)
itemJoinDf.show(3)

+-------+----------+----+-----+------+-----------+--------+
|item_id|product_id|name|price| stock|update_time|category|
+-------+----------+----+-----+------+-----------+--------+
|      0| 100003669|  黑色|  450|944799| 2017-12-24|    行動電源|
|      0| 100003669|  黑色|  450|944799| 2017-12-25|    行動電源|
|      0| 100003669|  黑色|  450|944799| 2017-12-26|    行動電源|
+-------+----------+----+-----+------+-----------+--------+
only showing top 3 rows



In [33]:
ItemRDD = itemJoinDf.select("product_id", "name", "price", "stock", "update_time", "category").filter("name is not NULL").filter("price > 0").filter("stock > 0") \
.rdd.map(lambda x: (x[0]+" "+x[1], (x[3], x[4], x[5])))
ItemRDD.take(3)

[('100003669 黑色', (944799, datetime.date(2017, 12, 24), '行動電源')),
 ('100003669 黑色', (944799, datetime.date(2017, 12, 25), '行動電源')),
 ('100003669 黑色', (944799, datetime.date(2017, 12, 26), '行動電源'))]

In [34]:
ItemRDD.count()

291498

In [35]:
ItemRDD = ItemRDD.groupByKey().map(lambda x: (x[0], list(x[1])))
ItemRDD.take(3)

[('100003669 黑色',
  [(944799, datetime.date(2017, 12, 24), '行動電源'),
   (944799, datetime.date(2017, 12, 25), '行動電源'),
   (944799, datetime.date(2017, 12, 26), '行動電源'),
   (944799, datetime.date(2017, 12, 27), '行動電源'),
   (944799, datetime.date(2017, 12, 28), '行動電源'),
   (944799, datetime.date(2017, 12, 29), '行動電源'),
   (944799, datetime.date(2017, 12, 30), '行動電源'),
   (944799, datetime.date(2017, 12, 31), '行動電源'),
   (944799, datetime.date(2018, 1, 4), '行動電源'),
   (944799, datetime.date(2018, 1, 5), '行動電源')]),
 ('100006241 黑',
  [(1, datetime.date(2017, 12, 23), '耳機喇叭'),
   (1, datetime.date(2017, 12, 24), '耳機喇叭'),
   (1, datetime.date(2017, 12, 25), '耳機喇叭'),
   (1, datetime.date(2017, 12, 26), '耳機喇叭'),
   (1, datetime.date(2017, 12, 27), '耳機喇叭'),
   (1, datetime.date(2017, 12, 28), '耳機喇叭'),
   (1, datetime.date(2017, 12, 29), '耳機喇叭'),
   (1, datetime.date(2017, 12, 30), '耳機喇叭'),
   (1, datetime.date(2017, 12, 31), '耳機喇叭'),
   (1, datetime.date(2018, 1, 4), '耳機喇叭'),
   (1, datetime.dat

In [36]:
def countSaleDate(x):
    datalen = len(x[1])
    startSale = x[1][0][0]
    startDate = x[1][0][1]
    endSale = x[1][datalen-1][0]
    endDate = x[1][datalen-1][1]
    
    dateDelta = (endDate - startDate).days
    #saleDelta = startSale - endSale
    saleDelta = 0
    for loc in range(0, datalen-1):
        if x[1][loc][0] > x[1][loc+1][0]:
            saleDelta += (x[1][loc][0] - x[1][loc+1][0])
    
    
    if datalen == 1:
        res = 0
    else:
        try:
            res = saleDelta / dateDelta
        except ZeroDivisionError:
            return (x[0], saleDelta, dateDelta, x[1][0][2], "ZeroDivisionError", datalen)
    
    
    #return (x[0], datalen)
    return (x[0], saleDelta, dateDelta, x[1][0][2], res, datalen)

In [37]:
ItemDisRDD = ItemRDD.map(lambda x: countSaleDate(x))
ItemDisRDD.count()

120673

In [38]:
ItemDisRDD.filter(lambda x: x[4]=="ZeroDivisionError").take(3)

[('106823900 i6i6s', 0, 0, 'iPhone保護殼', 'ZeroDivisionError', 2),
 ('106825241 i6i6s', 0, 0, 'iPhone保護殼', 'ZeroDivisionError', 2),
 ('109640614 1雪地迷彩藍iPhone7', 0, 0, 'iPhone保護殼', 'ZeroDivisionError', 2)]

In [39]:
ItemDisRDD.filter(lambda x: x[4]!= "ZeroDivisionError").filter(lambda x: x[4]>0.0).count()

1908

In [40]:
ItemDisRDD.filter(lambda x: x[4]!= "ZeroDivisionError").filter(lambda x: x[4]>0.0).take(5)

[('100047069 時代鎏金', 1, 11, 'Android保護殼', 0.09090909090909091, 9),
 ('100053964 黑色', 7, 11, 'Android保護殼', 0.6363636363636364, 9),
 ('100082525 霧面i647吋內縮滿版白', 1, 5, 'iPhone保護貼', 0.2, 6),
 ('100127428 M7', 2, 7, 'Android保護貼', 0.2857142857142857, 8),
 ('100236654 iPhoneX前貼黑框滿版', 1, 6, 'iPhone保護貼', 0.16666666666666666, 7)]

In [41]:
frequencyKeyRDD = ItemDisRDD.map(lambda x: (x[3], (x[1], x[2])))
frequencyKeyRDD.take(3)

[('行動電源', (0, 12)), ('耳機喇叭', (0, 13)), ('手機周邊配件', (0, 12))]

In [42]:
frequencyKeyRDD.count()

120673

In [43]:
frequencyKeyRDD.filter(lambda x: x[1][0]>20).take(3)

[('iPhone充電傳輸', (23, 8)), ('iPhone充電傳輸', (22, 4)), ('Android保護殼', (998, 10))]

In [44]:
def countSaleAndMaxDay(data1, data2):
    
    maxday = max(data1[1], data2[1])
    
    sales = data1[0]+data2[0]
    
    return (sales,maxday)

In [45]:
def countFrequency(data):
    sale = data[1][0]
    days = data[1][1]
    
    if days == 0:
        return (data[0], 0)
    else:
        return (data[0], data[1][0]/data[1][1]) 

In [46]:
frequencyKeyRDD.reduceByKey(lambda x,y: countSaleAndMaxDay(x, y)).collect()

[('手機周邊配件', (627, 12)),
 ('Android保護殼', (1471, 13)),
 ('iPhone保護貼', (1400189, 13)),
 ('平板電腦', (31, 12)),
 ('Apple空機', (11, 12)),
 ('電話、儲值卡', (63, 13)),
 ('手拿包', (0, 2)),
 ('行動電源', (326, 13)),
 ('耳機喇叭', (566, 13)),
 ('Android保護貼', (1176, 12)),
 ('iPhone保護殼', (78078, 13)),
 ('iPhone充電傳輸', (2302, 13)),
 ('其他廠牌手機', (2, 11)),
 ('其他', (659, 11)),
 ('Android空機', (21, 12)),
 ('Android充電傳輸', (513, 12)),
 ('泳衣、比基尼', (0, 8)),
 ('居家裝飾', (0, 0))]

In [47]:
saleFrequencyResRDD = frequencyKeyRDD.reduceByKey(lambda x,y: countSaleAndMaxDay(x, y))\
    .map(lambda x: countFrequency(x))

In [48]:
saleFrequencyResRDD.count()

18

In [49]:
saleFrequencyResRDD.collect()

[('手機周邊配件', 52.25),
 ('Android保護殼', 113.15384615384616),
 ('iPhone保護貼', 107706.84615384616),
 ('平板電腦', 2.5833333333333335),
 ('Apple空機', 0.9166666666666666),
 ('電話、儲值卡', 4.846153846153846),
 ('手拿包', 0.0),
 ('行動電源', 25.076923076923077),
 ('耳機喇叭', 43.53846153846154),
 ('Android保護貼', 98.0),
 ('iPhone保護殼', 6006.0),
 ('iPhone充電傳輸', 177.07692307692307),
 ('其他廠牌手機', 0.18181818181818182),
 ('其他', 59.90909090909091),
 ('Android空機', 1.75),
 ('Android充電傳輸', 42.75),
 ('泳衣、比基尼', 0.0),
 ('居家裝飾', 0)]

In [50]:
def changeDay(data):
    frequency = data[1]
    
    if frequency == 0:
        days = 0
        res = 0
    elif 1/frequency < 1:
        days = 1
        res = int(frequency)
    else:
        days = math.ceil(1/frequency)
        res = 1
    
    return (data[0], days, res)

In [51]:
saleDayRDD = saleFrequencyResRDD.map(lambda x: changeDay(x))
saleDayRDD.take(3)

[('手機周邊配件', 1, 52), ('Android保護殼', 1, 113), ('iPhone保護貼', 1, 107706)]

In [52]:
saleDayRDD.collect()

[('手機周邊配件', 1, 52),
 ('Android保護殼', 1, 113),
 ('iPhone保護貼', 1, 107706),
 ('平板電腦', 1, 2),
 ('Apple空機', 2, 1),
 ('電話、儲值卡', 1, 4),
 ('手拿包', 0, 0),
 ('行動電源', 1, 25),
 ('耳機喇叭', 1, 43),
 ('Android保護貼', 1, 98),
 ('iPhone保護殼', 1, 6006),
 ('iPhone充電傳輸', 1, 177),
 ('其他廠牌手機', 6, 1),
 ('其他', 1, 59),
 ('Android空機', 1, 1),
 ('Android充電傳輸', 1, 42),
 ('泳衣、比基尼', 0, 0),
 ('居家裝飾', 0, 0)]

In [53]:
saleDayDf = spark.createDataFrame(saleDayRDD, ["category", "days", "sales"])
saleDayDf.count()

18

In [54]:
saleDayDf_pddf = saleDayDf.toPandas()
saleDayDf_pddf.index+=1
saleDayDf_pddf.index.name="id"
saleDayDf_pddf

Unnamed: 0_level_0,category,days,sales
id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
1,手機周邊配件,1,52
2,Android保護殼,1,113
3,iPhone保護貼,1,107706
4,平板電腦,1,2
5,Apple空機,2,1
6,電話、儲值卡,1,4
7,手拿包,0,0
8,行動電源,1,25
9,耳機喇叭,1,43
10,Android保護貼,1,98


In [56]:
saleDayDf_pddf.to_csv('saleDay.csv', encoding='utf-8')