In [2]:
import findspark
import os

# Locate the Spark installation. Honour SPARK_HOME when it is set so the
# notebook can run on other machines; otherwise fall back to the local
# Spark 2.4.4 install this notebook was originally developed against.
findspark.init(os.environ.get("SPARK_HOME", "/home/gani/spark-2.4.4-bin-hadoop2.7"))

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, concat, col, split

In [10]:
def wakeRestaurantDF(df):
    """Normalise the Wake County restaurant DataFrame to the shared schema.

    Renames the raw CSV columns to the common names used for the union
    (datasetId, name, address1, address2, city, state, zip, tel, dateStart,
    type, geoX, geoY). It adds a constant ``county`` column and an empty
    ``dateEnd`` column, and drops OBJECTID / GEOCODESTATUS / PERMITID. It then
    builds a composite ``id`` of the form ``<state>_<county>_<datasetId>``.

    Args:
        df: DataFrame read from the Wake County CSV export.

    Returns:
        A new DataFrame with the normalised columns; the input is not modified.
    """
    drop_cols = ['OBJECTID', 'GEOCODESTATUS', 'PERMITID']
    df = (
        df.withColumn("county", lit("Wake"))
        .withColumnRenamed("HSISID", "datasetId")
        .withColumnRenamed("NAME", "name")
        .withColumnRenamed("ADDRESS1", "address1")
        .withColumnRenamed("ADDRESS2", "address2")
        .withColumnRenamed("CITY", "city")
        .withColumnRenamed("STATE", "state")
        .withColumnRenamed("POSTALCODE", "zip")
        .withColumnRenamed("PHONENUMBER", "tel")
        .withColumnRenamed("RESTAURANTOPENDATE", "dateStart")
        # The Wake export has no closing date. A bare lit(None) would give a
        # NullType column, which e.g. cannot be written to Parquet. Give it a
        # concrete type instead. NOTE(review): string is assumed to match the
        # Durham closing_date read from JSON — confirm with printSchema().
        .withColumn("dateEnd", lit(None).cast("string"))
        .withColumnRenamed("FACILITYTYPE", "type")
        # In this export X is longitude and Y is latitude.
        .withColumnRenamed("X", "geoX")
        .withColumnRenamed("Y", "geoY")
        .drop(*drop_cols)
    )
    # concat() yields null if any part is null, so rows lacking state or
    # datasetId end up with a null id.
    df = df.withColumn(
        "id",
        concat(col("state"), lit("_"), col("county"), lit("_"), col("datasetId")),
    )
    return df

In [11]:
def durhamRestaurantsDF(df):
    """Normalise the Durham County restaurant DataFrame to the shared schema.

    The Durham export nests each record's attributes under a ``fields``
    struct. This pulls the needed attributes up to top-level columns using
    the common names (datasetId, name, address1, address2, city, state, zip,
    tel, dateStart, dateEnd, type, geoX, geoY) and adds a constant ``county``
    column. It drops the raw wrapper columns (fields, geometry,
    record_timestamp, recordid) and builds the composite ``id`` of the form
    ``<state>_<county>_<datasetId>``.

    Args:
        df: DataFrame read from the Durham County JSON export.

    Returns:
        A new DataFrame with the normalised columns; the input is not modified.
    """
    drop_cols = ["fields", "geometry", "record_timestamp", "recordid"]
    df = (
        df.withColumn("county", lit("Durham"))
        .withColumn("datasetId", col("fields.id"))
        .withColumn("name", col("fields.premise_name"))
        .withColumn("address1", col("fields.premise_address1"))
        .withColumn("address2", col("fields.premise_address2"))
        .withColumn("city", col("fields.premise_city"))
        .withColumn("state", col("fields.premise_state"))
        .withColumn("zip", col("fields.premise_zip"))
        .withColumn("tel", col("fields.premise_phone"))
        .withColumn("dateStart", col("fields.opening_date"))
        .withColumn("dateEnd", col("fields.closing_date"))
        # Keep the text after the first " - " separator; getItem(1) yields
        # null when the description contains no separator.
        .withColumn("type", split(col("fields.type_description"), " - ").getItem(1))
        # geoX/geoY must line up with the Wake data, where X is longitude and
        # Y is latitude. Opendatasoft-style exports store fields.geolocation
        # as [latitude, longitude] (unlike GeoJSON `geometry`, which is
        # [longitude, latitude]), so index 1 -> geoX and index 0 -> geoY.
        # NOTE(review): verify against a sample record of the Durham file.
        .withColumn("geoX", col("fields.geolocation").getItem(1))
        .withColumn("geoY", col("fields.geolocation").getItem(0))
        .drop(*drop_cols)
    )

    # concat() yields null if any part is null, so rows lacking state or
    # datasetId end up with a null id.
    df = df.withColumn(
        "id",
        concat(col("state"), lit("_"), col("county"), lit("_"), col("datasetId")),
    )
    return df

In [14]:
def combineDf(df1, df2, n=5):
    """Union two normalised restaurant DataFrames by column name.

    Shows the first ``n`` rows and prints the schema of the result, then
    returns it so later cells can keep working with the combined data.

    Args:
        df1: First DataFrame.
        df2: Second DataFrame. unionByName matches columns by name rather
            than position, so both inputs must expose the same column names.
        n: Number of rows to display. Defaults to 5.

    Returns:
        The unioned DataFrame.
    """
    df = df1.unionByName(df2)
    df.show(n)
    df.printSchema()
    return df

In [5]:
# Reuse an active SparkSession if there is one; otherwise start a new one
# under the application name 'dfUnion'.
spark = (
    SparkSession.builder
    .appName('dfUnion')
    .getOrCreate()
)

In [6]:
# Wake County: CSV export with a header row; Spark infers the column types.
df1 = spark.read.csv(
    'Restaurants_in_Wake_County_NC.csv',
    header=True,
    inferSchema=True,
)
# Durham County: JSON export, read with Spark's default JSON reader.
df2 = spark.read.json(
    'Restaurants_in_Durham_County_NC.json'
)

In [12]:
# Map the raw Wake columns onto the shared schema.
wakeDF = wakeRestaurantDF(df=df1)

In [13]:
# Flatten the nested Durham records onto the shared schema.
durhamDF = durhamRestaurantsDF(df=df2)

In [15]:
# Union both counties and print a preview plus the schema; the trailing
# semicolon keeps Jupyter from echoing any return value.
combineDf(df1=wakeDF, df2=durhamDF);

+----------+--------------------+--------------------+--------+-----------+-----+----------+--------------+-------------------+-----------------+------------+-----------+------+-------+------------------+
| datasetId|                name|            address1|address2|       city|state|       zip|           tel|          dateStart|             type|        geoX|       geoY|county|dateEnd|                id|
+----------+--------------------+--------------------+--------+-----------+-----+----------+--------------+-------------------+-----------------+------------+-----------+------+-------+------------------+
|4092016024|                WABA|2502 1/2 HILLSBOR...|    null|    RALEIGH|   NC|     27607|(919) 833-1710|2011-10-18 05:30:00|       Restaurant|-78.66818477|35.78783803|  Wake|   null|NC_Wake_4092016024|
|4092021693|  WALMART DELI #2247|2010 KILDAIRE FAR...|    null|       CARY|   NC|     27518|(919) 852-6651|2011-11-08 05:30:00|       Food Stand|-78.78211173|35.73717591|  Wake|   