In [None]:
# PySpark isn't on sys.path by default.
# We can either set the environment variables below,
# or use the external library findspark to add pyspark and its dependencies to sys.path at runtime.

# Set the environment variables
# import os
# os.environ['SPARK_HOME'] = "/home/developer/miniconda3/envs/spark/lib/python3.10/site-packages/pyspark"
# os.environ['PYSPARK_DRIVER_PYTHON'] = 'jupyter'
# os.environ['PYSPARK_DRIVER_PYTHON_OPTS'] = 'lab'
# os.environ['PYSPARK_PYTHON'] = 'python'

In [10]:
# Using findspark: add pyspark and its dependencies to sys.path at runtime so the
# `pyspark` imports in the following cells succeed in this kernel.
# NOTE(review): init() is called with no arguments, so it must locate the Spark
# installation on its own (presumably via SPARK_HOME or its search heuristics) — confirm.
import findspark
findspark.init()

In [14]:
# Get or create a SparkSession named 'demo1', and keep a shortcut to the
# SparkContext that backs it (inspected in the next cell).
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName('demo1')
    .getOrCreate()
)
sc = spark.sparkContext

In [15]:
# Show the SparkContext; as the cell's last expression, Jupyter displays its repr.
sc

In [21]:
# Describe a small "people" table with an explicit schema, then create some
# dummy Row records for it; they are turned into a DataFrame afterwards.
from pyspark.sql import Row
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

# Schema: four columns, every one nullable (third StructField argument is True)
schema = StructType([
    StructField(column, column_type, True)
    for column, column_type in (
        ("id", IntegerType()),
        ("name", StringType()),
        ("age", IntegerType()),
        ("occupation", StringType()),
    )
])

# Dummy data: one Row per person, with fields in the same order as the schema
data = [
    Row(id=person_id, name=person_name, age=person_age, occupation=job)
    for person_id, person_name, person_age, job in (
        (1, "Ravi", 30, "Developer"),
        (2, "Rajni", 35, "Engineer"),
        (3, "Ketan", 25, "Doctor"),
        (4, "David", 28, "Software Engineer"),
        (5, "Sam", 22, "Lawyer"),
    )
]

In [22]:
# Turn the Row list into a DataFrame using the explicit schema, then print
# its rows as a text table.
df = spark.createDataFrame(data=data, schema=schema)
df.show()

+---+-----+---+-----------------+
| id| name|age|       occupation|
+---+-----+---+-----------------+
|  1| Ravi| 30|        Developer|
|  2|Rajni| 35|         Engineer|
|  3|Ketan| 25|           Doctor|
|  4|David| 28|Software Engineer|
|  5|  Sam| 22|           Lawyer|
+---+-----+---+-----------------+



In [23]:
# Print the DataFrame's schema as a tree: each column's name, type and nullability.
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- occupation: string (nullable = true)



In [42]:
# Different types of joins
#
# Two small tables for the join demos: employees point at departments via dept_id.
# Some keys deliberately have no partner (employee dept_id 10, department dept_id 4),
# and both tables contain a NULL dept_id, so each join type gives a different result.

employee_schema = StructType([
    StructField(field_name, field_type, nullable)
    for field_name, field_type, nullable in (
        ("emp_id", IntegerType(), False),  # the only non-nullable column
        ("emp_name", StringType(), True),
        ("emp_age", IntegerType(), True),
        ("dept_id", IntegerType(), True),
    )
])
department_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("dept_id", IntegerType()),
        ("dept_name", StringType()),
    )
])

# Employee rows: Rajni's dept_id (10) has no department; Kevin has no dept_id at all
employee_data = [
    Row(emp_id=emp_id, emp_name=emp_name, emp_age=emp_age, dept_id=dept_id)
    for emp_id, emp_name, emp_age, dept_id in (
        (1, "Ravi", 30, 1),
        (2, "Rajni", 35, 10),
        (3, "Ketan", 25, 2),
        (4, "David", 28, 3),
        (5, "Sam", 22, 1),
        (6, "Kevin", 29, None),
    )
]
# Department rows: HR (4) has no employees; "Temp" has a NULL dept_id
department_data = [
    Row(dept_id=dept_id, dept_name=dept_name)
    for dept_id, dept_name in (
        (1, "Tech"),
        (2, "Sales"),
        (3, "Marketing"),
        (4, "HR"),
        (None, "Temp"),
    )
]


In [43]:
# Build the employees DataFrame from its rows and explicit schema, then print it.
df_employees = spark.createDataFrame(data=employee_data, schema=employee_schema)
df_employees.show()

+------+--------+-------+-------+
|emp_id|emp_name|emp_age|dept_id|
+------+--------+-------+-------+
|     1|    Ravi|     30|      1|
|     2|   Rajni|     35|     10|
|     3|   Ketan|     25|      2|
|     4|   David|     28|      3|
|     5|     Sam|     22|      1|
|     6|   Kevin|     29|   NULL|
+------+--------+-------+-------+



In [44]:
# Build the departments DataFrame from its rows and explicit schema, then print it.
df_departments = spark.createDataFrame(data=department_data, schema=department_schema)
df_departments.show()

+-------+---------+
|dept_id|dept_name|
+-------+---------+
|      1|     Tech|
|      2|    Sales|
|      3|Marketing|
|      4|       HR|
|   NULL|     Temp|
+-------+---------+



In [45]:
# INNER JOIN: keep only employee/department pairs whose dept_id values are equal.
# `how` defaults to "inner", so it is left out. Rows with a NULL dept_id never
# match (Kevin and "Temp" are absent), and both dept_id columns are kept.
df_joined = df_employees.join(
    df_departments,
    on=df_employees["dept_id"] == df_departments["dept_id"],
)
df_joined.show()

+------+--------+-------+-------+-------+---------+
|emp_id|emp_name|emp_age|dept_id|dept_id|dept_name|
+------+--------+-------+-------+-------+---------+
|     1|    Ravi|     30|      1|      1|     Tech|
|     5|     Sam|     22|      1|      1|     Tech|
|     3|   Ketan|     25|      2|      2|    Sales|
|     4|   David|     28|      3|      3|Marketing|
+------+--------+-------+-------+-------+---------+



In [46]:
# CROSS JOIN: Cartesian product, so every employee is paired with every
# department (6 employees x 5 departments = 30 rows); no join condition is used.
df_cross_joined = df_employees.crossJoin(other=df_departments)
df_cross_joined.show()



+------+--------+-------+-------+-------+---------+
|emp_id|emp_name|emp_age|dept_id|dept_id|dept_name|
+------+--------+-------+-------+-------+---------+
|     1|    Ravi|     30|      1|      1|     Tech|
|     1|    Ravi|     30|      1|      2|    Sales|
|     1|    Ravi|     30|      1|      3|Marketing|
|     1|    Ravi|     30|      1|      4|       HR|
|     1|    Ravi|     30|      1|   NULL|     Temp|
|     2|   Rajni|     35|     10|      1|     Tech|
|     2|   Rajni|     35|     10|      2|    Sales|
|     2|   Rajni|     35|     10|      3|Marketing|
|     2|   Rajni|     35|     10|      4|       HR|
|     2|   Rajni|     35|     10|   NULL|     Temp|
|     3|   Ketan|     25|      2|      1|     Tech|
|     3|   Ketan|     25|      2|      2|    Sales|
|     3|   Ketan|     25|      2|      3|Marketing|
|     3|   Ketan|     25|      2|      4|       HR|
|     3|   Ketan|     25|      2|   NULL|     Temp|
|     4|   David|     28|      3|      1|     Tech|
|     4|   D

                                                                                

In [47]:
# LEFT OUTER JOIN: every employee row is kept; the department columns are NULL
# when no department matches (Rajni's dept_id 10, Kevin's NULL dept_id).
df_joined = df_employees.join(
    df_departments,
    on=df_employees["dept_id"] == df_departments["dept_id"],
    how="left",
)
df_joined.show()

+------+--------+-------+-------+-------+---------+
|emp_id|emp_name|emp_age|dept_id|dept_id|dept_name|
+------+--------+-------+-------+-------+---------+
|     1|    Ravi|     30|      1|      1|     Tech|
|     2|   Rajni|     35|     10|   NULL|     NULL|
|     3|   Ketan|     25|      2|      2|    Sales|
|     4|   David|     28|      3|      3|Marketing|
|     5|     Sam|     22|      1|      1|     Tech|
|     6|   Kevin|     29|   NULL|   NULL|     NULL|
+------+--------+-------+-------+-------+---------+



In [48]:
# RIGHT OUTER JOIN: every department row is kept; the employee columns are NULL
# when no employee matches (HR, and the NULL-keyed "Temp").
df_joined = df_employees.join(
    df_departments,
    on=df_employees["dept_id"] == df_departments["dept_id"],
    how="right",
)
df_joined.show()

+------+--------+-------+-------+-------+---------+
|emp_id|emp_name|emp_age|dept_id|dept_id|dept_name|
+------+--------+-------+-------+-------+---------+
|     5|     Sam|     22|      1|      1|     Tech|
|     1|    Ravi|     30|      1|      1|     Tech|
|     3|   Ketan|     25|      2|      2|    Sales|
|     4|   David|     28|      3|      3|Marketing|
|  NULL|    NULL|   NULL|   NULL|      4|       HR|
|  NULL|    NULL|   NULL|   NULL|   NULL|     Temp|
+------+--------+-------+-------+-------+---------+



In [49]:
# FULL OUTER JOIN: rows from both sides are kept; unmatched rows from either
# table get NULLs in the other table's columns (NULL keys never match each other).
df_joined = df_employees.join(
    df_departments,
    on=df_employees["dept_id"] == df_departments["dept_id"],
    how="full",
)
df_joined.show()

+------+--------+-------+-------+-------+---------+
|emp_id|emp_name|emp_age|dept_id|dept_id|dept_name|
+------+--------+-------+-------+-------+---------+
|     6|   Kevin|     29|   NULL|   NULL|     NULL|
|  NULL|    NULL|   NULL|   NULL|   NULL|     Temp|
|     5|     Sam|     22|      1|      1|     Tech|
|     1|    Ravi|     30|      1|      1|     Tech|
|     3|   Ketan|     25|      2|      2|    Sales|
|     4|   David|     28|      3|      3|Marketing|
|  NULL|    NULL|   NULL|   NULL|      4|       HR|
|     2|   Rajni|     35|     10|   NULL|     NULL|
+------+--------+-------+-------+-------+---------+

