In [1]:
import findspark
findspark.init()
findspark.find()
import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
conf = pyspark.SparkConf().setAppName('SparkApp').setMaster('local')
sc = pyspark.SparkContext(conf=conf)
spark = SparkSession(sc)

In [2]:
from pyspark.sql.functions import expr

In [3]:
customers = spark.read.csv(
    path = "customer.csv",
    sep = ",",
    header=True,
    quote='"',
    schema="customer_id INT, name STRING, address STRING,website STRING, credit_limit FLOAT")


orders = spark.read.csv(
    path = "orders.csv",
    sep = ",",
    header=True,
    quote='"',
    schema="order_id INT, customer_id INT, status STRING,salesman_id INT, order_date TIMESTAMP")


employees = spark.read.csv(
    path = "employees.csv",
    sep = ",",
    header=True,
    quote='"',
    schema="employee_id INT, fname STRING, lname STRING,email STRING,phone INT, hire_date TIMESTAMP, manager_id INT, job_title STRING")


In [4]:
customers.show()
customers.registerTempTable("customers")

+-----------+----+-------+-------+------------+
|customer_id|name|address|website|credit_limit|
+-----------+----+-------+-------+------------+
|        123| abc|   null|   null|        null|
|        456| def|   null|   null|        null|
|        789| ghk|   null|   null|        null|
+-----------+----+-------+-------+------------+



In [5]:
orders.show()
orders.registerTempTable("orders")

+--------+-----------+--------+-----------+----------+
|order_id|customer_id|  status|salesman_id|order_date|
+--------+-----------+--------+-----------+----------+
|      98|        123| pending|         12|      null|
|      76|        456|complete|         34|      null|
|      54|        789|complete|         56|      null|
|    5345|        123|complete|         12|      null|
|     453|        456|complete|         34|      null|
|    4645|        789|complete|         12|      null|
|     768|        123| pending|         12|      null|
|    5754|        456|complete|         56|      null|
|      46|        789|complete|         12|      null|
|     325|        123| pending|         12|      null|
|      79|        456|complete|         56|      null|
|     453|        789| pending|         56|      null|
+--------+-----------+--------+-----------+----------+



In [6]:
employees.show()
employees.registerTempTable("employees")

+-----------+---------+-------+-----+-----+---------+----------+---------+
|employee_id|    fname|  lname|email|phone|hire_date|manager_id|job_title|
+-----------+---------+-------+-----+-----+---------+----------+---------+
|         12|     dhbf| kdfhjk| null| null|     null|      null|     null|
|         34|     dkfj|  sklfj| null| null|     null|      null|     null|
|         56|isjfiosjf|kdjflid| null| null|     null|      null|     null|
|         78|      efr|     rw| null| null|     null|      null|     null|
|         69| fdfvsdvf|  dgsdg| null| null|     null|      null|     null|
+-----------+---------+-------+-----+-----+---------+----------+---------+



In [7]:
orders.rdd.filter(lambda row: row["status"] == "pending").map(lambda row: (row["customer_id"], 1)).reduceByKey(lambda x,y: x * y).sortBy(lambda x:x[1], ascending= False).collect()

[(123, 1), (789, 1)]

In [8]:
orders.rdd.filter(lambda row: row["status"] == "pending").map(lambda row: (row["customer_id"], 1)).reduceByKey(lambda x,y: x+y).sortBy(lambda x:x[1], ascending= False).collect()

[(123, 3), (789, 1)]

In [9]:
orders.rdd.filter(lambda y: y["status"] == "pending").map(lambda y: (y["customer_id"], 1)).reduceByKey(lambda x,y: x + y).sortBy(lambda x:x[1], ascending= False).collect()

[(123, 3), (789, 1)]

#Example Question


The following schema describes Students who are enrolled in different subjects. Your task is to use the RDD API to compute the grade of all students in Big Data Subject and Sort the resulting RDD from highest to lowest grade. The variable students and grades are a reference to the DataFrames STUDENTS and GRADES respectively.

Students : student_ID, Student_name, Subject, Date
Grades : student_ID, Grade



In [35]:
students = spark.read.option("inferSchema","true").option("header","true").csv("STUDENTS.csv")
students.show()
#students_rdd = Students.rdd
#students_rdd.collect()
students.rdd.collect()

+----------+------------+------------+---------+
|student_id|student_name|     subject|     date|
+----------+------------+------------+---------+
|       123|         abc|    Big Data|30-Oct-20|
|       456|       priya|          ML|30-Oct-20|
|       789|       vibha|    Big Data|30-Oct-20|
|       741|         hmk|          ML|30-Oct-20|
|       852|         lfg|    Big Data|30-Oct-20|
|       963|         sdb|Data Science|30-Oct-20|
|       951|     jchnsjk|Data Science|30-Oct-20|
+----------+------------+------------+---------+



[Row(student_id=123, student_name='abc', subject='Big Data', date='30-Oct-20'),
 Row(student_id=456, student_name='priya', subject='ML', date='30-Oct-20'),
 Row(student_id=789, student_name='vibha', subject='Big Data', date='30-Oct-20'),
 Row(student_id=741, student_name='hmk', subject='ML', date='30-Oct-20'),
 Row(student_id=852, student_name='lfg', subject='Big Data', date='30-Oct-20'),
 Row(student_id=963, student_name='sdb', subject='Data Science', date='30-Oct-20'),
 Row(student_id=951, student_name='jchnsjk', subject='Data Science', date='30-Oct-20')]

In [34]:
grades = spark.read.option("inferSchema","true").option("header","true").csv("GRADES.csv")
grades.show()
#grades_rdd = Grades.rdd
#grades_rdd.collect()
grades.rdd.collect()

+----------+-----+
|student_id|grade|
+----------+-----+
|       123|  2.7|
|       456|  1.3|
|       789|  1.0|
|       741|  3.3|
|       852|  4.0|
|       963|  2.3|
|       951|  2.0|
+----------+-----+



[Row(student_id=123, grade=2.7),
 Row(student_id=456, grade=1.3),
 Row(student_id=789, grade=1.0),
 Row(student_id=741, grade=3.3),
 Row(student_id=852, grade=4.0),
 Row(student_id=963, grade=2.3),
 Row(student_id=951, grade=2.0)]

In [36]:
students.rdd.filter(lambda row: row["subject"] == "Big Data").join(grades.rdd).sortBy(lambda x: x[1][1],ascending= False).collect()

[(852, ('lfg', 4.0)), (123, ('abc', 2.7)), (789, ('vibha', 1.0))]

In [39]:
students.rdd.filter(lambda row: row["subject"] == "Big Data").join(grades.rdd).sortBy(lambda x: x[1][1],ascending= True).collect()

[(789, ('vibha', 1.0)), (123, ('abc', 2.7)), (852, ('lfg', 4.0))]

In [37]:
students.rdd.fullOuterJoin(grades.rdd).collect()

[(456, ('priya', 1.3)),
 (852, ('lfg', 4.0)),
 (123, ('abc', 2.7)),
 (789, ('vibha', 1.0)),
 (741, ('hmk', 3.3)),
 (963, ('sdb', 2.3)),
 (951, ('jchnsjk', 2.0))]

In [38]:
students.rdd.join(grades.rdd).collect()

[(456, ('priya', 1.3)),
 (852, ('lfg', 4.0)),
 (123, ('abc', 2.7)),
 (789, ('vibha', 1.0)),
 (741, ('hmk', 3.3)),
 (963, ('sdb', 2.3)),
 (951, ('jchnsjk', 2.0))]

In [14]:
students.rdd.leftOuterJoin(grades.rdd).collect()

[(456, ('priya', 1.3)),
 (852, ('lfg', 4.0)),
 (123, ('abc', 2.7)),
 (789, ('vibha', 1.0)),
 (741, ('hmk', 3.3)),
 (963, ('sdb', 2.3)),
 (951, ('jchnsjk', 2.0))]

In [15]:
students.rdd.rightOuterJoin(grades.rdd).collect()

[(456, ('priya', 1.3)),
 (852, ('lfg', 4.0)),
 (123, ('abc', 2.7)),
 (789, ('vibha', 1.0)),
 (741, ('hmk', 3.3)),
 (963, ('sdb', 2.3)),
 (951, ('jchnsjk', 2.0))]

In [16]:
grades.rdd.fullOuterJoin(students.rdd).collect()

[(456, (1.3, 'priya')),
 (852, (4.0, 'lfg')),
 (123, (2.7, 'abc')),
 (789, (1.0, 'vibha')),
 (741, (3.3, 'hmk')),
 (963, (2.3, 'sdb')),
 (951, (2.0, 'jchnsjk'))]

In [29]:
students.rdd.filter(lambda y: y["subject"] == "Big Data").collect()

[Row(student_id=123, student_name='abc', subject='Big Data', date='30-Oct-20'),
 Row(student_id=789, student_name='vibha', subject='Big Data', date='30-Oct-20'),
 Row(student_id=852, student_name='lfg', subject='Big Data', date='30-Oct-20')]