In [2]:
import pandas as pd
import numpy as np
import json
import gzip
import pickle
import ast
import yaml
import json
import findspark
import glob

# Inicialización de SPARK

In [3]:
import findspark
findspark.init('/opt/spark-2.4.5')

import pyspark
from pyspark import SparkConf
from pyspark.sql import SparkSession

# Single source of truth for the Spark settings used by this notebook.
spark_configurations = (
    SparkConf()
    .setMaster('local[4]')
    .setAppName('Taller_2')
    .set("spark.driver.memory", "7g")
)

# getOrCreate (instead of the bare constructor) keeps this cell re-runnable:
# constructing a second SparkContext in the same kernel raises an error.
sc = pyspark.SparkContext.getOrCreate(conf=spark_configurations)

# The session reuses the same configuration object, so master and app name
# are not repeated here.
spark = (
    SparkSession
    .builder
    .config(conf=spark_configurations)
    .getOrCreate()
)

# Funciones de apoyo para Spark

In [4]:
from pyspark.sql import functions as F
from pyspark.sql.functions import col
from pyspark.sql import types
from pyspark.sql.window import Window

def flat_df(spark_df):
    """Return `spark_df` with every top-level struct column expanded one level.

    Columns whose dtype string does not start with ``struct`` are kept as-is
    and come first, in their original order; each struct column is then
    selected as ``<column>.*`` so its fields become top-level columns.
    """
    plain_names = []
    struct_selectors = []
    for name, dtype in spark_df.dtypes:
        if dtype.startswith('struct'):
            struct_selectors.append(name + ".*")
        else:
            plain_names.append(name)
    return spark_df.select(*plain_names, *struct_selectors)

def missing_values_table_spark(spark_df):
    """Build, display and return a per-column null report for a Spark DataFrame.

    Parameters
    ----------
    spark_df : pyspark.sql.DataFrame
        Frame to inspect.

    Returns
    -------
    pandas.DataFrame
        One row per column of `spark_df` with the columns ``"Columna"``,
        ``"Porcentaje de nulos"`` (rounded to 2 decimals) and
        ``"Cantidad de nulos"``. The same table is also rendered with
        ``display``.

    Notes
    -----
    The row count and all null counts are computed in a single aggregation
    instead of a separate ``count()`` plus two identical ``select`` passes;
    percentages are derived from the counts on the driver. An empty frame
    yields 0 nulls / 0.0 % for every column instead of failing on
    ``round(None, 2)``.
    """
    columnas = spark_df.columns
    conteo_nulos = [
        F.sum(F.col(c).isNull().cast(types.IntegerType())).alias(c)
        for c in columnas
    ]

    # Position 0 is the total row count; the rest follow the column order.
    fila = spark_df.select(F.count(F.lit(1)), *conteo_nulos).collect()[0]
    total = fila[0]
    # sum() over zero rows comes back as NULL (None): treat it as 0 nulls.
    cantidades = [cantidad or 0 for cantidad in fila[1:]]
    porcentajes = [
        round(100 * cantidad / total, 2) if total else 0.0
        for cantidad in cantidades
    ]

    resultado = pd.DataFrame(
        {
            "Columna": columnas,
            "Porcentaje de nulos": porcentajes,
            "Cantidad de nulos": cantidades,
        },
        columns=["Columna", "Porcentaje de nulos", "Cantidad de nulos"],
    )
    display(resultado)
    return resultado

# Lectura de datos

In [5]:
archivos = glob.glob('./data/*.json')
#print(archivos)


def _read_flattened(header, path):
    """Read a JSON dataset, print its raw schema under `header`, and return it flattened."""
    raw = spark.read.json(path)
    print(header)
    raw.printSchema()
    flattened = flat_df(raw)
    print()
    return flattened


# Each Yelp file is loaded the same way: read, show schema, flatten structs.
business = _read_flattened("business schema:", "./data/yelp_academic_dataset_business.json")
user = _read_flattened("user schema:", "./data/yelp_academic_dataset_user.json")
review = _read_flattened("review schema:", "./data/yelp_academic_dataset_review.json")
checkin = _read_flattened("checkin schema:", "./data/yelp_academic_dataset_checkin.json")
tip = _read_flattened("tip schema:", "./data/yelp_academic_dataset_tip.json")


business schema:
root
 |-- address: string (nullable = true)
 |-- attributes: struct (nullable = true)
 |    |-- AcceptsInsurance: string (nullable = true)
 |    |-- AgesAllowed: string (nullable = true)
 |    |-- Alcohol: string (nullable = true)
 |    |-- Ambience: string (nullable = true)
 |    |-- BYOB: string (nullable = true)
 |    |-- BYOBCorkage: string (nullable = true)
 |    |-- BestNights: string (nullable = true)
 |    |-- BikeParking: string (nullable = true)
 |    |-- BusinessAcceptsBitcoin: string (nullable = true)
 |    |-- BusinessAcceptsCreditCards: string (nullable = true)
 |    |-- BusinessParking: string (nullable = true)
 |    |-- ByAppointmentOnly: string (nullable = true)
 |    |-- Caters: string (nullable = true)
 |    |-- CoatCheck: string (nullable = true)
 |    |-- Corkage: string (nullable = true)
 |    |-- DietaryRestrictions: string (nullable = true)
 |    |-- DogsAllowed: string (nullable = true)
 |    |-- DriveThru: string (nullable = true)
 |    |-- Go

# Reporte de nulos

In [6]:
# Show the null report of every dataset, each followed by two blank lines.
for _header, _frame in (
    ("business null report:", business),
    ("user null report:", user),
    ("review null report:", review),
    ("checkin null report:", checkin),
    ("tip null report:", tip),
):
    print(_header)
    missing_values_table_spark(_frame)
    print()
    print()


business null report:


Unnamed: 0,Columna,Porcentaje de nulos,Cantidad de nulos
0,address,0.0,0
1,business_id,0.0,0
2,categories,0.25,524
3,city,0.0,0
4,is_open,0.0,0
5,latitude,0.0,0
6,longitude,0.0,0
7,name,0.0,0
8,postal_code,0.0,0
9,review_count,0.0,0




user null report:


Unnamed: 0,Columna,Porcentaje de nulos,Cantidad de nulos
0,average_stars,0.0,0
1,compliment_cool,0.0,0
2,compliment_cute,0.0,0
3,compliment_funny,0.0,0
4,compliment_hot,0.0,0
5,compliment_list,0.0,0
6,compliment_more,0.0,0
7,compliment_note,0.0,0
8,compliment_photos,0.0,0
9,compliment_plain,0.0,0




review null report:


Unnamed: 0,Columna,Porcentaje de nulos,Cantidad de nulos
0,business_id,0.0,0
1,cool,0.0,0
2,date,0.0,0
3,funny,0.0,0
4,review_id,0.0,0
5,stars,0.0,0
6,text,0.0,0
7,useful,0.0,0
8,user_id,0.0,0




checkin null report:


Unnamed: 0,Columna,Porcentaje de nulos,Cantidad de nulos
0,business_id,0.0,0
1,date,0.0,0




tip null report:


Unnamed: 0,Columna,Porcentaje de nulos,Cantidad de nulos
0,business_id,0.0,0
1,compliment_count,0.0,0
2,date,0.0,0
3,text,0.0,0
4,user_id,0.0,0






# Limpieza

Dado que la base **review** será el pilar para construir por lo menos uno de los sistemas de recomendación, se limpia y se deja lista para utilizarla en los modelos.

In [7]:
# Cast `date` to a timestamp (kept as the first column) and derive the review year.
# `year_review` is excluded from the pass-through columns so that re-running
# this cell replaces it instead of appending a duplicate, ambiguous column.
review = (
    review
    .select(
        F.col("date").cast(types.TimestampType()).alias("date"),
        *[F.col(c) for c in review.columns if c not in ("date", "year_review")]
    )
    .select("*", F.year(col("date")).alias("year_review"))
)


# Base para Filtrado Colaborativo

Primero definimos los filtros y las ventanas para los groupby.

In [7]:
# Window over all the reviews a given user wrote for a given business.
w_user_business = Window\
    .partitionBy(["user_id", "business_id"])
# Window over all the reviews written in the same year.
w_year =  Window\
    .partitionBy(col("year_review"))
# Ids of businesses that are still open. The previous version collected every
# business id, so the "open" filter was a no-op.
# NOTE(review): assumes `is_open` is the 0/1 flag of the Yelp business file — confirm.
open_businesses_id = [
    row.business_id
    for row in business
        .filter(col("is_open") == 1)
        .select(col("business_id"))
        .distinct()
        .collect()
]

Ahora generamos la base con los últimos reviews hechos por cada persona.

In [49]:
# Keep only the most recent review per (user, business), restricted to the
# businesses listed in `open_businesses_id`.
# The previous version filtered on `last_review` and `date`, but neither column
# was part of the select, so the query could not be resolved.
last_review = (
    review
    .filter(col("business_id").isin(open_businesses_id))
    .select(
        col("user_id"),
        col("business_id"),
        col("year_review"),
        col("review_id"),
        col("stars"),
        col("date"),
        # Timestamp of the latest review within the (user, business) partition.
        F.max(col("date")).over(w_user_business).alias("last_review"),
    )
    .filter(col("last_review") == col("date"))
    # Helper columns are only needed for the filter above.
    .drop("date", "last_review")
    # Ties on the latest timestamp would still leave several rows per pair.
    .drop_duplicates(subset=["user_id", "business_id"])
)

Finalmente se guardan los datos de la base.

In [None]:
#last_review.write.json("data/CF_base")

# Fixtures

Primero definimos la función que genera la información que alimenta la base de datos de la página web. A estos datos los llamamos fixtures (nomenclatura de Django).

In [None]:
def generar_fixtures(data:pd.DataFrame, name_model : str, name_fixture : str, sample_subset = None, max_index = 0):
    """Write `data` as a Django-style fixture file under ``data/fixtures/``.

    Each row becomes one entry ``{"model": "taller_2.<name_model>",
    "pk": max_index + <row index>, "fields": <row as dict>}``; the entries are
    dumped as a JSON list to ``data/fixtures/<name_file>.json``.

    Parameters
    ----------
    data : pandas.DataFrame
        Rows to export; the index is added to `max_index` to build each ``pk``.
    name_model : str
        Model name, prefixed with ``"taller_2."`` in the fixture.
    name_fixture : str
        Base name of the output file (without extension).
    sample_subset : int, optional
        If truthy, only a random sample of that many rows is exported and the
        file name gets the suffix ``_<sample_subset>``.
    max_index : int, default 0
        Offset added to every row index to build the primary key.

    Returns
    -------
    None
        The result is the file written to disk.
    """
    import os  # local import: `os` is not part of the notebook's import cell

    name_file = name_fixture
    if sample_subset:
        data = data.sample(sample_subset)
        # str(): sample_subset is a number, and str + int raised TypeError here.
        name_file = name_fixture + "_" + str(sample_subset)

    fixture_json = [
        {
            "model": "taller_2." + name_model,
            "pk": max_index + index,
            "fields": row.to_dict(),
        }
        for index, row in data.iterrows()
    ]

    # Create the target folder on first use instead of failing in open().
    os.makedirs('data/fixtures', exist_ok=True)
    with open('data/fixtures/'+name_file+'.json', 'w', encoding='utf-8') as json_file:
        # default=str serialises values json cannot encode natively (e.g. timestamps).
        json.dump(fixture_json, json_file, indent=4, sort_keys=True, default=str, ensure_ascii=False)

Debido a que la información es muy grande se parte en varios bloques.

Generación de Review:

In [None]:
# Primary-key offset for this block of reviews.
# NOTE(review): presumably the sizes of previously generated blocks plus one — confirm.
max_index = 1187872+1274445+1
review_2019 = last_review.filter(col('year_review')==2019).toPandas()

# Export the 2019 pandas frame. The previous call passed the Spark DataFrame
# `review` (which has no iterrows) and ignored the offset computed above.
generar_fixtures(review_2019, "Review", "review_2019", max_index = max_index)

Generación de User:

In [None]:
# The user file is JSON Lines: one JSON object per line.
# Explicit UTF-8 avoids depending on the platform's default encoding, and the
# `with` block already closes the file, so no extra close() is needed.
with open("./data/yelp_academic_dataset_user.json", encoding="utf-8") as f:
    user_records = [json.loads(line) for line in f if line.strip()]
user_pd = pd.DataFrame.from_records(user_records)

generar_fixtures(user_pd, "User", "user_basic_info")

Generación de Business:

In [None]:
# Only the basic descriptive columns of each business go into the fixture.
business_basic = business.select(
    'address', 'business_id', 'categories', 'city',
    'is_open', 'latitude', 'longitude', 'name',
    'postal_code', 'review_count', 'stars', 'state',
)
business_pd = business_basic.toPandas()

generar_fixtures(business_pd, "Business", "businesses_basic_info")