In [0]:
# Entry point for every DataFrame / SQL operation in this notebook.
from pyspark.sql import SparkSession

# getOrCreate() hands back the already-active session when there is one and
# only builds a new session otherwise.
# To label the application, call .appName("Exl PySpark Application") on the
# builder before getOrCreate().
session_builder = SparkSession.builder
spark_session_object = session_builder.getOrCreate()
print(spark_session_object)



<pyspark.sql.session.SparkSession object at 0x7f0929e19f10>


In [0]:
# Defining our own column datatypes and nullable constraints with
# StructType & StructField instead of relying on schema inference.
from pyspark.sql.types import StructType,StructField,StringType,IntegerType

# One tuple per employee, in schema order:
# (First_Name, Middle_Name, Last_Name, Emp_ID, Gender, Salary)
data_emp = [
    ("Alex", None, "Tom", 3235245, "M", 12000),
    ("John", "", "XXXX", 89983243, "M", 12000),
    ("Andy", "Steve", "Morision", 435435, "M", 12000),
    ("Arun", "OOOO", "Gopal", 454425, "M", 12000),
    ("Claudia", "", "Schepars", 978643, "F", 8000),
    ("Nancy", "MMMM", "Paul", 2353462, "F", 12000),
]

# (column name, Spark datatype, nullable) -- Emp_ID is the only column that
# is declared non-nullable.
emp_column_specs = [
    ("First_Name", StringType(), True),
    ("Middle_Name", StringType(), True),
    ("Last_Name", StringType(), True),
    ("Emp_ID", IntegerType(), False),
    ("Gender", StringType(), True),
    ("Salary", IntegerType(), True),
]
schema_emp = StructType(
    [StructField(name, dtype, nullable) for name, dtype, nullable in emp_column_specs]
)

df_emp_wit_cols_dt_nullable = spark_session_object.createDataFrame(data_emp, schema=schema_emp)
df_emp_wit_cols_dt_nullable.show()
df_emp_wit_cols_dt_nullable.printSchema()




+----------+-----------+---------+--------+------+------+
|First_Name|Middle_Name|Last_Name|  Emp_ID|Gender|Salary|
+----------+-----------+---------+--------+------+------+
|      Alex|       null|      Tom| 3235245|     M| 12000|
|      John|           |     XXXX|89983243|     M| 12000|
|      Andy|      Steve| Morision|  435435|     M| 12000|
|      Arun|       OOOO|    Gopal|  454425|     M| 12000|
|   Claudia|           | Schepars|  978643|     F|  8000|
|     Nancy|       MMMM|     Paul| 2353462|     F| 12000|
+----------+-----------+---------+--------+------+------+

root
 |-- First_Name: string (nullable = true)
 |-- Middle_Name: string (nullable = true)
 |-- Last_Name: string (nullable = true)
 |-- Emp_ID: integer (nullable = false)
 |-- Gender: string (nullable = true)
 |-- Salary: integer (nullable = true)



In [0]:

# SparkSQL: register the employee DataFrame as a temporary view so it can be
# queried with plain SQL through the session.
from pyspark.sql.functions import col

df_emp_wit_cols_dt_nullable.createOrReplaceTempView("tbl_employee_details")

# Query 1: keep only the rows whose Gender is 'M'.
spark_sql_emp_detials1 = spark_session_object.sql(
    "select * from tbl_employee_details where Gender='M'"
)
spark_sql_emp_detials1.show()

# Query 2: every row plus gender_col_new, which spells out 'M'/'F' as
# 'Male'/'Female' and passes any other value through unchanged.
# The column is written with the same casing as the schema (Gender) so the
# query does not depend on case-insensitive identifier resolution.
spark_sql_emp_detials2 = spark_session_object.sql(
    "select *,(case when Gender='M' then 'Male' when Gender='F' then 'Female' else Gender end) as gender_col_new from tbl_employee_details"
)
spark_sql_emp_detials2.show()

# Three different ways of referring to a column of the DataFrame being
# transformed. Each column reference is taken from spark_sql_emp_detials2
# itself: withColumn expects an expression over *this* DataFrame, and a column
# object borrowed from another DataFrame only resolves while both happen to
# share the same lineage.

# 1) Attribute access: 30% bonus.
df_pyspark_trans_bonus = spark_sql_emp_detials2.withColumn(
    "yearly_bonus", spark_sql_emp_detials2.Salary * 30 / 100
)
df_pyspark_trans_bonus.show()

# 2) Item access by column name: 25% bonus.
df_pyspark_trans_bonus_1 = spark_sql_emp_detials2.withColumn(
    "yearly_bonus", spark_sql_emp_detials2["Salary"] * 25 / 100
)
df_pyspark_trans_bonus_1.show()

# 3) Unbound col() reference, resolved against whichever DataFrame it is
#    applied to: 20% bonus.
df_pyspark_trans_bonus_2 = spark_sql_emp_detials2.withColumn(
    "yearly_bonus", col("Salary") * 20 / 100
)
df_pyspark_trans_bonus_2.show()

+----------+-----------+---------+--------+------+------+
|First_Name|Middle_Name|Last_Name|  Emp_ID|Gender|Salary|
+----------+-----------+---------+--------+------+------+
|      Alex|       null|      Tom| 3235245|     M| 12000|
|      John|           |     XXXX|89983243|     M| 12000|
|      Andy|      Steve| Morision|  435435|     M| 12000|
|      Arun|       OOOO|    Gopal|  454425|     M| 12000|
+----------+-----------+---------+--------+------+------+

+----------+-----------+---------+--------+------+------+--------------+
|First_Name|Middle_Name|Last_Name|  Emp_ID|Gender|Salary|gender_col_new|
+----------+-----------+---------+--------+------+------+--------------+
|      Alex|       null|      Tom| 3235245|     M| 12000|          Male|
|      John|           |     XXXX|89983243|     M| 12000|          Male|
|      Andy|      Steve| Morision|  435435|     M| 12000|          Male|
|      Arun|       OOOO|    Gopal|  454425|     M| 12000|          Male|
|   Claudia|           |

In [0]:
# Column-level transformations: drop, rename, add literal columns, filter.
from pyspark.sql.functions import col,lit

# Remove the original single-letter gender column. The name is spelled exactly
# as in the schema: drop() silently does nothing when the name does not match,
# so a wrong-case name would leave the column in place under case-sensitive
# resolution and the rename below would then create a second "Gender" column.
df_pyspark_trans_3 = spark_sql_emp_detials2.drop('Gender')
df_pyspark_trans_3.show()

# Promote the spelled-out label to be the new "Gender" column and attach three
# constant (literal) columns. Note that "Org ID" is a string literal.
df_pyspark_trans_4 = (
    df_pyspark_trans_3
    .withColumnRenamed('gender_col_new', 'Gender')
    .withColumn("Organization", lit("Microsoft Pvt Ltd"))
    .withColumn("Comments", lit("Random values - shgdjds"))
    .withColumn("Org ID", lit("50520"))
)
# truncate=False prints full cell values instead of shortening long strings.
df_pyspark_trans_4.show(truncate=False)

# The free-text Comments column is not needed further on.
df_pyspark_trans_5 = df_pyspark_trans_4.drop('Comments')
df_pyspark_trans_5.show()

# Keep only the rows labelled 'Male'.
df_pyspark_trans_6 = df_pyspark_trans_5.filter(df_pyspark_trans_5['Gender'] == 'Male')
df_pyspark_trans_6.show()

+----------+-----------+---------+--------+------+--------------+
|First_Name|Middle_Name|Last_Name|  Emp_ID|Salary|gender_col_new|
+----------+-----------+---------+--------+------+--------------+
|      Alex|       null|      Tom| 3235245| 12000|          Male|
|      John|           |     XXXX|89983243| 12000|          Male|
|      Andy|      Steve| Morision|  435435| 12000|          Male|
|      Arun|       OOOO|    Gopal|  454425| 12000|          Male|
|   Claudia|           | Schepars|  978643|  8000|        Female|
|     Nancy|       MMMM|     Paul| 2353462| 12000|        Female|
+----------+-----------+---------+--------+------+--------------+

+----------+-----------+---------+--------+------+------+-----------------+-----------------------+------+
|First_Name|Middle_Name|Last_Name|Emp_ID  |Salary|Gender|Organization     |Comments               |Org ID|
+----------+-----------+---------+--------+------+------+-----------------+-----------------------+------+
|Alex      |null  

In [0]:
# Union - merging data vertically, and the concept of duplicates.

from pyspark.sql.types import StructType,StructField,StringType,IntegerType
# col is imported here so this cell runs on its own instead of depending on an
# import leaked from an earlier cell.
from pyspark.sql.functions import col

# First batch of employees:
# (First_Name, Middle_Name, Last_Name, Emp_ID, Gender, Salary)
data_emp_one = [
    ("Alex", None, "Tom", 3235245, "M", 12000),
    ("John", "", "XXXX", 89983243, "M", 12000),
    ("Andy", "Steve", "Morision", 435435, "M", 12000),
    ("Arun", "OOOO", "Gopal", 454425, "M", 12000),
    ("Claudia", "", "Schepars", 978643, "F", 8000),
    ("Nancy", "MMMM", "Paul", 2353462, "F", 12000),
]

# Second batch: the Alex and Arun rows repeat the first batch exactly, the
# "karam" row is new.
data_emp_two = [
    ("Alex", None, "Tom", 3235245, "M", 12000),
    ("Arun", "OOOO", "Gopal", 454425, "M", 12000),
    ("karam", "chand", "minj", 2353462, "M", 12000),
]

# Shared schema for both batches; Emp_ID is the only non-nullable column.
schema_emp = StructType([
    StructField("First_Name", StringType(), True),
    StructField("Middle_Name", StringType(), True),
    StructField("Last_Name", StringType(), True),
    StructField("Emp_ID", IntegerType(), False),
    StructField("Gender", StringType(), True),
    StructField("Salary", IntegerType(), True),
])

df_pyspark_emp_one = spark_session_object.createDataFrame(data_emp_one, schema_emp)
df_pyspark_emp_two = spark_session_object.createDataFrame(data_emp_two, schema_emp)

# union() stacks the rows of both frames and keeps duplicates.
df_pyspark_emp_union = df_pyspark_emp_one.union(df_pyspark_emp_two)
df_pyspark_emp_union.show()

# Two ways of removing fully identical rows: dropDuplicates() with no column
# subset, and distinct().
df_pyspark_emp_dedup = df_pyspark_emp_union.dropDuplicates()
df_pyspark_emp_dedup.show()

df_pyspark_emp_dedup_another = df_pyspark_emp_union.distinct()
df_pyspark_emp_dedup_another.show()

# Sorting

# Default sort order is ascending.
df_emp_sort = df_pyspark_emp_dedup_another.sort(col('Salary'))
df_emp_sort.show()

# NOTE: despite the "_asc" suffix this frame is sorted in DESCENDING order
# (ascending=False); the name is kept so existing references keep working.
df_emp_sort_asc = df_pyspark_emp_dedup_another.sort(col('Salary'), ascending=False)
df_emp_sort_asc.show()


+----------+-----------+---------+--------+------+------+
|First_Name|Middle_Name|Last_Name|  Emp_ID|Gender|Salary|
+----------+-----------+---------+--------+------+------+
|      Alex|       null|      Tom| 3235245|     M| 12000|
|      John|           |     XXXX|89983243|     M| 12000|
|      Andy|      Steve| Morision|  435435|     M| 12000|
|      Arun|       OOOO|    Gopal|  454425|     M| 12000|
|   Claudia|           | Schepars|  978643|     F|  8000|
|     Nancy|       MMMM|     Paul| 2353462|     F| 12000|
|      Alex|       null|      Tom| 3235245|     M| 12000|
|      Arun|       OOOO|    Gopal|  454425|     M| 12000|
|     karam|      chand|     minj| 2353462|     M| 12000|
+----------+-----------+---------+--------+------+------+

+----------+-----------+---------+--------+------+------+
|First_Name|Middle_Name|Last_Name|  Emp_ID|Gender|Salary|
+----------+-----------+---------+--------+------+------+
|      Alex|       null|      Tom| 3235245|     M| 12000|
|      John| 

In [0]:
# Another way of importing the Spark helpers: namespaced module aliases.
# They are imported first so that every column reference in this cell goes
# through F.col and the cell does not rely on a bare `col` leaked from an
# earlier cell.
from pyspark.sql import types as T
from pyspark.sql import functions as F

# Order the de-duplicated employees by first name, Z -> A.
df_pyspark_name_sort = df_pyspark_emp_dedup_another.orderBy(F.col('First_Name').desc())
df_pyspark_name_sort.show()

# count() returns a plain number; it has to be printed explicitly because only
# the last expression of a cell is displayed automatically.
print(df_pyspark_emp_dedup_another.count())

# Number of employees per gender.
df_pyspark_emp_dedup_another.groupBy(F.col('Gender')).count().show()

# Employee rows for the join examples:
# (First_Name, Middle_Name, Last_Name, Salary, Country, Dept_ID)
# The "John" row appears twice, and "Sam" points at D175, which has no entry
# in the department data below.
data_emp = [
    ("Alex", None, "Tom", 12000, "India", "D145"),
    ("John", "ken", "XXXX", 9000, "USA", "D155"),
    ("Andy", "DDDD", "YYYY", 7000, "India", "D145"),
    ("Arun", "", "ZZZZ", 5000, "USA", "D165"),
    ("John", "ken", "XXXX", 9000, "USA", "D155"),
    ("Nancy", "MMMM", "LLLL", 12000, "GER", "D145"),
    ("Sam", "HHHH", "KKKK", 60000, "India", "D175"),
]

schema_with_dt_emp = T.StructType([
    T.StructField("First_Name", T.StringType(), True),
    T.StructField("Middle_Name", T.StringType(), True),
    T.StructField("Last_Name", T.StringType(), True),
    T.StructField("Salary", T.IntegerType(), True),
    T.StructField("Country", T.StringType(), True),
    T.StructField("Dept_ID", T.StringType(), True),
])

# Department rows: (Dept_Name, Dept_ID). D185 is not referenced by any
# employee row above.
data_dept = [
    ("Finance", "D145"),
    ("Travel", "D155"),
    ("IT", "D165"),
    ("Marketing", "D185"),
]

schema_with_dt_dept = T.StructType([
    T.StructField("Dept_Name", T.StringType(), True),
    T.StructField("Dept_ID", T.StringType(), True),
])

df_pyspark_emp_join = spark_session_object.createDataFrame(data_emp, schema_with_dt_emp)
df_pyspark_emp_join.show()

df_pyspark_dept_join = spark_session_object.createDataFrame(data_dept, schema_with_dt_dept)
df_pyspark_dept_join.show()

# The same equality condition drives all four joins, so it is built once.
# Joining on a Column expression (rather than on the column name) keeps the
# Dept_ID column from both sides in the result.
dept_id_match = df_pyspark_emp_join.Dept_ID == df_pyspark_dept_join.Dept_ID

# inner: only employees whose Dept_ID exists in the department data.
df_pyspark_emp_dept_inner_joins = df_pyspark_emp_join.join(df_pyspark_dept_join, dept_id_match, 'inner')
df_pyspark_emp_dept_inner_joins.show()

# left: every employee, department columns null when there is no match.
df_pyspark_emp_dept_left_joins = df_pyspark_emp_join.join(df_pyspark_dept_join, dept_id_match, 'left')
df_pyspark_emp_dept_left_joins.show()

# right: every department, employee columns null when there is no match.
df_pyspark_emp_dept_right_joins = df_pyspark_emp_join.join(df_pyspark_dept_join, dept_id_match, 'right')
df_pyspark_emp_dept_right_joins.show()

# outer: all rows from both sides.
df_pyspark_emp_dept_outer_joins = df_pyspark_emp_join.join(df_pyspark_dept_join, dept_id_match, 'outer')
df_pyspark_emp_dept_outer_joins.show()

+----------+-----------+---------+--------+------+------+
|First_Name|Middle_Name|Last_Name|  Emp_ID|Gender|Salary|
+----------+-----------+---------+--------+------+------+
|     karam|      chand|     minj| 2353462|     M| 12000|
|     Nancy|       MMMM|     Paul| 2353462|     F| 12000|
|      John|           |     XXXX|89983243|     M| 12000|
|   Claudia|           | Schepars|  978643|     F|  8000|
|      Arun|       OOOO|    Gopal|  454425|     M| 12000|
|      Andy|      Steve| Morision|  435435|     M| 12000|
|      Alex|       null|      Tom| 3235245|     M| 12000|
+----------+-----------+---------+--------+------+------+

+------+-----+
|Gender|count|
+------+-----+
|     F|    2|
|     M|    5|
+------+-----+

+----------+-----------+---------+------+-------+-------+
|First_Name|Middle_Name|Last_Name|Salary|Country|Dept_ID|
+----------+-----------+---------+------+-------+-------+
|      Alex|       null|      Tom| 12000|  India|   D145|
|      John|        ken|     XXXX|  90