In [1]:
# install java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# install spark (change the version number if needed)
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz

# unzip the spark file to the current folder
!tar xf spark-3.0.0-bin-hadoop3.2.tgz

# set your spark folder to your system path environment. 
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"

# install findspark using pip
!pip install -q findspark
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 33 kB/s 
[?25hCollecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 48.1 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=9f77bb05bf327687ea62aeacde3b318af7c75acc321550a39d80e43dfa8496b6
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1


In [2]:
import findspark
findspark.init()
from pyspark.sql import SparkSession

In [3]:
from pyspark import SparkConf, SparkContext

In [4]:
conf = SparkConf().setMaster("local").setAppName("RetailData")
sc = SparkContext(conf = conf)

In [53]:
def parseLine(line):
    fields = line.split(';')
    amount = str(fields[2])
    CustomerID = str(fields[3])
    return (amount, CustomerID)


In [54]:
lines = sc.textFile("retail-data.csv")

In [55]:
rdd = lines.map(parseLine)

In [56]:
rdd

PythonRDD[62] at RDD at PythonRDD.scala:53

In [57]:
totalsByAmount = rdd.mapValues(lambda x: (x, 1)).reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))

In [58]:
rdd.mapValues(lambda x: (x, 1)).collect()

[('2.55', ('17850.0', 1)),
 ('3.39', ('17850.0', 1)),
 ('2.75', ('17850.0', 1)),
 ('3.39', ('17850.0', 1)),
 ('3.39', ('17850.0', 1)),
 ('7.65', ('17850.0', 1)),
 ('4.25', ('17850.0', 1)),
 ('1.85', ('17850.0', 1)),
 ('1.85', ('17850.0', 1)),
 ('1.69', ('13047.0', 1)),
 ('2.1', ('13047.0', 1)),
 ('2.1', ('13047.0', 1)),
 ('3.75', ('13047.0', 1)),
 ('1.65', ('13047.0', 1)),
 ('4.25', ('13047.0', 1)),
 ('4.95', ('13047.0', 1)),
 ('9.95', ('13047.0', 1)),
 ('5.95', ('13047.0', 1)),
 ('5.95', ('13047.0', 1)),
 ('7.95', ('13047.0', 1)),
 ('7.95', ('13047.0', 1)),
 ('4.25', ('13047.0', 1)),
 ('4.95', ('13047.0', 1)),
 ('4.95', ('13047.0', 1)),
 ('4.95', ('13047.0', 1)),
 ('5.95', ('13047.0', 1)),
 ('3.75', ('12583.0', 1)),
 ('3.75', ('12583.0', 1)),
 ('3.75', ('12583.0', 1)),
 ('0.85', ('12583.0', 1)),
 ('0.65', ('12583.0', 1)),
 ('0.85', ('12583.0', 1)),
 ('1.25', ('12583.0', 1)),
 ('2.95', ('12583.0', 1)),
 ('2.95', ('12583.0', 1)),
 ('1.95', ('12583.0', 1)),
 ('1.95', ('12583.0', 1)),
 ('

In [59]:
totalsByAmount.collect()

[('2.55',
  ('17850.013748.017850.017850.017850.017850.015291.014688.014688.015311.015311.018074.017511.017511.013408.017850.017850.013448.015862.015862.015862.015862.015862.015513.015513.014045.017850.017850.014307.014307.016583.016583.016583.013758.013758.015983.015983.015983.015983.014849.017968.017897.017897.017897.017897.014729.014729.014729.014729.014729.014729.014729.014729.014729.015012.012662.015525.015525.015525.015485.015485.012433.014594.014594.014911.017841.017841.016274.016274.016274.016274.016274.016539.013777.013777.013777.013777.013777.012947.014142.014606.013576.018011.0',
   85)),
 ('3.39',
  ('17850.017850.017850.017850.017850.017850.017850.017850.017850.017511.017850.017850.017850.017850.017850.017850.013694.013694.016210.016210.014496.013576.0',
   22)),
 ('2.75', ('17850.017850.017850.017850.017850.0', 5)),
 ('7.65', ('17850.017850.017850.017850.017850.015922.0', 6)),
 ('4.25',
  ('17850.013047.013047.017850.017850.015311.017420.016250.017511.017511.017850.015862

In [60]:
results = totalsByAmount.collect()

In [61]:
for result in results:
    print(result)

('2.55', ('17850.013748.017850.017850.017850.017850.015291.014688.014688.015311.015311.018074.017511.017511.013408.017850.017850.013448.015862.015862.015862.015862.015862.015513.015513.014045.017850.017850.014307.014307.016583.016583.016583.013758.013758.015983.015983.015983.015983.014849.017968.017897.017897.017897.017897.014729.014729.014729.014729.014729.014729.014729.014729.014729.015012.012662.015525.015525.015525.015485.015485.012433.014594.014594.014911.017841.017841.016274.016274.016274.016274.016274.016539.013777.013777.013777.013777.013777.012947.014142.014606.013576.018011.0', 85))
('3.39', ('17850.017850.017850.017850.017850.017850.017850.017850.017850.017511.017850.017850.017850.017850.017850.017850.013694.013694.016210.016210.014496.013576.0', 22))
('2.75', ('17850.017850.017850.017850.017850.0', 5))
('7.65', ('17850.017850.017850.017850.017850.015922.0', 6))
('4.25', ('17850.013047.013047.017850.017850.015311.017420.016250.017511.017511.017850.015862.015862.015513.017850