In [3]:
import glob
import os
import sys

# Cluster-specific locations of the Spark distribution and of the Python
# interpreter exported to Spark for both the workers and the driver.
SPARK_HOME = "/usr/lib/spark3"
PYSPARK_PYTHON = "/opt/conda/envs/dsenv/bin/python"
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON
os.environ["SPARK_HOME"] = SPARK_HOME

# Make the PySpark sources bundled with the Spark distribution importable.
# The py4j archive is located with a glob instead of a pinned file name, so
# the cell keeps working when the bundled py4j version changes.
PYSPARK_HOME = os.path.join(SPARK_HOME, "python/lib")
for py4j_zip in sorted(glob.glob(os.path.join(PYSPARK_HOME, "py4j-*-src.zip"))):
    sys.path.insert(0, py4j_zip)
sys.path.insert(0, os.path.join(PYSPARK_HOME, "pyspark.zip"))


from pyspark import SparkConf
from pyspark.sql import SparkSession

# Fixed UI port for this notebook's Spark application.
conf = SparkConf()
conf.set("spark.ui.port", "4099")

spark = SparkSession.builder.config(conf=conf).appName("Spark SQL").getOrCreate()


# Wildcard import kept on purpose: later cells use StructType, StructField,
# StringType, ArrayType and BooleanType unqualified.
from pyspark.sql.types import *

import pyspark.sql.functions as f

# spark.stop()

Picked up _JAVA_OPTIONS: 
Picked up _JAVA_OPTIONS: 
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/lib/spark3/jars/log4j-slf4j-impl-2.17.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [None]:
# Print the first five lines of the raw TSV file stored in HDFS.
!hdfs dfs -cat /datasets/twitter/twitter_sample_small.tsv | head -n 5

In [4]:
from pyspark.sql.functions import udf

# Path of the sample file (the same one inspected with `hdfs dfs -cat` above).
data_path = '/datasets/twitter/twitter_sample_small.tsv'

# Two string columns per record, declared explicitly instead of inferred.
schema = StructType([
    StructField("user_id", StringType()),
    StructField("follower_id", StringType()),
])

# Tab-separated file read through the CSV source with the schema above.
df = (
    spark.read
         .format("csv")
         .option("sep", "\t")
         .schema(schema)
         .load(data_path)
)

In [77]:
# Tiny hand-made frame for experiments: each inner list is a [user_id, follower_id] pair matching `schema`.
fd = spark.createDataFrame([['2', '1'], ['3', '1'], ['4', '1'], ['5', '2'], ['5', '3'], ['5', '4'], ['2', '3'], ['3', '2']], schema)

In [78]:
fd.show()

+-------+-----------+
|user_id|follower_id|
+-------+-----------+
|      2|          1|
|      3|          1|
|      4|          1|
|      5|          2|
|      5|          3|
|      5|          4|
|      2|          3|
|      3|          2|
+-------+-----------+



In [85]:
# Source and target ids for the search on the toy frame `fd`.
id_from = '1'
id_to = '5'

# Give every row an (initially empty) array column "paths"; the loop below fills it
# with comma-joined id strings.
fd = fd.withColumn("paths", udf(lambda: [], ArrayType(StringType()))())
fd.cache()

# max_length = len(df)
# Bound used by filter_length below: (number of rows in fd) - 1.
max_path_length = fd.count()-1

# run BFS and mark visited from id_from to id_to
queue_ids = [id_from]
while queue_ids:
  # FIFO: expand the oldest queued id first
  first_in_queue = queue_ids.pop(0)
  # user_id values of every row whose follower_id is the id being expanded
  neighbours = fd.filter(fd.follower_id == first_in_queue).select('user_id').rdd.flatMap(lambda x: x).collect()
  if not neighbours: continue # leaf
  if id_to in neighbours: break # found target

  # True for rows whose paths array is empty or whose first stored path has
  # fewer than max_path_length comma-separated entries
  filter_length = udf(lambda paths: not paths or len(paths[0].split(',')) < max_path_length, BooleanType())

  # add new neighbours which are not visited
  new_neighbours = fd.filter(filter_length(fd.paths))\
              .filter(fd.follower_id.isin(neighbours))\
              .select('follower_id').rdd.flatMap(lambda x: x).collect()
  # set() removes the repeats that come from ids appearing as follower_id in several rows
  queue_ids.extend(list(set(new_neighbours)))

  # append node for each path
  # paths array of the first row whose follower_id is the id being expanded
  current_paths = fd.filter(fd.follower_id == first_in_queue).select('paths').rdd.flatMap(lambda x: x).first()
  # [node] when there is no stored path yet, otherwise every stored path extended by ",node"
  append_path = udf(lambda node: [i+','+node for i in current_paths] if current_paths else [node], ArrayType(StringType()))
  # overwrite "paths" on rows whose follower_id is one of the neighbours; other rows keep their value
  fd = fd.withColumn("paths", f.when(fd.follower_id.isin(neighbours), append_path(f.col("follower_id"))).otherwise(f.col("paths")))  

# NOTE(review): the loop also ends when the queue runs empty without reaching id_to;
# the statements below run in both cases.
is_not_empty_udf = udf(lambda x: len(x) != 0, BooleanType())
# Rows pointing at id_to that carry at least one path; x[0] is the paths array, so
# flatMap yields the individual path strings.
paths = fd.filter(fd.user_id == id_to).filter(is_not_empty_udf(fd.paths)).select('paths').rdd.flatMap(lambda x: x[0]).collect()
# Stored paths contain neither endpoint, so both are added here.
paths = [id_from+','+p+','+id_to for p in paths]
# print(paths)

In [86]:
paths

['1,2,5', '1,3,5', '1,4,5']

24/03/17 06:01:09 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_1429_0 !
24/03/17 06:01:09 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_1633_1 !
24/03/17 06:01:09 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_1499_1 !
24/03/17 06:01:09 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_1499_0 !
24/03/17 06:01:09 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_1429_1 !
24/03/17 06:01:09 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_1566_0 !
24/03/17 06:01:09 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_1288_0 !
24/03/17 06:01:09 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_1566_1 !
24/03/17 06:01:09 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_1464_1 !
24/03/17 06:01:09 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_1633_0 !
24/03/17 06:01:09 WARN BlockManagerMasterEndpoint:

In [72]:
fd.filter(fd.user_id == id_to).filter(is_not_empty_udf(fd.paths)).select('paths').rdd.flatMap(lambda x: x).collect()

[['2'], ['3']]

In [69]:
fd.show()

+-------+-----------+-----+
|user_id|follower_id|paths|
+-------+-----------+-----+
|      2|          1|   []|
|      3|          1|   []|
|      4|          2|  [2]|
|      4|          3|  [3]|
+-------+-----------+-----+



- - -

In [None]:
# UDF returning True when its argument has length 0.
is_empty_udf = udf(lambda x: len(x) == 0, BooleanType())

# NOTE(review): `df` as loaded above declares only user_id and follower_id, and no visible
# cell adds an `empty_list` column -- presumably left over from an earlier experiment; confirm.
df.filter(is_empty_udf(df.empty_list)).show(5)

In [None]:
  # NOTE(review): function-body fragment without its `def` line (it contains bare `return`
  # statements and reads queue_ids / df_forward / df_backward that it never creates).
  # The same statements appear inside the pandas shortest_path at the end of the notebook.
  if id_from not in df_forward.index: 
    print('id_from not_found')
    return -1
  # the start node counts as visited from the beginning
  df_forward.loc[id_from, 'is_visited'] = True
  is_not_found = True
  while queue_ids and is_not_found:
    first_in_queue = queue_ids.pop(0)
    if first_in_queue not in df_forward.index: continue # leaf
    neighbours = df_forward.loc[first_in_queue, 'user_id'] # list of users
    for n in neighbours:
      if n == id_to:
        is_not_found = False
        break
      # only ids that occur in df_forward.index are marked and queued
      if n in df_forward.index and not df_forward.loc[n, 'is_visited']:
        df_forward.loc[n, 'is_visited'] = True
        queue_ids.append(n)
  if is_not_found:
    print('path not_found')
    return -1
  
  # get the shortest path using df_backward from id_to to id_from
  res = [id_to]
  while res[-1] != id_from:
    # take the first listed follower that was marked visited
    for n in df_backward.loc[res[-1], 'follower_id']:
      if n in df_forward.index and df_forward.loc[n, 'is_visited']:
        res.append(n)
        break

In [None]:


# NOTE(review): loop-body fragment pasted at top level -- `continue` and `break` have no
# enclosing loop here, and first_in_queue / queue_ids / id_to are not set in this cell.
neighbours = df_forward.filter(df_forward.follower_id == first_in_queue).first()
if neighbours is None: continue # leaf
# the matched row's user_id field
neighbours = neighbours['user_id']
if id_to in neighbours: break

# rows for those neighbours that still have is_visited == 0
new_neighbours = df_forward.filter(df_forward.follower_id.isin(neighbours)).filter(df_forward.is_visited == 0)
# NOTE(review): the next line only references the method without calling it, so it has no effect.
queue_ids.extend
list_follower_ids = [i['follower_id'] for i in new_neighbours.select('follower_id').collect()]
queue_ids.extend(list_follower_ids)

# set is_visited to 1 on rows whose follower_id is among the neighbours
df_forward = df_forward.withColumn("is_visited", f.when(f.col("follower_id").isin(neighbours), 1).otherwise(df_forward.is_visited))

In [None]:
# Helper UDFs: one yields an empty string array, the other tests an array for emptiness.
empty_list_udf = udf(lambda: [], ArrayType(StringType()))
is_empty_udf = udf(lambda x: len(x) == 0, BooleanType())

# One row per follower_id holding the list of its user_id values, plus an
# empty "parents" array.
df_forward = (
    df.groupby("follower_id")
      .agg(f.collect_list("user_id").alias("user_id"))
      .withColumn("parents", empty_list_udf())
)

# One row per user_id holding the list of its follower_id values.
df_backward = (
    df.groupby("user_id")
      .agg(f.collect_list("follower_id").alias("follower_id"))
)

df_forward.cache()
df_backward.cache()

In [None]:
df_forward.filter(df_forward.follower_id == '12').select('user_id').rdd.flatMap(lambda x: x).collect()

In [None]:
df_forward.withColumn("parents", udf(lambda x: list(x) + ['12'], ArrayType(StringType()))(df_forward.parents)).show(5)

In [None]:
add_parent = udf(lambda L: list(L) + ['12'], ArrayType(StringType()))

df_forward.withColumn("parents", f.when(f.col("follower_id") == '126', add_parent(f.col("parents"))).otherwise(f.col("parents"))).show(10)

In [None]:
# Source and target ids for this run.
id_from, id_to = '12', '107'

# Helper UDFs: an empty string array, and emptiness / non-emptiness tests on an array.
empty_list_udf = udf(lambda: [], ArrayType(StringType()))
is_empty_udf = udf(lambda x: len(x) == 0, BooleanType())
is_not_empty_udf = udf(lambda x: len(x) != 0, BooleanType())

# One row per follower_id with the list of its user_id values and an empty "parents" array.
df_forward  = df.groupby("follower_id").agg(f.collect_list("user_id").alias("user_id"))
df_forward = df_forward.withColumn("parents", empty_list_udf())

# One row per user_id with the list of its follower_id values.
df_backward = df.groupby("user_id").agg(f.collect_list("follower_id").alias("follower_id"))

# df_forward.cache()
# df_backward.cache()


# run BFS and mark visited from id_from to id_to
queue_ids = [id_from]
while queue_ids:
  first_in_queue = queue_ids.pop(0)
  neighbours = df_forward.filter(df_forward.follower_id == first_in_queue).select('user_id').rdd.flatMap(lambda x: x).collect()
  if not neighbours: continue # leaf
  neighbours = neighbours[0] # the collected list taken from the user_id column of the matching row
  if id_to in neighbours: break # found target

  # add new neighbours which are not visited
  # (rows whose "parents" array is still empty and whose follower_id is one of the neighbours)
  new_neighbours = df_forward.filter(is_empty_udf(df_forward.parents))\
              .filter(df_forward.follower_id.isin(neighbours))\
              .select('follower_id').rdd.flatMap(lambda x: x).collect()
  queue_ids.extend(new_neighbours)

  # update parents for each visited neighbour
  # NOTE(review): this stacks one withColumn (and one new UDF) per neighbour onto the plan.
  for n in neighbours:
    # appends the id being expanded to the row's existing parents list
    add_parent = udf(lambda L: list(L) + [first_in_queue], ArrayType(StringType()))
    df_forward = df_forward.withColumn("parents", f.when(f.col("follower_id") == n, add_parent(f.col("parents"))).otherwise(f.col("parents")))

In [None]:
back_neighbours = df_backward.filter(df_backward.user_id == id_to).select('follower_id').rdd.flatMap(lambda x: x).collect()[0]

In [None]:
len(back_neighbours)

In [None]:
df_forward.filter(df_forward.follower_id.isin(back_neighbours)).filter(is_not_empty_udf(f.col("parents"))).collect()

In [None]:
back_neighbours_with_parents = df_forward.filter(df_forward.follower_id.isin(back_neighbours)).filter(is_not_empty_udf(f.col("parents"))).collect()

In [None]:
back_neighbours_with_parents

In [5]:
df.filter(df.follower_id == '12').select('user_id').rdd.flatMap(lambda x: x).collect()

                                                                                

['126', '380', '422', '648']

In [10]:
# NOTE(review): the only visible cell that adds a `path` column to `df` (df.withColumn("path", f.lit("")))
# appears further down, so this cell depends on out-of-order execution.
df.filter(df.follower_id == '12').select('path').rdd.flatMap(lambda x: x).collect()[0]

                                                                                

[]

In [22]:
df.filter(df.follower_id == '12').select('path').rdd.flatMap(lambda x: x).first()

''

In [58]:
# def shortest_path(df, id_from, id_to):
id_from = '12'
id_to = '34'

# Add (or reset) a string column "path"; the empty string doubles as the "not visited" marker below.
# NOTE(review): this rebinds `df`, so every later cell sees the extra column.
df = df.withColumn("path", f.lit(""))


# df.cache()


# run BFS and mark visited from id_from to id_to
queue_ids = [id_from]
while queue_ids:
  first_in_queue = queue_ids.pop(0)
  # user_id values of every row whose follower_id is the id being expanded
  neighbours = df.filter(df.follower_id == first_in_queue).select('user_id').rdd.flatMap(lambda x: x).collect()
  if not neighbours: continue # leaf
  if id_to in neighbours: break # found target

  # add new neighbours which are not visited
  new_neighbours = df.filter(df.path == "")\
              .filter(df.follower_id.isin(neighbours))\
              .select('follower_id').rdd.flatMap(lambda x: x).collect()
  # set() removes repeats caused by ids that appear as follower_id in several rows
  queue_ids.extend(list(set(new_neighbours)))

  # path = current_path + follower_id
  current_path = df.filter(df.follower_id == first_in_queue).select('path').rdd.flatMap(lambda x: x).first()
  # NOTE(review): when current_path is "" the result starts with a comma -- see the
  # ',422,53,52' values in the recorded output of the following cells.
  append_path = udf(lambda node: current_path+','+node, StringType())
  # overwrite "path" on rows whose follower_id is one of the neighbours; other rows keep their value
  df = df.withColumn("path", f.when(df.follower_id.isin(neighbours), append_path(f.col("follower_id"))).otherwise(f.col("path")))  
  

In [61]:
df.filter(df.user_id == '107').filter(df.path != "").select('path').rdd.flatMap(lambda x: x).collect()

[',422,53,52', ',422,53,52,107,586']

In [62]:
df.filter(df.user_id == '107').show(10)

+-------+-----------+------------------+
|user_id|follower_id|              path|
+-------+-----------+------------------+
|    107|         52|        ,422,53,52|
|    107|        586|,422,53,52,107,586|
|    107|      81503|                  |
|    107|     620743|                  |
|    107|     624493|                  |
|    107|     667563|                  |
|    107|    1044751|                  |
|    107|    1115281|                  |
|    107|    1252851|                  |
|    107|    2560911|                  |
+-------+-----------+------------------+
only showing top 10 rows



In [44]:
append_path = udf(lambda node: current_path+node, StringType())

In [45]:
df.withColumn("path", append_path(f.col("follower_id"))).show(10)

+-------+-----------+------+
|user_id|follower_id|  path|
+-------+-----------+------+
|     12|       2241|  2241|
|     12|      13349| 13349|
|     12|      41873| 41873|
|     12|      82473| 82473|
|     12|     414853|414853|
|     12|     755452|755452|
|     12|     758983|758983|
|     12|     793023|793023|
|     12|     794748|794748|
|     12|     806280|806280|
+-------+-----------+------+
only showing top 10 rows



In [None]:
df.withColumn("path", f.when(df.follower_id.isin(neighbours), append_path(f.col("follower_id"))).otherwise(f.col("path")))  

In [27]:
# NOTE(review): the recorded run of this cell failed with an AnalysisException (THEN branch
# array<string> vs ELSE branch string on the `path` column). The failing expression does not
# match any shortest_path definition visible in this notebook, so it presumably came from an
# earlier version of the function -- confirm before relying on this output.
shortest_path(df, '12', '107')

                                                                                

AnalysisException: cannot resolve 'CASE WHEN (follower_id IN ('126', '380', '422', '648')) THEN <lambda>(follower_id) ELSE path END' due to data type mismatch: THEN and ELSE expressions should all be same type or coercible to a common type, got CASE WHEN ... THEN array<string> ELSE string END;
'Project [user_id#0, follower_id#1, CASE WHEN follower_id#1 IN (126,380,422,648) THEN <lambda>(follower_id#1)#81 ELSE path#70 END AS path#82]
+- Project [user_id#0, follower_id#1,  AS path#70]
   +- Project [user_id#0, follower_id#1,  AS path#39]
      +- Project [user_id#0, follower_id#1]
         +- Project [user_id#0, follower_id#1, <lambda>()#8 AS path#9]
            +- Relation [user_id#0,follower_id#1] csv


In [None]:
df

In [None]:
def shortest_path(df, id_from, id_to):
  """Experimental two-phase path search between ``id_from`` and ``id_to`` on Spark frames.

  Phase 1 walks forward from ``id_from`` over the grouped follower_id -> user_id lists and
  appends the id being expanded to the ``parents`` array of every row whose follower_id is
  among its neighbours. Phase 2 walks backward from ``id_to`` and fills a ``path`` array in
  ``df_backward``.

  Args:
    df: Spark DataFrame with ``user_id`` and ``follower_id`` columns.
    id_from: id the forward walk starts from.
    id_to: id the backward walk starts from.

  Returns:
    The collected ``path`` arrays of ``df_backward`` rows whose user_id is one of
    ``id_from``'s neighbours and whose path is non-empty.

  NOTE(review): several lookups index ``.collect()[0]`` and raise IndexError when the
  filter matches no row; debugging ``print`` calls are still in place.
  """
  # Helper UDFs: an empty string array, and emptiness / non-emptiness tests on an array.
  empty_lists = udf(lambda: [], ArrayType(StringType()))
  is_empty_udf = udf(lambda x: len(x) == 0, BooleanType())
  is_not_empty_udf = udf(lambda x: len(x) != 0, BooleanType())

  # One row per follower_id with the list of its user_id values and an empty "parents" array.
  df_forward  = df.groupby("follower_id").agg(f.collect_list("user_id").alias("user_id"))
  df_forward = df_forward.withColumn("parents", empty_lists())

  # One row per user_id with the list of its follower_id values and an empty "path" array.
  df_backward = df.groupby("user_id").agg(f.collect_list("follower_id").alias("follower_id"))
  df_backward = df_backward.withColumn("path", empty_lists())

  # df_forward.cache()
  # df_backward.cache()


  # run BFS and mark visited from id_from to id_to
  queue_ids = [id_from]
  while queue_ids:
    first_in_queue = queue_ids.pop(0)
    neighbours = df_forward.filter(df_forward.follower_id == first_in_queue).select('user_id').rdd.flatMap(lambda x: x).collect()
    if not neighbours: continue # leaf
    else: neighbours = neighbours[0] # the collected list taken from the user_id column of the matching row
    if id_to in neighbours: break # found target

    # add new neighbours which are not visited
    new_neighbours = df_forward.filter(is_empty_udf(df_forward.parents))\
                .filter(df_forward.follower_id.isin(neighbours))\
                .select('follower_id').rdd.flatMap(lambda x: x).collect()
    queue_ids.extend(new_neighbours)

    # update parents for each visited neighbour
    # for n in neighbours:
    #   add_parent = udf(lambda L: list(L) + [first_in_queue], ArrayType(StringType()))
    #   df_forward = df_forward.withColumn("parents", f.when(f.col("follower_id") == n, add_parent(f.col("parents"))).otherwise(f.col("parents")))
    
    # single update for all neighbours at once: append the id being expanded to their parents
    add_parent = udf(lambda L: list(L) + [first_in_queue], ArrayType(StringType()))
    df_forward = df_forward.withColumn("parents", f.when(df_forward.follower_id.isin(neighbours), add_parent(f.col("parents"))).otherwise(f.col("parents")))

  print('forward is done')
  # accumulate all the shortest paths using df_backward from id_to to id_from
  queue_ids = [id_to]
  while queue_ids:
    print('queue_ids:', queue_ids)
    first_in_queue = queue_ids.pop(0)
    # follower list of the current id, taken only if its "path" is still empty
    # NOTE(review): [0] raises IndexError when no such row exists.
    neighbours_no_path = df_backward.filter(df_backward.user_id == first_in_queue).filter(is_empty_udf(f.col("path"))).select('follower_id').rdd.flatMap(lambda x: x).collect()[0]
    # of those followers, keep the ones that received a parent during the forward phase
    neighbours_no_path_parents = df_forward.filter(df_forward.follower_id.isin(neighbours_no_path)).filter(is_not_empty_udf(f.col("parents"))).select('follower_id').rdd.flatMap(lambda x: x).collect()
    print('back_neigh:', neighbours_no_path_parents)
    if not neighbours_no_path_parents: continue # id_from in back_neighbours
    queue_ids.extend(neighbours_no_path_parents)
    # print('back_neigh:', back_neighbours_with_parents)
    
    # update path for each visited neighbour
    curr_path = df_backward.filter(df_backward.user_id == first_in_queue).select('path').rdd.flatMap(lambda x: x).collect()[0]
    print('curr_path:', curr_path)
    # new path = the row's own user_id followed by the current id's path
    update_path = udf(lambda x: [x]+curr_path, ArrayType(StringType()))
    df_backward = df_backward.withColumn("path", f.when(df_backward.user_id.isin(neighbours_no_path_parents), update_path(f.col("user_id"))).otherwise(f.col("path")))
  
  # get all paths
  # NOTE(review): [0] raises IndexError when id_from has no row in df_forward.
  source_neighbours = df_forward.filter(df_forward.follower_id == id_from).select('user_id').rdd.flatMap(lambda x: x).collect()[0]
  return df_backward.filter(df_backward.user_id.isin(source_neighbours)).filter(is_not_empty_udf(f.col("path"))).select('path').rdd.flatMap(lambda x: x).collect()
  
  

In [None]:
df_backward.filter(df_backward.user_id == '20').filter(is_empty_udf(f.col("path"))).select('follower_id').rdd.flatMap(lambda x: x).collect()

In [None]:
shortest_path(df, '52', '23')

In [None]:
# Source and target ids for this run.
id_from = '12'
id_to = '34'

# Helper UDFs: an empty string array, and emptiness / non-emptiness tests on an array.
empty_lists = udf(lambda: [], ArrayType(StringType()))
is_empty_udf = udf(lambda x: len(x) == 0, BooleanType())
is_not_empty_udf = udf(lambda x: len(x) != 0, BooleanType())

# One row per follower_id with the list of its user_id values and an empty "parents" array.
df_forward  = df.groupby("follower_id").agg(f.collect_list("user_id").alias("user_id"))
df_forward = df_forward.withColumn("parents", empty_lists())

# One row per user_id with the list of its follower_id values and an empty "path" array.
df_backward = df.groupby("user_id").agg(f.collect_list("follower_id").alias("follower_id"))
df_backward = df_backward.withColumn("path", empty_lists())

# df_forward.cache()
# df_backward.cache()


# run BFS and mark visited from id_from to id_to
queue_ids = [id_from]
while queue_ids:
  first_in_queue = queue_ids.pop(0)
  neighbours = df_forward.filter(df_forward.follower_id == first_in_queue).select('user_id').rdd.flatMap(lambda x: x).collect()
  if not neighbours: continue # leaf
  else: neighbours = neighbours[0] # the collected list taken from the user_id column of the matching row
  if id_to in neighbours: break # found target

  # add new neighbours which are not visited
  new_neighbours = df_forward.filter(is_empty_udf(df_forward.parents))\
              .filter(df_forward.follower_id.isin(neighbours))\
              .select('follower_id').rdd.flatMap(lambda x: x).collect()
  queue_ids.extend(new_neighbours)
  
  # append the id being expanded to the parents of every row whose follower_id is a neighbour
  add_parent = udf(lambda L: list(L) + [first_in_queue], ArrayType(StringType()))
  df_forward = df_forward.withColumn("parents", f.when(df_forward.follower_id.isin(neighbours), add_parent(f.col("parents"))).otherwise(f.col("parents")))

In [None]:
df_forward.filter(df_forward.follower_id == '586').show(2)

In [None]:
df_forward.filter(df_forward.follower_id == '107').show(2)

In [None]:
df_forward.filter(df_forward.follower_id == '52').show(2)

In [None]:
df_backward.filter(df_backward.user_id == '586').show(2)

- - -

In [None]:
def shortest_path(df, id_from, id_to):
  """Return one shortest path from ``id_from`` to ``id_to`` over follower_id -> user_id edges.

  The search is a level-by-level BFS: each round fetches, in a single Spark job, every
  edge that leaves the current frontier and records for each newly reached id the id it
  was reached from. The path is then rebuilt from those parent pointers, which guarantees
  termination and minimal length. (The previous version created a ``parents`` column but
  read ``is_visited``, and its back-trace could cycle or invent a path for unreachable
  targets.)

  Args:
    df: Spark DataFrame with ``user_id`` and ``follower_id`` columns; one row per edge.
    id_from: id the path starts at.
    id_to: id the path ends at.

  Returns:
    List of ids from ``id_from`` to ``id_to`` inclusive (``[id_from]`` when both are
    equal). When ``id_to`` cannot be reached, prints ``'path not_found'`` and returns -1,
    matching the pandas implementation in this notebook.
  """
  if id_from == id_to:
    return [id_from]

  df = df.cache()

  # parents[x] is the id from which x was first reached; the source has no parent
  parents = {id_from: None}
  frontier = [id_from]
  while frontier and id_to not in parents:
    # all edges leaving the current BFS level
    edges = df.filter(df.follower_id.isin(frontier)).select('follower_id', 'user_id').collect()
    frontier = []
    for edge in edges:
      child = edge['user_id']
      if child not in parents:
        parents[child] = edge['follower_id']
        frontier.append(child)

  if id_to not in parents:
    print('path not_found')
    return -1

  # walk the parent pointers back from the target, then reverse
  res = [id_to]
  while parents[res[-1]] is not None:
    res.append(parents[res[-1]])
  return res[::-1]

In [None]:
# Print the networkx shortest path for each example [source, target] pair (ids converted to strings).
# NOTE(review): `nx` and `graph` are only created in a cell further down the notebook
# (import networkx / nx.from_pandas_edgelist), so this cell fails on a fresh top-to-bottom run.
examples = [[12, 15],[12, 17],[12, 13],[12, 16],[12, 38],[12, 295],[47147585, 34],[29068146, 34]]
for i in examples:
  print(i, nx.shortest_path(graph, source=str(i[0]), target=str(i[1])))

In [None]:
shortest_path(df, '12', '15')

In [None]:
shortest_path(df, '12', '17')

In [None]:
shortest_path(df, '12', '13')

In [None]:
shortest_path(df, '12', '16')

In [None]:
shortest_path(df, '12', '38')

In [None]:
shortest_path(df, '47147585', '34')

In [None]:
shortest_path(df, '29068146', '34')

In [None]:
def shortest_path(df, id_from, id_to):
  """Return one shortest path from ``id_from`` to ``id_to`` using a grouped adjacency frame.

  ``df`` is grouped into one row per follower_id holding the list of its user_id values.
  A BFS from ``id_from`` looks up one adjacency row per expanded id and records, for every
  newly reached id, the id it was reached from; the path is rebuilt from those parent
  pointers. (The previous version never appended to its result list during the
  back-trace, so it either looped forever or failed on ``None['follower_id']``, and it
  contained a no-op ``queue_ids.extend`` statement and debug prints.)

  Args:
    df: Spark DataFrame with ``user_id`` and ``follower_id`` columns; one row per edge.
    id_from: id the path starts at.
    id_to: id the path ends at.

  Returns:
    List of ids from ``id_from`` to ``id_to`` inclusive (``[id_from]`` when both are
    equal). When ``id_to`` cannot be reached, prints ``'path not_found'`` and returns -1,
    matching the pandas implementation in this notebook.
  """
  from collections import deque

  if id_from == id_to:
    return [id_from]

  df_forward = df.groupby("follower_id").agg(f.collect_list("user_id").alias("user_id"))
  # cache to speed up: the frame is filtered once per expanded id
  df_forward.cache()

  # parents[x] is the id from which x was first reached; the source has no parent
  parents = {id_from: None}
  queue_ids = deque([id_from])
  while queue_ids and id_to not in parents:
    first_in_queue = queue_ids.popleft()
    row = df_forward.filter(df_forward.follower_id == first_in_queue).first()
    if row is None: continue # leaf
    for n in row['user_id']:
      if n in parents: continue # already reached by a path that is at least as short
      parents[n] = first_in_queue
      if n == id_to: break
      queue_ids.append(n)

  if id_to not in parents:
    print('path not_found')
    return -1

  # walk the parent pointers back from the target, then reverse
  res = [id_to]
  while parents[res[-1]] is not None:
    res.append(parents[res[-1]])
  return res[::-1]

In [None]:
# Pandas copy of the Spark frame `df`; the pandas/networkx cells below read it as `df_`.
df_ = df.toPandas()

In [None]:
df_

In [None]:
# NOTE(review): `nx` is imported and `graph` is built only in the next cell, so this cell
# relies on that cell having been executed first.
nx.shortest_path(graph, source='12', target='34')

In [None]:
import networkx as nx

# Directed reference graph built from the pandas edge list, with edges running
# from follower_id to user_id.
graph = nx.from_pandas_edgelist(df_, 'follower_id', 'user_id', create_using=nx.DiGraph())

# Stored under its own name: binding the result to `shortest_path` would replace the
# shortest_path() function that other cells of this notebook call.
reference_path = nx.shortest_path(graph, source='12', target='34')


In [None]:
import numpy as np
def generate_random_path(source, max_steps=100, edges=None, rng=None):
  """Random walk along follower_id -> user_id edges starting at ``source``.

  Args:
    source: id the walk starts from.
    max_steps: maximum number of moves (default 100, the previously hard-coded cap).
    edges: pandas DataFrame with ``follower_id`` and ``user_id`` columns; defaults to
      the notebook-level frame ``df_``.
    rng: optional object with a ``choice`` method (e.g. ``np.random.default_rng(seed)``)
      for reproducible walks; defaults to the global ``np.random`` state.

  Returns:
    Tuple ``(last id reached, number of ids on the walk including the source)``.
  """
  if edges is None:
    edges = df_
  chooser = np.random if rng is None else rng
  path = [source]
  for _ in range(max_steps):
    # ids the current id points to
    neighbours = edges.loc[edges['follower_id'] == path[-1], 'user_id'].to_numpy()
    if not len(neighbours): break # dead end
    path.append(chooser.choice(neighbours))
  return path[-1], len(path)

def generate_random_path1(target, max_steps=100, edges=None, rng=None):
  """Random walk against the edge direction (user_id -> follower_id) starting at ``target``.

  Args:
    target: id the walk starts from.
    max_steps: maximum number of moves (default 100, the previously hard-coded cap).
    edges: pandas DataFrame with ``follower_id`` and ``user_id`` columns; defaults to
      the notebook-level frame ``df_``.
    rng: optional object with a ``choice`` method (e.g. ``np.random.default_rng(seed)``)
      for reproducible walks; defaults to the global ``np.random`` state.

  Returns:
    Tuple ``(last id reached, number of ids on the walk including the start)``.
  """
  if edges is None:
    edges = df_
  chooser = np.random if rng is None else rng
  path = [target]
  for _ in range(max_steps):
    # ids that point to the current id
    neighbours = edges.loc[edges['user_id'] == path[-1], 'follower_id'].to_numpy()
    if not len(neighbours): break # nobody points here
    path.append(chooser.choice(neighbours))
  return path[-1], len(path)

In [None]:
df_.groupby('follower_id').count().iloc[44002]

12, 15
12, 17
12, 13
12, 16
12, 38
12, 295
47147585, 34
29068146, 34

In [None]:
shortest_path(df_, '12', '16')

In [None]:
df_[ df_['follower_id']=='38']

In [None]:
df_[ df_['follower_id']=='23']

In [None]:
nx.shortest_path(graph, source='12', target='16')

In [None]:
df_[df_['follower_id']=='12']

In [None]:
graph.degree

In [None]:
def shortest_path(df, id_from, id_to):
  """Return one shortest path from ``id_from`` to ``id_to`` over follower_id -> user_id edges.

  Runs a BFS from ``id_from`` and remembers, for every id reached, the id it was reached
  from. The path is rebuilt from those parent pointers, so it is a true shortest path and
  the reconstruction always terminates. (The previous back-trace followed *any* visited
  follower, which could loop forever on cycles or return a longer path.)

  Args:
    df: pandas DataFrame with ``user_id`` and ``follower_id`` columns; one row per edge.
    id_from: id the path starts at.
    id_to: id the path ends at.

  Returns:
    List of ids from ``id_from`` to ``id_to`` inclusive (``[id_from]`` when both are
    equal), or -1 after printing ``'id_from not_found'`` (``id_from`` never occurs as a
    follower_id) or ``'path not_found'`` (``id_to`` is unreachable).
  """
  from collections import deque

  # follower_id -> list of user_ids reachable in one step
  forward = df.groupby('follower_id')['user_id'].agg(list).to_dict()

  if id_from not in forward:
    print('id_from not_found')
    return -1
  if id_from == id_to:
    return [id_from]

  # run BFS from id_from; parents[x] is the id from which x was first reached
  parents = {id_from: None}
  queue_ids = deque([id_from])
  while queue_ids and id_to not in parents:
    first_in_queue = queue_ids.popleft()
    for n in forward.get(first_in_queue, ()): # ids without outgoing edges are leaves
      if n in parents: continue # already reached by a path that is at least as short
      parents[n] = first_in_queue
      if n == id_to: break
      queue_ids.append(n)

  if id_to not in parents:
    print('path not_found')
    return -1

  # walk the parent pointers back from the target, then reverse
  res = [id_to]
  while parents[res[-1]] is not None:
    res.append(parents[res[-1]])
  return res[::-1]