In [None]:
!pip install pyspark




In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, sum, count, avg, desc, weekofyear, hour, explode, split


In [None]:
spark = SparkSession.builder \
    .appName("YelpDataAnalysis") \
    .getOrCreate()


In [None]:
business = spark.read.json("/content/bd_basics/data/yelp_academic_dataset_business.json")
#review = spark.read.json("")
#user = spark.read.json("")
checkin = spark.read.json("/content/bd_basics/data/yelp_academic_dataset_checkin.json")
tip = spark.read.json("/content/bd_basics/data/yelp_academic_dataset_tip.json")


In [None]:
#видалення пропусків
business = business.dropna(subset=["business_id", "name", "categories", "hours"])
checkin = checkin.dropna(subset=["business_id", "date"])
tip = tip.dropna(subset=["business_id", "user_id", "text"])

In [None]:
#видалення дублікатів
business = business.dropDuplicates(["business_id"])
checkin = checkin.dropDuplicates(["business_id"])
tip = tip.dropDuplicates(["business_id", "user_id"])

In [None]:
from pyspark.sql.functions import col, trim

#видалення зайвих пробілів
business = business.withColumn("name", trim(col("name"))) \
                         .withColumn("categories", trim(col("categories")))
tip = tip.withColumn("text", trim(col("text")))


In [None]:
checkin = checkin.withColumn("checkin_date", explode(split(col("date"), ", ")))

result1 = business.join(checkin, "business_id", "inner") \
    .filter(business.categories.contains("Mexican")) \
    .groupBy("business_id", "name") \
    .count() \
    .filter(col("count") > 50)

result1.write.csv("result1.csv", header=True)
result1.show()

In [None]:
result2 = business \
    .filter(col("hours").isNotNull()) \
    .filter(col("hours").getItem("Saturday").isNotNull()) \
    .groupBy("categories", col("hours").getItem("Saturday").alias("Saturday")) \
    .count() \
    .orderBy(desc("count"))

#result2.write.csv("result2.csv", header=True)
result2.show()

In [None]:
result3 = business.join(tip, "business_id", "inner") \
    .withColumn("category", explode(split(col("categories"), ", "))) \
    .groupBy("category") \
    .count() \
    .orderBy(desc("count"))

#result3.write.csv("result3.csv", header=True)
result3.show()


In [None]:
checkin_businesses = checkin.select("business_id").distinct()
tip_businesses = tip.select("business_id").distinct()

result4 = checkin_businesses.union(tip_businesses) \
    .groupBy("business_id") \
    .count() \
    .orderBy(desc("count"))

#result4.write.csv("result4.csv", header=True)
result4.show()

In [None]:
business_with_hours = business.withColumn("work_duration",
    (col("hours.Saturday").substr(7, 2).cast("int") - col("hours.Saturday").substr(1, 2).cast("int"))
)

result5 = business_with_hours.join(tip, "business_id", "inner") \
    .filter((col("work_duration") > 10) & (col("compliment_count") > 100))

#result5.write.csv("result5.csv", header=True)
result5.show()

In [None]:
result6 = business.filter(col("is_open") == 1) \
    .groupBy("city") \
    .count() \
    .orderBy(desc("count"))

#result6.write.csv("result6.csv", header=True)
result6.show()

In [None]:
result7 = business.withColumn("category", explode(split(col("categories"), ", "))) \
    .groupBy("state", "category") \
    .agg(count("business_id").alias("business_count")) \
    .orderBy(desc("business_count"))

#result7.write.csv("result7.csv", header=True)
result7.show()

In [None]:
result8 = business.filter((col("city") == "San Francisco") & (col("stars") > 4.5))

#result8.write.csv("result8.csv", header=True)
result8.show()

In [None]:
result9 = business.filter(col("categories").contains("Burgers")) \
    .orderBy(desc("review_count"))

#result9.write.csv("result9.csv", header=True)
result9.show()


In [None]:
result10 = business \
    .filter(
        (col("attributes").getItem("BusinessParking") == "true") &  # если это строка "true"
        (col("stars") > 4) &
        ((col("hours").getItem("Saturday").isNotNull()) | (col("hours").getItem("Sunday").isNotNull()))
    )

#result10.write.csv("result10.csv", header=True)
result10.show()
