# Graph Analysis

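## If you have already worked through "graphs-setup-and-plot.ipynb", you can skip parts 2-4 and move on to part 5 (part 1 must still be run: it installs the dependencies, imports the libraries, and creates the Spark session that part 5 uses)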

## 1. Project Setup & Environment

In [None]:
# Install & configure dependencies
!pip install pyspark
!pip install graphframes



In [None]:
# standard library
import os

# spark functionality
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.functions import (  # NOTE: this `sum` shadows Python's builtin sum
    col, sum, avg, count, lit, year, month, split, explode, size, regexp_replace,
    trim, row_number, rand, broadcast, concat, substring,
)
from pyspark.sql.window import Window
from pyspark.ml.feature import StringIndexer # encoding strings
from pyspark.ml.evaluation import RegressionEvaluator # for evaluation
from graphframes import GraphFrame # for graphs

# for visualizations
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import networkx as nx

from networkx.algorithms import bipartite # for bipartite

# Google Colab only: Drive access for the parquet files read in part 5
from google.colab import drive

In [None]:
# Initialize a SparkSession with GraphFrames support.
# The GraphFrames jar is published per Spark minor version and Scala version
# (the "-sparkX.Y-s_2.12" suffix), so it must match the installed PySpark; keep
# this coordinate in sync with the PySpark version pinned in the install cell.
GRAPHFRAMES_PACKAGE = "graphframes:graphframes:0.8.3-spark3.5-s_2.12"
GRAPHFRAMES_SPARK_VERSION = "3.5"

spark = (
    SparkSession.builder
    .appName("YelpEDA_GraphAnalysis")
    .config("spark.jars.packages", GRAPHFRAMES_PACKAGE)
    .config("spark.driver.memory", "10g")
    .config("spark.executor.memory", "10g")
    .getOrCreate()
)

# Fail fast on a Spark/GraphFrames version mismatch rather than later, inside
# the first graph algorithm call.
assert spark.version.startswith(GRAPHFRAMES_SPARK_VERSION + "."), (
    f"GraphFrames jar targets Spark {GRAPHFRAMES_SPARK_VERSION}.x "
    f"but Spark {spark.version} is running"
)

# verify the session
spark

## 2. Data Ingestion

In [None]:
# Load only the columns needed for graph construction.
# The default directory is the Kaggle Yelp dataset mount; set the YELP_DATA_DIR
# environment variable to read the same JSON files from somewhere else
# (e.g. a Colab/Drive copy).
YELP_DATA_DIR = os.environ.get("YELP_DATA_DIR", "/kaggle/input/yelp-dataset")

business_df = (
    spark.read.json(f"{YELP_DATA_DIR}/yelp_academic_dataset_business.json")
    .select("business_id", "name", "categories")
)
user_df = (
    spark.read.json(f"{YELP_DATA_DIR}/yelp_academic_dataset_user.json")
    .select("user_id", "name", "review_count", "average_stars", "friends")
)
review_df = (
    spark.read.json(f"{YELP_DATA_DIR}/yelp_academic_dataset_review.json")
    .select("user_id", "business_id", "stars", "date")
)

AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/kaggle/input/yelp-dataset/yelp_academic_dataset_business.json.

## 3. Graph Creation

In [None]:
# create vertices for the graphs
# (GraphFrames expects the vertex key column to be named `id`)

# User vertices for friendship graph
user_vertices = user_df.selectExpr(
    "user_id as id",
    "name",
    "review_count",
    "average_stars",
)

# Business vertices for review graph
business_vertices = business_df.selectExpr(
    "business_id as id",
    "name",
    "categories",
)

In [None]:
# combine vertices for the bipartite graph
# unionByName needs both sides to expose the same columns, so each side is padded
# with typed null columns for the attributes only the other side carries.
u_verts = user_vertices.select("*", lit(None).cast("string").alias("categories"))
b_verts = business_vertices.select(
    "*",
    lit(None).cast("int").alias("review_count"),
    lit(None).cast("double").alias("average_stars"),
)
bipartite_vertices = u_verts.unionByName(b_verts)

In [None]:
# create edges:

# Friend edges (User -> User): `friends` is a comma-separated string of user ids;
# split it (tolerating spaces after commas) and emit one row per listed friend.
friend_edges = (
    user_df
    .select(
        col("user_id").alias("src"),
        explode(split(trim(col("friends")), ", *")).alias("dst"),
    )
    # Drop empty tokens and the literal "None" token; otherwise each becomes an
    # edge to a vertex id that matches no user.
    # NOTE(review): "None" is assumed to be how the Yelp user dump marks users
    # without friends — confirm against the raw data.
    .filter(~col("dst").isin("", "None"))
)

# review edges (User -> Business)
review_edges = review_df.selectExpr(
    "user_id as src",
    "business_id as dst",
    "stars as rating",
    "date",
)

In [None]:
# build the graphs
# NOTE(review): g_review's vertex table holds only businesses, while the `src`
# endpoints of review_edges are user ids, so those endpoints have no matching
# vertex. g_bi (users + businesses) covers both ends of every review edge.
graph_specs = (
    (user_vertices, friend_edges),       # User <-> User friendships
    (business_vertices, review_edges),   # User -> Business reviews, business vertices only
    (bipartite_vertices, review_edges),  # User -> Business reviews, both vertex types
)
g_friend, g_review, g_bi = (GraphFrame(v, e) for v, e in graph_specs)

## 4. Graph Visualization

### 4.1 Plot a Small Subset of the User-Friends Graph

In [None]:
# Sample a small, readable piece of the friendship graph for plotting:
# 10 low-degree seed users, up to 10 randomly chosen friends of each, and every
# friend edge among that combined vertex set.
FRIEND_SAMPLE_SEED = 2   # seeds rand() so the sampled friends are reproducible
N_SEED_USERS = 10
MAX_SEED_FRIENDS = 10    # seeds must have at most this many friend edges
FRIENDS_PER_SEED = 10

# pick 10 low-degree users as seeds for better visuals
low_degree = (
    friend_edges.groupBy("src")
    .count()
    .withColumnRenamed("count", "friend_count")
    .filter(col("friend_count") <= MAX_SEED_FRIENDS)
)

seed_users = low_degree.select("src").distinct().limit(N_SEED_USERS)

# for each seed, take up to 10 of their friends at random
one_hop = friend_edges.join(seed_users, "src")
w = Window.partitionBy("src").orderBy(rand(FRIEND_SAMPLE_SEED))
sample_one_hop = (
    one_hop
    .withColumn("rn", row_number().over(w))
    .filter(col("rn") <= FRIENDS_PER_SEED)
    .select("src", "dst")
    # limit() above has no ordering, so re-evaluating the plan may pick different
    # seeds; cache so the vertex set and edge set below come from one sample
    .cache()
)

# build the induced vertex set (seeds + those friends)
vertices = (
    sample_one_hop.select(col("src").alias("id"))
    .union(sample_one_hop.select(col("dst").alias("id")))
    .distinct()
)

# pull in any edges among that vertex set (including friend <-> friend)
sub_edges = (
    friend_edges
    .join(vertices.withColumnRenamed("id", "v1"), friend_edges.src == col("v1"))
    .join(vertices.withColumnRenamed("id", "v2"), friend_edges.dst == col("v2"))
    .select("src", "dst")
)

# map IDs -> display labels. Display names are not unique, and networkx keys
# nodes by label, so suffix the first 4 characters of the user id; otherwise
# different users with the same name collapse into a single node.
user_names = user_df.select(
    col("user_id").alias("id"),
    concat(col("name"), lit(" ("), substring(col("user_id"), 1, 4), lit(")")).alias("name"),
)
named = (
    sub_edges
    .join(user_names, sub_edges.src == user_names.id)
    .select("src", "dst", col("name").alias("src_name"))
    .join(
        user_names.withColumnRenamed("id", "dst_id")
                  .withColumnRenamed("name", "dst_name"),
        sub_edges.dst == col("dst_id"),
    )
    .select("src_name", "dst_name")
)

# collect just this tiny subgraph into an undirected networkx graph
pdf = named.toPandas()
Gf = nx.from_pandas_edgelist(pdf, "src_name", "dst_name")

In [None]:
# Draw the sampled friendship subgraph using a seeded (reproducible) spring layout.
pos = nx.spring_layout(Gf, k=0.8, iterations=10, seed=2)

fig, ax = plt.subplots(figsize=(12, 6))
nx.draw_networkx_nodes(Gf, pos, node_size=900, ax=ax)
nx.draw_networkx_edges(Gf, pos, alpha=0.7, ax=ax)
nx.draw_networkx_labels(Gf, pos, font_size=9, ax=ax)
ax.set_title("10 users + 10 Friends Each + Mutual Edges")
ax.set_axis_off()
plt.show()

### 4.2 Bipartite Plot for Users and Businesses

In [None]:
# Sample a small user-business bipartite subgraph for plotting:
# 5 random businesses with at least 2 reviews, up to 5 random reviewers of each,
# and every review edge among those users and businesses.
BIPARTITE_SAMPLE_SEED = 2   # seeds rand() so the sample is reproducible
N_BUSINESSES = 5
MIN_BUSINESS_REVIEWS = 2
REVIEWERS_PER_BUSINESS = 5

# pick 5 businesses with at least 2 reviews
biz_pop = (
    review_df.groupBy("business_id").count()
    .filter(col("count") >= MIN_BUSINESS_REVIEWS)
    .orderBy(rand(BIPARTITE_SAMPLE_SEED))
    .select(col("business_id"))
    .limit(N_BUSINESSES)
)

# for each business, sample up to 5 users who reviewed it
rev = review_edges.select(col("src").alias("user_id"), col("dst").alias("business_id"))
biz_window = Window.partitionBy("business_id").orderBy(rand(BIPARTITE_SAMPLE_SEED))
sample_ub = (
    rev.join(biz_pop, "business_id")
    .withColumn("rn", row_number().over(biz_window))
    .filter(col("rn") <= REVIEWERS_PER_BUSINESS)
    .select("user_id", "business_id")
    .distinct()
    # cache so the user side and the business side of the vertex set below are
    # taken from one materialized sample
    .cache()
)

# build vertex set (users + businesses)
vb = (
    sample_ub.select(col("user_id").alias("id"))
    .union(sample_ub.select(col("business_id").alias("id")))
    .distinct()
)

# pull in every review edge among that vertex set (a sampled user may also have
# reviewed another sampled business). distinct() turns repeat reviews of one
# business by one user into a single edge, so the stars join below can't fan out.
sub_edges = (
    rev.join(vb.withColumnRenamed("id", "u"), rev.user_id == col("u"))
    .join(vb.withColumnRenamed("id", "b"), rev.business_id == col("b"))
    .select(col("user_id"), col("business_id"))
    .distinct()
)

# attach the star rating of each sampled review
small_rev = (
    review_df
    .join(sub_edges, ["user_id", "business_id"])
    .select("user_id", "business_id", "stars")
)

# map IDs -> display labels in Spark. Names are not unique (users can share a
# name, chain businesses share one), and networkx keys nodes by label, so each
# label gets the first 4 characters of its id as a suffix.
named = (
    small_rev
    .join(
        user_df.select(
            "user_id",
            concat(col("name"), lit(" ("), substring(col("user_id"), 1, 4), lit(")"))
            .alias("user_name"),
        ),
        "user_id",
    )
    .join(
        business_df.select(
            "business_id",
            concat(col("name"), lit(" ("), substring(col("business_id"), 1, 4), lit(")"))
            .alias("name"),
        ),
        "business_id",
    )
    .select("user_name", "name", "stars")
)

# now collect
pdf = named.toPandas()

# users form partition 0, businesses partition 1
B = nx.Graph()
B.add_nodes_from(pdf["user_name"].unique(), bipartite=0)
B.add_nodes_from(pdf["name"].unique(), bipartite=1)
B.add_edges_from(list(zip(pdf["user_name"], pdf["name"])))

In [None]:
# Two-column bipartite layout: users on the left (x=0), businesses on the right
# (x=1), each column spread evenly from y=1 down to y=0; edges labelled with stars.
users = pdf['user_name'].unique().tolist()
business = pdf['name'].unique().tolist()


def _column_positions(nodes, x):
    """Place ``nodes`` in a vertical column at ``x``, evenly spaced from y=1 to y=0.

    The spacing denominator ``len(nodes) - 1`` is clamped to at least 1 so a
    one-node column (single node at y=1) does not raise ZeroDivisionError.
    """
    step = max(len(nodes) - 1, 1)
    return {node: (x, 1 - idx / step) for idx, node in enumerate(nodes)}


pos = _column_positions(users, 0)
pos.update(_column_positions(business, 1))

edge_labels = {
    # row['name'] instead of row.name: on an iterrows() row, .name is the index label
    (row.user_name, row['name']): row.stars
    for _, row in pdf.iterrows()
}

fig, ax = plt.subplots(figsize=(18, 12))
nx.draw_networkx_nodes(B, pos, nodelist=users, node_size=300, ax=ax)
nx.draw_networkx_nodes(B, pos, nodelist=business, node_shape='s', node_size=2000, ax=ax)
nx.draw_networkx_edges(B, pos, alpha=0.6, ax=ax)
nx.draw_networkx_labels(B, pos, font_size=9, ax=ax)
nx.draw_networkx_edge_labels(B, pos, edge_labels=edge_labels, font_size=8, ax=ax)
ax.set_title("Bipartite plot")
ax.set_axis_off()
plt.show()

## 5. Graph Algorithms

## PageRank

In [None]:
# Mount Google Drive at /content/drive so the parquet files under MyDrive can be read.
from google.colab import drive

DRIVE_MOUNT_POINT = '/content/drive'
drive.mount(DRIVE_MOUNT_POINT)

In [None]:
# Load the user table stored as parquet on the mounted Drive.
USER_PARQUET_PATH = "/content/drive/MyDrive/Big_Data_Analytics_Good/user.parquet"
user = spark.read.parquet(USER_PARQUET_PATH)

In [None]:
# Inspect the column names and types of the loaded user table.
user.printSchema()

In [None]:
# Preview the first 5 rows with full (untruncated) column values.
user.show(n=5, truncate=False)

In [None]:
# One vertex per distinct user, with the key column renamed to `id` as GraphFrames expects.
vertices = user.select(col("user_id").alias("id")).distinct()

In [None]:
# Friendship edges for the full user table: split the comma-separated `friends`
# string into individual ids, trim surrounding spaces, and rename the columns to
# GraphFrames' src/dst.
edges = (
    user.select("user_id", "friends")
    .withColumn("friend", explode(split("friends", ",")))
    .withColumn("friend", trim("friend"))
    .selectExpr("user_id as src", "friend as dst")
    # Drop empty tokens and the literal "None" token; otherwise each becomes an
    # edge to a vertex id that matches no user.
    # NOTE(review): "None" is assumed to mark users with no friends — verify.
    .filter("dst != '' AND dst != 'None'")
)

In [None]:
# Friendship graph over the whole user table (the algorithms below run on the
# smaller sample_g instead).
g = GraphFrame(v=vertices, e=edges)

In [None]:
# Row count of the full user table, recorded from a previous run: 1987897.
# Left commented out because user.count() takes a while; uncomment to re-check.
# user.count()

### Taking a 50k-user sample, since ~2M rows are too many even for Colab (not a random sample: the 50k users with the most friends among those with 5-100 friends)

In [None]:
# Sanity check that user_id is a unique key: an empty result means no duplicates.
duplicates = (
    user.groupBy("user_id")
    .count()
    .filter(col("count") > 1)
)

duplicates.show()

In [None]:
# Select 50k users with 5-100 friends, most-connected first, and build the
# friendship graph induced on just those users.
SAMPLE_SIZE = 50000
MIN_FRIENDS = 5
MAX_FRIENDS = 100

sampled_users = (
    user
    .withColumn("friend_list", split("friends", ","))
    .withColumn("degree", size("friend_list"))
    .filter(col("degree").between(MIN_FRIENDS, MAX_FRIENDS))
    # user_id breaks degree ties. This plan is evaluated separately for the
    # vertices and for the edges below, and without a total order limit() is not
    # guaranteed to keep the same 50k users each time, which would leave edges
    # whose src is missing from the vertex set.
    .orderBy(col("degree").desc(), col("user_id"))
    .limit(SAMPLE_SIZE)
    .select("user_id", "friends")
)

# sampled vertices
sample_vertices = sampled_users.selectExpr("user_id as id")

# creates edges from sampled users
sample_edges = (
    sampled_users
    .withColumn("friend", explode(split("friends", ",")))
    .withColumn("friend", trim("friend"))
    .selectExpr("user_id as src", "friend as dst")
    .filter("dst != ''")
)

# keep only the edges whose dst is also a sampled user (src comes from the sample already)
sample_edges = sample_edges.join(
    broadcast(sample_vertices),
    sample_edges["dst"] == sample_vertices["id"],
    "inner",
).select("src", "dst")

# repartition for parallel processing, and cache both because every graph
# algorithm below reads them again
sample_edges = sample_edges.repartition(200, "src")
sample_vertices = sample_vertices.repartition(200)
sample_edges.cache()
sample_vertices.cache()

# creating the sample graph
sample_g = GraphFrame(sample_vertices, sample_edges)

In [None]:
# PageRank on the sampled friendship graph.
# NOTE(review): 2 iterations is very few, so scores are likely not converged.
PAGERANK_RESET_PROBABILITY = 0.15
PAGERANK_MAX_ITER = 2
pagerank_result = sample_g.pageRank(
    resetProbability=PAGERANK_RESET_PROBABILITY,
    maxIter=PAGERANK_MAX_ITER,
)

In [None]:
# Top-5 users by PageRank score.
top_pagerank = pagerank_result.vertices.orderBy(col("pagerank").desc()).select("id", "pagerank")
top_pagerank.show(5, truncate=False)

In [None]:
# Community detection via label propagation; each vertex gets a `label`.
LPA_MAX_ITER = 5
communities = sample_g.labelPropagation(maxIter=LPA_MAX_ITER)

In [None]:
# Show vertices grouped by community label (first 20 rows).
communities.orderBy("label").select("id", "label").show()

In [None]:
# connectedComponents (next cell) needs a Spark checkpoint directory to be set.
CHECKPOINT_DIR = "/tmp/graphframes-checkpoint"
spark.sparkContext.setCheckpointDir(CHECKPOINT_DIR)

In [None]:
# Connected components of the sampled graph (GraphFrames' default algorithm, spelled out).
connected_components = sample_g.connectedComponents(algorithm="graphframes")

In [None]:
# Preview the component id assigned to a few vertices.
connected_components.select(col("id"), col("component")).show(n=5)

In [None]:
# Preview a few sampled vertex ids (full length, not truncated).
sample_vertices.select(col("id")).show(n=5, truncate=False)

In [None]:
# Shortest-path hop counts from every sampled user to two landmark users.
# The landmark ids must be vertices of sample_g, or they cannot appear in
# `distances`; check up front instead of silently getting empty maps.
LANDMARK_IDS = ["vaQPHmzVAxOGNEtG3YLqjQ", "DdKfYau2SVM9boInOM4tYg"]

present_landmarks = {
    row.id for row in sample_vertices.filter(col("id").isin(LANDMARK_IDS)).collect()
}
missing_landmarks = sorted(set(LANDMARK_IDS) - present_landmarks)
assert not missing_landmarks, f"landmarks not in the sampled graph: {missing_landmarks}"

shortest_paths = sample_g.shortestPaths(landmarks=LANDMARK_IDS)

In [None]:
# Per-vertex map of landmark id -> hop distance.
shortest_paths.select(col("id"), col("distances")).show(n=5)

In [None]:
# Two-hop path motif: v -> u -> w over directed friend edges.
# NOTE(review): when two users list each other as friends, both directed edges
# exist, so many matches will have v == w; filter on `v.id != w.id` if unwanted.
TWO_HOP_PATTERN = "(v)-[e]->(u); (u)-[f]->(w)"
motifs = sample_g.find(TWO_HOP_PATTERN)

In [None]:
# Preview a few motif matches (one struct column per named vertex/edge).
motifs.show(n=5)

In [None]:
# Breadth-first search for the shortest directed friend path between two users.
# The endpoints must be real user_ids in sample_g: a placeholder such as 'user1'
# matches no vertex, and the search then always returns an empty result.
# These are the same two ids used as shortest-path landmarks above.
# NOTE(review): confirm both ids are in sample_g (see the sample_vertices preview).
BFS_FROM_ID = "vaQPHmzVAxOGNEtG3YLqjQ"
BFS_TO_ID = "DdKfYau2SVM9boInOM4tYg"

bfs_result = sample_g.bfs(
    fromExpr=f"id = '{BFS_FROM_ID}'",
    toExpr=f"id = '{BFS_TO_ID}'",
)

In [None]:
# Show up to 10 BFS paths (each row lists the vertices and edges along one path).
bfs_result.show(n=10)