In [1]:
%load_ext autoreload
%autoreload 2 
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"

In [2]:
import findspark
findspark.init()

In [3]:
from random import random
from operator import add
from pyspark.sql import SparkSession
import pandas as pd


DATA_DIR = '/home/bob/gtd/Iconic/data/samples/'

In [4]:
spark = SparkSession\
    .builder\
    .appName("IconicSparkPy")\
    .getOrCreate()

# Enable Arrow-based columnar data transfers
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

In [5]:
def read_dataframe(path):
    return spark.read.csv(DATA_DIR + path, sep = '\t', header = True, inferSchema = True)

papers = read_dataframe('Papers.txt')
authors = read_dataframe('Authors.txt')
fos = read_dataframe('FieldsOfStudy.txt')
affs = read_dataframe('Affiliations.txt')
paa = read_dataframe('PaperAuthorAffiliations.txt')
pf = read_dataframe('PaperFieldsOfStudy.txt')
pr = read_dataframe('PaperReferences.txt')
fc = read_dataframe('FieldOfStudyChildren.txt')

# Preliminaries
## Get root field of study 

In [149]:
import pyspark.sql.functions as F
from pyspark.sql.functions import col

In [150]:
def computeRootFos( fieldId):
    filtered = fc.filter(col("child") == fieldId)
#     print(filtered.take(1))
    if len(filtered.take(1)) == 0:
        return fieldId
    parent = filtered.take(1)[0][0]
    if parent == fieldId:
#         print(parent)
        return parent
    else:
#         print('rec')
        return computeRootFos( parent)

In [13]:
computeRootFos(8105449)

12843

add the root field of study to each paper

In [14]:
from pyspark.sql import Row
pf.rdd.map(lambda row : Row(paper = row[0], fos = row[1], similarity = row[2])).toDF()
fields = pf.select("fos").distinct().collect()
fields = map(lambda field : {'fos' : field[0], 'root' :computeRootFos(field[0])}, fields)
fields = list(fields)

DataFrame[fos: bigint, paper: bigint, similarity: double]

In [15]:
fr = pd.DataFrame(fields)

define helper function to use the root dataframe for getting the root of a field id

In [17]:
def getRootFos(fos):
    return fr[fr["fos"] == 124952713]["root"]

In [31]:
from pyspark.sql.functions import udf
def getRootFos(fos):
    return fr[fr["fos"] == fos]["root"][0]

rootFos_udf = udf(lambda fos : getRootFos(fos))

In [33]:
from pyspark.sql.functions import lit
rootFos_udf(lit(124952713))
getRootFos(124952713)

Column<b'<lambda>(124952713)'>

124952713

## Aggregated new data

In [151]:
# ego country
authorDetails = paa.join(pf, "paper")\
    .select("author", "paper","affiliation", "fos")\
    .rdd.map(lambda row : Row(author = row[0], paper = row[1], affiliation = row[2], fos = getRootFos(row[3]))).toDF()\
    .groupBy("author", "fos", "affiliation")\
    .agg(F.countDistinct("paper"))\
    .join(MAG.dfAff, col("affiliation") == col("id"))\
    .select("author","fos","countryCode","country")

authorDetails

ValueError: RDD is empty

# Filtered tables

In [181]:
# FILTERED_PAA_YEARS
paa = paa.join(papers, col("id") == col("paper"))\
    .filter( (col("year") >= 2007) & (col("year") <= 2017))\
    .select("paper", "author", "affiliation")
paa

DataFrame[paper: int, author: bigint, affiliation: bigint]

In [182]:
# PF_WITH_ROOT
pf = pf.withColumn("root", rootFos_udf("fos"))
pf

DataFrame[paper: int, fos: bigint, similarity: double, root: string]

In [183]:
fos = 10
# FILTERED_PAA_FOS
paa = paa.join(pf, "paper")\
    .filter(col("root") == fos)\
    .select("paper", "author", "affiliation")
paa

DataFrame[paper: int, author: bigint, affiliation: bigint]

## Add test data

In [168]:
from pyspark.sql import Row

In [233]:
tpaa = [(1, 2, "RO"),(1,3, "RO"),(2,3, "RO"),(2,1, "RO"),(3,2, "RO"),(1,5, "EN"),(3,3, "RO"),(3,5, "EN"),(4,7, "IT")]
rdd = spark.sparkContext.parallelize(tpaa)
rdd = rdd.map(lambda x: Row(paper=int(x[0]), author=int(x[1]), countryCode = x[2]))
tpaa = spark.createDataFrame(rdd)
tpaa

DataFrame[author: bigint, countryCode: string, paper: bigint]

In [218]:
tpr = [(1,2), (3, 2), (3,4), (4, 1)]
rdd = spark.sparkContext.parallelize(tpr)
rdd = rdd.map(lambda x: Row(citing=int(x[0]), cited=int(x[1])))
tpr = spark.createDataFrame(rdd)
tpr

DataFrame[cited: bigint, citing: bigint]

## New table head

In [234]:
paa = tpaa
pr = tpr

In [184]:
# EGO_country
ego_country = paa.join(affs, col("affiliation") == col("id"))\
    .select("author", "paper", "affiliation", "countryCode")

In [190]:
# EGO_papers
paa.groupBy("author").count().withColumnRenamed("count","papers")\
    .collect()

[Row(author=7, papers=1),
 Row(author=5, papers=2),
 Row(author=1, papers=1),
 Row(author=3, papers=3),
 Row(author=2, papers=2)]

In [221]:
#EGO_citations
paa.join(pr, col("paper") == col("cited"))\
    .groupBy("author")\
    .count().withColumnRenamed("count","citations")\
    .collect()
    

[Row(author=7, citations=1),
 Row(author=5, citations=1),
 Row(author=1, citations=2),
 Row(author=3, citations=3),
 Row(author=2, citations=1)]

In [228]:
#EGO_coauthors
from pyspark.sql.functions import countDistinct
paa.alias("1").join(paa.alias("2"), col("1.paper") == col("2.paper"))\
    .filter(col("1.author") != col("2.author"))\
    .groupBy("1.author")\
    .agg(F.countDistinct("2.author").alias("coauthors"))\
    .collect()

[Row(author=5, coauthors=2),
 Row(author=1, coauthors=1),
 Row(author=3, coauthors=3),
 Row(author=2, coauthors=2)]

In [231]:
#EGO_edges - numarul de colaborari ale ego-ului, chiar daca sunt colaborari cu aceeasi persoana
from pyspark.sql.functions import countDistinct
paa.alias("1").join(paa.alias("2"), col("1.paper") == col("2.paper"))\
    .filter( col("1.author") != col("2.author"))\
    .groupBy("1.author")\
    .agg(F.count("2.author"))\
    .collect()

[Row(author=5, count(2.author)=4),
 Row(author=1, count(2.author)=1),
 Row(author=3, count(2.author)=5),
 Row(author=2, count(2.author)=4)]

In [235]:
# ALTER_domestic
# paa = ego_country
paa = tpaa
paa.alias("1").join(paa.alias("2"), col("1.paper") == col("2.paper"))\
    .filter(col("1.author") != col("2.author"))\
    .filter(col("1.countryCode") == col("2.countryCode"))\
    .groupBy("1.author")\
    .agg(F.countDistinct("2.author").alias("domestics"))\
    .collect()

[Row(author=1, domestics=1),
 Row(author=3, domestics=2),
 Row(author=2, domestics=1)]

In [237]:
# ALTER_nondomestic
# paa = ego_country
paa = tpaa
paa.alias("1").join(paa.alias("2"), col("1.paper") == col("2.paper"))\
    .filter(col("1.author") != col("2.author"))\
    .filter(col("1.countryCode") != col("2.countryCode"))\
    .groupBy("1.author")\
    .agg(F.countDistinct("2.author").alias("nondomestics"))\
    .collect()

[Row(author=5, nondomestics=2),
 Row(author=3, nondomestics=1),
 Row(author=2, nondomestics=1)]

In [255]:
# ALTER_citations - including or not including ego's publications ?
from pyspark.sql.functions import count
paa.alias("1").join(paa.alias("2"), col("1.paper") == col("2.paper"))\
    .filter(col("1.author") != col("2.author"))\
    .join(paa.alias("3"), col("2.author") == col("3.author"))\
    .filter(col("1.paper") != col("3.paper"))\
    .join(pr, col("cited") == col("3.paper"))\
    .groupBy(col("1.author"))\
    .agg(F.count("citing").alias("alter_citations"))\
    .collect()
    
    

[Row(author=5, alter_citations=6),
 Row(author=1, alter_citations=1),
 Row(author=3, alter_citations=2),
 Row(author=2, alter_citations=6)]

In [204]:
from pyspark.sql.functions import count, sum
#ALTER_papers - count total number of papers written by coauthors
paa.alias("1").join(paa.alias("2"), col("1.paper") == col("2.paper"))\
    .join(paa.alias("3"), col("2.author") == col("3.author"))\
    .filter(col("3.paper") != col("1.author"))\
    .groupBy(col("1.author").alias("ego"), col("2.author").alias("alter"))\
    .agg(F.countDistinct("3.paper").alias("papers"))\
    .groupBy("ego")\
    .agg(F.sum("papers"))\
    .collect()

[Row(ego=7, sum(papers)=1),
 Row(ego=5, sum(papers)=7),
 Row(ego=1, sum(papers)=3),
 Row(ego=3, sum(papers)=5),
 Row(ego=2, sum(papers)=6)]

For below, we need a join that takes left elements that do not meet the condition with 0 default value

In [205]:
# ALTER_EGO_papers
paa.alias("1").join(paa.alias("2"),  col("1.paper") == col("2.paper"))\
    .filter(col("1.author") != col("2.author"))\
    .groupBy("1.author")\
    .agg(F.countDistinct("2.paper").alias("papers_not_alone"))\
    .collect()

[Row(author=5, papers_not_alone=2),
 Row(author=1, papers_not_alone=1),
 Row(author=3, papers_not_alone=3),
 Row(author=2, papers_not_alone=2)]

In [206]:
# ALTER_EGO_citations
paa.alias("1").join(paa.alias("2"),  col("1.paper") == col("2.paper"))\
    .filter(col("1.author") != col("2.author"))\
    .join(pr, col("1.paper") == col("cited"))\
    .groupBy("1.author")\
    .agg(F.countDistinct("citing").alias("citations"))\
    .collect()

[Row(author=5, citations=3),
 Row(author=1, citations=2),
 Row(author=3, citations=3),
 Row(author=2, citations=3)]

In [207]:
# ALTER_country
paa.alias("1").join(paa.alias("2"),  col("1.paper") == col("2.paper"))\
    .filter(col("1.author") != col("2.author"))\
    .groupBy("1.author")\
    .agg(F.countDistinct("2.countryCode").alias("countries"))\
    .collect()

[Row(author=5, countries=2),
 Row(author=1, countries=1),
 Row(author=3, countries=2),
 Row(author=2, countries=2)]

In [208]:
# ALTER_MAX_papers
paa.alias("1").join(paa.alias("2"), col("1.paper") == col("2.paper"))\
    .join(paa.alias("3"), col("2.author") == col("3.author"))\
    .filter(col("1.paper") != col("3.paper"))\
    .groupBy(col("1.author").alias("ego"), col("2.author").alias("alter"))\
    .agg(F.countDistinct("3.paper").alias("papers"))\
    .groupBy("ego")\
    .agg(F.max("papers"))\
    .collect()

[Row(ego=5, max(papers)=3),
 Row(ego=1, max(papers)=2),
 Row(ego=3, max(papers)=3),
 Row(ego=2, max(papers)=3)]

In [209]:
# ALTER_MAX_citations
paa.alias("1").join(paa.alias("2"),  col("1.paper") == col("2.paper"))\
    .filter(col("1.author") != col("2.author"))\
    .join(paa.alias("3"), col("2.author") == col("3.author"))\
    .filter(col("1.paper") != col("3.paper"))\
    .join(pr, col("1.paper") == col("cited"))\
    .groupBy("1.author", "2.author")\
    .agg(F.countDistinct("citing").alias("citations"))\
    .groupBy("1.author")\
    .agg(F.max("citations"))\
    .collect()

[Row(author=5, max(citations)=3),
 Row(author=1, max(citations)=2),
 Row(author=3, max(citations)=3),
 Row(author=2, max(citations)=3)]