### Yelp Dataset (10 Mar 2025): Validations and Proofs of Concept

#### 1. Extract the Tar File

In [None]:
# !pip3 install -r requirements.txt

In [None]:
import tarfile
import os

# Locations of the downloaded Yelp archive and the directory it is unpacked into.
tar_path = '/media/oem/onetbsamdot/datasets/yelp_10Mar2025/Yelp-JSON/Yelp JSON/yelp_dataset.tar'
extract_path = '/media/oem/onetbsamdot/datasets/yelp_10Mar2025/Yelp-JSON/Yelp JSON/'  # or another directory of your choice

# Create the extraction directory if it doesn't exist
os.makedirs(extract_path, exist_ok=True)

# Unpack the archive only when one of the JSON files read by the later cells is
# missing, so re-running this cell does not re-extract the whole archive.
# NOTE(review): assumes the archive stores these files at its top level, as the
# later os.path.join(extract_path, ...) paths expect; confirm against the tar.
expected_json = [
    f'yelp_academic_dataset_{table}.json'
    for table in ('business', 'review', 'checkin', 'tip', 'user')
]
missing_json = [
    name for name in expected_json
    if not os.path.exists(os.path.join(extract_path, name))
]
if missing_json:
    print(f"Extracting {tar_path} (missing: {missing_json})")
    with tarfile.open(tar_path, 'r') as tar:
        if hasattr(tarfile, 'data_filter'):
            # The 'data' filter rejects members that would land outside
            # extract_path (absolute paths, '..' traversal, escaping links) and
            # device files. Available in Python 3.12+ and in security backports.
            tar.extractall(path=extract_path, filter='data')
        else:
            tar.extractall(path=extract_path)

# List the extracted files to verify
print(os.listdir(extract_path))

In [None]:
# Check for spark
from pyspark.sql import SparkSession

# Local Spark session wired to the NVIDIA RAPIDS Accelerator plugin. The settings
# are applied to the builder one by one, in the order listed, before getOrCreate().
rapids_conf = {
    "spark.jars": "/home/oem/jars/rapids-4-spark_2.12-25.02.0.jar",
    "spark.plugins": "com.nvidia.spark.SQLPlugin",
    "spark.rapids.sql.enabled": "true",
    "spark.executor.resource.gpu.amount": "1",
    "spark.task.resource.gpu.amount": "1",
}

session_builder = SparkSession.builder.appName("MyRapidsSparkApp").master("local[*]")
for conf_key, conf_value in rapids_conf.items():
    session_builder = session_builder.config(conf_key, conf_value)
spark = session_builder.getOrCreate()

# Echo the effective plugin flag so the notebook output shows whether RAPIDS is on.
print("RAPIDS enabled:", spark.conf.get("spark.rapids.sql.enabled", "Not set"))


#### 2. Import JSON Tables

In [None]:
# Paths of the five Yelp JSON Lines files under extract_path
# (extract_path is set in the extraction cell and must already be defined).
business_file, review_file, checkin_file, tip_file, user_file = (
    os.path.join(extract_path, f'yelp_academic_dataset_{table}.json')
    for table in ('business', 'review', 'checkin', 'tip', 'user')
)

In [None]:
import pandas as pd
from concurrent.futures import ProcessPoolExecutor # leverage mutli-core processes for parallel compute

# Output folder for the converted Parquet files, relative to the current working
# directory; created if absent and left as-is if it already exists.
parquet_dir = "./data"
os.makedirs(name=parquet_dir, exist_ok=True)

# Function to convert a JSON file to Parquet using Pandas
def convert_json_to_parquet(json_path, parquet_path):
    print(f"Converting {json_path} to {parquet_path}...")
    # Read the JSON file (JSON Lines mode)
    df = pd.read_json(json_path, lines=True)
    # Write the DataFrame to Parquet (without the index)
    df.to_parquet(parquet_path, index=False)
    print(f"Saved {parquet_path}")


# One (source JSON, destination Parquet) pair per Yelp table, in a fixed order.
tasks = [
    (source_json, os.path.join(parquet_dir, f'{table}.parquet'))
    for table, source_json in (
        ('business', business_file),
        ('review', review_file),
        ('checkin', checkin_file),
        ('tip', tip_file),
        ('user', user_file),
    )
]

# Run every conversion in its own worker process (five workers, five tasks).
with ProcessPoolExecutor(max_workers=5) as pool:
    jobs = [pool.submit(convert_json_to_parquet, *pair) for pair in tasks]
    # Block until each job is done, in submission order; .result() re-raises
    # any exception a worker hit, and leaving the `with` waits for the rest.
    for job in jobs:
        job.result()

print("All conversions completed!")

In [None]:
def _load_sampled(table):
    """Read ``<parquet_dir>/<table>.parquet`` with Spark and keep a seeded ~10% sample without replacement."""
    table_path = os.path.join(parquet_dir, f'{table}.parquet')
    return spark.read.parquet(table_path).sample(withReplacement=False, fraction=0.1, seed=42)


# NOTE(review): each table is sampled independently, so joins between these
# frames (e.g. review -> business) only keep rows whose keys survived in both
# samples. Confirm that is acceptable for downstream checks.
business_df = _load_sampled('business')
review_df = _load_sampled('review')
checkin_df = _load_sampled('checkin')
tip_df = _load_sampled('tip')
user_df = _load_sampled('user')

# Preview five rows of the sampled business table, then stop the Spark session.
# NOTE(review): the *_df frames cannot be queried after spark.stop(); move this
# if later cells need them.
business_df.show(5)
spark.stop()