In [1]:
# install java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# install spark (change the version number if needed)
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz

# unzip the spark file to the current folder
!tar xf spark-3.0.0-bin-hadoop3.2.tgz

# set your spark folder to your system path environment. 
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"

# install findspark using pip
!pip install -q findspark
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 29 kB/s 
[?25hCollecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 47.4 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=7395545e8371ad8e2072b58428d626a1d36fdf91b1293dd904363afe10dfcbe8
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1


In [2]:
import findspark
# Make the Spark installation's pyspark importable before importing from it
# (presumably located via the SPARK_HOME variable set in the setup cell).
findspark.init()
from pyspark.sql import SparkSession

# Start (or reuse) a session with the master URL "local[*]".
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [3]:
# Show the SparkSession object as this cell's output.
spark

In [4]:
# Shut down the session created above; a SparkContext with its own SparkConf
# is constructed in a later cell.
spark.stop()

In [5]:
from pyspark import SparkConf, SparkContext

In [6]:
# Configure the application: master URL "local", application name "RetailData".
# NOTE(review): the earlier session used "local[*]"; confirm that plain
# "local" is intended here.
conf = SparkConf().setMaster("local").setAppName("RetailData")
# Context through which the RDDs in the following cells are created.
sc = SparkContext(conf = conf)

In [7]:
def parseLine(line):
    """Parse one semicolon-delimited record into a (CustomerID, Amount) pair.

    Fields are read by position after splitting on ';'. Only the third field
    (amount) and the fourth field (customer ID) are used; the first two fields
    (invoice number and description) are ignored.

    Args:
        line (str): One text record with at least four ';'-separated fields.

    Returns:
        tuple: ``(CustomerID, Amount)`` where ``CustomerID`` is the fourth
        field as a string, returned as-is (an empty field yields ``''``),
        and ``Amount`` is the third field converted to ``float``.

    Raises:
        IndexError: If the record has fewer than four fields.
        ValueError: If the third field cannot be converted to ``float``.
    """
    fields = line.split(';')

    # str.split already yields strings, so no str() conversion is needed.
    Amount = float(fields[2])
    CustomerID = fields[3]

    return (CustomerID, Amount)


In [8]:
# Read retail-data.csv as an RDD of text lines.
# NOTE(review): the path is relative and no visible cell creates or uploads
# this file; confirm it is present in the working directory.
lines = sc.textFile("retail-data.csv")

In [9]:
# Convert every text line into a (CustomerID, Amount) pair via parseLine.
rdd = lines.map(parseLine)
# NOTE(review): collect() returns every record to the driver just to display
# it; rdd.take(n) would be enough for a preview.
rdd.collect()

[('17850.0', 2.55),
 ('17850.0', 3.39),
 ('17850.0', 2.75),
 ('17850.0', 3.39),
 ('17850.0', 3.39),
 ('17850.0', 7.65),
 ('17850.0', 4.25),
 ('17850.0', 1.85),
 ('17850.0', 1.85),
 ('13047.0', 1.69),
 ('13047.0', 2.1),
 ('13047.0', 2.1),
 ('13047.0', 3.75),
 ('13047.0', 1.65),
 ('13047.0', 4.25),
 ('13047.0', 4.95),
 ('13047.0', 9.95),
 ('13047.0', 5.95),
 ('13047.0', 5.95),
 ('13047.0', 7.95),
 ('13047.0', 7.95),
 ('13047.0', 4.25),
 ('13047.0', 4.95),
 ('13047.0', 4.95),
 ('13047.0', 4.95),
 ('13047.0', 5.95),
 ('12583.0', 3.75),
 ('12583.0', 3.75),
 ('12583.0', 3.75),
 ('12583.0', 0.85),
 ('12583.0', 0.65),
 ('12583.0', 0.85),
 ('12583.0', 1.25),
 ('12583.0', 2.95),
 ('12583.0', 2.95),
 ('12583.0', 1.95),
 ('12583.0', 1.95),
 ('12583.0', 1.95),
 ('12583.0', 0.85),
 ('12583.0', 1.65),
 ('12583.0', 2.95),
 ('12583.0', 3.75),
 ('12583.0', 0.42),
 ('12583.0', 0.42),
 ('12583.0', 0.65),
 ('12583.0', 18.0),
 ('13748.0', 2.55),
 ('17850.0', 1.85),
 ('17850.0', 1.85),
 ('17850.0', 2.55),
 (

In [10]:
# Per customer ID: pair each amount with a count of 1, then add the pairs
# element-wise, giving (sum of amounts, number of records) for each key.
AmountsByCstID = (
    rdd
    .mapValues(lambda amount: (amount, 1))
    .reduceByKey(lambda left, right: (left[0] + right[0], left[1] + right[1]))
)
AmountsByCstID.collect()

[('17850.0', (334.75999999999993, 84)),
 ('13047.0', (83.29000000000002, 17)),
 ('12583.0', (55.29, 20)),
 ('13748.0', (2.55, 1)),
 ('15100.0', (10.95, 1)),
 ('15291.0', (6.0, 2)),
 ('14688.0', (33.35, 19)),
 ('17809.0', (1.45, 1)),
 ('15311.0', (92.85000000000005, 36)),
 ('14527.0', (27.5, 1)),
 ('16098.0', (71.65, 12)),
 ('18074.0', (62.150000000000006, 13)),
 ('17420.0', (38.99999999999999, 7)),
 ('16029.0', (21.51, 8)),
 ('16250.0', (47.27, 14)),
 ('12431.0', (73.9, 14)),
 ('17511.0', (58.870000000000005, 24)),
 ('17548.0', (9.270000000000001, 7)),
 ('13705.0', (183.98999999999998, 10)),
 ('13747.0', (9.95, 1)),
 ('13408.0', (24.07, 11)),
 ('13767.0', (35.99, 14)),
 ('17924.0', (9.3, 2)),
 ('13448.0', (65.24000000000001, 18)),
 ('15862.0', (207.03999999999996, 64)),
 ('15513.0', (9.35, 3)),
 ('12791.0', (16.85, 2)),
 ('16218.0', (32.75000000000001, 28)),
 ('14045.0', (2.55, 1)),
 ('14307.0', (115.35000000000004, 48)),
 ('17908.0', (155.0099999999999, 58)),
 ('17920.0', (225.8099999

In [11]:
# Intermediate step shown on its own: every amount paired with a count of 1,
# before any reduction by key.
(
    rdd
    .mapValues(lambda amount: (amount, 1))
    .collect()
)

[('17850.0', (2.55, 1)),
 ('17850.0', (3.39, 1)),
 ('17850.0', (2.75, 1)),
 ('17850.0', (3.39, 1)),
 ('17850.0', (3.39, 1)),
 ('17850.0', (7.65, 1)),
 ('17850.0', (4.25, 1)),
 ('17850.0', (1.85, 1)),
 ('17850.0', (1.85, 1)),
 ('13047.0', (1.69, 1)),
 ('13047.0', (2.1, 1)),
 ('13047.0', (2.1, 1)),
 ('13047.0', (3.75, 1)),
 ('13047.0', (1.65, 1)),
 ('13047.0', (4.25, 1)),
 ('13047.0', (4.95, 1)),
 ('13047.0', (9.95, 1)),
 ('13047.0', (5.95, 1)),
 ('13047.0', (5.95, 1)),
 ('13047.0', (7.95, 1)),
 ('13047.0', (7.95, 1)),
 ('13047.0', (4.25, 1)),
 ('13047.0', (4.95, 1)),
 ('13047.0', (4.95, 1)),
 ('13047.0', (4.95, 1)),
 ('13047.0', (5.95, 1)),
 ('12583.0', (3.75, 1)),
 ('12583.0', (3.75, 1)),
 ('12583.0', (3.75, 1)),
 ('12583.0', (0.85, 1)),
 ('12583.0', (0.65, 1)),
 ('12583.0', (0.85, 1)),
 ('12583.0', (1.25, 1)),
 ('12583.0', (2.95, 1)),
 ('12583.0', (2.95, 1)),
 ('12583.0', (1.95, 1)),
 ('12583.0', (1.95, 1)),
 ('12583.0', (1.95, 1)),
 ('12583.0', (0.85, 1)),
 ('12583.0', (1.65, 1)),
 (

In [12]:
# Display the per-customer (sum of amounts, record count) pairs again.
AmountsByCstID.collect()

[('17850.0', (334.75999999999993, 84)),
 ('13047.0', (83.29000000000002, 17)),
 ('12583.0', (55.29, 20)),
 ('13748.0', (2.55, 1)),
 ('15100.0', (10.95, 1)),
 ('15291.0', (6.0, 2)),
 ('14688.0', (33.35, 19)),
 ('17809.0', (1.45, 1)),
 ('15311.0', (92.85000000000005, 36)),
 ('14527.0', (27.5, 1)),
 ('16098.0', (71.65, 12)),
 ('18074.0', (62.150000000000006, 13)),
 ('17420.0', (38.99999999999999, 7)),
 ('16029.0', (21.51, 8)),
 ('16250.0', (47.27, 14)),
 ('12431.0', (73.9, 14)),
 ('17511.0', (58.870000000000005, 24)),
 ('17548.0', (9.270000000000001, 7)),
 ('13705.0', (183.98999999999998, 10)),
 ('13747.0', (9.95, 1)),
 ('13408.0', (24.07, 11)),
 ('13767.0', (35.99, 14)),
 ('17924.0', (9.3, 2)),
 ('13448.0', (65.24000000000001, 18)),
 ('15862.0', (207.03999999999996, 64)),
 ('15513.0', (9.35, 3)),
 ('12791.0', (16.85, 2)),
 ('16218.0', (32.75000000000001, 28)),
 ('14045.0', (2.55, 1)),
 ('14307.0', (115.35000000000004, 48)),
 ('17908.0', (155.0099999999999, 58)),
 ('17920.0', (225.8099999

In [13]:
# Average amount per customer ID: summed amount divided by record count.
averagesByCstID = AmountsByCstID.mapValues(
    lambda total_and_count: total_and_count[0] / total_and_count[1]
)

In [14]:
# Evaluating the RDD by itself shows only its repr, not the computed values;
# the values are fetched with collect() in the next cell.
averagesByCstID

PythonRDD[9] at RDD at PythonRDD.scala:53

In [15]:
# Fetch the (CustomerID, average amount) pairs into a local Python list.
results = averagesByCstID.collect()

In [16]:
# Print one (CustomerID, average amount) tuple per line.
for result in results:
    print(result)

('17850.0', 3.9852380952380946)
('13047.0', 4.899411764705883)
('12583.0', 2.7645)
('13748.0', 2.55)
('15100.0', 10.95)
('15291.0', 3.0)
('14688.0', 1.7552631578947369)
('17809.0', 1.45)
('15311.0', 2.579166666666668)
('14527.0', 27.5)
('16098.0', 5.970833333333334)
('18074.0', 4.7807692307692315)
('17420.0', 5.57142857142857)
('16029.0', 2.68875)
('16250.0', 3.376428571428572)
('12431.0', 5.278571428571429)
('17511.0', 2.452916666666667)
('17548.0', 1.3242857142857145)
('13705.0', 18.398999999999997)
('13747.0', 9.95)
('13408.0', 2.188181818181818)
('13767.0', 2.5707142857142857)
('17924.0', 4.65)
('13448.0', 3.624444444444445)
('15862.0', 3.2349999999999994)
('15513.0', 3.1166666666666667)
('12791.0', 8.425)
('16218.0', 1.1696428571428574)
('14045.0', 2.55)
('14307.0', 2.4031250000000006)
('17908.0', 2.67258620689655)
('17920.0', 2.7877777777777766)
('', 5.845640350877205)
('12838.0', 1.9733898305084747)
('13255.0', 4.55)
('16583.0', 2.5642857142857145)
('18085.0', 3.8444444444444446

In [18]:
# Run the standalone script with spark-submit and redirect its standard output
# to retailbyamnt.txt.
# NOTE(review): amount-by-cstid.py is not created by any cell visible here;
# confirm it exists in the working directory.
!spark-submit amount-by-cstid.py > retailbyamnt.txt

22/04/05 12:46:25 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
22/04/05 12:46:26 INFO SparkContext: Running Spark version 3.0.0
22/04/05 12:46:26 INFO ResourceUtils: Resources for spark.driver:

22/04/05 12:46:26 INFO SparkContext: Submitted application: RetailData
22/04/05 12:46:27 INFO SecurityManager: Changing view acls to: root
22/04/05 12:46:27 INFO SecurityManager: Changing modify acls to: root
22/04/05 12:46:27 INFO SecurityManager: Changing view acls groups to: 
22/04/05 12:46:27 INFO SecurityManager: Changing modify acls groups to: 
22/04/05 12:46:27 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(root); groups with view permissions: Set(); users  with modify permissions: Set(root); groups with modify permissions: Set()
22/04/05 12:46:27 INFO Utils: Suc