In [None]:
# install findspark using pip
!pip install -q findspark
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
#import Library
from pyspark.sql import SparkSession
from pyspark.sql import functions as func
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType
import pandas as pd

In [None]:
spark = SparkSession.builder.appName("MinAmounts").getOrCreate()

In [None]:
schema = StructType([
                     StructField("InvoiceNo", StringType(), True),
                     StructField("StockCode", StringType(), True),                     
                     StructField("Description", StringType(), True),
                     StructField("Quantity", IntegerType(), True),
                     StructField("InvoiceData", StringType(), True),
                     StructField("Amount", FloatType(), True),
                     StructField("CustomerID", StringType(), True),
                     StructField("Country", StringType(), True)])


In [None]:
# read csv using pandas 
csv_file = pd.read_csv('retail-data-full.csv', sep = ';')
df = pd.DataFrame(data=csv_file)
df.to_csv('result_data.csv', index = False)

In [None]:
# // Read the retail-data-full.csv as dataframe
df = spark.read.schema(schema).csv('result_data.csv')
df.printSchema()


root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceData: string (nullable = true)
 |-- Amount: float (nullable = true)
 |-- CustomerID: string (nullable = true)
 |-- Country: string (nullable = true)



In [None]:
df.show()

+---------+---------+--------------------+--------+-------------------+------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceData|Amount|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|  2.55|   17850.0|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|  3.39|   17850.0|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|2010-12-01 08:26:00|  2.75|   17850.0|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|2010-12-01 08:26:00|  3.39|   17850.0|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|2010-12-01 08:26:00|  3.39|   17850.0|United Kingdom|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|2010-12-01 08:26:00|  7.65|   17850.0|United Kingdom|
|   536365|    21730|GLASS STAR FROSTE...|       6|2010

In [None]:
df

DataFrame[InvoiceNo: string, StockCode: string, Description: string, Quantity: int, InvoiceData: string, Amount: float, CustomerID: string, Country: string]

In [None]:
minAmt = df.filter(df.CustomerID != 'null')

In [None]:
minAmt.show()

+---------+---------+--------------------+--------+-------------------+------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceData|Amount|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|  2.55|   17850.0|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|  3.39|   17850.0|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|2010-12-01 08:26:00|  2.75|   17850.0|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|2010-12-01 08:26:00|  3.39|   17850.0|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|2010-12-01 08:26:00|  3.39|   17850.0|United Kingdom|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|2010-12-01 08:26:00|  7.65|   17850.0|United Kingdom|
|   536365|    21730|GLASS STAR FROSTE...|       6|2010

In [None]:
# Select only CustomerID and Amount
stationAmt = minAmt.select("CustomerID", "Amount")


In [None]:
stationAmt.show()

+----------+------+
|CustomerID|Amount|
+----------+------+
|   17850.0|  2.55|
|   17850.0|  3.39|
|   17850.0|  2.75|
|   17850.0|  3.39|
|   17850.0|  3.39|
|   17850.0|  7.65|
|   17850.0|  4.25|
|   17850.0|  1.85|
|   17850.0|  1.85|
|   13047.0|  1.69|
|   13047.0|   2.1|
|   13047.0|   2.1|
|   13047.0|  3.75|
|   13047.0|  1.65|
|   13047.0|  4.25|
|   13047.0|  4.95|
|   13047.0|  9.95|
|   13047.0|  5.95|
|   13047.0|  5.95|
|   13047.0|  7.95|
+----------+------+
only showing top 20 rows



In [None]:
# Aggregate to find minimum CustomerID for every Amount 
minAmtByCustomer = stationAmt.groupBy("CustomerID").min("Amount")
minAmtByCustomer.show()

+----------+-----------+
|CustomerID|min(Amount)|
+----------+-----------+
|   18085.0|        2.1|
|   17905.0|       1.25|
|   17377.0|       0.29|
|   17850.0|       1.06|
|   17181.0|       0.55|
|   16835.0|        2.1|
|   13093.0|       0.85|
|   13694.0|       0.24|
|   15525.0|       0.29|
|   14307.0|       0.21|
|   17460.0|       9.95|
|   12868.0|       0.65|
|   13047.0|       1.65|
|   16583.0|       1.25|
|   14237.0|       0.85|
|   15485.0|       0.42|
|   14696.0|       0.42|
|   12921.0|       1.25|
|   12947.0|       0.42|
|   17873.0|       0.85|
+----------+-----------+
only showing top 20 rows



In [None]:
minAmtByCustomerS = minAmtByCustomer.withColumn("Amount",
                                                  func.round(func.col("min(Amount)") * 0.1 * (9.0 / 5.0) + 32.0, 2))\
                                                  .select("CustomerID", "Amount").sort("Amount")

In [None]:
minAmtByCustomerS.show()

+----------+------+
|CustomerID|Amount|
+----------+------+
|   17841.0| 32.02|
|   17511.0| 32.02|
|   17908.0| 32.02|
|   18011.0| 32.03|
|   14307.0| 32.04|
|   14142.0| 32.04|
|   13694.0| 32.04|
|   17968.0| 32.05|
|   17377.0| 32.05|
|   14606.0| 32.05|
|   14729.0| 32.05|
|   13448.0| 32.05|
|   15525.0| 32.05|
|   17548.0| 32.05|
|   12433.0| 32.05|
|   12838.0| 32.05|
|   16274.0| 32.05|
|   14688.0| 32.07|
|   12583.0| 32.08|
|   15983.0| 32.08|
+----------+------+
only showing top 20 rows



In [None]:
results = minAmtByCustomerS.collect()

In [None]:
for result in results:
    print(result[0] + "\t{:.2f}F".format(result[1]))            

17908.0	32.02F
17511.0	32.02F
17841.0	32.02F
18011.0	32.03F
13694.0	32.04F
14307.0	32.04F
14142.0	32.04F
17377.0	32.05F
15525.0	32.05F
17548.0	32.05F
17968.0	32.05F
14606.0	32.05F
16274.0	32.05F
12433.0	32.05F
14729.0	32.05F
12838.0	32.05F
13448.0	32.05F
14688.0	32.07F
15485.0	32.08F
14696.0	32.08F
12947.0	32.08F
13408.0	32.08F
17897.0	32.08F
14078.0	32.08F
12583.0	32.08F
15983.0	32.08F
16048.0	32.08F
17346.0	32.08F
17760.0	32.08F
13065.0	32.08F
15012.0	32.08F
16250.0	32.08F
14594.0	32.08F
12662.0	32.08F
17181.0	32.10F
13767.0	32.10F
15862.0	32.10F
16218.0	32.10F
14849.0	32.10F
12868.0	32.12F
18074.0	32.12F
17643.0	32.12F
17920.0	32.12F
17025.0	32.12F
13705.0	32.12F
14911.0	32.12F
16210.0	32.13F
13093.0	32.15F
14237.0	32.15F
17873.0	32.15F
16098.0	32.15F
12472.0	32.15F
16539.0	32.15F
16552.0	32.15F
15311.0	32.15F
12431.0	32.15F
17069.0	32.15F
16955.0	32.15F
15605.0	32.15F
15235.0	32.17F
17850.0	32.19F
13777.0	32.19F
17905.0	32.23F
16583.0	32.23F
12921.0	32.23F
13468.0	32.23F
17690.0	32

In [None]:
spark.stop()