# Spark in Action - Chapter 3 Python Version - Lab 230

In [None]:
import os
import logging
from pyspark.sql import functions as F
from pyspark.sql import SparkSession

In [None]:
# Create the Spark session for this lab, or reuse one that already exists,
# running against the local master.
spark = (
    SparkSession.builder
    .appName("Union of two dataframes")
    .master("local[*]")
    .getOrCreate()
)

# Set the log level of the Spark context to 'warn'.
spark.sparkContext.setLogLevel('warn')

22/10/29 17:20:48 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [None]:
def get_absolute_file_path(path, filename):
    """Return the location of ``filename`` inside ``path``, anchored at the CWD.

    The pieces are combined with ``os.path.join``, so ``path`` may be given
    with or without a trailing separator. The result is not normalised: any
    ``..`` segments contained in ``path`` are kept as they are.

    @param path Directory holding the file, relative to the current working
                directory
    @param filename Name of the file inside ``path``
    @return The joined path, starting from ``os.getcwd()``
    """
    # The working directory is the anchor; the original author left
    # os.path.dirname(__file__) as a commented-out alternative (presumably
    # because __file__ is not available when run as a notebook).
    return os.path.join(os.getcwd(), path, filename)

In [None]:
# Location of the Wake County restaurants CSV file.
path1 = '../net.jgp.books.spark.ch03/data/'
filename1 = 'Restaurants_in_Wake_County_NC.csv'
absolute_file_path1 = get_absolute_file_path(path=path1, filename=filename1)

In [None]:
# Location of the Durham County restaurants JSON file.
path2 = '../net.jgp.books.spark.ch03/data/'
filename2 = 'Restaurants_in_Durham_County_NC.json'
absolute_file_path2 = get_absolute_file_path(path=path2, filename=filename2)

In [None]:
# Load the Wake County CSV: the first row is the header and the column
# types are inferred from the data.
df1 = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(absolute_file_path1)
)

In [None]:
# Load the Durham County JSON file with the reader's default options.
df2 = spark.read.format("json").load(absolute_file_path2)

In [None]:
"""
* Builds the dataframe containing the Wake county restaurants
*
* @return A dataframe
"""
def build_wake_restaurants_dataframe(df):
    """Build the dataframe containing the Wake county restaurants.

    Adds a constant ``county`` column, renames the source columns to the
    names listed below, adds an all-null ``dateEnd`` column, drops the
    columns that are not kept, and finally derives an ``id`` column of the
    form ``<state>_<county>_<datasetId>``.

    @param df Dataframe holding the Wake county data (the notebook passes the
              dataframe read from the Wake county CSV file)
    @return A dataframe
    """
    drop_cols = ["OBJECTID", "GEOCODESTATUS", "PERMITID"]
    # (source column, new column) pairs. withColumnRenamed keeps each
    # column's position, so the order of this table does not affect the
    # resulting column order.
    renames = (
        ("HSISID", "datasetId"),
        ("NAME", "name"),
        ("ADDRESS1", "address1"),
        ("ADDRESS2", "address2"),
        ("CITY", "city"),
        ("STATE", "state"),
        ("POSTALCODE", "zip"),
        ("PHONENUMBER", "tel"),
        ("RESTAURANTOPENDATE", "dateStart"),
        ("FACILITYTYPE", "type"),
        ("X", "geoX"),
        ("Y", "geoY"),
    )

    df = df.withColumn("county", F.lit("Wake"))
    for source_name, new_name in renames:
        df = df.withColumnRenamed(source_name, new_name)

    # This function has no source column for dateEnd, so it is filled with
    # nulls.
    df = df.withColumn("dateEnd", F.lit(None)).drop(*drop_cols)

    df = df.withColumn("id",
                       F.concat(F.col("state"), F.lit("_"),
                                F.col("county"), F.lit("_"),
                                F.col("datasetId")))
    # I left the following line if you want to play with repartitioning
    # df = df.repartition(4)
    return df


In [None]:
"""
* Builds the dataframe containing the Durham county restaurants
*
* @return A dataframe
"""
def build_durham_restaurants_dataframe(df):
    """Build the dataframe containing the Durham county restaurants.

    Adds a constant ``county`` column, copies the values nested under
    ``fields`` into top-level columns, drops the original nested/metadata
    columns, and finally derives an ``id`` column of the form
    ``<state>_<county>_<datasetId>``.

    @param df Dataframe holding the Durham county data (the notebook passes
              the dataframe read from the Durham county JSON file)
    @return A dataframe
    """
    drop_cols = ["fields", "geometry", "record_timestamp", "recordid"]
    # (new column, expression) pairs, applied in this order so the columns
    # are added in the same sequence as listed.
    derived_columns = (
        ("datasetId", F.col("fields.id")),
        ("name", F.col("fields.premise_name")),
        ("address1", F.col("fields.premise_address1")),
        ("address2", F.col("fields.premise_address2")),
        ("city", F.col("fields.premise_city")),
        ("state", F.col("fields.premise_state")),
        ("zip", F.col("fields.premise_zip")),
        ("tel", F.col("fields.premise_phone")),
        ("dateStart", F.col("fields.opening_date")),
        ("dateEnd", F.col("fields.closing_date")),
        # Keep only the second piece of type_description once it is split
        # on " - ".
        ("type", F.split(F.col("fields.type_description"), " - ").getItem(1)),
        # geolocation items 0 and 1 become geoX and geoY respectively.
        ("geoX", F.col("fields.geolocation").getItem(0)),
        ("geoY", F.col("fields.geolocation").getItem(1)),
    )

    df = df.withColumn("county", F.lit("Durham"))
    for column_name, expression in derived_columns:
        df = df.withColumn(column_name, expression)
    df = df.drop(*drop_cols)

    df = df.withColumn("id",
                       F.concat(F.col("state"), F.lit("_"),
                                F.col("county"), F.lit("_"),
                                F.col("datasetId")))
    # I left the following line if you want to play with repartitioning
    # df = df.repartition(4)
    return df


In [None]:
"""
* Performs the union between the two dataframes.
*
* @param df1 Left Dataframe to union on
* @param df2 Right Dataframe to union from
"""
def combineDataframes(df1, df2):
    """Perform the union between the two dataframes and report on the result.

    The union is done with ``unionByName``. The first five rows and the
    schema of the result are displayed, then the record count and the
    partition count are logged at WARNING level. Nothing is returned.

    @param df1 Left Dataframe to union on
    @param df2 Right Dataframe to union from
    """
    df = df1.unionByName(df2)
    df.show(5)
    df.printSchema()
    # Values are passed as logging arguments so the logging module does the
    # formatting; the emitted messages are unchanged.
    logging.warning("We have %s records.", df.count())
    logging.warning("Partition count: %s", df.rdd.getNumPartitions())



In [None]:
# Reshape both source dataframes with their county-specific builders.
wakeRestaurantsDf = build_wake_restaurants_dataframe(df=df1)
durhamRestaurantsDf = build_durham_restaurants_dataframe(df=df2)

In [None]:
# Union the two county dataframes and print/log the summary.
combineDataframes(df1=wakeRestaurantsDf, df2=durhamRestaurantsDf)

+----------+--------------------+--------------------+--------+-----------+-----+----------+--------------+-------------------+-----------------+------------+-----------+------+-------+------------------+
| datasetId|                name|            address1|address2|       city|state|       zip|           tel|          dateStart|             type|        geoX|       geoY|county|dateEnd|                id|
+----------+--------------------+--------------------+--------+-----------+-----+----------+--------------+-------------------+-----------------+------------+-----------+------+-------+------------------+
|4092016024|                WABA|2502 1/2 HILLSBOR...|    null|    RALEIGH|   NC|     27607|(919) 833-1710|2011-10-18 02:00:00|       Restaurant|-78.66818477|35.78783803|  Wake|   null|NC_Wake_4092016024|
|4092021693|  WALMART DELI #2247|2010 KILDAIRE FAR...|    null|       CARY|   NC|     27518|(919) 852-6651|2011-11-08 01:00:00|       Food Stand|-78.78211173|35.73717591|  Wake|   



In [None]:
# Stop the Spark session now that the lab is finished.
spark.stop()