In [1]:
import pandas as pd

files = [
    "/Users/maqsat/Desktop/spark/restaurant_csv/part-00000-c8acc470-919e-4ea9-b274-11488238c85e-c000.csv",
    "/Users/maqsat/Desktop/spark/restaurant_csv/part-00001-c8acc470-919e-4ea9-b274-11488238c85e-c000.csv",
    "/Users/maqsat/Desktop/spark/restaurant_csv/part-00002-c8acc470-919e-4ea9-b274-11488238c85e-c000.csv",
    "/Users/maqsat/Desktop/spark/restaurant_csv/part-00003-c8acc470-919e-4ea9-b274-11488238c85e-c000.csv",
    "/Users/maqsat/Desktop/spark/restaurant_csv/part-00004-c8acc470-919e-4ea9-b274-11488238c85e-c000.csv",
]

dataframes = [pd.read_csv(file) for file in files]
restaurants_df = pd.concat(dataframes, ignore_index=True)

print(restaurants_df.head())

             id  franchise_id    franchise_name  restaurant_franchise_id  \
0  257698037796            37        Cafe Crepe                    26468   
1   25769803831            56  The Waffle House                    72230   
2   85899345988            69    Dragonfly Cafe                    18952   
3  111669149758            63        Cafe Paris                    84488   
4  163208757268            21    The Lazy Daisy                    96638   

  country                city     lat     lng  
0      IT               Milan  45.533   9.171  
1      FR               Paris  48.873   2.305  
2      NL           Amsterdam  52.392   4.911  
3      NL  Amsterdam Zuidoost  52.310   4.942  
4      US            Columbus  40.115 -83.015  


In [5]:
missing_lat_lng = restaurants_df[(restaurants_df['lat'].isnull()) | (restaurants_df['lng'].isnull())]
print(f"Rows with missing coordinates: {missing_lat_lng.shape[0]}")

Rows with missing coordinates: 1


In [7]:
import requests

# OpenCage API key
API_KEY = "50a17c8b-6bd9-439d-b763-586f13a76ad2"

def fetch_coordinates(address):
    """
    Gets coordinates (latitude, longitude) for a given address via the OpenCage API.
    """
    url = f"https://api.opencagedata.com/geocode/v1/json?q={address}&key={API_KEY}"
    response = requests.get(url)
    if response.status_code == 200:
        data = response.json()
        if data["results"]:
            location = data["results"][0]["geometry"]
            return location["lat"], location["lng"]
    return None, None

In [9]:
# Handling rows with missing values
for index, row in missing_lat_lng.iterrows():
    address = f"{row['city']}, {row['country']}"
    latitude, longitude = fetch_coordinates(address)
    if latitude and longitude:
        # Updating missing values
        restaurants_df.loc[restaurants_df['id'] == row['id'], ['lat', 'lng']] = latitude, longitude

In [11]:
from math import floor

# Simple Alternative of GeoHash, based rounding of coordinates.
def custom_geohash(lat, lng, precision=4):
    lat = round(lat, precision)
    lng = round(lng, precision)
    return f"{lat}_{lng}"

# Using the function to generate "GeoHash"
restaurants_df['custom_geohash'] = restaurants_df.apply(
    lambda row: custom_geohash(row['lat'], row['lng']), axis=1
)

# check result
restaurants_df.head()

Unnamed: 0,id,franchise_id,franchise_name,restaurant_franchise_id,country,city,lat,lng,custom_geohash
0,257698037796,37,Cafe Crepe,26468,IT,Milan,45.533,9.171,45.533_9.171
1,25769803831,56,The Waffle House,72230,FR,Paris,48.873,2.305,48.873_2.305
2,85899345988,69,Dragonfly Cafe,18952,NL,Amsterdam,52.392,4.911,52.392_4.911
3,111669149758,63,Cafe Paris,84488,NL,Amsterdam Zuidoost,52.31,4.942,52.31_4.942
4,163208757268,21,The Lazy Daisy,96638,US,Columbus,40.115,-83.015,40.115_-83.015


In [13]:
import os
import zipfile
import pandas as pd

zip_folder = "/Users/maqsat/Desktop/spark/weather_dataset"
output_folder = "/Users/maqsat/Desktop/spark/extracted_files"

os.makedirs(output_folder, exist_ok=True)

# unpacking
for zip_file in os.listdir(zip_folder):
    if zip_file.endswith(".zip"):
        zip_path = os.path.join(zip_folder, zip_file)
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            zip_ref.extractall(output_folder)

# all files Parquet
parquet_files = []
for root, _, files in os.walk(output_folder):
    for file in files:
        if file.endswith(".parquet"):
            parquet_files.append(os.path.join(root, file))

# Displays a list of all found Parquet files
print(f"Found {len(parquet_files)} files Parquet.")
for file in parquet_files:
    print(file)

# Example of Parquet file processing
for parquet_file in parquet_files:
    try:
        df = pd.read_parquet(parquet_file)
        print(f"read the file: {parquet_file}")
        print(df.head())  # show first rows
    except Exception as e:
        print(f"error at reading {parquet_file}: {e}")

Found 552 files Parquet.
/Users/maqsat/Desktop/spark/extracted_files/__MACOSX/weather/year=2016/month=10/day=20/._part-00135-44bd3411-fbe4-4e16-b667-7ec0fc3ad489.c000.snappy.parquet
/Users/maqsat/Desktop/spark/extracted_files/__MACOSX/weather/year=2016/month=10/day=20/._part-00134-44bd3411-fbe4-4e16-b667-7ec0fc3ad489.c000.snappy.parquet
/Users/maqsat/Desktop/spark/extracted_files/__MACOSX/weather/year=2016/month=10/day=20/._part-00220-44bd3411-fbe4-4e16-b667-7ec0fc3ad489.c000.snappy.parquet
/Users/maqsat/Desktop/spark/extracted_files/__MACOSX/weather/year=2016/month=10/day=18/._part-00015-44bd3411-fbe4-4e16-b667-7ec0fc3ad489.c000.snappy.parquet
/Users/maqsat/Desktop/spark/extracted_files/__MACOSX/weather/year=2016/month=10/day=18/._part-00212-44bd3411-fbe4-4e16-b667-7ec0fc3ad489.c000.snappy.parquet
/Users/maqsat/Desktop/spark/extracted_files/__MACOSX/weather/year=2016/month=10/day=18/._part-00016-44bd3411-fbe4-4e16-b667-7ec0fc3ad489.c000.snappy.parquet
/Users/maqsat/Desktop/spark/extra

In [15]:
!pwd

/Users/maqsat/Desktop/spark


In [17]:
parquet_files = []
for root, _, files in os.walk(output_folder):
    for file in files:
        if file.endswith(".parquet") and "__MACOSX" not in root and not file.startswith("._"):
            parquet_files.append(os.path.join(root, file))

for parquet_file in parquet_files:
    try:
        df = pd.read_parquet(parquet_file)
        print(f"read the file: {parquet_file}")
        print(df.head())  # show first rows
    except Exception as e:
        print(f"error at reading {parquet_file}: {e}")

read the file: /Users/maqsat/Desktop/spark/extracted_files/weather/year=2016/month=10/day=20/part-00135-44bd3411-fbe4-4e16-b667-7ec0fc3ad489.c000.snappy.parquet
       lng      lat  avg_tmpr_f  avg_tmpr_c   wthr_date
0  167.113 -34.6508        61.7        16.5  2016-10-20
1  170.386 -34.6508        62.2        16.8  2016-10-20
2  173.659 -34.6508        62.4        16.9  2016-10-20
3  176.932 -34.6508        62.8        17.1  2016-10-20
4  112.704 -34.4463        52.8        11.6  2016-10-20
read the file: /Users/maqsat/Desktop/spark/extracted_files/weather/year=2016/month=10/day=20/part-00134-44bd3411-fbe4-4e16-b667-7ec0fc3ad489.c000.snappy.parquet
       lng      lat  avg_tmpr_f  avg_tmpr_c   wthr_date
0 -111.202  18.7496        81.5        27.5  2016-10-20
1 -111.155  18.7550        81.5        27.5  2016-10-20
2 -111.107  18.7604        81.4        27.4  2016-10-20
3 -111.059  18.7657        81.3        27.4  2016-10-20
4 -111.012  18.7711        81.2        27.3  2016-10-20
read t

In [21]:
# Initialize the resulting DataFrame
enriched_data = restaurants_df.copy()

# weather data processing
for parquet_file in parquet_files:
    try:
        weather_df = pd.read_parquet(parquet_file)

        if 'lat' in weather_df.columns and 'lng' in weather_df.columns:
            weather_df['geohash'] = weather_df.apply(
                lambda row: custom_geohash(row['lat'], row['lng']), axis=1
            )
        else:
            print(f"Skip file (no 'lat' and 'lng'): {parquet_file}")
            continue

        # Check GeoHash intersection
        common_geohash = set(restaurants_df['custom_geohash']).intersection(set(weather_df['geohash']))
        print(f"Geohash general {parquet_file}: {len(common_geohash)}")

        # Combining restaurant and weather data
        enriched_data = pd.merge(
            enriched_data,
            weather_df,
            left_on="custom_geohash",
            right_on="geohash",
            how="left",
            suffixes=("", "_weather")
        )
        print(f": {parquet_file}")
    except Exception as e:
        print(f"errors {parquet_file}: {e}")

# Removing duplicate columns
enriched_data = enriched_data.loc[:, ~enriched_data.columns.duplicated()]
enriched_data.drop(columns=["lat_x", "lng_x", "geohash"], inplace=True, errors="ignore")

output_path = "/Users/maqsat/Desktop/spark/extracted_files/enriched_restaurant_data.parquet"
enriched_data.to_parquet(output_path, index=False)
print(f"Saved: {output_path}")

Geohash general /Users/maqsat/Desktop/spark/extracted_files/weather/year=2016/month=10/day=20/part-00135-44bd3411-fbe4-4e16-b667-7ec0fc3ad489.c000.snappy.parquet: 0
: /Users/maqsat/Desktop/spark/extracted_files/weather/year=2016/month=10/day=20/part-00135-44bd3411-fbe4-4e16-b667-7ec0fc3ad489.c000.snappy.parquet
Geohash general /Users/maqsat/Desktop/spark/extracted_files/weather/year=2016/month=10/day=20/part-00134-44bd3411-fbe4-4e16-b667-7ec0fc3ad489.c000.snappy.parquet: 0
: /Users/maqsat/Desktop/spark/extracted_files/weather/year=2016/month=10/day=20/part-00134-44bd3411-fbe4-4e16-b667-7ec0fc3ad489.c000.snappy.parquet
Geohash general /Users/maqsat/Desktop/spark/extracted_files/weather/year=2016/month=10/day=20/part-00220-44bd3411-fbe4-4e16-b667-7ec0fc3ad489.c000.snappy.parquet: 0
: /Users/maqsat/Desktop/spark/extracted_files/weather/year=2016/month=10/day=20/part-00220-44bd3411-fbe4-4e16-b667-7ec0fc3ad489.c000.snappy.parquet
Geohash general /Users/maqsat/Desktop/spark/extracted_files/w

In [23]:
enriched_data.to_csv("/Users/maqsat/Desktop/spark/extracted_files/enriched_restaurant_data.csv", index=False)

In [25]:
enriched_check = pd.read_parquet("/Users/maqsat/Desktop/spark/extracted_files/enriched_restaurant_data.parquet")
print(enriched_check.head())
print(enriched_check.info())

             id  franchise_id    franchise_name  restaurant_franchise_id  \
0  257698037796            37        Cafe Crepe                    26468   
1   25769803831            56  The Waffle House                    72230   
2   85899345988            69    Dragonfly Cafe                    18952   
3  111669149758            63        Cafe Paris                    84488   
4  163208757268            21    The Lazy Daisy                    96638   

  country                city     lat     lng  custom_geohash  lng_weather  \
0      IT               Milan  45.533   9.171    45.533_9.171          NaN   
1      FR               Paris  48.873   2.305    48.873_2.305          NaN   
2      NL           Amsterdam  52.392   4.911    52.392_4.911          NaN   
3      NL  Amsterdam Zuidoost  52.310   4.942     52.31_4.942          NaN   
4      US            Columbus  40.115 -83.015  40.115_-83.015          NaN   

   lat_weather  avg_tmpr_f  avg_tmpr_c wthr_date  avg_tmpr_f_weather  \
0 

In [29]:
import os

output_base_path = "/Users/maqsat/Desktop/spark/extracted_files/"
os.makedirs(output_base_path, exist_ok=True)

# We save data by key, for example, by country
for country, partition_data in enriched_data.groupby('country'):
    # Create a subdirectory for each country
    partition_path = os.path.join(output_base_path, f"country={country}")
    os.makedirs(partition_path, exist_ok=True)
    
    # Saving data in Parquet format
    partition_file = os.path.join(partition_path, "data.parquet")
    partition_data.to_parquet(partition_file, index=False)
    print(f"Dataset for {country} saved in {partition_file}")

Dataset for AT saved in /Users/maqsat/Desktop/spark/extracted_files/country=AT/data.parquet
Dataset for ES saved in /Users/maqsat/Desktop/spark/extracted_files/country=ES/data.parquet
Dataset for FR saved in /Users/maqsat/Desktop/spark/extracted_files/country=FR/data.parquet
Dataset for GB saved in /Users/maqsat/Desktop/spark/extracted_files/country=GB/data.parquet
Dataset for IT saved in /Users/maqsat/Desktop/spark/extracted_files/country=IT/data.parquet
Dataset for NL saved in /Users/maqsat/Desktop/spark/extracted_files/country=NL/data.parquet
Dataset for US saved in /Users/maqsat/Desktop/spark/extracted_files/country=US/data.parquet
