# Initialization

In [4]:
import pyspark
import numpy as np
import pandas as pd
import pandas.api.types

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf #needed for UDF
from pyspark.sql.types import * #needed for all the sql types
from pyspark.sql import functions as F
from pyspark.sql.functions import lit, col, split, explode
from datetime import date, timedelta
from pyspark.sql.window import Window

import datetime
import operator

In [5]:
import matplotlib.pyplot as plt
import seaborn as sns

%matplotlib inline

# Spark Session

In [6]:
## Create a spark session
spark = SparkSession \
    .builder \
    .appName("yelp_development") \
    .getOrCreate()

In [4]:
## Import dependencies libraries
spark \
    .sparkContext \
    .addPyFile("D:/EB5001/dependencies/yelp_ETL.py")
    
import yelp_ETL as yetl

In [8]:
## Path to data source
INPUT_PATH = "D:/EB5001/input/"
OUTPUT_PATH = "D:/EB5001/output/"

# Yelp Business

In [6]:
df_business = yetl.import_yelp_data(spark, 'business', INPUT_PATH)

In [7]:
df_business = yetl.transform_business(df_business)

# Yelp Tip

In [8]:
df_tip = yetl.import_yelp_data(spark, 'tip', INPUT_PATH)

# Business Level Data Preparation

## Tip Business

In [9]:
df_tip_business = yetl.transform_tip_business(df_tip, df_business)

## Data Loading

In [10]:
# yetl.load_yelp_data_daily(df_tip_business, 'tip', OUTPUT_PATH)

## Aggregation

In [11]:
df_tip_combined = yetl.import_yelp_yearly_data(spark, 'tip', '2017', OUTPUT_PATH)
# df_tip_combined = yetl.transform_tip_business_yearly(df_tip_combined)

In [12]:
# df_tip_combined.show()

In [13]:
# yetl.load_yelp_data_yearly(df_tip_combined, 'tip', '2017', OUTPUT_PATH)

## Extract

In [9]:
df_tip_2017 = spark.read.parquet(OUTPUT_PATH + 'review/2017_parquet/*')

In [10]:
df_tip_2017.printSchema()

root
 |-- date: date (nullable = true)
 |-- hour: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- avg_stars: double (nullable = true)
 |-- total_review_count: long (nullable = true)
 |-- city: string (nullable = true)
 |-- address: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- categories: string (nullable = true)
 |-- cool: long (nullable = true)
 |-- funny: long (nullable = true)
 |-- useful: long (nullable = true)
 |-- stars: double (nullable = true)
 |-- review_text: string (nullable = true)



In [19]:
df_tip_2017.show()

+----------+----+--------------------+---------+------------------+----------+--------------------+-------------+---------------+--------------------+----------------+--------------------+
|      date|hour|                name|avg_stars|total_review_count|      city|             address|     latitude|      longitude|          categories|compliment_count|            tip_text|
+----------+----+--------------------+---------+------------------+----------+--------------------+-------------+---------------+--------------------+----------------+--------------------+
|2017-05-17|   3|Stephano's Greek ...|      4.0|               295| Las Vegas|6115 S Fort Apach...|   36.0771648|   -115.2986213|            Lebanese|               0|I love the food h...|
|2017-05-17|   3|Stephano's Greek ...|      4.0|               295| Las Vegas|6115 S Fort Apach...|   36.0771648|   -115.2986213|       Mediterranean|               0|I love the food h...|
|2017-05-17|   3|Stephano's Greek ...|      4.0|       