In [None]:
import os
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import col

def quiet_logs(sc, level="ERROR", loggers=("org", "akka")):
    """Raise the log4j threshold of Spark's chatty JVM-side loggers.

    Parameters
    ----------
    sc : SparkContext or SparkSession
        Handle to the running Spark application. A SparkSession is also
        accepted: its ``sparkContext`` is used to reach the JVM gateway.
    level : str
        Name of an ``org.apache.log4j.Level`` constant such as "ERROR" or
        "WARN". Defaults to "ERROR" (the previously hard-coded level).
    loggers : iterable of str
        Logger names whose level is set. Defaults to "org" and "akka".
    """
    # A SparkSession exposes its context as `.sparkContext`; a SparkContext has
    # no such attribute, so fall back to the object itself.
    context = getattr(sc, "sparkContext", sc)
    log4j = context._jvm.org.apache.log4j
    # Look the level up by name on the Java Level class (py4j static field access).
    threshold = getattr(log4j.Level, level)
    for name in loggers:
        log4j.LogManager.getLogger(name).setLevel(threshold)

# NOTE(review): HDFS_NAMENODE is not referenced by any cell in this notebook section;
# presumably intended for hdfs:// paths elsewhere — confirm it is needed.
HDFS_NAMENODE = 'hdfs://192.168.0.25:9000'

# Connection endpoints. They can be overridden through environment variables so the
# notebook can target another cluster without editing code; defaults are the original values.
# NOTE(review): 127.17.0.1 is a loopback address, while Docker's default bridge gateway is
# 172.17.0.1 — confirm this is not a typo.
SPARK_MASTER_URL = os.environ.get('SPARK_MASTER_URL', 'spark://127.17.0.1:7077')
SPARK_DRIVER_HOST = os.environ.get('SPARK_DRIVER_HOST', '192.168.0.25')

spark_conf = SparkConf()

# If this driver application is not running on the same network as the Spark master,
# the Spark context needs some additional configuration: bind the driver on all
# interfaces and advertise a host address the cluster can reach.
spark_conf.setAll([
    ('spark.master', SPARK_MASTER_URL),
    ('spark.app.name', 'example join'),
    ('spark.submit.deployMode', 'client'),
    ('spark.ui.showConsoleProgress', 'true'),
    ('spark.eventLog.enabled', 'false'),
    ('spark.logConf', 'false'),
    ('spark.driver.bindAddress', '0.0.0.0'),
    ('spark.driver.host', SPARK_DRIVER_HOST),
])
# If the driver application runs on the same network as the Spark master,
# only this line is needed:
# spark_conf = SparkConf().setAppName("example join").setMaster("spark://spark-master:7077")

# getOrCreate() reuses an already-running session/context, so re-running this cell does not
# fail the way constructing a second SparkContext directly would.
spark = SparkSession.builder.config(conf=spark_conf).getOrCreate()
sc = spark.sparkContext
# quiet_logs reaches the JVM through a SparkContext, so pass `sc` rather than the session.
quiet_logs(sc)

In [None]:
# Employee rows. "Williams" has no department (None), so it has no match on
# DepartmentID in the joins further down.
employees = sc.parallelize(
    [
        ("Rafferty", 31),
        ("Jones", 33),
        ("Heisenberg", 33),
        ("Robinson", 34),
        ("Smith", 34),
        ("Williams", None),
    ]
).toDF(["LastName", "DepartmentID"])

employees.show()

In [None]:
# Department rows. Marketing (35) has no employees, so it has no match on
# DepartmentID in the joins further down.
departments = sc.parallelize(
    [
        (31, "Sales"),
        (33, "Engineering"),
        (34, "Clerical"),
        (35, "Marketing"),
    ]
).toDF(["DepartmentID", "DepartmentName"])

departments.show()

In [None]:
# Joining on a list of column names (instead of an equality expression) performs an
# equi-join and keeps a single DepartmentID column in each result.

# Inner join: only employees whose DepartmentID matches a department.
employees.join(departments, on=["DepartmentID"], how="inner").show()

# Left outer join: every employee, with null department columns when unmatched.
employees.join(departments, on=["DepartmentID"], how="left_outer").show()

# Right outer join: every department, with null employee columns when unmatched.
employees.join(departments, on=["DepartmentID"], how="right_outer").show()

# Cartesian (cross) join: each employee paired with each department; first 10 rows shown.
employees.crossJoin(departments).show(10)

In [None]:
# Price list: each row is a product's price over a [startDate, endDate] window.
# Dates are ISO "YYYY-MM-DD" strings, so comparing them as strings orders them
# chronologically (the range join below relies on this).
products = sc.parallelize(
    [
        ("steak", "1990-01-01", "2000-01-01", 150),
        ("steak", "2000-01-02", "2020-01-01", 180),
        ("fish", "1990-01-01", "2020-01-01", 100),
    ]
).toDF(["name", "startDate", "endDate", "price"])

products.show()

In [None]:
# Orders: the date an order was placed and the product name it refers to.
orders = sc.parallelize(
    [
        ("1995-01-01", "steak"),
        ("2000-01-01", "fish"),
        ("2005-01-01", "steak"),
    ]
).toDF(["date", "product"])

orders.show()

In [None]:
# Range join: pair each order with the product row whose [startDate, endDate]
# window (both ends inclusive) contains the order date, i.e. the price in effect then.
orders.join(
    products,
    (orders["product"] == products["name"])
    & orders["date"].between(products["startDate"], products["endDate"]),
).show()