In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import pyarrow

import os

# Allocator options use the "<option>:<value>" syntax without whitespace; the
# variable is set before torch is imported so the setting is in place when the
# CUDA caching allocator reads it.
os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "max_split_size_mb:3000"

import torch

# Report what the GPU runtime looks like on this machine.
cuda_available = torch.cuda.is_available()
print('Cuda status: %s' % str(cuda_available))
print('Divices available: %d' % torch.cuda.device_count())
# get_device_name() raises when no CUDA device can be used, so it is only
# queried when CUDA is actually available.
if cuda_available:
    print('Device name: %s' % torch.cuda.get_device_name())

Cuda status: True
Divices available: 1
Device name: NVIDIA GeForce GTX 1650


### Spark Session

In [2]:
import findspark

# Raw string: "\s" is not a valid escape sequence, so the plain literal only
# produced the intended path by accident and triggers an invalid-escape warning
# on recent Python versions. The resulting path is unchanged.
findspark.init(r'C:\spark')

from pyspark.sql import SparkSession

# Build (or reuse) the session used by every cell below, with explicit
# executor and driver sizing.
spark = (SparkSession.builder
         .appName('Reviews Session')
         .config('spark.executor.memory', '4g')
         .config('spark.executor.cores', '2')
         .config('spark.driver.memory', '2g')
         .getOrCreate())

# Display the session summary as the cell output.
spark

### Getting unzipped files from other file path

In [3]:
from file_handling_tools import FileHandling

# Locations used to decompress the downloaded zip archives:
#   input_directory  - folder that holds the zip files
#   output_directory - folder where the unzipped files are stored
# NOTE(review): both are absolute, machine-specific Windows paths; adjust them
# before running on another machine.

input_directory = r"C:\Users\jdieg\Desktop\henry\proyectos\FINAL\data\google_maps"
output_directory = r"C:\Users\jdieg\Desktop\data_camp\projects\restaurant_reviews\data\google"
# output_directory_1 = r"C:\Users\jdieg\Desktop\data_camp\unzipping_test"

# file_handling = FileHandling(input_directory, output_directory_1)

# # Run the decompression (only needed the first time)
# file_handling.unzip_folders()

### Have a look at Data Files.
On:
* metadata-sitios (json files)
* reviews (json files)

To get the field names, data types, and so on.

#### metadata-sitios (json file)

In [4]:
# Load the metadata JSON files in a single read; the glob only matches the
# top-level folders whose names end in "002" or "003".
metadata_df = spark.read.json("data/google/*00[2-3]/*/metadata*/*.json")

In [4]:
# metadata_df.count()

3025011

In [5]:
# Print the schema to have a look at the DataFrame structure
# metadata_df.printSchema()

root
 |-- MISC: struct (nullable = true)
 |    |-- Accessibility: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- Activities: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- Amenities: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- Atmosphere: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- Crowd: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- Dining options: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- From the business: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- Getting here: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- Health & safety: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- Health and safety: array (nullable = true)


It seems a few fields are of struct type, in other words nested data, which can be anything from a simple list or dictionary to a dictionary with nested dictionaries. Let's leave it for later.

It would also be nice to have a cleaner visualization of the nested columns. I created a simple class in the info_spark_dataset library that prints only the column names, as a bullet list.

In [6]:
# from info_spark_dataset import InfoDataSet
# info_data = InfoDataSet(metadata_df._jdf, metadata_df.sparkSession)

Data Dictionary for metadata

Columns:
	
1. MISC: Nested column, for later.
2. address: 
3. avg_rating
4. category
5. description
6. gmap_id
7. hours
8. latitude
9. longitude
10. name
11. num_of_reviews
12. price
13. relative_results
14. state
15. url

Since we are interested only in restaurants, let's begin by filtering for the places that are restaurants; this should reduce the dataset.

### Category

In [7]:
# Have a look at some rows on category
# metadata_df.select('category').show(100, truncate=False)

+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|category                                                                                                                                                                  |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[Pharmacy]                                                                                                                                                                |
|[General store, ATM]                                                                                                                                                      |
|[Pet groomer]                                                                                                                         

There are multiple categories that are not necessarily restaurants. There are many ways to approach this, but here I will keep any row whose category list contains the word "Restaurant" at least once.

To get this done:
* Create a User Defined Function:
    * That counts the occurrences of the word "restaurant" in the categories
* Create a new column holding that count of the word "restaurant"
* Keep only the rows whose count is greater than 0

In [5]:

import pyspark.sql.functions as F
from pyspark.sql.types import *


def filter_category(target_cat, categories):
    """
    Count the categories that contain a target word.

    Every category is split on whitespace and each of its words is compared,
    case-insensitively, with ``target_cat``. The match is on whole words, so
    'Restaurant' matches 'Italian Restaurant' but not 'Restaurants'. Because
    the comparison is word by word, a target made of several words never
    matches.

    Parameters:
    - target_cat (str): The word to look for.
    - categories (list of str): Categories to search within. The list itself
      and any of its elements may be None; missing values never match.

    Returns:
    - int: The number of categories containing the target word.

    Example:
    ```python
    target_category = 'Restaurant'
    categories = ['Fast Food Restaurant', 'Italian Restaurant', 'Café']
    count = filter_category(target_category, categories)
    print(count)  # Output: 2
    ```
    """
    # Without a category list or a target there is nothing to match
    if categories is None or target_cat is None:
        return 0
    # Lower-case the target once instead of once per category
    wanted = target_cat.lower()
    # Null entries inside the list are skipped rather than crashing on .split()
    return sum(
        1
        for category in categories
        if category is not None
        and wanted in {word.lower() for word in category.split()}
    )


# Wrap filter_category as a Spark UDF that produces an integer column
udf_FilterCategory = F.udf(filter_category, returnType=IntegerType())

# Temporary column 'cat_count_temp': number of entries in `category` that
# contain the word 'Restaurant'
restaurant_hits = udf_FilterCategory(F.lit('Restaurant'), metadata_df['category'])
metadata_df = metadata_df.withColumn('cat_count_temp', restaurant_hits)

# Rows with at least one matching category
has_restaurant = metadata_df['cat_count_temp'].cast('int') > 0
output_dataframe = metadata_df.where(has_restaurant)


In [6]:
# Restaurant-only metadata: places whose temporary category count is positive
restaurants_metadata = metadata_df.where(metadata_df['cat_count_temp'].cast('int') > 0)
# restaurants_metadata.count()

212008

### Finally, our dataset containing only Restaurant as category.

In [7]:
# The helper count column is no longer needed; remove it by name
restaurants_metadata = restaurants_metadata.drop('cat_count_temp')

In [64]:
# restaurants_metadata.count(), len(restaurants_metadata.columns) --> (212008, 15)

In [8]:
# Keep just the place identifier of every restaurant row
google_ids = restaurants_metadata.select('gmap_id')

In [9]:
# Rename the key to gmap_id_1; the later join compares it with the reviews' gmap_id
google_ids = google_ids.withColumnRenamed('gmap_id','gmap_id_1')

### Extracting reviews from Google Json Files

In [10]:
# Read every review file found under the data folder.
# PySpark accepts glob patterns in paths, so the wildcards in the directory
# levels pick up each matching .json file wherever it sits.
google_df = spark.read.json("data/google/*/*/reviews-estados/review-*/*.json")

In [11]:
# google_df.count(), google_df.columns
# Results >>
# (89946359,
#  ['gmap_id', 'name', 'pics', 'rating', 'resp', 'text', 'time', 'user_id'])

In [12]:
# Column names of the reviews DataFrame together with their Spark data types
google_df.dtypes

[('gmap_id', 'string'),
 ('name', 'string'),
 ('pics', 'array<struct<url:array<string>>>'),
 ('rating', 'bigint'),
 ('resp', 'struct<text:string,time:bigint>'),
 ('text', 'string'),
 ('time', 'bigint'),
 ('user_id', 'string')]

In order to keep only the reviews that belong to restaurants, we use the restaurants DataFrame from the previous step and join it to the reviews on the `gmap_id` field, so that only rows whose `gmap_id` appears in the restaurants DataFrame are retained.

In [13]:
# Broadcasting ships the small id table to the executors so the large reviews
# table does not have to be shuffled for the join.
from pyspark.sql.functions import broadcast

restaurant_reviews = google_df.join(
    # Broadcast the smaller dataframe
    broadcast(google_ids),
    # Join on gmap_id
    google_df['gmap_id'] == google_ids['gmap_id_1'],
    # Inner join rather than a right join: Spark cannot broadcast the preserved
    # side of an outer join, so with how='right' the hint on google_ids would be
    # ignored. The only rows an inner join leaves out are restaurants without
    # any review (all review columns null), which the later null-text filter
    # discards anyway.
    how='inner')

In [14]:
# Remove, in a single call, the pictures, the owner response and the duplicated join key
restaurant_reviews = restaurant_reviews.drop('pics', 'resp', 'gmap_id_1')


In [15]:
# restaurant_reviews.show(20)

In [16]:
# Keep only the rows whose review text is present
text_filter = restaurant_reviews['text'].isNotNull()
restaurant_reviews = restaurant_reviews.filter(text_filter)

In [17]:
# restaurant_reviews.count()
# this counting should compute 7093988

Let's get rid of duplicates so that the table is free of unnecessary rows.

In [18]:
# Collapse rows that are identical across every column
restaurant_reviews = restaurant_reviews.dropDuplicates()
# restaurant_reviews.count()

Let's convert the date field from Unix time to a date format. It will be used for partitioning.

In [19]:
from pyspark.sql.functions import from_unixtime, year

# `time` is divided by 1000 before the conversion (presumably epoch
# milliseconds to seconds - TODO confirm); the raw column is dropped afterwards
review_date = from_unixtime(restaurant_reviews['time'] / 1000)
restaurant_reviews = restaurant_reviews.withColumn('date', review_date).drop('time')

In [20]:
# Derive the review year from the `date` column (the commented-out file exports partition by it)
restaurant_reviews = restaurant_reviews.withColumn('year', year("date"))

Treating `\n` and `,` as plain text by adding an extra double quote to the text reviews ensures that those two characters are taken as text and not as delimiters.

In [21]:
from pyspark.sql.functions import col, regexp_replace, regexp_extract, when

# NOTE(review): doubling the quotes is CSV-style escaping; the doubled quotes
# are also what gets written by the JDBC load further down - confirm that is intended.
restaurant_reviews = (
    restaurant_reviews
    # Double every double quote so it reads as an escaped quote
    .withColumn('text', regexp_replace(col("text"), '"', '""'))
    # Turn each line break into a period followed by a space
    .withColumn('text', regexp_replace(col('text'), '\n', '. '))
    # Explicit cast of the cleaned text to string
    .withColumn('text', col('text').cast('string'))
    # Expose the cleaned column under the name `reviews`
    .withColumnRenamed('text', 'reviews')
)

In [22]:
# Regular expression capturing the text between the "(Translated by Google)"
# and "(Original)" markers
pattern = r'\(Translated by Google\)(.*)?\(Original\)'

# regexp_extract yields an empty string when the pattern does not match, so a
# review that starts with the translation marker but has no "(Original)" marker
# would be blanked out. Only take the extracted text when something was found;
# otherwise the review is kept as it is.
translated_text = regexp_extract(col("reviews"), pattern, 1)
is_translated = col("reviews").startswith('(Translated by Google)')

restaurant_reviews = restaurant_reviews.withColumn(
    'reviews',
    when(is_translated & (translated_text != ''), translated_text)
    .otherwise(col("reviews")))

In [23]:
from pyspark.sql.functions import col, trim, regexp_replace

restaurant_reviews = (
    restaurant_reviews
    # Store the rating as an int
    .withColumn("rating", col("rating").cast("int"))
    # Strip leading and trailing spaces from the review text
    .withColumn("reviews", trim(col("reviews")))
    # Remove NUL (\x00) characters from the review text
    .withColumn("reviews", regexp_replace("reviews", "\x00", ""))
)

## Load to PostgreSQL

In [24]:
# Upload the PySpark DataFrame into PostgreSQL
import os
from getpass import getpass

dataBaseName = "restaurt_project"


url = f"jdbc:postgresql://127.0.0.1:5432/{dataBaseName}"

# Credentials are not stored in the notebook: the user name can be overridden
# through POSTGRES_USER, and the password comes from POSTGRES_PASSWORD or, when
# that variable is unset or empty, from an interactive prompt.
properties = {
    "user": os.environ.get("POSTGRES_USER", "postgres"),
    "password": os.environ.get("POSTGRES_PASSWORD") or getpass("PostgreSQL password: "),
    "driver": "org.postgresql.Driver",
    "encoding": "utf-8"
}

table_name = "restaurants_reviews"

# mode="overwrite" replaces whatever the target table held before
restaurant_reviews.write.jdbc(url=url, table=table_name, mode="overwrite", properties=properties)

### In case you need csv or parquet files, you can run the following cells.

In [145]:
# ## Write csv files partitioned by year
# restaurant_reviews \
#     .coalesce(1) \
#     .write \
#     .partitionBy("year") \
#     .option("header", "true") \
#     .option("escape", "\"") \
#     .mode("overwrite") \
#     .csv("data/reviews_data")

In [81]:
# ## Write parquet files partitioned by year
# restaurant_reviews \
#     .coalesce(1) \
#     .write \
#     .partitionBy("year") \
#     .option("header", "true") \
#     .option("escape", "\"") \
#     .mode("overwrite") \
#     .parquet("data/reviews_data_parquet")

## Metadata Ingestion

In [38]:
# Show the schema of the restaurant-only metadata before unpacking MISC
restaurants_metadata.printSchema()

root
 |-- MISC: struct (nullable = true)
 |    |-- Accessibility: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- Activities: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- Amenities: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- Atmosphere: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- Crowd: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- Dining options: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- From the business: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- Getting here: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- Health & safety: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- Health and safety: array (nullable = true)


### Unpacking sub fields in MISC column

In [125]:
# For testing purposes, work on a narrow slice of the whole dataset:
# the place id plus the nested MISC struct
columns_for_miscelaneus = ["gmap_id", "MISC"]
miscelaneus = restaurants_metadata.select(*columns_for_miscelaneus)
# Ten-row sample of that slice
testing = miscelaneus.limit(10)

In [126]:
from pyspark.sql.functions import col

# Data type of the MISC struct, taken from the sample's schema; its fields are
# the sub-field names (the struct's keys)
misc_schema = testing.select("MISC").schema[0].dataType

# For every sub-field, record its name and build a column expression that
# pulls it out of the struct under that same name
sub_fields = []
column_expr = []
for struct_field in misc_schema.fields:
    sub_fields.append(struct_field.name)
    column_expr.append(col(f"MISC.{struct_field.name}").alias(f"{struct_field.name}"))

# Existing columns plus one top-level column per sub-field, then discard the struct
with_sub_fields = miscelaneus.select(*testing.columns, *column_expr)
miscelaneus = with_sub_fields.drop("MISC")

In [128]:
# Load the flattened MISC columns into their own table, reusing the `url` and
# `properties` JDBC settings defined for the reviews load; mode="overwrite"
# replaces the table if it already exists
table_name = "misc_categories"

miscelaneus.write.jdbc(url=url, table=table_name, mode="overwrite", properties=properties)