### Importing Spark Libraries

In [None]:
import time

# Start of the whole notebook run. Named distinctly from the per-cell `start`
# timers further down, which would otherwise overwrite it. Total runtime at any
# point is `time.perf_counter() - notebook_start` (perf_counter is monotonic).
notebook_start = time.perf_counter()

In [None]:
from datetime import datetime, timedelta

# Spark libraries
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.sql.functions import desc
from pyspark.sql.functions import explode
import pyspark.sql.functions as f
from pyspark.sql.functions import *
from pyspark.sql.types import IntegerType, StringType
from pyspark.sql.window import Window

### Creating Spark context

In [None]:
# Attach to the running SparkContext (creating one if none exists) and wrap it
# in the SparkSession used for all DataFrame / SQL work below.
sc = SparkContext.getOrCreate()
spark = SparkSession(sparkContext=sc)

### Reading User followers data

In [None]:
# Parameters
# NOTE: the input partition is pinned to the 2022-03-01 snapshot below;
# `partition_dt` (today) is kept so the dated path can be switched back.
partition_dt = datetime.today().date()

# Location of the user -> individual-follower-list snapshot
dir_path_ufl = "s3://aws-glue-neptune-data/postgre-data-lake/user_followers_list/"
# file_path_ufl = f"{dir_path_ufl}partition={partition_dt}/"
file_path_ufl = f"{dir_path_ufl}partition=2022-03-01/"

print(file_path_ufl)


### Reading only IFs for a user

In [None]:
# Individual followers (IFs) only: expose the IF array as `follower_ids`
# and record its length as `num_followers`.
df_user_followers_raw = (
    spark.read.parquet(file_path_ufl)
    .select(col("user_id"), col("individual_follower_ids").alias("follower_ids"))
    .withColumn("num_followers", size(col("follower_ids")))
)

**We have 14 million users in this table**

In [None]:
# Number of rows in the raw IF table.
df_user_followers_raw.count()

In [None]:
# Inspect the raw IF table schema (user_id, follower_ids, num_followers).
df_user_followers_raw.printSchema()

### Total Users

In [None]:
# Distinct user_ids. If this equals the row count above, user_id is unique;
# the user_id joins later in the notebook rely on that to avoid duplicating rows.
df_user_followers_raw.select('user_id').distinct().count()

#### Keeping only users with at least 100 IFs (an optional upper bound of 500k IFs is currently disabled)

In [None]:
# Keep users with at least 100 IFs. An upper bound (num_followers <= 500000)
# was tried and is currently disabled.
df_user_followers = df_user_followers_raw.filter(col("num_followers") >= 100)

In [None]:
# Expose the >= 100 IF (filtered) frame to SQL as `user_followers_raw`; the
# log10 enrichment query below joins against it under this name.
# createOrReplaceTempView replaces the deprecated registerTempTable.
df_user_followers.createOrReplaceTempView("user_followers_raw")

#### Total users remaining after keeping only users with at least 100 IFs (the 500k upper bound is disabled)

In [None]:
# Distinct users left after the >= 100 IF filter. Count the filtered frame,
# not the raw one, so the number actually reflects the filter.
df_user_followers.select('user_id').distinct().count()

#### Remove users who follow more than 1k people (currently disabled: the loading cells below are commented out)

In [None]:
# # Reading file Path
# dir_path_ufol = "s3://aws-glue-neptune-data/postgre-data-lake/user_followings_list_v2/"
# #file_path_ufl = dir_path_ufl +"partition="+str(partition_dt)+"/"
# file_path_ufol = dir_path_ufol +"partition="+"2022-03-01"+"/"

#### Getting user IDs from the user followings table for users who follow more than 1k accounts

In [None]:
# df_user_followings = spark.read.format('parquet').load(file_path_ufol).\
# filter('following_count>1000').select('user_id')

#### There are 10k users who follow more than 1k accounts

In [None]:
# df_user_followings is only created by the (currently commented-out) cells
# above; skip the count instead of failing with a NameError.
if "df_user_followings" in globals():
    print(df_user_followings.select('user_id').count())

**Next step is to unnest the IFs for these users**

In [None]:
# Unnest the IF array: one row per (user_id, follower_id).
# Explode the array itself rather than joining it into a comma string and
# splitting it again. This avoids a string round-trip. It keeps follower_id in
# the array's element type, so the mod() bucketing and the follower_id equality
# join below compare native values. It also avoids the bogus "" follower that
# split() produced for empty/null arrays.
df_user_followers = df_user_followers.withColumn(
    "follower_id", explode(col("follower_ids"))
).drop("follower_ids")

In [None]:
# Distinct users that still have at least one exploded follower row.
df_user_followers.select('user_id').dropDuplicates().count()

#### Removing follower IDs that follow more than 1k accounts from the user followers table

#### Relaxing this condition (the anti-join below is disabled)

In [None]:
# df_user_followers = df_user_followers.join(df_user_followings, 
#                                            df_user_followers['follower_id'] == df_user_followings['user_id'], 
#                                            'leftanti')

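#### With the following > 1k filter enabled we were left with only 8k users (the filter is currently disabled, so this count matches the previous one)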

In [None]:
# Distinct users after the (currently disabled) following > 1k anti-join;
# while it stays disabled this equals the previous count.
df_user_followers.dropDuplicates(['user_id']).count()

In [None]:
# Expose the exploded (user_id, follower_id) rows to SQL as `user_followers`
# (createOrReplaceTempView replaces the deprecated registerTempTable).
df_user_followers.createOrReplaceTempView("user_followers")

In [None]:
# Tag each row with a follower bucket: mod_follower = follower_id mod 100.
# The bucket is used as the repartition key and as an extra equality key in the
# co-follower self-join. Rows with the same follower_id always share a bucket,
# so the extra key never drops a match.
query = """SELECT user_id, follower_id,CAST(mod(follower_id,100) as INT) as mod_follower
FROM user_followers
"""

In [None]:
# Run the bucketing query, then hash-partition the rows by follower bucket.
df_user_followers = spark.sql(query).repartition("mod_follower")

In [None]:
# Re-point the `user_followers` view at the bucketed, repartitioned frame
# (createOrReplaceTempView replaces the deprecated registerTempTable).
df_user_followers.createOrReplaceTempView("user_followers")

In [None]:
# Schema after bucketing: user_id, follower_id, mod_follower.
df_user_followers.printSchema()

### Adding the IF count (and its log10 bucket) for each user_id

In [None]:
# Attach each user's IF count (usr_followers) and its integer log10 bucket
# (usr_lgf), which the self-join uses to pair only users of similar size.
# log10 of zero would be null, but user_followers_raw is the >= 100 IF frame,
# so every count seen here is positive. Nothing is filtered out by this query.
# NOTE(review): user_followers is derived from that same filtered frame, so the
# LEFT JOIN should always match and behaves like an inner join.
query= """
SELECT uf.user_id,uf.follower_id,
ufr.num_followers as usr_followers,
CAST(log10(ufr.num_followers) AS INT) as usr_lgf, 
uf.mod_follower
FROM user_followers uf
LEFT JOIN user_followers_raw ufr
ON (uf.user_id=ufr.user_id)
"""

In [None]:
# Run the IF-count enrichment query defined above.
df_user_followers = spark.sql(sqlQuery=query)

In [None]:
# Preview rows: user_id, follower_id, usr_followers, usr_lgf, mod_follower.
df_user_followers.show()

In [None]:
# Confirm the schema after attaching usr_followers / usr_lgf.
df_user_followers.printSchema()

In [None]:
# Re-point `user_followers` at the enriched frame used by the self-join
# (createOrReplaceTempView replaces the deprecated registerTempTable).
df_user_followers.createOrReplaceTempView("user_followers")

In [None]:
# Use 2001 shuffle partitions for the wide self-join and aggregation that follow.
spark.conf.set(key="spark.sql.shuffle.partitions", value=2001)

In [None]:
# Co-follower self-join: for every follower shared by two users whose IF counts
# are within one order of magnitude (|usr_lgf difference| <= 1), emit one row
# per unordered user pair (a.user_id < b.user_id).
# mod_follower is implied by follower_id equality; it is kept as an extra join
# key (presumably as a per-bucket partitioning hint).
# The CTE selects each column once: the previous version listed mod_follower
# twice, giving a duplicate output column that a.mod_follower/b.mod_follower
# reference. Columns the join never reads (usr_followers) are dropped.
query = """WITH user_follower
AS
(
    SELECT user_id, follower_id, usr_lgf, mod_follower
    FROM user_followers
)
SELECT a.user_id, a.follower_id, b.user_id AS other_followed_user, a.mod_follower
FROM user_follower a
JOIN user_follower b
  ON (a.follower_id = b.follower_id
      AND a.mod_follower = b.mod_follower
      AND abs(a.usr_lgf - b.usr_lgf) <= 1)
WHERE a.user_id < b.user_id AND b.usr_lgf IS NOT NULL
"""

In [None]:
# Build the (lazy) co-follower pair rows from the self-join query above.
uf_joined = spark.sql(sqlQuery=query)

In [None]:
# Schema of the co-follower rows: user_id, follower_id, other_followed_user, mod_follower.
uf_joined.printSchema()

In [None]:
import time

# Force execution of the self-join (count is an action) and time it.
# perf_counter is monotonic, so the duration is immune to wall-clock changes.
start = time.perf_counter()
n_rows = uf_joined.count()
end = time.perf_counter()
print(f"{n_rows} co-follower rows in {end - start:.1f}s")

In [None]:
### Aggregating to get the number of followers

In [None]:
# Expose the co-follower rows to SQL as `common_user_followers`.
# No repartition on mod_follower here: the groupBy(user_id, other_followed_user)
# below shuffles on its own keys anyway, so repartitioning first only added an
# extra full shuffle of the largest dataset.
# createOrReplaceTempView replaces the deprecated registerTempTable.
uf_joined.createOrReplaceTempView("common_user_followers")

In [None]:
# query="""select user_id,other_followed_user, count(follower_id) as common_followers
# FROM common_user_followers
# GROUP BY user_id,other_followed_user"""

In [None]:
# cf_ct = spark.sql(query)

In [None]:
# cf_ct = uf_joined.groupBy("mod_follower","user_id","other_followed_user").agg(f.expr('count(follower_id)')\
#                               .alias('common_followers'))

In [None]:
# Number of shared followers for each (user_id, other_followed_user) pair.
cf_ct = uf_joined.groupBy("user_id", "other_followed_user").agg(
    f.count("follower_id").alias("common_followers")
)

In [None]:
import time

# Force the aggregation to run and time it with a monotonic clock.
start = time.perf_counter()

n_pairs = cf_ct.count()

end = time.perf_counter()
print(f"{n_pairs} user pairs in {end - start:.1f}s")

#### Adding the inverse pairs and taking the union so that both (A, B) and (B, A) are present

In [None]:
# The self-join kept each unordered pair once (a.user_id < b.user_id); add the
# mirrored (B, A) rows so either user can be looked up as `user_id`.
# Swap the columns with explicit aliases so cf_ct_inv has the same schema as
# cf_ct, and union by name rather than silently relying on column positions.
cf_ct_inv = cf_ct.select(
    col("other_followed_user").alias("user_id"),
    col("user_id").alias("other_followed_user"),
    col("common_followers"),
)
cf_ct = cf_ct.unionByName(cf_ct_inv)


In [None]:
# Distinct users that appear in at least one pair (as either member, after the union).
cf_ct.select('user_id').dropDuplicates().count()

#### Common followers between users 162 and 470

In [None]:
# Spot-check the pair (162, 470). Notes from earlier runs: 26634 common
# followers without any conditions, 26056 (presumably with the current filters).
cf_ct.filter((col("user_id") == 162) & (col("other_followed_user") == 470)).show()

In [None]:
#cf_ct.filter("user_id=10462628 AND other_followed_user=14039248").show()

In [None]:
#### Common users between Raj and Harsh

In [None]:
# Mirrored pair (470, 162): after the union it should show the same count as (162, 470).
cf_ct.filter((col("user_id") == 470) & (col("other_followed_user") == 162)).show()

In [None]:
#cf_ct.filter("user_id=14039248 AND other_followed_user=10462628").show()

### Adding IF counts for both users in each pair and calculating the Jaccard similarity

In [None]:
# Re-read the IF snapshot (same 2022-03-01 partition as at the top) to get
# per-user IF counts for the Jaccard denominator.
partition_dt = datetime.today().date()

dir_path_ufl = "s3://aws-glue-neptune-data/postgre-data-lake/user_followers_list/"
# file_path_ufl = f"{dir_path_ufl}partition={partition_dt}/"
file_path_ufl = f"{dir_path_ufl}partition=2022-03-01/"

print(file_path_ufl)

# Individual followers only; keep users with at least 100 IFs
# (an upper bound no_of_followers < 500000 is currently disabled).
df_user_followers = (
    spark.read.parquet(file_path_ufl)
    .select(col("user_id"), col("individual_follower_ids").alias("follower_ids"))
    .withColumn("no_of_followers", size(col("follower_ids")))
    .filter(col("no_of_followers") >= 100)
)

In [None]:
# Attach F(A): the IF count of the first user in each pair (left join keeps all pairs).
u1_joined = cf_ct.join(df_user_followers, cf_ct['user_id'] == df_user_followers['user_id'], 'left')
cf_ct = u1_joined.select(
    cf_ct['user_id'],
    col('other_followed_user'),
    col("no_of_followers").alias("u1_followers"),
    col("common_followers"),
)

In [None]:
# Attach F(B): the IF count of the second user in each pair.
u2_joined = cf_ct.join(df_user_followers, cf_ct['other_followed_user'] == df_user_followers['user_id'], 'left')
cf_ct = u2_joined.select(
    cf_ct['user_id'],
    col('other_followed_user'),
    col('u1_followers'),
    col("no_of_followers").alias("u2_followers"),
    col("common_followers"),
)

In [None]:
# Expected columns: user_id, other_followed_user, u1_followers, u2_followers, common_followers.
cf_ct.printSchema()

In [None]:
# Jaccard similarity of the follower sets A and B of each pair:
#   |A ∩ B| = common_followers
#   |A ∪ B| = F(A) + F(B) - |A ∩ B| = u1_followers + u2_followers - common_followers
#   jaccard = |A ∩ B| / |A ∪ B|, rounded to 4 decimals
# (the Jaccard distance, if needed, is 1 - jaccard).
cf_ct = cf_ct.withColumn(
    'jaccard',
    f.round(
        f.col("common_followers")
        / (f.col("u1_followers") + f.col("u2_followers") - f.col("common_followers")),
        4,
    ),
)

In [None]:
# Jaccard row for the pair (470, 162).
cf_ct.filter((col("user_id") == 470) & (col("other_followed_user") == 162)).show()

In [None]:
# Jaccard row for the mirrored pair (162, 470); should match the row above.
cf_ct.filter((col("user_id") == 162) & (col("other_followed_user") == 470)).show()

### Writing the result to S3

In [None]:
# 7 mins to write data
#s3://ml-test-analytics/test/stg-glue/common_followers/

In [None]:
import time

# Persist the pair-level common-follower / Jaccard table (overwriting any
# previous run) and report how long the write took, using a monotonic clock.
start = time.perf_counter()
cf_ct.write.mode('overwrite').parquet("s3://ml-test-analytics/test/stg-glue/user_common_followers_raw/")

end = time.perf_counter()
print(f"write took {end - start:.1f}s")