In [None]:
!pip install pyspark polars gcsfs fastparquet



In [None]:
import os
import json
import pandas as pd
import polars as pl
from datetime import date, timedelta, datetime
import time
import re

import pyspark.pandas as ps
from pyspark.sql import SparkSession, SQLContext
from pyspark.context import SparkContext
from pyspark.sql.functions import *
from pyspark.sql.types import *

from google.cloud import storage
import pyarrow.parquet as pq
from google.colab import auth
from google.colab import drive
drive.mount('/content/drive')
auth.authenticate_user()



Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
spark = SparkSession.builder \
        .appName("ETL_yelp") \
        .config("spark.driver.memory", "8g") \
        .config("spark.executor.memory", "8g") \
        .config("spark.jars", "/path/to/gcs-connector-hadoop2-latest.jar") \
        .getOrCreate()

In [None]:
spark

## `user`

Data del usuario incluyendo referencias a otros usuarios amigos y a toda la metadata asociada al usuario.

In [None]:
users = spark.read.format("parquet").option("header", "true").load('/content/drive/MyDrive/Yelp/user.parquet')

In [None]:
users.show(5)

users.printSchema()

+--------------------+------+------------+-------------------+------+-----+-----+--------------------+--------------------+----+-------------+--------------+---------------+------------------+---------------+---------------+---------------+----------------+---------------+----------------+-----------------+-----------------+
|             user_id|  name|review_count|      yelping_since|useful|funny| cool|               elite|             friends|fans|average_stars|compliment_hot|compliment_more|compliment_profile|compliment_cute|compliment_list|compliment_note|compliment_plain|compliment_cool|compliment_funny|compliment_writer|compliment_photos|
+--------------------+------+------------+-------------------+------+-----+-----+--------------------+--------------------+----+-------------+--------------+---------------+------------------+---------------+---------------+---------------+----------------+---------------+----------------+-----------------+-----------------+
|qVc8ODYU5SZjKXVBg.

In [None]:
users.describe().show()

+-------+--------------------+------------+------------------+-------------------+------------------+------------------+------------------+------------------+--------------------+------------------+------------------+-----------------+------------------+-------------------+-------------------+-------------------+------------------+-----------------+------------------+------------------+-----------------+------------------+
|summary|             user_id|        name|      review_count|      yelping_since|            useful|             funny|              cool|             elite|             friends|              fans|     average_stars|   compliment_hot|   compliment_more| compliment_profile|    compliment_cute|    compliment_list|   compliment_note| compliment_plain|   compliment_cool|  compliment_funny|compliment_writer| compliment_photos|
+-------+--------------------+------------+------------------+-------------------+------------------+------------------+------------------+-------

## `tips`

Consejos escritos por el usuario.

In [None]:
tips = spark.read.json('/content/drive/MyDrive/Yelp/tip.json')

In [None]:
# Mostramos el DF
tips.show(5, False)

tips.printSchema()

+----------------------+----------------+-------------------+---------------------------------------------------------+----------------------+
|business_id           |compliment_count|date               |text                                                     |user_id               |
+----------------------+----------------+-------------------+---------------------------------------------------------+----------------------+
|3uLgwr0qeCNMjKenHJwPGQ|0               |2012-05-18 02:17:21|Avengers time with the ladies.                           |AGNUgVwnZUey3gcPCJ76iw|
|QoezRbYQncpRqyrLH6Iqjg|0               |2013-02-05 18:35:10|They have lots of good deserts and tasty cuban sandwiches|NBN4MgHP9D3cw--SnauTkA|
|MYoRNLb5chwjQe3c_k37Gg|0               |2013-08-18 00:56:08|It's open even when you think it isn't                   |-copOvldyKh1qr-vzkDEvw|
|hV-bABTK-glh5wj31ps_Jw|0               |2017-06-27 23:05:38|Very decent fried chicken                                |FjMQVZjSqY8syIO-53KFKw|

In [None]:
# Información del DF
tips.describe().show()

+-------+--------------------+--------------------+-------------------+--------------------+--------------------+
|summary|         business_id|    compliment_count|               date|                text|             user_id|
+-------+--------------------+--------------------+-------------------+--------------------+--------------------+
|  count|              908915|              908915|             908915|              908915|              908915|
|   mean|                NULL|0.012524823553357574|               NULL|1.792114762548089...|                NULL|
| stddev|                NULL|  0.1207633932798416|               NULL|1.411112521356809...|                NULL|
|    min|---kPU91CF4Lq2-Wl...|                   0|2009-04-16 13:11:49|                   !|---r61b7EpVPkb4UV...|
|    max|zzyx5x0Z7xXWWvWnZ...|                   6|2022-01-19 20:38:55|to the apple gor...|zzxZW6U5lCCQQeVfL...|
+-------+--------------------+--------------------+-------------------+-----------------

## `business`

Información del comercio, incluyendo localización, atributos y categorías.

In [None]:
business_pd = pd.read_pickle('/content/drive/MyDrive/Yelp/business.pkl')
business_pd = business_pd.iloc[:, :14]
business = spark.createDataFrame(business_pd)

Información de la tabla

In [None]:
business.show(5)

business.printSchema()

+--------------------+--------------------+--------------------+-------------+-----+-----------+----------+------------+-----+------------+-------+--------------------+--------------------+--------------------+
|         business_id|                name|             address|         city|state|postal_code|  latitude|   longitude|stars|review_count|is_open|          attributes|          categories|               hours|
+--------------------+--------------------+--------------------+-------------+-----+-----------+----------+------------+-----+------------+-------+--------------------+--------------------+--------------------+
|Pns2l4eNsfO8kk83d...|Abby Rappoport, L...|1616 Chapala St, ...|Santa Barbara|  NaN|      93101|34.4266787|-119.7111968|  5.0|           7|      0|{ByAppointmentOnl...|Doctors, Traditio...|                NULL|
|mpf3x-BjTdTEA3yCZ...|       The UPS Store|87 Grasso Plaza S...|       Affton|  NaN|      63123| 38.551126|  -90.335695|  3.0|          15|      1|{Business

In [None]:
# Número de filas y columnas
(business.count(), len(business.columns))

(150346, 14)

In [None]:
business.describe().show()

+-------+--------------------+--------------------+-----------------+-----------+------+-----------------+------------------+------------------+------------------+------------------+-------------------+--------------------+
|summary|         business_id|                name|          address|       city| state|      postal_code|          latitude|         longitude|             stars|      review_count|            is_open|          categories|
+-------+--------------------+--------------------+-----------------+-----------+------+-----------------+------------------+------------------+------------------+------------------+-------------------+--------------------+
|  count|              150346|              150346|           150346|     150346|150346|           150346|            150346|            150346|            150346|            150346|             150346|              150243|
|   mean|                NULL|              1252.4|7369.333333333333|       NULL|   NaN|45177.8175542610

Eliminamos columnas innecesarias

In [None]:
business = business.drop("postal_code", "is_open", "hours", "attributes")

In [None]:
business.show(5)

+--------------------+--------------------+--------------------+-------------+-----+----------+------------+-----+------------+--------------------+
|         business_id|                name|             address|         city|state|  latitude|   longitude|stars|review_count|          categories|
+--------------------+--------------------+--------------------+-------------+-----+----------+------------+-----+------------+--------------------+
|Pns2l4eNsfO8kk83d...|Abby Rappoport, L...|1616 Chapala St, ...|Santa Barbara|  NaN|34.4266787|-119.7111968|  5.0|           7|Doctors, Traditio...|
|mpf3x-BjTdTEA3yCZ...|       The UPS Store|87 Grasso Plaza S...|       Affton|  NaN| 38.551126|  -90.335695|  3.0|          15|Shipping Centers,...|
|tUFrWirKiKi_TAnsV...|              Target|5255 E Broadway Blvd|       Tucson|  NaN| 32.223236| -110.880452|  3.5|          22|Department Stores...|
|MTSW4McQd7CbVtyjq...|  St Honore Pastries|         935 Race St| Philadelphia|   CA|39.9555052| -75.155564

Buscamos los 5 estados con más negocios

In [None]:
# Registramos el DF business como una tabla temporal 'business'
business.createOrReplaceTempView('business')

In [None]:
spark.sql("""
            SELECT COUNT(*) AS Number, state
            FROM business
            GROUP BY state
            ORDER BY Number DESC
            LIMIT 5
""").show()

+------+-----+
|Number|state|
+------+-----+
| 34039|   PA|
| 26329|   FL|
| 12056|   TN|
| 11246|   IN|
| 10913|   MO|
+------+-----+



Filtramos por los estados que se trabajarán y las filas en donde la columna **categories** contenga las palabras 'Food' y 'Restaurants'

In [None]:
business = spark.sql("""
                          SELECT *
                          FROM business
                          WHERE state IN ("PA", "FL") AND
                                (categories LIKE '%Food%' OR
                                 categories LIKE '%Restaurant%')
            """)

business.show(5, truncate=False)

business.printSchema()

business.count()

+----------------------+--------------------+------------------+------------+-----+-------------+--------------+-----+------------+-------------------------------------------------------------------------------------------------------------+
|business_id           |name                |address           |city        |state|latitude     |longitude     |stars|review_count|categories                                                                                                   |
+----------------------+--------------------+------------------+------------+-----+-------------+--------------+-----+------------+-------------------------------------------------------------------------------------------------------------+
|0bPLkL0QhhPO5kt1_EXmNQ|Zio's Italian Market|2575 E Bay Dr     |Largo       |FL   |27.9161159   |-82.7604608   |4.5  |100         |Food, Delis, Italian, Bakeries, Restaurants                                                                  |
|kfNv-JZpuN6TVNSO6hHdkw|Hibachi 

25848

## `reviews`

Reseñas completas, incluyendo el user_id que escribió el review y el business_id por el cual se escribe la reseña.

In [None]:
reviews = spark.read.json('/content/drive/MyDrive/Yelp/review.json')

In [None]:
reviews.show(5)

reviews.printSchema()

+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+
|         business_id|cool|               date|funny|           review_id|stars|                text|useful|             user_id|
+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+
|XQfwVwDr-v0ZS3_Cb...|   0|2018-07-07 22:09:11|    0|KU_O5udG6zpxOg-Vc...|  3.0|If you decide to ...|     0|mh_-eMZ6K5RLWhZyI...|
|7ATYjTIgM3jUlt4UM...|   1|2012-01-03 15:28:18|    0|BiTunyQ73aT9WBnpR...|  5.0|I've taken a lot ...|     1|OyoGAe7OKpv6SyGZT...|
|YjUWPpI6HXG530lwP...|   0|2014-02-05 20:30:30|    0|saUsX_uimxRlCVr67...|  3.0|Family diner. Had...|     0|8g_iMtfSiwikVnbP2...|
|kxX2SOes4o-D3ZQBk...|   1|2015-01-04 00:01:03|    0|AqPFMleE6RsU23_au...|  5.0|Wow!  Yummy, diff...|     1|_7bHUi9Uuf5__HHc_...|
|e4Vwtrqf-wpJfwesg...|   1|2017-01-14 20:54:15|    0|Sx8TMOWLNuJBWer-0...|  4.0|Cute inter

In [None]:
reviews.describe().show()

+-------+--------------------+------------------+-------------------+-------------------+--------------------+------------------+----------------------+------------------+--------------------+
|summary|         business_id|              cool|               date|              funny|           review_id|             stars|                  text|            useful|             user_id|
+-------+--------------------+------------------+-------------------+-------------------+--------------------+------------------+----------------------+------------------+--------------------+
|  count|             6990280|           6990280|            6990280|            6990280|             6990280|           6990280|               6990280|           6990280|             6990280|
|   mean|                NULL|0.4986175088837643|               NULL|0.32655959417934616|                NULL|  3.74858374771826|                  NULL|1.1846089140921394|                NULL|
| stddev|                NULL|2.172

Eliminamos columnas innecesarias

In [None]:
reviews = reviews.drop("date", "review_id", "user_id")

In [None]:
reviews.show(5)

+--------------------+----+-----+-----+--------------------+------+
|         business_id|cool|funny|stars|                text|useful|
+--------------------+----+-----+-----+--------------------+------+
|XQfwVwDr-v0ZS3_Cb...|   0|    0|  3.0|If you decide to ...|     0|
|7ATYjTIgM3jUlt4UM...|   1|    0|  5.0|I've taken a lot ...|     1|
|YjUWPpI6HXG530lwP...|   0|    0|  3.0|Family diner. Had...|     0|
|kxX2SOes4o-D3ZQBk...|   1|    0|  5.0|Wow!  Yummy, diff...|     1|
|e4Vwtrqf-wpJfwesg...|   1|    0|  4.0|Cute interior and...|     1|
+--------------------+----+-----+-----+--------------------+------+
only showing top 5 rows



In [None]:
reviews.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- cool: long (nullable = true)
 |-- funny: long (nullable = true)
 |-- stars: double (nullable = true)
 |-- text: string (nullable = true)
 |-- useful: long (nullable = true)



Filtramos los reviews de los restaurantes para los estados elegidos

In [None]:
# Registramos los DFs reviews y business(limpio) como tablas temporales 'reviews' y 'business'
reviews.createOrReplaceTempView('reviews')
business.createOrReplaceTempView('business')

In [None]:
reviews = spark.sql("""
                            SELECT r.*
                            FROM reviews r
                            JOIN business b ON r.business_id = b.business_id
                        """)

In [None]:
reviews.show(5)

+--------------------+----+-----+-----+--------------------+------+
|         business_id|cool|funny|stars|                text|useful|
+--------------------+----+-----+-----+--------------------+------+
|neL2xrin-uMJl5ABK...|   1|    1|  3.0|I have mix feelin...|     1|
|neL2xrin-uMJl5ABK...|   0|    0|  4.0|OK, so I've been ...|     1|
|ctOOp80WBFPj3wPZy...|   0|    0|  3.0|Not a bad taco be...|     1|
|neL2xrin-uMJl5ABK...|   0|    0|  2.0|Bleh. Big disappo...|     2|
|neL2xrin-uMJl5ABK...|   0|    0|  4.0|Boyfriend and I o...|     1|
+--------------------+----+-----+-----+--------------------+------+
only showing top 5 rows



In [None]:
reviews.count()

2053858

Guardamos los archivos limpios en el datalake

In [None]:
business = business.toPandas()

In [None]:
business.to_parquet('gs://yelp-and-maps-data-processed/Yelp/business_clean.parquet')

In [None]:
review_business = reviews.toPandas()

In [18]:
review_business.to_parquet('gs://yelp-and-maps-data-processed/Yelp/review_clean.parquet')