In [1]:
#Imports
import pandas as pd

#Spark
import findspark
findspark.init(r"C:\Users\Lucas\Desktop\ADA\spark-2.3.2-bin-hadoop2.7")

from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.functions import min
from pyspark.sql.functions import udf
from pyspark.sql.functions import split
from pyspark.sql.functions import explode

from pyspark.sql.types import StringType
from pyspark.sql.types import TimestampType

from pyspark.sql import SparkSession
from pyspark import SparkContext

spark = SparkSession.builder.getOrCreate()

import warnings
warnings.filterwarnings('ignore')

In [2]:

def get_match_file(leak, charity, node_type):
    '''import csv match file path'''
    return '../generated/inspected_matches/' + node_type +'/' + node_type +'_'+ leak + '_' + charity + '_matches.csv'


In [3]:
def get_matches(leak):
    '''return matches files between offshore entities and the different charity dataset (wikipedia, INGO and Forbes)'''
   
    charity_types = ['wikipedia', 'INGO', 'forbes']
    
    first = True
    
    matches = None
    
    for charity_type in charity_types:
            
            if first:
                matches = spark.read.csv(get_match_file(leak, charity_type, 'entity'), header=True)
                first = False
            else:
                new_matches = spark.read.csv(get_match_file(leak, charity_type, 'entity'), header=True)
                matches = matches.union(new_matches)
    
    return matches.drop('_c0')

In [4]:
def filter_edges(edges, nodes):
    '''Given a set of nodes, returns the edges connected to those nodes'''
    
    entity_ids = nodes.map(lambda r: r[0]).collect()
    
    return edges.filter(lambda r: r[0] in entity_ids or r[2] in entity_ids)

In [5]:
def filter_nodes(nodes, edges):
    '''Given a set of edges, return the nodes connected to those edges'''
    
    start_ids = edges.map(lambda r: r[0]).collect()
   
    end_ids = edges.map(lambda r: r[2]).collect()
    
    return nodes.rdd.filter(lambda r: r[0] in start_ids or r[0] in end_ids)

In [6]:
def get_address_matches(leak):
    '''return the match node id of first level connection with registered address '''
    
    #return the path of edges csv file
    edges = spark.read.csv('../data/'+ leak + '/*.edges.csv', header=True)
    
    #return the path of address nodes csv file
    address_nodes = spark.read.csv('../data/' + leak + '/*address.csv', header=True)
    
    #filter the edges that contain a registered address
    address_edges = edges.rdd.filter(lambda r: r[1] == 'registered_address')
    
    #get the matches 
    matches=get_matches(leak)
    
    #filter the edges connected to matches
    entity_address_edges = filter_edges(address_edges, matches.rdd)
    
    #filter thes nodes linked to these edges
    leak_address = filter_nodes(address_nodes,entity_address_edges)
    
    #create a dataframe from it
    address = spark.createDataFrame(leak_address)
    
    ''' Conversion of the spark dataframe in pandas ones : '''
    
    #for the matches :
    matches_pd = pd.DataFrame(matches.collect(), columns=["node_id","ShellName","CharityName","CharityHeadquarters"])
    
    #for the nodes connected to the matches :
    address_pd = pd.DataFrame(address.collect(),
                            columns = ["node_id","name","address","country_codes","countries","sourceID","valid_until","note"])
    
    #for the edges connected to the matches :
    entity_address_edges_pd = pd.DataFrame(entity_address_edges.collect(),
                            columns = ["node_id","TYPE","END_ID","link","start_date","end_date","sourceID","valid_until"])
    
    '''Apply 'int' type to the columns used for the merge : '''
    
    matches_pd['node_id'] = matches_pd['node_id'].apply(int)
   
    address_pd['node_id'] = address_pd['node_id'].apply(int)
    
    entity_address_edges_pd['node_id']=entity_address_edges_pd['node_id'].apply(int)
    
    #rename the column adequatly for merging
    address_pd.rename(columns = {'node_id':'END_ID'}, inplace=True)
    
    #drop unusefull columns in order to avoid conflict when merging
    address_pd.drop(['note','valid_until','sourceID'],axis = 1, inplace = True)
    
    #first we merge in order to obtain correspondance between the matches and the corresponding edges with registered adress
    maches_edges = matches_pd.merge(entity_address_edges_pd, on = 'node_id')
    
    maches_edges['END_ID'] = maches_edges['END_ID'].apply(int)
    
    #then to node linked to these edges
    matches_nodes = maches_edges.merge(address_pd, on = 'END_ID')

    return matches_nodes

In [7]:
print('Paradise papers dataset :')

#Look for first level connection with registered address in Paradise dataset
paradise=get_address_matches('paradise')

print('There is ' + paradise.count() + 'connections with registered addresses to the matches.')

Paradise papers dataset :


TypeError: ufunc 'add' did not contain a loop with signature matching types dtype('<U21') dtype('<U21') dtype('<U21')

In [None]:
#Then we want to compare these addresses with the addresses of the actual charity headquarters
paradise.dropna(subset=['CharityHeadquarters'])

In [None]:
#Look for first level connection with registered address in Panamas dataset
panama = get_address_matches('panama')

In [None]:
#Look for first level connection with registered address in Offshore dataset
offshore = get_address_matches('offshore')