## spark read s3 json

In [1]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import Window, Row, SparkSession

import psycopg2
import pprint
import boto3
import json
import sys
import os

pp = pprint.PrettyPrinter(indent = 3)
print('imported modules.')

# Set Java home environment variable
# os.environ['JAVA_HOME'] = '/Library/Java/JavaVirtualMachines/temurin-8.jdk/Contents/Home'  # Update this path to match your Java installation

# read creds.json
with open("creds.json", "r") as f:
    creds = json.load(f)
    f.close()

imported modules.


## Spark

In [2]:
# Stop any existing Spark session
if 'spark' in locals():
    spark.stop()

In [None]:
try:
    # Create Spark session with required configurations
    spark = SparkSession.builder \
        .appName("YelpAnalysis") \
        .master("spark://spark-master:7077") \
        .config("spark.driver.memory", "2g") \
        .config("spark.executor.memory", "2g") \
        .config("spark.executor.cores", "4") \
        .config("spark.worker.memory", "2g") \
        .config("spark.cores.max", "4") \
        .config("spark.hadoop.fs.s3a.access.key", creds["aws_client"]) \
        .config("spark.hadoop.fs.s3a.secret.key", creds["aws_secret"]) \
        .config("spark.jars.packages", 
                "org.apache.hadoop:hadoop-aws:3.3.4," + 
                "org.apache.hadoop:hadoop-common:3.3.4," +
                "org.apache.hadoop:hadoop-aws:3.3.4," + 
                "com.amazonaws:aws-java-sdk-bundle:1.12.261," +
                "org.apache.logging.log4j:log4j-slf4j-impl:2.17.2," +
                "org.apache.logging.log4j:log4j-api:2.17.2," +
                "org.apache.logging.log4j:log4j-core:2.17.2," + 
                "org.apache.hadoop:hadoop-client:3.3.4," + 
                "io.delta:delta-core_2.12:2.4.0," + 
                "org.postgresql:postgresql:42.2.18") \
        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
        .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider") \
        .config("spark.hadoop.fs.s3a.endpoint", "s3.amazonaws.com") \
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
        .getOrCreate()
    
        # .config("spark.jars.packages", "org.apache.hadoop:hadoop-common:3.3.4,org.apache.hadoop:hadoop-aws:3.3.4,org.apache.hadoop:hadoop-client:3.3.4,io.delta:delta-core_2.12:2.3.0,org.postgresql:postgresql:9.4.1212") \
        
    
except Exception as e:
    print(str(e))

### read s3

In [5]:
client = boto3.client('s3')
bucket = "yelp-stevenhurwitt-2"
file = "yelp_academic_dataset_business.json"

bucket_meta = client.list_objects(Bucket = bucket)
print('files in s3 bucket:')
print('')
for c in bucket_meta['Contents']:
    print(c['Key'])



ClientError: An error occurred (InvalidAccessKeyId) when calling the ListObjects operation: The AWS Access Key Id you provided does not exist in our records.

## spark read json

In [9]:
bucket = "yelp-stevenhurwitt-2"
file = "yelp_academic_dataset_business.json"

# Define schema for Yelp business data
business_schema = StructType([
    StructField("business_id", StringType(), True),
    StructField("name", StringType(), True),
    StructField("address", StringType(), True),
    StructField("city", StringType(), True),
    StructField("state", StringType(), True),
    StructField("postal_code", StringType(), True),
    StructField("latitude", DoubleType(), True),
    StructField("longitude", DoubleType(), True),
    StructField("stars", DoubleType(), True),
    StructField("review_count", IntegerType(), True),
    StructField("is_open", IntegerType(), True),
    StructField("attributes", MapType(StringType(), StringType()), True),
    StructField("categories", StringType(), True),
    StructField("hours", MapType(StringType(), StringType()), True)
])

# Read JSON file from S3
business_df = spark.read \
    .format("json") \
    .schema(business_schema) \
    .option("multiLine", "false") \
    .load(f"s3a://{bucket}/{file}")


25/03/26 18:55:44 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties


## verify data load

In [10]:
# Verify the data load
print("Row count:", business_df.count())
business_df.show(5, truncate=False)

                                                                                

Row count: 150346


                                                                                

+----------------------+------------------------+-------------------------------+-------------+-----+-----------+----------+------------+-----+------------+-------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------+
|business_id           |name                    |address                        |city         |state|p

In [3]:
def read_json(filename):
    """
    reads a yelp .json file from s3 bucket.

    keyword arguments:
    filename - name of file (str)

    returns: json_file (json)
    """

    bucket = "yelp-dataset-stevenhurwitt"
    file = "raw/yelp_academic_dataset_business.json"
    response = client.get_object(Bucket = bucket, Key = file)
    
    file_content = response['Body'].read().decode('utf-8')
    json_file = json.loads("[" + file_content.replace("}\n{", "},\n{") + "]")
    return(json_file)

## read json files

In [4]:
business_file = read_json("raw/yelp_academic_dataset_business.json")
checkin_file = read_json("raw/yelp_academic_dataset_checkin.json")
review_file = read_json("raw/yelp_academic_dataset_review.json")
tip_file = read_json("raw/yelp_academic_dataset_tip.json")
user_file = read_json("raw/yelp_academic_dataset_user.json")

print("read json files from s3.")

read json files from s3.


## read delta

In [11]:
def read_delta(path: str) -> DataFrame:
    """
    Read a Delta table from S3 path
    
    Args:
        path (str): S3 path to delta table
        
    Returns:
        DataFrame: Spark DataFrame containing the delta table data
    """
    try:
        df = spark.read \
            .format("delta") \
            .option("inferSchema", "true") \
            .load(path)
            
        print(f"Successfully read delta table from: {path}")
        print(f"Number of rows: {df.count()}")
        return df
        
    except Exception as e:
        print(f"Error reading delta table from {path}")
        print(f"Error: {str(e)}")
        return None

In [None]:
# Example usage:
bucket = "yelp-stevenhurwitt-2"

# Read all delta tables
business_file = read_delta(f"s3a://{bucket}/business")
checkin_file = read_delta(f"s3a://{bucket}/checkins")
review_file = read_delta(f"s3a://{bucket}/reviews")
tip_file = read_delta(f"s3a://{bucket}/tips")
user_file = read_delta(f"s3a://{bucket}/users")

# Verify data loaded successfully
for df, name in [(business_file, "business"), 
                 (checkin_file, "checkins"),
                 (review_file, "reviews"),
                 (tip_file, "tips"),
                 (user_file, "users")]:
    if df is not None:
        print(f"\n{name} table schema:")
        df.printSchema()

                                                                                

Successfully read delta table from: s3a://yelp-stevenhurwitt-2/business


25/03/26 19:06:26 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.

## business

In [7]:
business_file.show(20)

[Stage 42:>                                                         (0 + 1) / 1]

+--------------------+--------------------+--------------------+-------------------+-----+-----------+-------------+--------------+-----+------------+-------+--------------------+--------------------+--------------------+
|         business_id|                name|             address|               city|state|postal_code|     latitude|     longitude|stars|review_count|is_open|          attributes|          categories|               hours|
+--------------------+--------------------+--------------------+-------------------+-----+-----------+-------------+--------------+-----+------------+-------+--------------------+--------------------+--------------------+
|lwItZ1Ck3KtpCgG4C...|Stomel Elliot Att...|  532 Rte 70 W, Fl 2|        Cherry Hill|   NJ|      08002|    39.915478|    -75.016973|  5.0|           5|      1|                null|DUI Law, Professi...|{Monday -> 0:0-0:...|
|8rb-3VYXE37IZix4y...|Sharky's Sports B...|820 N Black Horse...|       Williamstown|   NJ|      08094|   39.6968

                                                                                

## checkins

In [10]:
checkin_file.show(20)



+--------------------+--------------------+
|         business_id|                date|
+--------------------+--------------------+
|k9-r1j1VeWY1BLS0I...|2011-10-02 14:21:...|
|k91vKZa8oDHGFEumL...|2012-05-05 23:00:...|
|k920l7wHSjLZDJLvn...|2016-05-07 18:28:...|
|k93U5RS4ohNqk9912...|2013-09-21 17:45:...|
|k94QZmoxe9RwUgwu1...|2016-05-27 14:21:...|
|k97mvU1TQ0Inr5vJ1...|2013-06-15 16:18:...|
|k99d0o0T_qS8-oidI...|2013-07-15 14:07:...|
|k9A4tAmei12tLprkr...|2011-01-21 16:04:...|
|k9ASnJ6A-nBZhdwPN...|2011-01-19 22:19:...|
|k9AnxGfiuHdB5DZo-...|2011-01-18 14:34:...|
|k9CwhDTFoInTVUvmv...|2011-04-02 22:18:...|
|k9H5CerrIu4tSiPQR...|2014-02-23 19:55:...|
|k9HFcXSsqMPy29Lt_...|2014-07-12 23:37:...|
|k9HrEbKuLHDc2rzxZ...|2012-12-08 21:08:...|
|k9KqXzZByboldRrAs...|2018-03-01 19:12:...|
|k9Ms586e_elwkuSfk...|2010-07-04 00:01:...|
|k9QaQYOU-egM3UpS8...|2010-11-13 15:52:...|
|k9SrdFwfKDQt-uLic...|2013-07-15 11:43:...|
|k9TX0DT-WNz2S741t...|2011-07-01 03:33:...|
|k9TyrHMGojZvQznyG...|2010-11-02

                                                                                

## users

In [11]:
user_file.show(20)



+--------------------+---------+------------+-------------------+--------------------+------+-----+----+----+-----+-------------+--------------+---------------+------------------+---------------+---------------+---------------+----------------+---------------+----------------+-----------------+-----------------+
|             user_id|     name|review_count|      yelping_since|             friends|useful|funny|cool|fans|elite|average_stars|compliment_hot|compliment_more|compliment_profile|compliment_cute|compliment_list|compliment_note|compliment_plain|compliment_cool|compliment_funny|compliment_writer|compliment_photos|
+--------------------+---------+------------+-------------------+--------------------+------+-----+----+----+-----+-------------+--------------+---------------+------------------+---------------+---------------+---------------+----------------+---------------+----------------+-----------------+-----------------+
|XACigsMQP4VYX970e...|       Al|           3|2019-06-30 19

                                                                                

## reviews

In [13]:
review_file.show(20)

                                                                                

+--------------------+--------------------+--------------------+-----+------+-----+----+--------------------+-------------------+----+-----+
|           review_id|             user_id|         business_id|stars|useful|funny|cool|                text|               date|year|month|
+--------------------+--------------------+--------------------+-----+------+-----+----+--------------------+-------------------+----+-----+
|dlIO7e_OGH0rjZ2n6...|GB3qwSzhx2d8DmDSI...|ecI3FBTM0f99Fnml3...|  5.0|     0|    0|   0|One of my all tim...|2006-05-19 02:22:45|2006|    5|
|p8gMItBQKZO4ka_K-...|FBRjdSizGuMyxQuSS...|fOhnSqmO4XY5vSI8w...|  3.0|     2|    0|   0|a light, airy atm...|2006-05-28 08:28:17|2006|    5|
|wtFktH-lJsZFcD9Ye...|6OV_PFTl9RW2FmYQo...|udHVIrP8z10Y4M0oc...|  5.0|     1|    0|   0|Great place for p...|2006-05-07 02:16:44|2006|    5|
|Pv7IhPgwadOpgKIad...|gfQqQYI5_hCAGEHlH...|Rrd1WEcFWYRH85HdH...|  3.0|     0|    1|   0|The bagels are pr...|2006-05-04 06:16:17|2006|    5|
|p_MRUqC20YmV

                                                                                

## join reviews and user_name and business_name

In [None]:
jdbc_url = "jdbc:postgresql://" + creds["postgres_host"] + ":5433/" + creds["postgres_db"]

connection_properties = {
    "user": creds["postgres_user"],        # PostgreSQL username
    "password": creds["postgres_password"], # PostgreSQL password
    "driver": "org.postgresql.Driver"       # JDBC driver class
}

join_query = """
    SELECT 
        r.*,
        u.name as user_name,
        b.name as business_name
    FROM reviews r
    LEFT JOIN users u ON r.user_id = u.user_id 
    LEFT JOIN business b ON r.business_id = b.business_id
"""

# Execute query using Spark's JDBC connection
df_reviews_join = spark.read \
    .jdbc(url=jdbc_url,
          table=f"({join_query}) as reviews_join",
          properties=connection_properties)

df_reviews_join.show()

In [None]:
joined_df = spark.sql("""
    SELECT 
        r.*,
        u.name as user_name,
        b.name as business_name
    FROM reviews r
    LEFT JOIN users u ON r.user_id = u.user_id 
    LEFT JOIN business b ON r.business_id = b.business_id
""")

AnalysisException: [TABLE_OR_VIEW_NOT_FOUND] The table or view `reviews` cannot be found. Verify the spelling and correctness of the schema and catalog.
If you did not qualify the name with a schema, verify the current_schema() output, or qualify the name with the correct schema and catalog.
To tolerate the error on drop use DROP VIEW IF EXISTS or DROP TABLE IF EXISTS.; line 6 pos 9;
'Project [r.*, 'u.name AS user_name#3472, 'b.name AS business_name#3473]
+- 'Join LeftOuter, ('r.business_id = 'b.business_id)
   :- 'Join LeftOuter, ('r.user_id = 'u.user_id)
   :  :- 'SubqueryAlias r
   :  :  +- 'UnresolvedRelation [reviews], [], false
   :  +- 'SubqueryAlias u
   :     +- 'UnresolvedRelation [users], [], false
   +- SubqueryAlias b
      +- SubqueryAlias business
         +- View (`business`, [business_id#37,name#38,address#39,city#40,state#41,postal_code#42,latitude#43,longitude#44,stars#45,review_count#46,is_open#47,attributes#48,categories#49,hours#50])
            +- Relation [business_id#37,name#38,address#39,city#40,state#41,postal_code#42,latitude#43,longitude#44,stars#45,review_count#46,is_open#47,attributes#48,categories#49,hours#50] parquet


## tips

In [12]:
tip_file.show(20)

+--------------------+--------------------+--------------------+-------------------+----------------+----+
|             user_id|         business_id|                text|               date|compliment_count|year|
+--------------------+--------------------+--------------------+-------------------+----------------+----+
|veuUOGS0bbOeQzu71...|aurSXIlX86Ob94kYv...|             Awesome|2021-01-01 23:19:41|               0|2021|
|JfAqGalRYKo3Byw73...|mP9dVul2VKgVIs_kZ...|SO quick and easy...|2021-01-21 03:03:02|               0|2021|
|ojvZZ_ZWRlfJDJN_k...|J7-mw216H21rjz-E7...|Love Kenny's flow...|2021-01-25 03:52:12|               0|2021|
|77xznhdIIfltTaY5j...|m3NH0HumRCp4ARx8R...|Must try the chee...|2021-02-15 18:51:36|               0|2021|
|lvthTfCQGD0qaEk6j...|ruXD0lB3rq4FMvnQL...|They have a touch...|2021-02-15 20:18:45|               0|2021|
|rsopw45VdcXxw0zEY...|djeYYE2MIT36obh5m...|The one on McCarr...|2021-01-01 05:19:55|               0|2021|
|3wVSwirz80_gXRcIB...|TsZEKqs0wzP3WHn

In [10]:
tip_file.groupBy("year").count().orderBy("year").show()



+----+------+
|year| count|
+----+------+
|2009|   665|
|2010| 26712|
|2011| 83395|
|2012|110459|
|2013|107563|
|2014|109160|
|2015| 89686|
|2016| 94333|
|2017| 93909|
|2018| 67033|
|2019| 57646|
|2020| 32436|
|2021| 34993|
|2022|   925|
+----+------+



                                                                                

## spark sql

In [11]:
# Create temporary views for the DataFrames
business_file.createOrReplaceTempView("business")
checkin_file.createOrReplaceTempView("checkins")

# Perform the join using Spark SQL
joined_df = spark.sql("""
    SELECT 
        b.name as business_name,
        b.business_id,
        c.date as checkin_date
    FROM business b
    INNER JOIN checkins c 
        ON b.business_id = c.business_id
    ORDER BY b.name, c.date
""")

# Show results
print("Number of joined records:", joined_df.count())
joined_df.show(20, truncate=False)

                                                                                

Number of joined records: 131930


                                                                                

+---------------------------------------------------+----------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

## reviews

In [8]:
review_file.show(20)

                                                                                

+--------------------+--------------------+--------------------+-----+------+-----+----+--------------------+-------------------+----+-----+
|           review_id|             user_id|         business_id|stars|useful|funny|cool|                text|               date|year|month|
+--------------------+--------------------+--------------------+-----+------+-----+----+--------------------+-------------------+----+-----+
|dlIO7e_OGH0rjZ2n6...|GB3qwSzhx2d8DmDSI...|ecI3FBTM0f99Fnml3...|  5.0|     0|    0|   0|One of my all tim...|2006-05-19 02:22:45|2006|    5|
|p8gMItBQKZO4ka_K-...|FBRjdSizGuMyxQuSS...|fOhnSqmO4XY5vSI8w...|  3.0|     2|    0|   0|a light, airy atm...|2006-05-28 08:28:17|2006|    5|
|wtFktH-lJsZFcD9Ye...|6OV_PFTl9RW2FmYQo...|udHVIrP8z10Y4M0oc...|  5.0|     1|    0|   0|Great place for p...|2006-05-07 02:16:44|2006|    5|
|Pv7IhPgwadOpgKIad...|gfQqQYI5_hCAGEHlH...|Rrd1WEcFWYRH85HdH...|  3.0|     0|    1|   0|The bagels are pr...|2006-05-04 06:16:17|2006|    5|
|p_MRUqC20YmV

## write to postgres

In [7]:
# ! pip install psycopg2

In [6]:
from pyspark.sql import DataFrame
import psycopg2

def drop_postgres_tables():
    """Drop PostgreSQL tables in correct order using psycopg2"""
    try:
        # Create direct PostgreSQL connection
        conn = psycopg2.connect(
            host=creds["postgres_host"],
            port=5433,
            database=creds["postgres_db"],
            user=creds["postgres_user"],
            password=creds["postgres_password"]
        )
        
        # Create cursor
        cur = conn.cursor()
        
        # Tables in reverse dependency order
        tables = ["tips", "reviews", "checkins", "users", "business"]
        
        for table in tables:
            cur.execute(f"DROP TABLE IF EXISTS {table} CASCADE;")
            print(f"Dropped table: {table}")
            
        # Commit changes and close connections
        conn.commit()
        cur.close()
        conn.close()
        print("Successfully dropped all tables")
            
    except Exception as e:
        print(f"Error dropping tables: {str(e)}")

# Drop existing tables first
drop_postgres_tables()

Dropped table: tips
Dropped table: reviews
Dropped table: checkins
Dropped table: users
Dropped table: business
Successfully dropped all tables


In [8]:
def write_to_postgres(df, table_name):
    """Write DataFrame to PostgreSQL table"""
    
    # Get database credentials from docker-compose environment
    jdbc_url = "jdbc:postgresql://" + creds["postgres_host"] + ":5433/" + creds["postgres_db"]
    connection_properties = {
        "user": creds["postgres_user"],
        "password": creds["postgres_password"],
        "driver": "org.postgresql.Driver"
    }
    
    try:
        print(f"Writing {table_name} to PostgreSQL...")
        
        # Convert complex types for PostgreSQL compatibility
        if table_name == "business":
            df = df.withColumn("attributes", to_json("attributes")) \
                  .withColumn("hours", to_json("hours"))
        elif table_name == "yelp_users":
            # Check if columns are arrays before converting
            if "elite" in df.columns and df.schema["elite"].dataType.typeName() == "array":
                df = df.withColumn("elite", array_join("elite", ","))
            if "friends" in df.columns and df.schema["friends"].dataType.typeName() == "array":
                df = df.withColumn("friends", array_join("friends", ","))
        
        # Write to PostgreSQL
        df.write \
            .jdbc(url=jdbc_url,
                  table=table_name,
                  mode="overwrite",
                  properties=connection_properties)
        
        print(f"Successfully wrote {df.count()} rows to {table_name}")
        
    except Exception as e:
        print(f"Error writing to {table_name}: {str(e)}")
        # Print schema for debugging
        print("\nSchema of the DataFrame:")
        df.printSchema()

# Write all tables in correct order
tables_to_write = {
    "business": business_file,
    "yelp_users": user_file,
    "checkins": checkin_file,
    "reviews": review_file,
    "tips": tip_file
}

for table_name in ["business", "yelp_users", "checkins", "reviews", "tips"]:
    if table_name in tables_to_write:
        write_to_postgres(tables_to_write[table_name], table_name)

Writing business to PostgreSQL...


                                                                                

Successfully wrote 150346 rows to business
Writing yelp_users to PostgreSQL...


                                                                                

Successfully wrote 1987897 rows to yelp_users
Writing checkins to PostgreSQL...


                                                                                

Successfully wrote 131930 rows to checkins
Writing reviews to PostgreSQL...


                                                                                

Successfully wrote 6990280 rows to reviews
Writing tips to PostgreSQL...




Successfully wrote 908915 rows to tips


                                                                                

## counts

In [9]:
jdbc_url = "jdbc:postgresql://" + creds["postgres_host"] + ":5433/" + creds["postgres_db"]

connection_properties = {
    "user": creds["postgres_user"],        # PostgreSQL username
    "password": creds["postgres_password"], # PostgreSQL password
    "driver": "org.postgresql.Driver"       # JDBC driver class
}

count_query = """
SELECT 
    table_name,
    count
FROM (
    SELECT 'business' as table_name, COUNT(*) as count FROM business
    UNION ALL
    SELECT 'yelp_users' as table_name, COUNT(*) as count FROM yelp_users
    UNION ALL
    SELECT 'checkins' as table_name, COUNT(*) as count FROM checkins
    UNION ALL
    SELECT 'reviews' as table_name, COUNT(*) as count FROM reviews
    UNION ALL
    SELECT 'tips' as table_name, COUNT(*) as count FROM tips
) counts
ORDER BY table_name
"""

# Execute query using Spark's JDBC connection
df_counts = spark.read \
    .jdbc(url=jdbc_url,
          table=f"({count_query}) as counts",
          properties=connection_properties)

df_counts.show()

[Stage 100:>                                                        (0 + 1) / 1]

+----------+-------+
|table_name|  count|
+----------+-------+
|  business| 150346|
|  checkins| 131930|
|   reviews|6990280|
|      tips| 908915|
|yelp_users|1987897|
+----------+-------+



                                                                                

### review

In [5]:
pp.pprint(review_file[0])

{  'address': '1616 Chapala St, Ste 2',
   'attributes': {'ByAppointmentOnly': 'True'},
   'business_id': 'Pns2l4eNsfO8kk83dixA6A',
   'categories': 'Doctors, Traditional Chinese Medicine, '
                 'Naturopathic/Holistic, Acupuncture, Health & Medical, '
                 'Nutritionists',
   'city': 'Santa Barbara',
   'hours': None,
   'is_open': 0,
   'latitude': 34.4266787,
   'longitude': -119.7111968,
   'name': 'Abby Rappoport, LAC, CMQ',
   'postal_code': '93101',
   'review_count': 7,
   'stars': 5.0,
   'state': 'CA'}


## write to delta w/ manifest

In [None]:
def read_and_write_delta_with_manifest(input_path: str, output_path: str, table_name: str) -> None:
    """
    Read a Delta table and write it with symlink manifest for Athena compatibility
    
    Args:
        input_path (str): S3 path to source delta table
        output_path (str): S3 path for output delta table
        table_name (str): Name of the table for logging
    """
    try:
        # Read Delta table
        print(f"Reading {table_name} table from {input_path}")
        df = spark.read.format("delta").load(input_path)
        
        # Write with Delta format
        print(f"Writing {table_name} table to {output_path}")
        df.write \
            .format("delta") \
            .mode("overwrite") \
            .save(output_path)
        
        # Generate manifest file
        print(f"Generating manifest for {table_name}")
        deltaTable = DeltaTable.forPath(spark, output_path)
        deltaTable.generate("symlink_format_manifest")
        
        print(f"Successfully processed {table_name} table")
        print(f"Row count: {df.count()}")
        
    except Exception as e:
        print(f"Error processing {table_name} table")
        print(f"Error: {str(e)}")

# Define bucket and paths
bucket = "yelp-stevenhurwitt-2"
tables = {
    "business": {"input": f"s3a://{bucket}/business", "output": f"s3a://{bucket}/business_athena"},
    "checkins": {"input": f"s3a://{bucket}/checkins", "output": f"s3a://{bucket}/checkins_athena"},
    "reviews": {"input": f"s3a://{bucket}/reviews", "output": f"s3a://{bucket}/reviews_athena"},
    "tips": {"input": f"s3a://{bucket}/tips", "output": f"s3a://{bucket}/tips_athena"},
    "users": {"input": f"s3a://{bucket}/users", "output": f"s3a://{bucket}/users_athena"}
}

# Process all tables
for table_name, paths in tables.items():
    print(f"\nProcessing {table_name} table...")
    read_and_write_delta_with_manifest(
        input_path=paths["input"],
        output_path=paths["output"],
        table_name=table_name
    )

## write to postgres

In [None]:
def write_to_postgres(df, table_name, mode="overwrite"):
    """
    Write DataFrame to PostgreSQL
    """
    try:
        print(f"Writing {table_name} to PostgreSQL...")
        
        df.write \
            .format("jdbc") \
            .option("url", f"jdbc:postgresql://{creds['postgres_host']}:5433/{creds['postgres_db']}") \
            .option("driver", "org.postgresql.Driver") \
            .option("dbtable", table_name) \
            .option("user", creds["postgres_user"]) \
            .option("password", creds["postgres_pass"]) \
            .mode(mode) \
            .save()
            
        print(f"Successfully wrote {table_name} to PostgreSQL")
        
    except Exception as e:
        print(f"Error writing {table_name} to PostgreSQL: {str(e)}")

# Process each table
tables = {
    "business": business_file,
    "checkins": checkin_file,
    "reviews": review_file,
    "tips": tip_file,
    "users": user_file
}

# Write each table to PostgreSQL
for table_name, df in tables.items():
    if df is not None:
        # Convert map types to json strings for PostgreSQL compatibility
        if table_name == "business":
            df = df.withColumn("attributes", to_json("attributes")) \
                  .withColumn("hours", to_json("hours"))
        elif table_name == "users":
            df = df.withColumn("elite", array_join("elite", ",")) \
                  .withColumn("friends", array_join("friends", ","))
                  
        write_to_postgres(df, table_name)

# Verify row counts
for table_name in tables.keys():
    count_df = spark.read \
        .format("jdbc") \
        .option("url", f"jdbc:postgresql://{creds['postgres_host']}:5433/{creds['postgres_db']}") \
        .option("driver", "org.postgresql.Driver") \
        .option("dbtable", f"(SELECT COUNT(*) as count FROM {table_name}) tmp") \
        .option("user", creds["postgres_user"]) \
        .option("password", creds["postgres_pass"]) \
        .load()
    
    print(f"{table_name} row count: {count_df.collect()[0]['count']}")

### tip

In [6]:
pp.pprint(tip_file[0])

{  'address': '1616 Chapala St, Ste 2',
   'attributes': {'ByAppointmentOnly': 'True'},
   'business_id': 'Pns2l4eNsfO8kk83dixA6A',
   'categories': 'Doctors, Traditional Chinese Medicine, '
                 'Naturopathic/Holistic, Acupuncture, Health & Medical, '
                 'Nutritionists',
   'city': 'Santa Barbara',
   'hours': None,
   'is_open': 0,
   'latitude': 34.4266787,
   'longitude': -119.7111968,
   'name': 'Abby Rappoport, LAC, CMQ',
   'postal_code': '93101',
   'review_count': 7,
   'stars': 5.0,
   'state': 'CA'}


### user

In [7]:
pp.pprint(user_file[0])

{  'address': '1616 Chapala St, Ste 2',
   'attributes': {'ByAppointmentOnly': 'True'},
   'business_id': 'Pns2l4eNsfO8kk83dixA6A',
   'categories': 'Doctors, Traditional Chinese Medicine, '
                 'Naturopathic/Holistic, Acupuncture, Health & Medical, '
                 'Nutritionists',
   'city': 'Santa Barbara',
   'hours': None,
   'is_open': 0,
   'latitude': 34.4266787,
   'longitude': -119.7111968,
   'name': 'Abby Rappoport, LAC, CMQ',
   'postal_code': '93101',
   'review_count': 7,
   'stars': 5.0,
   'state': 'CA'}


### checkin

In [8]:
pp.pprint(checkin_file[0])

{  'address': '1616 Chapala St, Ste 2',
   'attributes': {'ByAppointmentOnly': 'True'},
   'business_id': 'Pns2l4eNsfO8kk83dixA6A',
   'categories': 'Doctors, Traditional Chinese Medicine, '
                 'Naturopathic/Holistic, Acupuncture, Health & Medical, '
                 'Nutritionists',
   'city': 'Santa Barbara',
   'hours': None,
   'is_open': 0,
   'latitude': 34.4266787,
   'longitude': -119.7111968,
   'name': 'Abby Rappoport, LAC, CMQ',
   'postal_code': '93101',
   'review_count': 7,
   'stars': 5.0,
   'state': 'CA'}


### business

In [9]:
pp.pprint(business_file[0])

{  'address': '1616 Chapala St, Ste 2',
   'attributes': {'ByAppointmentOnly': 'True'},
   'business_id': 'Pns2l4eNsfO8kk83dixA6A',
   'categories': 'Doctors, Traditional Chinese Medicine, '
                 'Naturopathic/Holistic, Acupuncture, Health & Medical, '
                 'Nutritionists',
   'city': 'Santa Barbara',
   'hours': None,
   'is_open': 0,
   'latitude': 34.4266787,
   'longitude': -119.7111968,
   'name': 'Abby Rappoport, LAC, CMQ',
   'postal_code': '93101',
   'review_count': 7,
   'stars': 5.0,
   'state': 'CA'}


In [10]:
dynamodb = boto3.resource('dynamodb', endpoint_url="https://us-east-2.console.aws.amazon.com?arn=arn:aws:dynamodb:us-east-2134132211607:8000")
print('created dynamo resource.')

# try:
#     yelp_business = dynamodb.create_table(
#             TableName='yelp.business',
#             KeySchema=[
#                 {
#                     'AttributeName': 'business_id',
#                     'KeyType': 'HASH'  # Partition key
#                 }
#             ],
#             AttributeDefinitions=[
#                 {
#                     'AttributeName': 'name',
#                     'AttributeType': 'S'
#                 }
#             ],
#             ProvisionedThroughput={
#                 'ReadCapacityUnits': 25,
#                 'WriteCapacityUnits': 20
#             }
#         )
#     print('created dynamo table.')

# except Exception as e:
#     print(f"exception: {e}")
#     print("failed to create dynamo table.")

created dynamo resource.


In [11]:
print(bucket) 

yelp-dataset-stevenhurwitt


In [12]:
print(file)

raw/yelp_academic_dataset_business.json


In [13]:
print(len(business_file))
print(len(checkin_file))
# review_file = read_json("raw/yelp_academic_dataset_review.json")
# tip_file = read_json("raw/yelp_academic_dataset_tip.json")
# user_file = read_json("raw/yelp_academic_dataset_user.json")


150346
150346


In [14]:
print('business json file has {} records with size of {} mb.'.format(len(business_file), sys.getsizeof(business_file)/1000000))
print('tip json file has {} records with size of {} mb.'.format(len(tip_file), sys.getsizeof(tip_file)/1000000))
print('user json file has {} records with size of {} mb.'.format(len(user_file), sys.getsizeof(user_file)/1000000))
print('review json file has {} records with size of {} mb.'.format(len(review_file), sys.getsizeof(review_file)/1000000))
print('checkin json file has {} records with size of {} mb.'.format(len(checkin_file), sys.getsizeof(checkin_file)/1000000))


business json file has 150346 records with size of 1.28316 mb.
tip json file has 150346 records with size of 1.28316 mb.
user json file has 150346 records with size of 1.28316 mb.
review json file has 150346 records with size of 1.28316 mb.
checkin json file has 150346 records with size of 1.28316 mb.


In [15]:
# df_pandas = business.toPandas()

In [16]:
# html_df = df_pandas.to_html()

In [17]:
# display(html_df)

In [18]:
# ! gcloud auth login --no-launch-browser