In [1]:
import getpass
import glob
import os
import sys
from os.path import abspath
# Point this kernel at the local Spark / Java / Hadoop installs.
# Raw strings keep the Windows backslashes literal: sequences such as "\s",
# "\P" and "\j" are invalid escapes in a normal string literal and emit a
# warning on recent Python versions.
os.environ['SPARK_HOME'] = r'C:\spark-3.1.2-bin-hadoop3.2'
os.environ['JAVA_HOME'] = r'C:\Program Files\Java\jdk1.8.0_201'
os.environ['HADOOP_HOME'] = r'C:\spark-3.1.2-bin-hadoop3.2'

# Locate Spark's bundled Python sources and the py4j archive shipped with them.
spark_python = os.path.join(os.environ['SPARK_HOME'], 'python')
py4j_candidates = glob.glob(os.path.join(spark_python, 'lib', 'py4j-*.zip'))
if not py4j_candidates:
    # Fail with an actionable message instead of a bare IndexError.
    raise FileNotFoundError(
        'No py4j-*.zip found under {}; check SPARK_HOME'.format(
            os.path.join(spark_python, 'lib')
        )
    )
py4j = py4j_candidates[0]

# Put both locations at the front of the import path for this process, and
# export the py4j archive through PYTHONPATH as well.
sys.path[:0] = [spark_python, py4j]
os.environ['PYTHONPATH'] = py4j

import findspark
findspark.init()
findspark.find()

from pyspark.sql import SparkSession

# Single shared session for the notebook; Hive support is enabled because
# later cells create a database and tables through spark.sql.
spark = (
    SparkSession.builder
    .appName("Spark Examples")
    .enableHiveSupport()
    .getOrCreate()
)

## CREATE TWO DATAFRAMES

In [2]:
# Employee rows: (emp_id, name, superior_emp_id, year_joined, emp_dept_id, gender, salary).
# year_joined and emp_dept_id are deliberately kept as strings.
emp = [
    (1, "Smith", -1, "2018", "10", "M", 3000),
    (2, "Rose", 1, "2010", "20", "M", 4000),
    (3, "Williams", 1, "2010", "10", "M", 1000),
    (4, "Jones", 2, "2005", "10", "F", 2000),
    (5, "Brown", 2, "2010", "40", "", -1),
    (6, "Brown", 2, "2010", "50", "", -1),
]
empColumns = [
    "emp_id",
    "name",
    "superior_emp_id",
    "year_joined",
    "emp_dept_id",
    "gender",
    "salary",
]

empDF = spark.createDataFrame(data=emp, schema=empColumns)
empDF.printSchema()
empDF.show(truncate=False)

# Department rows: (dept_name, dept_id). Department 30 has no employee, and
# employee 6 points at department 50, which is not listed here.
dept = [
    ("Finance", 10),
    ("Marketing", 20),
    ("Sales", 30),
    ("IT", 40),
]
deptColumns = ["dept_name", "dept_id"]
deptDF = spark.createDataFrame(data=dept, schema=deptColumns)
deptDF.printSchema()
deptDF.show(truncate=False)

root
 |-- emp_id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- superior_emp_id: long (nullable = true)
 |-- year_joined: string (nullable = true)
 |-- emp_dept_id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: long (nullable = true)

+------+--------+---------------+-----------+-----------+------+------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+--------+---------------+-----------+-----------+------+------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |
|2     |Rose    |1              |2010       |20         |M     |4000  |
|3     |Williams|1              |2010       |10         |M     |1000  |
|4     |Jones   |2              |2005       |10         |F     |2000  |
|5     |Brown   |2              |2010       |40         |      |-1    |
|6     |Brown   |2              |2010       |50         |      |-1    |
+------+--------+---------------+-----------+-----------+------+------+

## Let's create a Hive database and write the two DataFrames as Hive tables

In [8]:
# Create the target database; "if not exists" makes this cell safe to re-run.
spark.sql("create database if not exists employee")

DataFrame[]

In [9]:
# Expose both DataFrames to Spark SQL under the same names as the variables.
empDF.createOrReplaceTempView("empDF")
deptDF.createOrReplaceTempView("deptDF")

# Materialise the views as tables in the employee database.
# "if not exists" mirrors the database-creation statement and keeps the cell
# re-runnable: without it the second execution fails because the table
# already exists. Drop the tables first if they need to be rebuilt.
spark.sql("create table if not exists employee.emp_tbl as select * from empDF")
spark.sql("create table if not exists employee.dept_tbl as select * from deptDF")

DataFrame[]

## JOIN OPERATION

In [10]:
# Inner join: keep only employees whose emp_dept_id matches a dept_id.
empDF_deptDF_join = empDF.join(
    deptDF,
    on=empDF.emp_dept_id == deptDF.dept_id,
    how="inner",
)

In [11]:
# Print the joined rows to check the result of the inner join.
empDF_deptDF_join.show()

+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|    name|superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|     1|   Smith|             -1|       2018|         10|     M|  3000|  Finance|     10|
|     3|Williams|              1|       2010|         10|     M|  1000|  Finance|     10|
|     4|   Jones|              2|       2005|         10|     F|  2000|  Finance|     10|
|     2|    Rose|              1|       2010|         20|     M|  4000|Marketing|     20|
|     5|   Brown|              2|       2010|         40|      |    -1|       IT|     40|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+



## PIVOT OPERATION / TRANSPOSE

In [18]:
# One row per (dept_id, dept_name) pair with the number of joined rows in it.
df_pivot = (
    empDF_deptDF_join
    .groupBy("dept_id", "dept_name")
    .count()
)

In [21]:
# Register the grouped frame so it can be queried with SQL.
df_pivot.createOrReplaceTempView("df_pivot")

# STACK(1, ...) unpivots the dept_name column into a (name, col) pair,
# producing one output row per input row.
df_trans = spark.sql(
    "SELECT dept_id, STACK(1, 'dept_name', dept_name) AS (name,col) FROM df_pivot"
)

In [23]:
# Print the unpivoted (dept_id, name, col) rows.
df_trans.show()

+-------+---------+---------+
|dept_id|     name|      col|
+-------+---------+---------+
|     10|dept_name|  Finance|
|     20|dept_name|Marketing|
|     40|dept_name|       IT|
+-------+---------+---------+



In [25]:
# Register the long-format (dept_id, name, col) frame for the SQL below.
df_trans.createOrReplaceTempView("df_trans")

# Pivot the long rows back to columns: number the rows inside each dept_id and
# spread the first three values of `col` into COL1..COL3.
# The result goes to a new name instead of rebinding df_trans. Rebinding made
# the cell fail on a second run, because the view would then be re-registered
# from the already-pivoted frame, which has no `name` / `col` columns.
df_trans_wide = spark.sql("""
with Cte AS(
    select *,
        ROW_NUMBER() OVER(Partition by dept_id order by dept_id) as Rn
        FROM df_trans)
        select Cte.dept_id,
        MAX(CASE WHEN Cte.Rn = 1 THEN Cte.col END) as COL1,
        MAX(CASE WHEN Cte.Rn = 2 THEN Cte.col END) as COL2,
        MAX(CASE WHEN Cte.Rn = 3 THEN Cte.col END) as COL3
    FROM Cte
    GROUP BY Cte.name, Cte.dept_id
""")
df_trans_wide.show()

+-------+---------+----+----+
|dept_id|     COL1|COL2|COL3|
+-------+---------+----+----+
|     10|  Finance|null|null|
|     20|Marketing|null|null|
|     40|       IT|null|null|
+-------+---------+----+----+



## UNION OF DATAFRAMES WITH DIFFERENT COLUMN NAMES

In [31]:
from pyspark.sql.functions import * 
from pyspark.sql import Row

def customUnion(df1,df2):
    """Union two DataFrames whose column sets differ.

    Column names from both inputs are lower-cased and merged into one
    alphabetically sorted list. Each input is projected onto that list, with
    a null literal standing in for every column it lacks, and the two
    projections are then combined with a positional ``union``.

    Parameters
    ----------
    df1, df2 : DataFrame
        The frames to combine. Rows of ``df1`` come first in the union.

    Returns
    -------
    DataFrame
        The unioned frame with the merged, sorted column list.
    """
    left_names = [name.lower() for name in df1.columns]
    right_names = [name.lower() for name in df2.columns]

    # Every left-hand column, plus whatever only the right-hand frame has.
    only_in_right = set(right_names).difference(left_names)
    total_cols = sorted([*left_names, *only_in_right])

    def project(available):
        # Build the select list for one frame: its own columns by name,
        # null placeholders (aliased to the expected name) for the rest.
        selection = []
        for colname in total_cols:
            if colname in available:
                selection.append(colname)
            else:
                selection.append(lit(None).alias(colname))
        return selection

    left_part = df1.select(project(left_names))
    right_part = df2.select(project(right_names))
    return left_part.union(right_part)
# Union the employee and department frames; columns missing on either side
# are filled with nulls by customUnion.
df_union = customUnion(empDF,deptDF)
df_union.show()

+-------+---------+-----------+------+------+--------+------+---------------+-----------+
|dept_id|dept_name|emp_dept_id|emp_id|gender|    name|salary|superior_emp_id|year_joined|
+-------+---------+-----------+------+------+--------+------+---------------+-----------+
|   null|     null|         10|     1|     M|   Smith|  3000|             -1|       2018|
|   null|     null|         20|     2|     M|    Rose|  4000|              1|       2010|
|   null|     null|         10|     3|     M|Williams|  1000|              1|       2010|
|   null|     null|         10|     4|     F|   Jones|  2000|              2|       2005|
|   null|     null|         40|     5|      |   Brown|    -1|              2|       2010|
|   null|     null|         50|     6|      |   Brown|    -1|              2|       2010|
|     10|  Finance|       null|  null|  null|    null|  null|           null|       null|
|     20|Marketing|       null|  null|  null|    null|  null|           null|       null|
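|     30|    Sales|       null|  null|  null|    null|  null|           null|       null|
|     40|       IT|       null|  null|  null|    null|  null|           null|       null|
+-------+---------+-----------+------+------+--------+------+---------------+-----------+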

## REMOVING DUPLICATE COLUMN NAMES AFTER JOIN 

In [33]:
# Same department data as before, but the key column is named emp_dept_id so
# that it clashes with the employee frame's column after the join.
dept1 = [
    ("Finance", 10),
    ("Marketing", 20),
    ("Sales", 30),
    ("IT", 40),
]
deptColumns1 = ["dept_name", "emp_dept_id"]
df3 = spark.createDataFrame(data=dept1, schema=deptColumns1)

# Joining on an expression (rather than a column name) keeps both
# emp_dept_id columns in the result.
df3 = empDF.join(
    df3,
    on=empDF.emp_dept_id == df3.emp_dept_id,
    how='inner',
)
df3.show()

+------+--------+---------------+-----------+-----------+------+------+---------+-----------+
|emp_id|    name|superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|emp_dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-----------+
|     1|   Smith|             -1|       2018|         10|     M|  3000|  Finance|         10|
|     3|Williams|              1|       2010|         10|     M|  1000|  Finance|         10|
|     4|   Jones|              2|       2005|         10|     F|  2000|  Finance|         10|
|     2|    Rose|              1|       2010|         20|     M|  4000|Marketing|         20|
|     5|   Brown|              2|       2010|         40|      |    -1|       IT|         40|
+------+--------+---------------+-----------+-----------+------+------+---------+-----------+



In [35]:
# Drop repeated column names left over from the join, keeping the first
# occurrence of each (names are compared and emitted in lower case).
#
# Repeats are renamed to unique placeholders and the columns to keep are
# tracked explicitly. Filtering on a "_dup" suffix instead would also discard
# any genuine column whose name happens to end in "_dup".
cols_new = []      # new name for every column, in positional order
cols_keep = []     # first-occurrence names that survive the select
seen = set()
taken = {c.lower() for c in df3.columns}  # names a placeholder must avoid
for position, c in enumerate(df3.columns):
    c = c.lower()
    if c in seen:
        placeholder = '__dup_{}_{}'.format(position, c)
        while placeholder in taken:
            placeholder += '_'
        taken.add(placeholder)
        cols_new.append(placeholder)
    else:
        seen.add(c)
        cols_new.append(c)
        cols_keep.append(c)

# toDF renames positionally, so after it every name is unique and the select
# below is unambiguous.
df3 = df3.toDF(*cols_new).select(*cols_keep)
df3.show()

+------+--------+---------------+-----------+-----------+------+------+---------+
|emp_id|    name|superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|
+------+--------+---------------+-----------+-----------+------+------+---------+
|     1|   Smith|             -1|       2018|         10|     M|  3000|  Finance|
|     3|Williams|              1|       2010|         10|     M|  1000|  Finance|
|     4|   Jones|              2|       2005|         10|     F|  2000|  Finance|
|     2|    Rose|              1|       2010|         20|     M|  4000|Marketing|
|     5|   Brown|              2|       2010|         40|      |    -1|       IT|
+------+--------+---------------+-----------+-----------+------+------+---------+



## PYSPARK WITH MYSQL 

In [None]:
# Connection settings for the MySQL example; replace the placeholders below.
jdbcHostname = "use your hostname"
jdbcDatabase = "use your database name"
jdbcPort = 3306

# Credentials were previously referenced without ever being defined, which
# raised NameError on a fresh kernel. Read them from the environment, or
# prompt for them, so they are never hard-coded in the notebook.
username = os.environ.get("MYSQL_USER") or input("MySQL username: ")
password = os.environ.get("MYSQL_PASSWORD") or getpass.getpass("MySQL password: ")

# NOTE: the password is embedded in the URL, so avoid printing jdbcUrl.
jdbcUrl = "jdbc:mysql://{0}:{1}/{2}?user={3}&password={4}".format(jdbcHostname, jdbcPort, jdbcDatabase, username, password)

In [None]:
# A parenthesised sub-query with an alias is passed where a table name is
# expected, so the SELECT is sent to the database as part of the read.
pushdown_query = "(select * from tablename) emp_alias"
df1 = spark.read.jdbc(
    table=pushdown_query,
    url=jdbcUrl,
)
df1.show()