In [1]:
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession 
from pyspark.conf import SparkConf
from pyspark import SparkContext

In [143]:
pyspark = SparkSession.builder \
.master('local[4]') \
.appName("Filtering_SalesData") \
.config("spark.executor.memory", "4g") \
.config("spark.driver.memory","2g") \
.getOrCreate()

In [144]:
sc = pyspark.sparkContext

In [145]:
retailRDD = sc.textFile('OnlineRetail.csv')

In [146]:
retailRDD

OnlineRetail.csv MapPartitionsRDD[137] at textFile at <unknown>:0

In [147]:
retailRDD.take(5)

['InvoiceNo;StockCode;Description;Quantity;InvoiceDate;UnitPrice;CustomerID;Country',
 '536365;85123A;WHITE HANGING HEART T-LIGHT HOLDER;6;1.12.2010 08:26;2,55;17850;United Kingdom',
 '536365;71053;WHITE METAL LANTERN;6;1.12.2010 08:26;3,39;17850;United Kingdom',
 '536365;84406B;CREAM CUPID HEARTS COAT HANGER;8;1.12.2010 08:26;2,75;17850;United Kingdom',
 '536365;84029G;KNITTED UNION FLAG HOT WATER BOTTLE;6;1.12.2010 08:26;3,39;17850;United Kingdom']

In [148]:
headers = sc.parallelize(retailRDD.take(1)) #retailRDD.first()
headers.collect()

['InvoiceNo;StockCode;Description;Quantity;InvoiceDate;UnitPrice;CustomerID;Country']

In [149]:
retailRDD = retailRDD.subtract(headers)

## Filter

In [150]:
# filtering : Quantity> 30
retailRDD.filter(lambda x : int(x.split(";")[3]) > 30).take(5)

['536378;85183B;CHARLIE & LOLA WASTEPAPER BIN FLORA;48;1.12.2010 09:37;1,25;14688;United Kingdom',
 '536381;22719;GUMBALL MONOCHROME COAT RACK;36;1.12.2010 09:41;1,06;15311;United Kingdom',
 '536384;22470;HEART OF WICKER LARGE;40;1.12.2010 09:53;2,55;18074;United Kingdom',
 '536386;85099C;JUMBO  BAG BAROQUE BLACK WHITE;100;1.12.2010 09:57;1,65;16029;United Kingdom',
 '536387;79321;CHILLI LIGHTS;192;1.12.2010 09:58;3,82;16029;United Kingdom']

In [151]:
# filtering: 11.months sales
retailRDD.filter(lambda x : int((x.split(";")[4]).split(".")[1])==11).take(5)

['573744;21704;BAG 250g SWIRLY MARBLES;12;1.11.2011 08:16;0,85;17733;United Kingdom',
 '573744;21791;VINTAGE HEADS AND TAILS CARD GAME;12;1.11.2011 08:16;1,25;17733;United Kingdom',
 '573744;22340;NOEL GARLAND PAINTED ZINC;24;1.11.2011 08:16;0,39;17733;United Kingdom',
 '573744;22577;WOODEN HEART CHRISTMAS SCANDINAVIAN;24;1.11.2011 08:16;0,29;17733;United Kingdom',
 '573744;23503;PLAYING CARDS KEEP CALM & CARRY ON;12;1.11.2011 08:16;1,25;17733;United Kingdom']

In [152]:
# filtering : include coffe description
retailRDD.filter(lambda x : "COFFEE" in (x.split(";")[2]).split(" ")).take(5)

['536750;37370;RETRO COFFEE MUGS ASSORTED;6;2.12.2010 14:04;1,06;17850;United Kingdom',
 '536787;37370;RETRO COFFEE MUGS ASSORTED;6;2.12.2010 15:24;1,06;17850;United Kingdom',
 '536804;37370;RETRO COFFEE MUGS ASSORTED;72;2.12.2010 16:34;1,06;14031;United Kingdom',
 '536805;37370;RETRO COFFEE MUGS ASSORTED;12;2.12.2010 16:38;1,25;14775;United Kingdom',
 '536865;37370;RETRO COFFEE MUGS ASSORTED;1;3.12.2010 11:28;16,13;000000;United Kingdom']

In [153]:
# filtering : 11.month sales which are before 9am
def month_hour_filter(x):
    first_filter = int((x.split(";")[4]).split(".")[1])==11
    second_filter = int((x.split(";")[4]).split(" ")[1].split(":")[0]) < 9
    
    return (first_filter & second_filter)

In [154]:
retailRDD.filter(lambda x: month_hour_filter(x)).take(5)

['573744;21704;BAG 250g SWIRLY MARBLES;12;1.11.2011 08:16;0,85;17733;United Kingdom',
 '573744;21791;VINTAGE HEADS AND TAILS CARD GAME;12;1.11.2011 08:16;1,25;17733;United Kingdom',
 '573744;22340;NOEL GARLAND PAINTED ZINC;24;1.11.2011 08:16;0,39;17733;United Kingdom',
 '573744;22577;WOODEN HEART CHRISTMAS SCANDINAVIAN;24;1.11.2011 08:16;0,29;17733;United Kingdom',
 '573744;23503;PLAYING CARDS KEEP CALM & CARRY ON;12;1.11.2011 08:16;1,25;17733;United Kingdom']

## Map

In [157]:
# filtering : returned products
#startswith("C")
returnedRetailRDD = retailRDD.filter(lambda x: "C" in (x.split(";")[0]))

In [158]:
# counts of returned products

In [160]:
returnedRetailRDD

PythonRDD[152] at collect at <ipython-input-159-ae8c7a838b73>:1

In [161]:
# total price of returned products

In [162]:
# 1.Way
returned_products_total_price = returnedRetailRDD.map(lambda x : abs(int(x.split(";")[3])) * (float(x.split(";")[5].replace(",","."))))

In [163]:
returned_products_total_price

PythonRDD[153] at RDD at PythonRDD.scala:53

In [164]:
returned_products_total_price = returned_products_total_price.reduce(lambda x,y: x+y)

In [165]:
returned_products_total_price

896812.4899999979

In [166]:
# 2. Way

In [167]:
def cancelled_price(line):
    is_cancelled = True if(line.split(';')[0].startswith('C')) else False
    quantity = float(line.split(";")[3])
    price = float(line.split(";")[5].replace(",","."))
    total = quantity * price
                  
    return (is_cancelled, total)

In [168]:
canceled_total = retailRDD.map(cancelled_price)

In [169]:
canceled_total.take(5)

[(False, 25.5),
 (False, 17.85),
 (False, 70.80000000000001),
 (False, 39.599999999999994),
 (False, 11.100000000000001)]

In [170]:
canceled_total.reduceByKey(lambda x,y : x+y).take(5)

[(False, 10644560.423998535), (True, -896812.4899999979)]