In [1]:
from pyspark import SparkContext, SparkConf
from pyspark import sql
from pyspark.sql import Row                       # To use Row method for column
#from pyspark.sql import functions as F
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import Window

In [2]:
# Spark configuration: each executor is given 4 GB of memory.
conf = SparkConf().set("spark.executor.memory", "4g")
# getOrCreate() returns the already-running SparkContext when there is one, so
# re-running this cell no longer fails with
# "Cannot run multiple SparkContexts at once".
# NOTE: when an existing context is reused, `conf` is not applied to it --
# restart the kernel to change executor settings.
sc = SparkContext.getOrCreate(conf=conf)
sqlContext = sql.SQLContext(sc)

In [3]:
# Open the three source text files (employees, departments, salary grades)
# as RDDs of lines.
empRDD, depRDD, salRDD = (
    sc.textFile(path) for path in ("emp.txt", "dept.txt", "salgrade.txt")
)

In [4]:
# Row "templates": calling one of these with positional values builds a Row
# whose fields carry the names listed here.
emp_col = Row(
    'EMPNO', 'ENAME', 'JOB', 'MGR',
    'HIREDATE', 'SAL', 'COMM', 'DEPTNO',
)
dep_col = Row(
    'DEPTNO', 'DNAME', 'DLOC',
)
sal_col = Row(
    'GRADE', 'LOSAL', 'HISAL',
)

In [5]:
# Split every text line on commas and wrap the pieces in the matching Row
# template. All fields are still strings at this point.
empROW = empRDD.map(lambda line: emp_col(*line.split(',')))
depRow = depRDD.map(lambda line: dep_col(*line.split(',')))
salROW = salRDD.map(lambda line: sal_col(*line.split(',')))

In [6]:
# Turn the three Row RDDs into DataFrames (emp, dept, salgrade -- in that order).
empDF, depDF, salDF = map(
    sqlContext.createDataFrame,
    (empROW, depRow, salROW),
)

In [7]:
# Cast the DataFrame columns to their proper types

def toInt(i):
    """Return the column `i` cast to the "integer" type."""
    as_integer = i.cast("integer")
    return as_integer

def toDouble(d):
    """Return the column `d` cast to the "double" type."""
    as_double = d.cast("double")
    return as_double

# emp: cast the numeric/date columns, turn the literal text 'NULL' in COMM into
# a real null (COMM itself is left uncast), and drop the row whose ENAME is the
# text "ENAME" (presumably the header line of emp.txt -- confirm against the file).
emp = (
    empDF
    .withColumn("EMPNO", toInt(empDF.EMPNO))
    .withColumn("MGR", toInt(empDF.MGR))
    .withColumn("HIREDATE", to_date('HIREDATE'))
    .withColumn("SAL", toDouble(empDF.SAL))
    .withColumn(
        'COMM',
        when(empDF.COMM == 'NULL', lit(None)).otherwise(empDF.COMM),
    )
    .withColumn("DEPTNO", toInt(empDF.DEPTNO))
    .filter('ENAME != "ENAME"')
)

# dept / sal: cast the numeric columns, then drop every row that contains a null.
dept = (
    depDF
    .withColumn("DEPTNO", toInt(depDF.DEPTNO))
    .na.drop()
)
sal = (
    salDF
    .withColumn("GRADE", toInt(salDF.GRADE))
    .withColumn("LOSAL", toDouble(salDF.LOSAL))
    .withColumn("HISAL", toDouble(salDF.HISAL))
    .na.drop()
)

# Register the cleaned DataFrames on sqlContext under the table names
# "emptab", "depttab" and "saltab".
sqlContext.registerDataFrameAsTable(emp, "emptab")
sqlContext.registerDataFrameAsTable(dept, "depttab")
sqlContext.registerDataFrameAsTable(sal, "saltab")


In [8]:
# 1) Display every column of the emp, dept and sal DataFrames.

emp.select('*').show()
dept.select('*').show()
sal.select('*').show()

+-----+------+---------+----+----------+------+-------+------+
|EMPNO| ENAME|      JOB| MGR|  HIREDATE|   SAL|   COMM|DEPTNO|
+-----+------+---------+----+----------+------+-------+------+
| 7369| SMITH|    CLERK|7902|1980-12-17| 800.0|   null|    20|
| 7499| ALLEN| SALESMAN|7698|1981-02-20|1600.0| 300.00|    30|
| 7521|  WARD| SALESMAN|7698|1981-02-22|1250.0| 500.00|    30|
| 7566| JONES|  MANAGER|7839|1981-04-02|2975.0|   null|    20|
| 7654|MARTIN| SALESMAN|7698|1981-09-28|1250.0|1400.00|    30|
| 7698| BLAKE|  MANAGER|7839|1981-05-01|2850.0|   null|    30|
| 7782| CLARK|  MANAGER|7839|1981-06-09|2450.0|   null|    10|
| 7788| SCOTT|  ANALYST|7566|1982-12-09|3000.0|   null|    20|
| 7839|  KING|PRESIDENT|null|1981-11-17|5000.0|   null|    10|
| 7844|TURNER| SALESMAN|7698|1981-09-08|1500.0|   0.00|    30|
| 7876| ADAMS|    CLERK|7788|1983-01-12|1100.0|   null|    20|
| 7900| JAMES|    CLERK|7698|1981-12-03| 950.0|   null|    30|
| 7902|  FORD|  ANALYST|7566|1981-12-03|3000.0|   null|

In [10]:
# Employees whose salary (SAL) is greater than 2000.

emp.where(col("SAL") > 2000).show()

+-----+-----+---------+----+----------+------+----+------+
|EMPNO|ENAME|      JOB| MGR|  HIREDATE|   SAL|COMM|DEPTNO|
+-----+-----+---------+----+----------+------+----+------+
| 7566|JONES|  MANAGER|7839|1981-04-02|2975.0|null|    20|
| 7698|BLAKE|  MANAGER|7839|1981-05-01|2850.0|null|    30|
| 7782|CLARK|  MANAGER|7839|1981-06-09|2450.0|null|    10|
| 7788|SCOTT|  ANALYST|7566|1982-12-09|3000.0|null|    20|
| 7839| KING|PRESIDENT|null|1981-11-17|5000.0|null|    10|
| 7902| FORD|  ANALYST|7566|1981-12-03|3000.0|null|    20|
+-----+-----+---------+----+----------+------+----+------+



In [11]:
# Remove duplicates based on a single column: keep one row per distinct JOB.
# No ordering is specified, so which row is kept for each JOB is not controlled here.

emp.drop_duplicates(subset=['JOB']).show()

+-----+-----+---------+----+----------+------+------+------+
|EMPNO|ENAME|      JOB| MGR|  HIREDATE|   SAL|  COMM|DEPTNO|
+-----+-----+---------+----+----------+------+------+------+
| 7788|SCOTT|  ANALYST|7566|1982-12-09|3000.0|  null|    20|
| 7499|ALLEN| SALESMAN|7698|1981-02-20|1600.0|300.00|    30|
| 7369|SMITH|    CLERK|7902|1980-12-17| 800.0|  null|    20|
| 7566|JONES|  MANAGER|7839|1981-04-02|2975.0|  null|    20|
| 7839| KING|PRESIDENT|null|1981-11-17|5000.0|  null|    10|
+-----+-----+---------+----+----------+------+------+------+



In [12]:
# Count the rows for each JOB value; a count above 1 means that JOB value
# appears on more than one row (i.e. it is duplicated in this column).

emp.groupBy("JOB").count().show()

+---------+-----+
|      JOB|count|
+---------+-----+
|  ANALYST|    2|
| SALESMAN|    4|
|    CLERK|    4|
|  MANAGER|    3|
|PRESIDENT|    1|
+---------+-----+



In [14]:
# Drop the rows that are null in one specific column (COMM).

emp.where(col("COMM").isNotNull()).show()

# how='any' drops a row when ANY of its columns is null, not only COMM.
# (Both calls print the same rows for this dataset.)
emp.dropna(how='any').show()

# With how='all' a row is dropped only when every one of its columns is null.

+-----+------+--------+----+----------+------+-------+------+
|EMPNO| ENAME|     JOB| MGR|  HIREDATE|   SAL|   COMM|DEPTNO|
+-----+------+--------+----+----------+------+-------+------+
| 7499| ALLEN|SALESMAN|7698|1981-02-20|1600.0| 300.00|    30|
| 7521|  WARD|SALESMAN|7698|1981-02-22|1250.0| 500.00|    30|
| 7654|MARTIN|SALESMAN|7698|1981-09-28|1250.0|1400.00|    30|
| 7844|TURNER|SALESMAN|7698|1981-09-08|1500.0|   0.00|    30|
+-----+------+--------+----+----------+------+-------+------+

+-----+------+--------+----+----------+------+-------+------+
|EMPNO| ENAME|     JOB| MGR|  HIREDATE|   SAL|   COMM|DEPTNO|
+-----+------+--------+----+----------+------+-------+------+
| 7499| ALLEN|SALESMAN|7698|1981-02-20|1600.0| 300.00|    30|
| 7521|  WARD|SALESMAN|7698|1981-02-22|1250.0| 500.00|    30|
| 7654|MARTIN|SALESMAN|7698|1981-09-28|1250.0|1400.00|    30|
| 7844|TURNER|SALESMAN|7698|1981-09-08|1500.0|   0.00|    30|
+-----+------+--------+----+----------+------+-------+------+



In [15]:
# Fill nulls: every null COMM is replaced with 50.
emp.fillna({'COMM': 50}).show()

+-----+------+---------+----+----------+------+-------+------+
|EMPNO| ENAME|      JOB| MGR|  HIREDATE|   SAL|   COMM|DEPTNO|
+-----+------+---------+----+----------+------+-------+------+
| 7369| SMITH|    CLERK|7902|1980-12-17| 800.0|     50|    20|
| 7499| ALLEN| SALESMAN|7698|1981-02-20|1600.0| 300.00|    30|
| 7521|  WARD| SALESMAN|7698|1981-02-22|1250.0| 500.00|    30|
| 7566| JONES|  MANAGER|7839|1981-04-02|2975.0|     50|    20|
| 7654|MARTIN| SALESMAN|7698|1981-09-28|1250.0|1400.00|    30|
| 7698| BLAKE|  MANAGER|7839|1981-05-01|2850.0|     50|    30|
| 7782| CLARK|  MANAGER|7839|1981-06-09|2450.0|     50|    10|
| 7788| SCOTT|  ANALYST|7566|1982-12-09|3000.0|     50|    20|
| 7839|  KING|PRESIDENT|null|1981-11-17|5000.0|     50|    10|
| 7844|TURNER| SALESMAN|7698|1981-09-08|1500.0|   0.00|    30|
| 7876| ADAMS|    CLERK|7788|1983-01-12|1100.0|     50|    20|
| 7900| JAMES|    CLERK|7698|1981-12-03| 950.0|     50|    30|
| 7902|  FORD|  ANALYST|7566|1981-12-03|3000.0|     50|

In [16]:
# Replace the JOB value 'CLERK' with 'CLK' (restricted to the JOB column).
emp.replace(['CLERK'], ['CLK'], 'JOB').show()

# Replace the value 7902 with 1234; no column subset is passed here.
emp.replace([7902], [1234]).show()

+-----+------+---------+----+----------+------+-------+------+
|EMPNO| ENAME|      JOB| MGR|  HIREDATE|   SAL|   COMM|DEPTNO|
+-----+------+---------+----+----------+------+-------+------+
| 7369| SMITH|      CLK|7902|1980-12-17| 800.0|   null|    20|
| 7499| ALLEN| SALESMAN|7698|1981-02-20|1600.0| 300.00|    30|
| 7521|  WARD| SALESMAN|7698|1981-02-22|1250.0| 500.00|    30|
| 7566| JONES|  MANAGER|7839|1981-04-02|2975.0|   null|    20|
| 7654|MARTIN| SALESMAN|7698|1981-09-28|1250.0|1400.00|    30|
| 7698| BLAKE|  MANAGER|7839|1981-05-01|2850.0|   null|    30|
| 7782| CLARK|  MANAGER|7839|1981-06-09|2450.0|   null|    10|
| 7788| SCOTT|  ANALYST|7566|1982-12-09|3000.0|   null|    20|
| 7839|  KING|PRESIDENT|null|1981-11-17|5000.0|   null|    10|
| 7844|TURNER| SALESMAN|7698|1981-09-08|1500.0|   0.00|    30|
| 7876| ADAMS|      CLK|7788|1983-01-12|1100.0|   null|    20|
| 7900| JAMES|      CLK|7698|1981-12-03| 950.0|   null|    30|
| 7902|  FORD|  ANALYST|7566|1981-12-03|3000.0|   null|

In [17]:
# Name of the highest-paid employee: dense-rank everyone by SAL (descending),
# keep the rank-1 rows and take ENAME from the first row returned.

(
    emp
    .withColumn("RANK", dense_rank().over(Window.orderBy(desc("SAL"))))
    .filter(col("RANK") == 1)
    .select("ENAME")
    .head()[0]
)

u'KING'

In [18]:
# Second-highest salary without a ranking/analytic function:
#   1. take the overall maximum SAL,
#   2. take the maximum SAL strictly below it,
#   3. show every employee earning exactly that amount.
# `max` here is the Spark column aggregate brought in by the star import of
# pyspark.sql.functions, not Python's builtin.

emp.filter(
    col("SAL") == emp.filter(
        col("SAL") < emp.select(max("SAL")).head()[0]
    ).select(max("SAL")).head()[0]
).show()

+-----+-----+-------+----+----------+------+----+------+
|EMPNO|ENAME|    JOB| MGR|  HIREDATE|   SAL|COMM|DEPTNO|
+-----+-----+-------+----+----------+------+----+------+
| 7788|SCOTT|ANALYST|7566|1982-12-09|3000.0|null|    20|
| 7902| FORD|ANALYST|7566|1981-12-03|3000.0|null|    20|
+-----+-----+-------+----+----------+------+----+------+



In [19]:
# A join can take several conditions at once, passed as a list.

# 38. Display the location of SMITH.

emp.alias("e").join(
    dept.alias("d"),
    [
        col("e.DEPTNO") == col("d.DEPTNO"),
        col("e.ENAME") == "SMITH",
    ],
).select("DLOC").show()

+------+
|  DLOC|
+------+
|DALLAS|
+------+



In [21]:
# Self-join with several join conditions.

# Find the employees who earn more than their own manager.
e = emp.alias("e")   # employee side of the self-join
m = emp.alias("m")   # manager side of the self-join

cond = [
    col("e.MGR") == col("m.EMPNO"),   # m is e's manager
    col("e.SAL") > col("m.SAL"),      # and e is paid more than m
]

e.join(m, on=cond).select("e.*").show()

+-----+-----+-------+----+----------+------+----+------+
|EMPNO|ENAME|    JOB| MGR|  HIREDATE|   SAL|COMM|DEPTNO|
+-----+-----+-------+----+----------+------+----+------+
| 7788|SCOTT|ANALYST|7566|1982-12-09|3000.0|null|    20|
| 7902| FORD|ANALYST|7566|1981-12-03|3000.0|null|    20|
+-----+-----+-------+----+----------+------+----+------+



# UDF Practice 

In [23]:
# Two-row sample DataFrame with the columns SAL and INC.
sal_inc = (
    sc.parallelize([(100, 10), (200, 20)])
    .toDF(['SAL', 'INC'])
)

In [24]:
# Print the sample SAL / INC rows.
sal_inc.show()

+---+---+
|SAL|INC|
+---+---+
|100| 10|
|200| 20|
+---+---+



Qn: Create a new column that holds a tuple (loan, loanAmt): if SAL is greater than 100, loan is "Personal loan" and loanAmt is INC * 12 * 5; otherwise loan is "No loan" and loanAmt is 0.

In [26]:
# Return type of the loan UDF: a struct made of a string field "loan"
# and an integer field "loanAmt".
schema = StructType([
    StructField(name="loan", dataType=StringType()),
    StructField(name="loanAmt", dataType=IntegerType()),
])

In [28]:
def prodAndLoan(a, b):
    """Decide the loan product and loan amount for one (SAL, INC) pair.

    Parameters
    ----------
    a : number or None
        Salary value.
    b : number or None
        Increment value.

    Returns
    -------
    tuple
        ("Personal loan", b * 12 * 5) when `a` is greater than 100,
        otherwise ("No loan", 0). A missing (None) `a` or `b` also yields
        ("No loan", 0).
    """
    # A null column value reaches a Python UDF as None. Comparing None with a
    # number raises TypeError on Python 3, and None * 12 raises on any version,
    # so a missing input is treated as "no loan" instead of failing the job.
    if a is None or b is None:
        return ("No loan", 0)
    if a > 100:
        return ("Personal loan", b * 12 * 5)
    return ("No loan", 0)

In [29]:
# Wrap prodAndLoan as a Spark UDF whose result has the struct type in `schema`.
prod_prop = udf(prodAndLoan, returnType=schema)

In [30]:
# Add a struct column "newCol" holding the UDF result for each SAL / INC pair.
loan_sanctioned = sal_inc.withColumn(
    "newCol",
    prod_prop(sal_inc["SAL"], sal_inc["INC"]),
)

In [31]:
# Print the rows without truncating long cell values.
loan_sanctioned.show(truncate=False)

+---+---+---------------------+
|SAL|INC|newCol               |
+---+---+---------------------+
|100|10 |[No loan, 0]         |
|200|20 |[Personal loan, 1200]|
+---+---+---------------------+



In [32]:
# Pull the loanAmt field out of the newCol struct using a dotted path.
loan_sanctioned.select("SAL", "INC", "newCol.loanAmt").show()

+---+---+-------+
|SAL|INC|loanAmt|
+---+---+-------+
|100| 10|      0|
|200| 20|   1200|
+---+---+-------+



In [33]:
# When the column is not a struct (here INC is a two-element list), a value is
# picked by position instead, e.g. col("INC")[1] for the second element.

test = (
    sc.parallelize([(100, [10, 40]), (200, [20, 30])])
    .toDF(['SAL', 'INC'])
)

In [34]:
# Second element (index 1) of the INC list for every row.
test.select(col("INC").getItem(1)).show()

+------+
|INC[1]|
+------+
|    40|
|    30|
+------+

