In [7]:
import warnings
from pyspark.sql import SparkSession
from pyspark.sql.functions import mean, stddev, col, abs, split, explode, count, when, sum, countDistinct
import traceback
warnings.filterwarnings("ignore")

In [8]:
caminho_jar = "C:\\Spark\\jars\\postgresql-42.7.3.jar" 

In [9]:
spark = SparkSession.builder \
                    .appName('yelp_data') \
                    .config("spark.executor.memory", "2g") \
                    .config("spark.sql.shuffle.partitions", "4") \
                    .config("spark.driver.extraClassPath", caminho_jar) \
                    .config("spark.executor.extraClassPath", caminho_jar) \
                    .getOrCreate()


In [10]:
#import yelp review data into dataframe
yelp_review = spark.read.json("./data/yelp_academic_dataset_review.json")
# import yelp business data into dataframe
yelp_business = spark.read.json('./data/yelp_academic_dataset_business.json')
# import yelp user data into dataframe
yelp_user = spark.read.json('./data/yelp_academic_dataset_user.json')
# import yelp tip data into dataframe
yelp_tip = spark.read.json('./data/yelp_academic_dataset_tip.json')
# import yelp checkin data into dataframe
yelp_checkin = spark.read.json('./data/yelp_academic_dataset_checkin.json')

In [11]:
# yelp_review.printSchema()
# yelp_business.printSchema()
# yelp_user.printSchema()
# yelp_tip.printSchema()
# yelp_checkin.printSchema()

In [12]:
cont_yelp_review = yelp_review.count() #review = 6990280
print("Tamanho da base de dados yelp_review:", cont_yelp_review)
cont_yelp_business = yelp_business.count() #business = 150346
print("Tamanho da base de dados yelp_business:", cont_yelp_business)
cont_yelp_user = yelp_user.count() #user = 1987897
print("Tamanho da base de dados yelp_user:", cont_yelp_user)
cont_yelp_tip = yelp_tip.count() #tip = 908915
print("Tamanho da base de dados yelp_tip:", cont_yelp_tip)
cont_yelp_checkin = yelp_checkin.count() #checkin = 131930
print("Tamanho da base de dados yelp_checkin:", cont_yelp_checkin)


Tamanho da base de dados yelp_review: 6990280
Tamanho da base de dados yelp_business: 150346
Tamanho da base de dados yelp_user: 1987897
Tamanho da base de dados yelp_tip: 908915
Tamanho da base de dados yelp_checkin: 131930


In [13]:
yelp_business.show()

+--------------------+--------------------+--------------------+--------------------+--------------+--------------------+-------+-------------+--------------+--------------------+-----------+------------+-----+-----+
|             address|          attributes|         business_id|          categories|          city|               hours|is_open|     latitude|     longitude|                name|postal_code|review_count|stars|state|
+--------------------+--------------------+--------------------+--------------------+--------------+--------------------+-------+-------------+--------------+--------------------+-----------+------------+-----+-----+
|1616 Chapala St, ...|{NULL, NULL, NULL...|Pns2l4eNsfO8kk83d...|Doctors, Traditio...| Santa Barbara|                NULL|      0|   34.4266787|  -119.7111968|Abby Rappoport, L...|      93101|           7|  5.0|   CA|
|87 Grasso Plaza S...|{NULL, NULL, NULL...|mpf3x-BjTdTEA3yCZ...|Shipping Centers,...|        Affton|{8:0-18:30, 0:0-0...|      1|   

In [14]:
business_sem_nulos = yelp_business.filter(yelp_business['categories'].isNotNull())
category = business_sem_nulos.select('categories')
individual_category = business_sem_nulos.select(explode(split('categories', ',')).alias('category'))
grouped_category = individual_category.groupby('category').count()
top_category = grouped_category.sort('count',ascending=False)
top_category.show(10,truncate=False)

+-----------------+-----+
|category         |count|
+-----------------+-----+
| Restaurants     |36978|
| Food            |20998|
| Shopping        |18915|
|Restaurants      |15290|
| Home Services   |10563|
| Nightlife       |9990 |
| Beauty & Spas   |9907 |
| Bars            |9130 |
| Health & Medical|8832 |
| Local Services  |8556 |
+-----------------+-----+
only showing top 10 rows



In [15]:
#Obtendo a quantidade de categorias 
num_of_unique_categories = business_sem_nulos.select('business_id','categories').withColumn("categories", explode(split("categories", ", "))).select('categories').distinct().count()
print(f'Number of Unique Categories: {num_of_unique_categories}')

Number of Unique Categories: 1311


In [16]:
#Separando as categorias por linha
business_exploded = business_sem_nulos.select('*',explode(split("categories", ", ")).alias('category'))
# business_exploded.show()

In [17]:
#Obtendo exemplos de business_id
# business_exploded.select('business_id').collect()

# Visualizando aparição do businnes_id por categoria
# business_exploded.filter(business_exploded['business_id']=='Ucl9Vo5lwrUmYbV8Dv8X5g').show()

In [18]:
#Matendo apenas a primeira categoria para o business_id
business_sem_duplicatas = business_exploded.select('*').dropDuplicates(['business_id'])

#Visualizando o dataframe sem as duplicidades
# business_sem_duplicatas.filter(business_sem_duplicatas['business_id']=='Ucl9Vo5lwrUmYbV8Dv8X5g').show()
# business_sem_duplicatas.show()

In [19]:
# colunas_atributos = business_sem_duplicatas.select('attributes.*').columns
# #Filtrando apenas as colunas que contêm valores True ou False
# colunas_com_true_false = [col('attributes.' + c) for c in colunas_atributos
#                           if business_sem_duplicatas.filter(col('attributes.' + c).isin(['True', 'False'])).count() > 0]

# # Selecionando as colunas originais do DataFrame mais as colunas de atributos que contêm True ou False
# bussines_att_filtrado = business_sem_duplicatas.select(['*'] + colunas_com_true_false)
# bussines_att_filtrado.show()


In [20]:
business_sem_duplicatas_filtrado = business_sem_duplicatas.select("business_id", "city", "is_open", "review_count", "stars", "state", "category")

In [21]:
# yelp_tip 
# yelp_tip.show()

# Mantendo apenas os business_id's da base bussines_att_filtrado
tip_filtrado = yelp_tip.select('business_id', 'compliment_count')
tip_filtrado = business_sem_duplicatas_filtrado.join(tip_filtrado, on='business_id', how='inner').select(tip_filtrado.columns)

# #Calculando a quantidade de elogios por business_id
tip_sum_compliment = tip_filtrado.groupBy('business_id').agg(sum('compliment_count').alias('total_compliments'))
# tip_sum_compliment.show()

In [22]:
# yelp_review 
# yelp_review.show()

# Mantendo apenas os business_id's da base bussines_att_filtrado
review_filtrado = yelp_review.select('business_id', 'stars', 'user_id')
review_filtrado = business_sem_duplicatas_filtrado.join(review_filtrado, on='business_id', how='inner').select(review_filtrado.business_id, review_filtrado.stars, review_filtrado.user_id)

#Criando um novo dataframe com a média das estrelas
df_avg_review_stars = review_filtrado.groupBy('business_id').agg(mean('stars'))
# df_avg_review_stars.show()

# Classificando as médias das estrelas em categorias 
df_avg_review_stars = df_avg_review_stars.withColumn('classification', 
                                                     when(df_avg_review_stars['avg(stars)'] > 3, 'positiva')
                                                     .when(df_avg_review_stars['avg(stars)'] == 3, 'neutra')
                                                     .otherwise('negativa'))



In [23]:
# yelp_checkin 
# yelp_checkin.show()

# Mantendo apenas os business_id's da base bussines_att_filtrado
checkin_filtrado = business_sem_duplicatas_filtrado.join(yelp_checkin, on='business_id', how='inner').select(yelp_checkin.columns)

# Separando as datas em linhas
checkin_exploded = checkin_filtrado.select('*',explode(split("date", ", ")).alias('data_checkin'))

# Fazendo contagem do business_id por data
checkin_agrupado = checkin_exploded.groupBy('business_id').agg(countDistinct('data_checkin').alias('quantidade_datas'))
# cont_date.show()


In [24]:
# yelp_user 
# yelp_user.show()
# review_filtrado.show()

# Mantendo apenas os business_id's da base bussines_att_filtrado
user_filtrado = yelp_user.join(review_filtrado, on='user_id', how='inner').select(yelp_user.average_stars, yelp_user.compliment_cool,  review_filtrado.business_id)

# Fazendo contagem do business_id por data
user_agrupado = user_filtrado.groupBy('business_id').agg(countDistinct('compliment_cool').alias('qtd_compliment_cool'))


In [25]:
dataframes = [
    (business_sem_duplicatas_filtrado, "business"),
    (tip_sum_compliment, "tip"),
    (df_avg_review_stars, "review"),
    (checkin_agrupado, "checkin")   
]

In [26]:
# Conexão com o PostgreSQL
jdbc_hostname = "localhost"  
jdbc_port = 5432  
jdbc_database = "postgres"
jdbc_username = "postgres"
jdbc_password = "password"
jdbc_url = f"jdbc:postgresql://{jdbc_hostname}:{jdbc_port}/{jdbc_database}"


# Escrever o DataFrame no PostgreSQL
for df, table_name in dataframes:
    try:
        df.write.jdbc(url=jdbc_url,
                      table=table_name,
                      mode="overwrite",  
                      properties={"user": jdbc_username, "password": jdbc_password, "driver": "org.postgresql.Driver"})
    

    except Exception as e:
        traceback.print_exc()