In [5]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

spark.conf.set('spark.sql.repl.eagerEval.enabled', True)


In [2]:
import pandas as pd

data_df = pd.read_csv('data/customer-orders.csv')
df = spark.createDataFrame(data_df)
df

DataFrame[CustomerID: bigint, ItemID: bigint, Amount: double]

In [3]:
data_df

Unnamed: 0,CustomerID,ItemID,Amount
0,44,8602,37.19
1,35,5368,65.89
2,2,3391,40.64
3,47,6694,14.98
4,29,680,13.08
...,...,...,...
9995,61,229,86.69
9996,50,4331,92.79
9997,2,9155,56.05
9998,23,2477,14.89


In [6]:
df.show()

+----------+------+------+
|CustomerID|ItemID|Amount|
+----------+------+------+
|        44|  8602| 37.19|
|        35|  5368| 65.89|
|         2|  3391| 40.64|
|        47|  6694| 14.98|
|        29|   680| 13.08|
|        91|  8900| 24.59|
|        70|  3959| 68.68|
|        85|  1733| 28.53|
|        53|  9900| 83.55|
|        14|  1505|  4.32|
|        51|  3378|  19.8|
|        42|  6926| 57.77|
|         2|  4424| 55.77|
|        79|  9291| 33.17|
|        50|  3901| 23.57|
|        20|  6633|  6.49|
|        15|  6148| 65.53|
|        44|  8331| 99.19|
|         5|  3505| 64.18|
|        48|  5539| 32.42|
+----------+------+------+
only showing top 20 rows



In [7]:
df

CustomerID,ItemID,Amount
44,8602,37.19
35,5368,65.89
2,3391,40.64
47,6694,14.98
29,680,13.08
91,8900,24.59
70,3959,68.68
85,1733,28.53
53,9900,83.55
14,1505,4.32


In [12]:
# Write PySpark script to map each line to key/value pairs of customer ID and the amount.
customer_id_amount = df.rdd.map(lambda x: (x[0], x[2]))
df2 = customer_id_amount.toDF(["CustomerId","Amount"])


In [13]:
df2.show()

+----------+------+
|CustomerId|Amount|
+----------+------+
|        44| 37.19|
|        35| 65.89|
|         2| 40.64|
|        47| 14.98|
|        29| 13.08|
|        91| 24.59|
|        70| 68.68|
|        85| 28.53|
|        53| 83.55|
|        14|  4.32|
|        51|  19.8|
|        42| 57.77|
|         2| 55.77|
|        79| 33.17|
|        50| 23.57|
|        20|  6.49|
|        15| 65.53|
|        44| 99.19|
|         5| 64.18|
|        48| 32.42|
+----------+------+
only showing top 20 rows



In [15]:
# PySpark script to reduceByKey to add up amount spent by customer ID.
reduced = customer_id_amount.reduceByKey(lambda x, y: x + y)
for elment in reduced.collect():
    print(elment)

[Stage 9:>                                                          (0 + 8) / 8]

(48, 4384.33)
(0, 5524.950000000001)
(88, 4830.55)
(40, 5186.429999999999)
(64, 5288.6900000000005)
(72, 5337.44)
(8, 5517.24)
(80, 4727.860000000001)
(16, 4979.06)
(56, 4701.0199999999995)
(24, 5259.919999999999)
(96, 3924.2300000000005)
(32, 5496.05)
(57, 4628.4)
(25, 5057.610000000001)
(97, 5977.1900000000005)
(1, 4958.599999999999)
(73, 6206.2)
(49, 4394.6)
(65, 5140.349999999999)
(9, 5322.650000000001)
(41, 5637.620000000001)
(17, 5032.679999999999)
(33, 5254.66)
(81, 5112.71)
(89, 4851.4800000000005)
(2, 5994.589999999999)
(42, 5696.84)
(50, 4517.27)
(98, 4297.26)
(26, 5250.4)
(82, 4812.489999999999)
(34, 5330.8)
(18, 4921.2699999999995)
(74, 4647.130000000001)
(90, 5290.410000000001)
(10, 4819.7)
(58, 5437.73)
(66, 4681.92)
(35, 5155.42)
(91, 4642.26)
(51, 4975.22)
(27, 4915.89)
(83, 4635.8)
(75, 4178.500000000001)
(99, 4172.29)
(19, 5059.43)
(59, 5642.889999999999)
(11, 5152.290000000001)
(43, 5368.83)
(3, 4659.63)
(67, 4505.79)
(44, 4756.889999999999)
(20, 4836.860000000001)
(

                                                                                

In [17]:
# PySpark script to finally collect the result and display them on the spark shell.
results = reduced.collect()
results

[(48, 4384.33),
 (0, 5524.950000000001),
 (88, 4830.55),
 (40, 5186.429999999999),
 (64, 5288.6900000000005),
 (72, 5337.44),
 (8, 5517.24),
 (80, 4727.860000000001),
 (16, 4979.06),
 (56, 4701.0199999999995),
 (24, 5259.919999999999),
 (96, 3924.2300000000005),
 (32, 5496.05),
 (57, 4628.4),
 (25, 5057.610000000001),
 (97, 5977.1900000000005),
 (1, 4958.599999999999),
 (73, 6206.2),
 (49, 4394.6),
 (65, 5140.349999999999),
 (9, 5322.650000000001),
 (41, 5637.620000000001),
 (17, 5032.679999999999),
 (33, 5254.66),
 (81, 5112.71),
 (89, 4851.4800000000005),
 (2, 5994.589999999999),
 (42, 5696.84),
 (50, 4517.27),
 (98, 4297.26),
 (26, 5250.4),
 (82, 4812.489999999999),
 (34, 5330.8),
 (18, 4921.2699999999995),
 (74, 4647.130000000001),
 (90, 5290.410000000001),
 (10, 4819.7),
 (58, 5437.73),
 (66, 4681.92),
 (35, 5155.42),
 (91, 4642.26),
 (51, 4975.22),
 (27, 4915.89),
 (83, 4635.8),
 (75, 4178.500000000001),
 (99, 4172.29),
 (19, 5059.43),
 (59, 5642.889999999999),
 (11, 5152.2900000