# Venmo - Social Network Analytics


In [47]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null #Install java
!wget -q https://archive.apache.org/dist/spark/spark-3.4.1/spark-3.4.1-bin-hadoop3.tgz ## Install Apache Spark
!tar xf spark-3.4.1-bin-hadoop3.tgz
!pip install -q findspark
!pip install pyspark



In [48]:
import os
from pyspark.sql import SparkSession
# Tell pyspark/findspark where the JDK and the unpacked Spark distribution live (Google Colab paths).
os.environ.update(
    JAVA_HOME="/usr/lib/jvm/java-8-openjdk-amd64",
    SPARK_HOME="/content/spark-3.4.1-bin-hadoop3",
)

In [59]:
import findspark
# Make the Spark distribution at $SPARK_HOME (set in an earlier cell) importable.
# NOTE(review): pyspark was already imported in an earlier cell (`from pyspark.sql import SparkSession`),
# so this call may not change which pyspark package is actually in use — confirm.
findspark.init()
from pyspark.sql import SparkSession
# from pyspark import SparkConf, SparkContext
# from datetime import datetime, date, timedelta
# from dateutil import relativedelta
# from pyspark.sql import SQLContext, Row
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import DataFrame
from pyspark.sql.functions import *
# from pyspark.sql.functions import to_timestamp, to_date
from pyspark.sql import functions as F
from pyspark.sql.functions import col, explode, array
from pyspark.sql import DataFrame
from pyspark.sql import SparkSession
import random
import networkx as nx

In [60]:
# Single SparkSession for the notebook; getOrCreate() returns the existing one when the cell is re-run.
spark = SparkSession.builder.appName("Venmo").getOrCreate()

In [61]:
from google.colab import drive
# Mount Google Drive at /content/drive.
# NOTE(review): nothing visible in this notebook reads from /content/drive — the parquet file is
# uploaded via files.upload() and read by relative path — so this mount may be unnecessary.
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [14]:
#To upload files, use the following command
from google.colab import files
# Interactive upload of the sample file. It is skipped when the file is already present, so
# Restart & Run All does not stall on the file picker. Assigning the result keeps the returned
# {filename: contents} dict (raw file bytes) from being echoed as cell output.
if not os.path.exists("VenmoSample.snappy.parquet"):
    uploaded = files.upload()

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


Saving VenmoSample.snappy.parquet to VenmoSample.snappy.parquet


In [62]:
# Load the uploaded Venmo sample into a Spark DataFrame.
venmo_df = spark.read.format("parquet").load("VenmoSample.snappy.parquet")

In [53]:
# Show the column names, types, and nullability of the loaded DataFrame.
venmo_df.printSchema()

root
 |-- user1: integer (nullable = true)
 |-- user2: integer (nullable = true)
 |-- transaction_type: string (nullable = true)
 |-- datetime: timestamp (nullable = true)
 |-- description: string (nullable = true)
 |-- is_business: boolean (nullable = true)
 |-- story_id: string (nullable = true)



In [54]:
# Preview a few transactions (Spark truncates long cell values by default).
venmo_df.show(n=5)

+-------+-------+----------------+-------------------+------------+-----------+--------------------+
|  user1|  user2|transaction_type|           datetime| description|is_business|            story_id|
+-------+-------+----------------+-------------------+------------+-----------+--------------------+
|1218774|1528945|         payment|2015-11-27 10:48:19|        Uber|      false|5657c473cd03c9af2...|
|5109483|4782303|         payment|2015-06-17 11:37:04|      Costco|      false|5580f9702b64f70ab...|
|4322148|3392963|         payment|2015-06-19 07:05:31|Sweaty balls|      false|55835ccb1a624b14a...|
| 469894|1333620|          charge|2016-06-03 23:34:13|          🎥|      false|5751b185cd03c9af2...|
|2960727|3442373|         payment|2016-05-29 23:23:42|           ⚡|      false|574b178ecd03c9af2...|
+-------+-------+----------------+-------------------+------------+-----------+--------------------+
only showing top 5 rows



**Algorithm Explanation:**

Step 1: Finding direct friends

For User 1, find the list of their direct friends (User 2): every user they have transacted with, in either direction (paid or been paid by).


Step 2: Finding friends of friends

For each direct friend of User 1 (each User 2), find that friend's direct friends.

Then remove User 1 and all of User 1's direct friends from the combined list; what remains is the list of "friends of friends".

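**Computational Complexity:**

Let $n$ be the number of transactions (rows) and $m$ the number of direct friends of User 1.

Step 1: $O(n)$, because it scans all rows once to collect User 1's counterparties.

Step 2: $O(n \cdot m)$, because it scans all rows once for each of the $m$ direct friends.

Overall: $O(n + n \cdot m) = O(n \cdot m)$

(The implementation below instead builds an adjacency graph once, in $O(n)$. After that, Step 1 costs $O(\deg(\text{User 1}))$ and Step 2 costs the sum of the degrees of User 1's direct friends.)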

In [63]:
# Graph inputs: every user ID that appears on either side of a transaction (vertices), and one
# directed src -> dst pair per transaction (edges).
# Both user columns are nullable (see the schema above). A null ID would become a NaN node once
# collected into pandas/networkx and inflate friend counts, so rows with a null ID are dropped.
vertices = (
    venmo_df.selectExpr("user1 as id")
    .union(venmo_df.selectExpr("user2 as id"))
    .dropna()
    .distinct()
)
edges = (
    venmo_df.select(col("user1").alias("src"), col("user2").alias("dst"))
    .dropna()
)

In [69]:
import pandas as pd

# Collect each Spark DataFrame to the driver as a pandas DataFrame with the same column names.
vertices_pd, edges_pd = (
    pd.DataFrame(sdf.collect(), columns=sdf.columns) for sdf in (vertices, edges)
)

In [78]:
# Directed graph with an edge src -> dst for each transaction. A DiGraph keeps at most one edge
# per ordered pair, so repeated transactions between the same two users collapse into one edge.
G = nx.from_pandas_edgelist(edges_pd, "src", "dst", create_using=nx.DiGraph)

# Function to find friends and friends of friends
def find_friends(user_id, graph=None):
    """Return the direct friends and friends-of-friends of ``user_id``.

    A "friend" is any user connected to ``user_id`` by an edge in either
    direction, i.e. transaction direction is ignored.

    Args:
        user_id: Node ID of the user of interest.
        graph: Directed graph exposing ``successors`` / ``predecessors``
            (e.g. a ``networkx.DiGraph``). Defaults to the notebook-level ``G``.

    Returns:
        ``(direct_friends, friends_of_friends)`` as two lists in unspecified
        order. Neither list contains ``user_id`` itself, and
        ``friends_of_friends`` excludes every direct friend.

    Raises:
        Whatever ``graph.successors`` raises for an unknown node
        (``networkx.NetworkXError`` for a networkx graph).
    """
    if graph is None:
        graph = G  # notebook-global graph built from the edge list

    def neighbors(node):
        # Undirected view of a directed graph: union of out- and in-neighbours.
        return set(graph.successors(node)) | set(graph.predecessors(node))

    # A self-transaction (user1 == user2) is a self-loop; a user is not their own friend.
    direct = neighbors(user_id) - {user_id}

    second_degree = set()
    for friend in direct:
        second_degree |= neighbors(friend)
    # Drop the user and anyone already counted as a direct friend.
    second_degree -= direct | {user_id}

    return list(direct), list(second_degree)

In [79]:
# Example user: User 1 of the first row in the preview above.
user_id = 1_218_774
direct_friends, friends_of_friends = find_friends(user_id)

In [80]:
num_friends, num_fof = len(direct_friends), len(friends_of_friends)
print(f"Number of Friends: {num_friends}, Number of Friends of Friends: {num_fof}")

Number of Friends: 5, Number of Friends of Friends: 35


In [74]:
# Local clustering coefficient of the user on the undirected version of the transaction graph.
clustering_coefficient = nx.clustering(G.to_undirected(), nodes=user_id)
print(f"Clustering Coefficient: {clustering_coefficient}")

Clustering Coefficient: 0.2


In [75]:
# PageRank over the whole directed graph (networkx default damping 0.85); 0 if the user is absent.
pagerank = nx.pagerank(G, alpha=0.85)
user_pagerank = pagerank.get(user_id, 0)
print(f"PageRank: {user_pagerank}")

PageRank: 4.936346395858258e-07
