#### Initialize the Spark session

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, upper, regexp_replace

# Create (or reuse) the Spark session for this notebook. The spark-excel
# package is requested through spark.jars.packages when the session starts.
spark = (
    SparkSession.builder
    .appName("Silver")
    .config("spark.jars.packages", "com.crealytics:spark-excel_2.12:0.13.5")
    .getOrCreate()
)

24/03/14 10:29:32 WARN Utils: Your hostname, Mansurs-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.1.24 instead (on interface en0)
24/03/14 10:29:32 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/usr/local/Cellar/apache-spark/3.5.1/libexec/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/mansurcan/.ivy2/cache
The jars for the packages stored in: /Users/mansurcan/.ivy2/jars
com.crealytics#spark-excel_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-a4d9fbe9-0cd3-469f-9b1f-b81686365dba;1.0
	confs: [default]
	found com.crealytics#spark-excel_2.12;0.13.5 in central
	found org.apache.poi#poi;4.1.2 in central
	found commons-codec#commons-codec;1.13 in central
	found org.apache.commons#commons-collections4;4.4 in central
	found org.apache.commons#commons-math3;3.6.1 in central
	found com.zaxxer#SparseBitSet;1.2 in central
	found org.apache.poi#poi-ooxml;4.1.2 in central
	found org.apache.poi#poi-ooxml-schemas;4.1.2 in central
	found org.apache.xmlbeans#xmlbeans;3.1.0 in central
	found com.github.virtuald#curvesapi;1.06 in central
	found com.norbitltd#spoiwo_2.12;1.7.0 in central
	found org.scala-lang.modules#scala-xml_2.12;1.2.0 in central
	found com.github.pjfanning#excel-streaming-reader;2.3.4 in c

#### Schemas for files

In [2]:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

# Explicit schemas for the input tables. Every column is declared nullable.

# Locations: identifiers plus the cleaned postcode used as the join key.
locations_Schema = (
    StructType()
    .add("LocationId", StringType(), True)
    .add("OrganisationID", StringType(), True)
    .add("LocationPostCode_clean", StringType(), True)
)

# Postcode -> "ladnm" lookup.
rural_lookup_full_CL_cleaned_Schema = (
    StructType()
    .add("pcds_clean", StringType(), True)
    .add("ladnm", StringType(), True)
)

# Postcode -> "ru11ind" code plus coordinates.
lookup_full_cleaned_Schema = (
    StructType()
    .add("pcds_clean", StringType(), True)
    .add("ru11ind", StringType(), True)
    .add("lat", DoubleType(), True)
    .add("long", DoubleType(), True)
)

# "ru11ind" code -> area name.
ruralCodes_Schema = (
    StructType()
    .add("ru11ind", StringType(), True)
    .add("AreaName", StringType(), True)
)

#### Read the files

In [3]:
# Load each CSV with its explicit schema; the first line of every file is a header.

# Locations
df_locations = spark.read.csv(
    "Silver/locations_cleaned.csv",
    schema=locations_Schema,
    header=True,
)

# Rural lookup full CL cleaned
df_rural_lookup_full_CL = spark.read.csv(
    "Silver/rural_lookup_full_CL_cleaned.csv",
    schema=rural_lookup_full_CL_cleaned_Schema,
    header=True,
)

# Lookup full cleaned
df_lookup_full = spark.read.csv(
    "Silver/lookup_full_cleaned.csv",
    schema=lookup_full_cleaned_Schema,
    header=True,
)

# Rural codes
df_ruralCodes = spark.read.csv(
    "Bronze/ruralCodes.csv",
    schema=ruralCodes_Schema,
    header=True,
)

# Sanity check: print the schema and the first 5 rows of every DataFrame,
# in the same order the files were read.
for loaded_df in (df_locations, df_rural_lookup_full_CL, df_lookup_full, df_ruralCodes):
    loaded_df.printSchema()
    loaded_df.show(5, truncate=False)

root
 |-- LocationId: string (nullable = true)
 |-- OrganisationID: string (nullable = true)
 |-- LocationPostCode_clean: string (nullable = true)

+------------------------------------+------------------------------------+----------------------+
|LocationId                          |OrganisationID                      |LocationPostCode_clean|
+------------------------------------+------------------------------------+----------------------+
|e4b68000-d5d1-4917-b1ad-b58afebadf1c|13df52eb-c61d-48c9-968f-b99845c11fe5|PH263JQ               |
|c0259ee8-e9e1-48b1-86d3-af25544c5a19|c94e6850-c754-473e-985a-29169d71b602|KT27LX                |
|09698ea8-9d40-4aee-8608-b9b07b39ae1e|ed16f2db-cd29-4944-8357-f029bda1938b|WF128NJ               |
|386235b6-54c5-407e-916a-935c08ecce65|07b2aafe-cb44-44a5-add9-29eb579f8a73|CO30PG                |
|6ed76cde-950d-4d7c-9992-160356c14f88|e2b2cac7-5edd-428a-b892-8f510c6a43b6|SG138EG               |
+------------------------------------+----------------------

In [4]:
from pyspark.sql.functions import broadcast

# Attach the human-readable AreaName to every lookup row via its ru11ind code.
# The broadcast() hint asks Spark to ship df_ruralCodes to the executors
# instead of shuffling the larger side of the join.
broadcast_rural_codes_df = broadcast(df_ruralCodes)

# This cell reassigns df_lookup_full from itself. Re-selecting only the
# file-schema columns first makes it safe to re-run: without it, a second run
# would join AreaName again and later references to df_lookup_full["AreaName"]
# would be ambiguous.
df_lookup_full = (
    df_lookup_full
    .select(*lookup_full_cleaned_Schema.fieldNames())
    .join(broadcast_rural_codes_df, "ru11ind", "left_outer")
)
df_lookup_full.show(5, truncate=False)

+-------+----------+---------+---------+-----------------------+
|ru11ind|pcds_clean|lat      |long     |AreaName               |
+-------+----------+---------+---------+-----------------------+
|C1     |S705QS    |53.511883|-1.479421|Urban city and town    |
|C1     |S705QT    |53.5117  |-1.48072 |Urban city and town    |
|B1     |S705QU    |53.531497|-1.473448|Urban minor conurbation|
|C1     |S705QX    |53.512821|-1.482093|Urban city and town    |
|C1     |S705QY    |53.51308 |-1.481653|Urban city and town    |
+-------+----------+---------+---------+-----------------------+
only showing top 5 rows



#### Join the tables

In [5]:

# Enrich each location with its AreaName (from df_lookup_full) and its
# "ladnm" value, exposed as "district" (from df_rural_lookup_full_CL).
# Both joins are left outer joins on the cleaned postcode, so a location is
# kept even when a lookup has no row for its postcode.
# NOTE(review): if a postcode appears more than once in either lookup, the
# joins will duplicate location rows — confirm both lookups are unique on
# pcds_clean.
result_df = (
    df_locations
    .join(
        df_lookup_full,
        df_locations["LocationPostCode_clean"] == df_lookup_full["pcds_clean"],
        "left_outer",
    )
    .join(
        df_rural_lookup_full_CL,
        df_locations["LocationPostCode_clean"] == df_rural_lookup_full_CL["pcds_clean"],
        "left_outer",
    )
    .select(
        df_locations["*"],
        df_lookup_full["AreaName"],
        df_rural_lookup_full_CL["ladnm"].alias("district"),
    )
)

# Preview a handful of rows instead of printing the whole table: a hard-coded
# row limit goes stale when the data changes and floods the saved notebook
# with every record. The total row count is reported by count() below.
result_df.show(5, truncate=False)
result_df.count()

                                                                                

+------------------------------------+------------------------------------+----------------------+--------------------------------------------------------+-----------------------------------+
|LocationId                          |OrganisationID                      |LocationPostCode_clean|AreaName                                                |district                           |
+------------------------------------+------------------------------------+----------------------+--------------------------------------------------------+-----------------------------------+
|565dbf2b-690f-4672-b744-88689d8eae00|872d01b8-fee2-4cb1-a37a-d77d81851332|B128SL                |Urban major conurbation                                 |Birmingham                         |
|97d91ef6-6329-4ba7-98a7-a35c0831f425|6fac1e19-0d6a-490a-85d7-247d799d46d2|B375PE                |Urban major conurbation                                 |Solihull                           |
|5191ca71-379a-4d44-bcdd-83a05a89fe29|17

                                                                                

1592

#### Save the processed file

In [7]:
# Persist the joined result to the Gold layer as CSV with a header row,
# replacing any previous output at the same path.
# Note: Spark's CSV writer creates a directory of part files at this path
# rather than a single .csv file.
(
    result_df.write
    .mode("overwrite")
    .option("header", True)
    .csv("Gold/locations_result.csv")
)
print("The table have been saved to the Gold.")



The table have been saved to the Gold.


                                                                                