In [1]:
# Basic
import findspark
findspark.init()
from pyspark import SparkConf, SparkContext
from decimal import Decimal
conf = SparkConf().setMaster("local").setAppName("Bankaccount-SQL")
sc = SparkContext(conf = conf)

# Creation of the list from where the RDD is going to be created
acTransList = ["SB10001,1000", "SB10002,1200", "SB10003,8000", "SB10004,400", "SB10005,300", "SB10006,10000", "SB10007,500", "SB10008,56", "SB10009,30","SB10010,7000", "CR10001,7000", "SB10002,-10"]
# Create the RDD
acTransRDD = sc.parallelize(acTransList)
# Collect the values from the RDDs to the driver program
acTransRDD.collect()

['SB10001,1000',
 'SB10002,1200',
 'SB10003,8000',
 'SB10004,400',
 'SB10005,300',
 'SB10006,10000',
 'SB10007,500',
 'SB10008,56',
 'SB10009,30',
 'SB10010,7000',
 'CR10001,7000',
 'SB10002,-10']

In [2]:
# Apply filter and create another RDD of good transaction records
goodTransRecords = acTransRDD.filter(lambda trans: Decimal(trans.split(",")[1]) > 0).filter(lambda trans: (trans.split(",")[0]).startswith('SB') == True)

# Collect the values from the RDDs to the driver program
goodTransRecords.collect()

# Apply filter and create another RDD of high value transaction records
highValueTransRecords = goodTransRecords.filter(lambda trans: Decimal(trans.split(",")[1]) > 1000)

# Collect the values from the RDDs to the driver program
highValueTransRecords.collect()

# The function that identifies the bad amounts
badAmountLambda = lambda trans: Decimal(trans.split(",")[1]) <= 0

In [3]:
# The function that identifies bad accounts
badAcNoLambda = lambda trans: (trans.split(",")[0]).startswith('SB') == False
# Apply filter and create another RDD of bad amount records
badAmountRecords = acTransRDD.filter(badAmountLambda)
# Collect the values from the RDDs to the driver program
badAmountRecords.collect()

['SB10002,-10']

In [4]:
# Apply filter and create another RDD of bad account records
badAccountRecords = acTransRDD.filter(badAcNoLambda)
# Do the union of two RDDs and create another RDD
badTransRecords  = badAmountRecords.union(badAccountRecords)
# Collect the values from the RDDs to the driver program
badTransRecords.collect()

['SB10002,-10', 'CR10001,7000']

In [5]:
# Collect the values from the RDDs to the driver program
badAccountRecords.collect()

['CR10001,7000']

In [6]:
# The function that calculates the sum
sumAmount = goodTransRecords.map(lambda trans: Decimal(trans.split(",")[1])).reduce(lambda a,b : a+b)
sumAmount

Decimal('28486')

In [7]:
# The function that calculates the maximum
maxAmount = goodTransRecords.map(lambda trans: Decimal(trans.split(",")[1])).reduce(lambda a,b : a if a > b else b)
maxAmount

Decimal('10000')

In [8]:
# The function that calculates the minimum
minAmount = goodTransRecords.map(lambda trans: Decimal(trans.split(",")[1])).reduce(lambda a,b : a if a < b else b)
minAmount

Decimal('30')

In [9]:
# Combine all the elements
combineAllElements = acTransRDD.flatMap(lambda trans: trans.split(","))
combineAllElements.collect()

['SB10001',
 '1000',
 'SB10002',
 '1200',
 'SB10003',
 '8000',
 'SB10004',
 '400',
 'SB10005',
 '300',
 'SB10006',
 '10000',
 'SB10007',
 '500',
 'SB10008',
 '56',
 'SB10009',
 '30',
 'SB10010',
 '7000',
 'CR10001',
 '7000',
 'SB10002',
 '-10']

In [10]:
# Find the good account numbers
allGoodAccountNos = combineAllElements.filter(lambda trans: trans.startswith('SB') == True)
allGoodAccountNos.distinct().collect()

['SB10001',
 'SB10002',
 'SB10003',
 'SB10004',
 'SB10005',
 'SB10006',
 'SB10007',
 'SB10008',
 'SB10009',
 'SB10010']

In [11]:
# Create the RDD
acTransRDD = sc.parallelize(acTransList)
# Create the RDD containing key value pairs by doing mapping operation
acKeyVal = acTransRDD.map(lambda trans: (trans.split(",")[0],Decimal(trans.split(",")[1])))
# Create the RDD by reducing key value pairs by doing applying sum operation to the values
accSummary = acKeyVal.reduceByKey(lambda a,b : a+b).sortByKey()

# Collect the values from the RDDs to the driver program
accSummary.collect()

[('CR10001', Decimal('7000')),
 ('SB10001', Decimal('1000')),
 ('SB10002', Decimal('1190')),
 ('SB10003', Decimal('8000')),
 ('SB10004', Decimal('400')),
 ('SB10005', Decimal('300')),
 ('SB10006', Decimal('10000')),
 ('SB10007', Decimal('500')),
 ('SB10008', Decimal('56')),
 ('SB10009', Decimal('30')),
 ('SB10010', Decimal('7000'))]

In [12]:
# Creation of the list from where the RDD is going to be created
acMasterList = ["SB10001,Roger,Federer", "SB10002,Pete,Sampras", "SB10003,Rafel,Nadal", "SB10004,Boris,Becker", "SB10005,Ivan,Lendl"]
# Creation of the list from where the RDD is going to be created
acBalList = ["SB10001,50000", "SB10002,12000", "SB10003,3000", "SB10004,8500", "SB10005,5000"]
# Create the RDD
acMasterRDD = sc.parallelize(acMasterList)
# Collect the values to the driver program
acMasterRDD.collect()

['SB10001,Roger,Federer',
 'SB10002,Pete,Sampras',
 'SB10003,Rafel,Nadal',
 'SB10004,Boris,Becker',
 'SB10005,Ivan,Lendl']

In [13]:
# Create the RDD
acBalRDD = sc.parallelize(acBalList)
# Collect the values to the driver program
acBalRDD.collect()

['SB10001,50000',
 'SB10002,12000',
 'SB10003,3000',
 'SB10004,8500',
 'SB10005,5000']

In [14]:
# Create account master tuples
acMasterTuples = acMasterRDD.map(lambda master: master.split(",")).map(lambda masterList: (masterList[0], masterList[1] + " " + masterList[2]))
# Collect the values to the driver program
acMasterTuples.collect()

[('SB10001', 'Roger Federer'),
 ('SB10002', 'Pete Sampras'),
 ('SB10003', 'Rafel Nadal'),
 ('SB10004', 'Boris Becker'),
 ('SB10005', 'Ivan Lendl')]

In [15]:
# Create balance tuples
acBalTuples = acBalRDD.map(lambda trans: trans.split(",")).map(lambda transList: (transList[0], transList[1]))
# Collect the values to the driver program
acBalTuples.collect()

[('SB10001', '50000'),
 ('SB10002', '12000'),
 ('SB10003', '3000'),
 ('SB10004', '8500'),
 ('SB10005', '5000')]

In [16]:
# Join the tuples
acJoinTuples = acMasterTuples.join(acBalTuples).sortByKey().map(lambda tran: (tran[0], tran[1][0],tran[1][1]))
# Collect the values to the driver program
acJoinTuples.collect()

[('SB10001', 'Roger Federer', '50000'),
 ('SB10002', 'Pete Sampras', '12000'),
 ('SB10003', 'Rafel Nadal', '3000'),
 ('SB10004', 'Boris Becker', '8500'),
 ('SB10005', 'Ivan Lendl', '5000')]

In [17]:
# ------- Not included Section I in the slide -------
# Find the account name and balance
acNameAndBalance = acJoinTuples.map(lambda tran: (tran[1],tran[2]))
# Collect the values to the driver program
acNameAndBalance.collect()

[('Roger Federer', '50000'),
 ('Pete Sampras', '12000'),
 ('Rafel Nadal', '3000'),
 ('Boris Becker', '8500'),
 ('Ivan Lendl', '5000')]

In [18]:
# Find the account tuples sorted by amount
acTuplesByAmount = acBalTuples.map(lambda tran: (Decimal(tran[1]), tran[0])).sortByKey(False)
# Collect the values to the driver program
acTuplesByAmount.collect()

[(Decimal('50000'), 'SB10001'),
 (Decimal('12000'), 'SB10002'),
 (Decimal('8500'), 'SB10004'),
 (Decimal('5000'), 'SB10005'),
 (Decimal('3000'), 'SB10003')]

In [19]:
# Get the top element
acTuplesByAmount.first()

(Decimal('50000'), 'SB10001')

In [20]:
# Get the top 3 elements
acTuplesByAmount.take(3)

[(Decimal('50000'), 'SB10001'),
 (Decimal('12000'), 'SB10002'),
 (Decimal('8500'), 'SB10004')]

In [21]:
# Count by the key
acBalTuples.countByKey()

defaultdict(int,
            {'SB10001': 1,
             'SB10002': 1,
             'SB10003': 1,
             'SB10004': 1,
             'SB10005': 1})

In [22]:
# Count all the records
acBalTuples.count()

5

In [23]:
# Print the contents of the account name and balance RDD
acNameAndBalance.foreach(print)

In [24]:
# Find the balance total using accumulator
balanceTotal = sc.accumulator(0.0)
balanceTotal.value

0.0

In [25]:
# Do the summation
acBalTuples.foreach(lambda bals: balanceTotal.add(float(bals[1])))

# Print the results
balanceTotal.value

78500.0

In [26]:
# ------ Not included Section I in the slide End -------

# To use SQL command.
from pyspark.sql import SparkSession
from pyspark.sql import Row

In [27]:
# Creation of the list from where the RDD is going to be created
acTransList = ["SB10001,1000", "SB10002,1200", "SB10003,8000", "SB10004,400", "SB10005,300", "SB10006,10000", "SB10007,500", "SB10008,56", "SB10009,30","SB10010,7000", "CR10001,7000", "SB10002,-10"]
spark = SparkSession(sc)

acTransDF = sc.parallelize(acTransList).map(lambda trans: trans.split(",")).map(lambda p: Row(accNo=p[0], tranAmount=float(p[1]))).toDF()

acTransDF.createOrReplaceTempView("trans")

# Print the structure of the DataFrame
acTransDF.printSchema()
# Show the first few records of the DataFrame
acTransDF.show()

root
 |-- accNo: string (nullable = true)
 |-- tranAmount: double (nullable = true)

+-------+----------+
|  accNo|tranAmount|
+-------+----------+
|SB10001|    1000.0|
|SB10002|    1200.0|
|SB10003|    8000.0|
|SB10004|     400.0|
|SB10005|     300.0|
|SB10006|   10000.0|
|SB10007|     500.0|
|SB10008|      56.0|
|SB10009|      30.0|
|SB10010|    7000.0|
|CR10001|    7000.0|
|SB10002|     -10.0|
+-------+----------+



In [28]:
# Use SQL to create another DataFrame containing the good transaction records
goodTransRecords = spark.sql("SELECT accNo, tranAmount FROM trans WHERE accNo like 'SB%' AND tranAmount > 0")
# Register temporary table in the DataFrame for using it in SQL
goodTransRecords.createOrReplaceTempView("goodtrans")
# Show the first few records of the DataFrame
goodTransRecords.show()

+-------+----------+
|  accNo|tranAmount|
+-------+----------+
|SB10001|    1000.0|
|SB10002|    1200.0|
|SB10003|    8000.0|
|SB10004|     400.0|
|SB10005|     300.0|
|SB10006|   10000.0|
|SB10007|     500.0|
|SB10008|      56.0|
|SB10009|      30.0|
|SB10010|    7000.0|
+-------+----------+



In [29]:
# Use SQL to create another DataFrame containing the high value transaction records
highValueTransRecords = spark.sql("SELECT accNo, tranAmount FROM goodtrans WHERE tranAmount > 1000")
# Show the first few records of the DataFrame
highValueTransRecords.show()

+-------+----------+
|  accNo|tranAmount|
+-------+----------+
|SB10002|    1200.0|
|SB10003|    8000.0|
|SB10006|   10000.0|
|SB10010|    7000.0|
+-------+----------+



In [30]:
# Use SQL to create another DataFrame containing the bad account records
badAccountRecords = spark.sql("SELECT accNo, tranAmount FROM trans WHERE accNo NOT like 'SB%'")
# Show the first few records of the DataFrame
badAccountRecords.show()

+-------+----------+
|  accNo|tranAmount|
+-------+----------+
|CR10001|    7000.0|
+-------+----------+



In [31]:
# Use SQL to create another DataFrame containing the bad amount records
badAmountRecords = spark.sql("SELECT accNo, tranAmount FROM trans WHERE tranAmount < 0")
# Show the first few records of the DataFrame
badAmountRecords.show()

+-------+----------+
|  accNo|tranAmount|
+-------+----------+
|SB10002|     -10.0|
+-------+----------+



In [32]:
# Do the union of two DataFrames and create another DataFrame
badTransRecords = badAccountRecords.union(badAmountRecords)
# Show the first few records of the DataFrame
badTransRecords.show()

+-------+----------+
|  accNo|tranAmount|
+-------+----------+
|CR10001|    7000.0|
|SB10002|     -10.0|
+-------+----------+



In [33]:
# Calculate the sum
sumAmount = spark.sql("SELECT sum(tranAmount)as sum FROM goodtrans")
# Show the first few records of the DataFrame
sumAmount.show()

+-------+
|    sum|
+-------+
|28486.0|
+-------+



In [34]:
# Calculate the maximum
maxAmount = spark.sql("SELECT max(tranAmount) as max FROM goodtrans")
# Show the first few records of the DataFrame
maxAmount.show()

+-------+
|    max|
+-------+
|10000.0|
+-------+



In [35]:
# Calculate the minimum
minAmount = spark.sql("SELECT min(tranAmount)as min FROM goodtrans")
# Show the first few records of the DataFrame
minAmount.show()

+----+
| min|
+----+
|30.0|
+----+



In [36]:
# Use SQL to create another DataFrame containing the good account numbers
goodAccNos = spark.sql("SELECT DISTINCT accNo FROM trans WHERE accNo like 'SB%' ORDER BY accNo")
# Show the first few records of the DataFrame
goodAccNos.show()

+-------+
|  accNo|
+-------+
|SB10001|
|SB10002|
|SB10003|
|SB10004|
|SB10005|
|SB10006|
|SB10007|
|SB10008|
|SB10009|
|SB10010|
+-------+



In [37]:
# Create the DataFrame using API for the high value transaction records
highValueTransRecords = goodTransRecords.filter("tranAmount > 1000")
# Show the first few records of the DataFrame
highValueTransRecords.show()

+-------+----------+
|  accNo|tranAmount|
+-------+----------+
|SB10002|    1200.0|
|SB10003|    8000.0|
|SB10006|   10000.0|
|SB10010|    7000.0|
+-------+----------+



In [38]:
# Create the DataFrame using API for the bad account records
badAccountRecords = acTransDF.filter("accNo NOT like 'SB%'")
# Show the first few records of the DataFrame
badAccountRecords.show()

+-------+----------+
|  accNo|tranAmount|
+-------+----------+
|CR10001|    7000.0|
+-------+----------+



In [39]:
# Create the DataFrame using API for the bad amount records
badAmountRecords = acTransDF.filter("tranAmount < 0")
# Show the first few records of the DataFrame
badAmountRecords.show()

+-------+----------+
|  accNo|tranAmount|
+-------+----------+
|SB10002|     -10.0|
+-------+----------+



In [40]:
# Calculate the sum
sumAmount = goodTransRecords.agg({"tranAmount": "sum"})
# Show the first few records of the DataFrame
sumAmount.show()

+---------------+
|sum(tranAmount)|
+---------------+
|        28486.0|
+---------------+



In [41]:
# Calculate the maximum
maxAmount = goodTransRecords.agg({"tranAmount": "max"})
# Show the first few records of the DataFrame
maxAmount.show()

+---------------+
|max(tranAmount)|
+---------------+
|        10000.0|
+---------------+



In [42]:
# Calculate the minimum
minAmount = goodTransRecords.agg({"tranAmount": "min"})
# Show the first few records of the DataFrame
minAmount.show()

+---------------+
|min(tranAmount)|
+---------------+
|           30.0|
+---------------+



In [43]:
from pyspark.sql import SparkSession
from pyspark.sql import Row

import collections

In [44]:
# For windows students
spark = SparkSession.builder.config("spark.sql.warehouse.dir", "file:///C:/temp").appName("SparkSQL").getOrCreate()

def mapper(line):
    fields = line.split(',')
    return Row(ID=int(fields[0]), name=str(fields[1].encode("utf-8")), age=int(fields[2]), numFriends=int(fields[3]))

lines = spark.sparkContext.textFile("fakefriends.csv")

people = lines.map(mapper)
schemaPeople = spark.createDataFrame(people).cache()
schemaPeople.createOrReplaceTempView("people")

In [45]:
teenagers = spark.sql("SELECT * FROM people WHERE age >= 13 AND age <= 19")

for teen in teenagers.collect():
    print(teen)

Row(ID=21, name="b'Miles'", age=19, numFriends=268)
Row(ID=52, name="b'Beverly'", age=19, numFriends=269)
Row(ID=54, name="b'Brunt'", age=19, numFriends=5)
Row(ID=106, name="b'Beverly'", age=18, numFriends=499)
Row(ID=115, name="b'Dukat'", age=18, numFriends=397)
Row(ID=133, name="b'Quark'", age=19, numFriends=265)
Row(ID=136, name="b'Will'", age=19, numFriends=335)
Row(ID=225, name="b'Elim'", age=19, numFriends=106)
Row(ID=304, name="b'Will'", age=19, numFriends=404)
Row(ID=341, name="b'Data'", age=18, numFriends=326)
Row(ID=366, name="b'Keiko'", age=19, numFriends=119)
Row(ID=373, name="b'Quark'", age=19, numFriends=272)
Row(ID=377, name="b'Beverly'", age=18, numFriends=418)
Row(ID=404, name="b'Kasidy'", age=18, numFriends=24)
Row(ID=409, name="b'Nog'", age=19, numFriends=267)
Row(ID=439, name="b'Data'", age=18, numFriends=417)
Row(ID=444, name="b'Keiko'", age=18, numFriends=472)
Row(ID=492, name="b'Dukat'", age=19, numFriends=36)
Row(ID=494, name="b'Kasidy'", age=18, numFriends=194)

In [46]:
schemaPeople.groupBy("age").count().orderBy("age").show()

spark.stop()

+---+-----+
|age|count|
+---+-----+
| 18|    8|
| 19|   11|
| 20|    5|
| 21|    8|
| 22|    7|
| 23|   10|
| 24|    5|
| 25|   11|
| 26|   17|
| 27|    8|
| 28|   10|
| 29|   12|
| 30|   11|
| 31|    8|
| 32|   11|
| 33|   12|
| 34|    6|
| 35|    8|
| 36|   10|
| 37|    9|
+---+-----+
only showing top 20 rows

