# 1. One time Data Loading -  scripts/data_processing.py

    - Users and businesses are registered entities: we load them once and save them in Parquet format on S3.
    - For the datasets that normally arrive as streams, we load only a portion up front so the system has some base data.
    - Review (for any business), check-in and tip data will be processed as streams.

In [None]:
import sys

from pyspark.sql.functions import expr
from pyspark.sql.functions import split
from pyspark.sql.types import StructType, StructField, DoubleType, LongType
from pyspark.sql.types import TimestampType

from sampling import *
from sentiment import *

# NOTE(review): `global` at module scope is a no-op. The module-level `sample`
# is actually bound by the __main__ block at the bottom of this script.
global sample


# Re-binds baseInputPath to the value provided by one of the star imports
# above (presumably `sampling` or `sentiment` defines it — confirm). If neither
# module defines it, importing this module raises NameError. The __main__
# block later overwrites it with the command-line argument.
baseInputPath = baseInputPath


def process_user_data(spark, sample):
    """
    Return the Yelp user DataFrame, building its Parquet cache on first use.

    If ``<sample_output_path(sample)>/user`` can be read, it is returned as is.
    Otherwise the raw ``yelp_academic_dataset_user.json`` under
    ``baseInputPath`` is loaded and inner-joined with the sampled users when
    sampling is active. It is then cleaned, written as a single partition to
    that path, and read back.

    Cleaning steps:
        - drop ``friends`` (process_friends_data handles that column separately)
        - split the ", "-separated ``elite`` string into an array
        - cast ``yelping_since`` to a timestamp
    """
    try:
        cached = spark.read.parquet(f"{sample_output_path(sample)}/user")
    except Exception:
        pass  # cache missing or unreadable: rebuild it from the raw JSON below
    else:
        return cached

    sampled_users, is_sampled = get_sampled_users_data(spark, sample)
    raw_users = spark.read.json(f'{baseInputPath}/yelp_academic_dataset_user.json')
    if is_sampled:
        raw_users = raw_users.join(sampled_users, on=["user_id"])

    cleaned = (
        raw_users
        .drop("friends")
        .withColumn("elite", split(col("elite"), ", "))
        .withColumn("yelping_since", col("yelping_since").cast("timestamp"))
    )
    cleaned.repartition(1).write.mode("overwrite").parquet(f"{sample_output_path(sample)}/user")

    userDf = spark.read.parquet(f"{sample_output_path(sample)}/user")
    print(f"sample users ares = {userDf.count()}")
    return userDf


def process_business_data(spark, sample):
    """
    Return the Yelp business DataFrame, building its Parquet cache on first use.

    If ``<sample_output_path(sample)>/business`` can be read, it is returned
    directly. Otherwise the raw ``yelp_academic_dataset_business.json`` under
    ``baseInputPath`` is parsed with an explicit schema. That schema reads
    ``attributes`` and ``hours`` as ``map<string, string>``. The result is
    inner-joined with the sampled businesses when sampling is active, and the
    ", "-separated ``categories`` string is split into an array so categories
    can be exploded and aggregated later. Finally it is written as two
    partitions to the cache path and read back.
    """
    try:
        cached = spark.read.parquet(f"{sample_output_path(sample)}/business")
    except Exception:
        pass  # cache missing or unreadable: rebuild it from the raw JSON below
    else:
        return cached

    string_map = MapType(StringType(), StringType())
    json_schema = StructType([
        StructField(field_name, field_type, True)
        for field_name, field_type in (
            ("address", StringType()),
            ("attributes", string_map),
            ("business_id", StringType()),
            ("categories", StringType()),
            ("city", StringType()),
            ("hours", string_map),
            ("is_open", LongType()),
            ("latitude", DoubleType()),
            ("longitude", DoubleType()),
            ("name", StringType()),
            ("postal_code", StringType()),
            ("review_count", LongType()),
            ("stars", DoubleType()),
            ("state", StringType()),
        )
    ])

    sampled_business, is_sampled = get_sampled_business_data(spark, sample)
    raw_business = spark.read.json(f'{baseInputPath}/yelp_academic_dataset_business.json', json_schema)
    if is_sampled:
        raw_business = raw_business.join(sampled_business, on=["business_id"])

    with_category_list = raw_business.withColumn("categories", split(col("categories"), ", "))
    with_category_list.repartition(2).write.mode("overwrite").parquet(f"{sample_output_path(sample)}/business")

    businessDf = spark.read.parquet(f"{sample_output_path(sample)}/business")
    print(f"sample business ares = {businessDf.count()}")
    return businessDf


def process_friends_data(spark, sample):
    """
    Return the user -> friends DataFrame, building its Parquet cache on first use.

    If ``<sample_output_path(sample)>/friends`` can be read, it is returned
    directly. Otherwise the following steps run:
        - load the raw user JSON from ``baseInputPath``
        - join it with the sampled users when sampling is active
        - reduce it to ``user_id`` plus ``friends`` split on ", " into an array
        - print the schema, write two partitions to the cache path, and read
          the result back
    """
    try:
        cached = spark.read.parquet(f"{sample_output_path(sample)}/friends")
    except Exception:
        pass  # cache missing or unreadable: rebuild it from the raw JSON below
    else:
        return cached

    sampled_users, is_sampled = get_sampled_users_data(spark, sample)
    users = spark.read.json(f'{baseInputPath}/yelp_academic_dataset_user.json')
    if is_sampled:
        users = users.join(sampled_users, on=["user_id"])

    friend_lists = users.select(
        "user_id",
        split(col("friends"), ", ").alias("friends"),
    )
    friend_lists.printSchema()
    friend_lists.repartition(2).write.mode("overwrite").parquet(f"{sample_output_path(sample)}/friends")

    friendsDf = spark.read.parquet(f"{sample_output_path(sample)}/friends")
    print("sample friends ares = ", friendsDf.count())
    return friendsDf


# def process_checkin_data(spark, sample):
#     """
#     these are one time historical data load, so system will have some initial data, and later it
#     will come as stream.
#     """
#     try:
#         checkinDf = spark.read.parquet(f"{sample_output_path(sample)}/checkin")
#     except Exception as e:

#         sampled_business, is_sampled = get_sampled_business_data(spark, sample)
#         checkinDf = spark.read.json(f'{baseInputPath}/yelp_academic_dataset_checkin.json')
#         if is_sampled:
#             checkinDf = checkinDf.join(sampled_business, on = ["business_id"])

#         checkinDf = checkinDf \
#             .withColumn("date", expr("transform(split(date, ', '), d -> to_timestamp(d))").cast(ArrayType(TimestampType())))

#         checkinDf.printSchema()

#         checkinDf.repartition(1).write.mode("overwrite").parquet(f"{sample_output_path(sample)}/checkin")
#         checkinDf = spark.read.parquet(f"{sample_output_path(sample)}/checkin")
#         print("sample checkin ares = ", checkinDf.count())
#     return checkinDf


# def process_tip_data(spark, sample):
#     """
#     these are one time historical data load, so system will have some initial data, and later it
#     will come as stream.
#     """
#     try:
#         tipDf = spark.read.parquet(f"{sample_output_path(sample)}/tip")
#     except Exception as e:

#         sampled_users, is_sampled = get_sampled_users_data(spark, sample)
#         tipDf = spark.read.json(f'{baseInputPath}/yelp_academic_dataset_tip.json')
#         if is_sampled:
#             tipDf = tipDf.join(sampled_users, on = ["user_id"])

#         tipDf = tipDf.withColumn("date", col("date").cast("timestamp"))

#         tipDf.repartition(1).write.mode("overwrite").parquet(f"{sample_output_path(sample)}/tip")
#         tipDf = spark.read.parquet(f"{sample_output_path(sample)}/tip")
#         print("sample tip ares = ", tipDf.count())
#     return tipDf


# def process_review_data(spark, sample):
#     """
#     these are one time historical data load, so system will have some initial data, and later it
#     will come as stream.
#     """
#     try:
#         reviewDf = spark.read.parquet(f"{sample_output_path(sample)}/review")
#     except Exception as e:
#         sampled_users, is_sampled = get_sampled_users_data(spark, sample)
#         reviewDf = spark.read.json(f'{baseInputPath}/yelp_academic_dataset_review.json')
#         if is_sampled:
#             reviewDf = reviewDf.join(sampled_users, on = ["user_id"])

#         reviewDf = reviewDf \
#             .withColumn("date", col("date").cast("timestamp")) \
#             .withColumn("sentiment",  get_sentiment(col("text"))) \
#             .withColumn("frequent_words", tokenize_and_get_top_words(col("text")))

#         reviewDf.printSchema()
#         reviewDf.repartition(4).write.mode("overwrite").parquet(f"{sample_output_path(sample)}/review")
#         reviewDf = spark.read.parquet(f"{sample_output_path(sample)}/review")
#         print("sample review ares = ", reviewDf.count())
#     return reviewDf


if __name__ == "__main__":

    if len(sys.argv) != 4:
        print("Usage: data_processing.py <baseInputPath> <baseOutputPath> <sample>")
        # sys.exit, not the site-provided `exit`, which is absent under `python -S`.
        sys.exit(-1)

    # Rebinding these module-level names is what the process_* functions read.
    baseInputPath = sys.argv[1]
    # NOTE(review): baseOutputPath is not referenced in this module; confirm
    # whether sample_output_path (from `sampling`) is expected to pick it up.
    baseOutputPath = sys.argv[2]
    sample = float(sys.argv[3])

    sparkSession = init_spark()
    try:
        # Only the registered entities are loaded here. Review, check-in and
        # tip data arrive through the Kafka streaming job. Their one-time
        # loaders in this module are commented out, so calling them would
        # raise NameError.
        user_df = process_user_data(sparkSession, sample)
        business_df = process_business_data(sparkSession, sample)
        friends_df = process_friends_data(sparkSession, sample)
    finally:
        sparkSession.stop()


# 2. Sentiment analysis and word tokenization - scripts/sentiment.py
    
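    - This module (not a class) contains the natural language processing functions used by the pipeline.
    - They perform sentiment analysis on review text and tokenize it to find the most frequent words.
    - They use the NLTK Python library (the VADER sentiment analyzer, the punkt tokenizer and the English stop-word list).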


In [None]:
from nltk.sentiment import SentimentIntensityAnalyzer
import nltk
from nltk.tokenize import word_tokenize
from nltk.probability import FreqDist
from nltk.corpus import stopwords
from pyspark.sql.types import StringType, ArrayType, MapType
from pyspark.sql.functions import col, udf

# Fetch the NLTK resources used below: the VADER lexicon, the English stop
# words and the punkt tokenizer models. This executes whenever the module is
# imported (once per Python process).
# NOTE(review): presumably every Spark Python worker that runs these UDFs also
# imports this module and repeats the downloads. Confirm, and consider
# pre-installing the NLTK data on the cluster.
nltk.download('vader_lexicon')
nltk.download('stopwords')
nltk.download('punkt')
# Shared analyzer and stop-word set, built once at import time and reused by the UDFs below.
sia = SentimentIntensityAnalyzer()
stop_words = set(stopwords.words('english'))


@udf(StringType())
def get_sentiment(text):
    """
    Spark UDF: label ``text`` as "positive", "negative" or "neutral".

    Uses the ``compound`` score from NLTK's VADER analyzer:
        - ``>= 0.05``           -> "positive"
        - ``<= -0.05``          -> "negative"
        - anything in between   -> "neutral"

    A null/None ``text`` is scored as an empty string, which lands in the
    neutral bucket, instead of raising inside the Python worker and failing
    the whole Spark task.
    """
    # A non-null label is also kept so later create_map(sentiment, ...)
    # aggregations never see a null map key.
    sentiment_score = sia.polarity_scores(text if text is not None else "")["compound"]
    if sentiment_score >= 0.05:
        return "positive"
    elif sentiment_score <= -0.05:
        return "negative"
    else:
        return "neutral"


@udf(ArrayType(StringType()))
def tokenize_and_get_top_words(text, sample_size=0.0001):
    """
    Spark UDF: return the (up to) 10 most frequent meaningful words in ``text``.

    Processing steps:
        - tokenize with NLTK's ``word_tokenize``
        - drop tokens that are not purely alphabetic
        - lower-case the remaining tokens and remove English stop words
        - rank the words with ``FreqDist.most_common``

    A null/None ``text`` yields an empty list instead of raising inside the
    Python worker, which would fail the whole Spark task.

    :param sample_size: unused; kept only so the signature stays backward-compatible.
    """
    if text is None:
        return []
    words = [token.lower() for token in word_tokenize(text) if token.isalpha()]
    meaningful = [word for word in words if word not in stop_words]
    return [word for word, _count in FreqDist(meaningful).most_common(10)]



# 3. Streaming Data Ingestion - scripts/data_ingestion.py

    - This module handles incremental data loading from Kafka brokers.
    - It consumes review, tip and check-in data from Kafka: users and businesses are registered once, but these three datasets keep changing.
    - We read data from Kafka, enrich each batch and then write it in 'append' mode.


    - The `__main__` block is the entry point when the script is executed. It:
    - reads the arguments from the command line,
    - initializes the Spark session,
    - creates the Consumer that reads from the Kafka topics,
    - and calls its methods to read each topic and write the data to Parquet files.

In [2]:
import sys

from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType

from commons import *
from sampling import *
from sentiment import *

# NOTE(review): `global` at module scope has no effect. The module-level
# `sample` and `spark` read by Consumer.process_review_data_df are bound in
# the __main__ block at the bottom of this script.
global sample

class Consumer:
    """
    Consume Yelp event streams from Kafka and persist them as Parquet.

    - ``read_checkins`` and ``read_tips`` parse each Kafka message value as JSON
      and stream the parsed record to ``<output_path>/<topic>/data``. The
      record is kept nested in a ``data`` struct column.
    - ``read_reviews`` parses and flattens the review JSON, then hands every
      micro-batch to ``process_review_data_df``. That handler optionally
      restricts the batch to sampled users, adds ``sentiment`` and
      ``frequent_words`` columns, and appends the batch to
      ``<sample_output_path(sample)>/review``.

    NOTE(review): ``process_review_data_df`` reads the module-level globals
    ``spark`` and ``sample``, which only this script's ``__main__`` block binds.
    """

    def __init__(self, server, output_path):
        """
        :param server: value for the Kafka ``kafka.bootstrap.servers`` option.
        :param output_path: base path under which the tips/checkins streams
            write their data and checkpoint directories.
        """
        self.server = server
        self.output_path = output_path

    def read_from_topic(self, spark, topic):
        """
        Open a streaming DataFrame on ``topic``, starting from the earliest offset.

        Returns the raw Kafka source DataFrame with an extra ``json_string``
        column holding the message ``value`` cast to a string.
        """
        print("reading data from the topic = ", topic, "server = ", self.server)
        df = (
            spark
            .readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", self.server)
            .option("startingOffsets", "earliest")
            .option("subscribe", topic)
            .load()
        )
        print(f"is spark is reading the streams from kafka = {df.isStreaming}")
        df.printSchema()
        return df.withColumn("json_string", col("value").cast(StringType()))

    def write_stream(self, df_result, topic_name):
        """
        Stream ``df_result`` to Parquet in append mode, triggering every 10 seconds.

        Data goes to ``<output_path>/<topic_name>/data`` and the streaming
        checkpoint to ``<output_path>/<topic_name>/checkpoint``. Append mode
        avoids rewriting data that has already been written.

        Blocks until the streaming query terminates, because
        ``awaitTermination`` is called without a timeout.
        """
        writer = df_result \
            .writeStream \
            .outputMode("append") \
            .format("parquet") \
            .option("path", f"{self.output_path}/{topic_name}/data") \
            .option("checkpointLocation", f"{self.output_path}/{topic_name}/checkpoint") \
            .trigger(processingTime="10 seconds") \
            .start()
        writer.awaitTermination()

    def read_checkins(self, spark):
        """
        Stream the ``checkins`` topic to Parquet.

        Each message value is parsed with a (business_id, date) schema.
        NOTE(review): unlike read_reviews, the parsed record is not flattened,
        so the output has a single ``data`` struct column.
        """
        topicName = "checkins"
        stream_df = self.read_from_topic(spark, topicName)
        schema = StructType([
            StructField("business_id", StringType(), True),
            StructField("date", StringType(), True),
        ])
        df_result = stream_df.select(from_json(col("json_string"), schema).alias("data"))
        self.write_stream(df_result, topicName)

    def read_tips(self, spark):
        """
        Stream the ``tips`` topic to Parquet.

        Each message value is parsed with the tip schema below.
        NOTE(review): unlike read_reviews, the parsed record is not flattened,
        so the output has a single ``data`` struct column.
        """
        topicName = "tips"
        stream_df = self.read_from_topic(spark, topicName)
        schema = StructType([
            StructField("business_id", StringType(), True),
            StructField("compliment_count", LongType(), True),
            StructField("date", StringType(), True),
            StructField("text", StringType(), True),
            StructField("user_id", StringType(), True),
        ])
        df_result = stream_df.select(from_json(col("json_string"), schema).alias("data"))
        self.write_stream(df_result, topicName)

    def process_review_data_df(self, review_df, x):
        """
        ``foreachBatch`` handler for the reviews stream.

        :param review_df: the micro-batch of flattened review rows.
        :param x: the micro-batch id passed by ``foreachBatch`` (unused).

        For each batch it:
            - keeps only sampled users when sampling is active,
            - casts ``date`` to a timestamp,
            - adds ``sentiment`` and ``frequent_words`` via the sentiment UDFs,
            - appends the batch to ``<sample_output_path(sample)>/review``.

        Returns the enriched batch DataFrame.
        """
        sampled_users, is_sampled = get_sampled_users_data(spark, sample)
        if is_sampled:
            print("got sampled users ... processing that.")
            sampled_users.printSchema()
            review_df.printSchema()
            review_df = review_df.join(sampled_users, on=["user_id"])

        review_df = review_df \
            .withColumn("date", col("date").cast("timestamp")) \
            .withColumn("sentiment",  get_sentiment(col("text"))) \
            .withColumn("frequent_words", tokenize_and_get_top_words(col("text")))

        review_df.printSchema()
        # Cache the enriched batch so the count after the write reuses it
        # instead of re-running the join and the Python NLP UDFs a second time.
        review_df.persist()
        try:
            review_df.repartition(1).write.mode("append").parquet(f"{sample_output_path(sample)}/review")
            print("sample review ares = ", review_df.count())
        finally:
            review_df.unpersist()
        return review_df

    def read_reviews(self, spark):
        """
        Stream the ``reviews`` topic through ``process_review_data_df``.

        Each message value is parsed with the review schema below and
        flattened (``data.*``). Every micro-batch is then handed to
        ``process_review_data_df``. This method waits at most 30 seconds on
        the query and then returns; the query itself is not stopped here.
        """
        topicName = "reviews"
        schema = StructType([
            StructField("business_id", StringType(), True),
            StructField("cool", LongType(), True),
            StructField("date", StringType(), True),
            StructField("funny", LongType(), True),
            StructField("review_id", StringType(), True),
            StructField("stars", DoubleType(), True),
            StructField("text", StringType(), True),
            StructField("useful", LongType(), True),
            StructField("user_id", StringType(), True),
        ])
        stream_df = self.read_from_topic(spark, topicName)
        df_result = stream_df.select(from_json(col("json_string"), schema).alias("data")).select("data.*")
        writer = df_result.writeStream.outputMode("append").foreachBatch(self.process_review_data_df).start()
        writer.awaitTermination(30)



if __name__ == "__main__":

    # sys.argv[4] is read below, so five entries (script name + 4 args) are required.
    if len(sys.argv) >= 5:
        server = sys.argv[1]
        # NOTE(review): parsed but unused; each read_* method subscribes to its own topic name.
        topic = sys.argv[2]
        output_path = sys.argv[3]
        # `sample` and `spark` are module globals read by Consumer.process_review_data_df.
        sample = float(sys.argv[4])
        spark = init_spark()
        consumer = Consumer(server, output_path)
        try:
            consumer.read_reviews(spark)
            # NOTE(review): write_stream blocks on awaitTermination() with no
            # timeout, so read_checkins is only reached after the tips query stops.
            consumer.read_tips(spark)
            consumer.read_checkins(spark)
        finally:
            spark.stop()
    else:
        print("Invalid number of arguments. "
              "Usage: data_ingestion.py <server> <topic> <output_path> <sample>")
        sys.exit(-1)


# 4. Business Related Attributes - scripts/attributes/business.py

    - We extract the following attributes from businesses:
    
        - The different business categories a user visits, and how frequently.
        - The user's preferences for different categories.
        - Both visit frequency and star ratings are captured.
        
        - Business locations help to identify the user's home location.
        - They also reveal a user's frequent travel destinations, which can be used for travel-related segments.

In [None]:

from pyspark.sql.functions import col, udf
from pyspark.sql.functions import collect_list, avg
from pyspark.sql.functions import explode, create_map
from pyspark.sql.functions import size
from pyspark.sql.types import IntegerType, MapType, FloatType
from pyspark.sql.types import StringType


@udf(MapType(StringType(), IntegerType()))
def merge_maps_array(map_array):
    """
    Spark UDF: fold an array of ``{key: count}`` maps into a single map.

    Values that share a key are summed, so
        [{"Food": 2, "Bars": 1}, {"Food": 3}]
    becomes
        {"Food": 5, "Bars": 1}
    It is applied after ``collect_list`` to turn many single-entry maps into
    one map per group, a uniform shape that is easier to process downstream.
    """
    merged = {}
    all_pairs = (pair for single_map in map_array for pair in single_map.items())
    for key, value in all_pairs:
        merged[key] = merged.get(key, 0) + value
    return merged


def get_customer_category_counts(review_df, business_df):
    """
    Build, per user, a map of business category -> number of reviews.

    Every review counts once for each category of the reviewed business.
    This captures which kinds of businesses the customer is interested in;
    the number of distinct categories visited is the size of the map.

    Returns columns ``user_id`` and ``category_map`` (map<string, int>).
    """
    visits = review_df.select("user_id", "business_id").join(
        business_df.select("business_id", "categories"), on=["business_id"]
    )
    per_category = (
        visits
        .select("user_id", explode("categories").alias("category"))
        .groupBy("user_id", "category")
        .count()
    )
    single_entry_maps = per_category.withColumn(
        "category_map", create_map(col("category"), col("count"))
    )
    collected = single_entry_maps.groupBy("user_id").agg(
        collect_list(col("category_map")).alias("category_map")
    )
    return collected \
        .withColumn("category_map", merge_maps_array(col("category_map"))) \
        .select("user_id", "category_map")


@udf(MapType(StringType(), FloatType()))
def merge_maps_array_float(map_array):
    """
    Spark UDF: fold an array of ``{key: float}`` maps into one map, summing
    the values of keys that appear more than once.
    """
    merged = {}
    for single_map in map_array:
        for key, value in single_map.items():
            running = merged.get(key, 0)
            merged[key] = running + value
    return merged


def get_customer_category_avg_rating(review_df, business_df):
    """
    Build, per user, a map of business category -> average stars given.

    Reviews with a null ``stars`` value are ignored. Each remaining review
    counts toward every category of the reviewed business. The number of
    rated categories is the size of the map.

    Returns columns ``user_id`` and ``category_avg_stars`` (map<string, float>).
    """
    rated_visits = (
        review_df.select("user_id", "business_id", "stars")
        .filter(col("stars").isNotNull())
        .join(business_df.select("business_id", "categories"), on=["business_id"])
    )
    avg_per_category = (
        rated_visits
        .select("user_id", "stars", explode("categories").alias("category"))
        .groupBy("user_id", "category")
        .agg(avg(col("stars")).alias("avg_stars"))
    )
    per_user_maps = (
        avg_per_category
        .withColumn("category_map", create_map(col("category"), col("avg_stars")))
        .groupBy("user_id")
        .agg(collect_list(col("category_map")).alias("category_map"))
    )
    return per_user_maps \
        .withColumn("category_avg_stars", merge_maps_array_float(col("category_map"))) \
        .select("user_id", "category_avg_stars")


@udf(StringType())
def home_city(items):
    """
    Spark UDF: return the most frequent city in ``items`` (the user's home city).

    ``items`` is the per-user list of business cities built with
    ``collect_list``. Ties go to the city that occurs first in the list.

    Returns None when ``items`` is None or empty. ``collect_list`` skips null
    cities, so a user whose reviewed businesses all lack a city produces an
    empty list; ``max([])`` would raise ValueError and fail the Spark task.
    """
    from collections import Counter

    if not items:
        return None
    # Counter is linear; max(items, key=items.count) was quadratic in the list length.
    # most_common(1) keeps first-occurrence order on ties, matching max().
    return Counter(items).most_common(1)[0][0]


@udf(MapType(StringType(), IntegerType()))
def traveling_city(items, home):
    """
    Spark UDF: count the user's visits to every city except ``home``.

    ``items`` is the per-user list of visited cities. The result maps each
    non-home city to the number of times it appears in ``items``. Returns
    None when ``items`` is None.
    """
    if items is None:
        return None
    away_counts = {}
    for city in (c for c in items if c != home):
        away_counts[city] = away_counts.get(city, 0) + 1
    return away_counts


def get_customer_area(review_df, business_df):
    """
    Derive each user's home city, travel map and approximate location.

    Returns one row per user with columns:
        - user_id
        - user_city: the city the user reviewed businesses in most often
        - travel_map: {city: visit_count} for every other city the user reviewed in
        - latitude / longitude: mean coordinates of the businesses in user_city

    Users whose home city cannot be determined, or does not match any
    business city, are dropped by the inner join.
    NOTE(review): cities are matched by name only, so identically named
    cities in different states are merged. Confirm whether state should be
    part of the key.
    """
    # One deterministic coordinate per city. Picking a single business's
    # coordinates would depend on row order and could change between runs.
    city_coords = business_df.groupBy("city").agg(
        avg(col("latitude")).alias("latitude"),
        avg(col("longitude")).alias("longitude"),
    )

    city_lists = (
        review_df.select("user_id", "business_id")
        .join(business_df.select("business_id", "city"), on=["business_id"])
        .groupBy("user_id")
        .agg(collect_list(col("city")).alias("city_freq"))
    )

    return (
        city_lists
        .withColumn("city", home_city(col("city_freq")))
        .withColumn("travel_map", traveling_city(col("city_freq"), col("city")))
        .select("user_id", "city", "travel_map")
        .join(city_coords, on=["city"])
        .select("user_id", "city", "travel_map", "latitude", "longitude")
        .withColumnRenamed("city", "user_city")
    )


# 5. Review Related Attributes - scripts/attributes/review.py

    - Here we capture the sentiment of each review and how often the user expresses each sentiment.
    - We also find which keywords a user uses most, which can surface specific information: for example, even if a user doesn't like a place, they may say "the fries are great here".

In [None]:
from pyspark.sql.functions import col, udf
from pyspark.sql.functions import collect_list
from pyspark.sql.functions import explode, create_map
from pyspark.sql.types import IntegerType, MapType
from pyspark.sql.types import StringType



@udf(MapType(StringType(), IntegerType()))
def merge_maps_array(map_array):
    """
    Spark UDF: merge an array of ``{key: count}`` maps into one map.

    Counts for repeated keys are added together. Used after ``collect_list``
    so each user ends up with a single map that is easy to process later.
    """
    merged = {}
    for single_map in map_array:
        for key, value in single_map.items():
            previous = merged.get(key, 0)
            merged[key] = previous + value
    return merged


def get_sentiments_count(review_df):
    """
    Count, per user, how many reviews carry each sentiment label.

    Returns columns ``user_id`` and ``sentiment_map`` (sentiment -> count).
    The ratio between labels shows how positive or negative the customer
    tends to be.
    """
    counts = (
        review_df.select("user_id", "sentiment")
        .groupBy("user_id", "sentiment")
        .count()
        .withColumnRenamed("count", "sentiment_count")
    )
    one_entry_maps = counts.withColumn(
        "sentiment_map", create_map(col("sentiment"), col("sentiment_count"))
    )
    collected = one_entry_maps.groupBy("user_id").agg(
        collect_list(col("sentiment_map")).alias("sentiment_map")
    )
    return collected.withColumn("sentiment_map", merge_maps_array(col("sentiment_map")))


def most_frequent_words(review_df):
    """
    Count, per user, how often each word appears among their reviews' top words.

    Each review's ``frequent_words`` array is exploded and counted.
    Returns columns ``user_id`` and ``frequent_words_map`` (word -> count).
    """
    word_counts = (
        review_df
        .select("user_id", explode("frequent_words").alias("frequent_words"))
        .groupBy("user_id", "frequent_words")
        .count()
    )
    one_entry_maps = word_counts.withColumn(
        "frequent_words_map", create_map(col("frequent_words"), col("count"))
    )
    collected = one_entry_maps.groupBy("user_id").agg(
        collect_list(col("frequent_words_map")).alias("frequent_words_map")
    )
    return collected.withColumn("frequent_words_map", merge_maps_array(col("frequent_words_map")))



# 6. User Related Attributes  - scripts/attributes/users_agg.py

    - Compute different user-level attributes:
        - avg_rating : average rating given by the customer
        - min_stars : minimum rating given by the customer
        - max_stars : maximum rating given by the customer
        
        - first_seen : when the customer posted their first review
        - last_seen : when the customer posted their last review
        
        - date_diff : number of days between the customer's first and last review (last_seen - first_seen);
                      this can be used for customer retention analysis.
        
        - different_business_count : number of distinct businesses the customer has reviewed;
                                     it captures how the customer uses the platform.
        

In [None]:
from pyspark.sql.functions import col
from pyspark.sql.functions import size


def get_friends_count(friends_df):
    """
    Count each user's friends.

    The friend count indicates how social the customer is and how big their
    social circle is. ``friends`` is the array produced by splitting the
    comma-separated friend list. Two cases are normalized:
        - the literal placeholder "None" is removed before counting, so a
          user with no friends gets 0 rather than 1;
        - a null array yields 0; plain ``size`` would return -1 or null,
          depending on ``spark.sql.legacy.sizeOfNull``.
    NOTE(review): the raw Yelp user dataset stores "None" for users without
    friends; confirm against the input data.

    Returns columns ``user_id`` and ``friends_count``.
    """
    from pyspark.sql.functions import array_remove, when

    real_friends = array_remove(col("friends"), "None")
    friends_count = when(col("friends").isNull(), 0).otherwise(size(real_friends))
    return friends_df.select("user_id", friends_count.alias("friends_count"))


def get_customer_agg_value(spark, review_df):
    """
    Compute per-user review statistics with Spark SQL.

    ``review_df`` is registered as the temporary view ``review``, replacing
    any existing view of that name in the session. It is then aggregated per
    ``user_id`` into:
        - first_seen / last_seen : earliest / latest review ``date``
        - date_diff : days between first_seen and last_seen (DATEDIFF), i.e.
                      how long the customer has been active; useful for
                      retention analysis
        - different_business_count : number of distinct businesses reviewed
        - avg_rating / min_stars / max_stars : statistics of the ``stars`` given

    Returns the aggregated DataFrame, one row per user.
    """
    view_name = "review"
    review_df.createOrReplaceTempView(view_name)
    aggregation_sql = """
                        select 
                            user_id, 
                            min(date) as first_seen, 
                            max(date) as last_seen, 
                            DATEDIFF(max(date), min(date)) as date_diff,
                            count(distinct business_id) as different_business_count,
                            avg(stars) as avg_rating,
                            min(stars) as min_stars,
                            max(stars) as max_stars
                        from review
                        group by user_id 
                    """
    return spark.sql(aggregation_sql)



# 7. Feature Aggregation - scripts/feature_aggregator.py 

    - It merges all the attributes into a single table with many dimensions.
    - Each dimension is independent of the others.
    
    - Complex data types like Map capture multiple dimensions per user. For example,
        a user may give 4.3 stars on average across their Yelp reviews, but
        category_avg_stars = {category -> rating} captures this at a more granular level, and the category can be treated as another dimension.
        
        
            

In [6]:
import sys

from data_processing import *

from attributes.users_agg import *
from attributes.business import *
from attributes.reviews import *

from storage import *


def merge_attributes(spark, sample):
    """
    Build the combined per-user feature table (the "data cube") and persist it.

    Steps:
        - load users, businesses and friends with the data_processing loaders
        - read the streamed reviews from ``<sample_output_path(sample)>/review``,
          where the ingestion consumer appends enriched review batches
        - derive every attribute DataFrame
        - inner-join them on ``user_id``; the user is the key of the profile
        - write the result to ``<sample_output_path(sample)>/combined``

    Because every join is an inner join, only users present in all attribute
    tables are kept.

    Returns the combined table as re-read from Parquet.
    """
    user_df = process_user_data(spark, sample)
    business_df = process_business_data(spark, sample)
    friends_df = process_friends_data(spark, sample)
    # Reviews, with their sentiment / frequent_words columns, are produced by
    # the streaming consumer. The one-time check-in/tip/review loaders in
    # data_processing are commented out, so they cannot be called here.
    # Check-ins and tips are not used by any attribute below.
    review_df = spark.read.parquet(f"{sample_output_path(sample)}/review")

    avg_catg_start_df = get_customer_category_avg_rating(review_df, business_df)
    customer_area_df = get_customer_area(review_df, business_df)
    user_agg_df = get_customer_agg_value(spark, review_df)
    user_category_df = get_customer_category_counts(review_df, business_df)
    friends_count_df = get_friends_count(friends_df)
    sentiment_count_df = get_sentiments_count(review_df)
    frequent_words_df = most_frequent_words(review_df)

    complete_user_df = user_df \
        .join(user_agg_df, on=["user_id"]) \
        .join(user_category_df, on=["user_id"]) \
        .join(friends_count_df, on=["user_id"]) \
        .join(sentiment_count_df, on=["user_id"]) \
        .join(frequent_words_df, on=["user_id"]) \
        .join(customer_area_df, on=["user_id"]) \
        .join(avg_catg_start_df, on=["user_id"])

    complete_user_df.printSchema()

    # Write first, then count the persisted copy. Counting the unpersisted
    # join would execute the whole pipeline, Python UDFs included, a second time.
    output_path = f"{sample_output_path(sample)}/combined"
    complete_user_df.repartition(4).write.mode("overwrite").parquet(output_path)
    combined_df = spark.read.parquet(output_path)
    print("total counts are = ", combined_df.count(), "  user counts = ", user_df.count())
    return combined_df


if __name__ == "__main__":
    if len(sys.argv) != 2:
        print("Usage: feature_aggregator.py <sample>")
        # sys.exit, not the site-provided `exit`, which is absent under `python -S`.
        sys.exit(-1)

    sample = float(sys.argv[1])
    sparkSession = init_spark()
    try:
        merged_df = merge_attributes(sparkSession, sample)
        save_spark_df_to_db(merged_df, "users")
    finally:
        # Release the Spark session even if merging or saving fails.
        sparkSession.stop()


## DEMO

### Streamlit App Link : https://yelp-customer-segmentation.streamlit.app/