In [1]:
import ast
import logging
import os
from datetime import datetime
from warnings import filterwarnings

import dask.dataframe as dd
import pandas as pd
from dotenv import load_dotenv

In [2]:
# Load variables from a .env file, then silence every Python warning for this session.
load_dotenv()
filterwarnings("ignore")

# Bucket prefix the raw and processed Twitter CSVs are read from.
CLOUD_STORAGE_BUCKET = "gs://graph_data_science/twitter"
# Timestamp taken once when this cell runs; later cells use it to name the CSV files they write.
run_time = f"{datetime.today():%Y_%m_%d_%H_%M_%S}"

## Ties

In [21]:
data_type = "ties"
# Pull every raw "ties" CSV in the source bucket into a single pandas frame.
ties_pattern = f"{CLOUD_STORAGE_BUCKET}/data/{data_type}*.csv"
data = dd.read_csv(ties_pattern).compute()
# Not applied here, so `following` stays exactly as read from the CSV:
# data.following = data.following.apply(ast.literal_eval)
# data = data.explode("following").dropna()

# Frame size, then the number of distinct values in each of the two columns.
print(data.shape)
print(data["user"].nunique(), data["following"].nunique())
data.head()

(243, 2)
231 233


Unnamed: 0,user,following
0,1210565171107815429,"[1281443510, 632966383, 27631809, 14922097, 22..."
1,3432386097,"[33836629, 188343397, 235684766, 17268874, 439..."
2,1120633726478823425,[]
3,4398626122,[]
4,48008938,"[1464797992872034304, 630995607, 27493883, 123..."


In [22]:
# Rows per user; a count above 1 means the user occurs in more than one row.
data["user"].value_counts()

1427408443296657408    12
1120633726478823425     2
1210565171107815429     1
1075605139              1
14717311                1
                       ..
842780484752949253      1
3429950987              1
84022193                1
16047950                1
25422302                1
Name: user, Length: 231, dtype: int64

In [23]:
# Leave out the two users that the value counts list with more than one row (12 and 2).
excluded_ids = [1427408443296657408, 1120633726478823425]
data_n = data.loc[~data["user"].isin(excluded_ids)]

In [24]:
# Destination directory for the ties CSV snapshots.
file = "gs://ego_networks/twitter/ties"

# Write the unfiltered frame as a CSV named after the session timestamp.
# NOTE(review): the filtered `data_n` is built in the previous cell, yet `data`
# is what gets written here; confirm which of the two is intended.
data.to_csv(f"{file}/{run_time}.csv", index=False)

# Parquet variant kept for reference (not executed):
#   dd.to_parquet(dd.from_pandas(data_n, npartitions=10), path=file,
#                 append=False, overwrite=True, write_index=False,
#                 schema={"user": pa.int64(), "following": pa.large_list(pa.int64())},
#                 ignore_divisions=True)
#   during append operations:
#   dd.to_parquet(dd.from_pandas(data, npartitions=10), path=NEW_CLOUD_STORAGE_BUCKET,
#                 append=True, overwrite=False, ignore_divisions=True)
#   data_n = dd.read_parquet(path=file).compute()

# Read every CSV under the destination back; this rebinds `data_n`.
data_n = dd.read_csv(f"{file}/*.csv").compute()
data_n.head()

Unnamed: 0,user,following
0,1210565171107815429,"[1281443510, 632966383, 27631809, 14922097, 22..."
1,3432386097,"[33836629, 188343397, 235684766, 17268874, 439..."
2,1120633726478823425,[]
3,4398626122,[]
4,48008938,"[1464797992872034304, 630995607, 27493883, 123..."


In [61]:
import pyarrow as pa

# Arrow schema sketched for the ties table; no live code in the visible cells uses it.
# The list items must be 64-bit: ids such as 1464797992872034304 appear in
# `following` and do not fit in int32.
# NOTE(review): `user` is read from the CSV as int64, while this schema declares it
# as a string; confirm which representation is wanted before using the schema.
schema = {"user": pa.string(), "following": pa.large_list(pa.int64())}

# custom_dtypes = {"user": str, "following": int, }

## Node Features

In [25]:
data_type = "node_features"

# Columns kept for every account.
feature_fields = ["id", "name", "profile_image_url", "public_metrics", "username", "verified"]

# Read all raw node-feature CSVs (with `withheld` forced to object dtype), drop that
# column, de-duplicate the remaining rows, and only then narrow to the feature
# columns. The order matters: duplicates are judged on all non-`withheld` columns.
data = (
    dd.read_csv(
        f"{CLOUD_STORAGE_BUCKET}/data/{data_type}*.csv",
        dtype={"withheld": "object"},
    )
    .compute()
    .drop(columns="withheld")
    .drop_duplicates()
    .loc[:, feature_fields]
)

print(data.shape)
data.head()

(162771, 6)


Unnamed: 0,id,name,profile_image_url,public_metrics,username,verified
0,794433401591693312,Aran Komatsuzaki,https://pbs.twimg.com/profile_images/150798283...,"{'followers_count': 16111, 'following_count': ...",arankomatsuzaki,False
1,2509504696,Jericho Brown,https://pbs.twimg.com/profile_images/111968593...,"{'followers_count': 40468, 'following_count': ...",jerichobrown,True
2,1938030980,Angel Wicky,https://pbs.twimg.com/profile_images/144139505...,"{'followers_count': 355162, 'following_count':...",Angel_Wicky_II,True
3,335544910,AC Justice Project,https://pbs.twimg.com/profile_images/145415812...,"{'followers_count': 1049, 'following_count': 4...",ACJProject,False
4,2622261,Brian Ulicny,https://pbs.twimg.com/profile_images/730781652...,"{'followers_count': 796, 'following_count': 24...",bulicny,False


In [26]:
# Rows per account id; a count above 1 means the id still occurs in several rows.
data["id"].value_counts()

1427408443296657408    9
794433401591693312     1
130978857              1
153130011              1
822121500              1
                      ..
161988348              1
133152680              1
19906690               1
246103                 1
111394815              1
Name: id, Length: 162763, dtype: int64

In [27]:
# Leave out the same two account ids that are excluded from the ties frame.
excluded_ids = [1427408443296657408, 1120633726478823425]
data_n = data.loc[~data["id"].isin(excluded_ids)]

In [28]:
# Destination directory for the node-feature CSV snapshots.
file = "gs://ego_networks/twitter/features/node"

# Write the unfiltered frame as a CSV named after the session timestamp.
# NOTE(review): the filtered `data_n` is built in the previous cell, yet `data`
# is what gets written here; confirm which of the two is intended.
data.to_csv(f"{file}/{run_time}.csv", index=False)

# Parquet variant kept for reference (not executed):
#   dd.to_parquet(dd.from_pandas(data_n, npartitions=10), path=file,
#                 append=False, overwrite=True, write_index=False,
#                 ignore_divisions=True)
#   dd.to_parquet(dd.from_pandas(data, npartitions=10), path=NEW_CLOUD_STORAGE_BUCKET,
#                 append=True, overwrite=False, ignore_divisions=True)
#   data_n = dd.read_parquet(path=file).compute()

# Read every CSV under the destination back; this rebinds `data_n`.
data_n = dd.read_csv(f"{file}/*.csv").compute()
data_n.head()

Unnamed: 0,id,name,profile_image_url,public_metrics,username,verified
0,794433401591693312,Aran Komatsuzaki,https://pbs.twimg.com/profile_images/150798283...,"{'followers_count': 16111, 'following_count': ...",arankomatsuzaki,False
1,2509504696,Jericho Brown,https://pbs.twimg.com/profile_images/111968593...,"{'followers_count': 40468, 'following_count': ...",jerichobrown,True
2,1938030980,Angel Wicky,https://pbs.twimg.com/profile_images/144139505...,"{'followers_count': 355162, 'following_count':...",Angel_Wicky_II,True
3,335544910,AC Justice Project,https://pbs.twimg.com/profile_images/145415812...,"{'followers_count': 1049, 'following_count': 4...",ACJProject,False
4,2622261,Brian Ulicny,https://pbs.twimg.com/profile_images/730781652...,"{'followers_count': 796, 'following_count': 24...",bulicny,False


In [14]:
# Processed centrality table: a single CSV, read eagerly into pandas.
data_type = "node_centrality"
file = f"gs://graph_data_science/twitter/data/processed/{data_type}.csv"

centrality_lazy = dd.read_csv(file)
data = centrality_lazy.compute()

print(data.shape)
data.head()

(46, 9)


Unnamed: 0,user,cen_ev,cen_dg,cen_pg,rank_cen_ev,rank_cen_dg,rank_cen_pg,rank_combined,username
0,116994659,0.071912,0.000428,9e-06,1.0,0.978261,0.956522,0.9781,['Sam Harris:SamHarrisOrg']
1,44196397,0.060478,0.00062,1e-05,0.913043,1.0,1.0,0.970131,['Elon Musk:elonmusk']
2,18208354,0.062272,0.000402,9e-06,0.934783,0.956522,0.76087,0.879506,['Joe Rogan:joerogan']
3,106204123,0.069367,0.000367,9e-06,0.978261,0.913043,0.717391,0.862119,['Jonathan Haidt:JonHaidt']
4,95092020,0.054365,0.000314,9e-06,0.826087,0.804348,0.913043,0.846552,['Dr Jordan B Peterson:jordanbpeterson']


In [40]:
# Processed node measures (columns measure_name, node, measure_value) from the "measures/2" folder.
data_type = "node_measures"
file = f"gs://graph_data_science/twitter/data/processed/measures/2/{data_type}.csv"

measures_lazy = dd.read_csv(file)
data = measures_lazy.compute()

print(data.shape)
data.head()

(1206648, 3)


Unnamed: 0,measure_name,node,measure_value
0,pagerank,1210565171107815429,0.0
1,pagerank,1281443510,0.0
2,pagerank,632966383,0.0
3,pagerank,27631809,0.0
4,pagerank,14922097,0.0


In [41]:
# Leave out the same two account ids that are excluded from the ties and node-feature frames.
excluded_ids = [1427408443296657408, 1120633726478823425]
data_n = data.loc[~data["node"].isin(excluded_ids)]

In [43]:
# NOTE(review): `data_type` is assigned but not used anywhere in this cell.
data_type = "node"
# Destination directory for the node-measure CSV.
file = "gs://ego_networks/twitter/measures/node"

# Parquet variant kept for reference (not executed):
#   dd.to_parquet(dd.from_pandas(data_n, npartitions=10), path=file,
#                 append=False, overwrite=True, write_index=False,
#                 ignore_divisions=True)

# Write the unfiltered measures to a fixed file name ("2.csv"); `mode='w'` was left disabled.
# NOTE(review): the filtered `data_n` from the previous cell is not what gets written; confirm.
data.to_csv(f"{file}/2.csv", index=False)

## New Object: DataConfig, DataReader and DataWriter

In [38]:
class DataConfig:
    """Storage locations shared by the reader and writer classes."""

    # Bucket prefix under which every dataset lives.
    root_dir = "gs://ego_networks/twitter"  # os.getenv("CLOUD_STORAGE_BUCKET")

    # Data type -> directory (no trailing slash) holding that type's CSV files.
    # Built with plain concatenation: a comprehension could not see `root_dir`
    # from inside the class body.
    file_paths = dict(
        ties=root_dir + "/ties",
        node_features=root_dir + "/features/node",
        node_measures=root_dir + "/measures/2/node",
    )

class DataReader(DataConfig):
    """Read one configured dataset from storage and return it as a pandas frame.

    Parameters
    ----------
    data_type : str
        Key into ``file_paths`` ("ties", "node_features" or "node_measures").
    """

    def __init__(self, data_type):
        super().__init__()
        self.data_type = data_type

    def run(self):
        """Load every CSV of ``self.data_type`` and preprocess the result.

        Returns
        -------
        pd.DataFrame
            The preprocessed data, or an empty frame when the data type is not
            configured or when reading/preprocessing fails. Failures are
            logged instead of raised, so calling cells keep running.
        """
        logger = logging.getLogger(__name__)

        directory = self.file_paths.get(self.data_type)
        if directory is None:
            # Without this check the path would degrade to "None/*.csv" and the
            # real cause would be hidden behind a generic read error.
            logger.error(
                f"Read unsuccessful: unknown data_type {self.data_type!r}, "
                f"expected one of {sorted(self.file_paths)}"
            )
            return pd.DataFrame()

        try:
            data = dd.read_csv(urlpath=f"{directory}/*.csv").compute()
            logger.info(f"Read successful: {self.data_type}, shape: {data.shape}")
            return self.__preprocess(data)
        except Exception as error:
            # Best effort by design: report the problem and hand back an empty frame.
            logger.error(f"Read unsuccessful: {self.data_type}, {error}")
            return pd.DataFrame()

    def __preprocess(self, data):
        """Apply the clean-up specific to ``self.data_type``; other types pass through."""
        if self.data_type == "ties":
            # `following` holds the text of a Python list. Parse it and give each
            # followed id its own row; users with an empty list become NaN rows
            # and are dropped. `assign` builds a new frame, leaving the input untouched.
            parsed = data.assign(following=data.following.apply(ast.literal_eval))
            return parsed.explode("following").dropna()
        elif self.data_type == "node_features":
            return data.drop_duplicates().set_index("id")
        else:
            return data


class DataWriter(DataConfig):
    """Write a frame as CSV into the directory configured for its data type.

    Parameters
    ----------
    data : pd.DataFrame
        Frame to persist (anything exposing ``to_csv`` works).
    data_type : str
        Key into ``file_paths`` ("ties", "node_features" or "node_measures").
    """

    def __init__(self, data, data_type):
        super().__init__()
        self.data = data
        self.data_type = data_type

    def run(self, append=True):
        """Write ``self.data`` to ``<directory>/<file name>.csv`` without the index.

        Parameters
        ----------
        append : bool, default True
            When true the file name is ``<data_type>_<timestamp>``, so each
            call adds a new file. When false the file name is just
            ``<data_type>``, so the same file is targeted on every call.

        Raises
        ------
        ValueError
            If ``self.data_type`` has no configured directory.
        """
        # logger.info(f"Writing {self.data_type}: {self.data.shape}")
        directory = self.file_paths.get(self.data_type)
        if directory is None:
            # Fail before writing: the path would otherwise start with the literal "None/".
            raise ValueError(
                f"Unknown data_type {self.data_type!r}; "
                f"expected one of {sorted(self.file_paths)}"
            )

        if append:
            stamp = datetime.today().strftime("%Y_%m_%d_%H_%M_%S")
            file_name = f"{self.data_type}_{stamp}"
        else:
            file_name = self.data_type

        self.data.to_csv(
            f"{directory}/{file_name}.csv",
            index=False,
        )

In [39]:
# Writer example kept for reference (not executed):
# dw = DataWriter(data = data_n, data_type = 'node_measures')
# dw.run(append = False)

# Load the ties through the reader class and preview the preprocessed rows.
dr = DataReader("ties").run()
dr.head()

Unnamed: 0,user,following
0,1210565171107815429,1281443510
0,1210565171107815429,632966383
0,1210565171107815429,27631809
0,1210565171107815429,14922097
0,1210565171107815429,228132689


In [35]:
# Read the same ties CSVs directly, without the reader's preprocessing.
data = dd.read_csv("gs://ego_networks/twitter/ties/*.csv").compute()
data

Unnamed: 0,user,following
0,1210565171107815429,"[1281443510, 632966383, 27631809, 14922097, 22..."
1,3432386097,"[33836629, 188343397, 235684766, 17268874, 439..."
2,1120633726478823425,[]
3,4398626122,[]
4,48008938,"[1464797992872034304, 630995607, 27493883, 123..."
...,...,...
238,3021662641,"[2361631088, 446719282, 18208354, 12, 17562420..."
239,82380985,"[1265492474786701315, 2959310374, 37190937, 15..."
240,180635485,"[1519502693806522368, 12, 113240899, 303250898..."
241,25422302,"[97999865, 17048447, 191697965, 4316769252, 17..."


In [11]:
# Print the first rows returned by the reader for each configured data type.
for kind in ("ties", "node_measures", "node_features"):
    preview = DataReader(data_type=kind).run().head()
    print(preview)

                  user   following
0  1210565171107815429  1281443510
0  1210565171107815429   632966383
0  1210565171107815429    27631809
0  1210565171107815429    14922097
0  1210565171107815429   228132689
  measure_name                 node  measure_value
0     pagerank  1210565171107815429            0.0
1     pagerank           1281443510            0.0
2     pagerank            632966383            0.0
3     pagerank             27631809            0.0
4     pagerank             14922097            0.0
                   id              name  \
0  794433401591693312  Aran Komatsuzaki   
0  746058182582206464    Peter C. Baker   
0  989084476088209409    Samarth Bansal   
0  795701305691910144    Kevin Yang 楊凱筌   
0  880540913201577984              hazy   

                                   profile_image_url  \
0  https://pbs.twimg.com/profile_images/150798283...   
0  https://pbs.twimg.com/profile_images/151109864...   
0  https://pbs.twimg.com/profile_images/156630914...   
0