In [5]:
import sys
import pandas as pd
import numpy as np
import re
import os
import random

import pyspark
from pyspark.sql import *
spark = SparkSession.builder.appName('retweet graph').getOrCreate()
sc = spark.sparkContext

In [2]:
def isNaN(num):
    return num != num

In [3]:
def retweetgraph(data, filename, num = 5):
    retweets = []
    for line in data:
        retweets.append([line[0], line[1]])
    # Find out nodes with more than 'num' weighted degree
    nodes = dict()
    for line in retweets:
        if line[0] not in nodes:
            nodes[line[0]] = 0
        nodes[line[0]] += 1
        if line[1] not in nodes:
            nodes[line[1]] = 0
        nodes[line[1]] += 1

    for i in list(nodes):
        if nodes[i] <= num:
            del nodes[i]
            
    # Find undirected weighted edges without self loop
    temp = dict()
    for retweet in retweets:
        if retweet[0] == retweet[1]:
            continue
        if retweet[0] in nodes and retweet[1] in nodes:
            if (retweet[0], retweet[1]) in temp:
                temp[(retweet[0], retweet[1])] += 1
            elif (retweet[1], retweet[0]) in temp:
                temp[(retweet[1], retweet[0])] += 1
            else:
                temp[(retweet[0], retweet[1])] = 1
    edges = list(temp.items())
    f = open(filename, 'w')
    for edge in edges:
        f.write('{}\t{}\t{}\n'.format(edge[0][0], edge[0][1], edge[1]))
    f.close()

In [8]:
df = pd.read_csv('humans1.csv', sep=';', index_col='id', usecols = ['id', 'screen_name', 'is_retweet', 'text_retweet'])
df1 = df[df.apply(lambda x: (x['is_retweet']==True) and (isNaN(x['text_retweet'])==False), axis=1)]

In [9]:
def get_retweet_user(text_retweet):
    try:
        return re.match(r"RT @(\S+):.*", text_retweet).group(1)
    except:
        return 'False'
    
df1['retweet_username'] = df1.apply(lambda x: get_retweet_user(x['text_retweet']), axis=1)

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/indexing.html#indexing-view-versus-copy
  import sys


In [10]:
retweetgraph(df1[df1['retweet_username'] != 'False'][['screen_name', 'retweet_username']].values.tolist(), 'retweetgraph.txt', 2)




#### Put graphframe jar file matching the pyspark version in current directory
spark-sumbit --packages graphframe:graphframes:<graphframe version> retweet_graph_lpa.py

In [1]:
!spark-submit --packages graphframes:graphframes:0.7.0-spark2.4-s_2.11 retweet_graph_lpa.py

Ivy Default Cache set to: /home/hadoop/.ivy2/cache
The jars for the packages stored in: /home/hadoop/.ivy2/jars
:: loading settings :: url = jar:file:/home/hadoop/anaconda3/lib/python3.7/site-packages/pyspark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
graphframes#graphframes added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-420d91f3-49d7-4720-bf18-beece5282171;1.0
	confs: [default]
	found graphframes#graphframes;0.7.0-spark2.4-s_2.11 in spark-packages
	found org.slf4j#slf4j-api;1.7.16 in central
:: resolution report :: resolve 262ms :: artifacts dl 9ms
	:: modules in use:
	graphframes#graphframes;0.7.0-spark2.4-s_2.11 from spark-packages in [default]
	org.slf4j#slf4j-api;1.7.16 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	-------------

2019-04-08 06:53:07 INFO  Utils:54 - Copying /home/hadoop/.ivy2/jars/org.slf4j_slf4j-api-1.7.16.jar to /tmp/spark-645e57e1-811b-459b-9f3b-080145d3954e/userFiles-21eca652-c8f6-4095-9945-5ff24d571332/org.slf4j_slf4j-api-1.7.16.jar
2019-04-08 06:53:07 INFO  Executor:54 - Starting executor ID driver on host localhost
2019-04-08 06:53:07 INFO  Utils:54 - Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 34125.
2019-04-08 06:53:07 INFO  NettyBlockTransferService:54 - Server created on 172.17.0.1:34125
2019-04-08 06:53:07 INFO  BlockManager:54 - Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
2019-04-08 06:53:07 INFO  BlockManagerMaster:54 - Registering BlockManager BlockManagerId(driver, 172.17.0.1, 34125, None)
2019-04-08 06:53:07 INFO  BlockManagerMasterEndpoint:54 - Registering block manager 172.17.0.1:34125 with 366.3 MB RAM, BlockManagerId(driver, 172.17.0.1, 34125, None)
2019-04-08 06:53:07 INFO  Blo

In [5]:
sdf = spark.read.parquet('retweet_lpa.parquet')

In [6]:
sdf.show()

+---------------+-------------+---------------+------------------+
|             id|        label|           name|          pagerank|
+---------------+-------------+---------------+------------------+
|   rissmontrone|  94489280526|   rissmontrone|1.9903495694739561|
|    NaplesRocks| 506806140937|    NaplesRocks|0.5686619742746646|
|   MarciaBelsky| 738734374914|   MarciaBelsky|0.5686619742746646|
|  JaninaLarssen| 584115552258|  JaninaLarssen|0.6249595097278564|
|    JustJen2015| 197568495618|    JustJen2015|0.8501496515406236|
|       guardian| 429496729602|       guardian| 0.709405812907644|
|        NYCer99| 584115552258|        NYCer99|0.5686619742746646|
|      _ItsToni_|1614907703299|      _ItsToni_|0.5686619742746646|
|      Klassic54| 584115552258|      Klassic54|0.5686619742746646|
|     xande18000| 111669149701|     xande18000|0.5686619742746646|
|    Vaibhav_AAP| 730144440320|    Vaibhav_AAP|  1.88227113484914|
|     hugomartin|1194000908294|     hugomartin|1.1316373288065