In [2]:
import os
import requests
from pyspark.sql import SparkSession
from pyspark.sql.functions import isnull, col
import geohash2 as gh

In [3]:
# Create (or reuse) the Spark session that every later cell relies on
spark = (
    SparkSession.builder
    .appName("Local Storage ETL")
    .config("spark.driver.memory", "16g")
    .config("spark.executor.memory", "8g")
    .getOrCreate()
)

# Collect the path of every .csv file that sits directly inside the restaurant directory
csv_dir = "restaurant_csv"
csv_names = [name for name in os.listdir(csv_dir) if name.endswith('.csv')]
csv_files = [os.path.join(csv_dir, name) for name in csv_names]

# Load all restaurant CSVs into a single DataFrame, taking column names from the header row
restaurant_reader = spark.read.format("csv").option("header", "true")
df = restaurant_reader.load(csv_files)

23/05/01 19:56:25 WARN Utils: Your hostname, MacBook-Pro-Mervert.local resolves to a loopback address: 127.0.0.1; using 192.168.0.119 instead (on interface en0)
23/05/01 19:56:25 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/05/01 19:56:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
# Count the distinct rows in the raw restaurant data.
# The deduplicated frame is not assigned, so `df` itself is left unchanged.
df.dropDuplicates().count()

1997

In [5]:
# Preview the first rows of the raw restaurant data
df.show()

+------------+------------+--------------------+-----------------------+-------+--------------+------+--------+
|          id|franchise_id|      franchise_name|restaurant_franchise_id|country|          city|   lat|     lng|
+------------+------------+--------------------+-----------------------+-------+--------------+------+--------+
|197568495625|          10|    The Golden Spoon|                  24784|     US|       Decatur|34.578| -87.021|
| 17179869242|          59|         Azalea Cafe|                  10902|     FR|         Paris|48.861|   2.368|
|214748364826|          27|     The Corner Cafe|                  92040|     US|    Rapid City|44.080|-103.250|
|154618822706|          51|        The Pizzeria|                  41484|     AT|        Vienna|48.213|  16.413|
|163208757312|          65|       Chef's Corner|                  96638|     GB|        London|51.495|  -0.191|
| 68719476763|          28|    The Spicy Pickle|                  77517|     US|      Grayling|44.657| -

In [6]:
from pyspark.sql.functions import coalesce
from pyspark.sql.types import DoubleType, StructField, StructType

# Read the OpenCage API key from the environment so the credential is never stored in
# the notebook. The key that used to be embedded in this cell should be treated as
# compromised and rotated.
OPENCAGE_API_KEY = os.environ.get("OPENCAGE_API_KEY")
if not OPENCAGE_API_KEY:
    raise RuntimeError(
        "Set the OPENCAGE_API_KEY environment variable before running the geocoding cell."
    )

OPENCAGE_URL = "https://api.opencagedata.com/geocode/v1/json"
GEOCODE_TIMEOUT_S = 10  # seconds; without a timeout a stalled connection hangs the Spark task


def geocode(row):
    """Look up coordinates for a restaurant row that is missing lat and/or lng.

    The query sent to the OpenCage Geocoding API is "<city>, <country>".

    Args:
        row: A Row with the fields id, city, country, lat and lng.

    Returns:
        A tuple (id, lat, lng) with float coordinates, or None when the row
        already has both coordinates, lacks a city or country, or the lookup
        does not produce a usable result.
    """
    if row.lat is not None and row.lng is not None:
        return None

    # A null city or country would make the string concatenation raise TypeError
    # and fail the whole job, so such rows are left un-geocoded instead.
    if not row.city or not row.country:
        return None
    address = row.city + ", " + row.country

    try:
        response = requests.get(
            OPENCAGE_URL,
            params={"key": OPENCAGE_API_KEY, "q": address},
            timeout=GEOCODE_TIMEOUT_S,
        )
        if response.status_code != 200:
            return None
        data = response.json()
    except (requests.RequestException, ValueError):
        # Network failure or unparsable body: treated like a non-200 reply,
        # i.e. the row simply keeps its null coordinates.
        return None

    results = data.get("results", [])
    if not results:
        return None

    geometry = results[0].get("geometry", {})
    lat = geometry.get("lat")
    lng = geometry.get("lng")
    if lat is None or lng is None:
        return None

    # JSON may deliver whole-number coordinates as int; normalise to float so
    # every record matches the DoubleType columns declared below.
    return (row.id, float(lat), float(lng))


# Filter for rows where lat or lng is null
null_df = df.filter(isnull('lat') | isnull('lng'))

# An explicit schema is required: inferring it from the data fails when no row
# could be geocoded (empty RDD). The id column reuses the type it has in df.
geocoded_schema = StructType([
    StructField("id", df.schema["id"].dataType, True),
    StructField("geocoded_lat", DoubleType(), True),
    StructField("geocoded_lng", DoubleType(), True),
])

# Geocode the null values in the DataFrame
geocoded = null_df.rdd.map(geocode).filter(lambda x: x is not None)
geocoded_df = geocoded.toDF(geocoded_schema)

joined_df = df.join(geocoded_df, ["id"], "left")

# Replace null values in lat and lng columns with the geocoded values
updated_df = joined_df.withColumn("lat", coalesce(joined_df["lat"], joined_df["geocoded_lat"])) \
                      .withColumn("lng", coalesce(joined_df["lng"], joined_df["geocoded_lng"]))

# Drop the geocoded_lat and geocoded_lng helper columns
final_df = updated_df.drop("geocoded_lat", "geocoded_lng")


                                                                                

In [7]:
# Show the restaurants that are missing a latitude or longitude
null_df.show()

+-----------+------------+--------------+-----------------------+-------+------+----+----+
|         id|franchise_id|franchise_name|restaurant_franchise_id|country|  city| lat| lng|
+-----------+------------+--------------+-----------------------+-------+------+----+----+
|85899345920|           1|       Savoria|                  18952|     US|Dillon|null|null|
+-----------+------------+--------------+-----------------------+-------+------+----+----+



In [8]:
# Preview the restaurant data after null coordinates were filled from the geocoding results
final_df.show()



+------------+------------+--------------------+-----------------------+-------+----------------+------+--------+
|          id|franchise_id|      franchise_name|restaurant_franchise_id|country|            city|   lat|     lng|
+------------+------------+--------------------+-----------------------+-------+----------------+------+--------+
|206158430215|           8|     The Green Olive|                  53370|     US|     Haltom City|32.789| -97.280|
| 68719476763|          28|    The Spicy Pickle|                  77517|     US|        Grayling|44.657| -84.744|
|128849018936|          57|The Yellow Submarine|                   5679|     FR|           Paris|48.872|   2.335|
|154618822657|           2|        Bella Cucina|                  41484|     US|     Fort Pierce|27.412| -80.391|
|240518168596|          21|      The Lazy Daisy|                  93164|     US|      Mendenhall|39.860| -75.646|
|223338299416|          25|       The Cozy Cafe|                  36937|     US|Great Ba

                                                                                

In [9]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

In [10]:
@udf(returnType=StringType())
def geohash_udf(lat, lng):
    """Encode a latitude/longitude pair as a 4-character geohash.

    Args:
        lat: Latitude as a number or numeric string; may be null.
        lng: Longitude as a number or numeric string; may be null.

    Returns:
        The geohash string, or None (a null cell) when either coordinate is null.
    """
    # Rows whose coordinates could not be filled in still carry nulls; float(None)
    # would raise TypeError and fail the whole Spark job, so emit a null geohash.
    if lat is None or lng is None:
        return None
    return gh.encode(float(lat), float(lng), precision=4)


# Tag every restaurant with the geohash of its coordinates
df_with_geohash = final_df.withColumn("geohash", geohash_udf(final_df.lat, final_df.lng))

In [11]:
# Preview the restaurants together with their new geohash column
df_with_geohash.show()

                                                                                

+------------+------------+--------------------+-----------------------+-------+----------------+------+--------+-------+
|          id|franchise_id|      franchise_name|restaurant_franchise_id|country|            city|   lat|     lng|geohash|
+------------+------------+--------------------+-----------------------+-------+----------------+------+--------+-------+
|206158430215|           8|     The Green Olive|                  53370|     US|     Haltom City|32.789| -97.280|   9vff|
| 68719476763|          28|    The Spicy Pickle|                  77517|     US|        Grayling|44.657| -84.744|   dpgw|
|128849018936|          57|The Yellow Submarine|                   5679|     FR|           Paris|48.872|   2.335|   u09w|
|154618822657|           2|        Bella Cucina|                  41484|     US|     Fort Pierce|27.412| -80.391|   dhyg|
|240518168596|          21|      The Lazy Daisy|                  93164|     US|      Mendenhall|39.860| -75.646|   dr44|
|223338299416|          

In [12]:
import zipfile
import os

# Directory that holds the zipped weather archives
zip_path = "./weather_csv"

# Unpack every .zip archive found in that directory; the contents land next to the archives
for filename in os.listdir(zip_path):
    if not filename.endswith(".zip"):
        continue

    file_path = os.path.join(zip_path, filename)
    with zipfile.ZipFile(file_path, "r") as zip_ref:
        zip_ref.extractall(zip_path)

In [13]:
# Root folder of the extracted weather dataset
path = './weather_csv/weather'

# Read the parquet files below that folder (sub-directories included), merging their schemas
weather_df = (
    spark.read
    .option("mergeSchema", "true")
    .option("recursiveFileLookup", "true")
    .parquet(path)
)

In [14]:
# Add a geohash column to the weather data using the same UDF as for the restaurants.
# This must run before the lat/lng columns are renamed to w_lat/w_lng; re-running it
# after that rename fails because weather_df no longer has lat/lng columns.
weather_df = weather_df.withColumn("geohash", geohash_udf(weather_df.lat, weather_df.lng))

In [15]:
from pyspark.sql.functions import col

# Prefix the weather coordinates with "w_" so they stay distinguishable from the
# restaurant lat/lng columns once the two DataFrames are joined
weather_df = (
    weather_df
    .withColumnRenamed("lat", "w_lat")
    .withColumnRenamed("lng", "w_lng")
)


In [16]:
# Preview the weather data with the renamed coordinate columns and the geohash column
weather_df.show()

+--------+-------+----------+----------+----------+-------+
|   w_lng|  w_lat|avg_tmpr_f|avg_tmpr_c| wthr_date|geohash|
+--------+-------+----------+----------+----------+-------+
| -111.09|18.6251|      80.7|      27.1|2017-08-29|   9e31|
|-111.042|18.6305|      80.7|      27.1|2017-08-29|   9e31|
|-110.995|18.6358|      80.7|      27.1|2017-08-29|   9e34|
|-110.947|18.6412|      80.9|      27.2|2017-08-29|   9e34|
|  -110.9|18.6465|      80.9|      27.2|2017-08-29|   9e34|
|-110.852|18.6518|      80.9|      27.2|2017-08-29|   9e34|
|-110.804|18.6571|      80.9|      27.2|2017-08-29|   9e34|
|-105.068|19.1765|      82.4|      28.0|2017-08-29|   9emm|
| -105.02|19.1799|      82.0|      27.8|2017-08-29|   9emm|
|-104.972|19.1832|      82.0|      27.8|2017-08-29|   9emm|
|-104.924|19.1866|      82.0|      27.8|2017-08-29|   9emm|
|-104.876|19.1899|      82.0|      27.8|2017-08-29|   9emm|
|-104.828|19.1932|      81.6|      27.6|2017-08-29|   9emm|
| -104.78|19.1964|      81.6|      27.6|

                                                                                

In [17]:
# Keep a single weather row per (geohash, wthr_date) pair.
# NOTE(review): dropDuplicates does not appear to guarantee which of the duplicate rows
# survives, so the kept avg_tmpr_* / w_lat / w_lng values may come from an arbitrary
# point inside the geohash cell — confirm this is acceptable (vs. averaging per cell).
weather_df = weather_df.dropDuplicates(['geohash', 'wthr_date'])

In [18]:
# Redistribute the weather rows into a fixed number of partitions keyed by geohash
num_partitions = 100
weather_df = weather_df.repartition(num_partitions, 'geohash')

# Attach the weather rows to each restaurant that shares their geohash; restaurants
# without any matching weather row are kept (left join)
joined_df = df_with_geohash.join(
    other=weather_df,
    on=['geohash'],
    how='left',
)

In [19]:
# Number of rows produced by the restaurant/weather left join
joined_df.count()

                                                                                

172591

In [40]:
# Preview the joined restaurant/weather rows
joined_df.show()

                                                                                

+-------+------------+------------+----------------+-----------------------+-------+-------+------+-------+--------+-------+----------+----------+----------+
|geohash|          id|franchise_id|  franchise_name|restaurant_franchise_id|country|   city|   lat|    lng|   w_lng|  w_lat|avg_tmpr_f|avg_tmpr_c| wthr_date|
+-------+------------+------------+----------------+-----------------------+-------+-------+------+-------+--------+-------+----------+----------+----------+
|   dn4h|197568495625|          10|The Golden Spoon|                  24784|     US|Decatur|34.578|-87.021|-87.1862|34.4982|      76.3|      24.6|2017-08-11|
|   dn4h|197568495625|          10|The Golden Spoon|                  24784|     US|Decatur|34.578|-87.021|-87.0083|34.6264|      69.7|      20.9|2016-10-13|
|   dn4h|197568495625|          10|The Golden Spoon|                  24784|     US|Decatur|34.578|-87.021|-87.0083|34.6264|      70.9|      21.6|2016-10-31|
|   dn4h|197568495625|          10|The Golden Spoon|

In [None]:
# Write joined_df (the restaurant rows joined with their weather rows) to the local file
# system in parquet format, replacing any existing output at this path.
# NOTE(review): no partitionBy is applied here — confirm whether the output is expected
# to be partitioned by a column.
joined_df.write.mode("overwrite").parquet("enriched_data.parquet")

